Elixir 的 process 並不是 OS process,而是 Erlang VM process。Elixir 使用 actor model 處理 concurrency,actor 是一個獨立的 process,我們可以 spawn process,發送及接收 messages。
Process
spawn-basic.ex
defmodule SpawnBasic do
  @moduledoc "Minimal module used to demonstrate spawning a process."

  @doc "Prints `Hello` to standard output and returns the result of `IO.puts/1`."
  def greet, do: IO.puts("Hello")
end
spawn 產生一個新的 process,#PID<0.93.0> 是 process id
iex(1)> c("spawn-basic.ex")
[SpawnBasic]
iex(2)> SpawnBasic.greet
Hello
:ok
iex(3)> spawn(SpawnBasic, :greet, [])
Hello
#PID<0.93.0>
在 Process 之間傳遞 Message
defmodule Spawn1 do
  @moduledoc "One-shot greeter used to demonstrate message passing between processes."

  @doc """
  Waits for a single `{sender, msg}` tuple and sends `sender` an `{:ok, text}`
  tuple whose text is "Hello " followed by the message. Returns after that
  one reply.
  """
  def greet do
    receive do
      {reply_to, name} -> reply(reply_to, name)
    end
  end

  # Builds the greeting and delivers it to the requesting process.
  defp reply(reply_to, name), do: send(reply_to, {:ok, "Hello #{name}"})
end
# Client side: spawn the greeter, send it a message tagged with our own PID
# (self() is the caller's PID), then block until the {:ok, text} reply arrives.
pid = spawn(Spawn1, :greet, [])
send(pid, {self(), "World!"})

receive do
  {:ok, reply} -> IO.puts(reply)
end
這是舊的語法,現在要改為 send( pid, msg )
pid <- {self, "World!"}
send self(), "World!"
send( self, "World!")
發送多個訊息
defmodule Spawn2 do
  @moduledoc "Greeter that answers exactly one message and then finishes."

  @doc """
  Blocks until a `{sender, msg}` tuple arrives, then sends `sender` an
  `{:ok, text}` tuple with the greeting. No further messages are handled.
  """
  def greet do
    receive do
      {from, payload} ->
        greeting = "Hello #{payload}"
        send(from, {:ok, greeting})
    end
  end
end
# Client side: send two greetings to the same greeter, waiting for a reply
# after each one. Neither receive has a timeout, so a greeter that answers
# only once leaves the second wait blocked.
pid = spawn(Spawn2, :greet, [])

Enum.each(["World!", "Kermit!"], fn name ->
  send(pid, {self(), name})

  receive do
    {:ok, reply} -> IO.puts(reply)
  end
end)
執行後,程式會停在這裡,這是因為,發送第一個訊息,Spawn2 greet 處理後,回傳結果後就結束了,所以當發送第二個訊息給 pid後,等待回應時,一直無法收到回傳的訊息。
Hello World!
改寫:在 receive 的部分,設定 timeout
defmodule Spawn2 do
  @moduledoc "Single-reply greeter, reused for the client-side timeout example."

  @doc """
  Handles one `{sender, msg}` tuple by sending `sender` an `{:ok, text}`
  greeting, then returns.
  """
  def greet do
    receive do
      {caller, text} -> send(caller, {:ok, "Hello #{text}"})
    end
  end
end
# Client side: the first exchange waits indefinitely for its reply; the second
# gives up after 500 ms and reports that no reply arrived.
pid = spawn(Spawn2, :greet, [])

send(pid, {self(), "World!"})

receive do
  {:ok, reply} -> IO.puts(reply)
end

send(pid, {self(), "Kermit!"})

receive do
  {:ok, reply} -> IO.puts(reply)
after
  500 -> IO.puts("The greeter has gone away")
