Skip to content

Commit 84bfb1c

Browse files
committed
cyclic barrier , synchronized queues
1 parent d541d54 commit 84bfb1c

File tree

118 files changed

+357
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

118 files changed

+357
-0
lines changed

.idea/.gitignore

+3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/Concurrency-Multithreading-and-Parallel-Computing-in-Java.iml

+9
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/misc.xml

+6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/modules.xml

+8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/vcs.xml

+6
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.project

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<projectDescription>
3+
<name>Concurrency-Multithreading-and-Parallel-Computing-in-Java</name>
4+
<comment></comment>
5+
<projects>
6+
</projects>
7+
<buildSpec>
8+
</buildSpec>
9+
<natures>
10+
</natures>
11+
</projectDescription>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package org.ashfaq.dev.concepts.Concurrent_Collections;
2+
import java.util.concurrent.ArrayBlockingQueue;
3+
import java.util.concurrent.BlockingQueue;
4+
import java.util.concurrent.ExecutorService;
5+
import java.util.concurrent.Executors;
6+
7+
8+
//A BlockingQueue is a type of queue that supports operations that wait for the queue to become non-empty when retrieving
9+
//an element and wait for space
10+
//to become available in the queue when storing an element. This is particularly useful in a producer-consumer scenario.
11+
class Producer implements Runnable {
12+
private final BlockingQueue<Integer> queue;
13+
14+
public Producer(BlockingQueue<Integer> queue) {
15+
this.queue = queue;
16+
}
17+
18+
@Override
19+
public void run() {
20+
try {
21+
int value = 0;
22+
while (true) {
23+
System.out.println("Producing " + value);
24+
queue.put(value);
25+
value++;
26+
Thread.sleep(1000); // Simulate time taken to produce
27+
}
28+
} catch (InterruptedException e) {
29+
Thread.currentThread().interrupt();
30+
System.out.println("Producer interrupted.");
31+
}
32+
}
33+
}
34+
35+
class Consumer implements Runnable {
36+
private final BlockingQueue<Integer> queue;
37+
38+
public Consumer(BlockingQueue<Integer> queue) {
39+
this.queue = queue;
40+
}
41+
42+
@Override
43+
public void run() {
44+
try {
45+
while (true) {
46+
Integer value = queue.take();
47+
System.out.println("Consuming " + value);
48+
Thread.sleep(1500); // Simulate time taken to consume
49+
}
50+
} catch (InterruptedException e) {
51+
Thread.currentThread().interrupt();
52+
System.out.println("Consumer interrupted.");
53+
}
54+
}
55+
}
56+
57+
public class BlockingQueueExample {
58+
public static void main(String[] args) {
59+
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
60+
ExecutorService executor = Executors.newFixedThreadPool(2);
61+
62+
executor.submit(new Producer(queue));
63+
executor.submit(new Consumer(queue));
64+
65+
// Shutdown the executor after some time for demonstration
66+
try {
67+
Thread.sleep(10000);
68+
} catch (InterruptedException e) {
69+
Thread.currentThread().interrupt();
70+
}
71+
72+
executor.shutdownNow();
73+
}
74+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package org.ashfaq.dev.concepts.Concurrent_Collections;
2+
3+
import java.util.concurrent.BrokenBarrierException;
4+
import java.util.concurrent.CyclicBarrier;
5+
import java.util.concurrent.ExecutorService;
6+
import java.util.concurrent.Executors;
7+
8+
class Task implements Runnable {
9+
private final CyclicBarrier barrier;
10+
private final int id;
11+
12+
public Task(CyclicBarrier barrier, int id) {
13+
this.barrier = barrier;
14+
this.id = id;
15+
}
16+
17+
@Override
18+
public void run() {
19+
try {
20+
System.out.println("Task " + id + " is performing part 1.");
21+
Thread.sleep((int) (Math.random() * 1000));
22+
System.out.println("Task " + id + " reached the barrier.");
23+
barrier.await(); // Wait for other threads
24+
System.out.println("Task " + id + " is performing part 2.");
25+
} catch (InterruptedException | BrokenBarrierException e) {
26+
e.printStackTrace();
27+
}
28+
}
29+
}
30+
31+
public class CyclicBarrierExample {
32+
public static void main(String[] args) {
33+
// Create a CyclicBarrier for 3 threads with a barrier action
34+
CyclicBarrier barrier = new CyclicBarrier(3, () -> System.out.println("All tasks have reached the barrier. Proceeding..."));
35+
36+
37+
// Create and start three tasks
38+
// Thread t1 = new Thread(new Task(barrier, 1));
39+
// Thread t2 = new Thread(new Task(barrier, 2));
40+
// Thread t3 = new Thread(new Task(barrier, 3));
41+
//
42+
// t1.start();
43+
// t2.start();
44+
// t3.start();
45+
// OP
46+
//Task 2 is performing part 1.
47+
// Task 1 is performing part 1.
48+
// Task 3 is performing part 1.
49+
// Task 2 reached the barrier.
50+
// Task 1 reached the barrier.
51+
// Task 3 reached the barrier.
52+
// All tasks have reached the barrier. Proceeding...
53+
// Task 2 is performing part 2.
54+
// Task 1 is performing part 2.
55+
// Task 3 is performing part 2.
56+
57+
58+
// Example Using Executors and CyclicBarrier
59+
60+
61+
int numberOfThreads = 3;
62+
// Create an ExecutorService with a fixed thread pool
63+
ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
64+
65+
// Submit tasks to the executor service
66+
for (int i = 1; i <= numberOfThreads; i++) {
67+
executorService.submit(new Task(barrier, i));
68+
}
69+
70+
// Shutdown the executor service
71+
executorService.shutdown();
72+
73+
// OP
74+
// Task 2 is performing part 1.
75+
// Task 3 is performing part 1.
76+
// Task 1 is performing part 1.
77+
// Task 1 reached the barrier.
78+
// Task 3 reached the barrier.
79+
// Task 2 reached the barrier.
80+
// All tasks have reached the barrier. Proceeding...
81+
// Task 3 is performing part 2.
82+
// Task 2 is performing part 2.
83+
// Task 1 is performing part 2.
84+
}
85+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package org.ashfaq.dev.concepts.Concurrent_Collections;
2+
3+
import java.util.concurrent.DelayQueue;
4+
import java.util.concurrent.Delayed;
5+
import java.util.concurrent.ExecutorService;
6+
import java.util.concurrent.Executors;
7+
import java.util.concurrent.TimeUnit;
8+
9+
class DelayedTask implements Delayed {
10+
private final String name;
11+
private final long delayTime;
12+
private final long expireTime;
13+
14+
public DelayedTask(String name, long delayTime) {
15+
this.name = name;
16+
this.delayTime = delayTime;
17+
this.expireTime = System.currentTimeMillis() + delayTime;
18+
}
19+
20+
@Override
21+
public long getDelay(TimeUnit unit) {
22+
long diff = expireTime - System.currentTimeMillis();
23+
return unit.convert(diff, TimeUnit.MILLISECONDS);
24+
}
25+
26+
@Override
27+
public int compareTo(Delayed o) {
28+
if (this.expireTime < ((DelayedTask) o).expireTime) {
29+
return -1;
30+
}
31+
if (this.expireTime > ((DelayedTask) o).expireTime) {
32+
return 1;
33+
}
34+
return 0;
35+
}
36+
37+
public void execute() {
38+
System.out.println("Executing task: " + name + " at " + System.currentTimeMillis());
39+
}
40+
}
41+
42+
public class DelayQueueExample {
43+
public static void main(String[] args) {
44+
DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
45+
ExecutorService executor = Executors.newFixedThreadPool(1);
46+
47+
// Add tasks to the delay queue
48+
delayQueue.put(new DelayedTask("Task1", 5000)); // 5 seconds delay
49+
delayQueue.put(new DelayedTask("Task2", 3000)); // 3 seconds delay
50+
delayQueue.put(new DelayedTask("Task3", 10000)); // 10 seconds delay
51+
52+
// Consumer thread to execute tasks after their delay
53+
executor.submit(() -> {
54+
while (!Thread.currentThread().isInterrupted()) {
55+
try {
56+
DelayedTask task = delayQueue.take(); // Blocks until a task's delay has expired
57+
task.execute();
58+
} catch (InterruptedException e) {
59+
Thread.currentThread().interrupt();
60+
}
61+
}
62+
});
63+
64+
// For demonstration, we'll shut down the executor after some time
65+
try {
66+
Thread.sleep(15000); // Let it run for 15 seconds
67+
} catch (InterruptedException e) {
68+
Thread.currentThread().interrupt();
69+
}
70+
71+
executor.shutdownNow();
72+
}
73+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package org.ashfaq.dev.concepts.Concurrent_Collections;
2+
3+
import java.util.concurrent.BlockingQueue;
4+
import java.util.concurrent.PriorityBlockingQueue;
5+
6+
class FirstWorker implements Runnable {
7+
8+
private BlockingQueue<String> queue;
9+
10+
public FirstWorker(BlockingQueue<String> queue) {
11+
this.queue = queue;
12+
}
13+
14+
@Override
15+
public void run() {
16+
try {
17+
18+
queue.put("A");
19+
queue.put("B");
20+
queue.put("C");
21+
Thread.sleep(2000);
22+
queue.put("D");
23+
Thread.sleep(2000);
24+
queue.put("E");
25+
queue.put("F");
26+
27+
} catch (InterruptedException e) {
28+
}
29+
}
30+
}
31+
32+
class SecondWorker implements Runnable {
33+
34+
private BlockingQueue<String> queue;
35+
36+
public SecondWorker(BlockingQueue<String> queue) {
37+
this.queue = queue;
38+
}
39+
40+
@Override
41+
public void run() {
42+
try {
43+
44+
Thread.sleep(5000);
45+
System.out.println(queue.take());
46+
Thread.sleep(2000);
47+
System.out.println(queue.take());
48+
Thread.sleep(2000);
49+
System.out.println(queue.take());
50+
System.out.println(queue.take());
51+
System.out.println(queue.take());
52+
System.out.println(queue.take());
53+
54+
} catch (InterruptedException e) {
55+
}
56+
}
57+
}
58+
59+
60+
public class PriorityQueue {
61+
62+
public static void main(String[] args) {
63+
64+
BlockingQueue<String> queue = new PriorityBlockingQueue<>();
65+
FirstWorker firstWorker = new FirstWorker(queue);
66+
SecondWorker secondWorker = new SecondWorker(queue);
67+
68+
Thread t1 = new Thread(firstWorker);
69+
Thread t2 = new Thread(secondWorker);
70+
71+
t1.start();
72+
t2.start();
73+
74+
75+
76+
77+
78+
79+
80+
}
81+
82+
}

0 commit comments

Comments
 (0)