我们有一些应用程序运行在不同的PC上。必须有一个Redis服务器包含所有应用程序的共享密钥/值队列。每个应用程序有两个线程,一个用于填充队列的线程,另一个用于迭代和处理队列的线程。
假设队列包含以下项:(1,value1),(2,v2),(3,v3),(4,v4)。我们想要的是只有一个客户端窥视密钥3的项目,以及一个具有密钥4或任何其他密钥的并发请求窥视项。
SET达到目标?提前谢谢。
注意到用C#,StackExchange.Redis编写的客户机
发布于 2018-09-24 00:01:14
要使进程相互排斥,可以使用RedLock.Net。它是一个Distributed Lock Manager,它就像一个lock语句,适用于没有方法互相了解的进程。下面是一个例子:
public async Task ProcessMessage(Message message)
{
// the thing we are trying to lock, i.e: "3"
var resource = message.Key;
// determines how long will the lock be alive untill it's automatically released
var expiry = TimeSpan.FromSeconds(30);
// how long will the thread wait trying to acquire the lock
var wait = TimeSpan.FromSeconds(10);
// time span between each request to Redis trying to acquire the lock
var retry = TimeSpan.FromSeconds(1);
// blocks the thread until acquired or 'wait' timeout
using (var redLock = await redlockFactory.CreateLockAsync(resource, expiry, wait, retry))
{
// make sure we got the lock
if (redLock.IsAcquired)
{
// we successfully locked the resource, now other processes will have to wait
ProcessMessageInternal(message.Value);
}
else
{
// could't get the lock within the wait time
// handle collision
}
}
// the lock is automatically released at the end of the using block
// which means the IDisposable.Dispose method makes a request to Redis to release the lock
}请注意,我如何使用消息的Key作为要锁定的资源。这意味着在锁被释放或过期之前,任何其他进程都无法锁定资源。
至于实现pub/子系统,我强烈建议您使用Azure Storage Queue,创建一个Queue Trigger并订阅您的程序。
所有这些听起来都很复杂,但很容易实现:您可以将应用程序的线程分为两个进程:
消息读取器:只要消息到达,就会简单地排队:
// Retrieve storage account from connection string.
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
CloudConfigurationManager.GetSetting("StorageConnectionString"));
// Create the queue client.
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
// Retrieve a reference to a queue.
CloudQueue queue = queueClient.GetQueueReference("myqueue");
// Create the queue if it doesn't already exist.
queue.CreateIfNotExists();
var message = // get message
var json = SerializeMessage(message);
// Create a message and add it to the queue.
CloudQueueMessage message = new CloudQueueMessage(json);
queue.AddMessage(message);消息处理器:将通过使用QueueTrigger订阅队列,Visual有一个项目模板,称为Azure Functions,您只需传递存储连接字符串和队列名称,它将为您处理并发性。这个过程将水平升级(这意味着它将有许多实例),因此它需要与它的兄弟姐妹相互排斥,并通过使用RedLock.Net来实现这一点。azure函数将像这样锁定:
public class Functions
{
public static void ProcessQueueMessage([QueueTrigger(QueueName)] string serializedMessage)
{
var message = DeserializeMessage(serializedMessage);
MessageProcesor.ProcessMessage(message);
}
}如果您需要以较高的速度处理更大的消息,也可以使用Service Bus Queue而不是Azure Storage Queue,下面对两者进行比较:https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-azure-and-service-bus-queues-compared-contrasted
发布于 2018-09-24 08:05:18
您可以使用LIST保存这些项,并将LIST作为队列。使用推送/弹出命令填充队列并从队列中获取项。
LIST保证并发读/写,并且一个项只能由一个客户端获取。
https://stackoverflow.com/questions/52469729
复制相似问题