首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >具有高吞吐量的春季集成的Redis队列正在丢失消息。

具有高吞吐量的春季集成的Redis队列正在丢失消息。
EN

Stack Overflow用户
提问于 2016-02-25 15:17:27
回答 1查看 1.2K关注 0票数 2

我使用redis作为队列(使用spring队列中/出站通道适配器)来分发任务(消息进入队列,等等)。

由于吞吐量很高,我们观察到,虽然消息被发送到redis队列,但是很多消息丢失了,没有消息在入站后到达组件(报头路由器)。

通道配置附在下面;重点是我们认为问题在入站地址器之后的这个报头路由器中,它无法管理从队列读取的消息的速率,所以它们丢失了。

我们在入站适配器和这个组件(即报头路由器)之间使用了一个中间元素,并添加了一个队列来修复这个问题。

这很好,但实际上我们并不完全理解解决方案,如果这是正确的解决方案。

专家对此配置的看法和意见将是惠康!

谢谢

代码语言:javascript
复制
<!-- a Queue Inbound Channel Adapter is available to 'right pop' messages
     from a Redis List. -->
    <redis:queue-inbound-channel-adapter
     id="fromRedis" channel="in" queue="${name}"
        receive-timeout="1000" recovery-interval="3000" expect-message="true"
            auto-startup="true"/>

    <!-- a queue to avoid lost messages before the header router -->
    <int:channel id="in">
        <int:queue capacity="1000"/>
    </int:channel>

    <!-- a bridge to connect channels and have a poller -->
    <int:bridge input-channel="in" output-channel="out">
        <int:poller fixed-delay="500" />
    </int:bridge>

    <int:header-value-router id="router" timeout="15000"
        input-channel="out" header-name="decision"
        resolution-required="false" default-output-channel="defaultChannel" />

要将消息插入redis,我们有一个web服务,但实际上正如您所说的,只需将消息写入redis (

代码语言:javascript
复制
for... channel.send(msg)

仅此而已

关于您的回答,我现在正在考虑删除in通道及其队列,并直接使用报头值路由器;但我还有更多的问题:

  1. 我认为正确的解决方案是头-值路由器中的超时值较低,因此如果没有可用的使用者,我将得到更快的错误通知。如果我不使用一个值作为超时,它将无限期地阻塞,这是一个坏主意,不是吗?
  2. 我不知道如何管理MesssageDeliveryException,因为路由器没有错误信道配置,?
  3. 我认为,如果我能够管理这个错误,并得到消息,我可以重新发送它的红色。还有其他服务器从redis那里获得消息,他们幸运地可以参加。

我添加了我提议的解决方案,但还没有完成,我们不确定是否存在错误管理,就像我前面解释的那样。

代码语言:javascript
复制
<!-- a Queue Inbound Channel Adapter is available to 'right pop' messages
 from a Redis List. -->
<redis:queue-inbound-channel-adapter
 id="fromRedis" channel="in" queue="${name}"
    receive-timeout="1000" recovery-interval="3000" expect-message="true"
        auto-startup="true"/>

 <!-- a header-value-router with quite low timeout -->
   <int:header-value-router id="router" timeout="150"
    input-channel="in" header-name="decision"
    resolution-required="false" default-output-channel="defaultChannel" />

 <!-- ¿if MessageDeliveryException???? what to do??? -->

<int:channel id="someConsumerHeaderValue">
    <int:dispatcher task-executor="ConsumerExecutor" />
</int:channel>
<!-- If 5 threads are busy we queue messages up to 5; if queue is full we can increase to 5 more working threads; if no more threads we have a... ¿¿MessageDeliveryException?? -->

<task:executor id="ConsumerExecutor" pool-size="5-5"
               queue-capacity="5" />
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-02-25 15:46:35

看到这样的观察真是太好了。可能会在某种程度上改善框架。

所以,我想看看:

  1. 从框架的角度来再现的一些测试用例。尽管我想这足够发送大量的消息到Redis并使用您的配置来消费。(如有其他需要,请指正我)
  2. <int:header-value-router>后的下游流。看,您在那里使用的是timeout="15000",它是send-timeout的同义词:

如果可能阻塞,则指定在向目标MessageChannels发送消息时等待的最大时间,以毫秒为单位(例如,当前已满的有界队列通道)。默认情况下,发送将无限期阻塞。“超时值”的同义词-只能提供一个。

从这里我可以说,如果您的下游消费者--如果在某些QueueChannel上足够慢的话--您的结果是:

代码语言:javascript
复制
/**
 * Inserts the specified element at the tail of this queue, waiting if
 * necessary up to the specified wait time for space to become available.
 *
 * @return {@code true} if successful, or {@code false} if
 *         the specified waiting time elapses before space is available
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public boolean offer(E e, long timeout, TimeUnit unit)
....
while (count.get() == capacity) {
       if (nanos <= 0)
             return false;
       nanos = notFull.awaitNanos(nanos);
}

注意,return false;准确地指示了message lost

这也是众所周知的back-pressure drop策略。

如果你有不同的照片,请告诉我。

您可以考虑删除该timeout="15000"以满足相同的in队列通道行为。

更新

嗯,错误处理的方式有点不同。“有罪”组件只是抛出异常,就像使用原始Java一样,并且这个组件不负责由调用方来捕获异常是可以的。在我们的例子中,调用者是一个上游组件- <redis:queue-inbound-channel-adapter>

任何入站通道适配器都有一个error-channel选项。如果它是<poller>,则通过MessageSource,或者当它是MessageProducer时,直接通过MessageProducer

我相信你能处理好:

代码语言:javascript
复制
if (!sent) {
    throw new MessageDeliveryException(message,
            "failed to send message to channel '" + channel + "' within timeout: " + timeout);
}

在那个error-channel子流中实现您的需求恢复。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/35631277

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档