2014年3月17日

erlang concurrency

erlang 的互動模型

「共時」有兩種不同的作法,分別是「共享狀態共時」,與「訊息傳遞共時」。大多數的主流語言(Java、C#、C++)採用「共享狀態共時」的設計,也就是所有的process/thread共用同一塊記憶體,並藉由此共享記憶體,互相傳遞資料。共享記憶體就表示,因為 process/thread 可能會同時修改某個記憶體,為了避免發生資料同步的錯誤,我們需要用 key/lock 的方式,先上鎖,修改資料,再釋放鎖

Erlang 採用「訊息傳遞共時」,就像是人跟人之間的溝通方式,我們的大腦沒有連接在一起,我無法直接改變你的記憶體,我必須透過說話或是動作,傳遞訊息給你,然後你會接收到,並改變自己的記憶,我也要透過詢問的方式,才能知道你是不是有收到訊息。erlang process使用自己的記憶體,process 之間不具有共享記憶體,只能透過訊息的傳遞,互相溝通。

Mary: Hi, Joe,我的電話是 12345678
Mary: 你有聽到我的訊息嗎?
Joe: 是的,你的電話是 12345678

人與人之間互動的過程,就是傳送訊息 -> 詢問 -> 觀察回應。

erlang 本身是由數十、數千、數萬個小行程組合而成的,每一個行程都是獨立的,彼此透過訊息溝通,就像是在一個大房間裡面,有人負責管理與監控,有人負責執行分配的工作,工作完成就告訴負責人,下達命令,直接大喊就可以了。

如果這些人當中,有幾個人因為某個原因不正常地死掉了,它會在死前告訴大家原因,另外就由負責分配工作的人,決定要怎麼處理這個問題。

  1. erlang 程式是由許多 process 組成的,這些 process 可以互相傳遞訊息
  2. 這些訊息可能被接收或了解,也可能不會,如果想知道某個訊息是不是有被處理,就需要發送另一個詢問的訊息,並等待回覆
  3. 兩個 process 可以被連結在一起,如果其中一個死亡,另一個會接收到訊息,被告知對方是因為什麼原因死掉的

共時編程 concurrency programming

erlang 的 process 屬於程式語言,並不是指作業系統的 process。

  1. process 建立與銷毀,相當快速
  2. process 收送訊息,相當快速
  3. 不管在linux 或 windows,process 的行為不變
  4. 可以同時產生很多 process
  5. process各自獨立,不共享記憶體
  6. process只能透過訊息互相溝通

三個利器:spawn, send, receive

語法

  1. Pid = spawn(Fun)
    產生一個 process,負責計算 Fun,spawn 會回傳一個 Pid(process identifier, Pid),可透過此 Pid 對此行程發送訊息。
  2. Pid ! Message
    送出訊息給 Pid,送出的訊息是非同步的(asynchronous),發送端不需要等待回應,而是繼續做自己的事。
    Pid1!Pid2!...!M 會送出 M 給 Pid1,Pid2,...
  3. receive ... end
    接收訊息,語法為
     receive
         Pattern1 [when Guard1] ->
             Expression1;
         Pattern2 [when Guard2] ->
             Expression2;
         ...
     end

計算面積的範例

產生 一個計算面積的process,它會一直等待計算條件,計算後,將結果列印到畫面上,最後在 shell process 將計算條件以訊息方式發送給該 process。

%% area_server0.erl
-module(area_server0).  
-export([loop/0]). 

loop() ->
    receive
    {rectangle, Width, Ht} -> 
        io:format("Area of rectangle is ~p~n",[Width * Ht]),
        loop();
    {circle, R} -> 
        io:format("Area of circle is ~p~n", [3.14159 * R * R]),
        loop();
    Other ->
        io:format("I don't know what the area of a ~p is ~n",[Other]),
        loop()
    end.

測試

1> Pid = spawn(fun area_server0:loop/0).
<0.33.0>
2> Pid!{circle, 10}.
Area of circle is 314.159
{circle,10}
3> Pid!{rectangle, 10, 20}.
Area of rectangle is 200
{rectangle,10,20}
4> Pid!{rectangle, 10, 20.4}.
Area of rectangle is 204.0
{rectangle,10,20.4}

這段程式在 erlide 測試會有問題,一直看不到結果,我猜想原因應該跟 No error message in Eclipse consloe/erang shell 一樣,因為 erlide 並不是直接跟 shell 互動,而是透過一層遠端呼叫,再呼叫 shell,這個問題到現在還沒解決。

erlide 是個好的開發 IDE,但以上面的問題來看,程式測試可能還是要回到 console 自己用 erl 測試,才會得到比較確切的回應結果。

計算面積的範例 v2

上面的範例,訊息送給 server process 之後,就直接列印結果到畫面上,接下來,嘗試把計算後的結果,再回傳給發送計算要求訊息的 process。

rpc(Pid, Request) 負責將訊息送給 Pid, Pid ! {self(), Request},同時告訴 server process 發送端的 Pid:self(),讓 server process 可以將結果回傳給發送端。

%% area_server1.erl
-module(area_server1).  
-export([loop/0, rpc/2]). 


rpc(Pid, Request) ->
    Pid ! {self(), Request},
    receive
    Response ->
        Response
    end.

loop() ->
    receive
    {From, {rectangle, Width, Ht}} -> 
        From ! Width * Ht,
        loop();
    {From, {circle, R}} -> 
        From !  3.14159 * R * R,
        loop();
    {From, Other} ->
        From ! {error,Other},
        loop()
    end.

測試

7> Pid=spawn(fun area_server1:loop/0).
<0.41.0>
8> area_server1:rpc(Pid, {rectangle, 10, 20}).
200

計算面積的範例 v3

這裡的問題是,rpc 裡面並不是等待 server process 的回應,而是等待所有訊息,並直接列印出來。接下來進一步,修改 rpc 等待的訊息內容,以 server process 的 Pid 作為識別的標籤 {Pid, Response}。

%% area_server2.erl
-module(area_server2).  
-export([loop/0, rpc/2]). 

rpc(Pid, Request) ->
    Pid ! {self(), Request},
    receive
    {Pid, Response} ->
        Response
    end.

loop() ->
    receive
    {From, {rectangle, Width, Ht}} -> 
        From ! {self(), Width * Ht},
        loop();
    {From, {circle, R}} -> 
        From !  {self(), 3.14159 * R * R},
        loop();
    {From, Other} ->
        From ! {self(), {error,Other}},
        loop()
    end.

計算面積的範例 v4

最後,將 spawn 與 rpc 呼叫,再包裝一層 function,讓client呼叫跟運算的實作,隱藏在 module:area_server_final 裡面,好處是其他程式碼在使用 area_server_final 時,啟動服務 start 跟使用服務 area 的界面是一致的、不會改變的,實作 area_server_final 則可以依照需求跟狀況而改變,

%% area_server_final.erl
-module(area_server_final).  
-export([start/0, area/2]). 

start() -> spawn(fun loop/0).

area(Pid, What) ->
    rpc(Pid, What).

rpc(Pid, Request) ->
    Pid ! {self(), Request},
    receive
    {Pid, Response} ->
        Response
    end.

loop() ->
    receive
    {From, {rectangle, Width, Ht}} -> 
        From ! {self(), Width * Ht},
        loop();
    {From, {circle, R}} -> 
        From !  {self(), 3.14159 * R * R},
        loop();
    {From, Other} ->
        From ! {self(), {error,Other}},
        loop()
    end.

注意

跟 Java 的概念不同,Java 通常會將 client 與 server 的實作分在兩個 Java class,甚至是區分不同的 package,Java 的 client 與 server 要在不同的 JVM 運作,不同的執行機器,程式碼也需要做相應的切割,因為 client 與 server 都不需要對方的程式碼細節。

erlang 的優勢是,不管 client 與 server 程式是在單機執行,或是在兩台機器上執行,不同機器的 erlang vm 都是相通的,程式不需要太多修改就可以由遠端直接 deploy 程式並運作的,在這樣的狀況下,切割程式變得有些多餘了,另外因為 erlang 的程式碼比較精簡,因此 client 與 server 通常放在同一個 module 裡面。

建立 process 需要花多久時間

%% processes.erl
-module(processes).
-export([max/1]).

%% max(N) 
%%   Create N processes then destroy them
%%   See how much time this takes

max(N) ->
    % 向 erlang vm 查詢系統最大的 process 數量
    Max = erlang:system_info(process_limit),
    io:format("Maximum allowed processes:~p~n",[Max]),

    statistics(runtime),
    statistics(wall_clock),

    % 以匿名函數產生 process,內容為呼叫wait(),一直在等待收到 die 的訊息
    % 這一行也可以寫成
    % L = for(1, N, fun() -> spawn(fun wait/0) end),
    % 這樣就變成直接呼叫 wait
    L = for(1, N, fun() -> spawn(fun() -> wait() end) end),
    {_, Time1} = statistics(runtime),
    {_, Time2} = statistics(wall_clock),

    % 發送自殺訊息給把剛剛產生的所有 process停掉
    lists:foreach(fun(Pid) -> Pid ! die end, L),

    % 計算每一個 process 的平均時間
    U1 = Time1 * 1000 / N,
    U2 = Time2 * 1000 / N,
    io:format("Process spawn time=~p (~p) microseconds~n",
          [U1, U2]).

wait() ->
    receive
    die -> void
    end.

for(N, N, F) -> [F()];
for(I, N, F) -> [F()|for(I+1, N, F)].

測試

1> processes:max(10000).
Maximum allowed processes:262144
Process spawn time=1.6 (4.7) microseconds
ok
2> processes:max(200000).
Maximum allowed processes:262144
Process spawn time=3.51 (3.585) microseconds
ok

如果超過了 process 數量的上限,就會出現 error report

3> processes:max(300000).
Eshell V5.10.4  (abort with ^G)
*** ERROR: Shell process terminated! ***

=ERROR REPORT==== 21-Jan-2014::15:12:51 ===
Too many processes

根據 erlang Profilingwall_clock, runtime - what is the difference? 的說明,統計消耗的時間時,最好同時看消耗 CPU 的時間(runtime) 與 系統時間 (wall_clock),如果只看 runtime,可能會忽略了 OS kernel、memory swapping與disk I/O 的時間。

有時限的接收

有時候可能會等很久,卻等不到訊息,可能有很多原因,為 receive 加上 after Time 的 timeout,可以設定等待訊息最長可以等多久。

receive
    Pattern1 [when Guard1] ->
        Expression1;
    Pattern2 [when Guard2] ->
        Expression2;
    ...
after Time ->
    ExpressionTimeout
end

選擇式接收

訊息的收送,並不是直接送給 process,因為有可能同時會接收到很多個訊息,每個 process 內部都有一個 mailbox,功能是用來存放送給此 process 的訊息。

發送訊息給 process,事實上是發送到該 process 的 mailbox 中,只有在process執行 receive 時,才會去檢查 mailbox。

receive 的處理程序如下

  1. 如果有 after 區塊,進入 receive 會啟動一個計時器
  2. 取得 mailbox 的第一個訊息,並與 Pattern1、Pattern2... 比對,比對成功,該訊息就會從 mailbox 移除,並估算後面的 Expression
  3. 如果沒有 Pattern 吻合,mailbox 中的第一個訊息會被移除,並放入 save queue,接下來比對下一個訊息,直到有吻合的訊息,或是所有訊息都比對失敗
  4. 如果沒有訊息吻合,此行程會被暫停且重新排程,下此有訊息被放入 mailbox 會再執行一次。當新訊息到達,save queue 裡面的訊息不會重新被比對。
  5. 一旦有訊息吻合,就會重新把 save queue 裡面的訊息放入 mailbox,順序不變。如果有設定計時器,就將它清空。
  6. 如果等待訊息,計時器的時間到了,就會估算 ExpressionTimeout,並將 save queue 裡面的訊息放回 mailbox,順序不變。

Time 設定為 0

傳送給 erlang process 的訊息會放在 mailbox 裡面等待該 process 處理。
將 Time 設定為 0 可以用來檢查 mailbox 是否有訊息,如果有訊息,就執行 flush_buffer(),將 mailbox 清空,如果沒有就直接完成接收訊息的程序。

flush_buffer() ->
    receive
        _Any ->
            flush_buffer()
    after 0 ->
        true
    end.

將 Time 設定為 0 也可以用來實現「優先權接收」,如果 mailbox 裡面沒有訊息符合 {alarm, X},就會接收第一個訊息,如果完全沒有訊息,就會暫停最裡面的 receive,並傳回後來收到的第一個訊息。

注意:只有在 mailbox 裡面所有的訊息都比對過 {alarm, X} 之後,after 區塊才會被執行。
注意2:使用較大的 mailbox,再加上 priority_receive,這會造成程式效率變差,所以如果要這樣寫,要先確定 mailbox 不能太大。

priority_receive() ->
    receive
        {alarm, X} ->
            {alarm, X}
    after 0 ->
            receive
                Any ->
                    Any
            end
    end.

Time 設定為 infinity

如果 Time 設定為 infinity,則時限永遠不會被觸發,這會讓 receive 無限地等待下去。

實現一個計時器

此計時器會在 Time ms 時間之後,計算 Fun,如果時間還沒到收到 cancel時,就會中止此計時器。

-module(stimer).
-export([start/2, cancel/1]).

start(Time, Fun) -> spawn(fun() -> timer(Time, Fun) end).

cancel(Pid) -> Pid ! cancel.

timer(Time, Fun) ->
    receive
        cancel ->
            void
    after Time ->
            Fun()
    end.

測試

1> Pid = stimer:start(5000, fun() -> io:format("timer event~n") end ).
<0.35.0>
2> stimer:cancel(Pid).
cancel
3> f(Pid).
ok
4> Pid = stimer:start(5000, fun() -> io:format("timer event~n") end ).
<0.42.0>
5> timer event

註冊行程

如果要把訊息送給某個 process,我們需要知道並紀錄 PID,這很不方便,因為要把 PID 傳送給所有想跟該行程溝通的 processes。但相反地,這又是個安全機制,因為沒有告訴其他行程 PID,所以其他行程無法跟該行程互動。

erlang 可公開一個行程的 PID,將行程變成 registered process,有四個 BIF 可管理 registered process。

  1. register(anAtom, Pid)
    註冊 Pid 的名稱為 AnAtom,如果 AnAtom 已經被使用過,則會註冊失敗。
  2. unregister(anAtom)
    取消註冊。行程死亡會自動取消註冊。
  3. whereis(anAtom) -> Pid | undefined
    判斷 AnAtom 是否已經註冊,回傳其 Pid,沒有註冊就回傳 undefined
  4. registered() -> [anAtom::atom()]
    傳回系統所有已經註冊的行程清單

註冊 anAtom 後的行程,可以直接用 anAtom!Message 傳送訊息給它

範例

以下為時鐘範例,每 Time ms 後,會估算一次 Fun,持續執行,直到行程收到 stop 訊息。

%% clock.erl
-module(clock).
-export([start/2, stop/0]).

start(Time, Fun) ->
    register(clock, spawn(fun() -> tick(Time, Fun) end)).

stop() ->
    clock!stop.

tick(Time, Fun) ->
    receive
        stop ->
            void
    after Time ->
        Fun(),
        tick(Time, Fun)
    end.

測試

3> clock:start(5000, fun() -> io:format("TICK ~p~n", [erlang:now()]) end).
true
4> TICK {1390,359662,831700}
4> TICK {1390,359667,839700}
4> TICK {1390,359672,846700}
4> TICK {1390,359677,854700}
4> TICK {1390,359682,862700}
4> TICK {1390,359687,869700}
4> TICK {1390,359692,877700}
4> clock:stop().
stop

如何寫出共時程式

以下為一個共時程式的 template,可以從此小程式慢慢寫出自己需要的功能。

% ctemplate.erl
-module(ctemplate).
-compile(export_all).

start() ->
    spawn(fun() -> loop([]) end).

rpc(Pid, Request) ->
    Pid ! {self(), Request},
    receive
        {Pid, Response} ->
            Response
    end.

loop(X) ->
    receive
        {From, Message} ->
            io:format("Received: ~p~n", [Message]),
            From ! {self(), ok},
            loop(X)
    end.

process

產生 process

有兩種可產生 process,第二種要指定 Module, Function 與 Args,簡稱為 MFA。只有第二種方式,才會嘗試採用該模組最新版的程式,如果要有動態更新程式的功能,必須使用 MFA spawn。

Pid = spawn( fun() -> do_something() end )
Pid = spawn( Module, Function, ListOfArgs )

可用以下方式在別的 erlang 機器 node 產生 process

Pid = spawn(Node, Module, Function, ListOfArgs)

此外,還有一個名為 spawn_opt(...) 的版本,可以接受一個選項列表 spawn_option

Pid = spawn_opt(fun() -> do_something() end, [link])

還有一個 spawn_link,用此方式產生的 process,如果意外中止時,parent process 會一起中止,而用 spawn 產生的 process 中止時,parent process 不受影響。

Pid = spawn_link(Node, Module, Function, Args)

監視 process

如果不要讓 process 以 link 方式連結在一起,可以使用 monitor,這可讓一個 process 在不影響目標 process 的情況下進行監視。

當 Pid 行程如果 exit,監視的 processes 會收到一個含有唯一引用 Ref 的訊息。

Ref = monitor(process , Pid)

拋出異常中止行程

除非 Reason 被捕獲,否則此呼叫,會將行程中止,並將異常 Reason ,發送給所有跟此行程連結的行程。

exit(Reason)

直接向行程發送退出信號

此行程會向 Pid 行程發送中止信號,如果 Reason 是 kill,則無法被接收方行程捕獲。

exit(Pid, Reason)

設置 trap_exit

預設狀況下,一旦接收到 exit 退出信號,行程就會退出,為了避免被中止,可以設置 trap_exit 捕獲退出信號,除了 kill 無法被捕獲外,其他退出信號都會轉成無害的訊息。

process_flag(trap_exit, true)

參考

Erlang and OTP in Action
Programming Erlang: Software for a Concurrent World