2018年7月23日

RTSP/RTP vs RTMP vs WebRTC vs HLS vs SIP/RTP

多媒體資料包含了語音及影像,因為原始資料不同,壓縮格式也有差異,在一個影音資料中,通常包含了這兩種獨立的媒體,如果遇到多國語言的狀況,也可能會看到一個影片中夾帶了多個語音。當多媒體資料放到網路上,為了達到一邊下載,一邊觀看的功能,我們需要 streaming media 的網路協定,幫助我們將影音資料壓縮後,搭配協定傳送到收視端,透過媒體播放器解壓縮,並播放出來。

串流 streaming 的意思就是將影音資料壓縮後,切割成多個區塊的網路資料,分開但連續地發送給客戶端,客戶端不需要一次將整個多媒體資料下載完成後,才能播放影片,而是透過即時下載的分塊資料,先暫存在 buffer 中,只要客戶端有了幾秒鐘的影音 buffer,就可以一邊繼續下載,一邊播放影片。

骨灰級的使用者,可能有聽過 MMS(Microsoft Media Server),或是 RM(Real Media) 這兩種串流多媒體的標準,但現在幾乎都已經消失了,取而代之的是 RTSP, RTMP, WebRTC, HLS 及 SIP 這些協定。

RTSP

RTSP(Real Time Streaming Protocol) 是用來控制遠端多媒體的播放、錄製、暫停的控制協定,有點像是遠端遙控錄放影機的感覺,我們在網路的遠端,發送 RTSP 的控制指令,告訴 Server 我們想要看哪一個影片,找到後,就開始播放影片,也可以暫停。RTSP 協定中,看不到影音資料的內容,因為真正的影音資料是透過另一個協定 RTP(Real-time Transport Protocol) 發送的,在 RTSP 中,Server 會以 SDP(Session Description Protocol) 的形式,將 RTP 影音的資訊,包含 UDP Port,影音的壓縮格式等等資訊,告訴 client 端。

SIP

跟 RTSP 功能比較相近的是 SIP(Session Initiation Protocol),SIP 也是一種多媒體的控制協定,RTSP 像是個網路影音播放器,但 SIP 卻是一種網路電話,SIP 本身也只負責處理通訊的對談建立以及掛斷的處理,真正的影音多媒體資料,也同樣是由 SIP 以 SDP 的方式描述 RTP 影音資料的資訊,透過 RTP 將影音資料傳送到另一端,因為 SIP 是建立雙向對談的協定,因此 RTP 影音會是雙向的串流。

同樣的 RTP 的影像以及語音是分開在不同的 UDP Port,這個 Port 是透過 SDP 在通訊建立時,即時雙向建立起來的。因為電話這種應用必須要一邊講一邊聽,這是最常見的一種串流媒體的應用。

SIP 跟 RTSP 的基本差異在於,SIP 是雙向影音,而 RTSP 是單向點播,雖然說是這樣,但 RTSP 還是可以做到雙向視訊通話,不過那已經不是該協定的應用本意。目前 RTSP 最常見的應用場景是網路攝影機。

RTMP

RTMP(Real Time Messaging Protocol) 是由 adobe flash player 引領的串流媒體協定,因為 flash player 在過去的網頁曾經霸佔了很長一段黃金時代,只要裝了 flash player,不僅可以看串流影片,還可以存取電腦的麥克風及 webcam,還能玩一堆網頁遊戲。

不過 flash player 的時代已經走入歷史,但在裡面應用的 RTMP 串流協定還持續存活著,原因在於網路直播,因為 RTMP 有低延遲的特性,適合用在網路影音直播中,目前的原生 APP 網路影音直播都是採用 RMTP 協定,除非要轉到網頁瀏覽器播放,才會轉換到 HTML5 或是 HLS 的協定。

RTMP 跟 RTSP 與 SIP 不同的地方在於,RTMP 將媒體控制指令跟多媒體資料放在同一個協定中,標準是使用 TCP Port 1935,而不像是 RTP 一樣是使用 UDP,每一種媒體資料使用一個 UDP Port。

而且 RTMP 裡面還有著網路頻寬偵測的能力,因應著不同的網路速度,可以動態調整影音資料內容的解析度,以低解析度的影片應付低頻寬的網路環境。

WebRTC

WebRTC(Web Real-Time Communication) 是 google 開放的標準,目前已經提交成為 W3C 標準,專門支援在網頁瀏覽器中進行影音對談的 API。

不同於 Flash Player 的 plugin 機制,WebRTC 需要瀏覽器原生的支援,也就是內建於瀏覽器的影音對談 API,雖然 WebRTC 已經解除了 flash player 的窘境,不需要安裝 plugin,但這個協定/API 本身的能力跟等級,跟 RTMP 還是有落差。

如果要做少量的視訊會議對談,用 WebRTC 是可以做到的,但如果要做大量的 APP 影音直播,瞬間有上千或上萬個人在觀看的直播影音,目前還是要走 RTMP 搭配 HLS 的解決方案。

HLS

HLS (HTTP Live Streaming) 是 Apple 提出的基於 HTTP 延伸的串流媒體傳輸協定,在媒體播放過程中,允許客戶端因應網路速度動態調整不同解析度的媒體資料,在串流媒體一開始,客戶端會下載 m3u8 playlist,用來取得可以使用的媒體資料流。

HLS 跟 RTP 的差異在於 HLS 可穿過所有允許 HTTP 通過的 firewall 或 proxy,非常適合搭配 CDN 發布媒體串流。因此網路 APP 影音直播,才會以 RTMP 發布,在雲端轉換為 HLS,再提供給大量客戶端觀看這樣的機制。

ONVIF

ONVIF (Open Network Video Interface Forum) 是 Axis、Bosch Security System 及 Sony 在 2008 成立的標準論壇,目的在於讓不同品牌的網路影音設備能夠有共通的標準,能夠互通,幫助硬體生產及網路開發商能夠透過標準整合出各種不同的網路影音監視系統。

ONVIF 有五種 profile

  • Profile S for encompasses video streaming 網路監視系統
  • Profile G for video storage 視訊儲存及重播
  • Profile C for access control 門禁控制
  • Profile Q for Out-of-the-box interoperability 開箱即用,更簡便的操作介面
  • Profile A for Physical Access Control System (PACS) 跟 Profile C 很類似,但 Profile C 用於基本的門禁控制,Profile A 則是擴充,有比較複雜的控制邏輯

以往在網路攝影機的市場,大多是以 RTSP/RTP 的方式提供影音,但現在已經有支援 RTMP 的網路攝影機出現了。

References

即時串流通信協定 RTSP

直播終端技術比較:Native vs H5 vs WebRTC vs 小程序

網絡視頻監控:ONVIF標準協議6個常見問題

什麼是Onvif協議,誰開啟了Onvif時代?

ONVIF -- Profiles (S,G,C,Q,A)

Which protocol is best for a video live streaming from a server to an Android: RTSP, RTMP, HTTP or something else?

Streaming 通訊協定 RTP RTCP RTSP RTMP HLS 介紹

Streaming 通訊協定 RTP RTCP RTSP RTMP HLS 介紹

RTMP vs RTSP/RTP: Which to choose for an interactive livestream?

RTSP協議詳解

[RTSP]rtsp和sip的區別和聯繫

SDP (Session Description Protocol) 閱讀心得

可以用WebRTC來做視頻直播嗎?

2018年7月2日

Green Process, Green thread, Native Process

在某個作業系統中,要達到 multitasking 的能力,必須要由 OS 提供建立 thread 或是 process 分別處理不同工作的 library,thread 跟 process 的最大差異是,thread 是在一個 process 裡面運作,多個 thread 之間可以共享資料。

而 Green Thread 是 Java VM 裡面的特殊用語,當時 Java VM 是利用 libthread.so 發展支援多工的專案,該專案名稱為 "Green",所以就稱為 Green Thread。

至於有人寫說 Erlang VM 的多工是採用了 Green Process,這就像是借用了 Green Thread 跟 Native Process 的概念,因為 Erlang 的 Process 之間不能共享資料,而又跟 Java 一樣本身是 VM,所以就借用了 "Green",稱為 Green Process。

有個比 Green Process 更一般化的名詞為 Light-weight process,他會以單一個 kernel thread 實作,運作在 user space,且會共享 memory address space。

Green threads wiki 中明確的下了定義,只要是取代 OS 原生的機制,透過 runtime library 或是 VM,進行 thread 的 scheduling 處理,這種 thread 就稱為 Green Thread。Green Thread 會在 user space 而不是 kernel space 運作,讓多工服務不需要 native thread 的支援。

但現今的 Java VM 其實也已經放棄了 Green Thread,而改回使用 native thread。原因在於運作的速度,在抽象化 thread 多工模型時,耗費了太多精力,再加上新的多核心 CPU,如果要完全運用所有CPU核心的運算能力,也就是 SMP,透過 OS 的 thread 機制會是最簡單且快速的,因此現在的 VM 都是直接映射到 OS 的 thread,再搭配 Thread Pool 的做法,可減少 Thread 建立跟銷毀所消耗的資源。


References

What other systems beside Erlang are based on “Green Processes”?

What's the difference between “green threads” and Erlang's processes?

What is the difference between multicore programming in Erlang and other language?

Erlang調度器的一些細節以及它重要的原因(譯文)

Why not Green Threads?

Java的Green threads是Coroutine嗎?

2018年6月25日

清理 mac 的儲存空間

一直是習慣用 df 指令查看 disk 使用量,最近發現 disk 可用空間慢慢地不足,就開始清理一些不常用的文件跟軟體。

首先可在左上角的蘋果圖示中,點擊 "關於這台 Mac",然後可以看 "儲存空間" 這個項目,但是卻發現這裡的可用空間跟 df 查詢到的結果差異很大。

接著點擊右邊的 "管理",可看到 "文件"、"GarageBand" 等等項目的資訊,"GarageBand" 可清除 2G 的樂器音檔,文件區可查看佔用空間比較大的檔案資料,自己可以選擇要清除哪些檔案。

但做了這些動作後,df 的資訊還是沒有什麼改變。原因是在 Time Machine 的 Local Snapshots。

在 command line 用 sudo tmutil listlocalsnapshots / 指令,可查看目前 local 硬碟中,包含的 localsnapshot,因為最近頻繁地刪除檔案,導致 localsnapshot 產生了很多快照。

$ sudo tmutil listlocalsnapshots /
com.apple.TimeMachine.2018-06-19-105146
com.apple.TimeMachine.2018-06-19-114843
com.apple.TimeMachine.2018-06-19-125049
com.apple.TimeMachine.2018-06-19-154914
com.apple.TimeMachine.2018-06-19-164914
com.apple.TimeMachine.2018-06-19-175221
com.apple.TimeMachine.2018-06-19-230612
com.apple.TimeMachine.2018-06-20-000523
com.apple.TimeMachine.2018-06-20-093756
com.apple.TimeMachine.2018-06-20-104818

確認這些資料快照沒有回復的需求後,可以下指令逐個刪除 localsnapshot。

sudo tmutil deletelocalsnapshots 2018-06-20-000523

最後比較一下原本的 df 資訊,跟刪除所有 localsnapshots 以後的資訊差異。

$ df -H
Filesystem      Size   Used  Avail Capacity  iused               ifree %iused  Mounted on
/dev/disk1s1    480G   410G    68G    86%  2387622 9223372036852388185    0%   /
devfs           343k   343k     0B   100%     1160                   0  100%   /dev
/dev/disk1s4    480G   1.1G    68G     2%        1 9223372036854775806    0%   /private/var/vm
map -hosts        0B     0B     0B   100%        0                   0  100%   /net
map auto_home     0B     0B     0B   100%        0                   0  100%   /home
$ df -H
Filesystem      Size   Used  Avail Capacity  iused               ifree %iused  Mounted on
/dev/disk1s1    480G   286G   192G    60%  2387620 9223372036852388187    0%   /
devfs           343k   343k     0B   100%     1160                   0  100%   /dev
/dev/disk1s4    480G   1.1G   192G     1%        1 9223372036854775806    0%   /private/var/vm
map -hosts        0B     0B     0B   100%        0                   0  100%   /net
map auto_home     0B     0B     0B   100%        0                   0  100%   /home

References

Mac顯示「系統」佔用太多硬碟空間?試試用這些方法把它清乾淨

【macOS 技巧】如何從 macOS Sierra 內置清理工具掀出塞爆空間的兇手!?

2018年6月11日

tmux

