有兩個主要的函式庫:gen_tcp 與 gen_udp
從 web server 取得資料, tcp client
nano_get_url 可取得網頁的 html 內容
nano_get_url() ->
nano_get_url("www.google.com").
nano_get_url(Host) ->
%% 對 Host:80 開啟 TCP socket
{ok,Socket} = gen_tcp:connect(Host,80,[binary, {packet, 0}]),
%% 對此 Socket,傳送 GET / HTTP/1.0\r\n\r\n 字串的資料
ok = gen_tcp:send(Socket, "GET / HTTP/1.0\r\n\r\n"),
%% 等待接收server 回傳的資料
receive_data(Socket, []).
receive_data(Socket, SoFar) ->
receive
%% 因為是用 binary 的方式開啟 socket,如果收到資料片段 Bin
%% 就將它接到 SoFar 中暫存起來
{tcp,Socket,Bin} ->
receive_data(Socket, [Bin|SoFar]);
%% 如果收到 tcp_closed 訊號,就表示 web server 送完資料,把 socket 斷線
%% 就將資料 reverse,再將 binary list 連接成一整個 binary 資料
{tcp_closed,Socket} ->
list_to_binary(reverse(SoFar))
end.
測試後,可取得 B 這一塊 binary 資料,然後可以用 io:format("~p~n", [B]) 將所有資料列印到畫面上,也可以用 string:tokens(binary_to_list(B), "\r\n") ,以 \r\n 切割資料,然後一行一行印出來。
1> B = socket_examples:nano_get_url().
<<"HTTP/1.0 302 Found\r\nLocation: http://www.google.com.tw/?gws_rd=cr&ei=W534Ur
qDKMLnkAWA-IHIDA\r\nCache-Control: private\r"...>>
2> io:format("~p~n", [B]).
<<"HTTP/1.0 302 Found\r\nLocation: http://www.google.com.tw/?gws_rd=cr&ei=W534Ur
qDKMLnkAWA-IHIDA\r\nCache-Control: private\r\nContent-Type: text/html; charset=U
TF-8\r\nSet-Cookie: PREF=ID=2d1f3f73d3fdb6d9:FF=0:TM=1392024923:LM=1392024923:S=
P-4nPCjpFPUCiXBs; expires=Wed, 10-Feb-2016 09:35:23 GMT; path=/; domain=.google.
...
3> string:tokens(binary_to_list(B), "\r\n").
["HTTP/1.0 302 Found",
"Location: http://www.google.com.tw/?gws_rd=cr&ei=W534UrqDKMLnkAWA-IHIDA",
"Cache-Control: private",
"Content-Type: text/html; charset=UTF-8",
...
TCP Server: evaluate erlang expression, tcp server
需求是要有一個 tcp server, port 為 2345,他會等待 binary 訊息,裡面是一個 erlang expression,server運算後,將結果回傳給 client。
要想寫出任何一個 tcp 程式,必須先回答下列問題,因為 tcp socket 資料只是一個 bytes streaming,在傳輸期間,資料可以被打碎成任意長度的片段。
- 資料的格式,要如何知道 request 或 response 是由多少資料組成
- 在 request/response 內的資料,要如何編碼(marshaling)與解碼(de-marshaling)
{packet, N}
在 erlang 中,request/response可以用前置的 N(1或2或4) 個 bytes,來表示資料的長度,這也是 gen_tcp:connect 與 gentcp:listen 中 {packet, N} 參數的意義。
當我們利用 {packet, N} 開啟 socket 時,erlang driver 會自動將被打碎的資料片段,接合在一起。
term_to_binary 與 binary_to_term
erlang term 的編碼與解碼,可直接使用 term_to_binary 與 binary_to_term,這樣就不需要處理 http 或 xml 的文字編碼,不只速度快,傳送的資料也比較少。
程式
server 端的程式
start_nano_server() ->
%% 開啟 tcp server port 2345,{packet, 4} 表示用 4 bytes 的資料長度 header
%% gen_tcp:listen 會傳回 {ok, Socket} 或 {error, Why}
{ok, Listen} = gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]),
%% 將 Listen 綁定至 listen socket
%% 在這裡,程式會暫停並等待 tcp client 連線
{ok, Socket} = gen_tcp:accept(Listen),
%% 當有 tcp client 連線後,就馬上關閉 Listen,這樣就不會再收到新的連線
%% 且關閉後,不會影響既有的連線
gen_tcp:close(Listen),
%% 處理 socket 資料
loop(Socket).
loop(Socket) ->
receive
{tcp, Socket, Bin} ->
io:format("Server received binary = ~p~n",[Bin]),
%% 將收到的資料 unmarshaling
Str = binary_to_term(Bin),
io:format("Server (unpacked) ~p~n",[Str]),
%% 估算 term
Reply = lib_misc:string2value(Str),
io:format("Server replying = ~p~n",[Reply]),
%% 把結果 marshaling 之後,送進 socket
gen_tcp:send(Socket, term_to_binary(Reply)),
%% 等待此 tcp client 發送下一個 term,並處理
loop(Socket);
{tcp_closed, Socket} ->
io:format("Server socket closed~n")
end.
client 端的程式
nano_client_eval(Str) ->
%% 開啟 socket,連接到 tcp port 2345
{ok, Socket} = gen_tcp:connect("localhost", 2345,
[binary, {packet, 4}]),
%% 將 term 以 term_to_binary 編碼後,發送到 socket
ok = gen_tcp:send(Socket, term_to_binary(Str)),
%% 等待接收結果
receive
{tcp,Socket,Bin} ->
io:format("Client received binary = ~p~n",[Bin]),
%% 以 binary_to_term 解碼後,列印到畫面上
Val = binary_to_term(Bin),
io:format("Client result = ~p~n",[Val]),
gen_tcp:close(Socket)
end.
測試,啟動server時,畫面會停在這邊
1> socket_examples:start_nano_server().
啟動 client
1> socket_examples:nano_client_eval("list_to_tuple([2+3*4, 10+20])").
server 會收到資料,並估算結果
Server received binary = <<131,107,0,29,108,105,115,116,95,116,111,95,116,117,112,108,101,40,91,50,43,51,42,52,44,32,49,48,43,50,48,93,41>>
Server (unpacked) "list_to_tuple([2+3*4, 10+20])"
Server replying = {14,30}
Server socket closed
ok
同時,client也會收到 server 回傳的結果
Client received binary = <<131,104,2,97,14,97,30>>
Client result = {14,30}
ok
改進 server
剛剛的 server 只會接受一個 client 連線,接下來嘗試修改server,讓它能接受多個連線。改進的方式有以下兩種
- 序列伺服器:一次接受一個連線
- 平行伺服器:同時接收多個連線
序列伺服器:一次接受一個連線
原本的程式為
start_nano_server() ->
{ok, Listen} = gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]),
{ok, Socket} = gen_tcp:accept(Listen),
gen_tcp:close(Listen),
loop(Socket).
將程式改為以下的樣子,在 loop 完成後,繼續呼叫 seq_loop,讓它等候下一個連線。
start_seq_server() ->
{ok, Listen} = gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]),
seq_loop(Listen).
seq_loop(Listen) ->
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket),
seq_loop(Listen).
測試
1> socket_examples:start_seq_server().
Server received binary = <<131,107,0,29,108,105,115,116,95,116,111,95,116,117,
112,108,101,40,91,50,43,51,42,52,44,32,49,48,43,50,
48,93,41>>
Server (unpacked) "list_to_tuple([2+3*4, 10+20])"
Server replying = {14,30}
Server socket closed
Server received binary = <<131,107,0,29,108,105,115,116,95,116,111,95,116,117,
112,108,101,40,91,50,43,51,42,52,44,32,50,48,43,50,
48,93,41>>
Server (unpacked) "list_to_tuple([2+3*4, 20+20])"
Server replying = {14,40}
Server socket closed
client 的部份
1> socket_examples:nano_client_eval("list_to_tuple([2+3*4, 10+20])").
Client received binary = <<131,104,2,97,14,97,30>>
Client result = {14,30}
ok
2> socket_examples:nano_client_eval("list_to_tuple([2+3*4, 20+20])").
Client received binary = <<131,104,2,97,14,97,40>>
Client result = {14,40}
ok
平行伺服器:同時接收多個連線
在每一次 gen_tcp:accept 一產生新的連線後,就馬上 spawn 產生一個新的 process。
start_parallel_server() ->
{ok, Listen} = gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]),
spawn(fun() -> par_connect(Listen) end).
par_connect(Listen) ->
{ok, Socket} = gen_tcp:accept(Listen),
spawn(fun() -> par_connect(Listen) end),
loop(Socket).
註記
- 建立 socket 的行程(呼叫 gen_tcp:accept 或是 gen_tcp:connect )被稱為該 socket 的控制行程,來自 socket 的所有訊息都會送到控制行程中,如果控制行程死亡,socket 就會被關閉。可使用 gen_tcp:controlling_process(Socket, NewPid) 將控制行程換成 NewPid
- 平行伺服器可建立數千個連線,我們可能想限制同時連線的數量,可以使用一個 counter 記錄連線數。
- 接受連線後,最好明確設定 socket 選項:
{ok, Socket} = gen_tcp:accept(Listen), inet:setopts(Socket, [{packet, 4}, binary, {nodelay,true}, {active, true}]), loop(Socket).
- 在 Erlang R11B-3,數個 erlang process 可對相同的 listen socket 呼叫 gen_tcp:accept/1,這可調整平行伺服器,用一些 pre-spawned processes pool,全部都等待 gen_tcp:accept/1。
控制 socket
erlang socket 可以三種模式開啟:active, active once, passive。active once 是建立主動 socket,收到訊息後,想接收下一個訊息,必須先重新啟用才行。
方式是在 gen_tcp:connect(Address, Port, Options) 或是 gen_tcp_listen(Port, Options) 的 Options 中使用 {active, true | false | once} 的設定。
主動與被動 socket 的差異:當 Socket 收到訊息時
- active socket 收到資料時,會送出 {tcp, Socket, Data} 給控制行程,控制行程無法控制這些訊息的流入。惡劣的客戶端就可能會送出數千個訊息給伺服器。
- passive socket 的控制行程必須呼叫 gen_tcp:recv(Socket, N),才能從 socket 接收 N bytes 的訊息,如果 N = 0,全部有效的 bytes data 都會被送出來。因此伺服器就能自行選擇何時呼叫 gen_tcp:recv ,這樣才能控制訊息流。
我們可用三種方式,撰寫 server 接收資料的迴圈
- active 訊息接收 - nonblocking
- passive 訊息接收 - blocking
- 混合訊息接收 - 部份 blocking
active 訊息接收 - nonblocking
process 無法控制進入 server 迴圈的訊息流,如果 client 端產生資料的速度比 server 消化還快,系統就會受到 message flooding,存放訊息的 mailbox 就有可能會 overflow,造成系統 crash。
因為無法阻塞 client,只有在我們確信能夠應付 client 的資料量時,才能使用 nonblocking server。
{ok, Listen} = gen_tcp:listen(Port, [..., {active, true}, ...]),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).
loop(Socket) ->
receive
{tcp, Socket, Data} ->
... do something with the data
{tcp_closed, Socket} ->
...
end.
passive 訊息接收 - blocking
server loop 只會在想要接收訊息時,才會呼叫 gen_tcp:recv。客戶端會被阻塞,直到伺服器呼叫 recv 為止。
注意:OS有做一些緩衝處理,即使尚未呼叫 recv,OS也會允許客戶端在被阻塞之前,可以送少量資料進來。
{ok, Listen} = gen_tcp:listen(Port, [..., {active, false}, ...]),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).
loop(Socket) ->
case gen_tcp:recv(Socket, N) of
{ok, B} ->
... do something with the data
loop(Socket);
{error, closed} ->
...
end.
混合訊息接收 - 部份 blocking
只使用 passive mode 並不是最正確的作法,因為在 passive mode,只能等待一個 socket 的資料,對於必須等待多個 socket 資料的 server 來說,這是行不通的。
我們可以在開啟 Socket 時,使用 {active, once} ,這時候,Socket 只會對一個訊息主動。在控制行程收到一個訊息後,必須主動呼叫 inet:setopts 才能再次讓下一個訊息被接收,在此之前,系統會阻塞訊息。
使用 {active,once} ,使用者可實現 traffic shaping ,且避免 server 被過度積極湧入的訊息淹沒。
{ok, Listen} = gen_tcp:listen(Port, [..., {active, once}, ...]),
{ok, Socket} = gen_tcp:accept(Listen),
loop(Socket).
loop(Socket) ->
receive
{tcp, Socket, Data} ->
... do something with the data
%% when you are ready to receive next message
inet:setopts(Socket, [{active, once}]),
loop(Socket);
{tcp_closed, Socket} ->
...
end.
連線來源
要知道 client 端的資訊,可以呼叫 inet:peername(Socket)
@spec inet:peername(Socket) -> {ok, {IP_Address, Port} | {error, Why}}
IP_Address 中
{N1, N2, N3, N4} 代表 IPv4
{K1, K2, K3, K4, K5, K6, K7, K8} 代表 IPv6
所有 Ni 與 Ki 都是介於 0 ~ 255 的整數
Socket 的錯誤處理
因為每一個 Socket 都有控制行程,當控制行程死亡,Socket 就會自動關閉。
error_test() ->
%% 產生 server process
spawn(fun() -> error_test_server() end),
%% 先暫停兩秒,讓 server process 啟動
lib_misc:sleep(2000),
%% client 連線到 server
{ok,Socket} = gen_tcp:connect("localhost",4321,[binary, {packet, 2}]),
io:format("connected to:~p~n",[Socket]),
%% 發送訊息 123
gen_tcp:send(Socket, <<"123">>),
receive
%% 接收所有的 response 訊息
Any ->
io:format("Any=~p~n",[Any])
end.
error_test_server() ->
{ok, Listen} = gen_tcp:listen(4321, [binary,{packet,2}]),
{ok, Socket} = gen_tcp:accept(Listen),
error_test_server_loop(Socket).
error_test_server_loop(Socket) ->
receive
{tcp, Socket, Data} ->
io:format("received:~p~n",[Data]),
%% <<"123">> 會讓這一行當掉,server 的控制行程會 crash
%% 因此會讓客戶端收到 {tcp_closed, Socket} 訊息
atom_to_list(Data),
error_test_server_loop(Socket)
end.
測試
2> socket_examples:error_test().
connected to:#Port<0.594>
received:<<"123">>
Any={tcp_closed,#Port<0.594>}
ok
3>
=ERROR REPORT==== 11-Feb-2014::15:50:16 ===
Error in process <0.35.0> with exit value: {badarg,[{erlang,atom_to_list,[<<3 bytes>>],[]},{socket_examples,error_test_server_loop,1,[{file,"d:/projectcase/erlang/erlangotp/src/socket_examples.erl"},{line,117}]}]}
UDP server and client
UDP datagram 是不可靠的,順序可能被調換、可能會遺失、也可能會重複,且是 connectionless 的,客戶端不需要建立連線,就可以送訊息。
UDP 相當適合「大量客戶端,傳送小訊息到server」的應用情境。
UDP 比 TCP 簡單,因為 server 不需要管理與維護連線。
UDP server 一般的形式如下,server不會收到 socket 關閉的訊息:
server(Port) ->
{ok, Socket} = gen_udp:open(Port, [binary]),
loop(Socket).
loop(Socket) ->
receive
{udp, Socket, Host, Port, Bin} ->
BinReply = ...,
gen_udp:send(Socket, Host, Port, BinReply),
loop(Socket)
end.
client 必須要有 after timeout 機制,因為可能永遠等不到回應的訊息。
client(Request) ->
{ok, Socket} = gen_udp:open(0, [binary]),
ok = gen_udp:send(Socket, "localhost", 4000, Request),
Value = receive
{udp, Socket, _, _, Bin} ->
{ok, Bin}
after 2000 ->
error
end,
gen_udp:close(Socket),
Value.
UDP 階乘 server
範例程式
-module(udp_test).
-export([start_server/0, client/1]).
start_server() ->
%% 產生 server process
spawn(fun() -> server(4000) end).
%% The server
server(Port) ->
%% 開啟 udp server,接收 binary 資料
{ok, Socket} = gen_udp:open(Port, [binary]),
io:format("server opened socket:~p~n",[Socket]),
loop(Socket).
loop(Socket) ->
receive
{udp, Socket, Host, Port, Bin} = Msg ->
io:format("server received:~p~n",[Msg]),
%% 將 binary 轉換為 erlang term
N = binary_to_term(Bin),
%% 運算階乘
Fac = fac(N),
%% 把結果回傳給 client
gen_udp:send(Socket, Host, Port, term_to_binary(Fac)),
%% 處理下一個訊息
loop(Socket)
end.
fac(0) -> 1;
fac(N) -> N * fac(N-1).
%% The client
client(N) ->
{ok, Socket} = gen_udp:open(0, [binary]),
io:format("client opened socket=~p~n",[Socket]),
%% 以 term_to_binary 將 term 轉換為 binary
ok = gen_udp:send(Socket, "localhost", 4000,
term_to_binary(N)),
Value = receive
{udp, Socket, _, _, Bin} = Msg ->
io:format("client received:~p~n",[Msg]),
binary_to_term(Bin)
after 2000 ->
0
end,
gen_udp:close(Socket),
Value.
測試,server 的部份
1> udp_test:start_server().
server opened socket:#Port<0.516>
<0.33.0>
2> server received:{udp,#Port<0.516>,{127,0,0,1},54201,<<131,97,40>>}
2> server received:{udp,#Port<0.516>,{127,0,0,1},54202,<<131,97,20>>}
2> server received:{udp,#Port<0.516>,{127,0,0,1},60708,<<131,97,10>>}
client 的部份
1> udp_test:client(40).
client opened socket=#Port<0.516>
client received:{udp,#Port<0.516>,
{127,0,0,1},
4000,
<<131,110,20,0,0,0,0,0,64,37,5,255,100,222,15,8,126,242,
199,132,27,232,234,142>>}
815915283247897734345611269596115894272000000000
2> udp_test:client(20).
client opened socket=#Port<0.527>
client received:{udp,#Port<0.527>,
{127,0,0,1},
4000,
<<131,110,8,0,0,0,180,130,124,103,195,33>>}
2432902008176640000
3> udp_test:client(10).
client opened socket=#Port<0.528>
client received:{udp,#Port<0.528>,{127,0,0,1},4000,<<131,98,0,55,95,0>>}
3628800
補充說明
要注意,因為 UDP 是 connectionless 的協定,server無法藉由拒絕讀取資料而阻塞客戶端。
大型的 UDP packet 可能會被切成片段,以利在網路上傳輸,切割會發生在 router 所接受的 MTU(maximum transfer unit) 比 UDP packet 還小的時候 。一般會建議在 UDP 裡,一開始先使用較小的封包,然後慢慢增加資料量,並量測 throughput,如果 throughput 突然下降,就表示封包太大了。
UDP packet 有可能會被傳送兩次,所以在寫 RPC code 時要小心,不然可能就會執行兩次,傳回兩次。要避免發生這個問題時,可以加上 make_ref。
client(Request) ->
{ok, Socket} -> gen_udp:open(0, [binary]),
%% 產生唯一的識別參考
Ref = make_ref(),
B1 = term_to_binary(Ref, Request),
ok = gen_udp:send(Socket, "localhost", 4000, B1),
wait_for_ref(Socket, Ref).
wait_for_ref(Socket, Ref) ->
receive
{udp, Socket, _, _, Bin} ->
case binary_to_term(Bin) of
{Ref, Val} ->
Val;
{_SomeoOtherRef, _} ->
wait_for_ref(Socket, Ref)
end;
after 1000 ->
...
end.
廣播到多台機器
我們需要兩個 ports,一個用來送出廣播,一個用來傾聽廣播。
-module(broadcast).
-compile(export_all).
% 將IoList廣播到LAN的所有機器
% 負責廣播的 process 會開啟 port 5010
send(IoList) ->
case inet:ifget("eth0", [broadaddr]) of
{ok, [{broadaddr, Ip}]} ->
{ok, S} = gen_udp:open(5010, [{broadcast, true}]),
gen_udp:send(S, Ip, 6000, IoList),
gen_udp:close(S);
_ ->
io:format("Bad interface name, or\n"
"broadcasting not supported\n")
end.
% 因為 windows 的 network interface 並沒有辦法直接取得 broadaddr
% 就直接把 broadcast address 寫在程式裡面
sendwindows(IoList) ->
{ok, S} = gen_udp:open(5010, [{broadcast, true}]),
gen_udp:send(S, "192.168.1.255", 6000, IoList),
gen_udp:close(S).
% 負責接收廣播的 process 會開啟 port 6000,並等待接收訊息
listen() ->
{ok, _} = gen_udp:open(6000),
loop().
loop() ->
receive
Any ->
io:format("received:~p~n", [Any]),
loop()
end.
測試:server的部份
1> broadcast:listen().
received:{udp,#Port<0.516>,{192,168,1,57},5010,"{test"}
client
1> broadcast:sendwindows([123, "test"]).
ok
參考
Erlang and OTP in Action
Programming Erlang: Software for a Concurrent World
沒有留言:
張貼留言