ConcurrentQueue<string> с многопоточностью/задачами?

Как я могу добавить многопоточность в Concurrentqueue, в данный момент я обрабатываю текстовый файл с использованием Concurrentqueue в 1 потоке, но что, если я хочу запустить его в нескольких потоках, чтобы уменьшить общее время обработки?

Пример текущих методов -

    private static ConcurrentQueue<string> queue;

    static void Main(string[] args)
    {

        queue = new ConcurrentQueue<string>(System.IO.File.ReadAllLines("input.txt"));
        Process();

    }

    static void Process()
    {

        while (queue.Count > 0)
        {
            string entry;
            if (queue.TryDequeue(out entry))
            {
                Console.WriteLine(entry);
                log("out.txt", entry);
            }
        }
    }

    private static void log(string file, string data)
    {
        using (StreamWriter writer = System.IO.File.AppendText(file))
        {
            writer.WriteLine(data);
            writer.Flush();
            writer.Close();
        }
    }

Разбивка кода -

queue = new ConcurrentQueue<string>(System..) // assigns queue to a text file

Process(); // Executes the Process method


static void Process() {

    while ... // runs a loop whilst queue.count is not equal to 0

    if (queueTryDequeue... // takes one line from queue and assigns it to 'string entry'

    Console.. // Writes 'entry' to console

    log.. // adds 'string entry' to a new line inside 'out.txt'

input.txt, например, содержит 1000 записей, и я хочу создать 10 потоков, которые берут запись из input.txt и обрабатывают ее, избегая при этом использования одной и той же записи/дублирования того же процесса, что и другой поток. Как мне добиться этого?

Во многом зависит от того, что здесь означает и обработать его. ЦП или ввод-вывод связаны?

Henk Holterman 19.06.2019 18:03
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
1
239
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы должны использовать цикл Параллельно:

Примечание: элементы не будут зацикливаться в исходном порядке!

private static StreamWriter logger;

static void Main(string[] args)
{
    // Store your entries from a file in a queue.
    ConcurrentQueue<string> queue = new ConcurrentQueue<string>(System.IO.File.ReadAllLines("input.txt"));

    // Open StreamWriter here.
    logger = File.AppendText("log.txt");

    // Call process method.
    ProcessParallel(queue);

    // Close the StreamWriter after processing is done.
    logger.Close();
}

static void ProcessParallel(ConcurrentQueue<string> collection)
{
    ParallelOptions options = new ParallelOptions()
    {
        // A max of 10 threads can access the file at one time.
        MaxDegreeOfParallelism = 10
    };

    // Start the loop and store the result, so we can check if all the threads are done.
    // The Parallel.For will do all the mutlithreading for you!
    ParallelLoopResult result = Parallel.For(0, collection.Count, options, (i) =>
    {
        string entry;
        if (collection.TryDequeue(out entry))
        {
            Console.WriteLine(entry);
            log(entry);
        }
    });
    // Parallel.ForEach can also be used.

    // Block the main thread while it is still processing the entries...
    while (!result.IsCompleted) ;

    // Every thread is done
    Console.WriteLine("Multithreaded loop is done!");
}

private static void log(string data)
{
    if (logger.BaseStream == null)
    {
        // Cannot log, because logger.Close(); was called.
        return;
    }

    logger.WriteLine(data);
}

Кажется, это работает, однако, если я попытаюсь использовать свой метод журнала вместо того, который вы предоставили, он выдает исключение «процесс не может получить доступ к этому файлу, потому что он используется другим процессом»... если я не воспользуюсь вашим подходом и установите регистратор внутри основного в "log.txt"

R2-D2 19.06.2019 18:54

Я пометил как правильный ответ, так как этот вопрос касается потоковой передачи, и ваш ответ для этого находится в рабочем состоянии.

R2-D2 19.06.2019 18:55

@ R2-D2 Ваш подход к ведению журнала не работает, потому что цикл одновременно регистрирует несколько потоков. Функция журнала открывает файл 'log.txt', что-то записывает и закрывает его. Если другой поток обращается к методу журнала, в то время как другой поток все еще записывает в файл журнала, вы получите это исключение, сначала необходимо снять «блокировку» файла, вызвав writer.Close();.

codestix 19.06.2019 20:17

Другие вопросы по теме