首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从Redis集/在Redis集上并发读/写多个客户端

从Redis集/在Redis集上并发读/写多个客户端
EN

Stack Overflow用户
提问于 2018-09-23 19:54:32
回答 2查看 1K关注 0票数 1

我们有一些应用程序运行在不同的PC上。必须有一个Redis服务器包含所有应用程序的共享密钥/值队列。每个应用程序有两个线程,一个用于填充队列的线程,另一个用于迭代和处理队列的线程。

假设队列包含以下项:(1,value1),(2,v2),(3,v3),(4,v4)。我们想要的是只有一个客户端窥视密钥3的项目,以及一个具有密钥4或任何其他密钥的并发请求窥视项。

  • 用Redis实现这一点的最佳方法是什么?
  • 有没有办法通过字符串SET达到目标?
  • 是否可以用Redis实现pub/子系统?

提前谢谢。

注意到用C#,StackExchange.Redis编写的客户机

EN

回答 2

Stack Overflow用户

发布于 2018-09-24 00:01:14

要使进程相互排斥,可以使用RedLock.Net。它是一个Distributed Lock Manager,它就像一个lock语句,适用于没有方法互相了解的进程。下面是一个例子:

代码语言:javascript
复制
public async Task ProcessMessage(Message message) 
{   
    // the thing we are trying to lock, i.e: "3"
    var resource = message.Key; 

    // determines how long will the lock be alive untill it's automatically released
    var expiry = TimeSpan.FromSeconds(30);

    // how long will the thread wait trying to acquire the lock
    var wait = TimeSpan.FromSeconds(10);

    // time span between each request to Redis trying to acquire the lock
    var retry = TimeSpan.FromSeconds(1);

    // blocks the thread until acquired or 'wait' timeout
    using (var redLock = await redlockFactory.CreateLockAsync(resource, expiry, wait, retry))
    {
        // make sure we got the lock
        if (redLock.IsAcquired)
        {
            // we successfully locked the resource, now other processes will have to wait
            ProcessMessageInternal(message.Value);
        }
        else 
        {
            // could't get the lock within the wait time
            // handle collision
        }
    }

    // the lock is automatically released at the end of the using block
    // which means the IDisposable.Dispose method makes a request to Redis to release the lock
}

请注意,我如何使用消息的Key作为要锁定的资源。这意味着在锁被释放或过期之前,任何其他进程都无法锁定资源。

至于实现pub/子系统,我强烈建议您使用Azure Storage Queue,创建一个Queue Trigger并订阅您的程序。

所有这些听起来都很复杂,但很容易实现:您可以将应用程序的线程分为两个进程:

消息读取器:只要消息到达,就会简单地排队:

代码语言:javascript
复制
// Retrieve storage account from connection string.
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
    CloudConfigurationManager.GetSetting("StorageConnectionString"));

// Create the queue client.
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();

// Retrieve a reference to a queue.
CloudQueue queue = queueClient.GetQueueReference("myqueue");

// Create the queue if it doesn't already exist.
queue.CreateIfNotExists();

var message = // get message

var json = SerializeMessage(message);

// Create a message and add it to the queue.
CloudQueueMessage message = new CloudQueueMessage(json);
queue.AddMessage(message);

消息处理器:将通过使用QueueTrigger订阅队列,Visual有一个项目模板,称为Azure Functions,您只需传递存储连接字符串和队列名称,它将为您处理并发性。这个过程将水平升级(这意味着它将有许多实例),因此它需要与它的兄弟姐妹相互排斥,并通过使用RedLock.Net来实现这一点。azure函数将像这样锁定:

代码语言:javascript
复制
public class Functions 
{
    public static void ProcessQueueMessage([QueueTrigger(QueueName)] string serializedMessage)
    {
        var message = DeserializeMessage(serializedMessage);

        MessageProcesor.ProcessMessage(message);
    }
}

如果您需要以较高的速度处理更大的消息,也可以使用Service Bus Queue而不是Azure Storage Queue,下面对两者进行比较:https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-azure-and-service-bus-queues-compared-contrasted

票数 1
EN

Stack Overflow用户

发布于 2018-09-24 08:05:18

您可以使用LIST保存这些项,并将LIST作为队列。使用推送/弹出命令填充队列并从队列中获取项。

LIST保证并发读/写,并且一个项只能由一个客户端获取。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52469729

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档