2019/09/29

ZeroMQ 2 Sockets and Patterns

說明 ØMQ patterns: request-reply, pub-sub, pipeline,包含以下項目

  1. create and work with ØMQ sockets
  2. send/receive messages
  3. 如何使用非同步 I/O model 實作 application
  4. 如何在一個 thread 處理多個 sockets
  5. 如何處理 fatal and nonfatal errors
  6. 如何關閉 ØMQ application
  7. 如何 send/receive multipart messages
  8. 如何 forward message
  9. 如何建立一個簡單的 message queueing broker
  10. 如何實作 multithread application

Socket API

  1. creating and destroying sockets: zmq_socket(), zmq_close()
  2. socket 的 options 參數: zmq_setsockopt(), zmq_getsockopt()
  3. writing/receiving messages: zmq_send(), zmq_recv()

注意 socket 永遠是 void pointer,message 是 structure。

注意 ØMQ 是非同步的


要將兩個 node 連接起來,一個用 zmq_bind(),另一個用 zmq_connect(),通常呼叫 zmq_bind() 的是 "server",有固定的 network address,另一個是 "client"

ØMQ 跟舊的 TCP connection 的差異在

  1. 可使用 inproc, ipc, tcp, pgm, epgm 這些 transport layer
  2. 一個 socket 可有多個 outgoing 及 incoming connections
  3. 沒有 zmq_accept() method,當 socket 連接到一個 endpoint,會自動開始接受 connection
  4. network 連線在背景處理,當網路異常斷線後,ØMQ會自動 reconnect
  5. application 不能直接處理 network connecton

最常見的是 client/server model,對所有 clients 來說,server 要保持 visible,因此 server 要使用 zmq_bind(),而 client 要用 zmq_connect()。ØMQ 可接受先啟動 client 再啟動 server。也就是先啟動呼叫 zmq_connect() 的 node。

server node 可使用 socket bind to 很多種 protocol 的 endpoints。但不能 bind 到相同的 endpoint 兩次以上。不過 ipc transport 可以讓一個 process bind to 已經被某個 process 使用的 endpoint,也就是可在 crash 以後,recover socket。

zmq_bind (socket, "tcp://*:5555");
zmq_bind (socket, "tcp://*:9999");
zmq_bind (socket, "inproc://somename");

server 就是架構中的 static component,而 client 是 dynamic parts。

socket 有特殊的類型分別,遵循 messaging pattern 的限制,例如:不能將 publisher 連接到 subscriber socket。


使用 zmq_msg_send() zmq_msg_recv() 收送訊息。 ØMQ 的 I/O model 跟傳統的 TCP 不同,TCP socket 是 1-to-1 的。

  • ØMQ socket 傳遞訊息類似 UDP,是帶有長度的 binary data,不像是 TCP 是 stream of bytes。
  • ØMQ socket 會在背景 thread 處理 I/O,訊息會送到 local input queue,且由 local output queue 送出,不管 application 在處理什麼東西。
  • ØMQ socket 根據類型不同,會有內建 one-to-N routing 行為

zmq_send 並不會真的將訊息送到 socket connection,而是先放到 I/O thread 的 queue,並以非同步方式發送。


Unicast Transports

ØMQ 提供 inproc, ipc, tcp 這些 unicast teansports,還有 epgm, pgm 這些 multicast transports。

最常見的是使用 tcp: disconnected TCP transport。disconnected 是因為在 endpoints 存在前,就可以連接。

windows platform 不支援 ipc,通常會用 ".ipc" extension,在 UNIX 要注意權限,因為不同 user id 都要有存取權限。

inproc: inter-thread transport,是 connected signaling transport,比 tcp, ipc 快,在 client 執行 connect 以前,server 必須要先執行 bind。


ØMQ is not a neutral carrier

有時會有 "如何用 ØMQ 實作 http server" 的問題。但結論是不行,因為 ØMQ 並不是一般化的 socket library,ØMQ 的 framing 機制無法跟其他 protocol 相容。

ØMQ v3.3 以後提供了 ZMQ_ROUTER_RAW 選項,可讓我讀寫沒有 ØMQ framing 機制的 message,就可以利用這個方式,讀寫正常的 http request 與 response,但目前還是實驗性的功能。


I/O Threads

ØMQ 是在 background thread 處理 I/O,通常一個 thread 就夠了,當產生一個 context,就會啟動一個 I/O thread,每秒可處理約 1 GB 的資料,但也可以在產生 socket 前,呼叫 zmq_ctx_set() 增加 I/O thread 的數量。

int io_threads = 4;
void *context = zmq_ctx_new ();
zmq_ctx_set (context, ZMQ_IO_THREADS, io_threads);
assert (zmq_ctx_get (context, ZMQ_IO_THREADS) == io_threads);

以往已有案例,一個 socket 一次可以處理上千個 connections,傳統做法是一個 process/thread 處理一個 connection,該 process/thread 處理一個 socket,ØMQ 將整個架構壓縮到單一 process。

如果把 ØMQ 用在 inter-thread communication (multithreaded application),沒有 external socket I/O,可以將 I/O thread 設定為 0。

Messaging Patterns

ØMQ pattern 是由 pairs of sockets with matching type 實作的,在了解 pattern 前要先了解 socket type。

ØMQ 內建 pattern 為

  • Request-Reply: 連接 client 到 services,這是 remote procedure call 及 task distribution pattern

  • Pub-Sub: 連接一組 publishers 到 subscribers,這是 data distribution pattern

  • Pipeline: 用 fan-out/fan-in pattern 連接 nodes,可有多個 steps 及 loops,這是 parallel task distribution 與 collection pattern

  • Exclusive Pair: exclusivly 連接兩個 sockets,這是在連接一個 process 兩個 threads。

前三種在 chap1 已經說明了,現在要討論第四種。

以下是不同的 socket 組合的有效的 connect-bind pair,任意一端都可執行 bind

  • PUB and SUB
  • REQ and REP
  • REQ and ROUTER (take care, REQ inserts an extra null frame)
  • DEALER and REP (take care, REP assumes a null frame)
  • DEALER and ROUTER
  • DEALER and DEALER
  • ROUTER and ROUTER
  • PUSH and PULL
  • PAIR and PAIR

另外還有 XPUB, XSUB,這是 raw versions of PUB 及 SUB


以上是四種 ØMQ core patterns,以 C++ 實作。

根據這四個 pattern 還可組成 high-level messaging patterns,這些不在 core library 裡面,可在 ØMQ community 中找到,例如 Majordomo pattern: Reliable Request-Reply Pattern (chap4),可在 ØMQ 的 github repo 裡面找到。

Working with Messages

ØMQ 有兩組收送訊息的 API,有簡單的 one-liner 的zmq_send() 及 zmq_recv(),但是 zmq_recv() 並不能處理任意長度的 message size,如果超過 buffer 長度會 truncates messages,有另一組 API 會使用 zmq_msg_t structure

  • 訊息初始化: zmq_msg_init(), zmq_msg_init_size(), zmq_msg_init_data()
  • 收送訊息: zmq_msg_send(), zmq_msg_recv()
  • 釋放訊息: zmq_msg_close()
  • 存取訊息內容: zmq_msg_data(), zmq_msg_size(), zmq_msg_more()
  • 處理訊息屬性: zmq_msg_get(), zmq_msg_set()
  • 操作訊息: zmq_msg_copy(), zmq_msg_move()

ØMQ message 是 blobs of any size,可傳送任意一種資料格式,例如 protocol buffers, msgpack, JSON...

在記憶體中,使用 zmq_msg_t structure,在 C 語言的使用規則如下

  1. 產生 zmq_msg_t 物件,不是 blocks of data
  2. zmq_msg_init() 產生空白的訊息,並傳給 zmq_msg_recv()
  3. 使用 zmq_msg_init_size() 產生訊息,然後 allocate a block of data,利用 memcpy 複製資料,傳送訊息給 zmq_msg_send()
  4. 呼叫 zmq_msg_close() 釋放訊息,這會釋放 reference,最終會丟棄訊息
  5. 使用 zmq_msg_data() 存取訊息資料,使用 zmq_msg_size() 取得訊息長度
  6. 使用 zmq_msg_move(), zmq_msg_copy(), zmq_msg_init_data() 前要詳讀手冊
  7. zmq_msg_send() 傳送訊息後,ØMQ 會清除 message,也就是設定 size 為 0,不能傳送相同的訊息 2次,發送訊息後,就不能存取該訊息
  8. 這些規則不適用於 zmq_send(), zmq_recv(),那兩個 method 是傳送 byte arrays 而不是 message structure

如果想發送相同訊息2次,可用 zmq_msg_init() 並用 zmq_msg_copy() 複製,複製時只會複製 data reference。

ØMQ 支援 multipart messages,可在單一 message 傳送 list of frames,frames (也稱為 message parts) 是 ØMQ message 的 basic wire format,frame 是有指定長度的 block of data。

另外有 wire-level protocol 稱為 ZMTP,定義了 ØMQ 如何在 TCP connection 讀寫 frames。

一開始 ØMQ 只能送單一 frame,後來 API 增加了 "more" flag,可傳送 multipart messages。

  • message 可以是 one or more parts
  • message parts 稱為 "frames"
  • 每一個 frame 都是 zmq_msg_t object
  • 在 API 底層,收送訊息都是分開獨立的
  • higher-level APIs 提供發送整個 multipart messages 的 wrappers

另外要注意

  • 可發送長度為 0 的訊息,例如當作 signal
  • ØMQ 保證發送 all parts for a message,或是 none of them
  • ØMQ 不會立即發送訊息,但要確認記憶體可放入 multipart message
  • message (single or multipart) 必須放入 memory,如果要發送 file,應該要拆成多個 pieces,分別用 single-part message 傳送,因為 multipart data 會耗用較多 memory
  • 完成接收訊息後,要呼叫 zmq_msg_close(),某些 programming language api 並不會自動 destroy objects

先不要使用 zmq_msg_init_data(),這是 zero-copy method,可能會造成一些問題

Handling Multiple Sockets

目前 main loop 都是用以下程序

  1. wait for message on socket
  2. process message
  3. repeat

但如果要同時在多個 endpoints 讀取資料要怎麼辦?最簡單的方式,是將所有 endpoints 都連接到一個 socket,然後讓 ØMQ 處理 fan-in,這種方式適用於有相同 pattern 的 endpoints

如果要一次讀取多個 sockets,可使用 zmq_poll()

msreader.py,使用一個簡單的 loop 讀取多個 sockets,後面的 sleep 可能會造成一些 delay 的問題。

另外因為依序在兩個 socket 讀取資料,也就是用 fair read 的形式。

# encoding: utf-8
#
#   Reading from multiple sockets
#   This version uses a simple recv loop

import zmq
import time

# Prepare our context and sockets
context = zmq.Context()

# Connect to task ventilator
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Connect to weather server
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")

