Skip to content

Commit bf78da4

Browse files
authored
refactor: add more log to semaphore, add benchmark CLI (#17875)
- Upgrade sub crate `watcher` and `display-more`.
- Add semaphore benchmark:
  ```
  databend-metabench \
      --rpc 'semaphore:{"semaphores":10,"capacity":5,"ttl_ms":3000,"hold_ms":1000}' \
      --client 200 \
      --number 100 \
      --prefix "foo" \
      --grpc-api-address 127.0.0.1:$grpc_port
  ```
- Log the proposing time of a raft-log in human-readable format.
- Collect time profiling data of a semaphore and log it when a semaphore Permit is acquired. It includes the time spent on get/insert permit-seq CAS loop, and the time spent on waiting for `Acquired` event to be received.
  ```
  "AcquirerStat{ latencies: { seq(get/insert): 3/2: [100.1ms/300.1ms, 200.1ms/400.1ms, 250.1ms/-], events(my_seq:4): 3 [(A-3, **A-4**)=(0.0ns-7.9ms), (D-5)=(200.0ms-200.0ms)], total-acquired: 700.1ms } }"
  ```
- Log more detailed add/remove watcher events in meta-service.
1 parent 380d288 commit bf78da4

File tree

25 files changed

+648
-38
lines changed

25 files changed

+648
-38
lines changed

Cargo.lock

+6-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+3-3
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,7 @@ url = "2.5.4"
523523
uuid = { version = "1.10.0", features = ["std", "serde", "v4", "v7"] }
524524
volo-thrift = "0.10"
525525
walkdir = "2.3.2"
526-
watcher = { version = "0.1.1" }
526+
watcher = { version = "0.2.0" }
527527
wiremock = "0.6"
528528
wkt = "0.11.1"
529529
xorf = { version = "0.11.0", default-features = false, features = ["binary-fuse"] }
@@ -636,7 +636,7 @@ async-recursion = { git = "https://github.com/datafuse-extras/async-recursion.gi
636636
backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "72265be" }
637637
color-eyre = { git = "https://github.com/eyre-rs/eyre.git", rev = "e5d92c3" }
638638
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "c149502" }
639-
display-more = { git = "https://github.com/databendlabs/display-more", tag = "v0.1.2" }
639+
display-more = { git = "https://github.com/databendlabs/display-more", tag = "v0.1.3" }
640640
jsonb = { git = "https://github.com/databendlabs/jsonb", rev = "dcaf261" }
641641
map-api = { git = "https://github.com/databendlabs/map-api", tag = "v0.2.3" }
642642
openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" }
@@ -647,5 +647,5 @@ sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafus
647647
tantivy = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370" }
648648
tantivy-common = { git = "https://github.com/datafuse-extras/tantivy", rev = "7502370", package = "tantivy-common" }
649649
tantivy-jieba = { git = "https://github.com/datafuse-extras/tantivy-jieba", rev = "0e300e9" }
650-
watcher = { git = "https://github.com/databendlabs/watcher", tag = "v0.1.1" }
650+
watcher = { git = "https://github.com/databendlabs/watcher", tag = "v0.2.0" }
651651
xorfilter-rs = { git = "https://github.com/datafuse-extras/xorfilter", tag = "databend-alpha.4" }

src/common/base/src/containers/pool.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ where
133133
if let Ok(itm) = check_res {
134134
return Ok(itm);
135135
} else {
136-
warn!("RaftNetwork check reused item failed: {:?}", key);
136+
warn!("Pool check reused item failed: {:?}", key);
137137
// mark broken conn as deleted
138138
*guard = None;
139139
}

src/meta/binaries/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ databend-common-meta-client = { workspace = true }
3131
databend-common-meta-control = { workspace = true }
3232
databend-common-meta-kvapi = { workspace = true }
3333
databend-common-meta-raft-store = { workspace = true }
34+
databend-common-meta-semaphore = { workspace = true }
3435
databend-common-meta-sled-store = { workspace = true }
3536
databend-common-meta-store = { workspace = true }
3637
databend-common-meta-types = { workspace = true }

src/meta/binaries/metabench/main.rs

