我在我的spring boot应用程序中有一个计划的任务,我想更快地运行它。
我的任务包括获取一些数据(订阅),对于每个订阅,调用API客户端并向我的kafka代理发送一条消息。
这是我的代码
@Scheduled(cron="0 0 12 1/1 * ? *",zone = "Europe/Paris")
public void execute() {
List<StoreSubscription> storeSubscriptions = subscriptionRepository.findAllByExpirationDate(LocalDate.now().plusMonths(1));
storeSubscriptions.forEach(subscription -> {
CompletableFuture.supplyAsync( () -> this.message(subscription))
.thenApply(message -> kt.send(SUBSCRIPTION_EXPIRED_REMINDER_EARLY,message));
});
}消息是异步创建的吗?是否在不等待前一条消息结束的情况下创建消息?
谢谢。
发布于 2021-04-01 22:37:07
是的,第二条消息不会等待第一条消息的结束。之所以会发生这种情况,是因为CompletableFuture不是组合的。
如果你想一个接一个地执行它们,你需要这样的东西:
CompletableStage loop = CompletableFuture.completedFuture(null);
storeSubscriptions.forEach(subscription -> {
loop = loop.thenCompose( unusued -> CompletableFuture
.supplyAsync( () -> this.message(subscription))
.thenAccept(message -> kt.send(SUBSCRIPTION_EXPIRED_REMINDER_EARLY,message));
});这种解决方案的缺点是它可能会在内存中创建许多对象。要避免此问题,您可以使用IBM async utils
Iterator iterator = storeSubscriptions.iterator();
AsyncTrampoline.asyncWhile( () -> CompletableFuture
.supplyAsync( () -> this.message(iterator.next()))
.thenAccept(message -> kt.send(SUBSCRIPTION_EXPIRED_REMINDER_EARLY,message))
.thenApply( r -> iterator.hasNext() )
)使用Maven,您可以使用以下命令将其添加到项目中:
<dependency>
<groupId>com.ibm.async</groupId>
<artifactId>asyncutil</artifactId>
<version>0.1.0</version>
</dependency>这个库实现了Trampoline pattern
https://stackoverflow.com/questions/66784595
复制相似问题