Skip to content

Commit e94ed50

Browse files
martin-neotechPeter Wilhelmsson
and
Peter Wilhelmsson
authored
Split session tx result prototype (#390)
* Non-working at all... * Babysteps on result before weekend.. * Result prototype * Multiple results in tx * Pair programming * Removed commented code * integration test is runable * Bookmarks prototype * Small fixes * Wrong func name * Syntax fixes * Syntax error * correct integration tests for multiple results * explicit transaction case1 is runable * message fetching logic needs to be fixed * Better conn handling * No fetch in tx _begin * nested results are working now for explicit transactions. * Fixed some tests * transaction.commit will properly consume all the records * tests/integration/test_autocommit.py is green * Fixed bookmarks api * fixed a case of fetching records where there are no records to fetch. * integration test_result is green * Bookmark tests green * data is only used on record objects * Removed 2s sleep for stub tests * fixed an integration test test_errors_on_run_transaction * Rollback on failed connection fix * added is_reset state to bolt4 * stabilized an integration test * added check for Query object for transaction.run * Handle broken connections * fixed case with Bolt 3 tx.run tx.run case * cleanup of code * logging cleanup * comment removed cleanup * removal of old comment * fixed raise hell error * fixed case with session.run session.run * fixed session.run session.run * fixed Transaction class api * fixed the api for Result object * fixed api for Session Object * removed _detach replaced calls with consume and _buffer_all for correct behaviour Co-authored-by: Peter Wilhelmsson <peter.wilhelmsson@neo4j.com>
1 parent 72cb50f commit e94ed50

26 files changed

+895
-768
lines changed

neo4j/__init__.py

+9-3
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,19 @@
9393
Record,
9494
)
9595
from neo4j.work.simple import (
96-
Transaction,
97-
Result,
98-
ResultSummary,
9996
Query,
10097
Session,
10198
unit_of_work,
10299
)
100+
from neo4j.work.transaction import (
101+
Transaction,
102+
)
103+
from neo4j.work.result import (
104+
Result,
105+
)
106+
from neo4j.work.summary import (
107+
ResultSummary,
108+
)
103109

104110

105111
log = getLogger("neo4j")

neo4j/io/_bolt3.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=No
8787
self.responses = deque()
8888
self._max_connection_lifetime = max_connection_lifetime
8989
self._creation_timestamp = perf_counter()
90-
self.state = None
90+
self.supports_multiple_results = False
91+
self._is_reset = True
9192

9293
# Determine the user agent
9394
if user_agent:
@@ -172,6 +173,7 @@ def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None,
172173
self._append(b"\x10", fields, CommitResponse(self, **handlers))
173174
else:
174175
self._append(b"\x10", fields, Response(self, **handlers))
176+
self._is_reset = False
175177

176178
def discard(self, n=-1, qid=-1, **handlers):
177179
# Just ignore n and qid, it is not supported in the Bolt 3 Protocol.
@@ -182,6 +184,7 @@ def pull(self, n=-1, qid=-1, **handlers):
182184
# Just ignore n and qid, it is not supported in the Bolt 3 Protocol.
183185
log.debug("[#%04X] C: PULL_ALL", self.local_port)
184186
self._append(b"\x3F", (), Response(self, **handlers))
187+
self._is_reset = False
185188

186189
def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, db=None, **handlers):
187190
if db is not None:
@@ -206,6 +209,7 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None, db=None,
206209
raise TypeError("Timeout must be specified as a number of seconds")
207210
log.debug("[#%04X] C: BEGIN %r", self.local_port, extra)
208211
self._append(b"\x11", (extra,), Response(self, **handlers))
212+
self._is_reset = False
209213

210214
def commit(self, **handlers):
211215
log.debug("[#%04X] C: COMMIT", self.local_port)
@@ -239,6 +243,7 @@ def fail(metadata):
239243
self._append(b"\x0F", response=Response(self, on_failure=fail))
240244
self.send_all()
241245
self.fetch_all()
246+
self._is_reset = True
242247

243248
def _send_all(self):
244249
data = self.outbox.view()
@@ -300,7 +305,7 @@ def fetch_message(self):
300305
raise
301306