+90
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::collections::BTreeMap;
1818
use std::fmt::Debug;
1919
use std::fmt::Display;
2020
use std::sync::Arc;
21+
use std::time::Duration;
2122
use std::time::Instant;
2223

2324
use chrono::Utc;
@@ -42,6 +43,7 @@ use databend_common_meta_client::required;
4243
use databend_common_meta_client::ClientHandle;
4344
use databend_common_meta_client::MetaGrpcClient;
4445
use databend_common_meta_kvapi::kvapi::KVApi;
46+
use databend_common_meta_semaphore::Semaphore;
4547
use databend_common_meta_types::MatchSeq;
4648
use databend_common_meta_types::Operation;
4749
use databend_common_meta_types::TxnRequest;
@@ -52,6 +54,7 @@ use databend_common_tracing::StderrConfig;
5254
use databend_meta::version::METASRV_COMMIT_VERSION;
5355
use serde::Deserialize;
5456
use serde::Serialize;
57+
use tokio::time::sleep;
5558

5659
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Parser)]
5760
#[clap(about, version = & * * METASRV_COMMIT_VERSION, author)]
@@ -152,6 +155,8 @@ async fn main() {
152155
benchmark_get_table(&client, prefix, client_num, i).await;
153156
} else if cmd == "table_copy_file" {
154157
benchmark_table_copy_file(&client, prefix, client_num, i, &param).await;
158+
} else if cmd == "semaphore" {
159+
benchmark_semaphore(&client, prefix, client_num, i, &param).await;
155160
} else {
156161
unreachable!("Invalid config.rpc: {}", rpc);
157162
}
@@ -339,6 +344,91 @@ async fn benchmark_table_copy_file(
339344
res.unwrap();
340345
}
341346

