Skip to content

Commit 06021b5

Browse files
Update to ojdbc 23.5
1 parent 17adf49 commit 06021b5

8 files changed

+172
-29
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565

6666
<properties>
6767
<java.version>11</java.version>
68-
<ojdbc.version>23.4.0.24.05</ojdbc.version>
68+
<ojdbc.version>23.5.0.24.07</ojdbc.version>
6969
<r2dbc.version>1.0.0.RELEASE</r2dbc.version>
7070
<reactor.version>3.5.11</reactor.version>
7171
<reactive-streams.version>1.0.3</reactive-streams.version>

src/main/java/oracle/r2dbc/impl/OracleBatchImpl.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@
5656
*/
5757
final class OracleBatchImpl implements Batch {
5858

59+
/** The OracleConnectionImpl that created this Batch */
60+
private final OracleConnectionImpl r2dbcConnection;
61+
5962
/** Adapts Oracle JDBC Driver APIs into Reactive Streams APIs */
6063
private final ReactiveJdbcAdapter adapter;
6164

@@ -83,12 +86,11 @@ final class OracleBatchImpl implements Batch {
8386
* @param jdbcConnection JDBC connection to an Oracle Database. Not null.
8487
* @param adapter Adapts JDBC calls into reactive streams. Not null.
8588
*/
86-
OracleBatchImpl(
87-
Duration timeout, Connection jdbcConnection, ReactiveJdbcAdapter adapter) {
89+
OracleBatchImpl(Duration timeout, OracleConnectionImpl r2dbcConnection) {
8890
this.timeout = timeout;
89-
this.jdbcConnection =
90-
requireNonNull(jdbcConnection, "jdbcConnection is null");
91-
this.adapter = requireNonNull(adapter, "adapter is null");
91+
this.r2dbcConnection = r2dbcConnection;
92+
this.jdbcConnection = r2dbcConnection.jdbcConnection();
93+
this.adapter = r2dbcConnection.adapter();
9294
}
9395

9496
/**
@@ -103,7 +105,7 @@ public Batch add(String sql) {
103105
requireOpenConnection(jdbcConnection);
104106
requireNonNull(sql, "sql is null");
105107
statements.add(
106-
new OracleStatementImpl(sql, timeout, jdbcConnection, adapter));
108+
new OracleStatementImpl(sql, timeout, r2dbcConnection));
107109
return this;
108110
}
109111

src/main/java/oracle/r2dbc/impl/OracleConnectionImpl.java

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import java.sql.SQLException;
3939
import java.sql.Savepoint;
4040
import java.time.Duration;
41+
import java.util.Queue;
42+
import java.util.concurrent.ConcurrentLinkedQueue;
4143

4244
import static io.r2dbc.spi.IsolationLevel.READ_COMMITTED;
4345
import static io.r2dbc.spi.IsolationLevel.SERIALIZABLE;
@@ -126,6 +128,12 @@ final class OracleConnectionImpl implements Connection, Lifecycle {
126128
*/
127129
private TransactionDefinition currentTransaction = null;
128130

131+
/**
132+
* A queue of tasks that must complete before the {@link #jdbcConnection} is
133+
* closed.
134+
*/
135+
private final Queue<Publisher<?>> closeTasks = new ConcurrentLinkedQueue<>();
136+
129137
/**
130138
* Constructs a new connection that uses the specified {@code adapter} to
131139
* perform database operations with the specified {@code jdbcConnection}.
@@ -369,7 +377,46 @@ else if (isReadOnly == null && name == null) {
369377
*/
370378
@Override
371379
public Publisher<Void> close() {
372-
return adapter.publishClose(jdbcConnection);
380+
381+
Publisher<Void> closeTasksPublisher = Mono.defer(() -> {
382+
Publisher<?>[] closeTasksArray = closeTasks.toArray(Publisher<?>[]::new);
383+
closeTasks.clear();
384+
385+
return Flux.concatDelayError(closeTasksArray).then();
386+
});
387+
388+
return Flux.concatDelayError(
389+
closeTasksPublisher,
390+
adapter.publishClose(jdbcConnection));
391+
}
392+
393+
/**
394+
* <p>
395+
* Adds a publisher that must be subscribed to and must terminate before
396+
* closing the JDBC connection. This can be used to ensure that a publisher
397+
* has completed a task before the {@link #jdbcConnection()} has been closed
398+
* and becomes unusable.
399+
* </p><p>
400+
* <i> A call to this method should always be accompanied with a call to
401+
* {@link #removeCloseTask(Publisher)}</i>: If the publisher is subscribed to
402+
* and it terminates before {@link #close()} is called, then any reference
403+
* to the Publisher must be removed so that it can be garbage collected.
404+
* </p>
405+
* @param publisher Publisher that must terminate before closing the JDBC
406+
* connection. Not null.
407+
*/
408+
void addCloseTask(Publisher<?> publisher) {
409+
closeTasks.add(publisher);
410+
}
411+
412+
/**
413+
* Removes a publisher that was previously added with
414+
* {@link #addCloseTask(Publisher)}.
415+
*
416+
* @param publisher Publisher to remove. Not null.
417+
*/
418+
void removeCloseTask(Publisher<?> publisher) {
419+
closeTasks.remove(publisher);
373420
}
374421

375422
/**
@@ -417,7 +464,7 @@ public Publisher<Void> commitTransaction() {
417464
@Override
418465
public Batch createBatch() {
419466
requireOpenConnection(jdbcConnection);
420-
return new OracleBatchImpl(statementTimeout, jdbcConnection, adapter);
467+
return new OracleBatchImpl(statementTimeout, this);
421468
}
422469

423470
/**
@@ -441,8 +488,7 @@ public Batch createBatch() {
441488
public Statement createStatement(String sql) {
442489
requireNonNull(sql, "sql is null");
443490
requireOpenConnection(jdbcConnection);
444-
return new OracleStatementImpl(
445-
sql, statementTimeout, jdbcConnection, adapter);
491+
return new OracleStatementImpl(sql, statementTimeout, this);
446492
}
447493

448494
/**
@@ -826,4 +872,24 @@ public Publisher<Void> preRelease() {
826872
});
827873
}
828874

875+
/**
876+
* Returns the JDBC connection that this R2DBC connection executes database
877+
* calls with.
878+
*
879+
* @return The JDBC connection that backs this R2DBC connection. Not null.
880+
*/
881+
java.sql.Connection jdbcConnection() {
882+
return jdbcConnection;
883+
}
884+
885+
/**
886+
* Returns the adapter that adapts the asynchronous API of the
887+
* {@link #jdbcConnection()} that backs this R2DBC connection.
888+
*
889+
* @return The JDBC connection that backs this R2DBC connection. Not null.
890+
*/
891+
ReactiveJdbcAdapter adapter() {
892+
return adapter;
893+
}
894+
829895
}

src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java

Lines changed: 64 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,11 @@ final class OracleStatementImpl implements Statement {
197197
/** Adapts Oracle JDBC Driver APIs into Reactive Streams APIs */
198198
private final ReactiveJdbcAdapter adapter;
199199

200+
/**
201+
* The instance of OracleConnectionImpl that created this statement.
202+
*/
203+
private final OracleConnectionImpl r2dbcConnection;
204+
200205
/**
201206
* SQL Language command that this statement executes. The command is
202207
* provided by user code and may include parameter markers.
@@ -255,15 +260,15 @@ final class OracleStatementImpl implements Statement {
255260
* @param sql SQL Language statement that may include parameter markers.
256261
* @param timeout Timeout applied to the execution of the constructed
257262
* {@code Statement}. Not null. Not negative.
258-
* @param jdbcConnection JDBC connection to an Oracle Database.
259-
* @param adapter Adapts JDBC calls into reactive streams.
263+
* @param r2dbcConnection The OracleConnectionImpl that is creating this
264+
* statement. Not null.
260265
*/
261266
OracleStatementImpl(
262-
String sql, Duration timeout, Connection jdbcConnection,
263-
ReactiveJdbcAdapter adapter) {
267+
String sql, Duration timeout, OracleConnectionImpl r2dbcConnection) {
264268
this.sql = sql;
265-
this.jdbcConnection = jdbcConnection;
266-
this.adapter = adapter;
269+
this.r2dbcConnection = r2dbcConnection;
270+
this.jdbcConnection = r2dbcConnection.jdbcConnection();
271+
this.adapter = r2dbcConnection.adapter();
267272

268273
// The SQL string is parsed to identify parameter markers and allocate the
269274
// bindValues array accordingly
@@ -987,13 +992,29 @@ private JdbcStatement(PreparedStatement preparedStatement, Object[] binds) {
987992
this.preparedStatement = preparedStatement;
988993
this.binds = binds;
989994

995+
// Work around for Oracle JDBC bug #37160069: The JDBC statement must be
996+
// closed before closeAsyncOracle is called. This bug should be fixed
997+
// by the 23.7 release of Oracle JDBC. The fix can be verified by the
998+
// OracleConnectionImplTest.testSetStatementTimeout method (test won't
999+
// fail, but look for an error in stderr). Typically, statement closing
1000+
// is a no-op if the connection is closed. However, if the statement
1001+
// executes SELECT ... FOR UPDATE, then JDBC will implicitly execute a
1002+
// commit() when the Statement (or really the ResultSet) is closed. This
1003+
// commit operation fails if the JDBC connection is already closed.
1004+
Publisher<Void> closePublisher = closeStatement();
1005+
r2dbcConnection.addCloseTask(closePublisher);
1006+
1007+
dependentCounter = new DependentCounter(Publishers.concatTerminal(
1008+
closePublisher,
1009+
Mono.fromRunnable(() ->
1010+
r2dbcConnection.removeCloseTask(closePublisher))));
1011+
9901012
// Add this statement as a "party" (think j.u.c.Phaser) to the dependent
9911013
// results by calling increment(). After the Result publisher returned by
9921014
// execute() terminates, this statement "arrives" by calling decrement().
9931015
// Calling decrement() after the Result publisher terminates ensures that
9941016
// the JDBC statement can not be closed until all results have had a
9951017
// chance to be emitted to user code.
996-
dependentCounter = new DependentCounter(closeStatement());
9971018
dependentCounter.increment();
9981019
}
9991020

@@ -1864,7 +1885,24 @@ private Publisher<java.sql.Blob> convertBlobBind(
18641885
Mono.from(adapter.publishBlobWrite(r2dbcBlob.stream(), jdbcBlob))
18651886
.thenReturn(jdbcBlob),
18661887
jdbcBlob -> {
1867-
addDeallocation(adapter.publishBlobFree(jdbcBlob));
1888+
Publisher<Void> freePublisher = adapter.publishBlobFree(jdbcBlob);
1889+
1890+
// Work around for Oracle JDBC bug #37160069: All LOBs need to be
1891+
// freed before closeAsyncOracle is called. This bug should be fixed
1892+
// by the 23.7 release of Oracle JDBC. The fix can be verified by the
1893+
// clobInsert and blobInsert methods in the TestKit class of the R2DBC
1894+
// SPI test: These tests will subscribe to Connection.close() before
1895+
// this freePublisher is subscribed to.
1896+
r2dbcConnection.addCloseTask(freePublisher);
1897+
1898+
addDeallocation(
1899+
Publishers.concatTerminal(
1900+
freePublisher,
1901+
Mono.fromRunnable(() ->
1902+
r2dbcConnection.removeCloseTask(freePublisher))));
1903+
1904+
// TODO: Why is discard() called here? It should be called by the
1905+
// user who allocated the Blob, not by Oracle R2DBC.
18681906
return r2dbcBlob.discard();
18691907
});
18701908
}
@@ -1891,7 +1929,24 @@ private Publisher<java.sql.Clob> convertClobBind(
18911929
Mono.from(adapter.publishClobWrite(r2dbcClob.stream(), jdbcClob))
18921930
.thenReturn(jdbcClob),
18931931
jdbcClob -> {
1894-
addDeallocation(adapter.publishClobFree(jdbcClob));
1932+
Publisher<Void> freePublisher = adapter.publishClobFree(jdbcClob);
1933+
1934+
// Work around for Oracle JDBC bug #37160069: All LOBs need to be
1935+
// freed before closeAsyncOracle is called. This bug should be fixed
1936+
// by the 23.7 release of Oracle JDBC. The fix can be verified by the
1937+
// clobInsert and blobInsert methods in the TestKit class of the R2DBC
1938+
// SPI test: These tests will subscribe to Connection.close() before
1939+
// this freePublisher is subscribed to.
1940+
r2dbcConnection.addCloseTask(freePublisher);
1941+
1942+
addDeallocation(
1943+
Publishers.concatTerminal(
1944+
freePublisher,
1945+
Mono.fromRunnable(() ->
1946+
r2dbcConnection.removeCloseTask(freePublisher))));
1947+
1948+
// TODO: Why is discard() called here? It should be called by the
1949+
// user who allocated the Clob, not by Oracle R2DBC.
18951950
return r2dbcClob.discard();
18961951
});
18971952
}

src/test/java/oracle/r2dbc/impl/OracleConnectionImplTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1356,6 +1356,14 @@ public void testValidate() {
13561356
*/
13571357
@Test
13581358
public void testSetStatementTimeout() {
1359+
// Assume that oracle.jdbc.disablePipeline is only set to false when
1360+
// experimenting with pipelining on Mac OS. In this scenario, statement
1361+
// cancellation is known to not work.
1362+
String disabledProperty = System.getProperty("oracle.jdbc.disablePipeline");
1363+
assumeTrue(
1364+
disabledProperty == null || disabledProperty.equalsIgnoreCase("true"),
1365+
"oracle.jdbc.disablePipeline is set, and the value is not \"true\"");
1366+
13591367
Connection connection =
13601368
Mono.from(sharedConnection()).block(connectTimeout());
13611369
try {

src/test/java/oracle/r2dbc/impl/OracleLargeObjectsTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@
2929
import io.r2dbc.spi.Statement;
3030
import oracle.r2dbc.OracleR2dbcObject;
3131
import oracle.r2dbc.OracleR2dbcTypes;
32+
import oracle.r2dbc.test.DatabaseConfig;
33+
import oracle.r2dbc.test.TestUtils;
3234
import org.junit.jupiter.api.Test;
35+
import org.reactivestreams.Publisher;
3336
import reactor.core.publisher.Flux;
3437
import reactor.core.publisher.Mono;
3538

@@ -38,6 +41,7 @@
3841
import java.util.Arrays;
3942
import java.util.List;
4043
import java.util.Map;
44+
import java.util.concurrent.atomic.AtomicBoolean;
4145
import java.util.concurrent.atomic.AtomicInteger;
4246

4347
import static java.util.Arrays.asList;

src/test/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapterTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
import static org.junit.jupiter.api.Assertions.assertThrows;
9999
import static org.junit.jupiter.api.Assertions.assertTrue;
100100
import static org.junit.jupiter.api.Assertions.fail;
101+
import static org.junit.jupiter.api.Assumptions.assumeTrue;
101102

102103
/**
103104
* Verifies that
@@ -382,6 +383,14 @@ public void testConnectTimeout()
382383
*/
383384
@Test
384385
public void testStatementTimeout() {
386+
// Assume that oracle.jdbc.disablePipeline is only set to false when
387+
// experimenting with pipelining on Mac OS. In this scenario, statement
388+
// cancellation is known to not work.
389+
String disabledProperty = System.getProperty("oracle.jdbc.disablePipeline");
390+
assumeTrue(
391+
disabledProperty == null || disabledProperty.equalsIgnoreCase("true"),
392+
"oracle.jdbc.disablePipeline is set, and the value is not \"true\"");
393+
385394
Connection connection0 =
386395
Mono.from(ConnectionFactories.get(connectionFactoryOptions()
387396
.mutate()

src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.r2dbc.spi.Result;
3232
import io.r2dbc.spi.Result.Message;
3333
import io.r2dbc.spi.Result.UpdateCount;
34+
import io.r2dbc.spi.Row;
3435
import io.r2dbc.spi.Statement;
3536
import oracle.r2dbc.OracleR2dbcObject;
3637
import oracle.r2dbc.OracleR2dbcOptions;
@@ -76,12 +77,11 @@
7677
import static oracle.r2dbc.test.DatabaseConfig.connectionFactoryOptions;
7778
import static oracle.r2dbc.test.DatabaseConfig.databaseVersion;
7879
import static oracle.r2dbc.test.DatabaseConfig.jdbcMinorVersion;
79-
import static oracle.r2dbc.test.DatabaseConfig.jdbcVersion;
8080
import static oracle.r2dbc.test.DatabaseConfig.newConnection;
8181
import static oracle.r2dbc.test.DatabaseConfig.sharedConnection;
82+
import static oracle.r2dbc.test.DatabaseConfig.sqlTimeout;
8283
import static oracle.r2dbc.test.TestUtils.constructObject;
8384
import static oracle.r2dbc.test.TestUtils.showErrors;
84-
import static oracle.r2dbc.test.DatabaseConfig.sqlTimeout;
8585
import static oracle.r2dbc.util.Awaits.awaitError;
8686
import static oracle.r2dbc.util.Awaits.awaitExecution;
8787
import static oracle.r2dbc.util.Awaits.awaitMany;
@@ -3231,10 +3231,10 @@ public boolean equals(Object other) {
32313231

32323232
// Oracle JDBC 23.4 has a defect which prevents the Subscriber from
32333233
// receiving a terminal signal. The defect has been reported as bug
3234-
// #36607804, and is expected to be fixed in the 23.5 release.
3234+
// #36607804, it will be fixed in the 23.6 release.
32353235
Assumptions.assumeTrue(
3236-
jdbcMinorVersion() >= 5,
3237-
"Oracle JDBC 23.4 does not support generated keys for VECTOR");
3236+
jdbcMinorVersion() >= 6,
3237+
"Oracle JDBC 23.5 does not support generated keys for VECTOR");
32383238

32393239
IdVector expected2 = new IdVector(
32403240
0,
@@ -3257,12 +3257,11 @@ public boolean equals(Object other) {
32573257
return Mono.just(((UpdateCount) segment).value());
32583258
}
32593259
else if (segment instanceof Result.RowSegment) {
3260-
OutParameters outParameters =
3261-
((Result.OutSegment)segment).outParameters();
3260+
Row generatedRow = ((Result.RowSegment)segment).row();
32623261

32633262
return Mono.just(new IdVector(
3264-
outParameters.get("outId", Integer.class),
3265-
outParameters.get("outVector", VECTOR.class)));
3263+
generatedRow.get("id", Integer.class),
3264+
generatedRow.get("value", VECTOR.class)));
32663265
}
32673266
else if (segment instanceof Message) {
32683267
throw ((Message)segment).exception();

0 commit comments

Comments
 (0)