2019年9月23日

ZeroMQ

2010年3月30日,AMQP的最初設計者 iMatix 的執行長 Pieter Hintjens 宣布 iMatix 要退出AMQP工作群組,並實作簡單又快速的ZeroMQ,且不支援未來會發布的AMQP/1.0。針對 AMQP 的問題,在 iMatrix 的 What is wrong with AMQP 有說明他的論點。

ØMQ(也寫作ZeroMQ、0MQ或ZMQ) 是一個為可伸縮的分散式或並行應用程式設計的高效能非同步訊息庫,它提供一個訊息佇列的功能,但ZeroMQ 不需要專門的訊息代理(message broker) 就可以運作。 目前已經有很多不同的程式語言的函式庫可以使用,可參考 ØMQ Language Bindings 有不同程式語言 libray 的安裝方式。

安裝

在 mac 可用 macport 安裝,但這部分是要直接使用 C 語言開發的情況下才要安裝。

sudo port install zmq

如果要使用 python,可使用 ZeroMQ for python pyzmq 這個 library,他已經將 libzmq.so 包裝在裡面,因此直接安裝 pyzmq 就可以使用 ZeroMQ

sudo pip3 install pyzmq

安裝完成後,以一個簡單的範例測試

client.py

import zmq

context = zmq.Context()

socket = context.socket(zmq.REQ)
socket.connect ("tcp://127.0.0.1:7788")

socket.send(b'hello')
print( socket.recv() )

server.py

import zmq

context = zmq.Context()

socket = context.socket(zmq.REP)
socket.bind ("tcp://*:7788")

print( socket.recv() )
socket.send(b'world')

一般網路程式,都需要先啟動 server 後,才能啟動 client,不過 ØMQ 不同,不管誰先啟動都沒關係。以下是在兩個 terminal 分別執行 client/server 的結果。

$ python client.py
b'world'
$ python server.py
b'hello'

pyzmq 的所有 API 文件在 The PyZMQ API

加法計算的小程式

addserver.py

import os
import zmq

context = zmq.Context()

socket = context.socket(zmq.REP)
socket.connect("tcp://localhost:7788")

print( 'Worker %s is running ...' % os.getpid() )

while True:
    # receive request
    a, b = socket.recv_multipart()
    a = int(a)
    b = int(b)

    print( 'Compute %s + %s and send response' % (a, b) )
    socket.send_string( str(a + b) +" (from worker "+ str(os.getpid()) +")" )

addclient.py

import zmq
import random
import time

context = zmq.Context()

socket = context.socket(zmq.REQ)
socket.bind("tcp://*:7788")

# wait all worker connected
time.sleep(1)

for i in range(9):
    a = random.randint(0, 100)
    b = random.randint(0, 100)
    print( 'Compute %s + %s ...' % (a, b) )

    # send request to peer
    socket.send_multipart([bytes(str(a),"ascii"), bytes(str(b),"ascii")])

    # receive response from peer
    rep = socket.recv()
    print( ' = ' + rep.decode('UTF-8') )

在這裡例子必須先啟動 worker (addserver),我們先啟動三個 worker後,再啟動 addclient.py

$ python addserver.py
Worker 45777 is running ...
Compute 33 + 47 and send response
Compute 68 + 13 and send response
Compute 45 + 68 and send response

$ python addserver.py
Worker 45784 is running ...
Compute 69 + 88 and send response
Compute 86 + 13 and send response
Compute 22 + 8 and send response

$ python addserver.py
Worker 45792 is running ...
Compute 41 + 25 and send response
Compute 14 + 84 and send response
Compute 13 + 48 and send response

client 會用 send_multipart 的方式,將資料以 Round-Robin 方式平均分配給 workers,

$ python addclient.py
Compute 33 + 47 ...
 = 80 (from worker 45777)
Compute 41 + 25 ...
 = 66 (from worker 45792)
Compute 69 + 88 ...
 = 157 (from worker 45784)
Compute 68 + 13 ...
 = 81 (from worker 45777)
Compute 14 + 84 ...
 = 98 (from worker 45792)
Compute 86 + 13 ...
 = 99 (from worker 45784)
Compute 45 + 68 ...
 = 113 (from worker 45777)
Compute 13 + 48 ...
 = 61 (from worker 45792)
Compute 22 + 8 ...
 = 30 (from worker 45784)

如果刻意實作一個 addserver2.py,只服務一次計算,就中止 socket。

