2018/05/07

Elixir 7 OTP

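OTP defines a hierarchy for applications. An application consists of multiple processes, and those processes are implemented according to "behaviors". The "server" behavior is called GenServer; another special behavior, the "supervisor", monitors processes and implements the rules for restarting them.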

OTP Server: GenServer

Create a new sequence project:

$ mix new sequence
* creating README.md
* creating .gitignore
* creating mix.exs
* creating config
* creating config/config.exs
* creating lib
* creating lib/sequence.ex
* creating test
* creating test/test_helper.exs
* creating test/sequence_test.exs
$ cd sequence/
$ mkdir lib/sequence

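When a client calls the server, handle_call is invoked. Its first argument is the request the client sent, the second identifies the caller (a {pid, tag} tuple whose pid is the client's PID), and the third is the server state.

defmodule Sequence.Server do
  # use applies the OTP GenServer behavior to this module
  use GenServer

  # :next_number is the request from the client, _from identifies the caller, current_number is the server state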
  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

end
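To make the second argument concrete, here is a minimal sketch in which the server replies with the PID it finds inside from. FromDemo and the :who_called request are hypothetical names used only for this illustration; they are not part of the sequence project.

```elixir
defmodule FromDemo do
  use GenServer

  def init(state), do: {:ok, state}

  # `from` is a {pid, tag} tuple; the pid is the process that called us.
  def handle_call(:who_called, {caller_pid, _tag}, state) do
    {:reply, caller_pid, state}
  end
end

{:ok, pid} = GenServer.start_link(FromDemo, nil)
caller = GenServer.call(pid, :who_called)

# The PID reported by the server is the PID of the calling process.
IO.inspect(caller == self())
```

Run as a script, the reported PID should match self(), because the script process is the one issuing the call.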

start_link is similar to spawn_link: it starts a new process that is linked to the caller.

$ iex -S mix
Compiling 2 files (.ex)
Generated sequence app
iex(1)> { :ok, pid } = GenServer.start_link(Sequence.Server, 100)
{:ok, #PID<0.149.0>}
iex(2)> GenServer.call(pid, :next_number)
100
iex(3)> GenServer.call(pid, :next_number)
101
iex(4)> GenServer.call(pid, :next_number)
102

Add a function clause that resets the number: def handle_call({:set_number, new_number}, _from, _current_number)

defmodule Sequence.Server do
  # use applies the OTP GenServer behavior to this module
  use GenServer

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  def handle_call({:set_number, new_number}, _from, _current_number) do
    { :reply, new_number, new_number }
  end

end

Session:

iex(1)> { :ok, pid } = GenServer.start_link(Sequence.Server, 100)
{:ok, #PID<0.123.0>}
iex(2)> GenServer.call(pid, :next_number)
100
iex(3)> GenServer.call(pid, {:set_number, 20})
20
iex(4)> GenServer.call(pid, :next_number)
20
iex(5)> GenServer.call(pid, :next_number)
21
iex(6)> GenServer.call(pid, :next_number)
22

GenServer.call is a synchronous, two-way call: the client waits for the server's reply. When no reply is needed, use GenServer.cast instead; the server must then implement handle_cast.

defmodule Sequence.Server do
  # use applies the OTP GenServer behavior to this module
  use GenServer

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  # No reply is sent to the client, so only the server state is updated
  def handle_cast({:increment_number, delta}, current_number) do
    { :noreply, current_number + delta}
  end

end

Session:

# reload Sequence.Server
iex(8)> r Sequence.Server
warning: redefining module Sequence.Server (current version loaded from _build/dev/lib/sequence/ebin/Elixir.Sequence.Server.beam)
  lib/sequence/server.ex:1

{:reloaded, Sequence.Server, [Sequence.Server]}

iex(9)> { :ok, pid } = GenServer.start_link(Sequence.Server, 100)
{:ok, #PID<0.143.0>}
iex(10)> GenServer.call(pid, :next_number)
100
iex(11)> GenServer.cast(pid, {:increment_number, 200})
:ok
iex(12)> GenServer.call(pid, :next_number)
301

Tracing Server Execution

The third argument to start_link is a list of options, for example [debug: [:trace]].

iex(6)> {:ok,pid} = GenServer.start_link(Sequence.Server, 100, [debug: [:trace]])
{:ok, #PID<0.148.0>}
iex(7)> GenServer.call(pid, :next_number)
*DBG* <0.148.0> got call next_number from <0.139.0>
*DBG* <0.148.0> sent 100 to <0.139.0>, new state 101
100
iex(8)> GenServer.cast(pid, {:increment_number, 200})
*DBG* <0.148.0> got cast {increment_number,200}
:ok
*DBG* <0.148.0> new state 301

[debug: [:statistics]] collects statistics. Timestamps are {{y,m,d},{h,m,s}} tuples.

iex(11)>  {:ok,pid} = GenServer.start_link(Sequence.Server, 100, [debug: [:statistics]])
{:ok, #PID<0.155.0>}
iex(12)> GenServer.call(pid, :next_number)
100
iex(13)> GenServer.cast(pid, {:increment_number, 200})
:ok
iex(14)> :sys.statistics pid, :get
{:ok,
 [start_time: {{2017, 8, 30}, {9, 54, 38}},
  current_time: {{2017, 8, 30}, {9, 54, 53}}, reductions: 76, messages_in: 2,
  messages_out: 0]}

Tracing can be enabled and disabled with :sys.trace.

iex(17)> :sys.trace pid, true
:ok
iex(18)> GenServer.call(pid, :next_number)
*DBG* <0.155.0> got call next_number from <0.139.0>
*DBG* <0.155.0> sent 301 to <0.139.0>, new state 302
301
iex(19)> :sys.trace pid, false
:ok
iex(20)> GenServer.call(pid, :next_number)
302
iex(21)> :sys.get_state pid
303

Add format_status to customize the result of :sys.get_status pid.

defmodule Sequence.Server do
  # use applies the OTP GenServer behavior to this module
  use GenServer

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  # No reply is sent to the client, so only the server state is updated
  def handle_cast({:increment_number, delta}, current_number) do
    { :noreply, current_number + delta}
  end

  # Format the state shown in the status output
  def format_status(_reason, [ _pdict, state ]) do
    [data: [{'State', "My current state is '#{inspect state}', and I'm happy"}]]
  end

end

iex(8)> :sys.get_status pid
{:status, #PID<0.123.0>, {:module, :gen_server},
 [["$ancestors": [#PID<0.121.0>, #PID<0.59.0>],
   "$initial_call": {Sequence.Server, :init, 1}], :running, #PID<0.121.0>,
  [trace: true,
   statistics: {{{2017, 8, 30}, {10, 1, 2}}, {:reductions, 21}, 1, 0}],
  [header: 'Status for generic server <0.123.0>',
   data: [{'Status', :running}, {'Parent', #PID<0.121.0>},
    {'Logged events', []}],
   data: [{'State', "My current state is '101', and I'm happy"}]]]}

GenServer Callbacks

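GenServer follows the OTP protocol and defines the callback functions below. use GenServer supplies default implementations, so a module only needs to override the ones it cares about:

  1. init(state_arguments)

    Called by GenServer when the server starts. The argument is the second argument passed to start_link. Return {:ok, state} on success and {:stop, reason} on failure.

    There is also an {:ok, state, timeout} return value. If the GenServer receives no message within timeout ms, a :timeout message is generated for the server, which must handle it in handle_info.

  2. handle_call(request, from, state)

    Invoked for GenServer.call(pid, request). On success it returns {:reply, result, new_state}. The default implementation stops the server with a :bad_call error.

  3. handle_cast(request, state)

    Invoked for GenServer.cast(pid, request). On success it returns {:noreply, new_state}; on failure it returns {:stop, reason, new_state}.

  4. handle_info(info, state)

    Handles messages that are not delivered through call or cast, such as :timeout. Messages sent directly to the PID with send are also handled by handle_info.

  5. terminate(reason, state)

    Called when the server is about to stop. If the server is monitored by a supervisor, you usually do not need to handle this.

  6. code_change(from_version, state, extra)

    OTP can replace server code without stopping the system. The new version of the server may keep its state in a different form than the old one, and this callback converts the old state.

  7. format_status(reason, [pdict, state])

    Customizes how the state is displayed. It should return [data: [{'State', state_info}]].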
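The timeout and handle_info behavior described in the list can be sketched as follows. IdleServer, the 50 ms timeout, and the :ping / :server_was_idle messages are assumptions made up for this example.

```elixir
defmodule IdleServer do
  use GenServer

  # The third tuple element asks GenServer to deliver :timeout
  # if no message arrives within 50 ms.
  def init(parent), do: {:ok, parent, 50}

  # The idle timeout arrives as a plain message in handle_info.
  def handle_info(:timeout, parent) do
    send(parent, :server_was_idle)
    {:noreply, parent}
  end

  # Messages sent with send/2 (not call/cast) also land in handle_info.
  def handle_info({:ping, from}, parent) do
    send(from, :pong)
    {:noreply, parent}
  end
end

{:ok, pid} = GenServer.start_link(IdleServer, self())

idle =
  receive do
    :server_was_idle -> :got_timeout
  after
    1_000 -> :no_timeout
  end

send(pid, {:ping, self()})

pong =
  receive do
    :pong -> :got_pong
  after
    1_000 -> :no_pong
  end

IO.inspect({idle, pong})
```

Because handle_info returns {:noreply, parent} without a timeout, the idle timer is not re-armed after the first :timeout.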

Return values of the OTP callbacks

  1. { :noreply, new_state [ , :hibernate | timeout ] }

    Used by handle_call and handle_cast. :hibernate puts the server process into hibernation, shrinking its memory footprint until the next message arrives; saving and restoring the state costs some CPU. timeout can be :infinity (the default). Otherwise, if the GenServer receives no message within timeout ms, a :timeout message is generated for the server, which must handle it in handle_info.

  2. { :stop, reason, new_state }

    Signals that the server should terminate.

  3. { :reply, response, new_state [ , :hibernate | timeout ] }

    Only available in handle_call. Sends response to the client.

  4. { :stop, reason, reply, new_state }

    Only available in handle_call. Sends the reply to the client and then terminates the server.
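As an illustration of the last return value, the sketch below replies to the caller and then stops. OneShot and the :take request are hypothetical names. GenServer.start (without a link) is used so that the calling script can observe the exit through a monitor.

```elixir
defmodule OneShot do
  use GenServer

  def init(value), do: {:ok, value}

  # Reply with the stored value, then stop with reason :normal.
  def handle_call(:take, _from, value) do
    {:stop, :normal, value, value}
  end
end

{:ok, pid} = GenServer.start(OneShot, 42)
ref = Process.monitor(pid)

reply = GenServer.call(pid, :take)

# The monitor tells us when, and why, the server went down.
down_reason =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> reason
  after
    1_000 -> :still_running
  end

IO.inspect({reply, down_reason})
```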


Naming a Process

Pass the name: :seq option to start_link:

iex(9)> {:ok,pid} = GenServer.start_link(Sequence.Server, 100, name: :seq)
{:ok, #PID<0.132.0>}
iex(10)> GenServer.call(:seq, :next_number)
100
iex(11)> :sys.get_status :seq
{:status, #PID<0.132.0>, {:module, :gen_server},
 [["$ancestors": [#PID<0.121.0>, #PID<0.59.0>],
   "$initial_call": {Sequence.Server, :init, 1}], :running, #PID<0.121.0>, [],
  [header: 'Status for generic server seq',
   data: [{'Status', :running}, {'Parent', #PID<0.121.0>},
    {'Logged events', []}],
   data: [{'State', "My current state is '101', and I'm happy"}]]]}

How to wrap the GenServer function calls (start_link, call, cast) behind the module's own API:

defmodule Sequence.Server do
  use GenServer

  #####
  # External API

  def start_link(current_number) do
    GenServer.start_link(__MODULE__, current_number, name: __MODULE__)
  end

  def next_number do
    GenServer.call __MODULE__, :next_number
  end

  def increment_number(delta) do
    GenServer.cast __MODULE__, {:increment_number, delta}
  end

  #####

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  # No reply is sent to the client, so only the server state is updated
  def handle_cast({:increment_number, delta}, current_number) do
    { :noreply, current_number + delta}
  end

  # Format the state shown in the status output
  def format_status(_reason, [ _pdict, state ]) do
    [data: [{'State', "My current state is '#{inspect state}', and I'm happy"}]]
  end

end

$ iex -S mix
iex(1)> Sequence.Server.start_link 200
{:ok, #PID<0.123.0>}
iex(2)> Sequence.Server.next_number
200
iex(3)> Sequence.Server.next_number
201
iex(4)> Sequence.Server.increment_number 100
:ok
iex(5)> Sequence.Server.next_number
302

OTP Supervisors

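Supervisors are responsible for process monitoring and restarting.

A supervisor uses the OTP supervisor behavior. It holds a list of processes and knows what to do when one of them crashes. It also has a mechanism for avoiding restart loops.

Supervisors are implemented on top of the Erlang VM's process-linking and process-monitoring mechanisms.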
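The restart behavior can be observed in a standalone script. This is a minimal sketch, assuming Elixir 1.5 or later (for the {module, arg} child spec); RestartDemo.Counter and its :bump / :value messages are hypothetical, and max_restarts / max_seconds are the Supervisor options that bound restart loops.

```elixir
defmodule RestartDemo.Counter do
  use GenServer

  def start_link(initial) do
    GenServer.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(initial), do: {:ok, initial}
  def handle_call(:value, _from, n), do: {:reply, n, n}
  def handle_cast(:bump, n), do: {:noreply, n + 1}
end

alias RestartDemo.Counter

# Allow at most 3 restarts within 5 seconds before the supervisor gives up.
{:ok, _sup} =
  Supervisor.start_link([{Counter, 10}],
    strategy: :one_for_one,
    max_restarts: 3,
    max_seconds: 5
  )

GenServer.cast(Counter, :bump)
before_crash = GenServer.call(Counter, :value)

# Kill the worker and wait until it is really gone.
old_pid = Process.whereis(Counter)
ref = Process.monitor(old_pid)
Process.exit(old_pid, :kill)

receive do
  {:DOWN, ^ref, :process, ^old_pid, _reason} -> :ok
end

# Poll until the supervisor has registered the replacement process.
wait_for_restart = fn wait ->
  case Process.whereis(Counter) do
    pid when is_pid(pid) and pid != old_pid ->
      pid

    _ ->
      Process.sleep(10)
      wait.(wait)
  end
end

new_pid = wait_for_restart.(wait_for_restart)
after_restart = GenServer.call(Counter, :value)

IO.inspect({before_crash, after_restart, new_pid != old_pid})
```

The restarted worker begins again from the initial argument (10), so any state accumulated before the crash is lost unless it is kept somewhere else.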

Create a new project that includes a supervisor:

$ mix new --sup sequence
* creating README.md
* creating .gitignore
* creating mix.exs
* creating config
* creating config/config.exs
* creating lib
* creating lib/sequence.ex
* creating lib/sequence/application.ex
* creating test
* creating test/test_helper.exs
* creating test/sequence_test.exs

mix.exs is generated automatically and needs no changes:

defmodule Sequence.Mixfile do
  use Mix.Project

  def project do
    [
      app: :sequence,
      version: "0.1.0",
      elixir: "~> 1.5",
      start_permanent: Mix.env == :prod,
      deps: deps()
    ]
  end

  # Run "mix help compile.app" to learn about applications.
  def application do
    [
      extra_applications: [:logger],
      mod: {Sequence.Application, []}
    ]
  end

  # Run "mix help deps" to learn about dependencies.
  defp deps do
    [
      # {:dep_from_hexpm, "~> 0.3.0"},
      # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"},
    ]
  end
end

This is the content of lib/sequence/application.ex:

defmodule Sequence.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  def start(_type, _args) do
    # List all child processes to be supervised
    children = [
      # Starts a worker by calling: Sequence.Worker.start_link(arg)
      # {Sequence.Worker, arg},
      {Sequence.Server, 123},
      {Sequence.Server2, 123}
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Sequence.Supervisor]
    
    # Start the supervisor; its {:ok, pid} result is the return value of start/2
    Supervisor.start_link(children, opts)
  end
end

The supervisor starts two workers: Sequence.Server and Sequence.Server2

defmodule Sequence.Supervisor do
  use Supervisor

  def start_link(initial_number) do
    :supervisor.start_link(__MODULE__, initial_number)
  end

  def init(initial_number) do
    child_processes = [
        worker(Sequence.Server, [initial_number]),
        worker(Sequence.Server2, [initial_number])
    ]
    supervise child_processes, strategy: :one_for_one
  end
end

Sequence.Server is copied from the experiments above:

defmodule Sequence.Server do
  use GenServer

  #####
  # External API

  def start_link(current_number) do
    GenServer.start_link(__MODULE__, current_number, name: __MODULE__)
  end

  def next_number do
    GenServer.call __MODULE__, :next_number
  end

  def increment_number(delta) do
    GenServer.cast __MODULE__, {:increment_number, delta}
  end

  #####

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  # No reply is sent to the client, so only the server state is updated
  def handle_cast({:increment_number, delta}, current_number) do
    { :noreply, current_number + delta}
  end

  # Format the state shown in the status output
  def format_status(_reason, [ _pdict, state ]) do
    [data: [{'State', "My current state is '#{inspect state}', and I'm happy"}]]
  end

end

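Sequence.Server2 does the same thing as Sequence.Server. The difference is that it registers a different process name at startup. Note also that it calls the Erlang :gen_server module directly.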

defmodule Sequence.Server2 do
  use GenServer

  #####
  # External API

  def start_link(current_number) do
    :gen_server.start_link({ :local, :sequence }, __MODULE__, current_number, [])
  end

  def next_number do
    :gen_server.call :sequence, :next_number
  end

  def increment_number(delta) do
    :gen_server.cast :sequence, {:increment_number, delta}
  end

  #####
  # GenServer implementation

  def init(current_number)
    when is_number(current_number) do
    { :ok, current_number }
  end

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  def handle_cast({:increment_number, delta}, current_number) do
    { :noreply, current_number + delta}
  end
end

Sequence.Server2 can be tested in the same way as Sequence.Server.

$ iex -S mix

Compiling 1 file (.ex)
Interactive Elixir (1.5.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> :observer.start()
:ok
iex(2)> Sequence.Server.increment_number 3
:ok
iex(3)> Sequence.Server.next_number
126
iex(4)> Sequence.Server.next_number
127

## Deliberately send a string that increment_number cannot handle: the process crashes and is restarted by the supervisor
iex(5)> Sequence.Server.increment_number "c"
:ok
iex(6)>
00:08:25.372 [error] GenServer Sequence.Server terminating
** (ArithmeticError) bad argument in arithmetic expression
    (sequence) lib/sequence/server.ex:27: Sequence.Server.handle_cast/2
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_cast", {:increment_number, "c"}}
State: [data: [{'State', "My current state is '128', and I'm happy"}]]

nil
iex(7)> Sequence.Server.next_number
123
iex(8)> Sequence.Server.next_number
124
iex(9)>

OTP Application

An OTP application is a bundle of code that comes with a descriptor. Conceptually it is closer to a component or a service.

OTP uses an application spec file named name.app; mix generates this file automatically from mix.exs.

$ mix new sequence

This generates a mix.exs. Add mod: { Sequence, 456 } inside def application do:

defmodule Sequence.Mixfile do
  use Mix.Project

  def project do
    [
      app: :sequence,
      version: "0.1.0",
      elixir: "~> 1.5",
      start_permanent: Mix.env == :prod,
      deps: deps()
    ]
  end

  # Run "mix help compile.app" to learn about applications.
  def application do
    [
      extra_applications: [:logger],
      mod: { Sequence, 456 }
    ]
  end

  # Run "mix help deps" to learn about dependencies.
  defp deps do
    [
      # {:dep_from_hexpm, "~> 0.3.0"},
      # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"},
    ]
  end
end

Edit lib/sequence.ex. Because of the mod: { Sequence, 456 } setting, start(_type, initial_number) in the Sequence module is called automatically; Sequence.Supervisor is started from there.

defmodule Sequence do
  use Application

  def start(_type, initial_number) do
    Sequence.Supervisor.start_link(initial_number)
  end
end

lib/sequence/supervisor.ex is the OTP supervisor; it manages the Sequence.Server process:

defmodule Sequence.Supervisor do
  use Supervisor

  def start_link(initial_number) do
    :supervisor.start_link(__MODULE__, initial_number)
  end

  def init(initial_number) do
    child_processes = [
        worker(Sequence.Server, [initial_number])
    ]
    supervise child_processes, strategy: :one_for_one
  end
end

/lib/sequence/server.ex

defmodule Sequence.Server do
  use GenServer

  #####
  # External API

  def start_link(current_number) do
    :gen_server.start_link({ :local, :sequence }, __MODULE__, current_number, [])
  end

  def next_number do
    :gen_server.call :sequence, :next_number
  end

  def increment_number(delta) do
    :gen_server.cast :sequence, {:increment_number, delta}
  end

  #####
  # GenServer implementation

  def init(current_number)
  when is_number(current_number) do
    { :ok, current_number }
  end

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  def handle_cast({:increment_number, delta}, current_number) do
    { :noreply, current_number + delta}
  end
end

Start the sequence project:

$ iex -S mix
iex(1)> Sequence.Server.next_number
456

The registered: option lists the names that the application registers. It can be used to make sure those names are unique across the nodes of a cluster. Here the name Sequence.Server is registered:

  def application do
    [
      extra_applications: [:logger],
      mod: { Sequence, 456 },
      registered: [Sequence.Server]
    ]
  end

Running mix compile compiles the code and generates sequence/_build/dev/lib/sequence/ebin/sequence.app:

$ mix compile
Compiling 3 files (.ex)
Generated sequence app

The content of sequence.app is:

{application,sequence,
             [{applications,[kernel,stdlib,elixir,logger]},
              {description,"sequence"},
              {modules,['Elixir.Sequence','Elixir.Sequence.Server',
                        'Elixir.Sequence.Supervisor']},
              {vsn,"0.1.0"},
              {extra_applications,[logger]},
              {mod,{'Elixir.Sequence',456}},
              {registered,['Elixir.Sequence.Server']}]}.

To make the application more general, change the way the initial argument is passed.

Edit mix.exs and put the argument in env: env: [initial_number: 456]

  def application do
    [
      extra_applications: [:logger],
      mod: { Sequence, [] },
      env: [initial_number: 456],
      registered: [Sequence.Server]
    ]
  end

Edit lib/sequence.ex so that the argument is read with Application.get_env:

defmodule Sequence do
  use Application

  def start(_type, _args) do
    Sequence.Supervisor.start_link(Application.get_env(:sequence, :initial_number))
  end
end
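Outside a mix project there is no env: entry to read, so the following sketch simulates it with Application.put_env and then reads the value back. The :missing_key lookup and its default of 0 are assumptions added for illustration.

```elixir
# Simulate the `env: [initial_number: 456]` entry from mix.exs.
Application.put_env(:sequence, :initial_number, 456)

# Read the configured value, as Sequence.start/2 does.
initial = Application.get_env(:sequence, :initial_number)

# A third argument supplies a default for keys that are not set.
fallback = Application.get_env(:sequence, :missing_key, 0)

IO.inspect({initial, fallback})
```

Passing a default avoids handing nil to the supervisor when a key has not been configured.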

Code Release: EXRM (Elixir Release Manager)

release: a bundle that contains a particular version of your application including dependencies, configuration, metadata.

hot upgrade: updating a running application without affecting its users

exrm: built on top of Erlang's relx and used to manage release packages

Edit mix.exs and add :exrm as a library dependency:

  defp deps do
    [
      {:exrm, "~> 1.0.8"}
    ]
  end

Edit lib/sequence/server.ex and add @vsn "0":

defmodule Sequence.Server do
  use GenServer

  @vsn "0"

Install exrm:

$ mix do deps.get, deps.compile

The release package is generated in the rel/sequence folder; the archive is at rel/sequence/releases/0.0.1/sequence.tar.gz.

$ mix release
....
==> The release for sequence-0.0.1 is ready!
==> You can boot a console running your release with `$ rel/sequence/bin/sequence console`

Unpack sequence.tar.gz and the sequence app can be started:

$ ./sequence/bin/sequence console
iex(sequence@127.0.0.1)1> Sequence.Server.next_number
456
iex(sequence@127.0.0.1)2> Sequence.Server.next_number
457

Leave this console running. Back in the project, edit mix.exs and change the version number to 0.0.2:

  def project do
    [
      app: :sequence,
      version: "0.0.2",
      elixir: "~> 1.5",
      start_permanent: Mix.env == :prod,
      deps: deps()
    ]
  end

Also change next_number in lib/sequence/server.ex, so that we can tell whether the new version of the code is running:

  def next_number do
    #:gen_server.call(:sequence, :next_number)
    with number = :gen_server.call(:sequence, :next_number),
    do: "0.0.2 The next number is #{number}"
  end

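Run mix release again in the project folder. On a Mac this failed with a .DS_Store error. (Note: the rel/sequence/releases/0.0.1 release data must already exist. Only then does building the new 0.0.2 version produce the relup file, which is what the upgrade uses.)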

$ mix release
Compiling 3 files (.ex)
Generated sequence app
Building release with MIX_ENV=dev.

This is an upgrade, verifying appups exist for updated dependencies..
==> Unable to access /test/sequence/rel/sequence/releases/.DS_Store/sequence.rel: enotdir

After removing .DS_Store, run mix release again:

$ rm /test/sequence/rel/sequence/releases/.DS_Store
$ mix release
Building release with MIX_ENV=dev.

This is an upgrade, verifying appups exist for updated dependencies..
==> All dependencies have appups ready for release!
==> Generated .appup for sequence 0.0.1 -> 0.0.2
==> The release for sequence-0.0.2 is ready!
==> You can boot a console running your release with `$ rel/sequence/bin/sequence console`

Copy rel/sequence/releases/0.0.2/sequence.tar.gz into the sequence/releases/0.0.2/ directory of the production environment where the 0.0.1 release was unpacked.

From another terminal, hot-upgrade directly to 0.0.2:

$ bin/sequence upgrade 0.0.2
Release 0.0.2 not found, attempting to unpack releases/0.0.2/sequence.tar.gz
Unpacked successfully: "0.0.2"
Generating vm.args/sys.config for upgrade...
sys.config ready!
vm.args ready!
Release 0.0.2 is already unpacked, now installing.
Installed Release: 0.0.2
Made release permanent: "0.0.2"

Back in the server console, verify that it is now running 0.0.2:

iex(sequence@127.0.0.1)3> Sequence.Server.next_number
"0.0.2 The next number is 458"

If the new version has problems, it can be downgraded straight back to 0.0.1:

$ bin/sequence downgrade 0.0.1
Release 0.0.1 is marked old, switching to it.
Generating vm.args/sys.config for upgrade...
sys.config ready!
vm.args ready!
Release 0.0.1 is marked old, switching to it.
Installed Release: 0.0.1
Made release permanent: "0.0.1"
iex(sequence@127.0.0.1)4> Warning: "/Users/charley/Downloads/test/sequence/releases/0.0.1/relup" missing (optional)
iex(sequence@127.0.0.1)5> Sequence.Server.next_number
459

Version 0.0.3 adds a code_change callback that handles the @vsn update: def code_change("0", old_state = { current_number, stash_pid }, _extra)

During the upgrade the console prints the following:

22:59:39.030 [info]  Changing code from 0 to 1

22:59:39.030 [info]  {465, #PID<0.628.0>}

22:59:39.031 [info]  %Sequence.Server.State{current_number: 465, delta: 1, stash_pid: #PID<0.628.0>}

The complete code for version 0.0.3 follows.

mix.exs

defmodule Sequence.Mixfile do
  use Mix.Project

  # ...

  def project do
    [
      app:     :sequence,
      version: "0.0.3",
      elixir: "~> 1.5",
      start_permanent: Mix.env == :prod,
      deps:    deps()
    ]
  end

  # Configuration for the OTP application
  def application do
    [
      mod: { Sequence, 456 },
      registered: [ Sequence.Server ],
      applications: [ :logger ]
    ]
  end

  defp deps do
    [
      {:exrm, "~> 1.0.8"}
    ]
  end
end

lib/sequence.ex

defmodule Sequence do
  use Application

  def start(_type, initial_number) do
    Sequence.Supervisor.start_link(initial_number)
  end
end

lib/sequence/supervisor.ex

defmodule Sequence.Supervisor do
  use Supervisor

  def start_link(initial_number) do
    result = {:ok, sup } = Supervisor.start_link(__MODULE__, [initial_number]) 
    start_workers(sup, initial_number)
    result
  end

  def start_workers(sup, initial_number) do
    # Start the stash worker
    {:ok, stash} = 
       Supervisor.start_child(sup, worker(Sequence.Stash, [initial_number]))

    # and then the subsupervisor for the actual sequence server
    Supervisor.start_child(sup, supervisor(Sequence.SubSupervisor, [stash]))
  end

  def init(_) do
    supervise [], strategy: :one_for_one
  end

end

lib/sequence/subsupervisor.ex

defmodule Sequence.SubSupervisor do
  use Supervisor

  def start_link(stash_pid) do
    Supervisor.start_link(__MODULE__, stash_pid)
  end

  def init(stash_pid) do
    child_processes = [ worker(Sequence.Server, [stash_pid]) ]
    supervise child_processes, strategy: :one_for_one
  end

end

lib/sequence/server.ex

defmodule Sequence.Server do
  use GenServer
  require Logger

  @vsn "1"
  defmodule State do
    defstruct current_number: 0, stash_pid: nil, delta: 1
  end

  #####
  # External API
  # . . .
  def start_link(stash_pid) do
    GenServer.start_link(__MODULE__, stash_pid, name: __MODULE__)
  end

  def next_number do
    # GenServer.call __MODULE__, :next_number
    with number = GenServer.call(__MODULE__, :next_number),
    do: "0.0.2 The next number is #{number}"
  end

  def increment_number(delta) do
    GenServer.cast __MODULE__, {:increment_number, delta}
  end

  #####
  # GenServer implementation

  # def init(stash_pid) do
  #   current_number = Sequence.Stash.get_value stash_pid
  #   { :ok, {current_number, stash_pid} }
  # end

  # def handle_call(:next_number, _from, {current_number, stash_pid}) do
  #   { :reply, current_number, {current_number+1, stash_pid} }
  # end

  # def handle_cast({:increment_number, delta}, {current_number, stash_pid}) do
  #   { :noreply, {current_number + delta, stash_pid}}
  # end

  # def terminate(_reason, {current_number, stash_pid}) do
  #   Sequence.Stash.save_value stash_pid, current_number
  # end

  def init(stash_pid) do
    # On startup, fetch current_number back from Sequence.Stash using its process id
    current_number = Sequence.Stash.get_value stash_pid

    # Keep the state in a struct
    { :ok,
      %State{current_number: current_number, stash_pid: stash_pid} }
  end

  def handle_call(:next_number, _from, state) do
    { :reply,
      state.current_number,
      %{ state | current_number: state.current_number + state.delta} }
  end

  def handle_cast({:increment_number, delta}, state) do
    {:noreply,
     %{ state | current_number: state.current_number + delta, delta: delta} }
  end

  def terminate(_reason, state) do
    # On shutdown, save current_number to Sequence.Stash
    Sequence.Stash.save_value state.stash_pid, state.current_number
  end

  # Version upgrade
  def code_change("0", old_state = { current_number, stash_pid }, _extra) do
    new_state = %State{current_number: current_number,
                       stash_pid: stash_pid,
                       delta: 1
                      }
    Logger.info "Changing code from 0 to 1"
    Logger.info inspect(old_state)
    Logger.info inspect(new_state)
    { :ok, new_state }
  end
end

lib/sequence/stash.ex

defmodule Sequence.Stash do
  use GenServer

  #####
  # External API  

  def start_link(current_number) do
    GenServer.start_link( __MODULE__, current_number)
  end

  def save_value(pid, value) do
    GenServer.cast pid, {:save_value, value}
  end

  def get_value(pid) do
    GenServer.call pid, :get_value
  end

  #####
  # GenServer implementation

  def handle_call(:get_value, _from, current_value) do 
    { :reply, current_value, current_value }
  end

  def handle_cast({:save_value, value}, _current_value) do
    { :noreply, value}
  end
end

Task & Agent

When you need background processing or a process that maintains state, but do not want to write spawn/send/receive yourself, use Task and Agent.

Task: a function that runs in the background

tasks1.exs

defmodule Fib do
  def of(0), do: 0
  def of(1), do: 1
  def of(n), do: Fib.of(n-1) + Fib.of(n-2)
end

IO.puts "Start the task"

# Call Fib.of asynchronously
worker = Task.async(fn -> Fib.of(20) end)
IO.puts "Do something else"
# Wait for the result
IO.puts "Wait for the task"
result = Task.await(worker)

IO.puts "The result is #{result}"

# Pass module, function, and arguments to Task.async instead
worker = Task.async(Fib, :of, [10])
result = Task.await(worker)
IO.puts "The result is #{result}"

$ elixir tasks1.exs
Start the task
Do something else
Wait for the task
The result is 6765
The result is 55

How to supervise Tasks

  1. Instead of async, use start_link to link the task to a process that is already supervised.

  2. Start the tasks from a supervisor, so that they are supervised:

import Supervisor.Spec
children = [
    worker(Task, [ fn -> Fib.of(20) end ])
]
supervise children, strategy: :one_for_one
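Elixir also ships a dedicated Task.Supervisor module, which is a different route from the Supervisor.Spec snippet above. The sketch below assumes that module and reuses the Fibonacci function under the hypothetical name FibDemo so that the script is self-contained.

```elixir
defmodule FibDemo do
  def of(0), do: 0
  def of(1), do: 1
  def of(n), do: of(n - 1) + of(n - 2)
end

# Start a supervisor dedicated to tasks.
{:ok, sup} = Task.Supervisor.start_link()

# The task runs under `sup` but can still be awaited by the caller.
task = Task.Supervisor.async(sup, fn -> FibDemo.of(20) end)
result = Task.await(task)

IO.puts("The result is #{result}")
```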

Agent: a background process that maintains state

The state can be accessed from other processes, on the same node or on other nodes.

# count holds the PID of the agent process
iex(1)> {:ok, count} = Agent.start(fn -> 0 end)
{:ok, #PID<0.86.0>}
iex(2)> Agent.get(count, &(&1))
0
iex(3)> Agent.update(count, &(&1+1))
:ok
iex(4)> Agent.update(count, &(&1+1))
:ok
iex(5)> Agent.get(count, &(&1))
2

# An Agent process can also be given a local or global name
iex(6)> Agent.start(fn -> 5 end, name: Sum)
{:ok, #PID<0.92.0>}
iex(7)> Agent.get(Sum, &(&1))
5
iex(8)> Agent.update(Sum, &(&1+100))
:ok
iex(9)> Agent.get(Sum, &(&1))
105
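Reading and updating in two separate steps leaves a gap in which another process could change the state. Agent.get_and_update does both in a single step. The next_id helper below is a hypothetical name for a small ID generator built on it.

```elixir
{:ok, counter} = Agent.start_link(fn -> 0 end)

# Return the current value and store the incremented one in one step.
next_id = fn ->
  Agent.get_and_update(counter, fn n -> {n, n + 1} end)
end

ids = [next_id.(), next_id.(), next_id.()]
final = Agent.get(counter, & &1)

IO.inspect({ids, final})
```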

agent_dict.exs

# Store a list of word/frequency pairs in a map
# and keep the map in an agent
defmodule Frequency do

  def start_link do
    # Start the Agent with start_link
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  def add_word(word) do
    Agent.update(__MODULE__,
                 fn map ->
                      Map.update(map, word, 1, &(&1+1))
                 end)
  end

  def count_for(word) do
    Agent.get(__MODULE__, fn map -> map[word] end)
  end

  def words do
    Agent.get(__MODULE__, fn map -> Map.keys(map) end)
  end

end

iex(1)> c "agent_dict.exs"
[Frequency]
iex(2)> Frequency.start_link
{:ok, #PID<0.92.0>}
iex(3)> Frequency.add_word "You"
:ok
iex(4)> Frequency.words
["You"]
iex(5)> Frequency.add_word "are"
:ok
iex(6)> Frequency.add_word "here"
:ok
iex(7)> Frequency.add_word "are"
:ok
iex(8)> Frequency.add_word "ok"
:ok
iex(9)> Frequency.words
["You", "are", "here", "ok"]
iex(10)> Frequency.count_for("are")
2
iex(11)> Frequency.count_for("ok")
1

anagrams.exs

defmodule Dictionary do

  @name __MODULE__

  ##
  # External API

  # Start the Dictionary Agent process
  def start_link,
  do: Agent.start_link(fn -> %{} end, name: @name)

  def add_words(words),
  do: Agent.update(@name, &do_add_words(&1, words))

  def anagrams_of(word),
  do: Agent.get(@name, &Map.get(&1, signature_of(word)))

  ##
  # Internal implementation
  defp do_add_words(map, words),
  do: Enum.reduce(words, map, &add_one_word(&1, &2))

  defp add_one_word(word, map),
  do: Map.update(map, signature_of(word), [word], &[word|&1])

  defp signature_of(word),
  do: word |> to_charlist |> Enum.sort |> to_string

end

defmodule WordlistLoader do

  # Each dictionary file is loaded by its own Task
  def load_from_files(file_names) do
    file_names
    |> Stream.map(fn name -> Task.async(fn -> load_task(name) end) end)
    |> Enum.map(&Task.await/1)
  end

  defp load_task(file_name) do
    File.stream!(file_name, [], :line)
    |> Enum.map(&String.trim/1)
    |> Dictionary.add_words
  end
end

iex(1)> Dictionary.start_link
{:ok, #PID<0.93.0>}

# There are four list files
iex(2)> Enum.map(1..4, &"words/list#{&1}") |> WordlistLoader.load_from_files
[:ok, :ok, :ok, :ok]
iex(3)> Dictionary.anagrams_of "organ"
["ronga", "rogan", "orang", "nagor", "groan", "grano", "goran", "argon",
 "angor"]

Agent and Task are implemented on top of OTP, so they also work across multiple nodes.

Change the name to @name {:global, __MODULE__}:

defmodule Dictionary do

  @name {:global, __MODULE__}

  ##
  # External API

  def start_link,
  do: Agent.start_link(fn -> %{} end, name: @name)

  def add_words(words),
  do: Agent.update(@name, &do_add_words(&1, words))

  def anagrams_of(word),
  do: Agent.get(@name, &Map.get(&1, signature_of(word)))

  ##
  # Internal implementation

  defp do_add_words(map, words),
  do: Enum.reduce(words, map, &add_one_word(&1, &2))

  defp add_one_word(word, map),
  do: Map.update(map, signature_of(word), [word], &[word|&1])

  defp signature_of(word),
  do: word |> to_charlist |> Enum.sort |> to_string

end

defmodule WordlistLoader do
  def load_from_files(file_names) do
    file_names
    |> Stream.map(fn name -> Task.async(fn -> load_task(name) end) end)
    |> Enum.map(&Task.await/1)
  end

  defp load_task(file_name) do
    File.stream!(file_name, [], :line)
    |> Enum.map(&String.trim/1)
    |> Dictionary.add_words
  end
end

$ iex --sname one anagrams_dist.exs
iex(one@cmbp)1> Dictionary.
add_words/1      anagrams_of/1    start_link/0
iex(one@cmbp)1> Dictionary.start_link
{:ok, #PID<0.102.0>}
iex(one@cmbp)2> Dictionary.anagrams_of "argon"
["ronga", "rogan", "orang", "nagor", "groan", "grano", "goran", "argon",
 "angor"]
$ iex --sname two anagrams_dist.exs
iex(two@cmbp)1> Node.connect :one@cmbp
true
iex(two@cmbp)2> Node.list
[:one@cmbp]
iex(two@cmbp)3> WordlistLoader.load_from_files(~w{words/list1 words/list2})
[:ok, :ok]
iex(two@cmbp)4> WordlistLoader.load_from_files(~w{words/list3 words/list4})
[:ok, :ok]
iex(two@cmbp)5> Dictionary.anagrams_of "crate"
["recta", "react", "creta", "creat", "crate", "cater", "carte", "caret"]
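Global registration can be tried on a single node before setting up a cluster. This is a minimal sketch under that assumption; :demo_dictionary is a hypothetical name, and :global.whereis_name is the Erlang function that looks up a globally registered process.

```elixir
# Register the agent under a global (cluster-wide) name.
{:ok, pid} = Agent.start_link(fn -> %{} end, name: {:global, :demo_dictionary})

# Any process that knows the name can reach the agent with the same tuple.
Agent.update({:global, :demo_dictionary}, &Map.put(&1, "a", 1))
value = Agent.get({:global, :demo_dictionary}, &Map.get(&1, "a"))

# Look the name up in the global registry.
found = :global.whereis_name(:demo_dictionary)

IO.inspect({found == pid, value})
```

On connected nodes the same {:global, name} tuple resolves to the one registered process, which is why the second node in the session above can load words into the dictionary started on the first.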

References

Programming Elixir
