2016/07/25

Kafka Quick Start

因為 Kafka 依賴 ZooKeeper 來處理分散式系統的備援機制,以下由 Kafka 的安裝與測試,進一步討論如何用 Scala 來撰寫 Producer 與 Consumer。

ZooKeeper Server

Zookeeper 提供目錄和節點的服務,當有兩台伺服器啟動時,會在zookeeper的指定目錄下創建對應自己的臨時節點(這個過程稱為“註冊”),所謂臨時節點,是靠 heartbeat(定時向zookeeper伺服器發送訊息)維持,當伺服器出現故障,zookeeper 就會刪除臨時節點。當伺服器向zookeeper註冊時,zookeeper會分配序列號,我們認為序列號小的那個,就是“主”,序列號大的那個,就是“備援機”(slave)。

當有客戶端需要使用“寫”服務時,需要連接zookeeper,獲得指定目錄下的臨時節點列表,也就是已經註冊的伺服器信息,取得序列號小的那台“主”伺服器的地址,進行後續的訪問操作。以達到“總是訪問主伺服器”的目的。

接下來處理 ZooKeeper 的設定,首先下載 kafka

wget http://apache.stu.edu.tw/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz

tar zxvf kafka_2.11-0.9.0.1.tgz

因為 kafka 必須要連結到 ZooKeeper,所以在啟動 kafka server 之前,必須要先啟動 ZooKeeper,通常 ZooKeeper cluster 以三個節點互相備援,且必須要分散配置在三台機器上,目前只是前期測試,就先只啟動一個 ZooKeeper server。

ZooKeeper 的設定檔在 config/zookeeper.properties 裡面,看一下內容,用到 /tmp/zookeeper 這個資料夾,並使用到 TCP Port 2181 作為服務的 port:

# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper

# the port at which the clients will connect
clientPort=2181

# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

如果為了未來調整設定方便,可以設定環境變數 KAFKA_HOME

KAFKA_HOME=/root/download/kafka/kafka_2.11-0.9.0.1

啟動 ZooKeeper Server

nohup $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties &

在啟動指令,加上 -daemon 也可以,這應該是比剛剛的方法還正確的方式

$KAFKA_HOME/bin/zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties

關閉 ZooKeeper Server

$KAFKA_HOME/bin/zookeeper-server-stop.sh $KAFKA_HOME/config/zookeeper.properties &

啟動完成後,會在 netstat 裡面看到 TCP 2181 Port

# netstat -tnlp|grep 2181
tcp        0      0 0.0.0.0:2181                0.0.0.0:*                   LISTEN      30841/java

我們也可以使用 Apache ZooKeeper 的套件包 zookeeper-3.4.8.tar.gz,不要用 Kafka 內建的 zookeeper。

ZOOKEEPER_HOME=/root/download/kafka/zookeeper-3.4.8
cp $ZOOKEEPER_HOME/conf/zoo_sample.cfg $ZOOKEEPER_HOME/conf/zoo.cfg

啟動 zookeeper server

$ZOOKEEPER_HOME/bin/zkServer.sh start

查看 zookeeper server status

$ZOOKEEPER_HOME/bin/zkServer.sh status

Kafka Broker

因為 Kafka 可以設定 replication-factor,讓資料複製到多個 Broker。

Kafka Broker server 的設定檔在 config/server.properties 裡面,首先要在設定檔中加上這一行,讓我們可以刪除 topic。

delete.topic.enable=true

然後看設定檔的最前面,broker.id 以及 listeners的設定,這是在多個 broker 時,有需要調整的設定項目。

broker.id=0

listeners=PLAINTEXT://:9092
# The port the socket server listens on
port=9092

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# Zookeeper connection string (see zookeeper docs for details).
zookeeper.connect=localhost:2181

如果只要執行一個 Kafka Broker 測試,直接這樣執行就好了,因為接下來要安裝 kafka-manager,所以啟動時順道加上 JMX_PORT 的資訊。

env JMX_PORT=8092 $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

如果要再執行兩個 Broker,要先把設定檔調整好

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

調整 config/server-1.properties

broker.id=1

listeners=PLAINTEXT://:9093

log.dirs=/tmp/kafka-logs-1

delete.topic.enable=true

調整 config/server-2.properties

broker.id=2

listeners=PLAINTEXT://:9094

log.dirs=/tmp/kafka-logs-2

zookeeper.connect=localhost:2181

接下來再啟動兩個 kafka servers

env JMX_PORT=8093 $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties

env JMX_PORT=8094 $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties

netstat 可看到這三個 Brokers 的 Port

