Как использовать future::join_all с мультиплексным redis в асинхронной среде выполнения tokio

Я пытаюсь использовать клиент redis Rust в асинхронном мультиплексном режиме, с tokio в качестве асинхронной среды выполнения и динамическим количеством фьючерсов для присоединения.

Я успешно использовал future::join3 для постоянного числа фьючерсов, но я хочу мультиплексировать гораздо больше команд (конкретный размер не должен быть известен во время компиляции, но даже это было бы улучшением).

Это рабочий пример при использовании future::join3; Пример правильно печатает Ok(Some("PONG")) Ok(Some("PONG")) Ok(Some("PONG"))

Cargo.toml

[package]
name = "redis_sample"
version = "0.1.0"
authors = ["---"]
edition = "2018"


[dependencies]
redis = { version = "0.17.0", features = ["aio", "tokio-comp", "tokio-rt-core"] }
tokio = { version = "0.2.23", features = ["full"] }
futures = "0.3.8"

src/main.rs

use futures::future;
use redis::RedisResult;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
    let mut redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

    let results: (RedisResult<Option<String>>, RedisResult<Option<String>>, RedisResult<Option<String>>) = future::join3(
        redis::cmd("PING").query_async(&mut redis_connection.clone()),
        redis::cmd("PING").query_async(&mut redis_connection.clone()),
        redis::cmd("PING").query_async(&mut redis_connection),
    ).await;

    println!("{:?} {:?} {:?}", results.0, results.1, results.2);

    Ok(())
}

Теперь я хочу сделать то же самое, но с n командами (скажем, 10, но в идеале я хотел бы настроить это на производительность в продакшене). Это то, что я получил, но я не могу преодолеть правила заимствования; Я пытался хранить некоторых посредников (либо Redis Cmd, либо само будущее) в Vec, чтобы продлить их жизнь, но у этого были другие проблемы (с несколькими ссылками mut).

Cargo.toml то же самое; вот main.rs

use futures::{future, Future};
use std::pin::Pin;
use redis::RedisResult;

const BATCH_SIZE: usize = 10;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
    let redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

    let mut commands: Vec<Pin<Box<dyn Future<Output = RedisResult<Option<String>>>>>> = vec![];
    for _ in 0..BATCH_SIZE {
        commands.push(Box::pin(redis::cmd("PING").query_async(& mut redis_connection.clone())));
    }
    let results = future::join_all(commands).await;

    println!("{:?}", results);

    Ok(())
}

Я получаю два предупреждения компилятора (creates a temporary which is freed while still in use) и не знаю, что делать дальше с этим кодом. Я не на 100% уверен в использовании PIN-кода, но без него я не смог бы даже хранить фьючерсы.

Полный вывод компилятора:

   Compiling redis_sample v0.1.0 (/Users/gyfis/Documents/programming/rust/redis_sample)
error[E0716]: temporary value dropped while borrowed
  --> redis_sample/src/main.rs:14:32
   |
14 |         commands.push(Box::pin(redis::cmd("PING").query_async(& mut redis_connection.clone())));
   |                                ^^^^^^^^^^^^^^^^^^                                              - temporary value is freed at the end of this statement
   |                                |
   |                                creates a temporary which is freed while still in use
...
21 | }
   | - borrow might be used here, when `commands` is dropped and runs the `Drop` code for type `std::vec::Vec`
   |
   = note: consider using a `let` binding to create a longer lived value

error[E0716]: temporary value dropped while borrowed
  --> redis_sample/src/main.rs:14:69
   |
14 |         commands.push(Box::pin(redis::cmd("PING").query_async(& mut redis_connection.clone())));
   |                                                                     ^^^^^^^^^^^^^^^^^^^^^^^^   - temporary value is freed at the end of this statement
   |                                                                     |
   |                                                                     creates a temporary which is freed while still in use
...
21 | }
   | - borrow might be used here, when `commands` is dropped and runs the `Drop` code for type `std::vec::Vec`
   |
   = note: consider using a `let` binding to create a longer lived value

error: aborting due to 2 previous errors

For more information about this error, try `rustc --explain E0716`.
error: could not compile `redis_sample`.

Любая помощь приветствуется!

Создание Twitter-подобного приложения Trending Topics App с Redis (на примере PHP)
Создание Twitter-подобного приложения Trending Topics App с Redis (на примере PHP)
Redis - это популярная база данных типа "ключ-значение" в памяти с поддержкой различных типов и структур данных, которая в основном используется для...
3
0
1 094
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Это должно сработать, я только что продлил срок службы redis_connection.

use futures::{future, Future};
use std::pin::Pin;
use redis::RedisResult;

const BATCH_SIZE: usize = 10;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
    let redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

    let mut commands: Vec<Pin<Box<dyn Future<Output = RedisResult<Option<String>>>>>> = vec![];
    for _ in 0..BATCH_SIZE {
        let mut redis_connection = redis_connection.clone();
        commands.push(Box::pin(async move {
            redis::cmd("PING").query_async(&mut redis_connection).await
        }));
    }
    let results = future::join_all(commands).await;

    println!("{:?}", results);

    Ok(())
}

Поскольку вы находитесь внутри тела функции, вам даже не нужно упаковывать фьючерсы, вывод типов может сделать всю работу:

use futures::future;
use redis::RedisResult;

const BATCH_SIZE: usize = 10;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
    let redis_connection = redis_client.get_multiplexed_tokio_connection().await?;

    let mut commands = vec![];
    for _ in 0..BATCH_SIZE {
        let mut redis_connection = redis_connection.clone();
        commands.push(async move {
            redis::cmd("PING").query_async::<_, Option<String>>(&mut redis_connection).await
        });
    }
    let results = future::join_all(commands).await;

    println!("{:?}", results);

    Ok(())
}

Спасибо! Первый пример работает хорошо, это замечательно. Не могли бы вы описать немного больше, почему вам нужно async move { и почему вы сохраняете результат .await, или, может быть, ссылку на книгу/объяснение где-нибудь? Я, наверное, неправильно смотрю на фьючерсы, но я подумал, что .await подождет, пока будущее не закончится. Второй пример у меня, к сожалению, не компилируется, с двумя ошибками компилятора в строке redis::cmd("PING").query_async - обе такие: error[E0698]: type inside `async` block must be known in this context Спасибо!

Gyfis 20.12.2020 21:36

Мне нужно было асинхронно переместить соединение Redis в будущее. В противном случае будущее будет заимствовать соединение Redis, но это будет ошибкой, поскольку будущее должно пережить соединение Redis — оно должно жить дольше, чем одна итерация цикла. Да, вы правы в том, что await ждет, пока будущее не будет выполнено, но асинхронный блок создает новое будущее и поэтому не запускается немедленно. Я исправлю ответ, чтобы он скомпилировался.

Sabrina Jewson 21.12.2020 17:48

Другие вопросы по теме