2018/04/30

Elixir 6 MultipleProcesses

elixir 的 process 並不是 OS process,而是 erlang VM process。Elixir 使用 actor model for concurrency,actor 是一個獨立的 process,我們可 spawn prcess,發送及接收 messages。

Process

spawn-basic.ex

defmodule SpawnBasic do
  def greet do
    IO.puts "Hello"
  end
end

spawn 產生一個新的 process,#PID<0.93.0> 是 process id

iex(1)> c("spawn-basic.ex")
[SpawnBasic]
iex(2)> SpawnBasic.greet
Hello
:ok
iex(3)> spawn(SpawnBasic, :greet, [])
Hello
#PID<0.93.0>

在 Process 之間傳遞 Message

defmodule Spawn1 do
  def greet do
    receive do
      {sender, msg} ->
        send( sender, { :ok, "Hello #{msg}" } )
    end
  end
end

# here's a client 發送訊息給 pid,self 是 caller's PID
pid = spawn(Spawn1, :greet, [])
send pid, {self(), "World!"}

receive do
  {:ok, message} ->
    IO.puts message
end

這是舊的語法,現在要改為 send( pid, msg )

pid <- {self, "World!"}

send self(), "World!"
send( self, "World!")

發送多個訊息

defmodule Spawn2 do
  def greet do
    receive do
      {sender, msg} ->
        send( sender, { :ok, "Hello #{msg}" } )
    end
  end
end

# here's a client
pid = spawn(Spawn2, :greet, [])

send pid, {self(), "World!"}
receive do
  {:ok, message} ->
    IO.puts message
end

send pid, {self(), "Kermit!"}
receive do
  {:ok, message} ->
    IO.puts message
end

執行後,程式會停在這裡,這是因為,發送第一個訊息,Spawn2 greet 處理後,回傳結果後就結束了,所以當發送第二個訊息給 pid後,等待回應時,一直無法收到回傳的訊息。

Hello World!

改寫:在 receive 的部分,設定 timeout

defmodule Spawn2 do
  def greet do
    receive do
      {sender, msg} ->
        send( sender, { :ok, "Hello #{msg}" } )
    end
  end
end

# here's a client
pid = spawn(Spawn2, :greet, [])

send pid, {self(), "World!"}
receive do
  {:ok, message} ->
    IO.puts message
end

send pid, {self(), "Kermit!"}
receive do
  {:ok, message} ->
    IO.puts message
  after 500 ->
    IO.puts "The greeter has gone away"
end

執行結果

Hello World!
The greeter has gone away

最正確的寫法,應該要讓 Spawn 不斷接收並處理訊息

defmodule Spawn4 do
  def greet do
    receive do
      {sender, msg} ->
        send( sender, { :ok, "Hello #{msg}" } )
        greet
    end
  end
end

# here's a client
pid = spawn(Spawn4, :greet, [])

send pid, {self(), "World!"}
receive do
  {:ok, message} ->
    IO.puts message
end

send pid, {self(), "Kermit!"}
receive do
  {:ok, message} ->
    IO.puts message
  after 500 ->
    IO.puts "The greeter has gone away"
end
Hello World!
Hello Kermit!

Tail Recursion: 尾遞迴

defmodule TailRecursive do

  def factorial(n), do: _fact(n, 1)
  
  defp _fact(0, acc), do: acc
  defp _fact(n, acc), do: _fact(n-1, acc*n)

end

Process Overhead

這是一個用來測試 process 效能的小程式,一開始會產生 n 個 processes,第一個會送數字給第二個,加 1 後傳給第三個,最後一個會回傳結果給第一個。

