2014/04/14

erlang - Socket Programming

有兩個主要的函式庫:gen_tcp 與 gen_udp

從 web server 取得資料, tcp client

nano_get_url 可取得網頁的 html 內容

nano_get_url() ->
    nano_get_url("www.google.com").

nano_get_url(Host) ->
    %% 對 Host:80 開啟 TCP socket
    {ok,Socket} = gen_tcp:connect(Host,80,[binary, {packet, 0}]),
    %% 對此 Socket,傳送 GET / HTTP/1.0\r\n\r\n 字串的資料
    ok = gen_tcp:send(Socket, "GET / HTTP/1.0\r\n\r\n"),
    %% 等待接收server 回傳的資料
    receive_data(Socket, []).

receive_data(Socket, SoFar) ->
    receive
    %% 因為是用 binary 的方式開啟 socket,如果收到資料片段 Bin
    %% 就將它接到 SoFar 中暫存起來
    {tcp,Socket,Bin} ->
        receive_data(Socket, [Bin|SoFar]);
    %% 如果收到 tcp_closed 訊號,就表示 web server 送完資料,把 socket 斷線
    %% 就將資料 reverse,再將 binary list 連接成一整個 binary 資料
    {tcp_closed,Socket} ->
        list_to_binary(reverse(SoFar))
    end.

測試後,可取得 B 這一塊 binary 資料,然後可以用 io:format("~p~n", [B]) 將所有資料列印到畫面上,也可以用 string:tokens(binary_to_list(B), "\r\n") ,以 \r\n 切割資料,然後一行一行印出來。

1> B = socket_examples:nano_get_url().
<<"HTTP/1.0 302 Found\r\nLocation: http://www.google.com.tw/?gws_rd=cr&ei=W534Ur
qDKMLnkAWA-IHIDA\r\nCache-Control: private\r"...>>
2> io:format("~p~n", [B]).
<<"HTTP/1.0 302 Found\r\nLocation: http://www.google.com.tw/?gws_rd=cr&ei=W534Ur
qDKMLnkAWA-IHIDA\r\nCache-Control: private\r\nContent-Type: text/html; charset=U
TF-8\r\nSet-Cookie: PREF=ID=2d1f3f73d3fdb6d9:FF=0:TM=1392024923:LM=1392024923:S=
P-4nPCjpFPUCiXBs; expires=Wed, 10-Feb-2016 09:35:23 GMT; path=/; domain=.google.
...
3> string:tokens(binary_to_list(B), "\r\n").
["HTTP/1.0 302 Found",
 "Location: http://www.google.com.tw/?gws_rd=cr&ei=W534UrqDKMLnkAWA-IHIDA",
 "Cache-Control: private",
 "Content-Type: text/html; charset=UTF-8",
...

TCP Server: evaluate erlang expression, tcp server

需求是要有一個 tcp server, port 為 2345,他會等待 binary 訊息,裡面是一個 erlang expression,server運算後,將結果回傳給 client。

要想寫出任何一個 tcp 程式,必須先回答下列問題,因為 tcp socket 資料只是一個 bytes streaming,在傳輸期間,資料可以被打碎成任意長度的片段。

  1. 資料的格式,要如何知道 request 或 response 是由多少資料組成
  2. 在 request/response 內的資料,要如何編碼(marshaling)與解碼(de-marshaling)

{packet, N}

在 erlang 中,request/response可以用前置的 N(1或2或4) 個 bytes,來表示資料的長度,這也是 gen_tcp:connect 與 gentcp:listen 中 {packet, N} 參數的意義。

當我們利用 {packet, N} 開啟 socket 時,erlang driver 會自動將被打碎的資料片段,接合在一起。

term_to_binary 與 binary_to_term

erlang term 的編碼與解碼,可直接使用 term_to_binary 與 binary_to_term,這樣就不需要處理 http 或 xml 的文字編碼,不只速度快,傳送的資料也比較少。

程式

server 端的程式

start_nano_server() ->
    %% 開啟 tcp server port 2345,{packet, 4} 表示用 4 bytes 的資料長度 header
    %% gen_tcp:listen 會傳回 {ok, Socket} 或 {error, Why}
    {ok, Listen} = gen_tcp:listen(2345, [binary,     {packet, 4}, {reuseaddr, true}, {active, true}]),
    %% 將 Listen 綁定至 listen socket
    %% 在這裡,程式會暫停並等待 tcp client 連線
    {ok, Socket} = gen_tcp:accept(Listen),
    %% 當有 tcp client 連線後,就馬上關閉 Listen,這樣就不會再收到新的連線
    %% 且關閉後,不會影響既有的連線
    gen_tcp:close(Listen),
    %% 處理 socket 資料
    loop(Socket).

loop(Socket) ->
    receive
    {tcp, Socket, Bin} ->
        io:format("Server received binary = ~p~n",[Bin]),
        %% 將收到的資料 unmarshaling
        Str = binary_to_term(Bin),
        io:format("Server (unpacked)  ~p~n",[Str]),

        %% 估算 term
        Reply = lib_misc:string2value(Str),
        io:format("Server replying = ~p~n",[Reply]),

        %% 把結果 marshaling 之後,送進 socket
        gen_tcp:send(Socket, term_to_binary(Reply)),

        %% 等待此 tcp client 發送下一個 term,並處理
        loop(Socket);
    {tcp_closed, Socket} ->
        io:format("Server socket closed~n")
    end.

client 端的程式

nano_client_eval(Str) ->
    %% 開啟 socket,連接到 tcp port 2345
    {ok, Socket} = gen_tcp:connect("localhost", 2345,
            [binary, {packet, 4}]),
    %% 將 term 以 term_to_binary 編碼後,發送到 socket
    ok = gen_tcp:send(Socket, term_to_binary(Str)),

    %% 等待接收結果
    receive
    {tcp,Socket,Bin} ->
        io:format("Client received binary = ~p~n",[Bin]),
        %% 以 binary_to_term 解碼後,列印到畫面上
        Val = binary_to_term(Bin),
        io:format("Client result = ~p~n",[Val]),
        gen_tcp:close(Socket)
    end.

測試,啟動server時,畫面會停在這邊

1> socket_examples:start_nano_server().

啟動 client

1> socket_examples:nano_client_eval("list_to_tuple([2+3*4, 10+20])").

server 會收到資料,並估算結果

Server received binary = <<131,107,0,29,108,105,115,116,95,116,111,95,116,117,112,108,101,40,91,50,43,51,42,52,44,32,49,48,43,50,48,93,41>>
Server (unpacked)  "list_to_tuple([2+3*4, 10+20])"
Server replying = {14,30}
Server socket closed
ok

同時,client也會收到 server 回傳的結果

Client received binary = <<131,104,2,97,14,97,30>>
Client result = {14,30}
ok

改進 server

剛剛的 server 只會接受一個 client 連線,接下來嘗試修改server,讓它能接受多個連線。改進的方式有以下兩種

  1. 序列伺服器:一次接受一個連線
  2. 平行伺服器:同時接收多個連線
序列伺服器:一次接受一個連線

原本的程式為

start_nano_server() ->
    {ok, Listen} = gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]),
    {ok, Socket} = gen_tcp:accept(Listen),
    gen_tcp:close(Listen),
    loop(Socket).

