diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index 34c665b..4b2d911 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -15,10 +15,10 @@ use alloy::{ sol_types::{SolCall, SolError}, transports::TransportError, }; -use eyre::{bail, eyre}; +use eyre::{Context, bail, eyre}; use init4_bin_base::deps::{ metrics::{counter, histogram}, - tracing::{self, debug, error, info, instrument, trace, warn}, + tracing::{self, Instrument, debug, debug_span, error, info, instrument, trace, warn}, }; use oauth2::TokenResponse; use signet_sim::BuiltBlock; @@ -178,7 +178,10 @@ impl SubmitTask { "error in transaction submission" ); - if e.as_revert_data() == Some(IncorrectHostBlock::SELECTOR.into()) { + if e.as_revert_data() + .map(|data| data.starts_with(&IncorrectHostBlock::SELECTOR)) + .unwrap_or_default() + { return Ok(ControlFlow::Retry); } @@ -234,15 +237,33 @@ impl SubmitTask { Ok(ControlFlow::Done) } - #[instrument(skip_all, err)] + /// Sign with a local signer if available, otherwise ask quincey + /// for a signature (politely). + #[instrument(skip_all, fields(is_local = self.sequencer_signer.is_some()))] + async fn get_signature(&self, req: SignRequest) -> eyre::Result { + let sig = if let Some(signer) = &self.sequencer_signer { + signer.sign_hash(&req.signing_hash()).await? + } else { + self.sup_quincey(&req) + .await + .wrap_err("failed to get signature from quincey") + .inspect(|_| { + counter!("builder.quincey_signature_acquired").increment(1); + })? + .sig + }; + + debug!(sig = hex::encode(sig.as_bytes()), "acquired signature"); + Ok(SignResponse { req, sig }) + } + + #[instrument(skip_all)] async fn handle_inbound(&self, block: &BuiltBlock) -> eyre::Result { info!(txns = block.tx_count(), "handling inbound block"); - let sig_request = match self.construct_sig_request(block).await { - Ok(sig_request) => sig_request, - Err(e) => { - error!(error = %e, "error constructing signature request"); - return Ok(ControlFlow::Skip); - } + let Ok(sig_request) = self.construct_sig_request(block).await.inspect_err(|e| { + error!(error = %e, "error constructing signature request"); + }) else { + return Ok(ControlFlow::Skip); }; debug!( @@ -251,76 +272,77 @@ impl SubmitTask { "constructed signature request for host block" ); - // If configured with a local signer, we use it. Otherwise, we ask - // quincey (politely) - let signed = if let Some(signer) = &self.sequencer_signer { - let sig = signer.sign_hash(&sig_request.signing_hash()).await?; - debug!(sig = hex::encode(sig.as_bytes()), "acquired signature from local signer"); - SignResponse { req: sig_request, sig } - } else { - let resp: SignResponse = match self.sup_quincey(&sig_request).await { - Ok(resp) => resp, - Err(e) => { - error!(error = %e, "error acquiring signature from quincey"); - return Ok(ControlFlow::Retry); - } - }; - debug!(sig = hex::encode(resp.sig.as_bytes()), "acquired signature from quincey"); - counter!("builder.quincey_signature_acquired").increment(1); - resp - }; + let signed = self.get_signature(sig_request).await?; self.submit_transaction(&signed, block).await } - /// Spawns the in progress block building task - pub fn spawn(self) -> (mpsc::UnboundedSender, JoinHandle<()>) { - let (sender, mut inbound) = mpsc::unbounded_channel(); - let handle = tokio::spawn(async move { - loop { - if let Some(in_progress) = inbound.recv().await { - let building_start_time = Instant::now(); - let mut retries = 0; - loop { - match self.handle_inbound(&in_progress).await { - Ok(ControlFlow::Retry) => { - retries += 1; - if retries > 3 { - counter!("builder.building_too_many_retries").increment(1); - histogram!("builder.block_build_time") - .record(building_start_time.elapsed().as_millis() as f64); - error!("error handling inbound block: too many retries"); - break; - } - error!("error handling inbound block: retrying"); - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - } - Ok(ControlFlow::Skip) => { - histogram!("builder.block_build_time") - .record(building_start_time.elapsed().as_millis() as f64); - counter!("builder.skipped_blocks").increment(1); - info!("skipping block"); - break; - } - Ok(ControlFlow::Done) => { - histogram!("builder.block_build_time") - .record(building_start_time.elapsed().as_millis() as f64); - counter!("builder.submitted_successful_blocks").increment(1); - info!("block landed successfully"); - break; - } - Err(e) => { - error!(error = %e, "error handling inbound block"); - break; - } - } + async fn retrying_handle_inbound( + &self, + block: &BuiltBlock, + retry_limit: usize, + ) -> eyre::Result { + let mut retries = 0; + let building_start_time = Instant::now(); + + let result = loop { + let span = debug_span!("SubmitTask::retrying_handle_inbound", retries); + + let result = + self.handle_inbound(block).instrument(span.clone()).await.inspect_err(|e| { + error!(error = %e, "error handling inbound block"); + })?; + + let guard = span.entered(); + + match result { + ControlFlow::Retry => { + retries += 1; + if retries > retry_limit { + counter!("builder.building_too_many_retries").increment(1); + return Ok(ControlFlow::Skip); } - } else { - debug!("upstream task gone"); - break; + error!("error handling inbound block: retrying"); + drop(guard); + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + + continue; + } + ControlFlow::Skip => { + counter!("builder.skipped_blocks").increment(1); + break result; + } + ControlFlow::Done => { + counter!("builder.submitted_successful_blocks").increment(1); + break result; } } - }); + }; + + // This is reached when `Done` or `Skip` is returned + histogram!("builder.block_build_time") + .record(building_start_time.elapsed().as_millis() as f64); + info!(?result, "finished block building"); + Ok(result) + } + + async fn task_future(self, mut inbound: mpsc::UnboundedReceiver) { + loop { + let Some(block) = inbound.recv().await else { + debug!("upstream task gone"); + break; + }; + + if self.retrying_handle_inbound(&block, 3).await.is_err() { + break; + } + } + } + + /// Spawns the in progress block building task + pub fn spawn(self) -> (mpsc::UnboundedSender, JoinHandle<()>) { + let (sender, inbound) = mpsc::unbounded_channel(); + let handle = tokio::spawn(self.task_future(inbound)); (sender, handle) }