這是 PyZMQ 的說明文件,裡面有使用 PyZMQ 要注意的事情,有些功能,只有 PyZMQ 單獨增加的,不在 0MQ 裡面。
Working with libzmq DRAFT sockets
libzmq-4.2 提供了一些 unstable DRAFT APIs,另外還有 CLIENT-SERVER 與 RADIO-DISH patterns。
因為是 unstable,pyzmq 預設並不支援,如果 pyzmq 17 要支援 draft socket,可以自行編譯 pyzmq
首先要安裝有支援 draft socket 的 libzmq
export ZMQ_VERSION=4.2.2
export PREFIX=/usr/local
wget https://github.com/zeromq/libzmq/releases/download/v${ZMQ_VERSION}/zeromq-${ZMQ_VERSION}.tar.gz -O libzmq.tar.gz
tar -xzf libzmq.tar.gz
cd zeromq-${ZMQ_VERSION}
./configure --prefix=${PREFIX} --enable-drafts
make -j && make install
然後安裝有支援 draft 的 pyzmq
pip install -v --pre pyzmq \
--install-option=--enable-drafts \
--install-option=--zmq=${PREFIX}
--zmq=${PREFIX}
是 libzmq 的路徑,如果不是 /usr/local
就需要指定路徑。
More Than Just Binding
PyZMQ 除了單純提供 ZeroMQ Python 介面外,還有一些特殊的自訂方法。
The Core as Bindings
PyZMQ 分為四個 subpackages
zmq.core
是 bindings for ZeroMQ,每一個 Object 都是一個獨立的 module,例如:zmq.core.context
是 Context,zmq.core.poll
是 Poller,ZMQ constants 都放在 zmq.core.constants
裡面
將 core 分為 recompilation 及 derivative projects 有兩個原因
- PyZMQ 太龐大, recompile 需要花很多時間
- 第二個原因是 Cython (C-extension for Python),PyZMQ 是以 Cython 實作,會將 object 編譯為
pyx
files,每一個都有pxd
header,其他 project 可直接以 Cython 撰寫 extensions,直接呼叫 C-level ZeroMQ APIs
Thread Safe
ZeroMQ 的 Context 是 threadsafe object,但 Socket 不是。故可在 multithread application 透過 zmq.Context.instance()
使用 Context,但必須在每個 thread 獨立產生 socket,跨 thread 使用 socket 會造成 c-level crash。
Socket Options as Attributes
0MQ 的 socket options 是以 set/getsockopt() method 存取。但在 PyZMQ 是以 attribute 的方式存取。
s = ctx.socket(zmq.DEALER)
s.identity = b'dealer'
s.hwm = 10
s.events
# 0
s.fd
# 16
Default Options on the Context
Context 的 attribute 會影響後續產生的 socket 的 default options,該 socket 不適用的 attribute 會直接略過。
ctx = zmq.Context()
ctx.linger = 0
rep = ctx.socket(zmq.REP)
req = ctx.socket(zmq.REQ)
Core Extensions
在 core 裡面有兩個功能,是 0MQ 沒有的
- Builtin Serialization
將 json
及 pickle
作為 Socket class 的 serialization method,socket 有 send_json()
及 send_pyobj()
,分別是以 json 及 pickle 序列化。收到時可以用 recv_json()
及 recv_pyobj()
轉回物件。
unicode string 不能直接傳送,因此提供了 send_string()
及 recv_string()
,用來傳送 encode 為 utf-8 bytes 的字串。
- MessageTracker
可用來追蹤 0MQ 的 message buffer,主要原因是 Python 0MQ 可處理 non-copying sends。因為 Python 的 buffer interface (例如 numpy array),可直接發送出去。
但在 asynchronous non-copying message system (ex: 0MQ or MPI),必須有方法可得知訊息是否已經被發送出去,這就是 MessageTracker 的用途。
MessageTracker 必須處理 threadsage communications (內建 Queue
Object),產生 MessageTracker 需要 10 µs,追蹤很多 small messages 可能會稍微影響到效率。
traking is optional,透過 track
flag 決定,該 flag 出現在 Frame constructor,non-copying send 與 receive。
MessageTracker 只有一個 method 一個 attribute,當 Frame 被追蹤到已經不被 0MQ 使用時,MessageTracker.done
會變成 True,另外 MessageTracker.wait()
會 block operation 直到 Frame 被 released。
Extensions
- zmq.log: Logging handlers for hooking Python logging up to the network
- zmq.devices: Custom devices and objects for running devices in the background
- zmq.eventloop: Tornado event loop 用在 0MQ sockets
- zmq.ssh: Simple tools for tunneling zeromq connections via ssh
在 PyZMQ 序列化 messages
- Builtin Serialization
PyZMQ 有三種 serialization methods
主要兩種是 json 及 pickle,method 分別是 send_json()
send_pyobj()
以及 recv_json()
recv_pyobj()
`
- 自訂 Serialization
例如可使用 msgpack (Message Pack serialization library) 或是 protobuf (Google Protocol Buffers),並增加 zlib (Python stdlib module for zip compression) 壓縮,或是使用 blosc (A blocking, shuffling and loss-less (and crazy-fast) compression library)
自訂 Serialization 有兩種方式
- 寫一個以 socket 為參數的 function
- subclass Socket
例如使用 pickle 及 zlib
import zlib, cPickle as pickle
def send_zipped_pickle(socket, obj, flags=0, protocol=-1):
"""pickle an object, and zip the pickle before sending it"""
p = pickle.dumps(obj, protocol)
z = zlib.compress(p)
return socket.send(z, flags=flags)
def recv_zipped_pickle(socket, flags=0, protocol=-1):
"""inverse of send_zipped_pickle"""
z = socket.recv(flags)
p = zlib.decompress(z)
return pickle.loads(p)
PyZMQ 支援 numpy 的 non-copying buffer,以下為 non-copying send/recv of numpy arrays,包含用來重建 array 的 dtype/shape data
import numpy
def send_array(socket, A, flags=0, copy=True, track=False):
"""send a numpy array with metadata"""
md = dict(
dtype = str(A.dtype),
shape = A.shape,
)
socket.send_json(md, flags|zmq.SNDMORE)
return socket.send(A, flags, copy=copy, track=track)
def recv_array(socket, flags=0, copy=True, track=False):
"""recv a numpy array"""
md = socket.recv_json(flags=flags)
msg = socket.recv(flags=flags, copy=copy, track=track)
buf = buffer(msg)
A = numpy.frombuffer(buf, dtype=md['dtype'])
return A.reshape(md['shape'])
Devices
0MQ 有 device 的概念,可用來管理 send-recv pattern,連結多個 sockets。為了可完整執行程式,devices 必須內建 while(True)
loop,因此會 blocking 其他程式的執行。PyZMQ 提供可在背景執行的 devices,以及自訂 3-socket MonitoredQueue deivce
BackgroundDevices
通常會很少在 python 主程式中透過 device() 產生 zmq device,因為會 blocking 其他程式。因此需要在背景 threads/process 執行 devices。
PyZMQ 提供 ThreadDevice
以及 ProcessDevice
。因為 thead safe 的關係,參數為 socket type,而不是 socket,透過 foo_in()
proxy method 產生 socket。另外設定的 methods (bind/connect/setsockopt) 也都是 proxy methods,內部的 socket 會加上 in_
及 out_
的 prefix,為 in_socket
out_socket
from zmq.devices import ProcessDevice
pd = ProcessDevice(zmq.QUEUE, zmq.ROUTER, zmq.DEALER)
pd.bind_in('tcp://*:12345')
pd.connect_out('tcp://127.0.0.1:12543')
pd.setsockopt_in(zmq.IDENTITY, 'ROUTER')
pd.setsockopt_out(zmq.IDENTITY, 'DEALER')
pd.start()
# it will now be running in a background process
MonitoredQueue
0MQ 的內建 device 為 QUEUE
,這是 symmetric 2-socket device,支援任意 pattern。為了效能,monitored_queue()
以 Cython 實作。
QUEUE
device 的缺點是,不支援 ROUTER 同時為 input output sockets,這是因為 ROUTER 接收訊息時,會加上 IDENTITY
of the socket,然後發送訊息(用在 reply 的 routing)。這會造成 output socket 會將訊息送回 sender的問題,因此 ROUTER-ROUTER connection 必須要將前兩個部分的 message 交換順序。
from zmq.devices import monitored_queue
ins = ctx.socket(zmq.ROUTER)
outs = ctx.socket(zmq.DEALER)
mons = ctx.socket(zmq.PUB)
configure_sockets(ins,outs,mons)
monitored_queue(ins, outs, mons, in_prefix='in', out_prefix='out')
PUB socket 最適合用在 monitor socket,因為他不會接收訊息,in/out prefix 也符合 PUB/SUB topic subscription model。
ThreadMonitoredQueue
及 ProcessMonitoredQueue
可在背景運作,增加了 foo_mon()
設定 monitor socket
Eventloops
pyzmq 17 整合了 eventloops,且不需要設定。但因使用了 edge-triggered file descriptor,可能會發生問題。
AsyncIO
PyZMQ 15 增加 zmq.asyncio
支援 asyncio,包含了 Socket subclass 會回傳 `asyncio.Future
物件,可用在 asyncio
coroutines。使用該 API 必須 import zmq.asyncio.Context
,以該 context 產生的 socket,會由任何 blocking method 回傳 Futures。
import asyncio
import zmq
from zmq.asyncio import Context
ctx = Context.instance()
async def recv():
s = ctx.socket(zmq.SUB)
s.connect('tcp://127.0.0.1:5555')
s.subscribe(b'')
while True:
msg = await s.recv_multipart()
print('received', msg)
s.close()
PyZMQ <17 必須要先註冊 zmq poller
import zmq.asyncio
zmq.asyncio.install()
ctx = zmq.asyncio.Context()
Tornado IOLoop
Tornado 可在 filedescriptors 及 native socket 進行 event polling,這邊使用了 ioloop,並將 IOStream
轉成 ZMQStream
處理 0MQ socket 的 event polling。ZMQStream object 運作類似 socket,但不是直接呼叫 recv()
,而是註冊 on_recv()
callback,另外也有 on_send()
callback。
PyZMQ 15 增加了 zmq.eventloop.future
,包含 Socket subclass,會回傳 Future
object,用在 tornado coroutines。使用該 API 必須 import zmq.eventloop.future.Context
,以該 context 產生的 socket,會由任何 blocking method 回傳 Futures。
from tornado import gen, ioloop
import zmq
from zmq.eventloop.future import Context
ctx = Context.instance()
@gen.coroutine
def recv():
s = ctx.socket(zmq.SUB)
s.connect('tcp://127.0.0.1:5555')
s.subscribe(b'')
while True:
msg = yield s.recv_multipart()
print('received', msg)
s.close()
ZMQStream 可註冊接收訊息的 callback 處理 message
ZMQStream 有 send()
及 send_multipart()
,但不會直接發送訊息,會等到 IOLoop 有空時,才發送訊息。發送出去的訊息,會進行 on_send()
callback。
ZMQStream.on_recv()
是 ZMQStream 的主要 method,註冊接收訊息的 callback,且只會是 multipart。
以下是 echo socket 的範例
s = ctx.socket(zmq.REP)
s.bind('tcp://localhost:12345')
stream = ZMQStream(s)
def echo(msg):
stream.send_multipart(msg)
stream.on_recv(echo)
ioloop.IOLoop.instance().start()
on_recv()
有一個 copy flag,如果為 False,俵是 on_recv
是透過 Frame object 而不是 bytes 處理 message。
ZMQStream.on_recv_stream()
類似 on_recv
,但會註冊 message 及 stream,可用在多個 streams 的 single callback 狀況。
s1 = ctx.socket(zmq.REP)
s1.bind('tcp://localhost:12345')
stream1 = ZMQStream(s1)
s2 = ctx.socket(zmq.REP)
s2.bind('tcp://localhost:54321')
stream2 = ZMQStream(s2)
def echo(stream, msg):
stream.send_multipart(msg)
stream1.on_recv_stream(echo)
stream2.on_recv_stream(echo)
ioloop.IOLoop.instance().start()
flush() 可將所有訊息都取出來,用以完成 priority event 處理。可指定只 flush recv evnets 或是 send events,也可設定 flush 的訊息數量。
PyZMQ <17 ,必須要先 ioloop.install()
才能使用 tornado ioloop
可在 tornado app使用 PyZMQ ioloop,可透過 ioloop.install()
通知 tornado 使用 zmq 的 poller
from zmq.eventloop import ioloop
ioloop.install()
也可以由 pyzmq 產生 global instance,這會要求 tornado’s tornado.ioloop.IOLoop 使用 zmq’s poller 並註冊目前的 instance
from zmq.eventloop.ioloop import IOLoop
loop = IOLoop.current()
install()
或是取得 instance 都必須在 global * instance 註冊前呼叫,否則會產生錯誤。
可不需要註冊 global instance,就使用 tornado,首先要通知 tornado 使用 zmq poller
from zmq.eventloop.ioloop import ZMQIOLoop
loop = ZMQIOLoop()
然後初始化 tornado 及 ZMQStream,必須傳入 io_loop
參數
from tornado.testing import AsyncTestCase
from zmq.eventloop.ioloop import ZMQIOLoop
from zmq.eventloop.zmqstream import ZMQStream
class TestZMQBridge(AsyncTestCase):
# Use a ZMQ-compatible I/O loop so that we can use `ZMQStream`.
def get_new_ioloop(self):
return ZMQIOLoop()
也可以手動註冊 IOLoop 為 global tornado instance
from zmq.eventloop.ioloop import ZMQIOLoop
loop = ZMQIOLoop()
loop.install()
gevent
PyZMQ >= 2.2.0.1 ,以 zmq.green
提供了 gevent API。Socket.send/recv and zmq.Poller are gevent-aware
import zmq.green as zmq
PyZMQ ≥ 2.2.0.2,green.device
與 green.eventloop
are gevent-aware
不能同時使用兩種 eventloops 例如 tornado + gevent
建議要使用 gevent >=1.0 版
Asynchronous Logging
Python 提供 logging module,透過 Handler 綁定 log 處理。可利用 PUBHandler
class 將 log 透過 PUB socket 廣播出去。
PUB/SUB and Topics
SUB 只會接收到註冊的 topic 的訊息
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, 'topic1')
sub.setsockopt(zmq.SUBSCRIBE, 'topic2')
PUBHandler
最基本的 logging,就直接產生 PUB socket
pub = context.socket(zmq.PUB)
pub.bind('tcp://*:12345')
handler = PUBHandler(pub)
logger = logging.getLogger()
logger.addHandler(handler)
再來可將 PUBHandler 增加 root_topic
以及 loglevel
handler.root_topic = 'myprogram'
基本的 topic 形式為 <handler.root_topic>.<loglevel>
例如 ‘myprogram.INFO
`
>>> logger.info('hello there')
>>> print sub.recv_multipart()
['myprogram.INFO', 'hello there']
Subtopics
可在 log level 後面增加 topic tree,以 zmq.log.handlers.TOPIC_DELIM
作為分隔符號
>>> log_msg = "hello there"
>>> subtopic = "sub.topic"
>>> msg = zmq.log.handlers.TOPIC_DELIM.join([subtopic, log_msg])
>>> logger.warn(msg)
>>> print sub.recv_multipart()
['myprogram.WARN.sub.topic', 'hello there']
Tunneling PyZMQ connection with SSH
IPython 可利用 SSH 將 0MQ connection 加密,為 zmq.ssh
module。
PyZMQ 預設透過 pexpect 傳送 ssh command,也支援 paramiko for tunnels
SSH tunnel 包含五個部分
- server
- remote ip: server 看到的 remote machine ip
- remote port
- local ip: local machine 使用的網路介面
- local port
一但建立了 tunnel,就會將 localip:localport
連接到 remoteip:remoteport
通常會將 0MQ url 用在 remote machine,但以 ssh server 產生 tunnel
在相同 LAN 的 remote machine 使用以下指令
sock.connect("tcp://10.0.1.2:5555")
為了讓外部網路的機器,建立連線,必須要在 LAN 的某個 server
from zmq import ssh
ssh.tunnel_connection(sock, "tcp://10.0.1.2:5555", "server")
"server" 的部分,實際上要寫成 "user@server:port" (user@remote-server-ip:5555)
tunnel_connection()
會將任意的 localhost port 轉接到 real server,並以新的 local url 連接 socket
Python 2.5, 3
pyversion_compat.h
有許多 function 需要 C-buffers 跟 Python objects 之間的轉換,為了支援不同 python 版本,需要增加 utils/pyversion_compat.h
定義 missing symbols with macros。
Bytes and Strings
在 Python >=2.6,PyZMQ 程式要支援 Python 3 必須使用 b'message'
這樣的語法。
為了相容 python 2.6, 3,必須區分 bytes, unicode, basestring 的定義。因為 2.x, 3.x 的 str 是不同的,bytes 在 2.5 沒有定義,unicode, basestring 在 3.x 沒有定義。
Explicit Type | 2.x | 3.x |
---|---|---|
bytes | str | bytes |
unicode | unicode | str |
basestring | basestring | (str, bytes) |
因 b'message'
的語法在 2.5 不能使用,故在 test suite 是改用 "message".encode()
Buffers
有兩種 buffer interface,可轉換 object 為 C-buffer,memoryview
是新的,可在 python 2.7 以後使用,buffer
是舊的,不能在 python 3 使用。
__str__
str 不是 platform indepenedent type,在 err.strerror()
及 Message.__str__()
都需要回傳 native str objects,但通常是需要 bytes object,故要檢查 str type。如果是 unicode,就要decode
# ...
b = natural_result()
if str is unicode:
return b.decode()
else:
return b
Exceptions
舊的語法,在 python 3 不支援
try:
s.send(msg)
except zmq.ZMQError, e:
handle(e)
新的語法
try:
s.send(msg)
except zmq.ZMQError as e:
handle(e)
Unicode
Python < 3,str
物件是 C string,有 endswith()
split
method,另外有 unicode
物件處理特殊編碼的字串。
Python 3 的 str 改變了,完整支援 unicode。C string 必須轉換為 bytes 物件。
為解決歧義,都是將 C array 編碼為 bytes,且將 unicode object 視為 strings
因為 PyZMQ 是 C library 的封裝,要使用 bytes,或是以 buffer interface 提供的物件 (ex: memoryview)
相關的 methods 有 socket.send/recv
socket.get/setsockopt
socket.bind/connect
。
因常需要發送 unicode string,故增加了 socket.<method>_string()
wrappers,附帶 encoding 參數,預設為 utf-8。
socket.bind/connect
跟其他的不同,需要 setter,沒有 getter,故可強制在裡面將 unicode object 轉為 bytes。
methods:
- socket.bind
- socket.connect
- socket.send
- socket.recv
- socket.send_string
- socket.recv_string
- socket.setsockopt
- socket.getsockopt
- socket.setsockopt_string
- socket.getsockopt_string
沒有留言:
張貼留言