# Process messages from both sockets
# We prioritize traffic from the task ventilator
while True:

    # Process any waiting tasks
    while True:
        try:
            msg = receiver.recv(zmq.DONTWAIT)
        except zmq.Again:
            break
        # process task

    # Process any waiting weather updates
    while True:
        try:
            msg = subscriber.recv(zmq.DONTWAIT)
        except zmq.Again:
            break
        # process weather update

    # No activity, so sleep for 1 msec
    time.sleep(0.001)

mspoller.py,這是比較正確,使用 zmq_poll() 讀取多個 socket 的程式

# encoding: utf-8
#
#   Reading from multiple sockets
#   This version uses zmq.Poller()

import zmq

# Prepare our context and sockets
context = zmq.Context()

# Connect to task ventilator
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Connect to weather server
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")

# Initialize poll set
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)

# Process messages from both sockets
while True:
    try:
        socks = dict(poller.poll())
    except KeyboardInterrupt:
        break

    if receiver in socks:
        message = receiver.recv()
        # process task

    if subscriber in socks:
        message = subscriber.recv()
        # process weather update

items structure 有以下四個 members:

typedef struct {
    void *socket;       //  ZeroMQ socket to poll on
    int fd;             //  OR, native file handle to poll on
    short events;       //  Events to poll on
    short revents;      //  Events returned after poll
} zmq_pollitem_t;
Multipart Message

ØMQ 可用多個 frames 組成一個 message,也就是 multipart message。

現在要了解如何實作 proxy,讀寫 forward 資料,但不修改資料。

multipart messages 裡面每一個 part 都是 zmq_msg item,如果發送了 5 parts message,就必須產生、發送並銷毀 5 個 zmq_msg items。

發送 multipart message 的方式 (接收每一個 frame 到一個 message object)

zmq_msg_send (&message, socket, ZMQ_SNDMORE);
…
zmq_msg_send (&message, socket, ZMQ_SNDMORE);
…
zmq_msg_send (&message, socket, 0);

這是接收並處理單一訊息多個 parts 的方式

while (1) {
    zmq_msg_t message;
    zmq_msg_init (&message);
    zmq_msg_recv (&message, socket, 0);
    //  Process the message frame
    …
    zmq_msg_close (&message);
    if (!zmq_msg_more (&message))
        break;      //  Last message frame
}

處理 multiparts message 要注意

  1. 發送 multipart message 時,在發送最後一個 part 完成後,才會真的將訊息發送出去
  2. 如果使用 zmq_poll(),接收了訊息的第一個 part 後,其他部分也都會收到
  3. 接收 multipart 訊息,會收到全部,要不然就是全部都沒收到
  4. 每一個 part 都是獨立的 zmq_msg item
  5. 不管有沒有檢查 more property,都會收到所有 parts of a message
  6. ØMQ 會在收到所有 parts of a message 後,才會發送出去
  7. 除非 close socket,否則沒有 cancel 部分 parts 的方法
Intermediaries and Proxies

ØMQ 重點是 decentralized intelligence,這不代表不需要 middle nodes,ØMQ 可實作出 pipes 或是 service oriented brokers,messaging 領域將這種產品稱為 intermidiation。ØMQ 則稱為 proxies, queues, forwarders, device 或 brokers。

在現實世界,稱為 wholesalers, distributors, managers...

Dynamic Discovery Problem

在大型分散式架構中,最大的問題是 discovery,各 components 之間如何互相知道對方的位址,也就是 "dynamic discovery problem"

最簡單的解決方式就是,hard-coding (configuring) network architecture,問題在於增加新的 piece 需要 reconfigure network。

這是簡單的 Pub-Sub 架構,如果有 1 publisher,上百個 subscriber,需要在每一個 subscriber 都設定 publisher endpoint。subscriber 是動態增加的,publisher 是固定的,但如果要臨時增加 publisher,不容易調整架構。

另一種簡單的解決方案,就是增加一個固定的 network point,讓其他 node 連接,傳統 messaging 就是用這種做法,稱為 message broker。雖然 ØMQ 沒有內建 broker,但可以簡單完成一個 intermediary。

最好是將 intermediaries 視為簡單的 stateless message switches,類似 http proxy,增加 pub-sub proxy 可解決 dynamic discovery problem,proxy 會 open 1 XSUB socket,1 XPUB socket,並以 well-known IP addresses, ports 連接在一起,其他 processes 會連到這個 proxy 而不是互相連線。

Shared Queue (DEALER and ROUTER sockets)

基本的 client/server application如下,但通常會有多個 services 及 clients,這種方式的限制是 services 必須要是 stateless,request 的 state 可存放在 database。

如果要連接多個 clients 到多個 servers,可直接將每一個 client 連到多個 service endpoints。以下是將 client 連到 service A, B, C,發送四個 request,其中 service A 收到 R1, R4, service B 收到 R2, service C 收到 R3。

問題在於如果要臨時增加 3 個 services,就需要重新設定舊的 clients,增加 services,並 restart clients。

可改用以下 broker 解決這個問題。會使用 nonblocking zmq_poll() 監控兩個 sockets,然後在兩個 socket 之間傳遞訊息。

chap3 另外有 asynchronous request-reply flow 的方法

rrclient.py,連接到 broker,發送 10 個 requests,並等待 response

#
#   Request-reply client in Python
#   Connects REQ socket to tcp://localhost:5559
#   Sends "Hello" to server, expects "World" back
#
import zmq

#  Prepare our context and sockets
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5559")

#  Do 10 requests, waiting each time for a response
for request in range(1,11):
    socket.send(b"Hello")
    message = socket.recv()
    print("Received reply %s [%s]" % (request, message))

rrbroker.py,可正確處理 multipart messages

# Simple request-reply broker
import zmq

# Prepare our context and sockets
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
backend = context.socket(zmq.DEALER)
frontend.bind("tcp://*:5559")
backend.bind("tcp://*:5560")

# Initialize poll set
poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)

# Switch messages between sockets
while True:
    socks = dict(poller.poll())

    if socks.get(frontend) == zmq.POLLIN:
        message = frontend.recv_multipart()
        backend.send_multipart(message)

    if socks.get(backend) == zmq.POLLIN:
        message = backend.recv_multipart()
        frontend.send_multipart(message)

rrworker.py

#
#   Request-reply service in Python
#   Connects REP socket to tcp://localhost:5560
#   Expects "Hello" from client, replies with "World"
#
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.connect("tcp://localhost:5560")

while True:
    message = socket.recv()
    print("Received request: %s" % message)
    socket.send(b"World")

以下是 Request-Reply Broker 的示意圖,只有中間的 broker 是固定的 node

ØMQ 內建的 proxy function

剛剛的 rrbroker 是很有用的元件,可用來建立 pub-sub forwarders,shared queues,ØMQ 包裝成一個 method zmq_proxy(frontend, backend, capture)

msgqueue.py,可直接替換掉 rrbroker.py

"""

   Simple message queuing broker
   Same as request-reply broker but using ``zmq.proxy``
"""

import zmq


def main():
    """ main method """

    context = zmq.Context()

    # Socket facing clients
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5559")

    # Socket facing services
    backend  = context.socket(zmq.DEALER)
    backend.bind("tcp://*:5560")

    zmq.proxy(frontend, backend)

    # We never get here...
    frontend.close()
    backend.close()
    context.term()


if __name__ == "__main__":
    main()

實際上可以將 ROUTER/DEALER, XSUB/XPUB, PULL/PUSH 這三組 socket type 連接到 zmq.proxy()

Transport Bridging

ØMQ 常見問題是,如何將 ØMQ network 連接到某一種技術的網路中? 可能是 networking 或 messaging technology

解決方式是使用 bridge,bridge 就是在某個 socket 使用某一種 protocol,在另一個 socket 轉換到另一種 protocol,也就是 protocol interpreter

wuproxy.py,會使用 SUB 連接 weather sever,然後轉換以 PUB 發送到 external network

import zmq

context = zmq.Context()

# This is where the weather server sits
frontend = context.socket(zmq.SUB)
frontend.connect("tcp://192.168.55.210:5556")

# This is our public endpoint for subscribers
backend = context.socket(zmq.PUB)
backend.bind("tcp://10.1.1.0:8100")

# Subscribe on everything
frontend.setsockopt(zmq.SUBSCRIBE, b'')

# Shunt messages out to our own subscribers
while True:
    # Process all parts of the message
    message = frontend.recv_multipart()
    backend.send_multipart(message)

Handling Errors and ETERM

ØMQ 以 fail-fast 及 resilience 方式處理 error。當 process 發生 internal error,就容易受到外界的攻擊,因此如果發生 internal error,就會 self-destruct。

當 ØMQ 偵測到 external fault,會對 calling node 發出 error,某些少數特殊狀況下,會直接丟棄訊息,沒有從這種 error 復原的方法。

在 C 的程式裡,需要對每一個 ØMQ call 都進行 error handling。

  • create object 的 method 回傳 null 就是 failed
  • 處理資料的 method,回傳了已處理的 bytes 數量,如果是 -1 就是發生 error
  • 其他 method 如果回傳 0 就是成功,-1 就是 error
  • error code 是在 errno 或是 zmq_errno() 提供
  • 可用 zmq_strerror() 轉換為 error 文字說明
void *context = zmq_ctx_new ();
assert (context);
void *socket = zmq_socket (context, ZMQ_REP);
assert (socket);
int rc = zmq_bind (socket, "tcp://*:5555");
if (rc == -1) {
    printf ("E: bind failed: %s\n", strerror (errno));
    return -1;
}

有兩種例外狀況,應該視為 nonfatal

  1. 收到訊息,夾帶 ZMQ_DONTWAIT option,沒有 waiting data,ØMQ 會 return -1 並設定 errno 為 EAGAIN
  2. 某個 thread 呼叫 zmq_ctx_destory(),其他thread 會停止運作,zmq_ctx_destory() 會把 context 關掉,並 block 所有 function call,exit with -1,errno 設定為 ETERM

以下是如何正確停止 process 的範例。

因為 PUSH/PULL socket 是單向的,要增加 PUB/SUB socket,用來發送 KILL signal 給 worker

  • sink 產生新的 PUB socket 到新的 endpoint
  • worker bind input socket 到 endpoint
  • 當 sink 偵測到工作完成,就發送 kill 到 PUB socket
  • 當 worker 偵測 kill message,就 exit

taskworker2.py

# encoding: utf-8
#
#   Task worker - design 2
#   Adds pub-sub flow to receive and respond to kill signal

import sys
import time
import zmq

context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

# Socket for control input
controller = context.socket(zmq.SUB)
controller.connect("tcp://localhost:5559")
controller.setsockopt(zmq.SUBSCRIBE, b"")

# Process messages from receiver and controller
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(controller, zmq.POLLIN)
# Process messages from both sockets
while True:
    socks = dict(poller.poll())

    if socks.get(receiver) == zmq.POLLIN:
        message = receiver.recv_string()

        # Process task
        workload = int(message)  # Workload in msecs

        # Do the work
        time.sleep(workload / 1000.0)

        # Send results to sink
        sender.send_string(message)

        # Simple progress indicator for the viewer
        sys.stdout.write(".")
        sys.stdout.flush()

    # Any waiting controller command acts as 'KILL'
    if socks.get(controller) == zmq.POLLIN:
        break

