Future не может безопасно передаваться между потоками в Rust

Я использую Rocket + Capnproto, где получаю запрос от веб-API (Rocket) и должен отправить его на другой сервер через RPC (Capnproto).

вот моя основная установка Rocket

#[rocket::main]
async fn main() {   
    rocket::build()
    .mount("/API",routes![invoke])    
    .launch()
    .await.ok();
}

Вот мой метод маршрута

#[post("/API", data = "<request>")]
async fn invoke(request: api_request::APIRequest<'_>)Result<Json<api_response::ApiResponse>, Json<api_response::ApiResponseError>>{

let result = capnp_rpc::client::run_client(String::from("Hello World")).await;
}

А вот мой код Capnproto, это почти тот же пример hello world, который у них есть, он отлично работает отдельно, но как только я добавляю его в свои проекты Rocket, он терпит неудачу. Как видите, все упаковано в библиотеку Tokio.

pub async fn run_client( message: String ) ->Result<String, Box<dyn std::error::Error>> {
    let server_addr : String = "127.0.0.1:4000".to_string();
    let addr = server_addr
        .to_socket_addrs().unwrap()
        .next()
        .expect("could not parse address");       
        
        rocket::tokio::task::LocalSet::new()
        .run_until( async move {
            let stream = rocket::tokio::net::TcpStream::connect(&addr).await?;
            stream.set_nodelay(true).unwrap();
            let (reader, writer) =
                tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
            let rpc_network = Box::new(twoparty::VatNetwork::new(
                futures::io::BufReader::new(reader),
                futures::io::BufWriter::new(writer),
                rpc_twoparty_capnp::Side::Client,
                Default::default(),
            ));
            let mut rpc_system = RpcSystem::new(rpc_network, None);
            let hello_world: hello_world::Client =
                rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);

            rocket::tokio::task::spawn_local(rpc_system);           
           
            let mut request = hello_world.say_hello_request();
            request.get().init_request().set_name(&message[..]);

            let reply = request.send().promise.await?;           
            let reply_message  = reply.get()?.get_reply()?.get_message()?.to_str()?;
            println!("received: {}", reply_message);
            Ok(reply_message.to_string())
        }).await

}

вот полная ошибка, которую я получаю

error: future cannot be sent between threads safely
   --> src/main.rs:92:1
    |
92  | #[post("/API", data = "<request>")]
    | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `{async block@src/main.rs:92:1: 92:36}`, the trait `std::marker::Send` is not implemented for `Rc<tokio::task::local::Context>`, which is required by `{async block@src/main.rs:92:1: 92:36}: std::marker::Send`
note: future is not `Send` as this value is used across an await
   --> src\infrastructure\capnp_rpc\client.rs:284:16
    |
257 |          rocket::tokio::task::LocalSet::new()
    |          ------------------------------------ has type `LocalSet` which is not `Send`
...
284 |             }).await.unwrap()
    |                ^^^^^ await occurs here, with `rocket::tokio::task::LocalSet::new()` maybe used later
    = note: required for the cast from `Pin<Box<{async block@src/main.rs:92:1: 92:36}>>` to `Pin<Box<dyn futures::Future<Output = Outcome<rocket::Response<'_>, Status, (rocket::Data<'_>, Status)>> + std::marker::Send>>`
    = note: this error originates in the attribute macro `post` (in Nightly builds, run with -Z macro-backtrace for more info)

error: future cannot be sent between threads safely
   --> src/main.rs:92:1
    |
92  | #[post("/API", data = "<request>")]
    | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `{async block@src/main.rs:92:1: 92:36}`, the trait `std::marker::Send` is not implemented for `*const ()`, which is required by `{async block@src/main.rs:92:1: 92:36}: std::marker::Send`
note: future is not `Send` as this value is used across an await
   --> src\infrastructure\capnp_rpc\client.rs:284:16
    |
257 |          rocket::tokio::task::LocalSet::new()
    |          ------------------------------------ has type `LocalSet` which is not `Send`
...
284 |             }).await.unwrap()
    |                ^^^^^ await occurs here, with `rocket::tokio::task::LocalSet::new()` maybe used later
    = note: required for the cast from `Pin<Box<{async block@src/main.rs:92:1: 92:36}>>` to `Pin<Box<dyn futures::Future<Output = Outcome<rocket::Response<'_>, Status, (rocket::Data<'_>, Status)>> + std::marker::Send>>`
    = note: this error originates in the attribute macro `post` (in Nightly builds, run with -Z macro-backtrace for more info)

