-
Notifications
You must be signed in to change notification settings - Fork 169
Move the SM worker's implementation into SM trait #1355
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Change the receiver to `&self` for methods reading the log and adjust callers.
implemented more efficiently with overlapped I/O and similar. This is quick&dirty change only, with no documentation on newly-added structs, but it should be self-evident.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+reviewer:@drmingdrmer
I implemented it pretty straight-forward. This should be basically "good enough" for the SM.
As noted, basis for discussion, not really a finished change. But feel free to take over and do whatever you think is the best :-).
Reviewable status: 0 of 15 files reviewed, all discussions resolved (waiting on @drmingdrmer)
openraft/src/storage/v2/raft_state_machine.rs
line 44 at r1 (raw file):
use crate::StoredMembership; pub struct BuildSnapshotResultSender<C: RaftTypeConfig> {
We have a few public API result senders, which basically wrap the notification channel.
openraft/src/storage/v2/raft_state_machine.rs
line 100 at r1 (raw file):
/// The payload of a state machine command. pub enum RaftStateMachineCommand<C>
Additionally, the state machine command is enhanced by respective result senders in the public API.
openraft/src/storage/v2/raft_state_machine.rs
line 175 at r1 (raw file):
where C: RaftTypeConfig { pub(crate) fn new(cmd: Command<C>, resp_tx: &MpscUnboundedSenderOf<C, Notification<C>>) -> Self {
And mapped here. This is why the notification channel was added to the couple structs above.
openraft/src/storage/v2/raft_state_machine.rs
line 219 at r1 (raw file):
/// Run state machine worker on this state machine. async fn worker<LR: RaftLogReader<C>>(
This is the worker loop, which can be implemented differently. So far, it's 1:1 copy, except that it uses channels pushed over the API to respond to commands.
openraft/src/storage/v2/raft_state_machine.rs
line 279 at r1 (raw file):
self.apply_from_log(first, last, log_reader, &mut client_resp_channels, tx).await?; } RaftStateMachineCommand::Func { func, input_sm_type } => {
This can be likely improved by doing the conversion already in the caller, which knows the SM type.
openraft/src/storage/v2/raft_state_machine.rs
line 343 at r1 (raw file):
#[tracing::instrument(level = "debug", skip_all)] async fn apply_from_log<LR: RaftLogReader<C>>(
And this is the helper which encompasses the original method of ping-pong reading from the log and applying entries (i.e., unoptimized version of apply()
).
Again, it's basically 1:1 copy from the old function, except that the result can be sent asynchronously via the supplied sender.
openraft/src/core/sm/worker.rs
line 104 at r1 (raw file):
#[tracing::instrument(level = "debug", skip_all)] async fn worker_loop(&mut self) -> Result<(), StorageError<C>> { self.state_machine.worker(&mut self.cmd_rx, &self.log_reader).await
The loop is moved with minor adjustments to only use public API to the SM.
openraft/src/storage/v2/raft_log_reader.rs
line 51 at r1 (raw file):
/// that occur after the read operation has commenced. async fn try_get_log_entries<RB: RangeBounds<u64> + Clone + Debug + OptionalSend>( &self,
I've also changed log reader receiver to &self
, since log reading must be actually immutable. And, it's necessary to make it immutable to be able to read multiple ranges in parallel for overlapped processing. This is in a separate commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mhm... Straight-forward tests locally worked, but CI tests much more. Never mind, as the basis for discussion, it's good enough, I think :-).
Reviewable status: 0 of 15 files reviewed, all discussions resolved (waiting on @drmingdrmer)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for this draft implementation for the state machine upgrade.
Most parts are clear and comprehensible, but I'm still a bit confused about how to parallelize these three components:
- Reading log entries
- State machine command draining
- Applying log entries
In short, how to pass log-entries to StateMachine::apply()
.
Reviewed 15 of 15 files at r1, all commit messages.
Reviewable status:complete! all files reviewed, all discussions resolved (waiting on @schreter)
openraft/src/storage/v2/raft_log_reader.rs
line 51 at r1 (raw file):
Previously, schreter wrote…
I've also changed log reader receiver to
&self
, since log reading must be actually immutable. And, it's necessary to make it immutable to be able to read multiple ranges in parallel for overlapped processing. This is in a separate commit.
Why does it need to read multiple ranges in parallel? I think the procedure should be:
let entries= log_reader.try_get_log_entrys().await;
sm.apply(entries).await;
The next read should always happen after the previous one finished.
openraft/src/storage/v2/raft_state_machine.rs
line 279 at r1 (raw file):
Previously, schreter wrote…
This can be likely improved by doing the conversion already in the caller, which knows the SM type.
Correct
openraft/src/storage/v2/raft_state_machine.rs
line 343 at r1 (raw file):
Previously, schreter wrote…
And this is the helper which encompasses the original method of ping-pong reading from the log and applying entries (i.e., unoptimized version of
apply()
).Again, it's basically 1:1 copy from the old function, except that the result can be sent asynchronously via the supplied sender.
This function is the key point of the state machine API change, yet it still performs the apply operation in the old manner. What's the intended approach for interacting with self.apply()
?
Is it meant to be used in a fire-and-forget manner? For example, should we spawn a separate task for each call to self.apply()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In short, not at all :-). The custom implementation won't call StateMachine::apply()
but handle it in the worker directly. I.e., parallelize reading one or more blocks of log entries, then as the log entries come in, start applying entries on the state machine in order (not via apply()
, but directly) and as soon as the "next water mark" is reached, send the answer to the RaftCore via the provided channel.
Reviewable status:
complete! all files reviewed, all discussions resolved (waiting on @schreter)
openraft/src/storage/v2/raft_log_reader.rs
line 51 at r1 (raw file):
Previously, drmingdrmer (张炎泼) wrote…
Why does it need to read multiple ranges in parallel? I think the procedure should be:
let entries= log_reader.try_get_log_entrys().await; sm.apply(entries).await;
The next read should always happen after the previous one finished.
Why? If the core already sends the next batch of committed entries, why should reading those entries be delayed until the previous batch is applied? The entries are immutable.
Of course, for all practical purposes, those entries most likely will be still in memory (sent from client or from the leader), except for "restart" case, where the entries are truly read from the log in batches and where we can do overlapped reads (prefetching) while applying to the state machine.
openraft/src/storage/v2/raft_state_machine.rs
line 343 at r1 (raw file):
Previously, drmingdrmer (张炎泼) wrote…
This function is the key point of the state machine API change, yet it still performs the apply operation in the old manner. What's the intended approach for interacting with
self.apply()
?Is it meant to be used in a fire-and-forget manner? For example, should we spawn a separate task for each call to
self.apply()
?
The change is a minimalistic change. I.e., older implementations of state machine still have synchronous apply()
method implemented, but not the two new methods. So the default implementation boils down to the existing ping-pong mechanism where reading the log and applying to state machine are executed in turns.
openraft/src/storage/v2/raft_state_machine.rs
line 409 at r1 (raw file):
} final_resp_tx.send(Ok(last_applied));
Of course, the optimized method won't await synchronously and send the result here, but rather do it in dataflow manner whenever something is applied, immediately send reply to the client channel and send the notification to the RaftCore after all entries in the batch are applied.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Quite confused. Can you just show me the code? pseudo codes would be fine. I can understand the concept of a watermark but I have no idea what it should be in source code. And I also need to understand what statement represents a "as soon as" condition. And how does it "handle it in the worker directly" without calling apply()
?
Reviewable status:
complete! all files reviewed, all discussions resolved (waiting on @schreter)
openraft/src/storage/v2/raft_state_machine.rs
line 343 at r1 (raw file):
Previously, schreter wrote…
The change is a minimalistic change. I.e., older implementations of state machine still have synchronous
apply()
method implemented, but not the two new methods. So the default implementation boils down to the existing ping-pong mechanism where reading the log and applying to state machine are executed in turns.
Right. What's the plan to parallel waiting for self.apply()
and draining the state-machine-command channel? This is the key point.
It seems the state-machine-command channel and the self.apply()
are both stream-like instance.
And we have to simultaneously poll these two stream, right?
Or
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! all files reviewed, all discussions resolved (waiting on @schreter)
openraft/src/storage/v2/raft_state_machine.rs
line 343 at r1 (raw file):
It seems the state-machine-command channel and the
self.apply()
are both stream-like instance.
And we have to simultaneously poll these two stream, right?
No, not really (but maybe it could be done in such a way).
The optimized implementation of the SM::worker()
would neither use apply()
nor this method. Instead, upon receiving Apply
command, an overlapped/asynchronous operation would start which loads the log entries in the desired range and as soon as log entries come in, prepares apply on the SM. The actual operation on SM runs also overlapped and completes asynchronously. Whenever the result of the operation is ready, post it to the client reply channel immediately, reducing the latency.
When the next Apply
command comes in, it can immediately start reading the new log range (async operation overlapped with the previous range) and start applying the entries as soon as the previous batch was prepared (to maintain log order of SM operation preparation).
When the asynchronous operation on the SM (e.g., involving I/O) is completed, then we can push SM-internal last_applied
up, provided there are no "holes" and all previous operations already completed. Once the internal last_applied
reaches the range end of an Apply
batch, notify the Raft core about last applied advancement.
When a "special" operation like Snapshot
comes in, then we can simply drain all existing running Apply
commands and then execute the operation.
Basically, what I'm trying to achieve is multiple things:
- overlapped reading of the log
- immediate send of replies to the client instead of delaying them until the entire apply batch completed
The latter can be achieved also with much simpler means - instead of sending only entries to apply()
, send both entries and reply channels to it, so the replies can be sent w/o delay immediately as soon as the operation completes. Additionally, instead of awaiting completion and reporting the notification after the apply()
method finishes, do it similar to log append interface by adding an additional notification channel to notify when the apply is done (of course, there are some issues with snapshot synchronization, but that can be left to the optimized SM implementation).
The former can be also achieved differently by overlapping the I/O of at least two possible batches in the worker or by changing the interface of the log reader to a streaming one, which can internally do prefetching as necessary.
BTW, already the change of the apply()
interface to receive both entries + reply channels + notification channel would already enable us to implement optimized SM - we can simply return empty shells for entries read from the log immediately and actually read them only as part of the (overlapped) apply()
.
Thank you for the detailed description. However, without accompanying (pseudo) code, it's still difficult to fully understand the implementation details. The description presents an abstract overview of the algorithm, which I understand pretty well since the first issue discussing about the new sm-API, but some important specifics are missing. For instance, the frequent use of To make this a concrete implementation, it’s important to clearly define the boundaries of responsibility:
Another critical aspect of the new state machine design is the serialization of non- Additionally, providing pseudo-code examples would help illustrate the workflow and clarify these concepts.
In this context, who does "we" refer to—OpenRaft or the state machine implementation? This distinction is fundamental to the API design. Clarifying these points would greatly help in understanding and implementing the proposed design. |
This allows implementing
SM::apply()
more efficiently with overlapped I/O and similar.This is just quick&dirty change, with no documentation on newly-added structs as a basis for discussion, but it should be self-evident.
This change is