@rickyChen
2017-03-03T09:23:40.000000Z
字数 2650
阅读 2929
Java
JavaDStream<String> lines = messages.map(s -> s.substring(0, 5))
// Function[T1, R]JavaDStream<String> lines = messages.map(new Function<String, String>() {// call(v1: T1): Rpublic String call(String s) {return s.substring(0, 5);}});
class GetLength implements Function<String, int> {public Inter call(String s) { return s.length(); }}JavaDStream<String> lineLengths = lines.map(new GetLength())
int totalLength = lineLengths.reduce((a, b) -> a + b)
// Function2[T1, T2, R]int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {public Integer call(Integer a, Integer b) { return a + b; }});
class Sum implements Function2<Integer, Integer, Integer> {public Integer call(Integer a, Integer b) { return a + b; }}int totalLength = lineLengths.reduce(new Sum());
JavaPairDStream<String, Integer> wordCounts = wordCount.reduceByKey((a, b) -> a + b);
// Function2[T1, T2, R]JavaPairDStream<String, Integer> wordCounts = wordCount.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer i1, Integer i2) {return i1 + i2;}});
JavaDStream<String> words = lines.flatMap(x -> Lists.newArrayList(x.split(" ")));
// FlatMapFunction[T, R]JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {// call(t: T): Iterator[R]public Iterable<String> call(String x) {return Lists.newArrayList(SPACE.split(x));}});
JavaDStream<Tuple2<String, String>> mapLines = lines.mapPartitions(parts -> {List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();while(parts.hasNext()){String msg = parts.next();String ip = msg.split(" ")[0];String domain = msg.split(" ")[1];list.add(new Tuple2<String, String>(ip, domain));};return list;});
// FlatMapFunction[T, R]JavaDStream<Tuple2<String, String>> mapLines = lines.mapPartitions(new FlatMapFunction<Iterator<String>, Tuple2<String, String>>() {List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();// call(t: T): Iterator[R]public Iterable<Tuple2<String, String>> call(Iterator<String> s){while(s.hasNext()){String msg = s.next();String ip = msg.split(" ")[0];String domain = msg.split(" ")[1];list.add(new Tuple2<String, String>(ip, domain));}return list;}});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2(s, 1));
// PairFunction[T, K, V]JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {// call(t: T): (K, V)public Tuple2<String, Integer> call(String s) {return new Tuple2<String, Integer>(s, 1);}});