2019/09/29

ZeroMQ 2 Sockets and Patterns

說明 ØMQ patterns: request-reply, pub-sub, pipeline,包含以下項目

  1. create and work with ØMQ sockets
  2. send/receive messages
  3. 如何使用非同步 I/O model 實作 application
  4. 如何在一個 thread 處理多個 sockets
  5. 如何處理 fatal and nonfatal errors
  6. 如何關閉 ØMQ application
  7. 如何 send/receive multipart messages
  8. 如何 forward message
  9. 如何建立一個簡單的 message queueing broker
  10. 如何實作 multithread application

Socket API

  1. creating and destroying sockets: zmq_socket(), zmq_close()
  2. socket 的 options 參數: zmq_setsockopt(), zmq_getsockopt()
  3. writing/receiving messages: zmq_send(), zmq_recv()

注意 socket 永遠是 void pointer,message 是 structure。

注意 ØMQ 是非同步的


要將兩個 node 連接起來,一個用 zmq_bind(),另一個用 zmq_connect(),通常呼叫 zmq_bind() 的是 "server",有固定的 network address,另一個是 "client"

ØMQ 跟舊的 TCP connection 的差異在

  1. 可使用 inproc, ipc, tcp, pgm, epgm 這些 transport layer
  2. 一個 socket 可有多個 outgoing 及 incoming connections
  3. 沒有 zmq_accept() method,當 socket 連接到一個 endpoint,會自動開始接受 connection
  4. network 連線在背景處理,當網路異常斷線後,ØMQ會自動 reconnect
  5. application 不能直接處理 network connecton

最常見的是 client/server model,對所有 clients 來說,server 要保持 visible,因此 server 要使用 zmq_bind(),而 client 要用 zmq_connect()。ØMQ 可接受先啟動 client 再啟動 server。也就是先啟動呼叫 zmq_connect() 的 node。

server node 可使用 socket bind to 很多種 protocol 的 endpoints。但不能 bind 到相同的 endpoint 兩次以上。不過 ipc transport 可以讓一個 process bind to 已經被某個 process 使用的 endpoint,也就是可在 crash 以後,recover socket。

zmq_bind (socket, "tcp://*:5555");
zmq_bind (socket, "tcp://*:9999");
zmq_bind (socket, "inproc://somename");

server 就是架構中的 static component,而 client 是 dynamic parts。

socket 有特殊的類型分別,遵循 messaging pattern 的限制,例如:不能將 publisher 連接到 subscriber socket。


使用 zmq_msg_send() zmq_msg_recv() 收送訊息。 ØMQ 的 I/O model 跟傳統的 TCP 不同,TCP socket 是 1-to-1 的。

  • ØMQ socket 傳遞訊息類似 UDP,是帶有長度的 binary data,不像是 TCP 是 stream of bytes。
  • ØMQ socket 會在背景 thread 處理 I/O,訊息會送到 local input queue,且由 local output queue 送出,不管 application 在處理什麼東西。
  • ØMQ socket 根據類型不同,會有內建 one-to-N routing 行為

zmq_send 並不會真的將訊息送到 socket connection,而是先放到 I/O thread 的 queue,並以非同步方式發送。


Unicast Transports

ØMQ 提供 inproc, ipc, tcp 這些 unicast teansports,還有 epgm, pgm 這些 multicast transports。

最常見的是使用 tcp: disconnected TCP transport。disconnected 是因為在 endpoints 存在前,就可以連接。

windows platform 不支援 ipc,通常會用 ".ipc" extension,在 UNIX 要注意權限,因為不同 user id 都要有存取權限。

inproc: inter-thread transport,是 connected signaling transport,比 tcp, ipc 快,在 client 執行 connect 以前,server 必須要先執行 bind。


ØMQ is not a neutral carrier

有時會有 "如何用 ØMQ 實作 http server" 的問題。但結論是不行,因為 ØMQ 並不是一般化的 socket library,ØMQ 的 framing 機制無法跟其他 protocol 相容。

ØMQ v3.3 以後提供了 ZMQ_ROUTER_RAW 選項,可讓我讀寫沒有 ØMQ framing 機制的 message,就可以利用這個方式,讀寫正常的 http request 與 response,但目前還是實驗性的功能。


I/O Threads

ØMQ 是在 background thread 處理 I/O,通常一個 thread 就夠了,當產生一個 context,就會啟動一個 I/O thread,每秒可處理約 1 GB 的資料,但也可以在產生 socket 前,呼叫 zmq_ctx_set() 增加 I/O thread 的數量。

int io_threads = 4;
void *context = zmq_ctx_new ();
zmq_ctx_set (context, ZMQ_IO_THREADS, io_threads);
assert (zmq_ctx_get (context, ZMQ_IO_THREADS) == io_threads);

以往已有案例,一個 socket 一次可以處理上千個 connections,傳統做法是一個 process/thread 處理一個 connection,該 process/thread 處理一個 socket,ØMQ 將整個架構壓縮到單一 process。