習慣使用 linux terminal 遠端處理 server 工作的人,有時會遇到一個問題,就是在遠端 terminal 處理過程中,有時會遇到一些程式處理很久,或是需要同時查看 log,系統 loading 的狀況,這時,就需要再對同一台機器打開另一個 terminal,導致 terminal 的頁籤越來越多。更麻煩的是有時候遇到網路異常斷線,所有 terminal 的連線中斷了,就必須要重新一個一個再連接 server。

tmux 是一個 terminal multiplexer,可讓使用者以單一terminal,連接多個 terminal sessions或是windows。換句話說,就不需要再連接多個 terminal tab。另外,更重要的功能是,tmux 內建了一個 terminal server,即使 terminal 斷線,只要 tmux server 還存活,任何時候再重連,都可以取回剛剛工作中的 terminal sessions/windows,繼續工作。

安裝

在 centos 安裝 tmux

yum install tmux

在 macos 安裝 tmux

sudo port install tmux

tmux 指令

只要在 terminal 執行 tmux,就會啟動 tmux server,另外還有一些常用的指令

# 啟動新的 terminal session
tmux new -s sessionanme
tmux new -s sessionanme -n windowname

# 列出所有 tmux sessions
tmux ls

# a/at/attach session
tmux at -t sessionname
tmux a #

# kill session
tmux kill-session -t sessioname

# 當 session 內所有shell都結束,該 session 就會中止
exit

進入 tmux 後,terminal 下方就會出現一條綠色的 status bar,很明確的顯示目前正在 tmux 工作 session 中。

tmux 專有名詞的概念

  • tmux server

    啟動 tmux 會產生一個 server,負責管理所有 sessions

  • session

    一個 terminal 可以有多個 sessions,通常一個 project 會使用一個 session。

  • window

    一個 session 可以有多個 window,每一個 window 會佔滿整個 terminal 畫面,可以開多個 window,讓某些 window 在背景運作。

  • pane

    每一個 windows 可切割多個區塊,每一個區塊就是一個 pane。通常會將 window 水平或垂直切割,增加多個 pane。

控制指令

進入 tmux 後,跟平常一樣,會在多個 shell 中切換執行工作,如果需要對 tmux 下指令,要用 Ctrl-b 功能鍵啟動。

因為 Ctrl-b 會有點難按,大部分都會改成其他的 function key,在 ~/.tmux.conf 設定檔中,可增加這些設定,將 Ctrl-b 改為 Ctrl-a

set -g prefix C-a
unbind C-b
bind C-a send-prefix

另外可在設定檔中加上快速鍵,以下設定,可以在 Ctrl-a 後,直接按 | ,就會水平方向增加一個 shell pane。

unbind %
bind | split-window -h
bind - split-window -v

啟用滑鼠,可用滑鼠修改 pane 的大小,捲動視窗

set -g mouse on

  • session 處理的指令 大部分都是在原本的 terminal 中,不是 tmux 的 function
tmux ls
tmux attach -t 0
tmux kill-session -t 0

Ctrl-a (由 Ctrl-b 改為 Ctrl-a) 後的 fuction

Ctrl-a 後的 function 功能
d detach session
s list session
:new new session
$ 為 session 命名
  • window 指令
Ctrl-a 後的 function 功能
c 產生新 window
& 關閉目前的 window
p 切換到上一個 window
n 切換到下一個 window
w list windows
f find window
  • pane 指令
Ctrl-a 後的 function 功能
% (改為 |) 水平分割新的 pane
" (改為 -) 垂直分割新的 pane
方向鍵 切換到其他 panes
x 關閉目前的 pane
o 交換 pane
空白鍵 切換 layout
q 顯示每個 pane 的編號,再按編號,可切換到該 pane
{ 跟上一個 pane 交換位置
} 跟下一個 pane 交換位置
z 切換 pane 最大/最小化

~/.tmux.conf 設定的內容

set -g default-terminal "screen-256color"
set -g display-time 3000
set -g escape-time 0
set -g history-limit 65535
set -g base-index 1
set -g pane-base-index 1

# Ctrl-b -> Ctrl-a
set -g prefix C-a
unbind C-b
bind C-a send-prefix

# enable mouse
set -g mouse on

# split window
unbind %
bind | split-window -h
bind - split-window -v

# <prefix> or Ctrl- or <prefix> Ctrl-
#   k --- previous pane
# h   l --- previous window / next window
#   j --- next pane

# navigate windows
unbind-key l
bind-key h select-window -t :-
bind-key C-h select-window -t :-
bind-key -n C-h select-window -t :-
bind-key l select-window -t :+
bind-key C-l select-window -t :+
bind-key -n C-l select-window -t :+

# navigate panes
bind-key j select-pane -t :.-
bind-key C-j select-pane -t :.-
bind-key -n C-j select-pane -t :.-
bind-key k select-pane -t :.+
bind-key C-k select-pane -t :.+
bind-key -n C-k select-pane -t :.+

References

終端機 session 管理神器 — tmux

終端機必備的多工良伴:tmux

tmux ,不只是 terminal multiplexer

Tutorial — tmux Basics

tmux cheatsheet

Tmux 快捷鍵 & 速查表

使用 tmux 與 tmuxifier 打造 Console 開發環境(比 screen 更棒)

2018年6月4日

資料科學可以回答的問題

資料科學會使用 Machine Learning 的演算法,這些演算法的使用方式,都是這三個步驟:讀取資料,轉譯,提供答案,但在選擇演算法之前,要先知道這些演算法能回答什麼問題,要問對問題,才能找到答案。

以下這些是可以回答的問題

  1. 這是A,還是B? Is this A or B? (two-class classification)
  2. 這是A、B、C 還是 D? Is this A or B or C or D? (multi-class classification)
  3. 有沒有奇怪的地方? Is this Weird? (anomaly detection)
  4. 這有多少/有幾個? How Much/How Many? (regression)
  5. 用迴歸演算法解決多元分類問題 Multi-Class Classification as Regression
  6. 用迴歸演算法解決二元分類問題 Two-Class Classification as Regression
  7. 資料是由什麼組成的?怎麼分類? How is this Data Organized? (unsupervised learning, clustering)
  8. 接下來該怎麼做? What Should I Do Now? (reinforcement learning)

這是A,還是B? Is this A or B? (two-class classification)

二元分類 two-class classification,用來解決只有兩種答案的問題,例如:

  • 這位客戶會不會續約?
  • 這張照片是貓還是狗?
  • 這位顧客會不會點最上面的連結?
  • 如果再開一千英里,這個輪胎會不會爆胎?
  • 五元折價券或是打七五折,哪一個促銷手段能吸引更多顧客?

這是A、B、C 還是 D? Is this A or B or C or D? (multi-class classification)

多元分類 multi-class classification,用來解決有多種答案的問題,例如:

  • 這是哪種動物的圖片?
  • 這是哪種飛機的雷達訊號?
  • 這篇新聞屬於哪一個主題?
  • 這則 twitter 隱含了哪一種情緒?
  • 這則錄音裡的講者是誰?

有沒有奇怪的地方? Is this Weird? (anomaly detection)

異常偵測 anomaly detection,用來辨別不正常的資料,當分析的情況發生率很低,導致樣本數也很少的時候,異常偵測就顯得特別有用。感覺上跟 二元分類 two-class classification 很像,差別在於二元分類的原始資料中,就包含了兩種答案,但是異常偵測則不一定。

例如:

  • 是不是信用卡盜刷
  • 壓力大小有任何異狀嗎?
  • 這則網路訊息正常嗎?
  • 這些消費記錄跟這位使用者過去的行為落差很大嗎?
  • 這些用電量在這個季節和時間算是正常的嗎?

這有多少/有幾個? How Much/How Many? (regression)

當解決的問題涉及數字而非分類時,這一類的演算法就稱為迴歸(regression),例如:

  • 下週二的氣溫為何?
  • 第四季在葡萄牙的銷售量會有多少?
  • 三十分鐘後,我的風力發電廠會有多少千瓦(kW)的需求?
  • 我下週會獲得多少新追蹤者?
  • 每一千個使用這種軸承的產品裡,有多少個能被使用超過一萬小時?

用迴歸演算法解決多元分類問題 Multi-Class Classification as Regression

有些看起來很像多元分類的問題,但更適合用迴歸解決。例如

  • 讀者對哪則新聞最感興趣

    乍看之下是個分類問題,但如果將問題換成「對讀者來說,每則新聞的有趣程度為何」並為每則新聞評分,接下來就只需要選出最高分的新聞。這類問題通常和排名或對比有關。

  • 我的車隊中,哪台廂型車最需要保養

    可以換成「我的車隊裡,每台廂型車需要保養的程度為何」

  • 哪 5% 的顧客隔年會跳槽到對手公司

    可以換成「每名顧客明年跳槽到對手公司的機率為何」。

用迴歸演算法解決二元分類問題 Two-Class Classification as Regression

二元分類問題也可以換成迴歸問題,這類問題也通常以「有多少可能性」、「有多少比例」開頭。例如:

  • 這位使用者有多大機率會點我的廣告?
  • 這台拉霸機有多少比例的回合會給獎金?
  • 這名員工有多大機率會造成內部安全的威脅?
  • 今天有多少比例的航班會準時抵達?

二元分類、多元分類、異狀偵測和迴歸等四種演算法之間都很相近,它們都是監督式學習(supervised learning)下的演算法。共通之處,在於建模時都用了一組包含回答的資料(這個過程稱作訓練,training),並被用來分類或預測一組不包含回答的資料(這個過程稱作評分,scoring)。

資料是由什麼組成的?怎麼分類? How is this Data Organized? (unsupervised learning, clustering)

這是非監督和強化式學習(unsupervised and reinforcement learning)的演算法。

判斷資料分類的方法有很多,其中一類是聚類法(clustering),包括資料群集(chunking)、分組(grouping)、聚束(bunching)、分段(segmentation)等等。聚類法所分析的資料不包含任何用來引導分群、說明分群意義和數量的數字或名字。聚類法的基礎是衡量資料之間的距離或相似度,也就是距離度量(distance metric)。距離度量可以是任何可測量的數據。

  • 哪些消費者對農產品有相似的品味?
  • 哪些觀眾喜歡同一類電影?
  • 哪些型號的印表機有類似的故障問題?
  • 這間變電所在每週的哪幾天有類似的電力需求?
  • 用什麼方法可以自然地將這些文件分成五類?

另一類演算法稱作降維法(dimensionality reduction)。降維是另一種簡化資料的方法,它可以讓資料的溝通變得更容易、處理變得更快、而且存取變得更簡單。降維的運作原理是創造出一套簡化資料的方法。等第積分平均(GPA)是一個很簡單的例子。

  • 哪幾組飛機引擎偵測器的數據呈同向(和反向)變化?
  • 成功的 CEO 有哪些共通的領導力特質?
  • 全美的油價起伏有哪些相似的特徵?
  • 這些文件裡有哪幾組詞彙常常同時出現?(它們和哪些主題有關?)

接下來該怎麼做? What Should I Do Now? (reinforcement learning)

第三類演算法和行動有關,即強化學習(reinforcement learning)演算法。這些演算法和監督式和非監督式都不太一樣。

比方說,迴歸演算法雖然可以用來預測明天的最高溫為華氏 98 度,但它不能用來判斷該做什麼;另一方面,強化學習演算法就可以用來判斷該採取的行動,例如趁天氣還沒變熱的時候,先開辦公大樓內上半層的冷氣。

強化學習演算法很適合用於需要在無人監督情況下、完成許多簡單決策自動化系統,例如電梯、電熱器、冷氣和照明系統。由於強化學習最初被開發的目的是用來操縱機器人,任何自動物件也能使用這套演算法,像是偵查用無人機或掃地機器人。強化學習的問題總是和該採取什麼行動有關,雖然最後往往還是機器在處理這些問題。

  • 我該把廣告放在網頁何處,才能讓讀者最有機會點到它?
  • 我該把溫度調高或調低一點,還是維持現狀?
  • 我該再掃一次客廳還是繼續充電?
  • 我現在該買入多少股?
  • 看到黃燈時,我該保持當前速度、煞車還是加速?

References

What Types of Questions Can Data Science Answer?

五種可以用機器學習回答的問題

Which Algorithm Family Can Answer My Question?

2018年5月28日

CQRS: Command Query Responsibility Separation

CQS (Command Query Separation) 是由 Bertrand Meyer (Eiffel 語言的爸爸) 在 1988 於 "Object Oriented Software Construction" 這本書中提出的軟體架構概念,所有的 computing method 只會分成兩類,一種是執行某個 action 的 command,另一種是呼叫查詢並取得回傳資料,但不應該同時做兩種工作。換句話說,發問時,不能在該 method 裡面修改答案。

