[关闭]
@rickyChen 2017-08-14T04:51:21.000000Z 字数 815 阅读 1302

WaterDrop问题记录-2017.08.14

WaterDrop


2017.08.14

  1. events遍历问题, 每个插件遍历一次events?

    1. // WaterDropMain.scala
    2. for (f <- filters) {
    3. val (processedEvents, isSuccess) = f.filter(events)
    4. events = processedEvents
    5. }
    6. // Split.scala
    7. for (i <- 0 until events.length) {
    8. for (v <- events(i).getField(srcField)) {
    9. }
    10. }
  2. 提供支持删除source_field的配置

    1. // WaterDropMain.scala
    2. val parts = v.asInstanceOf[String].split(conf.getString("delimiter")).map(_.trim)
    3. val kvs = (keys zip parts).toMap
    4. events(i).setField(conf.getString("target_field") kvs)
  3. beforeOutputafterOutput为什么放在inputs模块里

    1. // WaterDropMain.scala
    2. inputs.head.beforeOutput
    3. eventRDD.foreachPartition { partitionOfRecords =>
    4. outputs.head.process(partitionOfRecords)
    5. }
    6. inputs.head.afterOutput
  4. 多日志源支持

    1. // WaterDropMain.scala
    2. val dstream = inputs.head.getDstream.mapPartitions{ iter => ...
  5. Interval配置问题

    1. // WaterDropMain.scala
    2. val ssc = new StreamingContext(sparkConf, Seconds(15))
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注