Как правильно реализовать отложенный ответ/тайм-аут между serverHTTP и каналом?

У меня есть концепция, которую я не знаю, как правильно решить с минимальным воздействием на систему в Go.

Я делаю «диспетчер очереди печати», в котором клиенты могут обращаться к API (/StartJob) для обработки заданий на печать.

Так как есть только один принтер, поэтому узким местом является один рабочий процесс, который обрабатывает каждое задание за раз, но клиенты могут передать одно задание в любой момент времени, он просто будет стоять в очереди, и рабочий процесс будет обрабатывать каждое задание за время, необходимое для выполнения шага. шаг за шагом.

То, как я это делаю, заключается в том, что ServeHTTP отправляет задание на канал (обратите внимание, здесь я просто передаю идентификатор, если рабочий будет искать из него данные для печати):

func (gv *GlobalVariables) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    
    switch r.URL.Path {
    case "/StartJob":
        newPrintJob := QueueElement {jobid: "jobid"}
        gv.printQueue <- newPrintJob
        fmt.Fprintf(w, "instant reply from serveHTTP\r\n")

    default:
        fmt.Fprintf(w, "No such Api")
    }
  }

Затем Worker просто работает все время и обрабатывает любые поступающие задания. Реального кода немного больше, но в конце он выполняет внешний процесс:

  func worker(jobs <-chan QueueElement) {
    for {
        job := <-jobs
        processExec ("START /i /b processAndPrint.exe -"+job.jobid)
      }

Дело в том, что внешнему процессу может потребоваться время для выполнения, иногда мгновенное, но при некоторых обстоятельствах выполнение задачи может занять 1 минуту, прежде чем он вернется.

Моя проблема здесь в том, что теперь в serverHTTP я мгновенно пишу клиенту, не зная, было ли задание первым в очереди и может быть немедленно обработано, или если оно было поставлено в очередь и, возможно, будет через несколько секунд или минут раньше его обработали:

  fmt.Fprintf(w, "instant reply from serveHTTP\r\n")

Я хотел бы дать клиенту до 5 секунд, чтобы получить ответ, если его работа была обработана в течение этого времени, или, если нет, сказать ему, что ему нужно перезвонить позже, чтобы проверить статус своей работы.

Я имел в виду несколько подходов:

  1. В моем QueueElemnt я передаю http.ResponseWriter, поэтому я могу написать ответщику (ответить клиенту) из Worker. Это я могу сделать, если я позволю serveHTTP спать, поскольку ResponseWriter отключится, когда существует процедура go. Итак, здесь мне нужно будет подождать в serveHTTP, а затем, когда он ожидает, рабочему разрешено писать в ResponseWriter.

    Проблема с этим подходом заключается в том, что если задание находится в нескольких минутах, Worker ничего не запишет в этот ResponseWriter, и serveHTTP не будет знать, был ли отправлен ответ от worker.

  2. Я мог бы создать канал для каждого элемента QueueElement, чтобы serveHTTP и не только рабочий процесс, но и фактическое задание, если оно обрабатывается рабочим процессом, могли общаться друг с другом.

    Этот подход я не тестировал, но я также беспокоюсь здесь о том, что он излишен и тяжел для системы, поскольку у нас может быть ситуация, когда у нас поступает много-много-много запросов API и, следовательно, обрабатывается большая очередь, поэтому, хотя я потребуется тайм-аут / отмена через 5 секунд, я думаю, что концепция излишня?

  3. Возможно, я мог бы передать мьютексированное значение в элементе очереди, который как serveHTTP мог бы проверять до 5 секунд, так и очередь могла бы проверять/манипулировать, но в случае завершения задания элемент очереди исчезает, поэтому это может привести к конфликтам.

  4. Я мог бы сделать вариант № 1), где я пишу свой собственный генератор ответов и использую флаг, если что-то уже было записано в него, поэтому serveHTTP будет проверять это до 5 секунд, чтобы проверить, написал ли уже Worker ответ клиенту и в этом случае выйти из serveHTTP без ответа, или, в случае отсутствия записи, serveHTTP отправит сообщение обратно клиенту, немного похожее на строки это.

Но ни один из них, как мне кажется, не очень гладкий, и я не хочу запускать бесконечное количество go-процедур или каналов или везде замыкаться на mutux, поскольку я не знаю, как это влияет на систему.

Может ли кто-нибудь помочь мне в правильном способе реализации такой вещи? Я читал страницу вверх и вниз и не нашел хорошего и чистого способа добиться этого.

gv.printQueue это буферизованный канал?
Ankit Deshpande 31.05.2019 09:39

