Здравствуйте, у меня следующая проблема:
Я хочу совершить нечто похожее на транзакцию. Я хочу выполнить ряд операций async после получения внешнего триггера. Поэтому я использую TaskCompletionSource, который устанавливается в методе, представляющем триггер: TriggerTransaction.
Этот метод триггера вызывается в Main в пуле потоков, когда я нажимаю определенный консольный ключ.
После того, как я нажму ключевое слово A, TriggerTransaction будет запущен, и TaskCompletionSource-s будут установлены. Тем не менее, основной поток не вычисляет сумму двух ожидаемых задач.
class Program
{
public static Task<Task<int>> TransactionOperation1()
{
TaskCompletionSource<Task<int>> tcs = new TaskCompletionSource<Task<int>>();
tasks.Add(tcs);
Task<Task<int>> result = tcs.Task;
return result;
}
public static Task<Task<int>> TransactionOperation2()
{
TaskCompletionSource<Task<int>> tcs = new TaskCompletionSource<Task<int>>();
tasks.Add(tcs);
Task<Task<int>> result = tcs.Task;
return result;
}
public static async Task<int> ExecuteTransactionOnDB()
{
await Task.Delay(1000);
return 5;
}
public static async Task TriggerTransaction()
{
int value = await ExecuteTransactionOnDB();
foreach (var item in tasks)
{
item.SetResult(value);
}
}
public static List<dynamic> tasks = new List<dynamic>();
static async Task Main(string[] args)
{
Task<Task<int>> a = TransactionOperation1();
Task<Task<int>> b = TransactionOperation2();
Task.Run(async() =>
{
while (Console.ReadKey().Key != ConsoleKey.A) ;
await TriggerTransaction();
});
if (!File.Exists("D:\\data.txt"))
{
File.Create("D:\\data.txt");
}
using(FileStream stream=new FileStream("data.txt",FileMode.Append,FileAccess.Write))
{
int sum=await await a + await await b;//thread wont pass this line when tasks are set.
ReadOnlyMemory<byte> bytes = Encoding.UTF8.GetBytes(sum);
stream.Write(bytes.ToArray());
}
Console.WriteLine(await await a + await await b);
}
}
}
P.S Если вам интересно, почему я использовал List<dynamic> для хранения TaskCompletionSource-s, то это потому, что TransactionOperations будут отличаться по типу возвращаемого значения. Некоторые из них будут возвращать int, другие String ..Bool..etc.
Для лучшего понимания я сделал схему -
Как вы увидите, это:
-Список, где я хочу хранить TCS-es
-Некоторые вызовы, которые выполняются только после установки внешнего триггера (транзакция была выполнена)
Как вы можете видеть в Calls, все они имеют разные типы возвращаемых значений.
@BercoviciAdrian, это похоже на XY проблема. У вас есть проблема X и вы думаете, что Y - это решение, несколько TCS и список? Когда это не сработает, вы спрашиваете о X, а не о Y. Какую реальную проблему вы хотите решить? Можно ли это решить простым await Task.WhenAll()? Или какие-то экземпляры ActionBlock<T>?
Если у меня есть Task<int> и Task<bool>, как я могу использовать Task.WhenAll и получить оба результата?
@BercoviciAdrian, что ты пытаешься сделать, вообще? Это не имеет ничего общего с транзакциями - атомарными операциями, которые либо завершаются, либо откатываются. Транзакции не связаны с задачами. Ваш вопрос касается того, как вы пытались решить проблему некоторый, а не самой проблемы
Если вам нужны транзакции, отметьте TransactionScope. Если вы хотите реализовать свои собственные транзакционные объекты (они называются «диспетчерами ресурсов»), вам необходимо реализовать IEnlistmentNotification. Этот статья показывает, как это сделать, это проще, чем кажется
Я выполняю несколько вызовов метода async, каждый из которых имеет свой тип возвращаемого значения. В каждом из этих вызовов я должен выполнить операцию async. ПОСЛЕ Я получаю триггер откуда-то еще. Этот «триггер» представляет собой Execution транзакции.
@BercoviciAdrian, это все еще описание кода, а не проблема. Для получения результатов выполненных задач без блокировки требуется только проверка .Result. И, вероятно, все это может быть один ActionBlock или один вызов для двух асинхронных методов. Что ты пытаешься сделать ?!
@BercoviciAdrian Мне каждый день нужно загружать несколько записей о платежах из нескольких банков. Я не использую сырые задачи и списки. Я создаю конвейер блоков потока данных TPL, каждый из которых выполняет задачу один для входного сообщения. Один блок для получения списка транзакций и передачи его блоку, который загружает каждую транзакцию, а затем блокам синтаксического анализа и массового импорта. Я отправляю только те даты, которые хочу, в начальный блок и жду завершения конечного блока. Ни TCS, ни списков задач. Получение, обработка, импорт транзакций - вот проблема. Dataflow - вот как я это решил





