Горутина Kafka Consumers

В настоящее время у меня есть программа, которая создает рабочую группу размера 1, которая затем вызывает startworker:

package main

import (
    "db_write_consumer/db"
    "db_write_consumer/worker"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
    mySQLClient, _ := db.NewMySQLDBClient("root", "", "localhost", 3306, "testbase")
    workers := worker.CreateGroup("localhost:9092", "testgroup", 1)
    for _, w := range workers {
        w_ := w
        worker.StartWorker(w_, []string{"test-topic"}, sigchan, mySQLClient)
    }
}

где CreateGroup написано:

func CreateGroup(bootstrapServers string, groupId string, numWorkers int) []*kafka.Consumer {
    consumers := []*kafka.Consumer{}
    for i := 0; i < numWorkers; i++ {
        c := NewWorker(bootstrapServers, groupId)
        consumers = append(consumers, c)
    }
    return consumers
}

а в Стартворкере написано:

func StartWorker(c *kafka.Consumer, topics []string, sigchan chan os.Signal, mySQLClient *sql.DB) {
    _ = c.SubscribeTopics(topics, nil)
    fmt.Println(c)
    run := true
    for run {
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating\n", sig)
            run = false
        default:
            ev, _ := c.ReadMessage(100)
            if ev == nil {
                continue
            }
            msg := &pb.Person{}
            proto.Unmarshal(ev.Value, msg)
            WriteStuff(mySQLClient, msg.Id, msg.Lastname, msg.Firstname, msg.Address, msg.City)
            if ev.Headers != nil {
                fmt.Printf("%% Headers: %v\n", ev.Headers)
            }
            _, err := c.StoreMessage(ev)
            if err != nil {
                fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s:\n",
                    ev.TopicPartition)
            }
        }
    }

    fmt.Printf("Closing consumer\n")
    c.Close()
}

это отлично работает для рабочей группы размера 1, но каждая попытка заставить это работать для большего размера рабочей группы терпит неудачу - все, что я узнал до сих пор, это то, что я хочу, чтобы context.WithCancel(context.Background()) передавался в рабочие функции из основной, но я потерялся с тем, как настроить группу ожидания или горутины для выполнения этой работы

так что мы можем оставить CreateGroup как блокировку, это нормально. Кажется, нам нужно изменить написание на go worker.StartWorker(ctx, w_, []string{"test-topic"}, sigchan, mySQLClient) и полностью связать контексты, но этого оказалось недостаточно.

ILoveCliques 29.12.2022 23:04
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
API ввода вопросов - это полезный инструмент для интеграции моделей машинного обучения, таких как ChatGPT, в приложения, требующие обработки...
1
1
58
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я понимаю, что ваш вопрос заключается в том, как управлять временем жизни воркеров, используя контекст (вместо sigchan). Самый простой способ — использовать signal.NotifyContext — это дает вам контекст, который отменяется при отправке одного из сигналов. Так что основным станет

func main() {
    ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer stop()

    mySQLClient, _ := db.NewMySQLDBClient("root", "", "localhost", 3306, "testbase")
    workers := worker.CreateGroup("localhost:9092", "testgroup", 1)
    var wg sync.WaitGroup
    for _, w := range workers {
        w_ := w
        wg.Add(1)
        go func() {
            defer wg.Done()
            worker.StartWorker(ctx, w_, []string{"test-topic"}, mySQLClient)
        }()
    }
    wg.Wait()
}

Обратите также внимание на использование группы ожидания, чтобы избежать выхода из main до того, как все рабочие завершат работу. И StartWorker будет похоже

func StartWorker(ctx context.Context, c *kafka.Consumer, topics []string, mySQLClient *sql.DB) {
    _ = c.SubscribeTopics(topics, nil)
    fmt.Println(c)
    for {
        select {
        case <-ctx.Done:
            return
        default:
        ...

Другие вопросы по теме