defmodule Chain do

  def counter(next_pid) do
    receive do
      n ->
        send next_pid, n + 1
    end
  end

  def create_processes(n) do
    last = Enum.reduce 1..n, self(),
             fn (_,send_to) ->
                ## 產生 process 執行 :counter,將自己的 process PID 送到 send_to 由 spawn 傳給下一個 process
               spawn(Chain, :counter, [send_to])
             end

    # start the count by sending
    send last, 0

    # and wait for the result to come back to us
    receive do
      final_answer when is_integer(final_answer) ->
        "Result is #{inspect(final_answer)}"
    end
  end

  def run(n) do
    IO.puts inspect :timer.tc(Chain, :create_processes, [n])
  end

end
$ elixir -r chain.exs -e "Chain.run(10)"
{3932, "Result is 10"}

$ elixir -r chain.exs -e "Chain.run(1_000)"
{15951, "Result is 1000"}

$ elixir -r chain.exs -e "Chain.run(40_000)"
{444129, "Result is 40000"}

$ elixir -r chain.exs -e "Chain.run(50_000)"
{545041, "Result is 50000"}

$ elixir -r chain.exs -e "Chain.run(300_000)"

10:54:49.904 [error] Too many processes
** (SystemLimitError) a system limit has been reached

調整 vm 參數 --erl "+P 1000000"

$ elixir --erl "+P 1000000" -r chain.exs -e "Chain.run(300_000)"
{2924026, "Result is 300000"}

When Process Die

預設狀況下,沒有人會知道 Process 結束了

import :timer, only: [ sleep: 1 ]

defmodule Link1 do
  def sad_method do
    sleep 500
    exit(99)
  end

  def run do
    spawn(Link1, :sad_method, [])

    receive do
      msg ->
        IO.puts "MESSAGE RECEIVED: #{inspect msg}"
    after 1000 ->
        IO.puts "Nothing happened as far as I am concerned"
    end
  end
end

Link1.run

Link1 不知道新的 sad_method process 已經結束了,在 1s 後 timeout

$ elixir -r link1.exs
Nothing happened as far as I am concerned

要解決上面的問題,可以用 spawn_link 連結兩個 Process,當 child process 因異常死亡時,會連帶把另一個 process 停掉。

import :timer, only: [ sleep: 1 ]

defmodule Link2 do
  def sad_method do
    sleep 500
    exit(99)
  end

  def run do
    spawn_link(Link2, :sad_method, [])

    receive do
      msg ->
        IO.puts "MESSAGE RECEIVED: #{inspect msg}"
    after 1000 ->
        IO.puts "Nothing happened as far as I am concerned"
    end
  end
end

