2017年12月11日

Fluentd

Fluentd 將 data source 及 backend system 分離,提供兩者之間的一個 Unified Logging Layer,可讓 developers 及 data analysts 能同時使用多種資料源,同時也解決格式錯誤的資料所造成的系統變慢或解譯錯誤的問題。

Fluentd 有三種版本,全部都是以 Apache2 License 釋出。

  1. Fluentd

    社群版本,只能用 ruby gems 安裝,沒有 init scripts,如果想要修改 Fluentd 或是做更多事情,可以用這個社群版

  2. ta-agent

    這是 Treasure Data, Inc 這家公司維護並測試的版本,可直接用 rpm/deb/dmg 套件安裝,安裝時同時安裝了一些預設設定值。如果是第一次使用 Fluentd,建議安裝 ta-agent。

  3. Fluent Bit

    Fluent Bit 是 Fluentd 的 lightweight data forwarder,用在 forward 資料給 Fluentd aggregators。可安裝在 embedded system 或是嵌入到 server 系統中。

Architecture

Fluentd 的架構圖為

由於 data inputs 及 output 透過 Fluentd 中繼資料,Fluentd 這個 Unified Logging Layer 野食作為 pluggable 架構,可不斷地增加不同的 inputoutput plugins,目前已經有超過 500+ 的 plugins。

假設有 M 種 data input,N 種 data output,pluggable 架構可讓原本複雜度 O(M*N) 的系統,變成 O(M+N) 的系統。

安裝

Download Fluentd 有列出所有安裝方式的資訊。我們選擇 Installing Fluentd Using rpm Package 安裝到 CentOS 7。

產生一個新的有 sshd 的 docker machine

docker run -d \
 -p 10022:22\
 -p 80:80\
 -p 8888:8888\
 --sysctl net.ipv6.conf.all.disable_ipv6=1\
 -e "container=docker" --privileged=true -v /sys/fs/cgroup:/sys/fs/cgroup --name fluentd centosssh /usr/sbin/init