end
執行結果
Hello World!
The greeter has gone away
最正確的寫法,應該要讓 Spawn 不斷接收並處理訊息
defmodule Spawn4 do
  @moduledoc "Greeter that keeps serving requests by looping after every reply."

  @doc """
  Waits for a `{sender, msg}` tuple, sends `sender` an `{:ok, text}` greeting,
  and then calls itself again to wait for the next message.

  The recursive call is the last expression of the clause, so the loop runs
  as a tail call.
  """
  def greet do
    receive do
      {sender, msg} ->
        send(sender, {:ok, "Hello #{msg}"})
        # Explicit parentheses: a bare `greet` is parsed as a variable
        # reference and is rejected by current compilers.
        greet()
    end
  end
end
# Client side: two request/reply rounds against the same greeter process. The
# second round waits at most 500 ms before printing the fallback line.
pid = spawn(Spawn4, :greet, [])

send(pid, {self(), "World!"})

receive do
  {:ok, reply} -> IO.puts(reply)
end

send(pid, {self(), "Kermit!"})

receive do
  {:ok, reply} -> IO.puts(reply)
after
  500 -> IO.puts("The greeter has gone away")
end
Hello World!
Hello Kermit!
Tail Recursion: 尾遞迴
defmodule TailRecursive do
  @moduledoc "Factorial written with an accumulator so the recursion is a tail call."

  @doc """
  Returns `n!` for a non-negative integer `n`.

  Any other argument raises `FunctionClauseError`. Without the guard, a
  negative or fractional `n` would never reach the `0` base case and would
  recurse forever.
  """
  def factorial(n) when is_integer(n) and n >= 0, do: _fact(n, 1)

  # acc carries the product so far; the recursive call is in tail position.
  defp _fact(0, acc), do: acc
  defp _fact(n, acc), do: _fact(n - 1, acc * n)
end
Process Overhead
這是一個用來測試 process 效能的小程式,一開始會產生 n 個 processes,第一個會送數字給第二個,加 1 後傳給第三個,最後一個會回傳結果給第一個。
defmodule Chain do
  @moduledoc """
  Process-overhead benchmark: builds a chain of processes in which each one
  receives a number, adds one, and forwards it to the next link.
  """

  @doc "Waits for one message `n` and sends `n + 1` to `next_pid`."
  def counter(next_pid) do
    receive do
      value -> send(next_pid, value + 1)
    end
  end

  @doc """
  Spawns one `counter/1` process per element of `1..n`. The first forwards to
  the calling process and each later one forwards to the process spawned just
  before it. Then sends `0` to the most recently spawned process and waits for
  an integer to come back.

  Returns the string "Result is " followed by the inspected integer.
  """
  def create_processes(n) do
    # Thread the PID each new process must forward to through the fold.
    head =
      Enum.reduce(1..n, self(), fn _index, target ->
        spawn(Chain, :counter, [target])
      end)

    # Start the count at the head of the chain.
    send(head, 0)

    # Block until the incremented value arrives back at this process.
    receive do
      total when is_integer(total) -> "Result is #{inspect(total)}"
    end
  end

  @doc "Times `create_processes/1` with `:timer.tc/3` and prints the inspected result."
  def run(n) do
    Chain
    |> :timer.tc(:create_processes, [n])
    |> inspect()
    |> IO.puts()
  end
end
$ elixir -r chain.exs -e "Chain.run(10)"
{3932, "Result is 10"}
$ elixir -r chain.exs -e "Chain.run(1_000)"
{15951, "Result is 1000"}
$ elixir -r chain.exs -e "Chain.run(40_000)"
{444129, "Result is 40000"}
$ elixir -r chain.exs -e "Chain.run(50_000)"
{545041, "Result is 50000"}
$ elixir -r chain.exs -e "Chain.run(300_000)"
10:54:49.904 [error] Too many processes
** (SystemLimitError) a system limit has been reached
調整 vm 參數 --erl "+P 1000000"
$ elixir --erl "+P 1000000" -r chain.exs -e "Chain.run(300_000)"
{2924026, "Result is 300000"}
When Process Die
預設狀況下,沒有人會知道 Process 結束了
import :timer, only: [sleep: 1]