import os
import zmq

context = zmq.Context()

socket = None

try:
    socket = context.socket(zmq.REP)
    socket.connect("tcp://localhost:7788")

    print( 'Worker %s is running ...' % os.getpid() )

    # while True:
    # receive request
    a, b = socket.recv_multipart()
    a = int(a)
    b = int(b)

    print( 'Compute %s + %s and send response' % (a, b) )
    socket.send_string( str(a + b) +" (from worker "+ str(os.getpid()) +")" )
finally:
    if( socket is not None):
        socket.close()
    if( context is not None):
        context.term()

執行後可以發現,另外兩個 worker 有將其他的 task 處理掉,不過這時要注意,server2.py 在結束前一定要 close socket,否則 client 會一直在等待那一個 worker。

$ python addserver2.py
Worker 45982 is running ...
Compute 49 + 96 and send response

$ python addserver.py
Worker 45967 is running ...
Compute 47 + 63 and send response
Compute 49 + 74 and send response
Compute 69 + 78 and send response
Compute 29 + 10 and send response

$ python addserver.py
Worker 45974 is running ...
Compute 56 + 7 and send response
Compute 62 + 36 and send response
Compute 3 + 47 and send response
Compute 75 + 66 and send response


$ python addclient.py
Compute 47 + 63 ...
 = 110 (from worker 45967)
Compute 49 + 96 ...
 = 145 (from worker 45982)
Compute 56 + 7 ...
 = 63 (from worker 45974)
Compute 49 + 74 ...
 = 123 (from worker 45967)
Compute 62 + 36 ...
 = 98 (from worker 45974)
Compute 69 + 78 ...
 = 147 (from worker 45967)
Compute 3 + 47 ...
 = 50 (from worker 45974)
Compute 29 + 10 ...
 = 39 (from worker 45967)
Compute 75 + 66 ...
 = 141 (from worker 45974)

The Zen of Zero: Ø 的意義

ZeroMQ 的Zero 一開始代表 "zero broker",以及 "zero latency",後來慢慢增加了 "zero administration", "zero cost", "zero waste" 這些意思,也就是說,ZeroMQ 就是要盡可能降低複雜度。ZeroMQ 就像是增強功能的 socket library,也可以說是 mailboxes with routing。

我們再回到 ØMQ 的首頁,看一下這個專案的目標

  • Distributed Messaging: 也就是在分散式的環境,不僅是跨機器,也可以跨 process,跨 thread 進行通訊

  • 可以用多種程式語言在多種平台上互通訊息

  • 在 inproc, IPC, TCP, TIPC, multicast 這些情境中傳遞訊息

  • 支援 pub-sub, push-pull, and router-dealer 這些傳遞訊息的 pattern

  • 用極小的 library 提供高速非同步 I/O engines

  • 有很多開源社群的支援

  • 支援高階程式語言及平台

  • 可建立 centralized, distributed, small, or large 這些架構

  • 有商業支援的自由軟體

以下依照 ØMQ - The Guide in python 的說明瞭解 ØMQ,範例程式可以在 GitHub repository 取得。

ØMQ 收送資料的方法

Request-Reply

這是最基本的,Client 跟 Server 互相收送資料,client 會發送 "Hello" 給 server,server 回應 "World" 給 client。

hwserver.py 會 bind tcp port 5555,並在收到資料後,回應 "World"

import time
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")

while True:
    #  Wait for next request from client
    message = socket.recv()
    print("Received request: %s" % message)

    #  Do some 'work'
    time.sleep(1)

    #  Send reply back to client
    socket.send(b"World")

hwclient.py,會連接到 localhost:5555,會發送 10 次 "Hello",每一次發送後就會等待 server 回應。

import zmq

context = zmq.Context()

#  Socket to talk to server
print("Connecting to hello world server...")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

#  Do 10 requests, waiting each time for a response
for request in range(10):
    print("Sending request %s ..." % request)
    socket.send(b"Hello")

    #  Get the reply.
    message = socket.recv()
    print("Received reply %s [ %s ]" % (request, message))

我們可以先啟動 client,再啟動 server,或是同時啟動很多個 client。如果在 client 執行到一半,用 Ctrl-C 將 server 中斷,然後重新啟動 hwserver.py,會發現 client 並沒有繼續往下執行。這個問題會在 chap 4 Reliable Request-Reply Patterns 裡面討論。

Publish-Subscribe

