October 28, 2019

PyZMQ

These are notes on the PyZMQ documentation: things to watch out for when using PyZMQ, plus features that PyZMQ adds on its own that are not part of 0MQ itself.

Working with libzmq DRAFT sockets

libzmq 4.2 ships several unstable DRAFT APIs, among them the CLIENT-SERVER and RADIO-DISH patterns.

Because they are unstable, pyzmq does not support them by default. To enable draft sockets in pyzmq 17, build pyzmq yourself.

First, install a libzmq built with draft socket support:

export ZMQ_VERSION=4.2.2
export PREFIX=/usr/local

wget https://github.com/zeromq/libzmq/releases/download/v${ZMQ_VERSION}/zeromq-${ZMQ_VERSION}.tar.gz -O libzmq.tar.gz
tar -xzf libzmq.tar.gz
cd zeromq-${ZMQ_VERSION}
./configure --prefix=${PREFIX} --enable-drafts
make -j && make install

Then install a pyzmq built with draft support:

pip install -v --pre pyzmq \
  --install-option=--enable-drafts \
  --install-option=--zmq=${PREFIX}

--zmq=${PREFIX} is the path to libzmq; it only needs to be specified when libzmq is not installed under /usr/local.

More Than Just Binding

Beyond providing a plain Python interface to ZeroMQ, PyZMQ adds some custom conveniences of its own.

The Core as Bindings

PyZMQ is split into four subpackages.

zmq.core holds the bindings for ZeroMQ. Each object lives in its own module: zmq.core.context provides Context, zmq.core.poll provides Poller, and the ZMQ constants live in zmq.core.constants.

There are two reasons for splitting the core into submodules: recompilation and derivative projects.

  1. PyZMQ is large, and recompiling all of it takes a long time.
  2. The second reason is Cython (a C-extension language for Python). PyZMQ is implemented in Cython as pyx files, each with a pxd header, so other projects can write their extensions directly in Cython and call the C-level ZeroMQ APIs.

Thread Safe

ZeroMQ's Context is a thread-safe object, but Socket is not. You can therefore share a Context across a multithreaded application via zmq.Context.instance(), but every thread must create its own sockets; using a socket across threads causes c-level crashes.
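
A minimal sketch of that rule (the socket types and the inproc address are arbitrary choices for illustration): the Context is shared, but each thread builds its own socket.

```python
import threading
import zmq

# The Context is thread-safe and is meant to be shared process-wide
ctx = zmq.Context.instance()

push = ctx.socket(zmq.PUSH)
push.bind("inproc://jobs")    # bind before the worker connects

results = []

def worker():
    # Sockets are NOT thread-safe: each thread creates its own
    pull = ctx.socket(zmq.PULL)
    pull.connect("inproc://jobs")
    results.append(pull.recv())
    pull.close()

t = threading.Thread(target=worker)
t.start()
push.send(b"job-1")
t.join()
push.close()
print(results)    # [b'job-1']
```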

Socket Options as Attributes

In 0MQ, socket options are accessed through the set/getsockopt() methods. PyZMQ additionally exposes them as attributes.

s = ctx.socket(zmq.DEALER)
s.identity = b'dealer'
s.hwm = 10
s.events
# 0
s.fd
# 16

Default Options on the Context

Attributes set on a Context become the default options for sockets it creates afterwards; attributes that do not apply to a given socket type are simply ignored.

ctx = zmq.Context()
ctx.linger = 0
rep = ctx.socket(zmq.REP)
req = ctx.socket(zmq.REQ)

Core Extensions

The core includes two features that 0MQ itself lacks:

  • Builtin Serialization

json and pickle serve as the Socket class's builtin serialization. Sockets have send_json() and send_pyobj(), which serialize with json and pickle respectively; on the receiving side, recv_json() and recv_pyobj() turn the data back into objects.

Unicode strings cannot be sent directly, so send_string() and recv_string() are provided; they transmit strings encoded as utf-8 bytes.

  • MessageTracker

The MessageTracker tracks 0MQ message buffers. It exists mainly because PyZMQ supports non-copying sends: any object exposing the Python buffer interface (a numpy array, for example) can be sent directly without copying.

In an asynchronous non-copying messaging system (e.g. 0MQ or MPI), however, you need a way to know when a message has actually gone out on the wire. That is what the MessageTracker is for.

The MessageTracker has to handle thread-safe communication (it is built on a Queue object), and creating one takes about 10 µs, so tracking very many small messages can hurt performance.

Tracking is therefore optional, controlled by a track flag that appears in the Frame constructor and in non-copying send and receive calls.

A MessageTracker has one method and one attribute: MessageTracker.done becomes True once the tracked Frame is no longer in use by 0MQ, and MessageTracker.wait() blocks until the Frame has been released.
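
A minimal sketch of tracking over an inproc pipe (the address and payload are made up): a non-copying send with track=True returns a MessageTracker.

```python
import zmq

ctx = zmq.Context.instance()
sender = ctx.socket(zmq.PAIR)
receiver = ctx.socket(zmq.PAIR)
sender.bind("inproc://tracked")
receiver.connect("inproc://tracked")

payload = b"x" * 4096
# copy=False avoids copying the buffer; track=True returns a MessageTracker
tracker = sender.send(payload, copy=False, track=True)

msg = receiver.recv()    # once delivered, 0MQ can release the buffer
tracker.wait()           # block until the Frame has been released
done = tracker.done      # True after wait() returns

sender.close(); receiver.close()
```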

Extensions
  1. zmq.log: Logging handlers for hooking Python logging up to the network
  2. zmq.devices: Custom devices and objects for running devices in the background
  3. zmq.eventloop: A Tornado event loop adapted for 0MQ sockets
  4. zmq.ssh: Simple tools for tunneling zeromq connections via ssh

Serializing messages in PyZMQ

  • Builtin Serialization

PyZMQ ships three serialization approaches.

The main two are json and pickle, via send_json()/send_pyobj() on the sending side and recv_json()/recv_pyobj() on the receiving side.
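
A quick sketch of both round trips over an inproc PAIR pipe (the address and payloads are arbitrary):

```python
import zmq

ctx = zmq.Context.instance()
a = ctx.socket(zmq.PAIR)
b = ctx.socket(zmq.PAIR)
a.bind("inproc://serialize")
b.connect("inproc://serialize")

# json: language-neutral, limited to simple data types
a.send_json({"op": "set", "value": 42})
as_json = b.recv_json()

# pickle: arbitrary Python objects, but both peers must be Python
a.send_pyobj({1, 2, 3})
as_pyobj = b.recv_pyobj()

print(as_json, as_pyobj)    # {'op': 'set', 'value': 42} {1, 2, 3}

a.close(); b.close()
```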

  • Custom Serialization

For example, you can use msgpack (the MessagePack serialization library) or protobuf (Google Protocol Buffers), add zlib (the Python stdlib zip-compression module) for compression, or use blosc (a blocking, shuffling, lossless and very fast compression library).

There are two ways to plug in custom serialization:

  1. write functions that take the socket as an argument
  2. subclass Socket

For example, combining pickle with zlib:

import zlib, pickle  # on Python 2 this was: import zlib, cPickle as pickle

def send_zipped_pickle(socket, obj, flags=0, protocol=-1):
    """pickle an object, and zip the pickle before sending it"""
    p = pickle.dumps(obj, protocol)
    z = zlib.compress(p)
    return socket.send(z, flags=flags)

def recv_zipped_pickle(socket, flags=0, protocol=-1):
    """inverse of send_zipped_pickle"""
    z = socket.recv(flags)
    p = zlib.decompress(z)
    return pickle.loads(p)

PyZMQ supports numpy's non-copying buffer interface. Below is a non-copying send/recv of numpy arrays, including the dtype/shape metadata needed to reconstruct the array:

import numpy

def send_array(socket, A, flags=0, copy=True, track=False):
    """send a numpy array with metadata"""
    md = dict(
        dtype = str(A.dtype),
        shape = A.shape,
    )
    socket.send_json(md, flags|zmq.SNDMORE)
    return socket.send(A, flags, copy=copy, track=track)

def recv_array(socket, flags=0, copy=True, track=False):
    """recv a numpy array"""
    md = socket.recv_json(flags=flags)
    msg = socket.recv(flags=flags, copy=copy, track=track)
    buf = memoryview(msg)  # Python 2 used the old buffer() builtin here
    A = numpy.frombuffer(buf, dtype=md['dtype'])
    return A.reshape(md['shape'])

Devices

0MQ has the concept of a device: something that manages a send-recv pattern and connects multiple sockets. To run at all, a device embeds a while(True) loop, so it blocks everything else in the program. PyZMQ provides devices that run in the background, plus a custom 3-socket MonitoredQueue device.

BackgroundDevices

You will rarely create a zmq device with device() in a Python program's main thread, because the call blocks everything else; devices need to run in background threads or processes.

PyZMQ provides ThreadDevice and ProcessDevice. For thread-safety reasons they take socket types rather than sockets as arguments, and create the sockets themselves via foo_in()-style proxy methods. The configuration methods (bind/connect/setsockopt) are proxies too: suffixed _in or _out, they address the internal in_socket and out_socket.

from zmq.devices import ProcessDevice

pd = ProcessDevice(zmq.QUEUE, zmq.ROUTER, zmq.DEALER)
pd.bind_in('tcp://*:12345')
pd.connect_out('tcp://127.0.0.1:12543')
pd.setsockopt_in(zmq.IDENTITY, b'ROUTER')
pd.setsockopt_out(zmq.IDENTITY, b'DEALER')
pd.start()
# it will now be running in a background process

MonitoredQueue

0MQ's builtin device is QUEUE, a symmetric 2-socket device supporting arbitrary patterns. For performance, monitored_queue() is implemented in Cython.

A drawback of the QUEUE device is that it does not support ROUTER as both the input and the output socket. A ROUTER prepends the IDENTITY of the sender when it receives a message and consumes the first frame when it sends (this is what routes replies). As a result, the output socket would send messages straight back to the sender, so a ROUTER-ROUTER connection must swap the first two frames of each message.

from zmq.devices import monitored_queue
ins = ctx.socket(zmq.ROUTER)
outs = ctx.socket(zmq.DEALER)
mons = ctx.socket(zmq.PUB)
configure_sockets(ins,outs,mons)
monitored_queue(ins, outs, mons, in_prefix='in', out_prefix='out')

A PUB socket is the best fit for the monitor socket, because it never receives messages, and the in/out prefixes match the PUB/SUB topic-subscription model.

ThreadMonitoredQueue and ProcessMonitoredQueue run the device in the background, adding foo_mon()-style methods to configure the monitor socket.

Eventloops

pyzmq 17 integrates with eventloops without any setup. But because it uses an edge-triggered file descriptor, problems can occur in some situations.

AsyncIO

PyZMQ 15 added zmq.asyncio for asyncio support, including a Socket subclass that returns asyncio.Future objects usable in asyncio coroutines. To use this API, import zmq.asyncio.Context; sockets created from that context return Futures from any otherwise-blocking method.

import asyncio
import zmq
from zmq.asyncio import Context

ctx = Context.instance()

async def recv():
    s = ctx.socket(zmq.SUB)
    s.connect('tcp://127.0.0.1:5555')
    s.subscribe(b'')
    while True:
        msg = await s.recv_multipart()
        print('received', msg)
    s.close()

On PyZMQ < 17, the zmq poller must be registered first:

import zmq.asyncio
zmq.asyncio.install()

ctx = zmq.asyncio.Context()

Tornado IOLoop

Tornado polls events on file descriptors and native sockets. PyZMQ builds on its ioloop and adapts IOStream into ZMQStream to handle event polling on 0MQ sockets. A ZMQStream object behaves like a socket, but instead of calling recv() directly you register an on_recv() callback; there is an on_send() callback as well.

PyZMQ 15 added zmq.eventloop.future, with a Socket subclass that returns Future objects for use in tornado coroutines. To use this API, import zmq.eventloop.future.Context; sockets created from that context return Futures from any otherwise-blocking method.

from tornado import gen, ioloop
import zmq
from zmq.eventloop.future import Context

ctx = Context.instance()

@gen.coroutine
def recv():
    s = ctx.socket(zmq.SUB)
    s.connect('tcp://127.0.0.1:5555')
    s.subscribe(b'')
    while True:
        msg = yield s.recv_multipart()
        print('received', msg)
    s.close()

A ZMQStream lets you register callbacks to process incoming messages.

ZMQStream has send() and send_multipart(), but they do not send immediately; the message goes out when the IOLoop is idle. Once a message has been sent, the on_send() callback fires.

ZMQStream.on_recv() is the primary method of ZMQStream: it registers a callback for received messages, which are always delivered as multipart.

Here is an echo socket example:

s = ctx.socket(zmq.REP)
s.bind('tcp://localhost:12345')
stream = ZMQStream(s)
def echo(msg):
    stream.send_multipart(msg)
stream.on_recv(echo)
ioloop.IOLoop.instance().start()

on_recv() takes a copy flag. When it is False, on_recv handles messages as Frame objects rather than bytes.

ZMQStream.on_recv_stream() is like on_recv, but the callback receives both the message and the stream, which is useful when a single callback serves multiple streams.

s1 = ctx.socket(zmq.REP)
s1.bind('tcp://localhost:12345')
stream1 = ZMQStream(s1)

s2 = ctx.socket(zmq.REP)
s2.bind('tcp://localhost:54321')
stream2 = ZMQStream(s2)

def echo(stream, msg):
    stream.send_multipart(msg)

stream1.on_recv_stream(echo)
stream2.on_recv_stream(echo)

ioloop.IOLoop.instance().start()

flush() pulls out all pending messages, useful for finishing work before handling a priority event. You can flush only recv events or only send events, and cap the number of messages flushed.


On PyZMQ < 17, ioloop.install() had to be called before the tornado ioloop could be used.

To use PyZMQ's ioloop in a tornado app, ioloop.install() tells tornado to use zmq's poller:

from zmq.eventloop import ioloop
ioloop.install()

Alternatively, let pyzmq create the global instance; this makes tornado's tornado.ioloop.IOLoop use zmq's poller and registers the result as the current instance:

from zmq.eventloop.ioloop import IOLoop
loop = IOLoop.current()

Either install() or fetching the instance must happen before the global instance is registered, or an error is raised.

You can also use tornado without registering a global instance. First tell tornado to use the zmq poller:

from zmq.eventloop.ioloop import ZMQIOLoop

loop = ZMQIOLoop()

Then initialize tornado and ZMQStream, passing the io_loop argument explicitly:

from tornado.testing import AsyncTestCase
from zmq.eventloop.ioloop import ZMQIOLoop
from zmq.eventloop.zmqstream import ZMQStream

class TestZMQBridge(AsyncTestCase):

     # Use a ZMQ-compatible I/O loop so that we can use `ZMQStream`.
     def get_new_ioloop(self):
         return ZMQIOLoop()

Or manually register an IOLoop as the global tornado instance:

from zmq.eventloop.ioloop import ZMQIOLoop
loop = ZMQIOLoop()
loop.install()

gevent

PyZMQ >= 2.2.0.1 provides a gevent API via zmq.green. Socket.send/recv and zmq.Poller are gevent-aware.

import zmq.green as zmq

Since PyZMQ ≥ 2.2.0.2, green.device and green.eventloop are gevent-aware as well.

You cannot mix two eventloops, e.g. tornado + gevent.

gevent >= 1.0 is recommended.

Asynchronous Logging

Python's logging module dispatches log records through Handlers. The PUBHandler class broadcasts log messages over a PUB socket.

PUB/SUB and Topics

A SUB socket only receives messages for the topics it has subscribed to:

sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, b'topic1')
sub.setsockopt(zmq.SUBSCRIBE, b'topic2')

PUBHandler

The most basic setup just creates a PUB socket for the handler:

pub = context.socket(zmq.PUB)
pub.bind('tcp://*:12345')
handler = PUBHandler(pub)
logger = logging.getLogger()
logger.addHandler(handler)

You can then give the PUBHandler a root_topic on top of the loglevel:

handler.root_topic = 'myprogram'

The basic topic form is <handler.root_topic>.<loglevel>, e.g. 'myprogram.INFO':

>>> logger.info('hello there')
>>> print(sub.recv_multipart())
[b'myprogram.INFO', b'hello there']

Subtopics

A topic tree can be appended after the log level, joined with zmq.log.handlers.TOPIC_DELIM as the delimiter:

>>> log_msg  = "hello there"
>>> subtopic = "sub.topic"
>>> msg = zmq.log.handlers.TOPIC_DELIM.join([subtopic, log_msg])
>>> logger.warn(msg)
>>> print(sub.recv_multipart())
[b'myprogram.WARN.sub.topic', b'hello there']

Tunneling PyZMQ connection with SSH

IPython uses SSH to encrypt 0MQ connections; this lives in the zmq.ssh module.

By default PyZMQ launches the ssh command via pexpect; paramiko tunnels are also supported.

An SSH tunnel has five parts:

  1. server
  2. remote ip: the remote machine's ip as seen by the server
  3. remote port
  4. local ip: the network interface used on the local machine
  5. local port

Once the tunnel is established, localip:localport is connected to remoteip:remoteport.

Typically you take the 0MQ url as seen from the remote machine, but tunnel it through the ssh server.

A remote machine on the same LAN would connect with:

sock.connect("tcp://10.0.1.2:5555")

To let a machine outside that network connect, tunnel through a server on the LAN:

from zmq import ssh
ssh.tunnel_connection(sock, "tcp://10.0.1.2:5555", "server")

"server" 的部分,實際上要寫成 "user@server:port" (user@remote-server-ip:5555)

tunnel_connection() 會將任意的 localhost port 轉接到 real server,並以新的 local url 連接 socket

Python 2.5, 3

pyversion_compat.h

Many functions convert between C buffers and Python objects. To support multiple Python versions, utils/pyversion_compat.h defines the missing symbols with macros.

Bytes and Strings

On Python >= 2.6, PyZMQ code that must also run on Python 3 uses the b'message' syntax.

To stay compatible with Python 2.6 and 3, the definitions of bytes, unicode, and basestring must be distinguished: str differs between 2.x and 3.x, bytes is undefined on 2.5, and unicode and basestring are undefined on 3.x.

Explicit Type    2.x           3.x
bytes            str           bytes
unicode          unicode       str
basestring       basestring    (str, bytes)

Since the b'message' syntax is unavailable on 2.5, the test suite uses "message".encode() instead.

Buffers

There are two buffer interfaces for turning an object into a C buffer: memoryview is the new one, available from Python 2.7 on; buffer is the old one, gone in Python 3.


__str__

str is not a platform-independent type. err.strerror() and Message.__str__() must return native str objects, but what we usually have is a bytes object, so the str type must be checked: if str is unicode, decode first.

# ...
b = natural_result()
if str is unicode:
    return b.decode()
else:
    return b

Exceptions

The old syntax is not supported in Python 3:

try:
    s.send(msg)
except zmq.ZMQError, e:
    handle(e)

The new syntax:

try:
    s.send(msg)
except zmq.ZMQError as e:
    handle(e)

Unicode

On Python < 3, the str object is a C string with methods like endswith() and split(), while the separate unicode object handles encoded text.

Python 3's str changed: it fully supports unicode, and C strings must become bytes objects instead.

To resolve the ambiguity, PyZMQ always encodes C arrays as bytes and treats unicode objects as strings.

Since PyZMQ wraps a C library, you pass it bytes, or objects exposing the buffer interface (e.g. memoryview).

The methods concerned are socket.send/recv, socket.get/setsockopt, and socket.bind/connect.

Because sending unicode strings is such a common need, socket.<method>_string() wrappers were added, taking an encoding argument that defaults to utf-8.

socket.bind/connect differ from the rest: they are setters with no getter, so they can safely coerce unicode objects to bytes internally.


methods:

  1. socket.bind
  2. socket.connect
  3. socket.send
  4. socket.recv
  5. socket.send_string
  6. socket.recv_string
  7. socket.setsockopt
  8. socket.getsockopt
  9. socket.setsockopt_string
  10. socket.getsockopt_string
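
The string wrappers from the list can be sketched over an inproc pipe (the address and text are arbitrary):

```python
import zmq

ctx = zmq.Context.instance()
a = ctx.socket(zmq.PAIR)
b = ctx.socket(zmq.PAIR)
a.bind("inproc://strings")
b.connect("inproc://strings")

# send_string encodes the str to bytes on the wire (utf-8 by default)
a.send_string("héllo")
text = b.recv_string()                       # decoded back to a str

# the encoding argument overrides the default
a.send_string("héllo", encoding="latin-1")
latin = b.recv_string(encoding="latin-1")

print(text, latin)    # héllo héllo

a.close(); b.close()
```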

References

PyZMQ docs

October 21, 2019

ZeroMQ 5 Advanced PubSub Patterns

This extends the core pub-sub pattern with higher-level patterns for performance, reliability, state distribution, and monitoring:

  • when to use publish-subscribe
  • how to handle too-slow subscribers (the Suicidal Snail pattern)
  • how to design high-speed subscribers (the Black Box pattern)
  • how to monitor a pub-sub network (the Espresso pattern)
  • how to build a shared key-value store (the Clone pattern)
  • how to use reactors to simplify servers
  • how to use the Binary Star pattern to add server failover

Pros and Cons of PubSub

pub-sub addresses multicast, or group messaging.

PUB sends each message to "all of many", while PUSH and DEALER send to "one of many", so you cannot simply replace PUSH with PUB.

pub-sub is aimed at scalability: large volumes of data, forwarded rapidly to many recipients. It uses the same trick as push-pull: get rid of back-chatter. Receivers do not talk back to senders; the one exception is SUB sockets sending subscriptions to PUB sockets, but that traffic is anonymous and infrequent.

Removing back-chatter is essential for scalability: subscribers do not connect to the publisher directly, they connect to a multicast group. While this keeps the message flow simple, it also removes any way for sender and receiver to coordinate:

  • publishers cannot tell when subscribers have successfully connected (both initial connections and reconnections)
  • subscribers cannot tell publishers anything that would control the send rate; publishers only send at full speed
  • publishers cannot tell when subscribers have suddenly disappeared

If you need reliable multicast, you must solve the possibility that the pub-sub pattern drops messages at any time. If almost-reliable multicast is enough, pub-sub works as-is. If you do need back-chatter, switch to ROUTER-DEALER or add a separate channel for synchronization.

pub-sub is like radio broadcast: you miss whatever was said before you joined, and how much you receive depends on network quality. Yet this model is exactly how information distribution works in the real world.

The classic failure cases for pub-sub:

  • subscribers join late, missing messages the server already sent
  • subscribers fetch messages too slowly, so queues overflow
  • subscribers drop and lose messages when they go away
  • subscribers crash and restart, losing whatever data they had received
  • the network gets overloaded and drops data
  • the network is too slow, publisher-side queues overflow, and the publisher crashes

Since ZeroMQ v3.x, the internal buffer size is limited by the high-water mark (HWM).
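
The HWM is just a socket option; a minimal sketch of setting it in pyzmq (the limits chosen here are arbitrary):

```python
import zmq

ctx = zmq.Context.instance()

# Since libzmq 3.x the HWM is split into send and receive sides
pub = ctx.socket(zmq.PUB)
pub.sndhwm = 1000          # queue at most 1000 outgoing messages per subscriber
snd = pub.sndhwm

# pyzmq's hwm convenience property sets SNDHWM and RCVHWM together
sub = ctx.socket(zmq.SUB)
sub.hwm = 100
rcv = sub.rcvhwm

print(snd, rcv)            # 1000 100

pub.close(); sub.close()
```

When a PUB socket hits its SNDHWM for a slow subscriber, further messages to that subscriber are simply dropped.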

Espresso pattern: PubSub Tracing

This is how to trace pub-sub traffic. Chapter 2's zmq_proxy() can do transport bridging; it takes three arguments: the frontend and backend sockets, plus a capture socket.

espresso starts a listener thread that reads from a PAIR socket and prints whatever it gets. The PAIR socket is one end of a pipe; the other end is the socket passed to zmq_proxy(). In practice, you would filter the stream and keep only the messages of interest.

espresso.py

# Espresso Pattern
# This shows how to capture data using a pub-sub proxy
#

import time

from random import randint
from string import ascii_uppercase as uppercase
from threading import Thread

import zmq
from zmq.devices import monitored_queue

from zhelpers import zpipe

# The subscriber thread requests messages starting with
# A and B, then reads and counts incoming messages.

def subscriber_thread():
    ctx = zmq.Context.instance()

    # Subscribe to "A" and "B"
    subscriber = ctx.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:6001")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"A")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"B")

    count = 0
    while True:
        try:
            [msg] = subscriber.recv_multipart()
            print ("  subscriber: {}".format(msg))
        except zmq.ZMQError as e:
            if e.errno == zmq.ETERM:
                break           # Interrupted
            else:
                raise
        count += 1

    print ("Subscriber received %d messages" % count)


# publisher thread
# The publisher sends random messages starting with A-J:

def publisher_thread():
    ctx = zmq.Context.instance()

    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:6000")

    while True:
        cateint = uppercase[randint(0,9)]  # A-J, matching the comment above
        cate = "%s" % (cateint)
        string = "%s-%05d" % (cateint, randint(0,100000))
        try:
            print ("  publisher: {}".format(string.encode('utf-8')))
            publisher.send(string.encode('utf-8'))
            # publisher.send_multipart( [cate.encode(), string.encode()] )
        except zmq.ZMQError as e:
            if e.errno == zmq.ETERM:
                break           # Interrupted
            else:
                raise
        time.sleep(0.1)         # Wait for 1/10th second


# listener thread
# The listener receives all messages flowing through the proxy, on its
# pipe. Here, the pipe is a pair of ZMQ_PAIR sockets that connects
# attached child threads via inproc. In other languages your mileage may vary:

def listener_thread (pipe):

    # Print everything that arrives on pipe
    while True:
        try:
            print (pipe.recv_multipart())
        except zmq.ZMQError as e:
            if e.errno == zmq.ETERM:
                break           # Interrupted


# main thread
# The main task starts the subscriber and publisher, and then sets
# itself up as a listening proxy. The listener runs as a child thread:

def main ():

    # Start child threads
    ctx = zmq.Context.instance()
    p_thread = Thread(target=publisher_thread)
    s_thread = Thread(target=subscriber_thread)
    p_thread.start()
    s_thread.start()

    pipe = zpipe(ctx)

    subscriber = ctx.socket(zmq.XSUB)
    subscriber.connect("tcp://localhost:6000")

    publisher = ctx.socket(zmq.XPUB)
    publisher.bind("tcp://*:6001")

    l_thread = Thread(target=listener_thread, args=(pipe[1],))
    l_thread.start()

    try:
        monitored_queue(subscriber, publisher, pipe[0], b'pub', b'sub')
    except KeyboardInterrupt:
        print ("Interrupted")

    del subscriber, publisher, pipe
    ctx.term()

if __name__ == '__main__':
    main()

Sample output:

$ python espresso.py
  publisher: b'H-15364'
[b'sub', b'\x01A']
[b'sub', b'\x01B']
  publisher: b'D-22262'
  publisher: b'I-36691'
  publisher: b'D-61882'
  publisher: b'B-22663'
[b'pub', b'B-22663']
  subscriber: b'B-22663'
  publisher: b'G-79149'

Last Value Caching

ZeroMQ's pub-sub lacks some features found in other pub-sub systems. One of them is LVC, last value caching, which solves the problem of getting initial data to newly joining subscribers: when a new subscriber joins, the publisher rebroadcasts the latest message for each topic the subscriber registered.

Large pub-sub networks need a protocol like PGM (Pragmatic General Multicast); trying to serve thousands of subscribers over TCP unicast runs into trouble.

PGM is a one-way protocol: the publisher sends a message to the switch's multicast address, and the switch fans it out to all interested subscribers. The publisher never sees subscribers join or leave; that is all handled in the network switch.

With a few dozen subscribers and a limited number of topics, plain TCP with XSUB and XPUB works fine.

You can also implement LVC with ZeroMQ itself: put a proxy between publisher and subscribers, playing the role of the PGM switch.

Here is an example designed to expose the problem. A publisher sends messages on a thousand topics, one message per second on a random topic. With no LVC, a freshly connected subscriber waits on average 500 seconds for its first message.

pathopub.py: pathological publisher

#
# Pathological publisher
# Sends out 1,000 topics and then one random update per second
#

import sys
import time

from random import randint

import zmq

def main(url=None):
    ctx = zmq.Context.instance()
    publisher = ctx.socket(zmq.PUB)
    if url is None:
        url = "tcp://*:5556"
    publisher.bind(url)
    # Ensure subscriber connection has time to complete
    print("bind to {}".format(url))
    time.sleep(1)

    # Send out all 1,000 topic messages
    for topic_nbr in range(1000):
        publisher.send_multipart([
            b"%03d" % topic_nbr,
            b"Save Roger",
        ])

    while True:
        # Send one random update per second
        try:
            time.sleep(1)
            publisher.send_multipart([
                b"%03d" % randint(0,999),
                b"Off with his head!",
            ])
        except KeyboardInterrupt:
            print( "interrupted" )
            break

if __name__ == '__main__':
    main(sys.argv[1] if len(sys.argv) > 1 else None)

pathosub.py

#
# Pathological subscriber
# Subscribes to one random topic and prints received messages
#

import sys
import time

from random import randint

import zmq

def main(url=None):
    ctx = zmq.Context.instance()
    subscriber = ctx.socket(zmq.SUB)
    if url is None:
        url = "tcp://localhost:5556"

    print("connect to {}".format(url))
    subscriber.connect(url)

    subscription = b"%03d" % randint(0,999)
    print("subscription to {}".format(subscription))
    subscriber.setsockopt(zmq.SUBSCRIBE, subscription)

    while True:
        topic, data = subscriber.recv_multipart()
        assert topic == subscription
        print( data )

if __name__ == '__main__':
    main(sys.argv[1] if len(sys.argv) > 1 else None)

Sample output:

$ python pathopub.py

$ python pathosub.py
b'Save Roger'
b'Off with his head!'

Now put lvcache between pathopub and pathosub.

lvcache.py: Last Value Caching Proxy

#
# Last value cache
# Uses XPUB subscription messages to re-send data
#

import zmq

def main():
    ctx = zmq.Context.instance()
    frontend = ctx.socket(zmq.SUB)
    frontend.connect("tcp://localhost:5557")
    backend = ctx.socket(zmq.XPUB)
    backend.bind("tcp://*:5558")

    # Subscribe to every single topic from publisher
    frontend.setsockopt(zmq.SUBSCRIBE, b"")

    # Store last instance of each topic in a cache
    cache = {}

    # main poll loop
    # We route topic updates from frontend to backend, and
    # we handle subscriptions by sending whatever we cached,
    # if anything:
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)
    while True:

        try:
            events = dict(poller.poll(1000))
        except KeyboardInterrupt:
            print("interrupted")
            break

        # Any new topic data we cache and then forward
        if frontend in events:
            msg = frontend.recv_multipart()
            topic, current = msg
            # print("cache topic: {} msg: {}".format(topic, current))
            cache[topic] = current
            backend.send_multipart(msg)

        # handle subscriptions
        # When we get a new subscription we pull data from the cache:
        if backend in events:
            event = backend.recv()
            # event=b'\x01188'
            # print( "event={}".format(event) )
            # Event is one byte 0=unsub or 1=sub, followed by topic
            if( event[0:1] == b'\x01' ):
                topic = event[1:]
                # print ("topic={}".format(topic))
                if topic in cache:
                    print ("Sending cached topic %s" % topic)
                    backend.send_multipart([ topic, cache[topic] ])

if __name__ == '__main__':
    main()

Sample output:

$ python lvcache.py
event=b'\x01'
topic=b'708'
Sending cached topic b'708'

$ python pathopub.py tcp://*:5557
bind to tcp://*:5557

$ python pathosub.py tcp://localhost:5558
connect to tcp://localhost:5558
subscription to b'708'
b'Save Roger'

Suicidal Snail pattern: Slow Subscriber Detection

A common problem in pub-sub systems is the slow subscriber: the publisher pumps messages at full speed, and a subscriber cannot process data fast enough to keep up.

Ways to handle it:

  • Queue messages on the publisher

    But with many subscribers, the publisher runs out of memory and crashes.

  • Queue messages on the subscriber

    This is better: if a subscriber runs out of memory and crashes, so be it; better than crashing the publisher.

  • Stop queuing new messages after a while

    New messages get rejected or dropped. This is what ZeroMQ does when the publisher sets an HWM, but it still does not fix the slow subscriber; it just puts gaps in the message stream.

  • Punish slow subscribers with disconnect

    ZeroMQ cannot do this, because subscribers are invisible to publishers.

Rather than disconnecting at the publisher, let the subscriber kill itself: this is the Suicidal Snail pattern.

When a subscriber detects that it is running too slowly, it croaks and disconnects itself.

Detection works by sequencing the messages and setting an HWM on the publisher. If the subscriber sees a gap in the sequence, it knows something went wrong.

There are two wrinkles: (1) with multiple publishers, how do you sequence messages? The solution is to give every publisher a unique ID and include it in the sequencing. (2) If subscribers use ZMQ_SUBSCRIBE filters, they will see gaps by definition.
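
The gap check itself is simple once every publisher stamps each message with its own sequence number; a pure-Python sketch (the publisher IDs and numbers are made up):

```python
def detect_gap(last_seen, publisher_id, sequence):
    """Return True when a message from this publisher was lost."""
    expected = last_seen.get(publisher_id, sequence - 1) + 1
    last_seen[publisher_id] = sequence
    return sequence != expected

last_seen = {}
ok1 = detect_gap(last_seen, b"pub-A", 1)   # False: first message seen
ok2 = detect_gap(last_seen, b"pub-A", 2)   # False: in order
gap = detect_gap(last_seen, b"pub-A", 5)   # True: 3 and 4 were dropped
print(ok1, ok2, gap)    # False False True
```

A real subscriber would croak as soon as detect_gap returns True.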

The Suicidal Snail pattern fits when subscribers have their own clients and service-level agreements, and must guarantee a maximum latency. Cutting a client off may not sound like a good way to guarantee maximum latency, but it is the assertion model: if the client disconnects, you learn about the problem and can fix it, whereas letting late data flow on can cause far worse damage.

suisnail.py

"""
Suicidal Snail
"""
from __future__ import print_function
import sys
import threading
import time
from pickle import dumps, loads
import random

import zmq

from zhelpers import zpipe

# ---------------------------------------------------------------------
# This is our subscriber
# It connects to the publisher and subscribes to everything. It
# sleeps for a short time between messages to simulate doing too
# much work. If a message is more than 1 second late, it croaks.

MAX_ALLOWED_DELAY = 1.0    # secs

def subscriber(pipe):
    # Subscribe to everything
    ctx = zmq.Context.instance()
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b'')
    sub.connect("tcp://localhost:5556")

    # Get and process messages
    while True:
        clock = loads(sub.recv())
        # Suicide snail logic
        if (time.time() - clock > MAX_ALLOWED_DELAY):
            print("E: subscriber cannot keep up, aborting", file=sys.stderr)
            break

        # Work for 1 msec plus some random additional time
        time.sleep(1e-3 * (1+2*random.random()))
    pipe.send(b"gone and died")


# ---------------------------------------------------------------------
# This is our server task
# It publishes a time-stamped message to its pub socket every 1ms.

def publisher(pipe):
    # Prepare publisher
    ctx = zmq.Context.instance()
    pub = ctx.socket(zmq.PUB)
    pub.bind("tcp://*:5556")

    while True:
        # Send current clock (secs) to subscribers
        pub.send(dumps(time.time()))
        try:
            signal = pipe.recv(zmq.DONTWAIT)
        except zmq.ZMQError as e:
            if e.errno == zmq.EAGAIN:
                # nothing to recv
                pass
            else:
                raise
        else:
            # received break message
            break
        time.sleep(1e-3)            # 1msec wait


# This main thread simply starts a client, and a server, and then
# waits for the client to signal it's died.

def main():
    ctx = zmq.Context.instance()
    pub_pipe, pub_peer = zpipe(ctx)
    sub_pipe, sub_peer = zpipe(ctx)

    pub_thread = threading.Thread(target=publisher, args=(pub_peer,))
    pub_thread.daemon=True
    pub_thread.start()
    sub_thread = threading.Thread(target=subscriber, args=(sub_peer,))
    sub_thread.daemon=True
    sub_thread.start()
    # wait for sub to finish
    sub_pipe.recv()
    # tell pub to halt
    pub_pipe.send(b"break")
    time.sleep(0.1)

if __name__ == '__main__':
    main()

notes:

  • the message contains just the system clock as a single number; a real application would carry at least a timestamp in a message header and data in a message body
  • the subscriber and publisher run as two threads in a single process; in reality they would be separate processes, and threads are used here only to keep the demo simple

Black Box pattern: High-Speed Subscribers

Now let us make subscribers faster. A common pub-sub use case is distributing market data from stock exchanges: a publisher connects to the exchange, takes price quotes, and feeds them to many subscribers. A handful of subscribers can be served over TCP; large numbers call for reliable multicast such as PGM.

Imagine a feed averaging 100,000 messages of 100 bytes each per second, after filtering out data the subscribers do not need. Now we want to record a full day of it (about 250 GB over 8 hours) and replay it later in a simulation environment. While a ZeroMQ application can handle 100K messages per second, we want to process them even faster.

We set up an architecture with several fast boxes: one for the publisher and one per subscriber.

Two things to note as we feed data to the subscribers:

  1. even trivial per-message work slows a subscriber down until it can no longer keep up with the publisher
  2. with publisher and subscribers tuned, the ceiling is around 6M messages per second

The first step is a multithreaded subscriber design, so that message processing and message reading run in separate threads. Typically not every message gets the same treatment: the subscriber filters by prefix key, and when a message matches, it hands it to a worker, i.e. sends it over ZeroMQ to a worker thread.

The subscriber then looks like a queue device: it connects to its workers over multiple sockets. Assuming one-way traffic and identical workers, PUSH and PULL are the right sockets.

The subscriber talks TCP or PGM to the publisher, and inproc:// to its workers.
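
A minimal single-process sketch of that dispatch step (the prefix key and inproc address are invented; a real subscriber would run the workers in their own threads):

```python
import zmq

ctx = zmq.Context.instance()

# subscriber side: push matching messages to workers over inproc
to_workers = ctx.socket(zmq.PUSH)
to_workers.bind("inproc://workers")

worker = ctx.socket(zmq.PULL)
worker.connect("inproc://workers")

def dispatch(message, prefix=b"A"):
    # filter by prefix key; only matches are handed to a worker
    if message.startswith(prefix):
        to_workers.send(message)

for msg in [b"A-100", b"B-200", b"A-300"]:
    dispatch(msg)

received = [worker.recv(), worker.recv()]
print(received)    # [b'A-100', b'A-300']

to_workers.close(); worker.close()
```

With several PULL workers connected, PUSH load-balances the matching messages among them.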

Once a single subscriber thread saturates 100% of one CPU core it cannot use the others, so we split the work across multiple threads in parallel.

This approach is called sharding: splitting the work into parallel, independent streams, e.g. half the topic keys in one stream and half in the other. Performance only improves while spare CPU cores remain to absorb it.

To shard into 2 streams:

  • use 2 I/O threads
  • use 2 network interfaces (NICs), one per subscriber
  • pin each I/O thread to a specific NIC
  • give each of the 2 subscriber threads its own core
  • give each subscriber thread 2 SUB sockets
  • assign the remaining cores to worker threads
  • connect the worker threads to both subscribers' PUSH sockets

Ideally every fully loaded thread has its own core; once threads start competing for cores and CPU cycles, everything slows down.

Clone pattern: Reliable PubSub

The goal is to let a set of applications share some common state:

  • there are up to thousands of applications
  • they may join and leave the network at any time
  • they must share a single eventually-consistent state
  • any application can update the state at any moment
  • the state fits in memory

Some realistic use cases:

  • configuration shared by a group of cloud servers
  • game state shared by a group of players
  • exchange rate data updated in real time and served to applications

Centralized Versus Decentralized

First decide whether to use a central server:

  • a central server is easier to reason about, since the network is asymmetric; it avoids problems like discovery and bind-versus-connect
  • a fully distributed architecture is harder to build, but ends up with a simpler protocol, since every node can act as both server and client; this is the Freelance pattern (chapter 4)
  • under heavy load a central server becomes a bottleneck; decentralization can handle millions of messages per second
  • a centralized architecture scales more easily in node count, because connecting 10,000 nodes to one server is simpler than connecting them all to each other

Clone pattern: one server publishing state updates, plus a set of client applications.

Representing State as Key-Value Pairs

Let us build the Clone pattern step by step.

Step one is sharing state with a set of clients, which means first deciding how to represent the state. The simplest model is a key-value store: each key-value pair is one unit of shared state.

Chapter 1 had a pub-sub weather server/client; here it becomes key-value pairs, with the client storing them in a hash table.

An update is:

  • a new key-value pair
  • a modified value for an existing key
  • a deleted key

For now, assume the state fits entirely in memory.

clonecli1.py

"""
Clone Client Model One

Author: Min RK <benjaminrk@gmail.com>

"""

import random
import time

import zmq

from kvsimple import KVMsg

def main():
    # Prepare our context and publisher socket
    ctx = zmq.Context()
    updates = ctx.socket(zmq.SUB)
    updates.linger = 0
    updates.setsockopt(zmq.SUBSCRIBE, b'')
    updates.connect("tcp://localhost:5556")

    kvmap = {}
    sequence = 0

    while True:
        try:
            kvmsg = KVMsg.recv(updates)
        except:
            break # Interrupted
        kvmsg.store(kvmap)
        sequence += 1
    print( "Interrupted\n%d messages in" % sequence )


if __name__ == '__main__':
    main()

clonesrv1.py

"""
Clone server Model One

"""

import random
import time

import zmq

from kvsimple import KVMsg

def main():
    # Prepare our context and publisher socket
    ctx = zmq.Context()
    publisher = ctx.socket(zmq.PUB)

    publisher.bind("tcp://*:5556")
    time.sleep(0.2)

    sequence = 0
    random.seed(time.time())
    kvmap = {}

    try:
        while True:
            # Distribute as key-value message
            sequence += 1
            kvmsg = KVMsg(sequence)
            kvmsg.key = b"%d" % random.randint(1,10000)
            kvmsg.body = b"%d" % random.randint(1,1000000)
            kvmsg.send(publisher)
            kvmsg.store(kvmap)
    except KeyboardInterrupt:
        print( " Interrupted\n%d messages out" % sequence )

if __name__ == '__main__':
    main()

kvsimple.py

"""
=====================================================================
kvsimple - simple key-value message class for example applications
"""

import struct # for packing integers
import sys

import zmq

class KVMsg(object):
    """
    Message is formatted on wire as 3 frames:
    frame 0: key (0MQ string)
    frame 1: sequence (8 bytes, network order)
    frame 2: body (blob)
    """
    key = None # key (string)
    sequence = 0 # int
    body = None # blob

    def __init__(self, sequence, key=None, body=None):
        assert isinstance(sequence, int)
        self.sequence = sequence
        self.key = key
        self.body = body

    def store(self, dikt):
        """Store me in a dict if I have anything to store"""
        # this seems weird to check, but it's what the C example does
        if self.key is not None and self.body is not None:
            dikt[self.key] = self

    def send(self, socket):
        """Send key-value message to socket; any empty frames are sent as such."""
        key = b'' if self.key is None else self.key
        seq_s = struct.pack('!q', self.sequence)  # 8 bytes, network order, per the docstring
        body = b'' if self.body is None else self.body
        socket.send_multipart([ key, seq_s, body ])

    @classmethod
    def recv(cls, socket):
        """Reads key-value message from socket, returns new kvmsg instance."""
        key, seq_s, body = socket.recv_multipart()
        key = key if key else None
        seq = struct.unpack('!q', seq_s)[0]
        body = body if body else None
        return cls(seq, key=key, body=body)

    def dump(self):
        if self.body is None:
            size = 0
            data='NULL'
        else:
            size = len(self.body)
            data=repr(self.body)
        print("[seq:{seq}][key:{key}][size:{size}] {data}".format(
            seq=self.sequence,
            key=self.key,
            size=size,
            data=data,
        ))

# ---------------------------------------------------------------------
# Runs self test of class

def test_kvmsg (verbose):
    print( " * kvmsg: ")

    # Prepare our context and sockets
    ctx = zmq.Context()
    output = ctx.socket(zmq.DEALER)
    output.bind("ipc://kvmsg_selftest.ipc")
    input = ctx.socket(zmq.DEALER)
    input.connect("ipc://kvmsg_selftest.ipc")

    kvmap = {}
    # Test send and receive of simple message
    kvmsg = KVMsg(1)
    kvmsg.key = b"key"
    kvmsg.body = b"body"
    if verbose:
        kvmsg.dump()
    kvmsg.send(output)
    kvmsg.store(kvmap)

    kvmsg2 = KVMsg.recv(input)
    if verbose:
        kvmsg2.dump()
    assert kvmsg2.key == b"key"
    kvmsg2.store(kvmap)

    assert len(kvmap) == 1 # shouldn't be different

    print( "OK")

if __name__ == '__main__':
    test_kvmsg('-v' in sys.argv)

  • The main work is in the kvmsg class, which handles key-value message objects. On the wire this is a multipart ZeroMQ message with 3 frames: the key, the sequence number (64 bits, in network byte order), and a binary body that can hold arbitrary data.

  • The server generates random 4-digit keys, to simulate a large hash table.

  • Delete is not implemented yet; only insert/update.

  • The server pauses for 200 ms after binding its socket. This avoids the slow joiner syndrome, where a subscriber loses the messages sent before it finished connecting to the server's socket.

  • Both server and client keep hash tables, but this first model only works if all clients are started before the server, and clients never crash.

Getting an Out-of-Band Snapshot

How do we handle slow-joiner clients, or clients that crash and then restart?

To let a late (or recovering) client catch up, it has to get a snapshot of the server's state. From here on, "message" means "a sequenced key-value pair" and "state" means "a hash table". To fetch the server state, a client uses a DEALER socket to ask for it.

Synchronization is implemented on the client side:

  • The client first subscribes to updates and then sends a state request. This guarantees that the state it gets is newer than the oldest update it has queued.
  • The client waits for the server to reply with its state, and meanwhile queues all incoming updates. It does this simply by not reading its ZeroMQ socket: ZeroMQ queues the messages for it.
  • Once the client has received its state snapshot, it starts reading the queued updates, discarding any that are older than the snapshot.
  • The client then applies further updates on top of its state snapshot.

clonesrv2.py

"""
Clone server Model Two

"""
import random
import threading
import time

import zmq

from kvsimple import KVMsg
from zhelpers import zpipe

def main():
    # Prepare our context and publisher socket
    ctx = zmq.Context()
    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:5557")

    updates, peer = zpipe(ctx)

    manager_thread = threading.Thread(target=state_manager, args=(ctx,peer))
    manager_thread.daemon=True
    manager_thread.start()


    sequence = 0
    random.seed(time.time())

    try:
        while True:
            # Distribute as key-value message
            sequence += 1
            kvmsg = KVMsg(sequence)
            kvmsg.key = b"%d" % random.randint(1,10000)
            kvmsg.body = b"%d" % random.randint(1,1000000)
            kvmsg.send(publisher)
            kvmsg.send(updates)
    except KeyboardInterrupt:
        print( " Interrupted\n%d messages out" % sequence )

# simple struct for routing information for a key-value snapshot
class Route:
    def __init__(self, socket, identity):
        self.socket = socket # ROUTER socket to send to
        self.identity = identity # Identity of peer who requested state

def send_single(key, kvmsg, route):
    """Send one state snapshot key-value pair to a socket

    Hash item data is our kvmsg object, ready to send
    """
    # Send identity of recipient first
    route.socket.send(route.identity, zmq.SNDMORE)
    kvmsg.send(route.socket)


def state_manager(ctx, pipe):
    """This thread maintains the state and handles requests from clients for snapshots.
    """
    kvmap = {}
    pipe.send(b"READY")
    snapshot = ctx.socket(zmq.ROUTER)
    snapshot.bind("tcp://*:5556")

    poller = zmq.Poller()
    poller.register(pipe, zmq.POLLIN)
    poller.register(snapshot, zmq.POLLIN)

    sequence = 0       # Current snapshot version number
    while True:
        try:
            items = dict(poller.poll())
        except (zmq.ZMQError, KeyboardInterrupt):
            break # interrupt/context shutdown

        # Apply state update from main thread
        if pipe in items:
            kvmsg = KVMsg.recv(pipe)
            sequence = kvmsg.sequence
            kvmsg.store(kvmap)
        # Execute state snapshot request
        if snapshot in items:
            msg = snapshot.recv_multipart()
            identity = msg[0]
            request = msg[1]
            if request == b"ICANHAZ?":
                pass
            else:
                print( "E: bad request, aborting\n")
                break

            # Send state snapshot to client
            route = Route(snapshot, identity)

            # For each entry in kvmap, send kvmsg to client
            for k,v in kvmap.items():
                send_single(k,v,route)

            # Now send END message with sequence number
            print( "Sending state snapshot=%d\n" % sequence)
            snapshot.send(identity, zmq.SNDMORE)
            kvmsg = KVMsg(sequence)
            kvmsg.key = b"KTHXBAI"
            kvmsg.body = b""
            kvmsg.send(snapshot)

if __name__ == '__main__':
    main()

clonecli2.py

"""
Clone client Model Two

"""

import time

import zmq

from kvsimple import KVMsg

def main():

    # Prepare our context and subscriber
    ctx = zmq.Context()
    snapshot = ctx.socket(zmq.DEALER)
    snapshot.linger = 0
    snapshot.connect("tcp://localhost:5556")
    subscriber = ctx.socket(zmq.SUB)
    subscriber.linger = 0
    subscriber.setsockopt(zmq.SUBSCRIBE, b'')
    subscriber.connect("tcp://localhost:5557")

    kvmap = {}

    # Get state snapshot
    sequence = 0
    snapshot.send(b"ICANHAZ?")
    while True:
        try:
            kvmsg = KVMsg.recv(snapshot)
        except:
            break          # Interrupted

        if kvmsg.key == b"KTHXBAI":
            sequence = kvmsg.sequence
            print( "Received snapshot=%d" % sequence )
            break          # Done
        kvmsg.store(kvmap)

    # Now apply pending updates, discard out-of-sequence messages
    while True:
        try:
            kvmsg = KVMsg.recv(subscriber)
        except:
            break          # Interrupted
        if kvmsg.sequence > sequence:
            sequence = kvmsg.sequence
            kvmsg.store(kvmap)

if __name__ == '__main__':
    main()

Notes:

  • The server uses two threads and two sockets: one thread generates random updates and sends them out the main PUB socket, the other serves state requests on a ROUTER socket. The two communicate over a PAIR socket (zpipe).
  • The client is very simple; most of the work sits in the kvmsg class.
  • State is serialized in the simplest possible way: the hash table stores kvmsg objects, and the server sends these as a batch of messages when a client asks for its state. If several clients ask for state at once, each one gets a different snapshot.
  • We assume each client talks to exactly one server, and that the server keeps running and never crashes.

Republishing Updates from Clients

In the second model, changes to the key-value store came from the server itself. That is a centralized model, useful e.g. for distributing a configuration file to clients, with every node holding a local cache.

In another model, updates come from clients, not from the server. The server then becomes a stateless broker. The benefits are:

  • We worry less about the server's reliability: if it crashes, we can start a new instance and feed it new values.
  • We can use the key-value store to share knowledge between active peers.

To send updates from clients back to the server, a PUSH-PULL socket pattern is used.

Why not let clients publish updates directly to each other? It would reduce latency, but it makes consistency impossible: if every receiver applied changes in whatever order it happened to receive them, the nodes' states would drift apart.

To cope with state changes happening on several nodes at once, all changes are centralized: no matter when a client generates an update, it is pushed through the server, which imposes a single order based on when it receives each update.

To sequence the changes, the server stamps every update with a unique sequence number. Clients can then detect nastier failures, including network congestion and queue overflow: when a client notices that the message stream has skipped a number, it can take action.

Having the client ask the server to resend the missing message looks sensible, but is useless in practice: the gap usually means network stress, and asking for retransmission only adds load. Instead, the client should warn the user that something is wrong, stop, and continue only after a manual restart with human intervention.
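
The gap check described above is simple enough to sketch in a few lines (`check_sequence` is a hypothetical helper; the clients below only implement the "stale" branch, silently discarding out-of-sequence messages):

```python
def check_sequence(last_seq, new_seq):
    """Classify an incoming update by its sequence number."""
    if new_seq <= last_seq:
        return "stale"   # duplicate or out-of-date update: discard silently
    if new_seq == last_seq + 1:
        return "ok"      # exactly the next expected update: apply it
    return "gap"         # messages were lost: warn and stop, don't ask for resends
```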

clonesrv3.py

"""
Clone server Model Three

"""

import zmq

from kvsimple import KVMsg

# simple struct for routing information for a key-value snapshot
class Route:
    def __init__(self, socket, identity):
        self.socket = socket # ROUTER socket to send to
        self.identity = identity # Identity of peer who requested state

def send_single(key, kvmsg, route):
    """Send one state snapshot key-value pair to a socket"""
    # Send identity of recipient first
    route.socket.send(route.identity, zmq.SNDMORE)
    kvmsg.send(route.socket)

def main():
    # context and sockets
    ctx = zmq.Context()
    snapshot = ctx.socket(zmq.ROUTER)
    snapshot.bind("tcp://*:5556")
    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:5557")
    collector = ctx.socket(zmq.PULL)
    collector.bind("tcp://*:5558")

    sequence = 0
    kvmap = {}

    poller = zmq.Poller()
    poller.register(collector, zmq.POLLIN)
    poller.register(snapshot, zmq.POLLIN)
    while True:
        try:
            items = dict(poller.poll(1000))
        except:
            break           # Interrupted

        # Apply state update sent from client
        if collector in items:
            kvmsg = KVMsg.recv(collector)
            sequence += 1
            kvmsg.sequence = sequence
            kvmsg.send(publisher)
            kvmsg.store(kvmap)
            print( "I: publishing update %5d" % sequence )

        # Execute state snapshot request
        if snapshot in items:
            msg = snapshot.recv_multipart()
            identity = msg[0]
            request = msg[1]
            if request == b"ICANHAZ?":
                pass
            else:
                print( "E: bad request, aborting\n" )
                break

            # Send state snapshot to client
            route = Route(snapshot, identity)

            # For each entry in kvmap, send kvmsg to client
            for k,v in kvmap.items():
                send_single(k,v,route)

            # Now send END message with sequence number
            print( "Sending state snapshot=%d\n" % sequence )
            snapshot.send(identity, zmq.SNDMORE)
            kvmsg = KVMsg(sequence)
            kvmsg.key = b"KTHXBAI"
            kvmsg.body = b""
            kvmsg.send(snapshot)

    print( " Interrupted\n%d messages handled" % sequence )


if __name__ == '__main__':
    main()

clonecli3.py

"""
Clone client Model Three
"""

import random
import time

import zmq

from kvsimple import KVMsg

def main():

    # Prepare our context and subscriber
    ctx = zmq.Context()
    snapshot = ctx.socket(zmq.DEALER)
    snapshot.linger = 0
    snapshot.connect("tcp://localhost:5556")
    subscriber = ctx.socket(zmq.SUB)
    subscriber.linger = 0
    subscriber.setsockopt(zmq.SUBSCRIBE, b'')
    subscriber.connect("tcp://localhost:5557")
    publisher = ctx.socket(zmq.PUSH)
    publisher.linger = 0
    publisher.connect("tcp://localhost:5558")

    random.seed(time.time())
    kvmap = {}

    # Get state snapshot
    sequence = 0
    snapshot.send(b"ICANHAZ?")
    while True:
        try:
            kvmsg = KVMsg.recv(snapshot)
        except:
            return          # Interrupted

        if kvmsg.key == b"KTHXBAI":
            sequence = kvmsg.sequence
            print( "I: Received snapshot=%d" % sequence )
            break          # Done
        kvmsg.store(kvmap)

    poller = zmq.Poller()
    poller.register(subscriber, zmq.POLLIN)

    alarm = time.time()+1.
    while True:
        tickless = 1000*max(0, alarm - time.time())
        try:
            items = dict(poller.poll(tickless))
        except:
            break           # Interrupted

        if subscriber in items:
            kvmsg = KVMsg.recv(subscriber)

            # Discard out-of-sequence kvmsgs, incl. heartbeats
            if kvmsg.sequence > sequence:
                sequence = kvmsg.sequence
                kvmsg.store(kvmap)
                print( "I: received update=%d" % sequence )

        # If we timed-out, generate a random kvmsg
        if time.time() >= alarm:
            kvmsg = KVMsg(0)
            kvmsg.key = b"%d" % random.randint(1,10000)
            kvmsg.body = b"%d" % random.randint(1,1000000)
            kvmsg.send(publisher)
            kvmsg.store(kvmap)
            alarm = time.time() + 1.

    print( " Interrupted\n%d messages in" % sequence )

if __name__ == '__main__':
    main()

  • The server is collapsed into a single task: a PULL socket handles incoming updates from clients, a ROUTER socket handles state requests, and a PUB socket carries outgoing updates.
  • The client uses a tickless timer to send a random update to the server once per second. In a real application, updates would be driven by application code.

Working with Subtrees

As the number of clients grows, so does the size of the shared store, and at some point it no longer makes sense to send everything to every client.

With a shared store, some clients only need to work with part of it, called a subtree. The client asks for just that subtree in its state request.

There is more than one syntax for specifying a subtree:

  1. path hierarchy: /some/list/of/paths
  2. topic tree: some.list.of.topics

The examples use the path hierarchy syntax, and extend the client and server so a client can work with a single subtree.
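
Both syntaxes come down to a prefix match against the key, which is also exactly what the server's snapshot filter (`kvmsg.key.startswith(route.subtree)`) and the SUB socket's subscription do. A minimal sketch:

```python
def matches_subtree(key, subtree):
    """A subtree specification is simply a prefix of the key,
    whichever syntax (paths or topics) is in use."""
    return key.startswith(subtree)

# path hierarchy syntax
assert matches_subtree(b"/client/1234", b"/client/")
assert not matches_subtree(b"/server/1234", b"/client/")
# topic tree syntax
assert matches_subtree(b"some.list.of.topics", b"some.list.")
```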

clonesrv4.py

"""
Clone server Model Four

"""

import zmq

from kvsimple import KVMsg

# simple struct for routing information for a key-value snapshot
class Route:
    def __init__(self, socket, identity, subtree):
        self.socket = socket        # ROUTER socket to send to
        self.identity = identity    # Identity of peer who requested state
        self.subtree = subtree      # Client subtree specification


def send_single(key, kvmsg, route):
    """Send one state snapshot key-value pair to a socket"""
    # check front of key against subscription subtree:
    if kvmsg.key.startswith(route.subtree):
        # Send identity of recipient first
        route.socket.send(route.identity, zmq.SNDMORE)
        kvmsg.send(route.socket)

def main():
    # context and sockets
    ctx = zmq.Context()
    snapshot = ctx.socket(zmq.ROUTER)
    snapshot.bind("tcp://*:5556")
    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:5557")
    collector = ctx.socket(zmq.PULL)
    collector.bind("tcp://*:5558")

    sequence = 0
    kvmap = {}

    poller = zmq.Poller()
    poller.register(collector, zmq.POLLIN)
    poller.register(snapshot, zmq.POLLIN)
    while True:
        try:
            items = dict(poller.poll(1000))
        except:
            break           # Interrupted

        # Apply state update sent from client
        if collector in items:
            kvmsg = KVMsg.recv(collector)
            sequence += 1
            kvmsg.sequence = sequence
            kvmsg.send(publisher)
            kvmsg.store(kvmap)
            print( "I: publishing update %5d" % sequence )

        # Execute state snapshot request
        if snapshot in items:
            msg = snapshot.recv_multipart()
            identity, request, subtree = msg
            if request == b"ICANHAZ?":
                pass
            else:
                print( "E: bad request, aborting\n" )
                break

            # Send state snapshot to client
            route = Route(snapshot, identity, subtree)

            # For each entry in kvmap, send kvmsg to client
            for k,v in kvmap.items():
                send_single(k,v,route)

            # Now send END message with sequence number
            print( "Sending state snapshot=%d\n" % sequence )
            snapshot.send(identity, zmq.SNDMORE)
            kvmsg = KVMsg(sequence)
            kvmsg.key = b"KTHXBAI"
            kvmsg.body = subtree
            kvmsg.send(snapshot)

    print( " Interrupted\n%d messages handled" % sequence )


if __name__ == '__main__':
    main()

clonecli4.py

"""
Clone client Model Four

"""

import random
import time

import zmq

from kvsimple import KVMsg

SUBTREE = b"/client/"

def main():

    # Prepare our context and subscriber
    ctx = zmq.Context()
    snapshot = ctx.socket(zmq.DEALER)
    snapshot.linger = 0
    snapshot.connect("tcp://localhost:5556")
    subscriber = ctx.socket(zmq.SUB)
    subscriber.linger = 0
    subscriber.setsockopt(zmq.SUBSCRIBE, SUBTREE)
    subscriber.connect("tcp://localhost:5557")
    publisher = ctx.socket(zmq.PUSH)
    publisher.linger = 0
    publisher.connect("tcp://localhost:5558")

    random.seed(time.time())
    kvmap = {}

    # Get state snapshot
    sequence = 0
    snapshot.send_multipart([b"ICANHAZ?", SUBTREE])
    while True:
        try:
            kvmsg = KVMsg.recv(snapshot)
        except:
            return          # Interrupted

        if kvmsg.key == b"KTHXBAI":
            sequence = kvmsg.sequence
            print( "I: Received snapshot=%d" % sequence )
            break          # Done
        kvmsg.store(kvmap)

    poller = zmq.Poller()
    poller.register(subscriber, zmq.POLLIN)

    alarm = time.time()+1.
    while True:
        tickless = 1000*max(0, alarm - time.time())
        try:
            items = dict(poller.poll(tickless))
        except:
            break           # Interrupted

        if subscriber in items:
            kvmsg = KVMsg.recv(subscriber)

            # Discard out-of-sequence kvmsgs, incl. heartbeats
            if kvmsg.sequence > sequence:
                sequence = kvmsg.sequence
                kvmsg.store(kvmap)
                print( "I: received update=%d" % sequence )

        # If we timed-out, generate a random kvmsg
        if time.time() >= alarm:
            kvmsg = KVMsg(0)
            kvmsg.key = SUBTREE + b"%d" % random.randint(1,10000)
            kvmsg.body = b"%d" % random.randint(1,1000000)
            kvmsg.send(publisher)
            kvmsg.store(kvmap)
            alarm = time.time() + 1.

    print( " Interrupted\n%d messages in" % sequence )

if __name__ == '__main__':
    main()

Ephemeral Values

An ephemeral value is one that expires automatically if it is not refreshed regularly. For example, a node joins the network and publishes its address, then keeps refreshing it; if the node crashes, its address eventually gets removed.

Ephemeral values are usually attached to a session and deleted when the session ends. In Clone, sessions would be defined by clients and would end when the client dies. A simpler approach is to attach a time to live (TTL) to each ephemeral value, which the server uses to expire values that haven't been refreshed in time.

To implement this, we first need a way to encode the TTL in the key-value message. We could add a frame, but then every new property would mean changing the message structure, which breaks compatibility. So instead a properties frame is added to the message.

We also need a way to say "delete this value". So far servers and clients only insert or update; the convention is that an empty value means "delete".
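
Assuming the server rewrites the relative b'ttl' property into an absolute expiry timestamp when it stores a value (which is what the reactor-based server later in this chapter does), the expiry check might look like this (`is_expired` is a hypothetical helper):

```python
import time

def is_expired(properties, now=None):
    """True if this key-value pair carries a TTL that has passed.
    A missing or zero TTL means the value is permanent."""
    now = time.time() if now is None else now
    ttl = float(properties.get(b'ttl', 0))   # absolute expiry time, in seconds
    return bool(ttl) and ttl <= now

# A value that expired at t=100 is gone by t=200, but still alive at t=50:
assert is_expired({b'ttl': b'100.0'}, now=200.0)
assert not is_expired({b'ttl': b'100.0'}, now=50.0)
assert not is_expired({}, now=200.0)       # no ttl property: permanent
```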

Below is kvmsg.py, which implements the properties frame (plus a UUID frame) and can handle deletes:

"""
=====================================================================
kvmsg - key-value message class for example applications
"""

import struct # for packing integers
import sys
from uuid import uuid4

import zmq
# zmq.jsonapi ensures bytes, instead of unicode:

def encode_properties(properties_dict):
    prop_s = b""
    for key, value in properties_dict.items():
        prop_s += b"%s=%s\n" % (key, value)
    return prop_s


def decode_properties(prop_s):
    prop = {}
    line_array = prop_s.split(b"\n")

    for line in line_array:
        try:
            key, value = line.split(b"=")
            prop[key] = value
        except ValueError as e:
            #Catch empty line
            pass

    return prop

class KVMsg(object):
    """
    Message is formatted on wire as 5 frames:
    frame 0: key (0MQ string)
    frame 1: sequence (8 bytes, network order)
    frame 2: uuid (blob, 16 bytes)
    frame 3: properties (0MQ string)
    frame 4: body (blob)
    """
    key = None
    sequence = 0
    uuid=None
    properties = None
    body = None

    def __init__(self, sequence, uuid=None, key=None, properties=None, body=None):
        assert isinstance(sequence, int)
        self.sequence = sequence
        if uuid is None:
            uuid = uuid4().bytes
        self.uuid = uuid
        self.key = key
        self.properties = {} if properties is None else properties
        self.body = body

    # dictionary access maps to properties:
    def __getitem__(self, k):
        return self.properties[k]

    def __setitem__(self, k, v):
        self.properties[k] = v

    def get(self, k, default=None):
        return self.properties.get(k, default)

    def store(self, dikt):
        """Store me in a dict if I have anything to store
        else delete me from the dict."""
        if self.key is not None and self.body is not None:
            dikt[self.key] = self
        elif self.key in dikt:
            del dikt[self.key]

    def send(self, socket):
        """Send key-value message to socket; any empty frames are sent as such."""
        key = b'' if self.key is None else self.key
        seq_s = struct.pack('!q', self.sequence)
        body = b'' if self.body is None else self.body
        prop_s = encode_properties(self.properties)
        socket.send_multipart([ key, seq_s, self.uuid, prop_s, body ])

    @classmethod
    def recv(cls, socket):
        """Reads key-value message from socket, returns new kvmsg instance."""
        return cls.from_msg(socket.recv_multipart())

    @classmethod
    def from_msg(cls, msg):
        """Construct key-value message from a multipart message"""
        key, seq_s, uuid, prop_s, body = msg
        key = key if key else None
        seq = struct.unpack('!q',seq_s)[0]
        body = body if body else None
        prop = decode_properties(prop_s)
        return cls(seq, uuid=uuid, key=key, properties=prop, body=body)

    def __repr__(self):
        if self.body is None:
            size = 0
            data=b'NULL'
        else:
            size = len(self.body)
            data = repr(self.body)

        mstr = "[seq:{seq}][key:{key}][size:{size}][props:{props}][data:{data}]".format(
            seq=self.sequence,
            # uuid=hexlify(self.uuid),
            key=self.key,
            size=size,
            props=encode_properties(self.properties),
            data=data,
        )
        return mstr


    def dump(self):
        print("<<", str(self), ">>", file=sys.stderr)
# ---------------------------------------------------------------------
# Runs self test of class

def test_kvmsg (verbose):
    print(" * kvmsg: ", end='')

    # Prepare our context and sockets
    ctx = zmq.Context()
    output = ctx.socket(zmq.DEALER)
    output.bind("ipc://kvmsg_selftest.ipc")
    input = ctx.socket(zmq.DEALER)
    input.connect("ipc://kvmsg_selftest.ipc")

    kvmap = {}
    # Test send and receive of simple message
    kvmsg = KVMsg(1)
    kvmsg.key = b"key"
    kvmsg.body = b"body"
    if verbose:
        kvmsg.dump()
    kvmsg.send(output)
    kvmsg.store(kvmap)

    kvmsg2 = KVMsg.recv(input)
    if verbose:
        kvmsg2.dump()
    assert kvmsg2.key == b"key"
    kvmsg2.store(kvmap)

    assert len(kvmap) == 1 # shouldn't be different

    # test send/recv with properties:
    kvmsg = KVMsg(2, key=b"key", body=b"body")
    kvmsg[b"prop1"] = b"value1"
    kvmsg[b"prop2"] = b"value2"
    kvmsg[b"prop3"] = b"value3"
    assert kvmsg[b"prop1"] == b"value1"
    if verbose:
        kvmsg.dump()
    kvmsg.send(output)
    kvmsg2 = KVMsg.recv(input)
    if verbose:
        kvmsg2.dump()
    # ensure properties were preserved
    assert kvmsg2.key == kvmsg.key
    assert kvmsg2.body == kvmsg.body
    assert kvmsg2.properties == kvmsg.properties
    assert kvmsg2[b"prop2"] == kvmsg[b"prop2"]

    print("OK")

if __name__ == '__main__':
    test_kvmsg('-v' in sys.argv)
$ python kvmsg.py  -v
<< [seq:1][key:b'key'][size:4][props:b''][data:b'body'] >>
<< [seq:1][key:b'key'][size:4][props:b''][data:b'body'] >>
<< [seq:2][key:b'key'][size:4][props:b'prop1=value1\nprop2=value2\nprop3=value3\n'][data:b'body'] >>
<< [seq:2][key:b'key'][size:4][props:b'prop1=value1\nprop2=value2\nprop3=value3\n'][data:b'body'] >>
 * kvmsg: OK

Using a Reactor

So far the servers have used a poll loop. The next model switches to a reactor (the C version uses CZMQ's zloop); a reactor makes the code easier to follow.

We use a single thread and pass a server object around to the reactor handlers. The server could be split into threads, each handling one socket or timer, but that works better when threads don't share data. Here everything centers on the server's hashmap, so a single thread is simpler.

There are three reactor handlers:

  1. one handling snapshot requests arriving on the ROUTER socket;
  2. one handling incoming updates from clients on the PULL socket;
  3. one expiring ephemeral values whose TTL has run out.

clonesrv5.py

"""
Clone server Model Five

"""

import logging
import time

import zmq
from zmq.eventloop.ioloop import IOLoop, PeriodicCallback
from zmq.eventloop.zmqstream import ZMQStream

from kvmsg import KVMsg
from zhelpers import dump

# simple struct for routing information for a key-value snapshot
class Route:
    def __init__(self, socket, identity, subtree):
        self.socket = socket        # ROUTER socket to send to
        self.identity = identity    # Identity of peer who requested state
        self.subtree = subtree      # Client subtree specification


def send_single(key, kvmsg, route):
    """Send one state snapshot key-value pair to a socket"""
    # check front of key against subscription subtree:
    if kvmsg.key.startswith(route.subtree):
        # Send identity of recipient first
        route.socket.send(route.identity, zmq.SNDMORE)
        kvmsg.send(route.socket)

class CloneServer(object):

    # Our server is defined by these properties
    ctx = None                  # Context wrapper
    kvmap = None                # Key-value store
    loop = None                 # IOLoop reactor
    port = None                 # Main port we're working on
    sequence = 0                # How many updates we're at
    snapshot = None             # Handle snapshot requests
    publisher = None            # Publish updates to clients
    collector = None            # Collect updates from clients

    def __init__(self, port=5556):
        self.port = port
        self.ctx = zmq.Context()
        self.kvmap = {}
        self.loop = IOLoop.instance()

        # Set up our clone server sockets
        self.snapshot  = self.ctx.socket(zmq.ROUTER)
        self.publisher = self.ctx.socket(zmq.PUB)
        self.collector = self.ctx.socket(zmq.PULL)
        self.snapshot.bind("tcp://*:%d" % self.port)
        self.publisher.bind("tcp://*:%d" % (self.port + 1))
        self.collector.bind("tcp://*:%d" % (self.port + 2))

        # Wrap sockets in ZMQStreams for IOLoop handlers
        self.snapshot = ZMQStream(self.snapshot)
        self.publisher = ZMQStream(self.publisher)
        self.collector = ZMQStream(self.collector)

        # Register our handlers with reactor
        self.snapshot.on_recv(self.handle_snapshot)
        self.collector.on_recv(self.handle_collect)
        self.flush_callback = PeriodicCallback(self.flush_ttl, 1000)

        # basic log formatting:
        logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                level=logging.INFO)


    def start(self):
        # Run reactor until process interrupted
        self.flush_callback.start()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass

    def handle_snapshot(self, msg):
        """snapshot requests"""
        if len(msg) != 3 or msg[1] != b"ICANHAZ?":
            print("E: bad request, aborting")
            dump(msg)
            self.loop.stop()
            return
        identity, request, subtree = msg
        if subtree:
            # Send state snapshot to client
            route = Route(self.snapshot, identity, subtree)

            # For each entry in kvmap, send kvmsg to client
            for k,v in self.kvmap.items():
                send_single(k,v,route)

            # Now send END message with sequence number
            logging.info("I: Sending state snapshot=%d" % self.sequence)
            self.snapshot.send(identity, zmq.SNDMORE)
            kvmsg = KVMsg(self.sequence)
            kvmsg.key = b"KTHXBAI"
            kvmsg.body = subtree
            kvmsg.send(self.snapshot)

    def handle_collect(self, msg):
        """Collect updates from clients"""
        kvmsg = KVMsg.from_msg(msg)
        self.sequence += 1
        kvmsg.sequence = self.sequence
        kvmsg.send(self.publisher)
        ttl = float(kvmsg.get(b'ttl', 0))
        if ttl:
            kvmsg[b'ttl'] = b'%f' % (time.time() + ttl)
        kvmsg.store(self.kvmap)
        logging.info("I: publishing update=%d", self.sequence)

    def flush_ttl(self):
        """Purge ephemeral values that have expired"""
        for key,kvmsg in list(self.kvmap.items()):
            # used list() to exhaust the iterator before deleting from the dict
            self.flush_single(kvmsg)

    def flush_single(self, kvmsg):
        """If key-value pair has expired, delete it and publish the fact
        to listening clients."""
        ttl = float(kvmsg.get(b'ttl', 0))
        if ttl and ttl <= time.time():
            kvmsg.body = b""
            self.sequence += 1
            kvmsg.sequence = self.sequence
            kvmsg.send(self.publisher)
            del self.kvmap[kvmsg.key]
            logging.info("I: publishing delete=%d", self.sequence)

def main():
    clone = CloneServer()
    clone.start()

if __name__ == '__main__':
    main()

clonecli5.py

"""
Clone client Model Five

"""

import random
import time

import zmq

from kvmsg import KVMsg

SUBTREE = "/client/"

def main():

    # Prepare our context and subscriber
    ctx = zmq.Context()
    snapshot = ctx.socket(zmq.DEALER)
    snapshot.linger = 0
    snapshot.connect("tcp://localhost:5556")
    subscriber = ctx.socket(zmq.SUB)
    subscriber.linger = 0
    subscriber.setsockopt(zmq.SUBSCRIBE, SUBTREE.encode())
    subscriber.connect("tcp://localhost:5557")
    publisher = ctx.socket(zmq.PUSH)
    publisher.linger = 0
    publisher.connect("tcp://localhost:5558")

    random.seed(time.time())
    kvmap = {}

    # Get state snapshot
    sequence = 0
    snapshot.send_multipart([b"ICANHAZ?", SUBTREE.encode()])
    while True:
        try:
            kvmsg = KVMsg.recv(snapshot)
        except:
            return          # Interrupted

        if kvmsg.key == b"KTHXBAI":
            sequence = kvmsg.sequence
            print("I: Received snapshot=%d" % sequence)
            break          # Done
        kvmsg.store(kvmap)

    poller = zmq.Poller()
    poller.register(subscriber, zmq.POLLIN)

    alarm = time.time()+1.
    while True:
        tickless = 1000*max(0, alarm - time.time())
        try:
            items = dict(poller.poll(tickless))
        except:
            break           # Interrupted

        if subscriber in items:
            kvmsg = KVMsg.recv(subscriber)

            # Discard out-of-sequence kvmsgs, incl. heartbeats
            if kvmsg.sequence > sequence:
                sequence = kvmsg.sequence
                kvmsg.store(kvmap)
                action = "update" if kvmsg.body else "delete"
                print("I: received %s=%d" % (action, sequence))

        # If we timed-out, generate a random kvmsg
        if time.time() >= alarm:
            kvmsg = KVMsg(0)
            kvmsg.key = SUBTREE.encode() + b"%d" % random.randint(1,10000)
            kvmsg.body = b"%d" % random.randint(1,1000000)
            kvmsg[b'ttl'] = b"%d" % random.randint(0,30)
            kvmsg.send(publisher)
            kvmsg.store(kvmap)
            alarm = time.time() + 1.

    print(" Interrupted\n%d messages in" % sequence)

if __name__ == '__main__':
    main()
Adding Binary Star Pattern for Reliability

Definition of reliability:

  1. The server process crashes and is restarted, automatically or manually; the process loses its original state.
  2. The server machine dies and stays offline for some time; clients must be able to switch over to a backup server.
  3. The server process or machine gets disconnected from the network, e.g. a switch dies; it may come back at some random point, but in the meantime clients need to connect to the backup server.

The first step is to add a second server, using the Binary Star Pattern from chapter 4.

We must make sure updates are not lost if the primary server crashes. The simplest approach is to send them to both servers: the backup server acts as a client and keeps receiving all updates, including those coming directly from clients, but does not apply them to its hash table yet; it holds them aside for now.

  • Replace the push-pull flow for client updates to the servers with a pub-sub flow, which fans every update out to both servers.
  • Add heartbeats to the server-to-client update stream, so that a client can detect when the primary server has crashed and switch over to the backup server.
  • Connect the two servers using the bstar reactor class. Binary Star relies on the clients voting by actually contacting the server they believe is active; snapshot requests serve as the voting mechanism.
  • Add a unique UUID field to every update message, generated by the client.
  • The passive server keeps a "pending list" of updates it has received from clients, and of updates it has received from the active server. The list is ordered from oldest to newest.

Design the client logic as an FSM:

  • The client opens and connects its sockets, and sends a snapshot request to the first server. To avoid request storms, it asks any one server at most twice.
  • The client waits for a snapshot reply from the current server and stores it when it arrives; if no reply arrives within a timeout, it fails over to the next server.
  • Once the client has a snapshot, it waits for updates; if no update arrives within a timeout, it fails over to the next server.
  • The client loops forever. Some clients may end up talking to the primary server while others talk to the backup server; the Binary Star state machine handles this.

The failover sequence is:

  • The client detects that the primary server is no longer sending heartbeats, connects to the backup server instead, and requests a new state snapshot.
  • The backup server starts receiving snapshot requests from clients, detects that the primary server has died, and takes over as primary.
  • The backup server applies its pending list to its own hash table and starts processing state snapshot requests.

When the primary server comes back online, it:

  • Starts up as a passive server and connects to the backup server as a Clone client.
  • Starts receiving updates from clients via its SUB socket.

We make two assumptions:

  1. At least one server keeps running; if both servers crash, there is no way to recover.
  2. Multiple clients do not update the same hash keys at the same time. Client updates arrive at the two servers in different orders, so the backup server may apply the updates on its pending list in a different order than the primary did. But updates from any single client arrive at both servers in the same order, so the mechanism is safe.
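The pending-list reconciliation this relies on can be sketched as a minimal, self-contained simulation (the `Pending` helper and tuple layout are illustrative; the real servers below implement the same idea in `was_pending`):

```python
import uuid

class Pending:
    """Hold one copy of each update until the matching copy arrives."""
    def __init__(self):
        self.items = []  # (uuid, key, value) tuples, oldest first

    def add(self, update):
        self.items.append(update)

    def was_pending(self, update):
        """If an update with the same UUID is already held, remove it and return True."""
        for i, held in enumerate(self.items):
            if held[0] == update[0]:
                self.items.pop(i)
                return True
        return False

pending = Pending()
update = (uuid.uuid4().bytes, b"/client/1", b"42")

# First copy (straight from the client) is held on the pending list.
if not pending.was_pending(update):
    pending.add(update)

# The second copy (republished by the active server, same UUID) matches
# the held copy, which is removed instead of being applied twice.
assert pending.was_pending(update)
assert pending.items == []
```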

clonesrv6.py

"""
Clone server Model Six
"""

import logging
import time

import zmq
from zmq.eventloop.ioloop import PeriodicCallback
from zmq.eventloop.zmqstream import ZMQStream

from bstar import BinaryStar
from kvmsg import KVMsg
from zhelpers import dump

# simple struct for routing information for a key-value snapshot
class Route:
    def __init__(self, socket, identity, subtree):
        self.socket = socket        # ROUTER socket to send to
        self.identity = identity    # Identity of peer who requested state
        self.subtree = subtree      # Client subtree specification


def send_single(key, kvmsg, route):
    """Send one state snapshot key-value pair to a socket"""
    # check front of key against subscription subtree:
    if kvmsg.key.startswith(route.subtree):
        # Send identity of recipient first
        route.socket.send(route.identity, zmq.SNDMORE)
        kvmsg.send(route.socket)

class CloneServer(object):

    # Our server is defined by these properties
    ctx = None                  # Context wrapper
    kvmap = None                # Key-value store
    bstar = None                # Binary Star
    sequence = 0                # How many updates so far
    port = None                 # Main port we're working on
    peer = None                 # Main port of our peer
    publisher = None            # Publish updates and hugz
    collector = None            # Collect updates from clients
    subscriber = None           # Get updates from peer
    pending = None              # Pending updates from client
    primary = False             # True if we're primary
    master = False              # True if we're master
    slave = False               # True if we're slave

    def __init__(self, primary=True, ports=(5556,5566)):
        self.primary = primary
        if primary:
            self.port, self.peer = ports
            frontend = "tcp://*:5003"
            backend  = "tcp://localhost:5004"
            self.kvmap = {}
        else:
            self.peer, self.port = ports
            frontend = "tcp://*:5004"
            backend  = "tcp://localhost:5003"

        self.ctx = zmq.Context.instance()
        self.pending = []
        self.bstar = BinaryStar(primary, frontend, backend)

        self.bstar.register_voter("tcp://*:%i" % self.port, zmq.ROUTER, self.handle_snapshot)

        # Set up our clone server sockets
        self.publisher = self.ctx.socket(zmq.PUB)
        self.collector = self.ctx.socket(zmq.SUB)
        self.collector.setsockopt(zmq.SUBSCRIBE, b'')
        self.publisher.bind("tcp://*:%d" % (self.port + 1))
        self.collector.bind("tcp://*:%d" % (self.port + 2))

        # Set up our own clone client interface to peer
        self.subscriber = self.ctx.socket(zmq.SUB)
        self.subscriber.setsockopt(zmq.SUBSCRIBE, b'')
        self.subscriber.connect("tcp://localhost:%d" % (self.peer + 1))

        # Register state change handlers
        self.bstar.master_callback = self.become_master
        self.bstar.slave_callback = self.become_slave

        # Wrap sockets in ZMQStreams for IOLoop handlers
        self.publisher = ZMQStream(self.publisher)
        self.subscriber = ZMQStream(self.subscriber)
        self.collector = ZMQStream(self.collector)

        # Register our handlers with reactor
        self.collector.on_recv(self.handle_collect)
        self.flush_callback = PeriodicCallback(self.flush_ttl, 1000)
        self.hugz_callback = PeriodicCallback(self.send_hugz, 1000)

        # basic log formatting:
        logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                level=logging.INFO)

    def start(self):
        # start periodic callbacks
        self.flush_callback.start()
        self.hugz_callback.start()
        # Run bstar reactor until process interrupted
        try:
            self.bstar.start()
        except KeyboardInterrupt:
            pass

    def handle_snapshot(self, socket, msg):
        """snapshot requests"""
        if msg[1] != b"ICANHAZ?" or len(msg) != 3:
            logging.error("E: bad request, aborting")
            dump(msg)
            self.bstar.loop.stop()
            return
        identity, request = msg[:2]
        if len(msg) >= 3:
            subtree = msg[2]
            # Send state snapshot to client
            route = Route(socket, identity, subtree)

            # For each entry in kvmap, send kvmsg to client
            for k,v in self.kvmap.items():
                send_single(k,v,route)

            # Now send END message with sequence number
            logging.info("I: Sending state snapshot=%d" % self.sequence)
            socket.send(identity, zmq.SNDMORE)
            kvmsg = KVMsg(self.sequence)
            kvmsg.key = b"KTHXBAI"
            kvmsg.body = subtree
            kvmsg.send(socket)

    def handle_collect(self, msg):
        """Collect updates from clients

        If we're master, we apply these to the kvmap
        If we're slave, or unsure, we queue them on our pending list
        """
        kvmsg = KVMsg.from_msg(msg)
        if self.master:
            self.sequence += 1
            kvmsg.sequence = self.sequence
            kvmsg.send(self.publisher)
            ttl = float(kvmsg.get(b'ttl', 0))
            if ttl:
                kvmsg[b'ttl'] = b'%f' % (time.time() + ttl)
            kvmsg.store(self.kvmap)
            logging.info("I: publishing update=%d", self.sequence)
        else:
            # If we already got message from master, drop it, else
            # hold on pending list
            if not self.was_pending(kvmsg):
                self.pending.append(kvmsg)

    def was_pending(self, kvmsg):
        """If message was already on pending list, remove and return True.
        Else return False.
        """
        found = False
        for idx, held in enumerate(self.pending):
            if held.uuid == kvmsg.uuid:
                found = True
                break
        if found:
            self.pending.pop(idx)
        return found

    def flush_ttl(self):
        """Purge ephemeral values that have expired"""
        if self.kvmap:
            for key,kvmsg in list(self.kvmap.items()):
                self.flush_single(kvmsg)

    def flush_single(self, kvmsg):
        """If key-value pair has expired, delete it and publish the fact
        to listening clients."""
        ttl = float(kvmsg.get(b'ttl', 0))
        if ttl and ttl <= time.time():
            kvmsg.body = b""
            self.sequence += 1
            kvmsg.sequence = self.sequence
            logging.info("I: preparing to publish delete=%s", kvmsg.properties)
            kvmsg.send(self.publisher)
            del self.kvmap[kvmsg.key]
            logging.info("I: publishing delete=%d", self.sequence)

    def send_hugz(self):
        """Send hugz to anyone listening on the publisher socket"""
        kvmsg = KVMsg(self.sequence)
        kvmsg.key = b"HUGZ"
        kvmsg.body = b""
        kvmsg.send(self.publisher)

    # ---------------------------------------------------------------------
    # State change handlers

    def become_master(self):
        """We're becoming master

        The backup server applies its pending list to its own hash table,
        and then starts to process state snapshot requests.
        """
        self.master = True
        self.slave = False
        # stop receiving subscriber updates while we are master
        self.subscriber.stop_on_recv()

        # Apply pending list to own kvmap
        while self.pending:
            kvmsg = self.pending.pop(0)
            self.sequence += 1
            kvmsg.sequence = self.sequence
            kvmsg.store(self.kvmap)
            logging.info ("I: publishing pending=%d", self.sequence)


    def become_slave(self):
        """We're becoming slave"""
        # clear kvmap
        self.kvmap = None
        self.master = False
        self.slave = True
        self.subscriber.on_recv(self.handle_subscriber)

    def handle_subscriber(self, msg):
        """Collect updates from peer (master)
        We're always slave when we get these updates
        """
        if self.master:
            logging.warning("received subscriber message, but we are master %s", msg)
            return

        # Get state snapshot if necessary
        if self.kvmap is None:
            self.kvmap = {}
            snapshot = self.ctx.socket(zmq.DEALER)
            snapshot.linger = 0
            snapshot.connect("tcp://localhost:%i" % self.peer)

            logging.info ("I: asking for snapshot from: tcp://localhost:%d",
                        self.peer)
            snapshot.send_multipart([b"ICANHAZ?", b''])
            while True:
                try:
                    kvmsg = KVMsg.recv(snapshot)
                except KeyboardInterrupt:
                    # Interrupted
                    self.bstar.loop.stop()
                    return
                if kvmsg.key == b"KTHXBAI":
                    self.sequence = kvmsg.sequence
                    break          # Done
                kvmsg.store(self.kvmap)

            logging.info ("I: received snapshot=%d", self.sequence)

        # Find and remove update off pending list
        kvmsg = KVMsg.from_msg(msg)
        # update float ttl -> timestamp
        ttl = float(kvmsg.get(b'ttl', 0))
        if ttl:
            kvmsg[b'ttl'] = b'%f' % (time.time() + ttl)

        if kvmsg.key != b"HUGZ":
            if not self.was_pending(kvmsg):
                # If master update came before client update, flip it
                # around, store master update (with sequence) on pending
                # list and use to clear client update when it comes later
                self.pending.append(kvmsg)

            # If update is more recent than our kvmap, apply it
            if (kvmsg.sequence > self.sequence):
                self.sequence = kvmsg.sequence
                kvmsg.store(self.kvmap)
                logging.info ("I: received update=%d", self.sequence)


def main():
    import sys
    if '-p' in sys.argv:
        primary = True
    elif '-b' in sys.argv:
        primary = False
    else:
        print("Usage: clonesrv6.py { -p | -b }")
        sys.exit(1)
    clone = CloneServer(primary)
    clone.start()

if __name__ == '__main__':
    main()

Like bstarsrv2, clonesrv6 raises a tornado error on startup; downgrading tornado from 5.1 to 4.5 may fix it, but I have not tested this.

It supports failover, ephemeral values, and subtrees.

The reactor-based design removes a lot of redundant code. The whole server runs in a single thread, so no inter-thread plumbing is needed; we just pass a structure pointer (self) to all the handlers.

The Clustered Hashmap Protocol

The server now integrates the Binary Star Pattern; the client is somewhat complex, so before implementing it, let's look at the Clustered Hashmap Protocol.

There are two ways to design a protocol:

  1. Keep each socket flow separate, which is what we have done so far. The advantage is that each flow stays simple and clean; the drawback is having many socket flows to manage at once. Using a reactor makes this a bit simpler, but the code still has to be split up properly.
  2. Design a single protocol that uses one socket pair for everything, here ROUTER for the server and DEALER for the clients. This makes the protocol more complex; chapter 7 has another example that uses ROUTER-DEALER.

Now let's walk through the CHP protocol spec, noting the keywords "SHOULD", "MUST", and "MAY".

  • Goals

CHP provides reliable pub-sub for a set of clients over a ZeroMQ network. The "hashmap" is a set of key-value pairs; any client can modify any key-value pair at any time, and changes are propagated to all clients. A client can join the network at any point.

  • Architecture

CHP connects a set of client applications and a set of servers. Clients connect to servers; clients cannot connect to each other, and can join or leave the network arbitrarily.

  • Ports and Connections

    The server MUST open three ports:

    • A SNAPSHOT port (ZeroMQ ROUTER socket) at port number P.
    • A PUBLISHER port (ZeroMQ PUB socket) at port number P + 1.
    • A COLLECTOR port (ZeroMQ SUB socket) at port number P + 2.

    The client SHOULD open at least two connections:

    • A SNAPSHOT connection (ZeroMQ DEALER socket) to port number P.
    • A SUBSCRIBER connection (ZeroMQ SUB socket) to port number P + 1.

    The client MAY open a third connection, if it wants to update the hashmap:

    • A PUBLISHER connection (ZeroMQ PUB socket) to port number P + 2.
  • State Synchronization

The client MUST start its state synchronization by sending an ICANHAZ command to its snapshot connection. Both frames are ZeroMQ strings. The subtree specification MAY be empty; typically it starts with a /, consists of one or more path segments, and ends with a /.

ICANHAZ command
-----------------------------------
Frame 0: "ICANHAZ?"
Frame 1: subtree specification
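Subtree filtering amounts to a prefix match on the key, which can be sketched as (a minimal illustration; the server code above does the same thing with `startswith` in `send_single`):

```python
def match_subtree(key: bytes, subtree: bytes) -> bool:
    """A key belongs to a subtree when it starts with the subtree prefix."""
    return key.startswith(subtree)

assert match_subtree(b"/client/42", b"/client/")
assert not match_subtree(b"/server/42", b"/client/")
assert match_subtree(b"/anything", b"")  # an empty subtree spec matches every key
```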

The server MUST respond to an ICANHAZ command with zero or more KVSYNC commands, followed by a KTHXBAI command. The server MUST prefix each command with the identity of the client, as supplied in the ICANHAZ. The sequence number in a KVSYNC command is not significant and MAY be zero.

KVSYNC command
-----------------------------------
Frame 0: key, as ZeroMQ string
Frame 1: sequence number, 8 bytes in network order
Frame 2: <empty>
Frame 3: <empty>
Frame 4: value, as blob
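The 8-byte network-order sequence number in frame 1 can be packed and unpacked with the struct module (a sketch; the kvmsg helper used in these examples does the equivalent internally):

```python
import struct

def encode_sequence(seq: int) -> bytes:
    """Pack a sequence number as 8 bytes in network (big-endian) byte order."""
    return struct.pack("!Q", seq)

def decode_sequence(frame: bytes) -> int:
    """Recover the integer sequence number from an 8-byte frame."""
    return struct.unpack("!Q", frame)[0]

frame = encode_sequence(42)
assert len(frame) == 8
assert decode_sequence(frame) == 42
```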

The sequence number in the KTHXBAI command MUST be the highest sequence number of the preceding KVSYNC commands.

KTHXBAI command
-----------------------------------
Frame 0: "KTHXBAI"
Frame 1: sequence number, 8 bytes in network order
Frame 2: <empty>
Frame 3: <empty>
Frame 4: subtree specification

When the client has received a KTHXBAI command, it SHOULD start receiving messages from its subscriber connection and applying them.

  • Server-to-Client Updates

When the server has an update for its hashmap, it MUST broadcast a KVPUB command on its publisher socket.

KVPUB command
-----------------------------------
Frame 0: key, as ZeroMQ string
Frame 1: sequence number, 8 bytes in network order
Frame 2: UUID, 16 bytes
Frame 3: properties, as ZeroMQ string
Frame 4: value, as blob

The sequence number MUST be strictly increasing. The client MUST discard any KVPUB command whose sequence number is not greater than that of the last KTHXBAI or KVPUB command it received.

The UUID is optional and MAY be zero (size 0). The properties field is either zero-sized or holds one or more "name=value" pairs, separated by newline characters. If the key-value pair has no properties, the properties field is empty.

If the value is empty, the client SHOULD delete its key-value entry with that key.

In the absence of other updates, the server SHOULD send a HUGZ command at regular intervals, e.g. once per second.

HUGZ command
-----------------------------------
Frame 0: "HUGZ"
Frame 1: 00000000
Frame 2: <empty>
Frame 3: <empty>
Frame 4: <empty>

The client MAY treat the absence of HUGZ as an indicator that the server has crashed.
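A client can implement this by tracking a server expiry time that every received message (HUGZ or a real update) pushes forward; a minimal sketch, where `ServerLiveness` is an illustrative helper and `SERVER_TTL` mirrors the constant in clone.py further down:

```python
import time

SERVER_TTL = 5.0  # seconds of silence before the server is considered dead

class ServerLiveness:
    def __init__(self, now=None):
        now = time.time() if now is None else now
        self.expiry = now + SERVER_TTL

    def on_message(self, now=None):
        """Any message from the server (HUGZ or an update) resets expiry."""
        now = time.time() if now is None else now
        self.expiry = now + SERVER_TTL

    def is_dead(self, now=None):
        now = time.time() if now is None else now
        return now >= self.expiry

live = ServerLiveness(now=1000.0)
assert not live.is_dead(now=1004.0)   # still inside the TTL window
live.on_message(now=1004.0)           # heartbeat arrives, expiry moves forward
assert not live.is_dead(now=1008.0)
assert live.is_dead(now=1009.5)       # more than SERVER_TTL of silence
```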

  • Client-to-Server Updates

When the client has an update for its hashmap, it MAY send a KVSET command to the server via its publisher connection.

KVSET command
-----------------------------------
Frame 0: key, as ZeroMQ string
Frame 1: sequence number, 8 bytes in network order
Frame 2: UUID, 16 bytes
Frame 3: properties, as ZeroMQ string
Frame 4: value, as blob

The sequence number is not significant and MAY be zero. The UUID SHOULD be a universally unique identifier.

If the value is empty, the server MUST delete its key-value entry with that key.

The server SHOULD accept this property:

ttl: specifies a time-to-live in seconds. If a KVSET command has a ttl property, the server SHOULD delete the key-value pair when the TTL expires, and broadcast a KVPUB with an empty value to all clients.
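The server-side handling of this property, converting a relative TTL into an absolute expiry timestamp and later checking for expiry, can be sketched as follows (a minimal illustration over a plain properties dict; the clonesrv6 code above does the same in `handle_collect` and `flush_single`):

```python
import time

def apply_ttl(properties, now=None):
    """Rewrite a relative b'ttl' property (seconds) as an absolute expiry time."""
    now = time.time() if now is None else now
    ttl = float(properties.get(b"ttl", b"0").decode())
    if ttl:
        properties[b"ttl"] = b"%f" % (now + ttl)

def is_expired(properties, now=None):
    """True once the stored absolute expiry time has passed."""
    now = time.time() if now is None else now
    expiry = float(properties.get(b"ttl", b"0").decode())
    return bool(expiry) and expiry <= now

props = {b"ttl": b"30"}
apply_ttl(props, now=1000.0)            # stored as absolute expiry 1030.0
assert not is_expired(props, now=1020.0)
assert is_expired(props, now=1031.0)    # past expiry: delete and publish KVPUB
```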
  • Reliability

    CHP may be used in a dual-server configuration where a backup server takes over if the primary server fails. CHP does not specify the failover mechanism; Binary Star may be used.

    To assist server reliability, the client MAY:

    • Set a UUID in every KVSET command.
    • Detect the lack of HUGZ over a time period and use this as an indicator that the current server has failed.
    • Connect to the backup server and re-request a state synchronization.
  • Scalability and Performance

    CHP is designed to scale to large numbers (thousands) of clients, limited only by the system resources of the broker, since all updates pass through a single server; peak throughput is on the order of millions of updates per second.

  • Security

    CHP does not implement any authentication, access control, or encryption mechanisms.

Building a Multithreaded Stack and API

The client needs to be reworked to run in a background thread, similar to the multithreaded API used at the end of the Freelance Pattern in chapter 4.

It consists of a frontend object and a backend agent, connected by a PAIR socket. Creating the frontend object also creates the thread, and we can send messages to that thread over the PAIR socket.

The multithreaded API:

  • The constructor creates a context and starts the background thread, connected to it by a pipe.
  • The background thread starts the agent, which is essentially a zmq_poll loop reading from the pipe socket as well as the DEALER and SUB sockets.
  • The main application thread and the background agent talk via ZeroMQ messages, usually in this format:
void
clone_connect (clone_t *self, char *address, char *service)
{
    assert (self);
    zmsg_t *msg = zmsg_new ();
    zmsg_addstr (msg, "CONNECT");
    zmsg_addstr (msg, address);
    zmsg_addstr (msg, service);
    zmsg_send (&msg, self->pipe);
}
  • If a method has a return code, it can wait for a reply message from the agent.
  • If the agent needs to send asynchronous events back to the frontend, we add a recv method to the class, which waits for messages on the frontend pipe.
  • We may want to expose the frontend pipe socket so the class can be integrated into poll loops; otherwise any recv method would block the application.

clonecli6.py

"""
Clone client Model Six

"""

import random
import time

import zmq

from clone import Clone

SUBTREE = "/client/"

def main():
    # Create and connect clone
    clone = Clone()
    clone.subtree = SUBTREE.encode()
    clone.connect("tcp://localhost", 5556)
    clone.connect("tcp://localhost", 5566)

    try:
        while True:
            # Distribute as key-value message
            key = b"%d" % random.randint(1,10000)
            value = b"%d" % random.randint(1,1000000)
            clone.set(key, value, random.randint(0,30))
            time.sleep(1)
    except KeyboardInterrupt:
        pass

if __name__ == '__main__':
    main()

clone.py

"""
clone - client-side Clone Pattern class
"""

import logging
import threading
import time

import zmq

from zhelpers import zpipe
from kvmsg import KVMsg

# If no server replies within this time, abandon request
GLOBAL_TIMEOUT  =   4000    # msecs
# Server considered dead if silent for this long
SERVER_TTL      =   5.0     # secs
# Number of servers we will talk to
SERVER_MAX      =   2

# basic log formatting:
logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
        level=logging.INFO)

# =====================================================================
# Synchronous part, works in our application thread

class Clone(object):
    ctx = None      # Our Context
    pipe = None     # Pipe through to clone agent
    agent = None    # agent in a thread
    _subtree = None # cache of our subtree value

    def __init__(self):
        self.ctx = zmq.Context()
        self.pipe, peer = zpipe(self.ctx)
        self.agent = threading.Thread(target=clone_agent, args=(self.ctx,peer))
        self.agent.daemon = True
        self.agent.start()

    # ---------------------------------------------------------------------
    # Clone.subtree is a property, which sets the subtree for snapshot
    # and updates

    @property
    def subtree(self):
        return self._subtree

    @subtree.setter
    def subtree(self, subtree):
        """Sends [SUBTREE][subtree] to the agent"""
        self._subtree = subtree
        self.pipe.send_multipart([b"SUBTREE", subtree])

    def connect(self, address, port):
        """Connect to new server endpoint
        Sends [CONNECT][address][port] to the agent
        """
        self.pipe.send_multipart([b"CONNECT", (address.encode() if isinstance(address, str) else address), b'%d' % port])

    def set(self, key, value, ttl=0):
        """Set new value in distributed hash table
        Sends [SET][key][value][ttl] to the agent
        """
        self.pipe.send_multipart([b"SET", key, value, b'%i' % ttl])

    def get(self, key):
        """Lookup value in distributed hash table
        Sends [GET][key] to the agent and waits for a value response
        If there is no clone available, will eventually return None.
        """

        self.pipe.send_multipart([b"GET", key])
        try:
            reply = self.pipe.recv_multipart()
        except KeyboardInterrupt:
            return
        else:
            return reply[0]


# =====================================================================
# Asynchronous part, works in the background

# ---------------------------------------------------------------------
# Simple class for one server we talk to

class CloneServer(object):
    address = None          # Server address
    port = None             # Server port
    snapshot = None         # Snapshot socket
    subscriber = None       # Incoming updates
    expiry = 0              # Expires at this time
    requests = 0            # How many snapshot requests made?

    def __init__(self, ctx, address, port, subtree):
        self.address = address
        self.port = port
        self.snapshot = ctx.socket(zmq.DEALER)
        self.snapshot.linger = 0
        self.snapshot.connect("%s:%i" % (address.decode(),port))
        self.subscriber = ctx.socket(zmq.SUB)
        self.subscriber.setsockopt(zmq.SUBSCRIBE, subtree)
        self.subscriber.setsockopt(zmq.SUBSCRIBE, b'HUGZ')
        self.subscriber.connect("%s:%i" % (address.decode(),port+1))
        self.subscriber.linger = 0


# ---------------------------------------------------------------------
# Simple class for one background agent

#  States we can be in
STATE_INITIAL   =    0   #  Before asking server for state
STATE_SYNCING   =    1   #  Getting state from server
STATE_ACTIVE    =    2   #  Getting new updates from server

class CloneAgent(object):
    ctx = None              # Own context
    pipe = None             # Socket to talk back to application
    kvmap = None            # Actual key/value dict
    subtree = b''           # Subtree specification, if any
    servers = None          # list of connected Servers
    state = 0               # Current state
    cur_server = 0          # If active, index of server in list
    sequence = 0            # last kvmsg procesed
    publisher = None        # Outgoing updates

    def __init__(self, ctx, pipe):
        self.ctx = ctx
        self.pipe = pipe
        self.kvmap = {}
        self.subtree = b''
        self.state = STATE_INITIAL
        self.publisher = ctx.socket(zmq.PUB)
        self.router = ctx.socket(zmq.ROUTER)
        self.servers = []

    def control_message (self):
        msg = self.pipe.recv_multipart()
        command = msg.pop(0)

        if command == b"CONNECT":
            address = msg.pop(0)
            port = int(msg.pop(0))
            if len(self.servers) < SERVER_MAX:
                self.servers.append(CloneServer(self.ctx, address, port, self.subtree))
                self.publisher.connect("%s:%i" % (address.decode(),port+2))
            else:
                logging.error("E: too many servers (max. %i)", SERVER_MAX)
        elif command == b"SET":
            key,value,sttl = msg
            ttl = int(sttl)

            # Send key-value pair on to server
            kvmsg = KVMsg(0, key=key, body=value)
            kvmsg.store(self.kvmap)
            if ttl:
                kvmsg[b"ttl"] = sttl
            kvmsg.send(self.publisher)
        elif command == b"GET":
            key = msg[0]
            value = self.kvmap.get(key)
            self.pipe.send(value.body if value else b'')
        elif command == b"SUBTREE":
            self.subtree = msg[0]


# ---------------------------------------------------------------------
# Asynchronous agent manages server pool and handles request/reply
# dialog when the application asks for it.

def clone_agent(ctx, pipe):
    agent = CloneAgent(ctx, pipe)
    server = None

    while True:
        poller = zmq.Poller()
        poller.register(agent.pipe, zmq.POLLIN)
        poll_timer = None
        server_socket = None

        if agent.state == STATE_INITIAL:
            # In this state we ask the server for a snapshot,
            # if we have a server to talk to...
            if agent.servers:
                server = agent.servers[agent.cur_server]
                logging.info ("I: waiting for server at %s:%d...",
                    server.address, server.port)
                if (server.requests < 2):
                    server.snapshot.send_multipart([b"ICANHAZ?", agent.subtree])
                    server.requests += 1
                server.expiry = time.time() + SERVER_TTL
                agent.state = STATE_SYNCING
                server_socket = server.snapshot
        elif agent.state == STATE_SYNCING:
            # In this state we read from snapshot and we expect
            # the server to respond, else we fail over.
            server_socket = server.snapshot
        elif agent.state == STATE_ACTIVE:
            # In this state we read from subscriber and we expect
            # the server to give hugz, else we fail over.
            server_socket = server.subscriber

        if server_socket:
            # we have a second socket to poll:
            poller.register(server_socket, zmq.POLLIN)

        if server is not None:
            poll_timer = 1e3 * max(0,server.expiry - time.time())

        # ------------------------------------------------------------
        # Poll loop
        try:
            items = dict(poller.poll(poll_timer))
        except (zmq.ZMQError, KeyboardInterrupt):
            break # Context has been shut down

        if agent.pipe in items:
            agent.control_message()
        elif server_socket in items:
            kvmsg = KVMsg.recv(server_socket)

            # Anything from server resets its expiry time
            server.expiry = time.time() + SERVER_TTL
            if (agent.state == STATE_SYNCING):
                # Store in snapshot until we're finished
                server.requests = 0
                if kvmsg.key == b"KTHXBAI":
                    agent.sequence = kvmsg.sequence
                    agent.state = STATE_ACTIVE
                    logging.info ("I: received from %s:%d snapshot=%d",
                        server.address, server.port, agent.sequence)
                else:
                    kvmsg.store(agent.kvmap)
            elif (agent.state == STATE_ACTIVE):
                # Discard out-of-sequence updates, incl. hugz
                if (kvmsg.sequence > agent.sequence):
                    agent.sequence = kvmsg.sequence
                    kvmsg.store(agent.kvmap)
                    action = "update" if kvmsg.body else "delete"

                    logging.info ("I: received from %s:%d %s=%d",
                        server.address, server.port, action, agent.sequence)
        else:
            # Server has died, failover to next
            logging.info ("I: server at %s:%d didn't give hugz",
                    server.address, server.port)
            agent.cur_server = (agent.cur_server + 1) % len(agent.servers)
            agent.state = STATE_INITIAL

The connect method takes a single endpoint argument, which in fact maps to three ports:

  • server state router (ROUTER): port P
  • server updates publisher (PUB): port P+1
  • server updates subscriber (SUB): port P+2

so we can fold the three connections into one operation.
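The port mapping can be sketched as a small helper (illustrative only; `chp_endpoints` is a hypothetical name, the real `connect` method above builds these strings inline):

```python
def chp_endpoints(address, port):
    """Map one CHP server endpoint onto the three ports the protocol defines."""
    return {
        "snapshot":   "%s:%d" % (address, port),      # DEALER -> ROUTER, port P
        "subscriber": "%s:%d" % (address, port + 1),  # SUB <- PUB,       port P+1
        "publisher":  "%s:%d" % (address, port + 2),  # PUB -> SUB,       port P+2
    }

eps = chp_endpoints("tcp://localhost", 5556)
assert eps["snapshot"] == "tcp://localhost:5556"
assert eps["subscriber"] == "tcp://localhost:5557"
assert eps["publisher"] == "tcp://localhost:5558"
```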

The clone stack splits into two parts, a frontend object class and a backend agent. The frontend sends commands ("SUBTREE", "CONNECT", "SET", "GET") to the agent, which talks to the servers:

  • On startup, get a snapshot from the first server.
  • Once we have the snapshot, switch to reading from the subscriber socket.
  • If we do not get a snapshot, fail over to the second server.
  • Poll on the pipe and the subscriber socket.
  • A message on the pipe is a request from the frontend.
  • A message on the subscriber socket is an update to store and apply.
  • If we do not hear from a server within a certain time, fail over.

References

ØMQ - The Guide