Skip to content

Commit 99e366e

Browse files
committed
--wip-- [skip ci]
1 parent 4db9895 commit 99e366e

33 files changed

+918
-305
lines changed

Cargo.toml

+4-3
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,12 @@ prometheus = { version = "0.13", default-features = false }
3838
prost = "0.12"
3939
rand = "0.8"
4040
regex = "1"
41+
reqwest = { version = "0.11", features = ["json", "native-tls-vendored"] }
4142
semver = "1.0"
4243
serde = "1.0"
4344
serde_derive = "1.0"
45+
serde_json = "1"
46+
take_mut = "0.2.2"
4447
thiserror = "1"
4548
tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
4649
tonic = { version = "0.10", features = ["tls"] }
@@ -51,9 +54,7 @@ env_logger = "0.10"
5154
fail = { version = "0.4", features = ["failpoints"] }
5255
proptest = "1"
5356
proptest-derive = "0.3"
54-
reqwest = { version = "0.11", default-features = false, features = [
55-
"native-tls-vendored",
56-
] }
57+
rstest = "0.18.2"
5758
serde_json = "1"
5859
serial_test = "0.5.0"
5960
simple_logger = "1"

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ doc:
3333
cargo doc --workspace --exclude tikv-client-proto --document-private-items --no-deps
3434

3535
tiup:
36-
tiup playground nightly --mode tikv-slim --kv 3 --without-monitor --kv.config $(shell pwd)/config/tikv.toml --pd.config $(shell pwd)/config/pd.toml &
36+
tiup playground nightly --mode tikv-slim --kv 1 --without-monitor --kv.config ./config/tikv.toml --pd.config ./config/pd.toml --kv.binpath ../tikv/target/debug/tikv-server &
3737

3838
all: generate check doc test
3939

config/tikv.toml

+4
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,7 @@ max-open-files = 10000
1515

1616
[raftdb]
1717
max-open-files = 10000
18+
19+
[storage]
20+
api-version = 2
21+
enable-ttl = true

examples/pessimistic.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ async fn main() {
2424
Config::default().with_security(ca, cert, key)
2525
} else {
2626
Config::default()
27-
};
27+
}
28+
.with_default_keyspace();
2829

2930
// init
3031
let client = Client::new_with_config(args.pd, config)

examples/raw.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ async fn main() -> Result<()> {
3131
Config::default().with_security(ca, cert, key)
3232
} else {
3333
Config::default()
34-
};
34+
}
35+
.with_default_keyspace();
3536

3637
// When we first create a client we receive a `Connect` structure which must be resolved before
3738
// the client is actually connected and usable.
@@ -136,6 +137,7 @@ async fn main() -> Result<()> {
136137
);
137138
println!("Scanning batch scan from {batch_scan_keys:?} gives: {vals:?}");
138139

139-
// Cleanly exit.
140+
client.delete_range("".to_owned().."".to_owned()).await?;
141+
140142
Ok(())
141143
}

examples/transaction.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ async fn main() {
8787
Config::default().with_security(ca, cert, key)
8888
} else {
8989
Config::default()
90-
};
90+
}
91+
.with_default_keyspace();
9192

9293
let txn = Client::new_with_config(args.pd, config)
9394
.await

src/common/errors.rs