создал его как небуферизованный, но я не совсем уверен, правильно ли это делать, я сделал это с той точки зрения, что не хотел иметь верхний предел элементов, которые можно было бы поместить в канал/очередь, но я могу прочитать здесь, что они действуют по другому - что посоветуете? (я не хочу блокировать входящие запросы API, и в основном я хотел бы иметь возможность обрабатывать любое количество входящих и помещать их в канал/очередь)

MdTp 31.05.2019 11:03
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
API ввода вопросов - это полезный инструмент для интеграции моделей машинного обучения, таких как ChatGPT, в приложения, требующие обработки...
0
2
513
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Я думаю, что самый простой подход - это первый, слегка измененный. Вы можете передать http.ResponseWriter воркеру, который охватывает другого воркера, фактически выполняющего работу, в то время как «родительский» воркер ждет ее завершения или тайм-аута. Он ответит HTTP-клиенту, как только произойдет одно из двух событий.

func (gv *GlobalVariables) ServeHTTP(w http.ResponseWriter, r *http.Request) {

    switch r.URL.Path {
    case "/StartJob":
        newPrintJob := QueueElement {writer: w, jobid: "jobid"}
        gv.printQueue <- newPrintJob
        fmt.Fprintf(w, "instant reply from serveHTTP\r\n")

    default:
        fmt.Fprintf(w, "No such Api")
    }
  }

