Kafka2.0服务端启动源码
2019-08-16 10:36:28来源:博客园 阅读 ()
Kafka2.0服务端启动源码
??Kafka 服务端通过Kafka.scala
的主函数main
方法启动。KafkaServerStartable
类提供读取配置文件、启动/停止服务的方法。而启动/停止服务最终调用的是KafkaServer
的startup/shutdown
方法。
启动流程
- 启动 zk 客户端。
- 启动动态配置。
- 启动调度线程池。
- 启动日志管理器的后台线程,包括日志清理、日志刷盘、日志删除、日志压缩。
- 启动 NIO Socket 服务。
- 初始化一个接收器
Acceptor
,即启动 NIO Socket。 - 添加
num.network.threads
个接收器到请求通道RequestChannel
的处理器缓存ConcurrentHashMap
,key 为递增编号,value 为处理器Processor
。 Acceptor
执行CountDownLatch.await
等待通知启动。- 缓存
Acceptor
到ConcurrentHashMap
,key 为EndPoint
,value 为Acceptor
。
- 初始化一个接收器
- 启动副本管理器。
- 在 zk 注册 broker。
- 启动控制器。
- 启动组协调器。
- 启动事务协调器。
- 初始化
KafkaApis
。 - 初始化处理器线程缓存池。
- 启动
num.io.threads
个请求处理器线程KafkaRequestHandler
。 - 从阻塞队列
ArrayBlockingQueue
获取请求,调用KafkaApis.handle
方法,进行集中处理请求。
- 启动
- 启动处理器线程。
- 首先
CountDownLatch.countDown
通知唤醒Acceptor
线程。- 使用
NIO.select
轮询。 - 如果有可接收就绪的事件,则将当前的
SocketChannel
加入缓存队列ConcurrentLinkedQueue
- 使用
- 从上述缓存队列取出
SocketChannel
,绑定到KafkaChannel
。 - 将接收到的请求缓存到限长阻塞队列
ArrayBlockingQueue
- 首先
请求处理流程
详细源码分析
Acceptor 线程
def run() {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // 注册接收事件
startupComplete() // 通知 Acceptor 线程
var currentProcessor = 0
while (isRunning) {
val ready = nioSelector.select(500) // 轮询事件
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
val key = iter.next
iter.remove()
if (key.isAcceptable) { // 有可接受事件
val processor = synchronized {
currentProcessor = currentProcessor % processors.size
processors(currentProcessor) // 缓存 Processor
}
accept(key, processor) // 将 SocketChannel 缓存到队列
}
}
}
}
}
Processor 线程
override def run() {
startupComplete() // CountDownLatch.countDown 唤醒 Acceptor 线程。
while (isRunning) {
configureNewConnections() // 从缓存队列取出 SocketChannel,绑定到 KafkaChannel
processNewResponses() // 处理返回客户端的响应
poll() // Kafka.Selector 轮询读取/写入事件
processCompletedReceives() // 处理客户端的请求,放到阻塞队列
processCompletedSends() // 处理返回客户端响应后的回调
processDisconnected() // 断开连接后的处理
}
}
KafkaRequestHandler 线程阻塞队列
def run() {
while (!stopped) {
val startSelectTime = time.nanoseconds
// 从阻塞队列拉取请求
val req = requestChannel.receiveRequest(300)
req match {
case request: RequestChannel.Request =>
try {
apis.handle(request) // 调用`KafkaApis.handle`方法,进行集中处理请求。
}
}
}
}
KSelector
??参考客户端源码分析。
原文链接:https://www.cnblogs.com/bigshark/p/11204428.html
如有疑问请与原作者联系
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
- 聊聊微服务架构及分布式事务解决方案! 2020-06-10
- java环境教程:Tomcat下载,安装,设置为Windows服务,启动 2020-06-09
- SpringBoot通过web页面动态控制定时任务的启动、停止、创建 2020-06-09
- Spring Cloud微服务(一):公共模块的搭建 2020-06-07
- 如何在Spring Boot应用启动之后立刻执行一段逻辑?本文详解 2020-06-05
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash