spring boot 使用kafka
2019-08-16 09:26:31来源:博客园 阅读 ()
spring boot 使用kafka
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
spring-kafka version会使用spring boot 对应版本
启用spring-kafka
在spring boot 配置类上添加
@EnableKafka
配置kafka
在application.properties中添加配置
生产者
#Kafka producer
spring.kafka.producer.bootstrap-servers=168.61.2.47:9092,168.61.2.48:9092,168.61.2.49:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288
消费者者
#Kafka consumer
spring.kafka.consumer.group-id=thfx00
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.bootstrap-servers=168.61.2.47:9092,168.61.2.48:9092,168.61.2.49:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
项目中使用
package com.htsc.thfx.kafka;
import com.alibaba.fastjson.JSONObject;
import com.google.protobuf.InvalidProtocolBufferException;
import com.htsc.mdc.model.MDSecurityRecordProtos;
import com.htsc.mdc.model.MDStockRecordProtos;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class kafkaTest {
// 泛型与生产者key,value配置对应
@Resource
KafkaTemplate<String,byte[]> template;
@Scheduled(fixedRate = 1000*10)
public void send() {
template.send("thfx-test00", "2", "你好00".getBytes());
template.send("thfx-test01", "2","你好01".getBytes());
}
// 泛型与消费者key,value配置对应
@KafkaListener(topics = {"thfx-test00","thfx-test01"})
public void consumerRecord00(ConsumerRecord<String, byte[]> record) {
String s = new String(record.value());
System.out.println(s);
}
@KafkaListener(topics = {"PT-MDC-XSHG-INDEXTYPE"})
public void consumerRecord01(ConsumerRecord<String, byte[]> record) throws InvalidProtocolBufferException {
MDSecurityRecordProtos.MDSecurityRecord mdSecurityRecord = MDSecurityRecordProtos.MDSecurityRecord.parseFrom(record.value());
String s = mdSecurityRecord.toString();
System.out.println("PT-MDC-XSHG-INDEXTYPE : " +s);
}
}
原文链接:https://www.cnblogs.com/ronniery/p/11103457.html
如有疑问请与原作者联系
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
上一篇:Mybatis架构与原理
下一篇:Tomcat服务器
- Spring系列.ApplicationContext接口 2020-06-11
- springboot2配置JavaMelody与springMVC配置JavaMelody 2020-06-11
- 给你一份超详细 Spring Boot 知识清单 2020-06-11
- SpringBoot 2.3 整合最新版 ShardingJdbc + Druid + MyBatis 2020-06-11
- 掌握SpringBoot-2.3的容器探针:实战篇 2020-06-11
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash