2016年8月29日

Currying 柯里化 in Scala

在 functional programming 中,Currying 是由 Moses Schönfinkel and Gottlob Frege 發明,並以 Haskell Brooks Curry 來命名,核心概念是將一個多個參數的函數轉換成只接受一個參數的函數,並回傳一個接受剩下的參數的新函數 (the other arguments having been specified by the curry)。

Currying 的優點在於,可以將所有的函數,都轉換成只接受一個參數,並回傳一個值的固定形式,也就是用多個單一參數的函數,組合成多參數的函數。

Currying

add 是一個一般的 function,有兩個參數,所以呼叫時必須直接傳入兩個參數 add(1,2),而 add2 是 currying 後的 function,可以分成兩個部分來看,(y:Int) => x+y 是一個包含了 closure 變數 x 的 匿名函數,再以一個參數 x 的 function 封裝這個匿名函數。

scala> def add(x:Int, y:Int) = x+y
add: (x: Int, y: Int)Int

scala> def add2(x:Int) = (y:Int) => x + y
add2: (x: Int)Int => Int
scala> add(1,2)
res14: Int = 3

scala> add2(1)(2)
res16: Int = 3

然後就可以分兩段呼叫 add2,但呼叫時必須依照參數的順序,不能先指定 y 的值。

scala> add2(1)
res17: Int => Int = <function1>

scala> add2(1)(2)
res18: Int = 3

如果要直接使用 (y:Int) => x +y 這個匿名函數,前面必須要先能定義 x 得值,當無法先知道 x 的數值時,我們就以一個新的函數,加上參數 x,再後面使用這個新的函數,也就是剛剛 add2 的 currying 化的版本。

scala> val x=1
x: Int = 1

scala> val add5 = (y:Int) => x +y
add5: Int => Int = <function1>

scala> add5(2)
res25: Int = 3

Uncurrying

如果沒有 Currying 的功能,其實只用匿名函數也可以實現多參數函數。在 Scala 的 Function 中,提供了一個將 currying 化參數展開的 uncurried 方法。

展開後,就跟 add(x:Int, y:Int) 使用起來是一樣的。

scala> def add2(x:Int) = (y:Int) => x + y
add2: (x: Int)Int => Int

scala> val add3 = Function.uncurried(add2 _)
add3: (Int, Int) => Int = <function2>

scala> add3(1,2)
res15: Int = 3

Partial Function

Partial Function 跟 Currying 化的 function 看起來很類似,但實際上並不同,Partial Function 是將一個多參數的函數,給定其中幾個參數的值,然後能得到一個新的函數。

以剛剛的 add 為例,原本的 add 需要兩個整數參數 x 及 y,但我們可以先給定 y 的值,得到新的函數 add4,然後在後面使用 add4。

scala> val add4 = add(_:Int, 2)
add4: Int => Int = <function1>

scala> add4(1)
res21: Int = 3

這跟 currying 最大的差異是,把一個多參數的函數 currying 化的時候,必須要從第一個參數開始,一個一個拆解成單一參數的函數,但 Partial Function 能直接給定眾多參數中的某幾個參數,不需要依照參數的順序依序拆解。

Function Composition

functional programming 的概念最重要的是從數學的代數理論而來的,因為在進行代數運算時,通常都是一些方程式跟變數的推導,因此會產生出很多以函數來進行運算的概念。

舉個最簡單的例子來說,函數的基本定義如下,y是x的函數

y=f(x)

但如果有另一個函數 z,是 y 的函數,我們會寫成

z=g(y)

做個簡單的代換,y 可以直接換成 f(x),也就是說,z 可以經過兩次函數運算,得到結果

z=g(f(x))

以下兩篇文章,以一個 summation 的函數為例,說明如何以 scala 實作,functional programming 的用途最重要的是可以用最接近數學代數的方式,進行程式設計,不同於物件導向的出發點,是要以物件、繼承等等概念反應現實生活中的模型。

如何在Scala中實現合成函數

Scala中的Currying