CQRS (Command Query Responsibility Separation) 應用 CQS 的概念,進一步將 Query 及 Command 物件分離,分別處理取得資料及修改資料的工作。

CQS 遇到的問題,會是 re-entrant 及 multi-thread,就是當一件工作處理到一半,被新的工作中斷這樣的問題,也就是 thread-safe 的問題,也就造成實作複雜度的問題。

然而這樣的問題,搭配著 Event Sourcing 的方法,將某個 APP state 的變更,收集成一個 sequence of events,以這種方式處理 command action,就能解決 thread-safe 的問題。

不過我們還是會遇到,command action 執行的速度跟 query 的時機點的問題,如果修改資料的動作在 query 以前還沒有完成,那麼前端就會查詢到舊狀態的資料,但資料還是會達到最終一致性,而不會有強一致性。

CQRS 的優點:

  1. Command 及 Query 分工明確,可分別進行效能調整及最佳化
  2. 將業務上的命令和查詢的職責分離能夠提高系統的性能、可擴展性和安全性
  3. 企業邏輯簡單清楚,能夠從事件歷程看到系統中的那些行為或者操作導致了系統的狀態變化。
  4. 將開發的邏輯概念,從數據驅動 (Data-Driven) 轉到任務驅動 (Task-Driven) 以及事件驅動 (Event-Driven)

在以下狀況,可以考慮使用CQRS模式:

  1. 當在業務邏輯層有很多操作需要相同的實體或者對象進行操作的時候。CQRS使得我們可以對讀和寫定義不同的實體和方法,從而可以減少或者避免對某一方面的更改造成衝突
  2. 用在特定任務的用戶互動系統,通常系統會引導用戶通過一系列複雜的步驟和操作,通常會需要一些複雜的領域模型。寫入資料的部分有很多和業務邏輯相關的命令操作,輸入驗證,業務邏輯驗證來保證數據的一致性。讀取資料沒有業務邏輯,僅僅是回傳 DTO。讀與寫的資料只需要達到最終一致性。
  3. 適用於一些需要對查詢性能和寫入性能分開進行優化的系統,尤其是讀/寫比非常高的系統。例如在很多系統中讀取資料的使用次數遠大於寫入資料。
  4. 對於系統在將來會隨著時間不斷演化,有可能會包含不同版本的模型,或者業務規則經常變化的系統
  5. 需要和其他系統整合,特別是需要和事件歷程 Event Sourcing 進行整合的系統,這樣子系統的臨時異常不會影響整個系統的其他部分。

在以下狀況,不適合使用CQRS:

  1. 領域模型或者業務邏輯比較簡單,這種情況下使用CQRS會把系統弄得太複雜
  2. 對於簡單的,CRUD模式的用戶介面以及與之相關的數據訪問操作已經足夠的話,都只是一個簡單的對數據進行增刪改查,沒有必要使用CQRS
  3. 不適合在整個系統中全部都使用 CQRS,在特定模組中CQRS可能比較有用

以下是一些查詢到的 CQRS 架構圖,從圖片可以看到跟傳統的 CRUD Data-Driven 架構的差異。

這種架構區分了 Business 及 Query model 的 DataBase,也可以想成將資料寫入了 Business Database,而前端使用者不會觸及該資料庫,是比較強調資料安全性的方式,但不能保證 Query Model 的 DB 裡面的資料一定會跟 Business Model DB 的資料一樣。

這種架構比較接近一個一般性的系統,資料庫是單一的,且可以在 Event Handler 中確認並檢查 Database 及 Analysis Database 的資料一致性。

這種架構圖比較強調資料流的過程,但基本上架構跟上面那個很接近,不過在 Comamnd 的部分,可注意到 Command 沒有 Reply DTO,也可以說,只要 Comamnd 有送進 Command Handler,就視為一個成功執行的 Command。

這裡強調 Command Bus 是以非同步的方式,送進 Command Handler,非同步可強化系統效能的表現,但如果要用同步的方式也可以,要等到 Command Event 送進 Event Database 後,才回應給前端確認該命令已經完成。而 Query 是用同步的方式,發送 Query 並等待要回傳的 ViewModel。

References

CQRS - Martin Fowler

CQRS 介紹

CQRS 命令查詢職責分離模式介紹

從CQS到CQRS

From CQS to CQRS

深度長文:我對CQRS/EventSourcing架構的思考

DDD CQRS架構和傳統架構的優缺點比較

Introduction to Domain Driven Design, CQRS and Event Sourcing

2018年5月21日

Elixir 9 Protocol

inpsect 可以用 printable binary 形式回傳任何 value。但 elixir 是用什麼方式實作的? 是不是 guard clause

def inspect(value) when is_atom(value), do: ...
def inspect(value) when is_binary(value), do: ...

protocol 允許不同的資料類型用於相同的函數,不同資料型別的相同函數形態會有相同的行為。很像是 behavior,但 behavior 用在 module 裡面,protocol 可在 module 外面實作,這表示我們可以自由擴充 module 不足的功能。

defprotocol 定義 protocol,defprotocol 只定義 function,defimpl 實作要放在不同的地方。

defprotocol Inspect do
    def inspect(thing, opts)
end

增加了這兩個實作,就可以 inspect PID

defimpl Inspect, for: PID do
    def inspect(pid, _opts) do
        "#PID" <> IO.iodata_to_binary(:erlang.pid_to_list(pid))
    end
end

defimpl Inspect, for: Reference do
    def inspect(ref, _opts) do
        '#Ref' ++ rest = :erlang.ref_to_list(ref)
        "#Reference" <> IO.iodata_to_binary(rest)
    end
end
iex(1)> inspect self()
"#Process<0.89.0>"

可在 for: 後面使用的 Types 有

Any
Atom
BitString
Float
Function
Integer
List
Map
PID
Port
Record
Reference
Tuple

is_collection.exs

defprotocol Collection do
  @fallback_to_any true
  def is_collection?(value)
end

defimpl Collection, for: [List, Tuple, BitString, Map] do
  def is_collection?(_), do: true
end

defimpl Collection, for: Any do
  def is_collection?(_), do: false
end

Enum.each [ 1, 1.0, [1,2], {1,2}, %{}, "cat" ], fn value ->
  IO.puts "#{inspect value}:  #{Collection.is_collection?(value)}"
end
$ elixir is_collection.exs
1:  false
1.0:  false
[1, 2]:  true
{1, 2}:  true
%{}:  true
"cat":  true

Protocol and Structs

Elixir 沒有 classes,但支援 user-defined types

defmodule Blob do
  defstruct content: nil
end
iex(1)> c "basic.exs"
[Blob]
iex(2)> b = %Blob{content: 123}
%Blob{content: 123}
iex(3)> inspect b
"%Blob{content: 123}"

## structs 其實是 map,key為 __struct__
iex(4)> inspect b, structs: false
"%{__struct__: Blob, content: 123}"

Built-In Protocols

elixir 有以下內建的 protocols

  • Enumerable and Collectable
  • Inspect
  • List.Chars
  • String.Chars

首先定義一個以 0s 1s 表示的 integer

bitmap.exs

defmodule Bitmap do
  defstruct value: 0

  @doc """
  A simple accessor for the 2^bit value in an integer

      iex> b = %Bitmap{value: 5}
      %Bitmap{value: 5}
      iex> Bitmap.fetch_bit(b,2)
      1
      iex> Bitmap.fetch_bit(b,1)
      0
      iex> Bitmap.fetch_bit(b,0)
      1
  """
  def fetch_bit(%Bitmap{value: value}, bit) do
    use Bitwise

    (value >>> bit) &&& 1
  end
end
  • Enumerable and Collectable

Enumerable 定義了三個 functions

defprotocol Enumerable do
    # collection 的元素數量
    def count(collection)

    # 是否包含某個 value
    def member?(collection, value)

    # reduce fun to collection elements
    def reduce(collection, acc, fun)
end

針對剛剛的 Bitmap 實作 Enumerable

defimpl Enumerable,  for: Bitmap do
  import :math, only: [log: 1]

  def reduce(bitmap, {:cont, acc}, fun) do
    bit_count =  Enum.count(bitmap)
    _reduce({bitmap, bit_count}, { :cont, acc }, fun)
  end

  defp _reduce({_bitmap, -1}, { :cont, acc }, _fun), do: { :done, acc }

  defp _reduce({bitmap, bit_number}, { :cont, acc }, fun) do
    with bit = Bitmap.fetch_bit(bitmap, bit_number),
    do:  _reduce({bitmap, bit_number-1}, fun.(bit, acc), fun)
  end

  defp _reduce({_bitmap, _bit_number}, { :halt, acc }, _fun), do: { :halted, acc }

  defp _reduce({bitmap, bit_number}, { :suspend, acc }, fun), 
  do: { :suspended, acc, &_reduce({bitmap, bit_number}, &1, fun), fun } 

  def member?(value, bit_number) do
    { :ok, 0 <= bit_number && bit_number < Enum.count(value) }
  end

  def count(%Bitmap{value: value}) do              
    { :ok, trunc(log(abs(value))/log(2)) + 1 }
  end
end


fifty = %Bitmap{value: 50}

IO.puts Enum.count fifty    # => 6

IO.puts Enum.member? fifty, 4    # => true
IO.puts Enum.member? fifty, 6    # => false

IO.inspect Enum.reverse fifty       # => [0, 1, 0, 0, 1, 1, 0]
IO.inspect Enum.join fifty, ":"     # => "0:1:1:0:0:1:0"
iex(1)> c ("bitmap.exs")
[Bitmap]
iex(2)> c ("bitmap_enumerable.exs")
6
true
false
[0, 1, 0, 0, 1, 1, 0]
"0:1:1:0:0:1:0"
[Enumerable.Bitmap]

defimpl Collectable,  for: Bitmap do
  use Bitwise

  # 回傳 tuple,(1) value (2) function
  def into(%Bitmap{value: target}) do
    {target, fn
      acc, {:cont, next_bit} -> (acc <<< 1) ||| next_bit
      acc,  :done            -> %Bitmap{value: acc}
      _, :halt               -> :ok
    end}
  end
end
iex(3)> c("bitmap_collectable.exs")
[Collectable.Bitmap]
iex(4)> Enum.into [1,1,0,0,1,0], %Bitmap{value: 0}
%Bitmap{value: 50}
  • Inspect
defmodule Bitmap do
  defstruct value: 0

  defimpl Inspect do
    def inspect(%Bitmap{value: value}, _opts) do
      "%Bitmap{#{value}=#{as_binary(value)}}"
    end
    defp as_binary(value) do
      to_string(:io_lib.format("~.2B", [value]))
    end
  end
end
iex(6)> c("bitmap_inspect.exs")
[Bitmap, Inspect.Bitmap]
iex(7)> fifty = %Bitmap{value: 50}
%Bitmap{50=110010}
iex(8)> inspect fifty
"%Bitmap{50=110010}"
iex(9)> inspect fifty, structs: false
"%{__struct__: Bitmap, value: 50}"
iex(10)> %Bitmap{value: 12345678901234567890}
%Bitmap{12345678901234567890=1010101101010100101010011000110011101011000111110000101011010010}

defmodule Bitmap do
  defstruct value: 0

  defimpl Inspect, for: Bitmap do
    import Inspect.Algebra
    def inspect(%Bitmap{value: value}, _opts) do
      concat([
        nest(
         concat([
           "%Bitmap{",
           break(""),
           nest(concat([to_string(value),
                        "=",
                        break(""),
                        as_binary(value)]),
                2),
         ]), 2),
        break(""),
        "}"])
    end
    defp as_binary(value) do
      to_string(:io_lib.format("~.2B", [value]))
    end
  end
end
iex(1)> c("bitmap_algebra.exs")
[Bitmap, Inspect.Bitmap]
iex(2)>
nil
iex(3)> big_bitmap = %Bitmap{value: 12345678901234567890}
%Bitmap{12345678901234567890=
    1010101101010100101010011000110011101011000111110000101011010010}
iex(4)>
nil
iex(5)> IO.inspect big_bitmap
%Bitmap{12345678901234567890=
    1010101101010100101010011000110011101011000111110000101011010010}
%Bitmap{12345678901234567890=
    1010101101010100101010011000110011101011000111110000101011010010}
iex(6)> IO.inspect big_bitmap, structs: false
%{__struct__: Bitmap, value: 12345678901234567890}
%Bitmap{12345678901234567890=
    1010101101010100101010011000110011101011000111110000101011010010}
  • List.Chars & String.Chars

