2018/05/28

CQRS: Command Query Responsibility Segregation

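CQS (Command Query Separation) is a software design principle introduced by Bertrand Meyer, the creator of the Eiffel language, in his 1988 book "Object-Oriented Software Construction". Every method falls into exactly one of two categories: a command, which performs an action, or a query, which returns data to the caller. A method should never do both. Put another way, asking a question must not change the answer.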

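CQRS (Command Query Responsibility Segregation) applies the CQS idea one level up: it separates Query objects from Command objects, so that one side is responsible for reading data and the other for modifying it.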
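The separation can be sketched in a few lines of Elixir. This is only an illustration under assumptions: the module names CqrsSketch.Commands and CqrsSketch.Queries are made up, and an Agent stands in for whatever storage a real system would use.

```elixir
defmodule CqrsSketch.Commands do
  # Commands change state and return only an acknowledgement (:ok), never data.
  def deposit(store, amount) when is_integer(amount) and amount > 0 do
    Agent.update(store, fn balance -> balance + amount end)
  end
end

defmodule CqrsSketch.Queries do
  # Queries return data and never modify state.
  def balance(store), do: Agent.get(store, fn balance -> balance end)
end

{:ok, store} = Agent.start_link(fn -> 0 end)
:ok = CqrsSketch.Commands.deposit(store, 100)
IO.inspect(CqrsSketch.Queries.balance(store))
```

Keeping the two modules apart means the read side can later be pointed at a different, read-optimized store without touching the command code.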

The difficulty with CQS lies in re-entrancy and multi-threading: a piece of work is interrupted halfway by new work. This is the thread-safety problem, and it makes implementations more complex.

Event Sourcing helps here. Every change to the application state is collected into a sequence of events, and command actions are processed through that sequence, which addresses the thread-safety problem.
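A minimal sketch of the idea, assuming a hypothetical account with two made-up event types ({:deposited, amount} and {:withdrawn, amount}): the current state is never stored directly but is rebuilt by replaying the event sequence in order.

```elixir
defmodule EventSourcingSketch do
  # Apply a single event to the current state and return the new state.
  def apply_event({:deposited, amount}, balance), do: balance + amount
  def apply_event({:withdrawn, amount}, balance), do: balance - amount

  # Rebuild the state by replaying the events in order.
  def replay(events, initial \\ 0) do
    Enum.reduce(events, initial, &apply_event/2)
  end
end

events = [{:deposited, 100}, {:withdrawn, 30}, {:deposited, 5}]
IO.inspect(EventSourcingSketch.replay(events))
```

Because events are only appended and applied one at a time, there is a single, well-defined order of state changes.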

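We still face a timing problem between command execution and queries. If a modification has not finished by the time a query runs, the front end reads stale data. The data does reach eventual consistency, but strong consistency is not guaranteed.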

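Advantages of CQRS:

  1. Commands and queries have clearly separated responsibilities, so each side can be tuned and optimized independently.
  2. Separating command and query responsibilities can improve the performance, scalability, and security of the system.
  3. Business logic stays simple and clear; the event history shows which actions or operations caused the system state to change.
  4. The development mindset shifts from data-driven to task-driven and event-driven.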

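Consider the CQRS pattern in the following situations:

  1. Many operations in the business logic layer work on the same entities or objects. CQRS lets us define different entities and methods for reads and writes, which reduces or avoids conflicts when one side changes.
  2. Task-based user interfaces, where the system guides users through a series of complex steps and operations and usually needs a rich domain model. The write side contains many business-related commands, input validation, and business rule validation to keep the data consistent. The read side has no business logic and simply returns DTOs. Reads and writes only need to be eventually consistent.
  3. Systems whose query performance and write performance need to be optimized separately, especially systems with a very high read/write ratio, where reads far outnumber writes.
  4. Systems that will evolve over time, may contain multiple versions of the model, or have frequently changing business rules.
  5. Systems that integrate with other systems, particularly through Event Sourcing, so that a temporary failure in one subsystem does not affect the rest of the system.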

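CQRS is not a good fit in the following situations:

  1. The domain model or business logic is simple; CQRS would only make the system more complicated.
  2. A simple CRUD-style user interface and its data access operations are sufficient. If the system only creates, reads, updates, and deletes data, CQRS is unnecessary.
  3. Applying CQRS across an entire system is rarely appropriate; it tends to be more useful in specific modules.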

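Below are notes on several CQRS architecture diagrams found online. The diagrams show how CQRS differs from the traditional data-driven CRUD architecture.

This architecture separates the Business database from the Query model database. Data is written to the Business database, which front-end users never touch. The approach emphasizes data security, but it cannot guarantee that the Query model database always matches the Business model database.

This architecture is closer to a typical system. There is a single database, and the Event Handler can check that the Database and the Analysis Database are consistent.

This diagram emphasizes the data flow, but the architecture is close to the previous one. Note that the Command side has no Reply DTO: once a Command has been delivered to the Command Handler, it is treated as successfully executed.

Here the Command Bus delivers commands to the Command Handler asynchronously, which improves system performance. A synchronous approach is also possible: wait until the Command Event has been written to the Event Database before confirming to the front end that the command has completed. Queries are synchronous: a Query is sent and the caller waits for the ViewModel to be returned.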
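The asynchronous-command, synchronous-query split maps naturally onto GenServer.cast and GenServer.call. The sketch below is an assumption-laden illustration: CommandQueryBus and the {:rename, name} command are made-up names, and the "event database" is just a list held in the server state.

```elixir
defmodule CommandQueryBus do
  use GenServer

  # Commands are sent asynchronously: the caller gets :ok right away.
  def send_command(pid, command), do: GenServer.cast(pid, {:command, command})

  # Queries are synchronous: the caller blocks until the view is returned.
  def query(pid), do: GenServer.call(pid, :view)

  @impl true
  def init(events), do: {:ok, events}

  @impl true
  def handle_cast({:command, command}, events), do: {:noreply, [command | events]}

  @impl true
  def handle_call(:view, _from, events), do: {:reply, Enum.reverse(events), events}
end

{:ok, pid} = GenServer.start_link(CommandQueryBus, [])
:ok = CommandQueryBus.send_command(pid, {:rename, "a"})
IO.inspect(CommandQueryBus.query(pid))
```

In this single-process sketch a query issued by the same caller after a command always sees that command, because messages between two processes are delivered in order. With separate read and write stores that guarantee disappears, which is where eventual consistency comes in.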


2018/05/21

Elixir 9 Protocol

inspect returns a printable binary representation of any value. How does Elixir implement it? Is it done with guard clauses, like this?

def inspect(value) when is_atom(value), do: ...
def inspect(value) when is_binary(value), do: ...

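Instead, Elixir uses protocols. A protocol allows the same function to be applied to different data types, and the function behaves consistently across those types. Protocols resemble behaviours, but a behaviour is implemented inside a module, whereas a protocol can be implemented outside the module that defines the type. This means we can freely add functionality that a type lacks.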

defprotocol defines a protocol. It only declares the functions; the implementations are written with defimpl and can live elsewhere.

defprotocol Inspect do
    def inspect(thing, opts)
end

With the following two implementations, PIDs and references can be inspected:

defimpl Inspect, for: PID do
    def inspect(pid, _opts) do
        "#PID" <> IO.iodata_to_binary(:erlang.pid_to_list(pid))
    end
end

defimpl Inspect, for: Reference do
    def inspect(ref, _opts) do
        '#Ref' ++ rest = :erlang.ref_to_list(ref)
        "#Reference" <> IO.iodata_to_binary(rest)
    end
end

iex(1)> inspect self()
"#PID<0.89.0>"

The types that can be used after for: are:

Any
Atom
BitString
Float
Function
Integer
List
Map
PID
Port
Record
Reference
Tuple

is_collection.exs

defprotocol Collection do
  @fallback_to_any true
  def is_collection?(value)
end

defimpl Collection, for: [List, Tuple, BitString, Map] do
  def is_collection?(_), do: true
end

defimpl Collection, for: Any do
  def is_collection?(_), do: false
end

Enum.each [ 1, 1.0, [1,2], {1,2}, %{}, "cat" ], fn value ->
  IO.puts "#{inspect value}:  #{Collection.is_collection?(value)}"
end

$ elixir is_collection.exs
1:  false
1.0:  false
[1, 2]:  true
{1, 2}:  true
%{}:  true
"cat":  true

Protocol and Structs

Elixir has no classes, but it supports user-defined types through structs.

defmodule Blob do
  defstruct content: nil
end

iex(1)> c "basic.exs"
[Blob]
iex(2)> b = %Blob{content: 123}
%Blob{content: 123}
iex(3)> inspect b
"%Blob{content: 123}"

## A struct is really a map with an extra __struct__ key
iex(4)> inspect b, structs: false
"%{__struct__: Blob, content: 123}"
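Although a struct is a map underneath, protocol dispatch treats it as its own type. The sketch below illustrates this with a made-up Describe protocol and a made-up Parcel struct (neither is part of Elixir): the struct gets its own implementation and does not fall through to the Map one.

```elixir
defprotocol Describe do
  @fallback_to_any true
  def describe(value)
end

defimpl Describe, for: Map do
  def describe(_), do: "a plain map"
end

defimpl Describe, for: Any do
  def describe(_), do: "something else"
end

defmodule Parcel do
  defstruct content: nil
end

defimpl Describe, for: Parcel do
  def describe(%Parcel{content: content}), do: "a parcel holding #{inspect(content)}"
end

# struct/2 builds the struct at runtime, so the script that defines Parcel can use it.
parcel = struct(Parcel, content: 123)

IO.puts(Describe.describe(parcel))
IO.puts(Describe.describe(%{}))
IO.puts(Describe.describe(1))
```

Without the Parcel implementation, the struct would be handled by the Any fallback rather than by the Map implementation.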

Built-In Protocols

Elixir ships with the following built-in protocols:

  • Enumerable and Collectable
  • Inspect
  • List.Chars
  • String.Chars

First, define a Bitmap struct that treats an integer as a sequence of 0 and 1 bits.

bitmap.exs

defmodule Bitmap do
  defstruct value: 0

  @doc """
  A simple accessor for the 2^bit value in an integer

      iex> b = %Bitmap{value: 5}
      %Bitmap{value: 5}
      iex> Bitmap.fetch_bit(b,2)
      1
      iex> Bitmap.fetch_bit(b,1)
      0
      iex> Bitmap.fetch_bit(b,0)
      1
  """
  def fetch_bit(%Bitmap{value: value}, bit) do
    use Bitwise

    (value >>> bit) &&& 1
  end
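end

  • Enumerable and Collectable

The Enumerable protocol defines three functions (newer Elixir releases add a fourth, slice/1):

defprotocol Enumerable do
    # number of elements in the collection
    def count(collection)

    # whether the collection contains the given value
    def member?(collection, value)

    # reduce the collection's elements with fun
    def reduce(collection, acc, fun)
end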

Implement Enumerable for the Bitmap defined above:

defimpl Enumerable,  for: Bitmap do
  import :math, only: [log: 1]

  def reduce(bitmap, {:cont, acc}, fun) do
    bit_count =  Enum.count(bitmap)
    _reduce({bitmap, bit_count}, { :cont, acc }, fun)
  end

  defp _reduce({_bitmap, -1}, { :cont, acc }, _fun), do: { :done, acc }

  defp _reduce({bitmap, bit_number}, { :cont, acc }, fun) do
    with bit = Bitmap.fetch_bit(bitmap, bit_number),
    do:  _reduce({bitmap, bit_number-1}, fun.(bit, acc), fun)
  end

  defp _reduce({_bitmap, _bit_number}, { :halt, acc }, _fun), do: { :halted, acc }

  # a suspended reduction returns a 3-tuple: {:suspended, acc, continuation}
  defp _reduce({bitmap, bit_number}, { :suspend, acc }, fun),
  do: { :suspended, acc, &_reduce({bitmap, bit_number}, &1, fun) }

  def member?(value, bit_number) do
    { :ok, 0 <= bit_number && bit_number < Enum.count(value) }
  end

  def count(%Bitmap{value: value}) do              
    { :ok, trunc(log(abs(value))/log(2)) + 1 }
  end
end


fifty = %Bitmap{value: 50}

IO.puts Enum.count fifty    # => 6

IO.puts Enum.member? fifty, 4    # => true
IO.puts Enum.member? fifty, 6    # => false

IO.inspect Enum.reverse fifty       # => [0, 1, 0, 0, 1, 1, 0]
IO.inspect Enum.join fifty, ":"     # => "0:1:1:0:0:1:0"

iex(1)> c ("bitmap.exs")
[Bitmap]
iex(2)> c ("bitmap_enumerable.exs")
6
true
false
[0, 1, 0, 0, 1, 1, 0]
"0:1:1:0:0:1:0"
[Enumerable.Bitmap]

defimpl Collectable,  for: Bitmap do
  use Bitwise

  # returns a tuple: (1) the initial accumulator value, (2) the collector function
  def into(%Bitmap{value: target}) do
    {target, fn
      acc, {:cont, next_bit} -> (acc <<< 1) ||| next_bit
      acc,  :done            -> %Bitmap{value: acc}
      _, :halt               -> :ok
    end}
  end
end

iex(3)> c("bitmap_collectable.exs")
[Collectable.Bitmap]
iex(4)> Enum.into [1,1,0,0,1,0], %Bitmap{value: 0}
%Bitmap{value: 50}

  • Inspect

defmodule Bitmap do
  defstruct value: 0

  defimpl Inspect do
    def inspect(%Bitmap{value: value}, _opts) do
      "%Bitmap{#{value}=#{as_binary(value)}}"
    end
    defp as_binary(value) do
      to_string(:io_lib.format("~.2B", [value]))
    end
  end
end

iex(6)> c("bitmap_inspect.exs")
[Bitmap, Inspect.Bitmap]
iex(7)> fifty = %Bitmap{value: 50}
%Bitmap{50=110010}
iex(8)> inspect fifty
"%Bitmap{50=110010}"
iex(9)> inspect fifty, structs: false
"%{__struct__: Bitmap, value: 50}"
iex(10)> %Bitmap{value: 12345678901234567890}
%Bitmap{12345678901234567890=1010101101010100101010011000110011101011000111110000101011010010}

defmodule Bitmap do
  defstruct value: 0

  defimpl Inspect, for: Bitmap do
    import Inspect.Algebra
    def inspect(%Bitmap{value: value}, _opts) do
      concat([
        nest(
         concat([
           "%Bitmap{",
           break(""),
           nest(concat([to_string(value),
                        "=",
                        break(""),
                        as_binary(value)]),
                2),
         ]), 2),
        break(""),
        "}"])
    end
    defp as_binary(value) do
      to_string(:io_lib.format("~.2B", [value]))
    end
  end
end

iex(1)> c("bitmap_algebra.exs")
[Bitmap, Inspect.Bitmap]
iex(2)>
nil
iex(3)> big_bitmap = %Bitmap{value: 12345678901234567890}
%Bitmap{12345678901234567890=
    1010101101010100101010011000110011101011000111110000101011010010}
iex(4)>
nil
iex(5)> IO.inspect big_bitmap
%Bitmap{12345678901234567890=
    1010101101010100101010011000110011101011000111110000101011010010}
%Bitmap{12345678901234567890=
    1010101101010100101010011000110011101011000111110000101011010010}
iex(6)> IO.inspect big_bitmap, structs: false
%{__struct__: Bitmap, value: 12345678901234567890}
%Bitmap{12345678901234567890=
    1010101101010100101010011000110011101011000111110000101011010010}

  • List.Chars & String.Chars

bitmap_string.exs

defimpl String.Chars, for: Bitmap do
  def to_string(bitmap) do
    import Enum
    bitmap
    |> reverse
    |> chunk(3)
    |> map(fn three_bits -> three_bits |> reverse |> join end)
    |> reverse
    |> join("_")
  end
end

iex(1)> c("bitmap.exs")
[Bitmap]
iex(2)> c("bitmap_enumerable.exs")
[Enumerable.Bitmap]
iex(3)> c("bitmap_string.exs")
[String.Chars.Bitmap]


iex(4)> fifty = %Bitmap{value: 50}
%Bitmap{value: 50}
iex(5)> "Fifty in bits is: #{fifty}"
"Fifty in bits is: 110_010"

References

Programming Elixir

2018/05/14

Elixir 8 Macros

Macros can make code harder to read; if a function will do, do not use a macro. This post pretends that Elixir has no if construct and shows how to implement one with a macro.

if Statement

myif «condition» do
    «evaluate if true»
else
    «evaluate if false»
end

----

myif «condition»,
    do: «evaluate if true»,
    else: «evaluate if false»

A first attempt implements if as a function:

defmodule My do
    def myif(condition, clauses) do
        do_clause = Keyword.get(clauses, :do, nil)
        else_clause = Keyword.get(clauses, :else, nil)

        case condition do
            val when val in [false, nil]
                -> else_clause
            _otherwise
                -> do_clause
        end
    end
end

$ iex my.exs
iex(1)> My.myif 1==2, do: (IO.puts "1==2"), else: (IO.puts "1 != 2")
1==2
1 != 2
:ok

The result is wrong. Because myif is an ordinary function, Elixir evaluates both the do: and the else: arguments before the call is made.
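One way to stay with plain functions is to pass each branch as a zero-arity anonymous function, so that nothing runs until the chosen branch is called. This is a sketch of that alternative, not the approach taken in the rest of this post; LazyIf is a made-up module name.

```elixir
defmodule LazyIf do
  # Each branch is a zero-arity function, so only the selected one is executed.
  def myif(condition, clauses) do
    do_fun = Keyword.fetch!(clauses, :do)
    else_fun = Keyword.fetch!(clauses, :else)

    case condition do
      val when val in [false, nil] -> else_fun.()
      _otherwise -> do_fun.()
    end
  end
end

LazyIf.myif(1 == 2,
  do: fn -> IO.puts("1 == 2") end,
  else: fn -> IO.puts("1 != 2") end
)
```

The price is the extra fn ... end noise at every call site, which is exactly what a macro removes.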

Macro Inject Code

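defmacro defines a macro. Arguments passed to a macro are not evaluated; instead, the macro receives their code as quoted expressions, most of which are represented as tuples.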

defmodule My do
  defmacro macro(param) do
    IO.inspect param
  end
end

defmodule Test do
  require My

  # These values represent themselves
  My.macro :atom        #=> :atom
  My.macro 1            #=> 1
  My.macro 1.0          #=> 1.0
  My.macro [1,2,3]      #=> [1,2,3]
  My.macro "binaries"   #=> "binaries"
  My.macro { 1, 2 }     #=> {1,2}
  My.macro do: 1        #=> [do: 1]

  # And these are represented by 3-element tuples

  My.macro { 1,2,3,4,5 }
  # =>  {:"{}",[line: 20],[1,2,3,4,5]}

  My.macro do: ( a = 1; a+a )
  # =>  [do:
  #      {:__block__,[],
  #        [{:=,[line: 22],[{:a,[line: 22],nil},1]},
  #         {:+,[line: 22],[{:a,[line: 22],nil},{:a,[line: 22],nil}]}]}]


  My.macro do
    1+2
  else
    3+4
  end
  # =>   [do: {:+,[line: 24],[1,2]},
  #       else: {:+,[line: 26],[3,4]}]

end

$ iex dumper.exs
:atom
1
1.0
[1, 2, 3]
"binaries"
{1, 2}
[do: 1]
{:{}, [line: 29], [1, 2, 3, 4, 5]}
[do: {:__block__, [],
  [{:=, [line: 32], [{:a, [line: 32], nil}, 1]},
   {:+, [line: 32], [{:a, [line: 32], nil}, {:a, [line: 32], nil}]}]}]
[do: {:+, [line: 40], [1, 2]}, else: {:+, [line: 42], [3, 4]}]

When a macro is defined in one module and used in another, the using module must require it first. This ensures that the macro's module is compiled before the current one.


quote function

quote keeps code in its unevaluated form. quote/2 converts Elixir code into its underlying representation.

iex(1)> quote do: :atom
:atom
iex(2)> quote do: 1
1
iex(3)> quote do: 1.0
1.0
iex(4)> quote do: [1,2,3]
[1, 2, 3]
iex(5)> quote do: "binaries"
"binaries"
iex(6)> quote do: {1,2}
{1, 2}
iex(7)> quote do: [do: 1]
[do: 1]

iex(8)> quote do: {1,2,3,4,5}
{:{}, [], [1, 2, 3, 4, 5]}

iex(9)> quote do: (a = 1; a + a)
{:__block__, [],
 [{:=, [], [{:a, [], Elixir}, 1]},
  {:+, [context: Elixir, import: Kernel],
   [{:a, [], Elixir}, {:a, [], Elixir}]}]}

iex(10)> quote do: [ do: 1 + 2, else: 3 + 4]
[do: {:+, [context: Elixir, import: Kernel], [1, 2]},
 else: {:+, [context: Elixir, import: Kernel], [3, 4]}]

eg.exs

defmodule My do
  defmacro macro(code) do
    IO.inspect code
    # the returned code is evaluated at the call site
    code
  end
end
defmodule Test do
  require My
  My.macro(IO.puts("hello"))
end

eg1.exs

defmodule My do
  defmacro macro(code) do
    IO.inspect code
    # only the code inside do is evaluated: the quoted code is returned to the caller of the macro and evaluated there
    quote do: IO.puts "Different code"
  end
end
defmodule Test do
  require My
  My.macro(IO.puts("hello"))
end

$ elixir eg.exs
{{:., [line: 17], [{:__aliases__, [counter: 0, line: 17], [:IO]}, :puts]},
 [line: 17], ["hello"]}
hello

$ elixir eg1.exs
{{:., [line: 17], [{:__aliases__, [counter: 0, line: 17], [:IO]}, :puts]},
 [line: 17], ["hello"]}
Different code

unquote function

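Now that we know how to obtain the internal representation of code, how do we modify it? unquote/1 injects new code and values. When we unquote an expression, it is evaluated and the result is inserted into the AST.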

iex(1)> denominator = 2
2
iex(2)> quote do: divide(42, denominator)
{:divide, [], [42, {:denominator, [], Elixir}]}
iex(3)> quote do: divide(42, unquote(denominator))
{:divide, [], [42, 2]}
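The same mechanism is what lets a macro splice the caller's code into the code it generates. A minimal sketch, assuming a made-up UnquoteSketch.twice/1 macro; the macro is used from a second module because a macro must be compiled before it is expanded.

```elixir
defmodule UnquoteSketch do
  # The macro receives the AST of `expr`, not its value.
  defmacro twice(expr) do
    quote do
      # unquote/1 splices the caller's expression into the generated code.
      value = unquote(expr)
      value * 2
    end
  end
end

defmodule UnquoteSketch.Demo do
  require UnquoteSketch

  # Expands to: value = n + 1; value * 2
  def run(n), do: UnquoteSketch.twice(n + 1)
end

IO.inspect(UnquoteSketch.Demo.run(20))
```

The variable value inside the quote is hygienic, so it cannot clash with a variable of the same name at the call site.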

Expanding a list using unquote_splicing

# insert [3,4]
iex(4)> Code.eval_quoted(quote do: [1,2,unquote([3,4])])
{[1, 2, [3, 4]], []}

# splice 3, 4 into the surrounding list
iex(5)> Code.eval_quoted(quote do: [1,2,unquote_splicing([3,4])])
{[1, 2, 3, 4], []}

# '1234' is a list of characters
iex(6)> Code.eval_quoted(quote do: [?a, ?= ,unquote_splicing('1234')])
{'a=1234', []}


iex(7)> fragment = quote do: IO.puts("hello")
{{:., [], [{:__aliases__, [alias: false], [:IO]}, :puts]}, [], ["hello"]}
iex(8)> Code.eval_quoted fragment
hello
{:ok, []}
iex(9)> Code.eval_string("[a, a*b, c]", [a: 2, b: 3, c: 4])
{[2, 6, 4], [a: 2, b: 3, c: 4]}

myif Macro

defmodule My do
  defmacro if(condition, clauses) do
    do_clause   = Keyword.get(clauses, :do, nil)
    else_clause = Keyword.get(clauses, :else, nil)
    quote do
      case unquote(condition) do
        val when val in [false, nil] -> unquote(else_clause)
        _                            -> unquote(do_clause)
      end
    end
  end
end

defmodule Test do
  require My
  My.if 1==2 do
    IO.puts "1 == 2"
  else
    IO.puts "1 != 2"
  end
end

$ elixir myif.ex
1 != 2

References

Programming Elixir

2018/05/07

Elixir 7 OTP

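OTP defines applications as a hierarchy. An application contains multiple processes, and these processes are implemented according to "behaviours". The server behaviour is called GenServer; another special behaviour, the supervisor, monitors processes and implements the rules for restarting them.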

OTP Server: GenServer

Create a new project named sequence:

$ mix new sequence
* creating README.md
* creating .gitignore
* creating mix.exs
* creating config
* creating config/config.exs
* creating lib
* creating lib/sequence.ex
* creating test
* creating test/test_helper.exs
* creating test/sequence_test.exs
$ cd sequence/
$ mkdir lib/sequence

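When a client calls the server, handle_call is invoked. Its first argument is the request sent by the client, the second identifies the caller (a tuple that contains the client's PID), and the third is the server state.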

defmodule Sequence.Server do
  # use applies the OTP GenServer behaviour to this module
  use GenServer

  # :next_number is the request from the client, _from identifies the client, current_number is the server state
  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

end

start_link is similar to spawn_link: it starts a new process that is linked to the caller.

$ iex -S mix
Compiling 2 files (.ex)
Generated sequence app
iex(1)> { :ok, pid } = GenServer.start_link(Sequence.Server, 100)
{:ok, #PID<0.149.0>}
iex(2)> GenServer.call(pid, :next_number)
100
iex(3)> GenServer.call(pid, :next_number)
101
iex(4)> GenServer.call(pid, :next_number)
102

Add a function clause that resets the number: def handle_call({:set_number, new_number}, _from, _current_number)

defmodule Sequence.Server do
  # use applies the OTP GenServer behaviour to this module
  use GenServer

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  def handle_call({:set_number, new_number}, _from, _current_number) do
    { :reply, new_number, new_number }
  end

end

Sample session:

iex(1)> { :ok, pid } = GenServer.start_link(Sequence.Server, 100)
{:ok, #PID<0.123.0>}
iex(2)> GenServer.call(pid, :next_number)
100
iex(3)> GenServer.call(pid, {:set_number, 20})
20
iex(4)> GenServer.call(pid, :next_number)
20
iex(5)> GenServer.call(pid, :next_number)
21
iex(6)> GenServer.call(pid, :next_number)
22

GenServer.call is a synchronous request: the caller waits for the server's reply. When no reply is needed, use GenServer.cast instead; the server then has to implement handle_cast.

defmodule Sequence.Server do
  # use applies the OTP GenServer behaviour to this module
  use GenServer

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  # no reply is sent to the client, so only the server state is updated
  def handle_cast({:increment_number, delta}, current_number) do
    { :noreply, current_number + delta}
  end

end

Sample session:

# reload Sequence.Server
iex(8)> r Sequence.Server
warning: redefining module Sequence.Server (current version loaded from _build/dev/lib/sequence/ebin/Elixir.Sequence.Server.beam)
  lib/sequence/server.ex:1

{:reloaded, Sequence.Server, [Sequence.Server]}

iex(9)> { :ok, pid } = GenServer.start_link(Sequence.Server, 100)
{:ok, #PID<0.143.0>}
iex(10)> GenServer.call(pid, :next_number)
100
iex(11)> GenServer.cast(pid, {:increment_number, 200})
:ok
iex(12)> GenServer.call(pid, :next_number)
301

Tracing Server Execution

The third argument of start_link is a list of options, for example [debug: [:trace]]:

iex(6)> {:ok,pid} = GenServer.start_link(Sequence.Server, 100, [debug: [:trace]])
{:ok, #PID<0.148.0>}
iex(7)> GenServer.call(pid, :next_number)
*DBG* <0.148.0> got call next_number from <0.139.0>
*DBG* <0.148.0> sent 100 to <0.139.0>, new state 101
100
iex(8)> GenServer.cast(pid, {:increment_number, 200})
*DBG* <0.148.0> got cast {increment_number,200}
:ok
*DBG* <0.148.0> new state 301

[debug: [:statistics]] collects statistics. Timestamps are {{y,m,d},{h,m,s}} tuples.

iex(11)>  {:ok,pid} = GenServer.start_link(Sequence.Server, 100, [debug: [:statistics]])
{:ok, #PID<0.155.0>}
iex(12)> GenServer.call(pid, :next_number)
100
iex(13)> GenServer.cast(pid, {:increment_number, 200})
:ok
iex(14)> :sys.statistics pid, :get
{:ok,
 [start_time: {{2017, 8, 30}, {9, 54, 38}},
  current_time: {{2017, 8, 30}, {9, 54, 53}}, reductions: 76, messages_in: 2,
  messages_out: 0]}

Tracing can be enabled and disabled at runtime with :sys.trace:

iex(17)> :sys.trace pid, true
:ok
iex(18)> GenServer.call(pid, :next_number)
*DBG* <0.155.0> got call next_number from <0.139.0>
*DBG* <0.155.0> sent 301 to <0.139.0>, new state 302
301
iex(19)> :sys.trace pid, false
:ok
iex(20)> GenServer.call(pid, :next_number)
302
iex(21)> :sys.get_state pid
303

Add format_status to customize the output of :sys.get_status pid:

defmodule Sequence.Server do
  # use applies the OTP GenServer behaviour to this module
  use GenServer

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  # no reply is sent to the client, so only the server state is updated
  def handle_cast({:increment_number, delta}, current_number) do
    { :noreply, current_number + delta}
  end

  # format the state shown in status and trace output
  def format_status(_reason, [ _pdict, state ]) do
    [data: [{'State', "My current state is '#{inspect state}', and I'm happy"}]]
  end

end

iex(8)> :sys.get_status pid
{:status, #PID<0.123.0>, {:module, :gen_server},
 [["$ancestors": [#PID<0.121.0>, #PID<0.59.0>],
   "$initial_call": {Sequence.Server, :init, 1}], :running, #PID<0.121.0>,
  [trace: true,
   statistics: {{{2017, 8, 30}, {10, 1, 2}}, {:reductions, 21}, 1, 0}],
  [header: 'Status for generic server <0.123.0>',
   data: [{'Status', :running}, {'Parent', #PID<0.121.0>},
    {'Logged events', []}],
   data: [{'State', "My current state is '101', and I'm happy"}]]]}

GenServer Callbacks

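GenServer is an OTP behaviour. It defines the callbacks below; use GenServer supplies default implementations for most of them, so a module only needs to define the ones it uses:

  1. init(state_arguments)

    Called by GenServer when the server starts. The argument is the second argument passed to start_link. Return {:ok, state} on success or {:stop, reason} on failure.

    init may also return {:ok, state, timeout}. If the server receives no message within timeout ms, GenServer sends it a :timeout message, which the server must handle in handle_info.

  2. handle_call(request, from, state)

    Invoked for GenServer.call(pid, request). Return {:reply, result, new_state} on success. The default implementation stops the server with a :bad_call error.

  3. handle_cast(request, state)

    Invoked for GenServer.cast(pid, request). Return {:noreply, new_state} on success or {:stop, reason, new_state} on failure.

  4. handle_info(info, state)

    Handles messages that do not arrive through call or cast, such as :timeout. Messages sent directly to the server's PID with send are also handled here.

  5. terminate(reason, state)

    Called when the server is about to stop. If the server is monitored by a supervisor, this callback usually does not need to be implemented.

  6. code_change(from_version, state, extra)

    OTP can replace server code without stopping the system. The new version of the server may represent its state differently from the old one, and this callback converts the old state to the new format.

  7. format_status(reason, [pdict, state])

    Customizes how the state is displayed. It should return [data: [{'State', state_info}]].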
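As an illustration of handle_info, here is a minimal sketch with a made-up InfoCounter module that counts plain :tick messages sent with send/2.

```elixir
defmodule InfoCounter do
  use GenServer

  @impl true
  def init(count), do: {:ok, count}

  # Plain messages sent with send/2 arrive here, not in handle_call or handle_cast.
  @impl true
  def handle_info(:tick, count), do: {:noreply, count + 1}

  @impl true
  def handle_call(:count, _from, count), do: {:reply, count, count}
end

{:ok, pid} = GenServer.start_link(InfoCounter, 0)
send(pid, :tick)
send(pid, :tick)
IO.inspect(GenServer.call(pid, :count))
```

The sketch only handles :tick; a real server would normally add a catch-all handle_info clause so that unexpected messages do not crash it.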

Return Values of the Callbacks

  1. { :noreply, new_state [ , :hibernate | timeout ] }

    Used by handle_call and handle_cast (and also by handle_info). :hibernate removes the server state from memory and restores it when the next message arrives; this saves memory at the cost of some CPU. timeout defaults to :infinity. If the server receives no message within timeout ms, GenServer sends it a :timeout message, which the server must handle in handle_info.

  2. { :stop, reason, new_state }

    Signals that the server should terminate.

  3. { :reply, response, new_state [ , :hibernate | timeout ] }

    Only handle_call may return this. It sends response to the client.

  4. { :stop, reason, reply, new_state }

    Only handle_call may return this. It sends the reply to the client and then stops the server.
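The last return value can be sketched as follows. OneShot is a made-up module whose server answers one request and then stops; a monitor is used to observe the exit reason.

```elixir
defmodule OneShot do
  use GenServer

  @impl true
  def init(value), do: {:ok, value}

  # Reply to the caller, then stop the server with reason :normal.
  @impl true
  def handle_call(:take, _from, value), do: {:stop, :normal, value, value}
end

# start/2 (not start_link/2) so that the caller is not linked to the server.
{:ok, pid} = GenServer.start(OneShot, :payload)
ref = Process.monitor(pid)
reply = GenServer.call(pid, :take)

exit_reason =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> reason
  after
    1_000 -> :still_running
  end

IO.inspect({reply, exit_reason})
```

The caller still receives its reply even though the server terminates as part of handling the same request.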


Naming a Process

Add the name: :seq option to start_link:

iex(9)> {:ok,pid} = GenServer.start_link(Sequence.Server, 100, name: :seq)
{:ok, #PID<0.132.0>}
iex(10)> GenServer.call(:seq, :next_number)
100
iex(11)> :sys.get_status :seq
{:status, #PID<0.132.0>, {:module, :gen_server},
 [["$ancestors": [#PID<0.121.0>, #PID<0.59.0>],
   "$initial_call": {Sequence.Server, :init, 1}], :running, #PID<0.121.0>, [],
  [header: 'Status for generic server seq',
   data: [{'Status', :running}, {'Parent', #PID<0.121.0>},
    {'Logged events', []}],
   data: [{'State', "My current state is '101', and I'm happy"}]]]}

The GenServer calls (start_link, call, cast) can be wrapped in an API on the server module:

defmodule Sequence.Server do
  use GenServer

  #####
  # External API

  def start_link(current_number) do
    GenServer.start_link(__MODULE__, current_number, name: __MODULE__)
  end

  def next_number do
    GenServer.call __MODULE__, :next_number
  end

  def increment_number(delta) do
    GenServer.cast __MODULE__, {:increment_number, delta}
  end

  #####

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  # no reply is sent to the client, so only the server state is updated
  def handle_cast({:increment_number, delta}, current_number) do
    { :noreply, current_number + delta}
  end

  # format the state shown in status and trace output
  def format_status(_reason, [ _pdict, state ]) do
    [data: [{'State', "My current state is '#{inspect state}', and I'm happy"}]]
  end

end

$ iex -S mix
iex(1)> Sequence.Server.start_link 200
{:ok, #PID<0.123.0>}
iex(2)> Sequence.Server.next_number
200
iex(3)> Sequence.Server.next_number
201
iex(4)> Sequence.Server.increment_number 100
:ok
iex(5)> Sequence.Server.next_number
302

OTP Supervisors

Supervisors are responsible for monitoring and restarting processes.

A supervisor uses the OTP supervisor behaviour. It is given a list of processes and knows what to do when one of them crashes; it also has a mechanism to prevent restart loops.

Supervisors are built on the Erlang VM's process-linking and process-monitoring mechanisms.
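The restart behaviour can be observed without a mix project. The sketch below makes several assumptions: it supervises a plain Agent instead of a GenServer module, uses the map-based child specification accepted by Supervisor.start_link/2 in Elixir 1.5 and later, and :sup_sketch_counter is a made-up registered name.

```elixir
# One Agent child, registered under a name so it can be found again after a restart.
children = [
  %{
    id: :counter,
    start: {Agent, :start_link, [fn -> 0 end, [name: :sup_sketch_counter]]}
  }
]

{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)

Agent.update(:sup_sketch_counter, &(&1 + 5))
old_pid = Process.whereis(:sup_sketch_counter)

# Kill the child and wait until it is really gone.
ref = Process.monitor(old_pid)
Process.exit(old_pid, :kill)

receive do
  {:DOWN, ^ref, :process, ^old_pid, _reason} -> :ok
end

# Poll briefly until the supervisor has registered the replacement process.
wait_for_restart = fn wait_for_restart, attempts ->
  case Process.whereis(:sup_sketch_counter) do
    pid when is_pid(pid) and pid != old_pid ->
      pid

    _ when attempts > 0 ->
      Process.sleep(10)
      wait_for_restart.(wait_for_restart, attempts - 1)
  end
end

new_pid = wait_for_restart.(wait_for_restart, 100)

# The restarted child is a new process that starts again from its initial state.
IO.inspect({new_pid != old_pid, Agent.get(:sup_sketch_counter, & &1)})
```

The state accumulated before the crash is lost: the supervisor restarts the child from its start specification, it does not restore data.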

Create a new project that includes a supervisor:

$ mix new --sup sequence
* creating README.md
* creating .gitignore
* creating mix.exs
* creating config
* creating config/config.exs
* creating lib
* creating lib/sequence.ex
* creating lib/sequence/application.ex
* creating test
* creating test/test_helper.exs
* creating test/sequence_test.exs

mix.exs is generated automatically and needs no changes:

defmodule Sequence.Mixfile do
  use Mix.Project

  def project do
    [
      app: :sequence,
      version: "0.1.0",
      elixir: "~> 1.5",
      start_permanent: Mix.env == :prod,
      deps: deps()
    ]
  end

  # Run "mix help compile.app" to learn about applications.
  def application do
    [
      extra_applications: [:logger],
      mod: {Sequence.Application, []}
    ]
  end

  # Run "mix help deps" to learn about dependencies.
  defp deps do
    [
      # {:dep_from_hexpm, "~> 0.3.0"},
      # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"},
    ]
  end
end

This is the content of lib/sequence/application.ex:

defmodule Sequence.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  def start(_type, _args) do
    # List all child processes to be supervised
    children = [
      # Starts a worker by calling: Sequence.Worker.start_link(arg)
      # {Sequence.Worker, arg},
      {Sequence.Server, 123},
      {Sequence.Server2, 123}
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Sequence.Supervisor]
    
    # start the supervisor
    {:ok, pid} = Supervisor.start_link(children, opts)
  end
end

The supervisor starts two workers: Sequence.Server and Sequence.Server2.

defmodule Sequence.Supervisor do
  use Supervisor

  def start_link(initial_number) do
    :supervisor.start_link(__MODULE__, initial_number)
  end

  def init(initial_number) do
    child_processes = [
        worker(Sequence.Server, [initial_number]),
        worker(Sequence.Server2, [initial_number])
    ]
    supervise child_processes, strategy: :one_for_one
  end
end

Sequence.Server is copied from the earlier example:

defmodule Sequence.Server do
  use GenServer

  #####
  # External API

  def start_link(current_number) do
    GenServer.start_link(__MODULE__, current_number, name: __MODULE__)
  end

  def next_number do
    GenServer.call __MODULE__, :next_number
  end

  def increment_number(delta) do
    GenServer.cast __MODULE__, {:increment_number, delta}
  end

  #####

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  # no reply is sent to the client, so only the server state is updated
  def handle_cast({:increment_number, delta}, current_number) do
    { :noreply, current_number + delta}
  end

  # format the state shown in status and trace output
  def format_status(_reason, [ _pdict, state ]) do
    [data: [{'State', "My current state is '#{inspect state}', and I'm happy"}]]
  end

end

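Sequence.Server2 does the same thing as Sequence.Server. The difference is that it registers under a different process name at startup; note also that it calls the Erlang :gen_server module directly.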

defmodule Sequence.Server2 do
  use GenServer

  #####
  # External API

  def start_link(current_number) do
    :gen_server.start_link({ :local, :sequence }, __MODULE__, current_number, [])
  end

  def next_number do
    :gen_server.call :sequence, :next_number
  end

  def increment_number(delta) do
    :gen_server.cast :sequence, {:increment_number, delta}
  end

  #####
  # GenServer implementation

  def init(current_number)
    when is_number(current_number) do
    { :ok, current_number }
  end

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  def handle_cast({:increment_number, delta}, current_number) do
    { :noreply, current_number + delta}
  end
end

Sequence.Server2 can be tested the same way as Sequence.Server.

$ iex -S mix

Compiling 1 file (.ex)
Interactive Elixir (1.5.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> :observer.start()
:ok
iex(2)> Sequence.Server.increment_number 3
:ok
iex(3)> Sequence.Server.next_number
126
iex(4)> Sequence.Server.next_number
127

## Deliberately send a string that increment_number cannot handle: the process crashes and is restarted by the supervisor
iex(5)> Sequence.Server.increment_number "c"
:ok
iex(6)>
00:08:25.372 [error] GenServer Sequence.Server terminating
** (ArithmeticError) bad argument in arithmetic expression
    (sequence) lib/sequence/server.ex:27: Sequence.Server.handle_cast/2
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_cast", {:increment_number, "c"}}
State: [data: [{'State', "My current state is '128', and I'm happy"}]]

nil
iex(7)> Sequence.Server.next_number
123
iex(8)> Sequence.Server.next_number
124
iex(9)>

OTP Application

An OTP application is a bundle of code that comes with a descriptor. Conceptually it is closer to a component or a service.

OTP uses an application specification file named name.app. mix generates this file automatically from mix.exs.

$ mix new sequence

This generates a mix.exs. Add mod: { Sequence, 456 } inside def application do:

defmodule Sequence.Mixfile do
  use Mix.Project

  def project do
    [
      app: :sequence,
      version: "0.1.0",
      elixir: "~> 1.5",
      start_permanent: Mix.env == :prod,
      deps: deps()
    ]
  end

  # Run "mix help compile.app" to learn about applications.
  def application do
    [
      extra_applications: [:logger],
      mod: { Sequence, 456 }
    ]
  end

  # Run "mix help deps" to learn about dependencies.
  defp deps do
    [
      # {:dep_from_hexpm, "~> 0.3.0"},
      # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"},
    ]
  end
end

Edit /lib/sequence.ex. Because of the mod: {Sequence, 456} setting, the start(_type, initial_number) function of the Sequence module is called automatically; it starts Sequence.Supervisor.

defmodule Sequence do
  use Application

  def start(_type, initial_number) do
    Sequence.Supervisor.start_link(initial_number)
  end
end

/lib/sequence/supervisor.ex is the OTP supervisor that manages the Sequence.Server process:

defmodule Sequence.Supervisor do
  use Supervisor

  def start_link(initial_number) do
    :supervisor.start_link(__MODULE__, initial_number)
  end

  def init(initial_number) do
    child_processes = [
        worker(Sequence.Server, [initial_number])
    ]
    supervise child_processes, strategy: :one_for_one
  end
end

/lib/sequence/server.ex

defmodule Sequence.Server do
  use GenServer

  #####
  # External API

  def start_link(current_number) do
    :gen_server.start_link({ :local, :sequence }, __MODULE__, current_number, [])
  end

  def next_number do
    :gen_server.call :sequence, :next_number
  end

  def increment_number(delta) do
    :gen_server.cast :sequence, {:increment_number, delta}
  end

  #####
  # GenServer implementation

  def init(current_number)
  when is_number(current_number) do
    { :ok, current_number }
  end

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  def handle_cast({:increment_number, delta}, current_number) do
    { :noreply, current_number + delta}
  end
end

Start the sequence project:

$ iex -S mix
iex(1)> Sequence.Server.next_number
456

The registered: option lists the names that the application registers. It can be used to keep names unique across the nodes of a cluster. Here the name Sequence.Server is listed:

  def application do
    [
      extra_applications: [:logger],
      mod: { Sequence, 456 },
      registered: [Sequence.Server]
    ]
  end

Running mix compile compiles the code and generates sequence/_build/dev/lib/sequence/ebin/sequence.app:

$ mix compile
Compiling 3 files (.ex)
Generated sequence app

The content of sequence.app is:

{application,sequence,
             [{applications,[kernel,stdlib,elixir,logger]},
              {description,"sequence"},
              {modules,['Elixir.Sequence','Elixir.Sequence.Server',
                        'Elixir.Sequence.Supervisor']},
              {vsn,"0.1.0"},
              {extra_applications,[logger]},
              {mod,{'Elixir.Sequence',456}},
              {registered,['Elixir.Sequence.Server']}]}.

To make the application more general, change how the initial parameter is passed.

Edit mix.exs and put the parameter in env: env: [initial_number: 456]

  def application do
    [
      extra_applications: [:logger],
      mod: { Sequence, [] },
      env: [initial_number: 456],
      registered: [Sequence.Server]
    ]
  end

Edit /lib/sequence.ex so that the parameter is read with Application.get_env:

defmodule Sequence do
  use Application

  def start(_type, _args) do
    Sequence.Supervisor.start_link(Application.get_env(:sequence, :initial_number))
  end
end
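Application.get_env can be tried on its own in a script or in iex. In this sketch :sequence_sketch is a made-up application name, and Application.put_env stands in for the env: entry that mix would normally load from mix.exs.

```elixir
# :sequence_sketch is a hypothetical application name used only for this example.
Application.put_env(:sequence_sketch, :initial_number, 456)

initial = Application.get_env(:sequence_sketch, :initial_number)

# The optional third argument is returned when the key is not set.
fallback = Application.get_env(:sequence_sketch, :missing_key, 0)

IO.inspect({initial, fallback})
```

Passing a default as the third argument avoids handing nil to the supervisor when the key is missing from the environment.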

Code Release: EXRM (Elixir Release Manager)

release: a bundle that contains a particular version of your application including dependencies, configuration, metadata.

hot upgrade: updating a running application without affecting its users

exrm: built on top of Erlang's relx; used to manage release packages

Edit mix.exs and add the :exrm library dependency:

  defp deps do
    [
      {:exrm, "~> 1.0.8"}
    ]
  end

Edit lib/sequence/server.ex and add @vsn "0":

defmodule Sequence.Server do
  use GenServer

  @vsn "0"

Install exrm:

$ mix do deps.get, deps.compile

Build the release package. It is generated in the rel/sequence folder, and the archive is placed at rel/sequence/releases/0.0.1/sequence.tar.gz.

$ mix release
....
==> The release for sequence-0.0.1 is ready!
==> You can boot a console running your release with `$ rel/sequence/bin/sequence console`

Extract sequence.tar.gz and the sequence app can be started:

$ ./sequence/bin/sequence console
iex(sequence@127.0.0.1)1> Sequence.Server.next_number
456
iex(sequence@127.0.0.1)2> Sequence.Server.next_number
457

Leave this console running. Back in the project source, edit mix.exs and change the version number to 0.0.2:

  def project do
    [
      app: :sequence,
      version: "0.0.2",
      elixir: "~> 1.5",
      start_permanent: Mix.env == :prod,
      deps: deps()
    ]
  end

Also change next_number in lib/sequence/server.ex, so that we can check whether the new version of the code has been loaded:

  def next_number do
    #:gen_server.call(:sequence, :next_number)
    with number = :gen_server.call(:sequence, :next_number),
    do: "0.0.2 The next number is #{number}"
  end

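Run mix release again in the project folder. On a Mac this failed with a .DS_Store error. (Note: the rel/sequence/releases/0.0.1 release data must already exist. Only then does building the new 0.0.2 version produce a relup file, which is what the upgrade uses.)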

$ mix release
Compiling 3 files (.ex)
Generated sequence app
Building release with MIX_ENV=dev.

This is an upgrade, verifying appups exist for updated dependencies..
==> Unable to access /test/sequence/rel/sequence/releases/.DS_Store/sequence.rel: enotdir

After removing .DS_Store, run mix release again:

$ rm /test/sequence/rel/sequence/releases/.DS_Store
$ mix release
Building release with MIX_ENV=dev.

This is an upgrade, verifying appups exist for updated dependencies..
==> All dependencies have appups ready for release!
==> Generated .appup for sequence 0.0.1 -> 0.0.2
==> The release for sequence-0.0.2 is ready!
==> You can boot a console running your release with `$ rel/sequence/bin/sequence console`

Copy rel/sequence/releases/0.0.2/sequence.tar.gz into the sequence/releases/0.0.2/ directory of the production environment where the 0.0.1 release was extracted earlier.

From another terminal, hot-upgrade directly to 0.0.2:

$ bin/sequence upgrade 0.0.2
Release 0.0.2 not found, attempting to unpack releases/0.0.2/sequence.tar.gz
Unpacked successfully: "0.0.2"
Generating vm.args/sys.config for upgrade...
sys.config ready!
vm.args ready!
Release 0.0.2 is already unpacked, now installing.
Installed Release: 0.0.2
Made release permanent: "0.0.2"

Back in the server console from before, verify that the code was updated to 0.0.2:

iex(sequence@127.0.0.1)3> Sequence.Server.next_number
"0.0.2 The next number is 458"

If the new version has problems, you can downgrade straight back to 0.0.1:

$ bin/sequence downgrade 0.0.1
Release 0.0.1 is marked old, switching to it.
Generating vm.args/sys.config for upgrade...
sys.config ready!
vm.args ready!
Release 0.0.1 is marked old, switching to it.
Installed Release: 0.0.1
Made release permanent: "0.0.1"
iex(sequence@127.0.0.1)4> Warning: "/Users/charley/Downloads/test/sequence/releases/0.0.1/relup" missing (optional)
iex(sequence@127.0.0.1)5> Sequence.Server.next_number
459

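Version 0.0.3 changes @vsn and adds a callback, def code_change("0", old_state = { current_number, stash_pid }, _extra), which converts the old tuple state into the new State struct.

During the upgrade, the console shows messages like these: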

22:59:39.030 [info]  Changing code from 0 to 1

22:59:39.030 [info]  {465, #PID<0.628.0>}

22:59:39.031 [info]  %Sequence.Server.State{current_number: 465, delta: 1, stash_pid: #PID<0.628.0>}
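
The state migration can also be tried without building a release. The following is a minimal sketch, not the release workflow itself: it assumes a made-up module CodeChangeDemo, uses a plain map instead of the State struct, and drives the callback by hand through Erlang's :sys module, which is roughly what the release handler does during an upgrade.

```elixir
defmodule CodeChangeDemo do
  use GenServer

  # Start with the old state shape: a {current_number, stash_pid} tuple.
  def init(current_number), do: {:ok, {current_number, nil}}

  # Convert the old tuple state into a map that has a new :delta field.
  def code_change("0", {current_number, stash_pid}, _extra) do
    {:ok, %{current_number: current_number, stash_pid: stash_pid, delta: 1}}
  end
end

{:ok, pid} = GenServer.start_link(CodeChangeDemo, 465)
old_state = :sys.get_state(pid)

# The process has to be suspended before code_change can be invoked on it.
:ok = :sys.suspend(pid)
:ok = :sys.change_code(pid, CodeChangeDemo, "0", [])
:ok = :sys.resume(pid)

new_state = :sys.get_state(pid)
IO.inspect {old_state, new_state}
```

The first argument of code_change is the old @vsn, so a clause that matches "0" only runs when upgrading from that version.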

The complete 0.0.3 code follows.

mix.exs

defmodule Sequence.Mixfile do
  use Mix.Project

  # ...

  def project do
    [
      app:     :sequence,
      version: "0.0.3",
      elixir: "~> 1.5",
      start_permanent: Mix.env == :prod,
      deps:    deps()
    ]
  end

  # Configuration for the OTP application
  def application do
    [
      mod: { Sequence, 456 },
      registered: [ Sequence.Server ],
      applications: [ :logger ]
    ]
  end

  defp deps do
    [
      {:exrm, "~> 1.0.8"}
    ]
  end
end

lib/sequence.ex

defmodule Sequence do
  use Application

  def start(_type, initial_number) do
    Sequence.Supervisor.start_link(initial_number)
  end
end

lib/sequence/supervisor.ex

defmodule Sequence.Supervisor do
  use Supervisor

  def start_link(initial_number) do
    result = {:ok, sup } = Supervisor.start_link(__MODULE__, [initial_number]) 
    start_workers(sup, initial_number)
    result
  end

  def start_workers(sup, initial_number) do
    # Start the stash worker
    {:ok, stash} = 
       Supervisor.start_child(sup, worker(Sequence.Stash, [initial_number]))

    # and then the subsupervisor for the actual sequence server
    Supervisor.start_child(sup, supervisor(Sequence.SubSupervisor, [stash]))
  end

  def init(_) do
    supervise [], strategy: :one_for_one
  end

end

lib/sequence/subsupervisor.ex

defmodule Sequence.SubSupervisor do
  use Supervisor

  def start_link(stash_pid) do
    Supervisor.start_link(__MODULE__, stash_pid)
  end

  def init(stash_pid) do
    child_processes = [ worker(Sequence.Server, [stash_pid]) ]
    supervise child_processes, strategy: :one_for_one
  end

end

lib/sequence/server.ex

defmodule Sequence.Server do
  use GenServer
  require Logger

  @vsn "1"
  defmodule State do
    defstruct current_number: 0, stash_pid: nil, delta: 1
  end

  #####
  # External API
  # . . .
  def start_link(stash_pid) do
    GenServer.start_link(__MODULE__, stash_pid, name: __MODULE__)
  end

  def next_number do
    # GenServer.call __MODULE__, :next_number
    with number = GenServer.call(__MODULE__, :next_number),
    do: "0.0.2 The next number is #{number}"
  end

  def increment_number(delta) do
    GenServer.cast __MODULE__, {:increment_number, delta}
  end

  #####
  # GenServer implementation

  # def init(stash_pid) do
  #   current_number = Sequence.Stash.get_value stash_pid
  #   { :ok, {current_number, stash_pid} }
  # end

  # def handle_call(:next_number, _from, {current_number, stash_pid}) do
  #   { :reply, current_number, {current_number+1, stash_pid} }
  # end

  # def handle_cast({:increment_number, delta}, {current_number, stash_pid}) do
  #   { :noreply, {current_number + delta, stash_pid}}
  # end

  # def terminate(_reason, {current_number, stash_pid}) do
  #   Sequence.Stash.save_value stash_pid, current_number
  # end

  def init(stash_pid) do
    # On startup, use the stash pid to fetch current_number from Sequence.Stash
    current_number = Sequence.Stash.get_value stash_pid

    # Keep the state in a struct
    { :ok,
      %State{current_number: current_number, stash_pid: stash_pid} }
  end

  def handle_call(:next_number, _from, state) do
    { :reply,
      state.current_number,
      %{ state | current_number: state.current_number + state.delta} }
  end

  def handle_cast({:increment_number, delta}, state) do
    {:noreply,
     %{ state | current_number: state.current_number + delta, delta: delta} }
  end

  def terminate(_reason, state) do
    # On termination, save current_number to Sequence.Stash
    Sequence.Stash.save_value state.stash_pid, state.current_number
  end

  # Code upgrade: migrate the state from @vsn "0" to @vsn "1"
  def code_change("0", old_state = { current_number, stash_pid }, _extra) do
    new_state = %State{current_number: current_number,
                       stash_pid: stash_pid,
                       delta: 1
                      }
    Logger.info "Changing code from 0 to 1"
    Logger.info inspect(old_state)
    Logger.info inspect(new_state)
    { :ok, new_state }
  end
end

lib/sequence/stash.ex

defmodule Sequence.Stash do
  use GenServer

  #####
  # External API  

  def start_link(current_number) do
    GenServer.start_link( __MODULE__, current_number)
  end

  def save_value(pid, value) do
    GenServer.cast pid, {:save_value, value}
  end

  def get_value(pid) do
    GenServer.call pid, :get_value
  end

  #####
  # GenServer implementation

  def handle_call(:get_value, _from, current_value) do 
    { :reply, current_value, current_value }
  end

  def handle_cast({:save_value, value}, _current_value) do
    { :noreply, value}
  end
end

Task & Agent

If you need background processing or a process that maintains state, but do not want to write spawn/send/receive by hand, use Task and Agent.

Task: a function that runs in the background

tasks1.exs

defmodule Fib do
  def of(0), do: 0
  def of(1), do: 1
  def of(n), do: Fib.of(n-1) + Fib.of(n-2)
end

IO.puts "Start the task"

# Call Fib.of asynchronously
worker = Task.async(fn -> Fib.of(20) end)
IO.puts "Do something else"
# Wait for the task to finish and collect its result
IO.puts "Wait for the task"
result = Task.await(worker)

IO.puts "The result is #{result}"

# Alternatively, pass a module, function name and argument list to Task.async
worker = Task.async(Fib, :of, [10])
result = Task.await(worker)
IO.puts "The result is #{result}"

$ elixir tasks1.exs
Start the task
Do something else
Wait for the task
The result is 6765
The result is 55
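
The same async/await pair extends to several tasks at once. The sketch below is an illustration only; FibDemo is a renamed copy of the Fib module so that the example stands on its own, and the input list is arbitrary.

```elixir
defmodule FibDemo do
  def of(0), do: 0
  def of(1), do: 1
  def of(n), do: of(n - 1) + of(n - 2)
end

# Start one task per input; each task runs in its own process.
tasks = Enum.map([0, 1, 10, 20], fn n -> Task.async(fn -> FibDemo.of(n) end) end)

# Collect the results in the order the tasks were started.
results = Enum.map(tasks, &Task.await/1)

IO.inspect results
```

Task.await blocks until the given task has replied, so the total wait is roughly that of the slowest task rather than the sum of all of them.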

How to supervise Tasks

  1. Instead of async, use start_link to link the task to a process that is already supervised.

  2. Start the tasks from a supervisor so that they are supervised:

import Supervisor.Spec
children = [
    worker(Task, [ fn -> Fib.of(20) end ])
]
supervise children, strategy: :one_for_one
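
Another option, not covered in the listing above, is the Task.Supervisor module that ships with Elixir. The following is a small sketch under that assumption; the summed range is an arbitrary placeholder workload.

```elixir
# Start a supervisor dedicated to tasks.
{:ok, sup} = Task.Supervisor.start_link()

# Run a task under that supervisor and wait for its result.
task = Task.Supervisor.async(sup, fn -> Enum.sum(1..100) end)
sum = Task.await(task)

IO.puts "The sum is #{sum}"
```

The task is started as a child of the task supervisor, while the reply is still delivered to the caller through Task.await.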

Agent: a background process that maintains state

The state can be read and updated from other processes, on the same node or on other nodes.

# count holds the PID of the agent process
iex(1)> {:ok, count} = Agent.start(fn -> 0 end)
{:ok, #PID<0.86.0>}
iex(2)> Agent.get(count, &(&1))
0
iex(3)> Agent.update(count, &(&1+1))
:ok
iex(4)> Agent.update(count, &(&1+1))
:ok
iex(5)> Agent.get(count, &(&1))
2

# An Agent process can also be given a local or global name
iex(6)> Agent.start(fn -> 5 end, name: Sum)
{:ok, #PID<0.92.0>}
iex(7)> Agent.get(Sum, &(&1))
5
iex(8)> Agent.update(Sum, &(&1+100))
:ok
iex(9)> Agent.get(Sum, &(&1))
105
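
Reading and updating can also be combined into one step with Agent.get_and_update. As a sketch, the sequence counter from the GenServer example could be approximated like this; the variable names are made up for the illustration.

```elixir
{:ok, seq} = Agent.start_link(fn -> 456 end)

# Return the current number to the caller and store number + 1 as the new state.
next_number = fn -> Agent.get_and_update(seq, fn n -> {n, n + 1} end) end

first = next_number.()
second = next_number.()

IO.inspect {first, second}
```

Because the function runs inside the agent process, the read and the increment happen as a single step, with no other caller in between.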

agent_dict.exs

# Store a list of word/frequency pairs in a map
# and keep the map in an agent
defmodule Frequency do

  def start_link do
    # Start the Agent with start_link
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  def add_word(word) do
    Agent.update(__MODULE__,
                 fn map ->
                      Map.update(map, word, 1, &(&1+1))
                 end)
  end

  def count_for(word) do
    Agent.get(__MODULE__, fn map -> map[word] end)
  end

  def words do
    Agent.get(__MODULE__, fn map -> Map.keys(map) end)
  end

end

iex(1)> c "agent_dict.exs"
[Frequency]
iex(2)> Frequency.start_link
{:ok, #PID<0.92.0>}
iex(3)> Frequency.add_word "You"
:ok
iex(4)> Frequency.words
["You"]
iex(5)> Frequency.add_word "are"
:ok
iex(6)> Frequency.add_word "here"
:ok
iex(7)> Frequency.add_word "are"
:ok
iex(8)> Frequency.add_word "ok"
:ok
iex(9)> Frequency.words
["You", "are", "here", "ok"]
iex(10)> Frequency.count_for("are")
2
iex(11)> Frequency.count_for("ok")
1

anagrams.exs

defmodule Dictionary do

  @name __MODULE__

  ##
  # External API

  # Start the Dictionary agent process
  def start_link,
  do: Agent.start_link(fn -> %{} end, name: @name)

  def add_words(words),
  do: Agent.update(@name, &do_add_words(&1, words))

  def anagrams_of(word),
  do: Agent.get(@name, &Map.get(&1, signature_of(word)))

  ##
  # Internal implementation
  defp do_add_words(map, words),
  do: Enum.reduce(words, map, &add_one_word(&1, &2))

  defp add_one_word(word, map),
  do: Map.update(map, signature_of(word), [word], &[word|&1])

  defp signature_of(word),
  do: word |> to_charlist |> Enum.sort |> to_string

end

defmodule WordlistLoader do

  # Load each dictionary file in its own Task
  def load_from_files(file_names) do
    file_names
    |> Stream.map(fn name -> Task.async(fn -> load_task(name) end) end)
    |> Enum.map(&Task.await/1)
  end

  defp load_task(file_name) do
    File.stream!(file_name, [], :line)
    |> Enum.map(&String.trim/1)
    |> Dictionary.add_words
  end
end
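
The lookup works because every word is stored under a signature made of its sorted letters. The sketch below isolates that idea as an anonymous function and groups a small, made-up word list with Enum.group_by instead of an agent.

```elixir
# Same signature logic as in Dictionary, written as an anonymous function.
signature_of = fn word -> word |> to_charlist() |> Enum.sort() |> to_string() end

# Hypothetical word list, used only for this illustration.
words = ~w(organ groan argon crate react)

# Words that share a signature are anagrams of one another.
groups = Enum.group_by(words, signature_of)

IO.inspect signature_of.("organ")
IO.inspect groups
```

Dictionary.add_words builds the same kind of map incrementally, prepending each new word to the list stored under its signature.
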

iex(1)> Dictionary.start_link
{:ok, #PID<0.93.0>}

# There are four word list files
iex(2)> Enum.map(1..4, &"words/list#{&1}") |> WordlistLoader.load_from_files
[:ok, :ok, :ok, :ok]
iex(3)> Dictionary.anagrams_of "organ"
["ronga", "rogan", "orang", "nagor", "groan", "grano", "goran", "argon",
 "angor"]

Agent and Task are built on OTP, so they also work across multiple nodes.

Change the name to @name {:global, __MODULE__}:

defmodule Dictionary do

  @name {:global, __MODULE__}

  ##
  # External API

  def start_link,
  do: Agent.start_link(fn -> %{} end, name: @name)

  def add_words(words),
  do: Agent.update(@name, &do_add_words(&1, words))

  def anagrams_of(word),
  do: Agent.get(@name, &Map.get(&1, signature_of(word)))

  ##
  # Internal implementation

  defp do_add_words(map, words),
  do: Enum.reduce(words, map, &add_one_word(&1, &2))

  defp add_one_word(word, map),
  do: Map.update(map, signature_of(word), [word], &[word|&1])

  defp signature_of(word),
  do: word |> to_charlist |> Enum.sort |> to_string

end

defmodule WordlistLoader do
  def load_from_files(file_names) do
    file_names
    |> Stream.map(fn name -> Task.async(fn -> load_task(name) end) end)
    |> Enum.map(&Task.await/1)
  end

  defp load_task(file_name) do
    File.stream!(file_name, [], :line)
    |> Enum.map(&String.trim/1)
    |> Dictionary.add_words
  end
end

$ iex --sname one anagrams_dist.exs
iex(one@cmbp)1> Dictionary.
add_words/1      anagrams_of/1    start_link/0
iex(one@cmbp)1> Dictionary.start_link
{:ok, #PID<0.102.0>}
iex(one@cmbp)2> Dictionary.anagrams_of "argon"
["ronga", "rogan", "orang", "nagor", "groan", "grano", "goran", "argon",
 "angor"]

$ iex --sname two anagrams_dist.exs
iex(two@cmbp)1> Node.connect :one@cmbp
true
iex(two@cmbp)2> Node.list
[:one@cmbp]
iex(two@cmbp)3> WordlistLoader.load_from_files(~w{words/list1 words/list2})
[:ok, :ok]
iex(two@cmbp)4> WordlistLoader.load_from_files(~w{words/list3 words/list4})
[:ok, :ok]
iex(two@cmbp)5> Dictionary.anagrams_of "crate"
["recta", "react", "creta", "creat", "crate", "cater", "carte", "caret"]
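
The effect of a {:global, name} tuple can be explored on a single node as well. The following is a minimal sketch under that assumption; :demo_dictionary is a made-up name, and on a cluster the same tuple would address the agent from any connected node.

```elixir
# Hypothetical global name, used only for this sketch.
name = {:global, :demo_dictionary}

{:ok, pid} = Agent.start_link(fn -> %{} end, name: name)

# The name tuple can be used anywhere a pid or local name would be accepted.
:ok = Agent.update(name, &Map.put(&1, "agnor", ["organ"]))
found = Agent.get(name, &Map.get(&1, "agnor"))

# :global resolves the name to the pid of the registered process.
registered_pid = :global.whereis_name(:demo_dictionary)

IO.inspect {found, registered_pid == pid}
```

Callers never need to know which node hosts the agent, which is why node two in the session above can load words into a Dictionary that was started on node one.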

References

Programming Elixir