Как добавить трассировки/промежутки в асинхронную функцию?

У меня возникли проблемы с добавлением трассировки к определенной функции в моем проекте Rust. Вот соответствующие файлы

// chain/ethereum/src/ingestor.rs

    #[tracing::instrument(skip(self), name = "PollingBlockIngestor::do_poll")]
    async fn do_poll(&self) -> Result<(), IngestorError> {

        // Get chain head ptr from store
        let head_block_ptr_opt = self.chain_store.cheap_clone().chain_head_ptr().await?;

        // To check if there is a new block or not, fetch only the block header since that's cheaper
        // than the full block. This is worthwhile because most of the time there won't be a new
        // block, as we expect the poll interval to be much shorter than the block time.
        info_span!("latest_block");
        let latest_block = self.latest_block().await?;

        if let Some(head_block) = head_block_ptr_opt.as_ref() {
            // If latest block matches head block in store, nothing needs to be done
            if &latest_block == head_block {
                return Ok(());
            }

            if latest_block.number < head_block.number {
                // An ingestor might wait or move forward, but it never
                // wavers and goes back. More seriously, this keeps us from
                // later trying to ingest a block with the same number again
                warn!(self.logger,
                    "Provider went backwards - ignoring this latest block";
                    "current_block_head" => head_block.number,
                    "latest_block_head" => latest_block.number);
                return Ok(());
            }
        }

        // Compare latest block with head ptr, alert user if far behind
        match head_block_ptr_opt {
            None => {
                info!(
                    self.logger,
                    "Downloading latest blocks from Ethereum, this may take a few minutes..."
                );
            }
            Some(head_block_ptr) => {
                let latest_number = latest_block.number;
                let head_number = head_block_ptr.number;
                let distance = latest_number - head_number;
                let blocks_needed = (distance).min(self.ancestor_count);
                let code = if distance >= 15 {
                    LogCode::BlockIngestionLagging
                } else {
                    LogCode::BlockIngestionStatus
                };
                if distance > 0 {
                    info!(
                        self.logger,
                        "Syncing {} blocks from Ethereum",
                        blocks_needed;
                        "current_block_head" => head_number,
                        "latest_block_head" => latest_number,
                        "blocks_behind" => distance,
                        "blocks_needed" => blocks_needed,
                        "code" => code,
                    );
                }
            }
        }

        let mut missing_block_hash = self.ingest_block(&latest_block.hash).await?;

        
        while let Some(hash) = missing_block_hash {
            missing_block_hash = self.ingest_block(&hash).await?;
        }
        Ok(())
    }

    async fn latest_block(&self) -> Result<BlockPtr, IngestorError> {
        info_span!("latest_block_header");
        self.eth_adapter
            .latest_block_header(&self.logger)
            .compat()
            .await
            .map(|block| block.into())
    }

и

// chain/ethereum/src/ethereum_adapter.rs

impl EthereumAdapterTrait for EthereumAdapter {

    #[tracing::instrument(skip_all, name = "testest")]
    fn latest_block_header(
        &self,
        logger: &Logger,
    ) -> Box<dyn Future<Item = web3::types::Block<H256>, Error = IngestorError> + Send> {
        let s = info_span!("latest_block_header in eth adapter");
        let web3 = self.web3.clone();
        Box::new(
            retry("eth_getBlockByNumber(latest) no txs RPC call", logger)
                .no_limit()
                .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
                .run(move || {
                    let web3 = web3.cheap_clone();
                    async move {
                        let block_opt = web3
                            .eth()
                            .block(Web3BlockNumber::Latest.into())
                            .await
                            .map_err(|e| {
                                anyhow!("could not get latest block from Ethereum: {}", e)
                            })?;

                        block_opt
                            .ok_or_else(|| anyhow!("no latest block returned from Ethereum").into())
                    }
                })
                .map_err(move |e| {
                    e.into_inner().unwrap_or_else(move || {
                        anyhow!("Ethereum node took too long to return latest block").into()
                    })
                })
                .boxed()
                .compat(),
        )
    }
// lots of other functions
}

Когда я запускаю это и отправляю трассировки Jaeger, я не вижу ни одного из промежутков на latest_block_header. Я вижу PollingBlockIngestor::do_poll и latest_block и latest_block_header, но не вижу testest и latest_block_header in eth adapter.

Как правильно создать интервалы для функции latest_block_header?

Обновлено: я также пытался изменить асинхронный ход, например, когда я вызываю instrument в будущем, но это тоже не работает.

                    let web3 = web3.cheap_clone();
                    let s = info_span!("latest_block_header in eth adapter");
                    async move {
                        let block_opt = web3
                            .eth()
                            .block(Web3BlockNumber::Latest.into())
                            .instrument(s)
                            .await
                            .map_err(|e| {
                                anyhow!("could not get latest block from Ethereum: {}", e)
                            })?;

                        block_opt
                            .ok_or_else(|| anyhow!("no latest block returned from Ethereum").into())
                    }