347+
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
348+
struct SemaphoreConfig {
349+
semaphores: u64,
350+
351+
/// The capacity of resource in the semaphore.
352+
capacity: u64,
353+
354+
/// The ttl if lease is not extended.
355+
ttl_ms: Option<u64>,
356+
357+
/// The time a permit is held by the application for simulation
358+
hold_ms: Option<u64>,
359+
}
360+
361+
impl Default for SemaphoreConfig {
362+
fn default() -> Self {
363+
Self {
364+
semaphores: 1,
365+
capacity: 100,
366+
ttl_ms: None,
367+
hold_ms: None,
368+
}
369+
}
370+
}
371+
372+
impl SemaphoreConfig {
373+
pub fn ttl(&self) -> Duration {
374+
Duration::from_millis(self.ttl_ms.unwrap_or(3_000))
375+
}
376+
377+
pub fn hold(&self) -> Duration {
378+
Duration::from_millis(self.hold_ms.unwrap_or(100))
379+
}
380+
}
381+
382+
/// Benchmark semaphore acquire.
383+
///
384+
/// - `key_prefix` is used to distribute the load to separate key spaces.
385+
/// - `client_num` is number of concurrent clients.
386+
/// - `i` is the index of the current client.
387+
/// - `param` is a json string of bench specific config.
388+
async fn benchmark_semaphore(
389+
client: &Arc<ClientHandle>,
390+
key_prefix: u64,
391+
client_num: u64,
392+
i: u64,
393+
param: &str,
394+
) {
395+
let param = if param.is_empty() {
396+
SemaphoreConfig::default()
397+
} else {
398+
serde_json::from_str(param).unwrap()
399+
};
400+
401+
let sem_key = format!("sem-{}-{}", key_prefix, client_num % param.semaphores);
402+
let id = format!("cli-{client_num}-{i}th");
403+
404+
let permit_str = format!("({sem_key}, id={id})");
405+
406+
let permit_res =
407+
Semaphore::new_acquired(client.clone(), &sem_key, param.capacity, &id, param.ttl()).await;
408+
409+
print_sem_res(i, format!("sem-acquired: {permit_str}",), &permit_res);
410+
411+
let permit = match permit_res {
412+
Ok(permit) => permit,
413+
Err(e) => {
414+
println!("ERROR: Failed to acquire semaphore: {permit_str}: {}", e);
415+
return;
416+
}
417+
};
418+
419+
sleep(param.hold()).await;
420+
421+
print_sem_res(
422+
i,
423+
format!("sem-released: {permit_str}, {}", permit.stat()),
424+
&permit,
425+
);
426+
427+
fn print_sem_res<D: Debug>(i: u64, typ: impl Display, res: &D) {
428+
println!("{:>10}-th {} result: {:?}", i, typ, res);
429+
}
430+
}
431+
342432
fn print_res<D: Debug>(i: u64, typ: impl Display, res: &D) {
343433
if i % 100 == 0 {
344434
println!("{:>10}-th {} result: {:?}", i, typ, res);

src/meta/raft-store/src/applier.rs

+10-3
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,11 @@ where SM: StateMachineApi + 'static
9696
let log_id = &entry.log_id;
9797
let log_time_ms = Self::get_log_time(entry);
9898

99-
debug!("apply: entry: {}, log_time_ms: {}", entry, log_time_ms);
99+
debug!(
100+
"apply: entry: {}, log_time_ms: {}",
101+
entry,
102+
Duration::from_millis(log_time_ms).display_unix_timestamp_short()
103+
);
100104

101105
self.cmd_ctx = CmdContext::from_millis(log_time_ms);
102106

@@ -544,7 +548,10 @@ where SM: StateMachineApi + 'static
544548
return Ok(());
545549
}
546550

547-
debug!("to clean expired kvs, log_time_ts: {}", log_time_ms);
551+
debug!(
552+
"to clean expired kvs, log_time_ts: {}",
553+
Duration::from_millis(log_time_ms).display_unix_timestamp_short()
554+
);
548555

549556
let mut to_clean = vec![];
550557
let mut strm = self.sm.list_expire_index(log_time_ms).await?;
@@ -608,7 +615,7 @@ where SM: StateMachineApi + 'static
608615
Some(ms) => {
609616
debug!(
610617
"apply: raft-log time: {}",
611-
Duration::from_millis(ms).display_unix_timestamp()
618+
Duration::from_millis(ms).display_unix_timestamp_short()
612619
);
613620
ms
614621
}

src/meta/raft-store/src/state_machine_api_ext.rs

+9
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::future;
1616
use std::io;
1717
use std::ops::RangeBounds;
18+
use std::time::Duration;
1819

1920
use databend_common_meta_types::CmdContext;
2021
use databend_common_meta_types::Expirable;
@@ -23,6 +24,7 @@ use databend_common_meta_types::Operation;
2324
use databend_common_meta_types::SeqV;
2425
use databend_common_meta_types::SeqValue;
2526
use databend_common_meta_types::UpsertKV;
27+
use display_more::DisplayUnixTimeStampExt;
2628
use futures_util::StreamExt;
2729
use futures_util::TryStreamExt;
2830
use log::debug;
@@ -71,6 +73,13 @@ pub trait StateMachineApiExt: StateMachineApi {
7173

7274
let expire_ms = kv_meta.expires_at_ms();
7375
if expire_ms < self.get_expire_cursor().time_ms {
76+
warn!(
77+
"upsert_kv_primary_index: expired key inserted: {} < expire-cursor: {}; key: {}",
78+
Duration::from_millis(expire_ms).display_unix_timestamp_short(),
79+
Duration::from_millis(self.get_expire_cursor().time_ms)
80+
.display_unix_timestamp_short(),
81+
upsert_kv.key
82+
);
7483
// The record has expired, delete it at once.
7584
//
7685
// Note that it must update first then delete,

src/meta/semaphore/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ databend-common-meta-kvapi = { workspace = true }
2121
databend-common-meta-types = { workspace = true }
2222
display-more = { workspace = true }
2323
futures = { workspace = true }
24+
itertools = { workspace = true }
2425
log = { workspace = true }
2526
thiserror = { workspace = true }
2627
tokio = { workspace = true }

src/meta/semaphore/src/acquirer/acquirer.rs

+26-6
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@ use databend_common_meta_types::SeqValue;
2727
use databend_common_meta_types::UpsertKV;
2828
use databend_common_meta_types::With;
2929
use futures::FutureExt;
30-
use log::debug;
3130
use log::info;
3231
use log::warn;
3332
use tokio::sync::mpsc;
3433
use tokio::sync::oneshot;
3534

3635
use crate::acquirer::Permit;
36+
use crate::acquirer::SharedAcquirerStat;
3737
use crate::errors::AcquireError;
3838
use crate::errors::ConnectionClosed;
3939
use crate::errors::EarlyRemoved;
@@ -78,6 +78,9 @@ pub(crate) struct Acquirer {
7878
/// This task subscribes to the watch stream and forwards relevant state changes through this channel.
7979
pub(crate) permit_event_rx: mpsc::Receiver<PermitEvent>,
8080

81+
/// The stat about the process of acquiring the semaphore.
82+
pub(crate) stat: SharedAcquirerStat,
83+
8184
/// The context information of this acquirer instance, used for logging.
8285
pub(crate) ctx: String,
8386
}
@@ -88,6 +91,8 @@ impl Acquirer {
8891
let mut sleep_time = Duration::from_millis(10);
8992
let max_sleep_time = Duration::from_secs(1);
9093

94+
self.stat.start();
95+
9196
let permit_entry = PermitEntry {
9297
id: self.acquirer_id.clone(),
9398
permits: 1,
@@ -101,6 +106,8 @@ impl Acquirer {
101106
// Step 1: Get a new globally unique sequence number.
102107
let sem_seq = self.next_global_unique_seq().await?;
103108

109+
self.stat.on_finish_get_seq();
110+
104111
// Step 2: Create a new semaphore entry with the key format `{prefix}/queue/{seq:020}`.
105112
// We use a transaction to ensure the entry is only inserted if the sequence number
106113
// hasn't changed since we obtained it. This guarantees that semaphore entries are
@@ -129,12 +136,18 @@ impl Acquirer {
129136
.context(&self.ctx)
130137
})?;
131138

139+
self.stat.on_finish_try_insert_seq();
140+
132141
if txn_reply.success {
133-
info!("acquire semaphore: {} -> {}", self.acquirer_id, sem_seq);
142+
info!(
143+
"acquire semaphore: enqueue done: acquirer_id: {}, sem_seq: {}",
144+
self.acquirer_id, sem_seq
145+
);
146+
self.stat.on_insert_seq(sem_key.seq);
134147
break sem_key;
135148
} else {
136149
info!(
137-
"acquire semaphore failed: {} -> {}; sleep {:?} and retry",
150+
"acquire semaphore: enqueue conflict: acquirer: {}, sem_seq: {}; sleep {:?} and retry",
138151
self.acquirer_id, sem_seq, sleep_time
139152
);
140153

@@ -155,10 +168,14 @@ impl Acquirer {
155168
self.ctx, sem_event
156169
);
157170

171+
self.stat.on_receive_event(&sem_event);
172+
158173
match sem_event {
159174
PermitEvent::Acquired((seq, _)) => {
160175
if seq == permit_key.seq {
161-
debug!(
176+
self.stat.on_acquire();
177+
178+
info!(
162179
"{} acquired: {}->{}",
163180
self.ctx, permit_key, self.acquirer_id
164181
);
@@ -167,6 +184,8 @@ impl Acquirer {
167184
}
168185
PermitEvent::Removed((seq, _)) => {
169186
if seq == permit_key.seq {
187+
self.stat.on_remove();
188+
170189
warn!(
171190
"semaphore removed before acquired: {}->{}",
172191
permit_key, self.acquirer_id
@@ -180,15 +199,16 @@ impl Acquirer {
180199
}
181200
}
182201

183-
let guard = Permit::new(
202+
let permit = Permit::new(
184203
self.permit_event_rx,
185204
permit_key,
186205
permit_entry,
206+
self.stat,
187207
self.subscriber_cancel_tx,
188208
leaser_cancel_tx,
189209
);
190210

191-
Ok(guard)
211+
Ok(permit)
192212
}
193213

194214
/// Gets a new globally unique sequence number by updating a key in the meta-service.

src/meta/semaphore/src/acquirer/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#[allow(clippy::module_inception)]
1616
mod acquirer;
1717
mod permit;
18+
mod stat;
1819

1920
pub(crate) use acquirer::Acquirer;
2021
pub use permit::Permit;
22+
pub use stat::SharedAcquirerStat;

0 commit comments

Comments
 (0)