# Finished
receiver.close()
sender.close()
controller.close()
context.term()

tasksink2.py,收到 100 個 task 結果後,就發送 controller.send(b"KILL")

# encoding: utf-8
#
#   Task sink - design 2
#   Adds pub-sub flow to send kill signal to workers

import sys
import time
import zmq

context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

# Socket for worker control
controller = context.socket(zmq.PUB)
controller.bind("tcp://*:5559")

# Wait for start of batch
receiver.recv()

# Start our clock now
tstart = time.time()

# Process 100 confirmiations
for task_nbr in range(100):
    receiver.recv()
    if task_nbr % 10 == 0:
        sys.stdout.write(":")
    else:
        sys.stdout.write(".")
    sys.stdout.flush()

# Calculate and report duration of batch
tend = time.time()
tdiff = tend - tstart
total_msec = tdiff * 1000
print("Total elapsed time: %d msec" % total_msec)

# Send kill signal to workers
controller.send(b"KILL")

# Finished
receiver.close()
controller.close()
context.term()

Handling Interrupt Signals

正式程式需要針對中斷信號 ex: Ctrl-C 或 SIGTERM 停止程式。預設狀況下,只會 kill process,這表示 messages 不會被 flushed,檔案不會正常關閉。

interrupt.py,這是處理 Ctrl-C 的方式

#
#   Shows how to handle Ctrl-C
#
import signal
import time
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5558")

# SIGINT will normally raise a KeyboardInterrupt, just like any other Python call
try:
    socket.recv()
except KeyboardInterrupt:
    print("W: interrupt received, stopping...")
finally:
    # clean up
    socket.close()
    context.term()

程式提供 s_catch_signals() method,會抓到 Ctrl-C(SIGINT) 及 SIGTERM 信號,並設定 global variable: s_interrupted

在程式一開始就要呼叫 s_catch_signals(),處理 signaling handling

s_catch_signals ();
client = zmq_socket (...);
while (!s_interrupted) {
    char *message = s_recv (client);
    if (!message)
        break;          //  Ctrl-C used
}
zmq_close (client);

偵測 Memory Leak

如果是使用自動管理記憶體的程式語言,就不用處理這個問題。如果是使用 C/C++,就要管理記憶體。

首先要安裝 valgrind

sudo apt-get install valgrind

預設狀況下,ØMQ 會讓 valgrind 產生很多資料,可建立一個vg.supp 檔案,填入以下內容,移除 warnings

{
    <socketcall_sendto>
    Memcheck:Param
    socketcall.sendto(msg)
    fun:send
    ...
}
{
    <socketcall_sendto>
    Memcheck:Param
    socketcall.send(msg)
    fun:send
    ...
}

-DDEBUG 選項,編譯程式

利用 valgrind 執行程式

valgrind --tool=memcheck --leak-check=full --suppressions=vg.supp someprog

Multithreading with ØMQ

ØMQ 應該是寫 multithread 程式做好的工具,且不需要 mutexes, locks, 或其他 inter-thread communication 機制

concurrent programming 要注意: "don't share state"

利用 ØMQ 撰寫 multithread code 要注意的事項

  1. 分離 data,不要在多個 thread 之間分享資料,只能用 ØMQ context (threadsafe) 傳遞資料。
  2. 不要使用傳統的 concurrency mechanisms,例如: mutexes, criticl sections, semaphores...
  3. 在 process 啟動時,產生一個 ØMQ context,然後傳給其他需要使用 inproc socket 的 threads
  4. 利用 attached thread 產生 structure,並以 inproc 的 PAIR socket 連接到 parent threads,pattern 為 bind parent socket,然後產生連接該 socket 的 child thread
  5. 使用 attached threads 以他們自己的 contexts 模擬獨立 tasks,以 tcp 方式連線。然後再將程式移到 stand-alone processes。
  6. 只能用 ØMQ message 在 thread 之間傳遞資料
  7. 不要在 thread 之間分享 ØMQ socket,因為 ØMQ socket 不是 threadsafe

只能在產生 socket 的 thread 裡面使用或關閉 sockets

ØMQ 是使用 native OS threads,而不是 "green" threads,可利用 Intel 的 ThreadChecker 工具查看程式在做的工作。缺點是 native threading API 並不 portable

單一個 ØMQ thread 可以在單 CPU core 全速運作,不需要 wait。

mtserver.py

"""
   Multithreaded Hello World server
"""
import time
import threading
import zmq


def worker_routine(worker_url, context=None):
    """Worker routine"""
    context = context or zmq.Context.instance()
    # Socket to talk to dispatcher
    socket = context.socket(zmq.REP)

    socket.connect(worker_url)

    while True:

        string  = socket.recv()

        print("Received request: [ %s ]" % (string))

        # do some 'work'
        time.sleep(1)

        #send reply back to client
        socket.send(b"World")


def main():
    """Server routine"""

    url_worker = "inproc://workers"
    url_client = "tcp://*:5555"

    # Prepare our context and sockets
    context = zmq.Context.instance()

    # Socket to talk to clients
    clients = context.socket(zmq.ROUTER)
    clients.bind(url_client)

    # Socket to talk to workers
    workers = context.socket(zmq.DEALER)
    workers.bind(url_worker)

    # Launch pool of worker threads
    for i in range(5):
        thread = threading.Thread(target=worker_routine, args=(url_worker,))
        thread.daemon = True
        thread.start()

    zmq.proxy(clients, workers)

    # We never get here but clean up anyhow
    clients.close()
    workers.close()
    context.term()


if __name__ == "__main__":
    main()
  • server 啟動多個 worker threads,每一個 worker thread 都產生 REP socket,處理 request。worker thread 是單一 threaded 的 server,差異只是由 tcp 換成 inproc
  • server 產生 ROUTER socket,連接 clients (tcp)
  • server 產生 DEALER socket,連接 worker (inproc)
  • server 啟動 proxy,連接 ROUTER/DEALER sockets

create thread 的 function 在每一種 programming language 都不同,POSIX library 是 pthreads,但 windows 是不同的 API

Signaling Between Threads (PAIR Sockets)

如何在多個 thread 之間協作:使用 ØMQ messages

mtrelay.py

"""
   Multithreaded relay
"""

import threading
import zmq

def step1(context=None):
    """Step 1"""
    context = context or zmq.Context.instance()
    # Signal downstream to step 2
    sender = context.socket(zmq.PAIR)
    sender.connect("inproc://step2")

    sender.send(b"")


def step2(context=None):
    """Step 2"""
    context = context or zmq.Context.instance()
    # Bind to inproc: endpoint, then start upstream thread
    receiver = context.socket(zmq.PAIR)
    receiver.bind("inproc://step2")

    thread = threading.Thread(target=step1)
    thread.start()

    # Wait for signal
    msg = receiver.recv()

    # Signal downstream to step 3
    sender = context.socket(zmq.PAIR)
    sender.connect("inproc://step3")
    sender.send(b"")


def main():
    """ server routine """
    # Prepare our context and sockets
    context = zmq.Context.instance()

    # Bind to inproc: endpoint, then start upstream thread
    receiver = context.socket(zmq.PAIR)
    receiver.bind("inproc://step3")

    thread = threading.Thread(target=step2)
    thread.start()

    # Wait for signal
    string = receiver.recv()

    print("Test successful!")

    receiver.close()
    context.term()


if __name__ == "__main__":
    main()
  • thread 透過 shared context 以 inproc socket 通訊
  • parent thread 產生 socket,連接到 inproc:// endpoint,將 context 傳給 child thread

使用 PAIR socket 的原因:

  1. 可使用 PUSH/PULL,但 PUSH 會將 message 傳給所有 receivers,如果不小心產生了兩個 receivers,可能會遺失一半的 signals,PAIR 的優點是,只能有一個 connection,it's "exclusive"
  2. 可使用 DEALER/ROUTER,但 ROUTER 會將訊息包裝起來,將size 為 0 的 signal 轉換為 multipart message,如果不在意,就可以使用 DEALER/ROUTER。另外 DEALER 跟 PUSH 一樣,會平均分配訊息給接收端
  3. 可使用 PUB/SUB,且 PUB 沒有 PUSH, DEALER 的問題。但需要以 empty subscription 設定 subscriber,用起來麻煩。

Node Coordination

當我們需要在網路多個 nodes 之間協作,PAIR socket 不適用,在 threads 跟 nodes 協作是不同的,因為 nodes 可能會隨時上線或斷線,PAIR sockets 在遠端 node 斷線後,不會自動 reconnect。

node與 thread 另一個差異是 thread 數量通常是固定的,但 nodes 數量會異動。

  • publisher 已經先知道會有幾個 subscribers
  • publisher 啟動並等待所有 subscribers 連線,subscriber 透過 REQ/REP 通知 publisher 已經上線
  • 當所有 subscribers 都連線後,publisher 就開始發送資料

synpub.py

#
#  Synchronized publisher
#
import zmq

#  We wait for 10 subscribers
SUBSCRIBERS_EXPECTED = 10

def main():
    context = zmq.Context()

    # Socket to talk to clients
    publisher = context.socket(zmq.PUB)
    # set SNDHWM, so we don't drop messages for slow subscribers
    publisher.sndhwm = 1100000
    publisher.bind('tcp://*:5561')

    # Socket to receive signals
    syncservice = context.socket(zmq.REP)
    syncservice.bind('tcp://*:5562')

    # Get synchronization from subscribers
    subscribers = 0
    while subscribers < SUBSCRIBERS_EXPECTED:
        # wait for synchronization request
        msg = syncservice.recv()
        # send synchronization reply
        syncservice.send(b'')
        subscribers += 1
        print("+1 subscriber (%i/%i)" % (subscribers, SUBSCRIBERS_EXPECTED))

    # Now broadcast exactly 1M updates followed by END
    for i in range(1000000):
        publisher.send(b'Rhubarb')

    publisher.send(b'END')

if __name__ == '__main__':
    main()

synpub.py

#
#  Synchronized subscriber
#
import time

import zmq

def main():
    context = zmq.Context()

    # First, connect our subscriber socket
    subscriber = context.socket(zmq.SUB)
    subscriber.connect('tcp://localhost:5561')
    subscriber.setsockopt(zmq.SUBSCRIBE, b'')

    time.sleep(1)

    # Second, synchronize with publisher
    syncclient = context.socket(zmq.REQ)
    syncclient.connect('tcp://localhost:5562')

    # send a synchronization request
    syncclient.send(b'')

    # wait for synchronization reply
    syncclient.recv()

    # Third, get our updates and report how many we got
    nbr = 0
    while True:
        msg = subscriber.recv()
        if msg == b'END':
            break
        nbr += 1

    print ('Received %d updates' % nbr)

if __name__ == '__main__':
    main()

以 bash 啟動

echo "Starting subscribers..."

for ((a=0; a<10; a++)); do
        python syncsub.py &
done

echo "Starting publisher..."
python syncpub.py

Zero-Copy

ØMQ message API 可由 application buffer 直接傳遞訊息,不需要 copying data,這可改善效能。

