Skip to content

feat: Write protobufs asynchronously [MR-355] #4792

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 27 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 63 additions & 48 deletions rs/state_layout/src/state_layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,18 +720,18 @@ impl StateLayout {
/// the scratchpad is properly marked as unverified before transitioning it into a checkpoint.
pub fn promote_scratchpad_to_unverified_checkpoint<T>(
&self,
scratchpad_layout: CheckpointLayout<RwPolicy<'_, T>>,
scratchpad_layout: CheckpointLayout<RwPolicy<T>>,
height: Height,
) -> Result<CheckpointLayout<ReadOnly>, LayoutError> {
) -> Result<CheckpointLayout<RwPolicy<T>>, LayoutError> {
scratchpad_layout.create_unverified_checkpoint_marker()?;
self.scratchpad_to_checkpoint(scratchpad_layout, height)
}

fn scratchpad_to_checkpoint<T>(
&self,
layout: CheckpointLayout<RwPolicy<'_, T>>,
layout: CheckpointLayout<RwPolicy<T>>,
height: Height,
) -> Result<CheckpointLayout<ReadOnly>, LayoutError> {
) -> Result<CheckpointLayout<RwPolicy<T>>, LayoutError> {
// The scratchpad must have an unverified marker before it is promoted to a checkpoint.
debug_assert!(!layout.is_checkpoint_verified());
debug_assert_eq!(height, layout.height());
Expand All @@ -755,7 +755,7 @@ impl StateLayout {
message: "Could not sync checkpoints".to_string(),
io_err: err,
})?;
self.checkpoint_in_verification(height)
self.checkpoint(height)
}