如果把 ØMQ 用在 inter-thread communication (multithreaded application),沒有 external socket I/O,可以將 I/O thread 設定為 0。

Messaging Patterns

ØMQ pattern 是由 pairs of sockets with matching type 實作的,在了解 pattern 前要先了解 socket type。

ØMQ 內建 pattern 為

  • Request-Reply: 連接 client 到 services,這是 remote procedure call 及 task distribution pattern

  • Pub-Sub: 連接一組 publishers 到 subscribers,這是 data distribution pattern

  • Pipeline: 用 fan-out/fan-in pattern 連接 nodes,可有多個 steps 及 loops,這是 parallel task distribution 與 collection pattern

  • Exclusive Pair: exclusivly 連接兩個 sockets,這是在連接一個 process 兩個 threads。

前三種在 chap1 已經說明了,現在要討論第四種。

以下是不同的 socket 組合的有效的 connect-bind pair,任意一端都可執行 bind

  • PUB and SUB
  • REQ and REP
  • REQ and ROUTER (take care, REQ inserts an extra null frame)
  • DEALER and REP (take care, REP assumes a null frame)
  • DEALER and ROUTER
  • DEALER and DEALER
  • ROUTER and ROUTER
  • PUSH and PULL
  • PAIR and PAIR

另外還有 XPUB, XSUB,這是 raw versions of PUB 及 SUB


以上是四種 ØMQ core patterns,以 C++ 實作。

根據這四個 pattern 還可組成 high-level messaging patterns,這些不在 core library 裡面,可在 ØMQ community 中找到,例如 Majordomo pattern: Reliable Request-Reply Pattern (chap4),可在 ØMQ 的 github repo 裡面找到。

Working with Messages

ØMQ 有兩組收送訊息的 API,有簡單的 one-liner 的zmq_send() 及 zmq_recv(),但是 zmq_recv() 並不能處理任意長度的 message size,如果超過 buffer 長度會 truncates messages,有另一組 API 會使用 zmq_msg_t structure

  • 訊息初始化: zmq_msg_init(), zmq_msg_init_size(), zmq_msg_init_data()
  • 收送訊息: zmq_msg_send(), zmq_msg_recv()
  • 釋放訊息: zmq_msg_close()
  • 存取訊息內容: zmq_msg_data(), zmq_msg_size(), zmq_msg_more()
  • 處理訊息屬性: zmq_msg_get(), zmq_msg_set()
  • 操作訊息: zmq_msg_copy(), zmq_msg_move()

ØMQ message 是 blobs of any size,可傳送任意一種資料格式,例如 protocol buffers, msgpack, JSON...

在記憶體中,使用 zmq_msg_t structure,在 C 語言的使用規則如下

  1. 產生 zmq_msg_t 物件,不是 blocks of data
  2. zmq_msg_init() 產生空白的訊息,並傳給 zmq_msg_recv()
  3. 使用 zmq_msg_init_size() 產生訊息,然後 allocate a block of data,利用 memcpy 複製資料,傳送訊息給 zmq_msg_send()
  4. 呼叫 zmq_msg_close() 釋放訊息,這會釋放 reference,最終會丟棄訊息
  5. 使用 zmq_msg_data() 存取訊息資料,使用 zmq_msg_size() 取得訊息長度
  6. 使用 zmq_msg_move(), zmq_msg_copy(), zmq_msg_init_data() 前要詳讀手冊
  7. zmq_msg_send() 傳送訊息後,ØMQ 會清除 message,也就是設定 size 為 0,不能傳送相同的訊息 2次,發送訊息後,就不能存取該訊息
  8. 這些規則不適用於 zmq_send(), zmq_recv(),那兩個 method 是傳送 byte arrays 而不是 message structure

如果想發送相同訊息2次,可用 zmq_msg_init() 並用 zmq_msg_copy() 複製,複製時只會複製 data reference。

ØMQ 支援 multipart messages,可在單一 message 傳送 list of frames,frames (也稱為 message parts) 是 ØMQ message 的 basic wire format,frame 是有指定長度的 block of data。

另外有 wire-level protocol 稱為 ZMTP,定義了 ØMQ 如何在 TCP connection 讀寫 frames。

一開始 ØMQ 只能送單一 frame,後來 API 增加了 "more" flag,可傳送 multipart messages。

  • message 可以是 one or more parts
  • message parts 稱為 "frames"
  • 每一個 frame 都是 zmq_msg_t object
  • 在 API 底層,收送訊息都是分開獨立的
  • higher-level APIs 提供發送整個 multipart messages 的 wrappers

另外要注意

  • 可發送長度為 0 的訊息,例如當作 signal
  • ØMQ 保證發送 all parts for a message,或是 none of them
  • ØMQ 不會立即發送訊息,但要確認記憶體可放入 multipart message
  • message (single or multipart) 必須放入 memory,如果要發送 file,應該要拆成多個 pieces,分別用 single-part message 傳送,因為 multipart data 會耗用較多 memory
  • 完成接收訊息後,要呼叫 zmq_msg_close(),某些 programming language api 並不會自動 destroy objects