在安裝前,Before Installing Fluentd 必須要先處理幾項系統設定。

  1. NTP

    要同步時間,確保 log 的 timestamp 是正確的

    CentOS 7 修改 timezone,校正時間

    timedatectl set-timezone Asia/Taipei
    /usr/sbin/ntpdate time.stdtime.gov.tw && /sbin/hwclock -w
  2. Max # of File Descriptors

    ulimit -n 65535

    vi /etc/security/limits.conf

    root soft nofile 65535
    root hard nofile 65535
    * soft nofile 65535
    * hard nofile 65535
  3. Network Kernel Parameters

    解決 TCP_WAIT 的問題,(如果在 docker 測試,會無法修改 kernel 參數,跳過這個步驟就好了,參考這邊的說明 對docker container進行內核參數調優

    vi /etc/sysctl.conf

    net.ipv4.tcp_tw_recycle = 1
    net.ipv4.tcp_tw_reuse = 1
    net.ipv4.ip_local_port_range = 10240 65535

    sysctl -p 或是 reboot


以 script 安裝 FluentD,daemon 名稱為 td-agent

curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent2.sh | sh

安裝後會增加 /etc/yum.repos.d/td.repo,以及 td-agent service

啟動 daemon

systemctl enable td-agent
systemctl status td-agent

systemctl start td-agent
/etc/init.d/td-agent start
/etc/init.d/td-agent stop
/etc/init.d/td-agent restart
/etc/init.d/td-agent status

設定檔在 /etc/td-agent/td-agent.conf 預設是由 HTTP 接收 logs 轉至 stdout /var/log/td-agent/td-agent.log

發送測試資料

curl -X POST -d 'json={"json":"message"}' http://localhost:8888/debug.test

Use Cases

  • Centralized App Logging 收集不同語言實作的 Applcation 的 Log

  • Log Management & Search 以 Fluentd + Elasticsearch 的整合替代 Splunk

  • Data Analysis 將 Log 儲存到 Hadoop 或 MongoDB,以供後續分析處理

  • Data Archiving 將 Log 儲存到 Amazon S3/Riak/GlusterFS Logs

  • Stream Processing

  • Windows Event Collection 收集 Windows Event Logs (目前 stable 版本 v0.12 還不支援 Windows,要到 v0.14 才有支援)

  • IoT Data Logger

    Cloud Data Logger by Raspberry Pi 說明可在 Raspberry Pi 整合其他 Sensor 後,透過 Fluentd 收集資料。

Life of a Fluentd event

以實例解釋 event 是如何倍 Fluentd 處理的,包含 Setup, Inputs, Filters, Matches, and Labels

使用 inhttp 及 outstdout plugins 解釋 events cycle,首先修改 /etc/td-agent/td-agent.conf

# listening for HTTP Requests
<source>
  @type http
  port 8888
  bind 0.0.0.0
</source>

# print the data arrived on each incoming request to standard output
<match test.cycle>
  @type stdout
</match>

發送兩個 curl 測試

# curl -X POST -d 'json={"json":"message"}' http://localhost:8888/debug.test

# curl -i -X POST -d 'json={"action":"login","user":2}' http://localhost:8888/test.cycle
HTTP/1.1 200 OK
Content-type: text/plain
Connection: Keep-Alive
Content-length: 0

tail -f /var/log/td-agent/td-agent.log

2017-10-31 15:15:40 +0800 [info]: adding match pattern="test.cycle" type="stdout"
2017-10-31 15:15:40 +0800 [info]: adding source type="http"
2017-10-31 15:15:40 +0800 [info]: using configuration file: <ROOT>
  <source>
    @type http
    port 8888
    bind 0.0.0.0
  </source>
  <match test.cycle>
    @type stdout
  </match>
</ROOT>
2017-10-31 15:15:48 +0800 [warn]: no patterns matched tag="debug.test"
2017-10-31 15:15:58 +0800 test.cycle: {"action":"login","user":2}
Event structure

Fluentd event 包含 tag, time, record 三個部分

  • tag: event 來自哪裡
  • time: Epoch time,event 發生時間
  • record: log content,JSON object

以 apache log 為例,利用 in_tail 會由一行一行的 text line log 產生 event

192.168.0.1 - - [28/Feb/2013:12:00:00 +0900] "GET / HTTP/1.1" 200 777

tag: apache.access # set by configuration
time: 1362020400   # 28/Feb/2013:12:00:00 +0900
record: {"user":"-","method":"GET","code":200,"size":777,"host":"192.168.0.1","path":"/"}

tag 是由 a.b.c 這樣的字串組成的,用 "." 組合不同部分的字串

設定檔 td-agent.conf

  • source: input source

標準 input 有兩個: http 及 forward,可同時使用

http 將 fluentd 轉變為 HTTP endpoint,由 HTTP 接收 event message

forward 將 fluentd 轉變為 TCP endpoint,接收 TCP packets

ex:

# Receive events from 24224/tcp
# This is used by log forwarding and the fluent-cat command
<source>
  @type forward
  port 24224
</source>

# http://this.host:8888/myapp.access?json={"event":"data"}
<source>
  @type http
  port 8888
</source>
  • match: output destination

比對 event 的 tag,並處理符合定義 tag 的 event

fluentd 的 stdout output plugin 為 file 及 forward

ex:

# Match events tagged with "myapp.access" and
# store them to /var/log/fluent/access.%Y-%m-%d
# Of course, you can control how you partition your data
# with the time_slice_format option.
<match myapp.access>
  @type file
  path /var/log/fluent/access
</match>

match 後面的參數有以下規則,依照在設定檔中的順序進行比對

    • matches a single tag part

    ex: a.* matches a.b a.* not match a or a.b.c

  1. ** matches zero or more tag parts

    a.** matches a, a.b and a.b.c

  2. {X,Y,Z} matches X, Y, or Z, where X, Y, and Z are match patterns

    {a,b} matches a and b a.{b,c}.* a.{b,c.**}

  3. 可用 填寫多個 patterns

    match a and b match a, a.b, a.b.c, and b.d

  • filter: 決定 event processing pipelines

Input -> filter 1 -> ... -> filter N -> Output

ex:

# http://this.host:9880/myapp.access?json={"event":"data"}
<source>
  @type http
  port 9880
</source>

<filter myapp.access>
  @type record_transformer
  <record>
    host_param "#{Socket.gethostname}"
  </record>
</filter>

<match myapp.access>
  @type file
  path /var/log/fluent/access
</match>

event 處理過程

收到 {"event":"data"}
-> 送到 record_transformer filter
-> 增加 "host_param" 欄位
-> {"event":"data","host_param":"webserver1"}
-> 送到 file output
  • system: 設定系統參數
<system>
  # equal to -qq option
  log_level error
  # equal to --without-source option
  without_source
  # suppress_repeated_stacktrace
  # emit_error_log_interval
  # suppress_config_dump
  
  # fluentd’s supervisor and worker process names
  process_name fluentd1
</system>
  • label: group output 及 filter for internal routing
<label @SYSTEM>
  <filter var.log.middleware.**>
    @type grep
    # ...
  </filter>
  <match **>
    @type s3
    # ...
  </match>
</label>
  • @include: include other files
# Include config files in the ./config.d directory
@include config.d/*.conf
Processing Events

在設定 Setup 後,Router Engine 就已經包含了幾個基本的 rules,內部會經過幾個步驟處理 Event。

  • Filters

可用來設定一個 rule,決定要不要接受這個 event

ex: filter test.cycle 放棄不處理 logout,這是用 @grep 處理的,判斷 action 的部分,有沒有 "logout" 這個字串

<source>
  @type http
  port 8888
  bind 0.0.0.0
</source>

<filter test.cycle>
  @type grep
  exclude1 action logout
</filter>

<match test.cycle>
  @type stdout
</match>

測試

# curl -i -X POST -d 'json={"action":"login","user":2}' http://localhost:8888/test.cycle
HTTP/1.1 200 OK
Content-type: text/plain
Connection: Keep-Alive
Content-length: 0

# curl -i -X POST -d 'json={"action":"logout","user":2}' http://localhost:8888/test.cycle
HTTP/1.1 200 OK
Content-type: text/plain
Connection: Keep-Alive
Content-length: 0

結果在 log 裡面只有看到 login

2017-10-31 15:50:55 +0800 test.cycle: {"action":"login","user":2}
Labels

可用來定義新的 Routing sections,且不遵循 top-bottom 的順序,類似 linked references 的行為。

ex: 在 source 增加了 @label,表示要跳到 @STAGING 處理 event,而不是用上面的 filter

<source>
  @type http
  bind 0.0.0.0
  port 8880
  @label @STAGING
</source>

<filter test.cycle>
  @type grep
  exclude1 action login
</filter>

<label @STAGING>
  <filter test.cycle>
    @type grep
    exclude1 action logout
  </filter>

  <match test.cycle>
    @type stdout
  </match>
</label>
Buffers

在範例中,使用 stdout 是 non-buffered output,但在正式環境,會需要對 output 增加 buffer,例如 forward, mongodb, s3 ...

buffered output plugins 會儲存收到的 events 到 buffers,並在達到 flush condition 時,再將資料一次寫入目標。換句話說,database 可能不會馬上看到新進的 event。

Execution unit

Fluentd events 預設是在 input plugin thread 中處理的,例如 intail -> filtergrep -> outstdout pipeline,就是在 intail 的 thread 中處理的。filtergrep 及 outstdout 並沒有自己的 thread。

但 buffered output plugin 中,另外有一個自己的 thread 可處理 flushing buffer。

Sample

Collecting Tomcat logs using Fluentd and Elasticsearch

fluentd-catch-all-config

Tomcat容器日誌收集方案fluentd+elasticsearch+kilbana

安裝 fluentd 的 elasticsearch plugin

td-agent-gem install fluent-plugin-elasticsearch

定義 tomcat catalina.out 的 source

<source>
  @type tail
  format none
  path /var/log/tomcat*/localhost_access_log.%Y-%m-%d.txt
  pos_file /var/lib/google-fluentd/pos/tomcat.pos
  read_from_head true
  tag tomcat-localhost_access_log
</source>

<source>
  @type tail
  format multiline
  # Match the date at the beginning of each entry, which can be in one of two
  # different formats.
  format_firstline /^(\w+\s\d+,\s\d+)|(\d+-\d+-\d+\s)/
  format1 /(?<message>.*)/
  path /var/log/tomcat*/catalina.out,/var/log/tomcat*/localhost.*.log
  pos_file /var/lib/google-fluentd/pos/tomcat-multiline.pos
  read_from_head true
  tag tomcat.logs
</source>

<match tomcat.logs>
    @type elasticsearch
    host localhost
    port 9200
    logstash_format true
    logstash_prefix tomcat.logs
    flush_interval 1s
</match>

References

fluentd architecture

用 ElasticSearch + FluentD 打造 Log 神器與數據分析工具


logstash + kibana - Make sense of a mountain of logs

LogStash::Inputs::Syslog 性能測試與優化


使用LogHub進行日誌實時採集

Docker日志收集新方案:fluentd-pilot

Fluentd, Logstash, LogHub, Flume, Kafka

而系統的 log,其實就是眾多的 event,我們必須設法將散落在不同地方的內部或外部 log 收集並儲存起來,集中到某個系統進行管理及分析,才能簡化處理異質環境訊息處理的問題。而 Fluentd, Logstash, LogHub, Flume, Kafka 這些技術,是以不同的方式解決問題。

在營運網路服務時,可能會遇到下面這些問題

  1. 在不同的廣告通路上,取得的使用者,要評估不同的廣告得到的收益結果
  2. 使用者抱怨服務的速度太慢,但要分析是在哪一個部分出問題
  3. 發送優惠券時,要如何評估優惠券的效益
  4. 要分析什麼時候該儲備多一點貨品,或是要調配更多人力
  5. 客戶在使用過程中發生問題,要如何分析是在哪個步驟出錯

由於網路服務的系統,可能會有這些特性,連帶造成使用者在處理某個工作時,必須跨越多種異質環境。

  1. 多個促銷或銷售管道
  2. 多個使用介面,如網頁、手機或是 APP
  3. 多台雲端機器
  4. 多種開發程式語言或環境
  5. 多個作業系統平台

通常會將 Fluentd, Logstash, LogHub 在一起比較,而將 Flume, Kafka 一起比較。

LogHub 是阿里雲的 Log Service,在別的環境中,可以先不考慮這個方案。

Fluentd, Logstash 是用 ruby,而 Flume 及 Kafka 是 Java。

Fluentd 有個基本的限制,他並不保證訊息一定會被傳送,如果不能容忍訊息遺失的狀況,就不要考慮 Fluentd。

但 Logstash 及 Flume,為了保證訊息一定會被傳遞,
同樣的訊息可能會收到兩次以上。

Flume 是訊息收集系統,而 Kafka 更接近於訊息cache系統,他可以儲存一定時間內的資訊。因此可以看到很多是採用 Fluentd + Kafka + Storm/ElasticSearch 這樣混搭使用狀況。

至於 Logstash 跟 Flume 的比較,可以看 logstash vs flume 以及 請對logstash與flume做比較 這篇文章。

Logstash 重視資料的預處理,多個 input 會把資料匯總到 input 和 filter 之間的 buffer中。filter則會從buffer中讀取數據,進行過濾解析,然後儲存在 filter 和 output 之間的Buffer中。當 buffer 滿足一定的條件時,會觸發output的刷新。

而 Flume 比較重視資料的傳輸,只有封裝 event 然後就傳送,沒有資料解析處理的部份,傳輸時比較重視資料的可靠性。

References

Fluentd vs. Logstash: A Comparison of Log Collectors

日誌客戶端(Logstash,Fluentd, Logtail)橫評

深度解讀:為何要使用日誌服務LogHub替換Kafka?

公網數據採集比較(LOGHUB VS 前端機+KAFKA)

Flume和Logstash的那些事兒

日誌採集系統flume和kafka有什麼區別及聯繫,它們分別在什麼時候使用,什麼時候又可以結合?

Kafka 與 Flume的區別

Logging 日誌記錄最佳實踐

你一定需要 六款大數據採集平台的架構分析

深夜實堂:從業務需求淺談 Log aggregators

2017年12月4日

statsd

statsd 是 Graphite/Carbon metrics server 的 front-end proxy,最初由 Etsy's Erik Kastner 以 Node.js 撰寫,目前已經有多種程式語言的實作版本。他是一個 event counter/aggregation service,接收 event timeings,做基本計算後,就產生 values,這可用來收集 custom application metrics,而 application 只需要不斷地發送 events。

collectd 在 5.4 版後就支援了 statsd plugin,也就是將 statsd 嵌入了 collectd。

statsd 是一個 UDP (也可換成 TCP) daemon,根據簡單的協議收集statsd客戶端發送來的數據,聚合統計之後,再定時推送給後端,如graphite和influxdb等,然後透過grafana顯示資料。

系統分成三個部分: client, server, backend。client 要植入 application 中,將相應的 metrics 發送給 statsd server。statsd server 聚合這些 metrics 後,定時發送給 backends。backends 負責儲存這些 Time Series Data,再透過適當的圖表工具展示資料。

安裝

要先安裝 nodejs,由 EPEL 安裝的是 nodejs 6.11.3-1.el7 版

yum install -y epel-release
yum install -y nodejs

如果要改安裝 nodejs 7,必須改用下面的程序

# Install Node.js 7.x repository
curl -sL https://rpm.nodesource.com/setup_7.x | bash -

# Install Node.js and npm
yum install nodejs

直接由 statsd github clone 並安裝 statsd

cd /usr/local/src

git clone https://github.com/etsy/statsd.git

cd statsd

npm install

設定

首先複製一份設定檔

cp exampleConfig.js config.js

修改 graphite 的設定

vi config.js

{
  graphitePort: 2003, 
  graphiteHost: "localhost",
  port: 8125,
  backends: [ "./backends/graphite" ]
}

修改 graphite 的設定

vi /opt/graphite/conf/storage-schemas.conf

[carbon]
pattern = ^carbon\.
retentions = 60:90d

[stats]
pattern = ^stats.*
retentions = 10s:6h,10m:7d,1d:5y

[stats_counts]
pattern = ^stats_counts.*
retentions = 10s:6h,10m:7d,1d:5y

[collectd]
pattern = ^collectd.*
retentions = 10s:6h,10m:7d,1d:5y

[default_1min_for_1day]
pattern = .*
retentions = 60s:1d

10s:6h,10m:7d,1d:5y

  • 6 hours of 10 seconds data
  • 7 days of 10 mins data
  • 5 years of 1 day data

如果 retentions 時間設定為這樣,資料會更多一些

[carbon]
pattern = ^carbon\.
retentions = 60:90d

[stats]
pattern = ^stats.*
retentions = 10s:1d,30s:7d,1m:30d,15m:5y

[stats_counts]
pattern = ^stats_counts.*
retentions = 10s:1d,30s:7d,1m:30d,15m:5y

[collectd]
pattern = ^collectd.*
retentions = 10s:1d,30s:7d,1m:30d,15m:5y

[default_1min_for_1day]
pattern = .*
retentions = 60s:1d

10s:1d,30s:7d,1m:30d,15m:5y

  • 1 day of 10 seconds data
  • 7 days of 30 seconds data
  • 30 days of 1 minute data
  • 5 years of 15 minutes data

必須要同時修改 /opt/graphite/storage/whisper 路徑的 *.wsp 資料,可參考Whisper Scripts 文件。

# 修改 wsp size
find /opt/graphite/storage/whisper/collectd -type f -name '*.wsp' -exec whisper-resize.py --nobackup {} 10s:6h 10m:7d 1d:5y \;

# 列印 wsp file size
find /opt/graphite/storage/whisper/collectd -type f -name '*.wsp' -exec whisper-info.py {} \;

vim /opt/graphite/conf/storage-aggregation.conf

[lower]
pattern = \.lower$
xFilesFactor = 0.1
aggregationMethod = min

[min]
pattern = \.min$
xFilesFactor = 0.1
aggregationMethod = min

[upper]
pattern = \.upper(_\d+)?$
xFilesFactor = 0.1
aggregationMethod = max

[max]
pattern = \.max$
xFilesFactor = 0.1
aggregationMethod = max

[sum]
pattern = \.sum$
xFilesFactor = 0
aggregationMethod = sum

[gauges]
pattern = ^.*\.gauges\..*
xFilesFactor = 0
aggregationMethod = last

[count]
pattern = \.count$
xFilesFactor = 0
aggregationMethod = sum

[count_legacy]
pattern = ^stats_counts.*
xFilesFactor = 0
aggregationMethod = sum

[default_average]
pattern = .*
xFilesFactor = 0.3
aggregationMethod = average
  • 以 .lower .min 或 .upper .max 結尾的 metrics,只會儲存 max, min values,如果少於 10% datapoints,就只會儲存 None

  • 以 count 或 sum 結尾的 metrics,還有在 'stats_counts' 下面的 metrics,會加總所有 values,如果沒有收到資料,會儲存 None

  • 其他資料庫,會計算平均值,如果少於 30% 的 datapoint,就會儲存 None

重新啟動 graphite

systemctl restart carbon
systemctl restart graphite

啟動

有三種方式

  1. 直接在 console 啟動

    cd /usr/local/src/statsd
    node ./stats.js ./config.js
  2. 以 system service 方式啟動

    vi /usr/lib/systemd/system/statsd.service

    [Unit]
    Description=statsd daemon
    
    [Service]
    ExecStart=/usr/bin/node /usr/local/src/statsd/stats.js /usr/local/src/statsd/config.js
    ExecReload=/bin/kill -HUP $MAINPID
    KillMode=process
    
    [Install]
    WantedBy=multi-user.target

    啟動服務

    systemctl daemon-reload
    systemctl enable statsd
    systemctl start statsd
  3. 透過 npm forever-service 安裝服務

    cd /usr/local/src/statsd
    sudo npm install -g forever
    sudo npm install -g forever-service
    sudo forever-service install statsd -s stats.js -o " config.js"
    sudo service statsd start

statsd 會在 UDP:8125 運作,可檢查

netstat -nap | grep 8125

graphite 中會看到這些 metrics

stats.gauges.statsd.timestamp_lag

stats.statsd.graphiteStats.calculationtime
stats.statsd.graphiteStats.flush_length
stats.statsd.graphiteStats.flush_time
stats.statsd.graphiteStats.last_exception
stats.statsd.graphiteStats.last_flush

stats.statsd.bad_line_seen
stats.statsd.metrics_received
stats.statsd.packets_received
stats.statsd.processing_time

stats_counts.statsd.bad_line_seen
stats_counts.statsd.metrics_received
stats_counts.statsd.packets_received

statsd.numStats

Key Concepts

  • buckets 每一個 stat 都有自己的 bucket,不需要預先定義,最後將會被轉換到 graphite,periods ( . ) 會被換成 folders

  • values 每個 stat 都有自己的 value,解譯方式由 modifier 決定,values 一般都是 integer

  • flush 在 flush interval timeout (config.flushInterval 定義,預設值為 10 秒)後,stats 會被 aggregted 並發送到一個 backend service

使用

stats 是使用最基本的 line protocol

<metricname>:<value> | <type>

可用 nc 測試

echo "foo:1|c" | nc -u 127.0.0.1 8125

graphite 會增加這些 metrics

stats.foo
stats_counts.foo

Metric Types

Metric Types

  • Counting
foo:1|c

把 foo 加 1,flush 後,count 會發送到後端,並 reset 為 0。

如果設定了 config.deleteCounters,在 flush 時,如果 count 是 0,就不會發送 metric 到後端

如果你使用 flush interval(10秒),並在每個間隔通過某個計數器給 statsd 傳送7次 counting。則計時器的 value (stats_counts.foo) 為 7,而 per-second value (stats.foo) 為 0.7,另外 numStats 為 7。

  • Sampling
foo:1|c|@0.1

最後面 @0.1,表示每 1/10 的時間間隔,都會發送一次 counter

  • Timing

用來記錄某個 operation 消耗多少時間

foo:320|ms

foo 要花 320ms 完成

statsd 會自動計算該 flush interval 內的 percetiles, average(mean), 標準差, sum, 上下界

在 flush interval 內,你將下列計數器 values 傳給 statsd

450
120
553
994
334
844
675
496

會計算下面的 values,並傳送給 graphite

mean_90 496
upper_90 844
sum_90 3472
upper 994
lower 120
count 8
sum 4466
mean 558.25
  • Gauges

一個被記錄的任意數值

gaugor:333|g

如果 flush 時,值沒有改變,就會再發送一次。設定 config.deleteGauges,就不會再發送一次。

在數值前面加上 + 或 -,是值的計算,而不是覆寫,這表示不能將 gauge 設定為負整數

gaugor:333|g
gaugor:-10|g
gaugor:+4|g

gaugor 結果為 333 - 10 + 4 = 327

  • Sets

在 flushes 之間,記錄發生的 events,但不重複,可用來記錄某個事件在時間區段中,有哪些使用者曾經使用過

request:1|s  // 1
request:2|s  // 1 2
request:1|s  // 1 2
  • Multi-Metric Packets

可以在一行 packet 中,以 \n 區隔多個欄位的資料。但要注意網路單一 packet 的傳輸長度上限,例如 Fast Ethernet 為 1432 (包含)。

gorets:1|c\nglork:320|ms\ngaugor:333|g\nuniques:765|s

將 statsd 整合到 collectd

雖然會減少一個 daemon,改用 collectd 同時啟動 statsd,但目前不採用這種安裝方式

修改 /etc/collectd.conf

LoadPlugin statsd

<Plugin statsd>
  Host "0.0.0.0"
  Port "8125"
#  DeleteCounters true
#  DeleteTimers   false
#  DeleteGauges   false
  DeleteSets     true
  CounterSum     true
  TimerPercentile 90.0
#  TimerPercentile 95.0
#  TimerPercentile 99.0
  TimerLower     true
#  TimerUpper     false
#  TimerSum       false
#  TimerCount     false
</Plugin>

restart collectd

systemctl restart collectd

statsd 會在 UDP:8125 運作,可用 netstat 檢查,但卻是由 collectd process 處理的

netstat -nap | grep 8125

如果用 nc 測試時

echo "foo:1|c" | nc -u 127.0.0.1 8125

會在 graphite 發現,metrics 是在 collectd 下面

collectd.testserver.statsd.count-foo
collectd.testserver.statsd.derive-foo

clients

StatsD Example Clients 這裡有多種程式語言的獨立的測試 Client

3rd Party Client Implementations 這裡有第三方 StatsD 的 Library

node-statsd 為例。

安裝 node-statsd libray

npm install -g node-statsd

撰寫測試程式,發送 api 回應時間,到 statsd 的 timeing

vi test.js

'use strict';

const StatsD = require('node-statsd'),
client = new StatsD({
  host: 'localhost',
  port: 8125
});

setInterval(function () {
  const responseTime = Math.floor(Math.random() * 100);
  client.timing('api', responseTime, function (error, bytes) {
    if (error) {
      console.error(error);
    } else {
      console.log(`Successfully sent ${bytes} bytes, responseTime: ${responseTime}`);
    }
  });
}, 1000);

執行測試程式

export NODE_PATH=/usr/lib/node_modules
node test.js

在 graphite 中可以取得 stats.timers.api.* 這些 metrics

References

StatsD wiki

statsd學習小結

StatsD!次世代系統監控的核心

使用 Statsd + Graphite 的 Monitoring 心得

聊聊 Statsd 和 Collectd 那點事!

StatsD vs collectd vs fluentd and Other Daemons You Should Know 2016/8

How do StatsD and CollectD relate?

StatsD embedded into CollectD

如何深入理解 StatsD 與 Graphite

使用 StatsD + Grafana + InfluxDB 搭建 Node.js 監控系統


How to install Node.js 7.x on Ubuntu/Debian and CentOS

2017年11月27日

Grafana

Grafana 是個 metrics 資料的分析、告警及視覺化圖表的工具平台,最常用來作 Time Series Data 的圖表,也能用在收集 sensor 資料、home automation、天氣及 process control 這些領域。

安裝

參考 download Grafana 這個網頁。

在 CentOS 可用以下程序安裝

wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana-4.5.2-1.x86_64.rpm
sudo yum install -y initscripts fontconfig urw-fonts
sudo rpm -Uvh grafana-4.5.2-1.x86_64.rpm

軟體包含以下這幾個部分

  • /usr/sbin/grafana-server

  • /etc/init.d/grafana-server

    init.d script

  • /etc/sysconfig/grafana-server

    default file (environment vars)

  • /etc/grafana/grafana.ini

    configuration file

  • grafana-server.service

    systemd service (if systemd is available)

  • /var/log/grafana/grafana.log

    default configuration uses a log file

  • /var/lib/grafana/grafana.db

    default configuration specifies an sqlite3 database

啟動

sudo systemctl daemon-reload
sudo systemctl enable grafana-server.service
### start grafana-server
sudo systemctl start grafana-server.service

使用

網址為 http://localhost:3000 預設帳號/密碼為 admin/admin

登入後,首先要設定 DataSource,我們設定使用 graphite,Url 的部分就連到 Graphite-web 的網址

下一步是建立 DashBoard,參考 Using Graphite in Grafana 的說明,另外也可以到 Garfana Labs Dashboards 尋找適當的 template。

以 Graphite Server Metrics 為例,我們先到 Graphite Dashboard Tempates 右邊下載 JSON: graphite-server-carbon-metrics_rev1.json。

然後直接在 Grafana 網頁的 DashBoard 功能上,直接以 import 方式將剛剛的 json 匯入,就可以直接看到下面的圖表畫面。


CollectD Server Metrics

Graphite Dashboard Templates 裡面有關 CollectD Metrics 有四個,但下載後都沒辦法直接看到 CollectD 的資料圖表。

我們還是先載入一個 Template,再修改圖表的 metrics。

修改 collectd.conf,並重新啟動 collectd,主要是要增加 CPU 部分的 aggregation 計算,另外再多載入一些 Plugins。

Hostname "testserver"

FQDNLookup false
Interval 1
#Timeout 2
ReadThreads 5

LoadPlugin cpu
LoadPlugin df
LoadPlugin load
LoadPlugin memory
LoadPlugin disk
LoadPlugin interface
LoadPlugin uptime
LoadPlugin swap
LoadPlugin write_graphite
LoadPlugin processes
LoadPlugin aggregation
LoadPlugin match_regex
LoadPlugin syslog
LoadPlugin logfile

<Plugin logfile>
  LogLevel info
  # File STDOUT
  File "/var/log/collectd/collectd.log"
  Timestamp true
  PrintSeverity false
</Plugin>

<Plugin df>
  # expose host's mounts into container using -v /:/host:ro  (location inside container does not matter much)
  # ignore rootfs; else, the root file-system would appear twice, causing
  # one of the updates to fail and spam the log
  FSType rootfs
  # ignore the usual virtual / temporary file-systems
  FSType sysfs
  FSType proc
  FSType devtmpfs
  FSType devpts
  FSType tmpfs
  FSType fusectl
  FSType cgroup
  FSType overlay
  FSType debugfs
  FSType pstore
  FSType securityfs
  FSType hugetlbfs
  FSType squashfs
  FSType mqueue
  MountPoint "/"
  IgnoreSelected true
  ReportByDevice false
  ReportReserved true
  ReportInodes true
  ValuesAbsolute true
  ValuesPercentage true
  ReportInodes true
</Plugin>

<Plugin "disk">
  Disk "/^[hs]d[a-z]/"
  IgnoreSelected false
</Plugin>

<Plugin interface>
  Interface "lo"
  Interface "/^eth.*/"
  Interface "/^docker.*/"
  IgnoreSelected false
  ReportInactive true
  UniqueName false
</Plugin>

<Plugin memory>
  ValuesAbsolute true
  ValuesPercentage false
</Plugin>

<Plugin "aggregation">
  <Aggregation>
    Plugin "cpu"
    Type "cpu"
    GroupBy "Host"
    GroupBy "TypeInstance"
    CalculateAverage true
  </Aggregation>
</Plugin>

<Chain "PostCache">
  <Rule>
    <Match regex>
      Plugin "^cpu$"
      PluginInstance "^[0-9]+$"
    </Match>
    <Target write>
      Plugin "aggregation"
    </Target>
    Target stop
  </Rule>
  Target "write"
</Chain>

<Plugin write_graphite>
 <Node "example">
   Host "localhost"
   Port "2003"
   Protocol "tcp"
   ReconnectInterval 0
   LogSendErrors true
   Prefix "collectd."
   # Postfix "collectd"
   StoreRates true
   AlwaysAppendDS false
   EscapeCharacter "_"
   SeparateInstances false
   PreserveSeparator false
   DropDuplicateFields false
 </Node>
</Plugin>

在 CollectD DashBoard 的第一個 CPU Average 圖表上,點擊編輯會出現以下的畫面

Metrics #A 的部分,是原本 Template 提供的寫法,用類似的方式,加入 #B 及 #C 的部分,查看 cpu-user 及 cpu-idle 的資料,其他部分就不需要修改。

用類似的方式,修改其他 metrics 圖表,最後的結果為

Metric Editor

Using Graphite in Grafana Metrics Editor 有比較完整的 Metric 圖表功能的說明。

  1. Select metric

    因為 graphite 的樹狀 metrics 資料結構,這邊的 metric 也是一層一層選擇的

  2. Functions

    在選到 metrics 數值後,按下 + ,就可以選用某一個 graphite 的 function

    以 collectd 的 loading 為例,他是使用 graphite 提供的 aliasByNode 的函數,搭配第一個參數是 metric 資料,第二個參數是階層的數字,也就是 "shortterm"

    aliasByNode($prefix.$server.load.load.shortterm, 4)

    如果是下面這樣, legend 就會變成 "shortterm.load"

    aliasByNode($prefix.$server.load.load.shortterm, 4, -2)
  3. Nested Queries

    以這個為例,在使用 sumSeries 時,可參考到 #A 的 metrics,因為 #A 已經有四個 memory-{used,cached,free,buffered} 數值,sumSeries 會直接加總。

  4. Point consolidation

    Graphite 在傳給 Grafana 前,會先進行 consolidate,減少傳送的資料點數量,預設是用 avg 這個 function 處理,也可以利用 consolidateBy 處理。

  5. Query variable

    在 Dashboard 上面,增加一些可以調整的參數,在圖表中,以 $varname 或是 [[varname]] 的方式,參考到這些參數,就像是 DashBoard 的參數一樣。

    Graphite Templated Dashboard 中就用到了 $app, $server, $interval 三個參數。

Alert 告警

首先參考 Configuration 的內容,修改 smtp server 那個部分的設定。

vi /etc/grafana/grafana.ini

[smtp]
enabled = true
host = smtp.gmail.com:465
user = user@maxkit.com.tw
password = password
;cert_file =
;key_file =
;skip_verify = false
from_address = user@maxkit.com.tw

重新啟動 grafana

systemctl restart grafana-server

參考 Alert Rules 的說明

到 Grafana 網頁新增一個 Notification Channel

然後到 DashBoard 的圖表上,編輯某一個想要監控的指標,切換到 Alert 頁籤,設定告警的規則,這裡可以用 AND 或 OR 疊加多個 metric 規則,但 metric 不能有 Template Variables,這是比較麻煩的地方,前面都是用 variable 的方式設定 metric。

回到 metric 設定的部分,要把 variable 移除。

在檢查點,由正常狀態 變成 告警狀態時,會發送一次 email,而由告警狀態 變成 正常狀態時,會再發送一次 email。

References

高顏值監控繪圖工具 Grafana

新監控系統技術選型 Telegraf + Influxdb + Grafana + 二次開發

Graphite 和 Grafana 簡介

使用Grafana+Diamond+Graphite構造完美監控面板

使用StatsD, Graphite, Grafana, Kamon搭建可用於JVM項目的可視化性能監控系統

利用 collectd + InfluxDB + Grafana 監測系統效能

Grafana

2017年11月20日

collectd

collectd 是 system statistics collection daemon,會定時由多個資料來源收集 metrics 資料,完全由 C 語言編寫,故性能高且移植性好,它也能運作在沒有腳本語言支持或者 cron daemon 的 OS 上,例如嵌入式系統,架構上除了核心程式,其他的部分,包含資料 input/output,發送通知,Logging 等等,都是以 plugin 方式處理。

collectd 通常是用來收集硬體的相關資訊,例如 CPU loading,記憶體及網路使用狀況等等,然後透過 Plugin 寫入 graphite。

安裝

collectd 是在 EPEL repository 中,可直接用 yum 安裝。

yum install -y epel-release
yum install -y collectd

systemctl enable collectd
systemctl start collectd

設定

collectd 主要設定檔在 /etc/collectd.conf,最後一行是直接 Include "/etc/collectd.d" include 一個目錄裡面的所有設定檔。所有支援的 plugin 是以 so 形式放在 /usr/lib64/collectd 這個目錄中。

以下是要修改的 collecd.conf 設定檔內容

vi /etc/collectd.conf

# 指定主機名稱,也可以寫成IP
Hostname    "testserver"
# 是否允許以 DNS 查詢主機名稱,如果 DNS 可能有錯誤,建議不要開啟
FQDNLookup   false

# 各種文件、目錄的設置
#BaseDir     "/var/lib/collectd"
#PIDFile     "/var/run/collectd.pid"
#PluginDir   "/usr/lib64/collectd"
#TypesDB     "/usr/share/collectd/types.db"

# 設置為true時,可以根據 <Plugin XXX> 自動載入 plugin,而不需要 LoadPlugin
#AutoLoadPlugin false

# 是否同時上傳 collectd 自己的狀態#CollectInternalStats false

# global 的資料收集時間間隔,單位是秒,可以在 plugin 設定中進行覆寫
Interval                 1
MaxReadInterval        180

# 單位不是秒,而是次數,實際的時間是timeout*interval
Timeout                  2

# 用於配置讀寫線程數
#WriteThreads             5
#ReadThreads              5

# 配置緩存的上下限
WriteQueueLimitLow    8000
WriteQueueLimitHigh  12000

# 建立一個Unix Socket用在命令發送,狀態查看等
LoadPlugin unixsock
<Plugin unixsock>
    SocketFile "/var/run/collectd-unixsock"
    SocketGroup "collectd"
    SocketPerms "0660"
    DeleteSocket true  # 啟動時如果存在 sock,是否嘗試刪除
</Plugin>

# 設置日誌文件,儲存到文件中,可以通過logrotate管理
LoadPlugin logfile
<Plugin logfile>
    LogLevel info
    # File STDOUT
    File "/var/log/collectd/collectd.log"  # 也可以設定為 STDOUT
    Timestamp true
    PrintSeverity false
</Plugin>

# 收集 CPU 資訊
LoadPlugin cpu

# 收集系統 Loading 資訊
LoadPlugin load

# 收集記憶體資訊
LoadPlugin memory

# 收集網路 interface 資料
LoadPlugin interface

<Plugin interface>
    Interface "eth0"
    IgnoreSelected false
    ReportInactive true
    UniqueName false
</Plugin>

# 將資料寫入 graphite
# https://collectd.org/wiki/index.php/Plugin:Write_Graphite
LoadPlugin write_graphite

<Plugin write_graphite>
 <Node "graphing">
   Host "localhost"
   Port "2003"
   Protocol "tcp"
   ReconnectInterval 0
   LogSendErrors true
   Prefix "collectd."
#   Postfix "collectd"
   StoreRates true
   AlwaysAppendDS false
   EscapeCharacter "_"
   SeparateInstances false
   PreserveSeparator false
   DropDuplicateFields false
 </Node>
</Plugin>

# disk 資訊
# https://collectd.org/wiki/index.php/Plugin:DF
LoadPlugin df
<Plugin df>
    # Device "/dev/hda1"
    # Device "192.168.0.2:/mnt/nfs"
    MountPoint "/"
    MountPoint "/home"
    # FSType "ext3"
    # IgnoreSelected false
    # ReportByDevice false
    # ReportInodes false
    # ValuesAbsolute true
    # ValuesPercentage false
</Plugin>

要建立 log 的目錄

mkdir /var/log/collectd

因應 collectd 資料,要修改 graphite 的 Carbon 設定,要注意 collectd 的 pattern 要放在 .* 的前面

vi /opt/graphite/conf/storage-schemas.conf

[carbon]
pattern = ^carbon\.
retentions = 60:90d

[collectd]
pattern = ^collectd.*
retentions = 10s:1d,30s:7d,1m:30d,15m:5y

[default_1min_for_1day]
pattern = .*
retentions = 60s:1d

啟動

重新啟動 collectd, graphite

systemctl restart collectd
systemctl restart carbon
systemctl restart graphite

結果

graphite 收到的 collectd metrics 資料為

collectd.testserver.cpu-0.cpu-XXX

collectd.testserver.df-root.df_complex-free
collectd.testserver.df-root.df_complex-reserved
collectd.testserver.df-root.df_complex-used

collectd.testserver.interface-eth0.if_dropped.rx
collectd.testserver.interface-eth0.if_dropped.tx
collectd.testserver.interface-eth0.if_errors.rx
collectd.testserver.interface-eth0.if_errors.tx
collectd.testserver.interface-eth0.if_octets.rx
collectd.testserver.interface-eth0.if_octets.tx
collectd.testserver.interface-eth0.if_packets.rx
collectd.testserver.interface-eth0.if_packets.tx

collectd.testserver.load.load.longterm
collectd.testserver.load.load.midterm
collectd.testserver.load.load.shortterm

collectd.testserver.memory.memory-buffered
collectd.testserver.memory.memory-cached
collectd.testserver.memory.memory-free
collectd.testserver.memory.memory-slab_recl
collectd.testserver.memory.memory-slab_unrecl
collectd.testserver.memory.memory-used

References

聊聊 Statsd 和 Collectd 那點事!

Collectd 簡介

使用 collectd 進行服務監控

StatsD vs collectd vs fluentd and Other Daemons You Should Know 2016/8

How do StatsD and CollectD relate?

collectd 系統資訊收集服務

2017年11月13日

Graphite

Graphite 可以即時收集、儲存、顯示時間序列類型的資料,由 Carbon, Whisper, Graphite-Web 三個元件組成的,其中 Graphite-Web 可用 Grafana 取代。

Graphite 是由三個元件組成的

  1. Carbon: 是一個 Twisted daemon,負責接受 Time Series Data 資料,把資料暫存到記憶體中

  2. Whisper: Graphite 專用的類似 RRD 的 database,儲存時間序列資料的小型資料庫

  3. Graphite-Web: 用 Django 實作的網頁介面,可用 Grafana 取代

安裝

如果直接要用 Docker 測試 Graphite 可參考 Docker image 網頁s Official Docker image for Graphite

用以下指令啟動 graphite docker container

docker run -d\
 --name graphite\
 --restart=always\
 -p 80:80\
 -p 2003-2004:2003-2004\
 -p 2023-2024:2023-2024\
 -p 8125:8125/udp\
 -p 8126:8126\
 graphiteapp/graphite-statsd
Port Service Usage
80 nginx reverse proxies the graphite front-end dashboard
2003 carbon receiver - plaintext backend
2004 carbon receiver - pickle
2023 carbon aggregator - plaintext
2024 carbon aggregator - pickle
8125 (UDP) statsd UDP based backend proxy
8126 statsd admin

如果要直接用 pip 安裝,參考 Installing From Pip 的文件,可用以下的程序。

首先參考上面的 port 對應,先啟動一個測試的 centos docker node,只安裝了 sshd。(note: TCP Port 3000 是給 grafana 使用的)

docker run -d \
 -p 10022:22\
 -p 80:80\
 -p 2003-2004:2003-2004\
 -p 2023-2024:2023-2024\
 -p 8085:8085\
 -p 8125:8125/udp\
 -p 8126:8126\
 -p 3000:3000\
 -e "container=docker" --privileged=true -v /sys/fs/cgroup:/sys/fs/cgroup --name graphitetest centosssh /usr/sbin/init

用以下指令安裝 graphite,安裝目錄為預設的 /opt/graphite

yum groupinstall -y "Development Tools"
yum install -y python-devel cairo-devel libffi-devel

yum install -y epel-release
yum install -y git
yum install -y python-pip
pip install --upgrade pip

pip install twisted


export PYTHONPATH="/opt/graphite/lib/:/opt/graphite/webapp/"
pip install --no-binary=:all: https://github.com/graphite-project/whisper/tarball/master
pip install --no-binary=:all: https://github.com/graphite-project/carbon/tarball/master
pip install --no-binary=:all: https://github.com/graphite-project/graphite-web/tarball/master

安裝完成後, whisper 安裝在 /usr/bin,carbon 跟 graphite-web 安裝在 /opt/graphite/

bin/
conf/
lib/
    Carbon PYTHONPATH
storage/
    log/
    rrd/
    whisper/        whisper 存放資料的目錄
    ceres/          ceres 存放資料的目錄
webapp/         Graphite-web PYTHONPATH
    graphite/       local_settings.py 的位置
    content/        static content 目錄

設定及啟動

先由設定檔範例複製一份設定檔

cd /opt/graphite/conf/

sudo cp storage-schemas.conf.example storage-schemas.conf  
sudo cp storage-aggregation.conf.example storage-aggregation.conf  
sudo cp graphite.wsgi.example graphite.wsgi  
sudo cp graphTemplates.conf.example graphTemplates.conf  
sudo cp carbon.conf.example carbon.conf

mkdir backup
mv *.example backup/

cd /opt/graphite/webapp/graphite/   
sudo cp local_settings.py.example local_settings.py 
Webapp Database

在 /opt/graphite/webapp/graphite/local_settings.py 中,有一部分 Database Configuration,這是要讓 Django 產生Graphite 使用的資料庫,用來儲存 user profiles, dashboards, and for the Events functionality。

預設使用 /opt/graphite/storage/graphite.db 這個 sqlite 資料庫,可改成 PostgreSQL or MySQL。

vi /opt/graphite/webapp/graphite/local_settings.py

# 修改 Timezone
TIME_ZONE = 'Asia/Taipei'

# 修改 database
DATABASES = {
    'default': {
        'NAME': '/opt/graphite/storage/graphite.db',
        'ENGINE': 'django.db.backends.sqlite3',
        'USER': '',
        'PASSWORD': '',
        'HOST': '',
        'PORT': ''
    }
}

產生 graphite.db

cd /opt/graphite
export PYTHONPATH="/opt/graphite/lib/:/opt/graphite/webapp/"

django-admin.py migrate --settings=graphite.settings --run-syncdb

local_settings.py 的其他設定值,可參考文件 Graphite-web’s local_settings.py

Carbon

carbon 主要的設定檔為 /opt/graphite/conf/carbon.conf,設定檔分為多個區塊,每一個區塊對應到不同的 daemon,例如 [cache] 設定 carbon-cache,[relay] 設定 carbon-relay,[aggregator] 設定 carbon-aggregator。

可增加新的 cache 區塊,就會多一個 carbon-cache process,名稱為 b,Port 為 2004

[cache:b]
LINE_RECEIVER_INTERFACE = 0.0.0.0
LINE_RECEIVER_PORT = 2004
CACHE_QUERY_INTERFACE = 0.0.0.0
CACHE_QUERY_PORT = 7003

在 /opt/graphite/bin 資料夾中,可找到以下三種不同類型的 Carbon daemon

  1. cache:接受通過各種協議傳輸來的指標項數據並以儘可能高的效率將它們寫入磁盤;在接收到指標項時,將指標項值緩存在RAM中,並用底層的Whisper庫按照指定的時間間隔將這些值寫入 disk
  2. Relay:有兩個不同的用途,可複製並分區輸入的指標項。
  3. Aggregator:運作於 cache 前方,在 Whisper 記錄指標項前,緩存這些指標項一段時間。

有兩種方式啟動 carbon daemon

  1. 用 script 啟動 carbon daemon

    ## start
    cd /opt/graphite/bin
    ./carbon-cache.py start
    
    ## stop
    ./carbon-cache.py stop

    啟動後會出現

    Starting carbon-cache (instance a)

    可用 ps 及 netstat 查詢狀況

    # ps -efla | grep carbon-cache
    1 S root     12473     1  0  80   0 - 55806 -      07:42 ?        00:00:00 /usr/bin/python2 ./carbon-cache.py start
    0 S root     12476   196  0  80   0 -  2663 -      07:42 pts/0    00:00:00 grep --color=auto carbon-cache
    
    # netstat -nap | grep 2003
    tcp        0      0 0.0.0.0:2003            0.0.0.0:*               LISTEN      12473/python2
  2. 利用 system service 啟動 carbon service

    vi /usr/lib/systemd/system/carbon.service
    
    [Unit]
    Description=Graphite Carbon Daemon
    
    [Service]
    Type = forking
    GuessMainPID = false
    PIDFile = /opt/graphite/bin/carbon-cache-a.pid
    ExecStart=/opt/graphite/bin/carbon-cache.py start --pidfile /opt/graphite/bin/carbon-cache-a.pid
    ExecReload=/bin/kill -HUP $MAINPID
    KillMode=process
    Restart=on-failure
    RestartSec=42s
    
    [Install]
    WantedBy=multi-user.target

    起動 carbon service

    chmod 755 /usr/lib/systemd/system/carbon.service
    # reload carbon.service
    systemctl daemon-reload
    
    # enable
    systemctl enable carbon
    
    # start carbon service
    systemctl start carbon
    
    # stop carbon service
    systemctl stop carbon

/opt/graphite/conf/storage-schemas.conf

記錄儲存 metrics 的 retention rates,以 pattern 的方式比對 metric paths,並告訴 whisper 該用什麼頻率儲存 datapoints 以及處理歷史資料。

  1. 設定檔中有多個設定區塊
  2. pattern 比對由上至下依照順序比對,使用第一個符合的 pattern 的 metric name 的規則
  3. pattern 為 regular expressions
  4. 在發送第一個 metric 時,就決定了儲存資料的 retention 方式
  5. 如果修改這個設定檔,並不會改變已經產生的 .wsp 檔案,除非要用 whisper-resize.py 套用那些設定

預設內容為

[carbon]
pattern = ^carbon\.
retentions = 60:90d

[default_1min_for_1day]
pattern = .*
retentions = 60s:1d

每一個區塊的規則有三行

  1. [name] 區塊名稱
  2. pattern regular expression
  3. retentions = frequency:history 可有多個 frequency:history retention rate,中間用逗點隔開

ex1: each datapoint represents 10 seconds,只儲存 14 天的資料

[garbage_collection]
pattern = garbageCollections$
retentions = 10s:14d

ex2: metric scheme 為 servers..

[apache_busyWorkers]
pattern = ^servers\.www.*\.workers\.busyWorkers$
retentions = 15s:7d,1m:21d,15m:5y

有多個 retention 設定,最前面的是 most-precise:least-history,最後面的是 least-precise:most-history,whisper 會自動處理 downsample metrics (預設為 averaging)。為求資料正確,least-precise 必須要能被 most-precise 的時間間隔整除。

retentions = 15s:7d,1m:21d,15m:5y
1m = 60s, 60/15 = 4 (ok)
retentions = 180s:7d,300s:30d

300/180 = 3.33 (invalid)

ex3: default1minfor_1day 因為 pattern 設定為 .* ,必須放在最後面,中間的設定分別是給 statsd 及 collected 使用

[carbon]
pattern = ^carbon\.
retentions = 60:90d

[stats]
pattern = ^stats.*
retentions = 10s:6h,10m:7d,1d:5y

[stats_counts]
pattern = ^stats_counts.*
retentions = 10s:6h,10m:7d,1d:5y

[collectd]
pattern = ^collectd.*
retentions = 10s:6h,10m:7d,1d:5y

[default_1min_for_1day]
pattern = .*
retentions = 60s:1d

retentions 也可設定為 seconds-per-datapoint:count-of-datapoints 這種格式

例如

retentions = 60:1440

60 代表 the number of seconds per datapoint,而 1440 代表 the number of datapoints to store


/opt/graphite/conf/storage-schemas.conf

設定如何 aggregate 資料為低精確度的 retention rate

預設內容為

[min]
pattern = \.min$
xFilesFactor = 0.1
aggregationMethod = min

[max]
pattern = \.max$
xFilesFactor = 0.1
aggregationMethod = max

[sum]
pattern = \.count$
xFilesFactor = 0
aggregationMethod = max

[default_average]
pattern = .*
xFilesFactor = 0.5
aggregationMethod = average
  1. xFilesFactor 必須為 0~1 之間的 floating point number,指定要如何切割前一個 retention level slot,切割後必須要有 non-null values,這樣才能 aggregate 為 non-null value。
  2. aggregation Method,指定如何 aggregate values,可使用 average(預設), sum, min, max, last
  3. 在發送第一個 metric 時,就決定了這個設定值
  4. 如果修改這個設定檔,並不會改變已經產生的 .wsp 檔案,除非要用 whisper-set-aggregation-method.py 套用那些設定,例如: /opt/graphite/bin/whisper-set-aggregation-method.py /opt/graphite/storage/whisper/test.wsp max

ex1: pattern 是以 .min 結束的 metric,xFilesFactor 設定為 10% of the slots,前一個 retention level 中,最少要有 10% 的 datapoints,這樣才能 aggregate 到下一個 retention level

[all_min]
pattern = \.min$
xFilesFactor = 0.1
aggregationMethod = min

/opt/graphite/conf/relay-rules.conf

可將某些 metrics 轉送給特定的 backend,這是由 carbon-relay 這個 daemon 處理的

ex:

[example]
pattern = ^mydata\.foo\..+
servers = 10.1.2.3, 10.1.2.4:2004, myserver.mydomain.com

aggregation-rules.conf

可在收到資料時,將某些 metrics 合併在一起。


rewrite-rules.conf

利用 regular expression,修改 metric name

graphite-web
  • 用 script 啟動 graphite-web
cd /opt/graphite

export PYTHONPATH="/opt/graphite/lib/:/opt/graphite/webapp/"

./bin/run-graphite-devel-server.py --port=8085 --libs=`pwd`/webapp /opt/graphite 1>/opt/graphite/storage/log/webapp/process.log 2>&1 &
  • 用 service 啟動
vi /usr/lib/systemd/system/graphite.service

[Unit]
Description=Graphite web daemon

[Service]
Environment=PYTHONPATH=/opt/graphite/storage/whisper
ExecStart=/opt/graphite/bin/run-graphite-devel-server.py --port=8085 --libs=/opt/graphite/webapp /opt/graphite
ExecReload=/bin/kill -HUP $MAINPID
KillMode=control-group

[Install]
WantedBy=multi-user.target
chmod 755 /usr/lib/systemd/system/graphite.service
# reload graphite.service
systemctl daemon-reload
    
# enable
systemctl enable graphite
    
# start graphite service
systemctl start graphite
    
# stop graphite service
systemctl stop graphite

log

tail -f /opt/graphite/storage/log/webapp/process.log

check web process

# ps -efla | grep graphite-devel-server
4 S root       415     1  2  80   0 - 35864 -      01:27 ?        00:00:00 /usr/bin/python2 /opt/graphite/bin/run-graphite-devel-server.py --port=8085 --libs=/opt/graphite/webapp /opt/graphite
0 S root       422   415  7  80   0 - 73878 -      01:27 ?        00:00:01 /usr/bin/python2 /opt/graphite/bin/run-graphite-devel-server.py --port=8085 --libs=/opt/graphite/webapp /opt/graphite
0 S root       437   198  0  80   0 -  2663 -      01:27 pts/0    00:00:00 grep --color=auto graphite-devel-server

網址為 http://localhost:8085

data in

有三種方式,可發送 metrics 資料給 graphite: Plaintext, Pickle, and AMQP。

這些工具 是能跟 graphite 一起運作的資料收集工具,有分為 collection, forwarding, visualization, monitoring, and Storage Backend Alternates 這些種類。

collection 要注意 collectd, diamond, forwarding 要注意 statsd, visualization 則是 grafana, Storage Backend Alternates 則是 ceres, InfluxDB

  • Plaintext Protocol

這是 carbon 支援的最基本的接收資料的方式。資料格式為

<metric path> <metric value> <metric timestamp>
echo "local.random.diceroll 4 `date +%s`" | nc localhost 2003
  • Pickle Protocol

pickle 是比較有效率的資料發送方式,可一次發送多個 metrics 資料。資料格式為

[(path, (timestamp, value)), ...]

pickle receiver port 預設為 TCP 2004,必須將 pickle 資料加上 header,封裝為以下這種格式的 message,才能發送給 pickle server

payload = pickle.dumps(listOfMetricTuples, protocol=2)
header = struct.pack("!L", len(payload))
message = header + payload
  • AMQP

如果 carbon.conf 的 AMQPMETRICNAMEINBODY 設定為 True,就支援跟 Plaintext 一樣的資料格式,ex: echo "local.random.diceroll 4 date +%s",如果設定為 False,就要省略 local.random.diceroll。

functions

Graphite Functions

  • url相關的函數:

url中可以帶有*,會選擇出多個 metrics:

&target=rest.getUser.count.*          
&target=rest.get*.count.succeeded     
&target=rest.getUser.count.{succeeded,failed}
&target=rest.server[1-20].getUser.count    

因為url中的每個 . 代表了一層目錄,所以不能用rest.* 選出 rest.getUser.count

  1. maxSeries,sumSeries,averageSeries: 將 * 代表的多個counter通過某個方式運算一個值。 ex: sumSeries(rest.getUser.count.*),將 * 代表的metrics 加總。

  2. groupByNode,url中有兩顆參數,將第1顆星星代表的多個Series組合,而第2顆代表的則分開來顯示。 groupByNode(ganglia.by-function...cpu.load5,2,"sumSeries"),第一顆星代表服務器,第2顆星代表服務器,此函數將顯示每台Server上所有函數的CPU總和。averageSeriesWithWildcards,sumSeriesWithWildcards也有類似的功能。

  3. aliasByNode,將 * 中的 metric 的名稱當做Alias,aliasByNode(rest.getUser.count.*, 1),alias就是succeeded或failed

  4. exclude,單獨去除某個 metric,exclude(servers*.threads,"server02") 就是只顯示某些 metric

  5. highestAverage,highestMax,只顯示值最高(還可以是最低)的幾個metrics。

  6. currentAbove,currentBelow,以last值(或是max,min,avg值)做條件過濾。

  7. asPercent,以幾個metrics相對的百分比來顯示。

  • 圖形相關的函數

  1. legendValue(rest.getuser.latency, 'avg', 'max'),圖形下面顯示數據的最大小值、平均值(可以是last, avg, total, min, max)

  2. threshold(60, "latency 60ms threshold","red"),在圖形裡顯示警戒線。

  3. consolidateBy(rest.getuser.latency, 'max'),如果看一週的曲線,圖上的每個點包含了多個datapoint的數據,不用本函數,默認行為是算平均值,可以用本函數設為取max,min,sum等。

  • 其他:

  1. summarize(rest.getuser.latency, "1min"),按1分鐘進行聚合而不是原來的10秒。聚合的方式默認是sum,也可以是avg,max,last。

  2. 將自增長的metric轉化為TPS: perSecond(rest.getuser.totalCount),但當前版本此函數好像不存在,一個解決方式是:scaleToSeconds(deriviative(rest.getuser.totalCount),1) ,第一個函數先算出兩個點之間的差值,第二個平均到每秒。

  3. integral(rest.getUser.count),顯示所有datapoint的逐漸累積值。

  4. hitcount, 將rate換算成totalCount,能跨越各種時間長度。

  5. Timeshift, 可以比較當前數值和一週前的數值(兩條線): &target=alias(summarize(rest.getuser.count, "1min"), "today")&target=alias(summarize(rest.getuser.count, "1min"),"1w"), "last week")。 movingAverage,幾個datapoint的移動平均值。

