Как эффективно передавать клиентские зависимости в async?

Я пытаюсь внедрить Asynq, известный планировщик заданий Golang, в свой проект, и сейчас меня очень смущает отсутствие документации относительно возможных конкретных сценариев внутри самих задач. Имейте в виду, что в Интернете нет ни единого следа, который заставил бы меня думать, что я могу сильно ошибаться в том, как я хочу решить эту проблему.

Обычно я настраиваю соединение с базой данных в main.go и передаю его всем через внедрение зависимостей.

clients := services.Clients{
  Db:           dbClient,
  Redis:        redisClient,
  // and many more
}

// In my case I use GraphQL to handle requests
srv := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: &resolvers.Resolver{
  Services: &catalogue, // some services I need to use depending the request
  Clients:  &clients, // here we communicate the database, etc.
}}))

Как видите, я действительно могу передать clients моим обработчикам запросов (резольверам) и жить там счастливой жизнью, отправляя запросы к базе данных.

При настройке Asynq клиентская часть выглядит так

asynqClient := asynq.NewClient(asynq.RedisClientOpt{
  Addr:     os.Getenv("REDIS_ENDPOINT"),
  DB:       0, // will use the default one
  Password: os.Getenv("REDIS_PASSWORD"),
})
defer asynqClient.Close()

tsk, err := tasks.NewPing("pong")
if err != nil {
    fmt.Printf("%s", err)
    panic(err)
}
_, err = asynqClient.Enqueue(tsk)
if err != nil {
    fmt.Printf("%s", err)
    panic(err)
}

Я уже абстрагировал пинг-код в tasks.go

package tasks

import (
    "context"
    "encoding/json"
    "log"

    "github.com/hibiken/asynq"
)

// A list of task types.
const (
    TypePing = "misc:ping"
)

type pingTaskPayload struct {
    Test string
}

func NewPing(test string) (*asynq.Task, error) {
    payload, err := json.Marshal(pingTaskPayload{Test: test})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypePing, payload), nil
}

func HandlePing(ctx context.Context, t *asynq.Task) error {
    var p pingTaskPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    log.Printf(" [*] Ping received with arguments: %s", p.Test)
    return nil
}

Вы маршалируете его, он проходит через Redis и перехватывается на другой стороне.

asynqServer := asynq.NewServer(
  asynq.RedisClientOpt{
    Addr:     os.Getenv("REDIS_ENDPOINT"),
    DB:       0, // will use the default one
    Password: os.Getenv("REDIS_PASSWORD"),
  },
  asynq.Config{Concurrency: 10},
)

mux := asynq.NewServeMux()
mux.HandleFunc(tasks.TypePing, tasks.HandlePing)
go asynqServer.Run(mux)

Как видите, нет места для инъекции чего-либо и куда угодно. Где зависимости? Почему не предлагает это как-то донести до задач? Как все это используют? Начало работы никогда не указывает на какие-либо зависимости.

В идеале я бы хотел использовать mux.HandleFunc и предоставить кучу клиентов (например, соединение с БД) для tasks.HandlePing

На данный момент единственное «жизнеспособное» решение, которое я вижу, — это установить глобальный параметр main, чтобы мой сервер можно было брать из любой точки системы, чего я не хочу делать. Мне нужна четкая схема внедрения зависимостей.

Как мне правильно передать свои зависимости (clients включая базу данных) в Asynq? Я ошибаюсь, не устанавливая здесь глобальные значения?

Я долго искал. Будто никто никогда не задавался этим вопросом о передаче зависимостей, а библиотека эта довольно известная, так что возможно я как-то делаю что-то не так.

Вы можете создать тип структуры, например Ping, содержащий все, что вы хотите внедрить. Реализуйте HandlePing как метод. В main создайте экземпляр структуры Ping и передайте ping.HandlePing в качестве функции-обработчика.

Burak Serdar 20.04.2024 21:32

О боже мой, ты прав! Как я мог об этом не подумать. Спасибо, это мне очень помогло! Я кружил вокруг без всякой причины.

Laurent 20.04.2024 22:03
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
API ввода вопросов - это полезный инструмент для интеграции моделей машинного обучения, таких как ChatGPT, в приложения, требующие обработки...
1
2
87
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Что ж, это оказалось намного проще, чем я думал, мне просто нужно передать клиентов в созданную мной структуру Tasks.

# in /tasks/
package tasks

import (
    "aquiestoy/pkg/mailer"
    "aquiestoy/pkg/tracking"

    "github.com/hibiken/asynq"
    "github.com/redis/go-redis/v9"
    "gorm.io/gorm"
)

type Clients struct {
    Db       *gorm.DB
    Redis    *redis.Client
    Tracking *tracking.Tracking
    Mailer   *mailer.Mailer
    Asynq    *asynq.Client
}

type Tasks struct {
    Clients *Clients
}

А потом в main.go

tsks := tasks.Tasks{
  Clients: &tasks.Clients{
    Db:       dbClient,
    Redis:    redisClient,
    Tracking: tkClient,
    Mailer:   mailClient,
    Asynq:    asynqClient,
  },
}

tsk, err := tsks.NewPing("pong")
if err != nil {
  fmt.Printf("%s", err)
  panic(err)
}
_, err = asynqClient.Enqueue(tsk)
if err != nil {
  fmt.Printf("%s", err)
  panic(err)
}

// NOTE : this should eventually be separated from the client
asynqServer := asynq.NewServer(
  asynq.RedisClientOpt{
    Addr:     os.Getenv("REDIS_ENDPOINT"),
    DB:       0, // will use the default one
    Password: os.Getenv("REDIS_PASSWORD"),
  },
  asynq.Config{Concurrency: 10},
)

mux := asynq.NewServeMux()
mux.HandleFunc(tasks.TypePing, tsks.HandlePing)
go asynqServer.Run(mux)

Для удобства можно было бы заключить это в NewTasks, но в моем случае это работает.

Концептуально задачи должны быть простыми, вы можете внедрять зависимости в предназначенные для них обработчики задач, и такие случаи описаны в документации.

https://github.com/hibiken/asynq/tree/master?tab=readme-ov-file#quickstart

// PingProcessor implements asynq.Handler interface.
type PingProcessor struct {
    db *sql.DB
    rdb redis.UniversalClient
    // etc
}

func NewPingProcessor(db *sql.DB, rdb redis.UniversalClient) *PingProcessor {
  return &PingProcessor{db: db, rdb: rdb}
}

func (p *PingProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
    // call db using p.db 
    // call redis using p.rdb 
}

но вам следует изменить вложение мультиплексора

mux.Handle(task.TypePing, task.NewPingProcessor(db, rdb))

Если вам не нужны накладные расходы на создание новых объектов для выделенных процессоров задач, вы можете использовать функцию закрытия, чтобы она возвращала функцию, имеющую доступ к внедренным зависимостям.

func HandlePing(db *sql.DB, rdb redis.UniversalHandler) asynq.HandlerFunc {
    return func(ctx context.Context, t *asynq.Task) error {
        return nil
    }
}


mux.HandleFunc(task.TypePing, task.HandlePing(db, rdb))

Другие вопросы по теме