# netstat -tnlp|grep 909
tcp        0      0 0.0.0.0:9092                0.0.0.0:*                   LISTEN      2694/java
tcp        0      0 0.0.0.0:9093                0.0.0.0:*                   LISTEN      3593/java
tcp        0      0 0.0.0.0:9094                0.0.0.0:*                   LISTEN      3657/java

Topic

啟動 Kafka Broker 之後,必須先建立 topic。

因為我們只有三個 Broker,所以 replication-factor 只能指定為 3,如果指定為 4,就會發生 kafka.admin.AdminOperationException。

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 4 --partitions 1 --topic test
Error while executing topic command : replication factor: 4 larger than available brokers: 3
[2016-02-26 16:17:15,436] ERROR kafka.admin.AdminOperationException: replication factor: 4 larger than available brokers: 3
    at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:77)
    at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:236)
    at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:105)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
 (kafka.admin.TopicCommand$)

所以我們將 replication-factor 改為 3

$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test
Created topic "test".

可以用指令查看 kafka-topic 的資訊,ReplicationFactor是3份, Replicate 在 0, 1, 2 這三台broker上面, topic的leader是 broker id 0,leader負責 partition 的read and write。Isr的意思是有哪些 broker 正在同步當中,簡單的說可以知道哪些broker是活著的。

$KAFKA_HOME/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test  PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: test Partition: 0    Leader: 0   Replicas: 0,1,2 Isr: 0,1,2

可以用以下指令刪除 topic

$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test

刪除了 test 這個 topic 之後,就查不到 topic 的資訊了

Message Producer Consumer 測試

啟動 Console Producer,啟動後會進入互動模式,可以一直往 test 這個 topic 發送文字訊息。

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test

啟動 Console Consumer,這裡要注意的是,Consumer 是連結到 ZooKeeper 而不是直接連到 Broker,--from-beginning 這個參數的意思是要從頭開始,而不是從連結的時候開始。

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

如果在啟動 consumer 之前,已經在 producer 對 topic: test 發送了幾個訊息,那啟動 consumer 如果有加上 --from-beginning 這個參數,就會把先前那幾個訊息也列印出來。

如果沒有加上 --from-beginning 這個參數,就會從啟動 consumer 開始接收後面的訊息。

列印所有 consumer group 的資訊

bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list

刪除 consumer group: test

bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --delete -group test

yahoo kafka-manager

Yahoo 對 Kafka 開發了一個 kafka-manager 網頁管理介面

我們在這個連結下載 kafka-manager 1.3.0.4.tar.gz

解壓縮後,利用 sbt 就可以建置 kadka-manager 的 production deployment package

sbt clean dist

可以在這個目錄中,找到編譯後的 zip file

kafka-manager-1.3.0.4/target/universal/kafka-manager-1.3.0.4.zip

KAFKA_MANAGER_HOME=/root/download/kafka/kafka-manager-1.3.0.4

解壓縮 kafka-manager-1.3.0.4.zip 可以看到 bin, conf, lib, share 這些資料夾。

首先修改 conf/application.conf 裡面的 zkhosts

kafka-manager.zkhosts="192.168.1.7:2181"

然後用這個指令,啟動 kafka-manager

nohup $KAFKA_MANAGER_HOME/bin/kafka-manager -Dconfig.file=$KAFKA_MANAGER_HOME/conf/application.conf -Dhttp.port=9000 > /dev/null 2>&1

依照這個畫面的資訊,把 zookeeper 加入 kafka cluster list

接下來就可以用網頁查看 Kafka cluster 的資訊

References

kafka 文件

kafka-example-in-scala

Apache kafka原理與特性(0.8V)

Deploy Apache Kafka Cluster on AWS

Apache Kafka: Distributed messaging system

架構師一席談(二) zookeeper在分佈式應用中的作用

ZooKeeper 基礎知識、部署和應用程序

kafka集群操作指南

2016/07/18

Kafka

Kafka 是一種 distributed, partitioned, replicated commit log service,核心概念是 messaging system。

basic messaging terminology

  • topic: 訊息的分類
  • producer: 發佈訊息到 topics
  • consumer: 註冊 topics 並持續處理 published messages
  • broker: 由一到多個 broker servers 可組成 cluster

Kafka cluster 會針對每一個 topic 維護一個類似以下圖形的 partitioned log。

每一個 partition 都是 ordered, immutable sequences of messages,並會持續附加到 commit log,訊息在 partitions 中會被 assigned 一個 sequential id number 稱為 offset,代表這個訊息在 partition 中的位置。

