Как всегда получать самую свежую информацию от канала Go?

Я начинаю с Go и сейчас пишу простую программу, которая считывает данные с датчика и помещает их в канал для выполнения с ними некоторых вычислений. Теперь у меня он работает следующим образом:

package main

import (
    "fmt"
    "time"
    "strconv"
)

func get_sensor_data(c chan float64) {
    time.Sleep(1 * time.Second)  // wait a second before sensor data starts pooring in
    c <- 2.1  // Sensor data starts being generated
    c <- 2.2
    c <- 2.3
    c <- 2.4
    c <- 2.5
}

func main() {

    s := 1.1

    c := make(chan float64)
    go get_sensor_data(c)

    for {
        select {
        case s = <-c:
            fmt.Println("the next value of s from the channel: " + strconv.FormatFloat(s, 'f', 1, 64))
        default:
            // no new values in the channel
        }
        fmt.Println(s)

        time.Sleep(500 * time.Millisecond)  // Do heavy "work"
    }
}

Это нормально работает, но датчик генерирует много данных, и меня всегда интересуют только последние данные. Однако при такой настройке он считывает только следующий элемент в каждом цикле, что означает, что если канал в какой-то момент содержит 20 значений, самое новое значение считывается только через 10 секунд.

Есть ли способ, чтобы канал всегда содержал только одно значение за раз, чтобы я всегда получал только те данные, которые мне интересны, и канал не использовал ненужную память (хотя память - меньшее из моих беспокойств )?

Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
API ввода вопросов - это полезный инструмент для интеграции моделей машинного обучения, таких как ChatGPT, в приложения, требующие обработки...
4
0
5 625
7
Перейти к ответу Данный вопрос помечен как решенный

Ответы 7

Нет. Каналы представляют собой буферы FIFO, точка. Так работают каналы и их единственное предназначение. Если вам нужно только последнее значение, рассмотрите возможность использования одной переменной, защищенной мьютексом; записывайте в него всякий раз, когда поступают новые данные, и всякий раз, когда вы их читаете, вы всегда будете читать последнее значение.

Ответ принят как подходящий

Каналы лучше всего рассматривать как очереди (FIFO). Поэтому вы не можете пропустить мимо. Однако существуют библиотеки, которые делают такие вещи: https://github.com/cloudfoundry/go-diodes - это атомарный кольцевой буфер, который перезаписывает старые данные. Вы можете установить меньший размер, если хотите.

Все это, как говорится, не похоже, что вам нужна очередь (или кольцевой буфер). Вам просто нужен мьютекс:

type SensorData struct{
  mu sync.RWMutex
  last float64
}

func (d *SensorData) Store(data float64) {
 mu.Lock()
 defer mu.Unlock()

 d.last = data
}

func (d *SensorData) Get() float64 {
 mu.RLock()
 defer mu.RUnlock()

 return d.last
}

Здесь используется RWMutex, что означает, что многие вещи могут читать с него одновременно, в то время как только одна вещь может писать. Он будет хранить одну запись, как вы и сказали.

Спасибо, это именно то, что мне нужно. Вот вставка из того, что у меня есть сейчас (для будущих читателей, включая меня): pastebin.com/21KyfYYV. Еще один вопрос: что произойдет, если два процесса захотят записать на RWMutex одновременно? Он ждет, пропускает или терпит неудачу?

kramer65 13.12.2018 23:33

Они выстроятся в очередь. «Первый» захватит блокировку, установит значение и снимет блокировку. Затем «второй» захватит блокировку, изменит значение и снимет блокировку. В данном случае это не имеет большого значения, поскольку они просто устанавливают значение. Блокировка удерживается ограниченное время. Это может стать опасным, если вы заблокируете и сделаете что-то, что может занять некоторое время (запись в файл или что-то еще).

poy 13.12.2018 23:35

Спасибо за объяснение. У меня есть еще один вопрос: я обнаружил, что могу прочитать значение last, используя d.last или используя d.Get(). Есть ли преимущество использования d.Get() перед d.last?

kramer65 14.12.2018 11:15

Если вы читаете d.last напрямую, вы не используете мьютекс и не участвуете в гонке

poy 14.12.2018 14:19

а с have a race вы имеете в виду, что это может вызвать проблемы, если он записывается и читается одновременно? Или может существовать состояние гонки, когда из него одновременно читаются две части?

kramer65 14.12.2018 15:49

Под гонкой я подразумеваю гонку за данные. Гонка данных происходит, когда два потока одновременно пытаются получить доступ к памяти. Гонка за данными делает вашу программу недействительной и может привести к непредсказуемым результатам.

poy 14.12.2018 21:25

Существующий код использует небуферизованный канал. Не будет ли это уже блокировать дальнейшую запись, пока из нее не будет считано значение? то есть ch <- 2.1 необходимо будет прочитать, прежде чем на него можно будет записать ch <- 2.2. Таким образом, в некотором смысле из канала поступают самые свежие данные, которые могут не быть самыми последними с датчика.

