Skip to content

Commit d4c37ae

Browse files
authored
fix(NotificationManager): add thread-safety to NotificationManager (#460)
Fix NotificationManager to be thread-safe (add-handler and send-notifications can happen concurrently)
1 parent 535372c commit d4c37ae

File tree

2 files changed

+48
-10
lines changed

2 files changed

+48
-10
lines changed

core-api/src/main/java/com/optimizely/ab/notification/NotificationManager.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.slf4j.Logger;
2020
import org.slf4j.LoggerFactory;
2121

22+
import java.util.Collections;
2223
import java.util.LinkedHashMap;
2324
import java.util.Map;
2425
import java.util.concurrent.atomic.AtomicInteger;
@@ -33,7 +34,7 @@ public class NotificationManager<T> {
3334

3435
private static final Logger logger = LoggerFactory.getLogger(NotificationManager.class);
3536

36-
private final Map<Integer, NotificationHandler<T>> handlers = new LinkedHashMap<>();
37+
private final Map<Integer, NotificationHandler<T>> handlers = Collections.synchronizedMap(new LinkedHashMap<>());
3738
private final AtomicInteger counter;
3839

3940
public NotificationManager() {
@@ -47,10 +48,12 @@ public NotificationManager(AtomicInteger counter) {
4748
public int addHandler(NotificationHandler<T> newHandler) {
4849

4950
// Prevent registering a duplicate listener.
50-
for (NotificationHandler<T> handler: handlers.values()) {
51-
if (handler.equals(newHandler)) {
52-
logger.warn("Notification listener was already added");
53-
return -1;
51+
synchronized (handlers) {
52+
for (NotificationHandler<T> handler : handlers.values()) {
53+
if (handler.equals(newHandler)) {
54+
logger.warn("Notification listener was already added");
55+
return -1;
56+
}
5457
}
5558
}
5659

@@ -61,11 +64,13 @@ public int addHandler(NotificationHandler<T> newHandler) {
6164
}
6265

6366
public void send(final T message) {
64-
for (Map.Entry<Integer, NotificationHandler<T>> handler: handlers.entrySet()) {
65-
try {
66-
handler.getValue().handle(message);
67-
} catch (Exception e) {
68-
logger.warn("Catching exception sending notification for class: {}, handler: {}", message.getClass(), handler.getKey());
67+
synchronized (handlers) {
68+
for (Map.Entry<Integer, NotificationHandler<T>> handler: handlers.entrySet()) {
69+
try {
70+
handler.getValue().handle(message);
71+
} catch (Exception e) {
72+
logger.warn("Catching exception sending notification for class: {}, handler: {}", message.getClass(), handler.getKey());
73+
}
6974
}
7075
}
7176
}

core-api/src/test/java/com/optimizely/ab/notification/NotificationManagerTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@
2020
import org.junit.Test;
2121

2222
import java.util.List;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicBoolean;
2328
import java.util.concurrent.atomic.AtomicInteger;
2429

2530
import static org.junit.Assert.*;
@@ -70,4 +75,32 @@ public void testSendWithError() {
7075
assertEquals(1, messages.size());
7176
assertEquals("message1", messages.get(0).getMessage());
7277
}
78+
79+
@Test
80+
public void testThreadSafety() throws InterruptedException {
81+
int numThreads = 10;
82+
int numRepeats = 2;
83+
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
84+
CountDownLatch latch = new CountDownLatch(numThreads);
85+
AtomicBoolean failedAlready = new AtomicBoolean(false);
86+
87+
for(int i = 0; i < numThreads; i++) {
88+
executor.execute(() -> {
89+
try {
90+
for (int j = 0; j < numRepeats; j++) {
91+
if(!failedAlready.get()) {
92+
notificationManager.addHandler(new TestNotificationHandler<>());
93+
notificationManager.send(new TestNotification("message1"));
94+
}
95+
}
96+
} catch (Exception e) {
97+
failedAlready.set(true);
98+
} finally {
99+
latch.countDown();
100+
}
101+
});
102+
}
103+
assertTrue(latch.await(10, TimeUnit.SECONDS));
104+
assertEquals(numThreads * numRepeats, notificationManager.size());
105+
}
73106
}

0 commit comments

Comments
 (0)