Звонивший:
var snapshotMessages = snapshotRepository.GetMessages();
_ = Console.Error.WriteLineAsync("Loading XML timetables...");
// some lengthy operation which loads a large dataset from a SQL database
await foreach (var item in snapshotMessages) {
// process the item
}
Вызываемый:
public async IAsyncEnumerable<Pport> GetMessages() {
Console.Error.WriteLine("Start getting messages");
var timestamp = DateTime.MinValue;
// some code which start downloading a large file from FTP
}
Я хочу распараллелить загрузку базы данных и загрузку. Однако строка «Начать получать сообщения» не появляется, что указывает на то, что программа не ведет себя параллельно, как я ожидал.
В документации сказано, что:
Асинхронный метод выполняется синхронно до тех пор, пока не достигнет своего первого выражения ожидания, после чего метод приостанавливается до завершения ожидаемой задачи. Тем временем управление возвращается вызывающему методу, как показано в примере в следующем разделе.
что здесь не похоже на правду. Что я сделал не так?
Я не понимаю. Разве код не должен выполняться до того, как будет возвращена итерация?
Не тот код, который вы написали в своем методе, нет. Компилятор генерирует метод, который эффективно создает конечный автомат, и запускает код вашего метода на «ленивой» основе при использовании итерируемого объекта. Я настоятельно рекомендую сначала убедиться, что вы понимаете блоки итераторов синхронизации, а затем смотреть блоки асинхронных итераторов.
Не нужно закрывать этот вопрос, он на самом деле весьма познавательный. Хотя мой первый инстинкт заключается в том, что он использует неправильный инструмент (вероятно, это должен быть Task<IEnumerable<...>>
), и после того, как вы узнаете ответ, он кажется очевидным, я бы не обязательно думал, что вызов IAsyncEnumerable
не приведет к вызову его неасинхронной части.
Связанный материал: Как перечислить IAsyncEnumerable<T> и вызвать асинхронное действие для каждого элемента, обеспечивая параллелизм для каждой пары итерация/действие?
Какова цель _ = Console.Error.WriteLineAsync("Loading XML timetables...");
в отличие от более простого Console.WriteLine("Loading XML timetables...");
?
Итераторы — как синхронные, так и асинхронные — имеют отложенное выполнение — по сути, конечный автомат неактивен до фактического foreach
. Однако это можно легко исправить, немного реструктурировав:
public IAsyncEnumerable<Pport> GetMessages()
{
// note: this is neither "async" nor has "yield"
Console.Error.WriteLine("Start getting messages");
var timestamp = DateTime.MinValue;
return GetMessagesCore(timestamp);
}
private async IAsyncEnumerable<Pport> GetMessagesCore(DateTime timestamp)
{
// some code which start downloading a large file from FTP
... yield etc
}
Этот подход также часто используется, чтобы разрешить только методу private
иметь параметр [EnumeratorCancellation] CancellationToken cancellationToken
для использования с WithCancellation()
.
Однако! Блок асинхронного итератора, как и обычный блок итератора, представляет собой всего лишь один пассивный насос, позволяющий итеративно выбирать последовательность; это не активно параллельная/параллельная машина. Для этого вам, вероятно, понадобится Channel<T>
, то есть Channel<Pport>
с отдельным исполнителем, инициируемым через Task.Run
(или аналогичный), чтобы у вас была активная пара производитель/потребитель с некоторым ограниченным или неограниченным устройством между ними (Channel<T>
). Канал имеет API ReadAllAsync()
, который отображает канал как IAsyncEnumnable<T>
.
Например:
await foreach(var i in new Foo().GetMessages())
{
Console.WriteLine(i);
}
class Foo
{
public IAsyncEnumerable<int> GetMessages()
{
Console.Error.WriteLine("Start getting messages");
// use a bounded channel as a buffer of 10 pending items;
// there is also an unbounded option available
Channel<int> channel = Channel.CreateBounded<int>(
new BoundedChannelOptions(capacity: 10));
_ = Task.Run(() => Produce(channel.Writer));
return channel.Reader.ReadAllAsync();
}
private async Task Produce(ChannelWriter<int> writer)
{
try
{
for (int i = 0; i < 1000; i++)
{
await writer.WriteAsync(i);
}
writer.TryComplete();
}
catch (Exception ex)
{
writer.TryComplete(ex);
}
}
}
Ваше «исправление» не решило проблему. Он просто изменил ведение журнала так, что журналы показывают, что код работает так, как задумано, хотя это не так. ОП хочет, чтобы их длительная операция выполнялась одновременно с их методом GetMessagesCore
, что при этом не изменится.
@Servy, это зависит от вашей точки зрения; некоторый код теперь запускается немедленно - но да, если ОП хочет полностью активный насос, это что-то другое - я уточню
Да, код, который утверждает, что работа была начата, был запущен немедленно, а работа, которая, как он утверждает, была начата, на самом деле не началась. В их ситуации это явно контрпродуктивно, поскольку заставляет журналистов думать, что они делают то, что хотят, даже если это не так.
@Серви счастливее? (см. редактирование)
Это ведет себя так же, как и неасинхронный блок итератора: пока вы не начнете итерацию, код не запустится.