error: future cannot be sent between threads safely
   --> src/main.rs:92:1
    |
92  | #[post("/API", data = "<request>")]
    | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn StdError`, which is required by `{async block@src/main.rs:92:1: 92:36}: std::marker::Send`
note: future is not `Send` as it awaits another future which is not `Send`
   --> src\infrastructure\capnp_rpc\client.rs:257:10
    |
257 | /          rocket::tokio::task::LocalSet::new()
258 | |             .spawn_local( async move {
259 | |                 let stream = rocket::tokio::net::TcpStream::connect(&addr).await?;
260 | |                 stream.set_nodelay(true).unwrap();
...   |
283 | |                 Ok(reply_message.to_string())
284 | |             }).await.unwrap()
    | |______________^ await occurs here on type `tokio::task::JoinHandle<Result<std::string::String, Box<dyn StdError>>>`, which is not `Send`
    = note: required for the cast from `Pin<Box<{async block@src/main.rs:92:1: 92:36}>>` to `Pin<Box<dyn futures::Future<Output = Outcome<rocket::Response<'_>, Status, (rocket::Data<'_>, Status)>> + std::marker::Send>>`
    = note: this error originates in the attribute macro `post` (in Nightly builds, run with -Z macro-backtrace for more info)

Судя по тому, что мне удалось выяснить, у Rocket есть собственный обработчик угроз, и похоже, что этот код пытается запустить другого обработчика угроз. Я попробовал несколько вещей из других вопросов, пытаясь использовать Mutex, но не смог заставить его работать.

Не знаю, как обернуть этот код, чтобы он мог работать под основным обработчиком угроз Rocket.

Почему вы используете LocalSet? Что произойдет, если вы просто удалите его и поместите код прямо в метод?

Chayim Friedman 14.07.2024 00:10

именно так выглядят все примеры capnproto, если я удалю это, эти строки не будут работать Rocket::tokio::task::spawn_local(rpc_system);

Gonzalo 14.07.2024 00:25

Что, если удалить LocalSet, но заменить rocket::tokio::task::spawn_local(rpc_system); на rocket::tokio::task::LocalSet::new().spawn_local(rpc_system)‌​;?

Chayim Friedman 14.07.2024 01:26

Этот вопрос похож на: Rust «будущее нельзя безопасно пересылать между потоками». Если вы считаете, что это другое, отредактируйте вопрос, поясните, чем он отличается и/или как ответы на этот вопрос не помогают решить вашу проблему.

Rob 14.07.2024 02:32

@Rob Я не понимаю, как ответы в вопросах и ответах помогут ОП здесь

kmdreko 14.07.2024 02:35

@ChayimFriedman Я получаю ту же ошибку, спасибо

Gonzalo 14.07.2024 03:36

@Rob, к сожалению, мой уровень Rust не так уж и высок, я видел оба вопроса и не уверен, как я мог бы реализовать здесь решение, спасибо

Gonzalo 14.07.2024 03:39

См. docs.rs/tokio/latest/tokio/task/…

true equals false 14.07.2024 12:19

Что, если вы сохраните LocalSet, но замените run_until на spawn_local?

Chayim Friedman 14.07.2024 15:15

@ChayimFriedman, если я это сделаю, я получу еще одну ошибку: несоответствие типов.

Gonzalo 15.07.2024 00:23

Можете ли вы опубликовать эту ошибку (в вопросе)?

Chayim Friedman 15.07.2024 17:52

@ChayimFriedman привет, я добавил ошибки проверки груза выше после внесения изменений «Сохранить LocalSet, но заменить run_until на spawn_local», спасибо

Gonzalo 16.07.2024 03:02
Почему Python в конце концов умрет
Почему Python в конце концов умрет
Последние 20 лет были действительно хорошими для Python. Он прошел путь от "просто языка сценариев" до основного языка, используемого для написания...
1
12
118
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Запустить локальную задачу (задачу, которая должна выполняться в одном потоке) внутри нелокальной задачи принципиально невозможно, так как задача-владелец может перемещаться между потоками и перемещать вместе с ней локальную задачу.

Поскольку все запросы rocket являются нелокальными задачами, это означает, что они не могут запускать код capnproto. Единственный оставшийся выбор – это главная задача. Ему потребуется управлять циклом событий, создавая задачи capnproto и одновременно запуская приложение. Другие задачи будут связываться с ним по каналам.

Вот скелет:

use std::future::Future;
use std::pin::Pin;

use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::LocalSet;

type Job = Box<dyn FnOnce() -> Pin<Box<dyn Future<Output = ()>>>>;

async fn send_job<Fut>(tx: &mpsc::Sender<Job>, job: impl FnOnce() -> Fut + 'static) -> Fut::Output
where
    Fut: Future + 'static,
    Fut::Output: 'static,
{
    let (result_tx, result_rx) = oneshot::channel();
    tx.send(Box::new(move || {
        let fut = job();
        Box::pin(async move {
            let result = fut.await;
            result_tx.send(result).unwrap_or_else(|_| panic!());
        })
    }))
    .await
    .unwrap();
    result_rx.await.unwrap()
}

#[tokio::main]
async fn main() {
    let local_set = LocalSet::new();
    local_set
        .run_until(async {
            // Choose a reasonable number here. If we are at the limit,
            // that means requests are coming too fast and we need to throttle them.
            let (tx, mut rx) = mpsc::channel::<Job>(100);

            local_set.spawn_local(async move {
                loop {
                    let job = rx.recv().await.expect("no sender?");
                    tokio::task::spawn_local(job());
                }
            });

            main_work(tx).await;
        })
        .await;
}

async fn main_work(tx: mpsc::Sender<Job>) {}

Внутри main_work() вы запускаете rocket. tx должен быть доступен для всех запросов, для этого используйте State.

Затем, когда какому-либо запросу потребуется запустить задачу capnproto, он вызовет:

let result = send_job(&tx, move || async move {
    // Code here.
})
.await;

Такая конструкция ограничивает масштабируемость за счет использования только одного ядра для задач capnproto, но это напрямую связано с ограничениями ящика capnp-rpc.

Ответ принят как подходящий

Получив помощь от нескольких коллег, я смог выполнить эту работу, вот код, которым я хочу поделиться решением, мне все еще немного сложно понять, но я думаю, что важной частью является создание новой угрозы из основного угроза, а затем используйте код capnproto.

use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};
use rocket::futures::AsyncReadExt;
use rocket::*;
use std::net::ToSocketAddrs;
use tokio::runtime::Builder;
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::RecvError;
use tokio::task::LocalSet;

#[post("/API", data = "<request>")]
async fn invoke(request: &str) {
    println!("req {}", request);
    let mut client = RpcClient::new(String::from("Hello World"));
    let response = client.get_response().await;
}

pub struct RpcClient {
    receiver: oneshot::Receiver<String>,
}

impl RpcClient {
    fn new(message: String) -> Self {
        let (sender, receiver) = oneshot::channel();
        let rt = Builder::new_current_thread().enable_all().build().unwrap();
        let server_addr: String = "127.0.0.1:4000".to_string();
        let addr = server_addr
            .to_socket_addrs()
            .unwrap()
            .next()
            .expect("could not parse address");
        std::thread::spawn(move || {
            let local = LocalSet::new();
            local.spawn_local(async move {
                let stream = rocket::tokio::net::TcpStream::connect(&addr).await?;
                stream.set_nodelay(true).unwrap();
                let (reader, writer) =
                    tokio_util::compat::TokioAsyncReadCompatExt::compat(stream).split();
                let rpc_network = Box::new(twoparty::VatNetwork::new(
                    futures::io::BufReader::new(reader),
                    futures::io::BufWriter::new(writer),
                    rpc_twoparty_capnp::Side::Client,
                    Default::default(),
                ));
                let mut rpc_system = RpcSystem::new(rpc_network, None);
                let hello_world: hello_world::Client =
                    rpc_system.bootstrap(rpc_twoparty_capnp::Side::Server);
                
                rocket::tokio::task::spawn_local(rpc_system);
                
                let mut request = hello_world.say_hello_request();
                request.get().init_request().set_name(&message[..]);
                
                let reply = request.send().promise.await?;
                let reply_message = reply.get()?.get_reply()?.get_message()?.to_str()?;
                println!("received: {}", reply_message);
                // send the message to the receiver
                sender.send(reply_message.to_string()).unwrap();
                Ok::<String, Box<dyn std::error::Error>>(reply_message.to_string())
            });
            rt.block_on(local);
        });
        Self { receiver }
    }

    async fn get_response(mut self) -> Result<String, String> {
        match self.receiver.await {
            Ok(response) => Ok(response),
            Err(error) => Err(error.to_string()),
        }
    }
}
#[rocket::main]
async fn main() {
    rocket::build()
        .mount("/API", routes![invoke])
        .launch()
        .await
        .ok();
}

Спасибо

Другие вопросы по теме