2017/02/20

Spark tutorial

要使用 Spark 之前,一般會先遇到 scala 這個語言的熟悉度的問題,當有了一定的語言程度後,再來就是 scala IDE 的選擇,目前的狀況,還是IDEA 會比 scala IDE for Eclipse 好用。接下來就是下載跟安裝 spark,然後進行 WordCount 的範例練習,以下記錄怎麼安裝與設定 stand alone 的 spark 開發環境。

要下載哪一個 spark 套件

當我們連結到 Download Apache Spark 時,首先遇到的問題,就是要下載哪一個 spark release 套件。

基本的原則如下:

如果要直接下載已經編譯好的 binary 套件,我們可以根據 Hadoop 的版本決定要下載哪一個,但如果像我們一樣,不打算安裝 Hadoop 就直接測試,就直接選最新版的 spark-1.6.1-bin-hadoop2.6.tgz 就好了,下載後解壓縮,馬上就可以使用 spark-shell,或直接取得 all-in-one 的 spark-assembly-1.6.1-hadoop2.6.0.jar 套件。

如果我們要編譯 source code,就下載預設的 1.6.1(Mar 09 2016) spark release,Package type 選擇 Source Code:spark-1.6.1.tgz

由於目前 spark 預設是使用 scala 2.10 版,使用預先編譯的 spark 就必須要使用 scala 2.10 版,如果像要改成 2.11,就一定要自己重新編譯 spark,目前 spark 的 JDBC component 還不支援 scala 2.11。

Building for Scala 2.11 有兩行指令說明如何將 spark 由 2.10 調整為 2.11,我們同時把 hadoop 版本改為 2.6。

./dev/change-scala-version.sh 2.11
mvn -Pyarn -Phadoop-2.6 -Dscala-2.11 -DskipTests clean package

編譯 spark 要花的時間很久,以我現在的環境花了 40 分鐘。

[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 39:33 min
[INFO] Finished at: 2016-04-29T09:23:03+08:00
[INFO] Final Memory: 452M/2703M

也可以使用 sbt 來編譯 spark,編譯後會得到 spark-assembly 的 jar。

sbt/sbt assembly

如果要修改 spark souce code,可以啟用增量編譯模式,避免每一次修改都要花很久的時間重新編譯。

export SPARK_PREPEND_CLASSES=true
sbt/sbt compile

unset SPARK_PREPEND_CLASSES

在 compile 前面加上 ~ 可以避免每一次都重開一次新的 sbt console

sbt/sbt ~ compile

可以用 sbt 或是 mvn 指令查閱 dependency map

sbt/sbt dependency-tree

mvn -DskipTests install
mvb dependency:tree

如果要設定 spark source 的開發環境,可以用以下的指令產生 IDEA project file

git clone https://github.com/apache/spark
sbt/sbt gen-idea

Spark 開發環境 in IDEA

  1. 在 IDEA 建立新的 scala project: sparktest

  2. 在 project 中建立一個 lib 目錄,把 spark-assembly-1.6.1-hadoop2.6.0.jar 放在那個目錄中

  3. 在 File -> Project Structure -> Libraries 點 "+",然後把 lib 目錄加入 project 中

  4. 取得一個文字檔的測試資料 pg5000.txt ,將檔案放在新建立的 data 目錄中

  5. 將 RunWordCount.scala 放在 src 目錄中,程式會計算 pg5000.txt 裡面每一個字出現的數量

    import org.apache.log4j.Logger
    import org.apache.log4j.Level
    import org.apache.spark.{ SparkConf, SparkContext }
    import org.apache.spark.rdd.RDD
    
    object RunWordCount {
      def main(args: Array[String]): Unit = {
    
        // 以這兩行設定不顯示 spark 內部的訊息
        Logger.getLogger("org").setLevel(Level.OFF)
        System.setProperty("spark.ui.showConsoleProgress", "false")
    
        // 清除 output folder
        FileUtils.deleteDirectory(new File("data/output"))
    
        println("執行RunWordCount")
    
        // 設定 application 提交到 MASTER 指向的 cluster 或是 local 執行的模式
        // local[4] 代表是在本地以 四核心的 CPU 執行
        val sc = new SparkContext(new SparkConf().setAppName("wordCount").setMaster("local[4]"))
    
        println("讀取文字檔...")
        val textFile = sc.textFile("data/pg5000.txt") 
    
        println("開始建立RDD...")
        // flapMap 是取出文字檔的每一行資料,並以 " " 進行 split,分成一個一個的 word
        // map 是將每一個 word 轉換成 (word, 1) 的 tuple
        // reduceByKey 會根據 word 這個 key,將後面的 1 加總起來,就會得到 (word, 數量) 的結果
        val countsRDD = textFile.flatMap(line => line.split(" "))
          .map(word => (word, 1))
          .reduceByKey(_ + _) 
    
        println("儲存結果至文字檔...")
        try {
          countsRDD.saveAsTextFile("data/output") 
          println("存檔成功")
        } catch {
          case e: Exception => println("輸出目錄已經存在,請先刪除原有目錄");
        }
    
      }
    }
  6. 我們可以直接在 IDEA 就執行這個測試程式

    執行RunWordCount
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    16/04/29 16:28:50 INFO Slf4jLogger: Slf4jLogger started
    16/04/29 16:28:50 INFO Remoting: Starting remoting
    16/04/29 16:28:50 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.1.151:56205]
    讀取文字檔...
    開始建立RDD...
    儲存結果至文字檔...
    存檔成功
    
    Process finished with exit code 0
  7. 最後產生的結果有三個檔案,其中 part-00000 及 part-00001 裡面存了每一個 word 的發生次數

    _SUCCESS
    part-00000
    part-00001

會產生兩個檔案的原因是因為,spark 本身是平行運算的工具,所以會自動產生多個 partitions。

如果需要將結果整合成一個檔案,就必須使用 coalesce,在程式的最後面,用 countsRDD.coalesce(1).saveAsTextFile 將結果輸出到新目錄,也會得到一個檔案的結果。

try {
      countsRDD.coalesce(1).saveAsTextFile("data/output2")
      println("存檔成功")
    } catch {
      case e: Exception => println("輸出目錄已經存在,請先刪除原有目錄");
    }

匯出程式

  1. 在 IDEA 選擇 "File" -> "Project Structure" -> "Artifact"

  2. 點擊 "+" -> "JAR" -> "From modules with dependencies"

  3. Main Class 填成 "RunWordCount",輸出目錄的最後面改為 "out"

  4. 選擇 "Build" -> "Build Artifacts",就能在 out 目錄取得 sparktest.jar 檔

  5. 這樣就能在另一台機器執行 sparktest

java -jar sparktest.jar

References

HADOOP+SPARK大數據巨量分析與機器學習整合開發實戰

Spark大資料分析實戰

沒有留言:

張貼留言