我使用redis作为队列(使用spring队列中/出站通道适配器)来分发任务(消息进入队列,等等)。
由于吞吐量很高,我们观察到,虽然消息被发送到redis队列,但是很多消息丢失了,没有消息在入站后到达组件(报头路由器)。
通道配置附在下面;重点是我们认为问题在入站地址器之后的这个报头路由器中,它无法管理从队列读取的消息的速率,所以它们丢失了。
我们在入站适配器和这个组件(即报头路由器)之间使用了一个中间元素,并添加了一个队列来修复这个问题。
这很好,但实际上我们并不完全理解解决方案,如果这是正确的解决方案。
专家对此配置的看法和意见将是惠康!
谢谢
<!-- a Queue Inbound Channel Adapter is available to 'right pop' messages
from a Redis List. -->
<redis:queue-inbound-channel-adapter
id="fromRedis" channel="in" queue="${name}"
receive-timeout="1000" recovery-interval="3000" expect-message="true"
auto-startup="true"/>
<!-- a queue to avoid lost messages before the header router -->
<int:channel id="in">
<int:queue capacity="1000"/>
</int:channel>
<!-- a bridge to connect channels and have a poller -->
<int:bridge input-channel="in" output-channel="out">
<int:poller fixed-delay="500" />
</int:bridge>
<int:header-value-router id="router" timeout="15000"
input-channel="out" header-name="decision"
resolution-required="false" default-output-channel="defaultChannel" />要将消息插入redis,我们有一个web服务,但实际上正如您所说的,只需将消息写入redis (
for... channel.send(msg)仅此而已
关于您的回答,我现在正在考虑删除in通道及其队列,并直接使用报头值路由器;但我还有更多的问题:
我添加了我提议的解决方案,但还没有完成,我们不确定是否存在错误管理,就像我前面解释的那样。
<!-- a Queue Inbound Channel Adapter is available to 'right pop' messages
from a Redis List. -->
<redis:queue-inbound-channel-adapter
id="fromRedis" channel="in" queue="${name}"
receive-timeout="1000" recovery-interval="3000" expect-message="true"
auto-startup="true"/>
<!-- a header-value-router with quite low timeout -->
<int:header-value-router id="router" timeout="150"
input-channel="in" header-name="decision"
resolution-required="false" default-output-channel="defaultChannel" />
<!-- ¿if MessageDeliveryException???? what to do??? -->
<int:channel id="someConsumerHeaderValue">
<int:dispatcher task-executor="ConsumerExecutor" />
</int:channel>
<!-- If 5 threads are busy we queue messages up to 5; if queue is full we can increase to 5 more working threads; if no more threads we have a... ¿¿MessageDeliveryException?? -->
<task:executor id="ConsumerExecutor" pool-size="5-5"
queue-capacity="5" />发布于 2016-02-25 15:46:35
看到这样的观察真是太好了。可能会在某种程度上改善框架。
所以,我想看看:
<int:header-value-router>后的下游流。看,您在那里使用的是timeout="15000",它是send-timeout的同义词:如果可能阻塞,则指定在向目标MessageChannels发送消息时等待的最大时间,以毫秒为单位(例如,当前已满的有界队列通道)。默认情况下,发送将无限期阻塞。“超时值”的同义词-只能提供一个。
从这里我可以说,如果您的下游消费者--如果在某些QueueChannel上足够慢的话--您的结果是:
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary up to the specified wait time for space to become available.
*
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit)
....
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}注意,return false;准确地指示了message lost。
这也是众所周知的back-pressure drop策略。
如果你有不同的照片,请告诉我。
您可以考虑删除该timeout="15000"以满足相同的in队列通道行为。
更新
嗯,错误处理的方式有点不同。“有罪”组件只是抛出异常,就像使用原始Java一样,并且这个组件不负责由调用方来捕获异常是可以的。在我们的例子中,调用者是一个上游组件- <redis:queue-inbound-channel-adapter>。
任何入站通道适配器都有一个error-channel选项。如果它是<poller>,则通过MessageSource,或者当它是MessageProducer时,直接通过MessageProducer。
我相信你能处理好:
if (!sent) {
throw new MessageDeliveryException(message,
"failed to send message to channel '" + channel + "' within timeout: " + timeout);
}在那个error-channel子流中实现您的需求恢复。
https://stackoverflow.com/questions/35631277
复制相似问题