References

Step by Step Install of Graphite with Carbon and StatsD on CentOS 7.1.x

Graphite 部署實戰

使用 Statsd + Graphite 的 Monitoring 心得

DevOps實戰:Graphite監控上手指南

Practical Guide to StatsD/Graphite Monitoring

Graphite學習系列

2017年11月6日

InfluxDB

InfluxDB是一個由 InfluxData 開發的開源時序型資料庫 Time Series Database (TSDB)。是以Go實作,提供高性能地查詢與儲存 Time Series Data。InfluxDB 可應用於儲存系統的監控資料,IoT的即時資料,類似的資料庫有Elasticsearch、Graphite。

主要特色功能

  1. 基於時間序列儲存資料,支援與時間有關的相關函數(如min, max, sum, count, mean, median 等)

  2. 以 Go 實作,編譯為一個獨立的 binary 程式,不需要其他 dependency library

  3. 提供 write and query HTTP/S API

  4. 支援 Telegraf, Graphite, collectd, OpenTSDB 的 plugin,用以取得其他資料庫的資料。

  5. InfluxQL 是 SQL-like query language,支援regular expressions, arithmetic expressions, and time series specific functions,可用在 InfluxQL 中

  6. 每秒可處理百萬個資料點,在有限時間內,提供高精確度的原始資料,舊資料就改為儲存低精確度,統計後的資料。以 Continuous Queries (CQ) and Retention Policies (RP) 支援 downsamping data 及 expiring old data

  7. InfluxDB 屬於 influxdata 的 Time Series Platform TICK 平台的一部分

    1. Telegraf: Agent of collecting and reporting metrics and events (100+ plugins)

    2. InfluxDatabase: 儲存 Time Series Data

    3. Chronograf: 包含 Dashboard 及 access control,是 InluxData Platform 的操作介面

    4. Kapacitor: Real-time streaming data processing engine

安裝

influxdata Downloads 可取得不同 OS 的安裝方式。

在 CentOS 為

wget https://dl.influxdata.com/influxdb/releases/influxdb-1.3.6.x86_64.rpm
sudo yum localinstall influxdb-1.3.6.x86_64.rpm

啟動

有兩種方式

  1. 以 service 啟動

    sudo service influxdb start
  2. 以執行檔直接啟動

    cd /usr/bin
    ./influxd

舊版的 influxdb 有包含一個 Web 管理界面 Port 是 8083,但在 1.1 版以後就被移除了,HTTP API Port 是 8086

Command Line Client

在 /usr/bin 裡面有一個 influx 執行檔,也是連接到 TCP Port 8086,進行 Database 操作。

> influx
Connected to http://localhost:8086 version 1.3.6
InfluxDB shell version: 1.3.6
  1. 查詢資料庫

    > show databases
    name: databases
    name
    ----
    _internal
  2. 建立/刪除資料庫

    > create database test
    > show databases
    name: databases
    name
    ----
    _internal
    test
    > drop database test
  3. 使用資料庫

    influxdb 的 measurements 就等同 Relational DB 的 table,不需要 create table,可直接 insert。

    查詢語法類似 SQL 指令,沒有提供修改和刪除資料的方法

    > show measurements
    > insert disk_free,hostname=server01 value=442221834240i 1435362189575692182
    > show measurements
    name: measurements
    name
    ----
    disk_free
    > select * from disk_free
    name: disk_free
    time                hostname value
    ----                -------- -----
    1435362189575692182 server01 442221834240
    
    > drop measurement disk_free

    Point 由時間(time)、標籤(tags)、數據(field)組成,代表 measurement 裡面的一個 record。

    insert 語法中,measurement 跟資料用逗號(,)隔開,tag 及 field 之間,用空格隔開,多個 tag 或多個 field 之間,用逗號(,)隔開

    在這個語法中,disk_free 為 measurement,hostname=server01 是 tag,value=442221834240i 是 field,最後面 1435362189575692182 是 Time

    insert disk_free,hostname=server01 value=442221834240i 1435362189575692182
  4. series

    series 是 measurement 裡面的數據資料,表示可以在圖表上產生幾條線,series 是通過 tags 排列組合計算出來的

    > show series from disk_free
    key
    ---
    disk_free,hostname=server01