先不要使用 zmq_msg_init_data(),這是 zero-copy method,可能會造成一些問題

Handling Multiple Sockets

目前 main loop 都是用以下程序

  1. wait for message on socket
  2. process message
  3. repeat

但如果要同時在多個 endpoints 讀取資料要怎麼辦?最簡單的方式,是將所有 endpoints 都連接到一個 socket,然後讓 ØMQ 處理 fan-in,這種方式適用於有相同 pattern 的 endpoints

如果要一次讀取多個 sockets,可使用 zmq_poll()

msreader.py,使用一個簡單的 loop 讀取多個 sockets,後面的 sleep 可能會造成一些 delay 的問題。

另外因為依序在兩個 socket 讀取資料,也就是用 fair read 的形式。

# encoding: utf-8
#
#   Reading from multiple sockets
#   This version uses a simple recv loop

import zmq
import time

# Prepare our context and sockets
context = zmq.Context()

# Connect to task ventilator
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Connect to weather server
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")

# Process messages from both sockets
# We prioritize traffic from the task ventilator
while True:

    # Process any waiting tasks
    while True:
        try:
            msg = receiver.recv(zmq.DONTWAIT)
        except zmq.Again:
            break
        # process task

    # Process any waiting weather updates
    while True:
        try:
            msg = subscriber.recv(zmq.DONTWAIT)
        except zmq.Again:
            break
        # process weather update

    # No activity, so sleep for 1 msec
    time.sleep(0.001)

mspoller.py,這是比較正確,使用 zmq_poll() 讀取多個 socket 的程式

# encoding: utf-8
#
#   Reading from multiple sockets
#   This version uses zmq.Poller()

import zmq

# Prepare our context and sockets
context = zmq.Context()

# Connect to task ventilator
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Connect to weather server
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")

# Initialize poll set
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)

# Process messages from both sockets
while True:
    try:
        socks = dict(poller.poll())
    except KeyboardInterrupt:
        break

    if receiver in socks:
        message = receiver.recv()
        # process task

    if subscriber in socks:
        message = subscriber.recv()
        # process weather update

items structure 有以下四個 members:

typedef struct {
    void *socket;       //  ZeroMQ socket to poll on
    int fd;             //  OR, native file handle to poll on
    short events;       //  Events to poll on
    short revents;      //  Events returned after poll
} zmq_pollitem_t;
Multipart Message

ØMQ 可用多個 frames 組成一個 message,也就是 multipart message。

現在要了解如何實作 proxy,讀寫 forward 資料,但不修改資料。

multipart messages 裡面每一個 part 都是 zmq_msg item,如果發送了 5 parts message,就必須產生、發送並銷毀 5 個 zmq_msg items。

發送 multipart message 的方式 (接收每一個 frame 到一個 message object)

zmq_msg_send (&message, socket, ZMQ_SNDMORE);
…
zmq_msg_send (&message, socket, ZMQ_SNDMORE);
…
zmq_msg_send (&message, socket, 0);

這是接收並處理單一訊息多個 parts 的方式

while (1) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    zmq_msg_recv (&message, socket, 0);
    //  Process the message frame
    …
    zmq_msg_close (&message);
    if (!zmq_msg_more (&message))
        break;      //  Last message frame
}

處理 multiparts message 要注意

  1. 發送 multipart message 時,在發送最後一個 part 完成後,才會真的將訊息發送出去
  2. 如果使用 zmq_poll(),接收了訊息的第一個 part 後,其他部分也都會收到
  3. 接收 multipart 訊息,會收到全部,要不然就是全部都沒收到
  4. 每一個 part 都是獨立的 zmq_msg item
  5. 不管有沒有檢查 more property,都會收到所有 parts of a message
  6. ØMQ 會在收到所有 parts of a message 後,才會發送出去
  7. 除非 close socket,否則沒有 cancel 部分 parts 的方法
Intermediaries and Proxies

ØMQ 重點是 decentralized intelligence,這不代表不需要 middle nodes,ØMQ 可實作出 pipes 或是 service oriented brokers,messaging 領域將這種產品稱為 intermidiation。ØMQ 則稱為 proxies, queues, forwarders, device 或 brokers。

在現實世界,稱為 wholesalers, distributors, managers...

Dynamic Discovery Problem

在大型分散式架構中,最大的問題是 discovery,各 components 之間如何互相知道對方的位址,也就是 "dynamic discovery problem"

最簡單的解決方式就是,hard-coding (configuring) network architecture,問題在於增加新的 piece 需要 reconfigure network。

這是簡單的 Pub-Sub 架構,如果有 1 publisher,上百個 subscriber,需要在每一個 subscriber 都設定 publisher endpoint。subscriber 是動態增加的,publisher 是固定的,但如果要臨時增加 publisher,不容易調整架構。

