В настоящее время у меня есть программа, которая создает рабочую группу размера 1, которая затем вызывает startworker:
package main
import (
"db_write_consumer/db"
"db_write_consumer/worker"
"os"
"os/signal"
"syscall"
)
func main() {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
mySQLClient, _ := db.NewMySQLDBClient("root", "", "localhost", 3306, "testbase")
workers := worker.CreateGroup("localhost:9092", "testgroup", 1)
for _, w := range workers {
w_ := w
worker.StartWorker(w_, []string{"test-topic"}, sigchan, mySQLClient)
}
}
где CreateGroup написано:
func CreateGroup(bootstrapServers string, groupId string, numWorkers int) []*kafka.Consumer {
consumers := []*kafka.Consumer{}
for i := 0; i < numWorkers; i++ {
c := NewWorker(bootstrapServers, groupId)
consumers = append(consumers, c)
}
return consumers
}
а в Стартворкере написано:
func StartWorker(c *kafka.Consumer, topics []string, sigchan chan os.Signal, mySQLClient *sql.DB) {
_ = c.SubscribeTopics(topics, nil)
fmt.Println(c)
run := true
for run {
select {
case sig := <-sigchan:
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
default:
ev, _ := c.ReadMessage(100)
if ev == nil {
continue
}
msg := &pb.Person{}
proto.Unmarshal(ev.Value, msg)
WriteStuff(mySQLClient, msg.Id, msg.Lastname, msg.Firstname, msg.Address, msg.City)
if ev.Headers != nil {
fmt.Printf("%% Headers: %v\n", ev.Headers)
}
_, err := c.StoreMessage(ev)
if err != nil {
fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s:\n",
ev.TopicPartition)
}
}
}
fmt.Printf("Closing consumer\n")
c.Close()
}
это отлично работает для рабочей группы размера 1, но каждая попытка заставить это работать для большего размера рабочей группы терпит неудачу - все, что я узнал до сих пор, это то, что я хочу, чтобы context.WithCancel(context.Background()) передавался в рабочие функции из основной, но я потерялся с тем, как настроить группу ожидания или горутины для выполнения этой работы

Я понимаю, что ваш вопрос заключается в том, как управлять временем жизни воркеров, используя контекст (вместо sigchan). Самый простой способ — использовать signal.NotifyContext — это дает вам контекст, который отменяется при отправке одного из сигналов. Так что основным станет
func main() {
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
mySQLClient, _ := db.NewMySQLDBClient("root", "", "localhost", 3306, "testbase")
workers := worker.CreateGroup("localhost:9092", "testgroup", 1)
var wg sync.WaitGroup
for _, w := range workers {
w_ := w
wg.Add(1)
go func() {
defer wg.Done()
worker.StartWorker(ctx, w_, []string{"test-topic"}, mySQLClient)
}()
}
wg.Wait()
}
Обратите также внимание на использование группы ожидания, чтобы избежать выхода из main до того, как все рабочие завершат работу. И StartWorker будет похоже
func StartWorker(ctx context.Context, c *kafka.Consumer, topics []string, mySQLClient *sql.DB) {
_ = c.SubscribeTopics(topics, nil)
fmt.Println(c)
for {
select {
case <-ctx.Done:
return
default:
...
так что мы можем оставить
CreateGroupкак блокировку, это нормально. Кажется, нам нужно изменить написание наgo worker.StartWorker(ctx, w_, []string{"test-topic"}, sigchan, mySQLClient)и полностью связать контексты, но этого оказалось недостаточно.