Kafka2.0生产者客户端源码分析 - Sender线程
2019-08-16 09:45:21来源:博客园 阅读 ()
Kafka2.0生产者客户端源码分析 - Sender线程
??Kafka 在初始化生产者客户端时,创建并启动 Sender 线程。通过 Sender 线程来发送消息、处理消息的响应。通过“volatile boolean running”状态控制 Sender 线程不断轮询,调用 NetworkClient 的 poll 方法。NetworkClient 是 Kafka 实现的用来和 broker 通信的类,实现了 KafkaClient 接口,底层实际上就是利用 JDK NIO 来实现的,而 Kafka 把 NIO 又封装成 Selector。
??Sender 的执行过程可以粗略地分为:发送准备、开始发送。
void run(long now) {
long pollTimeout = sendProducerData(now); // 发送准备
client.poll(pollTimeout, now); // 开始发送
}
发送准备
- 取出记录累加器中的记录,转换成节点->消息队列的映射 Map<Integer, List> batches
- 使用上述 batches 构造可以发送的请求,缓存到 InFlightRequests
- 获取 KafkaChannel,添加消息 NetworkSend,并注册写事件 OP_WRITE
private long sendProducerData(long now) {
// 把分区->消息队列的映射关系转换成节点->消息队列的映射关系
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
// 准备发送消息
sendProduceRequests(batches, now);
return pollTimeout;
}
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
produceRecordsByPartition, transactionalId);
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) { // 请求完成后的回调
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
// 构造请求对象
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, requestTimeoutMs, callback);
client.send(clientRequest, now);
}
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
String destination = clientRequest.destination();
RequestHeader header = clientRequest.makeHeader(request.version());
// 构造 Send 的实现类 NetworkSend
Send send = request.toSend(destination, header);
InFlightRequest inFlightRequest = new InFlightRequest(clientRequest, header, isInternalRequest, request, send, now);
// 加入 InFlightRequests
this.inFlightRequests.add(inFlightRequest);
// 将 NetworkSend 绑定到 KafkaChannel,并注册写操作
selector.send(send);
}
public void send(Send send) {
String connectionId = send.destination();
KafkaChannel channel = openOrClosingChannelOrFail(connectionId); // 获取 KafkaChannel 通道
channel.setSend(send);
}
public void setSend(Send send) {
this.send = send; // 绑定到当前 KafkaChannel
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); // 注册写操作
}
开始发送
- 调用 NIO.Selector.select() 方法阻塞轮询,当有事件时,返回准备就绪的 key 数量
- 根据事件类型(可读/可写)处理通道内的记录
- 把不同事件处理后的响应加入集合,回调准备阶段实现的请求完成处理器来处理响应
- 把处理完的响应再次回调 Trunk.onCompletion(),即发送消息时定义的异步回调
// 真正开始发送
public List<ClientResponse> poll(long timeout, long now) {
long metadataTimeout = metadataUpdater.maybeUpdate(now);
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); // 调用 kafka.Selector.poll()
// 处理响应
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
...
completeResponses(responses); // 回调处理响应
return responses;
}
// kafka.Selector
public void poll(long timeout) throws IOException {
// 执行 NIO.Selector 当有通道准备就绪时,返回 key 的数量
int numReadyKeys = select(timeout);
long endSelect = time.nanoseconds();
if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
// Poll from channels where the underlying socket has more data
pollSelectionKeys(readyKeys, false, endSelect);
}
// 把已经接收完成的加入 completedReceives 集合
addToCompletedReceives();
}
// 处理 SelectionKey 准备就绪的 IO
void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
KafkaChannel channel = channel(key);
try {
// 判断通道是否可读
if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel) && !explicitlyMutedChannels.contains(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null) { // 保证接收到了完整消息
madeReadProgressLastPoll = true;
addToStagedReceives(channel, networkReceive);
}
}
// 判断通道是否可写
if (channel.ready() && key.isWritable()) {
Send send = channel.write(); // 写到 SocketChannel
}
}
}
}
整体流程
原文链接:https://www.cnblogs.com/bigshark/p/11184070.html
如有疑问请与原作者联系
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
- Spring Cloud Ribbon 客户端负载均衡 2020-06-04
- 多线程:生产者消费者(管程法、信号灯法) 2020-06-01
- 多线程问题整理 2020-05-06
- java socket实现服务端,客户端简单网络通信。Chat 2020-05-03
- 基于netty实现rpc框架-spring boot客户端 2020-04-23
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash