2019/10/28

PyZMQ

這是 PyZMQ 的說明文件,裡面有使用 PyZMQ 要注意的事情,有些功能,只有 PyZMQ 單獨增加的,不在 0MQ 裡面。

Working with libzmq DRAFT sockets

libzmq-4.2 提供了一些 unstable DRAFT APIs,另外還有 CLIENT-SERVER 與 RADIO-DISH patterns。

因為是 unstable,pyzmq 預設並不支援,如果 pyzmq 17 要支援 draft socket,可以自行編譯 pyzmq

首先要安裝有支援 draft socket 的 libzmq

export ZMQ_VERSION=4.2.2
export PREFIX=/usr/local

wget https://github.com/zeromq/libzmq/releases/download/v${ZMQ_VERSION}/zeromq-${ZMQ_VERSION}.tar.gz -O libzmq.tar.gz
tar -xzf libzmq.tar.gz
cd zeromq-${ZMQ_VERSION}
./configure --prefix=${PREFIX} --enable-drafts
make -j && make install

然後安裝有支援 draft 的 pyzmq

pip install -v --pre pyzmq \
  --install-option=--enable-drafts \
  --install-option=--zmq=${PREFIX}

--zmq=${PREFIX} 是 libzmq 的路徑,如果不是 /usr/local 就需要指定路徑。

More Than Just Binding

PyZMQ 除了單純提供 ZeroMQ Python 介面外,還有一些特殊的自訂方法。

The Core as Bindings

PyZMQ 分為四個 subpackages

zmq.core 是 bindings for ZeroMQ,每一個 Object 都是一個獨立的 module,例如:zmq.core.context 是 Context,zmq.core.poll 是 Poller,ZMQ constants 都放在 zmq.core.constants 裡面

將 core 分為 recompilation 及 derivative projects 有兩個原因

  1. PyZMQ 太龐大, recompile 需要花很多時間
  2. 第二個原因是 Cython (C-extension for Python),PyZMQ 是以 Cython 實作,會將 object 編譯為 pyx files,每一個都有 pxd header,其他 project 可直接以 Cython 撰寫 extensions,直接呼叫 C-level ZeroMQ APIs
Thread Safe

ZeroMQ 的 Context 是 threadsafe object,但 Socket 不是。故可在 multithread application 透過 zmq.Context.instance() 使用 Context,但必須在每個 thread 獨立產生 socket,跨 thread 使用 socket 會造成 c-level crash。

Socket Options as Attributes

0MQ 的 socket options 是以 set/getsockopt() method 存取。但在 PyZMQ 是以 attribute 的方式存取。

s = ctx.socket(zmq.DEALER)
s.identity = b'dealer'
s.hwm = 10
s.events
# 0
s.fd
# 16
Default Options on the Context

Context 的 attribute 會影響後續產生的 socket 的 default options,該 socket 不適用的 attribute 會直接略過。

ctx = zmq.Context()
ctx.linger = 0
rep = ctx.socket(zmq.REP)
req = ctx.socket(zmq.REQ)
Core Extensions

在 core 裡面有兩個功能,是 0MQ 沒有的

  • Builtin Serialization

jsonpickle 作為 Socket class 的 serialization method,socket 有 send_json()send_pyobj(),分別是以 json 及 pickle 序列化。收到時可以用 recv_json()recv_pyobj() 轉回物件。

unicode string 不能直接傳送,因此提供了 send_string()recv_string(),用來傳送 encode 為 utf-8 bytes 的字串。

  • MessageTracker

可用來追蹤 0MQ 的 message buffer,主要原因是 Python 0MQ 可處理 non-copying sends。因為 Python 的 buffer interface (例如 numpy array),可直接發送出去。

但在 asynchronous non-copying message system (ex: 0MQ or MPI),必須有方法可得知訊息是否已經被發送出去,這就是 MessageTracker 的用途。

