Kafka2.0生产者客户端使用
2019-08-16 09:39:25来源:博客园 阅读 ()
Kafka2.0生产者客户端使用
1 初始化配置
??Kafka 通过 KafkaProducer 构造器初始化生产者客户端的配置。
??常用的重要配置,详见官网。
- bootstrap.servers:Kafka 集群地址(host1:post,host2:post),Kafka 客户端初始化时会自动发现地址,所以可以不填写所有地址。
- key.serializer:实现了 Kafka 序列化接口的类,用来序列化 key。
- value.serializer:实现了 Kafka 序列化接口的类,用来序列化 value。
- acks:leader 接收到的 follower 确认的数量需要满足 acks 的配置。
?0:生产者把消息发送出去就认为发送完成了。
?1:leader 接收到消息后,不用等 follower 的确认,就表示发送完成了。
?all/-1:leader 接收到消息后,需要所有在 ISR 集合的 follower 确认后,才表示完成了。 - retries:消息发送失败后的重试次数。如果允许重试,而 max.in.flight.requests.per.connection>1,则可能导致消息乱序,因为如果把两批消息发送到同一个分区,第一批失败并重试,而第二批成功了,则第二批消息可能先生成了。
- retry.backoff.ms:消息重试发送的间隔。
- client.id:标识客户端的 id。
- compression.type:压缩类型。可选:none、gzip、snappy、lz4。
- buffer.memory:记录累加器可以使用的最大内存缓冲池大小。
- batch.size:内存缓冲池的缓冲列表大小。当 batch 的大小超过 batch.size 或者时间达到 linger.ms 就会发送 batch。
- transactional.id:事务 ID。
// 基础配置
Map<String, Object> configs = new HashMap<>();
// Kafka broker 集群
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
// key 序列化
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// value 序列化
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
2 构造消息
??Kafka 提供了6种构造器来构造消息。
- topic:消息主题,必填;
- partition:分区号,非必填。如果为空,会计算 key 的 hash 值,再和该主题的分区总数取余得到分区号;如果 key 也为空,客户端会生成递增的随机整数,再和该主题的分区总数区域得到分区号。
- timestamp:时间戳,非必填。如果为空,默认为 KafkaProducer 构造器初始化的时间。
- key:消息 key,非必填。关系到分区分配,broker 会对带 key 的消息进行日志压缩。
- value:消息内容,必填。
- headers:消息头,非必填。
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers);
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value);
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers);
public ProducerRecord(String topic, Integer partition, K key, V value);
public ProducerRecord(String topic, K key, V value);
public ProducerRecord(String topic, V value);
3 发送消息
??支持同步发送和异步发送消息。
??同步发送
producer.send(record).get();
??异步发送
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
// 回调处理流程
}
});
原文链接:https://www.cnblogs.com/bigshark/p/11182403.html
如有疑问请与原作者联系
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
- Spring Cloud Ribbon 客户端负载均衡 2020-06-04
- 多线程:生产者消费者(管程法、信号灯法) 2020-06-01
- 多线程问题整理 2020-05-06
- java socket实现服务端,客户端简单网络通信。Chat 2020-05-03
- 基于netty实现rpc框架-spring boot客户端 2020-04-23
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash