Spark入门指南 II - 使用IntelliJ IDEA开发

文章也同时在简书更新

引言

“Apache Spark™ is a fast and general engine for large-scale data processing.”

Spark是一种高性能分布式计算框架。它基于RDD使用内存来加速迭代过程。相比于前辈Hadoop,Spark有着更好的性能表现,尤其针对Machine Learning此类典型的应用,提升更为显著。

作为入门指南的第二篇,本文将集中介绍如何使用IntelliJ IDEA进行开发。
前文:《Spark入门指南 I - 使用IntelliJ IDEA开发》

下载IntelliJ IDEA

IntelliJ IDEA 官方链接.

新建Project

  • File -> New -> Project
  • Scale -> SBT或IDEA均可

配置Project

以基于SBT的Project为例,打开build.sbt文件,进行如下配置:

1
2
3
4
5
6
7
8
9
10
11
12
name := "ProjectName"
version := "ProjectVersion"
scalaVersion := "2.11.11"
// additional libraries
libraryDependencies ++= Seq(
//"org.apache.spark" %% "spark-core" % "2.2.0" % "provided"
"org.apache.spark" %% "spark-core" % "2.2.0" % "compile",
"org.apache.spark" % "spark-sql_2.11" % "2.2.0" % "compile"
)

其中ScalaVersion要对应,引官网原文:

Spark runs on Java 8+, Python 2.7+/3.4+ and R 3.1+. For the Scala API, Spark 2.2.0 uses Scala 2.11. You will need to use a compatible Scala version (2.11.x).

Note that support for Java 7, Python 2.6 and old Hadoop versions before 2.6.5 were removed as of Spark 2.2.0.

Note that support for Scala 2.10 is deprecated as of Spark 2.1.0, and may be removed in Spark 2.3.0.

也就是说,我们使用了Spark-2.2.0版本,对应的ScaleVersion就应该设置为2.11系列,因此笔者选择了2.11.11
此外,spark各模块版本也要对应,选择2.2.0。一般而言只需要用到spark-core模块即可。其它模块,如sql,则可以通过上述类似的语句进行添加。而最后的编译项请选择“compiled”。
修改完成后,刷新project,耐心等待IDEA去下载所需要的lib以及依赖。

第一个应用:SparkWordCount

待上述操作执行完毕,便可建立第一个Spark应用。
在Project左侧的层级导航栏中找到src -> main -> scala,右键 -> New -> Scala Class,命名为SparkWordCount。
在原文件中输入如下示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
* Created by zhouqihua on 2017/8/23.
*/
import org.apache.spark.{SparkContext, SparkConf}
import java.io.File
object SparkWordCount {
def main(args: Array[String]): Unit = {
// val log: Logger = Logger.
val INPUT_HDFS = "./INPUT_HDFS"
val OUTPUT_HDFS = "./OUTPUT_HDFS_WORDCOUNT"
val outputPath: File = new File(OUTPUT_HDFS)
if (outputPath.exists())
deleteDir(outputPath)
println("************************ Step0: new SparkConf() begin ************************")
// val conf = new SparkConf().setMaster("local").setAppName("wordCount")
val conf = new SparkConf().setAppName("SparkWordCount")
.setMaster("spark://localhost:7077")
.setJars(List("/path/to/artifacts.jar"))
.set("spark.shuffle.compress", "false")
.set("spark.io.compression.codec", "org.apache.spark.io.LZFCompressionCodec")
//.set("spark.executor.extraJavaOptions", "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005")
println("************************ Step0: new SparkConf() end ************************")
println("************************ Step1: new SparkContext(conf) begin ************************")
// Create a Scala Spark Context.
val sc = new SparkContext(conf)
println("************************ Step1: new SparkContext(conf) end ************************")
println("************************ Step2: SparkContext.textFile(inputFile) begin ************************")
// Load our input data.
val input = sc.textFile(INPUT_HDFS)
println("************************ Step2: SparkContext.textFile(inputFile) end ************************")
println("************************ Step3: flatMap begin ************************")
// Split up into words.
val words = input.flatMap(line => line.split(" "))
println("************************ Step3: flatMap end ************************")
println("************************ Step4: map begin ************************")
// Transform into word and count.
val mapWords = words.map(word => (word, 1))
println("************************ Step4: map end ************************")
println("************************ Step5: reduceByKey begin ************************")
val counts = mapWords.reduceByKey{case (x, y) => x + y}
println("************************ Step5: reduceByKey end ************************")
println("************************ Step6: saveAsTextFile(outputFile) begin ************************")
// Save the word count back out to a text file, causing evaluation.
counts.saveAsTextFile(OUTPUT_HDFS)
// val collection = counts.collect()
// println(s"Result: $collection")
println("************************ Step6: saveAsTextFile(outputFile) end ************************")
println("************************ Step7: Thread Sleep for 100s ... begin ************************")
Thread.sleep(100000)
println("************************ Step7: Thread Sleep for 100s ... end ************************")
}
def deleteDir(dir: File): Unit = {
val files = dir.listFiles()
files.foreach(f => {
if (f.isDirectory) {
deleteDir(f)
} else {
f.delete()
println(s"Delete File: ${f.getAbsolutePath}")
}
})
dir.delete()
println(s"Delete Dir: ${dir.getAbsolutePath}\n" )
}
}

备注:笔者加入了一些额外的清理垃圾的语句。

配置打包编译选项

  • File -> Project Structure
  • 在Artifacts中 Add我们刚才新建的应用SparkWordCount,并在右侧勾选“include in project”选项。
  • 最后排除多余的大包依赖,也就是说只保留我们应用的依赖,其余三方库的依赖均可进行删除。

注意:示例代码中的.setJars(List("/path/to/artifacts.jar"))要与打包输出的jar的目录一致。

编译打包运行

观察控制台的输出结果,有一些列信息反馈,即说明一切顺利。
如有报错,很可能就是spark启动错误,或者是打包失败。
读者后续可修改代码以及加入断点来调试程序,以更多地理解其工作流程。

至此,最基本的“使用IntelliJ IDEA进行spark应用的开发”介绍完毕。

总结

作为Spark入门指南的第二篇,本文介绍了如何使用IntelliJ IDEA进行spark应用的开发。
下一篇笔者将讲解Spark集群化部署以及使用Hibench进行SparkBenchmark的测试。

周鶏🐣(Kimiko) wechat
拿起手机扫一扫,欢迎关注我的个人微信公众号:「洛斯里克的大书库」。
坚持原创技术分享,您的支持将鼓励我继续创作!