Проблема синхронизации в этой программе Golang

Я пытаюсь создать программу, которая работает как прокси-сервер и может динамически переключаться на новую конечную точку. но я столкнулся с проблемой, что после вызова switchOverToNewEndpoint() все еще есть некоторые прокси-объекты, подключающиеся к исходной конечной точке 8.8.8.8, которые должны быть закрыты.

package main

import (
    "net"
    "sync"
    "sync/atomic"
    "time"
)

type Proxy struct {
    ID       int32
    From, To *net.TCPConn
}

var switchOver int32 = 0

func SetSwitchOver() {
    atomic.StoreInt32((*int32)(&switchOver), 1)
}

func SwitchOverEnabled() bool {
    return atomic.LoadInt32((*int32)(&switchOver)) == 1
}

var proxies map[int32]*Proxy = make(map[int32]*Proxy, 0)
var proxySeq int32 = 0
var mu sync.RWMutex

func addProxy(from *net.TCPConn) {
    mu.Lock()
    proxySeq += 1
    proxy := &Proxy{ID: proxySeq, From: from}
    proxies[proxySeq] = proxy
    mu.Unlock()

    var toAddr string
    if SwitchOverEnabled() {
        toAddr = "1.1.1.1"
    } else {
        toAddr = "8.8.8.8"
    }
    tcpAddr, _ := net.ResolveTCPAddr("tcp4", toAddr)
    toConn, err := net.DialTCP("tcp", nil, tcpAddr)
    if err != nil {
        panic(err)
    }
    proxy.To = toConn
}

func switchOverToNewEndpoint() {
    mu.RLock()
    closedProxies := proxies
    mu.RUnlock()

    SetSwitchOver()
    for _, proxy := range closedProxies {
        proxy.From.Close()
        proxy.To.Close()
        mu.Lock()
        delete(proxies, proxy.ID)
        mu.Unlock()
    }
}

func main() {
    tcpAddr, _ := net.ResolveTCPAddr("tcp4", "0.0.0.0:5432")
    ln, _ := net.ListenTCP("tcp", tcpAddr)
    go func() {
        time.Sleep(time.Second * 30)
        switchOverToNewEndpoint()
    }()
    for {
        clientConn, err := ln.AcceptTCP()
        if err != nil {
            panic(err)
        }
        go addProxy(clientConn)
    }
}

Немного подумав, я предполагаю, что проблема была в

    mu.RLock()
    closedProxies := proxies
    mu.RUnlock()

Но я не уверен, было ли это основной причиной и могу ли я исправить это, заменив его следующим:

    closedProxies := make([]*Proxy, 0)
    mu.RLock()
    for _, proxy := range proxies {
        closedProxies = append(closedProxies, proxy)
    }
    mu.RUnlock()

Поскольку этот случай трудно воспроизвести, может ли кто-нибудь с опытом поделиться идеей или подсказкой? Любые комментарии приветствуются. Заранее спасибо.

Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
API ввода вопросов - это полезный инструмент для интеграции моделей машинного обучения, таких как ChatGPT, в приложения, требующие обработки...
2
0
107
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Проблемы

Изменение необходимо. В исходной реализации closedProxies содержит ту же карту. Посмотрите эту демонстрацию:

package main

import "fmt"

func main() {
    proxies := make(map[int]int, 0)
    for i := 0; i < 10; i++ {
        proxies[i] = i
    }

    closeProxies := proxies

    proxies[10] = 10
    proxies[11] = 11

    for k := range closeProxies {
        delete(proxies, k)
    }

    fmt.Printf("items left: %d\n", len(proxies))
    // Output:
    //   items left: 0
}

Но это не первопричина. Новый прокси может быть добавлен после копирования closeProxies, но до вызова SetSwitchOver. В этом случае новый прокси подключается к старому адресу, но не в closeProxies. Я думаю, что это первопричина.

И есть еще одна проблема. Новый прокси добавляется в proxies до установки поля To. Может случиться так, что программа захочет закрыть этот прокси до того, как будет установлено поле To, что приведет к панике.

Надёжно работающая конструкция

Идея состоит в том, чтобы поместить все конечные точки в слайс и позволить каждой конечной точке управлять своим собственным списком прокси. Поэтому нам нужно только отслеживать индекс текущей конечной точки. Когда мы хотим переключиться на другую конечную точку, нам просто нужно изменить индекс и указать устаревшей конечной точке очистить свои прокси. Осталась единственная сложная вещь — убедиться, что устаревшая конечная точка может очистить все свои прокси. См. реализацию ниже:

менеджер.го

Это реализация идеи.

package main

import (
    "sync"
)

// Conn is abstraction of a connection to make Manager easy to test.
type Conn interface {
    Close() error
}

// Dialer is abstraction of a dialer to make Manager easy to test.
type Dialer interface {
    Dial(addr string) (Conn, error)
}

type Manager struct {
    // muCurrent protects the "current" member.
    muCurrent sync.RWMutex
    current   int // When current is -1, the manager is shuted down.
    endpoints []*endpoint

    // mu protects the whole Switch action.
    mu sync.Mutex
}

func NewManager(dialer Dialer, addresses ...string) *Manager {
    if len(addresses) < 2 {
        panic("a manger should handle at least 2 addresses")
    }

    endpoints := make([]*endpoint, len(addresses))
    for i, addr := range addresses {
        endpoints[i] = &endpoint{
            address: addr,
            dialer:  dialer,
        }
    }
    return &Manager{
        endpoints: endpoints,
    }
}

func (m *Manager) AddProxy(from Conn) {
    // 1. AddProxy will wait when the write lock of m.muCurrent is taken.
    // Once the write lock is released, AddProxy will connect to the new endpoint.
    // Switch only holds the write lock for a short time, and Switch is called
    // not so frequently, so AddProxy won't wait too much.
    // 2. Switch will wait if there is any AddProxy holding the read lock of
    // m.muCurrent. That means Switch waits longer. The advantage is that when
    // e.clear is called in Switch, All AddProxy requests to the old endpoint
    // are done. So it's safe to call e.clear then.
    m.muCurrent.RLock()
    defer m.muCurrent.RUnlock()

    current := m.current

    // Do not accept any new connection when m has been shutdown.
    if current == -1 {
        from.Close()
        return
    }

    m.endpoints[current].addProxy(from)
}

func (m *Manager) Switch() {
    // In a real world, Switch is called not so frequently.
    // So it's ok to add a lock here.
    // And it's necessary to make sure the old endpoint is cleared and ready
    // for use in the future.
    m.mu.Lock()
    defer m.mu.Unlock()

    // Take the write lock of m.muCurrent.
    // It waits for all the AddProxy requests holding the read lock to finish.
    m.muCurrent.Lock()
    old := m.current

    // Do nothing when m has been shutdown.
    if old == -1 {
        m.muCurrent.Unlock()
        return
    }
    next := old + 1
    if next >= len(m.endpoints) {
        next = 0
    }
    m.current = next
    m.muCurrent.Unlock()

    // When it reaches here, all AddProxy requests to the old endpoint are done.
    // And it's safe to call e.clear now.
    m.endpoints[old].clear()
}

func (m *Manager) Shutdown() {
    m.mu.Lock()
    defer m.mu.Unlock()

    m.muCurrent.Lock()
    current := m.current
    m.current = -1
    m.muCurrent.Unlock()

    m.endpoints[current].clear()
}

type proxy struct {
    from, to Conn
}

type endpoint struct {
    address string
    dialer  Dialer

    mu      sync.Mutex
    proxies []*proxy
}

func (e *endpoint) clear() {
    for _, p := range e.proxies {
        p.from.Close()
        p.to.Close()
    }

    // Assign a new slice to e.proxies, and the GC will collect the old one.
    e.proxies = []*proxy{}
}

func (e *endpoint) addProxy(from Conn) {
    toConn, err := e.dialer.Dial(e.address)
    if err != nil {
        // Close the from connection so that the client will reconnect?
        from.Close()
        return
    }

    e.mu.Lock()
    defer e.mu.Unlock()
    e.proxies = append(e.proxies, &proxy{from: from, to: toConn})
}

main.go

Эта демонстрация показывает, как использовать тип Manager, реализованный ранее:

package main

import (
    "net"
    "time"
)

type realDialer struct{}

func (d realDialer) Dial(addr string) (Conn, error) {
    tcpAddr, err := net.ResolveTCPAddr("tcp4", addr)
    if err != nil {
        return nil, err
    }
    return net.DialTCP("tcp", nil, tcpAddr)
}

func main() {
    manager := NewManager(realDialer{}, "1.1.1.1", "8.8.8.8")

    tcpAddr, _ := net.ResolveTCPAddr("tcp4", "0.0.0.0:5432")
    ln, _ := net.ListenTCP("tcp", tcpAddr)

    go func() {
        for range time.Tick(30 * time.Second) {
            manager.Switch()
        }
    }()
    for {
        clientConn, err := ln.AcceptTCP()
        if err != nil {
            panic(err)
        }
        go manager.AddProxy(clientConn)
    }
}