pub fn clone_checkpoint(&self, from: Height, to: Height) -> Result<(), LayoutError> {
Expand All @@ -778,7 +778,10 @@ impl StateLayout {

/// Returns the layout of the checkpoint with the given height.
/// If the checkpoint is not found, an error is returned.
fn checkpoint(&self, height: Height) -> Result<CheckpointLayout<ReadOnly>, LayoutError> {
fn checkpoint<T>(&self, height: Height) -> Result<CheckpointLayout<T>, LayoutError>
where
T: AccessPolicy,
{
let cp_name = Self::checkpoint_name(height);
let path = self.checkpoints().join(cp_name);
if !path.exists() {
Expand Down Expand Up @@ -1576,29 +1579,39 @@ fn parse_and_sort_checkpoint_heights(names: &[String]) -> Result<Vec<Height>, La
Ok(heights)
}

struct CheckpointLayoutImpl<Permissions: AccessPolicy> {
struct CheckpointLayoutImpl {
root: PathBuf,
height: Height,
// The StateLayout is used to make sure we never remove the CheckpointLayout when still in use.
// Is not None for CheckpointLayout pointing to "real" checkpoints, that is checkpoints in
// StateLayout's root/checkpoints/..., that are tracked by StateLayout
state_layout: Option<StateLayout>,
permissions_tag: PhantomData<Permissions>,
}

impl<Permissions: AccessPolicy> Drop for CheckpointLayoutImpl<Permissions> {
impl Drop for CheckpointLayoutImpl {
fn drop(&mut self) {
if let Some(state_layout) = &self.state_layout {
state_layout.remove_checkpoint_ref(self.height)
}
}
}

pub struct CheckpointLayout<Permissions: AccessPolicy>(Arc<CheckpointLayoutImpl<Permissions>>);
pub struct CheckpointLayout<Permissions: AccessPolicy>(
Arc<CheckpointLayoutImpl>,
PhantomData<Permissions>,
);

// TODO(MR-676) prevent cloning when Permissions is intentinally non-cloneable
impl<Permissions: AccessPolicy> Clone for CheckpointLayout<Permissions> {
fn clone(&self) -> Self {
Self(self.0.clone())
CheckpointLayout(self.0.clone(), PhantomData)
}
}

impl<Permissions: ReadPolicy> CheckpointLayout<Permissions> {
/// Clone CheckpointLayout removing all access but ReadOnly.
pub fn as_readonly(&self) -> CheckpointLayout<ReadOnly> {
CheckpointLayout(Arc::clone(&self.0), PhantomData)
}
}

Expand All @@ -1620,22 +1633,26 @@ impl<Permissions: AccessPolicy> CheckpointLayout<Permissions> {
state_layout: StateLayout,
) -> Result<Self, LayoutError> {
Permissions::check_dir(&root)?;
Ok(Self(Arc::new(CheckpointLayoutImpl::<Permissions> {
root,
height,
state_layout: Some(state_layout),
permissions_tag: PhantomData,
})))
Ok(Self(
Arc::new(CheckpointLayoutImpl {
root,
height,
state_layout: Some(state_layout),
}),
PhantomData,
))
}

pub fn new_untracked(root: PathBuf, height: Height) -> Result<Self, LayoutError> {
Permissions::check_dir(&root)?;
Ok(Self(Arc::new(CheckpointLayoutImpl::<Permissions> {
root,
height,
state_layout: None,
permissions_tag: PhantomData,
})))
Ok(Self(
Arc::new(CheckpointLayoutImpl {
root,
height,
state_layout: None,
}),
PhantomData,
))
}

pub fn system_metadata(&self) -> ProtoFileWith<pb_metadata::SystemMetadata, Permissions> {
Expand Down Expand Up @@ -1784,32 +1801,7 @@ impl<Permissions: AccessPolicy> CheckpointLayout<Permissions> {
}
Ok(())
}
}

impl<P> CheckpointLayout<P>
where
P: WritePolicy,
{
/// Creates the unverified checkpoint marker.
/// If the marker already exists, this function does nothing and returns `Ok(())`.
///
/// Only the checkpoint layout with write policy can create the unverified checkpoint marker,
/// e.g. state sync scratchpad and tip.
pub fn create_unverified_checkpoint_marker(&self) -> Result<(), LayoutError> {
let marker = self.unverified_checkpoint_marker();
if marker.exists() {
return Ok(());
}
open_for_write(&marker)?;
sync_path(&self.0.root).map_err(|err| LayoutError::IoError {
path: self.0.root.clone(),
message: "Failed to sync checkpoint directory for the creation of the unverified checkpoint marker".to_string(),
io_err: err,
})
}
}

impl CheckpointLayout<ReadOnly> {
/// Removes the unverified checkpoint marker.
/// If the marker does not exist, this function does nothing and returns `Ok(())`.
///
Expand Down Expand Up @@ -1856,6 +1848,29 @@ impl CheckpointLayout<ReadOnly> {
}
}

impl<P> CheckpointLayout<P>
where
P: WritePolicy,
{
/// Creates the unverified checkpoint marker.
/// If the marker already exists, this function does nothing and returns `Ok(())`.
///
/// Only the checkpoint layout with write policy can create the unverified checkpoint marker,
/// e.g. state sync scratchpad and tip.
pub fn create_unverified_checkpoint_marker(&self) -> Result<(), LayoutError> {
let marker = self.unverified_checkpoint_marker();
if marker.exists() {
return Ok(());
}
open_for_write(&marker)?;
sync_path(&self.0.root).map_err(|err| LayoutError::IoError {
path: self.0.root.clone(),
message: "Failed to sync checkpoint directory for the creation of the unverified checkpoint marker".to_string(),
io_err: err,
})
}
}

pub struct PageMapLayout<Permissions: AccessPolicy> {
root: PathBuf,
name_stem: String,
Expand Down
2 changes: 1 addition & 1 deletion rs/state_layout/src/state_layout/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ fn checkpoints_files_are_removed_after_flushing_removal_channel() {
.promote_scratchpad_to_unverified_checkpoint(scratchpad_layout, h)
.unwrap();
cp.finalize_and_remove_unverified_marker(None).unwrap();
cp
cp.as_readonly()
};

let mut checkpoints = vec![];
Expand Down
33 changes: 10 additions & 23 deletions rs/state_manager/src/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::{
CheckpointError, CheckpointMetrics, HasDowngrade, PageMapToFlush, TipRequest,
CheckpointError, CheckpointMetrics, PageMapToFlush, TipRequest,
CRITICAL_ERROR_CHECKPOINT_SOFT_INVARIANT_BROKEN,
CRITICAL_ERROR_REPLICATED_STATE_ALTERED_AFTER_CHECKPOINT, NUMBER_OF_CHECKPOINT_THREADS,
};
Expand All @@ -52,37 +52,25 @@ impl CheckpointLoadingMetrics for CheckpointMetrics {
/// layout. Returns a layout of the new state that is equivalent to the
/// given one and a result of the operation.
pub(crate) fn make_unvalidated_checkpoint(
state: &mut ReplicatedState,
mut state: ReplicatedState,
height: Height,
tip_channel: &Sender<TipRequest>,
metrics: &CheckpointMetrics,
fd_factory: Arc<dyn PageAllocatorFileDescriptor>,
) -> Result<(CheckpointLayout<ReadOnly>, HasDowngrade), CheckpointError> {
) -> Result<(Arc<ReplicatedState>, CheckpointLayout<ReadOnly>), Box<dyn std::error::Error + Send>> {
{
let _timer = metrics
.make_checkpoint_step_duration
.with_label_values(&["flush_page_map_deltas"])
.start_timer();
flush_canister_snapshots_and_page_maps(state, height, tip_channel);
flush_canister_snapshots_and_page_maps(&mut state, height, tip_channel);
}
{
let _timer = metrics
.make_checkpoint_step_duration
.with_label_values(&["strip_page_map_deltas"])
.start_timer();
strip_page_map_deltas(state, fd_factory);
}
{
let _timer = metrics
.make_checkpoint_step_duration
.with_label_values(&["serialize_to_tip_cloning"])
.start_timer();
tip_channel
.send(TipRequest::SerializeToTip {
height,
replicated_state: Box::new(state.clone()),
})
.unwrap();
strip_page_map_deltas(&mut state, Arc::clone(&fd_factory));
}

tip_channel
Expand All @@ -92,7 +80,7 @@ pub(crate) fn make_unvalidated_checkpoint(
})
.unwrap();

let (cp, has_downgrade) = {
{
let _timer = metrics
.make_checkpoint_step_duration
.with_label_values(&["tip_to_checkpoint"])
Expand All @@ -102,14 +90,13 @@ pub(crate) fn make_unvalidated_checkpoint(
tip_channel
.send(TipRequest::TipToCheckpoint {
height,
state,
fd_factory,
sender: send,
})
.unwrap();
let (cp, has_downgrade) = recv.recv().unwrap()?;
(cp, has_downgrade)
};

Ok((cp, has_downgrade))
Ok(recv.recv().unwrap()?)
}
}

pub(crate) fn validate_and_finalize_checkpoint_and_remove_unverified_marker(
Expand Down
12 changes: 8 additions & 4 deletions rs/state_manager/src/checkpoint/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::*;
use crate::{spawn_tip_thread, StateManagerMetrics, NUMBER_OF_CHECKPOINT_THREADS};
use crate::{
flush_tip_channel, spawn_tip_thread, StateManagerMetrics, NUMBER_OF_CHECKPOINT_THREADS,
};
use ic_base_types::NumSeconds;
use ic_config::state_manager::lsmt_config_default;
use ic_logger::ReplicaLogger;
Expand Down Expand Up @@ -69,8 +71,8 @@ fn make_checkpoint_and_get_state_impl(
tip_channel: &Sender<TipRequest>,
log: &ReplicaLogger,
) -> ReplicatedState {
let (cp_layout, _has_downgrade) = make_unvalidated_checkpoint(
state,
let (switched_state, cp_layout) = make_unvalidated_checkpoint(
state.clone(),
height,
tip_channel,
&state_manager_metrics(log).checkpoint_metrics,
Expand All @@ -82,6 +84,8 @@ fn make_checkpoint_and_get_state_impl(
err
)
});
*state = (*switched_state).clone();
flush_tip_channel(&tip_channel);
load_checkpoint_and_validate_parallel(
&cp_layout,
state.metadata.own_subnet_type,
Expand Down Expand Up @@ -198,7 +202,7 @@ fn scratchpad_dir_is_deleted_if_checkpointing_failed() {
let expected_scratchpad_dir = root.join("tmp").join("scratchpad_000000000000002a");

let replicated_state = make_unvalidated_checkpoint(
&mut state,
state,
HEIGHT,
&tip_channel,
&state_manager_metrics.checkpoint_metrics,
Expand Down
Loading
Loading