將程式改為以下的樣子,在 loop 完成後,繼續呼叫 seq_loop,讓它等候下一個連線。

start_seq_server() ->
    {ok, Listen} = gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]),
    seq_loop(Listen).

seq_loop(Listen) ->
    {ok, Socket} = gen_tcp:accept(Listen),
    loop(Socket),
    seq_loop(Listen).

測試

1> socket_examples:start_seq_server().
Server received binary = <<131,107,0,29,108,105,115,116,95,116,111,95,116,117,
                           112,108,101,40,91,50,43,51,42,52,44,32,49,48,43,50,
                           48,93,41>>
Server (unpacked)  "list_to_tuple([2+3*4, 10+20])"
Server replying = {14,30}
Server socket closed
Server received binary = <<131,107,0,29,108,105,115,116,95,116,111,95,116,117,
                           112,108,101,40,91,50,43,51,42,52,44,32,50,48,43,50,
                           48,93,41>>
Server (unpacked)  "list_to_tuple([2+3*4, 20+20])"
Server replying = {14,40}
Server socket closed

client 的部份

1> socket_examples:nano_client_eval("list_to_tuple([2+3*4, 10+20])").
Client received binary = <<131,104,2,97,14,97,30>>
Client result = {14,30}
ok
2> socket_examples:nano_client_eval("list_to_tuple([2+3*4, 20+20])").
Client received binary = <<131,104,2,97,14,97,40>>
Client result = {14,40}
ok
平行伺服器:同時接收多個連線

在每一次 gen_tcp:accept 一產生新的連線後,就馬上 spawn 產生一個新的 process。

start_parallel_server() ->
    {ok, Listen} = gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]),
    spawn(fun() -> par_connect(Listen) end).