bitmap_string.exs

defimpl String.Chars, for: Bitmap do
  def to_string(bitmap) do
    import Enum
    bitmap
    |> reverse
    |> chunk(3)
    |> map(fn three_bits -> three_bits |> reverse |> join end)
    |> reverse
    |> join("_")
  end
end
iex(1)> c("bitmap.exs")
[Bitmap]
iex(2)> c("bitmap_enumerable.exs")
[Enumerable.Bitmap]
iex(3)> c("bitmap_string.exs")
[String.Chars.Bitmap]


iex(4)> fifty = %Bitmap{value: 50}
%Bitmap{value: 50}
iex(5)> "Fifty in bits is: #{fifty}"
"Fifty in bits is: 110_010"

References

Programming Elixir

2018年5月14日

Elixir 8 Macros

Macro 可能會讓程式碼更難閱讀,如果可以使用 function,就不要用 macro。本文以 erlang/elixir 不支援的 if 語法,說明如何以 macro 實作 if 語法。

if Statement

myif «condition» do
    «evaluate if true»
else
    «evaluate if false»
end

----

myif «condition»,
    do: «evaluate if true»,
    else: «evaluate if false

可用 function 實作 if 的語法

defmodule My do
    def myif(condition, clauses) do
        do_clause = Keyword.get(clauses, :do, nil)
        else_clause = Keyword.get(clauses, :else, nil)

    case condition do
        val when val in [false, nil]
            -> else_clause
        _otherwise
            -> do_clause
        end
    end
end
$ iex my.exs
iex(1)> My.myif 1==2, do: (IO.puts "1==2"), else: (IO.puts "1 != 2")
1==2
1 != 2
:ok

但結果是不正確的,因為 elixir 同時 evaluate do: 及 else:

Macro Inject Code

defmacro 定義 macro,當傳送參數給 macro,elixir 不會直接 evaluate,而是會以 tuple 方式發送程式碼。

defmodule My do
  defmacro macro(param) do
    IO.inspect param
  end
end

defmodule Test do
  require My

  # These values represent themselves
  My.macro :atom        #=> :atom
  My.macro 1            #=> 1
  My.macro 1.0          #=> 1.0
  My.macro [1,2,3]      #=> [1,2,3]
  My.macro "binaries"   #=> "binaries"
  My.macro { 1, 2 }     #=> {1,2}
  My.macro do: 1        #=> [do: 1]

  # And these are represented by 3-element tuples,以三個元素的 tuple 表示這些 macro

  My.macro { 1,2,3,4,5 }
  # =>  {:"{}",[line: 20],[1,2,3,4,5]}

  My.macro do: ( a = 1; a+a )
  # =>  [do:
  #      {:__block__,[],
  #        [{:=,[line: 22],[{:a,[line: 22],nil},1]},
  #         {:+,[line: 22],[{:a,[line: 22],nil},{:a,[line: 22],nil}]}]}]


  My.macro do
    1+2
  else
    3+4
  end
  # =>   [do: {:+,[line: 24],[1,2]},
  #       else: {:+,[line: 26],[3,4]}]

end
$ iex dumper.exs
:atom
1
1.0
[1, 2, 3]
"binaries"
{1, 2}
[do: 1]
{:{}, [line: 29], [1, 2, 3, 4, 5]}
[do: {:__block__, [],
  [{:=, [line: 32], [{:a, [line: 32], nil}, 1]},
   {:+, [line: 32], [{:a, [line: 32], nil}, {:a, [line: 32], nil}]}]}]
[do: {:+, [line: 40], [1, 2]}, else: {:+, [line: 42], [3, 4]}]

在一個 module 定義 macro,另一個 module 使用時,必須要先用 require,這樣才能確保 macro module 在目前這個 module 前先被編譯。


quote function

quote 可讓 code 保持尚未 evaluated 的形式。quote/2 能把 Elixir 代碼轉換成底層表示形式。

iex(1)> quote do: :atom
:atom
iex(2)> quote do: 1
1
iex(3)> quote do: 1.0
1.0
iex(4)> quote do: [1,2,3]
[1, 2, 3]
iex(5)> quote do: "binaries"
"binaries"
iex(6)> quote do: {1,2}
{1, 2}
iex(7)> quote do: [do: 1]
[do: 1]

iex(8)> quote do: {1,2,3,4,5}
{:{}, [], [1, 2, 3, 4, 5]}

iex(9)> quote do: (a = 1; a + a)
{:__block__, [],
 [{:=, [], [{:a, [], Elixir}, 1]},
  {:+, [context: Elixir, import: Kernel],
   [{:a, [], Elixir}, {:a, [], Elixir}]}]}

iex(10)> quote do: [ do: 1 + 2, else: 3 + 4]
[do: {:+, [context: Elixir, import: Kernel], [1, 2]},
 else: {:+, [context: Elixir, import: Kernel], [3, 4]}]

eg.exs

defmodule My do
  defmacro macro(code) do
    IO.inspect code
    # 會 evaluate code
    code
  end
end
defmodule Test do
  require My
  My.macro(IO.puts("hello"))
end

eg1.exs

defmodule My do
  defmacro macro(code) do
    IO.inspect code
    # 只會 evaluate do 裡面的 code,quota 裡面的 code 會回傳給呼叫 macro 的 code,然後被 evaluate
    quote do: IO.puts "Different code"
  end
end
defmodule Test do
  require My
  My.macro(IO.puts("hello"))
end
$ elixir eg.exs
{{:., [line: 17], [{:__aliases__, [counter: 0, line: 17], [:IO]}, :puts]},
 [line: 17], ["hello"]}
hello

$ elixir eg1.exs
{{:., [line: 17], [{:__aliases__, [counter: 0, line: 17], [:IO]}, :puts]},
 [line: 17], ["hello"]}
Different code

unquote function

知道了如何獲取代碼的內部表示,那怎麼修改它呢?可利用 unquote/1 來插入新的代碼和值。當我們 unquote 一個表達式的時候,會把它運行的結果插入到 AST。

iex(1)> denominator = 2
2
iex(2)> quote do: divide(42, denominator)
{:divide, [], [42, {:denominator, [], Elixir}]}
iex(3)> quote do: divide(42, unquote(denominator))
{:divide, [], [42, 2]}

Expanding a list using unquote_splicing

# insert [3,4]
iex(4)> Code.eval_quoted(quote do: [1,2,unquote([3,4])])
{[1, 2, [3, 4]], []}

# insert 3,4 到前面的 list
iex(5)> Code.eval_quoted(quote do: [1,2,unquote_splicing([3,4])])
{[1, 2, 3, 4], []}

# '1234' 是 lists of characters
iex(6)> Code.eval_quoted(quote do: [?a, ?= ,unquote_splicing('1234')])
{'a=1234', []}


iex(7)> fragment = quote do: IO.puts("hello")
{{:., [], [{:__aliases__, [alias: false], [:IO]}, :puts]}, [], ["hello"]}
iex(8)> Code.eval_quoted fragment
hello
{:ok, []}
iex(9)> Code.eval_string("[a, a*b, c]", [a: 2, b: 3, c: 4])
{[2, 6, 4], [a: 2, b: 3, c: 4]}

myif Macro

defmodule My do
  defmacro if(condition, clauses) do
    do_clause   = Keyword.get(clauses, :do, nil)
    else_clause = Keyword.get(clauses, :else, nil)
    quote do
      case unquote(condition) do
        val when val in [false, nil] -> unquote(else_clause)
        _                            -> unquote(do_clause)
      end
    end
  end
end

defmodule Test do
  require My
  My.if 1==2 do
    IO.puts "1 == 2"
  else
    IO.puts "1 != 2"
  end
end
$ elixir myif.ex
1 != 2

References

Programming Elixir

2018年5月7日

Elixir 7 OTP

OTP 定義 application 的 hierarchy,一個 application 包含了多個 processes,這些 processes 是依照 "behaviors" 的規格實作,"server" behavior 就稱為 GenServer,另一個特殊的 "supervisor" behavior 用來監控 processes,並實作 restart process 的規則。

OTP Server: GenServer

產生新的 sequence project

$ mix new sequence
* creating README.md
* creating .gitignore
* creating mix.exs
* creating config
* creating config/config.exs
* creating lib
* creating lib/sequence.ex
* creating test
* creating test/test_helper.exs
* creating test/sequence_test.exs
$ cd sequence/
$ mkdir lib/sequence

當 client 呼叫 server,就會進入 handle_call,其中第一個參數是 client 要送給 server 的資訊,第二個參數是 client 的 PID,第三個是 server state。

defmodule Sequence.Server do
  # use 就是將 OTP GenServer behavior 套用到這個 module
  use GenServer

  # :next_number 是 client 給的參數, _from 是 client's PID, current_number 是 server state
  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

end

startlink 類似 spawnlink,會產生一個新的 process

