package com.shark.spark;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.ConsumerStrategy;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;
import java.util.*;
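/**
 * Spark Streaming word count over a Kafka topic ("test"), built on the
 * spark-streaming-kafka-0-10 direct-stream API: each micro-batch pulls
 * records straight from the Kafka brokers (no receiver), splits the record
 * values into words, and prints the per-batch word counts.
 */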
public class KafkaStreamingWordCount {
public static void main(String[] args) {
// To run on Mesos (or another cluster manager), remove the setMaster call.
// For local testing, use more than one thread, e.g.:
// SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[*]");
SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount");
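// Micro-batch interval: one new RDD of Kafka records every 2 seconds.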
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(2000));
Collection<String> topics = Collections.singleton("test");
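// Consumer configuration. Only deserializers are needed on the consumer side
// (serializers are a producer setting), and bootstrap.servers supersedes the
// old Kafka 0.8 metadata.broker.list.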
Map<String, Object> kafkaParameters = new HashMap<>();
kafkaParameters.put("metadata.broker.list", "hadoop5:9092,hadoop6:9092,hadoop7:9092");
kafkaParameters.put("bootstrap.servers", "hadoop5:9092,hadoop6:9092,hadoop7:9092");
kafkaParameters.put("group.id", "1");
kafkaParameters.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaParameters.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParameters.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
ConsumerStrategy<String, String> consumerStrategy =
ConsumerStrategies.Subscribe(topics, kafkaParameters);
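// PreferConsistent distributes Kafka partitions evenly across the available executors.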
JavaInputDStream<ConsumerRecord<String, String>> lines = KafkaUtils.createDirectStream(
javaStreamingContext,
LocationStrategies.PreferConsistent(),
consumerStrategy);
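// Split each record's value on spaces, emitting one element per word.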
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<ConsumerRecord<String,String>, String>() {
@Override
public Iterator<String> call(ConsumerRecord<String, String> record) throws Exception {
return Arrays.asList(record.value().split(" ")).iterator();
}
});
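// Classic word count: map every word to (word, 1), then sum the counts per key
// within each batch.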
JavaPairDStream<String, Integer> wordCounts =
words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) {
return new Tuple2<String, Integer>(word, 1);
}
}).reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
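// Print the first ten (word, count) pairs of every batch to stdout.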
wordCounts.print();
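// Start the computation and block until it is stopped or fails.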
javaStreamingContext.start();
try {
javaStreamingContext.awaitTermination();
} catch (InterruptedException e) {
// Restore the interrupt flag rather than swallowing the interruption.
Thread.currentThread().interrupt();
}
}
}
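// A minimal usage sketch (jar name, Scala/Spark versions, and script paths
// are assumptions; adjust to your environment):
//
//   spark-submit --class com.shark.spark.KafkaStreamingWordCount \
//     --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0 \
//     kafka-streaming-wordcount.jar
//
//   kafka-console-producer.sh --broker-list hadoop5:9092 --topic test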