HTTP API

### 產生 mydb database
# curl -POST http://localhost:8086/query --data-urlencode "q=CREATE DATABASE mydb"
{"results":[{"statement_id":0}]}

### 寫入一筆資料
# curl -i -XPOST 'http://localhost:8086/write?db=mydb' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1434055562000000000'

HTTP/1.1 204 No Content
Content-Type: application/json
Request-Id: 6306518c-b54a-11e7-8023-000000000000
X-Influxdb-Version: 1.3.6
Date: Fri, 20 Oct 2017 03:54:35 GMT

### 查詢
# curl -GET 'http://localhost:8086/query?pretty=true' --data-urlencode "db=mydb" --data-urlencode "q=SELECT value FROM cpu_load_short WHERE region='us-west'"
{
    "results": [
        {
            "statement_id": 0,
            "series": [
                {
                    "name": "cpu_load_short",
                    "columns": [
                        "time",
                        "value"
                    ],
                    "values": [
                        [
                            "2015-06-11T20:46:02Z",
                            0.64
                        ]
                    ]
                }
            ]
        }
    ]
}


### epoch=[h,m,s,ms,u,ns] 指定輸出的時間格式
# curl -G 'http://localhost:8086/query?pretty=true' --data-urlencode "db=mydb" --data-urlencode "epoch=s" --data-urlencode "q=SELECT value FROM cpu_load_short WHERE region='us-west'"
{
    "results": [
        {
            "statement_id": 0,
            "series": [
                {
                    "name": "cpu_load_short",
                    "columns": [
                        "time",
                        "value"
                    ],
                    "values": [
                        [
                            1434055562,
                            0.64
                        ]
                    ]
                }
            ]
        }
    ]
}