+5
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,14 @@ pub enum Error {
5050
/// Wraps a `grpcio::Error`.
5151
#[error("gRPC error: {0}")]
5252
Grpc(#[from] tonic::transport::Error),
53+
/// Wraps a `reqwest::Error`.
54+
#[error("http error: {0}")]
55+
Http(#[from] reqwest::Error),
5356
/// Wraps a `grpcio::Error`.
5457
#[error("gRPC api error: {0}")]
5558
GrpcAPI(#[from] tonic::Status),
59+
#[error("Http request failed: unknown respond {0}")]
60+
UnknownHttpRespond(String),
5661
/// Wraps a `grpcio::Error`.
5762
#[error("url error: {0}")]
5863
Url(#[from] tonic::codegen::http::uri::InvalidUri),

src/config.rs

+19
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub struct Config {
1919
pub cert_path: Option<PathBuf>,
2020
pub key_path: Option<PathBuf>,
2121
pub timeout: Duration,
22+
pub keyspace: Option<String>,
2223
}
2324

2425
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(2);
@@ -30,6 +31,7 @@ impl Default for Config {
3031
cert_path: None,
3132
key_path: None,
3233
timeout: DEFAULT_REQUEST_TIMEOUT,
34+
keyspace: None,
3335
}
3436
}
3537
}
@@ -83,4 +85,21 @@ impl Config {
8385
self.timeout = timeout;
8486
self
8587
}
88+
89+
/// Set to use default keyspace.
90+
///
91+
/// Server should enable `api-version = 2` to use this feature.
92+
#[must_use]
93+
pub fn with_default_keyspace(self) -> Self {
94+
self.with_keyspace("DEFAULT")
95+
}
96+
97+
/// Set the use keyspace for the client.
98+
///
99+
/// Server should enable `api-version = 2` to use this feature.
100+
#[must_use]
101+
pub fn with_keyspace(mut self, keyspace: &str) -> Self {
102+
self.keyspace = Some(keyspace.to_owned());
103+
self
104+
}
86105
}

src/kv/bound_range.rs

+2-8
Original file line numberDiff line numberDiff line change
@@ -136,17 +136,11 @@ impl BoundRange {
136136
pub fn into_keys(self) -> (Key, Option<Key>) {
137137
let start = match self.from {
138138
Bound::Included(v) => v,
139-
Bound::Excluded(mut v) => {
140-
v.push_zero();
141-
v
142-
}
139+
Bound::Excluded(v) => v.next_key(),
143140
Bound::Unbounded => Key::EMPTY,
144141
};
145142
let end = match self.to {
146-
Bound::Included(mut v) => {
147-
v.push_zero();
148-
Some(v)
149-
}
143+
Bound::Included(v) => Some(v.next_key()),
150144
Bound::Excluded(v) => Some(v),
151145
Bound::Unbounded => None,
152146
};

src/kv/key.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ pub struct Key(
7171
test,
7272
proptest(strategy = "any_with::<Vec<u8>>((size_range(_PROPTEST_KEY_MAX), ()))")
7373
)]
74-
pub(super) Vec<u8>,
74+
pub(crate) Vec<u8>,
7575
);
7676

7777
impl AsRef<Key> for kvrpcpb::Mutation {
@@ -98,10 +98,11 @@ impl Key {
9898

9999
/// Push a zero to the end of the key.
100100
///
101-
/// Extending a zero makes the new key the smallest key that is greater than than the original one, i.e. the succeeder.
101+
/// Extending a zero makes the new key the smallest key that is greater than than the original one.
102102
#[inline]
103-
pub(super) fn push_zero(&mut self) {
104-
self.0.push(0)
103+
pub(crate) fn next_key(mut self) -> Self {
104+
self.0.push(0);
105+
self
105106
}
106107

107108
/// Convert the key to a lower bound. The key is treated as inclusive.

src/mock.rs

+4
Original file line numberDiff line numberDiff line change
@@ -214,4 +214,8 @@ impl PdClient for MockPdClient {
214214
}
215215

216216
async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}
217+
218+
async fn get_keyspace_id(&self, _keyspace: &str) -> Result<u32> {
219+
unimplemented!()
220+
}
217221
}

src/pd/client.rs

+6
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ pub trait PdClient: Send + Sync + 'static {
6565

6666
async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool>;
6767

68+
async fn get_keyspace_id(&self, keyspace: &str) -> Result<u32>;
69+
6870
/// In transactional API, `key` is in raw format
6971
async fn store_for_key(self: Arc<Self>, key: &Key) -> Result<RegionStore> {
7072
let region = self.region_for_key(key).await?;
@@ -267,6 +269,10 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
267269
async fn invalidate_region_cache(&self, ver_id: RegionVerId) {
268270
self.region_cache.invalidate_region_cache(ver_id).await
269271
}
272+
273+
async fn get_keyspace_id(&self, keyspace: &str) -> Result<u32> {
274+
self.pd.get_keyspace_id(keyspace).await
275+
}
270276
}
271277

272278
impl PdRpcClient<TikvConnect, Cluster> {

src/pd/cluster.rs

+28-6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use tonic::Request;
1616
use super::timestamp::TimestampOracle;
1717
use crate::internal_err;
1818
use crate::proto::pdpb;
19+
use crate::Error;
1920
use crate::Result;
2021
use crate::SecurityManager;
2122
use crate::Timestamp;
@@ -24,6 +25,7 @@ use crate::Timestamp;
2425
pub struct Cluster {
2526
id: u64,
2627
client: pdpb::pd_client::PdClient<Channel>,
28+
endpoint: String,
2729
members: pdpb::GetMembersResponse,
2830
tso: TimestampOracle,
2931
}
@@ -91,6 +93,18 @@ impl Cluster {
9193
req.safe_point = safepoint;
9294
req.send(&mut self.client, timeout).await
9395
}
96+
97+
pub async fn get_keyspace_id(&self, keyspace: &str) -> Result<u32> {
98+
let resp =
99+
reqwest::get(format!("{}/pd/api/v2/keyspaces/{keyspace}", self.endpoint)).await?;
100+
let body = resp.json::<serde_json::Value>().await?;
101+
let keyspace_id = body
102+
.get("id")
103+
.ok_or_else(|| Error::UnknownHttpRespond(body.to_string()))?
104+
.as_u64()
105+
.ok_or_else(|| Error::UnknownHttpRespond(body.to_string()))?;
106+
Ok(keyspace_id as u32)
107+
}
94108
}
95109

96110
/// An object for connecting and reconnecting to a PD cluster.
@@ -109,12 +123,13 @@ impl Connection {
109123
timeout: Duration,
110124
) -> Result<Cluster> {
111125
let members = self.validate_endpoints(endpoints, timeout).await?;
112-
let (client, members) = self.try_connect_leader(&members, timeout).await?;
126+
let (client, endpoint, members) = self.try_connect_leader(&members, timeout).await?;
113127
let id = members.header.as_ref().unwrap().cluster_id;
114128
let tso = TimestampOracle::new(id, &client)?;
115129
let cluster = Cluster {
116130
id,
117131
client,
132+
endpoint,
118133
members,
119134
tso,
120135
};
@@ -125,11 +140,13 @@ impl Connection {
125140
pub async fn reconnect(&self, cluster: &mut Cluster, timeout: Duration) -> Result<()> {
126141
warn!("updating pd client");
127142
let start = Instant::now();
128-
let (client, members) = self.try_connect_leader(&cluster.members, timeout).await?;
143+
let (client, endpoint, members) =
144+
self.try_connect_leader(&cluster.members, timeout).await?;
129145
let tso = TimestampOracle::new(cluster.id, &client)?;
130146
*cluster = Cluster {
131147
id: cluster.id,
132148
client,
149+
endpoint,
133150
members,
134151
tso,
135152
};
@@ -239,7 +256,11 @@ impl Connection {
239256
&self,
240257
previous: &pdpb::GetMembersResponse,
241258
timeout: Duration,
242-
) -> Result<(pdpb::pd_client::PdClient<Channel>, pdpb::GetMembersResponse)> {
259+
) -> Result<(
260+
pdpb::pd_client::PdClient<Channel>,
261+
String,
262+
pdpb::GetMembersResponse,
263+
)> {
243264
let previous_leader = previous.leader.as_ref().unwrap();
244265
let members = &previous.members;
245266
let cluster_id = previous.header.as_ref().unwrap().cluster_id;
@@ -269,9 +290,10 @@ impl Connection {
269290
if let Some(resp) = resp {
270291
let leader = resp.leader.as_ref().unwrap();
271292
for ep in &leader.client_urls {
272-
let r = self.try_connect(ep.as_str(), cluster_id, timeout).await;
273-
if r.is_ok() {
274-
return r;
293+
if let Ok((client, members)) =
294+
self.try_connect(ep.as_str(), cluster_id, timeout).await
295+
{
296+
return Ok((client, ep.to_string(), members));
275297
}
276298
}
277299
}

src/pd/retry.rs

+8
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ pub trait RetryClientTrait {
4545
async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp>;
4646

4747
async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool>;
48+
49+
async fn get_keyspace_id(&self, keyspace: &str) -> Result<u32>;
4850
}
4951
/// Client for communication with a PD cluster. Has the facility to reconnect to the cluster.
5052
pub struct RetryClient<Cl = Cluster> {
@@ -197,6 +199,12 @@ impl RetryClientTrait for RetryClient<Cluster> {
197199
.map(|resp| resp.new_safe_point == safepoint)
198200
})
199201
}
202+
203+
async fn get_keyspace_id(&self, keyspace: &str) -> Result<u32> {
204+
retry_mut!(self, "get_keyspace_id", |cluster| async {
205+
cluster.get_keyspace_id(keyspace).await
206+
})
207+
}
200208
}
201209

202210
impl fmt::Debug for RetryClient {

0 commit comments

Comments
 (0)