2019年10月21日

ZeroMQ 5 Advanced PubSub Patterns

擴充 core pub-sub pattern,增加 higher-level patterns for performance, reliability, state distribution, and monitoring。

  • 什麼時候要使用 publish-subscribe
  • 如何處理 too-slow subscribers (Suicidal Snail pattern)
  • 如何設計 high-speed subscribers (Black Box pattern)
  • 如何監控 pub-sub network (Espresso pattern)
  • 如何製作 shared key-value store (Clone pattern)
  • 如何使用 reactors 用以簡化 servers
  • 如何使用 Binary Star Pattern,增加 server failover 功能

Pros and Cons of PubSub

pub-sub 強調 multicast/group messaging 的功能。

PUB 可發送訊息給 "all of many",而 PUSH 與 DEALER 是 "one of many",無法直接用 PUB 替換 PUSH。

pub-sub 強調 scalability,大量資料可快速轉發給多個接收端。pub-sub 使用跟 push-pull 相同的方法: get rid of back-chatter,接收端不會跟 sender 溝通,唯一的例外是 SUB 會發送subscriptions 到 PUB sockets,但這部分是 anonymous and infrequent。

去掉 back-chatter 對 scalability 很重要,subscriber 不會直接跟 publisher 連線,而是連到 multicast group。雖然移除 back-chatter 讓 message flow 變得簡單,但也無法讓 sender/receiver 互相協作。

  • publisher 無法知道 subscribers 什麼時候連線成功(包含 initial connection 及 reconnection)
  • subscribers 無法通知 publisher,控制發送訊息的速度,publisher 只能以全速發送
  • publisher 無法知道 subscribers 是不是突然斷線了

pub-sub pattern 隨時可能遺失訊息,如果需要 reliable multicast,就必須自行解決這個問題;但如果只需要 almost reliable multicast,直接使用 pub-sub 就可以了。如果需要用到 back-chatter,可以改用 ROUTER-DEALER,或是增加另一個 channel 用在 synchronization。
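
以下是用額外的 REQ-REP channel 做 subscriber synchronization 的簡化 sketch,只示意 publisher 端,ports 與 subscriber 數量都是假設值;subscriber 端則是先 connect SUB 並 subscribe,再透過 REQ 發送同步請求並等待回覆:

import zmq

ctx = zmq.Context.instance()

# 廣播用的 PUB socket
publisher = ctx.socket(zmq.PUB)
publisher.bind("tcp://*:5561")

# 額外的 synchronization channel: 等待 subscribers 回報已就緒
syncservice = ctx.socket(zmq.REP)
syncservice.bind("tcp://*:5562")

SUBSCRIBERS_EXPECTED = 3            # 假設值
for _ in range(SUBSCRIBERS_EXPECTED):
    syncservice.recv()              # subscriber 透過 REQ 送來的同步請求
    syncservice.send(b"")           # 回覆後,該 subscriber 就緒
publisher.send(b"start")            # 確定大家就緒後才開始廣播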

pub-sub 就像是 radio broadcast,可能會在 join 前遺失一些訊息,接收訊息的狀況,跟 network quality 有關。但這個 model 就跟真實世界的 information distribution 一樣。

以下是 classic failure cases for pub-sub:

  • subscribers join late,遺失了 server 已經發送的訊息
  • subscribers 接收訊息速度太慢,造成 queue overflow
  • subscribers 可能暫時斷線,離線期間的訊息會遺失
  • subscribers crash/restart,會遺失已經收到的訊息
  • network overloaded and drop data
  • network 太慢,publisher-side queues overflow,而 publisher crash

ZeroMQ v3.x 以後限制了 internal buffer size (high water mark, HWM)
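
以 pyzmq 設定 HWM 的簡單範例(v3.x 之後預設的 HWM 是 1000 messages,以下數值只是示意,需在 bind/connect 之前設定):

import zmq

ctx = zmq.Context.instance()

pub = ctx.socket(zmq.PUB)
pub.setsockopt(zmq.SNDHWM, 2000)     # publisher 端每個 subscriber 的 outgoing queue 上限
pub.bind("tcp://*:5556")

sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.RCVHWM, 2000)     # subscriber 端的 incoming queue 上限
sub.setsockopt(zmq.SUBSCRIBE, b"")
sub.connect("tcp://localhost:5556")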

Espresso pattern: PubSub Tracing

這是 trace pub-sub 的方法。使用 chap2 的 zmq_proxy() 可以做 transport bridging,它有三個參數: frontend socket、backend socket,及 capture socket
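
在 pyzmq 中對應的是 zmq.proxy(frontend, backend, capture),以下是一個最小 sketch(ports 只是示意;capture 用 PAIR socket,另一個 thread 再 connect 到 inproc://capture 讀取複製出來的訊息):

import zmq

ctx = zmq.Context.instance()

frontend = ctx.socket(zmq.XSUB)      # 面向 publisher
frontend.connect("tcp://localhost:6000")
backend = ctx.socket(zmq.XPUB)       # 面向 subscribers
backend.bind("tcp://*:6001")

capture = ctx.socket(zmq.PAIR)       # 所有流經 proxy 的訊息都會複製一份到這裡
capture.bind("inproc://capture")

zmq.proxy(frontend, backend, capture)   # blocking,直到 context 被終止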

espresso 會產生 listener thread,該 thread 會讀取 PAIR socket,並列印取得的 message。PAIR socket 是 pipe 的一端,另一端就是傳給 zmq_proxy() 的 capture socket。實際應用上,可以在 listener 過濾 messages,只取出有興趣的訊息。

espresso.py

# Espresso Pattern
# This shows how to capture data using a pub-sub proxy
#

import time

from random import randint
from string import ascii_uppercase as uppercase
from threading import Thread

import zmq
from zmq.devices import monitored_queue

from zhelpers import zpipe

# The subscriber thread requests messages starting with
# A and B, then reads and counts incoming messages.

def subscriber_thread():
    ctx = zmq.Context.instance()

    # Subscribe to "A" and "B"
    subscriber = ctx.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:6001")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"A")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"B")

    count = 0
    while True:
        try:
            [msg] = subscriber.recv_multipart()
            print ("  subscriber: {}".format(msg))
        except zmq.ZMQError as e:
            if e.errno == zmq.ETERM:
                break           # Interrupted
            else:
                raise
        count += 1

    print ("Subscriber received %d messages" % count)


# publisher thread
# The publisher sends random messages starting with A-J:

def publisher_thread():
    ctx = zmq.Context.instance()

    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:6000")

    while True:
        cateint = uppercase[randint(0,9)]    # random topic initial A-J
        cate = "%s" % (cateint)
        string = "%s-%05d" % (cateint, randint(0,100000))
        try:
            print ("  publisher: {}".format(string.encode('utf-8')))
            publisher.send(string.encode('utf-8'))
            # publisher.send_multipart( [cate.encode(), string.encode()] )
        except zmq.ZMQError as e:
            if e.errno == zmq.ETERM:
                break           # Interrupted
            else:
                raise
        time.sleep(0.1)         # Wait for 1/10th second


# listener thread
# The listener receives all messages flowing through the proxy, on its
# pipe. Here, the pipe is a pair of ZMQ_PAIR sockets that connects
# attached child threads via inproc. In other languages your mileage may vary:

def listener_thread (pipe):

    # Print everything that arrives on pipe
    while True:
        try:
            print (pipe.recv_multipart())
        except zmq.ZMQError as e:
            if e.errno == zmq.ETERM:
                break           # Interrupted


# main thread
# The main task starts the subscriber and publisher, and then sets
# itself up as a listening proxy. The listener runs as a child thread:

def main ():

    # Start child threads
    ctx = zmq.Context.instance()
    p_thread = Thread(target=publisher_thread)
    s_thread = Thread(target=subscriber_thread)
    p_thread.start()
    s_thread.start()

    pipe = zpipe(ctx)

    subscriber = ctx.socket(zmq.XSUB)
    subscriber.connect("tcp://localhost:6000")

    publisher = ctx.socket(zmq.XPUB)
    publisher.bind("tcp://*:6001")

    l_thread = Thread(target=listener_thread, args=(pipe[1],))
    l_thread.start()

    try:
        monitored_queue(subscriber, publisher, pipe[0], b'pub', b'sub')
    except KeyboardInterrupt:
        print ("Interrupted")

    del subscriber, publisher, pipe
    ctx.term()

if __name__ == '__main__':
    main()

執行結果

$ python espresso.py
  publisher: b'H-15364'
[b'sub', b'\x01A']
[b'sub', b'\x01B']
  publisher: b'D-22262'
  publisher: b'I-36691'
  publisher: b'D-61882'
  publisher: b'B-22663'
[b'pub', b'B-22663']
  subscriber: b'B-22663'
  publisher: b'G-79149'

Last Value Caching

ZeroMQ 的 pub-sub 少了一些其他 pub-sub 系統有的功能,其中一個是 LVC: last value caching,這可解決新的 subscriber 加入時,取得資料的問題。有 LVC 時,publisher 會在新的 subscriber 加入時,重新發送一次該 subscriber 註冊的 topic 的最新訊息。

在大型 pub-sub 網路,需要類似 PGM(Pragmatic General Multicast) 的protocol,如果想要用 TCP unicast 處理上千個 subscribers,會遇到問題。

PGM 是單向 protocol: publisher 發送訊息到 switch 的 multicast address,廣播到所有需要的 subscribers。publisher 不會知道 subscribers join/leave,這都在 network switch 上處理。

在幾十個 subscribers,有限 topics 的網路環境,可使用 TCP 及 XSUB, XPUB。

也可以用 ZeroMQ 實作 LVC 功能,就是在 publisher, subscribers 之間增加一個 proxy,類似 PGM 的 switch。

以下是一個會出問題的例子:publisher 對上千個 topics 發送訊息,每秒對一個 random topic 發送一次 update。在 subscriber 連線後,因為沒有 LVC,平均要等 500 秒才會收到第一個訊息。

pathopub.py: pathologic publisher

#
# Pathological publisher
# Sends out 1,000 topics and then one random update per second
#

import sys
import time

from random import randint

import zmq

def main(url=None):
    ctx = zmq.Context.instance()
    publisher = ctx.socket(zmq.PUB)
    if url is None:
        url = "tcp://*:5556"
    publisher.bind(url)
    print("bind to {}".format(url))
    # Ensure subscriber connection has time to complete
    time.sleep(1)

    # Send out all 1,000 topic messages
    for topic_nbr in range(1000):
        publisher.send_multipart([
            b"%03d" % topic_nbr,
            b"Save Roger",
        ])

    while True:
        # Send one random update per second
        try:
            time.sleep(1)
            publisher.send_multipart([
                b"%03d" % randint(0,999),
                b"Off with his head!",
            ])
        except KeyboardInterrupt:
            print( "interrupted" )
            break

if __name__ == '__main__':
    main(sys.argv[1] if len(sys.argv) > 1 else None)

pathosub.py

#
# Pathological subscriber
# Subscribes to one random topic and prints received messages
#

import sys
import time

from random import randint

import zmq

def main(url=None):
    ctx = zmq.Context.instance()
    subscriber = ctx.socket(zmq.SUB)
    if url is None:
        url = "tcp://localhost:5556"

    print("connect to {}".format(url))
    subscriber.connect(url)

    subscription = b"%03d" % randint(0,999)
    print("subscription to {}".format(subscription))
    subscriber.setsockopt(zmq.SUBSCRIBE, subscription)

    while True:
        topic, data = subscriber.recv_multipart()
        assert topic == subscription
        print( data )

if __name__ == '__main__':
    main(sys.argv[1] if len(sys.argv) > 1 else None)

執行結果

$ python pathopub.py

$ python pathosub.py
b'Save Roger'
b'Off with his head!'

在 pathopub 與 pathosub 中間加上 lvcache

lvcache.py: Last Value Caching Proxy

#
# Last value cache
# Uses XPUB subscription messages to re-send data
#

import zmq

def main():
    ctx = zmq.Context.instance()
    frontend = ctx.socket(zmq.SUB)
    frontend.connect("tcp://localhost:5557")
    backend = ctx.socket(zmq.XPUB)
    backend.bind("tcp://*:5558")

    # Subscribe to every single topic from publisher
    frontend.setsockopt(zmq.SUBSCRIBE, b"")

    # Store last instance of each topic in a cache
    cache = {}

    # main poll loop
    # We route topic updates from frontend to backend, and
    # we handle subscriptions by sending whatever we cached,
    # if anything:
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)
    while True:

        try:
            events = dict(poller.poll(1000))
        except KeyboardInterrupt:
            print("interrupted")
            break

        # Any new topic data we cache and then forward
        if frontend in events:
            msg = frontend.recv_multipart()
            topic, current = msg
            # print("cache topic: {} msg: {}".format(topic, current))
            cache[topic] = current
            backend.send_multipart(msg)

        # handle subscriptions
        # When we get a new subscription we pull data from the cache:
        if backend in events:
            event = backend.recv()
            # event=b'\x01188'
            # print( "event={}".format(event) )
            # Event is one byte 0=unsub or 1=sub, followed by topic
            if( event[0:1] == b'\x01' ):
                topic = event[1:]
                # print ("topic={}".format(topic))
                if topic in cache:
                    print ("Sending cached topic %s" % topic)
                    backend.send_multipart([ topic, cache[topic] ])

if __name__ == '__main__':
    main()

執行結果

$ python lvcache.py
event=b'\x01'
topic=b'708'
Sending cached topic b'708'

$ python pathopub.py tcp://*:5557
bind to tcp://*:5557

$ python pathosub.py tcp://localhost:5558
connect to tcp://localhost:5558
subscription to b'708'
b'Save Roger'

Suicidal Snail pattern: Slow Subscriber Detection

pub-sub 系統常見問題: slow subscriber。當 publisher 以全速發送給 subscribers,subscribers 無法快速處理資料。

處理方法是

  • Queue messages on the publisher

    但如果有很多 subscribers,會讓 publisher 記憶體不足而 crash

  • Queue messages on the subscriber

    這個方法比較好,就算 subscriber 記憶體不足而 crash 也沒關係,總比 publisher crash 還好。

  • Stop queuing new messages after a while

    新訊息會被 rejected or dropped,如果 ZeroMQ 在 publisher 設定了 HWM,就會這樣處理,但還是無法解決 slow subscriber 的問題。這會讓 message stream 發生 gap。

  • Punish slow subscribers with disconnect

    ZeroMQ 無法這樣處理,因為 subscribers are invisible to publishers

既然 publisher 無法主動把 slow subscriber 斷線,就改讓 subscriber 自行 kill itself,這就是 Suicidal Snail Pattern

當 subscriber 偵測到自己運作太慢,就會將自己斷線。

偵測的方法是:publisher 對 messages 加上 sequence number,並在 publisher 設定 HWM。這樣一旦 subscriber 偵測到 sequence gap,就知道自己跟不上了。
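
以下是用 sequence number 偵測 gap 的簡化 sketch,假設 publisher 把遞增的 sequence 放在第一個 frame(位址與 frame 格式都是假設值):

import struct
import sys

import zmq

ctx = zmq.Context.instance()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, b"")
sub.connect("tcp://localhost:5556")

expected = None
while True:
    seq_frame, body = sub.recv_multipart()
    seq = struct.unpack("!q", seq_frame)[0]
    if expected is not None and seq != expected:
        # 偵測到 gap: 自己跟不上 (或網路掉資料),依 Suicidal Snail 自行退出
        print("E: gap detected (expected %d, got %d), aborting" % (expected, seq),
              file=sys.stderr)
        break
    expected = seq + 1
    # ... 處理 body ...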

這樣做有兩個問題:(1) 如果有多個 publishers,要如何設定 sequence number?解決方式是給每一個 publisher 一個 unique ID,一起加到 sequencing 上面。(2) 如果 subscribers 使用了 ZMQ_SUBSCRIBE filters,那麼依照定義本來就會出現 gap。

Suicidal Snail Pattern 適合用在 subscribers 有各自的 clients 及 service-level agreements,且需要保證 maximum latency 的場景。雖然把 client 斷線並不像是保證 maximum latency 的好方法,但這是 assertion model:client 一斷線,問題馬上會被發現並處理;如果放任 late data 持續流動,反而可能造成更嚴重的損害。

suisnail.py

"""
Suicidal Snail
"""
from __future__ import print_function
import sys
import threading
import time
from pickle import dumps, loads
import random

import zmq

from zhelpers import zpipe

# ---------------------------------------------------------------------
# This is our subscriber
# It connects to the publisher and subscribes to everything. It
# sleeps for a short time between messages to simulate doing too
# much work. If a message is more than 1 second late, it croaks.

MAX_ALLOWED_DELAY = 1.0    # secs

def subscriber(pipe):
    # Subscribe to everything
    ctx = zmq.Context.instance()
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b'')
    sub.connect("tcp://localhost:5556")

    # Get and process messages
    while True:
        clock = loads(sub.recv())
        # Suicide snail logic
        if (time.time() - clock > MAX_ALLOWED_DELAY):
            print("E: subscriber cannot keep up, aborting", file=sys.stderr)
            break

        # Work for 1 msec plus some random additional time
        time.sleep(1e-3 * (1+2*random.random()))
    pipe.send(b"gone and died")


# ---------------------------------------------------------------------
# This is our server task
# It publishes a time-stamped message to its pub socket every 1ms.

def publisher(pipe):
    # Prepare publisher
    ctx = zmq.Context.instance()
    pub = ctx.socket(zmq.PUB)
    pub.bind("tcp://*:5556")

    while True:
        # Send current clock (secs) to subscribers
        pub.send(dumps(time.time()))
        try:
            signal = pipe.recv(zmq.DONTWAIT)
        except zmq.ZMQError as e:
            if e.errno == zmq.EAGAIN:
                # nothing to recv
                pass
            else:
                raise
        else:
            # received break message
            break
        time.sleep(1e-3)            # 1msec wait


# This main thread simply starts a client, and a server, and then
# waits for the client to signal it's died.

def main():
    ctx = zmq.Context.instance()
    pub_pipe, pub_peer = zpipe(ctx)
    sub_pipe, sub_peer = zpipe(ctx)

    pub_thread = threading.Thread(target=publisher, args=(pub_peer,))
    pub_thread.daemon=True
    pub_thread.start()
    sub_thread = threading.Thread(target=subscriber, args=(sub_peer,))
    sub_thread.daemon=True
    sub_thread.start()
    # wait for sub to finish
    sub_pipe.recv()
    # tell pub to halt
    pub_pipe.send(b"break")
    time.sleep(0.1)

if __name__ == '__main__':
    main()

notes:

  • 範例的 message 只包含了 system clock timestamp(秒);實際上至少應該要有一個 timestamp message header 以及 data message body,如下面的 sketch
  • subscriber 及 publisher 放在單一 process,兩個 threads。實際上,應該分屬兩個 processes,因為是 demo 方便才寫成 thread
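
一個假設性的 sketch,把 timestamp 放在 header frame、實際資料放在 body frame(subscriber 端取出 header 換算 delay,超過上限就自行退出):

import time

import zmq

ctx = zmq.Context.instance()
pub = ctx.socket(zmq.PUB)
pub.bind("tcp://*:5556")

while True:
    header = b"%.6f" % time.time()       # timestamp header (秒)
    body = b"actual application data"    # 示意用的 payload
    pub.send_multipart([header, body])
    time.sleep(1e-3)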

Black Box pattern: HighSpeed Subscribers

尋找一個讓 subscribers 更快的方法。最常見的 pub-sub 使用範例是發布 market data from stock exchanges,通常會設定一個 publisher 連接到 stock exchange,取得 prices quotes,發送給多個 subscribers,少量 subscribers 可使用 TCP,大量 subscribers 可使用 reliable multicast 例如 PGM。

先想像有一個 feed,平均每秒產生 100,000 筆 100 bytes 的 messages(這已經是過濾掉不需要發送給 subscribers 的資料之後的速度)。現在我們需要記錄一天的資料(8 hrs 大約是 250GB),並在模擬環境中重新 replay。ZeroMQ application 可以每秒處理 100K messages,但我們希望以更快的速度 replay。

我們設計一個架構,有些速度很快的機器,一個給 publisher 其他每一個 subscriber 使用一個 box。

當我們將資料傳給 subscribers 要注意:

  1. 如果對 message 處理一些工作,都會降低 subscriber 的速度,直到他無法跟上 publisher 發送的速度。
  2. 在 publisher 及 subscribers 要達到最快速度,大約是每秒 6M messages

首先要將 subscriber 改成 multithreaded design,把工作分散到多個 threads:reading messages 放在一個 thread,實際處理放在其他 threads。通常不會對每個訊息都做相同的處理,subscriber 會以 prefix key 過濾訊息,如果 message 符合條件,就交給 worker 處理,也就是透過 ZeroMQ 發送訊息給 worker thread。

subscriber 就像是 queue device,可用多個 sockets 連接 subscriber 及 workers,先假設是 one-way traffic 且 worker 都是一樣的,可使用 PUSH, PULL 方式實作。

subscriber 以 TCP/PGM 跟 publisher 溝通,subscriber 以 inproc:// 跟 workers 溝通。
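
以下是這種 subscriber-as-device 結構的簡化 sketch(假設 publisher 在 tcp://localhost:5556,worker 數量與 prefix key 都是示意值):

import threading

import zmq

def worker(n):
    ctx = zmq.Context.instance()
    work = ctx.socket(zmq.PULL)
    work.connect("inproc://work")        # 從 subscriber thread 取工作
    while True:
        msg = work.recv()
        # ... 實際的處理邏輯 ...

def subscriber():
    ctx = zmq.Context.instance()
    sub = ctx.socket(zmq.SUB)
    sub.connect("tcp://localhost:5556")  # 對外用 TCP (或 PGM) 連 publisher
    sub.setsockopt(zmq.SUBSCRIBE, b"A")  # 以 prefix key 過濾

    push = ctx.socket(zmq.PUSH)
    push.bind("inproc://work")           # inproc 必須先 bind,workers 才能 connect
    for n in range(4):
        threading.Thread(target=worker, args=(n,), daemon=True).start()

    while True:
        push.send(sub.recv())            # PUSH 會 load-balance 給 workers

if __name__ == "__main__":
    subscriber()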

當單一 subscriber thread 用滿一個 CPU core 的 100% 之後,因為只有一個 thread,無法用到其他 CPU cores,所以要把工作平行分配給多個 subscriber threads。

這個方法稱為 sharding。可將工作分為 parallel and independent streams,例如一半 topic key 在一個 stream,另一半在另一個 stream,但 performance 只會因為有多餘的 CPU core 而增加。

sharding 為 2 streams 的方法:

  • 使用 2 I/O threads
  • 使用 2 network interfaces (NIC), one per subscriber
  • 每個 I/O thread 使用特定的 NIC
  • 2 subscriber thread 各自使用專屬的 cores
  • 2 個 SUB sockets,每個 subscriber thread 各用一個
  • 將其他 core 指定給 worker threads
  • worker thread 連接到兩個 subscriber PUSH sockets

最好的情況是,我們以 full-loaded thread 使用到所有 CPU cores,當 threads 開始競爭 cores 及 CPU cycles,就會減慢速度。
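
其中「2 個 I/O threads、把 socket 綁到特定 I/O thread」的部分,在 pyzmq 可以用 io_threads 與 ZMQ_AFFINITY 設定,以下是假設性的 sketch(IP 位址只是示意,分別代表兩張 NIC 上的 publisher):

import zmq

# 建立 2 個 I/O threads
ctx = zmq.Context(io_threads=2)

# affinity 是 bitmask: 1 -> I/O thread 0, 2 -> I/O thread 1
sub_a = ctx.socket(zmq.SUB)
sub_a.setsockopt(zmq.AFFINITY, 1)
sub_a.setsockopt(zmq.SUBSCRIBE, b"")
sub_a.connect("tcp://192.168.1.10:5556")

sub_b = ctx.socket(zmq.SUB)
sub_b.setsockopt(zmq.AFFINITY, 2)
sub_b.setsockopt(zmq.SUBSCRIBE, b"")
sub_b.connect("tcp://192.168.1.11:5556")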

Clone pattern: Reliable PubSub

目標是讓多個 applications 共用 some common state

  • 有上千個 applications
  • 可任意 join/leave network
  • applications 必須共用 single eventually-consistent state
  • 任一個 application 都可在任何時間點修改 state
  • state 可儲存於記憶體

有些實際應用案例:

  • 由 cloud servers 共享的 configuration
  • 由一些 players 共享的 game state
  • exchange rate data 即時更新,且提供給 applications 使用

Centralized Versus Decentralized

首先要決定要不要有 central server

  • central server 容易瞭解,網路不對稱。使用 central server,可避免 discovery, bind vs connect 等問題
  • full-distributed 架構比較不容易實作,但最終會有一個簡單的 protocol。每個 node 都可被指定作為 server 及 client。這就是 Freelance pattern (chap 4)
  • central server 在大量使用時,會變成 bottleneck,應該使用 decentralization,可在每秒處理 millions of messages。
  • centralized 架構比較容易增加多個 nodes,因為將 10,000 nodes 連到一個 server 會比互相連線來得容易。

clone pattern: 一個發布 state update 的 server,一些 client applications

Representing State as Key-Value Pairs

接下來一步一步實作 Clone pattern

首先是如何將 shared state 分享給一些 clients。要先決定如何描述 state,最簡單的方式是 key-value store,一組 key-value pair 代表 shared state 的一個單位。

chap1 已有一個 pub-sub weather server/client,將它改為發送 key-value pairs,client 端以 hash table 儲存

update

  • a new key-value pair
  • a modified value for existing key
  • a deleted key

先假設 state 可完全儲存在 memory

clonecli1.py

"""
Clone Client Model One

Author: Min RK <benjaminrk@gmail.com>

"""

import random
import time

import zmq

from kvsimple import KVMsg

def main():
    # Prepare our context and publisher socket
    ctx = zmq.Context()
    updates = ctx.socket(zmq.SUB)
    updates.linger = 0
    updates.setsockopt(zmq.SUBSCRIBE, b'')
    updates.connect("tcp://localhost:5556")

    kvmap = {}
    sequence = 0

    while True:
        try:
            kvmsg = KVMsg.recv(updates)
        except:
            break # Interrupted
        kvmsg.store(kvmap)
        sequence += 1
    print( "Interrupted\n%d messages in" % sequence )


if __name__ == '__main__':
    main()

clonesrv1.py

"""
Clone server Model One

"""

import random
import time

import zmq

from kvsimple import KVMsg

def main():
    # Prepare our context and publisher socket
    ctx = zmq.Context()
    publisher = ctx.socket(zmq.PUB)

    publisher.bind("tcp://*:5556")
    time.sleep(0.2)

    sequence = 0
    random.seed(time.time())
    kvmap = {}

    try:
        while True:
            # Distribute as key-value message
            sequence += 1
            kvmsg = KVMsg(sequence)
            kvmsg.key = b"%d" % random.randint(1,10000)
            kvmsg.body = b"%d" % random.randint(1,1000000)
            kvmsg.send(publisher)
            kvmsg.store(kvmap)
    except KeyboardInterrupt:
        print( " Interrupted\n%d messages out" % sequence )

if __name__ == '__main__':
    main()

kvsimple.py

"""
=====================================================================
kvsimple - simple key-value message class for example applications
"""

import struct # for packing integers
import sys

import zmq

class KVMsg(object):
    """
    Message is formatted on wire as 3 frames:
    frame 0: key (0MQ string)
    frame 1: sequence (8 bytes, network order)
    frame 2: body (blob)
    """
    key = None # key (string)
    sequence = 0 # int
    body = None # blob

    def __init__(self, sequence, key=None, body=None):
        assert isinstance(sequence, int)
        self.sequence = sequence
        self.key = key
        self.body = body

    def store(self, dikt):
        """Store me in a dict if I have anything to store"""
        # this seems weird to check, but it's what the C example does
        if self.key is not None and self.body is not None:
            dikt[self.key] = self

    def send(self, socket):
        """Send key-value message to socket; any empty frames are sent as such."""
        key = b'' if self.key is None else self.key
        seq_s = struct.pack('!q', self.sequence)  # 8 bytes, network order
        body = b'' if self.body is None else self.body
        socket.send_multipart([ key, seq_s, body ])

    @classmethod
    def recv(cls, socket):
        """Reads key-value message from socket, returns new kvmsg instance."""
        key, seq_s, body = socket.recv_multipart()
        key = key if key else None
        seq = struct.unpack('!q', seq_s)[0]
        body = body if body else None
        return cls(seq, key=key, body=body)

    def dump(self):
        if self.body is None:
            size = 0
            data='NULL'
        else:
            size = len(self.body)
            data=repr(self.body)
        print("[seq:{seq}][key:{key}][size:{size}] {data}".format(
            seq=self.sequence,
            key=self.key,
            size=size,
            data=data,
        ))

# ---------------------------------------------------------------------
# Runs self test of class

def test_kvmsg (verbose):
    print( " * kvmsg: ")

    # Prepare our context and sockets
    ctx = zmq.Context()
    output = ctx.socket(zmq.DEALER)
    output.bind("ipc://kvmsg_selftest.ipc")
    input = ctx.socket(zmq.DEALER)
    input.connect("ipc://kvmsg_selftest.ipc")

    kvmap = {}
    # Test send and receive of simple message
    kvmsg = KVMsg(1)
    kvmsg.key = b"key"
    kvmsg.body = b"body"
    if verbose:
        kvmsg.dump()
    kvmsg.send(output)
    kvmsg.store(kvmap)

    kvmsg2 = KVMsg.recv(input)
    if verbose:
        kvmsg2.dump()
    assert kvmsg2.key == b"key"
    kvmsg2.store(kvmap)

    assert len(kvmap) == 1 # shouldn't be different

    print( "OK")

if __name__ == '__main__':
    test_kvmsg('-v' in sys.argv)

notes:

  • 主要工作都放在 kvmsg class,負責處理 key-value message objects。這是有 3 個 frames 的 multipart ZeroMQ message: key, sequence number (64 bits, network byte order), binary body(可存任意資料)

  • server 會產生 random 4-digit key,模擬大量 hash table

  • 還沒實作 delete,只有 insert/update

  • server 在 bind socket 後有 200 ms pause,這是為了避免遇到 slow joiner syndrome,subscriber 會在連線到 server socket 前,遺失一些訊息

  • server/client 都有 hash tables,目前的版本只適合在啟動 server 前先啟動所有 clients,且 clients 永遠不會 crash

Getting an Out-of-Band Snapshot

如何處理 slow-joiner clients,或是 client crash and restart

為了讓 late(recovering) client 能夠追上 server state,需要取得 server's state 的 snapshot,接下來會把 "message" 代表 "sequenced key-value pair","state" 代表 "hash table"。為了取得 server state,client 會使用 DEALER 詢問 server

在 client 實作 synchronization

  • client 會先註冊 updates,並發送 state request,這會保證 state 會比 oldest update 還要新
  • client 會等待 server reply state,同時會 queue all updates,可透過不讀取 ZeroMQ socket 完成,會 queue 在 ZeroMQ
  • 當 client 收到 state reply 後,會開始讀取 updates,但會丟棄所有比 state snapshot 還舊的 updates
  • client 會把後續的 updates 套用到 state snapshot 上

clonesrv2.py

"""
Clone server Model Two

"""
import random
import threading
import time

import zmq

from kvsimple import KVMsg
from zhelpers import zpipe

def main():
    # Prepare our context and publisher socket
    ctx = zmq.Context()
    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:5557")

    updates, peer = zpipe(ctx)

    manager_thread = threading.Thread(target=state_manager, args=(ctx,peer))
    manager_thread.daemon=True
    manager_thread.start()


    sequence = 0
    random.seed(time.time())

    try:
        while True:
            # Distribute as key-value message
            sequence += 1
            kvmsg = KVMsg(sequence)
            kvmsg.key = b"%d" % random.randint(1,10000)
            kvmsg.body = b"%d" % random.randint(1,1000000)
            kvmsg.send(publisher)
            kvmsg.send(updates)
    except KeyboardInterrupt:
        print( " Interrupted\n%d messages out" % sequence )

# simple struct for routing information for a key-value snapshot
class Route:
    def __init__(self, socket, identity):
        self.socket = socket # ROUTER socket to send to
        self.identity = identity # Identity of peer who requested state

def send_single(key, kvmsg, route):
    """Send one state snapshot key-value pair to a socket

    Hash item data is our kvmsg object, ready to send
    """
    # Send identity of recipient first
    route.socket.send(route.identity, zmq.SNDMORE)
    kvmsg.send(route.socket)


def state_manager(ctx, pipe):
    """This thread maintains the state and handles requests from clients for snapshots.
    """
    kvmap = {}
    pipe.send(b"READY")
    snapshot = ctx.socket(zmq.ROUTER)
    snapshot.bind("tcp://*:5556")

    poller = zmq.Poller()
    poller.register(pipe, zmq.POLLIN)
    poller.register(snapshot, zmq.POLLIN)

    sequence = 0       # Current snapshot version number
    while True:
        try:
            items = dict(poller.poll())
        except (zmq.ZMQError, KeyboardInterrupt):
            break # interrupt/context shutdown

        # Apply state update from main thread
        if pipe in items:
            kvmsg = KVMsg.recv(pipe)
            sequence = kvmsg.sequence
            kvmsg.store(kvmap)
        # Execute state snapshot request
        if snapshot in items:
            msg = snapshot.recv_multipart()
            identity = msg[0]
            request = msg[1]
            if request == b"ICANHAZ?":
                pass
            else:
                print( "E: bad request, aborting\n")
                break

            # Send state snapshot to client
            route = Route(snapshot, identity)

            # For each entry in kvmap, send kvmsg to client
            for k,v in kvmap.items():
                send_single(k,v,route)

            # Now send END message with sequence number
            print( "Sending state shapshot=%d\n" % sequence)
            snapshot.send(identity, zmq.SNDMORE)
            kvmsg = KVMsg(sequence)
            kvmsg.key = b"KTHXBAI"
            kvmsg.body = b""
            kvmsg.send(snapshot)

if __name__ == '__main__':
    main()

clonecli2.py

"""
Clone client Model Two

"""

import time

import zmq

from kvsimple import KVMsg

def main():

    # Prepare our context and subscriber
    ctx = zmq.Context()
    snapshot = ctx.socket(zmq.DEALER)
    snapshot.linger = 0
    snapshot.connect("tcp://localhost:5556")
    subscriber = ctx.socket(zmq.SUB)
    subscriber.linger = 0
    subscriber.setsockopt(zmq.SUBSCRIBE, b'')
    subscriber.connect("tcp://localhost:5557")

    kvmap = {}

    # Get state snapshot
    sequence = 0
    snapshot.send(b"ICANHAZ?")
    while True:
        try:
            kvmsg = KVMsg.recv(snapshot)
        except:
            break;          # Interrupted

        if kvmsg.key == b"KTHXBAI":
            sequence = kvmsg.sequence
            print( "Received snapshot=%d" % sequence )
            break          # Done
        kvmsg.store(kvmap)

    # Now apply pending updates, discard out-of-sequence messages
    while True:
        try:
            kvmsg = KVMsg.recv(subscriber)
        except:
            break          # Interrupted
        if kvmsg.sequence > sequence:
            sequence = kvmsg.sequence
            kvmsg.store(kvmap)

if __name__ == '__main__':
    main()

notes:

  • server 使用兩個 threads:一個 random 產生 updates,並發送到 main PUB socket;另一個以 ROUTER socket 處理 state requests。兩個 threads 透過 PAIR sockets (inproc) 互相溝通
  • client 很簡單,大部分在 kvmsg class 實作
  • 沒有特別對 state 做 serialization:hash table 儲存的就是 kvmsg objects,server 會在 client 要求 state 時,把它們當成一批 messages 發送出去。如果多個 clients 同時要求 state,每一個都會取得不同的 snapshot
  • 假設 client 只會跟一個 server 溝通,server 會一直運作,不會 crash

Republishing Updates from Clients

第二個 model 中,key-value store 的修改都來自 server 本身。這是 centralized model,可用在有一份設定檔要發布給 clients,且每一個 node 都有 local cache 的狀況。

另一個 model 是 updates 來自 clients,而不是 server,這會讓 server 變成 stateless broker,優點是

  • 不需要擔心 server reliability,crash 後可啟動新的 instance 並提供新的 values
  • 可使用 key-value store 在 active peers 之間共享 knowledge

為了從 client 發送 updates,需要用到 PUSH-PULL socket pattern

為什麼不讓 clients 直接互相發送 updates?這雖然可以降低 latency,但會失去 consistency:如果讓每個接收端各自依照接收順序修改 state,不同 node 收到 updates 的順序可能不同,state 就會不同步。

為了應付 state change 同時發生在多個 nodes 的狀況,採用 centralizing all changes 的做法:不管 client 在什麼時間點產生 update,都先發送到 server,再以 server 接收 update 的順序作為基準。

為了協調 changes,server 會對 all updates 加上 unique sequence number。client 可藉此偵測 nastier failures,包含 network congestion 及 queue overflow;如果 client 發現 message stream 失序(出現 gap),就可以採取對應措施。

讓 client 向 server 要求重送遺失的 message,看起來有效,但實際上沒有用:失序通常是 network stress 造成,要求重送只會讓網路更繁忙。client 能做的是警示 user 發生異常,然後 stop,等人為介入後再 manual restart。

clonesrv3.py

"""
Clone server Model Three

"""

import zmq

from kvsimple import KVMsg

# simple struct for routing information for a key-value snapshot
class Route:
    def __init__(self, socket, identity):
        self.socket = socket # ROUTER socket to send to
        self.identity = identity # Identity of peer who requested state

def send_single(key, kvmsg, route):
    """Send one state snapshot key-value pair to a socket"""
    # Send identity of recipient first
    route.socket.send(route.identity, zmq.SNDMORE)
    kvmsg.send(route.socket)

def main():
    # context and sockets
    ctx = zmq.Context()
    snapshot = ctx.socket(zmq.ROUTER)
    snapshot.bind("tcp://*:5556")
    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:5557")
    collector = ctx.socket(zmq.PULL)
    collector.bind("tcp://*:5558")

    sequence = 0
    kvmap = {}

    poller = zmq.Poller()
    poller.register(collector, zmq.POLLIN)
    poller.register(snapshot, zmq.POLLIN)
    while True:
        try:
            items = dict(poller.poll(1000))
        except:
            break           # Interrupted

        # Apply state update sent from client
        if collector in items:
            kvmsg = KVMsg.recv(collector)
            sequence += 1
            kvmsg.sequence = sequence
            kvmsg.send(publisher)
            kvmsg.store(kvmap)
            print( "I: publishing update %5d" % sequence )

        # Execute state snapshot request
        if snapshot in items:
            msg = snapshot.recv_multipart()
            identity = msg[0]
            request = msg[1]
            if request == b"ICANHAZ?":
                pass
            else:
                print( "E: bad request, aborting\n" )
                break

            # Send state snapshot to client
            route = Route(snapshot, identity)

            # For each entry in kvmap, send kvmsg to client
            for k,v in kvmap.items():
                send_single(k,v,route)

            # Now send END message with sequence number
            print( "Sending state shapshot=%d\n" % sequence )
            snapshot.send(identity, zmq.SNDMORE)
            kvmsg = KVMsg(sequence)
            kvmsg.key = b"KTHXBAI"
            kvmsg.body = b""
            kvmsg.send(snapshot)

    print( " Interrupted\n%d messages handled" % sequence )


if __name__ == '__main__':
    main()

clonecli3.py

"""
Clone client Model Three
"""

import random
import time

import zmq

from kvsimple import KVMsg

def main():

    # Prepare our context and subscriber
    ctx = zmq.Context()
    snapshot = ctx.socket(zmq.DEALER)
    snapshot.linger = 0
    snapshot.connect("tcp://localhost:5556")
    subscriber = ctx.socket(zmq.SUB)
    subscriber.linger = 0
    subscriber.setsockopt(zmq.SUBSCRIBE, b'')
    subscriber.connect("tcp://localhost:5557")
    publisher = ctx.socket(zmq.PUSH)
    publisher.linger = 0
    publisher.connect("tcp://localhost:5558")

    random.seed(time.time())
    kvmap = {}

    # Get state snapshot
    sequence = 0
    snapshot.send(b"ICANHAZ?")
    while True:
        try:
            kvmsg = KVMsg.recv(snapshot)
        except:
            return          # Interrupted

        if kvmsg.key == b"KTHXBAI":
            sequence = kvmsg.sequence
            print( "I: Received snapshot=%d" % sequence )
            break          # Done
        kvmsg.store(kvmap)

    poller = zmq.Poller()
    poller.register(subscriber, zmq.POLLIN)

    alarm = time.time()+1.
    while True:
        tickless = 1000*max(0, alarm - time.time())
        try:
            items = dict(poller.poll(tickless))
        except:
            break           # Interrupted

        if subscriber in items:
            kvmsg = KVMsg.recv(subscriber)

            # Discard out-of-sequence kvmsgs, incl. heartbeats
            if kvmsg.sequence > sequence:
                sequence = kvmsg.sequence
                kvmsg.store(kvmap)
                print( "I: received update=%d" % sequence )

        # If we timed-out, generate a random kvmsg
        if time.time() >= alarm:
            kvmsg = KVMsg(0)
            kvmsg.key = b"%d" % random.randint(1,10000)
            kvmsg.body = b"%d" % random.randint(1,1000000)
            kvmsg.send(publisher)
            kvmsg.store(kvmap)
            alarm = time.time() + 1.

    print( " Interrupted\n%d messages in" % sequence )

if __name__ == '__main__':
    main()

notes:

  • server 封裝為單一 task,以 PULL 管理 incoming updates,ROUTER 管理 state requests,PUB 管理 outgoing updates
  • client 會使用 tickless timer 發送 random update 給 server,每秒一次,實際應用上,應該由 application code 驅動 updates。

Working with Subtrees

當 client 數量增加,shared store 的 size 也會增加,這時候不適合將所有資料都送給 clients。

當使用 shared store 時,某些 client 只需要處理 store 的一部分,稱為 subtree。client 會在發送 state request 時,指定它需要的 subtree。

表示 subtree 的語法有很多種

  1. path hierarchy: /some/list/of/paths
  2. topic tree: some.list.of.topics

範例使用 path hierarchy,並擴充 client/server 讓 client 可跟 single subtree 運作。

clonesrv4.py

"""
Clone server Model Four

"""

import zmq

from kvsimple import KVMsg

# simple struct for routing information for a key-value snapshot
class Route:
    def __init__(self, socket, identity, subtree):
        self.socket = socket        # ROUTER socket to send to
        self.identity = identity    # Identity of peer who requested state
        self.subtree = subtree      # Client subtree specification


def send_single(key, kvmsg, route):
    """Send one state snapshot key-value pair to a socket"""
    # check front of key against subscription subtree:
    if kvmsg.key.startswith(route.subtree):
        # Send identity of recipient first
        route.socket.send(route.identity, zmq.SNDMORE)
        kvmsg.send(route.socket)

def main():
    # context and sockets
    ctx = zmq.Context()
    snapshot = ctx.socket(zmq.ROUTER)
    snapshot.bind("tcp://*:5556")
    publisher = ctx.socket(zmq.PUB)
    publisher.bind("tcp://*:5557")
    collector = ctx.socket(zmq.PULL)
    collector.bind("tcp://*:5558")

    sequence = 0
    kvmap = {}

    poller = zmq.Poller()
    poller.register(collector, zmq.POLLIN)
    poller.register(snapshot, zmq.POLLIN)
    while True:
        try:
            items = dict(poller.poll(1000))
        except:
            break           # Interrupted

        # Apply state update sent from client
        if collector in items:
            kvmsg = KVMsg.recv(collector)
            sequence += 1
            kvmsg.sequence = sequence
            kvmsg.send(publisher)
            kvmsg.store(kvmap)
            print( "I: publishing update %5d" % sequence )

        # Execute state snapshot request
        if snapshot in items:
            msg = snapshot.recv_multipart()
            identity, request, subtree = msg
            if request == b"ICANHAZ?":
                pass
            else:
                print( "E: bad request, aborting\n" )
                break

            # Send state snapshot to client
            route = Route(snapshot, identity, subtree)

            # For each entry in kvmap, send kvmsg to client
            for k,v in kvmap.items():
                send_single(k,v,route)

            # Now send END message with sequence number
            print( "Sending state shapshot=%d\n" % sequence )
            snapshot.send(identity, zmq.SNDMORE)
            kvmsg = KVMsg(sequence)
            kvmsg.key = b"KTHXBAI"
            kvmsg.body = subtree
            kvmsg.send(snapshot)

    print( " Interrupted\n%d messages handled" % sequence )


if __name__ == '__main__':
    main()

clonecli4.py

"""
Clone client Model Four

"""

import random
import time

import zmq

from kvsimple import KVMsg

SUBTREE = b"/client/"

def main():

    # Prepare our context and subscriber
    ctx = zmq.Context()
    snapshot = ctx.socket(zmq.DEALER)
    snapshot.linger = 0
    snapshot.connect("tcp://localhost:5556")
    subscriber = ctx.socket(zmq.SUB)
    subscriber.linger = 0
    subscriber.setsockopt(zmq.SUBSCRIBE, SUBTREE)
    subscriber.connect("tcp://localhost:5557")
    publisher = ctx.socket(zmq.PUSH)
    publisher.linger = 0
    publisher.connect("tcp://localhost:5558")

    random.seed(time.time())
    kvmap = {}

    # Get state snapshot
    sequence = 0
    snapshot.send_multipart([b"ICANHAZ?", SUBTREE])
    while True:
        try:
            kvmsg = KVMsg.recv(snapshot)
        except:
            return          # Interrupted

        if kvmsg.key == b"KTHXBAI":
            sequence = kvmsg.sequence
            print( "I: Received snapshot=%d" % sequence )
            break          # Done
        kvmsg.store(kvmap)

    poller = zmq.Poller()
    poller.register(subscriber, zmq.POLLIN)

    alarm = time.time()+1.
    while True:
        tickless = 1000*max(0, alarm - time.time())
        try:
            items = dict(poller.poll(tickless))
        except:
            break           # Interrupted

        if subscriber in items:
            kvmsg = KVMsg.recv(subscriber)

            # Discard out-of-sequence kvmsgs, incl. heartbeats
            if kvmsg.sequence > sequence:
                sequence = kvmsg.sequence
                kvmsg.store(kvmap)
                print( "I: received update=%d" % sequence )

        # If we timed-out, generate a random kvmsg
        if time.time() >= alarm:
            kvmsg = KVMsg(0)
            kvmsg.key = SUBTREE + b"%d" % random.randint(1,10000)
            kvmsg.body = b"%d" % random.randint(1,1000000)
            kvmsg.send(publisher)
            kvmsg.store(kvmap)
            alarm = time.time() + 1.

    print( " Interrupted\n%d messages in" % sequence )

if __name__ == '__main__':
    main()

Ephemeral Values

ephemeral value 是沒有定時 refresh 就會自動 expire 的數值。例如 a node joins network 時發布自己的 address,並定時更新;如果 node crash,該 address 最終會自動被移除。

通常會將 ephemeral value 附加於某個 session,並在該 session 終止時一併刪除。在 Clone 中,session 是由 client 定義,會在 client 終止時結束。一個簡單的做法是給 ephemeral value 附上 time to live (TTL),server 可用 TTL 判斷 value 是否已經 expired。

實作 ephemeral value 首先要找到在 key-value message 裡面 encode TTL 的方法。可增加 frame,但問題在於只要想增加新的 property,就要修改 message structure,這會破壞相容性。因此改用 properties frame 實作。

另外,需要一個 "delete this value" 的方法。目前 server 及 client 都只有 insert/update,可以約定當 value 為 empty 時,就代表 "delete"。
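
一個假設性的用法 sketch,搭配下面的 kvmsg.py,示範 client 怎麼發送 delete(collector port 與 key 都是示意值):

import zmq
from kvmsg import KVMsg      # 即下面的 kvmsg.py

ctx = zmq.Context.instance()
publisher = ctx.socket(zmq.PUSH)
publisher.connect("tcp://localhost:5558")    # clone server 的 collector

# body 為 empty/None 代表 delete: server 端 store() 會把該 key 從 hash table 移除
kvmsg = KVMsg(0, key=b"/client/123", body=None)
kvmsg.send(publisher)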

以下是實作了 properties frame(另外增加了 UUID frame),並能處理 delete 的 kvmsg.py

"""
=====================================================================
kvmsg - key-value message class for example applications
"""

import struct # for packing integers
import sys
from uuid import uuid4

import zmq
# zmq.jsonapi ensures bytes, instead of unicode:

def encode_properties(properties_dict):
    prop_s = b""
    for key, value in properties_dict.items():
        prop_s += b"%s=%s\n" % (key, value)
    return prop_s


def decode_properties(prop_s):
    prop = {}
    line_array = prop_s.split(b"\n")

    for line in line_array:
        try:
            key, value = line.split(b"=")
            prop[key] = value
        except ValueError as e:
            #Catch empty line
            pass

    return prop

class KVMsg(object):
    """
    Message is formatted on wire as 5 frames:
    frame 0: key (0MQ string)
    frame 1: sequence (8 bytes, network order)
    frame 2: uuid (blob, 16 bytes)
    frame 3: properties (0MQ string)
    frame 4: body (blob)
    """
    key = None
    sequence = 0
    uuid=None
    properties = None
    body = None

    def __init__(self, sequence, uuid=None, key=None, properties=None, body=None):
        assert isinstance(sequence, int)
        self.sequence = sequence
        if uuid is None:
            uuid = uuid4().bytes
        self.uuid = uuid
        self.key = key
        self.properties = {} if properties is None else properties
        self.body = body

    # dictionary access maps to properties:
    def __getitem__(self, k):
        return self.properties[k]

    def __setitem__(self, k, v):
        self.properties[k] = v

    def get(self, k, default=None):
        return self.properties.get(k, default)

    def store(self, dikt):
        """Store me in a dict if I have anything to store
        else delete me from the dict."""
        if self.key is not None and self.body is not None:
            dikt[self.key] = self
        elif self.key in dikt:
            del dikt[self.key]

    def send(self, socket):
        """Send key-value message to socket; any empty frames are sent as such."""
        key = b'' if self.key is None else self.key
        seq_s = struct.pack('!q', self.sequence)
        body = b'' if self.body is None else self.body
        prop_s = encode_properties(self.properties)
        socket.send_multipart([ key, seq_s, self.uuid, prop_s, body ])

    @classmethod
    def recv(cls, socket):
        """Reads key-value message from socket, returns new kvmsg instance."""
        return cls.from_msg(socket.recv_multipart())

    @classmethod
    def from_msg(cls, msg):
        """Construct key-value message from a multipart message"""
        key, seq_s, uuid, prop_s, body = msg
        key = key if key else None
        seq = struct.unpack('!q',seq_s)[0]
        body = body if body else None
        prop = decode_properties(prop_s)
        return cls(seq, uuid=uuid, key=key, properties=prop, body=body)

    def __repr__(self):
        if self.body is None:
            size = 0
            data=b'NULL'
        else:
            size = len(self.body)
            data = repr(self.body)

        mstr = "[seq:{seq}][key:{key}][size:{size}][props:{props}][data:{data}]".format(
            seq=self.sequence,
            # uuid=hexlify(self.uuid),
            key=self.key,
            size=size,
            props=encode_properties(self.properties),
            data=data,
        )
        return mstr


    def dump(self):
        print("<<", str(self), ">>", file=sys.stderr)
# ---------------------------------------------------------------------
# Runs self test of class

def test_kvmsg (verbose):
    print(" * kvmsg: ", end='')

    # Prepare our context and sockets
    ctx = zmq.Context()
    output = ctx.socket(zmq.DEALER)
    output.bind("ipc://kvmsg_selftest.ipc")
    input = ctx.socket(zmq.DEALER)
    input.connect("ipc://kvmsg_selftest.ipc")

    kvmap = {}
    # Test send and receive of simple message
    kvmsg = KVMsg(1)
    kvmsg.key = b"key"
    kvmsg.body = b"body"
    if verbose:
        kvmsg.dump()
    kvmsg.send(output)
    kvmsg.store(kvmap)

    kvmsg2 = KVMsg.recv(input)
    if verbose:
        kvmsg2.dump()
    assert kvmsg2.key == b"key"
    kvmsg2.store(kvmap)

    assert len(kvmap) == 1 # shouldn't be different

    # test send/recv with properties:
    kvmsg = KVMsg(2, key=b"key", body=b"body")
    kvmsg[b"prop1"] = b"value1"
    kvmsg[b"prop2"] = b"value2"
    kvmsg[b"prop3"] = b"value3"
    assert kvmsg[b"prop1"] == b"value1"
    if verbose:
        kvmsg.dump()
    kvmsg.send(output)
    kvmsg2 = KVMsg.recv(input)
    if verbose:
        kvmsg2.dump()
    # ensure properties were preserved
    assert kvmsg2.key == kvmsg.key
    assert kvmsg2.body == kvmsg.body
    assert kvmsg2.properties == kvmsg.properties
    assert kvmsg2[b"prop2"] == kvmsg[b"prop2"]

    print("OK")

if __name__ == '__main__':
    test_kvmsg('-v' in sys.argv)

執行結果

$ python kvmsg.py  -v
<< [seq:1][key:b'key'][size:4][props:b''][data:b'body'] >>
<< [seq:1][key:b'key'][size:4][props:b''][data:b'body'] >>
<< [seq:2][key:b'key'][size:4][props:b'prop1=value1\nprop2=value2\nprop3=value3\n'][data:b'body'] >>
<< [seq:2][key:b'key'][size:4][props:b'prop1=value1\nprop2=value2\nprop3=value3\n'][data:b'body'] >>
 * kvmsg: OK

使用 Reactor

目前都是在 server 使用 poll loop,接下來改用 reactor:C 版本是用 CZMQ 的 zloop,Python 版本則是用 pyzmq 的 IOLoop。使用 reactor 會讓程式更容易了解。

這裡用單一 thread,並把 server object 傳給 reactor handlers。也可以把 server 拆成多個 threads,每個 thread 處理一個 socket or timer,但那比較適合 threads 之間不需要 share data 的情況;這個範例所有工作都以 server 的 hashmap 為核心,用單一 thread 比較簡單。

有三個 reactor handler:

  1. 一個處理來自 ROUTER socket 的 snapshot request
  2. 一個處理來自 PULL socket 的 incoming updates from clients
  3. 一個處理 expired ephemeral values (TTL 逾時)

clonesrv5.py

"""
Clone server Model Five

"""

import logging
import time

import zmq
from zmq.eventloop.ioloop import IOLoop, PeriodicCallback
from zmq.eventloop.zmqstream import ZMQStream

from kvmsg import KVMsg
from zhelpers import dump

# simple struct for routing information for a key-value snapshot
class Route:
    def __init__(self, socket, identity, subtree):
        self.socket = socket        # ROUTER socket to send to
        self.identity = identity    # Identity of peer who requested state
        self.subtree = subtree      # Client subtree specification


def send_single(key, kvmsg, route):
    """Send one state snapshot key-value pair to a socket"""
    # check front of key against subscription subtree:
    if kvmsg.key.startswith(route.subtree):
        # Send identity of recipient first
        route.socket.send(route.identity, zmq.SNDMORE)
        kvmsg.send(route.socket)

class CloneServer(object):

    # Our server is defined by these properties
    ctx = None                  # Context wrapper
    kvmap = None                # Key-value store
    loop = None                 # IOLoop reactor
    port = None                 # Main port we're working on
    sequence = 0                # How many updates we're at
    snapshot = None             # Handle snapshot requests
    publisher = None            # Publish updates to clients
    collector = None            # Collect updates from clients

    def __init__(self, port=5556):
        self.port = port
        self.ctx = zmq.Context()
        self.kvmap = {}
        self.loop = IOLoop.instance()

        # Set up our clone server sockets
        self.snapshot  = self.ctx.socket(zmq.ROUTER)
        self.publisher = self.ctx.socket(zmq.PUB)
        self.collector = self.ctx.socket(zmq.PULL)
        self.snapshot.bind("tcp://*:%d" % self.port)
        self.publisher.bind("tcp://*:%d" % (self.port + 1))
        self.collector.bind("tcp://*:%d" % (self.port + 2))

        # Wrap sockets in ZMQStreams for IOLoop handlers
        self.snapshot = ZMQStream(self.snapshot)
        self.publisher = ZMQStream(self.publisher)
        self.collector = ZMQStream(self.collector)

        # Register our handlers with reactor
        self.snapshot.on_recv(self.handle_snapshot)
        self.collector.on_recv(self.handle_collect)
        self.flush_callback = PeriodicCallback(self.flush_ttl, 1000)

        # basic log formatting:
        logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                level=logging.INFO)


    def start(self):
        # Run reactor until process interrupted
        self.flush_callback.start()
        try:
            self.loop.start()
        except KeyboardInterrupt:
            pass

    def handle_snapshot(self, msg):
        """snapshot requests"""
        if len(msg) != 3 or msg[1] != b"ICANHAZ?":
            print("E: bad request, aborting")
            dump(msg)
            self.loop.stop()
            return
        identity, request, subtree = msg
        if subtree:
            # Send state snapshot to client
            route = Route(self.snapshot, identity, subtree)

            # For each entry in kvmap, send kvmsg to client
            for k,v in self.kvmap.items():
                send_single(k,v,route)

            # Now send END message with sequence number
            logging.info("I: Sending state shapshot=%d" % self.sequence)
            self.snapshot.send(identity, zmq.SNDMORE)
            kvmsg = KVMsg(self.sequence)
            kvmsg.key = b"KTHXBAI"
            kvmsg.body = subtree
            kvmsg.send(self.snapshot)

    def handle_collect(self, msg):
        """Collect updates from clients"""
        kvmsg = KVMsg.from_msg(msg)
        self.sequence += 1
        kvmsg.sequence = self.sequence
        kvmsg.send(self.publisher)
        ttl = float(kvmsg.get(b'ttl', 0))
        if ttl:
            kvmsg[b'ttl'] = b'%f' % (time.time() + ttl)
        kvmsg.store(self.kvmap)
        logging.info("I: publishing update=%d", self.sequence)

    def flush_ttl(self):
        """Purge ephemeral values that have expired"""
        for key,kvmsg in list(self.kvmap.items()):
            # used list() to exhaust the iterator before deleting from the dict
            self.flush_single(kvmsg)

    def flush_single(self, kvmsg):
        """If key-value pair has expired, delete it and publish the fact
        to listening clients."""
        ttl = float(kvmsg.get(b'ttl', 0))
        if ttl and ttl <= time.time():
            kvmsg.body = b""
            self.sequence += 1
            kvmsg.sequence = self.sequence
            kvmsg.send(self.publisher)
            del self.kvmap[kvmsg.key]
            logging.info("I: publishing delete=%d", self.sequence)

def main():
    clone = CloneServer()
    clone.start()

if __name__ == '__main__':
    main()

clonecli5.py

"""
Clone client Model Five

"""

import random
import time

import zmq

from kvmsg import KVMsg

SUBTREE = "/client/"

def main():

    # Prepare our context and subscriber
    ctx = zmq.Context()
    snapshot = ctx.socket(zmq.DEALER)
    snapshot.linger = 0
    snapshot.connect("tcp://localhost:5556")
    subscriber = ctx.socket(zmq.SUB)
    subscriber.linger = 0
    subscriber.setsockopt(zmq.SUBSCRIBE, SUBTREE.encode())
    subscriber.connect("tcp://localhost:5557")
    publisher = ctx.socket(zmq.PUSH)
    publisher.linger = 0
    publisher.connect("tcp://localhost:5558")

    random.seed(time.time())
    kvmap = {}

    # Get state snapshot
    sequence = 0
    snapshot.send_multipart([b"ICANHAZ?", SUBTREE.encode()])
    while True:
        try:
            kvmsg = KVMsg.recv(snapshot)
        except:
            return          # Interrupted

        if kvmsg.key == b"KTHXBAI":
            sequence = kvmsg.sequence
            print("I: Received snapshot=%d" % sequence)
            break          # Done
        kvmsg.store(kvmap)

    poller = zmq.Poller()
    poller.register(subscriber, zmq.POLLIN)

    alarm = time.time()+1.
    while True:
        tickless = 1000*max(0, alarm - time.time())
        try:
            items = dict(poller.poll(tickless))
        except:
            break           # Interrupted

        if subscriber in items:
            kvmsg = KVMsg.recv(subscriber)

            # Discard out-of-sequence kvmsgs, incl. heartbeats
            if kvmsg.sequence > sequence:
                sequence = kvmsg.sequence
                kvmsg.store(kvmap)
                action = "update" if kvmsg.body else "delete"
                print("I: received %s=%d" % (action, sequence))

        # If we timed-out, generate a random kvmsg
        if time.time() >= alarm:
            kvmsg = KVMsg(0)
            kvmsg.key = SUBTREE.encode() + b"%d" % random.randint(1,10000)
            kvmsg.body = b"%d" % random.randint(1,1000000)
            kvmsg[b'ttl'] = b"%d" % random.randint(0,30)
            kvmsg.send(publisher)
            kvmsg.store(kvmap)
            alarm = time.time() + 1.

    print(" Interrupted\n%d messages in" % sequence)

if __name__ == '__main__':
    main()

Adding Binary Star Pattern for Reliability

reliability 的定義:

  1. server process crashes 會自動或手動 restart,process 會遺失原本的 state
  2. server machine dies,且已經離線一段時間,client 必須要能切換到備援 server
  3. server process/machine 自網路斷線,例如 switch 掛掉,可能會在任意時間點恢復網路,但 clients 在未復原期間,需要連接到備援 server

第一步是要增加 second server,可使用 chap4 Binary Star Pattern。

要先確保 updates 在 primary server crash 時不會遺失。最簡單的做法,是讓 updates 同時發送到兩台 servers:backup server 會像 Clone client 一樣,持續接收來自 primary server 的所有 updates;它也會接收來自 clients 的 updates,但先不要套用到 hash table,只是暫存起來。

  • client 發送 updates 到 servers 的部分,用 pub-sub flow 取代原本的 push-pull flow,這樣 updates 才會持續 fan out 到兩台 servers
  • 增加 server updates 到 clients 的 heartbeats,client 可藉此偵測 primary server 是否 crash,並切換到 backup server
  • 利用 bstar reactor class 連接兩個 servers。Binary Star 是靠 clients 實際呼叫它們認定的 active server 來 vote,這裡以 snapshot requests 作為 voting mechanism
  • 將 update message 都增加 unique UUID 欄位,由 client 產生
  • passive server 會儲存接收自 clients 的 "pending list" of updates,或是來自 active server 的 updates。list 會依照時間順序排列。

以 FSM 設計 client logic

  • client open, connect sockets,對第一個 server 發送 snapshot request,為避免 request storms,每個 server 只會詢問 2 次。
  • client 會從 current server 等待 snapshot data reply,取得後,就會存起來,但如果 timeout 後沒收到 reply,就會 failover 到下一個 server
  • 如果 client 取得 snapshot,等待 updates,但如果 timeout 後沒收到 update,就會 failover 到下一個 server

client 會 loop forever,但可能發生 some clients 跟 primary server 溝通,其他的跟 backup server 溝通,Binary Star state machine 會處理這個問題。

failover 發生程序為

  • client 偵測到 primary server 沒有發送 heartbeats,會改連接到 backup server,並要求 new state snapshot
  • backup server 開始接收來自 clients 的 snapshot requests,偵測到 primary server dies,接手變成 primary server
  • backup server 將 pending list 寫入 hash table,開始處理 state snapshot requests

當 primary server 恢復 online 時

  • 啟動成為 passive server,連接到 backup server 作為 Clone client
  • 開始透過 SUB socket 接收來自 client 的 updates

有兩點假設

  1. 至少有一個 server 在運作,如果兩個 server 都 crash,就無法復原
  2. 多個 clients 不能同時更新相同的 hash keys,client updates 會以不同的順序到達兩個 servers,因此 backup server 會以 pending list 中跟 primary server 不同的順序套用 updates。但對於某個 client 來說,update 都是以相同順序到達 servers,所以這是安全的機制

clonesrv6.py

"""
Clone server Model Six
"""

import logging
import time

import zmq
from zmq.eventloop.ioloop import PeriodicCallback
from zmq.eventloop.zmqstream import ZMQStream

from bstar import BinaryStar
from kvmsg import KVMsg
from zhelpers import dump

# simple struct for routing information for a key-value snapshot
class Route:
    def __init__(self, socket, identity, subtree):
        self.socket = socket        # ROUTER socket to send to
        self.identity = identity    # Identity of peer who requested state
        self.subtree = subtree      # Client subtree specification


def send_single(key, kvmsg, route):
    """Send one state snapshot key-value pair to a socket"""
    # check front of key against subscription subtree:
    if kvmsg.key.startswith(route.subtree):
        # Send identity of recipient first
        route.socket.send(route.identity, zmq.SNDMORE)
        kvmsg.send(route.socket)

class CloneServer(object):

    # Our server is defined by these properties
    ctx = None                  # Context wrapper
    kvmap = None                # Key-value store
    bstar = None                # Binary Star
    sequence = 0                # How many updates so far
    port = None                 # Main port we're working on
    peer = None                 # Main port of our peer
    publisher = None            # Publish updates and hugz
    collector = None            # Collect updates from clients
    subscriber = None           # Get updates from peer
    pending = None              # Pending updates from client
    primary = False             # True if we're primary
    master = False              # True if we're master
    slave = False               # True if we're slave

    def __init__(self, primary=True, ports=(5556,5566)):
        self.primary = primary
        if primary:
            self.port, self.peer = ports
            frontend = "tcp://*:5003"
            backend  = "tcp://localhost:5004"
            self.kvmap = {}
        else:
            self.peer, self.port = ports
            frontend = "tcp://*:5004"
            backend  = "tcp://localhost:5003"

        self.ctx = zmq.Context.instance()
        self.pending = []
        self.bstar = BinaryStar(primary, frontend, backend)

        self.bstar.register_voter("tcp://*:%i" % self.port, zmq.ROUTER, self.handle_snapshot)

        # Set up our clone server sockets
        self.publisher = self.ctx.socket(zmq.PUB)
        self.collector = self.ctx.socket(zmq.SUB)
        self.collector.setsockopt(zmq.SUBSCRIBE, b'')
        self.publisher.bind("tcp://*:%d" % (self.port + 1))
        self.collector.bind("tcp://*:%d" % (self.port + 2))

        # Set up our own clone client interface to peer
        self.subscriber = self.ctx.socket(zmq.SUB)
        self.subscriber.setsockopt(zmq.SUBSCRIBE, b'')
        self.subscriber.connect("tcp://localhost:%d" % (self.peer + 1))

        # Register state change handlers
        self.bstar.master_callback = self.become_master
        self.bstar.slave_callback = self.become_slave

        # Wrap sockets in ZMQStreams for IOLoop handlers
        self.publisher = ZMQStream(self.publisher)
        self.subscriber = ZMQStream(self.subscriber)
        self.collector = ZMQStream(self.collector)

        # Register our handlers with reactor
        self.collector.on_recv(self.handle_collect)
        self.flush_callback = PeriodicCallback(self.flush_ttl, 1000)
        self.hugz_callback = PeriodicCallback(self.send_hugz, 1000)

        # basic log formatting:
        logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                level=logging.INFO)

    def start(self):
        # start periodic callbacks
        self.flush_callback.start()
        self.hugz_callback.start()
        # Run bstar reactor until process interrupted
        try:
            self.bstar.start()
        except KeyboardInterrupt:
            pass

    def handle_snapshot(self, socket, msg):
        """snapshot requests"""
        if msg[1] != b"ICANHAZ?" or len(msg) != 3:
            logging.error("E: bad request, aborting")
            dump(msg)
            self.bstar.loop.stop()
            return
        identity, request = msg[:2]
        if len(msg) >= 3:
            subtree = msg[2]
            # Send state snapshot to client
            route = Route(socket, identity, subtree)

            # For each entry in kvmap, send kvmsg to client
            for k,v in self.kvmap.items():
                send_single(k,v,route)

            # Now send END message with sequence number
            logging.info("I: Sending state shapshot=%d" % self.sequence)
            socket.send(identity, zmq.SNDMORE)
            kvmsg = KVMsg(self.sequence)
            kvmsg.key = b"KTHXBAI"
            kvmsg.body = subtree
            kvmsg.send(socket)

    def handle_collect(self, msg):
        """Collect updates from clients

        If we're master, we apply these to the kvmap
        If we're slave, or unsure, we queue them on our pending list
        """
        kvmsg = KVMsg.from_msg(msg)
        if self.master:
            self.sequence += 1
            kvmsg.sequence = self.sequence
            kvmsg.send(self.publisher)
            ttl = float(kvmsg.get(b'ttl', 0))
            if ttl:
                kvmsg[b'ttl'] = b'%f' % (time.time() + ttl)
            kvmsg.store(self.kvmap)
            logging.info("I: publishing update=%d", self.sequence)
        else:
            # If we already got message from master, drop it, else
            # hold on pending list
            if not self.was_pending(kvmsg):
                self.pending.append(kvmsg)

    def was_pending(self, kvmsg):
        """If message was already on pending list, remove and return True.
        Else return False.
        """
        found = False
        for idx, held in enumerate(self.pending):
            if held.uuid == kvmsg.uuid:
                found = True
                break
        if found:
            self.pending.pop(idx)
        return found

    def flush_ttl(self):
        """Purge ephemeral values that have expired"""
        if self.kvmap:
            for key,kvmsg in list(self.kvmap.items()):
                self.flush_single(kvmsg)

    def flush_single(self, kvmsg):
        """If key-value pair has expired, delete it and publish the fact
        to listening clients."""
        ttl = float(kvmsg.get(b'ttl', 0))
        if ttl and ttl <= time.time():
            kvmsg.body = b""
            self.sequence += 1
            kvmsg.sequence = self.sequence
            logging.info("I: preparing to publish delete=%s", kvmsg.properties)
            kvmsg.send(self.publisher)
            del self.kvmap[kvmsg.key]
            logging.info("I: publishing delete=%d", self.sequence)

    def send_hugz(self):
        """Send hugz to anyone listening on the publisher socket"""
        kvmsg = KVMsg(self.sequence)
        kvmsg.key = b"HUGZ"
        kvmsg.body = b""
        kvmsg.send(self.publisher)

    # ---------------------------------------------------------------------
    # State change handlers

    def become_master(self):
        """We're becoming master

        The backup server applies its pending list to its own hash table,
        and then starts to process state snapshot requests.
        """
        self.master = True
        self.slave = False
        # stop receiving subscriber updates while we are master
        self.subscriber.stop_on_recv()

        # Apply pending list to own kvmap
        while self.pending:
            kvmsg = self.pending.pop(0)
            self.sequence += 1
            kvmsg.sequence = self.sequence
            kvmsg.store(self.kvmap)
            logging.info ("I: publishing pending=%d", self.sequence)


    def become_slave(self):
        """We're becoming slave"""
        # clear kvmap
        self.kvmap = None
        self.master = False
        self.slave = True
        self.subscriber.on_recv(self.handle_subscriber)

    def handle_subscriber(self, msg):
        """Collect updates from peer (master)
        We're always slave when we get these updates
        """
        if self.master:
            logging.warn("received subscriber message, but we are master %s", msg)
            return

        # Get state snapshot if necessary
        if self.kvmap is None:
            self.kvmap = {}
            snapshot = self.ctx.socket(zmq.DEALER)
            snapshot.linger = 0
            snapshot.connect("tcp://localhost:%i" % self.peer)

            logging.info ("I: asking for snapshot from: tcp://localhost:%d",
                        self.peer)
            snapshot.send_multipart([b"ICANHAZ?", b''])
            while True:
                try:
                    kvmsg = KVMsg.recv(snapshot)
                except KeyboardInterrupt:
                    # Interrupted
                    self.bstar.loop.stop()
                    return
                if kvmsg.key == b"KTHXBAI":
                    self.sequence = kvmsg.sequence
                    break          # Done
                kvmsg.store(self.kvmap)

            logging.info ("I: received snapshot=%d", self.sequence)

        # Find and remove update off pending list
        kvmsg = KVMsg.from_msg(msg)
        # update float ttl -> timestamp
        ttl = float(kvmsg.get(b'ttl', 0))
        if ttl:
            kvmsg[b'ttl'] = b'%f' % (time.time() + ttl)

        if kvmsg.key != b"HUGZ":
            if not self.was_pending(kvmsg):
                # If master update came before client update, flip it
                # around, store master update (with sequence) on pending
                # list and use to clear client update when it comes later
                self.pending.append(kvmsg)

            # If update is more recent than our kvmap, apply it
            if (kvmsg.sequence > self.sequence):
                self.sequence = kvmsg.sequence
                kvmsg.store(self.kvmap)
                logging.info ("I: received update=%d", self.sequence)


def main():
    import sys
    if '-p' in sys.argv:
        primary = True
    elif '-b' in sys.argv:
        primary = False
    else:
        print("Usage: clonesrv6.py { -p | -b }")
        sys.exit(1)
    clone = CloneServer(primary)
    clone.start()

if __name__ == '__main__':
    main()

Like bstarsrv2, clonesrv6 hits a tornado error at startup; downgrading tornado from 5.1 to 4.5 may be needed, but this has not been tested.

The server now supports failover, ephemeral values, and subtrees.

The reactor-based design removes a lot of redundant code. The whole server runs as a single thread, so there is no inter-thread plumbing to worry about; we just pass a structure pointer (self) to all handlers.

The Clustered Hashmap Protocol

The server already integrates the Binary Star pattern; the client is a bit more involved, so before implementing it, let's look at the Clustered Hashmap Protocol.

There are two ways to design such a protocol:

  1. Keep each flow on its own socket pair, which is the approach used so far. The advantage is that each flow stays simple and clean; the drawback is managing many socket flows at once. Using a reactor makes this somewhat easier, but the code still has to be split up carefully.
  2. Design a single protocol that uses one socket pair for everything, here ROUTER for the server and DEALER for the clients. This makes the protocol more complex. Chapter 7 has another sample that takes the ROUTER-DEALER approach.

Now let's go through the CHP protocol spec; pay attention to the keywords "SHOULD", "MUST", and "MAY".

  • Goals

CHP provides reliable pub-sub across a set of clients connected over a ZeroMQ network. A "hashmap" is a set of key-value pairs; any client may modify any key-value pair at any time, and the change is propagated to all clients. A client can join the network at any point in time.

  • Architecture

CHP connects a set of client applications to a set of servers. Clients connect to a server; clients do not talk to each other, and they can join or leave the network arbitrarily.

  • Ports and Connections

    The server MUST open three ports:

    • A SNAPSHOT port (ZeroMQ ROUTER socket) at port number P.
    • A PUBLISHER port (ZeroMQ PUB socket) at port number P + 1.
    • A COLLECTOR port (ZeroMQ SUB socket) at port number P + 2.

    A client SHOULD open at least two connections:

    • A SNAPSHOT connection (ZeroMQ DEALER socket) to port number P.
    • A SUBSCRIBER connection (ZeroMQ SUB socket) to port number P + 1.

    A client MAY open a third connection, if it wants to update the hashmap:

    • A PUBLISHER connection (ZeroMQ PUB socket) to port number P + 2.
  • State Synchronization

The client MUST start its snapshot connection by sending an ICANHAZ command. Both frames are ZeroMQ strings. The subtree specification MAY be empty; if not, it normally starts with /, consists of one or more path segments, and ends with /.

ICANHAZ command
-----------------------------------
Frame 0: "ICANHAZ?"
Frame 1: subtree specification

The server MUST respond to an ICANHAZ command with zero or more KVSYNC commands, followed by a KTHXBAI command. The server MUST prefix each command with the identity of the client, as provided by the ICANHAZ request. The KVSYNC sequence number has no specific meaning and MAY be zero.

KVSYNC command
-----------------------------------
Frame 0: key, as ZeroMQ string
Frame 1: sequence number, 8 bytes in network order
Frame 2: <empty>
Frame 3: <empty>
Frame 4: value, as blob

The sequence number of the KTHXBAI command MUST be the highest sequence number of the KVSYNC commands previously sent.

KTHXBAI command
-----------------------------------
Frame 0: "KTHXBAI"
Frame 1: sequence number, 8 bytes in network order
Frame 2: <empty>
Frame 3: <empty>
Frame 4: subtree specification

When the client has received a KTHXBAI command, it SHOULD start to receive messages from its subscriber connection and apply them.
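
As a rough sketch (not part of the spec), the client side of this exchange could look like the following Python, built directly on the frame layout above rather than on the kvmsg helper class; the endpoint and subtree are only illustrative:

import struct

import zmq

ctx = zmq.Context.instance()
snapshot = ctx.socket(zmq.DEALER)
snapshot.linger = 0
snapshot.connect("tcp://localhost:5556")              # assumed SNAPSHOT port P

kvmap = {}
snapshot.send_multipart([b"ICANHAZ?", b"/client/"])   # ICANHAZ command
while True:
    frames = snapshot.recv_multipart()                # [key, sequence, ..., ..., value]
    key, seq, value = frames[0], frames[1], frames[4]
    sequence = struct.unpack("!q", seq)[0]            # 8 bytes in network byte order
    if key == b"KTHXBAI":
        break                                         # snapshot is complete
    kvmap[key] = value                                # KVSYNC: store the key-value pair
print("received snapshot, sequence=%d" % sequence)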

  • Server-to-Client Updates

When the server has an update for its hashmap, it MUST broadcast a KVPUB command on its publisher socket.

KVPUB command
-----------------------------------
Frame 0: key, as ZeroMQ string
Frame 1: sequence number, 8 bytes in network order
Frame 2: UUID, 16 bytes
Frame 3: properties, as ZeroMQ string
Frame 4: value, as blob

The sequence number MUST be strictly increasing. The client MUST discard any KVPUB command whose sequence number is not strictly greater than that of the last KTHXBAI or KVPUB command it processed.

The UUID is optional and MAY be zero-sized. The properties field holds zero or more "name=value" items separated by newline characters. If the key-value pair has no properties, the properties field is empty.

If the value is empty, the client SHOULD delete its key-value entry with that key.

When there are no updates to send, the server SHOULD send a HUGZ command at regular intervals, e.g., once per second.

HUGZ command
-----------------------------------
Frame 0: "HUGZ"
Frame 1: 00000000
Frame 2: <empty>
Frame 3: <empty>
Frame 4: <empty>

The client MAY treat the absence of HUGZ as an indication that the server has crashed.

  • Client-to-Server Updates

When a client has an update for its hashmap, it MAY send a KVSET command to the server through its publisher connection.

KVSET command
-----------------------------------
Frame 0: key, as ZeroMQ string
Frame 1: sequence number, 8 bytes in network order
Frame 2: UUID, 16 bytes
Frame 3: properties, as ZeroMQ string
Frame 4: value, as blob

The sequence number has no specific meaning and MAY be zero. The UUID SHOULD be a universally unique identifier.

If the value is empty, the server MUST delete its key-value entry with that key.

The server SHOULD accept the following property:

ttl: specifies a time-to-live in seconds. If a KVSET command has a ttl property, the server SHOULD delete the key-value pair when the TTL expires and broadcast a KVPUB with an empty value to all clients.
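
As an illustration of these rules (a sketch, not part of the spec, and the handler name is made up), a server receiving a KVSET on its collector socket might unpack the five frames and record the ttl like this; clonesrv6.py above does the equivalent through the KVMsg class:

import time

def handle_kvset(frames, kvmap):
    # frames: [key, sequence, UUID, properties, value] as received on the SUB socket
    key, _seq, _uuid, props, value = frames
    properties = dict(
        item.split(b"=", 1) for item in props.split(b"\n") if item
    )
    if not value:
        kvmap.pop(key, None)                  # empty value: delete the entry
        return
    ttl = float(properties.get(b"ttl", 0))
    expiry = time.time() + ttl if ttl else None
    kvmap[key] = (value, expiry)              # a periodic flush would purge expired keys
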
  • Reliability

    CHP may run in a dual-server architecture where a backup server takes over if the primary server fails. CHP does not specify the failover mechanism; the Binary Star pattern can be used.

    To assist server reliability, a client MAY:

    • Set a UUID in every KVSET command.
    • Detect the lack of HUGZ over a period and treat it as a sign that the current server has failed.
    • Connect to the backup server and re-request a state synchronization.
  • Scalability and Performance

    CHP is designed to serve large numbers (thousands) of clients, limited only by the system resources of the broker. Because all updates pass through a single server, overall throughput peaks at a few million updates per second.

  • Security

    CHP does not implement any authentication, access control, or encryption mechanisms.

Building a Multithread Stack and API

The client needs to be reworked to run its I/O in a background thread, along the lines of the multithreaded API built at the end of the chapter 4 Freelance pattern.

It consists of a frontend object and a backend agent connected by PAIR sockets; thread creation is wrapped together with the pipe, so the frontend can send messages to that thread over the PAIR socket.

The multithreaded API works like this:

  • The constructor creates a context, starts the background thread, and connects to it with a pipe.
  • The background thread starts the agent, which is essentially a zmq_poll loop reading from the pipe socket plus the DEALER and SUB sockets.
  • The main application thread and the agent talk via ZeroMQ messages, typically in this format (a Python sketch of the same command follows this list):
void
clone_connect (clone_t *self, char *address, char *service)
{
    assert (self);
    zmsg_t *msg = zmsg_new ();
    zmsg_addstr (msg, "CONNECT");
    zmsg_addstr (msg, address);
    zmsg_addstr (msg, service);
    zmsg_send (&msg, self->pipe);
}
  • If a method has a return code, it waits for the reply message the agent sends back.
  • If the agent needs to send asynchronous events to the frontend, add a recv method to the class that waits for frontend messages.

  • Expose the frontend pipe socket so it can be integrated into poll loops; otherwise a plain recv would block the application.
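
The same CONNECT command in Python is just as small; here is a hedged sketch, assuming the frontend holds one end of a zpipe() PAIR socket (clone.py below implements exactly this idea):

class CloneFrontendSketch(object):
    """Hypothetical frontend: it only serializes commands for the agent thread."""

    def __init__(self, pipe):
        self.pipe = pipe   # PAIR socket connected to the background agent

    def connect(self, address, service):
        # Python counterpart of the C clone_connect() shown above
        self.pipe.send_multipart([b"CONNECT", address, service])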

clonecli6.py

"""
Clone server Model Six

"""

import random
import time

import zmq

from clone import Clone

SUBTREE = "/client/"

def main():
    # Create and connect clone
    clone = Clone()
    clone.subtree = SUBTREE.encode()
    clone.connect("tcp://localhost", 5556)
    clone.connect("tcp://localhost", 5566)

    try:
        while True:
            # Distribute as key-value message
            key = b"%d" % random.randint(1,10000)
            value = b"%d" % random.randint(1,1000000)
            clone.set(key, value, random.randint(0,30))
            time.sleep(1)
    except KeyboardInterrupt:
        pass

if __name__ == '__main__':
    main()

clone.py

"""
clone - client-side Clone Pattern class
"""

import logging
import threading
import time

import zmq

from zhelpers import zpipe
from kvmsg import KVMsg

# If no server replies within this time, abandon request
GLOBAL_TIMEOUT  =   4000    # msecs
# Server considered dead if silent for this long
SERVER_TTL      =   5.0     # secs
# Number of servers we will talk to
SERVER_MAX      =   2

# basic log formatting:
logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
        level=logging.INFO)

# =====================================================================
# Synchronous part, works in our application thread

class Clone(object):
    ctx = None      # Our Context
    pipe = None     # Pipe through to clone agent
    agent = None    # agent in a thread
    _subtree = None # cache of our subtree value

    def __init__(self):
        self.ctx = zmq.Context()
        self.pipe, peer = zpipe(self.ctx)
        self.agent = threading.Thread(target=clone_agent, args=(self.ctx,peer))
        self.agent.daemon = True
        self.agent.start()

    # ---------------------------------------------------------------------
    # Clone.subtree is a property, which sets the subtree for snapshot
    # and updates

    @property
    def subtree(self):
        return self._subtree

    @subtree.setter
    def subtree(self, subtree):
        """Sends [SUBTREE][subtree] to the agent"""
        self._subtree = subtree
        self.pipe.send_multipart([b"SUBTREE", subtree])

    def connect(self, address, port):
        """Connect to new server endpoint
        Sends [CONNECT][address][port] to the agent
        """
        self.pipe.send_multipart([b"CONNECT", (address.encode() if isinstance(address, str) else address), b'%d' % port])

    def set(self, key, value, ttl=0):
        """Set new value in distributed hash table
        Sends [SET][key][value][ttl] to the agent
        """
        self.pipe.send_multipart([b"SET", key, value, b'%i' % ttl])

    def get(self, key):
        """Lookup value in distributed hash table
        Sends [GET][key] to the agent and waits for a value response
        If there is no clone available, will eventually return None.
        """

        self.pipe.send_multipart([b"GET", key])
        try:
            reply = self.pipe.recv_multipart()
        except KeyboardInterrupt:
            return
        else:
            return reply[0]


# =====================================================================
# Asynchronous part, works in the background

# ---------------------------------------------------------------------
# Simple class for one server we talk to

class CloneServer(object):
    address = None          # Server address
    port = None             # Server port
    snapshot = None         # Snapshot socket
    subscriber = None       # Incoming updates
    expiry = 0              # Expires at this time
    requests = 0            # How many snapshot requests made?

    def __init__(self, ctx, address, port, subtree):
        self.address = address
        self.port = port
        self.snapshot = ctx.socket(zmq.DEALER)
        self.snapshot.linger = 0
        self.snapshot.connect("%s:%i" % (address.decode(),port))
        self.subscriber = ctx.socket(zmq.SUB)
        self.subscriber.setsockopt(zmq.SUBSCRIBE, subtree)
        self.subscriber.setsockopt(zmq.SUBSCRIBE, b'HUGZ')
        self.subscriber.connect("%s:%i" % (address.decode(),port+1))
        self.subscriber.linger = 0


# ---------------------------------------------------------------------
# Simple class for one background agent

#  States we can be in
STATE_INITIAL   =    0   #  Before asking server for state
STATE_SYNCING   =    1   #  Getting state from server
STATE_ACTIVE    =    2   #  Getting new updates from server

class CloneAgent(object):
    ctx = None              # Own context
    pipe = None             # Socket to talk back to application
    kvmap = None            # Actual key/value dict
    subtree = ''            # Subtree specification, if any
    servers = None          # list of connected Servers
    state = 0               # Current state
    cur_server = 0          # If active, index of server in list
    sequence = 0            # last kvmsg procesed
    publisher = None        # Outgoing updates

    def __init__(self, ctx, pipe):
        self.ctx = ctx
        self.pipe = pipe
        self.kvmap = {}
        self.subtree = ''
        self.state = STATE_INITIAL
        self.publisher = ctx.socket(zmq.PUB)
        self.router = ctx.socket(zmq.ROUTER)
        self.servers = []

    def control_message (self):
        msg = self.pipe.recv_multipart()
        command = msg.pop(0)

        if command == b"CONNECT":
            address = msg.pop(0)
            port = int(msg.pop(0))
            if len(self.servers) < SERVER_MAX:
                self.servers.append(CloneServer(self.ctx, address, port, self.subtree))
                self.publisher.connect("%s:%i" % (address.decode(),port+2))
            else:
                logging.error("E: too many servers (max. %i)", SERVER_MAX)
        elif command == b"SET":
            key,value,sttl = msg
            ttl = int(sttl)

            # Send key-value pair on to server
            kvmsg = KVMsg(0, key=key, body=value)
            kvmsg.store(self.kvmap)
            if ttl:
                kvmsg[b"ttl"] = sttl
            kvmsg.send(self.publisher)
        elif command == b"GET":
            key = msg[0]
            value = self.kvmap.get(key)
            self.pipe.send(value.body if value else b'')
        elif command == b"SUBTREE":
            self.subtree = msg[0]


# ---------------------------------------------------------------------
# Asynchronous agent manages server pool and handles request/reply
# dialog when the application asks for it.

def clone_agent(ctx, pipe):
    agent = CloneAgent(ctx, pipe)
    server = None

    while True:
        poller = zmq.Poller()
        poller.register(agent.pipe, zmq.POLLIN)
        poll_timer = None
        server_socket = None

        if agent.state == STATE_INITIAL:
            # In this state we ask the server for a snapshot,
            # if we have a server to talk to...
            if agent.servers:
                server = agent.servers[agent.cur_server]
                logging.info ("I: waiting for server at %s:%d...",
                    server.address, server.port)
                if (server.requests < 2):
                    server.snapshot.send_multipart([b"ICANHAZ?", agent.subtree])
                    server.requests += 1
                server.expiry = time.time() + SERVER_TTL
                agent.state = STATE_SYNCING
                server_socket = server.snapshot
        elif agent.state == STATE_SYNCING:
            # In this state we read from snapshot and we expect
            # the server to respond, else we fail over.
            server_socket = server.snapshot
        elif agent.state == STATE_ACTIVE:
            # In this state we read from subscriber and we expect
            # the server to give hugz, else we fail over.
            server_socket = server.subscriber

        if server_socket:
            # we have a second socket to poll:
            poller.register(server_socket, zmq.POLLIN)

        if server is not None:
            poll_timer = 1e3 * max(0,server.expiry - time.time())

        # ------------------------------------------------------------
        # Poll loop
        try:
            items = dict(poller.poll(poll_timer))
        except:
            raise # DEBUG
            break # Context has been shut down

        if agent.pipe in items:
            agent.control_message()
        elif server_socket in items:
            kvmsg = KVMsg.recv(server_socket)

            # Anything from server resets its expiry time
            server.expiry = time.time() + SERVER_TTL
            if (agent.state == STATE_SYNCING):
                # Store in snapshot until we're finished
                server.requests = 0
                if kvmsg.key == b"KTHXBAI":
                    agent.sequence = kvmsg.sequence
                    agent.state = STATE_ACTIVE
                    logging.info ("I: received from %s:%d snapshot=%d",
                        server.address, server.port, agent.sequence)
                else:
                    kvmsg.store(agent.kvmap)
            elif (agent.state == STATE_ACTIVE):
                # Discard out-of-sequence updates, incl. hugz
                if (kvmsg.sequence > agent.sequence):
                    agent.sequence = kvmsg.sequence
                    kvmsg.store(agent.kvmap)
                    action = "update" if kvmsg.body else "delete"

                    logging.info ("I: received from %s:%d %s=%d",
                        server.address, server.port, action, agent.sequence)
        else:
            # Server has died, failover to next
            logging.info ("I: server at %s:%d didn't give hugz",
                    server.address, server.port)
            agent.cur_server = (agent.cur_server + 1) % len(agent.servers)
            agent.state = STATE_INITIAL

The connect method takes one endpoint argument, but it really stands for three ports:

  • server state router (ROUTER): port P
  • server updates publisher (PUB): port P+1
  • server updates subscriber (SUB): port P+2

So we can fold the three connections into a single logical operation.

The clone stack splits into a frontend object class and a backend agent; the frontend sends simple commands ("SUBTREE", "CONNECT", "SET", "GET") to the agent, which does the actual talking to the servers (a usage sketch follows the list below):

  • On startup, the agent requests a snapshot from the first server.
  • Once it has the snapshot, it switches to reading from its subscriber socket.
  • If it does not get a snapshot in time, it fails over to the second server.
  • It polls both the pipe and the subscriber socket.
  • Messages arriving on the pipe come from the frontend.
  • Messages arriving on the subscriber socket are updates to store and apply.
  • If nothing arrives from the server within a certain time, it fails over.
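
A usage sketch of the resulting API, assuming both clone servers from clonesrv6.py are running on their default ports (5556 and 5566):

from clone import Clone

# Create the stack, pick a subtree, and connect to both servers
clone = Clone()
clone.subtree = b"/client/"
clone.connect("tcp://localhost", 5556)
clone.connect("tcp://localhost", 5566)

# Set a value with a 30-second ttl, then read it back through the agent
clone.set(b"/client/name", b"some value", 30)
print(clone.get(b"/client/name"))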

References

ØMQ - The Guide

2019年10月14日

ZeroMQ 4 Reliable Request-Reply Patterns

This chapter discusses reliability and builds a set of reliable messaging patterns on top of the ZeroMQ core request-reply pattern.

  • The Lazy Pirate pattern: reliable request-reply from the client side
  • The Simple Pirate pattern: reliable request-reply using load balancing
  • The Paranoid Pirate pattern: reliable request-reply with heartbeating
  • The Majordomo pattern: service-oriented reliable queuing
  • The Titanic pattern: disk-based/disconnected reliable queuing
  • The Binary Star pattern: primary-backup server failover
  • The Freelance pattern: brokerless reliable request-reply

"Reliability" 是什麼?

以 failure 定義 reliability,如果可以處理 well-defined and unstood failure,就是 reliable。

  1. Application code can crash, exit, freeze, stop responding to input, run too slowly, or exhaust memory.
  2. System code (such as a broker) can crash for some reason. System code should be more reliable than application code, but it can still run out of memory when messages pile up for slow clients.
  3. Message queues can overflow, typically in system code that deals brutally with slow clients by dropping messages, which means "lost" messages.
  4. Networks can fail; ZeroMQ reconnects automatically, but messages may be lost in the meantime.
  5. Hardware can fail, taking with it all the processes running on that box.
  6. Networks can fail in exotic ways, e.g., some ports on a switch die and part of the network becomes unreachable.
  7. Entire data centers can be struck by lightning, earthquakes, fire, power failures, or cooling failures.

Making software fully reliable against all of these is beyond the scope of ZeroMQ; the first five cases alone cover 99.9% of real-world failures.

Designing Reliability

"Keep things working properly even when code freezes or crashes."

  • Request-reply: if the server dies while processing a request, the client notices because it never gets a reply, and can retry the request against another server. If the client dies, that is for the client developer to handle.

  • Pub-sub: if a client dies, the server does not know, because pub-sub clients never send anything back to the server. A client can, however, report back to the server over a separate channel. Server crashes are out of scope here; subscribers can also detect that they are running too slowly and take action, such as warning the operator.

  • Pipeline: if a worker dies, the ventilator does not know. A pipeline only flows one way, but the downstream collector can detect that a task was not completed and send a message back to the ventilator asking it to resend that task. If the ventilator or collector dies, the client can simply time out and resend everything.

This chapter focuses only on request-reply.

The basic request-reply pattern (a REQ client socket doing a blocking send/receive to a REP server socket) is fragile: if the server crashes while processing a request, or the request or reply is lost, the client waits forever.

Thanks to ZeroMQ's ability to silently reconnect peers and to load balance messages, request-reply already performs better than plain TCP. Still, it is only dependable enough for real work between threads in the same process, where there is no real network; across a network or between processes it is not.

On top of it we can design a reliable request-reply (RRR) pattern, which we will call the Pirate pattern.

There are three ways to connect clients to servers, each needing a different approach to reliability:

  1. Multiple clients talking directly to a single server. Use case: clients connecting to a single well-known server. Failures to handle: server crashes and restarts, network disconnects.
  2. Multiple clients talking to a broker proxy that distributes work to multiple workers. Use case: service-oriented transaction processing. Failures to handle: worker crashes and restarts, worker busy looping or overload, queue crashes and restarts, network disconnects.
  3. Multiple clients talking to multiple servers with no intermediary proxies. Use case: distributed services such as DNS. Failures to handle: service crashes and restarts, service busy looping or overload, network disconnects.

Lazy Pirate pattern, Client-Side Reliability

  • Poll the REQ socket and receive from it only when a reply has actually arrived.
  • If no reply arrives within a timeout, resend the request.
  • If there is still no reply after several retries, abandon the transaction.

Using a REQ socket outside its strict send/receive order produces an "EFSM" error, because REQ is implemented as a finite-state machine that enforces the send/receive cycle.

In the Pirate pattern, resending a request after a missing reply would hit exactly this error. The pragmatic fix is to close and reopen the REQ socket before resending.

lpclient.py

#
#  Lazy Pirate client
#  Use zmq_poll to do a safe request-reply
#  To run, start lpserver and then randomly kill/restart it
from __future__ import print_function

import zmq

REQUEST_TIMEOUT = 2500
REQUEST_RETRIES = 3
SERVER_ENDPOINT = "tcp://localhost:5555"

context = zmq.Context(1)

print("I: Connecting to server...")
client = context.socket(zmq.REQ)
client.connect(SERVER_ENDPOINT)

poll = zmq.Poller()
poll.register(client, zmq.POLLIN)

sequence = 0
retries_left = REQUEST_RETRIES
while retries_left:
    sequence += 1
    request = str(sequence).encode()
    print("I: Sending (%s)" % request)
    client.send(request)

    expect_reply = True
    while expect_reply:
        socks = dict(poll.poll(REQUEST_TIMEOUT))
        if socks.get(client) == zmq.POLLIN:
            reply = client.recv()
            if not reply:
                break
            if int(reply) == sequence:
                print("I: Server replied OK (%s)" % reply)
                retries_left = REQUEST_RETRIES
                expect_reply = False
            else:
                print("E: Malformed reply from server: %s" % reply)

        else:
            print("W: No response from server, retrying...")
            # Socket is confused: close and remove it, using up one retry
            client.setsockopt(zmq.LINGER, 0)
            client.close()
            poll.unregister(client)
            retries_left -= 1
            if retries_left == 0:
                # Retries exhausted: assume the server is offline and unrecoverable
                print("E: Server seems to be offline, abandoning")
                break

            # Retries remain: create a new REQ socket and resend the request
            print("I: Reconnecting and resending (%s)" % request)
            # Create new connection
            client = context.socket(zmq.REQ)
            client.connect(SERVER_ENDPOINT)
            poll.register(client, zmq.POLLIN)
            client.send(request)

context.term()

lpserver.py

#
#  Lazy Pirate server
#  Binds REQ socket to tcp://*:5555
#  Like hwserver except:
#   - echoes request as-is
#   - randomly runs slowly, or exits to simulate a crash.
from __future__ import print_function

from random import randint
import time
import zmq

context = zmq.Context(1)
server = context.socket(zmq.REP)
server.bind("tcp://*:5555")

cycles = 0
while True:
    request = server.recv()
    cycles += 1

    # Simulate various problems, after a few cycles
    # After a few cycles, deliberately simulate a failure
    if cycles > 3 and randint(0, 3) == 0:
        print("I: Simulating a crash")
        break
    elif cycles > 3 and randint(0, 3) == 0:
        print("I: Simulating CPU overload")
        time.sleep(2)

    print("I: Normal request (%s)" % request)
    time.sleep(1) # Do some heavy work
    server.send(request)

server.close()
context.term()

Sample run:

$ python lpserver.py
I: Normal request (b'1')
I: Normal request (b'2')
I: Normal request (b'3')
I: Normal request (b'4')
I: Simulating a crash

$ python lpclient.py
I: Connecting to server...
I: Sending (b'1')
I: Server replied OK (b'1')
I: Sending (b'2')
I: Server replied OK (b'2')
I: Sending (b'3')
I: Server replied OK (b'3')
I: Sending (b'4')
I: Server replied OK (b'4')
I: Sending (b'5')
W: No response from server, retrying...
I: Reconnecting and resending (b'5')
W: No response from server, retrying...
I: Reconnecting and resending (b'5')
W: No response from server, retrying...
E: Server seems to be offline, abandoning

The client numbers each request and checks that replies come back strictly in order: that no request or reply is lost, and that no reply arrives more than once or out of order. Even without the sequence numbers, ZeroMQ would still deliver these messages in order.

The client uses a REQ socket and brute-force closes and reopens it whenever the send/receive cycle would otherwise be violated. It is tempting to use a DEALER instead, but that is not a good trade-off: you would have to emulate the REQ socket's envelope yourself (see the sketch below), and you might also receive stray replies you did not ask for.
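
For illustration, this is roughly what the DEALER variant would have to do by hand (a sketch, not a recommendation):

import zmq

ctx = zmq.Context.instance()
client = ctx.socket(zmq.DEALER)
client.connect("tcp://localhost:5555")

# A DEALER must add and strip the empty delimiter frame that REQ manages for us,
# and nothing stops a stale reply from an earlier retry turning up later.
client.send_multipart([b"", b"1"])
empty, reply = client.recv_multipart()
assert empty == b""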

Even though many clients may talk to the single server, failure handling lives entirely in the client.

Pros:

  1. Simple to implement and understand.
  2. Works easily with existing client and server application code.
  3. ZeroMQ automatically retries the actual reconnection until it works.

Cons:

  1. Does not fail over to backup or alternate servers.

Simple Pirate pattern, Basic Reliable Queueing

The Simple Pirate pattern extends Lazy Pirate with a queue proxy, so a client can transparently talk to multiple servers (workers).

In all the Pirate patterns, workers are stateless. If the application requires shared state, such as a shared database, that is handled by the application itself, outside the messaging framework. Workers can connect and disconnect without notifying clients.

The queue proxy is implemented with the chapter 3 load balancing broker; the question is what, if anything, must be added to handle dead or blocked workers.

Because the clients already retry, the same mechanism works just as well behind the load balancing pattern.

spqueue.py: simple pirate queue

#
#  Simple Pirate queue
#  This is identical to the LRU pattern, with no reliability mechanisms
#  at all. It depends on the client for recovery. Runs forever.

import zmq

LRU_READY = "\x01"

context = zmq.Context(1)

frontend = context.socket(zmq.ROUTER) # ROUTER
backend = context.socket(zmq.ROUTER) # ROUTER
frontend.bind("tcp://*:5555") # For clients
backend.bind("tcp://*:5556")  # For workers

poll_workers = zmq.Poller()
poll_workers.register(backend, zmq.POLLIN)

poll_both = zmq.Poller()
poll_both.register(frontend, zmq.POLLIN)
poll_both.register(backend, zmq.POLLIN)

workers = []

while True:
    if workers:
        socks = dict(poll_both.poll())
    else:
        socks = dict(poll_workers.poll())

    # Handle worker activity on backend
    if socks.get(backend) == zmq.POLLIN:
        # Use worker address for LRU routing
        msg = backend.recv_multipart()
        if not msg:
            break
        address = msg[0]
        workers.append(address)

        # Everything after the second (delimiter) frame is reply
        reply = msg[2:]

        # Forward message to client if it's not a READY
        if reply[0] != LRU_READY:
            frontend.send_multipart(reply)

    if socks.get(frontend) == zmq.POLLIN:
        #  Get client request, route to first available worker
        # msg = frontend.recv_multipart()
        # request = [workers.pop(0), b''] + msg
        # backend.send_multipart( request )

        client, empty, request = frontend.recv_multipart()
        worker = workers.pop(0)
        backend.send_multipart([worker, b"", client, b"", request])

spworker.py: simple pirate worker

#
#  Simple Pirate worker
#  Connects REQ socket to tcp://*:5556
#  Implements worker part of LRU queueing

from random import randint
import time
import zmq

LRU_READY = "\x01"

context = zmq.Context(1)
worker = context.socket(zmq.REQ)

identity = "%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000))
worker.setsockopt_string(zmq.IDENTITY, identity)
worker.connect("tcp://localhost:5556")

print( "I: (%s) worker ready" % identity)
worker.send_string(LRU_READY)

cycles = 0
while True:
    msg = worker.recv_multipart()
    if not msg:
        break

    cycles += 1
    if cycles > 3 and randint(0, 5) == 0:
        print( "I: (%s) simulating a crash" % identity)
        break
    elif cycles > 3 and randint(0, 5) == 0:
        print( "I: (%s) simulating CPU overload" % identity)
        time.sleep(3)
    print( "I: (%s) normal reply" % identity)
    time.sleep(1) # Do some heavy work
    worker.send_multipart(msg)

To test, start a handful of workers, a Lazy Pirate client, and the queue, in any order.

$ python spworker.py
I: (AF11-C837) worker ready
I: (AF11-C837) normal reply
I: (AF11-C837) normal reply

$ python spqueue.py

$ python lpclient.py
I: Connecting to server...
I: Sending (b'1')
I: Server replied OK (b'1')
I: Sending (b'2')
I: Server replied OK (b'2')
I: Sending (b'3')

Paranoid Pirate pattern, Robust Reliable Queueing

The Simple Pirate pattern merely glues two existing patterns together, but it still has two problems:

  1. It is not robust against a queue crash and restart: the client will recover, but the workers will not. ZeroMQ reconnects the workers' sockets automatically, but to the freshly restarted queue the workers never signaled readiness, so it ignores them. To fix this, we add heartbeating from queue to worker so the worker can detect that the queue has gone away.

  2. The queue does not detect worker failure: if a worker dies while idle, the queue cannot remove it from its worker queue until it sends that worker a request, and the client then waits and retries for nothing. It is not critical, but it is not nice. Adding heartbeating from worker to queue lets the queue detect a lost worker.

Where the Simple Pirate worker used a REQ socket, the Paranoid Pirate worker switches to a DEALER socket.

ppqueue.py: the load balancing pattern extended with heartbeating of workers

#
##  Paranoid Pirate queue

from collections import OrderedDict
import time

import zmq

HEARTBEAT_LIVENESS = 3     # 3..5 is reasonable
HEARTBEAT_INTERVAL = 1.0   # Seconds

#  Paranoid Pirate Protocol constants
PPP_READY = b"\x01"      # Signals worker is ready
PPP_HEARTBEAT = b"\x02"  # Signals worker heartbeat


class Worker(object):
    def __init__(self, address):
        self.address = address
        self.expiry = time.time() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS

class WorkerQueue(object):
    def __init__(self):
        self.queue = OrderedDict()

    def ready(self, worker):
        self.queue.pop(worker.address, None)
        self.queue[worker.address] = worker

    def purge(self):
        """Look for & kill expired workers."""
        t = time.time()
        expired = []
        for address,worker in self.queue.items():
            if t > worker.expiry:  # Worker expired
                expired.append(address)
        for address in expired:
            print( "W: Idle worker expired: %s" % address )
            self.queue.pop(address, None)

    def next(self):
        address, worker = self.queue.popitem(False)
        return address

context = zmq.Context(1)

frontend = context.socket(zmq.ROUTER) # ROUTER
backend = context.socket(zmq.ROUTER)  # ROUTER
frontend.bind("tcp://*:5555") # For clients
backend.bind("tcp://*:5556")  # For workers

poll_workers = zmq.Poller()
poll_workers.register(backend, zmq.POLLIN)

poll_both = zmq.Poller()
poll_both.register(frontend, zmq.POLLIN)
poll_both.register(backend, zmq.POLLIN)

workers = WorkerQueue()

heartbeat_at = time.time() + HEARTBEAT_INTERVAL

while True:
    if len(workers.queue) > 0:
        poller = poll_both
    else:
        poller = poll_workers
    socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))

    # Handle worker activity on backend
    if socks.get(backend) == zmq.POLLIN:
        # Use worker address for LRU routing
        frames = backend.recv_multipart()
        if not frames:
            break

        address = frames[0]
        workers.ready(Worker(address))

        # Validate control message, or return reply to client
        msg = frames[1:]
        if len(msg) == 1:
            if msg[0] not in (PPP_READY, PPP_HEARTBEAT):
                print( "E: Invalid message from worker: %s" % msg )
        else:
            frontend.send_multipart(msg)

        # Send heartbeats to idle workers if it's time
        if time.time() >= heartbeat_at:
            for worker in workers.queue:
                msg = [worker, PPP_HEARTBEAT]
                backend.send_multipart(msg)
            heartbeat_at = time.time() + HEARTBEAT_INTERVAL
    if socks.get(frontend) == zmq.POLLIN:
        frames = frontend.recv_multipart()
        if not frames:
            break

        frames.insert(0, workers.next())
        backend.send_multipart(frames)


    workers.purge()

ppworker.py

#
##  Paranoid Pirate worker

from random import randint
import time

import zmq

HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 1
INTERVAL_INIT = 1
INTERVAL_MAX = 32

#  Paranoid Pirate Protocol constants
PPP_READY = b"\x01"      # Signals worker is ready
PPP_HEARTBEAT = b"\x02"  # Signals worker heartbeat

def worker_socket(context, poller):
    """Helper function that returns a new configured socket
       connected to the Paranoid Pirate queue"""
    worker = context.socket(zmq.DEALER) # DEALER
    identity = "%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000))
    worker.setsockopt_string(zmq.IDENTITY, identity)
    poller.register(worker, zmq.POLLIN)
    worker.connect("tcp://localhost:5556")
    worker.send(PPP_READY)
    return worker


context = zmq.Context(1)
poller = zmq.Poller()

liveness = HEARTBEAT_LIVENESS
interval = INTERVAL_INIT

heartbeat_at = time.time() + HEARTBEAT_INTERVAL

worker = worker_socket(context, poller)
cycles = 0
while True:
    socks = dict(poller.poll(HEARTBEAT_INTERVAL * 1000))

    # Handle worker activity on backend
    if socks.get(worker) == zmq.POLLIN:
        #  Get message
        #  - 3-part envelope + content -> request
        #  - 1-part HEARTBEAT -> heartbeat
        frames = worker.recv_multipart()
        if not frames:
            break # Interrupted

        if len(frames) == 3:
            # Simulate various problems, after a few cycles
            cycles += 1
            if cycles > 3 and randint(0, 5) == 0:
                print( "I: Simulating a crash" )
                break
            if cycles > 3 and randint(0, 5) == 0:
                print( "I: Simulating CPU overload" )
                time.sleep(3)
            print( "I: Normal reply" )
            worker.send_multipart(frames)
            liveness = HEARTBEAT_LIVENESS
            time.sleep(1)  # Do some heavy work
        elif len(frames) == 1 and frames[0] == PPP_HEARTBEAT:
            print( "I: Queue heartbeat" )
            liveness = HEARTBEAT_LIVENESS
        else:
            print( "E: Invalid message: %s" % frames )
        interval = INTERVAL_INIT
    else:
        liveness -= 1
        if liveness == 0:
            print( "W: Heartbeat failure, can't reach queue" )
            print( "W: Reconnecting in %0.2fs..." % interval )
            time.sleep(interval)

            if interval < INTERVAL_MAX:
                interval *= 2
            poller.unregister(worker)
            worker.setsockopt(zmq.LINGER, 0)
            worker.close()
            worker = worker_socket(context, poller)
            liveness = HEARTBEAT_LIVENESS
    if time.time() > heartbeat_at:
        heartbeat_at = time.time() + HEARTBEAT_INTERVAL
        print( "I: Worker heartbeat" )
        worker.send(PPP_HEARTBEAT)

Comments:

  1. The code includes failure simulation, which you would remove in real use.
  2. The worker has a reconnect strategy similar to the Lazy Pirate client, with two differences: (a) it does an exponential back-off, and (b) it retries indefinitely.

Try stopping and restarting the queue, the client, and the workers in any combination; the client should never receive an out-of-order reply.

You can start everything with a script:

python ppqueue.py &
for i in 1 2 3 4; do
    python ppworker.py &
    sleep 1
done
python lpclient.py &

or start each piece in its own terminal:

$ python ppqueue.py

$ python ppworker.py
I: Queue heartbeat
I: Normal reply
I: Worker heartbeat
I: Queue heartbeat

$ python lpclient.py
I: Connecting to server...
I: Sending (b'1')
I: Server replied OK (b'1')
I: Sending (b'2')
I: Server replied OK (b'2')
I: Sending (b'3')

Heartbeating

Heartbeating solves the problem of knowing whether a peer is alive or dead. This is not specific to ZeroMQ: TCP has a very long timeout (around 30 minutes), so a peer can be dead or unreachable for a long time before anyone notices.

Here are the three main ways to handle heartbeating:

Shrugging It Off

Many applications simply do no heartbeating at all, but skipping it can lead to the following problems:

  • If we use a ROUTER socket that tracks peers, then as peers disconnect and reconnect the application leaks memory (the resources it holds per peer) and gets slower and slower.
  • With SUB- or DEALER-based data recipients, we cannot tell good silence (there is simply no data) from bad silence (the other end died). A recipient that knows the other side has died could switch to a backup route.
  • If we use a TCP connection and it stays silent, some networks will just drop it, so we need some form of keep-alive that keeps the connection looking used (see the sketch below).
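
On that last point, ZeroMQ can switch on the kernel's TCP keepalive through socket options (available with libzmq 3.x and later; the exact timing behavior depends on the operating system). A hedged sketch:

import zmq

ctx = zmq.Context.instance()
sock = ctx.socket(zmq.DEALER)
sock.setsockopt(zmq.TCP_KEEPALIVE, 1)          # enable TCP keepalive probes
sock.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 60)    # start probing after 60s of silence
sock.setsockopt(zmq.TCP_KEEPALIVE_INTVL, 10)   # then probe every 10s
sock.setsockopt(zmq.TCP_KEEPALIVE_CNT, 5)      # give up after 5 missed probes
sock.connect("tcp://localhost:5555")
# This keeps network equipment from dropping an idle connection, but it is not
# a substitute for application-level heartbeats.
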
One-Way Heartbeats

The second option is to send a heartbeat message from each node to its peers, say every second. When a node hears nothing from a peer within some timeout (typically several heartbeat intervals), it treats that peer as dead. Sounds good, but this approach misbehaves in several situations.

For pub-sub sockets this does work, and it is the only scheme that can work there: a SUB socket cannot talk back to a PUB socket, but a PUB socket can happily send "I'm alive" messages to its subscribers.

As an optimization, send heartbeats only when there is no real data to send. You can also send heartbeats at a slower rate, as long as the recipient can still detect failures quickly enough for its needs.

Problems you may run into with this design (a sketch follows the list):

  1. It can be inaccurate when large amounts of data are being sent: heartbeats queue up behind the data, and if they are delayed past the timeout the peer is wrongly declared dead. The fix is to treat any incoming traffic as a heartbeat, not only explicit heartbeat messages.

  2. The pub-sub pattern drops messages for disconnected recipients, but PUSH and DEALER sockets queue them instead. If you keep sending heartbeats to a dead peer over such sockets, it will receive a pile of stale heartbeats the moment it comes back.

  3. This design assumes heartbeat timeouts are the same across the whole network, which is often not true in practice.
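
Here is a minimal sketch of the one-way scheme over PUB-SUB, applying the "any traffic counts as a heartbeat" fix from point 1; the sockets are assumed to be created, bound/connected, and subscribed elsewhere:

import time

import zmq

HEARTBEAT_INTERVAL = 1.0               # seconds between HUGZ messages
PEER_TTL = 3 * HEARTBEAT_INTERVAL      # miss three intervals and the peer is dead

def publisher_loop(pub):
    """Send a HUGZ every interval; real code would skip it while data is flowing."""
    while True:
        pub.send(b"HUGZ")
        time.sleep(HEARTBEAT_INTERVAL)

def subscriber_loop(sub):
    """Treat any incoming message, data or HUGZ, as proof of life."""
    poller = zmq.Poller()
    poller.register(sub, zmq.POLLIN)
    expiry = time.time() + PEER_TTL
    while True:
        if poller.poll(int(HEARTBEAT_INTERVAL * 1000)):
            sub.recv()
            expiry = time.time() + PEER_TTL
        elif time.time() > expiry:
            print("W: publisher silent for too long, assuming it is dead")
            break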

Ping-Pong Heartbeat

The third option is a ping-pong dialog: one peer sends a ping command, the other replies with a pong command. Pings and pongs are not correlated with each other. Because roles usually differ, it is typically the client that pings and the server that pongs.

This works for all ROUTER-based brokers. It can be combined with the previous optimization: treat any incoming data as a pong, and send a ping only when no other traffic is flowing (see the sketch below).
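
A rough sketch of the ping-pong dialog, assuming a DEALER client talking to a ROUTER server; the PING/PONG command names are illustrative, not a published protocol:

import zmq

PING_TIMEOUT = 2500   # msecs to wait for a pong

def ping(client):
    """DEALER side: send a ping and wait briefly for any reply."""
    client.send_multipart([b"PING"])
    if client.poll(PING_TIMEOUT):
        client.recv_multipart()               # any traffic counts as a pong
        return True
    return False                              # silence: treat the peer as dead

def serve_pings(router):
    """ROUTER side: bounce a PONG back to whoever pinged us."""
    while True:
        frames = router.recv_multipart()      # [identity, ..., b"PING"]
        if frames[-1] == b"PING":
            router.send_multipart(frames[:-1] + [b"PONG"])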

Heartbeating for Paranoid Pirate

Paranoid Pirate uses the second approach (one-way heartbeats), although the third would arguably be simpler. The heartbeat message flow is asynchronous in both directions, so either peer can detect that the other has gone away.

Here is how the worker handles heartbeats coming from the queue:

  • It keeps a "liveness" counter: how many heartbeats it can still miss before deciding the queue is dead. It starts at 3 and is decremented each time a heartbeat is missed.
  • In its zmq_poll loop, it waits at most one heartbeat interval (1 second) at a time.
  • Any message received from the queue resets liveness to 3.
  • If nothing arrives within an interval, it counts liveness down.
  • When liveness reaches zero, the worker treats the queue as disconnected.
  • If the queue is disconnected, the worker destroys its socket, creates a new one, and reconnects.
  • To avoid opening and closing too many sockets, it waits before reconnecting, doubling the wait each time up to a maximum of 32 seconds.

And this is how the worker handles heartbeats going to the queue:

  • Since it talks to a single queue, it keeps one variable holding the time at which the next heartbeat is due.
  • In the zmq_poll loop, whenever that time has passed, it sends a heartbeat to the queue.

Here is the essential worker heartbeat code, in C:

#define HEARTBEAT_LIVENESS  3       //  3-5 is reasonable
#define HEARTBEAT_INTERVAL  1000    //  msecs
#define INTERVAL_INIT       1000    //  Initial reconnect
#define INTERVAL_MAX       32000    //  After exponential backoff

…
//  If liveness hits zero, queue is considered disconnected
size_t liveness = HEARTBEAT_LIVENESS;
size_t interval = INTERVAL_INIT;

//  Send out heartbeats at regular intervals
uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;

while (true) {
    zmq_pollitem_t items [] = { { worker,  0, ZMQ_POLLIN, 0 } };
    int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);

    if (items [0].revents & ZMQ_POLLIN) {
        //  Receive any message from queue
        liveness = HEARTBEAT_LIVENESS;
        interval = INTERVAL_INIT;
    }
    else
    if (--liveness == 0) {
        zclock_sleep (interval);
        if (interval < INTERVAL_MAX)
            interval *= 2;
        zsocket_destroy (ctx, worker);
        …
        liveness = HEARTBEAT_LIVENESS;
    }
    //  Send heartbeat to queue if it's time
    if (zclock_time () > heartbeat_at) {
        heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        //  Send heartbeat message to queue
    }
}

The queue does the same, but additionally records an expiration time for every worker. Some general advice for heartbeating (the poll-timeout point is sketched after this list):

  • Use zmq_poll or a reactor as the core of your application's main task.
  • Get heartbeating working between the peers first, test it by simulating failures, and only then implement the rest of the message flow.
  • Use simple tracing (e.g., printing to the console) to follow messages between peers; the dump method provided by zmsg helps, and putting incrementing numbers into messages lets you check for gaps.
  • In a real application, heartbeating must be configurable and is usually negotiated between peers. Some peers want aggressive heartbeating as low as 10 ms; others only need something like 30 seconds.
  • If different peers need different heartbeat intervals, your poll timeout must be the shortest of them; do not use an infinite timeout.
  • Do heartbeating on the same socket you use for messages, so the heartbeats double as a keep-alive for the network connection.
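
A tiny sketch of the poll-timeout tip (the interval values are illustrative):

import zmq

ctx = zmq.Context.instance()
peer = ctx.socket(zmq.DEALER)
peer.connect("tcp://localhost:5556")

poller = zmq.Poller()
poller.register(peer, zmq.POLLIN)

# Per-peer heartbeat intervals in seconds: poll with the shortest one so that
# no peer's deadline can be missed, and never poll with an infinite timeout.
heartbeat_intervals = [0.01, 1.0, 30.0]
poll_timeout_ms = int(min(heartbeat_intervals) * 1000)
events = dict(poller.poll(poll_timeout_ms))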

Contracts and Protocols

Because its heartbeating differs, Paranoid Pirate is not interoperable with Simple Pirate. Interoperability needs a contract, which is just another word for a protocol.

http://rfc.zeromq.org/ collects protocol contracts that ZeroMQ applications can implement, for example the Pirate Pattern Protocol.

To implement PPP properly, a couple of points still need attention (sketched below):

  1. Put a protocol version number in the READY command.
  2. READY and HEARTBEAT are not fully distinguishable from requests and replies; to tell them apart, add a "message type" part to the message structure.
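
A hypothetical sketch of what those two changes could look like on the wire; the version string and message-type values below are invented for illustration and are not part of the published PPP spec:

# Hypothetical "PPP v2" framing: every message starts with a one-byte type,
# and READY additionally carries a protocol/version string.
PPP_VERSION   = b"PPPv2"     # invented version tag
MSG_READY     = b"\x01"
MSG_HEARTBEAT = b"\x02"
MSG_REPLY     = b"\x03"

def send_ready(worker):
    worker.send_multipart([MSG_READY, PPP_VERSION])

def send_heartbeat(worker):
    worker.send_multipart([MSG_HEARTBEAT])

def send_reply(worker, envelope, body):
    # envelope is the client routing envelope the queue passed through
    worker.send_multipart([MSG_REPLY] + envelope + [body])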

Majordomo pattern: Service Oriented Reliable Queueing

The Majordomo Protocol (MDP) extends PPP with a "service name": clients address each request to a specific service, and workers register for the kind of service they provide.

Adding service names to the Paranoid Pirate queue turns it into a service-oriented broker.

There are two contracts here: (1) MDP itself, which describes how the pieces of the distributed architecture talk to each other, and (2) the APIs that define how user applications talk to the framework.

Majordomo has two halves: a client side and a worker side.

MDP.py: Majordomo Protocol definitions

"""Majordomo Protocol definitions"""
#  This is the version of MDP/Client we implement
C_CLIENT = b"MDPC01"

#  This is the version of MDP/Worker we implement
W_WORKER = b"MDPW01"

#  MDP/Server commands, as strings
W_READY         =   b"\001"
W_REQUEST       =   b"\002"
W_REPLY         =   b"\003"
W_HEARTBEAT     =   b"\004"
W_DISCONNECT    =   b"\005"

commands = [None, "READY", "REQUEST", "REPLY", "HEARTBEAT", "DISCONNECT"]

mdcliapi.py: Majordomo client API

"""Majordomo Protocol Client API, Python version.

Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
"""

import logging

import zmq

import MDP
from zhelpers import dump

class MajorDomoClient(object):
    """Majordomo Protocol Client API, Python version.

      Implements the MDP/Client spec at http://rfc.zeromq.org/spec:7.
    """
    broker = None
    ctx = None
    client = None
    poller = None
    timeout = 2500
    retries = 3
    verbose = False

    def __init__(self, broker, verbose=False):
        self.broker = broker
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                level=logging.INFO)
        self.reconnect_to_broker()


    def reconnect_to_broker(self):
        """Connect or reconnect to broker"""
        if self.client:
            # An existing client socket means we are reconnecting: close the old one first
            self.poller.unregister(self.client)
            self.client.close()

        self.client = self.ctx.socket(zmq.REQ)
        self.client.linger = 0
        self.client.connect(self.broker)
        self.poller.register(self.client, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)

    def send(self, service, request):
        """Send request to broker and get reply by hook or crook.

        Takes ownership of request message and destroys it when sent.
        Returns the reply message or None if there was no reply.
        """
        if not isinstance(request, list):
            request = [request]
        request = [MDP.C_CLIENT, service] + request
        if self.verbose:
            logging.warn("I: send request to '%s' service: ", service)
            dump(request)
        reply = None

        retries = self.retries
        while retries > 0:
            self.client.send_multipart(request)
            try:
                items = self.poller.poll(self.timeout)
            except KeyboardInterrupt:
                break # interrupted

            if items:
                msg = self.client.recv_multipart()
                if self.verbose:
                    logging.info("I: received reply:")
                    dump(msg)

                # Don't try to handle errors, just assert noisily
                # Sanity-check the frame count of the reply
                assert len(msg) >= 3

                header = msg.pop(0)
                assert MDP.C_CLIENT == header

                reply_service = msg.pop(0)
                assert service == reply_service

                reply = msg
                break
            else:
                # Timed out with no reply: use up one retry
                retries -= 1
                if retries:
                    logging.warn("W: no reply, reconnecting...")
                    self.reconnect_to_broker()
                else:
                    logging.warn("W: permanent error, abandoning")
                    break

        return reply

    def destroy(self):
        self.ctx.destroy()

mdclient.py: client application

"""
Majordomo Protocol client example. Uses the mdcli API to hide all MDP aspects

"""

import sys
from mdcliapi import MajorDomoClient

def main():
    verbose = '-v' in sys.argv
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    count = 0
    while count < 100000:
        request = b"Hello world"
        try:
            # Send the request to the 'echo' service
            reply = client.send(b"echo", request)
        except KeyboardInterrupt:
            break
        else:
            # also break on failure to reply:
            if reply is None:
                break
        count += 1
    print( "%i requests/replies processed" % count )

if __name__ == '__main__':
    main()

mdwrkapi.py: worker api

"""Majordomo Protocol Worker API, Python version

Implements the MDP/Worker spec at http:#rfc.zeromq.org/spec:7.

"""

import logging
import time
import zmq

from zhelpers import dump
# MajorDomo protocol constants:
import MDP

class MajorDomoWorker(object):
    """Majordomo Protocol Worker API, Python version

    Implements the MDP/Worker spec at http:#rfc.zeromq.org/spec:7.
    """

    HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable
    broker = None
    ctx = None
    service = None

    worker = None # Socket to broker
    heartbeat_at = 0 # When to send HEARTBEAT (relative to time.time(), so in seconds)
    liveness = 0 # How many attempts left
    heartbeat = 2500 # Heartbeat delay, msecs
    reconnect = 2500 # Reconnect delay, msecs

    # Internal state
    expect_reply = False # False only at start

    timeout = 2500 # poller timeout
    verbose = False # Print activity to stdout

    # Return address, if any
    reply_to = None

    def __init__(self, broker, service, verbose=False):
        self.broker = broker
        self.service = service
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                level=logging.INFO)
        self.reconnect_to_broker()


    def reconnect_to_broker(self):
        """Connect or reconnect to broker"""
        if self.worker:
            # An existing worker socket means we are reconnecting: close the old one first
            self.poller.unregister(self.worker)
            self.worker.close()
        self.worker = self.ctx.socket(zmq.DEALER)
        self.worker.linger = 0
        self.worker.connect(self.broker)
        self.poller.register(self.worker, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)

        # Register service with broker
        self.send_to_broker(MDP.W_READY, self.service, [])

        # If liveness hits zero, queue is considered disconnected
        self.liveness = self.HEARTBEAT_LIVENESS
        # 1e-3 = 1 * 10^(-3) = 1/1000
        self.heartbeat_at = time.time() + 1e-3 * self.heartbeat


    def send_to_broker(self, command, option=None, msg=None):
        """Send message to broker.

        If no msg is provided, creates one internally
        """
        if msg is None:
            msg = []
        elif not isinstance(msg, list):
            msg = [msg]

        if option:
            msg = [option] + msg

        msg = [b'', MDP.W_WORKER, command] + msg
        if self.verbose:
            logging.info("I: sending %s to broker", command)
            dump(msg)
        self.worker.send_multipart(msg)


    def recv(self, reply=None):
        """Send reply, if any, to broker and wait for next request."""
        # Format and send the reply if we were provided one
        # expect_reply is False only on the first call, when there is no previous reply
        assert reply is not None or not self.expect_reply

        if reply is not None:
            assert self.reply_to is not None
            reply = [self.reply_to, b''] + reply
            # ex:
            # I: sending b'\x03' to broker
            # ----------------------------------------
            # [000]
            # [006] MDPW01
            # [001]
            # [005] 0x00800041aa
            # [000]
            # [011] Hello world
            self.send_to_broker(MDP.W_REPLY, msg=reply)

        self.expect_reply = True

        while True:
            # Poll socket for a reply, with timeout
            try:
                items = self.poller.poll(self.timeout)
            except KeyboardInterrupt:
                break # Interrupted

            if items:
                msg = self.worker.recv_multipart()
                if self.verbose:
                    logging.info("I: received message from broker: ")
                    # I: received message from broker:
                    # ----------------------------------------
                    # [000]
                    # [006] MDPW01
                    # [001]
                    # [005] 0x00800041aa
                    # [000]
                    # [011] Hello world
                    dump(msg)

                self.liveness = self.HEARTBEAT_LIVENESS
                # Don't try to handle errors, just assert noisily
                assert len(msg) >= 3

                empty = msg.pop(0)
                assert empty == b''

                header = msg.pop(0)
                assert header == MDP.W_WORKER

                command = msg.pop(0)
                if command == MDP.W_REQUEST:
                    # We should pop and save as many addresses as there are
                    # up to a null part, but for now, just save one...
                    self.reply_to = msg.pop(0)
                    # pop empty
                    empty = msg.pop(0)
                    assert empty == b''

                    return msg # We have a request to process
                elif command == MDP.W_HEARTBEAT:
                    # Do nothing for heartbeats
                    pass
                elif command == MDP.W_DISCONNECT:
                    self.reconnect_to_broker()
                else :
                    logging.error("E: invalid input message: ")
                    dump(msg)

            else:
                self.liveness -= 1
                if self.liveness == 0:
                    if self.verbose:
                        logging.warn("W: disconnected from broker - retrying...")
                    try:
                        time.sleep(1e-3*self.reconnect)
                    except KeyboardInterrupt:
                        break
                    self.reconnect_to_broker()

            # Send HEARTBEAT if it's time
            if time.time() > self.heartbeat_at:
                self.send_to_broker(MDP.W_HEARTBEAT)
                self.heartbeat_at = time.time() + 1e-3*self.heartbeat

        logging.warn("W: interrupt received, killing worker...")
        return None


    def destroy(self):
        # context.destroy depends on pyzmq >= 2.1.10
        self.ctx.destroy(0)

mdworker.py: worker application

"""Majordomo Protocol worker example.

Uses the mdwrk API to hide all MDP aspects

"""

import sys
from mdwrkapi import MajorDomoWorker

def main():
    verbose = '-v' in sys.argv
    worker = MajorDomoWorker("tcp://localhost:5555", b"echo", verbose)
    reply = None
    while True:
        request = worker.recv(reply)
        if request is None:
            break # Worker was interrupted
        reply = request # Echo is complex... :-)


if __name__ == '__main__':
    main()

Notes about the worker API:

  • The API is single-threaded, so the worker does not send heartbeats in the background. That is what we want: if the worker application gets stuck, its heartbeats stop and the broker stops sending it requests.
  • The worker API does no exponential back-off on reconnect; it is not worth the extra complexity here.
  • The API does no error reporting; it asserts instead, leaving checks to the application.

The worker API closes its socket and opens a new one when it detects that the broker has gone, just like the Simple Pirate and Paranoid Pirate workers. ZeroMQ would reconnect the worker's socket automatically after a broker restart, but the worker would not re-register with the new broker. There are two fixes: the simpler one, used here, is to close the socket and open a new one; the other is for the broker, on receiving a heartbeat from a worker it does not know, to challenge that worker and ask it to re-register (a sketch follows below).
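
A hypothetical sketch of that second approach, reusing the MDP command constants defined above (the handler name is made up): on a heartbeat from an unknown worker, the broker sends W_DISCONNECT, and the worker API reacts to W_DISCONNECT by reconnecting and re-sending READY.

import MDP

def on_unknown_worker_heartbeat(broker_socket, sender):
    """Ask an unknown worker to drop its connection and re-register with READY."""
    broker_socket.send_multipart([sender, b'', MDP.W_WORKER, MDP.W_DISCONNECT])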


mdbroker.py: majordomo broker

"""
Majordomo Protocol broker
A minimal implementation of http:#rfc.zeromq.org/spec:7 and spec:8

"""

import logging
import sys
import time
from binascii import hexlify

import zmq

# local
import MDP
from zhelpers import dump

class Service(object):
    """a single Service"""
    name = None # Service name
    requests = None # List of client requests
    waiting = None # List of waiting workers

    def __init__(self, name):
        self.name = name
        self.requests = []
        self.waiting = []

class Worker(object):
    """a Worker, idle or active"""
    identity = None # hex Identity of worker
    address = None # Address to route to
    service = None # Owning service, if known
    expiry = None # expires at this point, unless heartbeat

    def __init__(self, identity, address, lifetime):
        self.identity = identity
        self.address = address
        self.expiry = time.time() + 1e-3*lifetime

class MajorDomoBroker(object):
    """
    Majordomo Protocol broker
    A minimal implementation of http:#rfc.zeromq.org/spec:7 and spec:8
    """

    # We'd normally pull these from config data
    INTERNAL_SERVICE_PREFIX = b"mmi."
    HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable
    HEARTBEAT_INTERVAL = 2500 # msecs
    HEARTBEAT_EXPIRY = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS

    # ---------------------------------------------------------------------

    ctx = None # Our context
    socket = None # Socket for clients & workers
    poller = None # our Poller

    heartbeat_at = None  # When to send HEARTBEAT
    services = None # known services
    workers = None # known workers
    waiting = None # idle workers

    verbose = False # Print activity to stdout

    # ---------------------------------------------------------------------


    def __init__(self, verbose=False):
        """Initialize broker state."""
        self.verbose = verbose
        self.services = {}
        self.workers = {}
        self.waiting = []
        self.heartbeat_at = time.time() + 1e-3*self.HEARTBEAT_INTERVAL
        self.ctx = zmq.Context()
        self.socket = self.ctx.socket(zmq.ROUTER)
        self.socket.linger = 0
        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)
        logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                level=logging.INFO)



    # ---------------------------------------------------------------------

    def mediate(self):
        """Main broker work happens here"""
        while True:
            try:
                items = self.poller.poll(self.HEARTBEAT_INTERVAL)
            except KeyboardInterrupt:
                break # Interrupted
            if items:
                msg = self.socket.recv_multipart()
                if self.verbose:
                    logging.info("I: received message:")
                    # I: received message:
                    # ----------------------------------------
                    # [005] 0x00800041aa
                    # [000]
                    # [006] MDPC01
                    # [004] echo
                    # [011] Hello world
                    dump(msg)

                sender = msg.pop(0)
                empty = msg.pop(0)
                assert empty == b''
                header = msg.pop(0)

                # 由 header 判斷是來自 client 或是 worker
                if (MDP.C_CLIENT == header):
                    self.process_client(sender, msg)
                elif (MDP.W_WORKER == header):
                    self.process_worker(sender, msg)
                else:
                    logging.error("E: invalid message:")
                    dump(msg)

            self.purge_workers()
            self.send_heartbeats()

    def destroy(self):
        """Disconnect all workers, destroy context."""
        while self.workers:
            self.delete_worker(list(self.workers.values())[0], True)
        self.ctx.destroy(0)


    def process_client(self, sender, msg):
        """Process a request coming from a client."""
        assert len(msg) >= 2 # Service name + body
        service = msg.pop(0)
        # Set reply return address to client sender
        msg = [sender, b''] + msg
        if service.startswith(self.INTERNAL_SERVICE_PREFIX):
            self.service_internal(service, msg)
        else:
            self.dispatch(self.require_service(service), msg)


    def process_worker(self, sender, msg):
        """Process message sent to us by a worker."""
        assert len(msg) >= 1 # At least, command

        command = msg.pop(0)

        worker_ready = hexlify(sender) in self.workers

        worker = self.require_worker(sender)

        if (MDP.W_READY == command):
            assert len(msg) >= 1 # At least, a service name
            service = msg.pop(0)
            # Not first command in session or Reserved service name
            if (worker_ready or service.startswith(self.INTERNAL_SERVICE_PREFIX)):
                self.delete_worker(worker, True)
            else:
                # Attach worker to service and mark as idle
                worker.service = self.require_service(service)
                self.worker_waiting(worker)

        elif (MDP.W_REPLY == command):
            if (worker_ready):
                # Remove & save client return envelope and insert the
                # protocol header and service name, then rewrap envelope.
                client = msg.pop(0)
                empty = msg.pop(0)  # empty delimiter frame
                msg = [client, b'', MDP.C_CLIENT, worker.service.name] + msg
                self.socket.send_multipart(msg)
                self.worker_waiting(worker)
            else:
                self.delete_worker(worker, True)

        elif (MDP.W_HEARTBEAT == command):
            if (worker_ready):
                worker.expiry = time.time() + 1e-3*self.HEARTBEAT_EXPIRY
            else:
                self.delete_worker(worker, True)

        elif (MDP.W_DISCONNECT == command):
            self.delete_worker(worker, False)
        else:
            logging.error("E: invalid message:")
            dump(msg)

    def delete_worker(self, worker, disconnect):
        """Deletes worker from all data structures, and deletes worker."""
        assert worker is not None
        if disconnect:
            self.send_to_worker(worker, MDP.W_DISCONNECT, None, None)

        if worker.service is not None:
            worker.service.waiting.remove(worker)
        self.workers.pop(worker.identity)

    def require_worker(self, address):
        """Finds the worker (creates if necessary)."""
        assert (address is not None)
        identity = hexlify(address)
        worker = self.workers.get(identity)
        if (worker is None):
            worker = Worker(identity, address, self.HEARTBEAT_EXPIRY)
            self.workers[identity] = worker
            if self.verbose:
                logging.info("I: registering new worker: %s", identity)

        return worker

    def require_service(self, name):
        """Locates the service (creates if necessary)."""
        assert (name is not None)
        service = self.services.get(name)
        if (service is None):
            service = Service(name)
            self.services[name] = service

        return service

    def bind(self, endpoint):
        """Bind broker to endpoint, can call this multiple times.

        We use a single socket for both clients and workers.
        """
        self.socket.bind(endpoint)
        logging.info("I: MDP broker/0.1.1 is active at %s", endpoint)

    def service_internal(self, service, msg):
        """Handle internal service according to 8/MMI specification"""
        returncode = b"501"
        if b"mmi.service" == service:
            name = msg[-1]
            returncode = b"200" if name in self.services else b"404"
        msg[-1] = returncode

        # insert the protocol header and service name after the routing envelope ([client, ''])
        msg = msg[:2] + [MDP.C_CLIENT, service] + msg[2:]
        self.socket.send_multipart(msg)

    def send_heartbeats(self):
        """Send heartbeats to idle workers if it's time"""
        if (time.time() > self.heartbeat_at):
            for worker in self.waiting:
                self.send_to_worker(worker, MDP.W_HEARTBEAT, None, None)

            self.heartbeat_at = time.time() + 1e-3*self.HEARTBEAT_INTERVAL

    def purge_workers(self):
        """Look for & kill expired workers.

        Workers are oldest to most recent, so we stop at the first alive worker.
        """
        while self.waiting:
            w = self.waiting[0]
            if w.expiry < time.time():
                logging.info("I: deleting expired worker: %s", w.identity)
                self.delete_worker(w,False)
                self.waiting.pop(0)
            else:
                break

    def worker_waiting(self, worker):
        """This worker is now waiting for work."""
        # Queue to broker and service waiting lists
        self.waiting.append(worker)
        worker.service.waiting.append(worker)
        worker.expiry = time.time() + 1e-3*self.HEARTBEAT_EXPIRY
        self.dispatch(worker.service, None)

    def dispatch(self, service, msg):
        """Dispatch requests to waiting workers as possible"""
        assert (service is not None)
        if msg is not None:# Queue message if any
            service.requests.append(msg)
        self.purge_workers()
        while service.waiting and service.requests:
            msg = service.requests.pop(0)
            worker = service.waiting.pop(0)
            self.waiting.remove(worker)
            self.send_to_worker(worker, MDP.W_REQUEST, None, msg)

    def send_to_worker(self, worker, command, option, msg=None):
        """Send message to worker.

        If message is provided, sends that message.
        """

        if msg is None:
            msg = []
        elif not isinstance(msg, list):
            msg = [msg]

        # Stack routing and protocol envelopes to start of message
        # and routing envelope
        if option is not None:
            msg = [option] + msg
        msg = [worker.address, b'', MDP.W_WORKER, command] + msg

        if self.verbose:
            logging.info("I: sending %r to worker", command)
            # I: sending b'\x02' to worker
            # ----------------------------------------
            # [005] 0x00800041a8
            # [000]
            # [006] MDPW01
            # [001]
            # [005] 0x00800041aa
            # [000]
            # [011] Hello world
            dump(msg)

        self.socket.send_multipart(msg)


def main():
    """create and start new broker"""
    verbose = '-v' in sys.argv
    broker = MajorDomoBroker(verbose)
    broker.bind("tcp://*:5555")
    broker.mediate()

if __name__ == '__main__':
    main()

The broker is essentially a set of queues, one per service: the queues are created as workers appear, and for each service the broker also keeps a queue of waiting workers.

notes about broker

  • Majordomo Protocol 讓我們以單一 socket 處理 clients 及 workers。比較容易管理,因為只需要一個 ZeroMQ endpoint,大多數的 proxies 都需要兩個。
  • The broker implements all of MDP/0.1, including heartbeating and disconnecting peers that send invalid commands.
  • 可擴充為 multiple threads,每個 thread 管理一個 socket, one set of clients and workers。
  • 可實作 primary/failover 或 live/live broker reliability model,因為 broker 為 stateless,由 client, worker 在 broker 異常時,自行選擇連到另一個 broker
  • 範例使用 5-second heartbeats,如果要在真正的 LAN application 使用,必須減少這個時間,retry 必須要至少 10s,有足夠的時間讓 service restart

Asynchronous Majordomo pattern

The previous version of Majordomo is deliberately simple; its client is essentially the Simple Pirate, sending one request and waiting synchronously for the reply. Because ZeroMQ does not enable Nagle's algorithm, nothing papers over this "small-packet problem": lots of very short messages, each paying a full round trip, make poor use of the TCP connection.

Let's first measure the impact of round-tripping with a small program. It compares two styles: sending a request and waiting for its reply each time, versus sending a batch of requests and then collecting the replies in a batch.

tripping.py

"""Round-trip demonstrator

While this example runs in a single process, that is just to make
it easier to start and stop the example. Client thread signals to
main when it's ready.
"""

import sys
import threading
import time

import zmq

from zhelpers import zpipe

def client_task (ctx, pipe):
    client = ctx.socket(zmq.DEALER)
    client.identity = b'C'
    client.connect("tcp://localhost:5555")

    print( "Setting up test...\n" ),
    time.sleep(0.1)

    print( "Synchronous round-trip test...\n" ),
    start = time.time()
    requests = 10000
    for r in range(requests):
        client.send( b"hello" )
        client.recv()
    print( " %d calls/second\n" % (requests / (time.time()-start)) ),

    print( "Asynchronous round-trip test...\n" ),
    start = time.time()
    for r in range(requests):
        client.send( b"hello" )
    for r in range(requests):
        client.recv()
    print( " %d calls/second\n" % (requests / (time.time()-start)) ),

    # signal done:
    pipe.send( b"done")

def worker_task():
    ctx = zmq.Context()
    worker = ctx.socket(zmq.DEALER)
    worker.identity = b'W'
    worker.connect("tcp://localhost:5556")

    while True:
        msg = worker.recv_multipart()
        worker.send_multipart(msg)
    ctx.destroy(0)

def broker_task():
    # Prepare our context and sockets
    ctx = zmq.Context()
    frontend = ctx.socket(zmq.ROUTER)
    backend = ctx.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5555")
    backend.bind("tcp://*:5556")

    # Initialize poll set
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)
    poller.register(frontend, zmq.POLLIN)

    while True:
        try:
            items = dict(poller.poll())
        except:
            break # Interrupted

        if frontend in items:
            msg = frontend.recv_multipart()
            msg[0] = b'W'
            backend.send_multipart(msg)
        if backend in items:
            msg = backend.recv_multipart()
            msg[0] = b'C'
            frontend.send_multipart(msg)

def main():
    # Create threads
    ctx = zmq.Context()
    client,pipe = zpipe(ctx)

    client_thread = threading.Thread(target=client_task, args=(ctx, pipe))
    worker_thread = threading.Thread(target=worker_task)
    worker_thread.daemon=True
    broker_thread = threading.Thread(target=broker_task)
    broker_thread.daemon=True

    worker_thread.start()
    broker_thread.start()
    client_thread.start()

    # Wait for signal on client pipe
    client.recv()

if __name__ == '__main__':
    main()

執行結果

$ python tripping.py
Setting up test...

Synchronous round-trip test...

 2293 calls/second

Asynchronous round-trip test...

 4073 calls/second

The client thread pauses briefly before starting the tests. This avoids a ROUTER pitfall: if you send to a peer address that has not connected yet, the message is silently dropped. As the numbers show, the batched asynchronous approach is clearly faster than the synchronous one.

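As an aside, a hedged sketch of how to surface that silent drop while debugging: pyzmq exposes the ROUTER_MANDATORY socket option, which makes an unroutable send raise EHOSTUNREACH instead of discarding the message (this snippet is not part of the original example):

import zmq

ctx = zmq.Context()
router = ctx.socket(zmq.ROUTER)
router.setsockopt(zmq.ROUTER_MANDATORY, 1)   # fail loudly instead of dropping silently

try:
    # no peer with identity b'W' is connected yet, so this send is unroutable
    router.send_multipart([b'W', b'hello'])
except zmq.ZMQError as e:
    print("unroutable message:", e)          # EHOSTUNREACH ("Host unreachable")

ctx.destroy(0)
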
接下來將剛剛的 client 改為非同步的版本

mdcliapi2.py

"""Majordomo Protocol Client API, Python version.

Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.

"""

import logging

import zmq

import MDP
from zhelpers import dump

class MajorDomoClient(object):
    """Majordomo Protocol Client API, Python version.

      Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
    """
    broker = None
    ctx = None
    client = None
    poller = None
    timeout = 2500
    verbose = False

    def __init__(self, broker, verbose=False):
        self.broker = broker
        self.verbose = verbose
        self.ctx = zmq.Context()
        self.poller = zmq.Poller()
        logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
                level=logging.INFO)
        self.reconnect_to_broker()


    def reconnect_to_broker(self):
        """Connect or reconnect to broker"""
        if self.client:
            self.poller.unregister(self.client)
            self.client.close()
        self.client = self.ctx.socket(zmq.DEALER)
        self.client.linger = 0
        self.client.connect(self.broker)
        self.poller.register(self.client, zmq.POLLIN)
        if self.verbose:
            logging.info("I: connecting to broker at %s...", self.broker)

    def send(self, service, request):
        """Send request to broker
        """
        if not isinstance(request, list):
            request = [request]

        # Prefix request with protocol frames
        # Frame 0: empty (REQ emulation)
        # Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
        # Frame 2: Service name (printable string)

        request = [b'', MDP.C_CLIENT, service] + request
        if self.verbose:
            logging.warn("I: send request to '%s' service: ", service)
            dump(request)
        self.client.send_multipart(request)

    def recv(self):
        """Returns the reply message or None if there was no reply."""
        try:
            items = self.poller.poll(self.timeout)
        except KeyboardInterrupt:
            return # interrupted

        if items:
            # if we got a reply, process it
            msg = self.client.recv_multipart()
            if self.verbose:
                logging.info("I: received reply:")
                dump(msg)

            # Don't try to handle errors, just assert noisily
            assert len(msg) >= 4

            empty = msg.pop(0)
            header = msg.pop(0)
            assert MDP.C_CLIENT == header

            service = msg.pop(0)
            return msg
        else:
            logging.warn("W: permanent error, abandoning request")

mdclient2.py

"""
Majordomo Protocol client example. Uses the mdcli API to hide all MDP aspects
"""

import sys
from mdcliapi2 import MajorDomoClient

def main():
    verbose = '-v' in sys.argv
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    requests = 100000
    for i in range(requests):
        request = b"Hello world"
        try:
            client.send(b"echo", request)
        except KeyboardInterrupt:
            print( "send interrupted, aborting" )
            return

    count = 0
    while count < requests:
        try:
            reply = client.recv()
        except KeyboardInterrupt:
            break
        else:
            # also break on failure to reply:
            if reply is None:
                break
        count += 1
    print( "%i requests/replies processed" % count )

if __name__ == '__main__':
    main()

Differences from the synchronous client API (mdcliapi):

  • We use a DEALER socket instead of REQ, so each request is prefixed with an empty delimiter frame.
  • The API no longer retries requests; the application handles retries itself.
  • The synchronous send-and-wait call is split into separate send and recv methods.
  • send is asynchronous and returns immediately after the request is sent.
  • recv waits for one reply and returns it to the caller.
  • The test program sends 100,000 messages and then collects the replies.

asynchronous Majordomo pattern 有個問題,是無法處理 broker crash 的狀況。因 mdcliapi2.py 並沒有 reconnect 的機制

如要增加 reconnect 機制,要做以下工作

  1. Number every request, and verify the number on each reply.
  2. Track and hold every request that has not yet received a reply.
  3. On failover, have the client resend all requests that are still outstanding (see the sketch below).

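A minimal sketch of those three steps layered on top of the asynchronous client above (the TrackingClient wrapper and its attribute names are hypothetical; it assumes mdcliapi2.MajorDomoClient as defined above):

from mdcliapi2 import MajorDomoClient

class TrackingClient(object):
    """Hypothetical wrapper that numbers, tracks and resends requests."""
    def __init__(self, broker):
        self.mdp = MajorDomoClient(broker)
        self.sequence = 0
        self.outstanding = {}                  # sequence -> (service, body)

    def send(self, service, body):
        # step 1: number every request
        self.sequence += 1
        self.outstanding[self.sequence] = (service, body)
        self.mdp.send(service, [str(self.sequence).encode(), body])

    def recv(self):
        reply = self.mdp.recv()
        if reply is None:
            # timeout: assume the broker died; reconnect and resend
            # everything that is still outstanding (steps 2 and 3)
            self.mdp.reconnect_to_broker()
            for seq, (service, body) in self.outstanding.items():
                self.mdp.send(service, [str(seq).encode(), body])
            return None
        seq = int(reply.pop(0).decode())
        self.outstanding.pop(seq, None)        # matched, stop tracking it
        return reply
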
Service Discovery

目前已經有 service-oriented broker,但沒有檢測某種 service 是否存在的機制,只會知道 request failed。可修改 MDP/Client protocol,增加詢問 service 的 command。

另一種方式,是做類似 email 的機制,將 undeliverable requests 回傳給 sender,但需要有將 replies 跟 returned requests 區分的方法。

現在要用第一種方式,是在 MDP 上增加 Service discovery 的功能,也就是 Majordomo Management Interface (MMI),剛剛已經有實作在 broker 中。

  • client 發送 service request,以 mmi. 起始,這時以 internal 方式處理訊息
  • broker 只會處理一種 mmi.service,也就是 service discovery service
  • request 的 payload 為 external service 的名稱
  • broker 會回傳 "200" OK 或是 "404"
"""
MMI echo query example
"""

import sys
from mdcliapi import MajorDomoClient

def main():
    verbose = '-v' in sys.argv
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    request = b"echo"
    reply = client.send(b"mmi.service", request)

    if reply:
        replycode = reply[0]
        print( "Lookup echo service:", replycode )
    else:
        print( "E: no response from broker, make sure it's running" )

if __name__ == '__main__':
    main()

Idempotent Services

Idempotent means an operation that can safely be repeated, such as checking the clock. Some idempotent use cases:

  • stateless task distribution: a pipeline where servers are stateless workers
  • a name service that resolves a logical address into an endpoint

non-idempotent use cases

  • logging service: 因不能將相同的 log 處理 2次以上
  • 影響 downstream nodes 的 service,例如發送 information 給其他 nodes,如果 service 收到相同的 request,downstream nodes 會產生 duplicate information
  • 會修改 shared data 的 service

If the server application is not idempotent, you have to think carefully about exactly when it can crash. A crash while it is idle or while it is processing a request is usually harmless (database transactions can keep the work itself atomic). The dangerous case is a crash after doing the work but while sending the reply: the server believes the work is done, yet the client never hears about it.

Network failures create the same problem: the client assumes the server crashed, resends the request, and the server ends up doing the work twice.

To handle non-idempotent operations, the server has to detect and reject duplicate requests (a minimal sketch follows the list below):

  1. client 要在每個 request 加上 unique client identifier,unique message number
  2. server 在發送 reply 前,用 client ID 及 message number 為 key 儲存起來
  3. server 取得 client 的 request,要先檢查是否已經有該 client ID, message number 的 reply,如果已經處理過,就不做 request,直接發送 reply

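A minimal sketch of that duplicate-detection scheme (the function names and the in-memory dict are hypothetical; a real service would persist the cache):

replies_seen = {}   # (client_id, message_number) -> reply already produced

def handle_request(client_id, message_number, request):
    key = (client_id, message_number)
    if key in replies_seen:
        # duplicate request: do not redo the work, just resend the old reply
        return replies_seen[key]
    reply = do_work(request)        # the actual non-idempotent operation
    replies_seen[key] = reply       # remember the reply before sending it
    return reply

def do_work(request):
    return b"OK " + request         # placeholder for the real service logic

print(handle_request(b"client-1", 1, b"debit 100"))   # does the work
print(handle_request(b"client-1", 1, b"debit 100"))   # cached reply, no double debit
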
Titanic pattern: Disconnected Reliability

Majordomo is already a reliable message broker, so it is tempting to add "rust" (disk persistence) on top of it:

  • The Lazy Pirate style of client works well, whether it talks to a server directly or through distributed queue proxies, as long as workers are stateless and idempotent.
  • Adding rust brings problems of its own, for example slower performance.

Rust-based reliability is useful for asynchronous, disconnected networks. It addresses a limitation of the Pirate patterns, namely that the client has to wait for its reply in real time. When clients and workers connect only briefly and sporadically (much like email), the broker must hold state on their behalf.

Titanic pattern: 將message 寫入 disk,保證不會遺失。要將 MDP 增加一個 Titanic layer

  • 因 divide and conquer 變得更簡單,broker 處理 message routing,worker 處理 reliability
  • broker/worker 可用不同 language 實作
  • fire-and-forget

如果 client 需要馬上得到 reply, 可直接跟 service 溝通,如果可以等待,就用 Titanic 處理

Titanic 同時有 worker 跟 client 的角色

  • client: 請接受我的 request
  • Titanic: OK
  • client: 有我的 reply 嗎?
  • Titanic: Yes, here it is. Or, no, not yet.
  • client: OK,你可以把 request 記錄清除了
  • Titanic: OK, done.

Titanic 跟 broker, worker 的溝通過程如下

  • Titanic: broker, 有 coffee service 嗎?
  • broker: 有
  • Titanic: hey, coffee service, 請幫我處理一個 request
  • coffee: Sure, here you are
  • Titanic: OK

如果在處理 request 過程中,worker crashes,Titanic 會一直 retry,如果 reply lost,Titanic 會 retry。如果已經處理了 request,但 client 沒收到,client 會再詢問一次。如果 Titanic 在處理 request/reply 時 crashes, client 會重送 request。只要 request 放入 storage,work 永遠不會遺失。

client 使用 asynchronous Majordomo pattern 進行工作,然後得到 reply。

需要有讓 client 查詢 reply 的方法,但 client 會在重連後,有不同的 identities,以下是解決方法

  • 每個 request 都有 UUID
  • 當 client 詢問 reply,必須指定原始 request 的 UUID

理想狀況下,要把 UUID 儲存在 local database 裡面

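A minimal sketch of keeping those UUIDs on the client side, using a plain text file rather than a real local database (the file name and helper names are hypothetical):

import os

PENDING_FILE = "titanic_pending.txt"    # hypothetical location

def remember(uuid):
    """Record a request UUID so a restarted client can keep polling for it."""
    with open(PENDING_FILE, "a") as f:
        f.write(uuid.decode() + "\n")

def pending():
    """Return the UUIDs we still need to ask titanic.reply about."""
    if not os.path.exists(PENDING_FILE):
        return []
    with open(PENDING_FILE) as f:
        return [line.strip().encode() for line in f if line.strip()]
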
首先了解 client 如何跟 Titanic 溝通,一種方式是使用一個 service,並用三種 request types,另一種方式,是使用三種 services

  • titanic.request: 儲存 request message,回傳該 request 的 UUID
  • titanic.reply: 取得特定 UUID 的 reply
  • titanic.close: 確認 reply 已經收到

接下來實作 multithreaded worker,也就是 Titanic Service Protocol (TSP)

ticlient.py

"""
Titanic client example
Implements client side of http://rfc.zeromq.org/spec:9

"""

import sys
import time

from mdcliapi import MajorDomoClient

def service_call (session, service, request):
    """Calls a TSP service

    Returns response if successful (status code 200 OK), else None
    """
    reply = session.send(service, request)
    if reply:
        status = reply.pop(0)
        if status == b"200":
            return reply
        elif status == b"400":
            print( "E: client fatal error, aborting" )
            sys.exit (1)
        elif status == b"500":
            print( "E: server fatal error, aborting" )
            sys.exit (1)
    else:
        sys.exit (0);    #  Interrupted or failed

def main():
    verbose = '-v' in sys.argv
    session = MajorDomoClient("tcp://localhost:5555", verbose)

    #  1. Send 'echo' request to Titanic
    request = [b"echo", b"Hello world"]
    reply = service_call(session, b"titanic.request", request)

    uuid = None

    if reply:
        uuid = reply.pop(0)
        print( "I: request UUID ", uuid )

    #  2. Wait until we get a reply
    while True:
        time.sleep (.1)
        request = [uuid]
        reply = service_call (session, b"titanic.reply", request)

        if reply:
            reply_string = reply[-1]
            print( "Reply:", reply_string )

            #  3. Close request
            request = [uuid]
            reply = service_call (session, b"titanic.close", request)
            break
        else:
            print( "I: no reply yet, trying again..." )
            time.sleep(5)     #  Try again in 5 seconds
    return 0

if __name__ == '__main__':
    main()

titanic.py

"""
Titanic service

Implements server side of http://rfc.zeromq.org/spec:9

"""

# import cPickle as pickle
import pickle
import os
import sys
import threading
import time
from uuid import uuid4
from pathlib import Path

import zmq

from mdwrkapi import MajorDomoWorker
from mdcliapi import MajorDomoClient

from zhelpers import zpipe

TITANIC_DIR = ".titanic"

def request_filename (uuid):
    """Returns freshly allocated request filename for given UUID"""
    return os.path.join(TITANIC_DIR, "%s.req" % uuid)

#

def reply_filename (uuid):
    """Returns freshly allocated reply filename for given UUID"""
    return os.path.join(TITANIC_DIR, "%s.rep" % uuid)

# ---------------------------------------------------------------------
# Titanic request service

def titanic_request (pipe):
    worker = MajorDomoWorker("tcp://localhost:5555", b"titanic.request")

    reply = None

    while True:
        # Send reply if it's not null
        # And then get next request from broker
        request = worker.recv(reply)
        if not request:
            break      # Interrupted, exit

        # Ensure message directory exists
        if not os.path.exists(TITANIC_DIR):
            os.mkdir(TITANIC_DIR)

        # Generate UUID and save message to disk
        uuid = uuid4().hex
        filename = request_filename (uuid)
        with open(filename, 'wb') as f:
            pickle.dump(request, f)

        # Send UUID through to message queue
        uuidframe = uuid.encode()
        print( "uuid is {}".format(uuidframe) )
        pipe.send( uuidframe )

        # Now send UUID back to client
        # Done by the worker.recv() at the top of the loop
        reply = [b"200", uuidframe ]


# ---------------------------------------------------------------------
# Titanic reply service

def titanic_reply ():
    worker = MajorDomoWorker("tcp://localhost:5555", b"titanic.reply")
    reply = None

    while True:
        request = worker.recv(reply)
        if not request:
            break      # Interrupted, exit

        uuid = request.pop(0).decode()
        req_filename = request_filename(uuid)
        rep_filename = reply_filename(uuid)

        print("req_filename={}, rep_filename={}".format(req_filename, rep_filename))
        if os.path.exists(rep_filename):
            with open(rep_filename, 'rb') as f:
                reply = pickle.load(f)
            reply = [b"200"] + reply
        else:
            if os.path.exists(req_filename):
                reply = [b"300"] # pending
            else:
                reply = [b"400"] # unknown


# ---------------------------------------------------------------------
# Titanic close service

def titanic_close():
    worker = MajorDomoWorker("tcp://localhost:5555", b"titanic.close")
    reply = None

    while True:
        request = worker.recv(reply)
        if not request:
            break      # Interrupted, exit

        uuid = request.pop(0).decode()
        req_filename = request_filename(uuid)
        rep_filename = reply_filename(uuid)
        # should these be protected?  Does zfile_delete ignore files
        # that have already been removed?  That's what we are doing here.
        if os.path.exists(req_filename):
            os.remove(req_filename)
        if os.path.exists(rep_filename):
            os.remove(rep_filename)
        reply = [b"200"]


def service_success(client, uuid):
    """Attempt to process a single request, return True if successful"""
    # Load request message, service will be first frame
    filename = request_filename (uuid)
    # print("service_success filename = {}".format(filename))

    # If the client already closed request, treat as successful
    if not os.path.exists(filename):
        return True

    with open(filename, 'rb') as f:
        request = pickle.load(f)
    service = request.pop(0)
    # Use MMI protocol to check if service is available
    mmi_request = [service]
    mmi_reply = client.send(b"mmi.service", mmi_request)
    service_ok = mmi_reply and mmi_reply[0] == b"200"

    if service_ok:
        reply = client.send(service, request)
        if reply:
            filename = reply_filename (uuid)
            with open(filename, "wb") as f:
                pickle.dump(reply, f)
            return True

    return False


def main():
    basedir = os.path.join(TITANIC_DIR)
    if not os.path.exists(basedir):
        os.makedirs(basedir)
    queuefile = os.path.join(TITANIC_DIR, 'queue')
    Path(queuefile).touch()

    verbose = '-v' in sys.argv
    ctx = zmq.Context()

    # Create MDP client session with short timeout
    client = MajorDomoClient("tcp://localhost:5555", verbose)
    client.timeout = 1000 # 1 sec
    client.retries = 1 # only 1 retry

    request_pipe, peer = zpipe(ctx)
    request_thread = threading.Thread(target=titanic_request, args=(peer,))
    request_thread.daemon = True
    request_thread.start()
    reply_thread = threading.Thread(target=titanic_reply)
    reply_thread.daemon = True
    reply_thread.start()
    close_thread = threading.Thread(target=titanic_close)
    close_thread.daemon = True
    close_thread.start()

    poller = zmq.Poller()
    poller.register(request_pipe, zmq.POLLIN)
    # Main dispatcher loop
    while True:
        # Ensure message directory exists
        if not os.path.exists(TITANIC_DIR):
            os.mkdir(TITANIC_DIR)
        # We'll dispatch once per second, if there's no activity
        try:
            items = poller.poll(1000)
        except KeyboardInterrupt:
            break;              # Interrupted

        if items:

            # Append UUID to queue, prefixed with '-' for pending
            uuid = request_pipe.recv().decode()
            # print("append to queue: {}".format(uuid))

            with open(os.path.join(TITANIC_DIR, 'queue'), 'a') as f:
                f.write("-%s\n" % uuid)

        # Brute-force dispatcher
        #
        with open(os.path.join(TITANIC_DIR, 'queue'), 'r+b') as f:
            offset = 0                       # byte offset of the current entry
            for entry in f.readlines():
                entry_len = len(entry)       # length in bytes, including '\n'
                entry = entry.decode()
                # print("entry = {}".format(entry))
                # UUID is prefixed with '-' if still waiting
                if entry[0] == '-':
                    uuid = entry[1:].rstrip() # rstrip '\n' etc.
                    print( "I: processing request %s" % uuid )
                    if service_success(client, uuid):
                        # mark queue entry as processed: overwrite the '-'
                        # prefix of this entry with '+'
                        here = f.tell()
                        f.seek(offset, os.SEEK_SET)
                        f.write(b'+')
                        f.seek(here, os.SEEK_SET)
                offset += entry_len


if __name__ == '__main__':
    main()

執行

$ python mdbroker.py -v

$ python mdworker.py -v

$ python titanic.py -v
2018-09-26 17:15:42 I: connecting to broker at tcp://localhost:5555...
uuid is b'a8398b5a2419455598a064936336ddf7'
I: processing request a8398b5a2419455598a064936336ddf7
2018-09-26 17:16:02 I: send request to 'b'mmi.service'' service:
----------------------------------------
[006] MDPC01
[011] mmi.service
[004] echo
2018-09-26 17:16:02 I: received reply:
----------------------------------------
[006] MDPC01
[011] mmi.service
[003] 200
2018-09-26 17:16:02 I: send request to 'b'echo'' service:
----------------------------------------
[006] MDPC01
[004] echo
[011] Hello world
2018-09-26 17:16:02 I: received reply:
----------------------------------------
[006] MDPC01
[004] echo
[011] Hello world
req_filename=.titanic/a8398b5a2419455598a064936336ddf7.req, rep_filename=.titanic/a8398b5a2419455598a064936336ddf7.rep

$ python ticlient.py -v
2018-09-26 17:16:00 I: connecting to broker at tcp://localhost:5555...
2018-09-26 17:16:00 I: send request to 'b'titanic.request'' service:
----------------------------------------
[006] MDPC01
[015] titanic.request
[004] echo
[011] Hello world
2018-09-26 17:16:02 W: no reply, reconnecting...
2018-09-26 17:16:02 I: connecting to broker at tcp://localhost:5555...
2018-09-26 17:16:02 I: received reply:
----------------------------------------
[006] MDPC01
[015] titanic.request
[003] 200
[032] a8398b5a2419455598a064936336ddf7
I: request UUID  b'a8398b5a2419455598a064936336ddf7'
2018-09-26 17:16:02 I: send request to 'b'titanic.reply'' service:
----------------------------------------
[006] MDPC01
[013] titanic.reply
[032] a8398b5a2419455598a064936336ddf7
2018-09-26 17:16:02 I: received reply:
----------------------------------------
[006] MDPC01
[013] titanic.reply
[003] 200
[011] Hello world
Reply: b'Hello world'
2018-09-26 17:16:02 I: send request to 'b'titanic.close'' service:
----------------------------------------
[006] MDPC01
[013] titanic.close
[032] a8398b5a2419455598a064936336ddf7
2018-09-26 17:16:02 I: received reply:
----------------------------------------
[006] MDPC01
[013] titanic.close
[003] 200

notes about titanic

  • Because Titanic acts as both a client and a worker, it has separate message loops for sending and for receiving.
  • Titanic uses the MMI service-discovery protocol to check that a requested service exists before dispatching a request to it.
  • An inproc pipe hands the UUID of each newly stored request from the titanic.request worker to the main dispatcher loop.
  • All request/reply data is kept in plain files on disk, with a single queue file tracking the pending requests.
  • Plain files are used instead of a database; for this simple access pattern they are easier and perform better.
  • To make Titanic more reliable, replicate each request to a second server.
  • To make Titanic much faster and less reliable, keep requests and replies in memory instead of on disk (a small sketch follows this list).

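A small sketch of that last, memory-only variant (names are hypothetical; everything is lost when the Titanic process restarts):

requests = {}   # uuid -> stored request frames
replies = {}    # uuid -> stored reply frames

def store_request(uuid, request):
    requests[uuid] = request

def store_reply(uuid, reply):
    replies[uuid] = reply

def fetch_reply(uuid):
    if uuid in replies:
        return [b"200"] + replies[uuid]
    return [b"300"] if uuid in requests else [b"400"]   # pending / unknown

def close_request(uuid):
    requests.pop(uuid, None)
    replies.pop(uuid, None)
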
Binary Star pattern: High Availability Pair

The Binary Star pattern joins two servers into a primary-backup high-availability pair, operating in active-standby mode.

Failover happens when the active (primary) server suffers a catastrophic failure and applications reconnect to the backup server. Recovery from failover works as follows:

  • Operators restart the primary server and fix whatever problem caused it to fail.
  • At a suitable moment, operators stop the backup server, which causes a brief disruption to applications.
  • Once applications have reconnected to the primary server, operators restart the backup server.

Recovery is handled manually, because automating it is risky. The reasons:

  1. Failover already creates a service outage of perhaps 10-30 seconds, which is far better than a total outage, but recovery causes a second 10-30 second outage; it is better to schedule that for an off-peak time.
  2. In an emergency, the top priority is certainty about what is running where. Automatic recovery can leave system administrators unsure which server is actually handling requests.
  3. Automatic recovery can produce situations where the network fails over and then recovers by itself, leaving operators, possibly at a different site, to work out afterwards what happened and why.

The Binary Star pattern does, however, fail back to the primary server when the backup server (then active) fails, and this is also the mechanism by which recovery is triggered.

There are two safe ways to shut down a Binary Star pair:

  1. stop the passive server first, then stop the active server at any later time, or
  2. stop both servers, in either order, within a few seconds of each other.

Stopping the active server first and the passive server only later forces applications to disconnect, reconnect to the other server, and then disconnect again, which disturbs users.

Detailed Requirements for HA architecture

  • failover 表示發生了毀滅性 system failure 災難,例如硬體失效、火災...。一般性的 server crash 都有簡單的方法可以處理。
  • failover 時間要在 60s 內處理完成,最好是 10s 內
  • failover 要自動處理,但 recovery 是手動處理。application 要能自動切換到 backup server,但在 operators 修復前,都不要自動切換回原本的 primary server。要選擇適當的時間,再中斷一次 application。
  • failover 程序在 client application 要讓 developer 容易使用,應該要隱藏在 client API 裡面
  • 要有明確的步驟,讓 network architects 迴避可能會發生 split brain syndrome (兩個 server 都認爲自己是active) 的網路架構
  • 兩個 server 啟動沒有固定的先後順序
  • 在沒有停止 client application 時,要能提供 planned stops 並 restart 任何一台 server 的機制
  • operators 必須要隨時能夠監控 both servers
  • 使用 high-speed dedicated network connection連接兩台 servers,failover synchronization 要使用特定的 IP route

有以下假設

  • 單一 backup server 就足夠,不需要多層 backups
  • primary 與 backup server 有相同運算能力,可處理相同的 application load,不需要 load balance 到其他 servers
  • 有足夠預算,可以提供一台冗餘 backup server,平常沒在工作

不會討論到以下 issues:

  • 使用 active backup server 或是 load balancing。在 Binary Star Pair,backup server 平常都是 inactive,沒有在工作。
  • 要處理 persistent messages/transactions。假設有 unreliable server 或是 Binary Star pairs
  • Automatic discovery of the pair: the two servers are a fixed, known part of the network architecture that applications learn about from configuration data.
  • Replication of state or messages between the two servers: any server-side state has to be recreated by applications after a failover.

Key terms used in Binary Star:

  • Primary: 一開始 active 的 server
  • Backup: passive server
  • Active: 接受 client 連線的 server,最多只有一台 active server
  • Passive: 正常狀況下,primary server 為 active,backup 為 passive,但 failover 發生時,角色會對調。

Binary Star Pair 必須要設定:

  1. 要告訴 primary server,backup server 在哪裡
  2. 要告訴 backup server,primary server 在哪裡
  3. optional: 可調整 failover response time,兩台 servers 的設定要相同

範例中,failover timeout 為 2000ms,如果將 primary server 包裝在 shell script,然後設定自動 restart,failover timeout 時間必需設定超過 primary server restart 所需要的時間。

For a client application to work with a Binary Star pair, it must:

  1. 知道 primary & backup servers 的 addresses
  2. 先連接 primary server,異常時,連接到 backup server
  3. 偵測 failed connection,通常是用 heartbeat
  4. 先嘗試重連到 primary,再嘗試 backup,retries 中間的delay時間,要超過 server failover timeout 的時間
  5. 能重建在某個 server 的所有 state
  6. 如果需要有 reliable message, 在 failover 時,要重送 messages

Binary Star Pattern 的限制:

  • server process 不能有超過 1 個 Binary Star Pair
  • primary server 只能有一台 backup server,不能超過
  • passive server 平常沒有在工作
  • backup server 必須要能完整處理所有 application loads
  • failover 設定不能在 runtime 時修改
  • client application 必須因應 failover 做一些修改

Preventing Split-Brain Syndrome

Split-brain syndrome occurs when members of a cluster simultaneously believe they are the active server; it leaves applications on each side unable to see each other. Binary Star has a mechanism for detecting and discarding split brains, based on a three-way decision: a server will not decide to become active until it is receiving application connection requests and it cannot see its peer server.

Even so, it is possible to design a network that fools this algorithm. A typical scenario is a Binary Star pair distributed between two buildings, each building with its own set of applications, and a single network link between them. If that link breaks, each side ends up with half of the applications and half of the Binary Star pair, and both servers become active.

要解決 split-brain 問題,必須將兩台 servers 以 dedicated network link 連接,最簡單的方法是連到同一台 switch,或是直接用 crossover 網路線連在一起。

不能將 Binary Star Pair 分在兩個不同的地點,如果是這樣,必須改用 federation 而不是 high-availability failover。

適當的 paranoid network 設定要使用兩個 private cluster interconnects,而不是一個。另外要用不同的網路卡。目的是區分 network failure 的問題。

Binary Star Implementation

primary 跟 backup server 是用相同的程式,在啟動時決定是哪一種角色

bstarsrv.py: binary star server

# Binary Star Server

from argparse import ArgumentParser
import time

from zhelpers import zmq

STATE_PRIMARY = 1
STATE_BACKUP = 2
STATE_ACTIVE = 3
STATE_PASSIVE = 4

PEER_PRIMARY = 1
PEER_BACKUP = 2
PEER_ACTIVE = 3
PEER_PASSIVE = 4
CLIENT_REQUEST = 5

HEARTBEAT = 1000


class BStarState(object):
    def __init__(self, state, event, peer_expiry):
        self.state = state
        self.event = event
        self.peer_expiry = peer_expiry


class BStarException(Exception):
    pass

fsm_states = {
    # 原本是 primary
    #  收到 PEER_BACKUP 會變成 active
    #  收到 PEER_ACTIVE,會變成 passive
    STATE_PRIMARY: {
        PEER_BACKUP: ("I: connected to backup (slave), ready as master",
                      STATE_ACTIVE),
        PEER_ACTIVE: ("I: connected to backup (master), ready as slave",
                      STATE_PASSIVE)
        },
    # 原本是 backup
    #  收到 PEER_ACTIVE,會變成 passive
    #  收到 CLIENT_REQUEST,異常,不異動狀態
    STATE_BACKUP: {
        PEER_ACTIVE: ("I: connected to primary (master), ready as slave",
                      STATE_PASSIVE),
        CLIENT_REQUEST: ("", False)
        },
    # 原本是 active
    #  收到 PEER_ACTIVE,異常
    STATE_ACTIVE: {
        PEER_ACTIVE: ("E: fatal error - dual masters, aborting", False)
        },
    # 原本是 passive
    #  收到 PEER_PRIMARY,會變成 active
    #  收到 PEER_BACKUP,會變成 active
    #  收到 PEER_PASSIVE,異常
    #  收到 CLIENT_REQUEST,維持不變
    STATE_PASSIVE: {
        PEER_PRIMARY: ("I: primary (slave) is restarting, ready as master",
                       STATE_ACTIVE),
        PEER_BACKUP: ("I: backup (slave) is restarting, ready as master",
                      STATE_ACTIVE),
        PEER_PASSIVE: ("E: fatal error - dual slaves, aborting", False),
        CLIENT_REQUEST: (CLIENT_REQUEST, True)  # Say true, check peer later
        }
    }


def run_fsm(fsm):
    # There are some transitional states we do not want to handle
    state_dict = fsm_states.get(fsm.state, {})
    res = state_dict.get(fsm.event)
    if res:
        msg, state = res
    else:
        return
    # state 為 False 表示 fsm 狀態異常
    if state is False:
        raise BStarException(msg)
    elif msg == CLIENT_REQUEST:
        assert fsm.peer_expiry > 0
        if int(time.time() * 1000) > fsm.peer_expiry:
            fsm.state = STATE_ACTIVE
        else:
            raise BStarException()
    else:
        print(msg)
        fsm.state = state


def main():
    parser = ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-p", "--primary", action="store_true", default=False)
    group.add_argument("-b", "--backup", action="store_true", default=False)
    args = parser.parse_args()

    ctx = zmq.Context()
    statepub = ctx.socket(zmq.PUB)
    statesub = ctx.socket(zmq.SUB)
    statesub.setsockopt_string(zmq.SUBSCRIBE, u"")
    frontend = ctx.socket(zmq.ROUTER)

    fsm = BStarState(0, 0, 0)

    if args.primary:
        print("I: Primary master, waiting for backup (slave)")
        frontend.bind("tcp://*:5001")
        statepub.bind("tcp://*:5003")
        statesub.connect("tcp://localhost:5004")
        fsm.state = STATE_PRIMARY
    elif args.backup:
        print("I: Backup slave, waiting for primary (master)")
        frontend.bind("tcp://*:5002")
        statepub.bind("tcp://*:5004")
        statesub.connect("tcp://localhost:5003")
        statesub.setsockopt_string(zmq.SUBSCRIBE, u"")
        fsm.state = STATE_BACKUP

    send_state_at = int(time.time() * 1000 + HEARTBEAT)
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(statesub, zmq.POLLIN)

    while True:
        time_left = send_state_at - int(time.time() * 1000)
        if time_left < 0:
            time_left = 0
        socks = dict(poller.poll(time_left))
        if socks.get(frontend) == zmq.POLLIN:
            msg = frontend.recv_multipart()
            fsm.event = CLIENT_REQUEST
            try:
                run_fsm(fsm)
                frontend.send_multipart(msg)
            except BStarException:
                del msg

        if socks.get(statesub) == zmq.POLLIN:
            msg = statesub.recv()
            fsm.event = int(msg)
            del msg
            try:
                run_fsm(fsm)
                fsm.peer_expiry = int(time.time() * 1000) + (2 * HEARTBEAT)
            except BStarException:
                break

        # 超過要發布 state 的時間
        if int(time.time() * 1000) >= send_state_at:
            statepub.send_string("%d" % fsm.state)
            # 更新下一次發送 state 的時間
            send_state_at = int(time.time() * 1000) + HEARTBEAT

if __name__ == '__main__':
    main()

bstarcli.py: binary star client

from time import sleep
import zmq


REQUEST_TIMEOUT = 1000  # msecs
SETTLE_DELAY = 2000  # before failing over


def main():
    server = ['tcp://localhost:5001', 'tcp://localhost:5002']
    server_nbr = 0
    ctx = zmq.Context()

    # client 先連到 primary server
    client = ctx.socket(zmq.REQ)
    client.connect(server[server_nbr])
    poller = zmq.Poller()
    poller.register(client, zmq.POLLIN)

    sequence = 0
    while True:
        client.send_string("%s" % sequence)

        expect_reply = True
        while expect_reply:
            socks = dict(poller.poll(REQUEST_TIMEOUT))
            if socks.get(client) == zmq.POLLIN:
                reply = client.recv_string()
                if int(reply) == sequence:
                    print("I: server replied OK (%s)" % reply)
                    expect_reply = False
                    sequence += 1
                    sleep(1)
                else:
                    print("E: malformed reply from server: %s" % reply)
            else:
                # 接收 reply 的 timeout,表示 server 異常,就連接到下一台 server,重發訊息
                print("W: no response from server, failing over")
                sleep(SETTLE_DELAY / 1000)
                poller.unregister(client)
                client.close()
                server_nbr = (server_nbr + 1) % 2
                print("I: connecting to server at %s.." % server[server_nbr])
                client = ctx.socket(zmq.REQ)
                poller.register(client, zmq.POLLIN)
                # reconnect and resend request
                client.connect(server[server_nbr])
                client.send_string("%s" % sequence)

if __name__ == '__main__':
    main()

執行結果

$ python bstarsrv.py -p
I: Primary master, waiting for backup (slave)
I: connected to backup (slave), ready as master
^C

$ python bstarsrv.py -b
I: Backup slave, waiting for primary (master)
I: connected to primary (master), ready as slave

$ python bstarcli.py
I: server replied OK (0)
I: server replied OK (1)
I: server replied OK (2)
I: server replied OK (3)
I: server replied OK (4)
I: server replied OK (5)
W: no response from server, failing over
I: connecting to server at tcp://localhost:5002..
I: server replied OK (6)
I: server replied OK (7)
I: server replied OK (8)
I: server replied OK (9)
I: server replied OK (10)
I: server replied OK (11)
I: server replied OK (12)

kill primary server,就可造成 failover,然後 restart primary,再 kill backup,就會 recover。

Binary Star 是用 finite state machine 實作,Events 為 peer state,"Peer Active" 代表另一個 server 通知我們,他是 active。"Client Request" 代表收到 client request。"Client Vote" 代表收到 client request,且 peer 已經 2 heartbeat 為 inactive。

The two servers exchange state over PUB-SUB sockets; no other socket combination works here. PUSH and DEALER block when no peer is ready to receive a message, PAIR does not reconnect automatically after the peer goes away and restarts, and ROUTER needs the peer's address before it can send it anything.

Binary Star Reactor

Binary Star can be packaged as a reusable reactor class. The reactor runs the event loop and calls our code whenever there is a message to process, which is more convenient than copy/pasting the Binary Star code into every server where we want the capability.

bstar.py: Binary Star core class

"""
Binary Star server

"""

import time

import zmq
from zmq.eventloop.ioloop import IOLoop, PeriodicCallback
from zmq.eventloop.zmqstream import ZMQStream

# States we can be in at any point in time
STATE_PRIMARY = 1          # Primary, waiting for peer to connect
STATE_BACKUP = 2           # Backup, waiting for peer to connect
STATE_ACTIVE = 3           # Active - accepting connections
STATE_PASSIVE = 4          # Passive - not accepting connections

# Events, which start with the states our peer can be in
PEER_PRIMARY = 1           # HA peer is pending primary
PEER_BACKUP = 2            # HA peer is pending backup
PEER_ACTIVE = 3            # HA peer is active
PEER_PASSIVE = 4           # HA peer is passive
CLIENT_REQUEST = 5         # Client makes request

# We send state information every this often
# If peer doesn't respond in two heartbeats, it is 'dead'
HEARTBEAT = 1000          # In msecs


class FSMError(Exception):
    """Exception class for invalid state"""
    pass


class BinaryStar(object):
    def __init__(self, primary, local, remote):
        # initialize the Binary Star
        self.ctx = zmq.Context()  # Our private context
        self.loop = IOLoop.instance()  # Reactor loop
        self.state = STATE_PRIMARY if primary else STATE_BACKUP

        self.event = None  # Current event
        self.peer_expiry = 0  # When peer is considered 'dead'
        self.voter_callback = None  # Voting socket handler
        self.master_callback = None  # Call when become master
        self.slave_callback = None  # Call when become slave

        # Create publisher for state going to peer
        self.statepub = self.ctx.socket(zmq.PUB)
        self.statepub.bind(local)

        # Create subscriber for state coming from peer
        self.statesub = self.ctx.socket(zmq.SUB)
        self.statesub.setsockopt_string(zmq.SUBSCRIBE, u'')
        self.statesub.connect(remote)

        # wrap statesub in ZMQStream for event triggers
        self.statesub = ZMQStream(self.statesub, self.loop)

        # setup basic reactor events
        self.heartbeat = PeriodicCallback(self.send_state,
                                          HEARTBEAT, self.loop)
        self.statesub.on_recv(self.recv_state)

    def update_peer_expiry(self):
        """Update peer expiry time to be 2 heartbeats from now."""
        self.peer_expiry = time.time() + 2e-3 * HEARTBEAT

    def start(self):
        self.update_peer_expiry()
        self.heartbeat.start()
        return self.loop.start()

    def execute_fsm(self):
        """Binary Star finite state machine (applies event to state)

        returns True if connections should be accepted, False otherwise.
        """
        accept = True
        if self.state == STATE_PRIMARY:
            # Primary server is waiting for peer to connect
            # Accepts CLIENT_REQUEST events in this state
            if self.event == PEER_BACKUP:
                print("I: connected to backup (slave), ready as master")
                self.state = STATE_ACTIVE
                if self.master_callback:
                    self.loop.add_callback(self.master_callback)
            elif self.event == PEER_ACTIVE:
                print("I: connected to backup (master), ready as slave")
                self.state = STATE_PASSIVE
                if self.slave_callback:
                    self.loop.add_callback(self.slave_callback)
            elif self.event == CLIENT_REQUEST:
                if time.time() >= self.peer_expiry:
                    print("I: request from client, ready as master")
                    self.state = STATE_ACTIVE
                    if self.master_callback:
                        self.loop.add_callback(self.master_callback)
                else:
                    # don't respond to clients yet - we don't know if
                    # the backup is currently Active as a result of
                    # a successful failover
                    accept = False
        elif self.state == STATE_BACKUP:
            # Backup server is waiting for peer to connect
            # Rejects CLIENT_REQUEST events in this state
            if self.event == PEER_ACTIVE:
                print("I: connected to primary (master), ready as slave")
                self.state = STATE_PASSIVE
                if self.slave_callback:
                    self.loop.add_callback(self.slave_callback)
            elif self.event == CLIENT_REQUEST:
                accept = False
        elif self.state == STATE_ACTIVE:
            # Server is active
            # Accepts CLIENT_REQUEST events in this state
            # The only way out of ACTIVE is death
            if self.event == PEER_ACTIVE:
                # Two masters would mean split-brain
                print("E: fatal error - dual masters, aborting")
                raise FSMError("Dual Masters")
        elif self.state == STATE_PASSIVE:
            # Server is passive
            # CLIENT_REQUEST events can trigger failover if peer looks dead
            if self.event == PEER_PRIMARY:
                # Peer is restarting - become active, peer will go passive
                print("I: primary (slave) is restarting, ready as master")
                self.state = STATE_ACTIVE
            elif self.event == PEER_BACKUP:
                # Peer is restarting - become active, peer will go passive
                print("I: backup (slave) is restarting, ready as master")
                self.state = STATE_ACTIVE
            elif self.event == PEER_PASSIVE:
                # Two passives would mean cluster would be non-responsive
                print("E: fatal error - dual slaves, aborting")
                raise FSMError("Dual slaves")
            elif self.event == CLIENT_REQUEST:
                # Peer becomes master if timeout has passed
                # It's the client request that triggers the failover
                assert self.peer_expiry > 0
                if time.time() >= self.peer_expiry:
                    # If peer is dead, switch to the active state
                    print("I: failover successful, ready as master")
                    self.state = STATE_ACTIVE
                else:
                    # If peer is alive, reject connections
                    accept = False
            # Call state change handler if necessary
            if self.state == STATE_ACTIVE and self.master_callback:
                self.loop.add_callback(self.master_callback)
        return accept

    # ---------------------------------------------------------------------
    # Reactor event handlers...

    def send_state(self):
        """Publish our state to peer"""
        self.statepub.send_string("%d" % self.state)

    def recv_state(self, msg):
        """Receive state from peer, execute finite state machine"""
        state = msg[0]
        if state:
            self.event = int(state)
            self.update_peer_expiry()
        self.execute_fsm()

    def voter_ready(self, msg):
        """Application wants to speak to us, see if it's possible"""
        # If server can accept input now, call appl handler
        self.event = CLIENT_REQUEST
        if self.execute_fsm():
            print("CLIENT REQUEST")
            self.voter_callback(self.voter_socket, msg)
        else:
            # Message will be ignored
            pass

    # -------------------------------------------------------------------------
    #

    def register_voter(self, endpoint, type, handler):
        """Create socket, bind to local endpoint, and register as reader for
        voting. The socket will only be available if the Binary Star state
        machine allows it. Input on the socket will act as a "vote" in the
        Binary Star scheme.  We require exactly one voter per bstar instance.

        handler will always be called with two arguments: (socket,msg)
        where socket is the one we are creating here, and msg is the message
        that triggered the POLLIN event.
        """
        assert self.voter_callback is None

        socket = self.ctx.socket(type)
        socket.bind(endpoint)
        self.voter_socket = socket
        self.voter_callback = handler

        stream = ZMQStream(socket, self.loop)
        stream.on_recv(self.voter_ready)

bstarsrv2.py: Binary Star server

"""
Binary Star server, using bstar reactor

"""

import sys

import zmq

from bstar import BinaryStar


def echo(socket, msg):
    """Echo service"""
    socket.send_multipart(msg)


def main():
    # Arguments can be either of:
    #     -p  primary server, at tcp://localhost:5001
    #     -b  backup server, at tcp://localhost:5002
    if '-p' in sys.argv:
        star = BinaryStar(True, "tcp://*:5003", "tcp://localhost:5004")
        star.register_voter("tcp://*:5001", zmq.ROUTER, echo)
    elif '-b' in sys.argv:
        star = BinaryStar(False, "tcp://*:5004", "tcp://localhost:5003")
        star.register_voter("tcp://*:5002", zmq.ROUTER, echo)
    else:
        print("Usage: bstarsrv2.py { -p | -b }\n")
        return

    star.start()

if __name__ == '__main__':
    main()

bstarsrv2 啟動時發生 tornado error,可能要把 tornado 由 5.1 降至 4.5 才行,沒有測試不確定。

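An untested sketch of one possible adaptation for newer pyzmq/tornado combinations: tornado 5 removed the io_loop argument that bstar.py passes to PeriodicCallback, so one option is to use tornado's own IOLoop and PeriodicCallback directly and keep only ZMQStream from pyzmq:

from tornado.ioloop import IOLoop, PeriodicCallback
from zmq.eventloop.zmqstream import ZMQStream

# inside BinaryStar.__init__, instead of the zmq.eventloop.ioloop imports:
#     self.loop = IOLoop.current()
#     self.statesub = ZMQStream(self.statesub)            # uses the current loop
#     self.heartbeat = PeriodicCallback(self.send_state, HEARTBEAT)
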
Freelance pattern: Brokerless Reliability

distributed peer-to-peer architecture: Freelance Pattern

use case 為 name resolution service

ZeroMQ 常見的架構問題:如何得知要連接到哪一個 endpoint?能不能不要 hard-coding,也不要用設定檔。

ZeroMQ name service 有以下工作

  • 解析 logical name 為 bind endpoint,及 connect endpoint。實際上要能提供多個 bind endpoints、多個 connect endpoints會更好。
  • 可管理多個平行環境,例如在不需要修改程式,區分 "test" 及 "production"
  • application 在 name service 異常時,就無法連接到網路,所以 name service 必須要 reliable

可將 name service 放在 service-oriented Majordomo broker,name service 變成唯一的 global network endpoint,client 只需要設定 name service 的 endpoint。

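As a rough sketch of that idea (not part of the original examples; it reuses MajorDomoWorker from mdwrkapi above, registers a hypothetical "name" service, and uses a hard-coded lookup table):

from mdwrkapi import MajorDomoWorker

# hypothetical static table; a real name service would load this from storage
ENDPOINTS = {
    b"echo/connect": b"tcp://localhost:5555",
    b"echo/bind": b"tcp://*:5555",
}

def main():
    worker = MajorDomoWorker("tcp://localhost:5555", b"name", False)
    reply = None
    while True:
        request = worker.recv(reply)
        if request is None:
            break                               # interrupted
        key = request[0]                        # logical name, e.g. b"echo/connect"
        reply = [ENDPOINTS.get(key, b"404")]    # endpoint, or 404 if unknown

if __name__ == '__main__':
    main()
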
要處理的異常有 server crashes and restarts, server busy looping, server overload, network issues。要能 reliable,必須建立 pool of name servers。實際上,兩個就夠了。

架構中,多個 clients 直接連接到少量 servers,servers 會 bind addresses,這跟 Majordomo 不同。clients 有幾種實作方式:

  • REQ sockets with the Lazy Pirate pattern. Easy to implement, but without some extra intelligence the client will keep retrying dead servers over and over.
  • DEALER sockets that blast the request out to every connected server (DEALER load-balances sends, so we send one copy per server) until a reply comes back.
  • ROUTER sockets, so the client can address a specific server. But how does the client learn the identity of the server sockets? Either the server pings the client first, or the server uses a hard-coded, fixed identity that the client already knows.

Model 1: Simple Retry and Failover

將 Lazy Pirate 改寫為跟多個 server endpoints 運作

flserver1.py: Freelance server

#
# Freelance server - Model 1
# Trivial echo service
import sys
import zmq

if len(sys.argv) < 2:
    print( "I: Syntax: %s <endpoint>" % sys.argv[0] )
    sys.exit(0)

endpoint = sys.argv[1]
context = zmq.Context()
server = context.socket(zmq.REP)
server.bind(endpoint)

print( "I: Echo service is ready at %s" % endpoint )
while True:
    msg = server.recv_multipart()
    if not msg:
        break  # Interrupted
    server.send_multipart(msg)

server.setsockopt(zmq.LINGER, 0) # Terminate immediately

flclient1.py

#
# Freelance Client - Model 1
# Uses REQ socket to query one or more services

import sys
import time

import zmq

REQUEST_TIMEOUT = 1000  # ms
MAX_RETRIES = 3   # Before we abandon

def try_request(ctx, endpoint, request):
    print( "I: Trying echo service at %s..." % endpoint )
    client = ctx.socket(zmq.REQ)
    client.setsockopt(zmq.LINGER, 0)  # Terminate early
    client.connect(endpoint)
    client.send(request)
    poll = zmq.Poller()
    poll.register(client, zmq.POLLIN)
    socks = dict(poll.poll(REQUEST_TIMEOUT))
    if socks.get(client) == zmq.POLLIN:
        reply = client.recv_multipart()
    else:
        reply = ''
    poll.unregister(client)
    client.close()
    return reply

context = zmq.Context()
request = b"Hello world"
reply = None

endpoints = len(sys.argv) - 1
if endpoints == 0:
    print( "I: syntax: %s <endpoint> ..." % sys.argv[0] )
elif endpoints == 1:
    # For one endpoint, we retry N times
    endpoint = sys.argv[1]
    for retries in range(MAX_RETRIES):
        reply = try_request(context, endpoint, request)
        if reply:
            break  # Success
        print( "W: No response from %s, retrying" % endpoint )
else:
    # For multiple endpoints, try each at most once
    for endpoint in sys.argv[1:]:
        reply = try_request(context, endpoint, request)
        if reply:
            break  # Success
        print( "W: No response from %s" % endpoint )

if reply:
    print( "Service is running OK" )

執行結果

$python flserver1.py tcp://*:5555
I: Echo service is ready at tcp://*:5555

$python flserver1.py tcp://*:5556
I: Echo service is ready at tcp://*:5556

$python flclient1.py tcp://localhost:5555 tcp://localhost:5556
I: Trying echo service at tcp://localhost:5555...
Service is running OK

Although this is still Lazy Pirate and the client only needs one successful reply, there are two cases to note:

  • With a single server, the client retries several times, exactly like Lazy Pirate.
  • With multiple servers, the client tries each server at most once, until it gets a reply or has tried them all.

This solves the main weakness of Lazy Pirate: it could not fail over to backup or alternate servers.

However, this design is not usable in a production environment: if many sockets are plugged in and the primary name server is down, the client suffers this painful timeout on every request.

Model 2: Brutal Shotgun Massacre

Model 2 switches to a DEALER socket; the goal is to get a reply back in the shortest possible time.

The client works as follows:

  • At setup time, connect to all servers.
  • When it has a request, blast it out to all servers.
  • Wait for the first reply and ignore the rest.

The client may therefore get back several replies, and we can't know in advance how many; requests and replies can also get lost.

So we prefix each request with a sequence number and ignore any reply whose sequence number doesn't match.
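
Because the servers use plain REP sockets, the DEALER client has to emulate the REQ envelope by hand. A minimal sketch of the frames, matching what flclient2.py below builds:

# The DEALER sends: empty delimiter + sequence number + request body.
sequence = 1
msg = [b'', str(sequence).encode(), b"random name"]
# The REP server strips the empty delimiter, sees [sequence, body] and
# replies [sequence, b"OK"]; back at the DEALER the reply arrives as
# [b'', sequence, b"OK"], and the client matches on the sequence number.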

flserver2.py

#
# Freelance server - Model 2
# Does some work, replies OK, with message sequencing


import sys
import zmq

if len(sys.argv) < 2:
    print( "I: Syntax: %s <endpoint>" % sys.argv[0] )
    sys.exit(0)

endpoint = sys.argv[1]
context = zmq.Context()
server = context.socket(zmq.REP)
server.bind(endpoint)

print( "I: Service is ready at %s" % endpoint )
while True:
    request = server.recv_multipart()
    if not request:
        break  # Interrupted
    # Fail nastily if run against wrong client
    assert len(request) == 2

    # Frame 0 is the client's sequence number; echo it back with the reply
    address = request[0]
    reply = [address, b"OK"]
    server.send_multipart(reply)

server.setsockopt(zmq.LINGER, 0)  # Terminate early

flclient2.py

#
# Freelance Client - Model 2
# Uses DEALER socket to blast one or more services
import sys
import time

import zmq

GLOBAL_TIMEOUT = 2500  # ms

class FLClient(object):
    def __init__(self):
        self.servers = 0
        self.sequence = 0
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.DEALER)   # DEALER

    def destroy(self):
        self.socket.setsockopt(zmq.LINGER, 0)  # Terminate early
        self.socket.close()
        self.context.term()

    def connect(self, endpoint):
        self.socket.connect(endpoint)
        self.servers += 1
        print( "I: Connected to %s" % endpoint )

    def request(self, *request):
        # Prefix request with sequence number and empty envelope
        self.sequence += 1
        msg = [b'', str(self.sequence).encode() ] + list(request)


        # Blast the request to all connected servers
        for server in range(self.servers):
            self.socket.send_multipart(msg)

        # Wait for a matching reply to arrive from anywhere
        # Since we can poll several times, calculate each one
        poll = zmq.Poller()
        poll.register(self.socket, zmq.POLLIN)

        reply = None
        endtime = time.time() + GLOBAL_TIMEOUT / 1000
        while time.time() < endtime:
            socks = dict(poll.poll((endtime - time.time()) * 1000))
            if socks.get(self.socket) == zmq.POLLIN:
                msg = self.socket.recv_multipart()
                assert len(msg) == 3
                # Keep the reply only if its sequence number matches;
                # otherwise it is a stale reply to an earlier request
                if int(msg[1].decode()) == self.sequence:
                    reply = msg
                    break
        return reply

if len(sys.argv) == 1:
    print( "I: Usage: %s <endpoint> ..." % sys.argv[0] )
    sys.exit(0)

# Create new freelance client object
client = FLClient()

for endpoint in sys.argv[1:]:
    client.connect(endpoint)

start = time.time()
for requests in range(10000):
    request = b"random name"
    reply = client.request(request)
    if not reply:
        print( "E: Name service not available, aborting" )
        break
print( "Average round trip cost: {} usec".format((time.time() - start) / 100) )
client.destroy()

Sample run

$python flserver2.py tcp://*:5555
I: Service is ready at tcp://*:5555

$python flclient2.py tcp://localhost:5555
I: Connected to tcp://localhost:5555
Average round trip cost: 249.47671890258788 usec

Notes on the client implementation:

  • The client is wrapped in a class-based API.
  • If the client cannot reach any responsive server within a few seconds, it gives up.
  • The client builds a valid REP envelope, i.e. it adds the empty delimiter frame.

The client runs 10,000 name resolution requests and measures the average cost.

Advantages:

  • Simple and easy to implement.
  • It gives us failover and it works fast, as long as at least one server is up.

Disadvantages:

  • It generates redundant network traffic.
  • We can't prioritize servers (primary, then secondary).
  • A server can handle at most one request at a time.

Model 3: Complex and Nasty

Model 3 switches to a ROUTER socket, which lets the client send requests to specific servers and stay away from servers that are known to be dead.

With ROUTER-to-ROUTER, both peers need an identity before they can route to each other, and a ROUTER only learns a peer's identity after it has received the first message from that peer. The only way around this is for one direction to use hard-coded identities.

We use the connection endpoint as the identity.

This relies on how ZeroMQ identities work: the server's ROUTER socket sets its identity before it binds; when a client connects, the two sockets exchange identities before any real message is sent.

The client ROUTER socket sets no identity, so it sends a null identity to the server, and the server generates a random UUID to designate that client.

This also means the client cannot route a message to the server right after zmq_connect(); it only works some short, undefined time later. That raises a problem: we don't know whether the server is actually there, although if the server is online, the connection completes within a few milliseconds.
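
A minimal sketch of this identity handling, assuming the example endpoint tcp://localhost:5555; the complete versions are flserver3.py and flcliapi.py below:

import zmq

ctx = zmq.Context()

# Server side: use the connect endpoint as a hard-coded identity,
# set *before* binding the ROUTER socket.
server = ctx.socket(zmq.ROUTER)
server.identity = b"tcp://localhost:5555"
server.bind("tcp://*:5555")

# Client side: a ROUTER with no identity set; ZeroMQ sends a null
# identity and the server assigns a random UUID to this peer.
client = ctx.socket(zmq.ROUTER)
client.connect("tcp://localhost:5555")

# Once the handshake has completed, the client can address the server
# by prefixing messages with the server's identity, e.g.:
# client.send_multipart([b"tcp://localhost:5555", b"PING"])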

We need to know which servers are actually reachable. In the Freelance pattern, unlike broker-based patterns, servers are silent and wait for someone to talk to them first.

The solution is the shotgun approach again: the client sends ping-pong heartbeats to every server it knows about.

The protocol is therefore a mix of ping-pong commands and request-reply commands exchanged between the Freelance client and servers.
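
Roughly, the frames exchanged between the client agent and a server, as implemented by flcliapi.py and flserver3.py below (the routing frame is consumed by the sending ROUTER, and the receiving ROUTER prepends the sender's identity):

Heartbeat:
    client agent -> server:   [server endpoint] ["PING"]
    server -> client agent:   [client identity] ["PONG"]

Request and reply:
    client agent -> server:   [server endpoint] [sequence] [""] [request body]
    server -> client agent:   [client identity] [sequence] ["OK"]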

flclient3.py

"""
Freelance client - Model 3

Uses flcliapi class to encapsulate Freelance pattern
"""

import time

from flcliapi import FreelanceClient

def main():
    # Create new freelance client object
    client = FreelanceClient()

    # Connect to several endpoints
    client.connect ("tcp://localhost:5555")
    client.connect ("tcp://localhost:5556")
    client.connect ("tcp://localhost:5557")

    # Send a bunch of name resolution 'requests', measure time
    requests = 10000
    start = time.time()
    for i in range(requests):
        request = [b"random name"]
        reply = client.request(request)
        if not reply:
            print( "E: name service not available, aborting" )
            return

    print( "Average round trip cost: {} usec".format(1e6*(time.time() - start) / requests) )

if __name__ == '__main__':
    main()

flcliapi.py

"""
flcliapi - Freelance Pattern agent class
Model 3: uses ROUTER socket to address specific services

"""

import threading
import time

import zmq

from zhelpers import zpipe

# If no server replies within this time, abandon request
GLOBAL_TIMEOUT = 3000    # msecs
# PING interval for servers we think are alive
PING_INTERVAL  = 2000    # msecs
# Server considered dead if silent for this long
SERVER_TTL     = 6000    # msecs


def flciapi_agent(peer):
    """This is the thread that handles our real flcliapi class
    """
    pass

# =====================================================================
# Synchronous part, works in our application thread

class FreelanceClient(object):
    ctx = None      # Our Context
    pipe = None     # Pipe through to flciapi agent
    agent = None    # agent in a thread

    def __init__(self):
        self.ctx = zmq.Context()
        self.pipe, peer = zpipe(self.ctx)
        self.agent = threading.Thread(target=agent_task, args=(self.ctx,peer))
        self.agent.daemon = True
        self.agent.start()


    def connect(self, endpoint):
        """Connect to new server endpoint
        Sends [CONNECT][endpoint] to the agent
        """
        self.pipe.send_multipart([b"CONNECT", endpoint.encode()])
        time.sleep(0.1) # Allow connection to come up

    def request(self, msg):
        "Send request, get reply"
        request = [b"REQUEST"] + msg
        self.pipe.send_multipart(request)
        reply = self.pipe.recv_multipart()
        status = reply.pop(0)
        if status != b"FAILED":
            return reply


# =====================================================================
# Asynchronous part, works in the background

# ---------------------------------------------------------------------
# Simple class for one server we talk to

class FreelanceServer(object):
    endpoint = None         # Server identity/endpoint
    alive = True            # 1 if known to be alive
    ping_at = 0             # Next ping at this time
    expires = 0             # Expires at this time

    def __init__(self, endpoint):
        self.endpoint = endpoint
        self.alive = True
        self.ping_at = time.time() + 1e-3*PING_INTERVAL
        self.expires = time.time() + 1e-3*SERVER_TTL

    def ping(self, socket):
        if time.time() > self.ping_at:
            socket.send_multipart([self.endpoint.encode(), b'PING'])
            self.ping_at = time.time() + 1e-3*PING_INTERVAL

    def tickless(self, tickless):
        if tickless > self.ping_at:
            tickless = self.ping_at
        return tickless

# ---------------------------------------------------------------------
# Simple class for one background agent

class FreelanceAgent(object):
    ctx = None              # Own context
    pipe = None             # Socket to talk back to application
    router = None           # Socket to talk to servers
    servers = None          # Servers we've connected to
    actives = None          # Servers we know are alive
    sequence = 0            # Number of requests ever sent
    request = None          # Current request if any
    reply = None            # Current reply if any
    expires = 0             # Timeout for request/reply

    def __init__(self, ctx, pipe):
        self.ctx = ctx
        self.pipe = pipe
        self.router = ctx.socket(zmq.ROUTER)
        self.servers = {}
        self.actives = []

    def control_message (self):
        msg = self.pipe.recv_multipart()
        command = msg.pop(0)

        if command == b"CONNECT":
            endpoint = msg.pop(0).decode()
            print( "I: connecting to %s...\n" % endpoint ),
            self.router.connect(endpoint)
            server = FreelanceServer(endpoint)
            self.servers[endpoint] = server
            self.actives.append(server)
            # these are in the C case, but seem redundant:
            server.ping_at = time.time() + 1e-3*PING_INTERVAL
            server.expires = time.time() + 1e-3*SERVER_TTL
        elif command == b"REQUEST":
            assert not self.request    # Strict request-reply cycle
            # Prefix request with sequence number and empty envelope
            self.request = [str(self.sequence).encode(), b''] + msg

            # Request expires after global timeout
            self.expires = time.time() + 1e-3*GLOBAL_TIMEOUT

    def router_message (self):
        reply = self.router.recv_multipart()
        # Frame 0 is server that replied
        endpoint = reply.pop(0).decode()
        server = self.servers[endpoint]
        if not server.alive:
            self.actives.append(server)
            server.alive = 1

        server.ping_at = time.time() + 1e-3*PING_INTERVAL
        server.expires = time.time() + 1e-3*SERVER_TTL

        # Frame 1 may be a sequence number for a reply, or PONG for a heartbeat
        sequence = reply.pop(0)
        if sequence.isdigit() and int(sequence) == self.sequence:
            self.sequence += 1
            reply = [b"OK"] + reply
            self.pipe.send_multipart(reply)
            self.request = None


# ---------------------------------------------------------------------
# Asynchronous agent manages server pool and handles request/reply
# dialog when the application asks for it.

def agent_task(ctx, pipe):
    agent = FreelanceAgent(ctx, pipe)
    poller = zmq.Poller()
    poller.register(agent.pipe, zmq.POLLIN)
    poller.register(agent.router, zmq.POLLIN)

    while True:
        # Calculate tickless timer, up to 1 hour
        tickless = time.time() + 3600
        if (agent.request and tickless > agent.expires):
            tickless = agent.expires
        # Also wake up in time to send the next heartbeat ping
        for server in agent.servers.values():
            tickless = server.tickless(tickless)
        try:
            items = dict(poller.poll(1000 * (tickless - time.time())))
        except:
            break              # Context has been shut down

        if agent.pipe in items:
            agent.control_message()

        if agent.router in items:
            agent.router_message()

        # If we're processing a request, dispatch to next server
        if (agent.request):
            if (time.time() >= agent.expires):
                # Request expired, kill it
                agent.pipe.send(b"FAILED")
                agent.request = None
            else:
                # Find server to talk to, remove any expired ones
                while agent.actives:
                    server = agent.actives[0]
                    if time.time() >= server.expires:
                        server.alive = 0
                        agent.actives.pop(0)
                    else:
                        request = [server.endpoint.encode()] + agent.request
                        agent.router.send_multipart(request)
                        break

        # Disconnect and delete any expired servers
        # Send heartbeats to idle servers if needed
        for server in agent.servers.values():
            server.ping(agent.router)

flserver3.py

"""Freelance server - Model 3

Uses a ROUTER/ROUTER socket but just one thread
"""

import sys

import zmq

from zhelpers import dump

def main():
    verbose = '-v' in sys.argv

    ctx = zmq.Context()
    # Prepare server socket with predictable identity
    bind_endpoint = "tcp://*:5555"
    connect_endpoint = "tcp://localhost:5555"
    server = ctx.socket(zmq.ROUTER)
    server.identity = connect_endpoint.encode()
    server.bind(bind_endpoint)
    print( "I: service is ready at", bind_endpoint )

    while True:
        try:
            request = server.recv_multipart()
        except:
            break # Interrupted
        # Frame 0: identity of client
        # Frame 1: PING, or client control frame
        # Frame 2: request body
        address, control = request[:2]
        reply = [address, control]
        if control == b"PING":
            reply[1] = b"PONG"
        else:
            reply.append(b"OK")
        if verbose:
            dump(reply)
        server.send_multipart(reply)
    print( "W: interrupted" )

if __name__ == '__main__':
    main()

Sample run

$ python flserver3.py -v
I: service is ready at tcp://*:5555
----------------------------------------
[005] 0x00800041aa
[001] 0
[002] OK
----------------------------------------
[005] 0x00800041aa
[001] 1
[002] OK
----------------------------------------
[005] 0x00800041aa
[001] 2
[002] OK


$ python flclient3.py
I: connecting to tcp://localhost:5555...

I: connecting to tcp://localhost:5556...

I: connecting to tcp://localhost:5557...

Average round trip cost: 652.7282238006592 usec

A few implementation notes:

  • Multithreaded API: the client API has two parts, a synchronous flcliapi class that runs in the application thread, and an asynchronous agent that runs in a background thread. ZeroMQ makes this kind of multithreaded app easy: flcliapi and the agent class talk to each other over an inproc socket, and all the ZeroMQ plumbing is hidden inside the API. The agent acts like a mini-broker, talking to all servers in the background; when we send a request, it does its best to reach a server that is believed to be available.
  • Tickless poll timer: earlier poll loops used a fixed tick interval, which wastes CPU and power. Here the agent uses a tickless timer instead: it computes the poll delay from the next known timeout (see the sketch after this list).
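
As an illustration only (tickless_poll_delay is a hypothetical helper, not part of the example), the tickless idea is to sleep exactly until the earliest known deadline instead of waking on a fixed tick:

import time

def tickless_poll_delay(deadlines, max_wait=3600):
    # deadlines: unix timestamps such as the pending request's expiry
    # and each server's next ping time; returns a poll timeout in msec
    tickless = time.time() + max_wait
    for deadline in deadlines:
        if deadline < tickless:
            tickless = deadline
    return max(0, (tickless - time.time()) * 1000)

# e.g. poller.poll(tickless_poll_delay([agent.expires, server.ping_at]))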

References

ØMQ - The Guide