Я пытаюсь создать задачу tokio внутри моего метода start_websocket
, например:
pub async fn start_websocket(
&mut self,
auth: &Auth,
) -> Result<(), Box<dyn std::error::Error>> {
let (ws_original, _) = connect_async(WEBSOCKET_ENDPOINT).await?;
debug!("Connected to websocket");
self.websocket = Some(Arc::new(Mutex::new(ws_original)));
let callback_clone = self.callback.clone().unwrap();
let websocket_clone = self.websocket.clone().unwrap();
let ws_thread = tokio::spawn(async move {
loop {
debug!("Waiting for message");
match websocket_clone.try_lock() {
Ok(mut ws_locked) => {
match ws_locked.try_next().await {
Ok(Some(message)) => {
debug!("Message received: {:?}", message);
handle_message(message, &callback_clone, ws_locked).await;
}
Ok(None) => {
debug!("No more messages");
}
Err(_) => {
debug!("Error while receiving message");
}
}
debug!("Message handled");
}
Err(_) => {
debug!("Websocket cannot be locked!");
}
}
}
});
self.ws_thread = Some(ws_thread);
Ok(())
}
Хотя веб-сокет запускается правильно, я вижу, что handle_message
обрабатывает сообщения, однако мой основной поток не может заблокироваться на self.websocket
.
Мне нужно использовать это в основном потоке, чтобы подписаться/отказаться от подписки на элементы, по которым принимаются разные сообщения. Функция подписки/отписки внутренне вызывает send_data
:
async fn send_data(&mut self, data: serde_json::Value) -> bool {
let mut delay = Duration::from_millis(10);
let ws_clone = self.websocket.borrow_mut();
match ws_clone {
Some(ws) => {
loop {
match ws.try_lock() {
Ok(mut ws) => {
let send_result = ws
.send(tokio_tungstenite::tungstenite::Message::Text(
data.to_string(),
))
.await;
match send_result {
Ok(_) => {
return {
info!("Data sent successfully");
true
}
}
Err(_) => {
info!("Failed to send data");
}
}
return true;
}
Err(_) => {
info!("Lock not acquired, while sending data {:?}", delay);
sleep(delay).await;
delay *= 2; // Double the delay for the next iteration
}
}
}
}
None => {
info!("Websocket not connected, retrying in {:?}", delay);
sleep(delay).await;
delay *= 2; // Double the delay for the next iteration
false
}
}
}
Выше я всегда вижу сообщение Lock not acquired, while sending data
.
Мне трудно понять, в чем может быть проблема?
Я подозреваю, что ws_locked.try_next().await
внутри start_websocket
, может быть в этом причина?
Соответствующие типы:
pub struct WebSocketApp {
websocket: Option<Arc<Mutex<WebSocketStream<MaybeTlsStream<TcpStream>>>>>,
ws_thread: Option<tokio::task::JoinHandle<()>>,
callback: Option<Arc<Mutex<dyn WebSocketCallback + Send>>>,
}
Я новичок в языке программирования Rust, поэтому прошу прощения за плохую структуру дизайна/кода.
Я открыт для всех отзывов и комментариев и рад поделиться более подробной информацией.
@CosmicOssifrage, окей, спасибо за отзыв, посмотрю. Есть ли альтернатива try_next().await
?
Как упоминалось в комментариях, проблема здесь в том, что ваш вызов ws_locked.try_next().await
удерживает мьютекс заблокированным через await
, поэтому любой код, который необходимо записать в веб-сокет, не может получить блокировку. Вот почему ваш try_lock
попадает в состояние Err
, и так будет почти всегда, за исключением очень редких крайних условий гонки, с которыми вы вряд ли столкнетесь на практике.
В любом случае, даже если вы сможете найти чередование вашего текущего подхода, позволяющее справедливо обслуживать обе задачи, он подвержен риску голодания и вводит искусственные ограничения; базовый веб-сокет является полнодуплексным, что означает, что вы можете передавать и получать одновременно.
Лучший способ решить эту проблему — «разделить» владение потоками передачи и приема веб-сокета между различными потоками/задачами, чтобы вы могли передать право собственности на дескрипторы передачи и получения различным частям вашего кода. К счастью, существует особенность расширения Futures::stream::StreamExt , которая позволяет вам делать именно это, в частности, его метод Split(), который возвращает приемник (используемый для передачи) и поток (используемый для получения ).
Вы можете легко адаптировать свой код, импортировав признак расширения и используя его для разделения веб-сокета на две части, при этом каждая часть передается соответствующей задаче, которая должна передавать или получать соответственно:
use futures::stream::StreamExt;
// later
let (ws_original, _) = connect_async(WEBSOCKET_ENDPOINT).await?;
let (mut tx, mut rx) = ws_original.split();
// You can now move tx into the task that needs to transmit on the socket
// and rx into the task that needs to receive
Вам также, похоже, не нужно использовать try_next
при ожидании на веб-сокете, поскольку вы находитесь в асинхронном контексте, поэтому вы можете просто заблокировать ожидание получения сообщений с помощью next()
.
Здесь я привожу пример, который подключается к простому эхо-серверу websocket, управляемому websocket.org, отправляет сообщение каждую секунду в течение пяти секунд и печатает результат. Он также имеет соответствующую обработку для закрытия веб-сокета через определенный период времени и ожидания возврата задач.
use log::{debug, error};
use futures::{stream::StreamExt, SinkExt};
use tokio::{sync::broadcast, task::JoinHandle};
use tokio_tungstenite::connect_async;
const WEBSOCKET_ENDPOINT: &'static str = "wss://echo.websocket.org";
struct Handler {
ws_thread: Option<JoinHandle<()>>,
sender_thread: Option<JoinHandle<()>>,
shutdown: Option<broadcast::Sender<()>>,
}
impl Handler {
pub async fn start_websocket(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let (ws_original, _) = connect_async(WEBSOCKET_ENDPOINT).await?;
debug!("Connected to websocket");
let (shutdown_tx, _) = broadcast::channel::<()>(1);
self.shutdown = Some(shutdown_tx);
let (mut tx, mut rx) = ws_original.split();
let mut sender_shutdown_receiver = self.shutdown.as_ref().unwrap().subscribe();
let ws_thread = tokio::spawn(async move {
loop {
debug!("Waiting for message");
match rx.next().await {
Some(Ok(msg)) => match msg {
tokio_tungstenite::tungstenite::Message::Close(_) => {
debug!("Received close message");
break;
}
_ => {
debug!("Received message: {:?}", msg);
}
}
Some(Err(e)) => {
error!("Failed to receive message: {}", e);
break;
}
None => {
debug!("No more messages");
break;
}
}
}
});
let sender_thread = tokio::spawn(async move {
loop {
tokio::select! {
_ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {
debug!("Sending message");
let message = tokio_tungstenite::tungstenite::Message::Text("Hello".into());
if let Err(e) = tx.send(message).await {
error!("Failed to send message: {}", e);
break;
}
},
_ = sender_shutdown_receiver.recv() => {
debug!("Shutdown received");
let _ = tx.send(tokio_tungstenite::tungstenite::Message::Close(Some(
tokio_tungstenite::tungstenite::protocol::CloseFrame {
code: tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode::Normal,
reason: "Shutting down".into(),
}
))).await;
break;
}
}
}
debug!("Sender thread finished");
});
self.ws_thread = Some(ws_thread);
self.sender_thread = Some(sender_thread);
Ok(())
}
pub fn stop(&mut self) {
if let Some(shutdown) = self.shutdown.take() {
let _ = shutdown.send(());
}
}
}
#[tokio::main]
async fn main() {
env_logger::init();
let mut h = Handler {
ws_thread: None,
sender_thread: None,
shutdown: None,
};
if let Err(e) = h.start_websocket().await {
error!("Failed to start websocket: {}", e);
return;
}
// Simulate the task running for a while
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
h.stop();
if let Some(thread) = h.ws_thread.take() {
let _ = thread.await;
}
if let Some(thread) = h.sender_thread.take() {
let _ = thread.await;
}
}
Ваш вызов
ws_locked.try_next().await
удерживает мьютекс заблокированным наawait
, поэтому любой код, которому необходимо выполнить запись в веб-сокет, не сможет получить блокировку. Вот почему вашtry_lock
попадает в состояниеErr
, и почти всегда будет, за исключением некоторых крайних условий гонки, которые (вероятно) редко встречаются на практике. Взгляните на чертуStreamExt
вfutures
и посмотрите, сможете ли выsplit
WebSocketStream
поделиться ею в разных темах.