$ iex -S mix
Compiling 2 files (.ex)
Generated sequence app
iex(1)> { :ok, pid } = GenServer.start_link(Sequence.Server, 100)
{:ok, #PID<0.149.0>}
iex(2)> GenServer.call(pid, :next_number)
100
iex(3)> GenServer.call(pid, :next_number)
101
iex(4)> GenServer.call(pid, :next_number)
102

增加處理重設 number 的 fuction: def handle_call({:set_number, new_number}, _from, _current_number)

defmodule Sequence.Server do
  # use 就是將 OTP GenServer behavior 套用到這個 module
  use GenServer

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  def handle_call({:set_number, new_number}, _from, _current_number) do
    { :reply, new_number, new_number }
  end

end

執行過程

iex(1)> { :ok, pid } = GenServer.start_link(Sequence.Server, 100)
{:ok, #PID<0.123.0>}
iex(2)> GenServer.call(pid, :next_number)
100
iex(3)> GenServer.call(pid, {:set_number, 20})
20
iex(4)> GenServer.call(pid, :next_number)
20
iex(5)> GenServer.call(pid, :next_number)
21
iex(6)> GenServer.call(pid, :next_number)
22

GenServer.call 是單向的呼叫,會等待 server 回傳結果,但有時候不需要等待 reply,就改用 GenServer.cast,Server 要實作 handle_cast。

defmodule Sequence.Server do
  # use 就是將 OTP GenServer behavior 套用到這個 module
  use GenServer

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  # 因為不需要 reply 給 client,最後只要更新 server state
  def handle_cast({:increment_number, delta}, current_number) do
    { :noreply, current_number + delta}
  end

end

執行過程

# reload Sequence.Server
iex(8)> r Sequence.Server
warning: redefining module Sequence.Server (current version loaded from _build/dev/lib/sequence/ebin/Elixir.Sequence.Server.beam)
  lib/sequence/server.ex:1

{:reloaded, Sequence.Server, [Sequence.Server]}

iex(9)> { :ok, pid } = GenServer.start_link(Sequence.Server, 100)
{:ok, #PID<0.143.0>}
iex(10)> GenServer.call(pid, :next_number)
100
iex(11)> GenServer.cast(pid, {:increment_number, 200})
:ok
iex(12)> GenServer.call(pid, :next_number)
301

追蹤 Server 執行過程

start_link 的第三個參數,是一些 options 設定,例如 [debug: [:trace]]

iex(6)> {:ok,pid} = GenServer.start_link(Sequence.Server, 100, [debug: [:trace]])
{:ok, #PID<0.148.0>}
iex(7)> GenServer.call(pid, :next_number)
*DBG* <0.148.0> got call next_number from <0.139.0>
*DBG* <0.148.0> sent 100 to <0.139.0>, new state 101
100
iex(8)> GenServer.cast(pid, {:increment_number, 200})
*DBG* <0.148.0> got cast {increment_number,200}
:ok
*DBG* <0.148.0> new state 301

[debug: [:statistics]] 產生統計資料,timestamp 為 {{y,m,d},{h,m,s}} 時間的 tuple

iex(11)>  {:ok,pid} = GenServer.start_link(Sequence.Server, 100, [debug: [:statistics]])
{:ok, #PID<0.155.0>}
iex(12)> GenServer.call(pid, :next_number)
100
iex(13)> GenServer.cast(pid, {:increment_number, 200})
:ok
iex(14)> :sys.statistics pid, :get
{:ok,
 [start_time: {{2017, 8, 30}, {9, 54, 38}},
  current_time: {{2017, 8, 30}, {9, 54, 53}}, reductions: 76, messages_in: 2,
  messages_out: 0]}

可用 :sys.trace enable/disable trace 功能

iex(17)> :sys.trace pid, true
:ok
iex(18)> GenServer.call(pid, :next_number)
*DBG* <0.155.0> got call next_number from <0.139.0>
*DBG* <0.155.0> sent 301 to <0.139.0>, new state 302
301
iex(19)> :sys.trace pid, false
:ok
iex(20)> GenServer.call(pid, :next_number)
302
iex(21)> :sys.get_state pid
303

增加 formatstatus,格式化 :sys.getstatus pid 的結果

defmodule Sequence.Server do
  # use 就是將 OTP GenServer behavior 套用到這個 module
  use GenServer

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  # 因為不需要 reply 給 client,最後只要更新 server state
  def handle_cast({:increment_number, delta}, current_number) do
    { :noreply, current_number + delta}
  end

  # 格式化 state 的 trace 資訊
  def format_status(_reason, [ _pdict, state ]) do
    [data: [{'State', "My current state is '#{inspect state}', and I'm happy"}]]
  end

end
iex(8)> :sys.get_status pid
{:status, #PID<0.123.0>, {:module, :gen_server},
 [["$ancestors": [#PID<0.121.0>, #PID<0.59.0>],
   "$initial_call": {Sequence.Server, :init, 1}], :running, #PID<0.121.0>,
  [trace: true,
   statistics: {{{2017, 8, 30}, {10, 1, 2}}, {:reductions, 21}, 1, 0}],
  [header: 'Status for generic server <0.123.0>',
   data: [{'Status', :running}, {'Parent', #PID<0.121.0>},
    {'Logged events', []}],
   data: [{'State', "My current state is '101', and I'm happy"}]]]}

GenServer Callbacks

GenServer 是使用 OTP protocol,必須實作六個 callback functions:

  1. init(state_arguments)

    GenServer 會在啟動 server 時呼叫,參數為 start_link 的第二個參數,啟動成功要回傳 {:ok, state},失敗要回傳 {:stop, reason}。

    有個 {:ok, state, timeout} 的回傳值 timeout,GenServer 如果在 timeout ms 後,一直沒有收到 message,會自動產生一個 :timeout message 給 server,server 必須要用 handle_info 處理。

  2. handle_call(request, from , state)

    GenServer.call(pid, request) 呼叫的 function,成功要回傳 {:reply, result, newstate},預設可用 :badcall error 停掉 server。

  3. handle_cast(request, state)

    GenServer.cast(pid, request) 呼叫的 function,成功要回傳 {:noreply, newstate},失敗回傳 {:stop, reason, newstate}

  4. handle_info(info, state)

    處理沒有被 call/cast 處理的 messages,例如處理 :timeout,另外直接用 send 發送給這個 PID 的訊息,也是由 handle_info 處理。

  5. terminate(reason, state)

    當 server 停止時會呼叫此 function,但如果有用 supervisor 機制監控 server,就不用處理這個。

  6. codechange(fromversion, state, extra)

    OTP 可在不停掉系統的狀況下,替換 server code,新的 server 跟舊 server 有不同的狀態。

  7. format_status(reason, [pdict, state])

    用來客製化 state display 的訊息,要回傳 [data: [{'State', state_info}]]

OTP functions 的回傳值

  1. { :noreply, new_state [ , :hibernate | timeout ] }

    call and cast 使用,:hibernate 會將 server state 由 memory 移除,並在下一個 message 中,recover 該 state,儲存 memory state 會消耗掉一些 CPU resource。 timeout 可以設定為 :infinite (預設值),GenServer 如果在 timeout ms 後,一直沒有收到 message,會自動產生一個 :timeout message 給 server,server 必須要用 handle_info 處理。

  2. { :stop, reason, new_state }

    server 終止的訊號

  3. { :reply, response, new_state [ , :hibernate | timeout ] }

    只有 handle_call 可以使用,可發送 response 給 client

  4. { :stop, reason, reply, new_state }

    只有 handle_call 可以使用,發送 response 及 server 終止的訊號給 client。


Naming a Process

在 start_link 加上 name: :seq 的 option

iex(9)> {:ok,pid} = GenServer.start_link(Sequence.Server, 100, name: :seq)
{:ok, #PID<0.132.0>}
iex(10)> GenServer.call(:seq, :next_number)
100
iex(11)> :sys.get_status :seq
{:status, #PID<0.132.0>, {:module, :gen_server},
 [["$ancestors": [#PID<0.121.0>, #PID<0.59.0>],
   "$initial_call": {Sequence.Server, :init, 1}], :running, #PID<0.121.0>, [],
  [header: 'Status for generic server seq',
   data: [{'Status', :running}, {'Parent', #PID<0.121.0>},
    {'Logged events', []}],
   data: [{'State', "My current state is '101', and I'm happy"}]]]}

如何將呼叫 GenServer 的 function call (start_link, call, cast) 封裝起來

defmodule Sequence.Server do
  use GenServer

  #####
  # External API

  def start_link(current_number) do
    GenServer.start_link(__MODULE__, current_number, name: __MODULE__)
  end

  def next_number do
    GenServer.call __MODULE__, :next_number
  end

  def increment_number(delta) do
    GenServer.cast __MODULE__, {:increment_number, delta}
  end

  #####

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  # 因為不需要 reply 給 client,最後只要更新 server state
  def handle_cast({:increment_number, delta}, current_number) do
    { :noreply, current_number + delta}
  end

  # 格式化 state 的 trace 資訊
  def format_status(_reason, [ _pdict, state ]) do
    [data: [{'State', "My current state is '#{inspect state}', and I'm happy"}]]
  end

end
$ iex -S mix
iex(1)> Sequence.Server.start_link 200
{:ok, #PID<0.123.0>}
iex(2)> Sequence.Server.next_number
200
iex(3)> Sequence.Server.next_number
201
iex(4)> Sequence.Server.increment_number 100
:ok
iex(5)> Sequence.Server.next_number
302

OTP Supervisors

supervisors 負責 process monitoring and restarting

supervisor 是使用 OTP supervisor behavior,他會有一個 list of porcesses,並知道 process crash 時,要怎麼處理,另外也有避免 restart loops 的機制。

supervisor 是利用 erlang VM process-linking 及 -monitoring 的機制實作的。

產生一個包含 supervisor 的新專案

$ mix new --sup sequence
* creating README.md
* creating .gitignore
* creating mix.exs
* creating config
* creating config/config.exs
* creating lib
* creating lib/sequence.ex
* creating lib/sequence/application.ex
* creating test
* creating test/test_helper.exs
* creating test/sequence_test.exs

mix.exs 是自動產生的,不需要修改內容

defmodule Sequence.Mixfile do
  use Mix.Project

  def project do
    [
      app: :sequence,
      version: "0.1.0",
      elixir: "~> 1.5",
      start_permanent: Mix.env == :prod,
      deps: deps()
    ]
  end

  # Run "mix help compile.app" to learn about applications.
  def application do
    [
      extra_applications: [:logger],
      mod: {Sequence.Application, []}
    ]
  end

  # Run "mix help deps" to learn about dependencies.
  defp deps do
    [
      # {:dep_from_hexpm, "~> 0.3.0"},
      # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"},
    ]
  end
end

這是 lib/sequence/application.ex 的內容

defmodule Sequence.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  def start(_type, _args) do
    # List all child processes to be supervised
    children = [
      # Starts a worker by calling: Sequence.Worker.start_link(arg)
      # {Sequence.Worker, arg},
      {Sequence.Server, 123},
      {Sequence.Server2, 123}
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: Sequence.Supervisor]
    
    # 產生 supervisor
    {:ok, pid} = Supervisor.start_link(children, opts)
  end
end

由 supervisor 啟動兩個 worker: Sequnce.Server 及 Sequence.Server2

defmodule Sequence.Supervisor do
  use Supervisor.Behaviour

  def start_link(initial_number) do
    :supervisor.start_link(__MODULE__, initial_number)
  end

  def init(initial_number) do
    child_processes = [
        worker(Sequence.Server, [initial_number]),
        worker(Sequence.Server2, [initial_number])
    ]
    supervise child_processes, strategy: :one_for_one
  end
end

Sequence.Server 是從上面的測試複製下來的

defmodule Sequence.Server do
  use GenServer

  #####
  # External API

  def start_link(current_number) do
    GenServer.start_link(__MODULE__, current_number, name: __MODULE__)
  end

  def next_number do
    GenServer.call __MODULE__, :next_number
  end

  def increment_number(delta) do
    GenServer.cast __MODULE__, {:increment_number, delta}
  end

  #####

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  # 因為不需要 reply 給 client,最後只要更新 server state
  def handle_cast({:increment_number, delta}, current_number) do
    { :noreply, current_number + delta}
  end

  # 格式化 state 的 trace 資訊
  def format_status(_reason, [ _pdict, state ]) do
    [data: [{'State', "My current state is '#{inspect state}', and I'm happy"}]]
  end

end

Sequence.Server2 功能跟 Server1 一樣,差別是啟動時,指定了不同的 Process Name,另外注意改用 :gen_server 的方式呼叫。

defmodule Sequence.Server2 do
  use GenServer

  #####
  # External API

  def start_link(current_number) do
    :gen_server.start_link({ :local, :sequence }, __MODULE__, current_number, [])
  end

  def next_number do
    :gen_server.call :sequence, :next_number
  end

  def increment_number(delta) do
    :gen_server.cast :sequence, {:increment_number, delta}
  end

  #####
  # GenServer implementation

  def init(current_number)
    when is_number(current_number) do
    { :ok, current_number }
  end

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  def handle_cast({:increment_number, delta}, current_number) do
    { :noreply, current_number + delta}
  end
end

Sequence.Server2 跟 Sequence.Server1 可用同樣的方式測試。

$ iex -S mix

Compiling 1 file (.ex)
Interactive Elixir (1.5.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> :observer.start()
:ok
iex(2)> Sequence.Server.increment_number 3
:ok
iex(3)> Sequence.Server.next_number
126
iex(4)> Sequence.Server.next_number
127

## 刻意發送一個 increment_number 無法處理的 string,process 會 crash,並由 supervisor 重新啟動
iex(5)> Sequence.Server.increment_number "c"
:ok
iex(6)>
00:08:25.372 [error] GenServer Sequence.Server terminating
** (ArithmeticError) bad argument in arithmetic expression
    (sequence) lib/sequence/server.ex:27: Sequence.Server.handle_cast/2
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_cast", {:increment_number, "c"}}
State: [data: [{'State', "My current state is '128', and I'm happy"}]]

nil
iex(7)> Sequence.Server.next_number
123
iex(8)> Sequence.Server.next_number
124
iex(9)>

OTP Application

OTP application is a bundle of code that comes with a descriptor. 概念上比較接近 component/service

OTP 會使用一個 name.app 的 application spec file,mix 會自動根據 mix.exs 產生這個設定檔。

$ mix new sequence

會產生一個 mix.exs,在 def application do 裡面加上 mod: { Sequence, 456 }

defmodule Sequence.Mixfile do
  use Mix.Project

  def project do
    [
      app: :sequence,
      version: "0.1.0",
      elixir: "~> 1.5",
      start_permanent: Mix.env == :prod,
      deps: deps()
    ]
  end

  # Run "mix help compile.app" to learn about applications.
  def application do
    [
      extra_applications: [:logger],
      mod: { Sequence, 456 }
    ]
  end

  # Run "mix help deps" to learn about dependencies.
  defp deps do
    [
      # {:dep_from_hexpm, "~> 0.3.0"},
      # {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"},
    ]
  end
end

修改 /lib/sequence.ex,因為剛剛 mod: {Sequence,456} 的設定,會自動執行 Sequence module 的 start(_type, initial_number),在裡面啟動 Sequence.Supervisor。

defmodule Sequence do
  use Application

  def start(_type, initial_number) do
    Sequence.Supervisor.start_link(initial_number)
  end
end

/lib/sequence/supervisor.ex 是 OTP supervisor,負責管理 Sequence.Server process

defmodule Sequence.Supervisor do
  use Supervisor

  def start_link(initial_number) do
    :supervisor.start_link(__MODULE__, initial_number)
  end

  def init(initial_number) do
    child_processes = [
        worker(Sequence.Server, [initial_number])
    ]
    supervise child_processes, strategy: :one_for_one
  end
end

/lib/sequence/server.ex

defmodule Sequence.Server do
  use GenServer

  #####
  # External API

  def start_link(current_number) do
    :gen_server.start_link({ :local, :sequence }, __MODULE__, current_number, [])
  end

  def next_number do
    :gen_server.call :sequence, :next_number
  end

  def increment_number(delta) do
    :gen_server.cast :sequence, {:increment_number, delta}
  end

  #####
  # GenServer implementation

  def init(current_number)
  when is_number(current_number) do
    { :ok, current_number }
  end

  def handle_call(:next_number, _from, current_number) do
    { :reply, current_number, current_number+1 }
  end

  def handle_cast({:increment_number, delta}, current_number) do
    { :noreply, current_number + delta}
  end
end

啟動 sequence project

$ iex -S mix
iex(1)> Sequence.Server.next_number
456

加上 registered: 設定 application 要註冊的 names,可利用這個設定值,讓 node/cluster 之間的 application name 都是唯一的,在此註冊這個 applcation 名稱為 Sequence.Server

  def application do
    [
      extra_applications: [:logger],
      mod: { Sequence, 456 },
      registered: [Sequence.Server]
    ]
  end

執行 mix compile 會編譯程式並產生 sequence/_build/dev/lib/sequence/ebin/sequence.app

$ mix compile
Compiling 3 files (.ex)
Generated sequence app

sequence.app 內容為

{application,sequence,
             [{applications,[kernel,stdlib,elixir,logger]},
              {description,"sequence"},
              {modules,['Elixir.Sequence','Elixir.Sequence.Server',
                        'Elixir.Sequence.Supervisor']},
              {vsn,"0.1.0"},
              {extra_applications,[logger]},
              {mod,{'Elixir.Sequence',456}},
              {registered,['Elixir.Sequence.Server']}]}.

要讓 Application 更一般化,可改寫初始參數的傳遞方式

修改 mix.exs 將參數放在 env 裡面 env: [initial_number: 456]

def application do
    [
      extra_applications: [:logger],
      mod: { Sequence, [] },
      env: [initial_number: 456]
      registered: [Sequence.Server]
    ]
  end

修改 /lib/sequence.ex,參數由 Application.get_env 取得

defmodule Sequence do
  use Application

  def start(_type, _args) do
    Sequence.Supervisor.start_link(Application.get_env(:sequence, :initial_number))
  end
end

Code Release: EXRM (Elixir Release Manager)

release: a bundle that contains a particular version of your application including dependencies, configuration, metadata.

hot upgrade: 更新 running application,更新過程 user 不受影響

exrm: 建立在 erlang relx 之上,用來管理 release package

修改 mix.exs,增加 :exrm 這個 library depencency

  defp deps do
    [
      {:exrm, "~> 1.0.8"}
    ]
  end

修改 /lib/sequence/server.ex,增加 @vsn "0"

defmodule Sequence.Server do
  use GenServer

  @vsn "0"

安裝 exrm

mix do deps.get, deps.compile

產生 release package 在 rel/sequence 這個資料夾內,壓縮檔放在 rel/sequence/releases/0.0.1/sequence.tar.gz

$ mix release
....
==> The release for sequence-0.0.1 is ready!
==> You can boot a console running your release with `$ rel/sequence/bin/sequence console`

解壓縮 sequence.tar.gz 就可以啟動 sequence app

$ ./sequence/bin/sequence console
iex(sequence@127.0.0.1)1> Sequence.Server.next_number
456
iex(sequence@127.0.0.1)2> Sequence.Server.next_number
457

保留這個 console 不動,回到 project 程式,修改 mix.exs,將版本號碼改為 0.0.2

  def project do
    [
      app: :sequence,
      version: "0.0.2",
      elixir: "~> 1.5",
      start_permanent: Mix.env == :prod,
      deps: deps()
    ]
  end

另外修改 lib/sequence/server.ex 的 next_number 用來測試是不是已經有換成新版的程式

  def next_number do
    #:gen_server.call(:sequence, :next_number)
    with number = :gen_server.call(:sequence, :next_number),
    do: "0.0.2 The next number is #{number}"
  end

在 project folder 再做一次 mix release,但在 mac 出現了 .DS_Store 的錯誤(note: 必須要先有 rel/sequence/releases/0.0.1 這個 release 的資料,編譯新版 0.0.2 時,才會出現 relup 檔案,升級就是用這個檔案處理的)

$ mix release
Compiling 3 files (.ex)
Generated sequence app
Building release with MIX_ENV=dev.

This is an upgrade, verifying appups exist for updated dependencies..
==> Unable to access /test/sequence/rel/sequence/releases/.DS_Store/sequence.rel: enotdir

移除 .DS_Store 後,再做一次 mix release

$ rm /test/sequence/rel/sequence/releases/.DS_Store
[14:56]charley@cmbp ~/project/idea/book/ProgrammingElixir/test/sequence$ mix release
Building release with MIX_ENV=dev.

This is an upgrade, verifying appups exist for updated dependencies..
==> All dependencies have appups ready for release!
==> Generated .appup for sequence 0.0.1 -> 0.0.2
==> The release for sequence-0.0.2 is ready!
==> You can boot a console running your release with `$ rel/sequence/bin/sequence console`

將 rel/sequence/releases/0.0.2/sequence.tar.gz 複製到剛剛解壓縮 0.1.0 版的正式環境目錄裡面的 sequence/releases/0.0.2/ 目錄中

用另一個 terminal 直接 hot-upgrade 到 0.0.2

$ bin/sequence upgrade 0.0.2
Release 0.0.2 not found, attempting to unpack releases/0.0.2/sequence.tar.gz
Unpacked successfully: "0.0.2"
Generating vm.args/sys.config for upgrade...
sys.config ready!
vm.args ready!
Release 0.0.2 is already unpacked, now installing.
Installed Release: 0.0.2
Made release permanent: "0.0.2"

回到剛剛的 server console,可驗證是否有更新到 0.0.2

iex(sequence@127.0.0.1)3> Sequence.Server.next_number
"0.0.2 The next number is 458"

如果新版發生問題,也能直接降版回 0.0.1

$ bin/sequence downgrade 0.0.1
Release 0.0.1 is marked old, switching to it.
Generating vm.args/sys.config for upgrade...
sys.config ready!
vm.args ready!
Release 0.0.1 is marked old, switching to it.
Installed Release: 0.0.1
Made release permanent: "0.0.1"
iex(sequence@127.0.0.1)4> Warning: "/Users/charley/Downloads/test/sequence/releases/0.0.1/relup" missing (optional)
iex(sequence@127.0.0.1)5> Sequence.Server.next_number
459

0.0.3 版,增加 code vsn 更新的 def code_change("0", old_state = { current_number, stash_pid }, _extra) callback function

更新時 console 會出現這樣的資訊

22:59:39.030 [info]  Changing code from 0 to 1

22:59:39.030 [info]  {465, #PID<0.628.0>}

22:59:39.031 [info]  %Sequence.Server.State{current_number: 465, delta: 1, stash_pid: #PID<0.628.0>}

以下為完整的 0.0.3 版 code

mix.exs

defmodule Sequence.Mixfile do
  use Mix.Project

  # ...

  def project do
    [
      app:     :sequence,
      version: "0.0.3",
      elixir: "~> 1.5",
      start_permanent: Mix.env == :prod,
      deps:    deps()
    ]
  end

  # Configuration for the OTP application
  def application do
    [
      mod: { Sequence, 456 },
      registered: [ Sequence.Server ],
      applications: [ :logger ]
    ]
  end

  defp deps do
    [
      {:exrm, "~> 1.0.8"}
    ]
  end
end

lib/sequence.ex

defmodule Sequence do
  use Application

  def start(_type, initial_number) do
    Sequence.Supervisor.start_link(initial_number)
  end
end

lib/sequence/supervisor.ex

defmodule Sequence.Supervisor do
  use Supervisor

  def start_link(initial_number) do
    result = {:ok, sup } = Supervisor.start_link(__MODULE__, [initial_number]) 
    start_workers(sup, initial_number)
    result
  end

  def start_workers(sup, initial_number) do
    # Start the stash worker
    {:ok, stash} = 
       Supervisor.start_child(sup, worker(Sequence.Stash, [initial_number]))

    # and then the subsupervisor for the actual sequence server
    Supervisor.start_child(sup, supervisor(Sequence.SubSupervisor, [stash]))
  end

  def init(_) do
    supervise [], strategy: :one_for_one
  end

end

lib/sequence/subsupervisor.ex

defmodule Sequence.SubSupervisor do
  use Supervisor

  def start_link(stash_pid) do
    Supervisor.start_link(__MODULE__, stash_pid)
  end

  def init(stash_pid) do
    child_processes = [ worker(Sequence.Server, [stash_pid]) ]
    supervise child_processes, strategy: :one_for_one
  end

end

lib/sequence/server.ex

defmodule Sequence.Server do
  use GenServer
  require Logger

  @vsn "1"
  defmodule State do
    defstruct current_number: 0, stash_pid: nil, delta: 1
  end

  #####
  # External API
  # . . .
  def start_link(stash_pid) do
    GenServer.start_link(__MODULE__, stash_pid, name: __MODULE__)
  end

  def next_number do
    # GenServer.call __MODULE__, :next_number
    with number = GenServer.call(__MODULE__, :next_number),
    do: "0.0.2 The next number is #{number}"
  end

  def increment_number(delta) do
    GenServer.cast __MODULE__, {:increment_number, delta}
  end

  #####
  # GenServer implementation

  # def init(stash_pid) do
  #   current_number = Sequence.Stash.get_value stash_pid
  #   { :ok, {current_number, stash_pid} }
  # end

  # def handle_call(:next_number, _from, {current_number, stash_pid}) do
  #   { :reply, current_number, {current_number+1, stash_pid} }
  # end

  # def handle_cast({:increment_number, delta}, {current_number, stash_pid}) do
  #   { :noreply, {current_number + delta, stash_pid}}
  # end

  # def terminate(_reason, {current_number, stash_pid}) do
  #   Sequence.Stash.save_value stash_pid, current_number
  # end

  def init(stash_pid) do
    # 啟動時,以 process id 向 Sequence.Stash 取回 current_number
    current_number = Sequence.Stash.get_value stash_pid

    # 用 struct 記錄 state
    { :ok,
      %State{current_number: current_number, stash_pid: stash_pid} }
  end

  def handle_call(:next_number, _from, state) do
    { :reply,
      state.current_number,
      %{ state | current_number: state.current_number + state.delta} }
  end

  def handle_cast({:increment_number, delta}, state) do
    {:noreply,
     %{ state | current_number: state.current_number + delta, delta: delta} }
  end

  def terminate(_reason, state) do
    # 結束時,將 current_number 存到 Sequence.Stash
    Sequence.Stash.save_value state.stash_pid, state.current_number
  end

  # 版本更新
  def code_change("0", old_state = { current_number, stash_pid }, _extra) do
    new_state = %State{current_number: current_number,
                       stash_pid: stash_pid,
                       delta: 1
                      }
    Logger.info "Changing code from 0 to 1"
    Logger.info inspect(old_state)
    Logger.info inspect(new_state)
    { :ok, new_state }
  end
end

lib/sequence/stash.ex

defmodule Sequence.Stash do
  use GenServer

  #####
  # External API  

  def start_link(current_number) do
    GenServer.start_link( __MODULE__, current_number)
  end

  def save_value(pid, value) do
    GenServer.cast pid, {:save_value, value}
  end

  def get_value(pid) do
    GenServer.call pid, :get_value
  end

  #####
  # GenServer implementation

  def handle_call(:get_value, _from, current_value) do 
    { :reply, current_value, current_value }
  end

  def handle_cast({:save_value, value}, _current_value) do
    { :noreply, value}
  end
end

Task & Agent

如果需要 background processing for maintaining state,但不想寫 spawn/send/receive,可使用 Task, Agents

Task: a function that runs in the background

tasks1.exs

defmodule Fib do
  def of(0), do: 0
  def of(1), do: 1
  def of(n), do: Fib.of(n-1) + Fib.of(n-2)
end

IO.puts "Start the task"

# 非同步 呼叫 Fib.of
worker = Task.async(fn -> Fib.of(20) end)
IO.puts "Do something else"
# 等待完成後的結果
IO.puts "Wait for the task"
result = Task.await(worker)

IO.puts "The result is #{result}"

# 改用 Module, function, parameter 的方式傳入 Task.async
worker = Task.async(Fib, :of, [10])
result = Task.await(worker)
IO.puts "The result is #{result}"
$ elixir tasks1.exs
Start the task
Do something else
Wait for the task
The result is 6765
The result is 55

如何監控 Tasks

  1. 不要用 async 改以 start_link 將 task link to a currently supervised process。

  2. 由 supervisor 啟動 supervise tasks

import Supervisor.Spec
children = [
    worker(Task, [ fn -> Fib.of(20) end ])
]
supervise children, strategy: :one_for_one

Agent: a background process that maintains state

可在 process/node/other nodes 存取 state

# count 會儲存 agent process 的 PID
iex(1)> {:ok, count} = Agent.start(fn -> 0 end)
{:ok, #PID<0.86.0>}
iex(2)> Agent.get(count, &(&1))
0
iex(3)> Agent.update(count, &(&1+1))
:ok
iex(4)> Agent.update(count, &(&1+1))
:ok
iex(5)> Agent.get(count, &(&1))
2

# 也可以將 Agent process 設定 local/global name
iex(6)> Agent.start(fn -> 5 end, name: Sum)
{:ok, #PID<0.92.0>}
iex(7)> Agent.get(Sum, &(&1))
5
iex(8)> Agent.update(Sum, &(&1+100))
:ok
iex(9)> Agent.get(Sum, &(&1))
105

agent_dict.exs

# 儲存 a list of word/frequency pairs in a map
# 將 map 存在 agent 中
defmodule Frequency do

  def start_link do
    # 以 start_link 啟動 Agent
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  def add_word(word) do
    Agent.update(__MODULE__,
                 fn map ->
                      Map.update(map, word, 1, &(&1+1))
                 end)
  end

  def count_for(word) do
    Agent.get(__MODULE__, fn map -> map[word] end)
  end

  def words do
    Agent.get(__MODULE__, fn map -> Map.keys(map) end)
  end

end
iex(1)> c "agent_dict.exs"
[Frequency]
iex(2)> Frequency.start_link
{:ok, #PID<0.92.0>}
iex(3)> Frequency.add_word "You"
:ok
iex(4)> Frequency.words
["You"]
iex(5)> Frequency.add_word "are"
:ok
iex(6)> Frequency.add_word "here"
:ok
iex(7)> Frequency.add_word "are"
:ok
iex(8)> Frequency.add_word "ok"
:ok
iex(9)> Frequency.words
["You", "are", "here", "ok"]
iex(10)> Frequency.count_for("are")
2
iex(11)> Frequency.count_for("ok")
1

anagrams.exs

defmodule Dictionary do

  @name __MODULE__

  ##
  # External API

  # 啟動 Dictionary Agent Process
  def start_link,
  do: Agent.start_link(fn -> %{} end, name: @name)

  def add_words(words),
  do: Agent.update(@name, &do_add_words(&1, words))

  def anagrams_of(word),
  do: Agent.get(@name, &Map.get(&1, signature_of(word)))

  ##
  # Internal implementation
  defp do_add_words(map, words),
  do: Enum.reduce(words, map, &add_one_word(&1, &2))

  defp add_one_word(word, map),
  do: Map.update(map, signature_of(word), [word], &[word|&1])

  defp signature_of(word),
  do: word |> to_charlist |> Enum.sort |> to_string

end

defmodule WordlistLoader do

  # 每一個 dictionary file 都用不同的 Task 載入
  def load_from_files(file_names) do
    file_names
    |> Stream.map(fn name -> Task.async(fn -> load_task(name) end) end)
    |> Enum.map(&Task.await/1)
  end

  defp load_task(file_name) do
    File.stream!(file_name, [], :line)
    |> Enum.map(&String.trim/1)
    |> Dictionary.add_words
  end
end
iex(1)> Dictionary.start_link
{:ok, #PID<0.93.0>}

# 共有四個 list file
iex(2)> Enum.map(1..4, &"words/list#{&1}") |> WordlistLoader.load_from_files
[:ok, :ok, :ok, :ok]
iex(3)> Dictionary.anagrams_of "organ"
["ronga", "rogan", "orang", "nagor", "groan", "grano", "goran", "argon",
 "angor"]

Agent, Task 是以 OTP 實作,可以在多個 nodes 環境運作

名稱的部分改為 @name {:global, __MODULE__}

defmodule Dictionary do

  @name {:global, __MODULE__}

  ##
  # External API

  def start_link, 
  do: Agent.start_link(fn -> HashDict.new end, name: @name)

  def add_words(words),
  do: Agent.update(@name, &do_add_words(&1, words))

  def anagrams_of(word),
  do: Agent.get(@name, &Dict.get(&1, signature_of(word)))

  ##
  # Internal implementation

  defp do_add_words(dict, words),
  do: Enum.reduce(words, dict, &add_one_word(&1, &2))
  
  defp add_one_word(word, dict),
  do: Dict.update(dict, signature_of(word), [word], &[word|&1])

  defp signature_of(word),
  do: word |> to_charlist |> Enum.sort |> to_string

end

defmodule WordlistLoader do
  def load_from_files(file_names) do
    file_names
    |> Stream.map(fn name -> Task.async(fn -> load_task(name) end) end)
    |> Enum.map(&Task.await/1)
  end

  defp load_task(file_name) do
    File.stream!(file_name, [], :line)
    |> Enum.map(&String.strip/1)
    |> Dictionary.add_words
  end
end
$ iex --sname one anagrams_dist.exs
iex(one@cmbp)1> Dictionary.
add_words/1      anagrams_of/1    start_link/0
iex(one@cmbp)1> Dictionary.start_link
{:ok, #PID<0.102.0>}
iex(one@cmbp)2> Dictionary.anagrams_of "argon"
["ronga", "rogan", "orang", "nagor", "groan", "grano", "goran", "argon",
 "angor"]
$ iex --sname two anagrams_dist.exs
iex(two@cmbp)1> Node.connect :one@cmbp
true
iex(two@cmbp)2> Node.list
[:one@cmbp]
iex(two@cmbp)3> WordlistLoader.load_from_files(~w{words/list1 words/list2})
[:ok, :ok]
iex(two@cmbp)4> WordlistLoader.load_from_files(~w{words/list3 words/list4})
[:ok, :ok]
iex(two@cmbp)5> Dictionary.anagrams_of "crate"
["recta", "react", "creta", "creat", "crate", "cater", "carte", "caret"]

References

Programming Elixir

2018年4月30日

Elixir 6 MultipleProcesses

elixir 的 process 並不是 OS process,而是 erlang VM process。Elixir 使用 actor model for concurrency,actor 是一個獨立的 process,我們可 spawn prcess,發送及接收 messages。

Process

spawn-basic.ex

defmodule SpawnBasic do
  def greet do
    IO.puts "Hello"
  end
end

spawn 產生一個新的 process,#PID<0.93.0> 是 process id

iex(1)> c("spawn-basic.ex")
[SpawnBasic]
iex(2)> SpawnBasic.greet
Hello
:ok
iex(3)> spawn(SpawnBasic, :greet, [])
Hello
#PID<0.93.0>

在 Process 之間傳遞 Message

defmodule Spawn1 do
  def greet do
    receive do
      {sender, msg} ->
        send( sender, { :ok, "Hello #{msg}" } )
    end
  end
end

# here's a client 發送訊息給 pid,self 是 caller's PID
pid = spawn(Spawn1, :greet, [])
send pid, {self(), "World!"}

receive do
  {:ok, message} ->
    IO.puts message
end

這是舊的語法,現在要改為 send( pid, msg )

pid <- {self, "World!"}

send self(), "World!"
send( self, "World!")

發送多個訊息

defmodule Spawn2 do
  def greet do
    receive do
      {sender, msg} ->
        send( sender, { :ok, "Hello #{msg}" } )
    end
  end
end

# here's a client
pid = spawn(Spawn2, :greet, [])

send pid, {self(), "World!"}
receive do
  {:ok, message} ->
    IO.puts message
end

send pid, {self(), "Kermit!"}
receive do
  {:ok, message} ->
    IO.puts message
end

執行後,程式會停在這裡,這是因為,發送第一個訊息,Spawn2 greet 處理後,回傳結果後就結束了,所以當發送第二個訊息給 pid後,等待回應時,一直無法收到回傳的訊息。

Hello World!

改寫:在 receive 的部分,設定 timeout

defmodule Spawn2 do
  def greet do
    receive do
      {sender, msg} ->
        send( sender, { :ok, "Hello #{msg}" } )
    end
  end
end

# here's a client
pid = spawn(Spawn2, :greet, [])

send pid, {self(), "World!"}
receive do
  {:ok, message} ->
    IO.puts message
end

send pid, {self(), "Kermit!"}
receive do
  {:ok, message} ->
    IO.puts message
  after 500 ->
    IO.puts "The greeter has gone away"
end

執行結果

Hello World!
The greeter has gone away

最正確的寫法,應該要讓 Spawn 不斷接收並處理訊息

defmodule Spawn4 do
  def greet do
    receive do
      {sender, msg} ->
        send( sender, { :ok, "Hello #{msg}" } )
        greet
    end
  end
end

# here's a client
pid = spawn(Spawn4, :greet, [])

send pid, {self(), "World!"}
receive do
  {:ok, message} ->
    IO.puts message
end

send pid, {self(), "Kermit!"}
receive do
  {:ok, message} ->
    IO.puts message
  after 500 ->
    IO.puts "The greeter has gone away"
end
Hello World!
Hello Kermit!

Tail Recursion: 尾遞迴

defmodule TailRecursive do

  def factorial(n), do: _fact(n, 1)
  
  defp _fact(0, acc), do: acc
  defp _fact(n, acc), do: _fact(n-1, acc*n)

end

Process Overhead

這是一個用來測試 process 效能的小程式,一開始會產生 n 個 processes,第一個會送數字給第二個,加 1 後傳給第三個,最後一個會回傳結果給第一個。

defmodule Chain do

  def counter(next_pid) do
    receive do
      n ->
        send next_pid, n + 1
    end
  end

  def create_processes(n) do
    last = Enum.reduce 1..n, self(),
             fn (_,send_to) ->
                ## 產生 process 執行 :counter,將自己的 process PID 送到 send_to 由 spawn 傳給下一個 process
               spawn(Chain, :counter, [send_to])
             end

    # start the count by sending
    send last, 0

    # and wait for the result to come back to us
    receive do
      final_answer when is_integer(final_answer) ->
        "Result is #{inspect(final_answer)}"
    end
  end

  def run(n) do
    IO.puts inspect :timer.tc(Chain, :create_processes, [n])
  end

end
$ elixir -r chain.exs -e "Chain.run(10)"
{3932, "Result is 10"}

$ elixir -r chain.exs -e "Chain.run(1_000)"
{15951, "Result is 1000"}

$ elixir -r chain.exs -e "Chain.run(40_000)"
{444129, "Result is 40000"}

$ elixir -r chain.exs -e "Chain.run(50_000)"
{545041, "Result is 50000"}

$ elixir -r chain.exs -e "Chain.run(300_000)"

10:54:49.904 [error] Too many processes
** (SystemLimitError) a system limit has been reached

調整 vm 參數 --erl "+P 1000000"

$ elixir --erl "+P 1000000" -r chain.exs -e "Chain.run(300_000)"
{2924026, "Result is 300000"}

When Process Die

預設狀況下,沒有人會知道 Process 結束了

import :timer, only: [ sleep: 1 ]

defmodule Link1 do
  def sad_method do
    sleep 500
    exit(99)
  end

  def run do
    spawn(Link1, :sad_method, [])

    receive do
      msg ->
        IO.puts "MESSAGE RECEIVED: #{inspect msg}"
    after 1000 ->
        IO.puts "Nothing happened as far as I am concerned"
    end
  end
end

Link1.run

Link1 不知道新的 sad_method process 已經結束了,在 1s 後 timeout

$ elixir -r link1.exs
Nothing happened as far as I am concerned

要解決上面的問題,可以用 spawn_link 連結兩個 Process,當 child process 因異常死亡時,會連帶把另一個 process 停掉。

import :timer, only: [ sleep: 1 ]

defmodule Link2 do
  def sad_method do
    sleep 500
    exit(99)
  end

  def run do
    spawn_link(Link2, :sad_method, [])

    receive do
      msg ->
        IO.puts "MESSAGE RECEIVED: #{inspect msg}"
    after 1000 ->
        IO.puts "Nothing happened as far as I am concerned"
    end
  end
end

Link2.run
$ elixir -r link2.exs
** (EXIT from #PID<0.73.0>) 99
defmodule Link3 do
  import :timer, only: [ sleep: 1 ]

  def sad_function do
    sleep 500
    exit(:boom)
  end
  def run do
    # 將 :trap_exit signal 轉換為 message :EXIT
    Process.flag(:trap_exit, true)
    
    spawn_link(Link3, :sad_function, [])
    receive do
      msg ->
        IO.puts "MESSAGE RECEIVED: #{inspect msg}"
    after 1000 ->
        IO.puts "Nothing happened as far as I am concerned"
    end
  end
end

Link3.run
$ elixir -r link3.exs
MESSAGE RECEIVED: {:EXIT, #PID<0.79.0>, :boom}

Monitoring a Process

spawnmonitor 可在 spawna process 時連帶啟用 monitoring,或是使用 Process.monior 監控已經存在的 process。但如果是用 Process.monitor,可能會產生 race condition,如果其他 process 在呼叫 monitor call 完成前就死亡,就會收不到 notification。 spawnlink 及 spawn_monitor versions 為 atomic,所以可以 catch a failure。

import :timer, only: [ sleep: 1 ]

defmodule Monitor1 do
  def sad_method do
    sleep 500
    exit(99)
  end

  def run do
    res = spawn_monitor(Monitor1, :sad_method, [])
    IO.puts inspect res

    receive do
      msg ->
        IO.puts "MESSAGE RECEIVED: #{inspect msg}"
    after 1000 ->
        IO.puts "Nothing happened as far as I am concerned"
    end
  end
end

Monitor1.run
$ elixir -r monitor1.exs
{#PID<0.79.0>, #Reference<0.759175825.870842370.239990>}
MESSAGE RECEIVED: {:DOWN, #Reference<0.759175825.870842370.239990>, :process, #PID<0.79.0>, 99}

結果跟 spawnlink 差不多,如果是用 spawnlink,子 process crash 連帶會影響 monitor process,如果用spawn_monitor,就會知道 crash 的原因。

Parallel Map

通常 map 會回傳一個 apply function 到 collection 裡面每一個 elements 的 list,parallel map 的功能一樣,但會在分別獨立的 process 對每一個 element 都 apply function。

首先會將 collection map 到 fn,在 fn 內 spawn_link 產生 a list of PIDs,每一個 PID 都會對每一個 element 執行給定的 function。

第二個 |> 會將 list of PIDs 轉成 results,各自傳給 list 內每一個PID,注意 ^pid 可讓 receive 依照順序取得結果。

defmodule Parallel do
  def pmap(collection, fun) do
    me = self()

    collection
  |>
    Enum.map(fn (elem) ->
       spawn_link fn -> ( send me, { self(), fun.(elem) } ) end
     end)
  |>
    Enum.map(fn (pid) ->
      receive do { ^pid, result } -> result end
    end)
  end
end
iex(1)> Parallel.pmap 1..10, &(&1 * &1)
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

Fibonacci Server

當 calculator 可處理下一個數字時,會發送 :ready 給 scheduler,scheduler 會以 :fib message 發送計算工作給 calculator,calculator 會以 :answer 回傳結果,scheduler 會發送 :shutdown 給 calculator 通知要 exit。

defmodule FibSolver do

  def fib(scheduler) do
    # 啟動後,發送 :ready 給 scheduler,還有自己的 PID
    send scheduler, { :ready, self() }
    receive do
      { :fib, n, client } ->
        # 計算 n 的 fib 結果,回傳結果給 client
        send client, { :answer, n, fib_calc(n), self() }
        # 等待要處理的下一個 message
        fib(scheduler)
      { :shutdown } ->
        # 收到 shutdown 就結束工作
        exit(0)
    end
  end

  # very inefficient, deliberately
  defp fib_calc(0), do: 1
  defp fib_calc(1), do: 1
  defp fib_calc(n), do: fib_calc(n-1) + fib_calc(n-2)
end

defmodule Scheduler do

  # 由 run 啟動 scheduler
  def run(num_processes, module, func, to_calculate) do
    # 產生 num_processes 個 process,送入 scheduler_processes
    (1..num_processes)
    |> Enum.map(fn(_) -> spawn(module, func, [self()]) end)
    |> schedule_processes(to_calculate, [])
  end

  defp schedule_processes(processes, queue, results) do
    receive do
      {:ready, pid} when length(queue) > 0 ->
        [ next | tail ] = queue
        send pid, {:fib, next, self()}
        schedule_processes(processes, tail, results)

      {:ready, pid} ->
        # queue 裡面已經沒有要計算的工作,就發送 :shutdown 給該 process
        send pid, {:shutdown}
        if length(processes) > 1 do
          # 如果 processes 裡面還有 process,就從 processes 中去掉這個 pid,再迴圈繼續等待其他還沒執行完成的 processes
          schedule_processes(List.delete(processes, pid), queue, results)
        else
          # 已經沒有 processes,所有 processes 都已經 shutdown,就排序結果
          Enum.sort(results, fn {n1,_}, {n2,_}  -> n1 <= n2 end)
        end

      # 接收 fib 計算的結果
      {:answer, number, result, _pid} ->
        schedule_processes(processes, queue, [ {number, result} | results ])

    end
  end
end

to_process = [27, 33, 35, 11, 36, 29, 18, 37, 21, 31, 19, 10, 14, 30,
              15, 17, 23, 28, 25, 34, 22, 20, 13, 16, 32, 12, 26, 24]

Enum.each 1..10, fn num_processes ->
  {time, result} = :timer.tc(Scheduler, :run,
                               [num_processes, FibSolver, :fib, to_process])
  if num_processes == 1 do
    IO.puts inspect result
    IO.puts "\n #   time (s)"
  end
  :io.format "~2B     ~.2f~n", [num_processes, time/1000000.0]
end

執行結果

[{10, 89}, {11, 144}, {12, 233}, {13, 377}, {14, 610}, {15, 987}, {16, 1597}, {17, 2584}, {18, 4181}, {19, 6765}, {20, 10946}, {21, 17711}, {22, 28657}, {23, 46368}, {24, 75025}, {25, 121393}, {26, 196418}, {27, 317811}, {28, 514229}, {29, 832040}, {30, 1346269}, {31, 2178309}, {32, 3524578}, {33, 5702887}, {34, 9227465}, {35, 14930352}, {36, 24157817}, {37, 39088169}]

 #   time (s)
 1     3.38
 2     1.99
 3     1.93
 4     1.75
 5     1.73
 6     1.88
 7     1.76
 8     1.71
 9     1.72
10     1.81

目前這個版本的計算過程如下,有很多重複的計算

fib(5)
= fib(4) + fib(3)
= fib(3) + fib(2) + fib(2) + fib(1)
= fib(2) + fib(1) + fib(1) + fib(0) + fib(1) + fib(0) + fib(1)
= fib(1) + fib(0) + fib(1) + fib(1) + fib(0) + fib(1) + fib(0) + fib(1)

Elixir module 不能儲存資料,但 process 可以儲存 state,elixir 提供 Agent module,可封裝 process 包含了 state。

fib_agent.exs

defmodule FibAgent do
  def start_link do
    Agent.start_link(fn -> %{ 0 => 0, 1 => 1 } end)
  end

  def fib(pid, n) when n >= 0 do
    Agent.get_and_update(pid, &do_fib(&1, n))
  end

  defp do_fib(cache, n) do
    case cache[n] do
      nil ->
        { n_1, cache } = do_fib(cache, n-1)
        result         = n_1 + cache[n-2]
        { result, Map.put(cache, n, result) }

      cached_value ->
        { cached_value , cache }
    end
  end

end

{:ok, agent} = FibAgent.start_link()
IO.puts FibAgent.fib(agent, 2000)

Nodes

erlang Beam VM 可處理自己的 event, process scheduling, memory, naming service, interprocess communication,nodes 可互相連接。

查詢 VM node name

iex(1)> Node.self
:nonode@nohost

可在啟動 iex 時以 --name 或 --sname 設定 node name

$ iex --name name1@cmbp.local
iex(name1@cmbp.local)1> Node.self
:"name1@cmbp.local"

$ iex --name name1
iex(name1@cmbp.local)1> Node.self
:"name1@cmbp.local"

可使用 Node.connect :"name1@cmbp" 連接另一個 node

$ iex --sname name1
-----
$ iex --sname name2

iex(name2@cmbp)1> Node.list
[]
iex(name2@cmbp)2> Node.connect :"name1"
false
iex(name2@cmbp)3> Node.connect :"name1@cmbp.local"

23:32:10.451 [error] ** System NOT running to use fully qualified hostnames **
** Hostname cmbp.local is illegal **

false
iex(name2@cmbp)4> Node.connect :"name1@cmbp"
true
iex(name2@cmbp)5> Node.list
[:name1@cmbp]
# 先產生一個 fun
iex(name1@cmbp)1> func = fn -> IO.inspect Node.self end
#Function<20.99386804/0 in :erl_eval.expr/5>

iex(name1@cmbp)2> spawn(func)
:name1@cmbp
#PID<0.96.0>

# 在 name1@cmbp 產生 func process
iex(name1@cmbp)3> Node.spawn(:"name1@cmbp", func)
:name1@cmbp
#PID<0.98.0>

# 在 name2@cmbp 產生 func process
# 雖然 process 在 name2,IO 還是在 name1
iex(name1@cmbp)4> Node.spawn(:"name2@cmbp", func)
#PID<9755.103.0>
:name2@cmbp

Nodes, Cookies, and Security

在啟動 VM 時,增加 cookie 設定,可增加安全性,相同 cookie 的 node 才能連接在一起。

$ iex --sname name2 --cookie cookie2
iex(name2@cmbp)1>
00:21:31.987 [error] ** Connection attempt from disallowed node :name1@cmbp **

nil
iex(name2@cmbp)2> Node.get_cookie
:cookie2

-------

$ iex --sname name1 --cookie cookie1
iex(name1@cmbp)1> Node.connect :"name2@cmbp"
false

erlang VM 預設會讀取 ~/.erlang.cookie 這個檔案的 cookie

Process Name

PID 有三個部分的數字,但只有兩個 fields: 第一個數字是 Node ID,後兩個數字是 low and high bits of the process ID,如果 export PID 到另一個 node,node ID 會設定為 process 存在的 node number。

iex(name2@cmbp)3> self()
#PID<0.90.0>

process 以 :global.register_name(@name, pid) 註冊 process name,後續就可以用 :global.whereis_name(@name) 找到這個 process。

如果在程式裡註冊 global process name,可能會遇到名稱一樣的問題,可以改用 mix.exs 設定檔,管理要註冊到 global state 的 process names。

defmodule Tick do

  @interval 2000   # 2 seconds

  @name  :ticker

  def start do
    pid = spawn(__MODULE__, :generator, [[]])
    :global.register_name(@name, pid)
  end

  def register(client_pid) do
    send :global.whereis_name(@name), { :register, client_pid }
  end

  def generator(clients) do
    receive do
      { :register, pid } ->
        IO.puts "registering #{inspect pid}"
        generator([pid|clients])

    after
      @interval ->
        IO.puts "tick"
        Enum.each clients, fn client ->
          send client, { :tick }
        end
        generator(clients)
    end
  end
end

defmodule Client do

  def start do
    pid = spawn(__MODULE__, :receiver, [])
    Tick.register(pid)
  end

  def receiver do
    receive do
      { :tick } ->
        IO.puts "tock in client"
        receiver()
    end
  end
end
$ iex --sname name1
iex(name1@cmbp)1> c("ticker.ex")
[Client, Tick]
iex(name1@cmbp)2> Node.connect :"name2@cmbp"
true
iex(name1@cmbp)3> Tick.start
:yes
tick
tick
tick
iex(name1@cmbp)4> Client.start
registering #PID<0.111.0>
{:register, #PID<0.111.0>}
tick
tock in client
tick
tock in client

-------
$ iex --sname name2
iex(name2@cmbp)1> c("ticker.ex")
[Client, Tick]
iex(name2@cmbp)2> Client.start
{:register, #PID<0.104.0>}
tock in client
tock in client
tock in client

I/O, PIDs, and Nodes

Erlang VM 將 I/O 實作為 processes。可以直接透過 I/O server 的PID 對 open file/device 處理 IO。

VM 的 default IO device 可透過 :erlang.group_leader 取得,他會回傳 I/O Server 的 PID

$ iex --sname name1

-------

$ iex --sname name2
iex(name2@cmbp)1> Node.connect(:"name1@cmbp")
true
iex(name2@cmbp)2> :global.register_name(:name2, :erlang.group_leader)
:yes

------

# 回到 :name1
iex(name1@cmbp)1> name2 = :global.whereis_name :name2
#PID<9755.59.0>
iex(name1@cmbp)2> IO.puts(name2, "test")
:ok

------

# :name2 的 IO 可看到

test

References

Programming Elixir