From bf796ed67b13ea45e2de677dcd717e33f80d8784 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 7 May 2025 11:46:09 -0400 Subject: [PATCH] fix: metrics previously did not wait for confirmation --- src/tasks/metrics.rs | 96 ++++++++++++++++++++++++-------------------- 1 file changed, 52 insertions(+), 44 deletions(-) diff --git a/src/tasks/metrics.rs b/src/tasks/metrics.rs index f406b30..c940717 100644 --- a/src/tasks/metrics.rs +++ b/src/tasks/metrics.rs @@ -1,10 +1,13 @@ use crate::config::HostProvider; -use alloy::{primitives::TxHash, providers::Provider as _}; +use alloy::{ + primitives::TxHash, + providers::{PendingTransactionBuilder, PendingTransactionError, Provider as _, WatchTxError}, +}; use init4_bin_base::deps::{ metrics::{counter, histogram}, - tracing::{debug, error}, + tracing::{Instrument, debug, error, info_span}, }; -use std::time::Instant; +use std::time::{Duration, Instant}; use tokio::{sync::mpsc, task::JoinHandle}; /// Collects metrics on transactions sent by the Builder @@ -15,42 +18,52 @@ pub struct MetricsTask { } impl MetricsTask { - /// Given a transaction hash, record metrics on the result of the transaction mining - pub async fn log_tx(&self, pending_tx_hash: TxHash) { - // start timer when tx hash is received - let start: Instant = Instant::now(); + /// Given a transaction hash, record metrics on the result of the + /// transaction mining + pub fn log_tx(&self, tx_hash: TxHash) -> impl Future + use<> { + let provider = self.host_provider.clone(); - // wait for the tx to mine, get its receipt - let receipt_result = - self.host_provider.clone().get_transaction_receipt(pending_tx_hash).await; + async move { + // start timer when tx hash is received + let start: Instant = Instant::now(); - match receipt_result { - Ok(maybe_receipt) => { - match maybe_receipt { - Some(receipt) => { - // record how long it took to mine the transaction - // potential improvement: use the block timestamp to calculate the time elapsed - histogram!("metrics.tx_mine_time") - .record(start.elapsed().as_millis() as f64); + let span = info_span!("metrics_submission", %tx_hash); - // log whether the transaction reverted - if receipt.status() { - counter!("metrics.tx_reverted").increment(1); - debug!(tx_hash = %pending_tx_hash, "tx reverted"); - } else { - counter!("metrics.tx_succeeded").increment(1); - debug!(tx_hash = %pending_tx_hash, "tx succeeded"); - } - } - None => { - counter!("metrics.no_receipt").increment(1); - error!("no receipt found for tx hash"); + // wait for the tx to mine, get its receipt + let receipt = PendingTransactionBuilder::new(provider.root().clone(), tx_hash) + .with_required_confirmations(1) + .with_timeout(Some(Duration::from_secs(60))) + .get_receipt() + .instrument(span.clone()) + .await; + + // enter the span to log the result + let _guard = span.entered(); + + match receipt { + Ok(receipt) => { + // record how long it took to mine the transaction + // potential improvement: use the block timestamp to calculate the time elapsed + histogram!("metrics.tx_mine_time").record(start.elapsed().as_millis() as f64); + + // log whether the transaction reverted + if receipt.status() { + counter!("metrics.tx_reverted").increment(1); + debug!("tx reverted"); + } else { + counter!("metrics.tx_succeeded").increment(1); + debug!("tx succeeded"); } } - } - Err(e) => { - counter!("metrics.rpc_error").increment(1); - error!(error = ?e, "rpc error"); + Err(PendingTransactionError::TxWatcher(WatchTxError::Timeout)) => { + // log that the transaction timed out + counter!("metrics.tx_not_mined").increment(1); + debug!("tx not mined"); + } + Err(e) => { + counter!("metrics.rpc_error").increment(1); + error!(error = ?e, "rpc error"); + } } } } @@ -58,21 +71,16 @@ impl MetricsTask { /// Spawns the task which collects metrics on pending transactions pub fn spawn(self) -> (mpsc::UnboundedSender, JoinHandle<()>) { let (sender, mut inbound) = mpsc::unbounded_channel(); + let handle = tokio::spawn(async move { debug!("metrics task spawned"); loop { - if let Some(pending_tx_hash) = inbound.recv().await { - let this = self.clone(); - tokio::spawn(async move { - debug!("received tx hash"); - let that = this.clone(); - that.log_tx(pending_tx_hash).await; - debug!("logged tx metrics"); - }); - } else { + let Some(tx_hash) = inbound.recv().await else { debug!("upstream task gone"); break; - } + }; + let fut = self.log_tx(tx_hash); + tokio::spawn(fut); } });