Skip to content

Commit d511d23

Browse files
committed
SPI Queue/QE
1 parent d5f2d2e commit d511d23

File tree

5 files changed

+79
-30
lines changed

5 files changed

+79
-30
lines changed

src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueBuilder.java

+11-30
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,17 @@
3636
import net.openhft.chronicle.queue.impl.table.ReadonlyTableStore;
3737
import net.openhft.chronicle.queue.impl.table.SingleTableBuilder;
3838
import net.openhft.chronicle.queue.internal.domestic.QueueOffsetSpec;
39-
import net.openhft.chronicle.threads.*;
39+
import net.openhft.chronicle.queue.providers.EnterpriseQueueFactories;
40+
import net.openhft.chronicle.threads.MediumEventLoop;
41+
import net.openhft.chronicle.threads.Pauser;
42+
import net.openhft.chronicle.threads.TimingPauser;
43+
import net.openhft.chronicle.threads.YieldingPauser;
4044
import net.openhft.chronicle.wire.*;
4145
import org.jetbrains.annotations.NotNull;
4246
import org.jetbrains.annotations.Nullable;
4347

4448
import javax.crypto.spec.SecretKeySpec;
4549
import java.io.File;
46-
import java.lang.reflect.Constructor;
4750
import java.nio.file.Path;
4851
import java.time.LocalTime;
4952
import java.time.ZoneId;
@@ -64,7 +67,6 @@ public class SingleChronicleQueueBuilder extends SelfDescribingMarshallable impl
6467
public static final long SMALL_BLOCK_SIZE = OS.isWindows() ? OS.SAFE_PAGE_SIZE : OS.pageSize(); // the smallest safe block size on Windows 8+
6568

6669
public static final long DEFAULT_SPARSE_CAPACITY = 512L << 30;
67-
private static final Constructor<?> ENTERPRISE_QUEUE_CONSTRUCTOR;
6870
private static final WireStoreFactory storeFactory = SingleChronicleQueueBuilder::createStore;
6971
private static final Supplier<TimingPauser> TIMING_PAUSER_SUPPLIER = DefaultPauserSupplier.INSTANCE;
7072

@@ -74,15 +76,6 @@ public class SingleChronicleQueueBuilder extends SelfDescribingMarshallable impl
7476
CLASS_ALIASES.addAlias(SCQRoll.class, "SCQSRoll");
7577
CLASS_ALIASES.addAlias(SCQIndexing.class, "SCQSIndexing");
7678
CLASS_ALIASES.addAlias(SingleChronicleQueueStore.class, "SCQStore");
77-
78-
Constructor<?> co;
79-
try {
80-
co = ((Class<?>) Class.forName("software.chronicle.enterprise.queue.EnterpriseSingleChronicleQueue")).getDeclaredConstructors()[0];
81-
Jvm.setAccessible(co);
82-
} catch (Exception e) {
83-
co = null;
84-
}
85-
ENTERPRISE_QUEUE_CONSTRUCTOR = co;
8679
}
8780

8881
private BufferMode writeBufferMode = BufferMode.None;
@@ -224,7 +217,7 @@ static SingleChronicleQueueStore createStore(@NotNull RollingChronicleQueue queu
224217
}
225218

226219
public static boolean areEnterpriseFeaturesAvailable() {
227-
return ENTERPRISE_QUEUE_CONSTRUCTOR != null;
220+
return EnterpriseQueueFactories.get().areEnterpriseFeaturesAvailable();
228221
}
229222

