Использование фрагментов Vec в задаче tokio приводит к запутанным ошибкам

У меня есть фрейм данных Polars, который я обработал в Vec столбчатых данных типа Option<&str>. Я хочу отправить это в базу данных, но там десятки тысяч строк, и я получаю предупреждения SQL о «длинной транзакции». Чтобы решить эту проблему, я хочу распределить работу по нескольким соединениям с БД. Итак, я создал tokio::JoinSet и создал кучу задач для выполнения SQL-запросов. Чтобы отслеживать индексы в каждом фрагменте, я использую очень удобный контейнер paginate.

Я последовал этому ответу, чтобы попытаться заставить это работать, в котором рекомендуется заключить мои Vec в Arc, поскольку мне не нужно изменять данные. К сожалению, мне пока это не удалось.

Обновление Я создал CodeSandbox, показывающий проблему, с которой столкнулся.

Обновление 2. Я полностью выровнял код в этом вопросе, сообщения об ошибках и CodeSandbox. Все еще ищу ответ на ошибку №1.


use anyhow::Result;
use paginate::{Page, Pages};
use polars::prelude::*;
use rand::Rng;
use std::sync::Arc;
use tokio::task::JoinSet;
const CONNECTION_COUNT: usize = 14;

#[tokio::main]
async fn main() -> Result<()> {
    let mut rng = rand::thread_rng();
    let values: Vec<String> = (0..76_000)
        .map(|_| rng.gen_range(0..1000).to_string())
        .collect();
    let rand_df = df!("A" => values).unwrap();

    load_resources("NUMS", &rand_df);

    Ok(())
}

async fn load_resources(source_str: &str, in_df: &DataFrame) -> Result<()> {
    let merged_df = in_df.clone();

    // Error #1 is here ↴ -- see below
    let ids: Arc<Vec<_>> = Arc::new(merged_df.column("A")?.str()?.iter().collect());

    let count = in_df.shape().0;
    let pages = Pages::new(count, 5_000);
    let mut join_set = JoinSet::new();
    let mut results = Vec::new();

    for page in pages.into_iter() {
        join_set.spawn(async move {
            insert_resources(
                source_str,
                page,
                ids.clone(), // Error #2, see below
            )
            .await
        });

        if join_set.len() >= CONNECTION_COUNT {
            if let Some(result) = join_set.join_next().await {
                results.push(result.unwrap());
            }
        }
    }

    // Await the completion of all tasks
    while let Some(result) = join_set.join_next().await {
        results.push(result.unwrap());
    }

    Ok(())
}

async fn insert_resources(
    source_str: &str, // Error #3, see below
    page: Page,
    ids: Arc<Vec<Option<&str>>>,
) -> Result<()> {
    println!("source: {} page: {:?} names: {:?}", source_str, page, ids);

    Ok(())
}

Ошибки по-прежнему говорят о том, что мое заимствованное значение не живет достаточно долго, несмотря на то, что я клонирую DataFrame (после того, как завернул его в Arc).

1. merged_df живет недостаточно долго

error[E0597]: `merged_df` does not live long enough
  --> src/main.rs:26:37
   |
23 |     let merged_df = in_df.clone();
   |         --------- binding `merged_df` declared here
...
26 |     let ids: Arc<Vec<_>> = Arc::new(merged_df.column("A")?.str()?.iter().co...
   |                                     ^^^^^^^^^---------------------------
   |                                     |
   |                                     borrowed value does not live long enough
   |                                     argument requires that `merged_df` is borrowed for `'static`
...
56 | }
   | - `merged_df` dropped here while still borrowed

2. использование перемещенного значения: names. И что за странное предложение поставить .clone() после блока async move?? Я попробовал, и он не компилируется.

error[E0382]: use of moved value: `ids`
  --> src/main.rs:34:24
   |
26 |       let ids: Arc<Vec<_>> = Arc::new(merged_df.column("A")?.str()?.iter...
   |           --- move occurs because `ids` has type `Arc<Vec<Option<&str>>>`, which does not implement the `Copy` trait
...
33 |       for page in pages.into_iter() {
   |       ----------------------------- inside of this loop
34 |           join_set.spawn(async move {
   |  ________________________^
35 | |             insert_resources(
36 | |                 source_str,
37 | |                 page,
38 | |                 ids.clone(), // Error #2, see below
   | |                 --- use occurs due to use in coroutine
39 | |             )
40 | |             .await
41 | |         });
   | |_________^ value moved here, in previous iteration of loop
   |
help: clone the value to increment its reference count
   |
41 |         }.clone());
   |          ++++++++

3. заимствованные данные выходят за пределы функции

error[E0521]: borrowed data escapes outside of function
  --> src/main.rs:34:9
   |
