Kafka2.0消费者协调器源码

2019-08-16 10:31:05来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

Kafka2.0消费者协调器源码

消费组和消费者

  1. 消费组和消费者是一对多的关系。
  2. 同一个消费组的消费者可以消费多个分区,且是独占的。
  3. 消费者的分区分配策略由接口PartitionAssignor定义,内置三种分配策略RangeAssignorRoundRobinAssignorStickyAssignor,支持自定义策略。
  4. 不同消费组可以消费相同的分区,互不干扰。

消费者协调器和组协调器

  1. 客户端的消费者协调器ConsumerCoordinator和服务端的组协调器GroupCoordinator通过心跳不断保持通信。
  2. 消费者进行消费之前,需要确保协调器是 ready 的。
    1. 选择具有最少请求的节点Node,即具有最少的InFlightRequests的节点。
    2. 向该节点发送获取协调器节点的请求,发送流程类似发送拉取请求。
    3. 向找到的协调器节点发送加入组请求,此时会禁止心跳线程。
    4. 加入组响应处理器JoinGroupResponseHandler对响应进行处理,响应包含generationIdmemberIdleaderIdprotocol
    5. 如果是 leader 消费者,即memberId=leaderId,则需要根据分配策略protocol计算分区分配。
    6. 将分区分配结果封装到同步组请求,再向协调器节点发送同步组请求。
    7. 同步组响应处理器SyncGroupResponseHandler对上述请求的响应进行处理。
    8. 如果第5步判断不是 follower 消费者,同样需要向协调器发送同步组请求,只是请求中不需要封装分区分配结果,而是从组协调器获取。
    9. 加入组成功后,启动心跳线程。
    10. 更新本地缓存的分区分配,此处会调用消费者再平衡监听器。

消费者状态

  • UNJOINED:消费者初始状态为UNJOINED,表示未加入消费组。
  • REBALANCING:消费者向协调器发送加入组请求之前,状态变更为REBALANCING,表示再平衡状态
  • STABLE:消费者监听到消息成功返回,状态变更为STABLE,表示稳定状态,如果是失败的消息,状态重置为UNJOINED

心跳线程

  1. 消费者加入消费组之后会启动心跳线程,并保持和组协调器的通信。
  2. 如果消费者状态不是STABLE,则不发送心跳。
  3. 如果组协调器未知,则等待一段时间重试。
  4. 如果心跳会话超时,则标记协调器节点未知。
  5. 如果心跳轮询超时,则发送离开组请求。
  6. 如果暂不需要发送心跳,则等待一段时间重试。
  7. 发送心跳,注册响应监听器,接收到响应后,设置接收时间,并进行下一轮的心跳。

偏移量

拉取偏移量

  1. 如果有指定的分区,消费者协调器从组协调器拉取一组分区和已提交偏移量的映射关系,缓存到SubscriptionState
  2. 设置偏移量重置策略:LATEST, EARLIEST,NONE
  3. 异步地更新消费的偏移量位置。

提交偏移量

  1. 消费者协调器获取当前的协调器节点。
  2. 向该节点发送提交偏移量请求,返回Future

加入组流程

加入组流程

消费者加入组流程的源码分析