MessageTracker 必須處理 threadsage communications (內建 Queue Object),產生 MessageTracker 需要 10 µs,追蹤很多 small messages 可能會稍微影響到效率。

traking is optional,透過 track flag 決定,該 flag 出現在 Frame constructor,non-copying send 與 receive。

MessageTracker 只有一個 method 一個 attribute,當 Frame 被追蹤到已經不被 0MQ 使用時,MessageTracker.done 會變成 True,另外 MessageTracker.wait() 會 block operation 直到 Frame 被 released。

Extensions
  1. zmq.log: Logging handlers for hooking Python logging up to the network
  2. zmq.devices: Custom devices and objects for running devices in the background
  3. zmq.eventloop: Tornado event loop 用在 0MQ sockets
  4. zmq.ssh: Simple tools for tunneling zeromq connections via ssh

在 PyZMQ 序列化 messages

  • Builtin Serialization

PyZMQ 有三種 serialization methods

主要兩種是 json 及 pickle,method 分別是 send_json() send_pyobj() 以及 recv_json()recv_pyobj()`

  • 自訂 Serialization

例如可使用 msgpack (Message Pack serialization library) 或是 protobuf (Google Protocol Buffers),並增加 zlib (Python stdlib module for zip compression) 壓縮,或是使用 blosc (A blocking, shuffling and loss-less (and crazy-fast) compression library)

自訂 Serialization 有兩種方式

  1. 寫一個以 socket 為參數的 function
  2. subclass Socket

例如使用 pickle 及 zlib

import zlib, cPickle as pickle

def send_zipped_pickle(socket, obj, flags=0, protocol=-1):
    """pickle an object, and zip the pickle before sending it"""
    p = pickle.dumps(obj, protocol)
    z = zlib.compress(p)
    return socket.send(z, flags=flags)

def recv_zipped_pickle(socket, flags=0, protocol=-1):
    """inverse of send_zipped_pickle"""
    z = socket.recv(flags)
    p = zlib.decompress(z)
    return pickle.loads(p)

PyZMQ 支援 numpy 的 non-copying buffer,以下為 non-copying send/recv of numpy arrays,包含用來重建 array 的 dtype/shape data

import numpy

def send_array(socket, A, flags=0, copy=True, track=False):
    """send a numpy array with metadata"""
    md = dict(
        dtype = str(A.dtype),
        shape = A.shape,
    )
    socket.send_json(md, flags|zmq.SNDMORE)
    return socket.send(A, flags, copy=copy, track=track)

def recv_array(socket, flags=0, copy=True, track=False):
    """recv a numpy array"""
    md = socket.recv_json(flags=flags)
    msg = socket.recv(flags=flags, copy=copy, track=track)
    buf = buffer(msg)
    A = numpy.frombuffer(buf, dtype=md['dtype'])
    return A.reshape(md['shape'])

Devices

0MQ 有 device 的概念,可用來管理 send-recv pattern,連結多個 sockets。為了可完整執行程式,devices 必須內建 while(True) loop,因此會 blocking 其他程式的執行。PyZMQ 提供可在背景執行的 devices,以及自訂 3-socket MonitoredQueue deivce

BackgroundDevices

通常會很少在 python 主程式中透過 device() 產生 zmq device,因為會 blocking 其他程式。因此需要在背景 threads/process 執行 devices。

PyZMQ 提供 ThreadDevice 以及 ProcessDevice。因為 thead safe 的關係,參數為 socket type,而不是 socket,透過 foo_in() proxy method 產生 socket。另外設定的 methods (bind/connect/setsockopt) 也都是 proxy methods,內部的 socket 會加上 in_out_ 的 prefix,為 in_socket out_socket

from zmq.devices import ProcessDevice

pd = ProcessDevice(zmq.QUEUE, zmq.ROUTER, zmq.DEALER)
pd.bind_in('tcp://*:12345')
pd.connect_out('tcp://127.0.0.1:12543')
pd.setsockopt_in(zmq.IDENTITY, 'ROUTER')
pd.setsockopt_out(zmq.IDENTITY, 'DEALER')
pd.start()
# it will now be running in a background process
MonitoredQueue

0MQ 的內建 device 為 QUEUE,這是 symmetric 2-socket device,支援任意 pattern。為了效能,monitored_queue() 以 Cython 實作。

QUEUE device 的缺點是,不支援 ROUTER 同時為 input output sockets,這是因為 ROUTER 接收訊息時,會加上 IDENTITY of the socket,然後發送訊息(用在 reply 的 routing)。這會造成 output socket 會將訊息送回 sender的問題,因此 ROUTER-ROUTER connection 必須要將前兩個部分的 message 交換順序。

from zmq.devices import monitored_queue
ins = ctx.socket(zmq.ROUTER)
outs = ctx.socket(zmq.DEALER)
mons = ctx.socket(zmq.PUB)
configure_sockets(ins,outs,mons)
monitored_queue(ins, outs, mons, in_prefix='in', out_prefix='out')

PUB socket 最適合用在 monitor socket,因為他不會接收訊息,in/out prefix 也符合 PUB/SUB topic subscription model。

ThreadMonitoredQueueProcessMonitoredQueue 可在背景運作,增加了 foo_mon() 設定 monitor socket

Eventloops

pyzmq 17 整合了 eventloops,且不需要設定。但因使用了 edge-triggered file descriptor,可能會發生問題。

AsyncIO

PyZMQ 15 增加 zmq.asyncio 支援 asyncio,包含了 Socket subclass 會回傳 `asyncio.Future 物件,可用在 asyncio coroutines。使用該 API 必須 import zmq.asyncio.Context,以該 context 產生的 socket,會由任何 blocking method 回傳 Futures。

