apache-kafka - Kafka 监听器中的线程休眠

我正在尝试暂停/恢复 Kafka 容器。使用以下代码片段来执行此操作:

kafkaListenerEndpointRegistry.getListenerContainer("MAIN").pause();

当我调用 pause 时,我还需要做一个 thread.sleep 以便不处理批处理中的消息。对于批处理中的每条消息,我正在调用另一个具有速率限制的 API。为了保持这个速率限制,我需要停止对消息的处理。

如果主线程休眠,它会阻止监听器发送心跳吗?它是否也会在后台停止心跳线程?文档说,“当容器暂停时,它会继续 poll() 消费者,如果正在使用组管理,则避免重新平衡,但它不会检索任何记录。”但是我正在暂停容器并使线程休眠。这将如何影响流量?

回答1

您绝不能休眠消费者线程,以避免重新平衡。

相反,减少 max.poll.records 以便暂停会更快生效(消费者实际上不会暂停,直到处理之前的轮询接收到的记录)。

您可以在暂停消费者后抛出异常,但您需要以某种方式恢复容器。

我开了一个新问题来改善这种行为 https://github.com/spring-projects/spring-kafka/issues/2280

如果您受到速率限制,请考虑按计划使用 KafkaTemplate.receive() 方法或轮询 https://docs.spring.io/spring-integration/docs/current/reference/html/kafka.html#kafka-inbound-pollable,而不是使用消息驱动的方法。

相似文章