erlang 的互動模型
「共時」有兩種不同的作法,分別是「共享狀態共時」,與「訊息傳遞共時」。大多數的主流語言(Java、C#、C++)採用「共享狀態共時」的設計,也就是所有的process/thread共用同一塊記憶體,並藉由此共享記憶體,互相傳遞資料。共享記憶體就表示,因為 process/thread 可能會同時修改某個記憶體,為了避免發生資料同步的錯誤,我們需要用 key/lock 的方式,先上鎖,修改資料,再釋放鎖
Erlang 採用「訊息傳遞共時」,就像是人跟人之間的溝通方式,我們的大腦沒有連接在一起,我無法直接改變你的記憶體,我必須透過說話或是動作,傳遞訊息給你,然後你會接收到,並改變自己的記憶,我也要透過詢問的方式,才能知道你是不是有收到訊息。erlang process使用自己的記憶體,process 之間不具有共享記憶體,只能透過訊息的傳遞,互相溝通。
Mary: Hi, Joe,我的電話是 12345678
Mary: 你有聽到我的訊息嗎?
Joe: 是的,你的電話是 12345678
人與人之間互動的過程,就是傳送訊息 -> 詢問 -> 觀察回應。
erlang 本身是由數十、數千、數萬個小行程組合而成的,每一個行程都是獨立的,彼此透過訊息溝通,就像是在一個大房間裡面,有人負責管理與監控,有人負責執行分配的工作,工作完成就告訴負責人,下達命令,直接大喊就可以了。
如果這些人當中,有幾個人因為某個原因不正常地死掉了,它會在死前告訴大家原因,另外就由負責分配工作的人,決定要怎麼處理這個問題。
- erlang 程式是由許多 process 組成的,這些 process 可以互相傳遞訊息
- 這些訊息可能被接收或了解,也可能不會,如果想知道某個訊息是不是有被處理,就需要發送另一個詢問的訊息,並等待回覆
- 兩個 process 可以被連結在一起,如果其中一個死亡,另一個會接收到訊息,被告知對方是因為什麼原因死掉的
共時編程 concurrency programming
erlang 的 process 屬於程式語言,並不是指作業系統的 process。
- process 建立與銷毀,相當快速
- process 收送訊息,相當快速
- 不管在linux 或 windows,process 的行為不變
- 可以同時產生很多 process
- process各自獨立,不共享記憶體
- process只能透過訊息互相溝通
三個利器:spawn, send, receive
語法
- Pid = spawn(Fun)
產生一個 process,負責計算 Fun,spawn 會回傳一個 Pid(process identifier, Pid),可透過此 Pid 對此行程發送訊息。 - Pid ! Message
送出訊息給 Pid,送出的訊息是非同步的(asynchronous),發送端不需要等待回應,而是繼續做自己的事。
Pid1!Pid2!...!M 會送出 M 給 Pid1,Pid2,... - receive ... end
接收訊息,語法為receive Pattern1 [when Guard1] -> Expression1; Pattern2 [when Guard2] -> Expression2; ... end
計算面積的範例
產生 一個計算面積的process,它會一直等待計算條件,計算後,將結果列印到畫面上,最後在 shell process 將計算條件以訊息方式發送給該 process。
%% area_server0.erl
-module(area_server0).
-export([loop/0]).
loop() ->
receive
{rectangle, Width, Ht} ->
io:format("Area of rectangle is ~p~n",[Width * Ht]),
loop();
{circle, R} ->
io:format("Area of circle is ~p~n", [3.14159 * R * R]),
loop();
Other ->
io:format("I don't know what the area of a ~p is ~n",[Other]),
loop()
end.
測試
1> Pid = spawn(fun area_server0:loop/0).
<0.33.0>
2> Pid!{circle, 10}.
Area of circle is 314.159
{circle,10}
3> Pid!{rectangle, 10, 20}.
Area of rectangle is 200
{rectangle,10,20}
4> Pid!{rectangle, 10, 20.4}.
Area of rectangle is 204.0
{rectangle,10,20.4}
這段程式在 erlide 測試會有問題,一直看不到結果,我猜想原因應該跟 No error message in Eclipse consloe/erang shell 一樣,因為 erlide 並不是直接跟 shell 互動,而是透過一層遠端呼叫,再呼叫 shell,這個問題到現在還沒解決。
erlide 是個好的開發 IDE,但以上面的問題來看,程式測試可能還是要回到 console 自己用 erl 測試,才會得到比較確切的回應結果。
計算面積的範例 v2
上面的範例,訊息送給 server process 之後,就直接列印結果到畫面上,接下來,嘗試把計算後的結果,再回傳給發送計算要求訊息的 process。
rpc(Pid, Request) 負責將訊息送給 Pid, Pid ! {self(), Request},同時告訴 server process 發送端的 Pid:self(),讓 server process 可以將結果回傳給發送端。
%% area_server1.erl
-module(area_server1).
-export([loop/0, rpc/2]).
rpc(Pid, Request) ->
Pid ! {self(), Request},
receive
Response ->
Response
end.
loop() ->
receive
{From, {rectangle, Width, Ht}} ->
From ! Width * Ht,
loop();
{From, {circle, R}} ->
From ! 3.14159 * R * R,
loop();
{From, Other} ->
From ! {error,Other},
loop()
end.
測試
7> Pid=spawn(fun area_server1:loop/0).
<0.41.0>
8> area_server1:rpc(Pid, {rectangle, 10, 20}).
200
計算面積的範例 v3
這裡的問題是,rpc 裡面並不是等待 server process 的回應,而是等待所有訊息,並直接列印出來。接下來進一步,修改 rpc 等待的訊息內容,以 server process 的 Pid 作為識別的標籤 {Pid, Response}。
%% area_server2.erl
-module(area_server2).
-export([loop/0, rpc/2]).
rpc(Pid, Request) ->
Pid ! {self(), Request},
receive
{Pid, Response} ->
Response
end.
loop() ->
receive
{From, {rectangle, Width, Ht}} ->
From ! {self(), Width * Ht},
loop();
{From, {circle, R}} ->
From ! {self(), 3.14159 * R * R},
loop();
{From, Other} ->
From ! {self(), {error,Other}},
loop()
end.
計算面積的範例 v4
最後,將 spawn 與 rpc 呼叫,再包裝一層 function,讓client呼叫跟運算的實作,隱藏在 module:area_server_final 裡面,好處是其他程式碼在使用 area_server_final 時,啟動服務 start 跟使用服務 area 的界面是一致的、不會改變的,實作 area_server_final 則可以依照需求跟狀況而改變,
%% area_server_final.erl
-module(area_server_final).
-export([start/0, area/2]).
start() -> spawn(fun loop/0).
area(Pid, What) ->
rpc(Pid, What).
rpc(Pid, Request) ->
Pid ! {self(), Request},
receive
{Pid, Response} ->
Response
end.
loop() ->
receive
{From, {rectangle, Width, Ht}} ->
From ! {self(), Width * Ht},
loop();
{From, {circle, R}} ->
From ! {self(), 3.14159 * R * R},
loop();
{From, Other} ->
From ! {self(), {error,Other}},
loop()
end.
注意
跟 Java 的概念不同,Java 通常會將 client 與 server 的實作分在兩個 Java class,甚至是區分不同的 package,Java 的 client 與 server 要在不同的 JVM 運作,不同的執行機器,程式碼也需要做相應的切割,因為 client 與 server 都不需要對方的程式碼細節。
erlang 的優勢是,不管 client 與 server 程式是在單機執行,或是在兩台機器上執行,不同機器的 erlang vm 都是相通的,程式不需要太多修改就可以由遠端直接 deploy 程式並運作的,在這樣的狀況下,切割程式變得有些多餘了,另外因為 erlang 的程式碼比較精簡,因此 client 與 server 通常放在同一個 module 裡面。
建立 process 需要花多久時間
%% processes.erl
-module(processes).
-export([max/1]).
%% max(N)
%% Create N processes then destroy them
%% See how much time this takes
max(N) ->
% 向 erlang vm 查詢系統最大的 process 數量
Max = erlang:system_info(process_limit),
io:format("Maximum allowed processes:~p~n",[Max]),
statistics(runtime),
statistics(wall_clock),
% 以匿名函數產生 process,內容為呼叫wait(),一直在等待收到 die 的訊息
% 這一行也可以寫成
% L = for(1, N, fun() -> spawn(fun wait/0) end),
% 這樣就變成直接呼叫 wait
L = for(1, N, fun() -> spawn(fun() -> wait() end) end),
{_, Time1} = statistics(runtime),
{_, Time2} = statistics(wall_clock),
% 發送自殺訊息給把剛剛產生的所有 process停掉
lists:foreach(fun(Pid) -> Pid ! die end, L),
% 計算每一個 process 的平均時間
U1 = Time1 * 1000 / N,
U2 = Time2 * 1000 / N,
io:format("Process spawn time=~p (~p) microseconds~n",
[U1, U2]).
wait() ->
receive
die -> void
end.
for(N, N, F) -> [F()];
for(I, N, F) -> [F()|for(I+1, N, F)].
測試
1> processes:max(10000).
Maximum allowed processes:262144
Process spawn time=1.6 (4.7) microseconds
ok
2> processes:max(200000).
Maximum allowed processes:262144
Process spawn time=3.51 (3.585) microseconds
ok
如果超過了 process 數量的上限,就會出現 error report
3> processes:max(300000).
Eshell V5.10.4 (abort with ^G)
*** ERROR: Shell process terminated! ***
=ERROR REPORT==== 21-Jan-2014::15:12:51 ===
Too many processes
根據 erlang Profiling 跟 wall_clock, runtime - what is the difference? 的說明,統計消耗的時間時,最好同時看消耗 CPU 的時間(runtime) 與 系統時間 (wall_clock),如果只看 runtime,可能會忽略了 OS kernel、memory swapping與disk I/O 的時間。
有時限的接收
有時候可能會等很久,卻等不到訊息,可能有很多原因,為 receive 加上 after Time 的 timeout,可以設定等待訊息最長可以等多久。
receive
Pattern1 [when Guard1] ->
Expression1;
Pattern2 [when Guard2] ->
Expression2;
...
after Time ->
ExpressionTimeout
end
選擇式接收
訊息的收送,並不是直接送給 process,因為有可能同時會接收到很多個訊息,每個 process 內部都有一個 mailbox,功能是用來存放送給此 process 的訊息。
發送訊息給 process,事實上是發送到該 process 的 mailbox 中,只有在process執行 receive 時,才會去檢查 mailbox。
receive 的處理程序如下
- 如果有 after 區塊,進入 receive 會啟動一個計時器
- 取得 mailbox 的第一個訊息,並與 Pattern1、Pattern2... 比對,比對成功,該訊息就會從 mailbox 移除,並估算後面的 Expression
- 如果沒有 Pattern 吻合,mailbox 中的第一個訊息會被移除,並放入 save queue,接下來比對下一個訊息,直到有吻合的訊息,或是所有訊息都比對失敗
- 如果沒有訊息吻合,此行程會被暫停且重新排程,下此有訊息被放入 mailbox 會再執行一次。當新訊息到達,save queue 裡面的訊息不會重新被比對。
- 一旦有訊息吻合,就會重新把 save queue 裡面的訊息放入 mailbox,順序不變。如果有設定計時器,就將它清空。
- 如果等待訊息,計時器的時間到了,就會估算 ExpressionTimeout,並將 save queue 裡面的訊息放回 mailbox,順序不變。
Time 設定為 0
傳送給 erlang process 的訊息會放在 mailbox 裡面等待該 process 處理。
將 Time 設定為 0 可以用來檢查 mailbox 是否有訊息,如果有訊息,就執行 flush_buffer(),將 mailbox 清空,如果沒有就直接完成接收訊息的程序。
flush_buffer() ->
receive
_Any ->
flush_buffer()
after 0 ->
true
end.
將 Time 設定為 0 也可以用來實現「優先權接收」,如果 mailbox 裡面沒有訊息符合 {alarm, X},就會接收第一個訊息,如果完全沒有訊息,就會暫停最裡面的 receive,並傳回後來收到的第一個訊息。
注意:只有在 mailbox 裡面所有的訊息都比對過 {alarm, X} 之後,after 區塊才會被執行。
注意2:使用較大的 mailbox,再加上 priority_receive,這會造成程式效率變差,所以如果要這樣寫,要先確定 mailbox 不能太大。
priority_receive() ->
receive
{alarm, X} ->
{alarm, X}
after 0 ->
receive
Any ->
Any
end
end.
Time 設定為 infinity
如果 Time 設定為 infinity,則時限永遠不會被觸發,這會讓 receive 無限地等待下去。
實現一個計時器
此計時器會在 Time ms 時間之後,計算 Fun,如果時間還沒到收到 cancel時,就會中止此計時器。
-module(stimer).
-export([start/2, cancel/1]).
start(Time, Fun) -> spawn(fun() -> timer(Time, Fun) end).
cancel(Pid) -> Pid ! cancel.
timer(Time, Fun) ->
receive
cancel ->
void
after Time ->
Fun()
end.
測試
1> Pid = stimer:start(5000, fun() -> io:format("timer event~n") end ).
<0.35.0>
2> stimer:cancel(Pid).
cancel
3> f(Pid).
ok
4> Pid = stimer:start(5000, fun() -> io:format("timer event~n") end ).
<0.42.0>
5> timer event
註冊行程
如果要把訊息送給某個 process,我們需要知道並紀錄 PID,這很不方便,因為要把 PID 傳送給所有想跟該行程溝通的 processes。但相反地,這又是個安全機制,因為沒有告訴其他行程 PID,所以其他行程無法跟該行程互動。
erlang 可公開一個行程的 PID,將行程變成 registered process,有四個 BIF 可管理 registered process。
- register(anAtom, Pid)
註冊 Pid 的名稱為 AnAtom,如果 AnAtom 已經被使用過,則會註冊失敗。 - unregister(anAtom)
取消註冊。行程死亡會自動取消註冊。 - whereis(anAtom) -> Pid | undefined
判斷 AnAtom 是否已經註冊,回傳其 Pid,沒有註冊就回傳 undefined - registered() -> [anAtom::atom()]
傳回系統所有已經註冊的行程清單
註冊 anAtom 後的行程,可以直接用 anAtom!Message 傳送訊息給它
範例
以下為時鐘範例,每 Time ms 後,會估算一次 Fun,持續執行,直到行程收到 stop 訊息。
%% clock.erl
-module(clock).
-export([start/2, stop/0]).
start(Time, Fun) ->
register(clock, spawn(fun() -> tick(Time, Fun) end)).
stop() ->
clock!stop.
tick(Time, Fun) ->
receive
stop ->
void
after Time ->
Fun(),
tick(Time, Fun)
end.
測試
3> clock:start(5000, fun() -> io:format("TICK ~p~n", [erlang:now()]) end).
true
4> TICK {1390,359662,831700}
4> TICK {1390,359667,839700}
4> TICK {1390,359672,846700}
4> TICK {1390,359677,854700}
4> TICK {1390,359682,862700}
4> TICK {1390,359687,869700}
4> TICK {1390,359692,877700}
4> clock:stop().
stop
如何寫出共時程式
以下為一個共時程式的 template,可以從此小程式慢慢寫出自己需要的功能。
% ctemplate.erl
-module(ctemplate).
-compile(export_all).
start() ->
spawn(fun() -> loop([]) end).
rpc(Pid, Request) ->
Pid ! {self(), Request},
receive
{Pid, Response} ->
Response
end.
loop(X) ->
receive
{From, Message} ->
io:format("Received: ~p~n", [Message]),
From ! {self(), ok},
loop(X)
end.
process
產生 process
有兩種可產生 process,第二種要指定 Module, Function 與 Args,簡稱為 MFA。只有第二種方式,才會嘗試採用該模組最新版的程式,如果要有動態更新程式的功能,必須使用 MFA spawn。
Pid = spawn( fun() -> do_something() end )
Pid = spawn( Module, Function, ListOfArgs )
可用以下方式在別的 erlang 機器 node 產生 process
Pid = spawn(Node, Module, Function, ListOfArgs)
此外,還有一個名為 spawn_opt(...) 的版本,可以接受一個選項列表 spawn_option。
Pid = spawn_opt(fun() -> do_something() end, [link])
還有一個 spawn_link,用此方式產生的 process,如果意外中止時,parent process 會一起中止,而用 spawn 產生的 process 中止時,parent process 不受影響。
Pid = spawn_link(Node, Module, Function, Args)
監視 process
如果不要讓 process 以 link 方式連結在一起,可以使用 monitor,這可讓一個 process 在不影響目標 process 的情況下進行監視。
當 Pid 行程如果 exit,監視的 processes 會收到一個含有唯一引用 Ref 的訊息。
Ref = monitor(process , Pid)
拋出異常中止行程
除非 Reason 被捕獲,否則此呼叫,會將行程中止,並將異常 Reason ,發送給所有跟此行程連結的行程。
exit(Reason)
直接向行程發送退出信號
此行程會向 Pid 行程發送中止信號,如果 Reason 是 kill,則無法被接收方行程捕獲。
exit(Pid, Reason)
設置 trap_exit
預設狀況下,一旦接收到 exit 退出信號,行程就會退出,為了避免被中止,可以設置 trap_exit 捕獲退出信號,除了 kill 無法被捕獲外,其他退出信號都會轉成無害的訊息。
process_flag(trap_exit, true)
參考
Erlang and OTP in Action
Programming Erlang: Software for a Concurrent World
沒有留言:
張貼留言