就如同 Scala中的函數式特性 所說一段話:2003年,一個叫Martin Odersky 的醉漢看見了好時瑞森花生醬杯的廣告,展示了某個人的花生醬倒入另一個人的巧克力的場景,他忽然有了個點子,創造了Scala,一種結合了面向對象和函數式編程的語言。這同時激怒了兩個陣營的忠實信徒,他們立刻宣佈要發動聖戰燒死異教徒。

整合了兩者而誕生的 Scala 註定會一直存在著無解的爭議。

References

柯里化

scala Currying

Function Currying in Scala

柯裡化對函數式編程有何意義?

Swift 柯裡化(Currying)

柯裡化在scala中用途?

JS 柯里化(curry)

為什麼要柯裡化(why-curry-helps)

2016年8月22日

Lets Encrypt SSL Key 申請與更新

Let's Encrypt是由EFF、Mozilla基金會、Akamai和Cisco等等許多大公司及非營利組織於2014年共同創立的ISRG組織所成立的數位憑證認證機構,目標是讓提供免費申請並自動更新的憑證服務,推廣及加速全球網站採用HTTPS安全的加密傳輸協定。

Let's Encrypt已在2016年4月正式進入穩定階段,原本在舊版 windows xp 的相容性問題也解決了,需要注意的是,Let's Encrypt簽發的憑證有效期為3個月(90天),也就是說網站每3個月都需要重新更新一次憑證,但我們可以透過 renew script 來定期更新憑證。

ACME (Automated Certificate Management Environment) protocol

在 Let's Encrypt 的 How It Works 有說明實作的原理,是使用了 ACME (Automated Certificate Management Environment) Protocol,但是 Let's Encrypt 實作的是自己的 Automated Certificate Management Environment (ACME),實際上在 IETF 有另一個 ACME spec: IETF ACME,Let's Encrypt 規格文件中並不保證能夠影響 IETF ACME,Let's Encrypt 應該是為了在市場驗證才先提供這樣的服務,未來在 IETF ACME 標準化之後,或許可能會經歷一段相容性的過渡期。

Let's Encrypt 在去年 beta 時還是以 letsencrypt script 提供申請與更新的服務,今年就改以 EFF certbot release 正式版。

certbot

要安裝 certbot 之前,必須先將要申請的 domain name 的 DNS A record 指向到要申請的機器上,例如 named 的設定中,將 testdomain.com.tw 以及 c1.testdomain.com.tw 指向到 211.72.214.209。

# testdomain.zone

@   IN  A   211.72.214.209
c1  IN  A   211.72.214.209

要安裝 cerbot,就直接在 官網 的下面選擇要安裝的 web server 以及 OS,目前我們是選用 Apache on CentOS 6。

sudo yum install epel-release
wget https://dl.eff.org/certbot-auto
mv certbot-auto certbot

## 如果是 centos 7 則是直接用 yum 安裝 certbot
# yum install certbot

chmod a+x certbot

直接執行 cerbot-auto 可以將 certbot 所有需要的相依性套件都安裝好

./certbot

certbot 支援幾種 plugins:Apache, Webroot, Standalone, Manual, Nginx, 其他 plugins。Standalone 的部分適合機器上沒有 web server daemon 的設定方式,我們可以使用 Apache 或是 Webroot,Apache 或 Nginx 的方式比較簡單,如果是 Webroot 的方式,需要讓 web server 可以存取 .well-known 這個目錄的檔案。

如果只要申請 terdomain.com.tw 這個 domain 的 certificate,只需要執行以下的指令,並依照畫面的問題填寫內容。

./certbot --apache

如果需要一個簽章裡面有多個 sub domain,可以直接在 command line 填寫兩個 domain names。

./certbot --apache -d testdomain.com.tw -d c1.testdomain.com.tw -m test@testdomain.com.tw

可以用 ssltest 檢測 https://www.ssllabs.com/ssltest/analyze.html?d=testdomain.com.tw 憑證的狀態。

檢測結果是 C,ssltest 建議要 disable SSL3,disable RC4 cipher,修改 ssl.conf

SSLProtocol all -SSLv2 -SSLv3

SSLHonorCipherOrder on

SSLCipherSuite ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:ECDH+3DES:DH+3DES:RSA+AESGCM:RSA+AES:RSA+3DES:!aNULL:!MD5:!DSS