defmodule Link1 do
  @moduledoc "Shows that a plain `spawn/3` gives the parent no notice when the child exits."

  @doc "Sleeps for 500 ms and then exits with reason `99`."
  def sad_method do
    sleep(500)
    exit(99)
  end

  @doc """
  Spawns `sad_method/0` without a link or monitor, then waits up to one second
  for any message and prints either that message or a fallback line.
  """
  def run do
    spawn(Link1, :sad_method, [])

    receive do
      received -> IO.puts("MESSAGE RECEIVED: #{inspect(received)}")
    after
      1000 -> IO.puts("Nothing happened as far as I am concerned")
    end
  end
end
# Run the demo when the script is loaded.
Link1.run()
Link1 不知道新的 sad_method process 已經結束了,在 1s 後 timeout
$ elixir -r link1.exs
Nothing happened as far as I am concerned
要解決上面的問題,可以用 spawn_link 連結兩個 Process,當 child process 因異常死亡時,會連帶把另一個 process 停掉。
import :timer, only: [sleep: 1]

defmodule Link2 do
  @moduledoc "Same scenario as the unlinked demo, but the child is started with `spawn_link/3`."

  @doc "Sleeps for 500 ms and then exits with reason `99`."
  def sad_method do
    sleep(500)
    exit(99)
  end

  @doc """
  Starts `sad_method/0` as a linked process, then waits up to one second for
  any message and prints either that message or a fallback line.
  """
  def run do
    spawn_link(Link2, :sad_method, [])

    receive do
      incoming ->
        text = "MESSAGE RECEIVED: #{inspect(incoming)}"
        IO.puts(text)
    after
      1000 ->
        IO.puts("Nothing happened as far as I am concerned")
    end
  end
