Я использую библиотеку DataFlow в .NET 4.7.1. По какой-то причине моя программа никогда не завершается, когда у меня есть await Console.Out.WriteLineAsync(DateTime.Now.TimeOfDay.ToString());
в лямбде async
конструктора ActionBlock
. Он просто выводил бы поток строк DateTime.Now.TimeOfDay.ToString()
и случайным образом останавливался, никогда не доходя до Console.WriteLine("Time elapsed:" + watch.Elapsed);
, хотя в некоторых случаях я наблюдал, что консоль выводит "Finished Reading the file"
.
class Program
{
public static async Task Main(string[] args)
{
int numberOfLines = 0;
Console.WriteLine("Number of cores used:" + Convert.ToInt32(Math.Ceiling((Environment.ProcessorCount * 0.75) * 2.0)));
BufferBlock<string> queBuffer = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 100000 });
var processingBlock = new ActionBlock<string>(async inputline =>
{
Interlocked.Increment(ref numberOfLines);
//Line that causes issue
//await Console.Out.WriteLineAsync(DateTime.Now.TimeOfDay.ToString());
}
, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 48,
SingleProducerConstrained = true,
BoundedCapacity = 500
});
queBuffer.LinkTo(processingBlock);
//Start
var watch = System.Diagnostics.Stopwatch.StartNew();
Console.WriteLine("Processing started at:" + DateTime.Now);
if (File.Exists(args[0]))
{
using (StreamReader sr = new StreamReader(args[0]))
{
string line;
// Read and display lines from the file until the end of the file is reached.
while ((line = await sr.ReadLineAsync()) != null)
{
await queBuffer.SendAsync(line);
}
}
await Console.Out.WriteLineAsync("Finished Reading the file");
}
queBuffer.Complete();
processingBlock.Complete();
await Task.WhenAll(queBuffer.Completion, processingBlock.Completion);
watch.Stop();
Console.WriteLine("Time elapsed:" + watch.Elapsed);
Console.WriteLine("Number of lines read:" + numberOfLines.ToString());
}
}
Однако, если я уберу строку, вызывающую проблему, она сработает и прочитает все строки из текстового файла.
W:\test>.\CompressAMS.exe token2-small.txt
Number of cores used:24
Processing started at:12/17/2018 6:32:50 PM
Finished Reading the file
Time elapsed:00:00:00.3569824
Number of lines read:100000
@RobertHarvey Вы имеете в виду звонок Flush()
по линии, которая вызывает проблему? Или еще где-нибудь?
Не думаю, что вам нужно ждать завершения обеих задач. Вы пробовали просто дождаться последнего блока в конвейере? Вместо Task.WhenAll
просто сделайте await processingBlock.Completion;
.
@pmcilreavy Вы правы! Я только что проверил документацию. Это тоже имеет смысл интуитивно. Спасибо !
Как вы могли догадаться, это не как асинхронный. Так что просто не используйте его, и у вас будет на одну проблему с синхронизацией меньше, о которой нужно беспокоиться.
Что у вас действительно есть, так это состояние гонки по завершении. Вы вызываете Complete()
для обоих блоков, заставляя блок обработки прекратить прием сообщений, и в этот момент в буфере все еще могут быть данные для передачи. Затем, когда вы ждете завершения обоих блоков, если буфер не отправил все свои сообщения, он никогда не будет завершен, и выполнение будет зависать на Finished Reading File
.
Вы можете безопасно дождаться обоих блоков, но только вызвать Complete()
в буфере и позволить TDF распространить завершение на ваш блок обработки нисходящего потока:
queBuffer.LinkTo(processingBlock, new DataflowLinkOptions() { PropagateCompletion = true });
/******/
queBuffer.Complete();
await Task.WhenAll(queBuffer.Completion, processingBlock.Completion);
Попробуйте вызвать в
Flush()
наConsole.Out
.