Я начинаю с Go и сейчас пишу простую программу, которая считывает данные с датчика и помещает их в канал для выполнения с ними некоторых вычислений. Теперь у меня он работает следующим образом:
package main
import (
"fmt"
"time"
"strconv"
)
func get_sensor_data(c chan float64) {
time.Sleep(1 * time.Second) // wait a second before sensor data starts pooring in
c <- 2.1 // Sensor data starts being generated
c <- 2.2
c <- 2.3
c <- 2.4
c <- 2.5
}
func main() {
s := 1.1
c := make(chan float64)
go get_sensor_data(c)
for {
select {
case s = <-c:
fmt.Println("the next value of s from the channel: " + strconv.FormatFloat(s, 'f', 1, 64))
default:
// no new values in the channel
}
fmt.Println(s)
time.Sleep(500 * time.Millisecond) // Do heavy "work"
}
}
Это нормально работает, но датчик генерирует много данных, и меня всегда интересуют только последние данные. Однако при такой настройке он считывает только следующий элемент в каждом цикле, что означает, что если канал в какой-то момент содержит 20 значений, самое новое значение считывается только через 10 секунд.
Есть ли способ, чтобы канал всегда содержал только одно значение за раз, чтобы я всегда получал только те данные, которые мне интересны, и канал не использовал ненужную память (хотя память - меньшее из моих беспокойств )?