另一種簡單的解決方案,就是增加一個固定的 network point,讓其他 node 連接,傳統 messaging 就是用這種做法,稱為 message broker。雖然 ØMQ 沒有內建 broker,但可以簡單完成一個 intermediary。

最好是將 intermediaries 視為簡單的 stateless message switches,類似 http proxy,增加 pub-sub proxy 可解決 dynamic discovery problem,proxy 會 open 1 XSUB socket,1 XPUB socket,並以 well-known IP addresses, ports 連接在一起,其他 processes 會連到這個 proxy 而不是互相連線。

Shared Queue (DEALER and ROUTER sockets)

基本的 client/server application如下,但通常會有多個 services 及 clients,這種方式的限制是 services 必須要是 stateless,request 的 state 可存放在 database。

如果要連接多個 clients 到多個 servers,可直接將每一個 client 連到多個 service endpoints。以下是將 client 連到 service A, B, C,發送四個 request,其中 service A 收到 R1, R4, service B 收到 R2, service C 收到 R3。

問題在於如果要臨時增加 3 個 services,就需要重新設定舊的 clients,增加 services,並 restart clients。

可改用以下 broker 解決這個問題。會使用 nonblocking zmq_poll() 監控兩個 sockets,然後在兩個 socket 之間傳遞訊息。

chap3 另外有 asynchronous request-reply flow 的方法

rrclient.py,連接到 broker,發送 10 個 requests,並等待 response

#
#   Request-reply client in Python
#   Connects REQ socket to tcp://localhost:5559
#   Sends "Hello" to server, expects "World" back
#
import zmq

#  Prepare our context and sockets
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5559")

#  Do 10 requests, waiting each time for a response
for request in range(1,11):
    socket.send(b"Hello")
    message = socket.recv()
    print("Received reply %s [%s]" % (request, message))

rrbroker.py,可正確處理 multipart messages

# Simple request-reply broker
import zmq

# Prepare our context and sockets
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
backend = context.socket(zmq.DEALER)
frontend.bind("tcp://*:5559")
backend.bind("tcp://*:5560")

# Initialize poll set
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)

# Switch messages between sockets
while True:
    socks = dict(poller.poll())

    if socks.get(frontend) == zmq.POLLIN:
        message = frontend.recv_multipart()
        backend.send_multipart(message)

    if socks.get(backend) == zmq.POLLIN:
        message = backend.recv_multipart()
        frontend.send_multipart(message)

rrworker.py

#
#   Request-reply service in Python
#   Connects REP socket to tcp://localhost:5560
#   Expects "Hello" from client, replies with "World"
#
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.connect("tcp://localhost:5560")

while True:
    message = socket.recv()
    print("Received request: %s" % message)
    socket.send(b"World")

以下是 Request-Reply Broker 的示意圖,只有中間的 broker 是固定的 node

ØMQ 內建的 proxy function

剛剛的 rrbroker 是很有用的元件,可用來建立 pub-sub forwarders,shared queues,ØMQ 包裝成一個 method zmq_proxy(frontend, backend, capture)

msgqueue.py,可直接替換掉 rrbroker.py

"""

   Simple message queuing broker
   Same as request-reply broker but using ``zmq.proxy``
"""

import zmq


def main():
    """ main method """

    context = zmq.Context()

    # Socket facing clients
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5559")

    # Socket facing services
    backend  = context.socket(zmq.DEALER)
    backend.bind("tcp://*:5560")

    zmq.proxy(frontend, backend)

    # We never get here...
    frontend.close()
    backend.close()
    context.term()


if __name__ == "__main__":
    main()

實際上可以將 ROUTER/DEALER, XSUB/XPUB, PULL/PUSH 這三組 socket type 連接到 zmq.proxy()

Transport Bridging

ØMQ 常見問題是,如何將 ØMQ network 連接到某一種技術的網路中? 可能是 networking 或 messaging technology

解決方式是使用 bridge,bridge 就是在某個 socket 使用某一種 protocol,在另一個 socket 轉換到另一種 protocol,也就是 protocol interpreter

wuproxy.py,會使用 SUB 連接 weather sever,然後轉換以 PUB 發送到 external network

import zmq

context = zmq.Context()

# This is where the weather server sits
frontend = context.socket(zmq.SUB)
frontend.connect("tcp://192.168.55.210:5556")

# This is our public endpoint for subscribers
backend = context.socket(zmq.PUB)
backend.bind("tcp://10.1.1.0:8100")

# Subscribe on everything
frontend.setsockopt(zmq.SUBSCRIBE, b'')

# Shunt messages out to our own subscribers
while True:
    # Process all parts of the message
    message = frontend.recv_multipart()
    backend.send_multipart(message)

Handling Errors and ETERM

ØMQ 以 fail-fast 及 resilience 方式處理 error。當 process 發生 internal error,就容易受到外界的攻擊,因此如果發生 internal error,就會 self-destruct。