manager_test.go

Запустите тест с помощью этой команды: go test ./... -race -count 10

package main

import (
    "errors"
    "math/rand"
    "sync"
    "sync/atomic"
    "testing"
    "time"

    "github.com/google/uuid"
)

func TestManager(t *testing.T) {
    addresses := []string{"1.1.1.1", "8.8.8.8"}
    dialer := newDialer(addresses...)
    manager := NewManager(dialer, addresses...)

    ch := make(chan int, 1)

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        for range ch {
            manager.Switch()
        }
        wg.Done()
    }()

    count := 1000
    total := count * 10

    wg.Add(total)

    fromConn := &fakeFromConn{}
    for i := 0; i < total; i++ {
        if i%count == count-1 {
            ch <- 0
        }
        go func() {
            manager.AddProxy(fromConn)
            wg.Done()
        }()
    }
    close(ch)

    wg.Wait()

    manager.Shutdown()

    for _, s := range dialer.servers {
        left := len(s.conns)
        if left != 0 {
            t.Errorf("server %s, unexpected connections left: %d", s.addr, left)
        }
    }

    closedCount := fromConn.closedCount.Load()

    if closedCount != int32(total) {
        t.Errorf("want closed count: %d, got: %d", total, closedCount)
    }
}

type fakeFromConn struct {
    closedCount atomic.Int32
}

func (c *fakeFromConn) Close() error {
    c.closedCount.Add(1)

    return nil
}

type fakeToConn struct {
    id     uuid.UUID
    server *fakeServer
}

func (c *fakeToConn) Close() error {
    if c.id == uuid.Nil {
        return nil
    }

    c.server.removeConn(c.id)

    return nil
}

type fakeServer struct {
    addr  string
    mu    sync.Mutex
    conns map[uuid.UUID]bool
}

func (s *fakeServer) addConn() (uuid.UUID, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    id, err := uuid.NewRandom()
    if err == nil {
        s.conns[id] = true
    }
    return id, err
}

func (s *fakeServer) removeConn(id uuid.UUID) {
    s.mu.Lock()
    defer s.mu.Unlock()

    delete(s.conns, id)
}

type fakeDialer struct {
    servers map[string]*fakeServer
}

func newDialer(addresses ...string) *fakeDialer {
    servers := make(map[string]*fakeServer)
    for _, addr := range addresses {
        servers[addr] = &fakeServer{
            addr:  addr,
            conns: make(map[uuid.UUID]bool),
        }
    }
    return &fakeDialer{
        servers: servers,
    }
}

func (d *fakeDialer) Dial(addr string) (Conn, error) {
    n := rand.Intn(100)
    if n == 0 {
        return nil, errors.New("fake network error")
    }
    // Simulate network latency.
    time.Sleep(time.Duration(n) * time.Millisecond)

    s := d.servers[addr]
    id, err := s.addConn()
    if err != nil {
        return nil, err
    }
    conn := &fakeToConn{
        id:     id,
        server: s,
    }
    return conn, nil
}

спасибо за ваше подробное объяснение. относительно того, как устранить основную причину. Как вы думаете, смогу ли я это исправить, поставив SetSwitchOver() перед closeProxies?

HFX 09.04.2023 14:44

Это вызовет новую проблему. Представьте, что после добавления прокси в proxies и до достижения оператора if SwitchOverEnabled() { вызывается switchOverToNewEndpoint и он переключается на новую конечную точку. Теперь новый прокси находится в closeProxies, но подключается к новой конечной точке.

Zeke Lu 09.04.2023 15:04

@HFX Я обновил ответ, чтобы добавить предложенный мной дизайн, который должен решить эту проблему. Обратите внимание, что я не тестировал код. Пожалуйста, отредактируйте ответ или прокомментируйте, если обнаружите какие-либо очевидные ошибки. Спасибо!

Zeke Lu 09.04.2023 19:05

@HFX Я реорганизовал реализацию, чтобы сделать ее пригодной для тестирования, и добавил тестовый пример, чтобы убедиться, что она работает должным образом. Пожалуйста, взгляните еще раз. Спасибо!

Zeke Lu 10.04.2023 08:12

ценить это. Я многому научился у вас. Мне нравится идея структуры endpoint, которая отделяет прокси от разных конечных точек. поэтому мы можем легко управлять прокси, подключающимися к старой конечной точке.

HFX 10.04.2023 08:46

Другие вопросы по теме