spring-kafka - Spring kafka MessageListener 和 max.poll.records

我正在使用 spring kafka 2.7.8 来使用来自 kafka 的消息。消费者监听器如下

@KafkaListener(topics = "topicName",
            groupId = "groupId",
            containerFactory = "kafkaListenerFactory")
    public void onMessage(ConsumerRecord record) {

   }

上面的 onMessage 方法一次接收一条消息。

这是否意味着 spring 库将 max.poll.records 设置为 1 或者它一次轮询 500 个(默认 value)并且该方法会一一接收。

这个问题的原因是,我们经常在 prod 中一起看到以下错误。在一分钟内收到多个消费者的以下所有 4 个错误。试图了解是由于间歇性 kafka 代理连接问题还是由于负载。请指教。

Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.

seek to current after exception; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets {topic-9=OffsetAndMetadata{offset=2729058, leaderEpoch=null, metadata=''}}

Consumer clientId=consumer-groupName-5, groupId=consumer] Offset commit failed on partition topic-33 at offset 2729191: The coordinator is not aware of this member.

Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records

回答1

max.poll.records 不会被 Spring 改变;它将采用默认值(或您设置的任何值)。在下一次轮询之前,这些记录一次一个地交给听众。

这意味着您的侦听器必须能够在 max.poll.interval.ms 中处理 max.poll.records

您需要减少 max.poll.records 和/或增加 max.poll.interval.ms 以便您可以在这段时间内处理记录,并留有足够的余量,以避免这些重新平衡。

相似文章

随机推荐

最新文章