當 ØMQ 偵測到 external fault,會對 calling node 發出 error,某些少數特殊狀況下,會直接丟棄訊息,沒有從這種 error 復原的方法。

在 C 的程式裡,需要對每一個 ØMQ call 都進行 error handling。

  • create object 的 method 回傳 null 就是 failed
  • 處理資料的 method,回傳了已處理的 bytes 數量,如果是 -1 就是發生 error
  • 其他 method 如果回傳 0 就是成功,-1 就是 error
  • error code 是在 errno 或是 zmq_errno() 提供
  • 可用 zmq_strerror() 轉換為 error 文字說明
void *context = zmq_ctx_new ();
assert (context);
void *socket = zmq_socket (context, ZMQ_REP);
assert (socket);
int rc = zmq_bind (socket, "tcp://*:5555");
if (rc == -1) {
    printf ("E: bind failed: %s\n", strerror (errno));
    return -1;
}

有兩種例外狀況,應該視為 nonfatal

  1. 收到訊息,夾帶 ZMQ_DONTWAIT option,沒有 waiting data,ØMQ 會 return -1 並設定 errno 為 EAGAIN
  2. 某個 thread 呼叫 zmq_ctx_destory(),其他thread 會停止運作,zmq_ctx_destory() 會把 context 關掉,並 block 所有 function call,exit with -1,errno 設定為 ETERM

以下是如何正確停止 process 的範例。

因為 PUSH/PULL socket 是單向的,要增加 PUB/SUB socket,用來發送 KILL signal 給 worker

  • sink 產生新的 PUB socket 到新的 endpoint
  • worker bind input socket 到 endpoint
  • 當 sink 偵測到工作完成,就發送 kill 到 PUB socket
  • 當 worker 偵測 kill message,就 exit

taskworker2.py

# encoding: utf-8
#
#   Task worker - design 2
#   Adds pub-sub flow to receive and respond to kill signal

import sys
import time
import zmq

context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

# Socket for control input
controller = context.socket(zmq.SUB)
controller.connect("tcp://localhost:5559")
controller.setsockopt(zmq.SUBSCRIBE, b"")

# Process messages from receiver and controller
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(controller, zmq.POLLIN)
# Process messages from both sockets
while True:
    socks = dict(poller.poll())

    if socks.get(receiver) == zmq.POLLIN:
        message = receiver.recv_string()

        # Process task
        workload = int(message)  # Workload in msecs

        # Do the work
        time.sleep(workload / 1000.0)

        # Send results to sink
        sender.send_string(message)

        # Simple progress indicator for the viewer
        sys.stdout.write(".")
        sys.stdout.flush()

    # Any waiting controller command acts as 'KILL'
    if socks.get(controller) == zmq.POLLIN:
        break

# Finished
receiver.close()
sender.close()
controller.close()
context.term()

tasksink2.py,收到 100 個 task 結果後,就發送 controller.send(b"KILL")

# encoding: utf-8
#
#   Task sink - design 2
#   Adds pub-sub flow to send kill signal to workers

import sys
import time
import zmq

context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

# Socket for worker control
controller = context.socket(zmq.PUB)
controller.bind("tcp://*:5559")

# Wait for start of batch
receiver.recv()

# Start our clock now
tstart = time.time()

# Process 100 confirmiations
for task_nbr in range(100):
    receiver.recv()
    if task_nbr % 10 == 0:
        sys.stdout.write(":")
    else:
        sys.stdout.write(".")
    sys.stdout.flush()

# Calculate and report duration of batch
tend = time.time()
tdiff = tend - tstart
total_msec = tdiff * 1000
print("Total elapsed time: %d msec" % total_msec)

# Send kill signal to workers
controller.send(b"KILL")

# Finished
receiver.close()
controller.close()
context.term()

Handling Interrupt Signals

正式程式需要針對中斷信號 ex: Ctrl-C 或 SIGTERM 停止程式。預設狀況下,只會 kill process,這表示 messages 不會被 flushed,檔案不會正常關閉。

interrupt.py,這是處理 Ctrl-C 的方式

#
#   Shows how to handle Ctrl-C
#
import signal
import time
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5558")

# SIGINT will normally raise a KeyboardInterrupt, just like any other Python call
try:
    socket.recv()
except KeyboardInterrupt:
    print("W: interrupt received, stopping...")
finally:
    # clean up
    socket.close()
    context.term()

程式提供 s_catch_signals() method,會抓到 Ctrl-C(SIGINT) 及 SIGTERM 信號,並設定 global variable: s_interrupted

在程式一開始就要呼叫 s_catch_signals(),處理 signaling handling

s_catch_signals ();
client = zmq_socket (...);
while (!s_interrupted) {
    char *message = s_recv (client);
    if (!message)
        break;          //  Ctrl-C used
}
zmq_close (client);

偵測 Memory Leak