Нет. Каналы представляют собой буферы FIFO, точка. Так работают каналы и их единственное предназначение. Если вам нужно только последнее значение, рассмотрите возможность использования одной переменной, защищенной мьютексом; записывайте в него всякий раз, когда поступают новые данные, и всякий раз, когда вы их читаете, вы всегда будете читать последнее значение.
Каналы лучше всего рассматривать как очереди (FIFO). Поэтому вы не можете пропустить мимо. Однако существуют библиотеки, которые делают такие вещи: https://github.com/cloudfoundry/go-diodes - это атомарный кольцевой буфер, который перезаписывает старые данные. Вы можете установить меньший размер, если хотите.
Все это, как говорится, не похоже, что вам нужна очередь (или кольцевой буфер). Вам просто нужен мьютекс:
type SensorData struct{
mu sync.RWMutex
last float64
}
func (d *SensorData) Store(data float64) {
mu.Lock()
defer mu.Unlock()
d.last = data
}
func (d *SensorData) Get() float64 {
mu.RLock()
defer mu.RUnlock()
return d.last
}
Здесь используется RWMutex, что означает, что многие вещи могут читать с него одновременно, в то время как только одна вещь может писать. Он будет хранить одну запись, как вы и сказали.
Они выстроятся в очередь. «Первый» захватит блокировку, установит значение и снимет блокировку. Затем «второй» захватит блокировку, изменит значение и снимет блокировку. В данном случае это не имеет большого значения, поскольку они просто устанавливают значение. Блокировка удерживается ограниченное время. Это может стать опасным, если вы заблокируете и сделаете что-то, что может занять некоторое время (запись в файл или что-то еще).
Спасибо за объяснение. У меня есть еще один вопрос: я обнаружил, что могу прочитать значение last, используя d.last или используя d.Get(). Есть ли преимущество использования d.Get() перед d.last?
Если вы читаете d.last напрямую, вы не используете мьютекс и не участвуете в гонке
а с have a race вы имеете в виду, что это может вызвать проблемы, если он записывается и читается одновременно? Или может существовать состояние гонки, когда из него одновременно читаются две части?
Под гонкой я подразумеваю гонку за данные. Гонка данных происходит, когда два потока одновременно пытаются получить доступ к памяти. Гонка за данными делает вашу программу недействительной и может привести к непредсказуемым результатам.
Существующий код использует небуферизованный канал. Не будет ли это уже блокировать дальнейшую запись, пока из нее не будет считано значение? то есть ch <- 2.1 необходимо будет прочитать, прежде чем на него можно будет записать ch <- 2.2. Таким образом, в некотором смысле из канала поступают самые свежие данные, которые могут не быть самыми последними с датчика.
Возможно, я медленно, но я не вижу кода, о котором вы говорите (который использует каналы). При этом приведенный выше код (который не использует каналы) на самом деле перезаписывает старые данные и сохраняет только самые последние.
Каналы служат определенной цели. Вы можете использовать код, который находится внутри блокировки, и обновлять переменную всякий раз, когда нужно установить новое значение.
Таким образом, получатель всегда будет получать последнее значение.
Вы не можете получить это напрямую из одного канала, но вы можете использовать один канал для каждого значения и получать уведомления, когда появляются новые значения:
package main
import (
"fmt"
"strconv"
"sync"
"time"
)
type LatestChannel struct {
n float64
next chan struct{}
mu sync.Mutex
}
func New() *LatestChannel {
return &LatestChannel{next: make(chan struct{})}
}
func (c *LatestChannel) Push(n float64) {
c.mu.Lock()
c.n = n
old := c.next
c.next = make(chan struct{})
c.mu.Unlock()
close(old)
}
func (c *LatestChannel) Get() (float64, <-chan struct{}) {
c.mu.Lock()
n := c.n
next := c.next
c.mu.Unlock()
return n, next
}
func getSensorData(c *LatestChannel) {
time.Sleep(1 * time.Second)
c.Push(2.1)
time.Sleep(100 * time.Millisecond)
c.Push(2.2)
time.Sleep(100 * time.Millisecond)
c.Push(2.3)
time.Sleep(100 * time.Millisecond)
c.Push(2.4)
time.Sleep(100 * time.Millisecond)
c.Push(2.5)
}
func main() {
s := 1.1
c := New()
_, hasNext := c.Get()
go getSensorData(c)
for {
select {
case <-hasNext:
s, hasNext = c.Get()
fmt.Println("the next value of s from the channel: " + strconv.FormatFloat(s, 'f', 1, 64))
default:
// no new values in the channel
}
fmt.Println(s)
time.Sleep(250 * time.Millisecond) // Do heavy "work"
}
}
Если вам не нужно уведомление о новом значении, вы можете попробовать прочитать Каналы внутри паттерна каналов на Голанге.
Попробуйте этот пакет https://github.com/subbuv26/chanup
Это позволяет производителю обновлять канал, используя последнее значение, которое заменяет последнее значение. И производит не блокируется. (при этом устаревшие значения переопределяются). Таким образом, со стороны потребителя всегда читается только последний элемент.
import "github.com/subbuv26/chanup"
ch := chanup.GetChan()
_ := ch.Put(testType{
a: 10,
s: "Sample",
})
_ := ch.Update(testType{
a: 20,
s: "Sample2",
})
// Continue updating with latest values
...
...
// On consumer end
val := ch.Get()
// val contains latest value
val будет типом interface {}, поэтому type преобразует его в желаемый тип.
Есть элегантное решение, позволяющее использовать только канал. Если вы согласны с добавлением еще одного канала и горутины - вы можете ввести канал без буфера и горутину, которая пытается отправить ему последнее значение из вашего канала:
package main
import (
"fmt"
"time"
)
func wrapLatest(ch <-chan int) <-chan int {
result := make(chan int) // important that this one i unbuffered
go func() {
defer close(result)
value, ok := <-ch
if !ok {
return
}
for {
select {
case value, ok = <-ch:
if !ok {
return
}
case result<-value:
if value, ok = <-ch; !ok {
return
}
}
}
}()
return result
}
func main() {
sendChan := make(chan int, 10) // may be buffered or not
go func() {
for i := 0; i < 10; i++ {
sendChan <- i
time.Sleep(time.Second)
}
close(sendChan)
}()
recvChan := wrapLatest(sendChan)
for i := range recvChan {
fmt.Println(i)
time.Sleep(time.Second*2)
}
}
Есть еще один способ решить эту проблему (трюк)
отправитель работает быстрее: отправитель удаляет канал, если channel_length> 1
go func() {
for {
msg:=strconv.Itoa(int(time.Now().Unix()))
fmt.Println("make: ",msg," at:",time.Now())
messages <- msg
if len(messages)>1{
//remove old message
<-messages
}
time.Sleep(2*time.Second)
}
}()
ресивер работает медленнее:
go func() {
for {
channLen :=len(messages)
fmt.Println("len is ",channLen)
fmt.Println("received",<-messages)
time.Sleep(10*time.Second)
}
}()
ИЛИ, мы можем удалить старое сообщение со стороны получателя (прочтите сообщение, например, удалите его)
Спасибо, это именно то, что мне нужно. Вот вставка из того, что у меня есть сейчас (для будущих читателей, включая меня): pastebin.com/21KyfYYV. Еще один вопрос: что произойдет, если два процесса захотят записать на
RWMutexодновременно? Он ждет, пропускает или терпит неудачу?