Зачем вам нужен Task<Task<int>>? Достаточно просто Task<int>, и соответственно TaskCompletionSource<int>. И вы также избавитесь от неудобного await await ..., который в вашем случае тоже не требуется.
Обратите внимание, что я также добавил Close() в поток, возвращаемый File.Create().
Вот рабочая версия программы:
class Program
{
public static Task<int> TransactionOperation1()
{
TaskCompletionSource<int> tcs = new TaskCompletionSource<int>();
tasks.Add(tcs);
return tcs.Task;
}
public static Task<int> TransactionOperation2()
{
TaskCompletionSource<int> tcs = new TaskCompletionSource<int>();
tasks.Add(tcs);
return tcs.Task;
}
public static async Task<int> ExecuteTransactionOnDB()
{
await Task.Delay(1000);
return 5;
}
public static async Task TriggerTransaction()
{
int value = await ExecuteTransactionOnDB();
foreach (var item in tasks)
{
item.SetResult(value);
}
}
public static List<dynamic> tasks = new List<dynamic>();
static async Task Main(string[] args)
{
Task<int> a = TransactionOperation1();
Task<int> b = TransactionOperation2();
Task input = Task.Run(async () => {
while (Console.ReadKey().Key != ConsoleKey.A);
await TriggerTransaction();
});
if (!File.Exists("C:\\temp\\data.txt"))
{
File.Create("C:\\temp\\data.txt").Close();
}
using (FileStream stream = new FileStream("C:\\temp\\data.txt", FileMode.Append, FileAccess.Write))
{
int sum = await a + await b; // now it works ok
var bytes = Encoding.UTF8.GetBytes(sum.ToString());
stream.Write(bytes);
}
Console.WriteLine(await a + await b);
}
}
Ознакомьтесь с измененной версией кода, она дает ожидаемый результат, выполняя Task, созданный с помощью TaskCompletionSource. Я также сделал код универсальным, поэтому вам не нужно использовать тип dynamic и определять тип данных во время компиляции.
static async Task Main(string[] args)
{
var a = Program<int>.TransactionOperation1();
var b = Program<int>.TransactionOperation2();
await Task.Run(async() =>
{
Console.ReadLine();
await Program<int>.TriggerTransaction(5);
});
if (!File.Exists("D:\\data.txt"))
{
File.Create("D:\\data.txt");
}
using (FileStream stream = new FileStream("D:\\data.txt", FileMode.Append, FileAccess.Write))
{
int sum = await a + await b;//thread wont pass this line when tasks are set.
var bytes = Encoding.UTF8.GetBytes(sum.ToString());
stream.Write(bytes, 0, bytes.Length);
}
Console.WriteLine(await a + await b);
}
class Program<T>
{
public static Task<T> TransactionOperation1()
{
var tcs = new TaskCompletionSource<T>();
tasks.Add(tcs);
return tcs.Task;
}
public static Task<T> TransactionOperation2()
{
var tcs = new TaskCompletionSource<T>();
tasks.Add(tcs);
return tcs.Task;
}
public static async Task<T> ExecuteTransactionOnDB(T t)
{
return await Task.FromResult(t);
}
public static async Task TriggerTransaction(T t)
{
T value = await ExecuteTransactionOnDB(t);
foreach (var item in tasks)
{
item.SetResult(value);
}
}
public static List<TaskCompletionSource<T>> tasks = new List<TaskCompletionSource<T>>();
}
Ниже приведены важные модификации:
List<dynamic> заменен на List<TaskCompletionSource<T>>TransactionOperation1/2 имеют тип возврата Task<T>, который представляет собой задачу, созданную с помощью TaskCompletionSource<T>.В Task.Run добавлено дополнительное ожидание, которое выполняет TriggerTransaction внутренне, хотя вы можете заменить следующий код:
await Task.Run(async() =>
{
Console.ReadLine();
await Program<int>.TriggerTransaction(5);
});
с участием
await Program<int>.TriggerTransaction(5);
Теперь он дает ожидаемый результат, он суммирует два целых числа. Еще несколько небольших изменений, например, удаление Task.Delay, которое не требуется.
РЕДАКТИРОВАТЬ 1 - Использование Task.WhenAll
static async Task Main(string[] args)
{
var a = Program.TransactionOperation1(5);
var b = Program.TransactionOperation1(5);
Console.ReadLine();
var taskResults = await Task.WhenAll(a,b);
dynamic finalResult = 0;
foreach(var t in taskResults)
finalResult += t;
if (!File.Exists("D:\\data.txt"))
{
File.Create("D:\\data.txt");
}
using (FileStream stream = new FileStream("D:\\data.txt", FileMode.Append, FileAccess.Write))
{
var bytes = Encoding.UTF8.GetBytes(finalResult.ToString());
stream.Write(bytes, 0, bytes.Length);
}
Console.WriteLine(finalResult);
}
class Program
{
public static Task<dynamic> TransactionOperation1(dynamic val)
{
return Task<dynamic>.Run(() => val);
}
public static Task<dynamic> TransactionOperation2(dynamic val)
{
return Task<dynamic>.Run(() => val);
}
}
Проблема в том, что TCS и список задач сами по себе избыточны. Что также делает отдых кода избыточным. Можно ли все заменить одним await Task.WhenAll()? Или один ActionBlock?
Дело в том, что я не могу использовать дженерики, так как я ожидаю от TCS разных типов возвращаемых данных. Также вы действительно правы, я хотел использовать WhenAll, но как получить детализацию по различным типам возвращаемых данных? В моем списке могут быть TCS<Task<int>> и TCS<Task<bool>>. Как их достать?
@PanagiotisKanavos, конечно, да, мы можем использовать Task.WhenAll для выполнения агрегированных задач, с помощью TaskCompletionSource ничего особенного не достигается, поскольку не задействовано событие, инициируемое пользователем, хотя я просто изменял код OP
@BercoviciAdrian, который должен быть простым, иметь дело с Task.WhenAll, внутри вы получаете выполнение Task, который является базовым типом для Task<T>, здесь вы можете использовать Task<dynamic> и по-прежнему выполнять любую операцию, которая вам нужна, до тех пор, пока внутренний тип точки не поддерживает ее
@BercoviciAdrian создание универсального кода было просто добавлением ценности, вы, безусловно, можете продолжать использовать неуниверсальную версию и работать с динамическим типом до тех пор, пока он не будет внутренне поддерживать операцию, такую как +, которую вы определили
Обратите внимание на модифицированную версию (РЕДАКТИРОВАТЬ1) с использованием Task.WhenAll, которая, безусловно, значительно упрощает и упрощает код. Здесь для обработки использовали тип динамичный.
Я думаю, вы, наверное, что-то неправильно поняли. Создание
TaskCompletionSourceвозникает редко (помимо адаптации асинхронных API EAP к TAP). Задачи, объединяющие другие Задачи, также обычно редко бывают правильными. К сожалению, большая часть вашего кода, похоже, посвящена этим двум вещам, поэтому я даже не могу сказать, какую проблему вы пытаетесь решить является.