先以 zmq_msg_init_data() 產生訊息,並設定 reference of data block (利用 malloc() 取得的資料區塊),傳遞給 zmq_msg_send()

void my_free (void *data, void *hint) {
    free (data);
}
//  Send message from buffer, which we allocate and ZeroMQ will free for us
zmq_msg_t message;
zmq_msg_init_data (&message, buffer, 1000, my_free, NULL);
zmq_msg_send (&message, socket, 0);

發送 message 後不需要呼叫 zmq_msg_close(),libzmq 會自動呼叫

無法在 receive 端提供 Zero-Copy 功能。

multipart message 也可使用 zero-copy

Pub-Sub Message Envelopes

在 pub-sub pattern 可將 key/data 分到不同的 message frame 裡面,稱為 envelope

psenvpub.py

"""
   Pubsub envelope publisher
"""
import time
import zmq

def main():
    """main method"""

    # Prepare our context and publisher
    context   = zmq.Context()
    publisher = context.socket(zmq.PUB)
    publisher.bind("tcp://*:5563")

    while True:
        # Write two messages, each with an envelope and content
        publisher.send_multipart([b"A", b"We don't want to see this"])
        publisher.send_multipart([b"B", b"We would like to see this"])
        time.sleep(1)

    # We never get here but clean up anyhow
    publisher.close()
    context.term()


if __name__ == "__main__":
    main()

psenvsub.py

"""
   Pubsub envelope subscriber
"""
import zmq

def main():
    """ main method """

    # Prepare our context and publisher
    context    = zmq.Context()
    subscriber = context.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:5563")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"B")

    while True:
        # Read envelope with address
        [address, contents] = subscriber.recv_multipart()
        print("[%s] %s" % (address, contents))

    # We never get here but clean up anyhow
    subscriber.close()
    context.term()


if __name__ == "__main__":
    main()

如果有多個 publisher,需要知道 address,可以在發送時,增加 address 的資料。

High-Water Mark

如果在 process 之間發送很多訊息,但記憶體不足,或是一些delay 可能會造成 server crash。

想像 process A 快速發送很多訊息給 process B,但 B 因為 busy 處理速度變慢,有些訊息會停在 B 的 network buffer,有些在網路上,有些在 A 的 network buffer,其他的會累積在 A 的記憶體裡面,很快地會造成 A crash。

這在 message brokers 常常發生,B 造成這樣的問題。

一種解決方式是 flow control,通知 A 停止發送訊息。

但某些情境無法使用這種方法,例如會持續發生的 twitter feed。

另一種方式,是由 transport layer 提供機制,將訊息的 buffer 設定容量上限,而在 buffer 到達上限時,採用某種處理方案,例如 wait。

ØMQ 採用 HWM (high-water mark) 的概念,定義 internal pipes 的容量,每一個連出或連入的 socket 都有獨立的 pipe,設定發送或接收的 HWM,例如 PUB, SUB 只有 send buffer,而 SUB, PULL, REQ, REP 只有 receive buffer, 而 DEALER, ROUTER, PAIR 有 send 及 receive buffers。

在 ØMQ v2.x,HWM 預設為無限大。在 ØMQ v3.x 預設為 1000。

當 socket buffer 到達 HWM 容量,可以根據 socket type 決定 block 或是 drop data。PUB, ROUTER sockets 會 drop data,其他的 sockets 會 block data。

在 inproc 傳輸裡面,sender 與 receiver 會共用 buffers,因此實際上 HWM 是雙向設定的 HWM 的總和。

HWM 並不一定是正確值,預設是最多可使用 1000 messages,實際上會因為 libmq 實作 queue 的方式,造成 buffer size 比 1000 小。

Missing Message Problem Solver

在實作 ØMQ application 時,常會遇到遺失訊息的問題,以下是遺失訊息可能的發生原因:

  • SUB socket 必須要用 zmq_setsockopt() 設定 ZMQ_SUBSCRIBE subscription 條件,否則可能會收到到訊息,因為是以 prefix 方式做為條件,如果設定為 "" 會收到所有訊息
  • 在啟動 PUB socket 並發送 data 後,才啟動 SUB socket,可能會遺失前面的訊息,必須調整架構讓 SUB socket 先啟動,然後 PUB socket 再發送訊息。
  • 如果同步了 SUB, PUB socket,還是有可能會遺失訊息,這是因為 internal queue 會在連線完整建立後,才會產生出來,如果遇到這樣的問題,就將 bind/connect 的方向交換,讓 SUB socket binds,而 PUB socket connects
  • 如果使用了 REP, REQ sockets,但沒有遵循 send/recv/send/recv 的順序,ØMQ 會發生 error。就好像遺失訊息一樣,使用 REQ, REQ 必須遵循 sned/recv 的順序,並檢查 ØMQ calls 的 error code
  • 使用 PUSH socket,第一個 PULL socket 會取得較多訊息,因為只會在所有 PULL sockets 都連上的時候(需要幾 ms),才能公平分配訊息。對於 low data rate 的狀況,可以用 ROUTER/DEALER 以及 load balancing pattern 取代 PUSH/PULL
  • 如果透過 threads 分享 socket,可能會造成 crash
  • 如果使用 inproc,確保 sockets 都是使用相同的 context,否則 connecting side 會發生問題。另外要先 bind 再 connect,因為 inproc 不像是 tcp,不是 disconnected transport protocol。

  • 如果使用 ROUTER,意外狀況會發生遺失訊息的狀況,例如發送了 malformed identify frames,或是忘了發送 identity frame。通常要在 ROUTER 設定 ZMQ_ROUTER_MANDATORY option,且要在 send call 都要檢查 return code

References

ØMQ - The Guide

2019/09/23

ZeroMQ

2010年3月30日,AMQP的最初設計者 iMatix 的執行長 Pieter Hintjens 宣布 iMatix 要退出AMQP工作群組,並實作簡單又快速的ZeroMQ,且不支援未來會發布的AMQP/1.0。針對 AMQP 的問題,在 iMatrix 的 What is wrong with AMQP 有說明他的論點。

ØMQ(也寫作ZeroMQ、0MQ或ZMQ) 是一個為可伸縮的分散式或並行應用程式設計的高效能非同步訊息庫,它提供一個訊息佇列的功能,但ZeroMQ 不需要專門的訊息代理(message broker) 就可以運作。 目前已經有很多不同的程式語言的函式庫可以使用,可參考 ØMQ Language Bindings 有不同程式語言 libray 的安裝方式。

安裝

在 mac 可用 macport 安裝,但這部分是要直接使用 C 語言開發的情況下才要安裝。

sudo port install zmq

如果要使用 python,可使用 ZeroMQ for python pyzmq 這個 library,他已經將 libzmq.so 包裝在裡面,因此直接安裝 pyzmq 就可以使用 ZeroMQ

sudo pip3 install pyzmq

安裝完成後,以一個簡單的範例測試

client.py

import zmq

context = zmq.Context()

socket = context.socket(zmq.REQ)
socket.connect ("tcp://127.0.0.1:7788")

socket.send(b'hello')
print( socket.recv() )

server.py

import zmq

context = zmq.Context()

socket = context.socket(zmq.REP)
socket.bind ("tcp://*:7788")

print( socket.recv() )
socket.send(b'world')

一般網路程式,都需要先啟動 server 後,才能啟動 client,不過 ØMQ 不同,不管誰先啟動都沒關係。以下是在兩個 terminal 分別執行 client/server 的結果。

$ python client.py
b'world'
$ python server.py
b'hello'

pyzmq 的所有 API 文件在 The PyZMQ API

加法計算的小程式

addserver.py

import os
import zmq

context = zmq.Context()

socket = context.socket(zmq.REP)
socket.connect("tcp://localhost:7788")

print( 'Worker %s is running ...' % os.getpid() )

while True:
    # receive request
    a, b = socket.recv_multipart()
    a = int(a)
    b = int(b)

    print( 'Compute %s + %s and send response' % (a, b) )
    socket.send_string( str(a + b) +" (from worker "+ str(os.getpid()) +")" )

addclient.py

import zmq
import random
import time

context = zmq.Context()

socket = context.socket(zmq.REQ)
socket.bind("tcp://*:7788")

# wait all worker connected
time.sleep(1)

for i in range(9):
    a = random.randint(0, 100)
    b = random.randint(0, 100)
    print( 'Compute %s + %s ...' % (a, b) )

    # send request to peer
    socket.send_multipart([bytes(str(a),"ascii"), bytes(str(b),"ascii")])

    # receive response from peer
    rep = socket.recv()
    print( ' = ' + rep.decode('UTF-8') )

在這裡例子必須先啟動 worker (addserver),我們先啟動三個 worker後,再啟動 addclient.py

$ python addserver.py
Worker 45777 is running ...
Compute 33 + 47 and send response
Compute 68 + 13 and send response
Compute 45 + 68 and send response

$ python addserver.py
Worker 45784 is running ...
Compute 69 + 88 and send response
Compute 86 + 13 and send response
Compute 22 + 8 and send response

$ python addserver.py
Worker 45792 is running ...
Compute 41 + 25 and send response
Compute 14 + 84 and send response
Compute 13 + 48 and send response

client 會用 send_multipart 的方式,將資料以 Round-Robin 方式平均分配給 workers,

$ python addclient.py
Compute 33 + 47 ...
 = 80 (from worker 45777)
Compute 41 + 25 ...
 = 66 (from worker 45792)
Compute 69 + 88 ...
 = 157 (from worker 45784)
Compute 68 + 13 ...
 = 81 (from worker 45777)
Compute 14 + 84 ...
 = 98 (from worker 45792)
Compute 86 + 13 ...
 = 99 (from worker 45784)
Compute 45 + 68 ...
 = 113 (from worker 45777)
Compute 13 + 48 ...
 = 61 (from worker 45792)
Compute 22 + 8 ...
 = 30 (from worker 45784)

如果刻意實作一個 addserver2.py,只服務一次計算,就中止 socket。

import os
import zmq

context = zmq.Context()

socket = None

try:
    socket = context.socket(zmq.REP)
    socket.connect("tcp://localhost:7788")

    print( 'Worker %s is running ...' % os.getpid() )

    # while True:
    # receive request
    a, b = socket.recv_multipart()
    a = int(a)
    b = int(b)

    print( 'Compute %s + %s and send response' % (a, b) )
    socket.send_string( str(a + b) +" (from worker "+ str(os.getpid()) +")" )
finally:
    if( socket is not None):
        socket.close()
    if( context is not None):
        context.term()

執行後可以發現,另外兩個 worker 有將其他的 task 處理掉,不過這時要注意,server2.py 在結束前一定要 close socket,否則 client 會一直在等待那一個 worker。

$ python addserver2.py
Worker 45982 is running ...
Compute 49 + 96 and send response

$ python addserver.py
Worker 45967 is running ...
Compute 47 + 63 and send response
Compute 49 + 74 and send response
Compute 69 + 78 and send response
Compute 29 + 10 and send response

$ python addserver.py
Worker 45974 is running ...
Compute 56 + 7 and send response
Compute 62 + 36 and send response
Compute 3 + 47 and send response
Compute 75 + 66 and send response