Почему Python в конце концов умрет
Почему Python в конце концов умрет
Последние 20 лет были действительно хорошими для Python. Он прошел путь от "просто языка сценариев" до основного языка, используемого для написания...
3
0
71
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Простое создание диапазона не «входит» в него. Я не знаю наверняка, будет ли невведенный диапазон означать, что он отсутствует в Jaeger (через открытую телеметрию), но тем не менее...

Чтобы «ввести» диапазон, вам необходимо либо:

  • используйте .enter() или .entered(), который вернет тип защиты RAII, который выйдет из диапазона после удаления. Единственная разница между ними заключается в том, используется ли диапазон или просто на него ссылаются.
  • используйте .in_scope(|| { ... }), чтобы обернуть вложенный код в этот диапазон.
  • используйте метод расширения .instrument(span) в блоке Future/async { ... }. Это будет автоматически входить и выходить по мере опроса задачи.

Вероятно, вам понадобится последний вариант, поскольку он предназначен специально для отслеживания асинхронных задач.

Размещение его во внутреннем блоке async будет означать, что он покрывает только эту часть вашего кода, и при повторном запуске (при условии, что retry(...) может это сделать) в пределах вашего диапазона «latest_block_header» будет несколько диапазонов «latest_block_header in eth адаптер».

use tracing::Instrument;

let s = info_span!("latest_block_header in eth adapter");
async move {
    ...
}.instrument(s)

Если вместо этого вы хотите, чтобы оно охватывало всю операцию retry(...), вы можете добавить его в конце:

let s = info_span!("latest_block_header in eth adapter");

Box::new(
    retry("eth_getBlockByNumber(latest) no txs RPC call", logger)
        .no_limit()
        .timeout_secs(...)
        .run(...)
        .map_err(...)
        .boxed()
        .compat()
        .instrument(s),
)

Хм, я бы предположил, что аннотация должна сработать. Интересно, что использование span в других методах автоматически попадает в него. Добавление instrument в конец retry не работает, поскольку конечный тип не является Future, который требуется для Box. Я также попробовал использовать instrument в редактировании, которое опубликовал в ОП. Это тоже не сработало.

Paymahn Moghadasian 26.07.2024 18:22

Мне интересно, возникает ли эта проблема из-за того, что функция возвращает Box<Future>>?

Paymahn Moghadasian 26.07.2024 18:24

О, это Future вроде из Futures v0.1 или что-то в этом роде? Это действительно усложняет ситуацию, но на самом деле это не должно влиять на внутренние вещи - например, async { }.instrument() все равно должен работать, пока он вообще работает.

kmdreko 26.07.2024 18:29

Вот последняя версия функции: gist.github.com/paymog/0ceb78953c37a2aea0f2cee0f2df0c08 Я добавил два вызова instrument с внутренним и внешним диапазоном. Похоже, ни того, ни другого не отправят в егерь. Я также добавил информационный журнал в начале функции и определенно вижу, что этот журнал создается.

Paymahn Moghadasian 26.07.2024 18:35

Прошло много времени с тех пор, как я использовал Jaeger, в списке «Операции» отображаются вложенные диапазоны или это просто корневые диапазоны? Можете ли вы проверить, просмотрев конкретное дерево диапазона? Если это не так, есть ли у вас какая-либо конфигурация, которая может отфильтровывать диапазоны? Определенно звучит так, будто происходит нечто иное, чем то, что я ответил. Можете ли вы попытаться предоставить минимальный воспроизводимый пример, который я мог бы запустить и который демонстрирует такое поведение?

kmdreko 26.07.2024 19:07

Вложенные промежутки отображаются в Jaeger: imgur.com/a/vkm0Cq2. Также возможен поиск отдельных пролетов (что показано на изображении в ОП). Я могу попытаться сделать минимальный воспроизводимый пример.

Paymahn Moghadasian 26.07.2024 19:27

Я не думаю, что эта черта является причиной проблемы. Вы на 100% уверены, что код, который вы используете, включает ваши изменения? Вызывает ли он реализацию, которую, по вашему мнению, следует использовать? А вы присматриваете новые пролеты в Jaeger? Начинаю чувствовать, что мы лаем не на то дерево...

kmdreko 26.07.2024 19:40

Да, совершенно уверен, что этот код, который я копирую, работает, потому что я вижу журналы из четвертого комментария в этой цепочке. Я также запускаю это в IDE и вижу, что оно компилируется при каждом изменении.

Paymahn Moghadasian 26.07.2024 20:45
Ответ принят как подходящий

Оказывается, я неправильно настроил фильтры. Они были

        let filter_layer = EnvFilter::try_new(
            "graph_chain_ethereum::ingestor,reqwest_tracing::reqwest_otel_span_builder",
        )

Мне нужно было добавить graph_chain_ethereum::ethereum_adapter в список и тогда всё заработало безупречно.

Другие вопросы по теме