У меня есть запрос LINQ, который НЕ следует перечислять более одного раза, и я хочу избежать его двойного перечисления по ошибке. Есть ли какой-либо метод расширения, который я могу использовать, чтобы защитить себя от такой ошибки? Я думаю о чем-то вроде этого:
var numbers = Enumerable.Range(1, 10).OnlyOnce();
Console.WriteLine(numbers.Count()); // shows 10
Console.WriteLine(numbers.Count()); // throws InvalidOperationException: The query cannot be enumerated more than once.
Причина, по которой мне нужна эта функциональность, заключается в том, что у меня есть множество задач, которые предназначены для постепенного создания и запуска задач, в то время как они медленно перечисляются под контролем. Я уже сделал ошибку, чтобы запустить задачи дважды, потому что я забыл, что это другое перечисляемое, а не массив.
var tasks = Enumerable.Range(1, 10).Select(n => Task.Run(() => Console.WriteLine(n)));
Task.WaitAll(tasks.ToArray()); // Lets wait for the tasks to finish...
Console.WriteLine(String.Join(", ", tasks.Select(t => t.Id))); // Lets see the completed task IDs...
// Oups! A new set of tasks started running!
Перечисления перечисляют, конец истории. Вам просто нужно вызвать ToList
, или ToArray
// this will enumerate and start the tasks
var tasks = Enumerable.Range(1, 10)
.Select(n => Task.Run(() => Console.WriteLine(n)))
.ToList();
// wait for them all to finish
Task.WaitAll(tasks.ToArray());
Console.WriteLine(String.Join(", ", tasks.Select(t => t.Id)));
Хм, если вы хотите параллелизма
Parallel.For(0, 100, index => Console.WriteLine(index) );
или если вы используете асинхронный и ожидающий шаблон
public static async Task DoWorkLoads(IEnumerable <Something> results)
{
var options = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 50
};
var block = new ActionBlock<Something>(MyMethodAsync, options);
foreach (var result in results)
block.Post(result);
block.Complete();
await block.Completion;
}
...
public async Task MyMethodAsync(Something result)
{
await SomethingAsync(result);
}
Обновление. Поскольку вы ищете способ контролировать максимальную степень параллелизма, вы можете использовать это
public static async Task<IEnumerable<Task>> ExecuteInParallel<T>(this IEnumerable<T> collection,Func<T, Task> callback,int degreeOfParallelism)
{
var queue = new ConcurrentQueue<T>(collection);
var tasks = Enumerable.Range(0, degreeOfParallelism)
.Select(async _ =>
{
while (queue.TryDequeue(out var item))
await callback(item);
})
.ToArray();
await Task.WhenAll(tasks);
return tasks;
}
Я не хочу звонить ToArray
, потому что тогда все задачи начнут выполняться сразу. Я хочу перечислять задачи медленно (для достижения максимальной степени параллелизма).
Затем @TheodorZoulias использует ActionBlock или реактивные расширения.
У них есть кривая обучения. Я обязательно воспользуюсь ими после того, как изучу их. В настоящее время я довольно хорошо знаком с библиотекой параллельных задач, и я предпочел бы решение, которое я мог бы применить немедленно с моим текущим уровнем знаний.
@TheodorZoulias, пожалуйста, объясните, что это значит I want to enumerate the tasks slowly
Я использую метод @Ohad Schneider из здесь. Он ведет список активных задач, затем перестает перечислять, когда запущено достаточно задач, а затем ждет завершения некоторых из них.
@TheodorZoulias правильная формулировка, которую вам нужно использовать, это «вам нужно контролировать максимальную степень параллелизма»
Спасибо, Майкл! Ваш ExecuteInParallel
метод очень классный! Я немного изменил его, потому что хочу, чтобы результаты задачи возвращались. Я думаю, что ваш метод предпочтительнее метода Охада Шнайдера, потому что ожидание всех активных задач с Task.WhenAny
в цикле должно иметь накладные расходы, а ваш метод позволяет этого избежать.
I want to avoid enumerating it twice by mistake.
Вы можете обернуть коллекцию коллекцией, которая выдает исключение, если она перечисляется дважды.
например:
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
namespace ConsoleApp8
{
public static class EnumExtension
{
class OnceEnumerable<T> : IEnumerable<T>
{
IEnumerable<T> col;
bool hasBeenEnumerated = false;
public OnceEnumerable(IEnumerable<T> col)
{
this.col = col;
}
public IEnumerator<T> GetEnumerator()
{
if (hasBeenEnumerated)
{
throw new InvalidOperationException("This collection has already been enumerated.");
}
this.hasBeenEnumerated = true;
return col.GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
public static IEnumerable<T> OnlyOnce<T>(this IEnumerable<T> col)
{
return new OnceEnumerable<T>(col);
}
}
class Program
{
static void Main(string[] args)
{
var col = Enumerable.Range(1, 10).OnlyOnce();
var colCount = col.Count(); //first enumeration
foreach (var c in col) //second enumeration
{
Console.WriteLine(c);
}
}
}
}
Хороший ответ, голосуйте, однако я чувствую, что мы застряли в проблеме xy
@ Фабио, я знаю, как сделать его потокобезопасным, если это необходимо. Но в настоящее время мое перечисляемое используется в одном потоке выполнения, поэтому я буду использовать решение Дэвида как есть.
@Michael Randall, это проблема XY, но X шире, чем Y. Поскольку мою проблему с задачами можно решить с помощью более надежной реализации, но мой первоначальный запрос на перечисление, которое можно перечислить только один раз, может иметь другие приложения как хорошо. Приятно иметь класс OnceEnumerable
Дэвида Брауна в моем наборе инструментов. :-)
Rx, безусловно, является опцией для управления параллелизмом.
var query =
Observable
.Range(1, 10)
.Select(n => Observable.FromAsync(() => Task.Run(() => new { Id = n })));
var tasks = query.Merge(maxConcurrent: 3).ToArray().Wait();
Console.WriteLine(String.Join(", ", tasks.Select(t => t.Id)));
Спасибо, интересно. Я попробовал фрагмент, и он действительно делает то, что должен делать.
Rx действительно очень крутой. Я использую его для обработки всех событий и при работе с задачами. ИМХО намного мощнее.
Представьте собственный класс, который будет использовать Queue под капотом. Реализуйте
GetEnumerator
, который будет удалять элемент из очереди на каждой итерации - таким образом вы сможете безопасно повторять свой класс столько раз, сколько хотите, не выполняя задачи более одного раза, потому что задачи будут удалены на первой итерации. ИспользуйтеImmutableQueue
для получения класса потокобезопасности.