[关闭]
@rickyChen 2016-04-29T06:49:47.000000Z 字数 3816 阅读 4761

如何构建第一个Spark项目代码

Spark

环境准备

本地环境

  1. 操作系统
    Window7/Mac
  2. IDE
    IntelliJ IDEA Community Edition 14.1.6
    下载地址
  3. JDK 1.8.0_65
    下载地址
  4. Scala 2.11.7
    下载地址

其它环境

  1. Spark:1.4.1
    下载地址
  2. Hadoop Yarn:Hadoop 2.5.0-cdh5.3.2

IDE项目创建

新建一个项目

  1. New Project
    如图
  2. 使用Maven模型创建一个Scala项目
    如图
  3. 填写自己的GroupId、ArtifactId,Version不需要修改,Maven会根据GroupId生成相应的目录结构,GroupId的取值一般为a.b.c 结构,ArtifactId为项目名称。之后点击next,填写完项目名称和目录,点击finish就可以让maven帮你创建Scala项目
    这里写图片描述
    项目创建完成后,目录结构如下
    这里写图片描述
    4.为项目添加JDK以及Scala SDK
    点击File->Project Structure,在SDKS和Global Libraries中为项目配置环境。
    这里写图片描述
    至此整个项目结构、项目环境都搭建好了

编写主函数

主函数的编写在 projectName/src/main/scala/.../下完成,如果按照上述步骤完成代码搭建,将在目录最后发现

  1. MyRouteBuild
  2. MyRouteMain

这两个文件为模块文件,删除MyRouteBuild,重命名MyRouteMainDirectKafkaWordCount。这里,我使用Spark Streaming官方提供的一个代码为实例代码,代码如下

  1. package org.apache.spark.examples.streaming
  2. import kafka.serializer.StringDecoder
  3. import org.apache.spark.streaming._
  4. import org.apache.spark.streaming.kafka._
  5. import org.apache.spark.SparkConf
  6. object DirectKafkaWordCount {
  7. def main(args: Array[String]) {
  8. if (args.length < 2) {
  9. System.err.println("...")
  10. System.exit(1)
  11. }
  12. //StreamingExamples.setStreamingLogLevels()
  13. val Array(brokers, topics) = args
  14. val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
  15. val ssc = new StreamingContext(sparkConf, Seconds(2))
  16. // Create direct kafka stream with brokers and topics
  17. val topicsSet = topics.split(",").toSet
  18. val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
  19. val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  20. ssc, kafkaParams, topicsSet)
  21. // Get the lines, split them into words, count the words and print
  22. val lines = messages.map(_._2)
  23. val words = lines.flatMap(_.split(" "))
  24. val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
  25. wordCounts.print()
  26. // Start the computation
  27. ssc.start()
  28. ssc.awaitTermination()
  29. }
  30. }

将代码最上面的package org.apache.spark.examples.streaming,替换为DirectKafkaWordCount里的package部分即可。并覆盖DirectKafkaWordCount文件。
至此Spark处理代码已经编写完成。

修改pom.xml,为项目打包做准备

pom.xml中编写了整个项目的依赖关系,这个项目中我们需要导入一些Spark Streaming相关的包。

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-core_2.10</artifactId>
  4. <version>1.4.1</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.spark</groupId>
  8. <artifactId>spark-streaming-kafka_2.10</artifactId>
  9. <version>1.4.1</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.spark</groupId>
  13. <artifactId>spark-streaming_2.10</artifactId>
  14. <version>1.4.1</version>
  15. </dependency>
  16. <!-- scala -->
  17. <dependency>
  18. <groupId>org.scala-lang</groupId>
  19. <artifactId>scala-library</artifactId>
  20. <version>2.10.4</version>
  21. </dependency>

除此之外,如果需要把相关依赖打包到最终JAR包中,需要在pom.xmlbulid标签中写入以下配置:

  1. <plugins>
  2. <!-- Plugin to create a single jar that includes all dependencies -->
  3. <plugin>
  4. <artifactId>maven-assembly-plugin</artifactId>
  5. <version>2.4</version>
  6. <configuration>
  7. <descriptorRefs>
  8. <descriptorRef>jar-with-dependencies</descriptorRef>
  9. </descriptorRefs>
  10. </configuration>
  11. <executions>
  12. <execution>
  13. <id>make-assembly</id>
  14. <phase>package</phase>
  15. <goals>
  16. <goal>single</goal>
  17. </goals>
  18. </execution>
  19. </executions>
  20. </plugin>
  21. <plugin>
  22. <groupId>org.apache.maven.plugins</groupId>
  23. <artifactId>maven-compiler-plugin</artifactId>
  24. <version>2.0.2</version>
  25. <configuration>
  26. <source>1.7</source>
  27. <target>1.7</target>
  28. </configuration>
  29. </plugin>
  30. <plugin>
  31. <groupId>net.alchim31.maven</groupId>
  32. <artifactId>scala-maven-plugin</artifactId>
  33. <executions>
  34. <execution>
  35. <id>scala-compile-first</id>
  36. <phase>process-resources</phase>
  37. <goals>
  38. <goal>add-source</goal>
  39. <goal>compile</goal>
  40. </goals>
  41. </execution>
  42. <execution>
  43. <id>scala-test-compile</id>
  44. <phase>process-test-resources</phase>
  45. <goals>
  46. <goal>testCompile</goal>
  47. </goals>
  48. </execution>
  49. </executions>
  50. </plugin>
  51. </plugins>

pom.xml文件修改完成后,即可开始maven打包,操作如图:
这里写图片描述
点击右侧弹出窗口的Execute Maven Goal,在command line中输入clean package
这里写图片描述

Spark作业提交

在项目projectname/target目录下即可找到两个jar包,其中一个仅包含Scala代码,另一个包含所有依赖的包。
jar包导到Spark服务器,运行Spark作业,运行操作如下

../bin/spark-submit --master yarn-client --jars ../lib/kafka_2.10-0.8.2.1.jar --class huochen.spark.example.DirectKafkaWordCount sparkExample-1.0-SNAPSHOT-jar-with-dependencies.jar kafka-broker topic

利用spark-submit把任务提交到Yarn集群,即可看到运行结果。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注