### chunk_size 限制查詢結果的資料筆數
# curl -G 'http://localhost:8086/query' --data-urlencode "db=mydb" --data-urlencode "chunk_size=200" --data-urlencode "q=SELECT value FROM cpu_load_short WHERE region='us-west'"
{"results":[{"statement_id":0,"series":[{"name":"cpu_load_short","columns":["time","value"],"values":[["2015-06-11T20:46:02Z",0.64]]}]}]}

數據保留策略 RP (Retention Policies)

因 influxdb 可處理大量資料,如果全部都儲存下來,會佔用大量 disk 空間,可設定 Retention Policies (RP) 決定要如何保留歷史資料。

查詢 RP

> use mydb
Using database mydb
>  SHOW RETENTION POLICIES ON mydb
name    duration shardGroupDuration replicaN default
----    -------- ------------------ -------- -------
autogen 0s       168h0m0s           1        true
  1. name: 名稱
  2. duration: 持續時間,0代表無限制
  3. shardGroupDuration: shardGroup的存儲時間,shardGroup是InfluxDB的一個基本儲存結構,大於這個時間的數據在查詢效率上應該有所降低。
  4. replicaN: 全稱是REPLICATION,副本個數
  5. default: 是否是默認策略
> CREATE RETENTION POLICY "2_hours" ON "mydb" DURATION 2h REPLICATION 1 DEFAULT
> SHOW RETENTION POLICIES ON mydb
name    duration shardGroupDuration replicaN default
----    -------- ------------------ -------- -------
autogen 0s       168h0m0s           1        false
2_hours 2h0m0s   1h0m0s             1        true
> drop RETENTION POLICY "2_hours" ON "mydb"
>  SHOW RETENTION POLICIES ON mydb
name    duration shardGroupDuration replicaN default
----    -------- ------------------ -------- -------
autogen 0s       168h0m0s           1        false

連續查詢 CQ (Continuous Queries)

連續查詢主要用在將資料歸檔,以降低系統空間的佔用,主要是以降低精度為代價是。CQ 是在數據庫中自動定時啟動的一組語法,必須包含 SELECT 關鍵詞和 GROUP BY time() 關鍵詞,會將查詢結果放在指定的資料表中。

CREATE CONTINUOUS QUERY <cq_name> ON <database_name> 
[RESAMPLE [EVERY <interval>] [FOR <interval>]] 
BEGIN SELECT <function>(<stuff>)[,<function>(<stuff>)] INTO <different_measurement> 
FROM <current_measurement> [WHERE <stuff>] GROUP BY time(<interval>)[,<stuff>] 
END

測試

# curl -i -XPOST 'http://localhost:8086/write?db=mydb' --data-binary 'cpu_load_short,host=server01,region=us-west value=0.64 1508479696000000000'

# influx
Connected to http://localhost:8086 version 1.3.6
InfluxDB shell version: 1.3.6
> use mydb
Using database mydb
> select * from cpu_load_short
name: cpu_load_short
time                host     region  value
----                ----     ------  -----
1508479696000000000 server01 us-west 0.64

> CREATE RETENTION POLICY "2_hours" ON "mydb" DURATION 2h REPLICATION 1 DEFAULT
> 
> CREATE CONTINUOUS QUERY cq_30m ON mydb BEGIN SELECT mean(cpu_load_short) INTO mem_load_30m FROM mem GROUP BY time(30m) END
> 
> SHOW CONTINUOUS QUERIES
name: _internal
name query
---- -----

name: test
name query
---- -----

name: mydb
name   query
----   -----
cq_30m CREATE CONTINUOUS QUERY cq_30m ON mydb BEGIN SELECT mean(cpu_load_short) INTO mydb."2_hours".mem_load_30m FROM mydb."2_hours".mem GROUP BY time(30m) END

References

InfluxDB

InfluxDB學習系列教程

[Linux] 安裝與使用 influxDB @ Ubuntu 14.04

2017年10月30日

protobuf

Protocol Buffers - Google's data interchange format Protocol Buffers 是一種輕便高效的結構化數據存儲格式,可以用於資料序列化,適合做儲存資料或 RPC 資料交換的格式,可用在通訊協定、數據儲存等領域,是一種與程式語言無關、平台無關、可擴展的序列化結構資料格式。

安裝 protobuf compiler

在 mac 可透過 macport 安裝 protobuf 的 compiler for java, cpp

sudo port install protobuf-java

安裝時會同時安裝

protobuf-cpp

如果要用 protobuf 3,則要安裝 protobuf3-java,如果已經裝了 protobuf-java,必須要先移除後才能安裝 protobuf3

sudo port uninstall protobuf-java
sudo port uninstall protobuf-cpp

sudo port install protobuf3-java

安裝時會同時安裝

maven3
maven_select
protobuf3-cpp

使用 protoc 的 command line 指令

protoc --proto_path=IMPORT_PATH --cpp_out=DST_DIR --java_out=DST_DIR --python_out=DST_DIR --go_out=DST_DIR --ruby_out=DST_DIR --javanano_out=DST_DIR --objc_out=DST_DIR --csharp_out=DST_DIR path/to/file.proto

