2014年4月28日

erlang - gen_server

OTP 函式庫中包含了完整的 web server、FTP server、CORBA ORB,OTP 的 behavior 將常用的行為模式包裝起來,可想成 「用 callback 模組進行參數化」的應用框架。

行為解決了問題的「非功能」部份, callback 解決了功能的部份
問題中的非功能部份(ex: 如何動態更新程式碼)對所有應用來說都是一樣的。

以下會

  1. 以 erlang 寫一個小 client-server 程式
  2. 將程式一般化
  3. 進入真正的程式碼

發展通用伺服器的過程

以下程式會慢慢地將程式中,非功能(一般化)與功能的部份區分開來。

server1: 基本伺服器, 用 callback module 將它參數化

server1 是一個基本的 server, 可以用 callback module 將它參數化,換句話說,就是把 callback 的程式跟 server 的程式分開。

%% server1.erl
-module(server1).
-export([start/2, rpc/2]).

start(Name, Mod) ->
    register(Name, spawn(fun() -> loop(Name, Mod, Mod:init()) end)).

rpc(Name, Request) ->
    Name ! {self(), Request},
    receive
        {Name, Response} -> Response
    end.

loop(Name, Mod, State) ->
    receive
        {From, Request} ->
            {Response, State1} = Mod:handle(Request, State),
            From ! {Name, Response},
            loop(Name, Mod, State1)
    end.

以下是 name_server 的 callback module,一般 OTP 程式的習慣寫法,就是把 client 跟 server 端程式,放在同一個 module 裡面。

-module(name_server).
-export([init/0, add/2, whereis/1, handle/2]).
-import(server1, [rpc/2]).

%% client apis,這是讓 client 端呼叫的程式介面
add(Name, Place) -> rpc(name_server, {add, Name, Place}).
whereis(Name)    -> rpc(name_server, {whereis, Name}).

%% callback routines,這是讓 server 呼叫的
init() -> dict:new().

handle({add, Name, Place}, Dict) -> {ok, dict:store(Name, Place, Dict)};
handle({whereis, Name}, Dict)    -> {dict:find(Name, Dict), Dict}.

測試

1> server1:start(name_server, name_server).
true
2> name_server:add(tom, "home").
ok
3> name_server:whereis(tom).
{ok,"home"}

注意:name_server callback 沒有共時性、沒有生成、接收、送出、register,這表示我們可以寫 client-server module,而不需要知道下面的共時模型是什麼,這是所有伺服器的基本模型。

server2: 具有 Transaction 的伺服器

下面是一個會讓客戶當機的 server,如果查詢導致 Server 例外,客戶端就會當機。

-module(server2).
-export([start/2, rpc/2]).

start(Name, Mod) ->
    register(Name, spawn(fun() -> loop(Name,Mod,Mod:init()) end)).

rpc(Name, Request) ->
    Name ! {self(), Request},
    receive
        %% client 收到 server 回傳的 crash,就跳出程式
        {Name, crash} -> exit(rpc);
        {Name, ok, Response} -> Response
    end.

loop(Name, Mod, OldState) ->
    receive
        {From, Request} ->
            try Mod:handle(Request, OldState) of
                {Response, NewState} ->
                    From ! {Name, ok, Response},
                    loop(Name, Mod, NewState)
            catch
                _:Why ->
                    log_the_error(Name, Request, Why),
                    %% 因為程式出錯了,發送 crash 訊息給 client
                    From ! {Name, crash},
                    %% 再次 loop 等待處理下一個訊息
                    loop(Name, Mod, OldState)
            end
    end.

log_the_error(Name, Request, Why) ->
    io:format("Server ~p request ~p ~n"
                  "caused exception ~p~n", 
                  [Name, Request, Why]).

如果例外發生在處理器函數內,它以 State 的原始值進行迴圈
,如果函數成功,它會以「處理器函數所提供的」NewState值進行迴圈,當處理器發生錯誤時,伺服器的狀態會維持不變。