剛剛的是 server-client 之間雙向的傳輸,第二種常見的 pattern 為單向的 data distribution,server 會持續發送資料給一些 clients。

wuserver.py 是天氣資訊 server 的範例,會持續一直發送資料,發送時會亂數處理一個 zipcode

#
#   Weather update server
#   Binds PUB socket to tcp://*:5556
#   Publishes random weather updates
#

import zmq
from random import randrange


context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")

while True:
    zipcode = randrange(1, 100000)
    temperature = randrange(-80, 135)
    relhumidity = randrange(10, 60)

    socket.send_string("%i %i %i" % (zipcode, temperature, relhumidity))

wuclient.py 會透過 socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter) 註冊只需要取得 zipcode 10001 的天氣資訊,程式後面取得五次天氣的更新資料,然後平均。

#
#   Weather update client
#   Connects SUB socket to tcp://localhost:5556
#   Collects weather updates and finds avg temp in zipcode
#

import sys
import zmq


#  Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting updates from weather server...")
socket.connect("tcp://localhost:5556")

# Subscribe to zipcode, default is NYC, 10001
zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001"

# Python 2 - ascii bytes to unicode str
if isinstance(zip_filter, bytes):
    zip_filter = zip_filter.decode('ascii')
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)

# Process 5 updates
total_temp = 0
for update_nbr in range(5):
    string = socket.recv_string()
    zipcode, temperature, relhumidity = string.split()
    total_temp += int(temperature)

print("Average temperature for zipcode '%s' was %dF" % (
      zip_filter, total_temp / (update_nbr+1))
)

SUB socket 必須要用 zmq_setsockopt() 設定要註冊取得的資訊是什麼,如果沒有註冊,就取不到任何資料。PUB-SUB socket pair 是非同步的。如果對 SUB socket 發送訊息(zmq_send()),就會發生 error,對 PUB socket 接收訊息(zmq_recv())也會發生 error。PUB socket 並不會理會有沒有 SUB socket 接收資料,會持續不斷發送資料出去。

另外,PUB-SUB sockets 中,不能保證 subscriber 什麼時候開始接收到訊息,就算是先啟動了 subscriber 再啟動 publisher,問題還是一樣會發生,subscriber 永遠會漏掉沒接收到 publisher 發送的第一個訊息。這是因為 subscriber 連接到 publisher 需要時間,但publisher 可能已經開始發送訊息了。

subscriber 並不能保證接收到所有 publisher 發送的訊息,在 chap2 會說明並解決這個問題。如果 publisher 有無限多的資料要發送給 subscriber,這個情境下,就不需要在意是不是接收到所有資料。

另外有一些要注意的事項:

  1. subscriber 可連接多個 publisher,資料會依次接收 (fair-queued),subscriber 不會一直接收到某一個 publisher 的資料
  2. 如果 publisher 沒有任何連接的 subscribers,就會直接丟棄所有訊息
  3. 如果是 TCP,且 subscriber 網路速度很慢,message 會被放在 publisher 的 queue,接下來會看到如何保護 publisher,解決這個問題

  4. 在 ØMQ 3.X 以後,如果是用 tcp:// 或是 ipc://,filter 會發生在 publisher 端,如果是用 epgm://,filter 會發生在 subscriber 端。在 2.X,所有 filtering 都發生在 subscriber 端。

Divide & Conquer - Parallel Pipeline

  • ventilator 會產生可平行處理得 tasks
  • 有多個 workers 處理 tasks
  • sink 會接收 worker 的處理結果

taskvent.py 會產生 100 個 tasks,在開始發送 task 之前,要先啟動某個數量的 worker

# Task ventilator
# Binds PUSH socket to tcp://localhost:5557
# Sends batch of tasks to workers via that socket

import zmq
import random
import time

try:
    raw_input
except NameError:
    # Python 3
    raw_input = input

context = zmq.Context()

# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")

# Socket with direct access to the sink: used to synchronize start of batch
sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")

print("Press Enter when the workers are ready: ")
_ = raw_input()
print("Sending tasks to workers...")

# The first message is "0" and signals start of batch
sink.send(b'0')

# Initialize random number generator
random.seed()

# Send 100 tasks
total_msec = 0
for task_nbr in range(100):

    # Random workload from 1 to 100 msecs
    workload = random.randint(1, 100)
    total_msec += workload

    sender.send_string(u'%i' % workload)