par_connect(Listen) ->
    {ok, Socket} = gen_tcp:accept(Listen),
    spawn(fun() -> par_connect(Listen) end),
    loop(Socket).
註記
  1. 建立 socket 的行程(呼叫 gen_tcp:accept 或是 gen_tcp:connect )被稱為該 socket 的控制行程,來自 socket 的所有訊息都會送到控制行程中,如果控制行程死亡,socket 就會被關閉。可使用 gen_tcp:controlling_process(Socket, NewPid) 將控制行程換成 NewPid
  2. 平行伺服器可建立數千個連線,我們可能想限制同時連線的數量,可以使用一個 counter 記錄連線數。
  3. 接受連線後,最好明確設定 socket 選項:
     {ok, Socket} = gen_tcp:accept(Listen),
     inet:setopts(Socket, [{packet, 4}, binary, {nodelay,true}, {active, true}]),
     loop(Socket).
  4. 在 Erlang R11B-3,數個 erlang process 可對相同的 listen socket 呼叫 gen_tcp:accept/1,這可調整平行伺服器,用一些 pre-spawned processes pool,全部都等待 gen_tcp:accept/1。

控制 socket

erlang socket 可以三種模式開啟:active, active once, passive。active once 是建立主動 socket,收到訊息後,想接收下一個訊息,必須先重新啟用才行。

方式是在 gen_tcp:connect(Address, Port, Options) 或是 gen_tcp_listen(Port, Options) 的 Options 中使用 {active, true | false | once} 的設定。

主動與被動 socket 的差異:當 Socket 收到訊息時

  1. active socket 收到資料時,會送出 {tcp, Socket, Data} 給控制行程,控制行程無法控制這些訊息的流入。惡劣的客戶端就可能會送出數千個訊息給伺服器。
  2. passive socket 的控制行程必須呼叫 gen_tcp:recv(Socket, N),才能從 socket 接收 N bytes 的訊息,如果 N = 0,全部有效的 bytes data 都會被送出來。因此伺服器就能自行選擇何時呼叫 gen_tcp:recv ,這樣才能控制訊息流。

我們可用三種方式,撰寫 server 接收資料的迴圈

  1. active 訊息接收 - nonblocking
  2. passive 訊息接收 - blocking
  3. 混合訊息接收 - 部份 blocking
active 訊息接收 - nonblocking

process 無法控制進入 server 迴圈的訊息流,如果 client 端產生資料的速度比 server 消化還快,系統就會受到 message flooding,存放訊息的 mailbox 就有可能會 overflow,造成系統 crash。

因為無法阻塞 client,只有在我們確信能夠應付 client 的資料量時,才能使用 nonblocking server。

{ok, Listen} = gen_tcp:listen(Port, [..., {active, true}, ...]),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    receive
        {tcp, Socket, Data} ->
            ... do something with the data
        {tcp_closed, Socket} ->
            ...
    end.
passive 訊息接收 - blocking

server loop 只會在想要接收訊息時,才會呼叫 gen_tcp:recv。客戶端會被阻塞,直到伺服器呼叫 recv 為止。

注意:OS有做一些緩衝處理,即使尚未呼叫 recv,OS也會允許客戶端在被阻塞之前,可以送少量資料進來。

{ok, Listen} = gen_tcp:listen(Port, [..., {active, false}, ...]),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    case gen_tcp:recv(Socket, N) of
        {ok, B} ->
            ... do something with the data
            loop(Socket);
        {error, closed} ->
            ...
    end.
混合訊息接收 - 部份 blocking

只使用 passive mode 並不是最正確的作法,因為在 passive mode,只能等待一個 socket 的資料,對於必須等待多個 socket 資料的 server 來說,這是行不通的。

我們可以在開啟 Socket 時,使用 {active, once} ,這時候,Socket 只會對一個訊息主動。在控制行程收到一個訊息後,必須主動呼叫 inet:setopts 才能再次讓下一個訊息被接收,在此之前,系統會阻塞訊息。

使用 {active,once} ,使用者可實現 traffic shaping ,且避免 server 被過度積極湧入的訊息淹沒。

{ok, Listen} = gen_tcp:listen(Port, [..., {active, once}, ...]),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).

loop(Socket) ->
    receive
        {tcp, Socket, Data} ->
            ... do something with the data

            %% when you are ready to receive next message
            inet:setopts(Socket, [{active, once}]),
            loop(Socket);
        {tcp_closed, Socket} ->
            ...
    end.

連線來源

