我们正在迁移到Spring WebFlux (使用reactor-netty)。应用程序使用HTTP协议和Spring控制器。目前我们有一个过渡性的解决方案,它将入站IO缓冲区累积到CompositeByteBuf中,而不进行复制(然后将其作为InputStream处理)。reactor-netty为我们提供了直接的字节缓冲区。因此,为这些缓冲区调用release()是至关重要的。最初我们有代码:
public static Mono<CompositeByteBuf> collectToComposite(Publisher<DataBuffer> data) {
return Flux.from(data).reduce(
EMPTY,
(CompositeByteBuf acc, DataBuffer buffer) -> {
ByteBuf byteBuf = toByteBuf(buffer);
CompositeByteBuf composite = (acc == EMPTY) ? byteBuf.alloc().compositeBuffer(256) : acc;
composite.addComponent(true, byteBuf);
return composite;
}
).map(composite -> composite != EMPTY ? composite : createEmptyComposite());
}并且在处理得到的复合缓冲器之后放置释放。
但是这种方法在上行Publisher信号错误的情况下会导致泄漏。因此,在下一次尝试中,我们尝试使用类似这样的方式来处理错误和释放缓冲区(省略了一些转角情况的处理):
public static Mono<CompositeByteBuf> collectToComposite(Publisher<DataBuffer> data) {
// such code is not suitable for multiple subscribers
class CompositeHolder {
CompositeByteBuf composite;
void addComponent(ByteBuf component) {
if (composite == null) {
composite = component.alloc().compositeBuffer(256);
}
composite.addComponent(true, component);
}
}
CompositeHolder holder = new CompositeHolder();
return Flux.from(data)
.doOnNext(buffer -> holder.addComponent(toByteBuf(buffer)))
.doOnError(e -> holder.composite.release())
.then(Mono.fromSupplier(() -> holder.composite));
}但在那之后,我们意识到有必要在取消订阅时回收缓冲区(这发生在底层连接关闭时)。第一个想法是使用doOnCancel运算符,但实际上不能保证我们不能为同一请求调用doOnError和doOnCancel回调。因此,简单的解决方案要求我们显式地检查以前是否释放过缓冲区。
现在我被卡住了。我不知道如何处理这种情况,避免额外的复杂性。
发布于 2018-03-06 20:32:47
doFinally是一个doOn运算符,只要源出现错误、完成或订阅被取消,就会调用它。此外,它还保证回调只执行一次(在错误+取消的情况下)。
您需要提供一个用导致回调调用的事件的SignalType填充的Consumer<SignalType>。
https://stackoverflow.com/questions/49108208
复制相似问题