[关闭]
@EVA001 2017-11-03T09:17:20.000000Z 字数 3455 阅读 278

2017-3-4 IDEA中运行Topology

有道云笔记


基本调试过程

现在IDEA编译成功topo后,使用WinSCP将打包好的包传到主节点

注意:打包之前--要讲 [ 本地模式 ] 改为 [ 集群模式 ]

  1. //本地测试模式
  2. LocalCluster cluster = new LocalCluster();
  3. cluster.submitTopology("firstTopo", conf, builder.createTopology());
  4. Utils.sleep(100000);
  5. cluster.killTopology("firstTopo");
  6. cluster.shutdown();
  7. //集群提交模式
  8. StormSubmitter.submitTopology("firstTopo", conf, builder.createTopology());

storm.yaml文件中的seeds选项不能与host同时存在
如果同时存在,则storm运行时会出错:提交任务找不到主类等等

修改storm配置中的UI端口,只能!在nimbus中修改
在supervisor中修改会导致supervisor启动不了,会报一下错误

  1. Caused by: while scanning a simple key
  2. in 'reader', line 32, column 2:
  3. ui.port = 8000
  4. ^
  5. could not found expected ':'
  6. in 'reader', line 33, column 1:
  7. #

package是把jar打到本项目的target下,
install时把target下的jar安装到本地仓库,供其他项目使用
此处使用
先clean再使用pakage打包

  1. storm jar simple-1.0.jar Random.FirstTopo
  2. <?xml version="1.0" encoding="UTF-8"?>
  3. <project xmlns="http://maven.apache.org/POM/4.0.0"
  4. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  5. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  6. <modelVersion>4.0.0</modelVersion>
  7. <groupId>com.test</groupId>
  8. <artifactId>simple</artifactId>
  9. <version>1.0</version>
  10. <packaging>jar</packaging>
  11. <name>simple</name>
  12. <url>http://maven.apache.org</url>
  13. <properties>
  14. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  15. </properties>
  16. <dependencies>
  17. <dependency>
  18. <groupId>junit</groupId>
  19. <artifactId>junit</artifactId>
  20. <version>3.8.1</version>
  21. <scope>test</scope>
  22. </dependency>
  23. <dependency>
  24. <groupId>org.apache.storm</groupId>
  25. <artifactId>storm-core</artifactId>
  26. <version>1.0.1</version>
  27. <scope>compile</scope>
  28. //引入依赖的方式默认[没有scope]为compile,意为最后打包无需包含此依赖,
  29. //provided必须显式写出scope[打包时会包含依赖]
  30. </dependency>
  31. </dependencies>
  32. <build>
  33. <plugins>
  34. //可删去
  35. <plugin>
  36. <artifactId>maven-compiler-plugin</artifactId>
  37. <version>2.3.2</version>
  38. <configuration>
  39. <source>1.8</source>
  40. //实测此处的1.8没有什么用处,改成1.6也行,但是最好改成与系统一致的jdk版本
  41. <target>1.8</target>
  42. <encoding>UTF-8</encoding>
  43. </configuration>
  44. </plugin>
  45. <plugin>
  46. <plugin>
  47. <artifactId>maven-assembly-plugin</artifactId>
  48. <version>2.4</version>
  49. <configuration>
  50. <descriptorRefs>
  51. <descriptorRef>jar-with-dependencies</descriptorRef>
  52. </descriptorRefs>
  53. </configuration>
  54. <executions>
  55. <execution>
  56. <id>make-assembly</id>
  57. <phase>package</phase>
  58. <goals>
  59. <goal>single</goal>
  60. </goals>
  61. </execution>
  62. </executions>
  63. </plugin>
  64. </plugins>
  65. </build>
  66. </project>

Random包下有三个文件,则再提交时,main入口class应为 Random.FirstTopo
提交命令中的jar应为上图中的simple-1.0.jar
simple-1.0.jar 5 KB
simple-1.0-jar-with-dependencies.jar 24735 KB
src/main/java/Random : src,main,java都不算路径,Random才对应eclipse中的package

运行组合用例

  1. Object : kafka-storm-demo
  2. Assign
  3. [IDEA] 打包的时候要改为 集群 模式
  4. [IDEA] 修改topic的名称
  5. [IDEA] 验证成功与否需要在console中查看System.out的输出
  6. [Attention!] 此实例可以放在集群中提交,但是在集群中无法验证是否执行成功
  7. 因为代码中只有 [ 系统输出 ] 在集群中提交并不会将输出写入日志,
  8. 也就是说,查看日志等方法无法知道是否执行成功,唯一的方式是
  9. IDEA的调试窗口看输出。
  1. [Prepare] IDEA运行程序
  2. [Prepare] CRT_1开启flume
  3. [Prepare] CRT_2开启kafka-consumer
  4. [Prepare] CRT_3开启shell脚本循环写log文件(模拟生产)
  1. [Process-1] CRT_3循环写log,文本 => logg.log [会在界面输出]
  2. [Process-2] CRT_1监控logg文件,文本 => logg.log => flume [输出同上一致]
  3. [Process-3] flume充当producer,文本 => logg.log => flume => kafkaproducer
  4. [Process-4] storm获取产生数据并处理,文本 => logg.log => flume => kafkaproducer => storm
  5. [Process-5] 数据被订阅方consumer接收,文本 => logg.log => flume => kafkaproducer => storm => kafkaconsumer
  1. flume+kafka+storm
  2. [do] topictest 上述各方的topic要一致,不一致要在代码中改过来
  3. [do] cd cmd => sh logg.sh
  4. 打开脚本生产数据[1]
  5. [do]应有的终端窗口 flume | consumer | shell-log | idea-console
  6. [do] bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console
  7. 开启flume[2]
  8. [do] kafka-console-producer.sh --broker-list hadoop01:9092 --topic test 开启消费者[3]
  9. [do] storm jar simple-1.0.jar Skafka.KafkaTopologytest 提交任务[4]
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注