302307
if details:
303-
log.debug("[#%04X] S: RECORD * %d", self.local_port, len(details))
308+
log.debug("[#%04X] S: RECORD * %d", self.local_port, len(details)) # Do not log any data
304309
self.responses[0].on_records(details)
305310

306311
if summary_signature is None:
@@ -529,4 +534,4 @@ def on_failure(self, metadata):
529534

530535
class CommitResponse(Response):
531536

532-
pass
537+
pass

neo4j/io/_bolt4x0.py

+13-17
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ def __init__(self, unresolved_address, sock, max_connection_lifetime, *, auth=No
8686
self.responses = deque()
8787
self._max_connection_lifetime = max_connection_lifetime # self.pool_config.max_connection_lifetime
8888
self._creation_timestamp = perf_counter()
89-
self.state = None
89+
self.supports_multiple_results = True
90+
self._is_reset = True
9091

9192
# Determine the user agent
9293
if user_agent:
@@ -171,6 +172,7 @@ def run(self, query, parameters=None, mode=None, bookmarks=None, metadata=None,
171172
self._append(b"\x10", fields, CommitResponse(self, **handlers))
172173
else:
173174
self._append(b"\x10", fields, Response(self, **handlers))
175+
self._is_reset = False
174176

175177
def discard(self, n=-1, qid=-1, **handlers):
176178
extra = {"n": n}
@@ -185,6 +187,7 @@ def pull(self, n=-1, qid=-1, **handlers):
185187
extra["qid"] = qid
186188
log.debug("[#%04X] C: PULL %r", self.local_port, extra)
187189
self._append(b"\x3F", (extra,), Response(self, **handlers))
190+
self._is_reset = False
188191

189192
def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None,
190193
db=None, **handlers):
@@ -210,6 +213,7 @@ def begin(self, mode=None, bookmarks=None, metadata=None, timeout=None,
210213
raise TypeError("Timeout must be specified as a number of seconds")
211214
log.debug("[#%04X] C: BEGIN %r", self.local_port, extra)
212215
self._append(b"\x11", (extra,), Response(self, **handlers))
216+
self._is_reset = False
213217

214218
def commit(self, **handlers):
215219
log.debug("[#%04X] C: COMMIT", self.local_port)
@@ -243,6 +247,7 @@ def fail(metadata):
243247
self._append(b"\x0F", response=Response(self, on_failure=fail))
244248
self.send_all()
245249
self.fetch_all()
250+
self._is_reset = True
246251

247252
def _send_all(self):
248253
data = self.outbox.view()
@@ -304,7 +309,7 @@ def fetch_message(self):
304309
raise
305310

306311
if details:
307-
log.debug("[#%04X] S: RECORD * %d", self.local_port, len(details))
312+
log.debug("[#%04X] S: RECORD * %d", self.local_port, len(details)) # Do not log any data
308313
self.responses[0].on_records(details)
309314

310315
if summary_signature is None:
@@ -483,27 +488,18 @@ def __init__(self, connection, **handlers):
483488
def on_records(self, records):
484489
""" Called when one or more RECORD messages have been received.
485490
"""
486-
self.connection.state = "streaming"
487491
handler = self.handlers.get("on_records")
488492
if callable(handler):
489493
handler(records)
490494

491495
def on_success(self, metadata):
492496
""" Called when a SUCCESS message has been received.
493497
"""
494-
if metadata.get("has_more"):
495-
if self.connection.state == "streaming_discard_all":
496-
handler = self.handlers.get("on_success_has_more_streaming_discard_all")
497-
self.connection.state = None
498-
if callable(handler):
499-
handler(self.connection, **self.handlers)
500-
else:
501-
self.connection.state = "streaming_has_more"
502-
else:
503-
self.connection.state = None
504-
handler = self.handlers.get("on_success")
505-
if callable(handler):
506-
handler(metadata)
498+
handler = self.handlers.get("on_success")
499+
if callable(handler):
500+
handler(metadata)
501+
502+
if not metadata.get("has_more"):
507503
handler = self.handlers.get("on_summary")
508504
if callable(handler):
509505
handler()
@@ -544,4 +540,4 @@ def on_failure(self, metadata):
544540

545541
class CommitResponse(Response):
546542

547-
pass
543+
pass

0 commit comments

Comments
 (0)