print("Total expected cost: %s msec" % total_msec)

# Give 0MQ time to deliver
time.sleep(1)

taskwork.py 取得 task 並 sleep 一段時間,然後發送完成的訊息

# Task worker
# Connects PULL socket to tcp://localhost:5557
# Collects workloads from ventilator via that socket
# Connects PUSH socket to tcp://localhost:5558
# Sends results to sink via that socket

import sys
import time
import zmq


context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

# Process tasks forever
while True:
    s = receiver.recv()

    # Simple progress indicator for the viewer
    sys.stdout.write('.')
    sys.stdout.flush()

    # Do the work
    time.sleep(int(s)*0.001)

    # Send results to sink
    sender.send(b'')

tasksink.py,以 taskvent.py 發送的 b'0' 為啟動的訊號,接收 100 個 task 的結果,計算花費的時間

# Task sink
# Binds PULL socket to tcp://localhost:5558
# Collects results from workers via that socket
#
# Author: Lev Givon <lev(at)columbia(dot)edu>

import sys
import time
import zmq


context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

# Wait for start of batch
s = receiver.recv()

# Start our clock now
tstart = time.time()

# Process 100 confirmations
for task_nbr in range(100):
    s = receiver.recv()
    if task_nbr % 10 == 0:
        sys.stdout.write(':')
    else:
        sys.stdout.write('.')
    sys.stdout.flush()

# Calculate and report duration of batch
tend = time.time()
print("Total elapsed time: %d msec" % ((tend-tstart)*1000))
  • worker 數量越多,處理完成的時間越短。
  • worker 上接 ventilator,下接 sink,數量可任意變動
  • ventilator 的 PUSH 會平均發送 task 給 workers,也稱為 load balancing
  • sink 的 PULL 會平均接收 workers 的結果,也稱為 fair queueing
  • 同樣會發生 "slow joiner" 的問題,會造成 load balancing 分配不均。如果某個 PULL socket 比較快 join,就會在其他 PULL socket 連接前,就先取得較多 task。如果需要較正確的 load balancing 機制,會在 chap3 Advanced Request-Reply Patterns 裡面說明。

ØMQ 的 string

ØMQ 在傳送資料時,必須要指定傳送的資料量,也就是說,我們要自己進行資料格式化。以 C 語言來說,string 是以 null byte 結束,因此可以這樣發送,也就是發送 6 bytes

zmq_send(requester, "Hello", 6, 0);

在 python 的 string 並沒有 null byte,因此在發送時,會在前面增加一個 byte 為字串長度

socket.send("Hello")

但如果在 C 語言接收 python 發送的資料,會發生沒有正確結束的字串的錯誤。因此最好的方式,是 allocate 新的 buffer,並增加一個 byte,複製 string 然後最後填上 null。以下 s_recv 只能處理 255 chars。

static char *
s_recv (void *socket) {
    char buffer [256];
    int size = zmq_recv (socket, buffer, 255, 0);
    if (size == -1)
        return NULL;
    if (size > 255)
        size = 255;
    buffer [size] = 0;
    return strdup (buffer);
}

ZeroMQ strings 是以指定長度的方式處理,發送時並不會在後面加上 null。

ØMQ 的版本

不同版本的 ØMQ 傳送資料可能會發生問題,以下是在 python 確認 ØMQ 版本資訊的方式,libzmq 的部分才是 ØMQ 的 library 的版本號碼。

import zmq

print("Current libzmq version is %s" % zmq.zmq_version())
print("Current  pyzmq version is %s" % zmq.__version__)

執行結果為

Current libzmq version is 4.2.5
Current  pyzmq version is 17.1.2

Programming in ØMQ

ØMQ 都是由取得 context 開始,然後 create socket,C語言是 zmq_ctx_new(),在 process 裡面只需要一個 context,因為 context 是單一 process 裡面所有 socket 的 container,並負責 inproc sockets 的傳輸工作,如果產生了兩個 context,就像是兩個獨立的 ØMQ instances。

在程式開始呼叫一次 zmq_ctx_new(),結束前呼叫一次 zmq_ctx_destroy()

如果有使用 fork() system call,每個 process 都需要自己的 context,如果在呼叫 fork 前先在 main process 呼叫 zmq_ctx_new(),每個 child process 都會取得自己的contexts,必須在 parent process 進行 context 管理。


