Я использую Rocket + Capnproto, где получаю запрос от веб-API (Rocket) и должен отправить его на другой сервер через RPC (Capnproto).
вот моя основная установка Rocket
#[rocket::main]
async fn main() {
rocket::build()
.mount("/API",routes![invoke])
.launch()
.await.ok();
}
Вот мой метод маршрута
#[post("/API", data = "<request>")]
async fn invoke(request: api_request::APIRequest<'_>)Result<Json<api_response::ApiResponse>, Json<api_response::ApiResponseError>>{
let result = capnp_rpc::client::run_client(String::from("Hello World")).await;
}
А вот мой код Capnproto, это почти тот же пример hello world, который у них есть, он отлично работает отдельно, но как только я добавляю его в свои проекты Rocket, он терпит неудачу. Как видите, все упаковано в библиотеку Tokio.
pub async fn run_client( message: String ) ->Result<String, Box<dyn std::error::Error>> {
let server_addr : String = "127.0.0.1:4000".to_string();
let addr = server_addr
.to_socket_addrs().unwrap()
.next()
.expect("could not parse address");
rocket::tokio::task::LocalSet::new()
.run_until( async move {
let stream = rocket::tokio::net::TcpStream::connect(&addr).await?;
stream.set_nodelay(true).unwrap();
let (reader, writer) =
tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
let rpc_network = Box::new(twoparty::VatNetwork::new(
futures::io::BufReader::new(reader),
futures::io::BufWriter::new(writer),
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
let mut rpc_system = RpcSystem::new(rpc_network, None);
let hello_world: hello_world::Client =
rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
rocket::tokio::task::spawn_local(rpc_system);
let mut request = hello_world.say_hello_request();
request.get().init_request().set_name(&message[..]);
let reply = request.send().promise.await?;
let reply_message = reply.get()?.get_reply()?.get_message()?.to_str()?;
println!("received: {}", reply_message);
Ok(reply_message.to_string())
}).await
}
вот полная ошибка, которую я получаю
error: future cannot be sent between threads safely
--> src/main.rs:92:1
|
92 | #[post("/API", data = "<request>")]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `{async block@src/main.rs:92:1: 92:36}`, the trait `std::marker::Send` is not implemented for `Rc<tokio::task::local::Context>`, which is required by `{async block@src/main.rs:92:1: 92:36}: std::marker::Send`
note: future is not `Send` as this value is used across an await
--> src\infrastructure\capnp_rpc\client.rs:284:16
|
257 | rocket::tokio::task::LocalSet::new()
| ------------------------------------ has type `LocalSet` which is not `Send`
...
284 | }).await.unwrap()
| ^^^^^ await occurs here, with `rocket::tokio::task::LocalSet::new()` maybe used later
= note: required for the cast from `Pin<Box<{async block@src/main.rs:92:1: 92:36}>>` to `Pin<Box<dyn futures::Future<Output = Outcome<rocket::Response<'_>, Status, (rocket::Data<'_>, Status)>> + std::marker::Send>>`
= note: this error originates in the attribute macro `post` (in Nightly builds, run with -Z macro-backtrace for more info)
error: future cannot be sent between threads safely
--> src/main.rs:92:1
|
92 | #[post("/API", data = "<request>")]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `{async block@src/main.rs:92:1: 92:36}`, the trait `std::marker::Send` is not implemented for `*const ()`, which is required by `{async block@src/main.rs:92:1: 92:36}: std::marker::Send`
note: future is not `Send` as this value is used across an await
--> src\infrastructure\capnp_rpc\client.rs:284:16
|
257 | rocket::tokio::task::LocalSet::new()
| ------------------------------------ has type `LocalSet` which is not `Send`
...
284 | }).await.unwrap()
| ^^^^^ await occurs here, with `rocket::tokio::task::LocalSet::new()` maybe used later
= note: required for the cast from `Pin<Box<{async block@src/main.rs:92:1: 92:36}>>` to `Pin<Box<dyn futures::Future<Output = Outcome<rocket::Response<'_>, Status, (rocket::Data<'_>, Status)>> + std::marker::Send>>`
= note: this error originates in the attribute macro `post` (in Nightly builds, run with -Z macro-backtrace for more info)
error: future cannot be sent between threads safely
--> src/main.rs:92:1
|
92 | #[post("/API", data = "<request>")]
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn StdError`, which is required by `{async block@src/main.rs:92:1: 92:36}: std::marker::Send`
note: future is not `Send` as it awaits another future which is not `Send`
--> src\infrastructure\capnp_rpc\client.rs:257:10
|
257 | / rocket::tokio::task::LocalSet::new()
258 | | .spawn_local( async move {
259 | | let stream = rocket::tokio::net::TcpStream::connect(&addr).await?;
260 | | stream.set_nodelay(true).unwrap();
... |
283 | | Ok(reply_message.to_string())
284 | | }).await.unwrap()
| |______________^ await occurs here on type `tokio::task::JoinHandle<Result<std::string::String, Box<dyn StdError>>>`, which is not `Send`
= note: required for the cast from `Pin<Box<{async block@src/main.rs:92:1: 92:36}>>` to `Pin<Box<dyn futures::Future<Output = Outcome<rocket::Response<'_>, Status, (rocket::Data<'_>, Status)>> + std::marker::Send>>`
= note: this error originates in the attribute macro `post` (in Nightly builds, run with -Z macro-backtrace for more info)
Судя по тому, что мне удалось выяснить, у Rocket есть собственный обработчик угроз, и похоже, что этот код пытается запустить другого обработчика угроз. Я попробовал несколько вещей из других вопросов, пытаясь использовать Mutex, но не смог заставить его работать.
Не знаю, как обернуть этот код, чтобы он мог работать под основным обработчиком угроз Rocket.
именно так выглядят все примеры capnproto, если я удалю это, эти строки не будут работать Rocket::tokio::task::spawn_local(rpc_system);
Что, если удалить LocalSet
, но заменить rocket::tokio::task::spawn_local(rpc_system);
на rocket::tokio::task::LocalSet::new().spawn_local(rpc_system);
?
Этот вопрос похож на: Rust «будущее нельзя безопасно пересылать между потоками». Если вы считаете, что это другое, отредактируйте вопрос, поясните, чем он отличается и/или как ответы на этот вопрос не помогают решить вашу проблему.
@Rob Я не понимаю, как ответы в вопросах и ответах помогут ОП здесь
@ChayimFriedman Я получаю ту же ошибку, спасибо
@Rob, к сожалению, мой уровень Rust не так уж и высок, я видел оба вопроса и не уверен, как я мог бы реализовать здесь решение, спасибо
См. docs.rs/tokio/latest/tokio/task/…
Что, если вы сохраните LocalSet
, но замените run_until
на spawn_local
?
@ChayimFriedman, если я это сделаю, я получу еще одну ошибку: несоответствие типов.
Можете ли вы опубликовать эту ошибку (в вопросе)?
@ChayimFriedman привет, я добавил ошибки проверки груза выше после внесения изменений «Сохранить LocalSet
, но заменить run_until
на spawn_local
», спасибо
Запустить локальную задачу (задачу, которая должна выполняться в одном потоке) внутри нелокальной задачи принципиально невозможно, так как задача-владелец может перемещаться между потоками и перемещать вместе с ней локальную задачу.
Поскольку все запросы rocket
являются нелокальными задачами, это означает, что они не могут запускать код capnproto. Единственный оставшийся выбор – это главная задача. Ему потребуется управлять циклом событий, создавая задачи capnproto и одновременно запуская приложение. Другие задачи будут связываться с ним по каналам.
Вот скелет:
use std::future::Future;
use std::pin::Pin;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::LocalSet;
type Job = Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = ()>>>>;
async fn send_job<Fut>(tx: &mpsc::Sender<Job>, job: impl FnOnce() -> Fut + 'static) -> Fut::Output
where
Fut: Future + 'static,
Fut::Output: 'static,
{
let (result_tx, result_rx) = oneshot::channel();
tx.send(Box::new(move || {
let fut = job();
Box::pin(async move {
let result = fut.await;
result_tx.send(result).unwrap_or_else(|_| panic!());
})
}))
.await
.unwrap();
result_rx.await.unwrap()
}
#[tokio::main]
async fn main() {
let local_set = LocalSet::new();
local_set
.run_until(async {
// Choose a reasonable number here. If we are at the limit,
// that means requests are coming too fast and we need to throttle them.
let (tx, mut rx) = mpsc::channel::<Job>(100);
local_set.spawn_local(async move {
loop {
let job = rx.recv().await.expect("no sender?");
tokio::task::spawn_local(job());
}
});
main_work(tx).await;
})
.await;
}
async fn main_work(tx: mpsc::Sender<Job>) {}
Внутри main_work()
вы запускаете rocket
. tx
должен быть доступен для всех запросов, для этого используйте State.
Затем, когда какому-либо запросу потребуется запустить задачу capnproto, он вызовет:
let result = send_job(&tx, move || async move {
// Code here.
})
.await;
Такая конструкция ограничивает масштабируемость за счет использования только одного ядра для задач capnproto, но это напрямую связано с ограничениями ящика capnp-rpc
.
Получив помощь от нескольких коллег, я смог выполнить эту работу, вот код, которым я хочу поделиться решением, мне все еще немного сложно понять, но я думаю, что важной частью является создание новой угрозы из основного угроза, а затем используйте код capnproto.
use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use rocket::futures::AsyncReadExt;
use rocket::*;
use std::net::ToSocketAddrs;
use tokio::runtime::Builder;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::RecvError;
use tokio::task::LocalSet;
#[post("/API", data = "<request>")]
async fn invoke(request: &str) {
println!("req {}", request);
let mut client = RpcClient::new(String::from("Hello World"));
let response = client.get_response().await;
}
pub struct RpcClient {
receiver: oneshot::Receiver<String>,
}
impl RpcClient {
fn new(message: String) -> Self {
let (sender, receiver) = oneshot::channel();
let rt = Builder::new_current_thread().enable_all().build().unwrap();
let server_addr: String = "127.0.0.1:4000".to_string();
let addr = server_addr
.to_socket_addrs()
.unwrap()
.next()
.expect("could not parse address");
std::thread::spawn(move || {
let local = LocalSet::new();
local.spawn_local(async move {
let stream = rocket::tokio::net::TcpStream::connect(&addr).await?;
stream.set_nodelay(true).unwrap();
let (reader, writer) =
tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
let rpc_network = Box::new(twoparty::VatNetwork::new(
futures::io::BufReader::new(reader),
futures::io::BufWriter::new(writer),
rpc_twoparty_capnp::Side::Client,
Default::default(),
));
let mut rpc_system = RpcSystem::new(rpc_network, None);
let hello_world: hello_world::Client =
rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
rocket::tokio::task::spawn_local(rpc_system);
let mut request = hello_world.say_hello_request();
request.get().init_request().set_name(&message[..]);
let reply = request.send().promise.await?;
let reply_message = reply.get()?.get_reply()?.get_message()?.to_str()?;
println!("received: {}", reply_message);
// send the message to the receiver
sender.send(reply_message.to_string()).unwrap();
Ok::<String, Box<dyn std::error::Error>>(reply_message.to_string())
});
rt.block_on(local);
});
Self { receiver }
}
async fn get_response(mut self) -> Result<String, String> {
match self.receiver.await {
Ok(response) => Ok(response),
Err(error) => Err(error.to_string()),
}
}
}
#[rocket::main]
async fn main() {
rocket::build()
.mount("/API", routes![invoke])
.launch()
.await
.ok();
}
Спасибо
Почему вы используете
LocalSet
? Что произойдет, если вы просто удалите его и поместите код прямо в метод?