Link2.run
$ elixir -r link2.exs
** (EXIT from #PID<0.73.0>) 99
defmodule Link3 do
  import :timer, only: [ sleep: 1 ]

  def sad_function do
    sleep 500
    exit(:boom)
  end
  def run do
    # 將 :trap_exit signal 轉換為 message :EXIT
    Process.flag(:trap_exit, true)
    
    spawn_link(Link3, :sad_function, [])
    receive do
      msg ->
        IO.puts "MESSAGE RECEIVED: #{inspect msg}"
    after 1000 ->
        IO.puts "Nothing happened as far as I am concerned"
    end
  end
end

Link3.run
$ elixir -r link3.exs
MESSAGE RECEIVED: {:EXIT, #PID<0.79.0>, :boom}

Monitoring a Process

spawnmonitor 可在 spawna process 時連帶啟用 monitoring,或是使用 Process.monior 監控已經存在的 process。但如果是用 Process.monitor,可能會產生 race condition,如果其他 process 在呼叫 monitor call 完成前就死亡,就會收不到 notification。 spawnlink 及 spawn_monitor versions 為 atomic,所以可以 catch a failure。

import :timer, only: [ sleep: 1 ]

defmodule Monitor1 do
  def sad_method do
    sleep 500
    exit(99)
  end

  def run do
    res = spawn_monitor(Monitor1, :sad_method, [])
    IO.puts inspect res

    receive do
      msg ->
        IO.puts "MESSAGE RECEIVED: #{inspect msg}"
    after 1000 ->
        IO.puts "Nothing happened as far as I am concerned"
    end
  end
end

Monitor1.run
$ elixir -r monitor1.exs
{#PID<0.79.0>, #Reference<0.759175825.870842370.239990>}
MESSAGE RECEIVED: {:DOWN, #Reference<0.759175825.870842370.239990>, :process, #PID<0.79.0>, 99}

結果跟 spawnlink 差不多,如果是用 spawnlink,子 process crash 連帶會影響 monitor process,如果用spawn_monitor,就會知道 crash 的原因。

Parallel Map

通常 map 會回傳一個 apply function 到 collection 裡面每一個 elements 的 list,parallel map 的功能一樣,但會在分別獨立的 process 對每一個 element 都 apply function。

首先會將 collection map 到 fn,在 fn 內 spawn_link 產生 a list of PIDs,每一個 PID 都會對每一個 element 執行給定的 function。

第二個 |> 會將 list of PIDs 轉成 results,各自傳給 list 內每一個PID,注意 ^pid 可讓 receive 依照順序取得結果。

defmodule Parallel do
  def pmap(collection, fun) do
    me = self()

    collection
  |>
    Enum.map(fn (elem) ->
       spawn_link fn -> ( send me, { self(), fun.(elem) } ) end
     end)
  |>
    Enum.map(fn (pid) ->
      receive do { ^pid, result } -> result end
    end)
  end
end
iex(1)> Parallel.pmap 1..10, &(&1 * &1)
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

Fibonacci Server

當 calculator 可處理下一個數字時,會發送 :ready 給 scheduler,scheduler 會以 :fib message 發送計算工作給 calculator,calculator 會以 :answer 回傳結果,scheduler 會發送 :shutdown 給 calculator 通知要 exit。

defmodule FibSolver do

  def fib(scheduler) do
    # 啟動後,發送 :ready 給 scheduler,還有自己的 PID
    send scheduler, { :ready, self() }
    receive do
      { :fib, n, client } ->
        # 計算 n 的 fib 結果,回傳結果給 client
        send client, { :answer, n, fib_calc(n), self() }
        # 等待要處理的下一個 message
        fib(scheduler)
      { :shutdown } ->
        # 收到 shutdown 就結束工作
        exit(0)
    end
  end

  # very inefficient, deliberately
  defp fib_calc(0), do: 1
  defp fib_calc(1), do: 1
  defp fib_calc(n), do: fib_calc(n-1) + fib_calc(n-2)
end

defmodule Scheduler do

  # 由 run 啟動 scheduler
  def run(num_processes, module, func, to_calculate) do
    # 產生 num_processes 個 process,送入 scheduler_processes
    (1..num_processes)
    |> Enum.map(fn(_) -> spawn(module, func, [self()]) end)
    |> schedule_processes(to_calculate, [])
  end

  defp schedule_processes(processes, queue, results) do
    receive do
      {:ready, pid} when length(queue) > 0 ->
        [ next | tail ] = queue
        send pid, {:fib, next, self()}
        schedule_processes(processes, tail, results)

      {:ready, pid} ->
        # queue 裡面已經沒有要計算的工作,就發送 :shutdown 給該 process
        send pid, {:shutdown}
        if length(processes) > 1 do
          # 如果 processes 裡面還有 process,就從 processes 中去掉這個 pid,再迴圈繼續等待其他還沒執行完成的 processes
          schedule_processes(List.delete(processes, pid), queue, results)
        else
          # 已經沒有 processes,所有 processes 都已經 shutdown,就排序結果
          Enum.sort(results, fn {n1,_}, {n2,_}  -> n1 <= n2 end)
        end

      # 接收 fib 計算的結果
      {:answer, number, result, _pid} ->
        schedule_processes(processes, queue, [ {number, result} | results ])

    end
  end
end

to_process = [27, 33, 35, 11, 36, 29, 18, 37, 21, 31, 19, 10, 14, 30,
              15, 17, 23, 28, 25, 34, 22, 20, 13, 16, 32, 12, 26, 24]

Enum.each 1..10, fn num_processes ->
  {time, result} = :timer.tc(Scheduler, :run,
                               [num_processes, FibSolver, :fib, to_process])
  if num_processes == 1 do
    IO.puts inspect result
    IO.puts "\n #   time (s)"
  end
  :io.format "~2B     ~.2f~n", [num_processes, time/1000000.0]
end

執行結果

[{10, 89}, {11, 144}, {12, 233}, {13, 377}, {14, 610}, {15, 987}, {16, 1597}, {17, 2584}, {18, 4181}, {19, 6765}, {20, 10946}, {21, 17711}, {22, 28657}, {23, 46368}, {24, 75025}, {25, 121393}, {26, 196418}, {27, 317811}, {28, 514229}, {29, 832040}, {30, 1346269}, {31, 2178309}, {32, 3524578}, {33, 5702887}, {34, 9227465}, {35, 14930352}, {36, 24157817}, {37, 39088169}]

 #   time (s)
 1     3.38
 2     1.99
 3     1.93
 4     1.75
 5     1.73
 6     1.88
 7     1.76
 8     1.71
 9     1.72
10     1.81

目前這個版本的計算過程如下,有很多重複的計算

fib(5)
= fib(4) + fib(3)
= fib(3) + fib(2) + fib(2) + fib(1)
= fib(2) + fib(1) + fib(1) + fib(0) + fib(1) + fib(0) + fib(1)
= fib(1) + fib(0) + fib(1) + fib(1) + fib(0) + fib(1) + fib(0) + fib(1)

Elixir module 不能儲存資料,但 process 可以儲存 state,elixir 提供 Agent module,可封裝 process 包含了 state。

fib_agent.exs

defmodule FibAgent do
  def start_link do
    Agent.start_link(fn -> %{ 0 => 0, 1 => 1 } end)
  end

  def fib(pid, n) when n >= 0 do
    Agent.get_and_update(pid, &do_fib(&1, n))
  end

  defp do_fib(cache, n) do
    case cache[n] do
      nil ->
        { n_1, cache } = do_fib(cache, n-1)
        result         = n_1 + cache[n-2]
        { result, Map.put(cache, n, result) }

      cached_value ->
        { cached_value , cache }
    end
  end

end

{:ok, agent} = FibAgent.start_link()
IO.puts FibAgent.fib(agent, 2000)

Nodes

erlang Beam VM 可處理自己的 event, process scheduling, memory, naming service, interprocess communication,nodes 可互相連接。

查詢 VM node name

iex(1)> Node.self
:nonode@nohost

可在啟動 iex 時以 --name 或 --sname 設定 node name

$ iex --name name1@cmbp.local
iex(name1@cmbp.local)1> Node.self
:"name1@cmbp.local"

$ iex --name name1
iex(name1@cmbp.local)1> Node.self
:"name1@cmbp.local"

可使用 Node.connect :"name1@cmbp" 連接另一個 node

$ iex --sname name1
-----
$ iex --sname name2

iex(name2@cmbp)1> Node.list
[]
iex(name2@cmbp)2> Node.connect :"name1"
false
iex(name2@cmbp)3> Node.connect :"name1@cmbp.local"

23:32:10.451 [error] ** System NOT running to use fully qualified hostnames **
** Hostname cmbp.local is illegal **

false
iex(name2@cmbp)4> Node.connect :"name1@cmbp"
true
iex(name2@cmbp)5> Node.list
[:name1@cmbp]
# 先產生一個 fun
iex(name1@cmbp)1> func = fn -> IO.inspect Node.self end
#Function<20.99386804/0 in :erl_eval.expr/5>

iex(name1@cmbp)2> spawn(func)
:name1@cmbp
#PID<0.96.0>

# 在 name1@cmbp 產生 func process
iex(name1@cmbp)3> Node.spawn(:"name1@cmbp", func)
:name1@cmbp
#PID<0.98.0>

# 在 name2@cmbp 產生 func process
# 雖然 process 在 name2,IO 還是在 name1
iex(name1@cmbp)4> Node.spawn(:"name2@cmbp", func)
#PID<9755.103.0>
:name2@cmbp

Nodes, Cookies, and Security

在啟動 VM 時,增加 cookie 設定,可增加安全性,相同 cookie 的 node 才能連接在一起。

$ iex --sname name2 --cookie cookie2
iex(name2@cmbp)1>
00:21:31.987 [error] ** Connection attempt from disallowed node :name1@cmbp **

nil
iex(name2@cmbp)2> Node.get_cookie
:cookie2

-------

$ iex --sname name1 --cookie cookie1
iex(name1@cmbp)1> Node.connect :"name2@cmbp"
false

erlang VM 預設會讀取 ~/.erlang.cookie 這個檔案的 cookie

Process Name

PID 有三個部分的數字,但只有兩個 fields: 第一個數字是 Node ID,後兩個數字是 low and high bits of the process ID,如果 export PID 到另一個 node,node ID 會設定為 process 存在的 node number。

iex(name2@cmbp)3> self()
#PID<0.90.0>

process 以 :global.register_name(@name, pid) 註冊 process name,後續就可以用 :global.whereis_name(@name) 找到這個 process。

如果在程式裡註冊 global process name,可能會遇到名稱一樣的問題,可以改用 mix.exs 設定檔,管理要註冊到 global state 的 process names。

defmodule Tick do

  @interval 2000   # 2 seconds

  @name  :ticker

  def start do
    pid = spawn(__MODULE__, :generator, [[]])
    :global.register_name(@name, pid)
  end

  def register(client_pid) do
    send :global.whereis_name(@name), { :register, client_pid }
  end

  def generator(clients) do
    receive do
      { :register, pid } ->
        IO.puts "registering #{inspect pid}"
        generator([pid|clients])

    after
      @interval ->
        IO.puts "tick"
        Enum.each clients, fn client ->
          send client, { :tick }
        end
        generator(clients)
    end
  end
end

defmodule Client do

  def start do
    pid = spawn(__MODULE__, :receiver, [])
    Tick.register(pid)
  end

  def receiver do
    receive do
      { :tick } ->
        IO.puts "tock in client"
        receiver()
    end
  end
end
$ iex --sname name1
iex(name1@cmbp)1> c("ticker.ex")
[Client, Tick]
iex(name1@cmbp)2> Node.connect :"name2@cmbp"
true
iex(name1@cmbp)3> Tick.start
:yes
tick
tick
tick
iex(name1@cmbp)4> Client.start
registering #PID<0.111.0>
{:register, #PID<0.111.0>}
tick
tock in client
tick
tock in client

-------
$ iex --sname name2
iex(name2@cmbp)1> c("ticker.ex")
[Client, Tick]
iex(name2@cmbp)2> Client.start
{:register, #PID<0.104.0>}
tock in client
tock in client
tock in client

I/O, PIDs, and Nodes

Erlang VM 將 I/O 實作為 processes。可以直接透過 I/O server 的PID 對 open file/device 處理 IO。

VM 的 default IO device 可透過 :erlang.group_leader 取得,他會回傳 I/O Server 的 PID

$ iex --sname name1

-------

$ iex --sname name2
iex(name2@cmbp)1> Node.connect(:"name1@cmbp")
true
iex(name2@cmbp)2> :global.register_name(:name2, :erlang.group_leader)
:yes

------

# 回到 :name1
iex(name1@cmbp)1> name2 = :global.whereis_name :name2
#PID<9755.59.0>
iex(name1@cmbp)2> IO.puts(name2, "test")
:ok

------

# :name2 的 IO 可看到

test

References

Programming Elixir

沒有留言:

張貼留言