擴充 core pub-sub pattern,增加 higher-level patterns for performance, reliability, state distribution, and monitoring。
- 什麼時候要使用 publish-subscribe
- 如何處理 too-slow subscribers (Suicidal Snail pattern)
- 如何設計 high-speed subscribers (Black Box pattern)
- 如何監控 pub-sub network (Espresso pattern)
- 如何製作 shared key-value store (Clone pattern)
- 如何使用 reactors 簡化 servers
- 如何使用 Binary Star Pattern,增加 server failover 功能
Pros and Cons of PubSub
pub-sub 強調 multicast/group messaging 的功能。
PUB 可發送訊息給 "all of many",而 PUSH 與 DEALER 是 "one of many",無法直接用 PUB 替換 PUSH。
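以下用一個小例子示範兩者的差異(inproc endpoints 是示意用的假設名稱):PUB 會把同一訊息送給所有 subscribers,PUSH 則把每個訊息 round-robin 給其中一個 receiver。

```python
import time
import zmq

ctx = zmq.Context.instance()

# PUB: "all of many",每個 SUB 都會收到同一份訊息
pub = ctx.socket(zmq.PUB)
pub.bind("inproc://demo-pub")
subs = []
for _ in range(2):
    s = ctx.socket(zmq.SUB)
    s.setsockopt(zmq.SUBSCRIBE, b"")
    s.connect("inproc://demo-pub")
    subs.append(s)
time.sleep(0.1)  # 等 subscriptions 生效
pub.send(b"hello")
print([s.recv() for s in subs])  # 兩個 SUB 都收到 b'hello'

# PUSH: "one of many",訊息 round-robin 給其中一個 PULL
push = ctx.socket(zmq.PUSH)
push.bind("inproc://demo-push")
pulls = []
for _ in range(2):
    p = ctx.socket(zmq.PULL)
    p.connect("inproc://demo-push")
    pulls.append(p)
push.send(b"job-1")
push.send(b"job-2")
print([p.recv() for p in pulls])  # 每個 PULL 各收到一個 job
```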
pub-sub 強調 scalability,大量資料可快速轉發給多個接收端。pub-sub 使用跟 push-pull 相同的方法: get rid of back-chatter,接收端不會跟 sender 溝通,唯一的例外是 SUB 會發送subscriptions 到 PUB sockets,但這部分是 anonymous and infrequent。
去掉 back-chatter 對 scalability 很重要,subscriber 不會直接跟 publisher 連線,而是連到 multicast group。雖然移除 back-chatter 讓 message flow 變得簡單,但也無法讓 sender/receiver 互相協作。
- publisher 無法知道 subscribers 什麼時候連線成功(包含 initial connection 及 reconnection)
- subscribers 無法通知 publisher 控制發送訊息的速度,publisher 只能以全速發送
- publisher 無法知道 subscribers 是不是突然斷線了
pub-sub pattern 隨時可能遺失訊息,如果需要 reliable multicast,就必須解決這個問題;但如果只需要 almost reliable multicast,可以直接使用 pub-sub。如果需要用到 back-chatter,可以改用 ROUTER-DEALER,或是增加另一個 channel 用在 synchronization。
pub-sub 就像是 radio broadcast,可能會在 join 前遺失一些訊息,接收訊息的狀況,跟 network quality 有關。但這個 model 就跟真實世界的 information distribution 一樣。
以下是 classic failure cases for pub-sub:
- subscribers join late,遺失了 server 已經發送的訊息
- subscribers 接收訊息速度太慢,造成 queue overflow
- subscribers 暫時離線期間,會遺失這段時間的訊息
- subscribers crash/restart,會遺失已經收到的訊息
- network 過載而 drop data(特別是使用 PGM 時)
- network 太慢,publisher-side queues overflow,而 publisher crash
ZeroMQ v3.x 以後預設限制了 internal buffer size(high water mark, HWM,預設為 1000 messages)
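以 pyzmq 為例,可以這樣設定 HWM(endpoint 為示意)。HWM 必須在 bind/connect 之前設定;queue 滿了之後,PUB 會直接丟棄給該 subscriber 的新訊息:

```python
import zmq

ctx = zmq.Context.instance()

pub = ctx.socket(zmq.PUB)
pub.setsockopt(zmq.SNDHWM, 100)   # 每個 subscriber 的 outgoing queue 上限
pub.bind("tcp://*:5556")

sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.RCVHWM, 100)   # incoming queue 上限
sub.setsockopt(zmq.SUBSCRIBE, b"")
sub.connect("tcp://localhost:5556")
```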
Espresso pattern: PubSub Tracing
這是 trace pub-sub traffic 的方法,使用 chap 2 介紹的 zmq_proxy()。zmq_proxy() 可以做 transport bridging,有三個參數: frontend socket、backend socket,及 capture socket(Python 版本改用 zmq.devices 的 monitored_queue)。
espresso 會產生 listener thread,該 thread 會讀取 PAIR socket,並列印取得的 message。PAIR socket 是 pipe 的一端,另一端傳給 zmq_proxy() 作為 capture socket。實際應用上,可以過濾 messages,只取得有興趣的訊息。
espresso.py
# Espresso Pattern
# This shows how to capture data using a pub-sub proxy
#
import time
from random import randint
from string import ascii_uppercase as uppercase
from threading import Thread
import zmq
from zmq.devices import monitored_queue
from zhelpers import zpipe
# The subscriber thread requests messages starting with
# A and B, then reads and counts incoming messages.
def subscriber_thread():
ctx = zmq.Context.instance()
# Subscribe to "A" and "B"
subscriber = ctx.socket(zmq.SUB)
subscriber.connect("tcp://localhost:6001")
subscriber.setsockopt(zmq.SUBSCRIBE, b"A")
subscriber.setsockopt(zmq.SUBSCRIBE, b"B")
count = 0
while True:
try:
[msg] = subscriber.recv_multipart()
print (" subscriber: {}".format(msg))
except zmq.ZMQError as e:
if e.errno == zmq.ETERM:
break # Interrupted
else:
raise
count += 1
print ("Subscriber received %d messages" % count)
# publisher thread
# The publisher sends random messages starting with A-J:
def publisher_thread():
ctx = zmq.Context.instance()
publisher = ctx.socket(zmq.PUB)
publisher.bind("tcp://*:6000")
while True:
cateint = uppercase[randint(0,9)]
cate = "%s" % (cateint)
string = "%s-%05d" % (cateint, randint(0,100000))
try:
print (" publisher: {}".format(string.encode('utf-8')))
publisher.send(string.encode('utf-8'))
# publisher.send_multipart( [cate.encode(), string.encode()] )
except zmq.ZMQError as e:
if e.errno == zmq.ETERM:
break # Interrupted
else:
raise
time.sleep(0.1) # Wait for 1/10th second
# listener thread
# The listener receives all messages flowing through the proxy, on its
# pipe. Here, the pipe is a pair of ZMQ_PAIR sockets that connects
# attached child threads via inproc. In other languages your mileage may vary:
def listener_thread (pipe):
# Print everything that arrives on pipe
while True:
try:
print (pipe.recv_multipart())
except zmq.ZMQError as e:
if e.errno == zmq.ETERM:
break # Interrupted
# main thread
# The main task starts the subscriber and publisher, and then sets
# itself up as a listening proxy. The listener runs as a child thread:
def main ():
# Start child threads
ctx = zmq.Context.instance()
p_thread = Thread(target=publisher_thread)
s_thread = Thread(target=subscriber_thread)
p_thread.start()
s_thread.start()
pipe = zpipe(ctx)
subscriber = ctx.socket(zmq.XSUB)
subscriber.connect("tcp://localhost:6000")
publisher = ctx.socket(zmq.XPUB)
publisher.bind("tcp://*:6001")
l_thread = Thread(target=listener_thread, args=(pipe[1],))
l_thread.start()
try:
monitored_queue(subscriber, publisher, pipe[0], b'pub', b'sub')
except KeyboardInterrupt:
print ("Interrupted")
del subscriber, publisher, pipe
ctx.term()
if __name__ == '__main__':
main()
執行結果
$ python espresso.py
publisher: b'H-15364'
[b'sub', b'\x01A']
[b'sub', b'\x01B']
publisher: b'D-22262'
publisher: b'I-36691'
publisher: b'D-61882'
publisher: b'B-22663'
[b'pub', b'B-22663']
subscriber: b'B-22663'
publisher: b'G-79149'
Last Value Caching
ZeroMQ 的 pub-sub 少了一些傳統 pub-sub 系統的功能,其中一個是 LVC (last value caching),這可解決新的 subscriber 加入時取得資料的問題:publisher 會在新的 subscriber 加入時,重新廣播一次該 subscriber 訂閱的 topic 的最新訊息。
在大型 pub-sub 網路,需要類似 PGM (Pragmatic General Multicast) 的 protocol,如果想要用 TCP unicast 處理上千個 subscribers,會遇到問題。
PGM 是單向 protocol: publisher 發送訊息到 switch 的 multicast address,廣播到所有需要的 subscribers。publisher 不會知道 subscribers join/leave,這都在 network switch 上處理。
在幾十個 subscribers,有限 topics 的網路環境,可使用 TCP 及 XSUB, XPUB。
也可以用 ZeroMQ 實作 LVC 功能,就是在 publisher, subscribers 之間增加一個 proxy,類似 PGM 的 switch。
舉一個會產生問題的例子:publisher 對上千個 topics 發送訊息,每秒對一個 random topic 發送一則。subscriber 連線後,因為沒有 LVC,平均要等 500s 才會收到第一個訊息。
pathopub.py: pathologic publisher
#
# Pathological publisher
# Sends out 1,000 topics and then one random update per second
#
import sys
import time
from random import randint
import zmq
def main(url=None):
ctx = zmq.Context.instance()
publisher = ctx.socket(zmq.PUB)
if url is None:
    url = "tcp://*:5556"
publisher.bind(url)
# Ensure subscriber connection has time to complete
print("bind to {}".format(url))
time.sleep(1)
# Send out all 1,000 topic messages
for topic_nbr in range(1000):
publisher.send_multipart([
b"%03d" % topic_nbr,
b"Save Roger",
])
while True:
# Send one random update per second
try:
time.sleep(1)
publisher.send_multipart([
b"%03d" % randint(0,999),
b"Off with his head!",
])
except KeyboardInterrupt:
print( "interrupted" )
break
if __name__ == '__main__':
main(sys.argv[1] if len(sys.argv) > 1 else None)
pathosub.py
#
# Pathological subscriber
# Subscribes to one random topic and prints received messages
#
import sys
import time
from random import randint
import zmq
def main(url=None):
ctx = zmq.Context.instance()
subscriber = ctx.socket(zmq.SUB)
if url is None:
url = "tcp://localhost:5556"
print("connect to {}".format(url))
subscriber.connect(url)
subscription = b"%03d" % randint(0,999)
print("subscription to {}".format(subscription))
subscriber.setsockopt(zmq.SUBSCRIBE, subscription)
while True:
topic, data = subscriber.recv_multipart()
assert topic == subscription
print( data )
if __name__ == '__main__':
main(sys.argv[1] if len(sys.argv) > 1 else None)
執行結果
$ python pathopub.py
$ python pathosub.py
b'Save Roger'
b'Off with his head!'
在 pathopub 與 pathosub 中間加上 lvcache
lvcache.py: Last Value Caching Proxy
#
# Last value cache
# Uses XPUB subscription messages to re-send data
#
import zmq
def main():
ctx = zmq.Context.instance()
frontend = ctx.socket(zmq.SUB)
frontend.connect("tcp://localhost:5557")
backend = ctx.socket(zmq.XPUB)
backend.bind("tcp://*:5558")
# Subscribe to every single topic from publisher
frontend.setsockopt(zmq.SUBSCRIBE, b"")
# Store last instance of each topic in a cache
cache = {}
# main poll loop
# We route topic updates from frontend to backend, and
# we handle subscriptions by sending whatever we cached,
# if anything:
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)
while True:
try:
events = dict(poller.poll(1000))
except KeyboardInterrupt:
print("interrupted")
break
# Any new topic data we cache and then forward
if frontend in events:
msg = frontend.recv_multipart()
topic, current = msg
# print("cache topic: {} msg: {}".format(topic, current))
cache[topic] = current
backend.send_multipart(msg)
# handle subscriptions
# When we get a new subscription we pull data from the cache:
if backend in events:
event = backend.recv()
# event=b'\x01188'
# print( "event={}".format(event) )
# Event is one byte 0=unsub or 1=sub, followed by topic
if( event[0:1] == b'\x01' ):
topic = event[1:]
# print ("topic={}".format(topic))
if topic in cache:
print ("Sending cached topic %s" % topic)
backend.send_multipart([ topic, cache[topic] ])
if __name__ == '__main__':
main()
執行結果
$ python lvcache.py
event=b'\x01708'
topic=b'708'
Sending cached topic b'708'
$ python pathopub.py tcp://*:5557
bind to tcp://*:5557
$ python pathosub.py tcp://localhost:5558
connect to tcp://localhost:5558
subscription to b'708'
b'Save Roger'
Suicidal Snail pattern: Slow Subscriber Detection
pub-sub 系統常見問題: slow subscriber。當 publisher 以全速發送給 subscribers,subscribers 無法快速處理資料。
處理方法是
Queue messages on the publisher
但如果有很多 subscribers,會讓 publisher 記憶體不足而 crash
Queue messages on the subscriber
這個方法比較好,就算 subscriber 記憶體不足而 crash 也沒關係,總比 publisher crash 還好。
Stop queuing new messages after a while
新訊息會被 rejected or dropped,如果 ZeroMQ 在 publisher 設定了 HWM,就會這樣處理,但還是無法解決 slow subscriber 的問題。這會讓 message stream 發生 gap。
Punish slow subscribers with disconnect
ZeroMQ 無法這樣處理,因為 subscribers are invisible to publishers
既然 publisher 無法主動將 slow subscriber 斷線,就應該讓 subscriber kill itself,這就是 Suicidal Snail Pattern
當 subscriber 偵測到自己運作太慢,就會將自己斷線。
偵測的方法是,publisher 對每個訊息加上 sequence number,並在 publisher 設定 HWM。如果 subscriber 偵測到 sequence gap,就知道發生問題了。
這樣做有兩個問題:(1) 如果有多個 publishers,要如何設定 sequence numbers?解決方式是給每一個 publisher 一個 unique ID,加到 sequencing 裡。(2) 如果 subscribers 使用了 ZMQ_SUBSCRIBE filters,因為只會收到部分訊息,就註定會看到 sequence gap。
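gap 偵測的邏輯可以寫成以下 sketch(假設 publisher 發送的每個訊息內容就是十進位 sequence number;endpoint 與函式名稱皆為示意):

```python
import sys
import zmq

def has_gap(expected, seq):
    """expected 是下一個預期的 sequence;seq 失序就代表 HWM 丟棄了訊息"""
    return expected is not None and seq != expected

def suicidal_subscriber(endpoint="tcp://localhost:5556"):
    ctx = zmq.Context.instance()
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    sub.connect(endpoint)
    expected = None
    while True:
        seq = int(sub.recv())          # 假設訊息內容就是 sequence number
        if has_gap(expected, seq):
            print("E: gap detected, aborting", file=sys.stderr)
            sys.exit(1)                # suicidal snail: 直接退出
        expected = seq + 1
```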
Suicidal Snail Pattern 適用於 subscribers 有各自的 clients 及 service-level agreements,且需要保證 maximum latency 的場合。雖然將 subscriber 斷線看起來不像是保證 maximum latency 的方法,但這是 assertion model:一旦超過 latency 就斷線,可以立刻發現問題;讓 late data 繼續往下游流動,可能造成更嚴重的問題。
suisnail.py
"""
Suicidal Snail
"""
from __future__ import print_function
import sys
import threading
import time
from pickle import dumps, loads
import random
import zmq
from zhelpers import zpipe
# ---------------------------------------------------------------------
# This is our subscriber
# It connects to the publisher and subscribes to everything. It
# sleeps for a short time between messages to simulate doing too
# much work. If a message is more than 1 second late, it croaks.
MAX_ALLOWED_DELAY = 1.0 # secs
def subscriber(pipe):
# Subscribe to everything
ctx = zmq.Context.instance()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, b'')
sub.connect("tcp://localhost:5556")
# Get and process messages
while True:
clock = loads(sub.recv())
# Suicide snail logic
if (time.time() - clock > MAX_ALLOWED_DELAY):
print("E: subscriber cannot keep up, aborting", file=sys.stderr)
break
# Work for 1 msec plus some random additional time
time.sleep(1e-3 * (1+2*random.random()))
pipe.send(b"gone and died")
# ---------------------------------------------------------------------
# This is our server task
# It publishes a time-stamped message to its pub socket every 1ms.
def publisher(pipe):
# Prepare publisher
ctx = zmq.Context.instance()
pub = ctx.socket(zmq.PUB)
pub.bind("tcp://*:5556")
while True:
# Send current clock (secs) to subscribers
pub.send(dumps(time.time()))
try:
signal = pipe.recv(zmq.DONTWAIT)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# nothing to recv
pass
else:
raise
else:
# received break message
break
time.sleep(1e-3) # 1msec wait
# This main thread simply starts a client, and a server, and then
# waits for the client to signal it's died.
def main():
ctx = zmq.Context.instance()
pub_pipe, pub_peer = zpipe(ctx)
sub_pipe, sub_peer = zpipe(ctx)
pub_thread = threading.Thread(target=publisher, args=(pub_peer,))
pub_thread.daemon=True
pub_thread.start()
sub_thread = threading.Thread(target=subscriber, args=(sub_peer,))
sub_thread.daemon=True
sub_thread.start()
# wait for sub to finish
sub_pipe.recv()
# tell pub to halt
pub_pipe.send(b"break")
time.sleep(0.1)
if __name__ == '__main__':
main()
notes:
- message 只包含了 system clock(單位是 seconds)。實際上 message 必須至少有一個帶 timestamp 的 message header 及 data message body
- subscriber 及 publisher 放在單一 process,兩個 threads。實際上,應該分屬兩個 processes,因為是 demo 方便才寫成 thread
Black Box pattern: High-Speed Subscribers
尋找一個讓 subscribers 更快的方法。最常見的 pub-sub 使用範例是發布 market data from stock exchanges,通常會設定一個 publisher 連接到 stock exchange,取得 prices quotes,發送給多個 subscribers,少量 subscribers 可使用 TCP,大量 subscribers 可使用 reliable multicast 例如 PGM。
先想像有個 feed 平均每秒產生 100,000 筆 100-byte messages(這是已經過濾掉不需要發送給 subscribers 的資料之後的速度)。現在我們需要記錄一整天的資料(8 hrs 大約是 250GB),並在模擬環境中 replay。ZeroMQ application 每秒處理 100K messages 不是問題,但我們希望以更快的速度 replay。
我們設計一個架構:用幾台速度很快的 boxes,一台給 publisher,每一個 subscriber 各用一台。
當我們將資料傳給 subscribers 要注意:
- 只要對 message 做一點工作,就會降低 subscriber 的速度,直到它無法跟上 publisher 發送的速度。
- 在 publisher 及 subscribers 要達到最快速度,大約是每秒 6M messages
首先要將 subscriber 改為 multithreaded design,把 reading messages 與處理工作拆到不同的 threads。通常不會對每個訊息都做相同的處理,subscriber 會以 prefix key 過濾訊息,只有 message 符合條件時,subscriber 才會交給 worker 處理,也就是透過 ZeroMQ 發送訊息給 worker thread。
subscriber 就像是 queue device,可用多個 sockets 連接 subscriber 及 workers,先假設是 one-way traffic 且 worker 都是一樣的,可使用 PUSH, PULL 方式實作。
subscriber 以 TCP/PGM 跟 publisher 溝通,subscriber 以 inproc:// 跟 workers 溝通。
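這個設計可以寫成以下 sketch(endpoints 與 prefix filter 為示意):reader 只負責收訊息並轉手給 workers,耗時的處理都留在 worker threads。

```python
import threading
import zmq

def reader(ctx):
    # 高速讀取端:從 publisher 收訊息(prefix 過濾),立刻 push 給 workers
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b"A")      # 只收 "A" 開頭的訊息(示意)
    sub.connect("tcp://localhost:5556")      # publisher(假設的 endpoint)
    push = ctx.socket(zmq.PUSH)
    push.bind("inproc://workers")
    while True:
        push.send(sub.recv())                # 不做任何處理,只轉手

def worker(ctx):
    pull = ctx.socket(zmq.PULL)
    pull.connect("inproc://workers")
    while True:
        msg = pull.recv()
        # 耗時的 per-message 處理放在這裡,不會拖慢 reader

ctx = zmq.Context.instance()
# 實際啟動方式(示意):
# threading.Thread(target=reader, args=(ctx,), daemon=True).start()
# for _ in range(4):
#     threading.Thread(target=worker, args=(ctx,), daemon=True).start()
```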
在每一個 subscriber thread 使用 100% CPU 效能後,因為是一個 thread,無法用到多個 CPU core,我們可以將工作平行分配給多個 threads。
這個方法稱為 sharding。可將工作分為 parallel and independent streams,例如一半 topic key 在一個 stream,另一半在另一個 stream,但 performance 只會因為有多餘的 CPU core 而增加。
sharding 為 2 streams 的方法:
- 使用 2 I/O threads
- 使用 2 network interfaces (NIC), one per subscriber
- 每個 I/O thread 使用特定的 NIC
- 2 subscriber thread 各自使用專屬的 cores
- 每一個 subscriber thread 使用 1 個 SUB socket
- 將其他 core 指定給 worker threads
- worker thread 連接到兩個 subscriber PUSH sockets
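對應的設定可以 sketch 如下(兩張 NIC 的 IP 為假設值):context 使用 2 個 I/O threads,並以 ZMQ_AFFINITY bitmask 把每個 SUB socket 綁到特定的 I/O thread。

```python
import zmq

ctx = zmq.Context(io_threads=2)       # 一個 I/O thread 對應一張 NIC

sub1 = ctx.socket(zmq.SUB)
sub1.setsockopt(zmq.SUBSCRIBE, b"")
sub1.setsockopt(zmq.AFFINITY, 1)      # bitmask: 綁到 I/O thread 0
sub1.connect("tcp://192.168.1.10:5556")   # 第一張 NIC(假設的位址)

sub2 = ctx.socket(zmq.SUB)
sub2.setsockopt(zmq.SUBSCRIBE, b"")
sub2.setsockopt(zmq.AFFINITY, 2)      # bitmask: 綁到 I/O thread 1
sub2.connect("tcp://192.168.2.10:5556")   # 第二張 NIC(假設的位址)
```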
最好的情況是,我們以 full-loaded thread 使用到所有 CPU cores,當 threads 開始競爭 cores 及 CPU cycles,就會減慢速度。
Clone pattern: Reliable PubSub
目標是讓多個 applications 共用 some common state
- 有上千個 applications
- 可任意 join/leave network
- applications 必須共用 single eventually-consistent state
- 任一個 application 都可在任何時間點修改 state
- state 可儲存於記憶體
有些實際應用案例:
- 由 cloud servers 共享的 configuration
- 由一些 players 共享的 game state
- exchange rate data 即時更新,且提供給 applications 使用
Centralized Versus Decentralized
首先要決定要不要有 central server
- central server 容易瞭解,網路不對稱。使用 central server,可避免 discovery, bind vs connect 等問題
- full-distributed 架構比較不容易實作,但最終會有一個簡單的 protocol。每個 node 都可被指定作為 server 及 client。這就是 Freelance pattern (chap 4)
- central server 在大量使用時會變成 bottleneck;如果需要每秒處理 millions of messages,應該使用 decentralized 架構。
- centralized 架構比較容易增加多個 nodes,因為將 10,000 nodes 連到一個 server 會比互相連線來得容易。
clone pattern: 一個發布 state update 的 server,一些 client applications
Representing State as Key-Value Pairs
接下來一步一步實作 Clone pattern
首先是如何將 shared state 分享給一些 clients。要先決定如何描述 state,最簡單的方式是 key-value store,以一組 key-value pairs 代表 shared state。
chap 1 已有一個 pub-sub weather server/client,將它改為發送 key-value pairs,client 以 hash table 儲存 state
update 的種類:
- a new key-value pair
- a modified value for existing key
- a deleted key
先假設 state 可完全儲存在 memory
clonecli1.py
"""
Clone Client Model One
Author: Min RK <benjaminrk@gmail.com>
"""
import random
import time
import zmq
from kvsimple import KVMsg
def main():
# Prepare our context and publisher socket
ctx = zmq.Context()
updates = ctx.socket(zmq.SUB)
updates.linger = 0
updates.setsockopt(zmq.SUBSCRIBE, b'')
updates.connect("tcp://localhost:5556")
kvmap = {}
sequence = 0
while True:
try:
kvmsg = KVMsg.recv(updates)
except:
break # Interrupted
kvmsg.store(kvmap)
sequence += 1
print( "Interrupted\n%d messages in" % sequence )
if __name__ == '__main__':
main()
clonesrv1.py
"""
Clone server Model One
"""
import random
import time
import zmq
from kvsimple import KVMsg
def main():
# Prepare our context and publisher socket
ctx = zmq.Context()
publisher = ctx.socket(zmq.PUB)
publisher.bind("tcp://*:5556")
time.sleep(0.2)
sequence = 0
random.seed(time.time())
kvmap = {}
try:
while True:
# Distribute as key-value message
sequence += 1
kvmsg = KVMsg(sequence)
kvmsg.key = b"%d" % random.randint(1,10000)
kvmsg.body = b"%d" % random.randint(1,1000000)
kvmsg.send(publisher)
kvmsg.store(kvmap)
except KeyboardInterrupt:
print( " Interrupted\n%d messages out" % sequence )
if __name__ == '__main__':
main()
kvsimple.py
"""
=====================================================================
kvsimple - simple key-value message class for example applications
"""
import struct # for packing integers
import sys
import zmq
class KVMsg(object):
"""
Message is formatted on wire as 3 frames:
frame 0: key (0MQ string)
frame 1: sequence (8 bytes, network order)
frame 2: body (blob)
"""
key = None # key (string)
sequence = 0 # int
body = None # blob
def __init__(self, sequence, key=None, body=None):
assert isinstance(sequence, int)
self.sequence = sequence
self.key = key
self.body = body
def store(self, dikt):
"""Store me in a dict if I have anything to store"""
# this seems weird to check, but it's what the C example does
if self.key is not None and self.body is not None:
dikt[self.key] = self
def send(self, socket):
"""Send key-value message to socket; any empty frames are sent as such."""
key = b'' if self.key is None else self.key
seq_s = struct.pack('!q', self.sequence)
body = b'' if self.body is None else self.body
socket.send_multipart([ key, seq_s, body ])
@classmethod
def recv(cls, socket):
"""Reads key-value message from socket, returns new kvmsg instance."""
key, seq_s, body = socket.recv_multipart()
key = key if key else None
seq = struct.unpack('!q', seq_s)[0]
body = body if body else None
return cls(seq, key=key, body=body)
def dump(self):
if self.body is None:
size = 0
data='NULL'
else:
size = len(self.body)
data=repr(self.body)
print("[seq:{seq}][key:{key}][size:{size}] {data}".format(
seq=self.sequence,
key=self.key,
size=size,
data=data,
))
# ---------------------------------------------------------------------
# Runs self test of class
def test_kvmsg (verbose):
print( " * kvmsg: ")
# Prepare our context and sockets
ctx = zmq.Context()
output = ctx.socket(zmq.DEALER)
output.bind("ipc://kvmsg_selftest.ipc")
input = ctx.socket(zmq.DEALER)
input.connect("ipc://kvmsg_selftest.ipc")
kvmap = {}
# Test send and receive of simple message
kvmsg = KVMsg(1)
kvmsg.key = b"key"
kvmsg.body = b"body"
if verbose:
kvmsg.dump()
kvmsg.send(output)
kvmsg.store(kvmap)
kvmsg2 = KVMsg.recv(input)
if verbose:
kvmsg2.dump()
assert kvmsg2.key == b"key"
kvmsg2.store(kvmap)
assert len(kvmap) == 1 # shouldn't be different
print( "OK")
if __name__ == '__main__':
test_kvmsg('-v' in sys.argv)
主要工作都放在 kvmsg class,處理 key-value message objects,這是 multipart ZeroMQ message 有 3 frames: key, sequence number (64 bits, network byte order), binary body(可存任意資料)
server 會產生 random 4-digit key,模擬大量 hash table
還沒實作 delete,只有 insert/update
server 在 bind socket 後有 200 ms pause,這是為了避免遇到 slow joiner syndrome,subscriber 會在連線到 server socket 前,遺失一些訊息
server/client 都有 hash tables,目前的版本只適合在啟動 server 前先啟動所有 clients,且 clients 永遠不會 crash
Getting an Out-of-Band Snapshot
如何處理 slow-joiner clients,或是 client crash and restart
為了讓 late(recovering) client 能夠追上 server state,需要取得 server state 的 snapshot。接下來用 "message" 代表 "sequenced key-value pair","state" 代表 "hash table"。為了取得 server state,client 會以 DEALER socket 向 server 詢問
在 client 實作 synchronization
- client 會先註冊 updates,並發送 state request,這會保證 state 會比 oldest update 還要新
- client 會等待 server reply state,同時會 queue all updates,可透過不讀取 ZeroMQ socket 完成,會 queue 在 ZeroMQ
- 當 client 接收到 state reply 後,會開始讀取 queued updates,但會丟棄所有比 snapshot 還舊的 updates
- client 接著把後續的 updates 持續套用到 state snapshot 上
clonesrv2.py
"""
Clone server Model Two
"""
import random
import threading
import time
import zmq
from kvsimple import KVMsg
from zhelpers import zpipe
def main():
# Prepare our context and publisher socket
ctx = zmq.Context()
publisher = ctx.socket(zmq.PUB)
publisher.bind("tcp://*:5557")
updates, peer = zpipe(ctx)
manager_thread = threading.Thread(target=state_manager, args=(ctx,peer))
manager_thread.daemon=True
manager_thread.start()
sequence = 0
random.seed(time.time())
try:
while True:
# Distribute as key-value message
sequence += 1
kvmsg = KVMsg(sequence)
kvmsg.key = b"%d" % random.randint(1,10000)
kvmsg.body = b"%d" % random.randint(1,1000000)
kvmsg.send(publisher)
kvmsg.send(updates)
except KeyboardInterrupt:
print( " Interrupted\n%d messages out" % sequence )
# simple struct for routing information for a key-value snapshot
class Route:
def __init__(self, socket, identity):
self.socket = socket # ROUTER socket to send to
self.identity = identity # Identity of peer who requested state
def send_single(key, kvmsg, route):
"""Send one state snapshot key-value pair to a socket
Hash item data is our kvmsg object, ready to send
"""
# Send identity of recipient first
route.socket.send(route.identity, zmq.SNDMORE)
kvmsg.send(route.socket)
def state_manager(ctx, pipe):
"""This thread maintains the state and handles requests from clients for snapshots.
"""
kvmap = {}
pipe.send(b"READY")
snapshot = ctx.socket(zmq.ROUTER)
snapshot.bind("tcp://*:5556")
poller = zmq.Poller()
poller.register(pipe, zmq.POLLIN)
poller.register(snapshot, zmq.POLLIN)
sequence = 0 # Current snapshot version number
while True:
try:
items = dict(poller.poll())
except (zmq.ZMQError, KeyboardInterrupt):
break # interrupt/context shutdown
# Apply state update from main thread
if pipe in items:
kvmsg = KVMsg.recv(pipe)
sequence = kvmsg.sequence
kvmsg.store(kvmap)
# Execute state snapshot request
if snapshot in items:
msg = snapshot.recv_multipart()
identity = msg[0]
request = msg[1]
if request == b"ICANHAZ?":
pass
else:
print( "E: bad request, aborting\n")
break
# Send state snapshot to client
route = Route(snapshot, identity)
# For each entry in kvmap, send kvmsg to client
for k,v in kvmap.items():
send_single(k,v,route)
# Now send END message with sequence number
print( "Sending state snapshot=%d\n" % sequence)
snapshot.send(identity, zmq.SNDMORE)
kvmsg = KVMsg(sequence)
kvmsg.key = b"KTHXBAI"
kvmsg.body = b""
kvmsg.send(snapshot)
if __name__ == '__main__':
main()
clonecli2.py
"""
Clone client Model Two
"""
import time
import zmq
from kvsimple import KVMsg
def main():
# Prepare our context and subscriber
ctx = zmq.Context()
snapshot = ctx.socket(zmq.DEALER)
snapshot.linger = 0
snapshot.connect("tcp://localhost:5556")
subscriber = ctx.socket(zmq.SUB)
subscriber.linger = 0
subscriber.setsockopt(zmq.SUBSCRIBE, b'')
subscriber.connect("tcp://localhost:5557")
kvmap = {}
# Get state snapshot
sequence = 0
snapshot.send(b"ICANHAZ?")
while True:
try:
kvmsg = KVMsg.recv(snapshot)
except:
break # Interrupted
if kvmsg.key == b"KTHXBAI":
sequence = kvmsg.sequence
print( "Received snapshot=%d" % sequence )
break # Done
kvmsg.store(kvmap)
# Now apply pending updates, discard out-of-sequence messages
while True:
try:
kvmsg = KVMsg.recv(subscriber)
except:
break # Interrupted
if kvmsg.sequence > sequence:
sequence = kvmsg.sequence
kvmsg.store(kvmap)
if __name__ == '__main__':
main()
notes:
- server 使用兩個 sockets,一個 thread random 產生 updates,並發送到 main PUB socket,另一個 thread 以 ROUTER 處理 state requests,這兩個會透過 PAIR socket 互相溝通
- client 很簡單,大部分在 kvmsg class 實作
- serialize state 的方法很簡單:hash table 儲存 kvmsg objects,server 會在 client 要求 state 時,發送 a batch of messages。如果多個 clients 同時要求 state,每一個會取得不同的 snapshot
- 假設 client 只會跟一個 server 溝通,server 會一直運作,不會 crash
Republishing Updates from Clients
第二個 model 的 key-value store 變更都來自 server 本身,這是 centralized model,適用於有一份集中的設定檔要發布給 clients,且每一個 node 都有 local cache 的狀況。
另一個 model 是 updates 來自 clients 而不是 server,server 因此變成 stateless broker,優點是
- 不需要擔心 server reliability,crash 後可啟動新的 instance 並提供新的 values
- 可使用 key-value store 在 active peers 之間共享 knowledge
為了從 client 發送 updates,需要用到 PUSH-PULL socket pattern
為什麼不讓 clients 直接互相發送 updates?雖然這樣可以降低 latency,但會失去 consistency:如果每個接收端依各自的接收順序修改 state,各 node 的 state 很快就會不同步。
為了處理 state change 同時發生在多個 nodes 的狀況,採用 centralizing all changes 的做法:不管 client 產生 update 的時間點,全部先發送到 server,由 server 以它接收 update 的順序作為唯一的順序。
為了協調 changes,server 會對 all updates 加上 unique sequence number,client 因此能偵測 nastier failures,包含 network congestion 及 queue overflow。client 一旦發現 message stream 失序,就可以採取對應措施。
讓 client 向 server 要求重送遺失的 message,看起來合理,但實際上沒有用:失序通常是 network stress 造成,要求重送只會讓網路更繁忙。client 應該警示 user 發生異常,然後 stop,等人為介入後再 manual restart。
clonesrv3.py
"""
Clone server Model Three
"""
import zmq
from kvsimple import KVMsg
# simple struct for routing information for a key-value snapshot
class Route:
def __init__(self, socket, identity):
self.socket = socket # ROUTER socket to send to
self.identity = identity # Identity of peer who requested state
def send_single(key, kvmsg, route):
"""Send one state snapshot key-value pair to a socket"""
# Send identity of recipient first
route.socket.send(route.identity, zmq.SNDMORE)
kvmsg.send(route.socket)
def main():
# context and sockets
ctx = zmq.Context()
snapshot = ctx.socket(zmq.ROUTER)
snapshot.bind("tcp://*:5556")
publisher = ctx.socket(zmq.PUB)
publisher.bind("tcp://*:5557")
collector = ctx.socket(zmq.PULL)
collector.bind("tcp://*:5558")
sequence = 0
kvmap = {}
poller = zmq.Poller()
poller.register(collector, zmq.POLLIN)
poller.register(snapshot, zmq.POLLIN)
while True:
try:
items = dict(poller.poll(1000))
except:
break # Interrupted
# Apply state update sent from client
if collector in items:
kvmsg = KVMsg.recv(collector)
sequence += 1
kvmsg.sequence = sequence
kvmsg.send(publisher)
kvmsg.store(kvmap)
print( "I: publishing update %5d" % sequence )
# Execute state snapshot request
if snapshot in items:
msg = snapshot.recv_multipart()
identity = msg[0]
request = msg[1]
if request == b"ICANHAZ?":
pass
else:
print( "E: bad request, aborting\n" )
break
# Send state snapshot to client
route = Route(snapshot, identity)
# For each entry in kvmap, send kvmsg to client
for k,v in kvmap.items():
send_single(k,v,route)
# Now send END message with sequence number
print( "Sending state snapshot=%d\n" % sequence )
snapshot.send(identity, zmq.SNDMORE)
kvmsg = KVMsg(sequence)
kvmsg.key = b"KTHXBAI"
kvmsg.body = b""
kvmsg.send(snapshot)
print( " Interrupted\n%d messages handled" % sequence )
if __name__ == '__main__':
main()
clonecli3.py
"""
Clone client Model Three
"""
import random
import time
import zmq
from kvsimple import KVMsg
def main():
# Prepare our context and subscriber
ctx = zmq.Context()
snapshot = ctx.socket(zmq.DEALER)
snapshot.linger = 0
snapshot.connect("tcp://localhost:5556")
subscriber = ctx.socket(zmq.SUB)
subscriber.linger = 0
subscriber.setsockopt(zmq.SUBSCRIBE, b'')
subscriber.connect("tcp://localhost:5557")
publisher = ctx.socket(zmq.PUSH)
publisher.linger = 0
publisher.connect("tcp://localhost:5558")
random.seed(time.time())
kvmap = {}
# Get state snapshot
sequence = 0
snapshot.send(b"ICANHAZ?")
while True:
try:
kvmsg = KVMsg.recv(snapshot)
except:
return # Interrupted
if kvmsg.key == b"KTHXBAI":
sequence = kvmsg.sequence
print( "I: Received snapshot=%d" % sequence )
break # Done
kvmsg.store(kvmap)
poller = zmq.Poller()
poller.register(subscriber, zmq.POLLIN)
alarm = time.time()+1.
while True:
tickless = 1000*max(0, alarm - time.time())
try:
items = dict(poller.poll(tickless))
except:
break # Interrupted
if subscriber in items:
kvmsg = KVMsg.recv(subscriber)
# Discard out-of-sequence kvmsgs, incl. heartbeats
if kvmsg.sequence > sequence:
sequence = kvmsg.sequence
kvmsg.store(kvmap)
print( "I: received update=%d" % sequence )
# If we timed-out, generate a random kvmsg
if time.time() >= alarm:
kvmsg = KVMsg(0)
kvmsg.key = b"%d" % random.randint(1,10000)
kvmsg.body = b"%d" % random.randint(1,1000000)
kvmsg.send(publisher)
kvmsg.store(kvmap)
alarm = time.time() + 1.
print( " Interrupted\n%d messages in" % sequence )
if __name__ == '__main__':
main()
- server 封裝為單一 task,以 PULL 管理 incoming updates,ROUTER 管理 state requests,PUB 管理 outgoing updates
- client 會使用 tickless timer 發送 random update 給 server,每秒一次,實際應用上,應該由 application code 驅動 updates。
Working with Subtrees
當 client 數量增加,shared store 的 size 也會增加,這時候不適合將所有資料都送給 clients。
當使用 shared store 時,某些 client 只需要處理 store 的一部分,稱為 subtree。client 會在 state request 中指定它要的 subtree,並在訂閱 updates 時使用相同的 subtree prefix。
表示 subtree 的語法有很多種
- path hierarchy: /some/list/of/paths
- topic tree: some.list.of.topics
範例使用 path hierarchy,並擴充 client/server 讓 client 可跟 single subtree 運作。
clonesrv4.py
"""
Clone server Model Four
"""
import zmq
from kvsimple import KVMsg
# simple struct for routing information for a key-value snapshot
class Route:
def __init__(self, socket, identity, subtree):
self.socket = socket # ROUTER socket to send to
self.identity = identity # Identity of peer who requested state
self.subtree = subtree # Client subtree specification
def send_single(key, kvmsg, route):
"""Send one state snapshot key-value pair to a socket"""
# check front of key against subscription subtree:
if kvmsg.key.startswith(route.subtree):
# Send identity of recipient first
route.socket.send(route.identity, zmq.SNDMORE)
kvmsg.send(route.socket)
def main():
# context and sockets
ctx = zmq.Context()
snapshot = ctx.socket(zmq.ROUTER)
snapshot.bind("tcp://*:5556")
publisher = ctx.socket(zmq.PUB)
publisher.bind("tcp://*:5557")
collector = ctx.socket(zmq.PULL)
collector.bind("tcp://*:5558")
sequence = 0
kvmap = {}
poller = zmq.Poller()
poller.register(collector, zmq.POLLIN)
poller.register(snapshot, zmq.POLLIN)
while True:
try:
items = dict(poller.poll(1000))
except:
break # Interrupted
# Apply state update sent from client
if collector in items:
kvmsg = KVMsg.recv(collector)
sequence += 1
kvmsg.sequence = sequence
kvmsg.send(publisher)
kvmsg.store(kvmap)
print( "I: publishing update %5d" % sequence )
# Execute state snapshot request
if snapshot in items:
msg = snapshot.recv_multipart()
identity, request, subtree = msg
if request == b"ICANHAZ?":
pass
else:
print( "E: bad request, aborting\n" )
break
# Send state snapshot to client
route = Route(snapshot, identity, subtree)
# For each entry in kvmap, send kvmsg to client
for k,v in kvmap.items():
send_single(k,v,route)
# Now send END message with sequence number
print( "Sending state snapshot=%d\n" % sequence )
snapshot.send(identity, zmq.SNDMORE)
kvmsg = KVMsg(sequence)
kvmsg.key = b"KTHXBAI"
kvmsg.body = subtree
kvmsg.send(snapshot)
print( " Interrupted\n%d messages handled" % sequence )
if __name__ == '__main__':
main()
clonecli4.py
"""
Clone client Model Four
"""
import random
import time
import zmq
from kvsimple import KVMsg
SUBTREE = b"/client/"
def main():
# Prepare our context and subscriber
ctx = zmq.Context()
snapshot = ctx.socket(zmq.DEALER)
snapshot.linger = 0
snapshot.connect("tcp://localhost:5556")
subscriber = ctx.socket(zmq.SUB)
subscriber.linger = 0
subscriber.setsockopt(zmq.SUBSCRIBE, SUBTREE)
subscriber.connect("tcp://localhost:5557")
publisher = ctx.socket(zmq.PUSH)
publisher.linger = 0
publisher.connect("tcp://localhost:5558")
random.seed(time.time())
kvmap = {}
# Get state snapshot
sequence = 0
snapshot.send_multipart([b"ICANHAZ?", SUBTREE])
while True:
try:
kvmsg = KVMsg.recv(snapshot)
except:
    return # Interrupted
if kvmsg.key == b"KTHXBAI":
sequence = kvmsg.sequence
print( "I: Received snapshot=%d" % sequence )
break # Done
kvmsg.store(kvmap)
poller = zmq.Poller()
poller.register(subscriber, zmq.POLLIN)
alarm = time.time()+1.
while True:
tickless = 1000*max(0, alarm - time.time())
try:
items = dict(poller.poll(tickless))
except:
break # Interrupted
if subscriber in items:
kvmsg = KVMsg.recv(subscriber)
# Discard out-of-sequence kvmsgs, incl. heartbeats
if kvmsg.sequence > sequence:
sequence = kvmsg.sequence
kvmsg.store(kvmap)
print( "I: received update=%d" % sequence )
# If we timed-out, generate a random kvmsg
if time.time() >= alarm:
kvmsg = KVMsg(0)
kvmsg.key = SUBTREE + b"%d" % random.randint(1,10000)
kvmsg.body = b"%d" % random.randint(1,1000000)
kvmsg.send(publisher)
kvmsg.store(kvmap)
alarm = time.time() + 1.
print( " Interrupted\n%d messages in" % sequence )
if __name__ == '__main__':
main()
Ephemeral Values
An ephemeral value is one that expires automatically unless it is refreshed regularly. For example, a node joins the network, publishes its address, and keeps refreshing it; if the node crashes, its address is eventually removed.
Ephemeral values are usually attached to a session and deleted when the session ends. In Clone, sessions are defined by clients and end when the client dies. A simpler alternative is to attach a time to live (TTL) to each ephemeral value, which the server uses to expire values that have not been refreshed in time.
To implement ephemeral values, we first need a way to encode the TTL in the key-value message. We could add a frame, but then every new property would mean changing the message structure, which breaks compatibility. So we add a properties frame instead.
We also need a way to say "delete this value"; so far the server and clients only insert or update. The convention: an empty value means "delete".
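The "empty value means delete" convention can be sketched on a plain dict (helper name assumed for illustration):

```python
# Sketch of the "empty value means delete" convention, on a plain dict.
def store(dikt, key, body):
    """Insert/update when body is non-empty, else remove the key."""
    if body:
        dikt[key] = body
    elif key in dikt:
        del dikt[key]

d = {}
store(d, b"/client/1", b"hello")         # insert
assert d == {b"/client/1": b"hello"}
store(d, b"/client/1", b"")              # empty body deletes the entry
assert d == {}
```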
Below is kvmsg.py with the properties frame (plus a UUID frame) and delete handling:
"""
=====================================================================
kvmsg - key-value message class for example applications
"""
import struct # for packing integers
import sys
from uuid import uuid4
import zmq
# zmq.jsonapi ensures bytes, instead of unicode:
def encode_properties(properties_dict):
prop_s = b""
for key, value in properties_dict.items():
prop_s += b"%s=%s\n" % (key, value)
return prop_s
def decode_properties(prop_s):
prop = {}
line_array = prop_s.split(b"\n")
for line in line_array:
try:
key, value = line.split(b"=")
prop[key] = value
except ValueError as e:
#Catch empty line
pass
return prop
class KVMsg(object):
"""
Message is formatted on wire as 5 frames:
frame 0: key (0MQ string)
frame 1: sequence (8 bytes, network order)
frame 2: uuid (blob, 16 bytes)
frame 3: properties (0MQ string)
frame 4: body (blob)
"""
key = None
sequence = 0
uuid=None
properties = None
body = None
def __init__(self, sequence, uuid=None, key=None, properties=None, body=None):
assert isinstance(sequence, int)
self.sequence = sequence
if uuid is None:
uuid = uuid4().bytes
self.uuid = uuid
self.key = key
self.properties = {} if properties is None else properties
self.body = body
# dictionary access maps to properties:
def __getitem__(self, k):
return self.properties[k]
def __setitem__(self, k, v):
self.properties[k] = v
def get(self, k, default=None):
return self.properties.get(k, default)
def store(self, dikt):
"""Store me in a dict if I have anything to store
else delete me from the dict."""
if self.key is not None and self.body is not None:
dikt[self.key] = self
elif self.key in dikt:
del dikt[self.key]
def send(self, socket):
"""Send key-value message to socket; any empty frames are sent as such."""
key = b'' if self.key is None else self.key
seq_s = struct.pack('!q', self.sequence)
body = b'' if self.body is None else self.body
prop_s = encode_properties(self.properties)
socket.send_multipart([ key, seq_s, self.uuid, prop_s, body ])
@classmethod
def recv(cls, socket):
"""Reads key-value message from socket, returns new kvmsg instance."""
return cls.from_msg(socket.recv_multipart())
@classmethod
def from_msg(cls, msg):
"""Construct key-value message from a multipart message"""
key, seq_s, uuid, prop_s, body = msg
key = key if key else None
seq = struct.unpack('!q',seq_s)[0]
body = body if body else None
prop = decode_properties(prop_s)
return cls(seq, uuid=uuid, key=key, properties=prop, body=body)
def __repr__(self):
if self.body is None:
size = 0
data=b'NULL'
else:
size = len(self.body)
data = repr(self.body)
mstr = "[seq:{seq}][key:{key}][size:{size}][props:{props}][data:{data}]".format(
seq=self.sequence,
# uuid=hexlify(self.uuid),
key=self.key,
size=size,
props=encode_properties(self.properties),
data=data,
)
return mstr
def dump(self):
print("<<", str(self), ">>", file=sys.stderr)
# ---------------------------------------------------------------------
# Runs self test of class
def test_kvmsg (verbose):
print(" * kvmsg: ", end='')
# Prepare our context and sockets
ctx = zmq.Context()
output = ctx.socket(zmq.DEALER)
output.bind("ipc://kvmsg_selftest.ipc")
input = ctx.socket(zmq.DEALER)
input.connect("ipc://kvmsg_selftest.ipc")
kvmap = {}
# Test send and receive of simple message
kvmsg = KVMsg(1)
kvmsg.key = b"key"
kvmsg.body = b"body"
if verbose:
kvmsg.dump()
kvmsg.send(output)
kvmsg.store(kvmap)
kvmsg2 = KVMsg.recv(input)
if verbose:
kvmsg2.dump()
assert kvmsg2.key == b"key"
kvmsg2.store(kvmap)
assert len(kvmap) == 1 # shouldn't be different
# test send/recv with properties:
kvmsg = KVMsg(2, key=b"key", body=b"body")
kvmsg[b"prop1"] = b"value1"
kvmsg[b"prop2"] = b"value2"
kvmsg[b"prop3"] = b"value3"
assert kvmsg[b"prop1"] == b"value1"
if verbose:
kvmsg.dump()
kvmsg.send(output)
kvmsg2 = KVMsg.recv(input)
if verbose:
kvmsg2.dump()
# ensure properties were preserved
assert kvmsg2.key == kvmsg.key
assert kvmsg2.body == kvmsg.body
assert kvmsg2.properties == kvmsg.properties
assert kvmsg2[b"prop2"] == kvmsg[b"prop2"]
print("OK")
if __name__ == '__main__':
test_kvmsg('-v' in sys.argv)
$ python kvmsg.py -v
<< [seq:1][key:b'key'][size:4][props:b''][data:b'body'] >>
<< [seq:1][key:b'key'][size:4][props:b''][data:b'body'] >>
<< [seq:2][key:b'key'][size:4][props:b'prop1=value1\nprop2=value2\nprop3=value3\n'][data:b'body'] >>
<< [seq:2][key:b'key'][size:4][props:b'prop1=value1\nprop2=value2\nprop3=value3\n'][data:b'body'] >>
* kvmsg: OK
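The TTL travels as a `ttl` property holding a relative number of seconds; the server converts it into an absolute expiry timestamp before storing the message. A minimal sketch of that conversion (helper name assumed):

```python
# Convert a relative ttl property (seconds) into an absolute expiry time,
# as the clone server does before storing the message. Helper name assumed.
def absolute_expiry(properties, now):
    ttl = float(properties.get(b'ttl', 0))
    return now + ttl if ttl else None

assert absolute_expiry({b'ttl': b'30'}, now=1000.0) == 1030.0
assert absolute_expiry({}, now=1000.0) is None   # no ttl: never expires
```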
Using a Reactor
So far the servers have used a poll loop; we now switch to a reactor (the C version uses CZMQ's zloop). A reactor makes the code easier to follow.
We use a single thread and pass a server object into the reactor handlers. We could have organized the server as multiple threads, each handling one socket or timer, but that works better when threads share no data; since everything here revolves around the server's hashmap, one thread is simpler.
There are three reactor handlers:
- one handling snapshot requests coming on the ROUTER socket
- one handling incoming updates from clients on the PULL socket
- one expiring ephemeral values whose TTL has passed
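The handler-per-event-source wiring can be illustrated with a toy reactor, pure Python with no ZeroMQ (all names here are assumptions for illustration):

```python
# Toy reactor sketch (no ZeroMQ): each handler owns one event source,
# mirroring the three handlers listed above.
class ToyReactor:
    def __init__(self):
        self.handlers = {}          # source name -> callback
        self.timers = []            # periodic callbacks

    def on_recv(self, source, callback):
        self.handlers[source] = callback

    def register_timer(self, callback):
        self.timers.append(callback)

    def dispatch(self, source, msg):
        """Deliver one event to the handler registered for its source."""
        self.handlers[source](msg)

    def tick(self):
        """Fire all periodic callbacks (e.g. the TTL flush)."""
        for cb in self.timers:
            cb()

log = []
reactor = ToyReactor()
reactor.on_recv("snapshot", lambda m: log.append(("snapshot", m)))
reactor.on_recv("collector", lambda m: log.append(("update", m)))
reactor.register_timer(lambda: log.append(("flush_ttl", None)))

reactor.dispatch("collector", b"key=value")
reactor.tick()
assert log == [("update", b"key=value"), ("flush_ttl", None)]
```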
clonesrv5.py
"""
Clone server Model Five
"""
import logging
import time
import zmq
from zmq.eventloop.ioloop import IOLoop, PeriodicCallback
from zmq.eventloop.zmqstream import ZMQStream
from kvmsg import KVMsg
from zhelpers import dump
# simple struct for routing information for a key-value snapshot
class Route:
def __init__(self, socket, identity, subtree):
self.socket = socket # ROUTER socket to send to
self.identity = identity # Identity of peer who requested state
self.subtree = subtree # Client subtree specification
def send_single(key, kvmsg, route):
"""Send one state snapshot key-value pair to a socket"""
# check front of key against subscription subtree:
if kvmsg.key.startswith(route.subtree):
# Send identity of recipient first
route.socket.send(route.identity, zmq.SNDMORE)
kvmsg.send(route.socket)
class CloneServer(object):
# Our server is defined by these properties
ctx = None # Context wrapper
kvmap = None # Key-value store
loop = None # IOLoop reactor
port = None # Main port we're working on
sequence = 0 # How many updates we're at
snapshot = None # Handle snapshot requests
publisher = None # Publish updates to clients
collector = None # Collect updates from clients
def __init__(self, port=5556):
self.port = port
self.ctx = zmq.Context()
self.kvmap = {}
self.loop = IOLoop.instance()
# Set up our clone server sockets
self.snapshot = self.ctx.socket(zmq.ROUTER)
self.publisher = self.ctx.socket(zmq.PUB)
self.collector = self.ctx.socket(zmq.PULL)
self.snapshot.bind("tcp://*:%d" % self.port)
self.publisher.bind("tcp://*:%d" % (self.port + 1))
self.collector.bind("tcp://*:%d" % (self.port + 2))
# Wrap sockets in ZMQStreams for IOLoop handlers
self.snapshot = ZMQStream(self.snapshot)
self.publisher = ZMQStream(self.publisher)
self.collector = ZMQStream(self.collector)
# Register our handlers with reactor
self.snapshot.on_recv(self.handle_snapshot)
self.collector.on_recv(self.handle_collect)
self.flush_callback = PeriodicCallback(self.flush_ttl, 1000)
# basic log formatting:
logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
level=logging.INFO)
def start(self):
# Run reactor until process interrupted
self.flush_callback.start()
try:
self.loop.start()
except KeyboardInterrupt:
pass
def handle_snapshot(self, msg):
"""snapshot requests"""
if len(msg) != 3 or msg[1] != b"ICANHAZ?":
print("E: bad request, aborting")
dump(msg)
self.loop.stop()
return
identity, request, subtree = msg
if subtree:
# Send state snapshot to client
route = Route(self.snapshot, identity, subtree)
# For each entry in kvmap, send kvmsg to client
for k,v in self.kvmap.items():
send_single(k,v,route)
# Now send END message with sequence number
logging.info("I: Sending state snapshot=%d" % self.sequence)
self.snapshot.send(identity, zmq.SNDMORE)
kvmsg = KVMsg(self.sequence)
kvmsg.key = b"KTHXBAI"
kvmsg.body = subtree
kvmsg.send(self.snapshot)
def handle_collect(self, msg):
"""Collect updates from clients"""
kvmsg = KVMsg.from_msg(msg)
self.sequence += 1
kvmsg.sequence = self.sequence
kvmsg.send(self.publisher)
ttl = float(kvmsg.get(b'ttl', 0))
if ttl:
kvmsg[b'ttl'] = b'%f' % (time.time() + ttl)
kvmsg.store(self.kvmap)
logging.info("I: publishing update=%d", self.sequence)
def flush_ttl(self):
"""Purge ephemeral values that have expired"""
for key,kvmsg in list(self.kvmap.items()):
# used list() to exhaust the iterator before deleting from the dict
self.flush_single(kvmsg)
def flush_single(self, kvmsg):
"""If key-value pair has expired, delete it and publish the fact
to listening clients."""
ttl = float(kvmsg.get(b'ttl', 0))
if ttl and ttl <= time.time():
kvmsg.body = b""
self.sequence += 1
kvmsg.sequence = self.sequence
kvmsg.send(self.publisher)
del self.kvmap[kvmsg.key]
logging.info("I: publishing delete=%d", self.sequence)
def main():
clone = CloneServer()
clone.start()
if __name__ == '__main__':
main()
clonecli5.py
"""
Clone client Model Five
"""
import random
import time
import zmq
from kvmsg import KVMsg
SUBTREE = "/client/"
def main():
# Prepare our context and subscriber
ctx = zmq.Context()
snapshot = ctx.socket(zmq.DEALER)
snapshot.linger = 0
snapshot.connect("tcp://localhost:5556")
subscriber = ctx.socket(zmq.SUB)
subscriber.linger = 0
subscriber.setsockopt(zmq.SUBSCRIBE, SUBTREE.encode())
subscriber.connect("tcp://localhost:5557")
publisher = ctx.socket(zmq.PUSH)
publisher.linger = 0
publisher.connect("tcp://localhost:5558")
random.seed(time.time())
kvmap = {}
# Get state snapshot
sequence = 0
snapshot.send_multipart([b"ICANHAZ?", SUBTREE.encode()])
while True:
try:
kvmsg = KVMsg.recv(snapshot)
except KeyboardInterrupt:
return # Interrupted
if kvmsg.key == b"KTHXBAI":
sequence = kvmsg.sequence
print("I: Received snapshot=%d" % sequence)
break # Done
kvmsg.store(kvmap)
poller = zmq.Poller()
poller.register(subscriber, zmq.POLLIN)
alarm = time.time()+1.
while True:
tickless = 1000*max(0, alarm - time.time())
try:
items = dict(poller.poll(tickless))
except:
break # Interrupted
if subscriber in items:
kvmsg = KVMsg.recv(subscriber)
# Discard out-of-sequence kvmsgs, incl. heartbeats
if kvmsg.sequence > sequence:
sequence = kvmsg.sequence
kvmsg.store(kvmap)
action = "update" if kvmsg.body else "delete"
print("I: received %s=%d" % (action, sequence))
# If we timed-out, generate a random kvmsg
if time.time() >= alarm:
kvmsg = KVMsg(0)
kvmsg.key = SUBTREE.encode() + b"%d" % random.randint(1,10000)
kvmsg.body = b"%d" % random.randint(1,1000000)
kvmsg[b'ttl'] = b"%d" % random.randint(0,30)
kvmsg.send(publisher)
kvmsg.store(kvmap)
alarm = time.time() + 1.
print(" Interrupted\n%d messages in" % sequence)
if __name__ == '__main__':
main()
Adding Binary Star Pattern for Reliability
Here is what we mean by reliability:
- a server process crashes and is restarted automatically or manually; the process loses its state
- a server machine dies and stays offline for some time; clients must switch over to a backup server
- a server process or machine gets disconnected from the network, e.g. a switch dies; it may come back at some arbitrary point, but clients need a backup server in the meantime
The first step is to add a second server, using the Binary Star Pattern from chap4.
We must ensure that updates are not lost when the primary server crashes. The simplest technique is to send them to both servers: the backup server acts as a client and keeps receiving all updates from clients, but does not apply them to its hash table yet; it holds them aside for now.
- Replace the push-pull flow for client updates with pub-sub, which keeps fanning updates out to both servers.
- Add heartbeats to the server-to-client updates so a client can detect that the primary server has crashed and switch over to the backup server.
- Connect the two servers via the bstar reactor class. In Binary Star, clients vote by actively contacting the server they believe is active; snapshot requests serve as the voting mechanism.
- Add a unique UUID field to every update message, generated by the client.
- The passive server keeps a "pending list" of updates received either from clients or from the active server, ordered by arrival time.
Design the client logic as a finite state machine:
- the client opens and connects its sockets and sends a snapshot request to the first server. To avoid request storms, it asks any given server at most twice.
- the client waits for the snapshot reply from the current server and stores it when it arrives. If no reply comes within a timeout, it fails over to the next server.
- once the client has the snapshot, it waits for updates. If no update comes within a timeout, it fails over to the next server.
The client loops forever. Some clients may end up talking to the primary server and others to the backup server; the Binary Star state machine handles this.
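This client logic can be sketched as a tiny state machine; the state names follow the STATE_* constants that the clone client uses, while the event labels are assumptions for illustration:

```python
# Sketch of the client failover FSM; event labels are assumed.
STATE_INITIAL = 0   # before asking a server for state
STATE_SYNCING = 1   # waiting for the snapshot
STATE_ACTIVE = 2    # receiving updates

def next_state(state, event, cur_server, n_servers):
    """Return (state, server index) after one event."""
    if state == STATE_INITIAL and event == "sent_icanhaz":
        return STATE_SYNCING, cur_server
    if state == STATE_SYNCING and event == "got_kthxbai":
        return STATE_ACTIVE, cur_server
    if event == "timeout":                 # no reply/update: fail over
        return STATE_INITIAL, (cur_server + 1) % n_servers
    return state, cur_server

state, server = STATE_INITIAL, 0
state, server = next_state(state, "sent_icanhaz", server, 2)
state, server = next_state(state, "got_kthxbai", server, 2)
assert (state, server) == (STATE_ACTIVE, 0)
state, server = next_state(state, "timeout", server, 2)
assert (state, server) == (STATE_INITIAL, 1)   # moved to the backup server
```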
The failover sequence is:
- the client notices the primary server has stopped sending heartbeats, connects to the backup server, and requests a new state snapshot
- the backup server starts receiving snapshot requests from clients, detects that the primary server has died, and takes over as primary
- the backup server applies its pending list to its own hash table and starts processing state snapshot requests
When the primary server comes back online, it will:
- start up as the passive server and connect to the backup server as a Clone client
- start receiving updates from clients via its SUB socket
We make two assumptions:
- at least one server keeps running; if both servers crash, we lose all server state and cannot recover
- multiple clients do not update the same hash keys at the same time. Client updates arrive at the two servers in different orders, so the backup server may apply updates from its pending list in an order different from the primary server's. Updates from any single client, however, arrive at both servers in the same order, so this mechanism is safe.
clonesrv6.py
"""
Clone server Model Six
"""
import logging
import time
import zmq
from zmq.eventloop.ioloop import PeriodicCallback
from zmq.eventloop.zmqstream import ZMQStream
from bstar import BinaryStar
from kvmsg import KVMsg
from zhelpers import dump
# simple struct for routing information for a key-value snapshot
class Route:
def __init__(self, socket, identity, subtree):
self.socket = socket # ROUTER socket to send to
self.identity = identity # Identity of peer who requested state
self.subtree = subtree # Client subtree specification
def send_single(key, kvmsg, route):
"""Send one state snapshot key-value pair to a socket"""
# check front of key against subscription subtree:
if kvmsg.key.startswith(route.subtree):
# Send identity of recipient first
route.socket.send(route.identity, zmq.SNDMORE)
kvmsg.send(route.socket)
class CloneServer(object):
# Our server is defined by these properties
ctx = None # Context wrapper
kvmap = None # Key-value store
bstar = None # Binary Star
sequence = 0 # How many updates so far
port = None # Main port we're working on
peer = None # Main port of our peer
publisher = None # Publish updates and hugz
collector = None # Collect updates from clients
subscriber = None # Get updates from peer
pending = None # Pending updates from client
primary = False # True if we're primary
master = False # True if we're master
slave = False # True if we're slave
def __init__(self, primary=True, ports=(5556,5566)):
self.primary = primary
if primary:
self.port, self.peer = ports
frontend = "tcp://*:5003"
backend = "tcp://localhost:5004"
self.kvmap = {}
else:
self.peer, self.port = ports
frontend = "tcp://*:5004"
backend = "tcp://localhost:5003"
self.ctx = zmq.Context.instance()
self.pending = []
self.bstar = BinaryStar(primary, frontend, backend)
self.bstar.register_voter("tcp://*:%i" % self.port, zmq.ROUTER, self.handle_snapshot)
# Set up our clone server sockets
self.publisher = self.ctx.socket(zmq.PUB)
self.collector = self.ctx.socket(zmq.SUB)
self.collector.setsockopt(zmq.SUBSCRIBE, b'')
self.publisher.bind("tcp://*:%d" % (self.port + 1))
self.collector.bind("tcp://*:%d" % (self.port + 2))
# Set up our own clone client interface to peer
self.subscriber = self.ctx.socket(zmq.SUB)
self.subscriber.setsockopt(zmq.SUBSCRIBE, b'')
self.subscriber.connect("tcp://localhost:%d" % (self.peer + 1))
# Register state change handlers
self.bstar.master_callback = self.become_master
self.bstar.slave_callback = self.become_slave
# Wrap sockets in ZMQStreams for IOLoop handlers
self.publisher = ZMQStream(self.publisher)
self.subscriber = ZMQStream(self.subscriber)
self.collector = ZMQStream(self.collector)
# Register our handlers with reactor
self.collector.on_recv(self.handle_collect)
self.flush_callback = PeriodicCallback(self.flush_ttl, 1000)
self.hugz_callback = PeriodicCallback(self.send_hugz, 1000)
# basic log formatting:
logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
level=logging.INFO)
def start(self):
# start periodic callbacks
self.flush_callback.start()
self.hugz_callback.start()
# Run bstar reactor until process interrupted
try:
self.bstar.start()
except KeyboardInterrupt:
pass
def handle_snapshot(self, socket, msg):
"""snapshot requests"""
if len(msg) != 3 or msg[1] != b"ICANHAZ?":
logging.error("E: bad request, aborting")
dump(msg)
self.bstar.loop.stop()
return
identity, request = msg[:2]
if len(msg) >= 3:
subtree = msg[2]
# Send state snapshot to client
route = Route(socket, identity, subtree)
# For each entry in kvmap, send kvmsg to client
for k,v in self.kvmap.items():
send_single(k,v,route)
# Now send END message with sequence number
logging.info("I: Sending state snapshot=%d" % self.sequence)
socket.send(identity, zmq.SNDMORE)
kvmsg = KVMsg(self.sequence)
kvmsg.key = b"KTHXBAI"
kvmsg.body = subtree
kvmsg.send(socket)
def handle_collect(self, msg):
"""Collect updates from clients
If we're master, we apply these to the kvmap
If we're slave, or unsure, we queue them on our pending list
"""
kvmsg = KVMsg.from_msg(msg)
if self.master:
self.sequence += 1
kvmsg.sequence = self.sequence
kvmsg.send(self.publisher)
ttl = float(kvmsg.get(b'ttl', 0))
if ttl:
kvmsg[b'ttl'] = b'%f' % (time.time() + ttl)
kvmsg.store(self.kvmap)
logging.info("I: publishing update=%d", self.sequence)
else:
# If we already got message from master, drop it, else
# hold on pending list
if not self.was_pending(kvmsg):
self.pending.append(kvmsg)
def was_pending(self, kvmsg):
"""If message was already on pending list, remove and return True.
Else return False.
"""
found = False
for idx, held in enumerate(self.pending):
if held.uuid == kvmsg.uuid:
found = True
break
if found:
self.pending.pop(idx)
return found
def flush_ttl(self):
"""Purge ephemeral values that have expired"""
if self.kvmap:
for key,kvmsg in list(self.kvmap.items()):
self.flush_single(kvmsg)
def flush_single(self, kvmsg):
"""If key-value pair has expired, delete it and publish the fact
to listening clients."""
ttl = float(kvmsg.get(b'ttl', 0))
if ttl and ttl <= time.time():
kvmsg.body = b""
self.sequence += 1
kvmsg.sequence = self.sequence
logging.info("I: preparing to publish delete=%s", kvmsg.properties)
kvmsg.send(self.publisher)
del self.kvmap[kvmsg.key]
logging.info("I: publishing delete=%d", self.sequence)
def send_hugz(self):
"""Send hugz to anyone listening on the publisher socket"""
kvmsg = KVMsg(self.sequence)
kvmsg.key = b"HUGZ"
kvmsg.body = b""
kvmsg.send(self.publisher)
# ---------------------------------------------------------------------
# State change handlers
def become_master(self):
"""We're becoming master
The backup server applies its pending list to its own hash table,
and then starts to process state snapshot requests.
"""
self.master = True
self.slave = False
# stop receiving subscriber updates while we are master
self.subscriber.stop_on_recv()
# Apply pending list to own kvmap
while self.pending:
kvmsg = self.pending.pop(0)
self.sequence += 1
kvmsg.sequence = self.sequence
kvmsg.store(self.kvmap)
logging.info ("I: publishing pending=%d", self.sequence)
def become_slave(self):
"""We're becoming slave"""
# clear kvmap
self.kvmap = None
self.master = False
self.slave = True
self.subscriber.on_recv(self.handle_subscriber)
def handle_subscriber(self, msg):
"""Collect updates from peer (master)
We're always slave when we get these updates
"""
if self.master:
logging.warning("received subscriber message, but we are master %s", msg)
return
# Get state snapshot if necessary
if self.kvmap is None:
self.kvmap = {}
snapshot = self.ctx.socket(zmq.DEALER)
snapshot.linger = 0
snapshot.connect("tcp://localhost:%i" % self.peer)
logging.info ("I: asking for snapshot from: tcp://localhost:%d",
self.peer)
snapshot.send_multipart([b"ICANHAZ?", b''])
while True:
try:
kvmsg = KVMsg.recv(snapshot)
except KeyboardInterrupt:
# Interrupted
self.bstar.loop.stop()
return
if kvmsg.key == b"KTHXBAI":
self.sequence = kvmsg.sequence
break # Done
kvmsg.store(self.kvmap)
logging.info ("I: received snapshot=%d", self.sequence)
# Find and remove update off pending list
kvmsg = KVMsg.from_msg(msg)
# update float ttl -> timestamp
ttl = float(kvmsg.get(b'ttl', 0))
if ttl:
kvmsg[b'ttl'] = b'%f' % (time.time() + ttl)
if kvmsg.key != b"HUGZ":
if not self.was_pending(kvmsg):
# If master update came before client update, flip it
# around, store master update (with sequence) on pending
# list and use to clear client update when it comes later
self.pending.append(kvmsg)
# If update is more recent than our kvmap, apply it
if (kvmsg.sequence > self.sequence):
self.sequence = kvmsg.sequence
kvmsg.store(self.kvmap)
logging.info ("I: received update=%d", self.sequence)
def main():
import sys
if '-p' in sys.argv:
primary = True
elif '-b' in sys.argv:
primary = False
else:
print("Usage: clonesrv6.py { -p | -b }")
sys.exit(1)
clone = CloneServer(primary)
clone.start()
if __name__ == '__main__':
main()
Like bstarsrv2, clonesrv6 raised a tornado error at startup; downgrading tornado from 5.1 to 4.5 might fix it, but this was not tested.
It supports failover, ephemeral values, and subtrees.
The reactor-based design removes a lot of redundant code. The whole server runs as one thread, so there is no inter-thread handling to worry about; we just pass a structure pointer (self) around to all the handlers.
The Clustered Hashmap Protocol
The server now integrates the Binary Star Pattern; the client is somewhat more complex. Before implementing it, let's work through the Clustered Hashmap Protocol (CHP).
There are two ways to design such a protocol:
- keep each socket flow separate, which is what we have done so far. The advantage is that each flow stays simple and clean; the disadvantage is managing several flows at once. Using a reactor makes this simpler, but the code still has to be cleanly partitioned.
- design a single protocol that uses one socket pair for everything, here ROUTER for the server and DEALER for the clients. This makes the protocol more complex. chap7 has another sample using ROUTER-DEALER.
Now let's walk through the CHP protocol spec; note the keywords "SHOULD", "MUST", and "MAY".
CHP provides reliable pub-sub for a set of clients over a ZeroMQ network. A "hashmap" is a set of key-value pairs; any client can modify any key-value pair at any time, and changes are propagated to all clients. A client can join the network at any point.
CHP connects a set of client applications to a set of servers. Clients connect to servers; clients do not connect to each other. Clients can join and leave the network arbitrarily.
To start its snapshot connection, a client MUST send an ICANHAZ command. Both frames are ZeroMQ strings. The subtree specification MAY be empty; if not, it typically starts with /, consists of one or more path segments, and ends with /.
ICANHAZ command
-----------------------------------
Frame 0: "ICANHAZ?"
Frame 1: subtree specification
The server MUST respond to an ICANHAZ command with zero or more KVSYNC commands, followed by a KTHXBAI command. The server MUST prefix each command with the client's identity, as provided in the ICANHAZ. The KVSYNC sequence number is unspecified and may be zero.
KVSYNC command
-----------------------------------
Frame 0: key, as ZeroMQ string
Frame 1: sequence number, 8 bytes in network order
Frame 2: <empty>
Frame 3: <empty>
Frame 4: value, as blob
The KTHXBAI sequence number MUST be the highest sequence number of the KVSYNC commands that preceded it.
KTHXBAI command
-----------------------------------
Frame 0: "KTHXBAI"
Frame 1: sequence number, 8 bytes in network order
Frame 2: <empty>
Frame 3: <empty>
Frame 4: subtree specification
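The 8-byte, network-order sequence numbers in these frames round-trip through Python's struct module, as kvmsg.py does:

```python
import struct

# Sequence numbers travel as 8 bytes in network byte order (frame 1 above).
seq = 42
wire = struct.pack('!q', seq)            # signed 64-bit, big-endian
assert len(wire) == 8
assert struct.unpack('!q', wire)[0] == seq
```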
When the client has received a KTHXBAI command, it SHOULD start receiving messages from its subscriber connection and applying them.
When the server has an update for its hashmap, it MUST broadcast a KVPUB command on its publisher socket.
KVPUB command
-----------------------------------
Frame 0: key, as ZeroMQ string
Frame 1: sequence number, 8 bytes in network order
Frame 2: UUID, 16 bytes
Frame 3: properties, as ZeroMQ string
Frame 4: value, as blob
Sequence numbers MUST be strictly increasing. The client MUST discard any KVPUB command whose sequence number is not strictly greater than that of the last KTHXBAI or KVPUB command it processed.
The UUID is optional and may be empty (size 0). The properties field is either empty or holds one or more "name=value" pairs, each terminated by a newline character. If a key-value pair has no properties, the properties field is empty.
If the value is empty, the client SHOULD delete its key-value entry for that key.
In the absence of updates, the server SHOULD send a HUGZ command at regular intervals, e.g. once per second.
HUGZ command
-----------------------------------
Frame 0: "HUGZ"
Frame 1: 00000000
Frame 2: <empty>
Frame 3: <empty>
Frame 4: <empty>
The client MAY treat the absence of HUGZ as an indicator that the server has crashed.
When the client has an update for its hashmap, it MAY send a KVSET command to the server via its publisher connection.
KVSET command
-----------------------------------
Frame 0: key, as ZeroMQ string
Frame 1: sequence number, 8 bytes in network order
Frame 2: UUID, 16 bytes
Frame 3: properties, as ZeroMQ string
Frame 4: value, as blob
The sequence number is unspecified and may be zero.
The UUID SHOULD be a universally unique identifier.
If the value is empty, the server MUST delete its key-value entry for that key.
The server SHOULD accept this property:
- ttl: specifies a time-to-live in seconds. If a KVSET command has a ttl property, the server should delete the key-value pair when the TTL expires and broadcast a KVPUB with an empty value to all clients.
Reliability
CHP allows a dual-server architecture where a backup server takes over when the primary server fails. CHP does not specify the failover mechanism; Binary Star may be used.
To assist server reliability, the client MAY:
- set a UUID in every KVSET command
- detect the lack of HUGZ over a period and treat it as an indicator that the current server has failed
- connect to the backup server and re-request a state synchronization
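The HUGZ-based failure detection above can be sketched without ZeroMQ: any traffic from the server (HUGZ or update) pushes its expiry forward, and prolonged silence marks the server dead so the client fails over. Class and method names here are assumptions; the SERVER_TTL constant matches the clone client's.

```python
# Hedged sketch of HUGZ-based failure detection; explicit "now" values
# keep the example deterministic instead of calling time.time().
SERVER_TTL = 5.0  # seconds of silence before the server is considered dead

class ServerLiveness:
    def __init__(self, now):
        self.expiry = now + SERVER_TTL

    def on_message(self, now):
        """Any message from the server (HUGZ or update) resets its expiry."""
        self.expiry = now + SERVER_TTL

    def dead(self, now):
        return now >= self.expiry

srv = ServerLiveness(now=0.0)
srv.on_message(now=3.0)          # a HUGZ arrives at t=3
assert not srv.dead(now=7.9)
assert srv.dead(now=8.0)         # silent for SERVER_TTL seconds: fail over
```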
Scalability and Performance
CHP scales to large numbers of clients, up to the thousands, limited by the broker's system resources. Because all updates pass through a single server, peak throughput is on the order of millions of updates per second.
Security
CHP implements no authentication, access control, or encryption mechanisms.
Building a Multithread Stack and API
The client needs to be reworked to run in a background thread, much like the multithreaded API at the end of the Freelance Pattern in chap4.
The stack consists of a frontend object and a backend agent connected by a PAIR socket. This combines nicely with creating the thread: the application can send messages to the thread over the PAIR pipe.
The multithreaded API works like this:
- the constructor creates a context, starts the background thread, and connects to it with a pipe
- the background thread starts the agent: a zmq_poll loop reading from the pipe socket as well as the DEALER and SUB sockets
- the main application thread talks to the agent via ZeroMQ messages, typically in this form:
void
clone_connect (clone_t *self, char *address, char *service)
{
assert (self);
zmsg_t *msg = zmsg_new ();
zmsg_addstr (msg, "CONNECT");
zmsg_addstr (msg, address);
zmsg_addstr (msg, service);
zmsg_send (&msg, self->pipe);
}
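The same frontend-to-agent idea can be shown in Python with a queue standing in for the PAIR pipe (a toy, not the real transport): the frontend puts a multipart-style command list on the pipe, and the agent thread dispatches on the first frame.

```python
import queue
import threading

# Toy frontend->agent command pipe (a Queue stands in for the PAIR pipe).
pipe = queue.Queue()

def agent(pipe, connected):
    """Agent loop: pop command lists and dispatch on the first frame."""
    while True:
        msg = pipe.get()
        command = msg[0]
        if command == b"CONNECT":
            connected.append((msg[1], int(msg[2])))
        elif command == b"STOP":   # not part of CHP; just ends the toy loop
            return

connected = []
t = threading.Thread(target=agent, args=(pipe, connected))
t.start()
pipe.put([b"CONNECT", b"tcp://localhost", b"5556"])
pipe.put([b"STOP"])
t.join()
assert connected == [(b"tcp://localhost", 5556)]
```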
clonecli6.py
"""
Clone client Model Six
"""
import random
import time
import zmq
from clone import Clone
SUBTREE = "/client/"
def main():
# Create and connect clone
clone = Clone()
clone.subtree = SUBTREE.encode()
clone.connect("tcp://localhost", 5556)
clone.connect("tcp://localhost", 5566)
try:
while True:
# Distribute as key-value message
key = b"%d" % random.randint(1,10000)
value = b"%d" % random.randint(1,1000000)
clone.set(key, value, random.randint(0,30))
time.sleep(1)
except KeyboardInterrupt:
pass
if __name__ == '__main__':
main()
clone.py
"""
clone - client-side Clone Pattern class
"""
import logging
import threading
import time
import zmq
from zhelpers import zpipe
from kvmsg import KVMsg
# If no server replies within this time, abandon request
GLOBAL_TIMEOUT = 4000 # msecs
# Server considered dead if silent for this long
SERVER_TTL = 5.0 # secs
# Number of servers we will talk to
SERVER_MAX = 2
# basic log formatting:
logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S",
level=logging.INFO)
# =====================================================================
# Synchronous part, works in our application thread
class Clone(object):
ctx = None # Our Context
pipe = None # Pipe through to clone agent
agent = None # agent in a thread
_subtree = None # cache of our subtree value
def __init__(self):
self.ctx = zmq.Context()
self.pipe, peer = zpipe(self.ctx)
self.agent = threading.Thread(target=clone_agent, args=(self.ctx,peer))
self.agent.daemon = True
self.agent.start()
# ---------------------------------------------------------------------
# Clone.subtree is a property, which sets the subtree for snapshot
# and updates
@property
def subtree(self):
return self._subtree
@subtree.setter
def subtree(self, subtree):
"""Sends [SUBTREE][subtree] to the agent"""
self._subtree = subtree
self.pipe.send_multipart([b"SUBTREE", subtree])
def connect(self, address, port):
"""Connect to new server endpoint
Sends [CONNECT][address][port] to the agent
"""
self.pipe.send_multipart([b"CONNECT", (address.encode() if isinstance(address, str) else address), b'%d' % port])
def set(self, key, value, ttl=0):
"""Set new value in distributed hash table
Sends [SET][key][value][ttl] to the agent
"""
self.pipe.send_multipart([b"SET", key, value, b'%i' % ttl])
def get(self, key):
"""Lookup value in distributed hash table
Sends [GET][key] to the agent and waits for a value response
If there is no clone available, will eventually return None.
"""
self.pipe.send_multipart([b"GET", key])
try:
reply = self.pipe.recv_multipart()
except KeyboardInterrupt:
return
else:
return reply[0]
# =====================================================================
# Asynchronous part, works in the background
# ---------------------------------------------------------------------
# Simple class for one server we talk to
class CloneServer(object):
address = None # Server address
port = None # Server port
snapshot = None # Snapshot socket
subscriber = None # Incoming updates
expiry = 0 # Expires at this time
requests = 0 # How many snapshot requests made?
def __init__(self, ctx, address, port, subtree):
self.address = address
self.port = port
self.snapshot = ctx.socket(zmq.DEALER)
self.snapshot.linger = 0
self.snapshot.connect("%s:%i" % (address.decode(),port))
self.subscriber = ctx.socket(zmq.SUB)
self.subscriber.setsockopt(zmq.SUBSCRIBE, subtree)
self.subscriber.setsockopt(zmq.SUBSCRIBE, b'HUGZ')
self.subscriber.connect("%s:%i" % (address.decode(),port+1))
self.subscriber.linger = 0
# ---------------------------------------------------------------------
# Simple class for one background agent
# States we can be in
STATE_INITIAL = 0 # Before asking server for state
STATE_SYNCING = 1 # Getting state from server
STATE_ACTIVE = 2 # Getting new updates from server
class CloneAgent(object):
ctx = None # Own context
pipe = None # Socket to talk back to application
kvmap = None # Actual key/value dict
subtree = b'' # Subtree specification, if any
servers = None # list of connected Servers
state = 0 # Current state
cur_server = 0 # If active, index of server in list
sequence = 0 # last kvmsg processed
publisher = None # Outgoing updates
def __init__(self, ctx, pipe):
self.ctx = ctx
self.pipe = pipe
self.kvmap = {}
self.subtree = b''
self.state = STATE_INITIAL
self.publisher = ctx.socket(zmq.PUB)
self.router = ctx.socket(zmq.ROUTER)
self.servers = []
def control_message (self):
msg = self.pipe.recv_multipart()
command = msg.pop(0)
if command == b"CONNECT":
address = msg.pop(0)
port = int(msg.pop(0))
if len(self.servers) < SERVER_MAX:
self.servers.append(CloneServer(self.ctx, address, port, self.subtree))
self.publisher.connect("%s:%i" % (address.decode(),port+2))
else:
logging.error("E: too many servers (max. %i)", SERVER_MAX)
elif command == b"SET":
key,value,sttl = msg
ttl = int(sttl)
# Send key-value pair on to server
kvmsg = KVMsg(0, key=key, body=value)
kvmsg.store(self.kvmap)
if ttl:
kvmsg[b"ttl"] = sttl
kvmsg.send(self.publisher)
elif command == b"GET":
key = msg[0]
value = self.kvmap.get(key)
self.pipe.send(value.body if value else b'')
elif command == b"SUBTREE":
self.subtree = msg[0]
# ---------------------------------------------------------------------
# Asynchronous agent manages server pool and handles request/reply
# dialog when the application asks for it.
def clone_agent(ctx, pipe):
agent = CloneAgent(ctx, pipe)
server = None
while True:
poller = zmq.Poller()
poller.register(agent.pipe, zmq.POLLIN)
poll_timer = None
server_socket = None
if agent.state == STATE_INITIAL:
# In this state we ask the server for a snapshot,
# if we have a server to talk to...
if agent.servers:
server = agent.servers[agent.cur_server]
logging.info ("I: waiting for server at %s:%d...",
server.address, server.port)
if (server.requests < 2):
server.snapshot.send_multipart([b"ICANHAZ?", agent.subtree])
server.requests += 1
server.expiry = time.time() + SERVER_TTL
agent.state = STATE_SYNCING
server_socket = server.snapshot
elif agent.state == STATE_SYNCING:
# In this state we read from snapshot and we expect
# the server to respond, else we fail over.
server_socket = server.snapshot
elif agent.state == STATE_ACTIVE:
# In this state we read from subscriber and we expect
# the server to give hugz, else we fail over.
server_socket = server.subscriber
if server_socket:
# we have a second socket to poll:
poller.register(server_socket, zmq.POLLIN)
if server is not None:
poll_timer = 1e3 * max(0,server.expiry - time.time())
# ------------------------------------------------------------
# Poll loop
try:
items = dict(poller.poll(poll_timer))
except:
break # Context has been shut down
if agent.pipe in items:
agent.control_message()
elif server_socket in items:
kvmsg = KVMsg.recv(server_socket)
# Anything from server resets its expiry time
server.expiry = time.time() + SERVER_TTL
if (agent.state == STATE_SYNCING):
# Store in snapshot until we're finished
server.requests = 0
if kvmsg.key == b"KTHXBAI":
agent.sequence = kvmsg.sequence
agent.state = STATE_ACTIVE
logging.info ("I: received from %s:%d snapshot=%d",
server.address, server.port, agent.sequence)
else:
kvmsg.store(agent.kvmap)
elif (agent.state == STATE_ACTIVE):
# Discard out-of-sequence updates, incl. hugz
if (kvmsg.sequence > agent.sequence):
agent.sequence = kvmsg.sequence
kvmsg.store(agent.kvmap)
action = "update" if kvmsg.body else "delete"
logging.info ("I: received from %s:%d %s=%d",
server.address, server.port, action, agent.sequence)
else:
# Server has died, failover to next
logging.info ("I: server at %s:%d didn't give hugz",
server.address, server.port)
agent.cur_server = (agent.cur_server + 1) % len(agent.servers)
agent.state = STATE_INITIAL
The connect method takes one endpoint argument, but behind it are actually 3 ports:
- server state router (ROUTER): port P
- server updates publisher (PUB): port P+1
- server updates subscriber (SUB): port P+2
We can therefore wrap all three connections in a single connect operation.
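Deriving the three endpoints from one base port P can be sketched as follows (function name assumed; the port roles match the list above):

```python
# Derive the three Clone endpoints from one base port P.
def clone_endpoints(address, port):
    return {
        "snapshot": "%s:%d" % (address, port),        # ROUTER: state requests
        "subscriber": "%s:%d" % (address, port + 1),  # PUB: updates out
        "publisher": "%s:%d" % (address, port + 2),   # SUB: updates in
    }

eps = clone_endpoints("tcp://localhost", 5556)
assert eps["subscriber"] == "tcp://localhost:5557"
assert eps["publisher"] == "tcp://localhost:5558"
```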
The clone stack splits into a frontend object class and a backend agent. The frontend sends commands ("SUBTREE", "CONNECT", "SET", "GET") to the agent, which talks to the servers:
- at startup, get a snapshot from the first server
- once the snapshot is in, switch to reading from the subscriber socket
- if no snapshot arrives, fail over to the second server
- poll both the pipe and the subscriber socket
- a message on the pipe is a command from the frontend
- a message on the subscriber is an update to store/apply
- if nothing arrives from the server within a certain time, fail over
References
ØMQ - The Guide