name_server 要改成使用 server2 的話,只需要修改 -import 的地方

-module(name_server).
-export([init/0, add/2, whereis/1, handle/2]).
-import(server2, [rpc/2]).

%% client routines
add(Name, Place) -> rpc(name_server, {add, Name, Place}).
whereis(Name)    -> rpc(name_server, {whereis, Name}).

%% callback routines
init() -> dict:new().

handle({add, Name, Place}, Dict) -> {ok, dict:store(Name, Place, Dict)};
handle({whereis, Name}, Dict)    -> {dict:find(Name, Dict), Dict}.

測試

1> server2:start(name_server, name_server).
true
2> name_server:add(tom, "home").
ok
3> name_server:whereis(tom).
{ok,"home"}
4> name_server:whereis(jane).
error

server3: 具有程式碼熱抽換功能的伺服器

如果送給伺服器一個程式碼熱抽換 swap_code 的訊息,就會改變 callback module,變成呼叫新模組

-module(server3).
-export([start/2, rpc/2, swap_code/2]).

start(Name, Mod) ->
    register(Name, 
             spawn(fun() -> loop(Name,Mod,Mod:init()) end)).

swap_code(Name, Mod) -> rpc(Name, {swap_code, Mod}).

rpc(Name, Request) ->
    Name ! {self(), Request},
    receive
        {Name, Response} -> Response
    end.

loop(Name, Mod, OldState) ->
    receive
        {From, {swap_code, NewCallBackMod}} ->
            From ! {Name, ack},
            loop(Name, NewCallBackMod, OldState);
        {From, Request} ->
            {Response, NewState} = Mod:handle(Request, OldState),
            From ! {Name, Response},
            loop(Name, Mod, NewState)
    end.

因為 name_server 所註冊綁定的 process name 不能在程式運作過程中修改掉,但我們可以動態地把 name_server process 運作的 module 換掉,達到抽換程式碼的功能。

-module(name_server1).
-export([init/0, add/2, whereis/1, handle/2]).
-import(server3, [rpc/2]).

%% client routines
add(Name, Place) -> rpc(name_server, {add, Name, Place}).
whereis(Name)    -> rpc(name_server, {whereis, Name}).

%% callback routines
init() -> dict:new().

handle({add, Name, Place}, Dict) -> {ok, dict:store(Name, Place, Dict)};
handle({whereis, Name}, Dict)    -> {dict:find(Name, Dict), Dict}.

測試

1> server3:start(name_server, name_server1).
true
2> name_server1:add(tom, "home").
ok
3> name_server1:add(jane, "home").
ok

如果我們測試到一半,想要增加「找出 name_server 儲存的」所有名稱,由於目前 name_server1 並沒有這個函數,所以我們就撰寫一個新的 new_name_server,增加了 all_names 與 delete 函數。

-module(new_name_server).
-export([init/0, add/2, all_names/0, delete/1, whereis/1, handle/2]).
-import(server3, [rpc/2]).

%% interface
all_names()      -> rpc(name_server, allNames).
add(Name, Place) -> rpc(name_server, {add, Name, Place}).
delete(Name)     -> rpc(name_server, {delete, Name}).
whereis(Name)    -> rpc(name_server, {whereis, Name}).

%% callback routines
init() -> dict:new().

handle({add, Name, Place}, Dict) -> {ok, dict:store(Name, Place, Dict)};
handle(allNames, Dict)           -> {dict:fetch_keys(Dict), Dict};
handle({delete, Name}, Dict)     -> {ok, dict:erase(Name, Dict)};
handle({whereis, Name}, Dict)    -> {dict:find(Name, Dict), Dict}.

在 console 就可以直接用 swap_code 把 name_server 執行的 module 換成 new_name_server

4> server3:swap_code(name_server, new_name_server).
ack
5> new_name_server:all_names().
[jane,tom]