然後重新啟動 httpd,再去 ssltest 重新檢測一次,可以達到 Rating A。

Debian 8 安裝程序

安裝 certbot for apache web server & debian 8 (jessie)

sudo apt-get install python-certbot-apache -t jessie-backports

certbot --apache

自動更新 ssl

certbot renew --dry-run

將 certbot 放到 cronjob/systemd

certbot renew --quiet

自動更新

以下指令可測試 cerbot 自動更新是否可以正常運作

./certbot renew --dry-run

將 certbot-auto 移動到 /etc 目錄中,在 crontab 增加一行,每週一凌晨 1:00AM 進行一次 renew。

* 1 * * 1 root /etc/certbot renew --quiet

Rate Limit

Let's Encrypt 在發送簽證有幾個限制,最後兩個是針對開發 ACME client 的限制,可以用 staging environment 替代,就可以避免該限制: Rate Limits for Let’s Encrypt

  1. 在單一簽證上,最多只支援 100 個 names

  2. 每週每個 domain 限制只能申請 20 個 certificates

  3. 每週每個 FQDN,只能申請 5 個 certificates

  4. 每 3 hrs 接受 500 次註冊動作

  5. 每週每個 account 只能驗證 300 次

References

Let's Encrypt

Let's Encrypt Get Started

letsencrypt

申請​​ Let's Encrypt 免費​ SSL ​​憑證於​在 NGINX​ 伺服器上配置和​自動更新教學

免費SSL加密: Let's Encrypt 設定教學

2016年8月15日

Apache Zeppelin 測試記錄

Zeppelin 是 Big Data 資料分析的筆記本,以網頁為介面,筆記本可以共用、分享、共筆,支援 scala, java, shell script, markdown, SparkSQL 等語法,可以直接在網頁筆記本上面,製作筆記、執行並即時取得結果,在資料分析後,可以將結果直接以圖表方式展現出來,甚至還可以自訂自己的語法。

目前 Zeppelin git 的版本是 0.6,還沒有正式 release,脫離 incubator 的階段,成為一個成熟的產品。

Zeppelin Installation

安裝一些必要的套件

rpm -ivh jdk-8u72-linux-x64.rpm
yum install git nodejs npm libfontconfig

# install maven
wget http://www.eu.apache.org/dist/maven/maven-3/3.3.3/binaries/apache-maven-3.3.3-bin.tar.gz
sudo tar -zxf apache-maven-3.3.3-bin.tar.gz -C /usr/local/
sudo ln -s /usr/local/apache-maven-3.3.3/bin/mvn /usr/local/bin/mvn

直接由 git 取得 zeppelin 的 source code,然後進行編譯。

git clone https://github.com/apache/incubator-zeppelin

mv incubator-zeppelin zeppelin
mv zeppelin /usr/local/

cd /usr/local/zeppelin

mvn install -DskipTests

zeppelin-web 在建構過程中會出錯

首先要修改 zeppelin-web/pom.xml,把以下這個區塊的 plugin 註解掉:

<plugin>
        <groupId>com.github.eirslett</groupId>
....
</plugin>
cd zeppelin-web
mvn clean

npm install
./bower --allow-root install
./grunt build

mvn install -DskipTests

cd ..

mvn install -DskipTests

mvn package -Pspark-1.5 -Dhadoop.version=2.2.0 -Phadoop-2.2 -DskipTests

編譯後的結果會看到

