[关闭]
@EVA001 2017-10-29T12:14:44.000000Z 字数 5306 阅读 323

2017-3-15 远程IDEA调试Storm

有道云笔记


现在IDEA编译成功topo后,使用WinSCP将打包好的包传到主节点
注意:打包之前--要讲 [ 本地模式 ] 改为 [ 集群模式 ]

  1. //本地测试模式
  2. LocalCluster cluster = new LocalCluster();
  3. cluster.submitTopology("firstTopo", conf, builder.createTopology());
  4. Utils.sleep(100000);
  5. cluster.killTopology("firstTopo");
  6. cluster.shutdown();
  7. //集群提交模式
  8. StormSubmitter.submitTopology("firstTopo", conf, builder.createTopology());

storm.yaml文件中的seeds选项不能与host同时存在
如果同时存在,则storm运行时会出错:提交任务找不到主类等等


修改storm配置中的UI端口,只能!在nimbus中修改

  1. supervisor中修改会导致supervisor启动不了,会报一下错误
  2. Caused by: while scanning a simple key
  3. in 'reader', line 32, column 2:
  4. ui.port = 8000
  5. ^
  6. could not found expected ':'
  7. in 'reader', line 33, column 1:
  8. #

package是把jar打到本项目的target下,
install时把target下的jar安装到本地仓库,供其他项目使用
此处使用
先clean再使用pakage打包

  1. storm jar simple-1.0.jar Random.FirstTopo

Maven配置文件

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>com.test</groupId>
  7. <artifactId>simple</artifactId>
  8. <version>1.0</version>
  9. <packaging>jar</packaging>
  10. <name>simple</name>
  11. <url>http://maven.apache.org</url>
  12. <properties>
  13. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  14. </properties>
  15. <dependencies>
  16. <dependency>
  17. <groupId>junit</groupId>
  18. <artifactId>junit</artifactId>
  19. <version>3.8.1</version>
  20. <scope>test</scope>
  21. </dependency>
  22. <dependency>
  23. <groupId>org.apache.storm</groupId>
  24. <artifactId>storm-core</artifactId>
  25. <version>1.0.1</version>
  26. <scope>compile</scope>
  27. //引入依赖的方式默认[没有scope]为compile,意为最后打包无需包含此依赖,
  28. //provided必须显式写出scope[打包时会包含依赖]
  29. </dependency>
  30. </dependencies>
  31. <build>
  32. <plugins>
  33. //可删去
  34. <plugin>
  35. <artifactId>maven-compiler-plugin</artifactId>
  36. <version>2.3.2</version>
  37. <configuration>
  38. <source>1.8</source>
  39. //实测此处的1.8没有什么用处,改成1.6也行,但是最好改成与系统一致的jdk版本
  40. <target>1.8</target>
  41. <encoding>UTF-8</encoding>
  42. </configuration>
  43. </plugin>
  44. <plugin>
  45. <plugin>
  46. <artifactId>maven-assembly-plugin</artifactId>
  47. <version>2.4</version>
  48. <configuration>
  49. <descriptorRefs>
  50. <descriptorRef>jar-with-dependencies</descriptorRef>
  51. </descriptorRefs>
  52. </configuration>
  53. <executions>
  54. <execution>
  55. <id>make-assembly</id>
  56. <phase>package</phase>
  57. <goals>
  58. <goal>single</goal>
  59. </goals>
  60. </execution>
  61. </executions>
  62. </plugin>
  63. </plugins>
  64. </build>
  65. </project>

截图.png-328.3kB

Random包下有三个文件,则再提交时,main入口class应为 Random.FirstTopo
提交命令中的jar应为上图中的simple-1.0.jar

  1. simple-1.0.jar 5 KB
  2. simple-1.0-jar-with-dependencies.jar 24735 KB
  3. src/main/java/Random : src,main,java都不算路径,Random才对应eclipse中的package

附SenqueceBolt:

  1. package Random; /**
  2. * Created by hadoop on 2017/3/1.
  3. */
  4. import org.apache.storm.topology.BasicOutputCollector;
  5. import org.apache.storm.topology.OutputFieldsDeclarer;
  6. import org.apache.storm.topology.base.BaseBasicBolt;
  7. import org.apache.storm.tuple.Tuple;
  8. public class SenqueceBolt extends BaseBasicBolt{
  9. /* (non-Javadoc)
  10. * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)
  11. */
  12. public void execute(Tuple input, BasicOutputCollector collector) {
  13. // TODO Auto-generated method stub
  14. String word = (String) input.getValue(0);
  15. String out = "I'm " + word + "!";
  16. System.out.println("out=" + out);
  17. }
  18. /* (non-Javadoc)
  19. * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)
  20. */
  21. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  22. // TODO Auto-generated method stub
  23. }
  24. }

附RandomSpout:

  1. package Random; /**
  2. * Created by hadoop on 2017/3/1.
  3. */
  4. import org.apache.storm.spout.SpoutOutputCollector;
  5. import org.apache.storm.task.TopologyContext;
  6. import org.apache.storm.topology.OutputFieldsDeclarer;
  7. import org.apache.storm.topology.base.BaseRichSpout;
  8. import org.apache.storm.tuple.Fields;
  9. import org.apache.storm.tuple.Values;
  10. import java.util.Map;
  11. import java.util.Random;
  12. public class RandomSpout extends BaseRichSpout{
  13. private SpoutOutputCollector collector;
  14. private static String[] words = {"happy","excited","angry"};
  15. /* (non-Javadoc)
  16. * @see backtype.storm.spout.ISpout#open(java.util.Map, backtype.storm.task.TopologyContext, backtype.storm.spout.SpoutOutputCollector)
  17. */
  18. public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) {
  19. // TODO Auto-generated method stub
  20. this.collector = arg2;
  21. }
  22. /* (non-Javadoc)
  23. * @see backtype.storm.spout.ISpout#nextTuple()
  24. */
  25. public void nextTuple() {
  26. // TODO Auto-generated method stub
  27. String word = words[new Random().nextInt(words.length)];
  28. collector.emit(new Values(word));
  29. }
  30. /* (non-Javadoc)
  31. * @see backtype.storm.topology.IComponent#declareOutputFields(backtype.storm.topology.OutputFieldsDeclarer)
  32. */
  33. public void declareOutputFields(OutputFieldsDeclarer arg0) {
  34. // TODO Auto-generated method stub
  35. arg0.declare(new Fields("randomstring"));
  36. }
  37. }

附FirstTopo:

  1. package Random; /**
  2. * Created by hadoop on 2017/3/1.
  3. */
  4. import org.apache.storm.Config;
  5. import org.apache.storm.LocalCluster;
  6. import org.apache.storm.StormSubmitter;
  7. import org.apache.storm.topology.TopologyBuilder;
  8. import org.apache.storm.utils.Utils;
  9. public class FirstTopo {
  10. public static void main(String[] args) throws Exception {
  11. TopologyBuilder builder = new TopologyBuilder();
  12. builder.setSpout("spout", new RandomSpout());
  13. builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout");
  14. Config conf = new Config();
  15. conf.setDebug(false);
  16. if (args != null && args.length > 0) {
  17. conf.setNumWorkers(3);
  18. StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
  19. } else {
  20. LocalCluster cluster = new LocalCluster();
  21. cluster.submitTopology("firstTopo", conf, builder.createTopology());
  22. Utils.sleep(100000);
  23. cluster.killTopology("firstTopo");
  24. cluster.shutdown();
  25. }
  26. }
  27. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注