ex:

protoc -I=. --java_out=. ./file.proto

.proto 資料格式文件

安裝 protobuf compiler 後,需要編寫一個 .proto 的資料格式文件,用來描述要傳遞的資料內容。

目前官方有提供 proto3 的規格,但還沒有把 tutorial 的部分由 proto2 更新到 proto3。

以下為 proto3 language guide 的內容:

Defining A Message Type 定義 message

syntax = "proto3";

message SearchRequest {
  string query = 1;
  int32 page_number = 2;
  int32 result_per_page = 3;
}

如果是 proto2 就不需要寫 syntax = "proto3";

Specifying Field Types 指定資料型別

資料型別可以是 scalar type 或是 enum 或是其他 message

  • Assigning Tags

    等號後面的的數字 field number 代表 message 中的標記欄位,不能改變。最小的field number 設為 1, 編碼後的tag佔1 byte。最大的field number 設為 2^29 - 1, 編碼後的tag佔5 bytes。19000~19999 是 protobuf 保留的 field number,不能在 .proto 中使用

    field number 需要的 bytes 數量
    1~15 1
    16~2047 2
    2^11~2^18-1 3
    2^18~2^25-1 4
    2^25~2^32-1 5
  • Specifying Field Rules

    singular : 0 or 1 個值,不能超過1個

    repeated : 0 or 1 or more,序列化和反序列化過程中,會保留 repeated values 的順序。在proto3中,repeated fields of scalar numeric types默認使用packed方式。

  • Adding More Message Types

    在一個.proto文件中可以定義多個messages。

    message SearchRequest {
      string query = 1;
      int32 page_number = 2;
      int32 result_per_page = 3;
    }
    
    message SearchResponse {
     ...
    }
  • Add Comments

    使用 // 註釋

    message SearchRequest {
      string query = 1;
      int32 page_number = 2;  // Which page number do we want?
      int32 result_per_page = 3;  // Number of results to return per page.
    }
  • Reserved Fields

    如果更新一個message時,需要完全移除某個欄位,但未來可能又重新使用這個欄位標籤,這可能會在導入舊版本的.proto文件時產生嚴重的bug。因此,需要將deleted fields標記為reserved。reserved field number 和 field name 應該分開寫。

    message Foo {
      reserved 2, 15, 9 to 11;
      reserved "foo", "bar";
    }
  • What's Generated From Your .proto?

    使用protobuf的編譯器protoc編譯.proto文件時,編譯器會產生與message類型相關的所選語言的代碼。包括但不限於:

    1. getting and setting field values
    2. serializing your messages to an output stream
    3. parsing your messages from an input stream

    以 C++ 及 Java 為例: C++ :產生1個 .h 文件和1個 .cc 文件 Java:產生1個 .java 文件,其中每個message class都包括Builder

  • Scalar Value Types

.proto Type Java Type C++ Type 備註
double double double
float float float
int32 int int32 使用variable-length encoding。如果你的資料可能含有負數,那麼請使用sint32。
int64 long int64 使用variable-length encoding。如果你的字段可能含有負數,那麼請使用sint64。
uint32 int[1] uint32 使用variable-length encoding。
uint64 long[1] uint64 使用variable-length encoding。
sint32 int int32 使用variable-length encoding。對於有負數的整數,比平常使用的int32效率還好。
sint64 long int64 使用variable-length encoding。
fixed32 int[1] uint32 總是4個 bytes。如果數值總是比 228 大的話,這個資料型別會比uint32效率還好。
fixed64 long[1] uint64 總是8個 bytes。如果數值總是比 256 大的話,這個資料型別會比uint64效率還好。
sfixed32 int int32 4個 bytes
sfixed64 long int64 8個 bytes
bool boolean bool
string String string 必須是UTF-8編碼或者7-bit ASCII編碼
bytes ByteString string 可能包含任意順序的資料
  • Default Values

    當parse一個message時,如果編碼的message不包括某個singular field,解碼的message的相應的field則被設為默認值。

    1. strings : empty string
    2. bytes : empty bytes
    3. bools : false
    4. numeric types : 0
    5. enums : first defined enum value, which must be 0
    6. message fields : 依賴於不同的語言實現。
    7. repeated fields : empty list

    一旦一個message被解析,我們並不知道某個field是明確地被設為默認值,還是未設定從而自動被設為默認值。因此,我們定義message時,並不能依賴默認值的行為。 例如,如果你有一個bool field isMine, true表示這個方塊(是Mine),false表示這個方塊(不是Mine)。當你收到一個message isMine = false,那這個方塊到底(不是雷)還是(未設定)呢? 因此,不要依賴任何默認值的行為。更應該注意的是,如果一個field的value是默認值,將不會被序列化。

  • Enumerations

    1. 第一個 enum value 必須為0。
    2. 必須有1個0值作為枚舉的默認值。proto2使用第一個枚舉值作為默認值,為了和proto2兼容,必須將一個枚舉值設為0。
    3. 可以定義aliases來使得不同的枚舉量映射到同一枚舉值。
    message SearchRequest {
      string query = 1;
      int32 page_number = 2;
      int32 result_per_page = 3;
      enum Corpus {
        UNIVERSAL = 0;
        WEB = 1;
        IMAGES = 2;
        LOCAL = 3;
        NEWS = 4;
        PRODUCTS = 5;
        VIDEO = 6;
      }
      Corpus corpus = 4;
    }
  • Using Other Message Types

    可在一個 message 中使用另一個 message

    message SearchResponse {
      repeated Result results = 1;
    }
    
    message Result {
      string url = 1;
      string title = 2;
      repeated string snippets = 3;
    }

    Nested Types

    message Outer {       // Level 0
      message MiddleAA {  // Level 1
        message Inner {   // Level 2
          int64 ival = 1;
          bool  booly = 2;
        }
      }
      message MiddleBB {  // Level 1
        message Inner {   // Level 2
          int32 ival = 1;
          bool  booly = 2;
        }
      }
    }
  • Importing Definitions

    使用另一個.proto文件中的定義,需要import

    1. import : 只能直接使用,不可傳遞
    2. import public : 可以傳遞
  • Using proto2 Message Types

    proto2的enum不能用在proto3中,其他的都理論上是相容的。

  • Updateing Message Type

    更新 Message Type 定義時,要注意以下事項

    1. 不要修改任一個 field 的 numeric 標籤
    2. 增加新的 field 時,舊訊息可以用新的 code parsing,同樣的,parsing 舊訊息的程式碼還是能用來 parsing 新訊息。
    3. 可以移除 fields,只要沒有重用 rag number,但也可以修改欄位名稱作為標記 ex: 加上 prefix: "OBSOLETE_",或是將該欄位標記為 reserved。
    4. int32, uint32, int64, uint64, and bool are all compatible,也就是說可以隨時改變為這幾種資料型別中的任一種。
    5. sint32 and sint64 可以互換,但跟其他整數型別不相容。
    6. string and bytes 如果使用 UTF-8 就是相容的。
    7. 內嵌的訊息是跟 bytes 相容的。
    8. fixed32 跟 sfixed32 相容。fixed64 跟 sfixed64 相容
    9. enum 跟 int32, uint32, int64, and uint64 相容。
  • Unknown Fields proto3 可處理未定義的欄位資料,但不同語言的實作有可能會刪除這些未知的欄位,這跟 proto2 的做法不同。

  • Any

    可使用 Any 嵌入沒有 .proto 定義的 message,Any 就是以 bytes 的方式序列化,使用 Any 必須要 import google/protobuf/any.proto

    import "google/protobuf/any.proto";
    
    message ErrorStatus {
      string message = 1;
      repeated google.protobuf.Any details = 2;
    }

    Any 用來取代 proto2 的 extensions

  • Oneof 訊息只會包含多個欄位其中的某一個欄位,Oneof 不能設定為 repeated

    message SampleMessage {
      oneof test_oneof {
        string name = 4;
        SubMessage sub_message = 9;
      }
    }
  • Map

    map<string, Project> projects = 3;

    Map 不能為 repeated,maps are sorted by key

  • Packages

    package foo.bar;
    message Open { ... }
  • Defining Services

    可定義 RPC service interface,compiler 會自動產生 code and stubs

    service SearchService {
      rpc Search (SearchRequest) returns (SearchResponse);
    }

    protobuf 最直覺的 RPC 系統為 gRPC: a language- and platform-neutral open source RPC system developed at Google

proto3 與 proto2 的差異

proto3 比 proto2 支持更多語言,語法更簡單,去掉了一些複雜的語法和特性。

  1. 在第一行非空白非註釋行,必須寫:

    syntax = "proto3";
  2. 移除了 "required"

  3. "repeated" 默認採用 packed 編碼;

    在 proto2 中,需要明確使用 [packed=true] 來指定 packed 編碼方式

  4. 語言增加 Go、Ruby、JavaLite

  5. 移除了 default

    在 proto2 中,可以使用 default 選項為某一字段指定默認值。在 proto3 中,字段的默認值只能根據資料類型由系統決定。在資料被設置為默認值的時候,該字段不會被序列化。這樣可以節省空間,提高效率。

    但這樣就無法區分某資料是根本沒有數值,還是被設定為 default value。這在 proto3 中問題不大,但在 proto2 中會有問題。更新協議的時候使用 default 選項為某個字段指定了一個與原來不同的默認值,舊的程式取到的該值會與新程式不一樣。

  6. enum 類型的第一個數值必須為 0

  7. 移除了對分組的支援

    分組的功能完全可以用消息嵌套的方式來實現,並且更清晰。在 proto2 中已經把分組語法標註為"expired"了。

  8. 舊程式在解析新增字段時,會把不認識的變數丟棄,再序列化後新增的變數就不見了

    在 proto2 中,舊程式雖然會忽視不認識的新增變數,但並不會將其丟棄,再序列化的時候那些字段會被原樣保留。

    但官方終於同意在 proto3 中恢復 proto2 的處理方式了。

  9. 移除了對擴展的支持,新增了 Any 類型

    Any 類型是用來替代 proto2 中的擴展的。目前還在開發中。

    proto2 中的擴展特性很像 Swift 語言中的擴展。理解起來有點困難,使用起來更是會帶來不少混亂。

    相比之下,proto3 中新增的 Any 類型有點像 C/C++ 中的 void* 。

  10. 增加了 JSON 映射特性

ref: Protobuf 的 proto3 與 proto2 的區別

References

比起 JSON 更方便、更快速、更簡短的 Protobuf 格式

Protocol_Buffers wiki

Google Protocol Buffer 的使用和原理

Protobuf 語法指南

protoc java

ScalaPB: protobuf for scala


erlang gpb

erlang_protobuffs

2017年10月23日

Thrift

Apache Thrift 提供了一個產生不同語言程式碼的 compiler engine,可在不同程式語言(C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, OCaml and Delphi)環境之間,透過 Thrift 以 RPC 方式傳輸資料。

安裝 thrift compiler tool

thrift compiler tool 是產生 thrift 程式碼的工具。

在 macos,可透過 macport 安裝,在 thrift Portfile 中查看目前支援的語言是 java, c#, C, haskell, php, erlang,還不是很清楚為什麼不支援 go, python, ruby 等語言。

sudo port install thrift +java +erlang
$ thrift --version
Thrift version 0.10.0

在 CentOS 可直接編譯

cd thrift-0.10.0
./configure
make
sudo make install

.thrift 文件

在安裝 thrift compiler 後,要撰寫一份 .thrift 定義文件,該檔案是一份以 thrift types 及 Services 定義的interface definition,其中 Services 就是 server 要實作的功能,並提供給 client 呼叫,然後才能用 compiler 產生某個對應程式語言的程式碼,使用方式如下。

thrift --gen <language> <Thrift filename>

Thrift Types

Base Types