$ python addclient.py
Compute 47 + 63 ...
 = 110 (from worker 45967)
Compute 49 + 96 ...
 = 145 (from worker 45982)
Compute 56 + 7 ...
 = 63 (from worker 45974)
Compute 49 + 74 ...
 = 123 (from worker 45967)
Compute 62 + 36 ...
 = 98 (from worker 45974)
Compute 69 + 78 ...
 = 147 (from worker 45967)
Compute 3 + 47 ...
 = 50 (from worker 45974)
Compute 29 + 10 ...
 = 39 (from worker 45967)
Compute 75 + 66 ...
 = 141 (from worker 45974)

The Zen of Zero: Ø 的意義

ZeroMQ 的Zero 一開始代表 "zero broker",以及 "zero latency",後來慢慢增加了 "zero administration", "zero cost", "zero waste" 這些意思,也就是說,ZeroMQ 就是要盡可能降低複雜度。ZeroMQ 就像是增強功能的 socket library,也可以說是 mailboxes with routing。

我們再回到 ØMQ 的首頁,看一下這個專案的目標

  • Distributed Messaging: 也就是在分散式的環境,不僅是跨機器,也可以跨 process,跨 thread 進行通訊

  • 可以用多種程式語言在多種平台上互通訊息

  • 在 inproc, IPC, TCP, TIPC, multicast 這些情境中傳遞訊息

  • 支援 pub-sub, push-pull, and router-dealer 這些傳遞訊息的 pattern

  • 用極小的 library 提供高速非同步 I/O engines

  • 有很多開源社群的支援

  • 支援高階程式語言及平台

  • 可建立 centralized, distributed, small, or large 這些架構

  • 有商業支援的自由軟體

以下依照 ØMQ - The Guide in python 的說明瞭解 ØMQ,範例程式可以在 GitHub repository 取得。

ØMQ 收送資料的方法

Request-Reply

這是最基本的,Client 跟 Server 互相收送資料,client 會發送 "Hello" 給 server,server 回應 "World" 給 client。

hwserver.py 會 bind tcp port 5555,並在收到資料後,回應 "World"

import time
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")

while True:
    #  Wait for next request from client
    message = socket.recv()
    print("Received request: %s" % message)

    #  Do some 'work'
    time.sleep(1)

    #  Send reply back to client
    socket.send(b"World")

hwclient.py,會連接到 localhost:5555,會發送 10 次 "Hello",每一次發送後就會等待 server 回應。

import zmq

context = zmq.Context()

#  Socket to talk to server
print("Connecting to hello world server...")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

#  Do 10 requests, waiting each time for a response
for request in range(10):
    print("Sending request %s ..." % request)
    socket.send(b"Hello")

    #  Get the reply.
    message = socket.recv()
    print("Received reply %s [ %s ]" % (request, message))

我們可以先啟動 client,再啟動 server,或是同時啟動很多個 client。如果在 client 執行到一半,用 Ctrl-C 將 server 中斷,然後重新啟動 hwserver.py,會發現 client 並沒有繼續往下執行。這個問題會在 chap 4 Reliable Request-Reply Patterns 裡面討論。

Publish-Subscribe

剛剛的是 server-client 之間雙向的傳輸,第二種常見的 pattern 為單向的 data distribution,server 會持續發送資料給一些 clients。

wuserver.py 是天氣資訊 server 的範例,會持續一直發送資料,發送時會亂數處理一個 zipcode

#
#   Weather update server
#   Binds PUB socket to tcp://*:5556
#   Publishes random weather updates
#

import zmq
from random import randrange


context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")

while True:
    zipcode = randrange(1, 100000)
    temperature = randrange(-80, 135)
    relhumidity = randrange(10, 60)

    socket.send_string("%i %i %i" % (zipcode, temperature, relhumidity))

wuclient.py 會透過 socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter) 註冊只需要取得 zipcode 10001 的天氣資訊,程式後面取得五次天氣的更新資料,然後平均。

#
#   Weather update client
#   Connects SUB socket to tcp://localhost:5556
#   Collects weather updates and finds avg temp in zipcode
#

import sys
import zmq


#  Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)

print("Collecting updates from weather server...")
socket.connect("tcp://localhost:5556")

# Subscribe to zipcode, default is NYC, 10001
zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001"

# Python 2 - ascii bytes to unicode str
if isinstance(zip_filter, bytes):
    zip_filter = zip_filter.decode('ascii')
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)

# Process 5 updates
total_temp = 0
for update_nbr in range(5):
    string = socket.recv_string()
    zipcode, temperature, relhumidity = string.split()
    total_temp += int(temperature)

print("Average temperature for zipcode '%s' was %dF" % (
      zip_filter, total_temp / (update_nbr+1))
)

SUB socket 必須要用 zmq_setsockopt() 設定要註冊取得的資訊是什麼,如果沒有註冊,就取不到任何資料。PUB-SUB socket pair 是非同步的。如果對 SUB socket 發送訊息(zmq_send()),就會發生 error,對 PUB socket 接收訊息(zmq_recv())也會發生 error。PUB socket 並不會理會有沒有 SUB socket 接收資料,會持續不斷發送資料出去。

另外,PUB-SUB sockets 中,不能保證 subscriber 什麼時候開始接收到訊息,就算是先啟動了 subscriber 再啟動 publisher,問題還是一樣會發生,subscriber 永遠會漏掉沒接收到 publisher 發送的第一個訊息。這是因為 subscriber 連接到 publisher 需要時間,但publisher 可能已經開始發送訊息了。

subscriber 並不能保證接收到所有 publisher 發送的訊息,在 chap2 會說明並解決這個問題。如果 publisher 有無限多的資料要發送給 subscriber,這個情境下,就不需要在意是不是接收到所有資料。

另外有一些要注意的事項:

  1. subscriber 可連接多個 publisher,資料會依次接收 (fair-queued),subscriber 不會一直接收到某一個 publisher 的資料
  2. 如果 publisher 沒有任何連接的 subscribers,就會直接丟棄所有訊息
  3. 如果是 TCP,且 subscriber 網路速度很慢,message 會被放在 publisher 的 queue,接下來會看到如何保護 publisher,解決這個問題

  4. 在 ØMQ 3.X 以後,如果是用 tcp:// 或是 ipc://,filter 會發生在 publisher 端,如果是用 epgm://,filter 會發生在 subscriber 端。在 2.X,所有 filtering 都發生在 subscriber 端。

Divide & Conquer - Parallel Pipeline

  • ventilator 會產生可平行處理得 tasks
  • 有多個 workers 處理 tasks
  • sink 會接收 worker 的處理結果

taskvent.py 會產生 100 個 tasks,在開始發送 task 之前,要先啟動某個數量的 worker

# Task ventilator
# Binds PUSH socket to tcp://localhost:5557
# Sends batch of tasks to workers via that socket

import zmq
import random
import time

try:
    raw_input
except NameError:
    # Python 3
    raw_input = input

context = zmq.Context()

# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")

# Socket with direct access to the sink: used to synchronize start of batch
sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")

print("Press Enter when the workers are ready: ")
_ = raw_input()
print("Sending tasks to workers...")

# The first message is "0" and signals start of batch
sink.send(b'0')

# Initialize random number generator
random.seed()

# Send 100 tasks
total_msec = 0
for task_nbr in range(100):

    # Random workload from 1 to 100 msecs
    workload = random.randint(1, 100)
    total_msec += workload

    sender.send_string(u'%i' % workload)

print("Total expected cost: %s msec" % total_msec)

# Give 0MQ time to deliver
time.sleep(1)

taskwork.py 取得 task 並 sleep 一段時間,然後發送完成的訊息

# Task worker
# Connects PULL socket to tcp://localhost:5557
# Collects workloads from ventilator via that socket
# Connects PUSH socket to tcp://localhost:5558
# Sends results to sink via that socket

import sys
import time
import zmq


context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

# Process tasks forever
while True:
    s = receiver.recv()

    # Simple progress indicator for the viewer
    sys.stdout.write('.')
    sys.stdout.flush()

    # Do the work
    time.sleep(int(s)*0.001)

    # Send results to sink
    sender.send(b'')

tasksink.py,以 taskvent.py 發送的 b'0' 為啟動的訊號,接收 100 個 task 的結果,計算花費的時間

# Task sink
# Binds PULL socket to tcp://localhost:5558
# Collects results from workers via that socket
#
# Author: Lev Givon <lev(at)columbia(dot)edu>

import sys
import time
import zmq


context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

# Wait for start of batch
s = receiver.recv()

# Start our clock now
tstart = time.time()

# Process 100 confirmations
for task_nbr in range(100):
    s = receiver.recv()
    if task_nbr % 10 == 0:
        sys.stdout.write(':')
    else:
        sys.stdout.write('.')
    sys.stdout.flush()

# Calculate and report duration of batch
tend = time.time()
print("Total elapsed time: %d msec" % ((tend-tstart)*1000))
  • worker 數量越多,處理完成的時間越短。
  • worker 上接 ventilator,下接 sink,數量可任意變動
  • ventilator 的 PUSH 會平均發送 task 給 workers,也稱為 load balancing
  • sink 的 PULL 會平均接收 workers 的結果,也稱為 fair queueing
  • 同樣會發生 "slow joiner" 的問題,會造成 load balancing 分配不均。如果某個 PULL socket 比較快 join,就會在其他 PULL socket 連接前,就先取得較多 task。如果需要較正確的 load balancing 機制,會在 chap3 Advanced Request-Reply Patterns 裡面說明。

ØMQ 的 string

ØMQ 在傳送資料時,必須要指定傳送的資料量,也就是說,我們要自己進行資料格式化。以 C 語言來說,string 是以 null byte 結束,因此可以這樣發送,也就是發送 6 bytes

zmq_send(requester, "Hello", 6, 0);

在 python 的 string 並沒有 null byte,因此在發送時,會在前面增加一個 byte 為字串長度

socket.send("Hello")

但如果在 C 語言接收 python 發送的資料,會發生沒有正確結束的字串的錯誤。因此最好的方式,是 allocate 新的 buffer,並增加一個 byte,複製 string 然後最後填上 null。以下 s_recv 只能處理 255 chars。

static char *
s_recv (void *socket) {
    char buffer [256];
    int size = zmq_recv (socket, buffer, 255, 0);
    if (size == -1)
        return NULL;
    if (size > 255)
        size = 255;
    buffer [size] = 0;
    return strdup (buffer);
}

ZeroMQ strings 是以指定長度的方式處理,發送時並不會在後面加上 null。

ØMQ 的版本

不同版本的 ØMQ 傳送資料可能會發生問題,以下是在 python 確認 ØMQ 版本資訊的方式,libzmq 的部分才是 ØMQ 的 library 的版本號碼。

import zmq

print("Current libzmq version is %s" % zmq.zmq_version())
print("Current  pyzmq version is %s" % zmq.__version__)

執行結果為

Current libzmq version is 4.2.5
Current  pyzmq version is 17.1.2

Programming in ØMQ