要知道 client 端的資訊,可以呼叫 inet:peername(Socket)

@spec inet:peername(Socket) -> {ok, {IP_Address, Port} | {error, Why}}

IP_Address 中
{N1, N2, N3, N4} 代表 IPv4
{K1, K2, K3, K4, K5, K6, K7, K8} 代表 IPv6

所有 Ni 與 Ki 都是介於 0 ~ 255 的整數

Socket 的錯誤處理

因為每一個 Socket 都有控制行程,當控制行程死亡,Socket 就會自動關閉。

error_test() ->
    %% 產生 server process
    spawn(fun() -> error_test_server() end),
    %% 先暫停兩秒,讓 server process 啟動
    lib_misc:sleep(2000),
    %% client 連線到 server
    {ok,Socket} = gen_tcp:connect("localhost",4321,[binary, {packet, 2}]),
    io:format("connected to:~p~n",[Socket]),
    %% 發送訊息 123
    gen_tcp:send(Socket, <<"123">>),
    receive
        %% 接收所有的 response 訊息
        Any ->
            io:format("Any=~p~n",[Any])
    end.

error_test_server() ->
    {ok, Listen} = gen_tcp:listen(4321, [binary,{packet,2}]),
    {ok, Socket} = gen_tcp:accept(Listen),
    error_test_server_loop(Socket).

error_test_server_loop(Socket) ->
    receive
        {tcp, Socket, Data} ->
            io:format("received:~p~n",[Data]),
            %% <<"123">> 會讓這一行當掉,server 的控制行程會 crash
            %% 因此會讓客戶端收到 {tcp_closed, Socket} 訊息
            atom_to_list(Data),
            error_test_server_loop(Socket)
    end.

測試

