Skip to content

refactor: misc cleaning on the submit task #79

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 9, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 95 additions & 73 deletions src/tasks/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use alloy::{
sol_types::{SolCall, SolError},
transports::TransportError,
};
use eyre::{bail, eyre};
use eyre::{Context, bail, eyre};
use init4_bin_base::deps::{
metrics::{counter, histogram},
tracing::{self, debug, error, info, instrument, trace, warn},
tracing::{self, Instrument, debug, debug_span, error, info, instrument, trace, warn},
};
use oauth2::TokenResponse;
use signet_sim::BuiltBlock;
Expand Down Expand Up @@ -178,7 +178,10 @@ impl SubmitTask {
"error in transaction submission"
);

if e.as_revert_data() == Some(IncorrectHostBlock::SELECTOR.into()) {
if e.as_revert_data()
.map(|data| data.starts_with(&IncorrectHostBlock::SELECTOR))
.unwrap_or_default()
{
return Ok(ControlFlow::Retry);
}

Expand Down Expand Up @@ -234,15 +237,33 @@ impl SubmitTask {
Ok(ControlFlow::Done)
}

#[instrument(skip_all, err)]
/// Obtain a signature for `req` and bundle it into a `SignResponse`.
///
/// If a local sequencer signer is configured, it signs the request's
/// signing hash directly. Otherwise the signature is requested from
/// quincey, and the `builder.quincey_signature_acquired` counter is
/// incremented once that request succeeds.
///
/// # Errors
///
/// Returns an error if the local signer fails, or if the quincey request
/// fails (in which case the error is wrapped with additional context).
#[instrument(skip_all, fields(is_local = self.sequencer_signer.is_some()))]
async fn get_signature(&self, req: SignRequest) -> eyre::Result<SignResponse> {
    let sig = match &self.sequencer_signer {
        // Local path: sign the hash ourselves.
        Some(signer) => signer.sign_hash(&req.signing_hash()).await?,
        // Remote path: ask quincey, counting only successful acquisitions.
        None => {
            let response = self
                .sup_quincey(&req)
                .await
                .wrap_err("failed to get signature from quincey")?;
            counter!("builder.quincey_signature_acquired").increment(1);
            response.sig
        }
    };

    debug!(sig = hex::encode(sig.as_bytes()), "acquired signature");
    Ok(SignResponse { req, sig })
}

#[instrument(skip_all)]
async fn handle_inbound(&self, block: &BuiltBlock) -> eyre::Result<ControlFlow> {
info!(txns = block.tx_count(), "handling inbound block");
let sig_request = match self.construct_sig_request(block).await {
Ok(sig_request) => sig_request,
Err(e) => {
error!(error = %e, "error constructing signature request");
return Ok(ControlFlow::Skip);
}
let Ok(sig_request) = self.construct_sig_request(block).await.inspect_err(|e| {
error!(error = %e, "error constructing signature request");
}) else {
return Ok(ControlFlow::Skip);
};

debug!(
Expand All @@ -251,76 +272,77 @@ impl SubmitTask {
"constructed signature request for host block"
);

// If configured with a local signer, we use it. Otherwise, we ask
// quincey (politely)
let signed = if let Some(signer) = &self.sequencer_signer {
let sig = signer.sign_hash(&sig_request.signing_hash()).await?;
debug!(sig = hex::encode(sig.as_bytes()), "acquired signature from local signer");
SignResponse { req: sig_request, sig }
} else {
let resp: SignResponse = match self.sup_quincey(&sig_request).await {
Ok(resp) => resp,
Err(e) => {
error!(error = %e, "error acquiring signature from quincey");
return Ok(ControlFlow::Retry);
}
};
debug!(sig = hex::encode(resp.sig.as_bytes()), "acquired signature from quincey");
counter!("builder.quincey_signature_acquired").increment(1);
resp
};
let signed = self.get_signature(sig_request).await?;

self.submit_transaction(&signed, block).await
}

/// Spawns the in progress block building task
pub fn spawn(self) -> (mpsc::UnboundedSender<BuiltBlock>, JoinHandle<()>) {
let (sender, mut inbound) = mpsc::unbounded_channel();
let handle = tokio::spawn(async move {
loop {
if let Some(in_progress) = inbound.recv().await {
let building_start_time = Instant::now();
let mut retries = 0;
loop {
match self.handle_inbound(&in_progress).await {
Ok(ControlFlow::Retry) => {
retries += 1;
if retries > 3 {
counter!("builder.building_too_many_retries").increment(1);
histogram!("builder.block_build_time")
.record(building_start_time.elapsed().as_millis() as f64);
error!("error handling inbound block: too many retries");
break;
}
error!("error handling inbound block: retrying");
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
}
Ok(ControlFlow::Skip) => {
histogram!("builder.block_build_time")
.record(building_start_time.elapsed().as_millis() as f64);
counter!("builder.skipped_blocks").increment(1);
info!("skipping block");
break;
}
Ok(ControlFlow::Done) => {
histogram!("builder.block_build_time")
.record(building_start_time.elapsed().as_millis() as f64);
counter!("builder.submitted_successful_blocks").increment(1);
info!("block landed successfully");
break;
}
Err(e) => {
error!(error = %e, "error handling inbound block");
break;
}
}
async fn retrying_handle_inbound(
&self,
block: &BuiltBlock,
retry_limit: usize,
) -> eyre::Result<ControlFlow> {
let mut retries = 0;
let building_start_time = Instant::now();

let result = loop {
let span = debug_span!("SubmitTask::retrying_handle_inbound", retries);

let result =
self.handle_inbound(block).instrument(span.clone()).await.inspect_err(|e| {
error!(error = %e, "error handling inbound block");
})?;

let guard = span.entered();

match result {
ControlFlow::Retry => {
retries += 1;
if retries > retry_limit {
counter!("builder.building_too_many_retries").increment(1);
return Ok(ControlFlow::Skip);
}
} else {
debug!("upstream task gone");
break;
error!("error handling inbound block: retrying");
drop(guard);
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we maybe wait a second less? Two seconds is quite a bit from a slot-time perspective.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not making a behavior change, just cleaning up.


continue;
}
ControlFlow::Skip => {
counter!("builder.skipped_blocks").increment(1);
break result;
}
ControlFlow::Done => {
counter!("builder.submitted_successful_blocks").increment(1);
break result;
}
}
});
};

// This is reached when `Done` or `Skip` is returned
histogram!("builder.block_build_time")
.record(building_start_time.elapsed().as_millis() as f64);
info!(?result, "finished block building");
Ok(result)
}

async fn task_future(self, mut inbound: mpsc::UnboundedReceiver<BuiltBlock>) {
loop {
let Some(block) = inbound.recv().await else {
debug!("upstream task gone");
break;
};

if self.retrying_handle_inbound(&block, 3).await.is_err() {
break;
}
}
}

/// Launches the submit task via `tokio::spawn`.
///
/// Creates an unbounded channel, moves the receiving half into
/// `task_future`, and returns the sending half together with the
/// `JoinHandle` of the spawned task.
pub fn spawn(self) -> (mpsc::UnboundedSender<BuiltBlock>, JoinHandle<()>) {
    let (block_tx, block_rx) = mpsc::unbounded_channel();
    (block_tx, tokio::spawn(self.task_future(block_rx)))
}
Expand Down
Loading