Mayank 03.10.2021 19:04

Возможно, я медленно, но я не вижу кода, о котором вы говорите (который использует каналы). При этом приведенный выше код (который не использует каналы) на самом деле перезаписывает старые данные и сохраняет только самые последние.

poy 13.10.2021 06:03

Каналы служат определенной цели. Вы можете использовать код, который находится внутри блокировки, и обновлять переменную всякий раз, когда нужно установить новое значение.

Таким образом, получатель всегда будет получать последнее значение.

Вы не можете получить это напрямую из одного канала, но вы можете использовать один канал для каждого значения и получать уведомления, когда появляются новые значения:

package main

import (
    "fmt"
    "strconv"
    "sync"
    "time"
)

type LatestChannel struct {
    n    float64
    next chan struct{}
    mu   sync.Mutex
}

func New() *LatestChannel {
    return &LatestChannel{next: make(chan struct{})}
}

func (c *LatestChannel) Push(n float64) {
    c.mu.Lock()
    c.n = n
    old := c.next
    c.next = make(chan struct{})
    c.mu.Unlock()
    close(old)
}

func (c *LatestChannel) Get() (float64, <-chan struct{}) {
    c.mu.Lock()
    n := c.n
    next := c.next
    c.mu.Unlock()
    return n, next
}

func getSensorData(c *LatestChannel) {
    time.Sleep(1 * time.Second)
    c.Push(2.1)
    time.Sleep(100 * time.Millisecond)
    c.Push(2.2)
    time.Sleep(100 * time.Millisecond)
    c.Push(2.3)
    time.Sleep(100 * time.Millisecond)
    c.Push(2.4)
    time.Sleep(100 * time.Millisecond)
    c.Push(2.5)
}

func main() {
    s := 1.1

    c := New()
    _, hasNext := c.Get()
    go getSensorData(c)

    for {
        select {
        case <-hasNext:
            s, hasNext = c.Get()
            fmt.Println("the next value of s from the channel: " + strconv.FormatFloat(s, 'f', 1, 64))
        default:
            // no new values in the channel
        }
        fmt.Println(s)

        time.Sleep(250 * time.Millisecond) // Do heavy "work"
    }
}

Если вам не нужно уведомление о новом значении, вы можете попробовать прочитать Каналы внутри паттерна каналов на Голанге.

Попробуйте этот пакет https://github.com/subbuv26/chanup

Это позволяет производителю обновлять канал, используя последнее значение, которое заменяет последнее значение. И производит не блокируется. (при этом устаревшие значения переопределяются). Таким образом, со стороны потребителя всегда читается только последний элемент.

import "github.com/subbuv26/chanup"
ch := chanup.GetChan()
_ := ch.Put(testType{
    a: 10,
    s: "Sample",
})
_ := ch.Update(testType{
    a: 20,
    s: "Sample2",
})
// Continue updating with latest values
...

...
// On consumer end
val := ch.Get()
// val contains latest value

val будет типом interface {}, поэтому type преобразует его в желаемый тип.

Subba Reddy 20.09.2020 21:59

Есть элегантное решение, позволяющее использовать только канал. Если вы согласны с добавлением еще одного канала и горутины - вы можете ввести канал без буфера и горутину, которая пытается отправить ему последнее значение из вашего канала:

package main

import (
    "fmt"
    "time"
)

func wrapLatest(ch <-chan int) <-chan int {
    result := make(chan int) // important that this one i unbuffered
    go func() {
        defer close(result)
        value, ok := <-ch
        if !ok {
            return
        } 
        for {
            select {
            case value, ok = <-ch:
                if !ok {
                    return
                }
            case result<-value:
                if value, ok = <-ch; !ok {
                    return
                }
            }
        }
    }()

    return result
}

func main() {
    sendChan := make(chan int, 10) // may be buffered or not
    go func() {
        for i := 0; i < 10; i++ {
            sendChan <- i
        time.Sleep(time.Second)
        }
        close(sendChan)
    }()
    recvChan := wrapLatest(sendChan)
    for i := range recvChan {
        fmt.Println(i)
        time.Sleep(time.Second*2)
    }

}

Есть еще один способ решить эту проблему (трюк)

отправитель работает быстрее: отправитель удаляет канал, если channel_length> 1

go func() {
    for {
        msg:=strconv.Itoa(int(time.Now().Unix()))
        fmt.Println("make: ",msg," at:",time.Now())
        messages <- msg
        if len(messages)>1{
            //remove old message
            <-messages
        }
        time.Sleep(2*time.Second)
    }
}()

ресивер работает медленнее:

go func() {
    for {
        channLen :=len(messages)
        fmt.Println("len is ",channLen)
        fmt.Println("received",<-messages)
        time.Sleep(10*time.Second)
    }
}()

ИЛИ, мы можем удалить старое сообщение со стороны получателя (прочтите сообщение, например, удалите его)

Другие вопросы по теме