[INFO] Reactor Summary:
[INFO]
[INFO] Zeppelin .......................................... SUCCESS [23.566s]
[INFO] Zeppelin: Interpreter ............................. SUCCESS [21.797s]
[INFO] Zeppelin: Zengine ................................. SUCCESS [9.997s]
[INFO] Zeppelin: Display system apis ..................... SUCCESS [5.388s]
[INFO] Zeppelin: Spark dependencies ...................... SUCCESS [33.753s]
[INFO] Zeppelin: Spark ................................... SUCCESS [10.863s]
[INFO] Zeppelin: Markdown interpreter .................... SUCCESS [4.452s]
[INFO] Zeppelin: Angular interpreter ..................... SUCCESS [3.246s]
[INFO] Zeppelin: Shell interpreter ....................... SUCCESS [3.275s]
[INFO] Zeppelin: Hive interpreter ........................ SUCCESS [6.779s]
[INFO] Zeppelin: HBase interpreter ....................... SUCCESS [6.773s]
[INFO] Zeppelin: Apache Phoenix Interpreter .............. SUCCESS [8.796s]
[INFO] Zeppelin: PostgreSQL interpreter .................. SUCCESS [4.135s]
[INFO] Zeppelin: JDBC interpreter ........................ SUCCESS [4.550s]
[INFO] Zeppelin: Tajo interpreter ........................ SUCCESS [4.240s]
[INFO] Zeppelin: Flink ................................... SUCCESS [7.043s]
[INFO] Zeppelin: Apache Ignite interpreter ............... SUCCESS [4.051s]
[INFO] Zeppelin: Kylin interpreter ....................... SUCCESS [3.608s]
[INFO] Zeppelin: Lens interpreter ........................ SUCCESS [5.966s]
[INFO] Zeppelin: Cassandra ............................... SUCCESS [19.541s]
[INFO] Zeppelin: Elasticsearch interpreter ............... SUCCESS [4.712s]
[INFO] Zeppelin: Alluxio interpreter ..................... SUCCESS [4.832s]
[INFO] Zeppelin: web Application ......................... SUCCESS [2.911s]
[INFO] Zeppelin: Server .................................. SUCCESS [47.256s]
[INFO] Zeppelin: Packaging distribution .................. SUCCESS [2.663s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 4:15.422s
[INFO] Finished at: Tue Mar 08 16:25:02 CST 2016
[INFO] Final Memory: 101M/289M
[INFO] ------------------------------------------------------------------------

調整 zeppelin 的設定

cp /usr/local/zeppelin/conf/zeppelin-env.sh.template /usr/local/zeppelin/conf/zeppelin-env.sh
cp /usr/local/zeppelin/conf/zeppelin-site.xml.template /usr/local/zeppelin/conf/zeppelin-site.xml

chmod 755 /usr/local/zeppelin/conf/zeppelin-site.xml

為了避免跟後面的 ambari 網頁介面的 port 衝突,我們修改 zeppelin 的 port 設定

vi /usr/local/zeppelin/conf/zeppelin-site.xml
# 將 zeppelin.server.port 改為 8000

<property>
  <name>zeppelin.server.port</name>
  <value>8000</value>
  <description>Server port.</description>
</property>

為了讓 java 可以直接使用 TCP IPv4,必須 disable CentOS 的 ipv6。

vi /etc/sysctl.conf
增加一行
net.ipv6.conf.all.disable_ipv6 = 1

vi /etc/sysconfig/network
增加一行
NETWORKING_IPV6=no

vi /etc/sysconfig/network-scripts/ifcfg-eth0
vi /etc/sysconfig/network-scripts/ifcfg-eth1
檢查要有此行
IPV6INIT="no"

echo 'options ipv6 disable=1' > /etc/modprobe.d/disable-ipv6.conf
service ip6tables stop; chkconfig ip6tables off

reboot

停掉 CentOS 7 的預設 firewall

systemctl stop firewalld
systemctl disable firewalld

啟動 zeppelin

/usr/local/zeppelin/bin/zeppelin-daemon.sh start

Log dir doesn't exist, create /usr/local/zeppelin/logs
Pid dir doesn't exist, create /usr/local/zeppelin/run
Zeppelin start                                             [  OK  ]

停止 zeppelin

/usr/local/zeppelin/bin/zeppelin-daemon.sh stop

Zeppelin 的主畫面如下,如果在右上角,看到的不是 "Connected",這就表示還需要安裝 Spark, Hadoop 等套件,要繼續往下安裝 Ambari,才能正常使用 Zeppelin。

Ambari for CentOS 7

如果直接連結 Zeppelin 網頁 http://192.168.1.24:8000/ ,會看到中間的內容是空白的,而且右上角的連線資訊是 "Disconnected",只有安裝 Zeppelin 還不夠。

我們還需要安裝 HDFS,Spark 等等後端的工具,但一個一個安裝非常地辛苦,有個簡便的安裝套件 ambari 可以使用,Apache Ambari 是用來管理並維護 Hadoop cluster 環境,他也提供了一個簡易的網頁介面,可以直接安裝 Hadoop Cluster 環境。

參考 如何使用Ambari部署HDP 以及 Ambari Installation in RHEL/CentOS/Oracle Linux 7 的說明,我們就能將 ambari 在 CentOS 7 設定好。因為只是一開始的測試用途,我並沒有安裝很多台機器,只安裝了一台,跟 Zeppelin 放在一起。

wget -nv http://public-repo-1.hortonworks.com/ambari/centos7/2.x/updates/2.1.0/ambari.repo -O /etc/yum.repos.d/ambari.repo

yum repolist

yum install ambari-server

# server 設定, 需要檢查 /etc/hosts 裡面有沒有 domain name 的對應設定,要將 hostname: zeppelin 對應到實際的 IP
vi /etc/hosts
192.168.1.24    zeppelin

ambari-server setup

# 啟動 ambari
ambari-server start

安裝並啟動 ambari-server 還不夠,我們還需要安裝 ambari-agent。

yum install ambari-agent

# 測試機器的 hostname 是 zeppelin
ambari-agent reset zeppelin

ambari-agent start

再來就是利用網頁 http://192.168.1.24:8080/ 以帳號:admin 密碼: admin,點擊主頁面上的“Launch Install Wizard”按鈕後,進入安裝程序。

我只填寫了一個節點: zeppelin,選擇服務是 HDFS,Zookeeper,Spark,Kafka,YARN,基本上都沒有調整什麼預設的設定,就可以安裝完成。


用Ambari跟Zeppelin來玩Apache Spark 提供了一個安裝 Zeppelin 的簡便方法,就是透過 Ambari service for Apache Zeppelin notebook 的協助,直接在 Ambari 安裝 Zeppelin,相信這個方法遇到的問題會少一點。以下紀錄一下他的做法,但我們還沒有實際測試過。

#下載zeppelin到Ambari
[sudouser@server1]$ VERSION=`hdp-select status hadoop-client | sed 's/hadoop-client - \([0-9]\.[0-9]\).*/\1/'`
[sudouser@server1]$ sudo git clone https://github.com/hortonworks-gallery/ambari-zeppelin-service.git   /var/lib/ambari-server/resources/stacks/HDP/$VERSION/services/ZEPPELIN

#重開ambari
[sudouser@server1]$ sudo su -
[root@server1]$ ambari-server restart

# 從開好之後從左邊最下面"Actions"選單"Add Service"
# 多了zeppelin的選項可以選,選擇之後一直Next就行了

Zeppelin json 異常

在測試 Zeppelin Tutorial 裡面的 Spark 功能的時候,一直遇到錯誤訊息。後來發現是這個原因:Apache Zeppelin & Spark 解析Json異常

處理方法,就是將 Zeppelin 使用的 Jackson jar 檔的版本都由 2.5 換成 2.4,然後重新啟動 Zeppelin 就可以了。

ls -al /usr/local/zeppelin/zeppelin-server/target/lib/jackson*
-rw-r--r-- 1 root root   39815  3月  8 16:24 jackson-annotations-2.5.0.jar
-rw-r--r-- 1 root root  229998  3月  8 16:24 jackson-core-2.5.3.jar
-rw-r--r-- 1 root root 1143162  3月  8 16:24 jackson-databind-2.5.3.jar

# 找到這些 2.4 版的 jackson jar,然後替換掉這些檔案
-rw-r--r-- 1 root root   38597 11月 25  2014 jackson-annotations-2.4.4.jar
-rw-r--r-- 1 root root  225302  3月  9 14:17 jackson-core-2.4.4.jar
-rw-r--r-- 1 root root 1076926  3月  9 14:17 jackson-databind-2.4.4.jar

Zeppelin Tutorial

參考 Zeppelin Tutorial 的說明,我們先建立一個新的 Notebook: test。

%sh 是直接執行 shell command

%sh
echo $PATH

接下來,先取得測試需要用的資料 bank-full.csv

%sh
rm /root/bank.zip
rm -rf /root/data
cd ~
wget http://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip
mkdir data
unzip bank.zip -d data
#rm bank.zip

因為 Zeppelin 預設是執行 scala 程式,所以我們可以直接呼叫 Spark 的 程式,下面的程式會將 csv 讀取進來,然後以 map 的方式對應到 Bank 這個物件上,再轉換成 DataFrame,產生 Temp Table。

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val bankText = sc.textFile(s"/root/data/bank-full.csv")

case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer)