import asyncio
import zmq
from zmq.asyncio import Context

ctx = Context.instance()

async def recv():
    s = ctx.socket(zmq.SUB)
    s.connect('tcp://127.0.0.1:5555')
    s.subscribe(b'')
    while True:
        msg = await s.recv_multipart()
        print('received', msg)
    s.close()

PyZMQ <17 必須要先註冊 zmq poller

import zmq.asyncio
zmq.asyncio.install()

ctx = zmq.asyncio.Context()
Tornado IOLoop

Tornado 可在 filedescriptors 及 native socket 進行 event polling,這邊使用了 ioloop,並將 IOStream 轉成 ZMQStream 處理 0MQ socket 的 event polling。ZMQStream object 運作類似 socket,但不是直接呼叫 recv(),而是註冊 on_recv() callback,另外也有 on_send() callback。

PyZMQ 15 增加了 zmq.eventloop.future,包含 Socket subclass,會回傳 Future object,用在 tornado coroutines。使用該 API 必須 import zmq.eventloop.future.Context,以該 context 產生的 socket,會由任何 blocking method 回傳 Futures。

from tornado import gen, ioloop
import zmq
from zmq.eventloop.future import Context

ctx = Context.instance()

@gen.coroutine
def recv():
    s = ctx.socket(zmq.SUB)
    s.connect('tcp://127.0.0.1:5555')
    s.subscribe(b'')
    while True:
        msg = yield s.recv_multipart()
        print('received', msg)
    s.close()

ZMQStream 可註冊接收訊息的 callback 處理 message

ZMQStream 有 send()send_multipart(),但不會直接發送訊息,會等到 IOLoop 有空時,才發送訊息。發送出去的訊息,會進行 on_send() callback。

ZMQStream.on_recv() 是 ZMQStream 的主要 method,註冊接收訊息的 callback,且只會是 multipart。

以下是 echo socket 的範例

s = ctx.socket(zmq.REP)
s.bind('tcp://localhost:12345')
stream = ZMQStream(s)
def echo(msg):
    stream.send_multipart(msg)
stream.on_recv(echo)
ioloop.IOLoop.instance().start()

