Skip to content

Commit 6ad44fa

Browse files
authored
feat: SSE (#162)
* SSE initial * Debugging SocketTimeoutException * Cleanup logging; update local example * use setupscheduler * make runnable final to prevent modification * reduce sleep for example * Use shutdown instead of close * use new mainclass spec * check if ssemanager exists before closing * cleanup
1 parent a36561a commit 6ad44fa

File tree

9 files changed

+245
-26
lines changed

9 files changed

+245
-26
lines changed

build.gradle

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ protobuf {
132132
}
133133

134134
ext {
135-
retrofit_version = "2.9.0"
135+
retrofit_version = "2.11.0"
136136
jackson_version = "2.15.3"
137137
swagger_annotations_version = "2.2.18"
138138
lombok_version = "1.18.30"
@@ -142,6 +142,7 @@ ext {
142142
mockito_core_version = "5.6.0"
143143
protobuf_version = "3.24.4"
144144
openfeature_version = "1.7.0"
145+
eventsource_version = "4.1.1"
145146
}
146147

147148
dependencies {
@@ -163,6 +164,7 @@ dependencies {
163164
implementation("com.google.protobuf:protobuf-java:$protobuf_version")
164165

165166
implementation("dev.openfeature:sdk:$openfeature_version")
167+
implementation("com.launchdarkly:okhttp-eventsource:$eventsource_version")
166168

167169
compileOnly("org.projectlombok:lombok:$lombok_version")
168170

@@ -191,17 +193,17 @@ configurations {
191193
task runLocalExample(type: JavaExec) {
192194
description = "Run the local bucketing example"
193195
classpath = sourceSets.examples.runtimeClasspath
194-
main = 'com.devcycle.examples.LocalExample'
196+
mainClass = 'com.devcycle.examples.LocalExample'
195197
}
196198

197199
task runCloudExample(type: JavaExec) {
198200
description = "Run the cloud bucketing example"
199201
classpath = sourceSets.examples.runtimeClasspath
200-
main = 'com.devcycle.examples.CloudExample'
202+
mainClass = 'com.devcycle.examples.CloudExample'
201203
}
202204

203205
task runOpenFeatureExample(type: JavaExec) {
204206
description = "Run the OpenFeature example"
205207
classpath = sourceSets.examples.runtimeClasspath
206-
main = 'com.devcycle.examples.OpenFeatureExample'
208+
mainClass = 'com.devcycle.examples.OpenFeatureExample'
207209
}

src/examples/java/com/devcycle/examples/LocalExample.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.devcycle.examples;
22

3+
import com.devcycle.sdk.server.common.logging.SimpleDevCycleLogger;
34
import com.devcycle.sdk.server.common.model.DevCycleUser;
45
import com.devcycle.sdk.server.local.api.DevCycleLocalClient;
56
import com.devcycle.sdk.server.local.model.DevCycleLocalOptions;
@@ -22,8 +23,11 @@ public static void main(String[] args) throws InterruptedException {
2223
// The default value can be of type string, boolean, number, or JSON
2324
Boolean defaultValue = false;
2425

25-
DevCycleLocalOptions options = DevCycleLocalOptions.builder().configPollingIntervalMs(60000)
26-
.disableAutomaticEventLogging(false).disableCustomEventLogging(false).build();
26+
DevCycleLocalOptions options = DevCycleLocalOptions.builder()
27+
.configPollingIntervalMS(60000)
28+
.customLogger(new SimpleDevCycleLogger(SimpleDevCycleLogger.Level.DEBUG))
29+
.enableBetaRealtimeUpdates(true)
30+
.build();
2731

2832
// Initialize DevCycle Client
2933
DevCycleLocalClient client = new DevCycleLocalClient(server_sdk_key, options);
@@ -46,5 +50,6 @@ public static void main(String[] args) throws InterruptedException {
4650
} else {
4751
System.out.println("feature is NOT enabled");
4852
}
53+
Thread.sleep(10000);
4954
}
5055
}

src/main/java/com/devcycle/sdk/server/common/model/ProjectConfig.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
@NoArgsConstructor
1414
@JsonIgnoreProperties(ignoreUnknown = true)
1515
public class ProjectConfig {
16+
1617
@Schema(description = "Project Settings")
1718
private Object project;
1819

@@ -30,4 +31,8 @@ public class ProjectConfig {
3031

3132
@Schema(description = "Variable Hashes for all Variables in this Project")
3233
private Object variableHashes;
33-
}
34+
35+
@Schema(description = "SSE Configuration")
36+
private SSE sse;
37+
}
38+
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.devcycle.sdk.server.common.model;
2+
3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
4+
import lombok.AllArgsConstructor;
5+
import lombok.Builder;
6+
import lombok.Data;
7+
import lombok.NoArgsConstructor;
8+
9+
@Data
10+
@Builder
11+
@AllArgsConstructor
12+
@NoArgsConstructor
13+
@JsonIgnoreProperties(ignoreUnknown = true)
14+
public class SSE {
15+
private String hostname;
16+
private String path;
17+
}
18+
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.devcycle.sdk.server.common.model;
2+
3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
4+
import lombok.AllArgsConstructor;
5+
import lombok.Builder;
6+
import lombok.Data;
7+
import lombok.NoArgsConstructor;
8+
9+
@Data
10+
@Builder
11+
@AllArgsConstructor
12+
@NoArgsConstructor
13+
@JsonIgnoreProperties(ignoreUnknown = true)
14+
public class SSEMessage {
15+
private String etag;
16+
private double lastModified;
17+
private String type;
18+
}

src/main/java/com/devcycle/sdk/server/local/managers/EnvironmentConfigManager.java

Lines changed: 78 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,22 @@
33
import com.devcycle.sdk.server.common.api.IDevCycleApi;
44
import com.devcycle.sdk.server.common.exception.DevCycleException;
55
import com.devcycle.sdk.server.common.logging.DevCycleLogger;
6-
import com.devcycle.sdk.server.common.model.ErrorResponse;
7-
import com.devcycle.sdk.server.common.model.HttpResponseCode;
8-
import com.devcycle.sdk.server.common.model.ProjectConfig;
6+
import com.devcycle.sdk.server.common.model.*;
97
import com.devcycle.sdk.server.local.api.DevCycleLocalApiClient;
108
import com.devcycle.sdk.server.local.bucketing.LocalBucketing;
119
import com.devcycle.sdk.server.local.model.DevCycleLocalOptions;
1210
import com.fasterxml.jackson.core.JsonParseException;
1311
import com.fasterxml.jackson.core.JsonProcessingException;
1412
import com.fasterxml.jackson.databind.ObjectMapper;
13+
import com.launchdarkly.eventsource.FaultEvent;
14+
import com.launchdarkly.eventsource.MessageEvent;
15+
import com.launchdarkly.eventsource.StartedEvent;
1516
import retrofit2.Call;
1617
import retrofit2.Response;
1718

1819
import java.io.IOException;
20+
import java.net.URI;
21+
import java.net.URISyntaxException;
1922
import java.time.ZonedDateTime;
2023
import java.time.format.DateTimeFormatter;
2124
import java.util.concurrent.Executors;
@@ -26,46 +29,52 @@ public final class EnvironmentConfigManager {
2629
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
2730
private static final int DEFAULT_POLL_INTERVAL_MS = 30000;
2831
private static final int MIN_INTERVALS_MS = 1000;
29-
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory());
32+
private ScheduledExecutorService scheduler;
3033
private final IDevCycleApi configApiClient;
3134
private final LocalBucketing localBucketing;
35+
private SSEManager sseManager;
36+
private boolean isSSEConnected = false;
37+
private final DevCycleLocalOptions options;
3238

3339
private ProjectConfig config;
3440
private String configETag = "";
3541
private String configLastModified = "";
3642

3743
private final String sdkKey;
3844
private final int pollingIntervalMS;
45+
private static final int pollingIntervalSSEMS = 15 * 60 * 60 * 1000;
3946
private boolean pollingEnabled = true;
4047

4148
public EnvironmentConfigManager(String sdkKey, LocalBucketing localBucketing, DevCycleLocalOptions options) {
4249
this.sdkKey = sdkKey;
4350
this.localBucketing = localBucketing;
51+
this.options = options;
4452

4553
configApiClient = new DevCycleLocalApiClient(sdkKey, options).initialize();
4654

4755
int configPollingIntervalMS = options.getConfigPollingIntervalMS();
4856
pollingIntervalMS = configPollingIntervalMS >= MIN_INTERVALS_MS ? configPollingIntervalMS
4957
: DEFAULT_POLL_INTERVAL_MS;
5058

51-
setupScheduler();
59+
scheduler = setupScheduler();
60+
scheduler.scheduleAtFixedRate(getConfigRunnable, 0, this.pollingIntervalMS, TimeUnit.MILLISECONDS);
5261
}
5362

54-
private void setupScheduler() {
55-
Runnable getConfigRunnable = new Runnable() {
56-
public void run() {
57-
try {
58-
if (pollingEnabled) {
59-
getConfig();
60-
}
61-
} catch (DevCycleException e) {
62-
DevCycleLogger.error("Failed to load config: " + e.getMessage());
63+
private ScheduledExecutorService setupScheduler() {
64+
return Executors.newScheduledThreadPool(1, new DaemonThreadFactory());
65+
}
66+
67+
private final Runnable getConfigRunnable = new Runnable() {
68+
public void run() {
69+
try {
70+
if (pollingEnabled) {
71+
getConfig();
6372
}
73+
} catch (DevCycleException e) {
74+
DevCycleLogger.error("Failed to load config: " + e.getMessage());
6475
}
65-
};
66-
67-
scheduler.scheduleAtFixedRate(getConfigRunnable, 0, this.pollingIntervalMS, TimeUnit.MILLISECONDS);
68-
}
76+
}
77+
};
6978

7079
public boolean isConfigInitialized() {
7180
return config != null;
@@ -74,9 +83,57 @@ public boolean isConfigInitialized() {
7483
private ProjectConfig getConfig() throws DevCycleException {
7584
Call<ProjectConfig> config = this.configApiClient.getConfig(this.sdkKey, this.configETag, this.configLastModified);
7685
this.config = getResponseWithRetries(config, 1);
86+
if (this.options.isEnableBetaRealtimeUpdates()) {
87+
try {
88+
URI uri = new URI(this.config.getSse().getHostname() + this.config.getSse().getPath());
89+
if (sseManager == null) {
90+
sseManager = new SSEManager(uri);
91+
}
92+
sseManager.restart(uri, this::handleSSEMessage, this::handleSSEError, this::handleSSEStarted);
93+
} catch (URISyntaxException e) {
94+
DevCycleLogger.warning("Failed to create SSEManager: " + e.getMessage());
95+
}
96+
}
7797
return this.config;
7898
}
7999

100+
private Void handleSSEMessage(MessageEvent messageEvent) {
101+
DevCycleLogger.debug("Received message: " + messageEvent.getData());
102+
if (!isSSEConnected)
103+
{
104+
handleSSEStarted(null);
105+
}
106+
107+
String data = messageEvent.getData();
108+
if (data == null || data.isEmpty() || data.equals("keepalive")) {
109+
return null;
110+
}
111+
try {
112+
SSEMessage message = OBJECT_MAPPER.readValue(data, SSEMessage.class);
113+
if (message.getType() == null || message.getType().equals("refetchConfig") || message.getType().isEmpty()) {
114+
DevCycleLogger.debug("Received refetchConfig message, fetching new config");
115+
getConfigRunnable.run();
116+
}
117+
} catch (JsonProcessingException e) {
118+
DevCycleLogger.warning("Failed to parse SSE message: " + e.getMessage());
119+
}
120+
return null;
121+
}
122+
123+
private Void handleSSEError(FaultEvent faultEvent) {
124+
DevCycleLogger.warning("Received error: " + faultEvent.getCause());
125+
return null;
126+
}
127+
128+
private Void handleSSEStarted(StartedEvent startedEvent) {
129+
isSSEConnected = true;
130+
DevCycleLogger.debug("SSE Connected - setting polling interval to " + pollingIntervalSSEMS);
131+
scheduler.shutdown();
132+
scheduler = setupScheduler();
133+
scheduler.scheduleAtFixedRate(getConfigRunnable, 0, pollingIntervalSSEMS, TimeUnit.MILLISECONDS);
134+
return null;
135+
}
136+
80137
private ProjectConfig getResponseWithRetries(Call<ProjectConfig> call, int maxRetries) throws DevCycleException {
81138
// attempt 0 is the initial request, attempt > 0 are all retries
82139
int attempt = 0;
@@ -206,6 +263,9 @@ private void stopPolling() {
206263
}
207264

208265
public void cleanup() {
266+
if (sseManager != null) {
267+
sseManager.close();
268+
}
209269
stopPolling();
210270
}
211271
}

0 commit comments

Comments
 (0)