// split each line, filter out header (starts with "age"), and map it into Bank case class

val bank = bankText.map(s=>s.split(";")).filter(s=>s(0)!="\"age\"").map(
    s=>Bank(s(0).toInt, 
            s(1).replaceAll("\"", ""),
            s(2).replaceAll("\"", ""),
            s(3).replaceAll("\"", ""),
            s(5).replaceAll("\"", "").toInt
        )
)

// convert to DataFrame and create temporal table
bank.toDF().registerTempTable("bank")

接下來可以用 %sql 產生圖表

%sql
select age, count(1) value
from bank
where age<30
group by age
order by age

也可以讓 sql 讀取外部的參數,這個功能在 Zeppelin 稱為 Dynamic Form

%sql
select age, count(1) value
from bank
where age<${maxAge=30}
group by age
order by age

Dynamic Form 也可以做成 select 下拉選單的方式,調整參數。

%sql
select age, count(1)
from bank
where marital="${marital=single,single|divorced|married}"
group by age order by age

References

Spark交互式分析平台Apache Zeppelin的安裝

incubator-zeppelin/README.md

Apache Zeppelin安裝及介紹

How-to: Install Apache Zeppelin on CDH

【數據可視化】Zeppelin JDBC 數據可視化(WEB方式)

2016年8月8日

