OTP 定義 application 的 hierarchy,一個 application 包含了多個 processes,這些 processes 是依照 "behaviors" 的規格實作,"server" behavior 就稱為 GenServer,另一個特殊的 "supervisor" behavior 用來監控 processes,並實作 restart process 的規則。
OTP Server: GenServer
產生新的 sequence project
$ mix new sequence
* creating README.md
* creating .gitignore
* creating mix.exs
* creating config
* creating config/config.exs
* creating lib
* creating lib/sequence.ex
* creating test
* creating test/test_helper.exs
* creating test/sequence_test.exs
$ cd sequence/
$ mkdir lib/sequence
當 client 呼叫 server,就會進入 handle_call,其中第一個參數是 client 要送給 server 的資訊,第二個參數是 client 的 PID,第三個是 server state。
defmodule Sequence.Server do
# use 就是將 OTP GenServer behavior 套用到這個 module
use GenServer
# :next_number 是 client 給的參數, _from 是 client's PID, current_number 是 server state
def handle_call(:next_number, _from, current_number) do
{ :reply, current_number, current_number+1 }
end
end
startlink 類似 spawnlink,會產生一個新的 process
$ iex -S mix
Compiling 2 files (.ex)
Generated sequence app
iex(1)> { :ok, pid } = GenServer.start_link(Sequence.Server, 100)
{:ok, #PID<0.149.0>}
iex(2)> GenServer.call(pid, :next_number)
100
iex(3)> GenServer.call(pid, :next_number)
101
iex(4)> GenServer.call(pid, :next_number)
102
增加處理重設 number 的 fuction: def handle_call({:set_number, new_number}, _from, _current_number)
defmodule Sequence.Server do
# use 就是將 OTP GenServer behavior 套用到這個 module
use GenServer
def handle_call(:next_number, _from, current_number) do
{ :reply, current_number, current_number+1 }
end
def handle_call({:set_number, new_number}, _from, _current_number) do
{ :reply, new_number, new_number }
end
end
執行過程
iex(1)> { :ok, pid } = GenServer.start_link(Sequence.Server, 100)
{:ok, #PID<0.123.0>}
iex(2)> GenServer.call(pid, :next_number)
100
iex(3)> GenServer.call(pid, {:set_number, 20})
20
iex(4)> GenServer.call(pid, :next_number)
20
iex(5)> GenServer.call(pid, :next_number)
21
iex(6)> GenServer.call(pid, :next_number)
22
GenServer.call 是單向的呼叫,會等待 server 回傳結果,但有時候不需要等待 reply,就改用 GenServer.cast,Server 要實作 handle_cast。
defmodule Sequence.Server do
# use 就是將 OTP GenServer behavior 套用到這個 module
use GenServer
def handle_call(:next_number, _from, current_number) do
{ :reply, current_number, current_number+1 }
end
# 因為不需要 reply 給 client,最後只要更新 server state
def handle_cast({:increment_number, delta}, current_number) do
{ :noreply, current_number + delta}
end
end
執行過程
# reload Sequence.Server
iex(8)> r Sequence.Server
warning: redefining module Sequence.Server (current version loaded from _build/dev/lib/sequence/ebin/Elixir.Sequence.Server.beam)
lib/sequence/server.ex:1
{:reloaded, Sequence.Server, [Sequence.Server]}
iex(9)> { :ok, pid } = GenServer.start_link(Sequence.Server, 100)
{:ok, #PID<0.143.0>}
iex(10)> GenServer.call(pid, :next_number)
100
iex(11)> GenServer.cast(pid, {:increment_number, 200})
:ok
iex(12)> GenServer.call(pid, :next_number)
301
追蹤 Server 執行過程
start_link 的第三個參數,是一些 options 設定,例如 [debug: [:trace]]
。
iex(6)> {:ok,pid} = GenServer.start_link(Sequence.Server, 100, [debug: [:trace]])
{:ok, #PID<0.148.0>}
iex(7)> GenServer.call(pid, :next_number)
*DBG* <0.148.0> got call next_number from <0.139.0>
*DBG* <0.148.0> sent 100 to <0.139.0>, new state 101
100
iex(8)> GenServer.cast(pid, {:increment_number, 200})
*DBG* <0.148.0> got cast {increment_number,200}
:ok
*DBG* <0.148.0> new state 301
[debug: [:statistics]]
產生統計資料,timestamp 為 {{y,m,d},{h,m,s}} 時間的 tuple
iex(11)> {:ok,pid} = GenServer.start_link(Sequence.Server, 100, [debug: [:statistics]])
{:ok, #PID<0.155.0>}
iex(12)> GenServer.call(pid, :next_number)
100
iex(13)> GenServer.cast(pid, {:increment_number, 200})
:ok
iex(14)> :sys.statistics pid, :get
{:ok,
[start_time: {{2017, 8, 30}, {9, 54, 38}},
current_time: {{2017, 8, 30}, {9, 54, 53}}, reductions: 76, messages_in: 2,
messages_out: 0]}
可用 :sys.trace enable/disable trace 功能
iex(17)> :sys.trace pid, true
:ok
iex(18)> GenServer.call(pid, :next_number)
*DBG* <0.155.0> got call next_number from <0.139.0>
*DBG* <0.155.0> sent 301 to <0.139.0>, new state 302
301
iex(19)> :sys.trace pid, false
:ok
iex(20)> GenServer.call(pid, :next_number)
302
iex(21)> :sys.get_state pid
303
增加 formatstatus,格式化 :sys.getstatus pid 的結果
defmodule Sequence.Server do
# use 就是將 OTP GenServer behavior 套用到這個 module
use GenServer
def handle_call(:next_number, _from, current_number) do
{ :reply, current_number, current_number+1 }
end
# 因為不需要 reply 給 client,最後只要更新 server state
def handle_cast({:increment_number, delta}, current_number) do
{ :noreply, current_number + delta}
end
# 格式化 state 的 trace 資訊
def format_status(_reason, [ _pdict, state ]) do
[data: [{'State', "My current state is '#{inspect state}', and I'm happy"}]]
end
end
iex(8)> :sys.get_status pid
{:status, #PID<0.123.0>, {:module, :gen_server},
[["$ancestors": [#PID<0.121.0>, #PID<0.59.0>],
"$initial_call": {Sequence.Server, :init, 1}], :running, #PID<0.121.0>,
[trace: true,
statistics: {{{2017, 8, 30}, {10, 1, 2}}, {:reductions, 21}, 1, 0}],
[header: 'Status for generic server <0.123.0>',
data: [{'Status', :running}, {'Parent', #PID<0.121.0>},
{'Logged events', []}],
data: [{'State', "My current state is '101', and I'm happy"}]]]}
GenServer Callbacks
GenServer 是使用 OTP protocol,必須實作六個 callback functions:
init(state_arguments)
GenServer 會在啟動 server 時呼叫,參數為 start_link 的第二個參數,啟動成功要回傳 {:ok, state},失敗要回傳 {:stop, reason}。
有個 {:ok, state, timeout} 的回傳值 timeout,GenServer 如果在 timeout ms 後,一直沒有收到 message,會自動產生一個 :timeout message 給 server,server 必須要用 handle_info 處理。
handle_call(request, from , state)
GenServer.call(pid, request) 呼叫的 function,成功要回傳 {:reply, result, newstate},預設可用 :badcall error 停掉 server。
handle_cast(request, state)
GenServer.cast(pid, request) 呼叫的 function,成功要回傳 {:noreply, newstate},失敗回傳 {:stop, reason,
newstate}
handle_info(info, state)
處理沒有被 call/cast 處理的 messages,例如處理 :timeout,另外直接用 send 發送給這個 PID 的訊息,也是由 handle_info 處理。
terminate(reason, state)
當 server 停止時會呼叫此 function,但如果有用 supervisor 機制監控 server,就不用處理這個。
codechange(fromversion, state, extra)
OTP 可在不停掉系統的狀況下,替換 server code,新的 server 跟舊 server 有不同的狀態。
format_status(reason, [pdict, state])
用來客製化 state display 的訊息,要回傳 [data: [{'State', state_info}]]
OTP functions 的回傳值
{ :noreply, new_state [ , :hibernate | timeout ] }
call and cast 使用,:hibernate 會將 server state 由 memory 移除,並在下一個 message 中,recover 該 state,儲存 memory state 會消耗掉一些 CPU resource。 timeout 可以設定為 :infinite (預設值),GenServer 如果在 timeout ms 後,一直沒有收到 message,會自動產生一個 :timeout message 給 server,server 必須要用 handle_info 處理。
{ :stop, reason, new_state }
server 終止的訊號
{ :reply, response, new_state [ , :hibernate | timeout ] }
只有 handle_call 可以使用,可發送 response 給 client
{ :stop, reason, reply, new_state }
只有 handle_call 可以使用,發送 response 及 server 終止的訊號給 client。
Naming a Process
在 start_link 加上 name: :seq 的 option
iex(9)> {:ok,pid} = GenServer.start_link(Sequence.Server, 100, name: :seq)
{:ok, #PID<0.132.0>}
iex(10)> GenServer.call(:seq, :next_number)
100
iex(11)> :sys.get_status :seq
{:status, #PID<0.132.0>, {:module, :gen_server},
[["$ancestors": [#PID<0.121.0>, #PID<0.59.0>],
"$initial_call": {Sequence.Server, :init, 1}], :running, #PID<0.121.0>, [],
[header: 'Status for generic server seq',
data: [{'Status', :running}, {'Parent', #PID<0.121.0>},
{'Logged events', []}],
data: [{'State', "My current state is '101', and I'm happy"}]]]}
如何將呼叫 GenServer 的 function call (start_link, call, cast) 封裝起來
defmodule Sequence.Server do
use GenServer
#####
# External API
def start_link(current_number) do
GenServer.start_link(__MODULE__, current_number, name: __MODULE__)
end
def next_number do
GenServer.call __MODULE__, :next_number
end
def increment_number(delta) do
GenServer.cast __MODULE__, {:increment_number, delta}
end
#####
def handle_call(:next_number, _from, current_number) do
{ :reply, current_number, current_number+1 }
end
# 因為不需要 reply 給 client,最後只要更新 server state
def handle_cast({:increment_number, delta}, current_number) do
{ :noreply, current_number + delta}
end
# 格式化 state 的 trace 資訊
def format_status(_reason, [ _pdict, state ]) do
[data: [{'State', "My current state is '#{inspect state}', and I'm happy"}]]
end
end
$ iex -S mix
iex(1)> Sequence.Server.start_link 200
{:ok, #PID<0.123.0>}
iex(2)> Sequence.Server.next_number
200
iex(3)> Sequence.Server.next_number
201
iex(4)> Sequence.Server.increment_number 100
:ok
iex(5)> Sequence.Server.next_number
302
OTP Supervisors
supervisors 負責 process monitoring and restarting
supervisor 是使用 OTP supervisor behavior,他會有一個 list of porcesses,並知道 process crash 時,要怎麼處理,另外也有避免 restart loops 的機制。
supervisor 是利用 erlang VM process-linking 及 -monitoring 的機制實作的。
產生一個包含 supervisor 的新專案
$ mix new --sup sequence
* creating README.md
* creating .gitignore
* creating mix.exs
* creating config
* creating config/config.exs
* creating lib
* creating lib/sequence.ex
* creating lib/sequence/application.ex
* creating test
* creating test/test_helper.exs
* creating test/sequence_test.exs
mix.exs 是自動產生的,不需要修改內容
defmodule Sequence.Mixfile do
use Mix.Project
def project do
[
app: :sequence,
version: "0.1.0",
elixir: "~> 1.5",
start_permanent: Mix.env == :prod,
deps: deps()
]
end
# Run "mix help compile.app" to learn about applications.
def application do
[
extra_applications: [:logger],
mod: {Sequence.Application, []}
]
end
# Run "mix help deps" to learn about dependencies.
defp deps do
[
# {:dep_from_hexpm, "~> 0.3.0"},
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"},
]
end
end
這是 lib/sequence/application.ex 的內容
defmodule Sequence.Application do
# See https://hexdocs.pm/elixir/Application.html
# for more information on OTP Applications
@moduledoc false
use Application
def start(_type, _args) do
# List all child processes to be supervised
children = [
# Starts a worker by calling: Sequence.Worker.start_link(arg)
# {Sequence.Worker, arg},
{Sequence.Server, 123},
{Sequence.Server2, 123}
]
# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: Sequence.Supervisor]
# 產生 supervisor
{:ok, pid} = Supervisor.start_link(children, opts)
end
end
由 supervisor 啟動兩個 worker: Sequnce.Server 及 Sequence.Server2
defmodule Sequence.Supervisor do
use Supervisor.Behaviour
def start_link(initial_number) do
:supervisor.start_link(__MODULE__, initial_number)
end
def init(initial_number) do
child_processes = [
worker(Sequence.Server, [initial_number]),
worker(Sequence.Server2, [initial_number])
]
supervise child_processes, strategy: :one_for_one
end
end
Sequence.Server 是從上面的測試複製下來的
defmodule Sequence.Server do
use GenServer
#####
# External API
def start_link(current_number) do
GenServer.start_link(__MODULE__, current_number, name: __MODULE__)
end
def next_number do
GenServer.call __MODULE__, :next_number
end
def increment_number(delta) do
GenServer.cast __MODULE__, {:increment_number, delta}
end
#####
def handle_call(:next_number, _from, current_number) do
{ :reply, current_number, current_number+1 }
end
# 因為不需要 reply 給 client,最後只要更新 server state
def handle_cast({:increment_number, delta}, current_number) do
{ :noreply, current_number + delta}
end
# 格式化 state 的 trace 資訊
def format_status(_reason, [ _pdict, state ]) do
[data: [{'State', "My current state is '#{inspect state}', and I'm happy"}]]
end
end
Sequence.Server2 功能跟 Server1 一樣,差別是啟動時,指定了不同的 Process Name,另外注意改用 :gen_server 的方式呼叫。
defmodule Sequence.Server2 do
use GenServer
#####
# External API
def start_link(current_number) do
:gen_server.start_link({ :local, :sequence }, __MODULE__, current_number, [])
end
def next_number do
:gen_server.call :sequence, :next_number
end
def increment_number(delta) do
:gen_server.cast :sequence, {:increment_number, delta}
end
#####
# GenServer implementation
def init(current_number)
when is_number(current_number) do
{ :ok, current_number }
end
def handle_call(:next_number, _from, current_number) do
{ :reply, current_number, current_number+1 }
end
def handle_cast({:increment_number, delta}, current_number) do
{ :noreply, current_number + delta}
end
end
Sequence.Server2 跟 Sequence.Server1 可用同樣的方式測試。
$ iex -S mix
Compiling 1 file (.ex)
Interactive Elixir (1.5.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> :observer.start()
:ok
iex(2)> Sequence.Server.increment_number 3
:ok
iex(3)> Sequence.Server.next_number
126
iex(4)> Sequence.Server.next_number
127
## 刻意發送一個 increment_number 無法處理的 string,process 會 crash,並由 supervisor 重新啟動
iex(5)> Sequence.Server.increment_number "c"
:ok
iex(6)>
00:08:25.372 [error] GenServer Sequence.Server terminating
** (ArithmeticError) bad argument in arithmetic expression
(sequence) lib/sequence/server.ex:27: Sequence.Server.handle_cast/2
(stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:686: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_cast", {:increment_number, "c"}}
State: [data: [{'State', "My current state is '128', and I'm happy"}]]
nil
iex(7)> Sequence.Server.next_number
123
iex(8)> Sequence.Server.next_number
124
iex(9)>
OTP Application
OTP application is a bundle of code that comes with a descriptor. 概念上比較接近 component/service
OTP 會使用一個 name.app 的 application spec file,mix 會自動根據 mix.exs 產生這個設定檔。
會產生一個 mix.exs,在 def application do 裡面加上 mod: { Sequence, 456 }
defmodule Sequence.Mixfile do
use Mix.Project
def project do
[
app: :sequence,
version: "0.1.0",
elixir: "~> 1.5",
start_permanent: Mix.env == :prod,
deps: deps()
]
end
# Run "mix help compile.app" to learn about applications.
def application do
[
extra_applications: [:logger],
mod: { Sequence, 456 }
]
end
# Run "mix help deps" to learn about dependencies.
defp deps do
[
# {:dep_from_hexpm, "~> 0.3.0"},
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"},
]
end
end
修改 /lib/sequence.ex,因為剛剛 mod: {Sequence,456}
的設定,會自動執行 Sequence module 的 start(_type, initial_number)
,在裡面啟動 Sequence.Supervisor。
defmodule Sequence do
use Application
def start(_type, initial_number) do
Sequence.Supervisor.start_link(initial_number)
end
end
/lib/sequence/supervisor.ex 是 OTP supervisor,負責管理 Sequence.Server process
defmodule Sequence.Supervisor do
use Supervisor
def start_link(initial_number) do
:supervisor.start_link(__MODULE__, initial_number)
end
def init(initial_number) do
child_processes = [
worker(Sequence.Server, [initial_number])
]
supervise child_processes, strategy: :one_for_one
end
end
/lib/sequence/server.ex
defmodule Sequence.Server do
use GenServer
#####
# External API
def start_link(current_number) do
:gen_server.start_link({ :local, :sequence }, __MODULE__, current_number, [])
end
def next_number do
:gen_server.call :sequence, :next_number
end
def increment_number(delta) do
:gen_server.cast :sequence, {:increment_number, delta}
end
#####
# GenServer implementation
def init(current_number)
when is_number(current_number) do
{ :ok, current_number }
end
def handle_call(:next_number, _from, current_number) do
{ :reply, current_number, current_number+1 }
end
def handle_cast({:increment_number, delta}, current_number) do
{ :noreply, current_number + delta}
end
end
啟動 sequence project
$ iex -S mix
iex(1)> Sequence.Server.next_number
456
加上 registered: 設定 application 要註冊的 names,可利用這個設定值,讓 node/cluster 之間的 application name 都是唯一的,在此註冊這個 applcation 名稱為 Sequence.Server
def application do
[
extra_applications: [:logger],
mod: { Sequence, 456 },
registered: [Sequence.Server]
]
end
執行 mix compile 會編譯程式並產生 sequence/_build/dev/lib/sequence/ebin/sequence.app
$ mix compile
Compiling 3 files (.ex)
Generated sequence app
sequence.app 內容為
{application,sequence,
[{applications,[kernel,stdlib,elixir,logger]},
{description,"sequence"},
{modules,['Elixir.Sequence','Elixir.Sequence.Server',
'Elixir.Sequence.Supervisor']},
{vsn,"0.1.0"},
{extra_applications,[logger]},
{mod,{'Elixir.Sequence',456}},
{registered,['Elixir.Sequence.Server']}]}.
要讓 Application 更一般化,可改寫初始參數的傳遞方式
修改 mix.exs 將參數放在 env 裡面 env: [initial_number: 456]
def application do
[
extra_applications: [:logger],
mod: { Sequence, [] },
env: [initial_number: 456]
registered: [Sequence.Server]
]
end
修改 /lib/sequence.ex,參數由 Application.get_env 取得
defmodule Sequence do
use Application
def start(_type, _args) do
Sequence.Supervisor.start_link(Application.get_env(:sequence, :initial_number))
end
end
Code Release: EXRM (Elixir Release Manager)
release: a bundle that contains a particular version of your application including dependencies, configuration, metadata.
hot upgrade: 更新 running application,更新過程 user 不受影響
exrm: 建立在 erlang relx 之上,用來管理 release package
修改 mix.exs,增加 :exrm 這個 library depencency
defp deps do
[
{:exrm, "~> 1.0.8"}
]
end
修改 /lib/sequence/server.ex,增加 @vsn "0"
defmodule Sequence.Server do
use GenServer
@vsn "0"
安裝 exrm
mix do deps.get, deps.compile
產生 release package 在 rel/sequence 這個資料夾內,壓縮檔放在 rel/sequence/releases/0.0.1/sequence.tar.gz
$ mix release
....
==> The release for sequence-0.0.1 is ready!
==> You can boot a console running your release with `$ rel/sequence/bin/sequence console`
解壓縮 sequence.tar.gz 就可以啟動 sequence app
$ ./sequence/bin/sequence console
iex(sequence@127.0.0.1)1> Sequence.Server.next_number
456
iex(sequence@127.0.0.1)2> Sequence.Server.next_number
457
保留這個 console 不動,回到 project 程式,修改 mix.exs,將版本號碼改為 0.0.2
def project do
[
app: :sequence,
version: "0.0.2",
elixir: "~> 1.5",
start_permanent: Mix.env == :prod,
deps: deps()
]
end
另外修改 lib/sequence/server.ex 的 next_number 用來測試是不是已經有換成新版的程式
def next_number do
#:gen_server.call(:sequence, :next_number)
with number = :gen_server.call(:sequence, :next_number),
do: "0.0.2 The next number is #{number}"
end
在 project folder 再做一次 mix release,但在 mac 出現了 .DS_Store 的錯誤(note: 必須要先有 rel/sequence/releases/0.0.1 這個 release 的資料,編譯新版 0.0.2 時,才會出現 relup 檔案,升級就是用這個檔案處理的)
$ mix release
Compiling 3 files (.ex)
Generated sequence app
Building release with MIX_ENV=dev.
This is an upgrade, verifying appups exist for updated dependencies..
==> Unable to access /test/sequence/rel/sequence/releases/.DS_Store/sequence.rel: enotdir
移除 .DS_Store 後,再做一次 mix release
$ rm /test/sequence/rel/sequence/releases/.DS_Store
[14:56]charley@cmbp ~/project/idea/book/ProgrammingElixir/test/sequence$ mix release
Building release with MIX_ENV=dev.
This is an upgrade, verifying appups exist for updated dependencies..
==> All dependencies have appups ready for release!
==> Generated .appup for sequence 0.0.1 -> 0.0.2
==> The release for sequence-0.0.2 is ready!
==> You can boot a console running your release with `$ rel/sequence/bin/sequence console`
將 rel/sequence/releases/0.0.2/sequence.tar.gz 複製到剛剛解壓縮 0.1.0 版的正式環境目錄裡面的 sequence/releases/0.0.2/ 目錄中
用另一個 terminal 直接 hot-upgrade 到 0.0.2
$ bin/sequence upgrade 0.0.2
Release 0.0.2 not found, attempting to unpack releases/0.0.2/sequence.tar.gz
Unpacked successfully: "0.0.2"
Generating vm.args/sys.config for upgrade...
sys.config ready!
vm.args ready!
Release 0.0.2 is already unpacked, now installing.
Installed Release: 0.0.2
Made release permanent: "0.0.2"
回到剛剛的 server console,可驗證是否有更新到 0.0.2
iex(sequence@127.0.0.1)3> Sequence.Server.next_number
"0.0.2 The next number is 458"
如果新版發生問題,也能直接降版回 0.0.1
$ bin/sequence downgrade 0.0.1
Release 0.0.1 is marked old, switching to it.
Generating vm.args/sys.config for upgrade...
sys.config ready!
vm.args ready!
Release 0.0.1 is marked old, switching to it.
Installed Release: 0.0.1
Made release permanent: "0.0.1"
iex(sequence@127.0.0.1)4> Warning: "/Users/charley/Downloads/test/sequence/releases/0.0.1/relup" missing (optional)
iex(sequence@127.0.0.1)5> Sequence.Server.next_number
459
0.0.3 版,增加 code vsn 更新的 def code_change("0", old_state = { current_number, stash_pid }, _extra)
callback function
更新時 console 會出現這樣的資訊
22:59:39.030 [info] Changing code from 0 to 1
22:59:39.030 [info] {465, #PID<0.628.0>}
22:59:39.031 [info] %Sequence.Server.State{current_number: 465, delta: 1, stash_pid: #PID<0.628.0>}
以下為完整的 0.0.3 版 code
mix.exs
defmodule Sequence.Mixfile do
use Mix.Project
# ...
def project do
[
app: :sequence,
version: "0.0.3",
elixir: "~> 1.5",
start_permanent: Mix.env == :prod,
deps: deps()
]
end
# Configuration for the OTP application
def application do
[
mod: { Sequence, 456 },
registered: [ Sequence.Server ],
applications: [ :logger ]
]
end
defp deps do
[
{:exrm, "~> 1.0.8"}
]
end
end
lib/sequence.ex
defmodule Sequence do
use Application
def start(_type, initial_number) do
Sequence.Supervisor.start_link(initial_number)
end
end
lib/sequence/supervisor.ex
defmodule Sequence.Supervisor do
use Supervisor
def start_link(initial_number) do
result = {:ok, sup } = Supervisor.start_link(__MODULE__, [initial_number])
start_workers(sup, initial_number)
result
end
def start_workers(sup, initial_number) do
# Start the stash worker
{:ok, stash} =
Supervisor.start_child(sup, worker(Sequence.Stash, [initial_number]))
# and then the subsupervisor for the actual sequence server
Supervisor.start_child(sup, supervisor(Sequence.SubSupervisor, [stash]))
end
def init(_) do
supervise [], strategy: :one_for_one
end
end
lib/sequence/subsupervisor.ex
defmodule Sequence.SubSupervisor do
use Supervisor
def start_link(stash_pid) do
Supervisor.start_link(__MODULE__, stash_pid)
end
def init(stash_pid) do
child_processes = [ worker(Sequence.Server, [stash_pid]) ]
supervise child_processes, strategy: :one_for_one
end
end
lib/sequence/server.ex
defmodule Sequence.Server do
use GenServer
require Logger
@vsn "1"
defmodule State do
defstruct current_number: 0, stash_pid: nil, delta: 1
end
#####
# External API
# . . .
def start_link(stash_pid) do
GenServer.start_link(__MODULE__, stash_pid, name: __MODULE__)
end
def next_number do
# GenServer.call __MODULE__, :next_number
with number = GenServer.call(__MODULE__, :next_number),
do: "0.0.2 The next number is #{number}"
end
def increment_number(delta) do
GenServer.cast __MODULE__, {:increment_number, delta}
end
#####
# GenServer implementation
# def init(stash_pid) do
# current_number = Sequence.Stash.get_value stash_pid
# { :ok, {current_number, stash_pid} }
# end
# def handle_call(:next_number, _from, {current_number, stash_pid}) do
# { :reply, current_number, {current_number+1, stash_pid} }
# end
# def handle_cast({:increment_number, delta}, {current_number, stash_pid}) do
# { :noreply, {current_number + delta, stash_pid}}
# end
# def terminate(_reason, {current_number, stash_pid}) do
# Sequence.Stash.save_value stash_pid, current_number
# end
def init(stash_pid) do
# 啟動時,以 process id 向 Sequence.Stash 取回 current_number
current_number = Sequence.Stash.get_value stash_pid
# 用 struct 記錄 state
{ :ok,
%State{current_number: current_number, stash_pid: stash_pid} }
end
def handle_call(:next_number, _from, state) do
{ :reply,
state.current_number,
%{ state | current_number: state.current_number + state.delta} }
end
def handle_cast({:increment_number, delta}, state) do
{:noreply,
%{ state | current_number: state.current_number + delta, delta: delta} }
end
def terminate(_reason, state) do
# 結束時,將 current_number 存到 Sequence.Stash
Sequence.Stash.save_value state.stash_pid, state.current_number
end
# 版本更新
def code_change("0", old_state = { current_number, stash_pid }, _extra) do
new_state = %State{current_number: current_number,
stash_pid: stash_pid,
delta: 1
}
Logger.info "Changing code from 0 to 1"
Logger.info inspect(old_state)
Logger.info inspect(new_state)
{ :ok, new_state }
end
end
lib/sequence/stash.ex
defmodule Sequence.Stash do
use GenServer
#####
# External API
def start_link(current_number) do
GenServer.start_link( __MODULE__, current_number)
end
def save_value(pid, value) do
GenServer.cast pid, {:save_value, value}
end
def get_value(pid) do
GenServer.call pid, :get_value
end
#####
# GenServer implementation
def handle_call(:get_value, _from, current_value) do
{ :reply, current_value, current_value }
end
def handle_cast({:save_value, value}, _current_value) do
{ :noreply, value}
end
end
Task & Agent
如果需要 background processing for maintaining state,但不想寫 spawn/send/receive,可使用 Task, Agents
Task: a function that runs in the background
tasks1.exs
defmodule Fib do
def of(0), do: 0
def of(1), do: 1
def of(n), do: Fib.of(n-1) + Fib.of(n-2)
end
IO.puts "Start the task"
# 非同步 呼叫 Fib.of
worker = Task.async(fn -> Fib.of(20) end)
IO.puts "Do something else"
# 等待完成後的結果
IO.puts "Wait for the task"
result = Task.await(worker)
IO.puts "The result is #{result}"
# 改用 Module, function, parameter 的方式傳入 Task.async
worker = Task.async(Fib, :of, [10])
result = Task.await(worker)
IO.puts "The result is #{result}"
$ elixir tasks1.exs
Start the task
Do something else
Wait for the task
The result is 6765
The result is 55
如何監控 Tasks
不要用 async 改以 start_link 將 task link to a currently supervised process。
由 supervisor 啟動 supervise tasks
import Supervisor.Spec
children = [
worker(Task, [ fn -> Fib.of(20) end ])
]
supervise children, strategy: :one_for_one
Agent: a background process that maintains state
可在 process/node/other nodes 存取 state
# count 會儲存 agent process 的 PID
iex(1)> {:ok, count} = Agent.start(fn -> 0 end)
{:ok, #PID<0.86.0>}
iex(2)> Agent.get(count, &(&1))
0
iex(3)> Agent.update(count, &(&1+1))
:ok
iex(4)> Agent.update(count, &(&1+1))
:ok
iex(5)> Agent.get(count, &(&1))
2
# 也可以將 Agent process 設定 local/global name
iex(6)> Agent.start(fn -> 5 end, name: Sum)
{:ok, #PID<0.92.0>}
iex(7)> Agent.get(Sum, &(&1))
5
iex(8)> Agent.update(Sum, &(&1+100))
:ok
iex(9)> Agent.get(Sum, &(&1))
105
agent_dict.exs
# 儲存 a list of word/frequency pairs in a map
# 將 map 存在 agent 中
defmodule Frequency do
def start_link do
# 以 start_link 啟動 Agent
Agent.start_link(fn -> %{} end, name: __MODULE__)
end
def add_word(word) do
Agent.update(__MODULE__,
fn map ->
Map.update(map, word, 1, &(&1+1))
end)
end
def count_for(word) do
Agent.get(__MODULE__, fn map -> map[word] end)
end
def words do
Agent.get(__MODULE__, fn map -> Map.keys(map) end)
end
end
iex(1)> c "agent_dict.exs"
[Frequency]
iex(2)> Frequency.start_link
{:ok, #PID<0.92.0>}
iex(3)> Frequency.add_word "You"
:ok
iex(4)> Frequency.words
["You"]
iex(5)> Frequency.add_word "are"
:ok
iex(6)> Frequency.add_word "here"
:ok
iex(7)> Frequency.add_word "are"
:ok
iex(8)> Frequency.add_word "ok"
:ok
iex(9)> Frequency.words
["You", "are", "here", "ok"]
iex(10)> Frequency.count_for("are")
2
iex(11)> Frequency.count_for("ok")
1
anagrams.exs
defmodule Dictionary do
@name __MODULE__
##
# External API
# 啟動 Dictionary Agent Process
def start_link,
do: Agent.start_link(fn -> %{} end, name: @name)
def add_words(words),
do: Agent.update(@name, &do_add_words(&1, words))
def anagrams_of(word),
do: Agent.get(@name, &Map.get(&1, signature_of(word)))
##
# Internal implementation
defp do_add_words(map, words),
do: Enum.reduce(words, map, &add_one_word(&1, &2))
defp add_one_word(word, map),
do: Map.update(map, signature_of(word), [word], &[word|&1])
defp signature_of(word),
do: word |> to_charlist |> Enum.sort |> to_string
end
defmodule WordlistLoader do
# 每一個 dictionary file 都用不同的 Task 載入
def load_from_files(file_names) do
file_names
|> Stream.map(fn name -> Task.async(fn -> load_task(name) end) end)
|> Enum.map(&Task.await/1)
end
defp load_task(file_name) do
File.stream!(file_name, [], :line)
|> Enum.map(&String.trim/1)
|> Dictionary.add_words
end
end
iex(1)> Dictionary.start_link
{:ok, #PID<0.93.0>}
# 共有四個 list file
iex(2)> Enum.map(1..4, &"words/list#{&1}") |> WordlistLoader.load_from_files
[:ok, :ok, :ok, :ok]
iex(3)> Dictionary.anagrams_of "organ"
["ronga", "rogan", "orang", "nagor", "groan", "grano", "goran", "argon",
"angor"]
Agent, Task 是以 OTP 實作,可以在多個 nodes 環境運作
名稱的部分改為 @name {:global, __MODULE__}
defmodule Dictionary do
@name {:global, __MODULE__}
##
# External API
def start_link,
do: Agent.start_link(fn -> HashDict.new end, name: @name)
def add_words(words),
do: Agent.update(@name, &do_add_words(&1, words))
def anagrams_of(word),
do: Agent.get(@name, &Dict.get(&1, signature_of(word)))
##
# Internal implementation
defp do_add_words(dict, words),
do: Enum.reduce(words, dict, &add_one_word(&1, &2))
defp add_one_word(word, dict),
do: Dict.update(dict, signature_of(word), [word], &[word|&1])
defp signature_of(word),
do: word |> to_charlist |> Enum.sort |> to_string
end
defmodule WordlistLoader do
def load_from_files(file_names) do
file_names
|> Stream.map(fn name -> Task.async(fn -> load_task(name) end) end)
|> Enum.map(&Task.await/1)
end
defp load_task(file_name) do
File.stream!(file_name, [], :line)
|> Enum.map(&String.strip/1)
|> Dictionary.add_words
end
end
$ iex --sname one anagrams_dist.exs
iex(one@cmbp)1> Dictionary.
add_words/1 anagrams_of/1 start_link/0
iex(one@cmbp)1> Dictionary.start_link
{:ok, #PID<0.102.0>}
iex(one@cmbp)2> Dictionary.anagrams_of "argon"
["ronga", "rogan", "orang", "nagor", "groan", "grano", "goran", "argon",
"angor"]
$ iex --sname two anagrams_dist.exs
iex(two@cmbp)1> Node.connect :one@cmbp
true
iex(two@cmbp)2> Node.list
[:one@cmbp]
iex(two@cmbp)3> WordlistLoader.load_from_files(~w{words/list1 words/list2})
[:ok, :ok]
iex(two@cmbp)4> WordlistLoader.load_from_files(~w{words/list3 words/list4})
[:ok, :ok]
iex(two@cmbp)5> Dictionary.anagrams_of "crate"
["recta", "react", "creta", "creat", "crate", "cater", "carte", "caret"]
References
Programming Elixir