Асинхронный веб-сокет Rust

Я пытаюсь создать задачу tokio внутри моего метода start_websocket, например:

    pub async fn start_websocket(
        &mut self,
        auth: &Auth,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let (ws_original, _) = connect_async(WEBSOCKET_ENDPOINT).await?;
        debug!("Connected to websocket");
        self.websocket = Some(Arc::new(Mutex::new(ws_original)));

        let callback_clone = self.callback.clone().unwrap();
        let websocket_clone = self.websocket.clone().unwrap();
        let ws_thread = tokio::spawn(async move {
            loop {
                debug!("Waiting for message");
                match websocket_clone.try_lock() {
                    Ok(mut ws_locked) => {
                        match ws_locked.try_next().await {
                            Ok(Some(message)) => {
                                debug!("Message received: {:?}", message);
                                handle_message(message, &callback_clone, ws_locked).await;
                            }
                            Ok(None) => {
                                debug!("No more messages");
                            }
                            Err(_) => {
                                debug!("Error while receiving message");
                            }
                        }

                        debug!("Message handled");
                    }
                    Err(_) => {
                         debug!("Websocket cannot be locked!");
                    }
                }
            }
        });
        self.ws_thread = Some(ws_thread);

        Ok(())
    }

Хотя веб-сокет запускается правильно, я вижу, что handle_message обрабатывает сообщения, однако мой основной поток не может заблокироваться на self.websocket.

Мне нужно использовать это в основном потоке, чтобы подписаться/отказаться от подписки на элементы, по которым принимаются разные сообщения. Функция подписки/отписки внутренне вызывает send_data:

    async fn send_data(&mut self, data: serde_json::Value) -> bool {
        let mut delay = Duration::from_millis(10);
        let ws_clone = self.websocket.borrow_mut();
        match ws_clone {
            Some(ws) => {
                loop {
                    match ws.try_lock() {
                        Ok(mut ws) => {
                            let send_result = ws
                                .send(tokio_tungstenite::tungstenite::Message::Text(
                                    data.to_string(),
                                ))
                                .await;
                            match send_result {
                                Ok(_) => {
                                    return {
                                        info!("Data sent successfully");
                                        true
                                    }
                                }
                                Err(_) => {
                                    info!("Failed to send data");
                                }
                            }
                            return true;
                        }
                        Err(_) => {
                            info!("Lock not acquired, while sending data {:?}", delay);
                            sleep(delay).await;
                            delay *= 2; // Double the delay for the next iteration
                        }
                    }
                }
            }
            None => {
                info!("Websocket not connected, retrying in {:?}", delay);
                sleep(delay).await;
                delay *= 2; // Double the delay for the next iteration
                false
            }
        }
    }

Выше я всегда вижу сообщение Lock not acquired, while sending data.

Мне трудно понять, в чем может быть проблема? Я подозреваю, что ws_locked.try_next().await внутри start_websocket, может быть в этом причина?


Соответствующие типы:

pub struct WebSocketApp {
    websocket: Option<Arc<Mutex<WebSocketStream<MaybeTlsStream<TcpStream>>>>>,
    ws_thread: Option<tokio::task::JoinHandle<()>>,
    callback: Option<Arc<Mutex<dyn WebSocketCallback + Send>>>,
}

Я новичок в языке программирования Rust, поэтому прошу прощения за плохую структуру дизайна/кода.

Я открыт для всех отзывов и комментариев и рад поделиться более подробной информацией.

Ваш вызов ws_locked.try_next().await удерживает мьютекс заблокированным на await, поэтому любой код, которому необходимо выполнить запись в веб-сокет, не сможет получить блокировку. Вот почему ваш try_lock попадает в состояние Err, и почти всегда будет, за исключением некоторых крайних условий гонки, которые (вероятно) редко встречаются на практике. Взгляните на черту StreamExt в futures и посмотрите, сможете ли вы splitWebSocketStream поделиться ею в разных темах.

Cosmic Ossifrage 04.04.2024 17:22

@CosmicOssifrage, окей, спасибо за отзыв, посмотрю. Есть ли альтернатива try_next().await?

P0W 04.04.2024 19:43
Почему Python в конце концов умрет
Почему Python в конце концов умрет
Последние 20 лет были действительно хорошими для Python. Он прошел путь от "просто языка сценариев" до основного языка, используемого для написания...
0
2
171
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Как упоминалось в комментариях, проблема здесь в том, что ваш вызов ws_locked.try_next().await удерживает мьютекс заблокированным через await, поэтому любой код, который необходимо записать в веб-сокет, не может получить блокировку. Вот почему ваш try_lock попадает в состояние Err, и так будет почти всегда, за исключением очень редких крайних условий гонки, с которыми вы вряд ли столкнетесь на практике.