end
# Run the linked demo when the script is loaded.
Link2.run()
$ elixir -r link2.exs
** (EXIT from #PID<0.73.0>) 99
defmodule Link3 do
  @moduledoc "Linked-process demo in which the parent traps exits instead of dying."

  import :timer, only: [sleep: 1]

  @doc "Sleeps for 500 ms and then exits with reason `:boom`."
  def sad_function do
    sleep(500)
    exit(:boom)
  end

  @doc """
  Enables exit trapping, starts `sad_function/0` as a linked process, then
  waits up to one second for any message and prints it, or prints a fallback
  line on timeout.
  """
  def run do
    # With :trap_exit set, the exit signal from the linked process is
    # delivered to this process as an :EXIT message.
    Process.flag(:trap_exit, true)
    spawn_link(Link3, :sad_function, [])

    receive do
      signal -> IO.puts("MESSAGE RECEIVED: #{inspect(signal)}")
    after
      1000 -> IO.puts("Nothing happened as far as I am concerned")
    end
  end
end
# Run the trap-exit demo when the script is loaded.
Link3.run()
$ elixir -r link3.exs
MESSAGE RECEIVED: {:EXIT, #PID<0.79.0>, :boom}
Monitoring a Process
spawn_monitor 可在 spawn 一個 process 時連帶啟用 monitoring,或是使用 Process.monitor 監控已經存在的 process。但如果是用 Process.monitor,可能會產生 race condition:如果對方 process 在 monitor 呼叫完成前就死亡,就會收不到 notification。spawn_link 及 spawn_monitor 這兩個版本是 atomic 的,所以一定可以 catch a failure。
import :timer, only: [sleep: 1]

defmodule Monitor1 do
  @moduledoc "Demonstrates `spawn_monitor/3`: the parent is told when the child goes down."

  @doc "Sleeps for 500 ms and then exits with reason `99`."
  def sad_method do
    sleep(500)
    exit(99)
  end

  @doc """
  Starts `sad_method/0` with `spawn_monitor/3`, prints the value it returned,
  then waits up to one second for any message and prints it, or prints a
  fallback line on timeout.
  """
  def run do
    spawned = spawn_monitor(Monitor1, :sad_method, [])
    spawned |> inspect() |> IO.puts()

    receive do
      notice -> IO.puts("MESSAGE RECEIVED: #{inspect(notice)}")
    after
      1000 -> IO.puts("Nothing happened as far as I am concerned")
    end
  end
end
# Run the monitor demo when the script is loaded.
Monitor1.run()
$ elixir -r monitor1.exs
{#PID<0.79.0>, #Reference<0.759175825.870842370.239990>}
MESSAGE RECEIVED: {:DOWN, #Reference<0.759175825.870842370.239990>, :process, #PID<0.79.0>, 99}
結果跟 spawn_link 差不多:如果是用 spawn_link,子 process crash 連帶會影響 monitor process;如果用 spawn_monitor,就會知道 crash 的原因。
Parallel Map
通常 map 會回傳一個 apply function 到 collection 裡面每一個 elements 的 list,parallel map 的功能一樣,但會在分別獨立的 process 對每一個 element 都 apply function。
首先會將 collection map 到 fn,在 fn 內 spawn_link 產生 a list of PIDs,每一個 PID 都會對每一個 element 執行給定的 function。
第二個 |> 會將 list of PIDs 轉成 results:依序對 list 內每一個 PID 執行 receive,等待該 PID 回傳的結果。注意 ^pid 可讓 receive 依照 PID 的順序取得結果。
defmodule Parallel do
  @moduledoc "Parallel map: applies a function to every element in its own linked process."

  @doc """
  Applies `fun` to each element of `collection`, one linked process per
  element, and returns the results as a list in the order of the input.
  """
  def pmap(collection, fun) do
    parent = self()

    # One linked worker per element; each reports back tagged with its own PID.
    workers =
      Enum.map(collection, fn item ->
        spawn_link(fn -> send(parent, {self(), fun.(item)}) end)
      end)

    # Pinning the PID makes each receive pick that worker's reply, so the
    # results come out in input order whatever order the workers finish in.
    for worker <- workers do
      receive do
        {^worker, value} -> value
      end
    end
  end
end
iex(1)> Parallel.pmap 1..10, &(&1 * &1)
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
Fibonacci Server
當 calculator 可處理下一個數字時,會發送 :ready 給 scheduler,scheduler 會以 :fib message 發送計算工作給 calculator,calculator 會以 :answer 回傳結果,scheduler 會發送 :shutdown 給 calculator 通知要 exit。
defmodule FibSolver do
  @moduledoc """
  Worker process for the Fibonacci server example. It announces that it is
  ready, computes one requested value, and repeats until told to shut down.
  """

  @doc """
  Worker loop.

  Sends `{:ready, self()}` to `scheduler`, then waits for one of:

    * `{:fib, n, client}` - sends `{:answer, n, result, self()}` to `client`
      and loops;
    * `{:shutdown}` - exits with reason `:normal`.
  """
  def fib(scheduler) do
    # Announce availability, together with our own PID.
    send(scheduler, {:ready, self()})

    receive do
      {:fib, n, client} ->
        # Compute the value and send it back to the requesting process.
        send(client, {:answer, n, fib_calc(n), self()})
        # Wait for the next message.
        fib(scheduler)

      {:shutdown} ->
        # :normal marks this as an orderly stop. Any other reason, including
        # the integer 0, is treated as an abnormal exit by links and monitors.
        exit(:normal)
    end
  end

  # Very inefficient, deliberately. This version defines fib(0) = fib(1) = 1.
  defp fib_calc(0), do: 1
  defp fib_calc(1), do: 1
  defp fib_calc(n), do: fib_calc(n - 1) + fib_calc(n - 2)
end
defmodule Scheduler do
  @moduledoc """
  Hands out work items to a pool of worker processes and collects the answers.

  Workers are expected to send `{:ready, pid}` when idle and
  `{:answer, number, result, pid}` when done. They receive
  `{:fib, number, scheduler_pid}` for work and `{:shutdown}` when the queue
  is empty.
  """

  @doc """
  Spawns one worker per element of `1..num_processes`, each running
  `module.func(scheduler_pid)`, and feeds them the items of `to_calculate`.

  Returns the collected `{number, result}` tuples sorted by `number`.
  """
  def run(num_processes, module, func, to_calculate) do
    # Spawn the workers and hand the list of PIDs to schedule_processes/3.
    1..num_processes
    |> Enum.map(fn _ -> spawn(module, func, [self()]) end)
    |> schedule_processes(to_calculate, [])
  end

  # Main loop: `processes` holds the workers still running, `queue` the
  # pending work items, and `results` the answers gathered so far.
  defp schedule_processes(processes, queue, results) do
    receive do
      # An idle worker while work remains: hand it the next item.
      # Comparing with [] avoids walking the whole queue as length/1 does.
      {:ready, pid} when queue != [] ->
        [next | tail] = queue
        send(pid, {:fib, next, self()})
        schedule_processes(processes, tail, results)

      # The queue is empty: tell this worker to shut down.
      {:ready, pid} ->
        send(pid, {:shutdown})

        case processes do
          # More than one worker is still tracked: drop this one and keep
          # waiting for the others to finish.
          [_, _ | _] ->
            schedule_processes(List.delete(processes, pid), queue, results)

          # That was the last worker: return the results ordered by input.
          _ ->
            Enum.sort_by(results, fn {number, _result} -> number end)
        end

      # A computed answer: record it and keep going.
      {:answer, number, result, _pid} ->
        schedule_processes(processes, queue, [{number, result} | results])
    end
  end
end
# Work items handed to the scheduler on every run.
to_process = [27, 33, 35, 11, 36, 29, 18, 37, 21, 31, 19, 10, 14, 30,
              15, 17, 23, 28, 25, 34, 22, 20, 13, 16, 32, 12, 26, 24]

# Time the same workload with 1 to 10 worker processes. The results and the
# table header are printed once, on the first run; every run prints its
# process count and elapsed seconds.
Enum.each(1..10, fn num_processes ->
  args = [num_processes, FibSolver, :fib, to_process]
  {time, result} = :timer.tc(Scheduler, :run, args)

  if num_processes == 1 do
    result |> inspect() |> IO.puts()
    IO.puts("\n # time (s)")
  end

  # :timer.tc reports microseconds; convert to seconds for display.
  :io.format("~2B ~.2f~n", [num_processes, time / 1_000_000.0])
end)
執行結果
[{10, 89}, {11, 144}, {12, 233}, {13, 377}, {14, 610}, {15, 987}, {16, 1597}, {17, 2584}, {18, 4181}, {19, 6765}, {20, 10946}, {21, 17711}, {22, 28657}, {23, 46368}, {24, 75025}, {25, 121393}, {26, 196418}, {27, 317811}, {28, 514229}, {29, 832040}, {30, 1346269}, {31, 2178309}, {32, 3524578}, {33, 5702887}, {34, 9227465}, {35, 14930352}, {36, 24157817}, {37, 39088169}]
# time (s)
1 3.38
2 1.99
3 1.93
4 1.75
5 1.73
6 1.88
7 1.76
8 1.71
9 1.72
10 1.81
目前這個版本的計算過程如下,有很多重複的計算
fib(5)
= fib(4) + fib(3)
= fib(3) + fib(2) + fib(2) + fib(1)
= fib(2) + fib(1) + fib(1) + fib(0) + fib(1) + fib(0) + fib(1)
= fib(1) + fib(0) + fib(1) + fib(1) + fib(0) + fib(1) + fib(0) + fib(1)
Elixir module 不能儲存資料,但 process 可以儲存 state,elixir 提供 Agent module,可封裝 process 包含了 state。
fib_agent.exs
defmodule FibAgent do
  @moduledoc """
  Fibonacci calculator that memoises computed values in an `Agent`. The cache
  is seeded with `fib(0) = 0` and `fib(1) = 1`.
  """

  @doc "Starts the agent holding the seeded cache; returns the result of `Agent.start_link/1`."
  def start_link do
    Agent.start_link(fn -> %{0 => 0, 1 => 1} end)
  end

  @doc """
  Returns the `n`-th Fibonacci number, using and extending the cache held by
  the agent `pid`.

  `n` must be a non-negative integer; anything else raises
  `FunctionClauseError` in the caller. Without the integer check, a float
  would never hit a cached key and an atom would fail inside the agent
  process.
  """
  def fib(pid, n) when is_integer(n) and n >= 0 do
    Agent.get_and_update(pid, &do_fib(&1, n))
  end

  # Returns {value, updated_cache}, the shape Agent.get_and_update/2 expects.
  defp do_fib(cache, n) do
    case cache do
      %{^n => cached_value} ->
        {cached_value, cache}

      _ ->
        # Computing n - 1 first guarantees n - 2 is in the returned cache.
        {n_1, cache} = do_fib(cache, n - 1)
        result = n_1 + Map.fetch!(cache, n - 2)
        {result, Map.put(cache, n, result)}
    end
  end
end
# Start the caching agent and print the 2000th Fibonacci number.
{:ok, agent} = FibAgent.start_link()
agent |> FibAgent.fib(2000) |> IO.puts()
Nodes
erlang Beam VM 可處理自己的 event, process scheduling, memory, naming service, interprocess communication,nodes 可互相連接。
查詢 VM node name
iex(1)> Node.self
:nonode@nohost
可在啟動 iex 時以 --name 或 --sname 設定 node name
$ iex --name name1@cmbp.local
iex(name1@cmbp.local)1> Node.self
:"name1@cmbp.local"
$ iex --name name1
iex(name1@cmbp.local)1> Node.self
:"name1@cmbp.local"
可使用 Node.connect :"name1@cmbp" 連接另一個 node
$ iex --sname name1
-----
$ iex --sname name2
iex(name2@cmbp)1> Node.list
[]
iex(name2@cmbp)2> Node.connect :"name1"
false
iex(name2@cmbp)3> Node.connect :"name1@cmbp.local"
23:32:10.451 [error] ** System NOT running to use fully qualified hostnames **
** Hostname cmbp.local is illegal **
false
iex(name2@cmbp)4> Node.connect :"name1@cmbp"
true
iex(name2@cmbp)5> Node.list
[:name1@cmbp]
# 先產生一個 fun
iex(name1@cmbp)1> func = fn -> IO.inspect Node.self end
#Function<20.99386804/0 in :erl_eval.expr/5>
iex(name1@cmbp)2> spawn(func)
:name1@cmbp
#PID<0.96.0>
# 在 name1@cmbp 產生 func process
iex(name1@cmbp)3> Node.spawn(:"name1@cmbp", func)
:name1@cmbp
#PID<0.98.0>
# 在 name2@cmbp 產生 func process
# 雖然 process 在 name2,IO 還是在 name1
iex(name1@cmbp)4> Node.spawn(:"name2@cmbp", func)
#PID<9755.103.0>
:name2@cmbp
Nodes, Cookies, and Security
在啟動 VM 時,增加 cookie 設定,可增加安全性,相同 cookie 的 node 才能連接在一起。
$ iex --sname name2 --cookie cookie2
iex(name2@cmbp)1>
00:21:31.987 [error] ** Connection attempt from disallowed node :name1@cmbp **
nil
iex(name2@cmbp)2> Node.get_cookie
:cookie2
-------
$ iex --sname name1 --cookie cookie1
iex(name1@cmbp)1> Node.connect :"name2@cmbp"
false
erlang VM 預設會讀取 ~/.erlang.cookie 這個檔案的 cookie
Process Name
PID 有三個部分的數字,但只有兩個 fields: 第一個數字是 Node ID,後兩個數字是 low and high bits of the process ID,如果 export PID 到另一個 node,node ID 會設定為 process 存在的 node number。
iex(name2@cmbp)3> self()
#PID<0.90.0>
process 以 :global.register_name(@name, pid)
註冊 process name,後續就可以用 :global.whereis_name(@name)
找到這個 process。
如果在程式裡註冊 global process name,可能會遇到名稱一樣的問題,可以改用 mix.exs 設定檔,管理要註冊到 global state 的 process names。
defmodule Tick do
  @moduledoc """
  Ticker server: a globally registered process that sends `{:tick}` to every
  registered client whenever `@interval` milliseconds pass with no message.
  """

  # 2 seconds
  @interval 2000
  @name :ticker

  @doc "Spawns the generator loop with no clients and registers it globally under `@name`."
  def start do
    generator_pid = spawn(__MODULE__, :generator, [[]])
    :global.register_name(@name, generator_pid)
  end

  @doc "Sends `{:register, client_pid}` to the globally registered ticker."
  def register(client_pid) do
    @name
    |> :global.whereis_name()
    |> send({:register, client_pid})
  end

  @doc """
  Server loop. A `{:register, pid}` message adds `pid` to the client list. If
  no message arrives within `@interval` ms, prints "tick" and sends `{:tick}`
  to every client.
  """
  def generator(clients) do
    receive do
      {:register, pid} ->
        IO.puts("registering #{inspect(pid)}")
        generator([pid | clients])
    after
      @interval ->
        IO.puts("tick")
        for client <- clients, do: send(client, {:tick})
        generator(clients)
    end
  end
end
defmodule Client do
  @moduledoc "Ticker client: a process that prints a line for every `{:tick}` it receives."

  @doc "Spawns the receiver loop and registers its PID with `Tick.register/1`."
  def start do
    __MODULE__
    |> spawn(:receiver, [])
    |> Tick.register()
  end

  @doc "Waits for a `{:tick}` message, prints \"tock in client\", and loops."
  def receiver do
    receive do
      {:tick} ->
        IO.puts("tock in client")
        receiver()
    end
  end
end
$ iex --sname name1
iex(name1@cmbp)1> c("ticker.ex")
[Client, Tick]
iex(name1@cmbp)2> Node.connect :"name2@cmbp"
true
iex(name1@cmbp)3> Tick.start
:yes
tick
tick
tick
iex(name1@cmbp)4> Client.start
registering #PID<0.111.0>
{:register, #PID<0.111.0>}
tick
tock in client
tick
tock in client
-------
$ iex --sname name2
iex(name2@cmbp)1> c("ticker.ex")
[Client, Tick]
iex(name2@cmbp)2> Client.start
{:register, #PID<0.104.0>}
tock in client
tock in client
tock in client
I/O, PIDs, and Nodes
Erlang VM 將 I/O 實作為 processes。可以直接透過 I/O server 的PID 對 open file/device 處理 IO。
VM 的 default IO device 可透過 :erlang.group_leader 取得,他會回傳 I/O Server 的 PID
$ iex --sname name1
-------
$ iex --sname name2
iex(name2@cmbp)1> Node.connect(:"name1@cmbp")
true
iex(name2@cmbp)2> :global.register_name(:name2, :erlang.group_leader)
:yes
------
# 回到 :name1
iex(name1@cmbp)1> name2 = :global.whereis_name :name2
#PID<9755.59.0>
iex(name1@cmbp)2> IO.puts(name2, "test")
:ok
------
# :name2 的 IO 可看到
test