如果是使用自動管理記憶體的程式語言,就不用處理這個問題。如果是使用 C/C++,就要管理記憶體。

首先要安裝 valgrind

sudo apt-get install valgrind

預設狀況下,ØMQ 會讓 valgrind 產生很多資料,可建立一個vg.supp 檔案,填入以下內容,移除 warnings

{
    <socketcall_sendto>
    Memcheck:Param
    socketcall.sendto(msg)
    fun:send
    ...
}
{
    <socketcall_sendto>
    Memcheck:Param
    socketcall.send(msg)
    fun:send
    ...
}

-DDEBUG 選項,編譯程式

利用 valgrind 執行程式

valgrind --tool=memcheck --leak-check=full --suppressions=vg.supp someprog

Multithreading with ØMQ

ØMQ 應該是寫 multithread 程式做好的工具,且不需要 mutexes, locks, 或其他 inter-thread communication 機制

concurrent programming 要注意: "don't share state"

利用 ØMQ 撰寫 multithread code 要注意的事項

  1. 分離 data,不要在多個 thread 之間分享資料,只能用 ØMQ context (threadsafe) 傳遞資料。
  2. 不要使用傳統的 concurrency mechanisms,例如: mutexes, criticl sections, semaphores...
  3. 在 process 啟動時,產生一個 ØMQ context,然後傳給其他需要使用 inproc socket 的 threads
  4. 利用 attached thread 產生 structure,並以 inproc 的 PAIR socket 連接到 parent threads,pattern 為 bind parent socket,然後產生連接該 socket 的 child thread
  5. 使用 attached threads 以他們自己的 contexts 模擬獨立 tasks,以 tcp 方式連線。然後再將程式移到 stand-alone processes。
  6. 只能用 ØMQ message 在 thread 之間傳遞資料
  7. 不要在 thread 之間分享 ØMQ socket,因為 ØMQ socket 不是 threadsafe

只能在產生 socket 的 thread 裡面使用或關閉 sockets

ØMQ 是使用 native OS threads,而不是 "green" threads,可利用 Intel 的 ThreadChecker 工具查看程式在做的工作。缺點是 native threading API 並不 portable

單一個 ØMQ thread 可以在單 CPU core 全速運作,不需要 wait。

mtserver.py

"""
   Multithreaded Hello World server
"""
import time
import threading
import zmq


def worker_routine(worker_url, context=None):
    """Worker routine"""
    context = context or zmq.Context.instance()
    # Socket to talk to dispatcher
    socket = context.socket(zmq.REP)

    socket.connect(worker_url)

    while True:

        string  = socket.recv()

        print("Received request: [ %s ]" % (string))

        # do some 'work'
        time.sleep(1)

        #send reply back to client
        socket.send(b"World")


def main():
    """Server routine"""

    url_worker = "inproc://workers"
    url_client = "tcp://*:5555"

    # Prepare our context and sockets
    context = zmq.Context.instance()

    # Socket to talk to clients
    clients = context.socket(zmq.ROUTER)
    clients.bind(url_client)

    # Socket to talk to workers
    workers = context.socket(zmq.DEALER)
    workers.bind(url_worker)

    # Launch pool of worker threads
    for i in range(5):
        thread = threading.Thread(target=worker_routine, args=(url_worker,))
        thread.daemon = True
        thread.start()

    zmq.proxy(clients, workers)

    # We never get here but clean up anyhow
    clients.close()
    workers.close()
    context.term()


if __name__ == "__main__":
    main()
  • server 啟動多個 worker threads,每一個 worker thread 都產生 REP socket,處理 request。worker thread 是單一 threaded 的 server,差異只是由 tcp 換成 inproc
  • server 產生 ROUTER socket,連接 clients (tcp)
  • server 產生 DEALER socket,連接 worker (inproc)
  • server 啟動 proxy,連接 ROUTER/DEALER sockets

create thread 的 function 在每一種 programming language 都不同,POSIX library 是 pthreads,但 windows 是不同的 API

Signaling Between Threads (PAIR Sockets)

如何在多個 thread 之間協作:使用 ØMQ messages

mtrelay.py

"""
   Multithreaded relay
"""

import threading
import zmq

def step1(context=None):
    """Step 1"""
    context = context or zmq.Context.instance()
    # Signal downstream to step 2
    sender = context.socket(zmq.PAIR)
    sender.connect("inproc://step2")

    sender.send(b"")


def step2(context=None):
    """Step 2"""
    context = context or zmq.Context.instance()
    # Bind to inproc: endpoint, then start upstream thread
    receiver = context.socket(zmq.PAIR)
    receiver.bind("inproc://step2")

    thread = threading.Thread(target=step1)
    thread.start()

    # Wait for signal
    msg = receiver.recv()

    # Signal downstream to step 3
    sender = context.socket(zmq.PAIR)
    sender.connect("inproc://step3")
    sender.send(b"")


