Я хочу реализовать RedisLock, используя библиотеку StackExchange.Redis.
Следуя этой статье:
https://www.c-sharpcorner.com/article/creating-distributed-lock-with-redis-in-net-core/
Проблема с запуском скрипта, но каждый проход цикла программа не может реализовать блокировку, но следующий проход может получить доступ к заблокированному потоку:
Моя реализация:
Реализация RedisLock такая же, как в статье.
public async void ListenTask()
{
var handledResult = await db.StreamRangeAsync(streamName, "-", "+", 1, Order.Descending);
var lowestHandledId = handledResult.Last().Id;
var readTask = Task.Run(async () =>
{
while (!Token.IsCancellationRequested)
{
var result = await db.StreamRangeAsync(streamName, lowestHandledId, "+", 2);
var handleResult = result.Last();
if (result.Any() && lowestHandledId != handleResult.Id)
{
bool isLocked = RedisLock.AcquireLock(streamName, handleResult.Id.ToString(), expiry);
if (!isLocked)
{
//lock
lowestHandledId = handleResult.Id;
var streamCat = handleResult.Values;
Cat cat = ParseResult(streamCat);
switch (streamCat[0].Value.ToString())
{
case "insert":
Console.WriteLine($"Insert cat at id:{cat.Id} [{cat.Name} - {cat.CreatedDate}]");
cacheDictionary.Add(cat.Id, new WeakReference(cat));
break;
case "delete":
Console.WriteLine($"Deleted cat at id:{cat.Id}");
cacheDictionary.Remove(cat.Id);
break;
}
RedisLock.ReleaseLock(streamName, handleResult.Id.ToString());
}
}
await Task.Delay(2000);
}
});
}
Я решаю свои проблемы с реализацией RedisLock с помощью команд LockTake/LockRelease.
Следуя этой статье:вопрос о переполнении стека
public async void ListenTask()
{
var handledResult = await db.StreamRangeAsync(streamName, "-", "+", 1, Order.Descending);
var lowestHandledId = handledResult.Last().Id;
RedisValue token = Environment.MachineName;
var readTask = Task.Run(async () =>
{
while (!Token.IsCancellationRequested)
{
var result = await db.StreamRangeAsync(streamName, lowestHandledId, "+", 2);
var handleResult = result.Last();
if (result.Any() && lowestHandledId != handleResult.Id)
{
if (!db.LockTake(streamName, token, expiry))
{
lowestHandledId = handleResult.Id;
var streamCat = handleResult.Values;
Cat cat = ParseResult(streamCat);
switch (streamCat[0].Value.ToString())
{
case "insert":
Console.WriteLine($"Insert cat at id:{cat.Id} [{cat.Name} - {cat.CreatedDate}]");
cacheDictionary.Add(cat.Id, new WeakReference(cat));
break;
case "delete":
Console.WriteLine($"Deleted cat at id:{cat.Id}");
cacheDictionary.Remove(cat.Id);
break;
}
db.LockRelease(streamName, token);
}
}
await Task.Delay(100);
}
});
}