热点新闻
Flink 源码之 KafkaSource
2023-07-08 05:54  浏览:1182  搜索引擎搜索“手机财发网”
温馨提示:信息一旦丢失不一定找得到,请务必收藏信息以备急用!本站所有信息均是注册会员发布如遇到侵权请联系文章中的联系方式或客服删除!
联系我时,请说明是在手机财发网看到的信息,谢谢。
展会发布 展会网站大全 报名观展合作 软文发布

Flink源码分析系列文档目录

请点击:Flink 源码分析系列文档目录

前言

FLIP-27: Refactor Source Interface - Apache Flink - Apache Software Foundation提出了新的Source架构。该新架构的分析请参见Flink 源码之新 Source 架构。针对这个新架构,Flink社区新推出了新的Kafka connector - KafkaSource。老版本的实现FlinkKafkaConsumer目前被标记为Deprecated,不再推荐使用。本篇展开KafkaSource的源代码分析。

本篇包含4个部分的源代码分析:

  • KafkaSource创建
  • 数据读取
  • 分区发现
  • checkpoint

KafkaSource创建

如官网所示,编写Flink消费Kafka场景应用,我们可以按照如下方式创建KafkaSource:

KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueonlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

env.fromSource生成了一个DataStreamSourceDataStreamSource对应了SourceTransformation,然后经过SourceTransformationTranslator翻译成StreamGraphSource节点,执行的时候对应的是SourceOperatorSourceOperator是新Source API对应的Operator。它直接和SourceReader交互。调用sourceReader.pollNext方法拉取数据。这一连串逻辑与Kafka关系不大,不再展开介绍,了解即可。

最终,KafkaSourceBuilder按照我们配置的参数,返回符合要求的kafkaSource对象。

public KafkaSource<OUT> build() { sanityCheck(); parseAndSetRequiredProperties(); return new KafkaSource<>( subscriber, startingOffsetsInitializer, stoppingOffsetsInitializer, boundedness, deserializationSchema, props); }

KafkaSourcecreateReader方法生成KafkaSourceReader。代码如下:

@Internal @Override public SourceReader<OUT, KafkaPartitionSplit> createReader(SourceReaderContext readerContext) throws Exception { return createReader(readerContext, (ignore) -> {}); } @VisibleForTesting SourceReader<OUT, KafkaPartitionSplit> createReader( SourceReaderContext readerContext, Consumer<Collection<String>> splitFinishedHook) throws Exception { // elementQueue用来存放从fetcher获取到的ConsumerRecord // reader从elementQueue读取缓存的Kafka消息 FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue = new FutureCompletingBlockingQueue<>(); // 初始化deserializationSchema deserializationSchema.open( new DeserializationSchema.InitializationContext() { @Override public MetricGroup getMetricGroup() { return readerContext.metricGroup().addGroup("deserializer"); } @Override public UserCodeClassLoader getUserCodeClassLoader() { return readerContext.getUserCodeClassLoader(); } }); // 创建Kafka数据源监控 final KafkaSourceReaderMetrics kafkaSourceReaderMetrics = new KafkaSourceReaderMetrics(readerContext.metricGroup()); // 创建一个工厂方法,用于创建KafkaPartitionSplitReader。它按照分区读取Kafka消息 Supplier<KafkaPartitionSplitReader> splitReaderSupplier = () -> new KafkaPartitionSplitReader(props, readerContext, kafkaSourceReaderMetrics); KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema); return new KafkaSourceReader<>( elementsQueue, new KafkaSourceFetcherManager( elementsQueue, splitReaderSupplier::get, splitFinishedHook), recordEmitter, toConfiguration(props), readerContext, kafkaSourceReaderMetrics); }

数据读取流程

KafkaSourceFetcherManager继承了SingleThreadFetcherManager。当发现数据分片的时候,它会获取已有的SplitFetcher,将split指派给它。如果没有正在运行的fetcher,创建一个新的。

@Override // 发现新的分片的时候调用这个方法 // 将分片指派给fetcher public void addSplits(List<SplitT> splitsToAdd) { SplitFetcher<E, SplitT> fetcher = getRunningFetcher(); if (fetcher == null) { fetcher = createSplitFetcher(); // Add the splits to the fetchers. fetcher.addSplits(splitsToAdd); startFetcher(fetcher); } else { fetcher.addSplits(splitsToAdd); } }

然后我们分析fetcher如何拉取数据的。上面的startFetcher方法启动SplitFetcher线程。

protected void startFetcher(SplitFetcher<E, SplitT> fetcher) { executors.submit(fetcher); }

SplitFetcher用于执行从外部系统拉取数据的任务,它一直循环运行SplitFetchTaskSplitFetchTask有多个子类:

  • AddSplitTask: 为reader指派split的任务
  • PauseOrResumeSplitsTask: 暂停或恢复Split读取的任务
  • FetchTask: 拉取数据到elemeQueue中

接下来分析SplitFetcherrun方法:

@Override public void run() { LOG.info("Starting split fetcher {}", id); try { // 循环执行runOnce方法 while (runonce()) { // nothing to do, everything is inside #runOnce. } } catch (Throwable t) { errorHandler.accept(t); } finally { try { splitReader.close(); } catch (Exception e) { errorHandler.accept(e); } finally { LOG.info("Split fetcher {} exited.", id); // This executes after possible errorHandler.accept(t). If these operations bear // a happens-before relation, then we can checking side effect of // errorHandler.accept(t) // to know whether it happened after observing side effect of shutdownHook.run(). shutdownHook.run(); } } } boolean runonce() { // first blocking call = get next task. blocks only if there are no active splits and queued // tasks. SplitFetcherTask task; lock.lock(); try { if (closed) { return false; } // 重要逻辑在此 // 这里从taskQueue中获取一个任务 // 如果队列中有积压的任务,优先运行之 // 如果taskQueue为空,检查是否有已分配的split,如果有的话返回一个FetchTask // FetchTask在SplitFetcher构造e时候被创建出来 task = getNextTaskUnsafe(); if (task == null) { // (spurious) wakeup, so just repeat return true; } LOG.debug("Prepare to run {}", task); // store task for #wakeUp this.runningTask = task; } finally { lock.unlock(); } // execute the task outside of lock, so that it can be woken up boolean taskFinished; try { // 执行task的run方法 taskFinished = task.run(); } catch (Exception e) { throw new RuntimeException( String.format( "SplitFetcher thread %d received unexpected exception while polling the records", id), e); } // re-acquire lock as all post-processing steps, need it lock.lock(); try { this.runningTask = null; processTaskResultUnsafe(task, taskFinished); } finally { lock.unlock(); } return true; }

用来拉取数据的SplitFetchTask子类为FetchTask。它的run方法代码如下所示:

@Override public boolean run() throws IOException { try { // 在wakeup状态会跳过这一轮执行 if (!isWakenUp() && lastRecords == null) { // 调用splitReader从split拉取一批数据 lastRecords = splitReader.fetch(); } if (!isWakenUp()) { // The order matters here. We must first put the last records into the queue. // This ensures the handling of the fetched records is atomic to wakeup. // 将读取到的数据放入到elementQueue中 if (elementsQueue.put(fetcherIndex, lastRecords)) { //如果有已经读取完的split if (!lastRecords.finishedSplits().isEmpty()) { // The callback does not throw InterruptedException. // 调用读取完成回调函数 splitFinishedCallback.accept(lastRecords.finishedSplits()); } lastRecords = null; } } } catch (InterruptedException e) { // this should only happen on shutdown throw new IOException("Source fetch execution was interrupted", e); } finally { // clean up the potential wakeup effect. It is possible that the fetcher is waken up // after the clean up. In that case, either the wakeup flag will be set or the // running thread will be interrupted. The next invocation of run() will see that and // just skip. if (isWakenUp()) { wakeup = false; } } // The return value of fetch task does not matter. return true; }

上面代码片段中splitReader.fetch()对应的是KafkaPartitionSplitReaderfetch方法。

@Override public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOException { ConsumerRecords<byte[], byte[]> consumerRecords; try { // 调用KafkaConsumer拉取一批消息,超时时间为10s consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT)); } catch (WakeupException | IllegalStateException e) { // IllegalStateException will be thrown if the consumer is not assigned any partitions. // This happens if all assigned partitions are invalid or empty (starting offset >= // stopping offset). We just mark empty partitions as finished and return an empty // record container, and this consumer will be closed by SplitFetcherManager. // 如注释所说,如果consumer没有指定消费的partition,会抛出IllegalStateException // 所有已分配的partition无效或者是为空(起始offset >= 停止offset)的时候也会出现这种情况 // 返回空的KafkaPartitionSplitRecords,并且标记分区已完成 // 这个consumer稍后会被SplitFetcherManager关闭 KafkaPartitionSplitRecords recordsBySplits = new KafkaPartitionSplitRecords( ConsumerRecords.empty(), kafkaSourceReaderMetrics); markEmptySplitsAsFinished(recordsBySplits); return recordsBySplits; } // 将consumerRecords包装在KafkaPartitionSplitRecords中返回 // KafkaPartitionSplitRecords具有pattition和record两个iterator KafkaPartitionSplitRecords recordsBySplits = new KafkaPartitionSplitRecords(consumerRecords, kafkaSourceReaderMetrics); List<TopicPartition> finishedPartitions = new ArrayList<>(); // 遍历consumerRecords中的partition for (TopicPartition tp : consumerRecords.partitions()) { // 获取分区停止offset long stoppingOffset = getStoppingOffset(tp); // 读取这个partition的所有数据 final List<ConsumerRecord<byte[], byte[]>> recordsFromPartition = consumerRecords.records(tp); // 如果读取到数据 if (recordsFromPartition.size() > 0) { // 获取该分区最后一条读取到的数据 final ConsumerRecord<byte[], byte[]> lastRecord = recordsFromPartition.get(recordsFromPartition.size() - 1); // After processing a record with offset of "stoppingOffset - 1", the split reader // should not continue fetching because the record with stoppingOffset may not // exist. Keep polling will just block forever. // 如果最后一条消息的offset大于等于stoppingOffset // stopping使用consumer的endOffsets方法获取, // 设置recordsBySplits的结束offset // 然后标记这个split为已完成 if (lastRecord.offset() >= stoppingOffset - 1) { recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset); finishSplitAtRecord( tp, stoppingOffset, lastRecord.offset(), finishedPartitions, recordsBySplits); } } // Track this partition's record lag if it never appears before // 添加kafka记录延迟监控 kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp); } // 将空的split标记为已完成的split markEmptySplitsAsFinished(recordsBySplits); // Unassign the partitions that has finished. // 不再记录已完成分区记录的延迟 // 取消分配这些分区 if (!finishedPartitions.isEmpty()) { finishedPartitions.forEach(kafkaSourceReaderMetrics::removeRecordsLagMetric); unassignPartitions(finishedPartitions); } // Update numBytesIn // 更新已读取的字节数监控数值 kafkaSourceReaderMetrics.updateNumBytesInCounter(); return recordsBySplits; }

到这里为止,我们分析完了从KafkaConsumer读取消息到并放置到ElementQueue的逻辑。接下来是Flink内部将ElementQueue中的数据读取出来并发送到下游的过程。

SourceReaderbase将数据从elementQueue中读出然后交给recordEmitter

SourceReaderbasegetNextFetch方法内容如下:

@Nullable private RecordsWithSplitIds<E> getNextFetch(final ReaderOutput<T> output) { splitFetcherManager.checkErrors(); LOG.trace("Getting next source data batch from queue"); // 从elementQueue中拿出一批数据 final RecordsWithSplitIds<E> recordsWithSplitId = elementsQueue.poll(); // 如果当前split没有读取到数据,并且没有下一个split,返回null if (recordsWithSplitId == null || !moveTonextSplit(recordsWithSplitId, output)) { // No element available, set to available later if needed. return null; } currentFetch = recordsWithSplitId; return recordsWithSplitId; }

getNextFetch这个方法在pollNext中调用。SourceOperator调用reader的pollNext方法,不断拉取数据发送交给recordEmitter发送给下游。

@Override public InputStatus pollNext(ReaderOutput<T> output) throws Exception { // make sure we have a fetch we are working on, or move to the next RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch; if (recordsWithSplitId == null) { recordsWithSplitId = getNextFetch(output); if (recordsWithSplitId == null) { return trace(finishedOrAvailableLater()); } } // we need to loop here, because we may have to go across splits while (true) { // Process one record. final E record = recordsWithSplitId.nextRecordFromSplit(); if (record != null) { // emit the record. numRecordsInCounter.inc(1); recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state); LOG.trace("Emitted record: {}", record); // We always emit MORE_AVAILABLE here, even though we do not strictly know whether // more is available. If nothing more is available, the next invocation will find // this out and return the correct status. // That means we emit the occasional 'false positive' for availability, but this // saves us doing checks for every record. Ultimately, this is cheaper. return trace(InputStatus.MORE_AVAILABLE); } else if (!moveTonextSplit(recordsWithSplitId, output)) { // The fetch is done and we just discovered that and have not emitted anything, yet. // We need to move to the next fetch. As a shortcut, we call pollNext() here again, // rather than emitting nothing and waiting for the caller to call us again. return pollNext(output); } } }

最后我们一路分析到KafkaRecordEmitteremitRecord方法。它把接收到的kafka消息逐条反序列化之后,发送给下游output。接着传递给下游算子。

@Override public void emitRecord( ConsumerRecord<byte[], byte[]> consumerRecord, SourceOutput<T> output, KafkaPartitionSplitState splitState) throws Exception { try { sourceOutputWrapper.setSourceOutput(output); sourceOutputWrapper.setTimestamp(consumerRecord.timestamp()); deserializationSchema.deserialize(consumerRecord, sourceOutputWrapper); splitState.setCurrentOffset(consumerRecord.offset() + 1); } catch (Exception e) { throw new IOException("Failed to deserialize consumer record due to", e); } }

分区发现

Flink KafkaSource支持按照配置的规则(topic列表,topic正则表达式或者直接指定分区),以定时任务的形式周期扫描Kafka分区,从而实现Kafka分区动态发现。

KafkaSourceEnumeratorstart方法创建出KafkaAdminClient。然后根据partitionDiscoveryIntervalMs(分区自动发现间隔时间),确定是否周期调用分区发现逻辑。

@Override public void start() { // 创建Kafka admin client adminClient = getKafkaAdminClient(); // 如果配置了分区自动发现时间间隔 if (partitionDiscoveryIntervalMs > 0) { LOG.info( "Starting the KafkaSourceEnumerator for consumer group {} " + "with partition discovery interval of {} ms.", consumerGroupId, partitionDiscoveryIntervalMs); // 周期调用getSubscribedTopicPartitions和checkPartitionChanges两个方法 context.callAsync( this::getSubscribedTopicPartitions, this::checkPartitionChanges, 0, partitionDiscoveryIntervalMs); } else { // 否则只在启动的时候调用一次 LOG.info( "Starting the KafkaSourceEnumerator for consumer group {} " + "without periodic partition discovery.", consumerGroupId); context.callAsync(this::getSubscribedTopicPartitions, this::checkPartitionChanges); } }

getSubscribedTopicPartitions方法:

private Set<TopicPartition> getSubscribedTopicPartitions() { return subscriber.getSubscribedTopicPartitions(adminClient); }

这个方法调用KafkaSubscriber,根据配置的条件,获取订阅的partition。

KafkaSubscriber具有3个子类,分别对应不同的分区发现规则:

  • PartitionSetSubscriber: 通过KafkaSourceBuildersetPartitions方法创建,直接根据partition名称订阅内容。
  • TopicListSubscriber: 根据topic列表获取订阅的partition。使用KafkaSourceBuildersetTopics可以订阅一系列的topic,使用的subscriber就是这个。
  • TopicPatternSubscriber: 使用正则表达式匹配topic名称的方式获取订阅的partition。使用KafkaSourceBuildersetTopicPattern方法的时候会创建此subscriber。

接下来以TopicListSubscriber为例,分析获取订阅partiton的逻辑。

@Override public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient adminClient) { LOG.debug("Fetching descriptions for topics: {}", topics); // 使用admin client读取Kafka topic的元数据 // 包含指定topic对应的分区信息 final Map<String, TopicDescription> topicmetadata = getTopicmetadata(adminClient, new HashSet<>(topics)); // 将各个分区的partition信息加入到subscribedPartitions集合,然后返回 Set<TopicPartition> subscribedPartitions = new HashSet<>(); for (TopicDescription topic : topicmetadata.values()) { for (TopicPartitionInfo partition : topic.partitions()) { subscribedPartitions.add(new TopicPartition(topic.name(), partition.partition())); } } return subscribedPartitions; }

获取订阅分区的逻辑不是特别复杂,其他两个subscriber的逻辑这里不再分析。

getSubscribedTopicPartitions方法的返回值和异常(如果抛出异常的话)将会传递给checkPartitionChange方法。它将检测分区信息是否发生变更。代码逻辑如下:

private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions, Throwable t) { if (t != null) { throw new FlinkRuntimeException( "Failed to list subscribed topic partitions due to ", t); } // 检测分区变更情况 final PartitionChange partitionChange = getPartitionChange(fetchedPartitions); // 如果没有变更,直接返回 if (partitionChange.isEmpty()) { return; } // 如果检测到变更,调用initializePartitionSplits和handlePartitionSplitChanges方法 context.callAsync( () -> initializePartitionSplits(partitionChange), this::handlePartitionSplitChanges); } @VisibleForTesting PartitionChange getPartitionChange(Set<TopicPartition> fetchedPartitions) { // 保存被移除的分区 final Set<TopicPartition> removedPartitions = new HashSet<>(); Consumer<TopicPartition> dedupOrMarkAsRemoved = (tp) -> { if (!fetchedPartitions.remove(tp)) { removedPartitions.add(tp); } }; // 如果分区在assignedPartitions(已分配分区)存在,在fetchedPartitions中不存在,说明分区已经移出 // 将其加入到removedPartitions中 assignedPartitions.forEach(dedupOrMarkAsRemoved); // pendingPartitionSplitAssignment为上轮发现但是还没有分配给reader读取的分区 // 从pendingPartitionSplitAssignment中找到被移除的分区 pendingPartitionSplitAssignment.forEach( (reader, splits) -> splits.forEach( split -> dedupOrMarkAsRemoved.accept(split.getTopicPartition()))); // 如果fetchedPartitions还有分区没有remove掉,说明有新发现的分区 if (!fetchedPartitions.isEmpty()) { LOG.info("Discovered new partitions: {}", fetchedPartitions); } if (!removedPartitions.isEmpty()) { LOG.info("Discovered removed partitions: {}", removedPartitions); } // 包装新增加的分区和移除的分区到PartitionChange中返回 return new PartitionChange(fetchedPartitions, removedPartitions); }

对比完新发现的分区和原本订阅的分区之后,接下来需要对这些变更做出响应。

initializePartitionSplits方法将分区变更信息包装为PartitionSplitChange。这个对象记录了新增加的分区和移除的分区。和PartitionChange不同的是,PartitionSplitChange包含的新增分区的类型为KafkaPartitionSplit,它额外保存了分区的起始和终止offset。

private PartitionSplitChange initializePartitionSplits(PartitionChange partitionChange) { // 获取新增的分区 Set<TopicPartition> newPartitions = Collections.unmodifiableSet(partitionChange.getNewPartitions()); // 获取分区offset获取器 OffsetsInitializer.PartitionOffsetsRetriever offsetsRetriever = getOffsetsRetriever(); // 获取起始offset Map<TopicPartition, Long> startingOffsets = startingOffsetInitializer.getPartitionOffsets(newPartitions, offsetsRetriever); // 获取终止offset Map<TopicPartition, Long> stoppingOffsets = stoppingOffsetInitializer.getPartitionOffsets(newPartitions, offsetsRetriever); Set<KafkaPartitionSplit> partitionSplits = new HashSet<>(newPartitions.size()); // 将每个分区对应的starting offset和stopping offset包装起来 for (TopicPartition tp : newPartitions) { Long startingOffset = startingOffsets.get(tp); long stoppingOffset = stoppingOffsets.getOrDefault(tp, KafkaPartitionSplit.NO_STOPPING_OFFSET); partitionSplits.add(new KafkaPartitionSplit(tp, startingOffset, stoppingOffset)); } // 返回结果 return new PartitionSplitChange(partitionSplits, partitionChange.getRemovedPartitions()); }

上面的方法的关键逻辑是获取各个分区的起始offset(startingOffsetInitializer)和终止offset(KafkaSourceBuilder)。

startingOffsetInitializerKafkaSourceBuilder中创建,默认为OffsetsInitializer.earliest()。代码如下:

static OffsetsInitializer earliest() { return new ReaderHandledOffsetsInitializer( KafkaPartitionSplit.EARLIEST_OFFSET, OffsetResetStrategy.EARLIEST); }

它创建出ReaderHandledOffsetsInitializer对象,含义是对于所有新发现的topic,从它们最开头的地方开始读取。

ReaderHandledOffsetsInitializergetPartitionOffsets方法代码内容如下。它将所有的分区offset设置为startingOffset,结合前面的场景,即KafkaPartitionSplit.EARLIEST_OFFSET

@Override public Map<TopicPartition, Long> getPartitionOffsets( Collection<TopicPartition> partitions, PartitionOffsetsRetriever partitionOffsetsRetriever) { Map<TopicPartition, Long> initialOffsets = new HashMap<>(); for (TopicPartition tp : partitions) { initialOffsets.put(tp, startingOffset); } return initialOffsets; }

对于stoppingOffsetInitializerKafkaSourceBuilder创建的默认为NoStoppingOffsetsInitializer。含义为没有终止offset,针对unbounded(无界)kafka数据流。它的代码很少,这里就不再分析了。

我们回到应对分区变更的方法handlePartitionSplitChanges。这个方法将新发现的分区分配给pending和已注册的reader。

private void handlePartitionSplitChanges( PartitionSplitChange partitionSplitChange, Throwable t) { if (t != null) { throw new FlinkRuntimeException("Failed to initialize partition splits due to ", t); } if (partitionDiscoveryIntervalMs < 0) { LOG.debug("Partition discovery is disabled."); noMoreNewPartitionSplits = true; } // TODO: Handle removed partitions. addPartitionSplitChangeToPendingAssignments(partitionSplitChange.newPartitionSplits); assignPendingPartitionSplits(context.registeredReaders().keySet()); }

addPartitionSplitChangeToPendingAssignments将分区加入到待读取(pending)集合中。

private void addPartitionSplitChangeToPendingAssignments( Collection<KafkaPartitionSplit> newPartitionSplits) { int numReaders = context.currentParallelism(); for (KafkaPartitionSplit split : newPartitionSplits) { // 将这些分区均匀分配给所有的reader int ownerReader = getSplitOwner(split.getTopicPartition(), numReaders); pendingPartitionSplitAssignment .computeIfAbsent(ownerReader, r -> new HashSet<>()) .add(split); } LOG.debug( "Assigned {} to {} readers of consumer group {}.", newPartitionSplits, numReaders, consumerGroupId); }

assignPendingPartitionSplits方法分配分区给reader。它的逻辑分析如下:

private void assignPendingPartitionSplits(Set<Integer> pendingReaders) { Map<Integer, List<KafkaPartitionSplit>> incrementalAssignment = new HashMap<>(); // Check if there's any pending splits for given readers for (int pendingReader : pendingReaders) { // 检查reader是否已在SourceCoordinator中注册 checkReaderRegistered(pendingReader); // Remove pending assignment for the reader // 获取这个reader对应的所有分配给它的分区,然后从pendingPartitionSplitAssignment中移除 final Set<KafkaPartitionSplit> pendingAssignmentForReader = pendingPartitionSplitAssignment.remove(pendingReader); // 如果有分配给这个reader的分区,将他们加入到incrementalAssignment中 if (pendingAssignmentForReader != null && !pendingAssignmentForReader.isEmpty()) { // Put pending assignment into incremental assignment incrementalAssignment .computeIfAbsent(pendingReader, (ignored) -> new ArrayList<>()) .addAll(pendingAssignmentForReader); // Mark pending partitions as already assigned // 标记这些分区为已分配 pendingAssignmentForReader.forEach( split -> assignedPartitions.add(split.getTopicPartition())); } } // Assign pending splits to readers // 将这些分区分配给reader if (!incrementalAssignment.isEmpty()) { LOG.info("Assigning splits to readers {}", incrementalAssignment); context.assignSplits(new SplitsAssignment<>(incrementalAssignment)); } // If periodically partition discovery is disabled and the initializing discovery has done, // signal NoMoreSplitsEvent to pending readers // 如果没有新的分片(分区发现被关闭),并且设置为有界模式 // 给reader发送没有更多分片信号(signalNoMoreSplits) if (noMoreNewPartitionSplits && boundedness == Boundedness.BOUNDED) { LOG.debug( "No more KafkaPartitionSplits to assign. Sending NoMoreSplitsEvent to reader {}" + " in consumer group {}.", pendingReaders, consumerGroupId); pendingReaders.forEach(context::signalNoMoreSplits); } }

调用assignPendingPartitionSplits方法的地方有三处:

  • addSplitsBack: 某个reader执行失败,在上次成功checkpoint之后分配给这个reader的split需要再添加回SplitEnumerator中。
  • addReader: 增加新的reader。需要给新的reader分配split。
  • handlePartitionSplitChanges: 上面介绍的检测到分区变更的时候,需要为reader分配新发现的分区。

接着我们关心的问题是这些分片是如何添加给SplitEnumerator的。我们展开分析context.assignSplits调用。这里的context实现类为SourceCoordinatorContext。继续分析SourceCoordinatorContext::assignSplits方法代码:

@Override public void assignSplits(SplitsAssignment<SplitT> assignment) { // Ensure the split assignment is done by the coordinator executor. // 在SourceCoordinator线程中调用 callInCoordinatorThread( () -> { // Ensure all the subtasks in the assignment have registered. // 逐个检查需要分配的split所属的reader是否已注册过 assignment .assignment() .forEach( (id, splits) -> { if (!registeredReaders.containsKey(id)) { throw new IllegalArgumentException( String.format( "Cannot assign splits %s to subtask %d because the subtask is not registered.", splits, id)); } }); // 记录已分配的assignment(加入到尚未checkpoint的assignment集合中) assignmentTracker.recordSplitAssignment(assignment); // 分配split assignSplitsToAttempts(assignment); return null; }, String.format("Failed to assign splits %s due to ", assignment)); }

assignSplitsToAttempts有好几个重载方法。一路跟随到最后,它创建出了AddSplitEvent对象,通过OperatorCoordinator发送这个事件给SourceOperator。代码如下所示:

private void assignSplitsToAttempts(SplitsAssignment<SplitT> assignment) { assignment.assignment().forEach((index, splits) -> assignSplitsToAttempts(index, splits)); } private void assignSplitsToAttempts(int subtaskIndex, List<SplitT> splits) { getRegisteredAttempts(subtaskIndex) .forEach(attempt -> assignSplitsToAttempt(subtaskIndex, attempt, splits)); } private void assignSplitsToAttempt(int subtaskIndex, int attemptNumber, List<SplitT> splits) { if (splits.isEmpty()) { return; } checkAttemptReaderReady(subtaskIndex, attemptNumber); final AddSplitEvent<SplitT> addSplitEvent; try { // 创建AddSplitEvent(添加split事件) addSplitEvent = new AddSplitEvent<>(splits, splitSerializer); } catch (IOException e) { throw new FlinkRuntimeException("Failed to serialize splits.", e); } final OperatorCoordinator.SubtaskGateway gateway = subtaskGateways.getGatewayAndCheckReady(subtaskIndex, attemptNumber); // 将事件发送给subtaskIndex对应的SourceOperator gateway.sendEvent(addSplitEvent); }

gateway.sendEvent() -> SourceOperator::handleOperatorEvent

网络通信之间的过程这里不再分析了。我们查看SourceOperator接收event的方法handleOperatorEvent,内容如下:

public void handleOperatorEvent(OperatorEvent event) { if (event instanceof WatermarkAlignmentEvent) { updateMaxDesiredWatermark((WatermarkAlignmentEvent) event); checkWatermarkAlignment(); checkSplitWatermarkAlignment(); } else if (event instanceof AddSplitEvent) { handleAddSplitsEvent(((AddSplitEvent<SplitT>) event)); } else if (event instanceof SourceEventWrapper) { sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent()); } else if (event instanceof NoMoreSplitsEvent) { sourceReader.notifyNoMoreSplits(); } else { throw new IllegalStateException("Received unexpected operator event " + event); } }

如果接收到的事件类型为AddSplitEvent,调用handleAddSplitsEvent方法。分析如下:

private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) { try { // 反序列化得到split信息 List<SplitT> newSplits = event.splits(splitSerializer); numSplits += newSplits.size(); // 如果下游output还没有初始化,加入到pending集合中缓存起来 // 否则创建output,将split分配给这些output if (operatingMode == OperatingMode.OUTPUT_NOT_INITIALIZED) { // For splits arrived before the main output is initialized, store them into the // pending list. Outputs of these splits will be created once the main output is // ready. outputPendingSplits.addAll(newSplits); } else { // Create output directly for new splits if the main output is already initialized. createOutputForSplits(newSplits); } // 将split添加到sourceReader sourceReader.addSplits(newSplits); } catch (IOException e) { throw new FlinkRuntimeException("Failed to deserialize the splits.", e); } }

最后我们一路跟踪到SourceReaderbaseaddSplits方法。

@Override public void addSplits(List<SplitT> splits) { LOG.info("Adding split(s) to reader: {}", splits); // Initialize the state for each split. splits.forEach( s -> splitStates.put( s.splitId(), new SplitContext<>(s.splitId(), initializedState(s)))); // Hand over the splits to the split fetcher to start fetch. splitFetcherManager.addSplits(splits); }

它把split交给splitFetcherManager执行。在本篇KafkaSource环境下它的实现类为KafkaSourceFetcherManager。它的addSplits方法位于父类SingleThreadFetcherManager中。

分析到这里,我们回到了上一节"数据读取流程"的开头"添加分片"方法。至此KafkaSource分区发现逻辑分析完毕。

Checkpoint逻辑

KafkaSourceReadersnapshotState方法返回当前需要checkpoint的分片信息,即Reader分配的分片。如果用户配置了commit.offsets.on.checkpoint=true,保存各个分片对应的分区和offset分区到offsetsToCommit中。

@Override public List<KafkaPartitionSplit> snapshotState(long checkpointId) { // 获取分配给当前Reader的分片(checkpointId参数实际上没有用到) List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId); // 由配置项commit.offsets.on.checkpoint决定 // 是否在checkpoint的时候,提交offset if (!commitOffsetsOnCheckpoint) { return splits; } // 下面逻辑只有在开启commit.offsets.on.checkpoint的时候才会执行 // offsetToCommit保存了需要commit的offset信息 // 是一个Map<checkpointID, Map<partition, offset>>数据结构 // 如果当前Reader没有分片,并且也没有读取完毕的分片 // offsetsToCommit记录checkpoint id对应一个空的map if (splits.isEmpty() && offsetsOfFinishedSplits.isEmpty()) { offsetsToCommit.put(checkpointId, Collections.emptyMap()); } else { // 为当前checkpoint id创建一个offsetMap,保存在offsetsToCommit中 Map<TopicPartition, OffsetAndmetadata> offsetsMap = offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>()); // Put the offsets of the active splits. // 遍历splits,保存split对应的分区和offset到offsetsMap中 for (KafkaPartitionSplit split : splits) { // If the checkpoint is triggered before the partition starting offsets // is retrieved, do not commit the offsets for those partitions. if (split.getStartingOffset() >= 0) { offsetsMap.put( split.getTopicPartition(), new OffsetAndmetadata(split.getStartingOffset())); } } // 保存所有完成读取的split的partition和offset信息 // Put offsets of all the finished splits. offsetsMap.putAll(offsetsOfFinishedSplits); } return splits; }

notifyCheckpointComplete方法。该方法在checkpoint完毕的时候执行。由SourceCoordinator发送checkpoint完毕通知。在这个方法中Kafka数据源提交Kafka offset。

@Override public void notifyCheckpointComplete(long checkpointId) throws Exception { LOG.debug("Committing offsets for checkpoint {}", checkpointId); // 同上,如果没有启用checkpoint时候提交offset的配置,方法退出,什么也不做 if (!commitOffsetsOnCheckpoint) { return; } // 从offsetsToCommit中获取当前checkpoint需要提交的分区offset信息 Map<TopicPartition, OffsetAndmetadata> committedPartitions = offsetsToCommit.get(checkpointId); // 如果为空,退出 if (committedPartitions == null) { LOG.debug( "Offsets for checkpoint {} either do not exist or have already been committed.", checkpointId); return; } // 调用KafkaSourceFetcherManager,提交offset到kafka // 稍后分析 ((KafkaSourceFetcherManager) splitFetcherManager) .commitOffsets( committedPartitions, (ignored, e) -> { // The offset commit here is needed by the external monitoring. It won't // break Flink job's correctness if we fail to commit the offset here. // 这里是提交offset的回调函数 // 如果遇到错误,监控指标记录下失败的提交 if (e != null) { kafkaSourceReaderMetrics.recordFailedCommit(); LOG.warn( "Failed to commit consumer offsets for checkpoint {}", checkpointId, e); } else { LOG.debug( "Successfully committed offsets for checkpoint {}", checkpointId); // 监控指标记录成功的提交 kafkaSourceReaderMetrics.recordSucceededCommit(); // If the finished topic partition has been committed, we remove it // from the offsets of the finished splits map. committedPartitions.forEach( (tp, offset) -> kafkaSourceReaderMetrics.recordCommittedOffset( tp, offset.offset())); // 由于offset已提交,从已完成split集合中移除 offsetsOfFinishedSplits .entrySet() .removeIf( entry -> committedPartitions.containsKey( entry.getKey())); // 移除当前以及之前的checkpoint id对应的offset信息,因为已经commit过,无需再保存 while (!offsetsToCommit.isEmpty() && offsetsToCommit.firstKey() <= checkpointId) { offsetsToCommit.remove(offsetsToCommit.firstKey()); } } }); }

接下来我们关注KafkaSourceFetcherManager。这个类负责向KafkaConsumer提交offset,逻辑对应commitOffsets方法,内容如下:

public void commitOffsets( Map<TopicPartition, OffsetAndmetadata> offsetsToCommit, OffsetCommitCallback callback) { LOG.debug("Committing offsets {}", offsetsToCommit); // 如果没有offset需要commit,返回 if (offsetsToCommit.isEmpty()) { return; } // 获取正在运行的SplitFetcher SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> splitFetcher = fetchers.get(0); if (splitFetcher != null) { // The fetcher thread is still running. This should be the majority of the cases. // 如果fetcher仍在运行,创建提交offset的任务,加入队列 enqueueOffsetsCommitTask(splitFetcher, offsetsToCommit, callback); } else { // 如果没有SplitFetcher运行,创建一个新的SplitFetcher // 和上面异常创建任务之后,启动这个SplitFetcher splitFetcher = createSplitFetcher(); enqueueOffsetsCommitTask(splitFetcher, offsetsToCommit, callback); startFetcher(splitFetcher); } }

继续分析创建offset提交任务的方法。代码如下:

private void enqueueOffsetsCommitTask( SplitFetcher<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> splitFetcher, Map<TopicPartition, OffsetAndmetadata> offsetsToCommit, OffsetCommitCallback callback) { // 获取splitFetcher对应的KafkaReader KafkaPartitionSplitReader kafkaReader = (KafkaPartitionSplitReader) splitFetcher.getSplitReader(); 为fetcher创建一个SplitFetcherTask splitFetcher.enqueueTask( new SplitFetcherTask() { @Override public boolean run() throws IOException { kafkaReader.notifyCheckpointComplete(offsetsToCommit, callback); return true; } @Override public void wakeUp() {} }); }

到此,一个SplitFetcherTask已被添加到SplitFetchertaskQueue中。根据我们在前面"数据读取流程"分析的结论,SplitFetcher通过runOnce方法逐个读取taskQueue中排队的任务执行。当它取出SplitFetcherTask时,会运行它的run方法。调用kafkaReader.notifyCheckpointComplete方法。这个方法调用KafkaConsumer的异步提交offset方法commitAsync

public void notifyCheckpointComplete( Map<TopicPartition, OffsetAndmetadata> offsetsToCommit, OffsetCommitCallback offsetCommitCallback) { consumer.commitAsync(offsetsToCommit, offsetCommitCallback); }

到这里,KafkaSource checkpoint提交offset的过程分析完毕。

本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。

发布人:2e39****    IP:139.201.99.***     举报/删稿
展会推荐
让朕来说2句
评论
收藏
点赞
转发