У меня есть фрейм данных Polars, который я обработал в Vec столбчатых данных типа Option<&str>. Я хочу отправить это в базу данных, но там десятки тысяч строк, и я получаю предупреждения SQL о «длинной транзакции». Чтобы решить эту проблему, я хочу распределить работу по нескольким соединениям с БД. Итак, я создал tokio::JoinSet и создал кучу задач для выполнения SQL-запросов. Чтобы отслеживать индексы в каждом фрагменте, я использую очень удобный контейнер paginate.
Я последовал этому ответу, чтобы попытаться заставить это работать, в котором рекомендуется заключить мои Vec в Arc, поскольку мне не нужно изменять данные. К сожалению, мне пока это не удалось.
Обновление Я создал CodeSandbox, показывающий проблему, с которой столкнулся.
Обновление 2. Я полностью выровнял код в этом вопросе, сообщения об ошибках и CodeSandbox. Все еще ищу ответ на ошибку №1.
use anyhow::Result;
use paginate::{Page, Pages};
use polars::prelude::*;
use rand::Rng;
use std::sync::Arc;
use tokio::task::JoinSet;
const CONNECTION_COUNT: usize = 14;
#[tokio::main]
async fn main() -> Result<()> {
let mut rng = rand::thread_rng();
let values: Vec<String> = (0..76_000)
.map(|_| rng.gen_range(0..1000).to_string())
.collect();
let rand_df = df!("A" => values).unwrap();
load_resources("NUMS", &rand_df);
Ok(())
}
async fn load_resources(source_str: &str, in_df: &DataFrame) -> Result<()> {
let merged_df = in_df.clone();
// Error #1 is here ↴ -- see below
let ids: Arc<Vec<_>> = Arc::new(merged_df.column("A")?.str()?.iter().collect());
let count = in_df.shape().0;
let pages = Pages::new(count, 5_000);
let mut join_set = JoinSet::new();
let mut results = Vec::new();
for page in pages.into_iter() {
join_set.spawn(async move {
insert_resources(
source_str,
page,
ids.clone(), // Error #2, see below
)
.await
});
if join_set.len() >= CONNECTION_COUNT {
if let Some(result) = join_set.join_next().await {
results.push(result.unwrap());
}
}
}
// Await the completion of all tasks
while let Some(result) = join_set.join_next().await {
results.push(result.unwrap());
}
Ok(())
}
async fn insert_resources(
source_str: &str, // Error #3, see below
page: Page,
ids: Arc<Vec<Option<&str>>>,
) -> Result<()> {
println!("source: {} page: {:?} names: {:?}", source_str, page, ids);
Ok(())
}
Ошибки по-прежнему говорят о том, что мое заимствованное значение не живет достаточно долго, несмотря на то, что я клонирую DataFrame (после того, как завернул его в Arc).
1. merged_df живет недостаточно долго
error[E0597]: `merged_df` does not live long enough
--> src/main.rs:26:37
|
23 | let merged_df = in_df.clone();
| --------- binding `merged_df` declared here
...
26 | let ids: Arc<Vec<_>> = Arc::new(merged_df.column("A")?.str()?.iter().co...
| ^^^^^^^^^---------------------------
| |
| borrowed value does not live long enough
| argument requires that `merged_df` is borrowed for `'static`
...
56 | }
| - `merged_df` dropped here while still borrowed
2. использование перемещенного значения: names. И что за странное предложение поставить .clone() после блока async move?? Я попробовал, и он не компилируется.
error[E0382]: use of moved value: `ids`
--> src/main.rs:34:24
|
26 | let ids: Arc<Vec<_>> = Arc::new(merged_df.column("A")?.str()?.iter...
| --- move occurs because `ids` has type `Arc<Vec<Option<&str>>>`, which does not implement the `Copy` trait
...
33 | for page in pages.into_iter() {
| ----------------------------- inside of this loop
34 | join_set.spawn(async move {
| ________________________^
35 | | insert_resources(
36 | | source_str,
37 | | page,
38 | | ids.clone(), // Error #2, see below
| | --- use occurs due to use in coroutine
39 | | )
40 | | .await
41 | | });
| |_________^ value moved here, in previous iteration of loop
|
help: clone the value to increment its reference count
|
41 | }.clone());
| ++++++++
3. заимствованные данные выходят за пределы функции
error[E0521]: borrowed data escapes outside of function
--> src/main.rs:34:9
|
22 | async fn load_resources(source_str: &str, in_df: &DataFrame) -> Result...
| ---------- - let's call the lifetime of this reference `'1`
| |
| `source_str` is a reference that is only valid in the function body
...
34 | / join_set.spawn(async move {
35 | | insert_resources(
36 | | source_str,
37 | | page,
... |
40 | | .await
41 | | });
| | ^
| | |
| |__________`source_str` escapes the function body here
| argument requires that `'1` must outlive `'static`
Как правильно это сделать?
Я значительно упростил код, чтобы попытаться сфокусировать вопрос, но оставил ошибки как есть, чтобы не нарушать формат ASCII. merged_df действительно создается внутри функции путем объединения других переданных df-файлов, но в моем реальном коде ни один из них не вызывается in_df. Чтобы разобраться в этом, правильно использовать in_df. Надеюсь это имеет смысл?
@Nickk9, если ваш пример является минимально воспроизводимым примером, то вы можете просто вывести из него ошибки, если нет, то вам следует сделать его MRE.
Хорошо, я еще больше упростил это и создал виртуальную машину CodeSandbox, которая показывает большинство ошибок.
Ваше сообщение по-прежнему не связно, ошибки здесь не соответствуют коду, который вы добавили сюда. Обратите внимание: по ряду причин код, который находится только на внешних ресурсах, недостаточен в качестве минимального воспроизводимого примера для переполнения стекавопрос.
Когда вы это делаете async move { ... x.clone() ... }, именно ценность x перемещается в будущее. Если x является ссылкой, это означает, что замыкание должно жить дольше, чем ссылка, что в данном случае неверно. Это верно независимо от того, используете ли вы move ссылку в замыкании, потому что ссылки в любом случае Copy. Предложение компилятора использовать клонирование верно, однако вам нужно клонировать, например, ПЕРЕД перемещением замыкания let c = x.clone(); async move { ... c ... }
Более того, я думаю, что async { func(...).await }, вероятно, является антипаттерном, возможно, есть исключительные случаи, когда это необходимо, но вы можете написать это как просто func(...). Встроенные блоки async предназначены для случаев, когда вы хотите выполнить нетривиальную композицию множества других фьючерсов и функций внутри асинхронного блока.
Супер полезно, спасибо! Благодаря вашим предложениям я решил 2 из 3 проблем. (Мне также пришлось использовать Arc вместо source_str.) Однако все равно застрял на первой ошибке. «merged_df не живет достаточно долго». CodeSandbox настроен, чтобы показать это… мне нужно как-то поставить этот DF за Arc? Очень рад принять ответ, если вы хотите его написать.

Большое спасибо @user2407038 за помощь в разъяснении вопросов.
Чтобы ответить на вопросы о конкретных ошибках:
merged_df живет недостаточно долго. Решением здесь было преобразовать содержимое Vec в принадлежащие String вместо просто &str. Это достигается с помощью: .map(|opt_str| opt_str.map(|s| s.to_string()))ids. Я убрал обертку async move {}.await, так что clone() работает хорошо.Arc, чтобы обернуть String при передаче его в функцию.Полный компилируемый код здесь:
use anyhow::Result;
use paginate::{Page, Pages};
use polars::prelude::*;
use rand::Rng;
use std::sync::Arc;
use tokio::task::JoinSet;
const CONNECTION_COUNT: usize = 14;
#[tokio::main]
async fn main() -> Result<()> {
let mut rng = rand::thread_rng();
let values: Vec<String> = (0..76_000)
.map(|_| rng.gen_range(0..1000).to_string())
.collect();
let rand_df = df!("A" => values).unwrap();
let _ = load_resources("NUMS", &rand_df);
Ok(())
}
async fn load_resources(source_str: &str, in_df: &DataFrame) -> Result<()> {
let merged_df = in_df.clone();
let ids: Arc<Vec<_>> = Arc::new(
merged_df
.column("A")?
.str()?
.iter()
.map(|opt_str| opt_str.map(|s| s.to_string()))
.collect(),
);
let source = Arc::new(source_str.to_string());
let count = in_df.shape().0;
let pages = Pages::new(count, 5_000);
let mut join_set = JoinSet::new();
let mut results = Vec::new();
for page in pages.into_iter() {
join_set.spawn(insert_resources(source.clone(), page, ids.clone()));
if join_set.len() >= CONNECTION_COUNT {
if let Some(result) = join_set.join_next().await {
results.push(result.unwrap());
}
}
}
// Await the completion of all tasks
while let Some(result) = join_set.join_next().await {
results.push(result.unwrap());
}
Ok(())
}
async fn insert_resources(
source_str: Arc<String>,
page: Page,
ids: Arc<Vec<Option<String>>>,
) -> Result<()> {
println!("source: {} page: {:?} names: {:?}", source_str, page, ids);
Ok(())
}
В вашем коде написано
let merged_df = in_df.clone();, в вашей ошибке написаноlet merged_df = db_eds_df. Чему верить?