2> socket_examples:error_test().
connected to:#Port<0.594>
received:<<"123">>
Any={tcp_closed,#Port<0.594>}
ok
3>
=ERROR REPORT==== 11-Feb-2014::15:50:16 ===
Error in process <0.35.0> with exit value: {badarg,[{erlang,atom_to_list,[<<3 bytes>>],[]},{socket_examples,error_test_server_loop,1,[{file,"d:/projectcase/erlang/erlangotp/src/socket_examples.erl"},{line,117}]}]}

UDP server and client

UDP datagram 是不可靠的,順序可能被調換、可能會遺失、也可能會重複,且是 connectionless 的,客戶端不需要建立連線,就可以送訊息。

UDP 相當適合「大量客戶端,傳送小訊息到server」的應用情境。

UDP 比 TCP 簡單,因為 server 不需要管理與維護連線。

UDP server 一般的形式如下,server不會收到 socket 關閉的訊息:

server(Port) ->
    {ok, Socket} = gen_udp:open(Port, [binary]),
    loop(Socket).

loop(Socket) ->
    receive
        {udp, Socket, Host, Port, Bin} ->
            BinReply = ...,
            gen_udp:send(Socket, Host, Port, BinReply),
            loop(Socket)
    end.

client 必須要有 after timeout 機制,因為可能永遠等不到回應的訊息。

client(Request) ->
    {ok, Socket} = gen_udp:open(0, [binary]),
    ok = gen_udp:send(Socket, "localhost", 4000, Request),
    Value = receive
            {udp, Socket, _, _, Bin} ->
                {ok, Bin}
            after 2000 ->
                error
        end,
    gen_udp:close(Socket),
    Value.

UDP 階乘 server

範例程式

-module(udp_test).
-export([start_server/0, client/1]).

start_server() ->
    %% 產生 server process
    spawn(fun() -> server(4000) end).

%% The server           
server(Port) ->
    %% 開啟 udp server,接收 binary 資料
    {ok, Socket} = gen_udp:open(Port, [binary]),
    io:format("server opened socket:~p~n",[Socket]),
    loop(Socket).

loop(Socket) ->
    receive
        {udp, Socket, Host, Port, Bin} = Msg ->
            io:format("server received:~p~n",[Msg]),
            %% 將 binary 轉換為 erlang term
            N = binary_to_term(Bin),
            %% 運算階乘
            Fac = fac(N),
            %% 把結果回傳給 client
            gen_udp:send(Socket, Host, Port, term_to_binary(Fac)),
            %% 處理下一個訊息
            loop(Socket)
    end.

fac(0) -> 1;
fac(N) -> N * fac(N-1).

%% The client
client(N) ->
    {ok, Socket} = gen_udp:open(0, [binary]),
    io:format("client opened socket=~p~n",[Socket]),
    %% 以 term_to_binary 將 term 轉換為 binary
    ok = gen_udp:send(Socket, "localhost", 4000, 
                      term_to_binary(N)),
    Value = receive
                {udp, Socket, _, _, Bin} = Msg ->
                    io:format("client received:~p~n",[Msg]),
                    binary_to_term(Bin)
            after 2000 ->
                    0
            end,
    gen_udp:close(Socket),
    Value.

測試,server 的部份

1> udp_test:start_server().
server opened socket:#Port<0.516>
<0.33.0>
2> server received:{udp,#Port<0.516>,{127,0,0,1},54201,<<131,97,40>>}
2> server received:{udp,#Port<0.516>,{127,0,0,1},54202,<<131,97,20>>}
2> server received:{udp,#Port<0.516>,{127,0,0,1},60708,<<131,97,10>>}

client 的部份

1> udp_test:client(40).
client opened socket=#Port<0.516>
client received:{udp,#Port<0.516>,
                     {127,0,0,1},
                     4000,
                     <<131,110,20,0,0,0,0,0,64,37,5,255,100,222,15,8,126,242,
                       199,132,27,232,234,142>>}
815915283247897734345611269596115894272000000000
2> udp_test:client(20).
client opened socket=#Port<0.527>
client received:{udp,#Port<0.527>,
                     {127,0,0,1},
                     4000,
                     <<131,110,8,0,0,0,180,130,124,103,195,33>>}
2432902008176640000
3> udp_test:client(10).
client opened socket=#Port<0.528>
client received:{udp,#Port<0.528>,{127,0,0,1},4000,<<131,98,0,55,95,0>>}
3628800

補充說明

要注意,因為 UDP 是 connectionless 的協定,server無法藉由拒絕讀取資料而阻塞客戶端。

大型的 UDP packet 可能會被切成片段,以利在網路上傳輸,切割會發生在 router 所接受的 MTU(maximum transfer unit) 比 UDP packet 還小的時候 。一般會建議在 UDP 裡,一開始先使用較小的封包,然後慢慢增加資料量,並量測 throughput,如果 throughput 突然下降,就表示封包太大了。

UDP packet 有可能會被傳送兩次,所以在寫 RPC code 時要小心,不然可能就會執行兩次,傳回兩次。要避免發生這個問題時,可以加上 make_ref。

client(Request) ->
    {ok, Socket} -> gen_udp:open(0, [binary]),

    %% 產生唯一的識別參考
    Ref = make_ref(),
    B1 = term_to_binary(Ref, Request),
    ok = gen_udp:send(Socket, "localhost", 4000, B1),
    wait_for_ref(Socket, Ref).

wait_for_ref(Socket, Ref) ->
    receive
        {udp, Socket, _, _, Bin} ->
            case binary_to_term(Bin) of
                {Ref, Val} ->
                    Val;
                {_SomeoOtherRef, _} ->
                    wait_for_ref(Socket, Ref)
        end;
    after 1000 ->
        ...
    end.

廣播到多台機器

我們需要兩個 ports,一個用來送出廣播,一個用來傾聽廣播。

-module(broadcast).
-compile(export_all).

% 將IoList廣播到LAN的所有機器
% 負責廣播的 process 會開啟 port 5010
send(IoList) ->
    case inet:ifget("eth0", [broadaddr]) of
        {ok, [{broadaddr, Ip}]} ->
            {ok, S} =  gen_udp:open(5010, [{broadcast, true}]),
            gen_udp:send(S, Ip, 6000, IoList),
            gen_udp:close(S);
        _ ->
            io:format("Bad interface name, or\n"
                          "broadcasting not supported\n")
    end.

% 因為 windows 的 network interface 並沒有辦法直接取得 broadaddr
% 就直接把 broadcast address 寫在程式裡面
sendwindows(IoList) ->
    {ok, S} =  gen_udp:open(5010, [{broadcast, true}]),
    gen_udp:send(S, "192.168.1.255", 6000, IoList),
    gen_udp:close(S).

% 負責接收廣播的 process 會開啟 port 6000,並等待接收訊息
listen() ->
    {ok, _} = gen_udp:open(6000),
    loop().

loop() ->
    receive
        Any ->
            io:format("received:~p~n", [Any]),
            loop()
    end.

測試:server的部份

1> broadcast:listen().
received:{udp,#Port<0.516>,{192,168,1,57},5010,"{test"}

client

1> broadcast:sendwindows([123, "test"]).
ok

參考

Erlang and OTP in Action
Programming Erlang: Software for a Concurrent World

沒有留言:

張貼留言