Lambda Architecture

Lambda Architecture 這個名詞是 Storm 開發者 Nathan Marz 提出的,用來說明一個 generic, scalable and fault-tolerant data processing architecture 的 high level design 系統架構,這也是他在 Backtype 及 Twitter 多年進行分散式資料處理系統的經驗,而得到的一個一般化的架構結論。

基本概念

Lambda Architecture 的基本概念就是以下這個等式。

query = function(all data)

根據一個查詢的 function,對一個龐大的資料來源,進行分析與查詢,得到查詢後的結果,以往在大數據的資料分析中,都是先收集資料,將資料以 batch 的方式,存放在一個大數據資料庫中,然後資料分析人員再進入這個資料庫,進行數據分析。

然而因為資料收集的來源太多,資料量太大,往往取得資料時,已經過時了,一瞬間就成為了歷史資料。但在資料分析中,還有一些需求,希望能得到更即時的查詢結果,例如最近這 5 mins 的交易量,上線人數。

這時候,同樣的原始資料,必須要用另一種更快的方式處理,也就是以更快速的資料收集方式,只收集最近的資料,查詢也是針對最近的資料進行查詢。

雖然跟原本的歷史資料的資料來源都一樣,但是快速處理即時資料的部分,並不在意太久以前的歷史,所以他會直接將過時的資料丟掉,只保留最新的資料持續進行分析。

架構圖

以下五種架構圖分別來自不同的網頁(參考 references),都是在描述一個 Lambda Architecture 的架構組成。

  1. 進入系統的 data 分別傳入 batch 及speed layer,用以繼續後續的資料處理
  2. batch layer 有兩個功能:(1) 管理 master dataset,這是一個不能被修改而且只能一直增加的 raw dataset (2) 預先產生 batch view,讓 query 能做簡易的查詢
  3. serving layer 會整理 batch views,讓 query 能以 low-latency, ad-hoc 的方式查詢資料
  4. speed layer 彌補了 serving layer 更新速度太慢的問題,專注在處理最近的資料
  5. query 能夠整合 batch view 以及 real-time view 資訊的查詢結果

這個架構圖跟 Lambda Architecture 原始網站的架構一樣,但更精確地把上面五點文字說明的部分填寫到圖形上。


這是個簡化的架構圖。


跟第二個架構圖一樣,網頁上同時提供了每一個 layer 可能的系統,不過這個網頁已經是三年前的資訊了,現在應該有更好的解決方案

  1. batch layer: Apache Hadoop
  2. serving layer: Cloudera Impala
  3. speed layer: Storm, Apache HBase

這個架構圖比較明確地說明 spark 的 Lambda Architecture。

前端的原始資料以 Flume 或 Kafka 進行資料分流,分別導入 Spark Streaming data source 以及 HDFS 進行儲存,Spark Application 同時使用 HDFS 以及 Flume 或 Kafka 的資料進行分析,最後將分析後的資料存放到 HBase,提供並展示 Real-Time 分析結果。

Reference

Lambda Architecture

The Lambda architecture: principles for architecting realtime Big Data systems

Lambda Architecture: Achieving Velocity and Volume with Big Data

Lambda Architecture with Apache Spark

Apache Spark: Usage and Roadmap in Hadoop

2016年8月1日

Kafka in Scala and Java

Kafka 的範例,可參考這個 project:A kafka producer and consumer example in scala and java,通常是先執行 Consumer,再執行 Producer,才會在 Consumer console 中看到 Producer 的訊息。

Kafka Comsumer Examples

Scala 版本

import java.util.Properties
import java.util.concurrent._
import scala.collection.JavaConversions._
import kafka.consumer.Consumer
import kafka.consumer.ConsumerConfig
import kafka.utils._
import kafka.utils.Logging
import kafka.consumer.KafkaStream

class ScalaConsumerExample(val zookeeper: String,
                           val groupId: String,
                           val topic: String,
                           val delay: Long) extends Logging {

  val config = createConsumerConfig(zookeeper, groupId)
  val consumer = Consumer.create(config)
  var executor: ExecutorService = null

  def shutdown() = {
    if (consumer != null)
      consumer.shutdown();
    if (executor != null)
      executor.shutdown();
  }

  def createConsumerConfig(zookeeper: String, groupId: String): ConsumerConfig = {
    val props = new Properties()
    props.put("zookeeper.connect", zookeeper);
    props.put("group.id", groupId);
    props.put("auto.offset.reset", "largest");
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    val config = new ConsumerConfig(props)
    config
  }

  def run(numThreads: Int) = {
    val topicCountMap = Map(topic -> numThreads)
    val consumerMap = consumer.createMessageStreams(topicCountMap);
    val streams = consumerMap.get(topic).get;

    executor = Executors.newFixedThreadPool(numThreads);
    var threadNumber = 0;
    for (stream <- streams) {
      executor.submit(new ScalaConsumerTest(stream, threadNumber, delay))
      threadNumber += 1
    }
  }
}

object ScalaConsumerExample extends App {
  // 程式的進入點

  if (args.length <= 0) {
    val server = "192.168.1.7:2181";
    val group = "group1";
    val topic = "test";
    val delay = 0
    val numThreads = 10

    val example = new ScalaConsumerExample(server, group, topic, delay)
    example.run(numThreads)

  } else {
    val server = args(0)
    val group = args(1)
    val topic = args(2)
    val numThreads = args(3).toInt
    val delay = args(4).toLong

    val example = new ScalaConsumerExample(server, group, topic, delay)
    example.run(numThreads)
  }

}

class ScalaConsumerTest(val stream: KafkaStream[Array[Byte], Array[Byte]], val threadNumber: Int, val delay: Long) extends Logging with Runnable {
  def run {
    val it = stream.iterator()

    while (it.hasNext()) {
      val msg = new String(it.next().message());
      System.out.println(System.currentTimeMillis() + ",Thread " + threadNumber + ": " + msg);
    }

    System.out.println("Shutting down Thread: " + threadNumber);
  }
}

Java 版本

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerExample {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;
    private long delay;

    public ConsumerExample(String zookeeper, String groupId, String topic, long delay) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(zookeeper, groupId));
        this.topic = topic;
        this.delay = delay;
    }

    public void shutdown() {
        if (consumer != null)
            consumer.shutdown();
        if (executor != null)
            executor.shutdown();
    }

    public void run(int numThreads) {

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        executor = Executors.newFixedThreadPool(numThreads);
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerTest(consumer, stream, threadNumber, delay));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("auto.offset.reset", "largest");
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        //props.put("auto.commit.enable", "false");

        return new ConsumerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {

        String args0 = "";
        String args1 = "";
        String args2 = "";
        int args3 = 0;
        long args4 = 0;

        if (args.length <= 0) {
            args0 = "192.168.1.7:2181";
            args1 = "group1";
            args2 = "test";
            args3 = 10;
            args4 = 0;
        } else {
            args0 = args[0];
            args1 = args[1];
            args2 = args[2];
            args3 = Integer.parseInt(args[3]);
            args4 = Long.parseLong(args[4]);
        }

        String zooKeeper = args0;
        String groupId = args1;
        String topic = args2;
        int threads = args3;
        long delay = args4;

        ConsumerExample example = new ConsumerExample(zooKeeper, groupId, topic, delay);
        example.run(threads);

        Thread.sleep(24 * 60 * 60 * 1000);

        example.shutdown();
    }
}

Kafka Producer Examples

Scala 版本

import kafka.producer.ProducerConfig
import java.util.Properties
import scala.util.Random
import kafka.producer.Producer
import kafka.producer.KeyedMessage
import java.util.Date

object ScalaProducerExample extends App {

  // java -cp kafka_example-0.1.0-SNAPSHOT.jar com.colobu.kafka.ScalaProducerExample 10000 colobu localhost:9092
  var args0 = 0;
  var args1 = "";
  var args2 = "";

  if (args.length <= 0) {
    args0 = 500
    args1 = "test"
    args2 = "192.168.1.7:9092,192.168.1.7:9093,192.168.1.7:9094"

  } else {
    args0 = args(0).toInt
    args1 = args(1)
    args2 = args(2)
  }


  val events = args0
  val topic = args1
  val brokers = args2
  val rnd = new Random()

  val props = new Properties()
  props.put("metadata.broker.list", brokers)
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  //props.put("partitioner.class", "com.colobu.kafka.SimplePartitioner")
  props.put("producer.type", "async")
  //props.put("request.required.acks", "1")


  val config = new ProducerConfig(props)
  val producer = new Producer[String, String](config)
  val t = System.currentTimeMillis()

  for (nEvents <- Range(0, events)) {
    val runtime = new Date().getTime()
    val ip = "192.168.2." + rnd.nextInt(255)
    val msg = runtime + "," + nEvents + ",www.example.com," + ip
    val data = new KeyedMessage[String, String](topic, ip, msg)
    producer.send(data)
  }

  System.out.println("sent per second: " + events * 1000 / (System.currentTimeMillis() - t));
  producer.close();
}

Java 版本

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerExample {
    public static void main(String[] args) {

        long args0 = 0;
        String args1 = "";
        String args2 = "";

        if (args.length <= 0) {
            args0 = 500;
            args1 = "test";
            args2 = "192.168.1.7:9092,192.168.1.7:9093,192.168.1.7:9094";

        } else {
            args0 = Long.parseLong(args[0]);
            args1 = args[1];
            args2 = args[2];
        }


        long events = args0;
        String topic = args1;
        String brokers = args2;
        Random rnd = new Random();
 
        Properties props = new Properties();
        props.put("metadata.broker.list", brokers);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        //props.put("partitioner.class", "com.colobu.kafka.SimplePartitioner");
        props.put("producer.type", "async");
        //props.put("request.required.acks", "1");
 
        ProducerConfig config = new ProducerConfig(props);
 
        Producer<String, String> producer = new Producer<String, String>(config);
 
        long t = System.currentTimeMillis();
        for (long nEvents = 0; nEvents < events; nEvents++) { 
               long runtime = new Date().getTime();  
               String ip = "192.168.2." + rnd.nextInt(255); 
               String msg = runtime + "," + nEvents + ",www.example.com," + ip; 
               KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, ip, msg);
               producer.send(data);
        }
        
        System.out.println("sent per second: " + events * 1000/ (System.currentTimeMillis() - t));
        producer.close();
    }
}

Reference

ProducerExample.scala

kafka-storm-starter

Getting Started with Kafka from Scala: Scala Clients

0.8.0 Producer Example

0.8.0 SimpleConsumer Example

社區電商系統架構之消息隊列篇:kafka的實驗