以往認為伺服器本身有記錄狀態,發送訊息就會改變狀態,如果要抽換程式邏輯,必須要把 server 停下來,改變程式,然後重新啟動,但實際上,可以透過訊息的方式,讓 server 的迴圈執行新版本的 server module。

Server 4: 具有 Transaction 與 程式碼熱抽換 合併

-module(server4).
-export([start/2, rpc/2, swap_code/2]).

start(Name, Mod) ->
    register(Name, spawn(fun() -> loop(Name,Mod,Mod:init()) end)).

swap_code(Name, Mod) -> rpc(Name, {swap_code, Mod}).

rpc(Name, Request) ->
    Name ! {self(), Request},
    receive
        {Name, crash} -> exit(rpc);
        {Name, ok, Response} -> Response
    end.

loop(Name, Mod, OldState) ->
    receive
        {From, {swap_code, NewCallbackMod}} ->
            From ! {Name, ok, ack},
            loop(Name, NewCallbackMod, OldState);
        {From, Request} ->
            try Mod:handle(Request, OldState) of
                {Response, NewState} ->
                    From ! {Name, ok, Response},
                    loop(Name, Mod, NewState)
            catch
                _: Why ->
                    log_the_error(Name, Request, Why),
                    From ! {Name, crash},
                    loop(Name, Mod, OldState)
            end
    end.

log_the_error(Name, Request, Why) ->
    io:format("Server ~p request ~p ~n"
                  "caused exception ~p~n", 
                  [Name, Request, Why]).

server 5

此程式不做任何事,直到我們告訴它 {become, F},就會變成伺服器 F。

-module(server5).
-export([start/0, rpc/2]).

start() -> spawn(fun() -> wait() end).

wait() ->
    receive
        {become, F} -> F()
    end.

rpc(Pid, Q) ->
    Pid ! {self(), Q},
    receive
        {Pid, Reply} -> Reply
    end.

這是計算階層的 server

-module(fac_server).
-export([loop/0]).

loop() ->
    receive
    {From, {fac, N}} ->
        From ! {self(), fac(N)},
        loop();
    {become, Something} ->
        Something()
    end.

fac(0) -> 1;
fac(N) -> N * fac(N-1).

測試

1> Pid = server5:start().
<0.33.0>
2> Pid ! {become, fun fac_server:loop/0}
.
{become,#Fun<fac_server.loop.0>}
3> server5:rpc(Pid, {fac, 10}).
3628800

OTP 的 gen_server 就像是最後的 server5,是個一般化,可變成任何一種 server 的 server library,它提供了所有非功能的行為與錯誤處理。

使用 gen_server

三個步驟寫出一個 gen_server callback module

  1. 為 callback module 取名
  2. 寫 client 使用的介面函數
  3. 寫 callback module 中需要的六個 callback funciton

範例

  1. 為 callback module 取名
    my_bank
  2. 寫 client 使用的介面函數
    start() 啟動銀行
    stop() 關閉銀行
    new_account(Who) 建立新銀行帳號
    deposit(Who, Amount) 存款
    withdraw(Who, Amount) 提款

    這些都是呼叫 gen_server 的函數

     start() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
     stop()  -> gen_server:call(?MODULE, stop).
    
     new_account(Who)      -> gen_server:call(?MODULE, {new, Who}).
     deposit(Who, Amount)  -> gen_server:call(?MODULE, {add, Who, Amount}).
     withdraw(Who, Amount) -> gen_server:call(?MODULE, {remove, Who, Amount}).

    2.1 gen_server:start_link({local, Name}, Mode, ...) 會啟動一個本地伺服器,如果第一個參數填為 {global,GlobalName} ,則此伺服器為 global server
    2.2 ?MODULE 會展開成 my_bank
    2.3 gen_server:call(?MODULE, Term) 用來對 server 進行遠端呼叫 rpc

  3. 寫 callback module 中需要的六個 callback funcitons

callbak module 中要撰寫六個 callback functions
init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3

以下為一個 gen_server 程式,最簡單的樣板

-module().
%% 編譯時,如果沒有定義適當的callback function,就會產生警告
-behaviour(gen_server).
-export([start_link/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
     terminate/2, code_change/3]).

%% 呼叫 gen_server:start_link(Name, CallBackMod, StartArgs, Opts) 啟動 server
%% 第一個呼叫的是 Mod:init(StartArgs)
start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

%% 啟動時會被呼叫的函數,回傳值裡面的 State 會在其他函數中使用
init([]) -> {ok, State}.

%% 這是rpc 遠端呼叫時 server 的處理函數,Reply 會被送回 client 端
handle_call(_Request, _From, State) -> {reply, Reply, State}.
handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.

terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, Extra) -> {ok, State}.