boolean updateAssignmentMetadataIfNeeded(final long timeoutMs) {
    final long startMs = time.milliseconds();
    if (!coordinator.poll(timeoutMs)) { // 获取协调器
        return false;
    }
    // 更新偏移量
    return updateFetchPositions(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startMs));
}
// 获取协调器
public boolean poll(final long timeoutMs) {
    final long startTime = time.milliseconds();
    long currentTime = startTime;
    long elapsed = 0L;

    if (subscriptions.partitionsAutoAssigned()) { // 是自动分配主题类型
        // 更新心跳的上一次的轮询时间
        pollHeartbeat(currentTime);

        if (coordinatorUnknown()) { // 协调器未知
            // 确保协调器已经 ready
            if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
                return false;
            }
        }

        if (rejoinNeededOrPending()) { // 需要加入消费组
            // 加入组、同步组
            if (!ensureActiveGroup(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
                return false;
            }
            currentTime = time.milliseconds();
        }
    } else { // 指定分区类型
        if (metadata.updateRequested() && !client.hasReadyNodes(startTime)) {// 如果没有准备就绪的节点
            // 阻塞等待元数据更新
            final boolean metadataUpdated = client.awaitMetadataUpdate(remainingTimeAtLeastZero(timeoutMs, elapsed));
            if (!metadataUpdated && !client.hasReadyNodes(time.milliseconds())) {
                return false; // 更新元数据失败
            }

            currentTime = time.milliseconds();
        }
    }

    maybeAutoCommitOffsetsAsync(currentTime); // 异步自动提交偏移量
    return true;
}
// 确保协调器已经 ready
protected synchronized boolean ensureCoordinatorReady(final long timeoutMs) {
    final long startTimeMs = time.milliseconds();
    long elapsedTime = 0L;

    while (coordinatorUnknown()) { // 如果协调器未知
        final RequestFuture<Void> future = lookupCoordinator(); // 向当前请求队列最少的节点,发送获取协调器的请求
        client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
        if (!future.isDone()) {
            break; // 响应未完成,退出
        }
    }

    return !coordinatorUnknown();
}
// 加入组、同步组
boolean ensureActiveGroup(long timeoutMs, long startMs) {
    startHeartbeatThreadIfNeeded(); // 启动心跳线程
    return joinGroupIfNeeded(joinTimeoutMs, joinStartMs);
}
boolean joinGroupIfNeeded(final long timeoutMs, final long startTimeMs) {
    long elapsedTime = 0L;

    while (rejoinNeededOrPending()) {
        // 发送加入组请求
        final RequestFuture<ByteBuffer> future = initiateJoinGroup();
        client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
        if (!future.isDone()) {
            // we ran out of time
            return false;
        }

        if (future.succeeded()) { // 加入成功,回调处理响应,更新缓存的分区分配
            ByteBuffer memberAssignment = future.value().duplicate();
            onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
        }
    }
    return true;
}
// 发送加入组请求
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    if (joinFuture == null) {
        disableHeartbeatThread(); // 暂停心跳线程

        state = MemberState.REBALANCING; // 状态改为 REBALANCING
        joinFuture = sendJoinGroupRequest(); // 向协调器发送加入组请求
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() { // 响应监听器
            @Override
            public void onSuccess(ByteBuffer value) { // 成功
                synchronized (AbstractCoordinator.this) {
                    state = MemberState.STABLE; // 状态改为 STABLE
                    rejoinNeeded = false; // 不需要加入了

                    if (heartbeatThread != null)
                        heartbeatThread.enable(); // 启动暂停了的心跳
                }
            }

            @Override
            public void onFailure(RuntimeException e) { // 失败
                synchronized (AbstractCoordinator.this) {
                    state = MemberState.UNJOINED; // 状态改为 UNJOINED
                }
            }
        });
    }
    return joinFuture;
}
// 向协调器发送加入组请求
RequestFuture<ByteBuffer> sendJoinGroupRequest() {

    JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
            groupId,
            this.sessionTimeoutMs,
            this.generation.memberId,
            protocolType(),
            metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);

    int joinGroupTimeoutMs = Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000);
    return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
            .compose(new JoinGroupResponseHandler()); // 异步回调响应处理类
}
// 异步回调响应处理类
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
    @Override
    public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
        Errors error = joinResponse.error();
        if (error == Errors.NONE) {
            synchronized (AbstractCoordinator.this) {
                if (state != MemberState.REBALANCING) { // 如果是 REBALANCING,状态异常
                    future.raise(new UnjoinedGroupException());
                } else {
                    AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(), joinResponse.memberId(), joinResponse.groupProtocol());
                    if (joinResponse.isLeader()) { // 当前消费组是 leader
                        onJoinLeader(joinResponse).chain(future);
                    } else { // 当消费者是 follower
                        onJoinFollower().chain(future);
                    }
                }
            }
        }
    }
}
// 发送 leader 消费者同步组请求
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
    try {
        // 根据响应的分配策略,给消费者分配分区
        Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(), joinResponse.members());

        SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
        return sendSyncGroupRequest(requestBuilder);
    } catch (RuntimeException e) {
        return RequestFuture.failure(e);
    }
}
// 发送 follower 消费者同步组请求
private RequestFuture<ByteBuffer> onJoinFollower() {
    SyncGroupRequest.Builder requestBuilder =
            new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
                    Collections.<String, ByteBuffer>emptyMap()); // 发送不带分配信息的请求
    return sendSyncGroupRequest(requestBuilder);
}

原文链接:https://www.cnblogs.com/bigshark/p/11198481.html
如有疑问请与原作者联系

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:基于Spring Cloud Netflix的TCC柔性事务和EDA事件驱动示例

下一篇:Spring+SpringMVC+MyBatis集成(SSM)