Spark 入門實戰之最好的實例
來源:程序員人生 發布時間:2016-06-12 08:27:30 閱讀次數:3318次
轉載:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice1/
搭建開發環境
-
安裝 Scala IDE
搭建 Scala 語言開發環境很容易,Scala IDE 官網 下載適合的版本并解壓就能夠完成安裝,本文使用的版本是
4.1.0。
-
安裝 Scala 語言包
如果下載的 Scala IDE 自帶的 Scala 語言包與 Spark 1.3.1 使用的 Scala 版本 (2.10.x) 不1致,那末就需要下載和本文所使用的 Spark 所匹配的版本,以確保實現的 Scala 程序不會由于版本問題而運行失敗。
請下載并安裝 Scala 2.10.5 版本
-
安裝 JDK
如果您的機器上沒有安裝 JDK,請下載并安裝 1.6 版本以上的 JDK。
-
創建并配置 Spark 工程
打開 Scala IDE,創建1個名稱為 spark-exercise 的 Scala 工程。
圖 1. 創建 scala 工程
在工程目錄下創建1個 lib 文件夾,并且把您的 Spark 安裝包下的 spark-assembly jar 包拷貝到 lib 目錄下。
圖 2. Spark 開發 jar 包
并且添加該 jar 包到工程的 classpath 并配置工程使用剛剛安裝的 Scala 2.10.5 版本.,工程目錄結構以下。
圖 3. 添加 jar 包到 classpath
回頁首
運行環境介紹
為了不讀者對本文案例運行環境產生困惑,本節會對本文用到的集群環境的基本情況做個簡單介紹。
-
本文所有實例數據存儲的環境是1個 8 個機器的 Hadoop 集群,文件系統總容量是 1.12T,NameNode 叫 hadoop036166, 服務端口是 9000。讀者可以不關心具體的節點散布,由于這個不會影響到您瀏覽后面的文章。
-
本文運行實例程序使用的 Spark 集群是1個包括4個節點的 Standalone 模式的集群, 其中包括1個 Master 節點 (監聽端口 7077) 和3個 Worker 節點,具體散布以下:
Server Name |
Role |
hadoop036166 |
Master |
hadoop036187 |
Worker |
hadoop036188 |
Worker |
hadoop036227 |
Worker |
-
Spark 提供1個 Web UI 去查看集群信息并且監控履行結果,默許地址是:http://<spark_master_ip>:8080 ,對該實例提交后我們也能夠到 web 頁面上去查看履行結果,固然也能夠通過查看日志去找到履行結果。
圖 4. Spark 的 web console
回頁首
案例分析與編程實現
案例1
a. 案例描寫
提起 Word Count(詞頻數統計),相信大家都不陌生,就是統計1個或多個文件中單詞出現的次數。本文將此作為1個入門級案例,由淺入深的開啟使用 Scala 編寫 Spark 大數據處理程序的大門。
b.案例分析
對詞頻數統計,用 Spark 提供的算子來實現,我們首先需要將文本文件中的每行轉化成1個個的單詞, 其次是對每個出現的單詞進行記1次數,最后就是把所有相同單詞的計數相加得到終究的結果。
對第1步我們自然的想到使用 flatMap 算子把1行文本 split 成多個單詞,然后對第2步我們需要使用 map 算子把單個的單詞轉化成1個有計數的 Key-Value 對,即 word -> (word,1). 對最后1步統計相同單詞的出現次數,我們需要使用 reduceByKey 算子把相同單詞的計數相加得到終究結果。
c. 編程實現
清單 1.SparkWordCount 類源碼
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object SparkWordCount {
def FILE_NAME:String = "word_count_results_";
def main(args:Array[String]) {
if (args.length < 1) {
println("Usage:SparkWordCount FileName");
System.exit(1);
}
val conf = new SparkConf().setAppName("Spark Exercise: Spark Version Word Count Program");
val sc = new SparkContext(conf);
val textFile = sc.textFile(args(0));
val wordCounts = textFile.flatMap(line => line.split(" ")).map(
word => (word, 1)).reduceByKey((a, b) => a + b)
//print the results,for debug use.
//println("Word Count program running results:");
//wordCounts.collect().foreach(e => {
//val (k,v) = e
//println(k+"="+v)
//});
wordCounts.saveAsTextFile(FILE_NAME+System.currentTimeMillis());
println("Word Count program running results are successfully saved.");
}
}
d. 提交到集群履行
本實例中, 我們將統計 HDFS 文件系統中/user/fams 目錄下所有 txt 文件中詞頻數。其中 spark-exercise.jar 是 Spark 工程打包后的 jar 包,這個 jar 包履行時會被上傳到目標服務器的/home/fams 目錄下。運行此實例的具體命令以下:
清單 2.SparkWordCount 類履行命令
./spark-submit \
--class com.ibm.spark.exercise.basic.SparkWordCount \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g --executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/*.txt
e. 監控履行狀態
該實例把終究的結果存儲在了 HDFS 上,那末如果程序運行正常我們可以在 HDFS 上找到生成的文件信息
圖 5. 案例1輸出結果
打開 Spark 集群的 Web UI, 可以看到剛才提交的 job 的履行結果。
圖 6. 案例1完成狀態
如果程序還沒運行完成,那末我們可以在 Running Applications 列表里找到它。
案例2
a. 案例描寫
該案例中,我們將假定我們需要統計1個 1000 萬人口的所有人的平均年齡,固然如果您想測試 Spark 對大數據的處理能力,您可以把人口數放的更大,比如 1 億人口,固然這個取決于測試所用集群的存儲容量。假定這些年齡信息都存儲在1個文件里,并且該文件的格式以下,第1列是 ID,第2列是年齡。
圖 7. 案例2測試數據格式預覽
現在我們需要用 Scala 寫1個生成 1000 萬人口年齡數據的文件,源程序以下:
清單 3. 年齡信息文件生成類源碼
import java.io.FileWriter
import java.io.File
import scala.util.Random
object SampleDataFileGenerator {
def main(args:Array[String]) {
val writer = new FileWriter(new File("C: \\sample_age_data.txt"),false)
val rand = new Random()
for ( i <- 1 to 10000000) {
writer.write( i + " " + rand.nextInt(100))
writer.write(System.getProperty("line.separator"))
}
writer.flush()
writer.close()
}
}
b. 案例分析
要計算平均年齡,那末首先需要對源文件對應的 RDD 進行處理,也就是將它轉化成1個只包括年齡信息的 RDD,其次是計算元素個數即為總人數,然后是把所有年齡數加起來,最后平均年齡=總年齡/人數。
對第1步我們需要使用 map 算子把源文件對應的 RDD 映照成1個新的只包括年齡數據的 RDD,很明顯需要對在 map 算子的傳入函數中使用 split 方法,得到數組后只取第2個元素即為年齡信息;第2步計算數據元素總數需要對第1步映照的結果 RDD 使用 count 算子;第3步則是使用 reduce 算子對只包括年齡信息的 RDD 的所有元素用加法求和;最后使用除法計算平均年齡便可。
由于本例輸出結果很簡單,所以只打印在控制臺便可。
c. 編程實現
清單 4.AvgAgeCalculator 類源碼
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object AvgAgeCalculator {
def main(args:Array[String]) {
if (args.length < 1){
println("Usage:AvgAgeCalculator datafile")
System.exit(1)
}
val conf = new SparkConf().setAppName("Spark Exercise:Average Age Calculator")
val sc = new SparkContext(conf)
val dataFile = sc.textFile(args(0), 5);
val count = dataFile.count()
val ageData = dataFile.map(line => line.split(" ")(1))
val totalAge = ageData.map(age => Integer.parseInt(
String.valueOf(age))).collect().reduce((a,b) => a+b)
println("Total Age:" + totalAge + ";Number of People:" + count )
val avgAge : Double = totalAge.toDouble / count.toDouble
println("Average Age is " + avgAge)
}
}
d. 提交到集群履行
要履行本實例的程序,需要將剛剛生成的年齡信息文件上傳到 HDFS 上,假定您剛才已在目標機器上履行生成年齡信息文件的 Scala 類,并且文件被生成到了/home/fams 目錄下。
那末您需要運行1下 HDFS 命令把文件拷貝到 HDFS 的/user/fams 目錄。
清單 5. 年齡信息文件拷貝到 HDFS 目錄的命令
hdfs dfs –copyFromLocal /home/fams /user/fams
清單 6.AvgAgeCalculator 類的履行命令
./spark-submit \
--class com.ibm.spark.exercise.basic.AvgAgeCalculator \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g \
--executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/inputfiles/sample_age_data.txt
e. 監控履行狀態
在控制臺您可以看到以下所示信息:
圖 8. 案例2輸出結果
我們也能夠到 Spark Web Console 去查看 Job 的履行狀態
圖 9. 案例2完成狀態
案例3
a. 案例描寫
本案例假定我們需要對某個省的人口 (1 億) 性別還有身高進行統計,需要計算出男女人數,男性中的最高和最低身高,和女性中的最高和最低身高。本案例中用到的源文件有以下格式, 3列分別是 ID,性別,身高 (cm)。
圖 10. 案例3測試數據格式預覽
我們將用以下 Scala 程序生成這個文件,源碼以下:
清單 7. 人口信息生成類源碼
import java.io.FileWriter
import java.io.File
import scala.util.Random
object PeopleInfoFileGenerator {
def main(args:Array[String]) {
val writer = new FileWriter(new File("C:\\LOCAL_DISK_D\\sample_people_info.txt"),false)
val rand = new Random()
for ( i <- 1 to 100000000) {
var height = rand.nextInt(220)
if (height < 50) {
height = height + 50
}
var gender = getRandomGender
if (height < 100 && gender == "M")
height = height + 100
if (height < 100 && gender == "F")
height = height + 50
writer.write( i + " " + getRandomGender + " " + height)
writer.write(System.getProperty("line.separator"))
}
writer.flush()
writer.close()
println("People Information File generated successfully.")
}
def getRandomGender() :String = {
val rand = new Random()
val randNum = rand.nextInt(2) + 1
if (randNum % 2 == 0) {
"M"
} else {
"F"
}
}
}
b. 案例分析
對這個案例,我們要分別統計男女的信息,那末很自然的想到首先需要對男女信息從源文件的對應的 RDD 中進行分離,這樣會產生兩個新的 RDD,分別包括男女信息;其次是分別對男女信息對應的 RDD 的數據進行進1步映照,使其只包括身高數據,這樣我們又得到兩個 RDD,分別對應男性身高和女性身高;最后需要對這兩個 RDD 進行排序,進而得到最高和最低的男性或女性身高。
對第1步,也就是分離男女信息,我們需要使用 filter 算子,過濾條件就是包括”M” 的行是男性,包括”F”的行是女性;第2步我們需要使用 map 算子把男女各自的身高數據從 RDD 中分離出來;第3步我們需要使用 sortBy 算子對男女身高數據進行排序。
c. 編程實現
在實現上,有1個需要注意的點是在 RDD 轉化的進程中需要把身高數據轉換成整數,否則 sortBy 算子會把它視為字符串,那末排序結果就會遭到影響,例如 身高數據如果是:123,110,84,72,100,那末升序排序結果將會是 100,110,123,72,84,明顯這是不對的。
清單 8.PeopleInfoCalculator 類源碼
object PeopleInfoCalculator {
def main(args:Array[String]) {
if (args.length < 1){
println("Usage:PeopleInfoCalculator datafile")
System.exit(1)
}
val conf = new SparkConf().setAppName("Spark Exercise:People Info(Gender & Height) Calculator")
val sc = new SparkContext(conf)
val dataFile = sc.textFile(args(0), 5);
val maleData = dataFile.filter(line => line.contains("M")).map(
line => (line.split(" ")(1) + " " + line.split(" ")(2)))
val femaleData = dataFile.filter(line => line.contains("F")).map(
line => (line.split(" ")(1) + " " + line.split(" ")(2)))
//for debug use
//maleData.collect().foreach { x => println(x)}
//femaleData.collect().foreach { x => println(x)}
val maleHeightData = maleData.map(line => line.split(" ")(1).toInt)
val femaleHeightData = femaleData.map(line => line.split(" ")(1).toInt)
//for debug use
//maleHeightData.collect().foreach { x => println(x)}
//femaleHeightData.collect().foreach { x => println(x)}
val lowestMale = maleHeightData.sortBy(x => x,true).first()
val lowestFemale = femaleHeightData.sortBy(x => x,true).first()
//for debug use
//maleHeightData.collect().sortBy(x => x).foreach { x => println(x)}
//femaleHeightData.collect().sortBy(x => x).foreach { x => println(x)}
val highestMale = maleHeightData.sortBy(x => x, false).first()
val highestFemale = femaleHeightData.sortBy(x => x, false).first()
println("Number of Male Peole:" + maleData.count())
println("Number of Female Peole:" + femaleData.count())
println("Lowest Male:" + lowestMale)
println("Lowest Female:" + lowestFemale)
println("Highest Male:" + highestMale)
println("Highest Female:" + highestFemale)
}
}
d. 提交到集群履行
在提交該程序到集群履行之前,我們需要將剛才生成的人口信息數據文件上傳到 HDFS 集群,具體命令可以參照上文。
清單 9.PeopleInfoCalculator 類的履行命令
./spark-submit \
--class com.ibm.spark.exercise.basic.PeopleInfoCalculator \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g \
--executor-memory 3g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/inputfiles/sample_people_info.txt
e. 監控履行狀態
對該實例,如程序中打印的1樣,會在控制臺顯示以下信息:
圖 11. 案例3輸出結果
在 Spark Web Console 里可以看到具體的履行狀態信息
圖 12. 案例3完成狀態
案例4
a. 案例描寫
該案例中我們假定某搜索引擎公司要統計過去1年搜索頻率最高的 K 個科技關鍵詞或詞組,為了簡化問題,我們假定關鍵詞組已被整理到1個或多個文本文件中,并且文檔具有以下格式。
圖 13. 案例4測試數據格式預覽
我們可以看到1個關鍵詞或詞組可能出現屢次,并且大小寫格式可能不1致。
b. 案例分析
要解決這個問題,首先我們需要對每一個關鍵詞出現的次數進行計算,在這個進程中需要辨認不同大小寫的相同單詞或詞組,如”Spark”和“spark” 需要被認定為1個單詞。對出現次數統計的進程和 word count 案例類似;其次我們需要對關鍵詞或詞組依照出現的次數進行降序排序,在排序前需要把 RDD 數據元素從 (k,v) 轉化成 (v,k);最后取排在最前面的 K 個單詞或詞組。
對第1步,我們需要使用 map 算子對源數據對應的 RDD 數據進行全小寫轉化并且給詞組記1次數,然后調用 reduceByKey 算子計算相同詞組的出現次數;第2步我們需要對第1步產生的 RDD 的數據元素用 sortByKey 算子進行降序排序;第3步再對排好序的 RDD 數據使用 take 算子獲得前 K 個數據元素。
c. 編程實現
清單 10.TopKSearchKeyWords 類源碼
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object TopKSearchKeyWords {
def main(args:Array[String]){
if (args.length < 2) {
println("Usage:TopKSearchKeyWords KeyWordsFile K");
System.exit(1)
}
val conf = new SparkConf().setAppName("Spark Exercise:Top K Searching Key Words")
val sc = new SparkContext(conf)
val srcData = sc.textFile(args(0))
val countedData = srcData.map(line => (line.toLowerCase(),1)).reduceByKey((a,b) => a+b)
//for debug use
//countedData.foreach(x => println(x))
val sortedData = countedData.map{ case (k,v) => (v,k) }.sortByKey(false)
val topKData = sortedData.take(args(1).toInt).map{ case (v,k) => (k,v) }
topKData.foreach(println)
}
}
d. 提交到集群履行
清單 11.TopKSearchKeyWords 類的履行命令
./spark-submit \
--class com.ibm.spark.exercise.basic.TopKSearchKeyWords \
--master spark://hadoop036166:7077 \
--num-executors 3 \
--driver-memory 6g \
--executor-memory 2g \
--executor-cores 2 \
/home/fams/sparkexercise.jar \
hdfs://hadoop036166:9000/user/fams/inputfiles/search_key_words.txt
e. 監控履行狀態
如果程序成功履行,我們將在控制臺看到以下信息。固然讀者也能夠仿照案例2和案例3那樣,自己嘗試使用 Scala 寫1段小程序生成此案例需要的源數據文件,可以根據您的 HDFS 集群的容量,生成盡量大的文件,用來測試本案例提供的程序。
圖 14. 案例4輸出結果
圖 15. 案例4完成狀態
回頁首
Spark job 的履行流程簡介
我們可以發現,Spark 利用程序在提交履行后,控制臺會打印很多日志信息,這些信息看起來是雜亂無章的,但是卻在1定程度上體現了1個被提交的 Spark job 在集群中是如何被調度履行的,那末在這1節,將會向大家介紹1個典型的 Spark job 是如何被調度履行的。
我們先來了解以下幾個概念:
DAG: 即 Directed Acyclic Graph,有向無環圖,這是1個圖論中的概念。如果1個有向圖沒法從某個頂點動身經過若干條邊回到該點,則這個圖是1個有向無環圖。
Job:我們知道,Spark 的計算操作是 lazy 履行的,只有當碰到1個動作 (Action) 算子時才會觸發真實的計算。1個 Job 就是由動作算子而產生包括1個或多個 Stage 的計算作業。
Stage:Job 被肯定后,Spark 的調度器 (DAGScheduler) 會根據該計算作業的計算步驟把作業劃分成1個或多個 Stage。Stage 又分為 ShuffleMapStage 和 ResultStage,前者以 shuffle 為輸出邊界,后者會直接輸出結果,其邊界可以是獲得外部數據,也能夠是以1個
ShuffleMapStage 的輸出為邊界。每個 Stage 將包括1個 TaskSet。
TaskSet: 代表1組相干聯的沒有 shuffle 依賴關系的任務組成任務集。1組任務會被1起提交到更加底層的 TaskScheduler。
Task:代表單個數據分區上的最小處理單元。分為 ShuffleMapTask 和 ResultTask。ShuffleMapTask 履行任務并把任務的輸出劃分到 (基于 task 的對應的數據分區) 多個 bucket(ArrayBuffer) 中,ResultTask 履行任務并把任務的輸動身送給驅動程序。
Spark 的作業任務調度是復雜的,需要結合源碼來進行較為詳實的分析,但是這已超過本文的范圍,所以這1節我們只是對大致的流程進行分析。
Spark 利用程序被提交后,當某個動作算子觸發了計算操作時,SparkContext 會向 DAGScheduler 提交1個作業,接著 DAGScheduler 會根據 RDD 生成的依賴關系劃分 Stage,并決定各個 Stage 之間的依賴關系,Stage 之間的依賴關系就構成了 DAG。Stage 的劃分是以 ShuffleDependency 為根據的,也就是說當某個 RDD 的運算需要將數據進行 Shuffle 時,這個包括了 Shuffle 依賴關系的 RDD 將被用來作為輸入信息,進而構建1個新的
Stage。我們可以看到用這樣的方式劃分 Stage,能夠保證有依賴關系的數據可以以正確的順序履行。根據每一個 Stage 所依賴的 RDD 數據的 partition 的散布,會產生出與 partition 數量相等的 Task,這些 Task 根據 partition 的位置進行散布。其次對 finalStage 或是 mapStage 會產生不同的 Task,最后所有的 Task 會封裝到 TaskSet 內提交到 TaskScheduler 去履行。有興趣的讀者可以通過瀏覽 DAGScheduler
和 TaskScheduler 的源碼獲得更詳細的履行流程。
回頁首
結束語
通過本文,相信讀者對如何使用 Scala 編寫 Spark 利用程序處理大數據已有了較為深入的了解。固然在處理實際問題時,情況可能比本文舉得例子復雜很多,但是解決問題的基本思想是1致的。在碰到實際問題的時候,首先要對源數據結構格式等進行分析,然后肯定如何去使用 Spark 提供的算子對數據進行轉化,終究根據實際需求選擇適合的算子操作數據并計算結果。本文并未介紹其它 Spark 模塊的知識,明顯這不是1篇文章所能完成的,希望以后會有機會總結更多的 Spark 利用程序開發和性能調優方面的知識,寫成文章與更多的
Spark 技術愛好者分享,1起進步。由于時間倉促并且本人知識水平有限,文章難免有未斟酌周全的地方乃至是毛病,希望各位朋友不吝賜教。有任何問題,都可以在文末留下您的評論,我會及時回復。
生活不易,碼農辛苦
如果您覺得本網站對您的學習有所幫助,可以手機掃描二維碼進行捐贈