Как я могу добавить многопоточность в Concurrentqueue, в данный момент я обрабатываю текстовый файл с использованием Concurrentqueue в 1 потоке, но что, если я хочу запустить его в нескольких потоках, чтобы уменьшить общее время обработки?
Пример текущих методов -
private static ConcurrentQueue<string> queue;
static void Main(string[] args)
{
queue = new ConcurrentQueue<string>(System.IO.File.ReadAllLines("input.txt"));
Process();
}
static void Process()
{
while (queue.Count > 0)
{
string entry;
if (queue.TryDequeue(out entry))
{
Console.WriteLine(entry);
log("out.txt", entry);
}
}
}
private static void log(string file, string data)
{
using (StreamWriter writer = System.IO.File.AppendText(file))
{
writer.WriteLine(data);
writer.Flush();
writer.Close();
}
}
Разбивка кода -
queue = new ConcurrentQueue<string>(System..) // assigns queue to a text file
Process(); // Executes the Process method
static void Process() {
while ... // runs a loop whilst queue.count is not equal to 0
if (queueTryDequeue... // takes one line from queue and assigns it to 'string entry'
Console.. // Writes 'entry' to console
log.. // adds 'string entry' to a new line inside 'out.txt'
input.txt, например, содержит 1000 записей, и я хочу создать 10 потоков, которые берут запись из input.txt и обрабатывают ее, избегая при этом использования одной и той же записи/дублирования того же процесса, что и другой поток. Как мне добиться этого?
Вы должны использовать цикл Параллельно:
Примечание: элементы не будут зацикливаться в исходном порядке!
private static StreamWriter logger;
static void Main(string[] args)
{
// Store your entries from a file in a queue.
ConcurrentQueue<string> queue = new ConcurrentQueue<string>(System.IO.File.ReadAllLines("input.txt"));
// Open StreamWriter here.
logger = File.AppendText("log.txt");
// Call process method.
ProcessParallel(queue);
// Close the StreamWriter after processing is done.
logger.Close();
}
static void ProcessParallel(ConcurrentQueue<string> collection)
{
ParallelOptions options = new ParallelOptions()
{
// A max of 10 threads can access the file at one time.
MaxDegreeOfParallelism = 10
};
// Start the loop and store the result, so we can check if all the threads are done.
// The Parallel.For will do all the mutlithreading for you!
ParallelLoopResult result = Parallel.For(0, collection.Count, options, (i) =>
{
string entry;
if (collection.TryDequeue(out entry))
{
Console.WriteLine(entry);
log(entry);
}
});
// Parallel.ForEach can also be used.
// Block the main thread while it is still processing the entries...
while (!result.IsCompleted) ;
// Every thread is done
Console.WriteLine("Multithreaded loop is done!");
}
private static void log(string data)
{
if (logger.BaseStream == null)
{
// Cannot log, because logger.Close(); was called.
return;
}
logger.WriteLine(data);
}
Кажется, это работает, однако, если я попытаюсь использовать свой метод журнала вместо того, который вы предоставили, он выдает исключение «процесс не может получить доступ к этому файлу, потому что он используется другим процессом»... если я не воспользуюсь вашим подходом и установите регистратор внутри основного в "log.txt"
Я пометил как правильный ответ, так как этот вопрос касается потоковой передачи, и ваш ответ для этого находится в рабочем состоянии.
@ R2-D2 Ваш подход к ведению журнала не работает, потому что цикл одновременно регистрирует несколько потоков. Функция журнала открывает файл 'log.txt', что-то записывает и закрывает его. Если другой поток обращается к методу журнала, в то время как другой поток все еще записывает в файл журнала, вы получите это исключение, сначала необходимо снять «блокировку» файла, вызвав writer.Close();
.
Во многом зависит от того, что здесь означает и обработать его. ЦП или ввод-вывод связаны?