В любом случае, даже если вы сможете найти чередование вашего текущего подхода, позволяющее справедливо обслуживать обе задачи, он подвержен риску голодания и вводит искусственные ограничения; базовый веб-сокет является полнодуплексным, что означает, что вы можете передавать и получать одновременно.


Лучший способ решить эту проблему — «разделить» владение потоками передачи и приема веб-сокета между различными потоками/задачами, чтобы вы могли передать право собственности на дескрипторы передачи и получения различным частям вашего кода. К счастью, существует особенность расширения Futures::stream::StreamExt , которая позволяет вам делать именно это, в частности, его метод Split(), который возвращает приемник (используемый для передачи) и поток (используемый для получения ).

Вы можете легко адаптировать свой код, импортировав признак расширения и используя его для разделения веб-сокета на две части, при этом каждая часть передается соответствующей задаче, которая должна передавать или получать соответственно:

use futures::stream::StreamExt;

// later
let (ws_original, _) = connect_async(WEBSOCKET_ENDPOINT).await?;
let (mut tx, mut rx) = ws_original.split();

// You can now move tx into the task that needs to transmit on the socket
// and rx into the task that needs to receive

Вам также, похоже, не нужно использовать try_next при ожидании на веб-сокете, поскольку вы находитесь в асинхронном контексте, поэтому вы можете просто заблокировать ожидание получения сообщений с помощью next().


Здесь я привожу пример, который подключается к простому эхо-серверу websocket, управляемому websocket.org, отправляет сообщение каждую секунду в течение пяти секунд и печатает результат. Он также имеет соответствующую обработку для закрытия веб-сокета через определенный период времени и ожидания возврата задач.

use log::{debug, error};
use futures::{stream::StreamExt, SinkExt};
use tokio::{sync::broadcast, task::JoinHandle};
use tokio_tungstenite::connect_async;

const WEBSOCKET_ENDPOINT: &'static str = "wss://echo.websocket.org";

struct Handler {
    ws_thread: Option<JoinHandle<()>>,
    sender_thread: Option<JoinHandle<()>>,
    shutdown: Option<broadcast::Sender<()>>,
}

impl Handler {
    pub async fn start_websocket(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        let (ws_original, _) = connect_async(WEBSOCKET_ENDPOINT).await?;
        debug!("Connected to websocket");

        let (shutdown_tx, _) = broadcast::channel::<()>(1);
        self.shutdown = Some(shutdown_tx);

        let (mut tx, mut rx) = ws_original.split();

        let mut sender_shutdown_receiver = self.shutdown.as_ref().unwrap().subscribe();

        let ws_thread = tokio::spawn(async move {
            loop {
                debug!("Waiting for message");

                match rx.next().await {
                    Some(Ok(msg)) => match msg {
                        tokio_tungstenite::tungstenite::Message::Close(_) => {
                            debug!("Received close message");
                            break;
                        }
                        _ => {
                            debug!("Received message: {:?}", msg);
                        }
                    }
                    Some(Err(e)) => {
                        error!("Failed to receive message: {}", e);
                        break;
                    }
                    None => {
                        debug!("No more messages");
                        break;
                    }
                }
            }
        });

        let sender_thread = tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {
                        debug!("Sending message");

                        let message = tokio_tungstenite::tungstenite::Message::Text("Hello".into());

                        if let Err(e) = tx.send(message).await {
                            error!("Failed to send message: {}", e);
                            break;
                        }
                    },
                    _ = sender_shutdown_receiver.recv() => {
                        debug!("Shutdown received");
                        let _ = tx.send(tokio_tungstenite::tungstenite::Message::Close(Some(
                            tokio_tungstenite::tungstenite::protocol::CloseFrame {
                                code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Normal,
                                reason: "Shutting down".into(),
                            }
                        ))).await;
                        break;
                    }
                }
            }

            debug!("Sender thread finished");
        });

        self.ws_thread = Some(ws_thread);
        self.sender_thread = Some(sender_thread);

        Ok(())
    }

    pub fn stop(&mut self) {
        if let Some(shutdown) = self.shutdown.take() {
            let _ = shutdown.send(());
        }
    }
}

#[tokio::main]
async fn main() {
    env_logger::init();

    let mut h = Handler {
        ws_thread: None,
        sender_thread: None,
        shutdown: None,
    };

    if let Err(e) = h.start_websocket().await {
        error!("Failed to start websocket: {}", e);
        return;
    }

    // Simulate the task running for a while
    tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    h.stop();

    if let Some(thread) = h.ws_thread.take() {
        let _ = thread.await;
    }
    if let Some(thread) = h.sender_thread.take() {
        let _ = thread.await;
    }
}

Другие вопросы по теме