on_recv() 有一個 copy flag,如果為 False,俵是 on_recv 是透過 Frame object 而不是 bytes 處理 message。


ZMQStream.on_recv_stream() 類似 on_recv,但會註冊 message 及 stream,可用在多個 streams 的 single callback 狀況。

s1 = ctx.socket(zmq.REP)
s1.bind('tcp://localhost:12345')
stream1 = ZMQStream(s1)

s2 = ctx.socket(zmq.REP)
s2.bind('tcp://localhost:54321')
stream2 = ZMQStream(s2)

def echo(stream, msg):
    stream.send_multipart(msg)

stream1.on_recv_stream(echo)
stream2.on_recv_stream(echo)

ioloop.IOLoop.instance().start()

flush() 可將所有訊息都取出來,用以完成 priority event 處理。可指定只 flush recv evnets 或是 send events,也可設定 flush 的訊息數量。


PyZMQ <17 ,必須要先 ioloop.install() 才能使用 tornado ioloop

可在 tornado app使用 PyZMQ ioloop,可透過 ioloop.install() 通知 tornado 使用 zmq 的 poller

from zmq.eventloop import ioloop
ioloop.install()

也可以由 pyzmq 產生 global instance,這會要求 tornado’s tornado.ioloop.IOLoop 使用 zmq’s poller 並註冊目前的 instance

from zmq.eventloop.ioloop import IOLoop
loop = IOLoop.current()

install() 或是取得 instance 都必須在 global * instance 註冊前呼叫,否則會產生錯誤。


可不需要註冊 global instance,就使用 tornado,首先要通知 tornado 使用 zmq poller

from zmq.eventloop.ioloop import ZMQIOLoop

loop = ZMQIOLoop()

然後初始化 tornado 及 ZMQStream,必須傳入 io_loop 參數

from tornado.testing import AsyncTestCase
from zmq.eventloop.ioloop import ZMQIOLoop
from zmq.eventloop.zmqstream import ZMQStream

class TestZMQBridge(AsyncTestCase):

     # Use a ZMQ-compatible I/O loop so that we can use `ZMQStream`.
     def get_new_ioloop(self):
         return ZMQIOLoop()

也可以手動註冊 IOLoop 為 global tornado instance

from zmq.eventloop.ioloop import ZMQIOLoop
loop = ZMQIOLoop()
loop.install()
gevent

PyZMQ >= 2.2.0.1 ,以 zmq.green 提供了 gevent API。Socket.send/recv and zmq.Poller are gevent-aware

import zmq.green as zmq

PyZMQ ≥ 2.2.0.2,green.devicegreen.eventloop are gevent-aware

不能同時使用兩種 eventloops 例如 tornado + gevent

建議要使用 gevent >=1.0 版

Asynchronous Logging

Python 提供 logging module,透過 Handler 綁定 log 處理。可利用 PUBHandler class 將 log 透過 PUB socket 廣播出去。

PUB/SUB and Topics

SUB 只會接收到註冊的 topic 的訊息

sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, 'topic1')
sub.setsockopt(zmq.SUBSCRIBE, 'topic2')
PUBHandler

最基本的 logging,就直接產生 PUB socket

pub = context.socket(zmq.PUB)
pub.bind('tcp://*:12345')
handler = PUBHandler(pub)
logger = logging.getLogger()
logger.addHandler(handler)

再來可將 PUBHandler 增加 root_topic 以及 loglevel

handler.root_topic = 'myprogram'

