一、Implementation Flow
1、Note
All data in Kafka exists as <key, value> records.
2、WordCount flow (the program in section 二 implements this chain step by step)
(1) Stream: read each record from the topic (in <key, value> form): <null, "Spark and spark">
(2) MapValues: lowercase all text in the value: <null, "spark and spark">
(3) FlatMapValues: split the value into words on spaces: <null, "spark">, <null, "and">, <null, "spark">
(4) SelectKey: make the value the new key: <"spark", "spark">, <"and", "and">, <"spark", "spark">
(5) GroupByKey: group records that share a key: (<"spark", "spark">, <"spark", "spark">), (<"and", "and">)
(6) Count: count the elements in each group: <"spark", 2>, <"and", 1>
(7) To: write the results back to Kafka
二、Code Implementation
1、pom dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <!-- aligned with the kafka-streams/kafka-clients version below -->
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.2</version>
</dependency>
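The demo assumes the input topic test_wordCount and the output topic test_out already exist. If they don't, one option (a sketch, not part of the original post; the class name CreateTopics is made up here) is the AdminClient that ships with kafka-clients:

package com.cn.kafkaStreams;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

public class CreateTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.230.21:6667,192.168.230.22:6667,192.168.230.23:6667");
        try (AdminClient admin = AdminClient.create(props)) {
            // One partition and one replica are enough for a demo; adjust to match the cluster
            admin.createTopics(Arrays.asList(
                    new NewTopic("test_wordCount", 1, (short) 1),
                    new NewTopic("test_out", 1, (short) 1)
            )).all().get(); // block until the topics are actually created
        }
    }
}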
2、Kafka Streams main program
package com.cn.kafkaStreams;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class KafkaStreamsMain {
    public static void main(String[] args) {
        // Configuration first
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.230.21:6667,192.168.230.22:6667,192.168.230.23:6667");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Build the source KStream from the input topic
        KStream<String, String> textLines = builder.stream("test_wordCount");
        // The aggregation result is a KTable
        KTable<String, Long> wordCounts = textLines
                // Lowercase all text in the value
                .mapValues(values -> values.toLowerCase())
                // Split each line into words on spaces.
                // flatMapValues takes a ValueMapper<? super V, ? extends Iterable<? extends VR>>,
                // i.e. the mapped value must be an Iterable, so the array from
                // split() is wrapped with Arrays.asList().
                .flatMapValues(values -> Arrays.asList(values.split(" ")))
                // Use the value (the word) as the new key
                .selectKey((key, word) -> word)
                // Group by key before the aggregation
                .groupByKey()
                // Count the elements in each group, materialized in a state store named "Counts"
                .count(Materialized.as("Counts"));
        // Convert the KTable back to a KStream and write it to the output topic;
        // the key is a String and the value is a Long
        wordCounts.toStream().to("test_out", Produced.with(Serdes.String(), Serdes.Long()));

        Topology topology = builder.build();
        // System.out.println(topology.describe()); // uncomment to inspect the topology
        KafkaStreams kafkaStreams = new KafkaStreams(topology, config);
        kafkaStreams.start();
    }
}
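The program above never stops the topology. A common idiom (not in the original post) is a JVM shutdown hook that closes the Streams instance cleanly; the tail of main would then look like this minimal sketch:

        KafkaStreams kafkaStreams = new KafkaStreams(topology, config);
        // Close the Streams instance, its consumers, and its state stores when the JVM exits
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
        kafkaStreams.start();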
3、Producing test data to Kafka
package com.cn.kafkaStreams;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class kafkaProducer {
    static String[] arr = {"Spark is spark", "hbase can save bigdata", "hive can select data"};
    static int p = -1;

    // Cycle through the sample sentences
    public static String getWord() {
        p = (p + 1) % arr.length;
        return arr[p];
    }

    public static void main(String[] args) {
        String topic = "test_wordCount";
        String brokers = "192.168.230.21:6667,192.168.230.22:6667,192.168.230.23:6667";
        // Producer configuration ("metadata.broker.list" belongs to the old Scala
        // producer and is not needed by the new KafkaProducer)
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // Send one sentence every two seconds
        while (true) {
            String event = getWord();
            System.out.println(event);
            producer.send(new ProducerRecord<String, String>(topic, event));
            try {
                Thread.sleep(2000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
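The send() above is fire-and-forget. If delivery confirmation is wanted, producer.send also accepts a callback that reports where the record landed, or the failure; a hedged variant of the send call (not in the original) would be:

            // Log the partition and offset on success, or the exception on failure
            producer.send(new ProducerRecord<String, String>(topic, event), (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("sent to partition %d, offset %d%n",
                            metadata.partition(), metadata.offset());
                }
            });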
4、Consuming the results written back to Kafka
package com.cn.kafkaStreams;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class kafkaConsumerMain {
    public static void main(String[] args) {
        // Kafka consumer configuration settings
        String topicName = "test_out";
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.230.21:6667,192.168.230.22:6667,192.168.230.23:6667");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        // The counts are Longs, so the consumer's value type must match the
        // LongDeserializer above (a <String, String> consumer would throw a
        // ClassCastException when reading the value)
        KafkaConsumer<String, Long> kafkaConsumer = new KafkaConsumer<String, Long>(props);
        // Subscribe to the output topic
        kafkaConsumer.subscribe(Arrays.asList(topicName));
        while (true) {
            ConsumerRecords<String, Long> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, Long> record : records) {
                // Print the offset, key, and value of each record
                System.out.printf("offset = %d, key = %s, value = %s\n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
三、Console Output
1、kafkaProducer
...
Spark is spark
hbase can save bigdata
hive can select data
Spark is spark
hbase can save bigdata
hive can select data
...
2、kafkaConsumerMain
...
offset = 32, key = spark, value = 45
offset = 33, key = hbase, value = 40
offset = 34, key = save, value = 82
offset = 35, key = bigdata, value = 40
offset = 36, key = hive, value = 37
offset = 37, key = can, value = 163
offset = 38, key = select, value = 65
offset = 39, key = data, value = 123
offset = 40, key = is, value = 48
offset = 41, key = spark, value = 55
offset = 42, key = hbase, value = 45
offset = 43, key = save, value = 87
offset = 44, key = bigdata, value = 45
offset = 45, key = hive, value = 42
offset = 46, key = can, value = 173
offset = 47, key = select, value = 70
offset = 48, key = data, value = 128
...
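A note on the output above: the same key keeps reappearing with a growing count because count() produces a changelog stream; each commit (or cache flush) forwards the latest total per key downstream. How often those updates are emitted can be tuned in the Streams config from section 二, for example (values here are illustrative, not from the original):

        // Larger cache and longer commit interval mean fewer, more batched updates;
        // assumes the `config` Properties object from the main program above
        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10_000);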