Skip to content

Updated all Dependencies #678

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import java.net.URI

plugins {
id("java-library")
id("com.github.johnrengelman.shadow")
id("biz.aQute.bnd.builder")
id("com.github.johnrengelman.shadow") version "8.1.1"
id("biz.aQute.bnd.builder") version "6.4.0"
id("maven-publish")
id("io.github.gradle-nexus.publish-plugin")
id("signing")
id("com.github.hierynomus.license")
id("com.github.hierynomus.license") version "0.16.1"
id("pmd")
id("com.github.sgtsilvio.gradle.utf8")
id("com.github.sgtsilvio.gradle.metadata")
Expand Down Expand Up @@ -54,8 +56,8 @@ allprojects {
allprojects {
plugins.withId("java") {
java {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
}

plugins.apply("com.github.sgtsilvio.gradle.utf8")
Expand All @@ -66,7 +68,7 @@ allprojects {
/* ******************** dependencies ******************** */

dependencies {
api("io.reactivex.rxjava2:rxjava:${property("rxjava.version")}")
api("io.reactivex.rxjava3:rxjava:${property("rxjava.version")}")
api("org.reactivestreams:reactive-streams:${property("reactive-streams.version")}")

implementation("io.netty:netty-buffer:${property("netty.version")}")
Expand All @@ -77,6 +79,7 @@ dependencies {
implementation("org.jctools:jctools-core:${property("jctools.version")}")
implementation("org.jetbrains:annotations:${property("annotations.version")}")
implementation("com.google.dagger:dagger:${property("dagger.version")}")
implementation("io.projectreactor:reactor-core:${property("reactor-core.version")}")//3.5.7

compileOnly("org.slf4j:slf4j-api:${property("slf4j.version")}")

Expand Down Expand Up @@ -122,8 +125,8 @@ dependencies {
testImplementation("nl.jqno.equalsverifier:equalsverifier:${property("equalsverifier.version")}")
testImplementation("org.mockito:mockito-core:${property("mockito.version")}")
testImplementation("com.google.guava:guava:${property("guava.version")}")
testImplementation("org.bouncycastle:bcprov-jdk15on:${property("bouncycastle.version")}")
testImplementation("org.bouncycastle:bcpkix-jdk15on:${property("bouncycastle.version")}")
testImplementation("org.bouncycastle:bcprov-jdk18on:${property("bouncycastle.version")}")
testImplementation("org.bouncycastle:bcpkix-jdk18on:${property("bouncycastle.version")}")
testImplementation("org.eclipse.paho:org.eclipse.paho.client.mqttv3:${property("paho.version")}")
testRuntimeOnly("org.slf4j:slf4j-simple:${property("slf4j.version")}")
}
Expand All @@ -143,7 +146,9 @@ val integrationTestRuntimeOnly: Configuration by configurations.getting {
}

dependencies {
integrationTestImplementation("com.hivemq:hivemq-testcontainer-junit5:${property("hivemq-testcontainer.version")}")
integrationTestImplementation("org.testcontainers:testcontainers:${property("hivemq-testcontainer.version")}")
integrationTestImplementation("org.testcontainers:hivemq:${property("hivemq-testcontainer.version")}")

integrationTestImplementation("com.hivemq:hivemq-extension-sdk:${property("hivemq-extension-sdk.version")}")
integrationTestImplementation("org.awaitility:awaitility:${property("awaitility.version")}")
}
Expand All @@ -166,7 +171,9 @@ val integrationTest by tasks.registering(Test::class) {
})
}

tasks.check { dependsOn(integrationTest) }
tasks.check {

dependsOn(integrationTest) }

/* ******************** jars ******************** */

Expand Down Expand Up @@ -324,7 +331,8 @@ allprojects {
license {
header = rootDir.resolve("HEADER")
mapping("java", "SLASHSTAR_STYLE")
}
headerURI = URI("https://raw.githubusercontent.com/hivemq/hivemq-mqtt-client/refs/heads/master/HEADER")
}
}

allprojects {
Expand All @@ -339,8 +347,8 @@ allprojects {
}
}

apply("$rootDir/gradle/japicc.gradle.kts")

apply("$rootDir/gradle/japicc.gradle.kts")

/* ******************** build cache ******************** */

Expand Down
19 changes: 10 additions & 9 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,36 @@ prevVersion=1.3.4
#
# main dependencies
#
rxjava.version=2.2.21
rxjava.version=3.1.10
reactive-streams.version=1.0.4
netty.version=4.1.119.Final
jctools.version=2.1.2
jctools.version=4.0.5
annotations.version=26.0.2
dagger.version=2.27
dagger.version=2.56.1
slf4j.version=1.7.36
reactor.version=3.3.4.RELEASE
reactor-adapter.version=3.3.3.RELEASE
reactor-core.version=3.5.7
#
# test dependencies
#
junit-jupiter.version=5.5.2
junit-jupiter.version=5.8.1
equalsverifier.version=3.17.5
mockito.version=2.28.2
guava.version=24.1.1-jre
bouncycastle.version=1.59
guava.version=33.3.1-jre
bouncycastle.version=1.80
paho.version=1.2.5
#
# integration test dependencies
#
hivemq-testcontainer.version=2.0.0
hivemq-extension-sdk.version=4.7.2
hivemq-testcontainer.version=1.20.6
hivemq-extension-sdk.version=4.38.0
awaitility.version=4.2.2
#
# plugins
#
plugin.shadow.version=6.1.0
plugin.bnd.version=5.3.0
plugin.bnd.version=6.4.0
plugin.nexus-publish.version=1.3.0
plugin.license.version=0.15.0
plugin.utf8.version=0.1.0
Expand Down
3 changes: 2 additions & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#Fri Apr 11 18:00:08 CEST 2025
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.8.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@
import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3Unsubscribe;
import com.hivemq.client.mqtt.mqtt3.reactor.Mqtt3ReactorClient;
import com.hivemq.client.rx.reactor.FluxWithSingle;
import io.reactivex.Flowable;
import io.reactivex.rxjava3.core.Flowable;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import reactor.adapter.rxjava.RxJava2Adapter;

import reactor.adapter.rxjava.RxJava3Adapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -59,7 +60,7 @@ public Mqtt3ReactorClientView(final @NotNull Mqtt3RxClient delegate) {

@Override
public @NotNull Mono<Mqtt3ConnAck> connect(final @NotNull Mqtt3Connect connect) {
return RxJava2Adapter.singleToMono(delegate.connect(connect));
return RxJava3Adapter.singleToMono(delegate.connect(connect));
}

@Override
Expand All @@ -69,7 +70,7 @@ public Mqtt3ReactorClientView(final @NotNull Mqtt3RxClient delegate) {

@Override
public @NotNull Mono<Mqtt3SubAck> subscribe(final @NotNull Mqtt3Subscribe subscribe) {
return RxJava2Adapter.singleToMono(delegate.subscribe(subscribe));
return RxJava3Adapter.singleToMono(delegate.subscribe(subscribe));
}

@Override
Expand Down Expand Up @@ -105,12 +106,12 @@ public Mqtt3ReactorClientView(final @NotNull Mqtt3RxClient delegate) {
public @NotNull Flux<Mqtt3Publish> publishes(
final @NotNull MqttGlobalPublishFilter filter, final boolean manualAcknowledgement) {

return RxJava2Adapter.flowableToFlux(delegate.publishes(filter, manualAcknowledgement));
return RxJava3Adapter.flowableToFlux(delegate.publishes(filter, manualAcknowledgement));
}

@Override
public @NotNull Mono<Void> unsubscribe(final @NotNull Mqtt3Unsubscribe unsubscribe) {
return RxJava2Adapter.completableToMono(delegate.unsubscribe(unsubscribe));
return RxJava3Adapter.completableToMono(delegate.unsubscribe(unsubscribe));
}

@Override
Expand All @@ -120,12 +121,12 @@ public Mqtt3ReactorClientView(final @NotNull Mqtt3RxClient delegate) {

@Override
public @NotNull Flux<Mqtt3PublishResult> publish(final @NotNull Publisher<Mqtt3Publish> publisher) {
return RxJava2Adapter.flowableToFlux(delegate.publish(Flowable.fromPublisher(publisher)));
return RxJava3Adapter.flowableToFlux(delegate.publish(Flowable.fromPublisher(publisher)));
}

@Override
public @NotNull Mono<Void> disconnect() {
return RxJava2Adapter.completableToMono(delegate.disconnect());
return RxJava3Adapter.completableToMono(delegate.disconnect());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck;
import com.hivemq.client.mqtt.mqtt5.reactor.Mqtt5ReactorClient;
import com.hivemq.client.rx.reactor.FluxWithSingle;
import io.reactivex.Flowable;

import io.reactivex.rxjava3.core.Flowable;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Publisher;
import reactor.adapter.rxjava.RxJava2Adapter;

import reactor.adapter.rxjava.RxJava3Adapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -63,7 +65,7 @@ public MqttReactorClient(final @NotNull Mqtt5RxClient delegate) {

@Override
public @NotNull Mono<Mqtt5ConnAck> connect(final @NotNull Mqtt5Connect connect) {
return RxJava2Adapter.singleToMono(delegate.connect(connect));
return RxJava3Adapter.singleToMono(delegate.connect(connect));
}

@Override
Expand All @@ -73,7 +75,7 @@ public MqttReactorClient(final @NotNull Mqtt5RxClient delegate) {

@Override
public @NotNull Mono<Mqtt5SubAck> subscribe(final @NotNull Mqtt5Subscribe subscribe) {
return RxJava2Adapter.singleToMono(delegate.subscribe(subscribe));
return RxJava3Adapter.singleToMono(delegate.subscribe(subscribe));
}

@Override
Expand Down Expand Up @@ -109,12 +111,12 @@ public MqttReactorClient(final @NotNull Mqtt5RxClient delegate) {
public @NotNull Flux<Mqtt5Publish> publishes(
final @NotNull MqttGlobalPublishFilter filter, final boolean manualAcknowledgement) {

return RxJava2Adapter.flowableToFlux(delegate.publishes(filter, manualAcknowledgement));
return RxJava3Adapter.flowableToFlux(delegate.publishes(filter, manualAcknowledgement));
}

@Override
public @NotNull Mono<Mqtt5UnsubAck> unsubscribe(final @NotNull Mqtt5Unsubscribe unsubscribe) {
return RxJava2Adapter.singleToMono(delegate.unsubscribe(unsubscribe));
return RxJava3Adapter.singleToMono(delegate.unsubscribe(unsubscribe));
}

@Override
Expand All @@ -124,12 +126,12 @@ public MqttReactorClient(final @NotNull Mqtt5RxClient delegate) {

@Override
public @NotNull Flux<Mqtt5PublishResult> publish(final @NotNull Publisher<Mqtt5Publish> publisher) {
return RxJava2Adapter.flowableToFlux(delegate.publish(Flowable.fromPublisher(publisher)));
return RxJava3Adapter.flowableToFlux(delegate.publish(Flowable.fromPublisher(publisher)));
}

@Override
public @NotNull Mono<Void> reauth() {
return RxJava2Adapter.completableToMono(delegate.reauth());
return RxJava3Adapter.completableToMono(delegate.reauth());
}

@Override
Expand All @@ -139,7 +141,7 @@ public MqttReactorClient(final @NotNull Mqtt5RxClient delegate) {

@Override
public @NotNull Mono<Void> disconnect(final @NotNull Mqtt5Disconnect disconnect) {
return RxJava2Adapter.completableToMono(delegate.disconnect(disconnect));
return RxJava3Adapter.completableToMono(delegate.disconnect(disconnect));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,29 @@
import com.hivemq.extension.sdk.api.parameter.ExtensionStopOutput;
import com.hivemq.extension.sdk.api.services.Services;
import com.hivemq.extension.sdk.api.services.intializer.ClientInitializer;
import com.hivemq.testcontainer.core.HiveMQExtension;
import com.hivemq.testcontainer.junit5.HiveMQTestContainerExtension;

import org.jetbrains.annotations.NotNull;
import org.junit.Rule;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.hivemq.HiveMQContainer;
import org.testcontainers.hivemq.HiveMQExtension;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

/**
* @author Yannick Weber
*/
public class Mqtt3SendMaximumIT {

class Mqtt3SendMaximumIT {

public static final int RECEIVE_MAXIMUM = 10;
public static final @NotNull HiveMQExtension NO_PUBACK_EXTENSION = HiveMQExtension.builder()
Expand All @@ -61,14 +66,53 @@ public class Mqtt3SendMaximumIT {
.mainClass(NoPubackExtension.class)
.build();

@RegisterExtension
public final @NotNull HiveMQTestContainerExtension hivemq =
new HiveMQTestContainerExtension().withExtension(NO_PUBACK_EXTENSION)
.withHiveMQConfig(MountableFile.forClasspathResource("/config.xml"));
@Rule
public HiveMQContainer hivemq = new HiveMQContainer(DockerImageName.parse("hivemq/hivemq-ce")
.withTag("2021.3"))
.withExposedPorts(1883)
.withExtension(NO_PUBACK_EXTENSION)
.withHiveMQConfig(MountableFile.forClasspathResource("/config.xml"));


public void StartContainer(){
hivemq.start();
hivemq.setExposedPorts(Collections.singletonList(hivemq.getMqttPort()));
}

@Test
void mqtt3_sendMaximum_applied() throws InterruptedException {
public void test_mqtt() throws Exception {
StartContainer();
var publisher = Mqtt5Client.builder()
.serverPort(hivemq.getMqttPort()) // 3
.serverHost(hivemq.getHost())
.identifier("publisher")
.buildBlocking();

publisher.connect();

var subscriber = Mqtt5Client.builder()
.serverPort(hivemq.getMqttPort()) // 3
.serverHost(hivemq.getHost())
.identifier("subscriber")
.buildBlocking();

var publishes = subscriber.publishes(MqttGlobalPublishFilter.ALL);
subscriber.connect();
subscriber.subscribeWith().topicFilter("topic/test").send();

publisher.publishWith()
.topic("topic/test")
.payload("Hello World!".getBytes()).send();

var receive = publishes.receive();

assertNotNull(receive); // 4
assertEquals("Hello World!", new String(receive.getPayloadAsBytes())); // 4
}

@Test
void mqtt3_sendMaximum_applied() throws InterruptedException {
StartContainer();
final Mqtt3Client publisher = Mqtt3Client.builder().serverPort(hivemq.getMqttPort()).build();
publisher.toBlocking().connectWith().restrictions().sendMaximum(RECEIVE_MAXIMUM).applyRestrictions().send();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe;
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck;
import io.reactivex.FlowableSubscriber;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.reactivestreams.Subscription;
Expand Down
Loading