以下為 my_bank 實際上用 ets 實作的程式

-module(my_bank).

-behaviour(gen_server).
-export([start/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).
-compile(export_all).

%% client functions
start() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
stop()  -> gen_server:call(?MODULE, stop).

new_account(Who)      -> gen_server:call(?MODULE, {new, Who}).
deposit(Who, Amount)  -> gen_server:call(?MODULE, {add, Who, Amount}).
withdraw(Who, Amount) -> gen_server:call(?MODULE, {remove, Who, Amount}).

%% 6 server callback functions
init([]) -> {ok, ets:new(?MODULE,[])}.

handle_call({new,Who}, _From, Tab) ->
    %% 先找看看帳號有沒有存在,舊帳號就不能再 ets:insert
    Reply = case ets:lookup(Tab, Who) of
                []  -> ets:insert(Tab, {Who,0}), 
                       {welcome, Who};
                [_] -> {Who, you_already_are_a_customer}
            end,
    {reply, Reply, Tab};
%% Who 存錢 X,最後得到餘額
handle_call({add,Who,X}, _From, Tab) ->
    Reply = case ets:lookup(Tab, Who) of
                []  -> not_a_customer;
                [{Who,Balance}] ->
                    NewBalance = Balance + X,
                    ets:insert(Tab, {Who, NewBalance}),
                    {thanks, Who, your_balance_is,  NewBalance}    
            end,
    {reply, Reply, Tab};
%% Who 提款 X,最後得到餘額
handle_call({remove,Who, X}, _From, Tab) ->
    Reply = case ets:lookup(Tab, Who) of
                []  -> not_a_customer;
                %% 扣錢之前,要先判斷一下夠不夠,餘額不能變成負值
                [{Who,Balance}] when X =< Balance ->
                    NewBalance = Balance - X,
                    ets:insert(Tab, {Who, NewBalance}),
                    {thanks, Who, your_balance_is,  NewBalance};    
                [{Who,Balance}] ->
                    {sorry,Who,you_only_have,Balance,in_the_bank}
            end,
    {reply, Reply, Tab};
handle_call(stop, _From, Tab) ->
    %% server 收到 stop,會回傳以下訊息,然後停止 server
    %% 第二個 normal 會變成 terminate 的第一個參數 _Reason
    %% 第三個參數 stopped 會變成 my_bank:stop() 的回傳值
    {stop, normal, stopped, Tab}.

handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, Extra) -> {ok, State}.

測試

1> my_bank:start().
{ok,<0.33.0>}
2> my_bank:deposit("tom", 10).
not_a_customer
3> my_bank:new_account("tom").
{welcome,"tom"}
4> my_bank:new_account("tom").
{"tom",you_already_are_a_customer}
5> my_bank:deposit("tom", 10).
{thanks,"tom",your_balance_is,10}
6> my_bank:deposit("tom", 15).
{thanks,"tom",your_balance_is,25}
7> my_bank:withdraw("tom", 5).
{thanks,"tom",your_balance_is,20}
8> my_bank:withdraw("tom", 35).
{sorry,"tom",you_only_have,20,in_the_bank}