partition 可讓 log size 超過單一 server 的限制,雖然每一個 partition 都受到 server 的限制,但組合多個 partitions 的 topic 就可以處理任意數量的資料。partition 的另一個優點是 parallelism 平行處理。

在 Kafka cluster 中,為了處理 fault tolerance,每個 partition 可設定複製到多個 servers。這些 partitions 會設定讓某個 server 的 partition 成為 "leader" 身份,所有讀寫的 request 都會由他處理,其他的 server 是 "followers" 身份,他們只會複製 partitions,當 "leader" 消失,所有 followers 中會找出一個新的 "leader"。每個 server 都可能是某些 partitions 的 leader,用以讓這個 cluster well-balanced。

offset 是由 consumer 控制,通常 consumer 會在讀取訊息時,自己(linearly)增加 offset,但實際上,consumer 可以自由以任何順序讀取並消化訊息。唯一一個會永久保存的資訊是,每一個 consumer 的 offset 位置。

不管訊息有沒有被消化,Kafka cluster 將會保留所有發佈的訊息一段時間,時間長短可設定調整,舉例來說,log retention 應用會設定為 2天。

Producers, Consumers

Producer 發佈訊息到 topic 時,是以 round-robin 的方式 load balanced,也可以依照某個 partition function 來決定。

Consumers 有兩種模式:queuing 與 publish-subscribe,queuing 模式中,每一個訊息只會丟給一堆 consumers 中的某一個來處理,publish-subscribe 模式中,訊息會 broadcast 到所有 consumers。

Kafka 會用 consumer group name 來標記 consumer group,每一個訊息是發送給 consumer group 中的某一個 consumer instance,consumer instances 可以在不同的 processes 或是不同機器上。

通常為了 fault tolerance 的緣故,topics 有 small number of consumer groups。

2 server Kafka cluster, 4 partitions(P0~P3), 2 consumer groups, 2 consumer instances in consumer group A

Kafka 可確保訊息的順序,可在 a pool of consumer processes 中提供 ordering quarantees 以及 load balancing。有個條件限制是,在 consumer group 中,consumer instance 的數量不能超過 partitions 的數量。

Kafka 只能確保在一個 partition 中的訊息順序,如果是 topic 的多個 partitions 則無法保證順序。如果應用是需要 topic 裡面訊息的順序,就只能讓 topic 只有一個 partition,這表示每一個 consumer group 只能有一個 consumer process。

Guarantees

Kafka 保證以下事項

  1. producer 發送給一個特殊的 topic partition 的訊息,會根據發送的順序來排序。先發送的訊息會有比較低的 offset,比較早出現在 log 中。

  2. consumer 讀取訊息的順序會根據儲存在 log 的順序來決定。

  3. 對於一個有 replication factor N 的 topic 來說,可容許 N-1 個 server failures,不會遺失任何已經 commit 到 log 中的訊息。

Use Cases, 幾個 Kafka 的應用情境

  • Messaging

    Kafka 跟其他 messaging system 的差別是:better throughput, built-in partitioning, replication, and fault-tolerance 通常 messaging system 是 low-throughput 的,但實際上卻需要 low end-to-end latency,也需要有 stong durability 的功能。

  • Website Activity Tracking

    利用 real-time publish-subscribe 方式,建立 user activity traking pipeline。可提供 real-time processing, real-time monitoring, and loading into Hadoop or offline data warehousing systems for offline processing and reporting 這些功能。

  • Metrics

    operational monitoring data,在 distributed application 中整合統計資料。

  • Log Aggregation

    Log aggregation 用來收集 server 的 physical log files,集中放在 file system 裡面。Kafka 將 log 轉換成 a steam of messages,建立 abstraction of log system

  • Stream Processing

    處理 stage-wise processing of data,由 raw data 經過處理與轉換,產生新的資料,例如由 RSS 經過 crawl article content,然後發佈到 topic: articles,然後進行 normalize 或 deduplicate content,最後推薦給使用者建議閱讀的文章。Storm 與 Samza 是處理這種轉換的常用 framework。

  • Event Sourcing

    event sourcing 是一種 application design 的形式,state change 會依照時間順序記錄起來。

  • Commit Log

    Kafka 可作為 distributed system 的 external commit-log,可在 nodes 之間複製資料,並在 failed nodes 用來 restore data。Apache BookKeeper 就是這種應用的類似的 framework。

References

Spark Streaming使用Kafka保證數據零丟失

Kafka 文件

High Performance Kafka Consumer for Spark Streaming. Now Support Spark 1.6 and Kafka 0.9

【Apache Kafka】代碼範例

ZooKeeper