ØMQ 都是由取得 context 開始,然後 create socket,C語言是 zmq_ctx_new(),在 process 裡面只需要一個 context,因為 context 是單一 process 裡面所有 socket 的 container,並負責 inproc sockets 的傳輸工作,如果產生了兩個 context,就像是兩個獨立的 ØMQ instances。

在程式開始呼叫一次 zmq_ctx_new(),結束前呼叫一次 zmq_ctx_destroy()

如果有使用 fork() system call,每個 process 都需要自己的 context,如果在呼叫 fork 前先在 main process 呼叫 zmq_ctx_new(),每個 child process 都會取得自己的contexts,必須在 parent process 進行 context 管理。


python 有自動回收的功能,但 C 必須要自己回收 context,否則會產生 memory leak,如果讓 socket 維持 open,zmq_ctx_destory 會永遠停在那邊,等待 socket close。

使用 ØMQ 要注意的 objects 只有 messages, sockets, contexts

  • 盡可能使用 zmq_send(), zmq_recv(),可避免使用 zmq_msg_t objects

  • 如果使用 zmq_msg_recv(),必須要呼叫 zmq_msg_close() 用來 release the received message

  • 如果程式 opening/closing 很多 socket,可能需要 redesign application,這些 socket handles 會等到 context destory 才會被回收。

  • 離開 application 前,要先 close sockets,然後呼叫 zmq_ctx_destory()

如果是 python,會自動 destory object, socket, context,記得要放在 final block。

multithread 環境在下一章討論。

不要在多個 thread 裡面使用相同的 socket。

必須要設定 LINGER value (1 second) 然後 close socket

destroy context 會造成每個 thread 產生 blocking receives/polls/send,並會 return error,程式必須 catch error,設定 linger on, close socket 然後 exit。

不要呼叫兩次 destroy the same context,zmq_ctx_destory 會 block all sockets。

Why we needed ØMQ

application 需要處理訊息及 message queue,但常會自己利用 TCP/UDP 開發。

以下是一般 TCP 程式要注意的問題

  1. 如何處理 I/O?要在 application 或是背景處理 I/O?blocking I/O 的 architecture 並不 scalable,background I/O 很難設計

  2. 如何處理 dynamic components,如何區分 client/server 的 components

  3. 如何處理 message,如何設計容易 write/read 的資料格式,不會造成 buffer overflow

  4. 如何處理不能立即發送的訊息?尤其是在等待 component online 的時候,要丟棄訊息,放到資料庫或是放到 memory queue?

  5. 如何處理 message queue,當 reading 速度很慢,發生 queue 異常時要怎麼辦?

  6. 如何處理遺失的訊息?要等待 refresh,發送 resend或是建立 reliability layer,確保訊息一定會送達。

  7. 如果需要另一種 tranport layer 要怎麼辦?例如要用 multicast 取代 TCP,或是使用 IPv6

  8. 如何 route message,要重送相同的訊息給不同 peers,或是要發送 reply 給原本的 requester?

  9. 如何用不同程式語言撰寫 API?要重新實作 wire-level protocol,或要 repackage library?

  10. 如何在不同 architecture 讀取資料?如何設定 data type?

  11. 如何處理 network error

Hadoop Zookeeper 的 C API code 有 4200 lines 處理 client/server network communication protocol,且沒有文件,對其他開發者來說很難理解。

AMQP 試圖要用 "broker" 處理 addressing, routing, queueing,這會產生大量 client/server prototcol APIs。broker 對於大型網路來說,簡化了複雜度,但這會多了一個 black box,漸漸會讓 broker 變成另一個 bottleneck。需要另一個 team 去維護 broker,也需要 backup boxes。

ØMQ 的優勢

  1. 在 background thread 非同步處理 I/O,以 lock-free 方式跟 application thread 溝通。concurrent ØMQ 不需要 lock, semaphore, 或 wait states

  2. components 可隨時連線或斷線,ØMQ 會自動 reconnect,可用任意順序啟動 components

  3. 需要時會自動 queue message

  4. 有處理 over-full queue 的機制,當 queue full 時,ØMQ 會依照 pattern 決定要 block senders 或是丟棄 messages。

  5. 可使用 TCP, multicast, in-process, inter-process 的 transport layer

  6. 可處理 slow/blocked readers,依照不同 pattern 有不同處理方式

  7. 以 request-reply 或是 pub-sub 方式 route message

  8. 可用一個 function call,建立 proxies 用來 queue, forward, capture messages,proxies 可降低 network 互聯的複雜度

  9. 有保證訊息送達的機制

  10. 可以處理任意格式的訊息,可以是 msgpack, protocol buffers 或其他格式

  11. 可處理 network error

  12. 不需要太多運算資源

References

Learning ZeroMQ with pyzmq

新世紀通訊函式庫 – ZeroMQ

Learning and Using ØMQ http://zguide.zeromq.org

ØMQ - Get The Software

python zeromq安装

ØMQ - wiki

zeromq用來怎麼玩?

ZeroMQ 簡介

ZeroMQ Message Transport Protocol ZMTP 3.1

Easy cluster parallelization with ZeroMQ

2019/09/15

以 python 實作迴歸與分類程式

迴歸

學習資料

取得學習資料文字檔 click.csv

x,y
235,591
216,539
148,413
35,310
85,308
...

先利用 matplotlib 繪製到圖表上

import numpy as np
import matplotlib.pyplot as plt

train = np.loadtxt('click.csv', delimiter=',', skiprows=1)
train_x = train[:,0]
train_y = train[:,1]

plt.plot(train_x, train_y, 'o')
plt.show()

另外針對原始的學習資料,進行標準化(z-score正規化),也就是將資料平均轉換為 0,分散轉換為1。其中 𝜇 是所有資料的平均,𝜎 是所有資料的標準差。這樣處理後,會讓參數收斂更快。

\(z^{(i)} = \frac{x^{(i)} - 𝜇}{𝜎}\)

import numpy as np
import matplotlib.pyplot as plt

train = np.loadtxt('click.csv', delimiter=',', skiprows=1)
train_x = train[:,0]
train_y = train[:,1]

# 標準化
mu = train_x.mean()
sigma = train_x.std()
def standardize(x):
    return (x - mu) / sigma

train_z = standardize(train_x)

plt.plot(train_z, train_y, 'o')
plt.show()

一次函數

先使用一次目標函數 \(f_𝜃(x)\)

\({f_𝜃(x)=𝜃_0+𝜃_1x}​\)