gen_server callback functions 的細節

  1. 啟動 server
    呼叫 gen_server:start_link(Name,Mod,InitArgs,Opts) 就會啟動 server,建立一個名稱為 Name 的 server,callback module 為 Mod,Opts 控制這個 server 的行為:可指定訊息的記錄、除錯函數等等。在呼叫 Mod:init(InitArgs) 之後,此伺服器就被啟動完成

     @spec init(Args) -> {ok, State} |
                     {ok, State, Timeout} |
                     ignore |
                     {stop, Reason}

    當 init 回傳 {ok, State},就代表啟動成功,且初始狀態為 State

  2. 呼叫 server 時,客戶端會呼叫 gen_server:call(Name, Request),進而呼叫 callback function: handle_call/3

     @spec handle_call(Request, From, State) -> {reply, Reply, State} |
                     {reply, Reply, State, Timeout} |
                     {noreply, State} |
                     {noreply, State, Timeout} |
                     {stop, Reason, Reply, State} |
                     {stop, Reason, State}

    Request 是 gen_server:call 的第二個參數
    From 是 client 端發出 Request 的 process PID
    State 是server目前的狀態

    通常會回傳 {reply, Reply, NewState} ,Reply 會回傳給 client 端,變成 gen_server:call 的回傳值,而 NewState 是 server 的下一個狀態

  3. gen_server:cast(Name,Name) 這是不具有回傳值的呼叫,單純的發送訊息給 server

     @spec handle_cast(Msg, State) -> {noreply, NewState} |
                     {noreply, NewState, Timeout} |
                     {stop, Reason, NewState}

    通常會回傳 {noreply, NewState} 改變 server 的狀態

  4. gen_server:handle_info(Info, State) 是用來處理「自發訊息」的
    例如當 server process 被連結到另一個行程,且會捕捉離開訊息,那就可能會收到 {'EXIT', Pid, What} 訊息。或是系統中任何需要知道此 server 的 PID,都可以發送訊息過來,這樣的訊息最後會變成 Info 的值。

     @spec handle_info(Info, State) -> {noreply, State} |
                     {noreply, State, Timeout} |
                     {stop, Reason, State}
  5. server 終結的理由很多,但最後都會呼叫 terminate(Reason, NewState)
    5.1 handle_call, handle_cast, handle_info 可能會傳回 {stop, Reason, NewState}
    5.2 server可能會以 {'EXIT', reason } 當機

     @spec terminate(_Reason, State) -> void()

    可以在這裡把資料放進 disk,或是產生新訊息傳給其他 processes,或是直接丟棄

  6. 程式碼更新時,會呼叫 code_change

     @spec code_change(OldVsn, State, Extra) -> {ok, NewState}

在 emacs 提供的 gen_server template

%%%-------------------------------------------------------------------
%%% File    : gen_server_template.erl
%%% Author  : my name <yourname@localhost.localdomain>
%%% Description : 
%%%
%%% Created :  1 Jan 2014 by my name <yourname@localhost.localdomain>
%%%-------------------------------------------------------------------
-module().

-behaviour(gen_server).

%% API
-export([start_link/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
     terminate/2, code_change/3]).

-record(state, {}).

%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link() ->
    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

%%====================================================================
%% gen_server callbacks
%%====================================================================

%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State} |
%%                         {ok, State, Timeout} |
%%                         ignore               |
%%                         {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([]) ->
    {ok, #state{}}.

%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
%%                                      {reply, Reply, State, Timeout} |
%%                                      {noreply, State} |
%%                                      {noreply, State, Timeout} |
%%                                      {stop, Reason, Reply, State} |
%%                                      {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
handle_call(_Request, _From, State) ->
    Reply = ok,
    {reply, Reply, State}.

%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%%                                      {noreply, State, Timeout} |
%%                                      {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
handle_cast(_Msg, State) ->
    {noreply, State}.

%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%%                                       {noreply, State, Timeout} |
%%                                       {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
    {noreply, State}.

%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
%% Description: This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
    ok.

%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------

參考

Erlang and OTP in Action
Programming Erlang: Software for a Concurrent World