Golang HTTP Handler – можно ли реализовать шаблон вызова и ответа в одном потоковом POST?

У меня очень большие наборы данных в клиентах для мобильных устройств и настольных компьютеров, которые я пытаюсь двунаправленно синхронизировать со своим веб-сайтом.

В настоящее время я настроил поток, в котором я отправляю данные на сервер частями. Когда обработка каждого фрагмента завершена и сервер успешно зарегистрировал этот фрагмент данных, мне нужно отправить обратно данные ответа, содержащие информацию реестра, которая помогает поддерживать синхронизацию сервера и клиента.

После получения этого ответа клиент загрузит еще один фрагмент, и процесс начнется снова. Данные от клиента передаются на сервер, чтобы снизить потребление памяти клиентом.

Я перепробовал множество комбинаций, чтобы посмотреть, смогу ли я заставить это работать. Во всех случаях, если сервер записывает ответ, а клиент оставляет поток открытым (ожидая ответа от сервера для продолжения), то я не получаю ответ сервера до тех пор, пока не истечет время ожидания соединения.

Итак, есть ли способ сделать это в потоке?

Отправьте данные – получите ответ Отправьте больше данных – получите ответ И так далее...?

Ниже приведен небольшой надуманный пример того, как мне удалось успешно прочитать потоковые данные, но не отправить обратно несколько ответов. ПРИМЕЧАНИЕ. Фактическая реализация не отправляет ответ на каждую новую строку, это просто упрощение для демонстрационных целей.

func UserSyncHandler(db *psql.Database, cache *serverCache.Cache) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {  
        reader := bufio.NewReader(r.Body) 
        for {
            line, err := reader.ReadBytes('\n')
            if err != nil {
                if err == io.EOF {
                    logger.Info("Received EOF")
                    break // naturally reached end of stream
                }
                writeError(w, "Failed to read from sync stream", err, http.StatusInternalServerError)
                return
            }
            // Send response here
            writeData([]byte("confirmation data"))
        }
    }
}

Большое спасибо за подсказку! Я посмотрю там!

StainlessSteelRat 25.08.2024 07:56

Да, эта линия должна быть в цикле! Это есть и в моей реальной реализации. Спасибо, что указали на это!

StainlessSteelRat 25.08.2024 18:24

Естественным решением является использование HTTP-запроса-ответа для каждого запроса-ответа приложения. Если вы выполняете несколько пар запрос-ответ приложения в одном HTTP-запросе-ответе, то включите полнодуплексный и сбросьте ответ после записи каждого ответа приложения в ответ HTTP. Клиент и любые прокси-серверы также должны поддерживать полнодуплексный режим.

Cerise Limón 25.08.2024 18:28

@LeGEC и Церис Лимон, ваши два комментария, кажется, направили меня на правильный путь!

StainlessSteelRat 25.08.2024 21:22
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
API ввода вопросов - это полезный инструмент для интеграции моделей машинного обучения, таких как ChatGPT, в приложения, требующие обработки...
1
4
62
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

  1. Websocket не поддерживает h2

  2. sse — это односторонний поток, и таймаут Go младшей версии его не поддерживает.

  3. Обратитесь к реализации sse

Пожалуйста, запустите его:

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "log"
    "math/rand"
    "net"
    "net/http"
    "time"
)

const (
    HeaderAccept           = "Accept"
    HeaderCacheControl     = "Cache-Control"
    HeaderConnection       = "Connection"
    HeaderContentType      = "Content-Type"
    MimeTextEventStream    = "text/event-stream"
    HeaderTransferEncoding = "Transfer-Encoding"
)

type Request struct {
    Index int
}

type Response struct {
    Index int
}

func main() {
    go func() {
        // Wait listen port
        time.Sleep(time.Second)
        ch := make(chan *Request, 8)
        ch <- &Request{}

        for {
            resp, err := NewClientStream("GET", "http://localhost:8088/call", ch, func(req *Request, resp *Response) error {
                fmt.Println("client:", req.Index, resp.Index)
                time.Sleep(time.Second*3)
                ch <- &Request{rand.Intn(256)}
                return nil
            })

            if err != nil {
                log.Println(err)
            }
            if resp != nil && resp.StatusCode == http.StatusNoContent {
                return
            }
        }
    }()

    ln ,err := net.Listen("tcp",":8088")
    if err != nil {
        panic(err)
    }
    log.Println("listen :8088")

    srv := http.Server{
        // test timeout
        ReadTimeout:  time.Second,
        WriteTimeout: time.Second,
        IdleTimeout:  time.Second,
        Handler: NewHandlerStream(func(req *Request) (any, error) {
            resp := &Response{req.Index + 1}
            fmt.Println("server:", req.Index, resp.Index)
            return resp, nil
        }),
    }
    srv.Serve(ln)
    srv.ListenAndServe()
}

func NewHandlerStream[T any](rpc func(*T) (any, error)) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        if r.Header.Get(HeaderAccept) != MimeTextEventStream {
            return
        }

        w.Header().Set(HeaderContentType, MimeTextEventStream)
        w.Header().Set(HeaderTransferEncoding, "chunked")
        w.Header().Set(HeaderConnection, "keep-alive")
        w.Header().Set(HeaderCacheControl, "no-cache")
        w.WriteHeader(http.StatusOK)
        ctl := http.NewResponseController(w)
        ctl.EnableFullDuplex()
        ctl.SetReadDeadline(time.Time{})
        ctl.SetWriteDeadline(time.Time{})
        ctl.Flush()

        de := json.NewDecoder(r.Body)
        en := json.NewEncoder(w)
        for {
            // receive
            req := new(T)
            err := de.Decode(req)
            if err != nil {
                return
            }

            resp, err := rpc(req)
            if err != nil {
                return
            }

            err = en.Encode(resp)
            if err != nil {
                return
            }
            ctl.Flush()
        }
    }
}

func NewClientStream[T, P any](method, path string, ch chan *P, call func(*P, *T) error) (*http.Response, error) {
    r, w := io.Pipe()
    req, err := http.NewRequest(method, path, r)
    if err != nil {
        return nil, err
    }
    req.Header.Set(HeaderAccept, MimeTextEventStream)
    req.Header.Set(HeaderTransferEncoding, "chunked")

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return nil, err
    }

    if resp.StatusCode == http.StatusNoContent {
        return resp, err
    }
    if resp.StatusCode != http.StatusOK {
        return resp, fmt.Errorf("response status code is %d", resp.StatusCode)
    }

    de := json.NewDecoder(resp.Body)
    en := json.NewEncoder(w)

    for {
        // send request
        data := <-ch
        err := en.Encode(data)
        if err != nil {
            return resp, err
        }

        // receive
        receive := new(T)
        err = de.Decode(receive)
        if err != nil {
            return resp, err
        }

        // handler
        err = call(data, receive)
        if err != nil {
            return resp, err
        }
    }
}
Ответ принят как подходящий

Огромное спасибо LaGec и Серизе Лимон.

Ниже приведен модифицированный, надуманный пример, показывающий, как мне удалось заставить его работать.

func UserSyncHandler(db *psql.Database, cache *serverCache.Cache) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
    
        // Added as noted in a comment
        controller := http.NewResponseController(w)
        _ = controller.EnableFullDuplex() // no error reported 
  
        reader := bufio.NewReader(r.Body) 
        for {
            line, err := reader.ReadBytes('\n')
            if err != nil {
                if err == io.EOF {
                    logger.Info("Received EOF")
                    break // naturally reached end of stream
                }
                writeError(w, "Failed to read from sync stream", err, http.StatusInternalServerError)
                return
            }
            // Send response here
            writeData([]byte("confirmation data"))
            // Added as noted in another comment
            controller.Flush()
        }
    }
}

Некоторые отзывы: ответьте внутренней ошибкой сервера, если вызов EnableFullDuplex не удался; обработать ошибку из writeData; обработать ошибку от Flush.

raygun 25.08.2024 21:41

Определенно, я просто ограничил все это, чтобы проиллюстрировать, как заставить общение работать. Это не та реализация, которую я написал.

StainlessSteelRat 26.08.2024 06:09

Другие вопросы по теме