diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 92ce26e8..42691558 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -20,7 +20,7 @@ jobs: strategy: fail-fast: false matrix: - test: [Streams, PersistentSubscriptions, Telemetry, MultiStreamAppend] + test: [Streams, PersistentSubscriptions, Telemetry, MultiStreamAppend, Connection] runs-on: ubuntu-latest steps: diff --git a/build.gradle b/build.gradle index 5b51f211..d96972a3 100644 --- a/build.gradle +++ b/build.gradle @@ -132,6 +132,7 @@ tasks.register("singleNodeTests", Test) { include("**/PersistentSubscriptionsTests.class") include("**/TelemetryTests.class") include("**/ConnectionShutdownTests.class") + include("**/ConnectionTests.class") } } @@ -155,6 +156,7 @@ tasks.register("clusterTests", Test) { include("**/StreamsTests.class") include("**/PersistentSubscriptionsTests.class") include("**/ConnectionShutdownTests.class") + include("**/ConnectionTests.class") } } diff --git a/src/test/java/io/kurrent/dbclient/Main.java b/src/test/java/io/kurrent/dbclient/Main.java new file mode 100644 index 00000000..61d7eadc --- /dev/null +++ b/src/test/java/io/kurrent/dbclient/Main.java @@ -0,0 +1,57 @@ +package io.kurrent.dbclient; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; + +public class Main { + public static void main(String[] args) throws InterruptedException, ExecutionException { + System.setProperty("org.slf4j.simpleLogger.log.io.kurrent.dbclient", "trace"); + System.setProperty("org.slf4j.simpleLogger.log.io.netty", "trace"); + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.dateTimeFormat", "HH:mm:ss.SSS"); + + KurrentDBClientSettings settings = KurrentDBConnectionString.parseOrThrow("kurrentdb://localhost:2113?tls=false"); + KurrentDBClient client = KurrentDBClient.create(settings); + + ReadAllOptions options = ReadAllOptions.get() + .forwards() + .fromStart(); + + Publisher publisher = client.readAllReactive(options); + + final CountDownLatch latch = new CountDownLatch(1); + publisher.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription subscription) { + } + + @Override + public void onNext(ReadMessage readMessage) { + RecordedEvent event = readMessage.getEvent().getOriginalEvent(); + + if (!event.getEventType().startsWith("$")) { + System.out.println("Event: " + event.getEventType()); + } + } + + @Override + public void onError(Throwable throwable) { + System.out.println("Error type: " + throwable.getClass().getSimpleName()); + System.out.println("Error message: " + throwable.getMessage()); + latch.countDown(); + } + + @Override + public void onComplete() { + latch.countDown(); + } + }); + + latch.await(); + } +} \ No newline at end of file diff --git a/src/test/java/io/kurrent/dbclient/connection/ConnectionShutdownTests.java b/src/test/java/io/kurrent/dbclient/connection/ConnectionShutdownTests.java index 0afe060e..57c98461 100644 --- a/src/test/java/io/kurrent/dbclient/connection/ConnectionShutdownTests.java +++ b/src/test/java/io/kurrent/dbclient/connection/ConnectionShutdownTests.java @@ -1,17 +1,55 @@ package io.kurrent.dbclient.connection; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.kurrent.dbclient.*; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; +import io.kurrent.dbclient.databases.DockerContainerDatabase; +import org.junit.jupiter.api.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; public class ConnectionShutdownTests { + static private DockerContainerDatabase database; + static private Logger logger; + + @BeforeEach + public void setup() { + database = (DockerContainerDatabase) DatabaseFactory.spawn(); + logger = LoggerFactory.getLogger(PersistentSubscriptionsTests.class); + } + + @AfterEach + public void cleanup() { + unpauseDatabase(); + database.dispose(); + } + + @Test + @Timeout(value = 30, unit = TimeUnit.SECONDS) + public void testCallTerminationWhenServerUnreachable() throws Throwable { + KurrentDBClient client = database.defaultClient(); + + ReadResult initialResult = client.readAll(ReadAllOptions.get()).get(5, TimeUnit.SECONDS); + + Assertions.assertFalse(initialResult.getEvents().isEmpty()); + + pauseDatabase(); + + ExecutionException e = Assertions.assertThrows(ExecutionException.class, () -> + client.readAll().get(30, TimeUnit.SECONDS) + ); + + StatusRuntimeException status = (StatusRuntimeException) e.getCause(); + Assertions.assertEquals(Status.Code.UNAVAILABLE, status.getStatus().getCode()); + } + @Test @Timeout(value = 1, unit = TimeUnit.MINUTES) public void testDatabaseCleanupWithActiveSubscription() throws Throwable { @@ -59,4 +97,14 @@ public void onCancelled(Subscription subscription, Throwable throwable) { Throwable ex = reconnectError.get(); Assertions.assertInstanceOf(ConnectionShutdownException.class, ex.getCause()); } + + static void pauseDatabase() { + logger.debug("Pausing database container: {}", database.getContainerId()); + database.getDockerClient().pauseContainerCmd(database.getContainerId()).exec(); + } + + static void unpauseDatabase() { + logger.debug("Unpausing database container: {}", database.getContainerId()); + database.getDockerClient().unpauseContainerCmd(database.getContainerId()).exec(); + } } diff --git a/src/test/java/io/kurrent/dbclient/connection/ConnectionTests.java b/src/test/java/io/kurrent/dbclient/connection/ConnectionTests.java new file mode 100644 index 00000000..511b0ee6 --- /dev/null +++ b/src/test/java/io/kurrent/dbclient/connection/ConnectionTests.java @@ -0,0 +1,58 @@ +package io.kurrent.dbclient.connection; + +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.kurrent.dbclient.*; +import io.kurrent.dbclient.databases.DockerContainerDatabase; +import org.junit.jupiter.api.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +public class ConnectionTests { + static private DockerContainerDatabase database; + static private Logger logger; + + @BeforeEach + public void setup() { + database = (DockerContainerDatabase) DatabaseFactory.spawn(); + logger = LoggerFactory.getLogger(PersistentSubscriptionsTests.class); + } + + @AfterEach + public void cleanup() { + unpauseDatabase(); + database.dispose(); + } + + @Test + @Timeout(value = 30, unit = TimeUnit.SECONDS) + public void testCallTerminationWhenServerUnreachable() throws Throwable { + KurrentDBClient client = database.defaultClient(); + + ReadResult initialResult = client.readAll(ReadAllOptions.get()).get(5, TimeUnit.SECONDS); + + Assertions.assertFalse(initialResult.getEvents().isEmpty()); + + pauseDatabase(); + + ExecutionException e = Assertions.assertThrows(ExecutionException.class, () -> + client.readAll().get(30, TimeUnit.SECONDS) + ); + + StatusRuntimeException status = (StatusRuntimeException) e.getCause(); + Assertions.assertEquals(Status.Code.UNAVAILABLE, status.getStatus().getCode()); + } + + static void pauseDatabase() { + logger.debug("Pausing database container: {}", database.getContainerId()); + database.getDockerClient().pauseContainerCmd(database.getContainerId()).exec(); + } + + static void unpauseDatabase() { + logger.debug("Unpausing database container: {}", database.getContainerId()); + database.getDockerClient().unpauseContainerCmd(database.getContainerId()).exec(); + } +}