def main():
    """ server routine """
    # Prepare our context and sockets
    context = zmq.Context.instance()

    # Bind to inproc: endpoint, then start upstream thread
    receiver = context.socket(zmq.PAIR)
    receiver.bind("inproc://step3")

    thread = threading.Thread(target=step2)
    thread.start()

    # Wait for signal
    string = receiver.recv()

    print("Test successful!")

    receiver.close()
    context.term()


if __name__ == "__main__":
    main()
  • thread 透過 shared context 以 inproc socket 通訊
  • parent thread 產生 socket,連接到 inproc:// endpoint,將 context 傳給 child thread

使用 PAIR socket 的原因:

  1. 可使用 PUSH/PULL,但 PUSH 會將 message 傳給所有 receivers,如果不小心產生了兩個 receivers,可能會遺失一半的 signals,PAIR 的優點是,只能有一個 connection,it's "exclusive"
  2. 可使用 DEALER/ROUTER,但 ROUTER 會將訊息包裝起來,將size 為 0 的 signal 轉換為 multipart message,如果不在意,就可以使用 DEALER/ROUTER。另外 DEALER 跟 PUSH 一樣,會平均分配訊息給接收端
  3. 可使用 PUB/SUB,且 PUB 沒有 PUSH, DEALER 的問題。但需要以 empty subscription 設定 subscriber,用起來麻煩。

Node Coordination

當我們需要在網路多個 nodes 之間協作,PAIR socket 不適用,在 threads 跟 nodes 協作是不同的,因為 nodes 可能會隨時上線或斷線,PAIR sockets 在遠端 node 斷線後,不會自動 reconnect。

node與 thread 另一個差異是 thread 數量通常是固定的,但 nodes 數量會異動。

  • publisher 已經先知道會有幾個 subscribers
  • publisher 啟動並等待所有 subscribers 連線,subscriber 透過 REQ/REP 通知 publisher 已經上線
  • 當所有 subscribers 都連線後,publisher 就開始發送資料

synpub.py

#
#  Synchronized publisher
#
import zmq

#  We wait for 10 subscribers
SUBSCRIBERS_EXPECTED = 10

def main():
    context = zmq.Context()

    # Socket to talk to clients
    publisher = context.socket(zmq.PUB)
    # set SNDHWM, so we don't drop messages for slow subscribers
    publisher.sndhwm = 1100000
    publisher.bind('tcp://*:5561')

    # Socket to receive signals
    syncservice = context.socket(zmq.REP)
    syncservice.bind('tcp://*:5562')

    # Get synchronization from subscribers
    subscribers = 0
    while subscribers < SUBSCRIBERS_EXPECTED:
        # wait for synchronization request
        msg = syncservice.recv()
        # send synchronization reply
        syncservice.send(b'')
        subscribers += 1
        print("+1 subscriber (%i/%i)" % (subscribers, SUBSCRIBERS_EXPECTED))

    # Now broadcast exactly 1M updates followed by END
    for i in range(1000000):
        publisher.send(b'Rhubarb')

    publisher.send(b'END')

if __name__ == '__main__':
    main()

synpub.py

#
#  Synchronized subscriber
#
import time

import zmq

def main():
    context = zmq.Context()

    # First, connect our subscriber socket
    subscriber = context.socket(zmq.SUB)
    subscriber.connect('tcp://localhost:5561')
    subscriber.setsockopt(zmq.SUBSCRIBE, b'')

    time.sleep(1)

    # Second, synchronize with publisher
    syncclient = context.socket(zmq.REQ)
    syncclient.connect('tcp://localhost:5562')

    # send a synchronization request
    syncclient.send(b'')

    # wait for synchronization reply
    syncclient.recv()

    # Third, get our updates and report how many we got
    nbr = 0
    while True:
        msg = subscriber.recv()
        if msg == b'END':
            break
        nbr += 1

    print ('Received %d updates' % nbr)

if __name__ == '__main__':
    main()

以 bash 啟動

echo "Starting subscribers..."

for ((a=0; a<10; a++)); do
        python syncsub.py &
done

echo "Starting publisher..."
python syncpub.py

Zero-Copy

ØMQ message API 可由 application buffer 直接傳遞訊息,不需要 copying data,這可改善效能。

先以 zmq_msg_init_data() 產生訊息,並設定 reference of data block (利用 malloc() 取得的資料區塊),傳遞給 zmq_msg_send()

void my_free (void *data, void *hint) {
    free (data);
}
//  Send message from buffer, which we allocate and ZeroMQ will free for us
zmq_msg_t message;
zmq_msg_init_data (&message, buffer, 1000, my_free, NULL);
zmq_msg_send (&message, socket, 0);

發送 message 後不需要呼叫 zmq_msg_close(),libzmq 會自動呼叫

無法在 receive 端提供 Zero-Copy 功能。

multipart message 也可使用 zero-copy

Pub-Sub Message Envelopes

在 pub-sub pattern 可將 key/data 分到不同的 message frame 裡面,稱為 envelope