\({E(𝜃)= \frac{1}{2} \sum_{i=1}^{n}( y^{(i)} - f_𝜃(x^{(i)})^2 }​\)

\(𝜃_0, 𝜃_1​\) 可任意選擇初始值

\(𝜃_0, 𝜃_1\) 的參數更新式為

\(𝜃_0 := 𝜃_0 - 𝜂 \sum_{i=1}^{n}( f_𝜃(x^{(i)} )-y^{(i)} )\)

\(𝜃_1 := 𝜃_1 - 𝜂 \sum_{i=1}^{n}( f_𝜃(x^{(i)} )-y^{(i)} )x^{(i)}\)

用這個方法,就可以找出正確的 \(𝜃_0, 𝜃_1\)

其中 𝜂 是任意數值,先設定為 \(10^{-3}\) 試試看。一般來說,會指定要處理的次數,有時會比較參數更新前後,目標函數的值,如果差異不大,就直接結束。另外 \(𝜃_0, 𝜃_1\) 必須同時一起更新。

import numpy as np
import matplotlib.pyplot as plt

# 載入學習資料
train = np.loadtxt('click.csv', delimiter=',', dtype='int', skiprows=1)
train_x = train[:,0]
train_y = train[:,1]

# 標準化
mu = train_x.mean()
sigma = train_x.std()
def standardize(x):
    return (x - mu) / sigma

train_z = standardize(train_x)

# 任意選擇初始值
theta0 = np.random.rand()
theta1 = np.random.rand()

# 預測函數
def f(x):
    return theta0 + theta1 * x

# 目標函數 E(𝜃)
def E(x, y):
    return 0.5 * np.sum((y - f(x)) ** 2)

# 學習率
ETA = 1e-3

# 誤差
diff = 1

# 更新次數
count = 0

# 重複學習
error = E(train_z, train_y)
while diff > 1e-2:
    # 暫存更新結果
    tmp_theta0 = theta0 - ETA * np.sum((f(train_z) - train_y))
    tmp_theta1 = theta1 - ETA * np.sum((f(train_z) - train_y) * train_z)

    # 更新參數
    theta0 = tmp_theta0
    theta1 = tmp_theta1

    # 計算誤差
    current_error = E(train_z, train_y)
    diff = error - current_error
    error = current_error

    # log
    count += 1
    log = '{}次數: theta0 = {:.3f}, theta1 = {:.3f}, 誤差 = {:.4f}'
    print(log.format(count, theta0, theta1, diff))

# 繪製學習資料與預測函數的直線
x = np.linspace(-3, 3, 100)
plt.plot(train_z, train_y, 'o')
plt.plot(x, f(x))
plt.show()

測試結果

391次數: theta0 = 428.991, theta1 = 93.444, 誤差 = 0.0109
392次數: theta0 = 428.994, theta1 = 93.445, 誤差 = 0.0105
393次數: theta0 = 428.997, theta1 = 93.446, 誤差 = 0.0101
394次數: theta0 = 429.000, theta1 = 93.446, 誤差 = 0.0097

驗證

可輸入 x 預測點擊數,但因為剛剛有將學習資料正規化,預測資料也必須正規化

>>> f(standardize(100))
370.96741051658194
>>> f(standardize(500))
928.9775823086377

二次多項式迴歸

\(f_𝜃(x) = 𝜃_0 + 𝜃_1x + 𝜃_2x^2\) 要增加 \( 𝜃_2\) 這個參數

目標的誤差函數 \({E(𝜃)= \frac{1}{2} \sum_{i=1}^{n}( y^{(i)} - f_𝜃(x^{(i)})^2 }​\)

因為有多筆學習資料,可將資料以矩陣方式處理

\( X = \begin{bmatrix} (x^{(1)})^T\\ (x^{(2)})^T\\ \cdot \\ \cdot \\ (x^{(n)})^T \\ \end{bmatrix} = \begin{bmatrix} 1 & x^{(1)} & (x^{(1)})^2 \\ 1 & x^{(2)} & (x^{(2)})^2 \\ \cdot \\ \cdot \\ 1 & x^{(n)} & (x^{(n)})^2 \\ \end{bmatrix} ​\)

\(f_𝜃(x) = \begin{bmatrix} 1 & x^{(1)} & (x^{(1)})^2 \\ 1 & x^{(2)} & (x^{(2)})^2 \\ \cdot \\ \cdot \\ 1 & x^{(n)} & (x^{(n)})^2 \\ \end{bmatrix} \begin{bmatrix} 𝜃_0 \\ 𝜃_1 \\ 𝜃_2 \\ \end{bmatrix} = \begin{bmatrix} 𝜃_0 + 𝜃_1 x^{(1)} + 𝜃_2 (x^{(1)})^2\\ 𝜃_0 + 𝜃_1 x^{(2)} + 𝜃_2 (x^{(2)})^2\\ \cdot \\ \cdot \\ 𝜃_0 + 𝜃_1 x^{(n)} + 𝜃_2 (x^{(n)})^2\\ \end{bmatrix}\)

第j 項參數的更新式定義為

\(𝜃_j := 𝜃_j - 𝜂 \sum_{i=1}^{n}( f_𝜃(x^{(i)} )-y^{(i)} )x_j^{(i)}​\)

可將 \( ( f_𝜃(x^{(i)} )-y^{(i)} ) ​\) 以及 \(x_j^{(i)}​\) 這兩部分各自以矩陣方式處理

\( f= \begin{bmatrix} ( f_𝜃(x^{(1)} )-y^{(1)} )\\ ( f_𝜃(x^{(2)} )-y^{(2)} )\\ \cdot \\ \cdot \\ ( f_𝜃(x^{(n)} )-y^{(n)} ) \\ \end{bmatrix} \)

\( x_0 = \begin{bmatrix} x_0^{(1)} \\ x_0^{(2)}\\ \cdot \\ \cdot \\ x_0^{(n)} \\ \end{bmatrix} \)

\( \sum_{i=1}^{n}( f_𝜃(x^{(i)} )-y^{(i)} )x_0^{(i)} = f^Tx_0 \)

分別考慮三個參數

\( x_0 = \begin{bmatrix} x_0^{(1)} \\ x_0^{(2)}\\ \cdot \\ \cdot \\ x_0^{(n)} \\ \end{bmatrix} , x_1 = \begin{bmatrix} x^{(1)} \\ x^{(2)}\\ \cdot \\ \cdot \\ x^{(n)} \\ \end{bmatrix} , x_2 = \begin{bmatrix} (x^{(1)})^2 \\ (x^{(2)})^2\\ \cdot \\ \cdot \\ (x^{(n)})^2 \\ \end{bmatrix}\)

\( X = \begin{bmatrix} x_0 & x_1 & x_2 \end{bmatrix} = \begin{bmatrix} 1 & x^{(1)} & (x^{(1)})^2 \\ 1 & x^{(2)} & (x^{(2)})^2\\ \cdot \\ \cdot \\ 1 & x^{(n)} & (x^{(n)})^2 \\ \end{bmatrix} \)

使用 \( f^TX\) 就可以一次更新三個參數

import numpy as np
import matplotlib.pyplot as plt

# 讀取學習資料
train = np.loadtxt('click.csv', delimiter=',', dtype='int', skiprows=1)
train_x = train[:,0]
train_y = train[:,1]

# 標準化
mu = train_x.mean()
sigma = train_x.std()
def standardize(x):
    return (x - mu) / sigma

train_z = standardize(train_x)

# 任意初始值
theta = np.random.rand(3)

# 學習資料轉換為矩陣
def to_matrix(x):
    return np.vstack([np.ones(x.size), x, x ** 2]).T

X = to_matrix(train_z)

# 預測函數
def f(x):
    return np.dot(x, theta)

# 目標函數
def E(x, y):
    return 0.5 * np.sum((y - f(x)) ** 2)

# 學習率
ETA = 1e-3

# 誤差
diff = 1

# 更新次數
count = 0

# 重複學習
error = E(X, train_y)
while diff > 1e-2:
    # 更新參數
    theta = theta - ETA * np.dot(f(X) - train_y, X)

    # 計算誤差
    current_error = E(X, train_y)
    diff = error - current_error
    error = current_error

    # log
    count += 1
    log = '{}次: theta = {}, 誤差 = {:.4f}'
    print(log.format(count, theta, diff))

# 繪製學習資料與預測函數
x = np.linspace(-3, 3, 100)
plt.plot(train_z, train_y, 'o')
plt.plot(x, f(to_matrix(x)))
plt.show()


也可以將重複停止的條件,改為均方誤差

目標的誤差函數 \({E(𝜃)= \frac{1}{n} \sum_{i=1}^{n}( y^{(i)} - f_𝜃(x^{(i)})^2 }\)

import numpy as np
import matplotlib.pyplot as plt

# 讀取學習資料
train = np.loadtxt('click.csv', delimiter=',', dtype='int', skiprows=1)
train_x = train[:,0]
train_y = train[:,1]

# 標準化
mu = train_x.mean()
sigma = train_x.std()
def standardize(x):
    return (x - mu) / sigma

train_z = standardize(train_x)

# 任意初始值
theta = np.random.rand(3)

# 學習資料轉換為矩陣
def to_matrix(x):
    return np.vstack([np.ones(x.size), x, x ** 2]).T

X = to_matrix(train_z)

# 預測函數
def f(x):
    return np.dot(x, theta)

# 目標函數
def MSE(x, y):
    return ( 1 / x.shape[0] * np.sum( (y-f(x)))**2 )

# 學習率
ETA = 1e-3

# 誤差
diff = 1

# 更新次數
count = 0

# 均方誤差的歷史資料
errors = []

# 重複學習
errors.append( MSE(X, train_y) )
while diff > 1e-2:
    # 更新參數
    theta = theta - ETA * np.dot(f(X) - train_y, X)

    # 計算誤差
    errors.append( MSE(X, train_y) )
    diff = errors[-2] - errors[-1]

    # log
    count += 1
    log = '{}次: theta = {}, 誤差 = {:.4f}'
    print(log.format(count, theta, diff))

# 繪製重複次數 與誤差的關係
x = np.arange(len(errors))
plt.plot(x, errors)
plt.show()

隨機梯度下降法

隨機選擇一項學習資料,套用在參數的更新上,例如選擇第 k 項。

\(𝜃_j := 𝜃_j - 𝜂 ( f_𝜃(x^{(k)} )-y^{(k)} )x_j^{(k)}\)

import numpy as np
import matplotlib.pyplot as plt

# 載入學習資料
train = np.loadtxt('click.csv', delimiter=',', dtype='int', skiprows=1)
train_x = train[:,0]
train_y = train[:,1]

# 標準化
mu = train_x.mean()
sigma = train_x.std()
def standardize(x):
    return (x - mu) / sigma

train_z = standardize(train_x)

# 任意選擇初始值
theta = np.random.rand(3)

# 學習資料轉換為矩陣
def to_matrix(x):
    return np.vstack([np.ones(x.size), x, x ** 2]).T

X = to_matrix(train_z)

# 預測函數
def f(x):
    return np.dot(x, theta)

# 均方差
def MSE(x, y):
    return (1 / x.shape[0]) * np.sum((y - f(x)) ** 2)

# 學習率
ETA = 1e-3

# 誤差
diff = 1

# 更新次數
count = 0

# 重複學習
error = MSE(X, train_y)
while diff > 1e-2:
    # 排列學習資料所需的隨機排列
    p = np.random.permutation(X.shape[0])
    # 將學習資料以隨機方式取出,並用隨機梯度下降法 更新參數
    for x, y in zip(X[p,:], train_y[p]):
        theta = theta - ETA * (f(x) - y) * x

    # 計算跟前一個誤差的差距
    current_error = MSE(X, train_y)
    diff = error - current_error
    error = current_error

    # log
    count += 1
    log = '{}回目: theta = {}, 差分 = {:.4f}'
    print(log.format(count, theta, diff))

# 列印結果
x = np.linspace(-3, 3, 100)
plt.plot(train_z, train_y, 'o')
plt.plot(x, f(to_matrix(x)))
plt.show()

多元迴歸

如果要處理多元迴歸,就跟多項式迴歸一樣改用矩陣,但在多元迴歸中要注意,要對所有變數 \(x_1, x_2, x_3\)都進行標準化。

\(z_1^{(i)} = \frac{x_1^{(i)} - 𝜇_1}{𝜎_1} \)

\(z_2^{(i)} = \frac{x_2^{(i)} - 𝜇_2}{𝜎_2} \)

\(z_3^{(i)} = \frac{x_3^{(i)} - 𝜇_3}{𝜎_3} \)

分類(感知器)

使用 images1.csv 資料

x1,x2,y
153,432,-1
220,262,-1
118,214,-1
474,384,1
485,411,1
233,430,-1
...

先將原始資料標記在圖表上,y=1 用圓圈,y=-1 用

import numpy as np
import matplotlib.pyplot as plt

# 載入學習資料
train = np.loadtxt('images1.csv', delimiter=',', skiprows=1)
train_x = train[:,0:2]
train_y = train[:,2]

# 繪圖
x1 = np.arange(0, 500)
plt.plot(train_x[train_y ==  1, 0], train_x[train_y ==  1, 1], 'o')
plt.plot(train_x[train_y == -1, 0], train_x[train_y == -1, 1], 'x')
plt.savefig('1.png')

  • 識別函數 \(f_w(x)\) 就是給定向量 \(x\) 後,回傳 1 或 -1 的函數,用來判斷橫向或縱向。

\(f_w(x) = \left\{\begin{matrix} 1 \quad (w \cdot x \geq 0) \\ -1 \quad (w \cdot x < 0) \end{matrix}\right.\)

  • 權重更新式

\(w := \left\{\begin{matrix} w + y^{(i)}x^{(i)} \quad (f_w(x) \neq y^{(i)}) \\ w \quad \quad \quad \quad (f_w(x) = y^{(i)}) \end{matrix}\right.\)

感知器使用精度作為停止的標準比較好,但目前先直接設定訓練次數

最後繪製以權重向量為法線的直線方程式

\(w \cdot x = w_1x_1 + w_2x_2 = 0​\)

\(x_2 = - \frac{w_1}{w2} x_1​\)

import numpy as np
import matplotlib.pyplot as plt

# 載入學習資料
train = np.loadtxt('images1.csv', delimiter=',', skiprows=1)
train_x = train[:,0:2]
train_y = train[:,2]

# 任意初始值
w = np.random.rand(2)

# 識別函數,判斷矩形是橫向或縱向
def f(x):
    if np.dot(w, x) >= 0:
        return 1
    else:
        return -1

# 重複次數
epoch = 10

# 更新次數
count = 0

# 學習權重
for _ in range(epoch):
    for x, y in zip(train_x, train_y):
        if f(x) != y:
            w = w + y * x

            # log
            count += 1
            print('{}次數: w = {}'.format(count, w))

# 繪圖
x1 = np.arange(0, 500)
plt.plot(train_x[train_y ==  1, 0], train_x[train_y ==  1, 1], 'o')
plt.plot(train_x[train_y == -1, 0], train_x[train_y == -1, 1], 'x')
plt.plot(x1, -w[0] / w[1] * x1, linestyle='dashed')
plt.savefig("1.png")

驗證

python -i classification1_perceptron.py
>>> f([200,100])
1
>>> f([100,200])
-1

分類(邏輯迴歸)

邏輯迴歸要先修改學習資料,橫向為 1 ,縱向為 0

x1,x2,y
153,432,0
220,262,0
118,214,0
474,384,1
485,411,1
...

預測函數就是 S 函數

\(f_𝜃(x) = \frac{1}{1 + exp(-𝜃^Tx)}\)

參數更新式為

\(𝜃_j := 𝜃_j - 𝜂 \sum_{i=1}^{n}( f_𝜃(x^{(i)}) - y^{(i)} )x_j^{(i)}\)

可用矩陣處理,轉換時要加上 \(x_0\),且設定為 1,如果當 \(f_𝜃(x) \geq 0.5\),也就是 \(𝜃^T x >0​\) ,就判定為橫向。

將 \(Q^Tx = 0 \) 整理後,就可得到一條直線

\(Q^Tx = 𝜃_0x_0 + 𝜃_1x_1 + 𝜃_2x_2 = 𝜃_0 +𝜃_1x_1+𝜃_2x_2 =0\)

\(x_2 = - \frac{𝜃_0 + 𝜃_1x_2}{𝜃_2}​\)

import numpy as np
import matplotlib.pyplot as plt

# 載入學習資料
train = np.loadtxt('images2.csv', delimiter=',', skiprows=1)
train_x = train[:,0:2]
train_y = train[:,2]

# 任意初始值
theta = np.random.rand(3)

# 以平均及標準差進行標準化
mu = train_x.mean(axis=0)
sigma = train_x.std(axis=0)
def standardize(x):
    return (x - mu) / sigma

train_z = standardize(train_x)

# 轉換為矩陣,加上 x0
def to_matrix(x):
    x0 = np.ones([x.shape[0], 1])
    return np.hstack([x0, x])

X = to_matrix(train_z)

# 預測函數 S函數
def f(x):
    return 1 / (1 + np.exp(-np.dot(x, theta)))

# 識別函數
def classify(x):
    return (f(x) >= 0.5).astype(np.int)

# 學習率
ETA = 1e-3

# 重複次數
epoch = 5000

# 更新次數
count = 0

# 重複學習
for _ in range(epoch):
    theta = theta - ETA * np.dot(f(X) - train_y, X)

    # log
    count += 1
    print('{}次數: theta = {}'.format(count, theta))

# 繪製圖形
x0 = np.linspace(-2, 2, 100)
plt.plot(train_z[train_y == 1, 0], train_z[train_y == 1, 1], 'o')
plt.plot(train_z[train_y == 0, 0], train_z[train_y == 0, 1], 'x')
plt.plot(x0, -(theta[0] + theta[1] * x0) / theta[2], linestyle='dashed')
# plt.show()
plt.savefig("機器學習4_coding_.png")

驗證

這樣的意思是 200x100 的矩形有 91.6% 的機率會是橫向

>>> f(to_matrix(standardize([[200,100], [100,200]])))
array([0.91604483, 0.03009514])

可再轉化為 1 與 0

>>> classify(to_matrix(standardize([[200,100], [100,200]])))
array([1, 0])

線性不可分離的分類

學習資料為 data3.csv

x1,x2,y
0.54508775,2.34541183,0
0.32769134,13.43066561,0
4.42748117,14.74150395,0
2.98189041,-1.81818172,1
4.02286274,8.90695686,1
2.26722613,-6.61287392,1
-2.66447221,5.05453871,1
-1.03482441,-1.95643469,1
4.06331548,1.70892541,1
2.89053966,6.07174283,0
2.26929206,10.59789814,0
4.68096051,13.01153161,1
1.27884366,-9.83826738,1
-0.1485496,12.99605136,0
-0.65113893,10.59417745,0
3.69145079,3.25209182,1
-0.63429623,11.6135625,0
0.17589959,5.84139826,0
0.98204409,-9.41271559,1
-0.11094911,6.27900499,0

先將學習資料繪製到圖表上看起來無法用一條直線來分類,增加 \(x_1^2​\) 進行分類

參數變成四個,將 \(Q^Tx = 0 ​\) 整理後,就可得到一條曲線

\(Q^Tx = 𝜃_0x_0 + 𝜃_1x_1 + 𝜃_2x_2 +𝜃_3x_1^2 = 𝜃_0 +𝜃_1x_1+𝜃_2x_2 +𝜃_3x_1^2 =0​\)

\(x_2 = - \frac{𝜃_0 + 𝜃_1x_2 +𝜃_3x_1^2}{𝜃_2}\)

import numpy as np
import matplotlib.pyplot as plt

# 載入學習資料
train = np.loadtxt('data3.csv', delimiter=',', skiprows=1)
train_x = train[:,0:2]
train_y = train[:,2]

# 任意初始值
theta = np.random.rand(4)

# 標準化
mu = train_x.mean(axis=0)
sigma = train_x.std(axis=0)
def standardize(x):
    return (x - mu) / sigma

train_z = standardize(train_x)

# 轉換為矩陣,加上 x0, x3
def to_matrix(x):
    x0 = np.ones([x.shape[0], 1])
    x3 = x[:,0,np.newaxis] ** 2
    return np.hstack([x0, x, x3])

X = to_matrix(train_z)

# 預測函數 S函數
def f(x):
    return 1 / (1 + np.exp(-np.dot(x, theta)))

# 識別函數
def classify(x):
    return (f(x) >= 0.5).astype(np.int)

# 學習率
ETA = 1e-3

# 重複次數
epoch = 5000

# 更新次數
count = 0

# 重複學習
for _ in range(epoch):
    theta = theta - ETA * np.dot(f(X) - train_y, X)

    # log
    count += 1
    print('{}次數: theta = {}'.format(count, theta))

# 繪製圖形
x1 = np.linspace(-2, 2, 100)
x2 = -(theta[0] + theta[1] * x1 + theta[3] * x1 ** 2) / theta[2]
plt.plot(train_z[train_y == 1, 0], train_z[train_y == 1, 1], 'o')
plt.plot(train_z[train_y == 0, 0], train_z[train_y == 0, 1], 'x')
plt.plot(x1, x2, linestyle='dashed')
# plt.show()
plt.savefig("機器學習4_coding_.png")

分類的精度,就是在全部的資料中,能夠被正確分類的 TP與 TN 佔的比例,可表示為

\( Accuracy = \frac{TP + TN}{TP+FP+FN+TN} ​\)

# 精度
accuracies = []

# 重複學習
for _ in range(epoch):
    theta = theta - ETA * np.dot(f(X) - train_y, X)

    # 計算精度
    result = classify(X) == train_y
    accuracy = len(result[result ==True]) / len(result)
    accuracies.append(accuracy)

# 繪製圖形
x = np.arange(len(accuracies))
plt.plot(x, accuracies)
plt.savefig("機器學習4_coding_.png")

計算精度,繪製圖表

隨機梯度下降法

import numpy as np
import matplotlib.pyplot as plt

# 載入學習資料
train = np.loadtxt('data3.csv', delimiter=',', skiprows=1)
train_x = train[:,0:2]
train_y = train[:,2]

# 任意初始值
theta = np.random.rand(4)

# 標準化
mu = train_x.mean(axis=0)
sigma = train_x.std(axis=0)
def standardize(x):
    return (x - mu) / sigma

train_z = standardize(train_x)

# 轉換為矩陣,加上 x0, x3
def to_matrix(x):
    x0 = np.ones([x.shape[0], 1])
    x3 = x[:,0,np.newaxis] ** 2
    return np.hstack([x0, x, x3])

X = to_matrix(train_z)

# 預測函數 S函數
def f(x):
    return 1 / (1 + np.exp(-np.dot(x, theta)))

# 識別函數
def classify(x):
    return (f(x) >= 0.5).astype(np.int)

# 學習率
ETA = 1e-3

# 重複次數
epoch = 5000

# 更新次數
count = 0

# 重複學習
for _ in range(epoch):
    # 以隨機梯度下降法更新參數
    p = np.random.permutation(X.shape[0])
    for x, y in zip(X[p,:], train_y[p]):
        theta = theta - ETA * (f(x) - y) * x

    # log
    count += 1
    print('{}次數: theta = {}'.format(count, theta))

# 繪製圖形
x1 = np.linspace(-2, 2, 100)
x2 = -(theta[0] + theta[1] * x1 + theta[3] * x1 ** 2) / theta[2]
plt.plot(train_z[train_y == 1, 0], train_z[train_y == 1, 1], 'o')
plt.plot(train_z[train_y == 0, 0], train_z[train_y == 0, 1], 'x')
plt.plot(x1, x2, linestyle='dashed')
# plt.show()
plt.savefig("機器學習4_coding_.png")

正規化

首先考慮這樣的函數

\(g(x) = 0.1(x^3 + x^2 + x)\)

產生一些雜訊的學習資料,並繪製圖表

import numpy as np
import matplotlib.pyplot as plt

# 原始真正的函數
def g(x):
    return 0.1 * (x ** 3 + x ** 2 + x)

# 適當地利用原本的函數,加上一些雜訊,產生學習資料
train_x = np.linspace(-2, 2, 8)
train_y = g(train_x) + np.random.randn(train_x.size) * 0.05


plt.clf()
x=np.linspace(-2, 2, 100)
plt.plot(train_x, train_y, 'o')
plt.plot(x, g(x), linestyle='dashed')
plt.ylim(-1,2)
plt.savefig("機器學習4_coding_1.png")


# 標準化
mu = train_x.mean()
sigma = train_x.std()
def standardize(x):
    return (x - mu) / sigma

train_z = standardize(train_x)

# 產生學習資料的矩陣 (10次多項式)
def to_matrix(x):
    return np.vstack([
        np.ones(x.size),
        x,
        x ** 2,
        x ** 3,
        x ** 4,
        x ** 5,
        x ** 6,
        x ** 7,
        x ** 8,
        x ** 9,
        x ** 10
    ]).T

X = to_matrix(train_z)

# 參數使用任意初始值
theta = np.random.randn(X.shape[1])

# 預測函數
def f(x):
    return np.dot(x, theta)

# 目標函數
def E(x, y):
    return 0.5 * np.sum((y - f(x)) ** 2)

# 正規化常數
LAMBDA = 0.5

# 學習率
ETA = 1e-4

# 誤差
diff = 1

# 重複學習
error = E(X, train_y)
while diff > 1e-6:
    theta = theta - ETA * (np.dot(f(X) - train_y, X))

    current_error = E(X, train_y)
    diff = error - current_error
    error = current_error

theta1 = theta

# 加上正規化項
theta = np.random.randn(X.shape[1])
diff = 1
error = E(X, train_y)
while diff > 1e-6:
    # 正規化項,因為偏差項不適用於正規化,所以為 0,當 j>0,正規化項為 𝜆 * 𝜃
    reg_term = LAMBDA * np.hstack([0, theta[1:]])
    # 適用於正規化項,更新參數
    theta = theta - ETA * (np.dot(f(X) - train_y, X) + reg_term)

    current_error = E(X, train_y)
    diff = error - current_error
    error = current_error

theta2 = theta

# 繪製圖表
plt.clf()
plt.plot(train_z, train_y, 'o')
z = standardize(np.linspace(-2, 2, 100))
theta = theta1 # 無正規化的結果,虛線
plt.plot(z, f(to_matrix(z)), linestyle='dashed')
theta = theta2 # 有正規化的結果,實線
plt.plot(z, f(to_matrix(z)))
# plt.show()
plt.savefig("機器學習4_coding_2.png")

References

練好機器學習的基本功 範例下載