22 |   async fn load_resources(source_str: &str, in_df: &DataFrame) -> Result...
   |                           ----------  - let's call the lifetime of this reference `'1`
   |                           |
   |                           `source_str` is a reference that is only valid in the function body
...
34 | /         join_set.spawn(async move {
35 | |             insert_resources(
36 | |                 source_str,
37 | |                 page,
...  |
40 | |             .await
41 | |         });
   | |          ^
   | |          |
   | |__________`source_str` escapes the function body here
   |            argument requires that `'1` must outlive `'static`

Как правильно это сделать?

В вашем коде написано let merged_df = in_df.clone();, в вашей ошибке написано let merged_df = db_eds_df. Чему верить?

Alexey S. Larionov 22.05.2024 15:08

Я значительно упростил код, чтобы попытаться сфокусировать вопрос, но оставил ошибки как есть, чтобы не нарушать формат ASCII. merged_df действительно создается внутри функции путем объединения других переданных df-файлов, но в моем реальном коде ни один из них не вызывается in_df. Чтобы разобраться в этом, правильно использовать in_df. Надеюсь это имеет смысл?

Nick K9 22.05.2024 15:12

@Nickk9, если ваш пример является минимально воспроизводимым примером, то вы можете просто вывести из него ошибки, если нет, то вам следует сделать его MRE.

cafce25 22.05.2024 15:17

Хорошо, я еще больше упростил это и создал виртуальную машину CodeSandbox, которая показывает большинство ошибок.

Nick K9 22.05.2024 16:02

Ваше сообщение по-прежнему не связно, ошибки здесь не соответствуют коду, который вы добавили сюда. Обратите внимание: по ряду причин код, который находится только на внешних ресурсах, недостаточен в качестве минимального воспроизводимого примера для переполнения стекавопрос.

cafce25 22.05.2024 16:06

Когда вы это делаете async move { ... x.clone() ... }, именно ценность x перемещается в будущее. Если x является ссылкой, это означает, что замыкание должно жить дольше, чем ссылка, что в данном случае неверно. Это верно независимо от того, используете ли вы move ссылку в замыкании, потому что ссылки в любом случае Copy. Предложение компилятора использовать клонирование верно, однако вам нужно клонировать, например, ПЕРЕД перемещением замыкания let c = x.clone(); async move { ... c ... }

user2407038 22.05.2024 16:12

Более того, я думаю, что async { func(...).await }, вероятно, является антипаттерном, возможно, есть исключительные случаи, когда это необходимо, но вы можете написать это как просто func(...). Встроенные блоки async предназначены для случаев, когда вы хотите выполнить нетривиальную композицию множества других фьючерсов и функций внутри асинхронного блока.

user2407038 22.05.2024 16:16

Супер полезно, спасибо! Благодаря вашим предложениям я решил 2 из 3 проблем. (Мне также пришлось использовать Arc вместо source_str.) Однако все равно застрял на первой ошибке. «merged_df не живет достаточно долго». CodeSandbox настроен, чтобы показать это… мне нужно как-то поставить этот DF за Arc? Очень рад принять ответ, если вы хотите его написать.

Nick K9 22.05.2024 16:34
Почему Python в конце концов умрет
Почему Python в конце концов умрет
Последние 20 лет были действительно хорошими для Python. Он прошел путь от "просто языка сценариев" до основного языка, используемого для написания...
0
8
84
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Большое спасибо @user2407038 за помощь в разъяснении вопросов.

Чтобы ответить на вопросы о конкретных ошибках:

  1. merged_df живет недостаточно долго. Решением здесь было преобразовать содержимое Vec в принадлежащие String вместо просто &str. Это достигается с помощью: .map(|opt_str| opt_str.map(|s| s.to_string()))
  2. использование перемещенного значения: ids. Я убрал обертку async move {}.await, так что clone() работает хорошо.
  3. заимствованные данные выходят за пределы функции. Как и в случае с 1, я использовал Arc, чтобы обернуть String при передаче его в функцию.

Полный компилируемый код здесь:

use anyhow::Result;
use paginate::{Page, Pages};
use polars::prelude::*;
use rand::Rng;
use std::sync::Arc;
use tokio::task::JoinSet;
const CONNECTION_COUNT: usize = 14;

#[tokio::main]
async fn main() -> Result<()> {
    let mut rng = rand::thread_rng();
    let values: Vec<String> = (0..76_000)
        .map(|_| rng.gen_range(0..1000).to_string())
        .collect();
    let rand_df = df!("A" => values).unwrap();

    let _ = load_resources("NUMS", &rand_df);

    Ok(())
}

async fn load_resources(source_str: &str, in_df: &DataFrame) -> Result<()> {
    let merged_df = in_df.clone();

    let ids: Arc<Vec<_>> = Arc::new(
        merged_df
            .column("A")?
            .str()?
            .iter()
            .map(|opt_str| opt_str.map(|s| s.to_string()))
            .collect(),
    );
    let source = Arc::new(source_str.to_string());

    let count = in_df.shape().0;
    let pages = Pages::new(count, 5_000);
    let mut join_set = JoinSet::new();
    let mut results = Vec::new();

    for page in pages.into_iter() {
        join_set.spawn(insert_resources(source.clone(), page, ids.clone()));

        if join_set.len() >= CONNECTION_COUNT {
            if let Some(result) = join_set.join_next().await {
                results.push(result.unwrap());
            }
        }
    }

    // Await the completion of all tasks
    while let Some(result) = join_set.join_next().await {
        results.push(result.unwrap());
    }

    Ok(())
}

async fn insert_resources(
    source_str: Arc<String>,
    page: Page,
    ids: Arc<Vec<Option<String>>>,
) -> Result<()> {
    println!("source: {} page: {:?} names: {:?}", source_str, page, ids);

    Ok(())
}

Другие вопросы по теме