У меня есть концепция, которую я не знаю, как правильно решить с минимальным воздействием на систему в Go.
Я делаю «диспетчер очереди печати», в котором клиенты могут обращаться к API (/StartJob) для обработки заданий на печать.
Так как есть только один принтер, поэтому узким местом является один рабочий процесс, который обрабатывает каждое задание за раз, но клиенты могут передать одно задание в любой момент времени, он просто будет стоять в очереди, и рабочий процесс будет обрабатывать каждое задание за время, необходимое для выполнения шага. шаг за шагом.
То, как я это делаю, заключается в том, что ServeHTTP отправляет задание на канал (обратите внимание, здесь я просто передаю идентификатор, если рабочий будет искать из него данные для печати):
func (gv *GlobalVariables) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/StartJob":
newPrintJob := QueueElement {jobid: "jobid"}
gv.printQueue <- newPrintJob
fmt.Fprintf(w, "instant reply from serveHTTP\r\n")
default:
fmt.Fprintf(w, "No such Api")
}
}
Затем Worker просто работает все время и обрабатывает любые поступающие задания. Реального кода немного больше, но в конце он выполняет внешний процесс:
func worker(jobs <-chan QueueElement) {
for {
job := <-jobs
processExec ("START /i /b processAndPrint.exe -"+job.jobid)
}
Дело в том, что внешнему процессу может потребоваться время для выполнения, иногда мгновенное, но при некоторых обстоятельствах выполнение задачи может занять 1 минуту, прежде чем он вернется.
Моя проблема здесь в том, что теперь в serverHTTP я мгновенно пишу клиенту, не зная, было ли задание первым в очереди и может быть немедленно обработано, или если оно было поставлено в очередь и, возможно, будет через несколько секунд или минут раньше его обработали:
fmt.Fprintf(w, "instant reply from serveHTTP\r\n")
Я хотел бы дать клиенту до 5 секунд, чтобы получить ответ, если его работа была обработана в течение этого времени, или, если нет, сказать ему, что ему нужно перезвонить позже, чтобы проверить статус своей работы.
Я имел в виду несколько подходов:
В моем QueueElemnt я передаю http.ResponseWriter, поэтому я могу написать ответщику (ответить клиенту) из Worker. Это я могу сделать, если я позволю serveHTTP спать, поскольку ResponseWriter отключится, когда существует процедура go. Итак, здесь мне нужно будет подождать в serveHTTP, а затем, когда он ожидает, рабочему разрешено писать в ResponseWriter.
Проблема с этим подходом заключается в том, что если задание находится в нескольких минутах, Worker ничего не запишет в этот ResponseWriter, и serveHTTP не будет знать, был ли отправлен ответ от worker.
Я мог бы создать канал для каждого элемента QueueElement, чтобы serveHTTP и не только рабочий процесс, но и фактическое задание, если оно обрабатывается рабочим процессом, могли общаться друг с другом.
Этот подход я не тестировал, но я также беспокоюсь здесь о том, что он излишен и тяжел для системы, поскольку у нас может быть ситуация, когда у нас поступает много-много-много запросов API и, следовательно, обрабатывается большая очередь, поэтому, хотя я потребуется тайм-аут / отмена через 5 секунд, я думаю, что концепция излишня?
Возможно, я мог бы передать мьютексированное значение в элементе очереди, который как serveHTTP мог бы проверять до 5 секунд, так и очередь могла бы проверять/манипулировать, но в случае завершения задания элемент очереди исчезает, поэтому это может привести к конфликтам.
Я мог бы сделать вариант № 1), где я пишу свой собственный генератор ответов и использую флаг, если что-то уже было записано в него, поэтому serveHTTP будет проверять это до 5 секунд, чтобы проверить, написал ли уже Worker ответ клиенту и в этом случае выйти из serveHTTP без ответа, или, в случае отсутствия записи, serveHTTP отправит сообщение обратно клиенту, немного похожее на строки это.
Но ни один из них, как мне кажется, не очень гладкий, и я не хочу запускать бесконечное количество go-процедур или каналов или везде замыкаться на mutux, поскольку я не знаю, как это влияет на систему.
Может ли кто-нибудь помочь мне в правильном способе реализации такой вещи? Я читал страницу вверх и вниз и не нашел хорошего и чистого способа добиться этого.
создал его как небуферизованный, но я не совсем уверен, правильно ли это делать, я сделал это с той точки зрения, что не хотел иметь верхний предел элементов, которые можно было бы поместить в канал/очередь, но я могу прочитать здесь, что они действуют по другому - что посоветуете? (я не хочу блокировать входящие запросы API, и в основном я хотел бы иметь возможность обрабатывать любое количество входящих и помещать их в канал/очередь)
Я думаю, что самый простой подход - это первый, слегка измененный. Вы можете передать http.ResponseWriter
воркеру, который охватывает другого воркера, фактически выполняющего работу, в то время как «родительский» воркер ждет ее завершения или тайм-аута. Он ответит HTTP-клиенту, как только произойдет одно из двух событий.
func (gv *GlobalVariables) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/StartJob":
newPrintJob := QueueElement {writer: w, jobid: "jobid"}
gv.printQueue <- newPrintJob
fmt.Fprintf(w, "instant reply from serveHTTP\r\n")
default:
fmt.Fprintf(w, "No such Api")
}
}
func worker(jobs <-chan QueueElement) {
for {
done := make(chan struct{})
job := <-jobs
go func(d chan struct{}, q QueueElement) {
processExec ("START /i /b processAndPrint.exe -"+q.jobid)
d <- struct{}{}
}(done, job)
select {
//Timeout
case <-time.After(time.Second * 5):
fmt.Fprintf(w, "job is taking more than 5 seconds to complete\r\n")
//Job processing finished in time
case <-done:
fmt.Fprintf(w, "instant reply from serveHTTP\r\n")
}
}
Вы можете создать «ожидающую» горутину, как только получите HTTP-запрос. Таким образом, таймер тайм-аута будет учитывать всю обработку запроса/задания.
Пример:
package main
import (
"context"
"fmt"
"net/http"
"time"
)
func (gv *GlobalVariables) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/StartJob":
doneC := make(chan struct{}, 1) //Buffered channel in order not to block the worker routine
newPrintJob := QueueElement{
doneChan: doneC,
jobid: "jobid",
}
go func(doneChan chan struct{}) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
select {
//If this triggers first, then this waiting goroutine would exit
//and nobody would be listeding the 'doneChan'. This is why it has to be buffered.
case <-ctx.Done():
fmt.Fprintf(w, "job is taking more than 5 seconds to complete\r\n")
case <-doneChan:
fmt.Fprintf(w, "instant reply from serveHTTP\r\n")
}
}(doneC)
gv.printQueue <- newPrintJob
default:
fmt.Fprintf(w, "No such Api")
}
}
func worker(jobs <-chan QueueElement) {
for {
job := <-jobs
processExec("START /i /b processAndPrint.exe -" + job.jobid)
job.doneChan <- struct{}{}
}
}
это была умная абстракция - для обработки длинных заданий - когда - задание вынимается из очереди, проблема с этим будет заключаться в том, что в случае, если задание вернется в очередь / канал ( рабочий не близок к обработке этого jobid пока еще) serveHTTP все равно остался бы в неизвестности?
Решением будет создание горутины ближе к тому месту, где вы получаете HTTP-запрос. Я отредактировал свой ответ с новым примером для этого. Вам все равно нужно будет создать дополнительную горутину для каждого HTTP-запроса. Следуя тому же шаблону, который я предлагаю, вы можете создать только одну горутину «официанта» в начале и передать идентификатор задания в сообщении канала.
это красивое решение, и оно имеет смысл, большое спасибо, но как обстоят дела с подпрограммами go, было бы это нормально, скажем, если бы у нас было 1000 запросов API, и, таким образом, было бы создано вдвое больше горутин, чем обычно создается с помощью одного serveHTTP? это тяжело для системы или эти цифры даже близко не тяжелые?
Вы определенно можете реализовать версию приведенного выше примера, где горутина «waiter» — всего одна. Ему нужно будет отслеживать текущие HTTP-запросы, устанавливать для них таймер и ждать их «готового» канала. Все это внутри одного блока select{}
. Там есть сложность памяти. Но, как говорится, Преждевременная оптимизация — корень всех зол. Более того, производственный код Go может легко породить сотни тысяч горутин. Подробнее здесь.
да, вы правы, это был отличный подход, который обслуживает всех официантов! офигенно большое спасибо Джулио также за добавленное объяснение здесь с подпрограммами go!
в вашем примере -: ctx, cancel: = context.WithTimeout (ctx, 5 * time.Second) - где должен быть создан ctx? я не уверен на 100%, как это работает, мне нужно создать контекст в структуре канала или?
Вы можете уточнить? Не принято хранить context.Context в структуре данных. Было бы лучше явно передать его вызову функции или горутине.
просто я пытаюсь реализовать код здесь и не уверен, где мне нужно объявить dtx (если я напишу код так же, как в вашем примере, но ctx := context.WaitTimeout(ctx,...) жалуется, что не знает ctx на тот момент (первый раз пробую контекст)
хм, я не могу заставить его работать с этим подходом - запуск процедуры go внутри serveHTTP действительно получает ответ от работника, но проблема в том, что serveHTTP сразу же завершает работу (тогда соединение с ResponseWriter теряется). Если я изменю его так, чтобы это была не функция перехода, а просто функция, тогда концепция работает, но проблема в том, что serveHTTP блокируется, поэтому, если я запускаю запрос API ex 10 против него, они будут обрабатываться последовательно, что дает все более и более длинный ответ время, когда отправляется больше запросов API :(
Я бы не стал держать запрос в течение длительного времени, потому что мы не уверены, когда задание будет обработано.
Один из подходов, о котором я могу думать, это:
Сначала ответьте с сервера как принятый/поставленный в очередь и верните job_id.
{
"job_id": "1",
"status": "queued"
}
Клиент может опрашивать (скажем, каждые 5 секунд) или использовать длительный опрос для проверки состояния задания.
Когда работает:
{
"job_id": "1",
"status": "processing"
}
После завершения:
{
"job_id": "1",
"status": "success/failed"
}
да, это было бы моим последним средством, заканчивающимся мгновенным ответом и опросом по запросу, но я хотел бы как-то избежать этого, так как в некоторых случаях на канал не будет нагрузки, и работник сразу же обработает задание. В моей голове я сравниваю это немного, как если бы вы вызывали API, который будет запрашивать больший объем данных из базы данных, у вас также иногда будет время обработки ответа от API, и вам не потребуется его опрашивать.
Затем вы можете передать канал результата в структуру задания, когда рабочий закончит работу, он может записать результаты в канал результата.
Это хорошая практика запускать столько каналов? (даже если они живут не более 5 секунд в случае, если задание еще не может быть обработано) - также в случае, если рабочий еще даже не приступил к заданию, мы выйдем из serveHTTP через 5 секунд (тайм-аут ответа на канале?) и то рабочий будет писать на мёртвый канал, где больше никто не слушает?
Напишите метод для проверки статуса задания. Через 5 секунд или непосредственно перед выходом из serveHTTP проверьте статус и верните его. Если статус не завершен, позже нажмите API с помощью job_id и проверьте статус. Это приведет к избеганию канала результатов.
если это длится более 5 секунд, то я сообщаю клиенту, что они должны опросить оттуда, и я думал о том, чтобы сделать дополнительное хранилище статуса каждого задания и просто выполнить проверку, но тогда я думаю, что мне придется поставить мьютекс вокруг проверьте значение и создайте 2 области хранения (канал только для рабочих элементов и, например, карту или базу данных, если я хочу сохранить ее как базу данных статуса задания). просто не нравится делать цикл со сном, чтобы каждые 250 мс проверять значение состояния:/
и я хотел бы отпустить клиента как можно скорее, я не хочу, чтобы они были вынуждены ждать 5 секунд каждый раз, если задание было обработано сразу, я хочу сразу же сообщить им.
gv.printQueue
это буферизованный канал?