Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit 9f1d0fc

Browse files
author
Ganeshwara Hananda
authored
Implement missed token renewal step in Cluster client (#236)
## What is the goal of this PR? We have fixed the bug where some RPC methods in Cluster client does not perform token renewal step. ## What are the changes implemented in this PR? - Implement token renewal step for database, session, and transaction RPC calls
1 parent bae291c commit 9f1d0fc

File tree

4 files changed

+48
-18
lines changed

4 files changed

+48
-18
lines changed

typedb/common/rpc/stub.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ def channel(self) -> Channel:
6969
def stub(self) -> core_service_proto.TypeDBStub:
7070
pass
7171

72-
def resilient_call(self, function: Callable[[], T]) -> T:
72+
@staticmethod
73+
def resilient_call(function: Callable[[], T]) -> T:
7374
try:
7475
# TODO actually implement forced gRPC to reconnected rapidly, which provides resilience
7576
return function()

typedb/connection/cluster/database_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def all(self) -> List[_ClusterDatabase]:
5959
errors = []
6060
for address in self._database_mgrs:
6161
try:
62-
res = self._client._stub(address).databases_all(cluster_database_manager_all_req())
62+
res = self._client._stub(address).cluster_databases_all(cluster_database_manager_all_req())
6363
return [_ClusterDatabase.of(db, self._client) for db in res.databases]
6464
except TypeDBClientException as e:
6565
errors.append("- %s: %s\n" % (address, e))

typedb/connection/cluster/stub.py

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,22 @@
1818
# specific language governing permissions and limitations
1919
# under the License.
2020
#
21+
from typing import Iterator
2122
from typing import TypeVar, Callable
2223

2324
import typedb_protocol.cluster.cluster_database_pb2 as cluster_database_proto
2425
import typedb_protocol.cluster.cluster_server_pb2 as cluster_server_proto
2526
import typedb_protocol.cluster.cluster_service_pb2_grpc as cluster_service_proto
2627
import typedb_protocol.cluster.cluster_user_pb2 as cluster_user_proto
28+
import typedb_protocol.common.session_pb2 as session_proto
29+
import typedb_protocol.common.transaction_pb2 as transaction_proto
30+
import typedb_protocol.core.core_database_pb2 as core_database_proto
2731
import typedb_protocol.core.core_service_pb2_grpc as core_service_proto
2832
from grpc import Channel, RpcError
2933

3034
from typedb.api.connection.credential import TypeDBCredential
31-
from typedb.common.rpc.stub import TypeDBStub
3235
from typedb.common.exception import CLUSTER_TOKEN_CREDENTIAL_INVALID, TypeDBClientException, UNABLE_TO_CONNECT
36+
from typedb.common.rpc.stub import TypeDBStub
3337

3438
T = TypeVar('T')
3539

@@ -51,28 +55,55 @@ def __init__(self, channel: Channel, credential: TypeDBCredential):
5155
raise e2
5256

5357
def servers_all(self, req: cluster_server_proto.ServerManager.All.Req) -> cluster_server_proto.ServerManager.All.Res:
54-
return self.resilient_authenticated_call(lambda: self._cluster_stub.servers_all(req))
55-
56-
def databases_get(self, req: cluster_database_proto.ClusterDatabaseManager.Get.Req) -> cluster_database_proto.ClusterDatabaseManager.Get.Res:
57-
return self.resilient_authenticated_call(lambda: self._cluster_stub.databases_get(req))
58-
59-
def databases_all(self, req: cluster_database_proto.ClusterDatabaseManager.All.Req) -> cluster_database_proto.ClusterDatabaseManager.All.Res:
60-
return self.resilient_authenticated_call(lambda: self._cluster_stub.databases_all(req))
58+
return self.may_renew_token(lambda: self._cluster_stub.servers_all(req))
6159

6260
def users_all(self, req: cluster_user_proto.ClusterUserManager.All.Req) -> cluster_user_proto.ClusterUserManager.All.Res:
63-
return self.resilient_authenticated_call(lambda: self._cluster_stub.users_all(req))
61+
return self.may_renew_token(lambda: self._cluster_stub.users_all(req))
6462

6563
def users_contains(self, req: cluster_user_proto.ClusterUserManager.Contains.Req) -> cluster_user_proto.ClusterUserManager.Contains.Res:
66-
return self.resilient_authenticated_call(lambda: self._cluster_stub.users_contains(req))
64+
return self.may_renew_token(lambda: self._cluster_stub.users_contains(req))
6765

6866
def users_create(self, req: cluster_user_proto.ClusterUserManager.Create.Req) -> cluster_user_proto.ClusterUserManager.Create.Res:
69-
return self.resilient_authenticated_call(lambda: self._cluster_stub.users_create(req))
67+
return self.may_renew_token(lambda: self._cluster_stub.users_create(req))
7068

7169
def user_password(self, req: cluster_user_proto.ClusterUser.Delete.Req) -> cluster_user_proto.ClusterUser.Delete.Res:
72-
return self.resilient_authenticated_call(lambda: self._cluster_stub.user_password(req))
70+
return self.may_renew_token(lambda: self._cluster_stub.user_password(req))
7371

7472
def user_delete(self, req: cluster_user_proto.ClusterUser.Delete.Req) -> cluster_user_proto.ClusterUser.Delete.Res:
75-
return self.resilient_authenticated_call(lambda: self._cluster_stub.user_delete(req))
73+
return self.may_renew_token(lambda: self._cluster_stub.user_delete(req))
74+
75+
def cluster_databases_all(self, req: cluster_database_proto.ClusterDatabaseManager.All.Req) -> cluster_database_proto.ClusterDatabaseManager.All.Res:
76+
return self.may_renew_token(lambda: self._cluster_stub.databases_all(req))
77+
78+
def databases_all(self, req: core_database_proto.CoreDatabaseManager.All.Req) -> core_database_proto.CoreDatabaseManager.All.Res:
79+
return self.may_renew_token(self.resilient_call(lambda: self.stub().databases_all(req)))
80+
81+
def databases_get(self, req: cluster_database_proto.ClusterDatabaseManager.Get.Req) -> cluster_database_proto.ClusterDatabaseManager.Get.Res:
82+
return self.may_renew_token(lambda: self._cluster_stub.databases_get(req))
83+
84+
def databases_contains(self, req: core_database_proto.CoreDatabaseManager.Contains.Req) -> core_database_proto.CoreDatabaseManager.Contains.Res:
85+
return self.may_renew_token(lambda: super(_ClusterServerStub, self).databases_contains(req))
86+
87+
def databases_create(self, req: core_database_proto.CoreDatabaseManager.Create.Req) -> core_database_proto.CoreDatabaseManager.Create.Res:
88+
return self.may_renew_token(lambda: super(_ClusterServerStub, self).databases_create(req))
89+
90+
def database_schema(self, req: core_database_proto.CoreDatabase.Schema.Req) -> core_database_proto.CoreDatabase.Schema.Res:
91+
return self.may_renew_token(lambda: super(_ClusterServerStub, self).database_schema(req))
92+
93+
def database_delete(self, req: core_database_proto.CoreDatabase.Delete.Req) -> core_database_proto.CoreDatabase.Delete.Res:
94+
return self.may_renew_token(lambda: super(_ClusterServerStub, self).database_delete(req))
95+
96+
def session_open(self, req: session_proto.Session.Open.Req) -> session_proto.Session.Open.Res:
97+
return self.may_renew_token(lambda: super(_ClusterServerStub, self).session_open(req))
98+
99+
def session_close(self, req: session_proto.Session.Close.Req) -> session_proto.Session.Close.Res:
100+
return self.may_renew_token(lambda: super(_ClusterServerStub, self).session_close(req))
101+
102+
def session_pulse(self, req: session_proto.Session.Pulse.Req) -> session_proto.Session.Pulse.Res:
103+
return self.may_renew_token(lambda: super(_ClusterServerStub, self).session_pulse(req))
104+
105+
def transaction(self, request_iterator: Iterator[transaction_proto.Transaction.Client]) -> Iterator[transaction_proto.Transaction.Server]:
106+
return self.may_renew_token(lambda: super(_ClusterServerStub, self).transaction(request_iterator))
76107

77108
def channel(self) -> Channel:
78109
return self._channel
@@ -83,7 +114,7 @@ def stub(self) -> TypeDBStub:
83114
def token(self):
84115
return self._token
85116

86-
def resilient_authenticated_call(self, function: Callable[[], T]) -> T:
117+
def may_renew_token(self, function: Callable[[], T]) -> T:
87118
try:
88119
return self.resilient_call(function)
89120
except TypeDBClientException as e:

typedb/connection/database_manager.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121

2222
from typing import List
2323

24-
from grpc import Channel
25-
2624
from typedb.api.connection.database import DatabaseManager
2725
from typedb.common.exception import TypeDBClientException, DB_DOES_NOT_EXIST, MISSING_DB_NAME
2826
from typedb.common.rpc.request_builder import core_database_manager_contains_req, core_database_manager_create_req, \

0 commit comments

Comments
 (0)