python 有自動回收的功能,但 C 必須要自己回收 context,否則會產生 memory leak,如果讓 socket 維持 open,zmq_ctx_destory 會永遠停在那邊,等待 socket close。

使用 ØMQ 要注意的 objects 只有 messages, sockets, contexts

  • 盡可能使用 zmq_send(), zmq_recv(),可避免使用 zmq_msg_t objects

  • 如果使用 zmq_msg_recv(),必須要呼叫 zmq_msg_close() 用來 release the received message

  • 如果程式 opening/closing 很多 socket,可能需要 redesign application,這些 socket handles 會等到 context destory 才會被回收。

  • 離開 application 前,要先 close sockets,然後呼叫 zmq_ctx_destory()

如果是 python,會自動 destory object, socket, context,記得要放在 final block。

multithread 環境在下一章討論。

不要在多個 thread 裡面使用相同的 socket。

必須要設定 LINGER value (1 second) 然後 close socket

destroy context 會造成每個 thread 產生 blocking receives/polls/send,並會 return error,程式必須 catch error,設定 linger on, close socket 然後 exit。

不要呼叫兩次 destroy the same context,zmq_ctx_destory 會 block all sockets。

Why we needed ØMQ

application 需要處理訊息及 message queue,但常會自己利用 TCP/UDP 開發。

以下是一般 TCP 程式要注意的問題

  1. 如何處理 I/O?要在 application 或是背景處理 I/O?blocking I/O 的 architecture 並不 scalable,background I/O 很難設計

  2. 如何處理 dynamic components,如何區分 client/server 的 components

  3. 如何處理 message,如何設計容易 write/read 的資料格式,不會造成 buffer overflow

  4. 如何處理不能立即發送的訊息?尤其是在等待 component online 的時候,要丟棄訊息,放到資料庫或是放到 memory queue?

  5. 如何處理 message queue,當 reading 速度很慢,發生 queue 異常時要怎麼辦?

  6. 如何處理遺失的訊息?要等待 refresh,發送 resend或是建立 reliability layer,確保訊息一定會送達。

  7. 如果需要另一種 tranport layer 要怎麼辦?例如要用 multicast 取代 TCP,或是使用 IPv6

  8. 如何 route message,要重送相同的訊息給不同 peers,或是要發送 reply 給原本的 requester?

  9. 如何用不同程式語言撰寫 API?要重新實作 wire-level protocol,或要 repackage library?

  10. 如何在不同 architecture 讀取資料?如何設定 data type?

  11. 如何處理 network error

Hadoop Zookeeper 的 C API code 有 4200 lines 處理 client/server network communication protocol,且沒有文件,對其他開發者來說很難理解。

AMQP 試圖要用 "broker" 處理 addressing, routing, queueing,這會產生大量 client/server prototcol APIs。broker 對於大型網路來說,簡化了複雜度,但這會多了一個 black box,漸漸會讓 broker 變成另一個 bottleneck。需要另一個 team 去維護 broker,也需要 backup boxes。

ØMQ 的優勢

  1. 在 background thread 非同步處理 I/O,以 lock-free 方式跟 application thread 溝通。concurrent ØMQ 不需要 lock, semaphore, 或 wait states

  2. components 可隨時連線或斷線,ØMQ 會自動 reconnect,可用任意順序啟動 components

  3. 需要時會自動 queue message

  4. 有處理 over-full queue 的機制,當 queue full 時,ØMQ 會依照 pattern 決定要 block senders 或是丟棄 messages。

  5. 可使用 TCP, multicast, in-process, inter-process 的 transport layer

  6. 可處理 slow/blocked readers,依照不同 pattern 有不同處理方式

  7. 以 request-reply 或是 pub-sub 方式 route message

  8. 可用一個 function call,建立 proxies 用來 queue, forward, capture messages,proxies 可降低 network 互聯的複雜度

  9. 有保證訊息送達的機制

  10. 可以處理任意格式的訊息,可以是 msgpack, protocol buffers 或其他格式

  11. 可處理 network error

  12. 不需要太多運算資源

References

Learning ZeroMQ with pyzmq

新世紀通訊函式庫 – ZeroMQ

Learning and Using ØMQ http://zguide.zeromq.org

ØMQ - Get The Software

python zeromq安装

ØMQ - wiki

zeromq用來怎麼玩?

ZeroMQ 簡介

ZeroMQ Message Transport Protocol ZMTP 3.1

Easy cluster parallelization with ZeroMQ

沒有留言:

張貼留言