psenvpub.py

"""
   Pubsub envelope publisher
"""
import time
import zmq

def main():
    """main method"""

    # Prepare our context and publisher
    context   = zmq.Context()
    publisher = context.socket(zmq.PUB)
    publisher.bind("tcp://*:5563")

    while True:
        # Write two messages, each with an envelope and content
        publisher.send_multipart([b"A", b"We don't want to see this"])
        publisher.send_multipart([b"B", b"We would like to see this"])
        time.sleep(1)

    # We never get here but clean up anyhow
    publisher.close()
    context.term()


if __name__ == "__main__":
    main()

psenvsub.py

"""
   Pubsub envelope subscriber
"""
import zmq

def main():
    """ main method """

    # Prepare our context and publisher
    context    = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:5563")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"B")

    while True:
        # Read envelope with address
        [address, contents] = subscriber.recv_multipart()
        print("[%s] %s" % (address, contents))

    # We never get here but clean up anyhow
    subscriber.close()
    context.term()


if __name__ == "__main__":
    main()

如果有多個 publisher,需要知道 address,可以在發送時,增加 address 的資料。

High-Water Mark

如果在 process 之間發送很多訊息,但記憶體不足,或是一些delay 可能會造成 server crash。

想像 process A 快速發送很多訊息給 process B,但 B 因為 busy 處理速度變慢,有些訊息會停在 B 的 network buffer,有些在網路上,有些在 A 的 network buffer,其他的會累積在 A 的記憶體裡面,很快地會造成 A crash。

這在 message brokers 常常發生,B 造成這樣的問題。

一種解決方式是 flow control,通知 A 停止發送訊息。

但某些情境無法使用這種方法,例如會持續發生的 twitter feed。

另一種方式,是由 transport layer 提供機制,將訊息的 buffer 設定容量上限,而在 buffer 到達上限時,採用某種處理方案,例如 wait。

ØMQ 採用 HWM (high-water mark) 的概念,定義 internal pipes 的容量,每一個連出或連入的 socket 都有獨立的 pipe,設定發送或接收的 HWM,例如 PUB, SUB 只有 send buffer,而 SUB, PULL, REQ, REP 只有 receive buffer, 而 DEALER, ROUTER, PAIR 有 send 及 receive buffers。

在 ØMQ v2.x,HWM 預設為無限大。在 ØMQ v3.x 預設為 1000。

當 socket buffer 到達 HWM 容量,可以根據 socket type 決定 block 或是 drop data。PUB, ROUTER sockets 會 drop data,其他的 sockets 會 block data。

在 inproc 傳輸裡面,sender 與 receiver 會共用 buffers,因此實際上 HWM 是雙向設定的 HWM 的總和。

HWM 並不一定是正確值,預設是最多可使用 1000 messages,實際上會因為 libmq 實作 queue 的方式,造成 buffer size 比 1000 小。

Missing Message Problem Solver

在實作 ØMQ application 時,常會遇到遺失訊息的問題,以下是遺失訊息可能的發生原因:

  • SUB socket 必須要用 zmq_setsockopt() 設定 ZMQ_SUBSCRIBE subscription 條件,否則可能會收到到訊息,因為是以 prefix 方式做為條件,如果設定為 "" 會收到所有訊息
  • 在啟動 PUB socket 並發送 data 後,才啟動 SUB socket,可能會遺失前面的訊息,必須調整架構讓 SUB socket 先啟動,然後 PUB socket 再發送訊息。
  • 如果同步了 SUB, PUB socket,還是有可能會遺失訊息,這是因為 internal queue 會在連線完整建立後,才會產生出來,如果遇到這樣的問題,就將 bind/connect 的方向交換,讓 SUB socket binds,而 PUB socket connects
  • 如果使用了 REP, REQ sockets,但沒有遵循 send/recv/send/recv 的順序,ØMQ 會發生 error。就好像遺失訊息一樣,使用 REQ, REQ 必須遵循 sned/recv 的順序,並檢查 ØMQ calls 的 error code
  • 使用 PUSH socket,第一個 PULL socket 會取得較多訊息,因為只會在所有 PULL sockets 都連上的時候(需要幾 ms),才能公平分配訊息。對於 low data rate 的狀況,可以用 ROUTER/DEALER 以及 load balancing pattern 取代 PUSH/PULL
  • 如果透過 threads 分享 socket,可能會造成 crash
  • 如果使用 inproc,確保 sockets 都是使用相同的 context,否則 connecting side 會發生問題。另外要先 bind 再 connect,因為 inproc 不像是 tcp,不是 disconnected transport protocol。

  • 如果使用 ROUTER,意外狀況會發生遺失訊息的狀況,例如發送了 malformed identify frames,或是忘了發送 identity frame。通常要在 ROUTER 設定 ZMQ_ROUTER_MANDATORY option,且要在 send call 都要檢查 return code

References

ØMQ - The Guide

沒有留言:

張貼留言