在我的一个流中,我将每10秒进行一次聚合,并将这些有效负载写入文件共享。我不太清楚如何使用聚合器。
@Bean
public IntegrationFlow errorHandlingQueueFlow() {
return IntegrationFlows.from(ERROR_QUEUE_CHANNEL)
.bridge(e -> e.poller(Pollers.fixedDelay(1000).maxMessagesPerPoll(MAX_MSG_PER_POLL)))
.aggregate(a -> a.groupTimeout(10000))// How do i make it collect every 10 seconds.
.transform(objectToCSVTransformer, "transform")//Converts payload to a CSV
.handle(smbErrorMessageHandler())// Takes care of writing into Fileshare
.get();
}由于这是用于错误处理的,因此只有少数错误的消息会进入这个ERROR_QUEUE_CHANNEL。所以我想每10秒收集一次,而不是等待一个组的所有消息被接收。当我使用grouptimeout时,每10秒将收集到的所有消息发送到nullchannel。
发布于 2021-02-02 14:45:39
groupTimeout()的默认用途是清理过期的组。如果您想要正常地发布它们而不是放弃它们,那么应该考虑使用sendPartialResultOnExpiry = true。当然,如果您在这些消息中有相关的详细信息头,这一切都是有意义的。否则,您需要考虑correlationStrategy,以便将这些错误消息分组。
请阅读更多关于聚合器及其在docs:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator中的选项的内容。
https://stackoverflow.com/questions/66005806
复制相似问题