基本的 topic 形式為 <handler.root_topic>.<loglevel> 例如 ‘myprogram.INFO`

>>> logger.info('hello there')
>>> print sub.recv_multipart()
['myprogram.INFO', 'hello there']
Subtopics

可在 log level 後面增加 topic tree,以 zmq.log.handlers.TOPIC_DELIM 作為分隔符號

>>> log_msg  = "hello there"
>>> subtopic = "sub.topic"
>>> msg = zmq.log.handlers.TOPIC_DELIM.join([subtopic, log_msg])
>>> logger.warn(msg)
>>> print sub.recv_multipart()
['myprogram.WARN.sub.topic', 'hello there']

Tunneling PyZMQ connection with SSH

IPython 可利用 SSH 將 0MQ connection 加密,為 zmq.ssh module。

PyZMQ 預設透過 pexpect 傳送 ssh command,也支援 paramiko for tunnels

SSH tunnel 包含五個部分

  1. server
  2. remote ip: server 看到的 remote machine ip
  3. remote port
  4. local ip: local machine 使用的網路介面
  5. local port

一但建立了 tunnel,就會將 localip:localport 連接到 remoteip:remoteport

通常會將 0MQ url 用在 remote machine,但以 ssh server 產生 tunnel

在相同 LAN 的 remote machine 使用以下指令

sock.connect("tcp://10.0.1.2:5555")

為了讓外部網路的機器,建立連線,必須要在 LAN 的某個 server

from zmq import ssh
ssh.tunnel_connection(sock, "tcp://10.0.1.2:5555", "server")

"server" 的部分,實際上要寫成 "user@server:port" (user@remote-server-ip:5555)

tunnel_connection() 會將任意的 localhost port 轉接到 real server,並以新的 local url 連接 socket

Python 2.5, 3

pyversion_compat.h

有許多 function 需要 C-buffers 跟 Python objects 之間的轉換,為了支援不同 python 版本,需要增加 utils/pyversion_compat.h 定義 missing symbols with macros。

Bytes and Strings

在 Python >=2.6,PyZMQ 程式要支援 Python 3 必須使用 b'message' 這樣的語法。

為了相容 python 2.6, 3,必須區分 bytes, unicode, basestring 的定義。因為 2.x, 3.x 的 str 是不同的,bytes 在 2.5 沒有定義,unicode, basestring 在 3.x 沒有定義。

Explicit Type 2.x 3.x
bytes str bytes
unicode unicode str
basestring basestring (str, bytes)

b'message' 的語法在 2.5 不能使用,故在 test suite 是改用 "message".encode()

Buffers

有兩種 buffer interface,可轉換 object 為 C-buffer,memoryview 是新的,可在 python 2.7 以後使用,buffer 是舊的,不能在 python 3 使用。


__str__

str 不是 platform indepenedent type,在 err.strerror()Message.__str__() 都需要回傳 native str objects,但通常是需要 bytes object,故要檢查 str type。如果是 unicode,就要decode

# ...
b = natural_result()
if str is unicode:
    return b.decode()
else:
    return b
Exceptions

舊的語法,在 python 3 不支援 try: s.send(msg) except zmq.ZMQError, e: handle(e)

新的語法

try:
    s.send(msg)
except zmq.ZMQError as e:
    handle(e)

Unicode

Python < 3,str 物件是 C string,有 endswith() split method,另外有 unicode 物件處理特殊編碼的字串。

Python 3 的 str 改變了,完整支援 unicode。C string 必須轉換為 bytes 物件。

為解決歧義,都是將 C array 編碼為 bytes,且將 unicode object 視為 strings


因為 PyZMQ 是 C library 的封裝,要使用 bytes,或是以 buffer interface 提供的物件 (ex: memoryview)

相關的 methods 有 socket.send/recv socket.get/setsockopt socket.bind/connect

因常需要發送 unicode string,故增加了 socket.<method>_string() wrappers,附帶 encoding 參數,預設為 utf-8。

socket.bind/connect 跟其他的不同,需要 setter,沒有 getter,故可強制在裡面將 unicode object 轉為 bytes。


methods:

  1. socket.bind
  2. socket.connect
  3. socket.send
  4. socket.recv
  5. socket.send_string
  6. socket.recv_string
  7. socket.setsockopt
  8. socket.getsockopt
  9. socket.setsockopt_string
  10. socket.getsockopt_string

References

PyZMQ docs

沒有留言:

張貼留言