一般程式語言都提供的最基本的資料類型

  • bool: A boolean value (true or false)
  • byte: An 8-bit signed integer
  • i16: A 16-bit signed integer
  • i32: A 32-bit signed integer
  • i64: A 64-bit signed integer
  • double: A 64-bit floating point number
  • string: A text string encoded using UTF-8 encoding

Special Types

  • binary: a sequence of unencoded bytes

    這是上述 string 的特殊格式,可用在跟 java 系統之間有更好的資料收送,未來可能會提升為 base type

Structs

定義 common object,這等同於 OOP 中的 class,但沒有繼承的機制,前面的數字 1: 是要讓 compiler 可針對不同版本 IDL 識別,可填上 default value,也可以設定為 optional。

ex:

struct User {  
    1: i16 gender = 1,  
    2: string username,  
    3: string password,  
    4: i32 id  
}

Containers

有三種 container types

  • list: 有順序的元素集合 An ordered list of elements,會對應到 STL vector, Java ArrayList, native arrays in scripting languages

  • set: 沒有順序的元素集合 An unordered set of unique elements,會對應到 an STL set, Java HashSet, set in Python, etc.

    Note: PHP 不支援 sets,會以 List 方式處理。

  • map: 唯一的 key 對應 value 的集合 A map of strictly unique keys to values,會對應到 an STL map, Java HashMap, PHP associative array, Python/Ruby dictionary, etc.

    如果有初始值時,可以在不同的程式語言中,以自訂的 code generator 替代為 custom data type

    為了提供最佳的相容性,key 的 data type 最好使用 Basic Type

Exceptions

Exception 等同於 structs,是繼承自不同程式語言的 native exception base class

exception NotFoundException{  
    1:i16 errorType,  
    2:string message  
}

Services

以 Thrift type 定義 services,其作用就像是 OOP 裡面的 interface,Thrift compiler 可以自動產生實作 client, server 的 stubs codes,service 之間有提供繼承的機制。

service 包含了一組 functions,每一個都有一個參數 list 及 return type。

return type 可以使用 void,也就是不回傳資料,但實際上,server 還是會回傳一個 response 給 client,用來告訴 client 已經把所有工作都做完了。

可以在 function 上增加一個 oneway modifier,這代表 clietn 不會等待 server 的 response,這表示只能保證 client 會呼叫 server 的 function,但不能保證 server 會依照呼叫的順序執行該 function。

service <name> {  
  <returntype> <name> (<arguments>)[throws (<exceptions>)]
}
service UserService{  
  void saveUser(1:User user),  
  User get(1:i32 id) throws (1:NotFoundException nfe),
  oneway void zip()
}
service Calculator extends shared.SharedService 

其他

  • include

    通過include引用其他的thrift文件,默認在當前路徑下尋找,也可以在相對路徑下尋找,需要通過編譯參數 -I 來設置

  • namespace 與 java 的 package 作用一樣

namespace java thrift.sa
namespace python thrift.sa
  • 常數
const i32 INT32CONSTANT = 9853
const map<string,string> MAPCONSTANT = {'hello':'world', 'goodnight':'moon'}
  • enum
enum Operation {  
  ADD = 1,  
  SUBTRACT = 2,  
  MULTIPLY = 3,  
  DIVIDE = 4  
}

Java 支援的傳輸格式, 方式

  • 支援的傳輸格式

    1. TBinaryProtocol 二進制格式.
    2. TCompactProtocol 壓縮格式
    3. TJSONProtocol JSON格式
    4. TSimpleJSONProtocol 提供 JSON 只寫協議, 生成的文件很容易通過腳本語言解析
    5. TDebugProtocol 使用易懂的可讀的文本格式,以便於debug
  • 支援的數據傳輸方式

    1. TSocket 阻塞式 socket server
    2. TFramedTransport 以frame為單位進行傳輸,非阻塞式服務中使用
    3. TFileTransport 以文件形式進行傳輸。
    4. TMemoryTransport 將 memory 用於I/O,java 實作時內部實際使用了簡單的ByteArrayOutputStream
    5. TZlibTransport – 使用zlib進行壓縮, 與其他傳輸方式聯合使用,目前無java實現
  • 支援的服務模型

    1. TSimpleServer 簡單的單線程服務模型,常用於測試
    2. TThreadPoolServer 多線程服務模型,使用標準的阻塞式IO。
    3. TNonblockingServer 多線程服務模型,使用非阻塞式IO(需使用TFramedTransport數據傳輸方式)

Example - bookservice.thrift

先寫 bookservice.thrift,然後以 thrift compiler 產生 java 及 erlang 的 code

namespace java tw.com.maxkit.test

struct Book_info{
    1: i32 book_id;
    2: string book_name;
    3: string book_author;
    4: double book_price;
    5: string book_publisher
}

service BookSender{
    void ping(),

    i32 add(
        1:i32 num1, 2:i32 num2
    ),

    bool sender(
        1: list<Book_info> books
    );

    oneway void sender2(
        1: list<Book_info> books
    );
}

產生 java 及 erlang 的 code

thrift --gen erl bookservice.thrift
thrift --gen java bookservice.thrift
gen-erl/
    book_sender_thrift.erl
    book_sender_thrift.hrl
    bookservice_constants.hrl
    bookservice_types.erl
    bookservice_types.hrl
gen-java/tw/com/maxkit/test/
    Book_info.java
    BookSender.java

Java

BookServiceHandler 實作 Server Side 的四個 function

import tw.com.maxkit.test.*;


public class BookServiceHandler implements BookSender.Iface {

    public BookServiceHandler() {
    }

    public void ping() {
        System.out.println("ping()");
    }

    public int add(int n1, int n2) {
        System.out.println("add(" + n1 + "," + n2 + ")");
        return n1 + n2;
    }

    public boolean sender(java.util.List<Book_info> books) throws org.apache.thrift.TException {

        System.out.println("Sender get books");
        for(Book_info b: books) {
            System.out.println("get book "+b.book_id);
        }
        return true;
    }

    public void sender2(java.util.List<Book_info> books) throws org.apache.thrift.TException {
        System.out.println("Sender2 get books");
        for(Book_info b: books) {
            System.out.println("get book "+b.book_id);
        }
    }

}

BookServer

import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TServer.Args;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TSSLTransportFactory;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TSSLTransportFactory.TSSLTransportParameters;

import tw.com.maxkit.test.*;

public class BookServer {

    public static BookServiceHandler handler;

    public static BookSender.Processor processor;

    public static void main(String [] args) {
        try {
            handler = new BookServiceHandler();
            processor = new BookSender.Processor(handler);

            Runnable simple = new Runnable() {
                public void run() {
                    simple(processor);
                }
            };
            Runnable secure = new Runnable() {
                public void run() {
                    secure(processor);
                }
            };

            new Thread(simple).start();
            new Thread(secure).start();
        } catch (Exception x) {
            x.printStackTrace();
        }
    }

    public static void simple(BookSender.Processor processor) {
        try {
            TServerTransport serverTransport = new TServerSocket(9090);
            TServer server = new TSimpleServer(new Args(serverTransport).processor(processor));

            // Use this for a multithreaded server
            // TServer server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).processor(processor));

            System.out.println("Starting the simple server...");
            server.serve();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void secure(BookSender.Processor processor) {
        try {
            TSSLTransportParameters params = new TSSLTransportParameters();
            // The Keystore contains the private key
            params.setKeyStore("keystore", "max168kit", null, "PKCS12");

            TServerTransport serverTransport = TSSLTransportFactory.getServerSocket(9091, 0, null, params);
            TServer server = new TSimpleServer(new Args(serverTransport).processor(processor));

            // Use this for a multi threaded server
            // TServer server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).processor(processor));

            System.out.println("Starting the secure server...");
            server.serve();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

BookClient

import org.apache.thrift.TException;
import org.apache.thrift.transport.TSSLTransportFactory;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TSSLTransportFactory.TSSLTransportParameters;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import tw.com.maxkit.test.BookSender;
import tw.com.maxkit.test.Book_info;

import java.util.ArrayList;

public class BookClient {
    public static void main(String[] args) {

        String servertype="simple";
        if (args.length == 1) {
            //System.out.println("Please enter 'simple' or 'secure'");
            //System.exit(0);
            servertype=args[0];
        }

        try {
            TTransport transport;
            if (servertype.contains("simple")) {
                transport = new TSocket("localhost", 9090);
                transport.open();
            } else {
                TSSLTransportParameters params = new TSSLTransportParameters();
                params.setTrustStore("keystore", "max168kit", null, "PKCS12");
                transport = TSSLTransportFactory.getClientSocket("localhost", 9091, 0, params);
            }

            TProtocol protocol = new TBinaryProtocol(transport);
            BookSender.Client client = new BookSender.Client(protocol);

            perform(client);

            transport.close();
        } catch (TException x) {
            x.printStackTrace();
        }
    }

    private static void perform(BookSender.Client client) throws TException {
        client.ping();
        System.out.println("ping()");

        int sum = client.add(1, 1);
        System.out.println("1+1=" + sum);

        Book_info b1 = new Book_info(1, "name1", "author1", 11.1, "publisher1");

        Book_info b2 = new Book_info(2, "name2", "author2", 22.2, "publisher2");

        ArrayList list = new ArrayList();
        list.add(b1);
        list.add(b2);

        boolean result = client.sender(list);
        System.out.println("sender1 result="+result);

        client.sender2(list);
        System.out.println("sender2 done");

    }
}

Erlang

book_server.erl

-module(book_server).
-include("book_sender_thrift.hrl").

%% API
-export([start/0, handle_function/2, ping/0, add/2, sender/1, sender2/1, stop/1]).

debug(Data)->
  io:format("Debug info:~s~n",[Data]).
debug(Format, Data) ->
  error_logger:info_msg(Format, Data).

ping() ->
  debug("ping()",[]),
  ok.

add(N1, N2) ->
  debug("add(~p,~p)",[N1,N2]),
  N1+N2.

sender(_L1) ->
  true.

sender2(_L1) ->
  ok.

start()->
  start(9090).

start(Port)->
  Handler = ?MODULE,
  debug("1",[]),
  Res=thrift_socket_server:start([{handler, Handler},
    {service, book_sender_thrift},
    {port, Port},
    {name, book_server}]),
  debug("2 ~p ~n",[Res]),
  Res.

stop(Server)->
  thrift_socket_server:stop(Server).

%%handle_function(Function, Args) when is_atom(Function), is_tuple(Args) ->
%%  case Function of
%%    ping ->
%%      {reply, ping()};
%%    add ->
%%      {reply, add(tuple_to_list(Args))};
%%    % add function here
%%    _ ->
%%      error
%%  end.
handle_function(Function, Args) when is_atom(Function), is_tuple(Args) ->
  debug("handle_function ~n",[]),
  case apply(?MODULE, Function, tuple_to_list(Args)) of
    ok -> ok;
    Reply -> {reply, Reply}
  end.

book_client.erl

-module(book_client).

%% API
-export([]).

%% API
-export([ping/0]).

-spec ping() -> ok.

ping() ->
  io:format("call thrift server\n"),
%%  {ok, Port} = application:get_env(larzio, agent_port),
%%  {ok, Ip} = application:get_env(larzio, agent_server),
  Port = 9090,
  Ip = "localhost",
  {ok, Client0} = thrift_client_util:new(Ip, Port, book_sender_thrift, []),
  {Client1, {ok, ok}} = thrift_client:call(Client0, ping, []),
  {Client1, {ok, AddResult}} = thrift_client:call(Client0, add, [1, 2]),
  io:format("add result ~p ~n", [AddResult]),
  {_Client8, ok} = thrift_client:close(Client1),
  io:format("call thrift done~n"),
  ok.

Reference

thrift 教程

初探Thrift客戶端異步模式

[Android] apache thrift 入門與android上的實作

Apache Thrift 官方JAVA教程

erlang+thrift配合開發

erlang server.erl