Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit 6282d1e

Browse files
Query iterator only throws exeption once (#252)
## What is the goal of this PR? Revert previous changes from #247 and #248, which made query queues and iterators throw the same error idempotently. However, this goes counter to standard usage of iterators and queues, which are not meant to behave idempotently (each item is only returned once, and if they have an error they should no longer be used). ## What are the changes implemented in this PR? * remove idempotent error state of collectors and queues, which back query iterators * note that we still store the error on the transaction bidirectional stream, in case the server throws an exception when there are no query iterators active Note: mirrors change from typedb/typedb-driver#372
1 parent fca5194 commit 6282d1e

File tree

3 files changed

+12
-18
lines changed

3 files changed

+12
-18
lines changed

typedb/stream/bidirectional_stream.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
# under the License.
2020
#
2121
from queue import Empty, Queue
22-
from typing import TypeVar, Iterator, Union, Generic, List
22+
from typing import TypeVar, Iterator, Union
2323
from uuid import uuid4, UUID
2424

2525
import typedb_protocol.common.transaction_pb2 as transaction_proto
@@ -133,8 +133,7 @@ def __init__(self, request_id: UUID, stream: "BidirectionalStream"):
133133
self._stream = stream
134134

135135
def get(self) -> T:
136-
value = self._stream.fetch(self._request_id)
137-
return value
136+
return self._stream.fetch(self._request_id)
138137

139138

140139
class RequestIterator(Iterator[Union[transaction_proto.Transaction.Req, StopIteration]]):

typedb/stream/response_collector.py

+9-12
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
from typing import Generic, TypeVar, Dict, Optional
2525
from uuid import UUID
2626

27-
from typedb.common.exception import TypeDBClientException, TRANSACTION_CLOSED, ILLEGAL_STATE, \
28-
TRANSACTION_CLOSED_WITH_ERRORS
27+
from grpc import RpcError
28+
from typedb.common.exception import TypeDBClientException, TRANSACTION_CLOSED, ILLEGAL_STATE
2929

3030
R = TypeVar('R')
3131

@@ -54,26 +54,23 @@ class Queue(Generic[R]):
5454

5555
def __init__(self):
5656
self._response_queue: queue.Queue[Response] = queue.Queue()
57-
self._error: TypeDBClientException = None
5857

5958
def get(self, block: bool) -> R:
6059
response = self._response_queue.get(block=block)
6160
if response.is_value():
6261
return response.value
63-
elif response.is_done():
64-
self._raise_transaction_closed_error()
62+
elif response.is_done() and response.error is None:
63+
raise TypeDBClientException.of(TRANSACTION_CLOSED)
64+
elif response.is_done() and response.error is not None:
65+
raise TypeDBClientException.of_rpc(response.error)
6566
else:
6667
raise TypeDBClientException.of(ILLEGAL_STATE)
6768

68-
def _raise_transaction_closed_error(self):
69-
raise TypeDBClientException.of(TRANSACTION_CLOSED_WITH_ERRORS, self._error) if self._error else TypeDBClientException.of(TRANSACTION_CLOSED)
70-
7169
def put(self, response: R):
7270
self._response_queue.put(ValueResponse(response))
7371

7472
def close(self, error: Optional[TypeDBClientException]):
75-
self._error = error
76-
self._response_queue.put(DoneResponse())
73+
self._response_queue.put(DoneResponse(error))
7774

7875

7976
class Response:
@@ -96,8 +93,8 @@ def is_value(self):
9693

9794
class DoneResponse(Response):
9895

99-
def __init__(self):
100-
pass
96+
def __init__(self, error: Optional[RpcError]):
97+
self.error = error
10198

10299
def is_done(self):
103100
return True

typedb/stream/response_part_iterator.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,7 @@ def _has_next(self) -> bool:
7474
raise TypeDBClientException.of(ILLEGAL_STATE)
7575

7676
def __next__(self) -> transaction_proto.Transaction.ResPart:
77-
if self._bidirectional_stream.get_error() is not None:
78-
raise self._bidirectional_stream.get_error()
79-
elif not self._has_next():
77+
if not self._has_next():
8078
raise StopIteration
8179
else:
8280
self._state = ResponsePartIterator.State.EMPTY

0 commit comments

Comments
 (0)