230223
private static RollCycle loadDefaultRollCycle() {
@@ -284,10 +277,10 @@ public SingleChronicleQueue build() {
284277

285278
// It is important to check enterprise features after preBuild()
286279
// Enterprise-only config options can be loaded from the metadata
287-
if (checkEnterpriseFeaturesRequested())
288-
chronicleQueue = buildEnterprise();
289-
else
290-
chronicleQueue = new SingleChronicleQueue(this);
280+
if (checkEnterpriseFeaturesRequested() && !areEnterpriseFeaturesAvailable()) {
281+
throw new IllegalStateException("Enterprise features requested but Chronicle Queue Enterprise is not in the class path!");
282+
}
283+
chronicleQueue = EnterpriseQueueFactories.get().newInstance(this);
291284

292285
postBuild(chronicleQueue);
293286

@@ -324,23 +317,11 @@ private boolean checkEnterpriseFeaturesRequested() {
324317
}
325318

326319
public static boolean onlyAvailableInEnterprise(final String feature) {
327-
if (ENTERPRISE_QUEUE_CONSTRUCTOR == null)
320+
if (!areEnterpriseFeaturesAvailable())
328321
Jvm.warn().on(SingleChronicleQueueBuilder.class, feature + " is only supported in Chronicle Queue Enterprise. If you would like to use this feature, please contact sales@chronicle.software for more information.");
329322
return true;
330323
}
331324

332-
@NotNull
333-
private SingleChronicleQueue buildEnterprise() {
334-
if (ENTERPRISE_QUEUE_CONSTRUCTOR == null)
335-
throw new IllegalStateException("Enterprise features requested but Chronicle Queue Enterprise is not in the class path!");
336-
337-
try {
338-
return (SingleChronicleQueue) ENTERPRISE_QUEUE_CONSTRUCTOR.newInstance(this);
339-
} catch (Exception e) {
340-
throw new IllegalStateException("Couldn't create an instance of Enterprise queue", e);
341-
}
342-
}
343-
344325
public SingleChronicleQueueBuilder aesEncryption(@Nullable byte[] keyBytes) {
345326
if (keyBytes == null) {
346327
codingSuppliers(null, null);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package net.openhft.chronicle.queue.impl.single;
2+
3+
import net.openhft.chronicle.queue.providers.QueueFactory;
4+
5+
public class SingleChronicleQueueFactory implements QueueFactory {
6+
7+
@Override
8+
public boolean areEnterpriseFeaturesAvailable() {
9+
return false;
10+
}
11+
12+
@Override
13+
public SingleChronicleQueue newInstance(SingleChronicleQueueBuilder queueBuilder) {
14+
return new SingleChronicleQueue(queueBuilder);
15+
}
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package net.openhft.chronicle.queue.providers;
2+
3+
import net.openhft.chronicle.core.Jvm;
4+
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueFactory;
5+
import org.jetbrains.annotations.NotNull;
6+
7+
import java.util.ServiceLoader;
8+
9+
public class EnterpriseQueueFactories {
10+
11+
private static final String PREFERRED_FACTORY_CLASS_NAME = Jvm.getProperty("net.openhft.chronicle.queue.providers.EnterpriseQueueWrapper",
12+
"software.chronicle.enterprise.queue.EnterpriseQueueFactory");
13+
14+
private static QueueFactory queueFactory;
15+
16+
/**
17+
* Get the {@link QueueFactory}
18+
*
19+
* @return the active queue wrapper
20+
*/
21+
@NotNull
22+
public static QueueFactory get() {
23+
if (queueFactory == null) {
24+
final ServiceLoader<QueueFactory> load = ServiceLoader.load(QueueFactory.class);
25+
for (QueueFactory factory : load) {
26+
// last one in wins, unless we encounter the "preferred" one
27+
queueFactory = factory;
28+
if (PREFERRED_FACTORY_CLASS_NAME.equals(factory.getClass().getName())) {
29+
break;
30+
}
31+
}
32+
if (queueFactory == null) {
33+
Jvm.error().on(EnterpriseQueueFactories.class, "There's no queue wrapper factory configured, this shouldn't happen. ");
34+
queueFactory = new SingleChronicleQueueFactory();
35+
}
36+
}
37+
return queueFactory;
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package net.openhft.chronicle.queue.providers;
2+
3+
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
4+
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
5+
6+
public interface QueueFactory {
7+
8+
// SPI for SingleChronicleQueueBuilder
9+
boolean areEnterpriseFeaturesAvailable();
10+
11+
SingleChronicleQueue newInstance(SingleChronicleQueueBuilder queueBuilder);
12+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
net.openhft.chronicle.queue.impl.single.SingleChronicleQueueFactory

0 commit comments

Comments
 (0)