Я пытаюсь использовать клиент redis Rust в асинхронном мультиплексном режиме, с tokio в качестве асинхронной среды выполнения и динамическим количеством фьючерсов для присоединения.
Я успешно использовал future::join3
для постоянного числа фьючерсов, но я хочу мультиплексировать гораздо больше команд (конкретный размер не должен быть известен во время компиляции, но даже это было бы улучшением).
Это рабочий пример при использовании future::join3
; Пример правильно печатает
Ok(Some("PONG")) Ok(Some("PONG")) Ok(Some("PONG"))
Cargo.toml
[package]
name = "redis_sample"
version = "0.1.0"
authors = ["---"]
edition = "2018"
[dependencies]
redis = { version = "0.17.0", features = ["aio", "tokio-comp", "tokio-rt-core"] }
tokio = { version = "0.2.23", features = ["full"] }
futures = "0.3.8"
src/main.rs
use futures::future;
use redis::RedisResult;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
let mut redis_connection = redis_client.get_multiplexed_tokio_connection().await?;
let results: (RedisResult<Option<String>>, RedisResult<Option<String>>, RedisResult<Option<String>>) = future::join3(
redis::cmd("PING").query_async(&mut redis_connection.clone()),
redis::cmd("PING").query_async(&mut redis_connection.clone()),
redis::cmd("PING").query_async(&mut redis_connection),
).await;
println!("{:?} {:?} {:?}", results.0, results.1, results.2);
Ok(())
}
Теперь я хочу сделать то же самое, но с n
командами (скажем, 10, но в идеале я хотел бы настроить это на производительность в продакшене). Это то, что я получил, но я не могу преодолеть правила заимствования; Я пытался хранить некоторых посредников (либо Redis Cmd
, либо само будущее) в Vec, чтобы продлить их жизнь, но у этого были другие проблемы (с несколькими ссылками mut
).
Cargo.toml
то же самое; вот main.rs
use futures::{future, Future};
use std::pin::Pin;
use redis::RedisResult;
const BATCH_SIZE: usize = 10;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
let redis_connection = redis_client.get_multiplexed_tokio_connection().await?;
let mut commands: Vec<Pin<Box<dyn Future<Output = RedisResult<Option<String>>>>>> = vec![];
for _ in 0..BATCH_SIZE {
commands.push(Box::pin(redis::cmd("PING").query_async(& mut redis_connection.clone())));
}
let results = future::join_all(commands).await;
println!("{:?}", results);
Ok(())
}
Я получаю два предупреждения компилятора (creates a temporary which is freed while still in use
) и не знаю, что делать дальше с этим кодом. Я не на 100% уверен в использовании PIN-кода, но без него я не смог бы даже хранить фьючерсы.
Полный вывод компилятора:
Compiling redis_sample v0.1.0 (/Users/gyfis/Documents/programming/rust/redis_sample)
error[E0716]: temporary value dropped while borrowed
--> redis_sample/src/main.rs:14:32
|
14 | commands.push(Box::pin(redis::cmd("PING").query_async(& mut redis_connection.clone())));
| ^^^^^^^^^^^^^^^^^^ - temporary value is freed at the end of this statement
| |
| creates a temporary which is freed while still in use
...
21 | }
| - borrow might be used here, when `commands` is dropped and runs the `Drop` code for type `std::vec::Vec`
|
= note: consider using a `let` binding to create a longer lived value
error[E0716]: temporary value dropped while borrowed
--> redis_sample/src/main.rs:14:69
|
14 | commands.push(Box::pin(redis::cmd("PING").query_async(& mut redis_connection.clone())));
| ^^^^^^^^^^^^^^^^^^^^^^^^ - temporary value is freed at the end of this statement
| |
| creates a temporary which is freed while still in use
...
21 | }
| - borrow might be used here, when `commands` is dropped and runs the `Drop` code for type `std::vec::Vec`
|
= note: consider using a `let` binding to create a longer lived value
error: aborting due to 2 previous errors
For more information about this error, try `rustc --explain E0716`.
error: could not compile `redis_sample`.
Любая помощь приветствуется!
Это должно сработать, я только что продлил срок службы redis_connection
.
use futures::{future, Future};
use std::pin::Pin;
use redis::RedisResult;
const BATCH_SIZE: usize = 10;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
let redis_connection = redis_client.get_multiplexed_tokio_connection().await?;
let mut commands: Vec<Pin<Box<dyn Future<Output = RedisResult<Option<String>>>>>> = vec![];
for _ in 0..BATCH_SIZE {
let mut redis_connection = redis_connection.clone();
commands.push(Box::pin(async move {
redis::cmd("PING").query_async(&mut redis_connection).await
}));
}
let results = future::join_all(commands).await;
println!("{:?}", results);
Ok(())
}
Поскольку вы находитесь внутри тела функции, вам даже не нужно упаковывать фьючерсы, вывод типов может сделать всю работу:
use futures::future;
use redis::RedisResult;
const BATCH_SIZE: usize = 10;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
let redis_connection = redis_client.get_multiplexed_tokio_connection().await?;
let mut commands = vec![];
for _ in 0..BATCH_SIZE {
let mut redis_connection = redis_connection.clone();
commands.push(async move {
redis::cmd("PING").query_async::<_, Option<String>>(&mut redis_connection).await
});
}
let results = future::join_all(commands).await;
println!("{:?}", results);
Ok(())
}
Мне нужно было асинхронно переместить соединение Redis в будущее. В противном случае будущее будет заимствовать соединение Redis, но это будет ошибкой, поскольку будущее должно пережить соединение Redis — оно должно жить дольше, чем одна итерация цикла. Да, вы правы в том, что await ждет, пока будущее не будет выполнено, но асинхронный блок создает новое будущее и поэтому не запускается немедленно. Я исправлю ответ, чтобы он скомпилировался.
Спасибо! Первый пример работает хорошо, это замечательно. Не могли бы вы описать немного больше, почему вам нужно
async move {
и почему вы сохраняете результат.await
, или, может быть, ссылку на книгу/объяснение где-нибудь? Я, наверное, неправильно смотрю на фьючерсы, но я подумал, что.await
подождет, пока будущее не закончится. Второй пример у меня, к сожалению, не компилируется, с двумя ошибками компилятора в строкеredis::cmd("PING").query_async
- обе такие:error[E0698]: type inside `async` block must be known in this context
Спасибо!