Skip to content

Commit f518d0e

Browse files
committed
use keyspacepb
1 parent bab9704 commit f518d0e

File tree

10 files changed

+98
-74
lines changed

10 files changed

+98
-74
lines changed

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ prometheus = { version = "0.13", default-features = false }
3838
prost = "0.12"
3939
rand = "0.8"
4040
regex = "1"
41-
reqwest = { version = "0.11", features = ["json", "native-tls-vendored"] }
4241
semver = "1.0"
4342
serde = "1.0"
4443
serde_derive = "1.0"
@@ -54,6 +53,7 @@ env_logger = "0.10"
5453
fail = { version = "0.4", features = ["failpoints"] }
5554
proptest = "1"
5655
proptest-derive = "0.3"
56+
reqwest = { version = "0.11", features = ["json", "native-tls-vendored"] }
5757
rstest = "0.18.2"
5858
serde_json = "1"
5959
serial_test = "0.5.0"

proto-build/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,4 @@ edition = "2021"
1212

1313
[dependencies]
1414
glob = "0.3"
15-
tonic-build = { version = "0.10", features = ["cleanup-markdown"] }
15+
tonic-build = { version = "0.10", features = ["cleanup-markdown"] }

src/common/errors.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,9 @@ pub enum Error {
5151
#[error("gRPC error: {0}")]
5252
Grpc(#[from] tonic::transport::Error),
5353
/// Wraps a `reqwest::Error`.
54-
#[error("http error: {0}")]
55-
Http(#[from] reqwest::Error),
5654
/// Wraps a `grpcio::Error`.
5755
#[error("gRPC api error: {0}")]
5856
GrpcAPI(#[from] tonic::Status),
59-
#[error("Http request failed: unknown respond {0}")]
60-
UnknownHttpRespond(String),
6157
/// Wraps a `grpcio::Error`.
6258
#[error("url error: {0}")]
6359
Url(#[from] tonic::codegen::http::uri::InvalidUri),
@@ -113,6 +109,8 @@ pub enum Error {
113109
inner: Box<Error>,
114110
success_keys: Vec<Vec<u8>>,
115111
},
112+
#[error("Keyspace not found: {0}")]
113+
KeyspaceNotFound(String),
116114
}
117115

118116
impl From<crate::proto::errorpb::Error> for Error {

src/mock.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use derive_new::new;
1414
use crate::pd::PdClient;
1515
use crate::pd::PdRpcClient;
1616
use crate::pd::RetryClient;
17+
use crate::proto::keyspacepb;
1718
use crate::proto::metapb::RegionEpoch;
1819
use crate::proto::metapb::{self};
1920
use crate::region::RegionId;
@@ -215,7 +216,7 @@ impl PdClient for MockPdClient {
215216

216217
async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}
217218

218-
async fn get_keyspace_id(&self, _keyspace: &str) -> Result<u32> {
219+
async fn load_keyspace(&self, _keyspace: &str) -> Result<keyspacepb::KeyspaceMeta> {
219220
unimplemented!()
220221
}
221222
}

src/pd/client.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::kv::codec;
1414
use crate::pd::retry::RetryClientTrait;
1515
use crate::pd::Cluster;
1616
use crate::pd::RetryClient;
17+
use crate::proto::keyspacepb;
1718
use crate::proto::kvrpcpb;
1819
use crate::proto::metapb;
1920
use crate::region::RegionId;
@@ -65,7 +66,7 @@ pub trait PdClient: Send + Sync + 'static {
6566

6667
async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool>;
6768

68-
async fn get_keyspace_id(&self, keyspace: &str) -> Result<u32>;
69+
async fn load_keyspace(&self, keyspace: &str) -> Result<keyspacepb::KeyspaceMeta>;
6970

7071
/// In transactional API, `key` is in raw format
7172
async fn store_for_key(self: Arc<Self>, key: &Key) -> Result<RegionStore> {
@@ -270,8 +271,8 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
270271
self.region_cache.invalidate_region_cache(ver_id).await
271272
}
272273

273-
async fn get_keyspace_id(&self, keyspace: &str) -> Result<u32> {
274-
self.pd.get_keyspace_id(keyspace).await
274+
async fn load_keyspace(&self, keyspace: &str) -> Result<keyspacepb::KeyspaceMeta> {
275+
self.pd.load_keyspace(keyspace).await
275276
}
276277
}
277278

src/pd/cluster.rs

+72-54
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use tonic::Request;
1515

1616
use super::timestamp::TimestampOracle;
1717
use crate::internal_err;
18+
use crate::proto::keyspacepb;
1819
use crate::proto::pdpb;
1920
use crate::Error;
2021
use crate::Result;
@@ -25,7 +26,7 @@ use crate::Timestamp;
2526
pub struct Cluster {
2627
id: u64,
2728
client: pdpb::pd_client::PdClient<Channel>,
28-
endpoint: String,
29+
keyspace_client: keyspacepb::keyspace_client::KeyspaceClient<Channel>,
2930
members: pdpb::GetMembersResponse,
3031
tso: TimestampOracle,
3132
}
@@ -94,16 +95,18 @@ impl Cluster {
9495
req.send(&mut self.client, timeout).await
9596
}
9697

97-
pub async fn get_keyspace_id(&self, keyspace: &str) -> Result<u32> {
98-
let resp =
99-
reqwest::get(format!("{}/pd/api/v2/keyspaces/{keyspace}", self.endpoint)).await?;
100-
let body = resp.json::<serde_json::Value>().await?;
101-
let keyspace_id = body
102-
.get("id")
103-
.ok_or_else(|| Error::UnknownHttpRespond(body.to_string()))?
104-
.as_u64()
105-
.ok_or_else(|| Error::UnknownHttpRespond(body.to_string()))?;
106-
Ok(keyspace_id as u32)
98+
pub async fn load_keyspace(
99+
&mut self,
100+
keyspace: &str,
101+
timeout: Duration,
102+
) -> Result<keyspacepb::KeyspaceMeta> {
103+
let mut req = pd_request!(self.id, keyspacepb::LoadKeyspaceRequest);
104+
req.name = keyspace.to_owned();
105+
let resp = req.send(&mut self.keyspace_client, timeout).await?;
106+
let keyspace = resp
107+
.keyspace
108+
.ok_or_else(|| Error::KeyspaceNotFound(keyspace.to_owned()))?;
109+
Ok(keyspace)
107110
}
108111
}
109112

@@ -123,13 +126,13 @@ impl Connection {
123126
timeout: Duration,
124127
) -> Result<Cluster> {
125128
let members = self.validate_endpoints(endpoints, timeout).await?;
126-
let (client, endpoint, members) = self.try_connect_leader(&members, timeout).await?;
129+
let (client, keyspace_client, members) = self.try_connect_leader(&members, timeout).await?;
127130
let id = members.header.as_ref().unwrap().cluster_id;
128131
let tso = TimestampOracle::new(id, &client)?;
129132
let cluster = Cluster {
130133
id,
131134
client,
132-
endpoint,
135+
keyspace_client,
133136
members,
134137
tso,
135138
};
@@ -140,13 +143,13 @@ impl Connection {
140143
pub async fn reconnect(&self, cluster: &mut Cluster, timeout: Duration) -> Result<()> {
141144
warn!("updating pd client");
142145
let start = Instant::now();
143-
let (client, endpoint, members) =
146+
let (client, keyspace_client, members) =
144147
self.try_connect_leader(&cluster.members, timeout).await?;
145148
let tso = TimestampOracle::new(cluster.id, &client)?;
146149
*cluster = Cluster {
147150
id: cluster.id,
148151
client,
149-
endpoint,
152+
keyspace_client,
150153
members,
151154
tso,
152155
};
@@ -169,7 +172,7 @@ impl Connection {
169172
return Err(internal_err!("duplicated PD endpoint {}", ep));
170173
}
171174

172-
let (_, resp) = match self.connect(ep, timeout).await {
175+
let (_, _, resp) = match self.connect(ep, timeout).await {
173176
Ok(resp) => resp,
174177
// Ignore failed PD node.
175178
Err(e) => {
@@ -211,27 +214,42 @@ impl Connection {
211214
&self,
212215
addr: &str,
213216
_timeout: Duration,
214-
) -> Result<(pdpb::pd_client::PdClient<Channel>, pdpb::GetMembersResponse)> {
217+
) -> Result<(
218+
pdpb::pd_client::PdClient<Channel>,
219+
keyspacepb::keyspace_client::KeyspaceClient<Channel>,
220+
pdpb::GetMembersResponse,
221+
)> {
215222
let mut client = self
216223
.security_mgr
217224
.connect(addr, pdpb::pd_client::PdClient::<Channel>::new)
218225
.await?;
226+
let keyspace_client = self
227+
.security_mgr
228+
.connect(
229+
addr,
230+
keyspacepb::keyspace_client::KeyspaceClient::<Channel>::new,
231+
)
232+
.await?;
219233
let resp: pdpb::GetMembersResponse = client
220234
.get_members(pdpb::GetMembersRequest::default())
221235
.await?
222236
.into_inner();
223-
Ok((client, resp))
237+
Ok((client, keyspace_client, resp))
224238
}
225239

226240
async fn try_connect(
227241
&self,
228242
addr: &str,
229243
cluster_id: u64,
230244
timeout: Duration,
231-
) -> Result<(pdpb::pd_client::PdClient<Channel>, pdpb::GetMembersResponse)> {
232-
let (client, r) = self.connect(addr, timeout).await?;
245+
) -> Result<(
246+
pdpb::pd_client::PdClient<Channel>,
247+
keyspacepb::keyspace_client::KeyspaceClient<Channel>,
248+
pdpb::GetMembersResponse,
249+
)> {
250+
let (client, keyspace_client, r) = self.connect(addr, timeout).await?;
233251
Connection::validate_cluster_id(addr, &r, cluster_id)?;
234-
Ok((client, r))
252+
Ok((client, keyspace_client, r))
235253
}
236254

237255
fn validate_cluster_id(
@@ -258,7 +276,7 @@ impl Connection {
258276
timeout: Duration,
259277
) -> Result<(
260278
pdpb::pd_client::PdClient<Channel>,
261-
String,
279+
keyspacepb::keyspace_client::KeyspaceClient<Channel>,
262280
pdpb::GetMembersResponse,
263281
)> {
264282
let previous_leader = previous.leader.as_ref().unwrap();
@@ -274,7 +292,7 @@ impl Connection {
274292
{
275293
for ep in &m.client_urls {
276294
match self.try_connect(ep.as_str(), cluster_id, timeout).await {
277-
Ok((_, r)) => {
295+
Ok((_, _, r)) => {
278296
resp = Some(r);
279297
break 'outer;
280298
}
@@ -290,10 +308,10 @@ impl Connection {
290308
if let Some(resp) = resp {
291309
let leader = resp.leader.as_ref().unwrap();
292310
for ep in &leader.client_urls {
293-
if let Ok((client, members)) =
311+
if let Ok((client, keyspace_client, members)) =
294312
self.try_connect(ep.as_str(), cluster_id, timeout).await
295313
{
296-
return Ok((client, ep.to_string(), members));
314+
return Ok((client, keyspace_client, members));
297315
}
298316
}
299317
}
@@ -306,18 +324,12 @@ type GrpcResult<T> = std::result::Result<T, tonic::Status>;
306324

307325
#[async_trait]
308326
trait PdMessage: Sized {
327+
type Client: Send;
309328
type Response: PdResponse;
310329

311-
async fn rpc(
312-
req: Request<Self>,
313-
client: &mut pdpb::pd_client::PdClient<Channel>,
314-
) -> GrpcResult<Self::Response>;
330+
async fn rpc(req: Request<Self>, client: &mut Self::Client) -> GrpcResult<Self::Response>;
315331

316-
async fn send(
317-
self,
318-
client: &mut pdpb::pd_client::PdClient<Channel>,
319-
timeout: Duration,
320-
) -> Result<Self::Response> {
332+
async fn send(self, client: &mut Self::Client, timeout: Duration) -> Result<Self::Response> {
321333
let mut req = self.into_request();
322334
req.set_timeout(timeout);
323335
let response = Self::rpc(req, client).await?;
@@ -332,64 +344,64 @@ trait PdMessage: Sized {
332344

333345
#[async_trait]
334346
impl PdMessage for pdpb::GetRegionRequest {
347+
type Client = pdpb::pd_client::PdClient<Channel>;
335348
type Response = pdpb::GetRegionResponse;
336349

337-
async fn rpc(
338-
req: Request<Self>,
339-
client: &mut pdpb::pd_client::PdClient<Channel>,
340-
) -> GrpcResult<Self::Response> {
350+
async fn rpc(req: Request<Self>, client: &mut Self::Client) -> GrpcResult<Self::Response> {
341351
Ok(client.get_region(req).await?.into_inner())
342352
}
343353
}
344354

345355
#[async_trait]
346356
impl PdMessage for pdpb::GetRegionByIdRequest {
357+
type Client = pdpb::pd_client::PdClient<Channel>;
347358
type Response = pdpb::GetRegionResponse;
348359

349-
async fn rpc(
350-
req: Request<Self>,
351-
client: &mut pdpb::pd_client::PdClient<Channel>,
352-
) -> GrpcResult<Self::Response> {
360+
async fn rpc(req: Request<Self>, client: &mut Self::Client) -> GrpcResult<Self::Response> {
353361
Ok(client.get_region_by_id(req).await?.into_inner())
354362
}
355363
}
356364

357365
#[async_trait]
358366
impl PdMessage for pdpb::GetStoreRequest {
367+
type Client = pdpb::pd_client::PdClient<Channel>;
359368
type Response = pdpb::GetStoreResponse;
360369

361-
async fn rpc(
362-
req: Request<Self>,
363-
client: &mut pdpb::pd_client::PdClient<Channel>,
364-
) -> GrpcResult<Self::Response> {
370+
async fn rpc(req: Request<Self>, client: &mut Self::Client) -> GrpcResult<Self::Response> {
365371
Ok(client.get_store(req).await?.into_inner())
366372
}
367373
}
368374

369375
#[async_trait]
370376
impl PdMessage for pdpb::GetAllStoresRequest {
377+
type Client = pdpb::pd_client::PdClient<Channel>;
371378
type Response = pdpb::GetAllStoresResponse;
372379

373-
async fn rpc(
374-
req: Request<Self>,
375-
client: &mut pdpb::pd_client::PdClient<Channel>,
376-
) -> GrpcResult<Self::Response> {
380+
async fn rpc(req: Request<Self>, client: &mut Self::Client) -> GrpcResult<Self::Response> {
377381
Ok(client.get_all_stores(req).await?.into_inner())
378382
}
379383
}
380384

381385
#[async_trait]
382386
impl PdMessage for pdpb::UpdateGcSafePointRequest {
387+
type Client = pdpb::pd_client::PdClient<Channel>;
383388
type Response = pdpb::UpdateGcSafePointResponse;
384389

385-
async fn rpc(
386-
req: Request<Self>,
387-
client: &mut pdpb::pd_client::PdClient<Channel>,
388-
) -> GrpcResult<Self::Response> {
390+
async fn rpc(req: Request<Self>, client: &mut Self::Client) -> GrpcResult<Self::Response> {
389391
Ok(client.update_gc_safe_point(req).await?.into_inner())
390392
}
391393
}
392394

395+
#[async_trait]
396+
impl PdMessage for keyspacepb::LoadKeyspaceRequest {
397+
type Client = keyspacepb::keyspace_client::KeyspaceClient<Channel>;
398+
type Response = keyspacepb::LoadKeyspaceResponse;
399+
400+
async fn rpc(req: Request<Self>, client: &mut Self::Client) -> GrpcResult<Self::Response> {
401+
Ok(client.load_keyspace(req).await?.into_inner())
402+
}
403+
}
404+
393405
trait PdResponse {
394406
fn header(&self) -> &pdpb::ResponseHeader;
395407
}
@@ -417,3 +429,9 @@ impl PdResponse for pdpb::UpdateGcSafePointResponse {
417429
self.header.as_ref().unwrap()
418430
}
419431
}
432+
433+
impl PdResponse for keyspacepb::LoadKeyspaceResponse {
434+
fn header(&self) -> &pdpb::ResponseHeader {
435+
self.header.as_ref().unwrap()
436+
}
437+
}

0 commit comments

Comments
 (0)