func worker(jobs <-chan QueueElement) {
    for {
        done := make(chan struct{})
        job := <-jobs

        go func(d chan struct{}, q QueueElement) {
            processExec ("START /i /b processAndPrint.exe -"+q.jobid)
            d <- struct{}{}
        }(done, job)

        select {
            //Timeout
            case <-time.After(time.Second * 5):
                fmt.Fprintf(w, "job is taking more than 5 seconds to complete\r\n")
            //Job processing finished in time
            case <-done:
                fmt.Fprintf(w, "instant reply from serveHTTP\r\n")
        }
   }

Вы можете создать «ожидающую» горутину, как только получите HTTP-запрос. Таким образом, таймер тайм-аута будет учитывать всю обработку запроса/задания.

Пример:

package main

import (
    "context"
    "fmt"
    "net/http"
    "time"
)

func (gv *GlobalVariables) ServeHTTP(w http.ResponseWriter, r *http.Request) {

    switch r.URL.Path {
    case "/StartJob":
        doneC := make(chan struct{}, 1) //Buffered channel in order not to block the worker routine
        newPrintJob := QueueElement{
            doneChan: doneC,
            jobid:    "jobid",
        }
        go func(doneChan chan struct{}) {
            ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
            defer cancel()
            select {
            //If this triggers first, then this waiting goroutine would exit
            //and nobody would be listeding the 'doneChan'. This is why it has to be buffered.
            case <-ctx.Done():
                fmt.Fprintf(w, "job is taking more than 5 seconds to complete\r\n")
            case <-doneChan:
                fmt.Fprintf(w, "instant reply from serveHTTP\r\n")
            }
        }(doneC)
        gv.printQueue <- newPrintJob

    default:
        fmt.Fprintf(w, "No such Api")
    }
}

func worker(jobs <-chan QueueElement) {
    for {
        job := <-jobs
        processExec("START /i /b processAndPrint.exe -" + job.jobid)
        job.doneChan <- struct{}{}

    }
}

это была умная абстракция - для обработки длинных заданий - когда - задание вынимается из очереди, проблема с этим будет заключаться в том, что в случае, если задание вернется в очередь / канал ( рабочий не близок к обработке этого jobid пока еще) serveHTTP все равно остался бы в неизвестности?

MdTp 31.05.2019 10:48

Решением будет создание горутины ближе к тому месту, где вы получаете HTTP-запрос. Я отредактировал свой ответ с новым примером для этого. Вам все равно нужно будет создать дополнительную горутину для каждого HTTP-запроса. Следуя тому же шаблону, который я предлагаю, вы можете создать только одну горутину «официанта» в начале и передать идентификатор задания в сообщении канала.

Giulio Micheloni 31.05.2019 11:09

это красивое решение, и оно имеет смысл, большое спасибо, но как обстоят дела с подпрограммами go, было бы это нормально, скажем, если бы у нас было 1000 запросов API, и, таким образом, было бы создано вдвое больше горутин, чем обычно создается с помощью одного serveHTTP? это тяжело для системы или эти цифры даже близко не тяжелые?

MdTp 31.05.2019 11:16

Вы определенно можете реализовать версию приведенного выше примера, где горутина «waiter» — всего одна. Ему нужно будет отслеживать текущие HTTP-запросы, устанавливать для них таймер и ждать их «готового» канала. Все это внутри одного блока select{}. Там есть сложность памяти. Но, как говорится, Преждевременная оптимизация — корень всех зол. Более того, производственный код Go может легко породить сотни тысяч горутин. Подробнее здесь.

Giulio Micheloni 31.05.2019 11:25

да, вы правы, это был отличный подход, который обслуживает всех официантов! офигенно большое спасибо Джулио также за добавленное объяснение здесь с подпрограммами go!

MdTp 31.05.2019 11:29

в вашем примере -: ctx, cancel: = context.WithTimeout (ctx, 5 * time.Second) - где должен быть создан ctx? я не уверен на 100%, как это работает, мне нужно создать контекст в структуре канала или?

MdTp 31.05.2019 17:05

Вы можете уточнить? Не принято хранить context.Context в структуре данных. Было бы лучше явно передать его вызову функции или горутине.

Giulio Micheloni 31.05.2019 17:14

просто я пытаюсь реализовать код здесь и не уверен, где мне нужно объявить dtx (если я напишу код так же, как в вашем примере, но ctx := context.WaitTimeout(ctx,...) жалуется, что не знает ctx на тот момент (первый раз пробую контекст)

MdTp 31.05.2019 17:39

хм, я не могу заставить его работать с этим подходом - запуск процедуры go внутри serveHTTP действительно получает ответ от работника, но проблема в том, что serveHTTP сразу же завершает работу (тогда соединение с ResponseWriter теряется). Если я изменю его так, чтобы это была не функция перехода, а просто функция, тогда концепция работает, но проблема в том, что serveHTTP блокируется, поэтому, если я запускаю запрос API ex 10 против него, они будут обрабатываться последовательно, что дает все более и более длинный ответ время, когда отправляется больше запросов API :(

MdTp 31.05.2019 23:16

Я бы не стал держать запрос в течение длительного времени, потому что мы не уверены, когда задание будет обработано.

Один из подходов, о котором я могу думать, это:

Сначала ответьте с сервера как принятый/поставленный в очередь и верните job_id.

{
   "job_id": "1",
   "status": "queued"
}

Клиент может опрашивать (скажем, каждые 5 секунд) или использовать длительный опрос для проверки состояния задания.

Когда работает:

{
   "job_id": "1",
   "status": "processing"
}

После завершения:

{
   "job_id": "1",
   "status": "success/failed"
}

да, это было бы моим последним средством, заканчивающимся мгновенным ответом и опросом по запросу, но я хотел бы как-то избежать этого, так как в некоторых случаях на канал не будет нагрузки, и работник сразу же обработает задание. В моей голове я сравниваю это немного, как если бы вы вызывали API, который будет запрашивать больший объем данных из базы данных, у вас также иногда будет время обработки ответа от API, и вам не потребуется его опрашивать.

MdTp 31.05.2019 10:51

Затем вы можете передать канал результата в структуру задания, когда рабочий закончит работу, он может записать результаты в канал результата.

Ankit Deshpande 31.05.2019 11:00

Это хорошая практика запускать столько каналов? (даже если они живут не более 5 секунд в случае, если задание еще не может быть обработано) - также в случае, если рабочий еще даже не приступил к заданию, мы выйдем из serveHTTP через 5 секунд (тайм-аут ответа на канале?) и то рабочий будет писать на мёртвый канал, где больше никто не слушает?

MdTp 31.05.2019 11:07

Напишите метод для проверки статуса задания. Через 5 секунд или непосредственно перед выходом из serveHTTP проверьте статус и верните его. Если статус не завершен, позже нажмите API с помощью job_id и проверьте статус. Это приведет к избеганию канала результатов.

Ankit Deshpande 31.05.2019 11:27

если это длится более 5 секунд, то я сообщаю клиенту, что они должны опросить оттуда, и я думал о том, чтобы сделать дополнительное хранилище статуса каждого задания и просто выполнить проверку, но тогда я думаю, что мне придется поставить мьютекс вокруг проверьте значение и создайте 2 области хранения (канал только для рабочих элементов и, например, карту или базу данных, если я хочу сохранить ее как базу данных статуса задания). просто не нравится делать цикл со сном, чтобы каждые 250 мс проверять значение состояния:/

MdTp 31.05.2019 11:32

и я хотел бы отпустить клиента как можно скорее, я не хочу, чтобы они были вынуждены ждать 5 секунд каждый раз, если задание было обработано сразу, я хочу сразу же сообщить им.

MdTp 31.05.2019 11:33

Другие вопросы по теме