From d556b2fe2e8adcc0c18ebb4ed4a197100c393826 Mon Sep 17 00:00:00 2001 From: William Chong Date: Wed, 20 Aug 2025 11:34:21 +0000 Subject: [PATCH] feat: add support for PinnedByCorrelation consumer strategy in persistent subscriptions --- .../AbstractCreatePersistentSubscription.java | 12 +++- .../dbclient/NamedConsumerStrategy.java | 13 ++++ ...PersistentSubscriptionManagementTests.java | 65 +++++++++++++++++-- 3 files changed, 84 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/kurrent/dbclient/AbstractCreatePersistentSubscription.java b/src/main/java/io/kurrent/dbclient/AbstractCreatePersistentSubscription.java index a7d24ade..4e9b1dd0 100644 --- a/src/main/java/io/kurrent/dbclient/AbstractCreatePersistentSubscription.java +++ b/src/main/java/io/kurrent/dbclient/AbstractCreatePersistentSubscription.java @@ -5,6 +5,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; import java.util.concurrent.CompletableFuture; abstract class AbstractCreatePersistentSubscription { @@ -31,6 +32,8 @@ protected Persistent.CreateReq.Settings.Builder createSettings(){ @SuppressWarnings({"unchecked", "deprecation"}) public CompletableFuture execute() { return this.client.runWithArgs(args -> { + Optional serverVersion = args.getServerVersion(); + CompletableFuture result = new CompletableFuture(); PersistentSubscriptionsGrpc.PersistentSubscriptionsStub client = GrpcUtils.configureStub(PersistentSubscriptionsGrpc.newStub(args.getChannel()), this.client.getSettings(), this.options); @@ -56,8 +59,15 @@ public CompletableFuture execute() { settingsBuilder.setNamedConsumerStrategy(Persistent.CreateReq.ConsumerStrategy.RoundRobin); } else if (settings.getNamedConsumerStrategy().isPinned()) { settingsBuilder.setNamedConsumerStrategy(Persistent.CreateReq.ConsumerStrategy.Pinned); + } else if (settings.getNamedConsumerStrategy().isPinnedByCorrelation()) { + if (serverVersion.get().isGreaterThan(21, 10, 0)) { + settingsBuilder.setConsumerStrategy(settings.getNamedConsumerStrategy().toString()); + } else { + logger.error("Consumer strategy: '{}' is only available on server 21.10.1 and above", NamedConsumerStrategy.PINNED_BY_CORRELATION); + throw new UnsupportedFeatureException(); + } } else { - logger.error(String.format("Unsupported named consumer strategy: '%s'", settings.getNamedConsumerStrategy().toString())); + logger.error("Unsupported named consumer strategy: '{}'", settings.getNamedConsumerStrategy().toString()); throw new UnsupportedFeatureException(); } diff --git a/src/main/java/io/kurrent/dbclient/NamedConsumerStrategy.java b/src/main/java/io/kurrent/dbclient/NamedConsumerStrategy.java index 78275d92..38f8aaa7 100644 --- a/src/main/java/io/kurrent/dbclient/NamedConsumerStrategy.java +++ b/src/main/java/io/kurrent/dbclient/NamedConsumerStrategy.java @@ -28,6 +28,11 @@ public class NamedConsumerStrategy { */ public static final NamedConsumerStrategy PINNED = new NamedConsumerStrategy("Pinned"); + /** + * This is similar to the Pinned strategy, but instead of using the source stream id to bucket the messages, it distributes the events based on the event's correlationId. + */ + public static final NamedConsumerStrategy PINNED_BY_CORRELATION = new NamedConsumerStrategy("PinnedByCorrelation"); + NamedConsumerStrategy(String value) { this.value = value; } @@ -53,6 +58,14 @@ public boolean isPinned() { return isNamed("Pinned"); } + + /** + * Checks if it's a PinnedByCorrelation strategy. + */ + public boolean isPinnedByCorrelation() { + return isNamed("PinnedByCorrelation"); + } + /** * Checks if the strategy's name matches the string passed as a parameter. */ diff --git a/src/test/java/io/kurrent/dbclient/persistentsubscriptions/PersistentSubscriptionManagementTests.java b/src/test/java/io/kurrent/dbclient/persistentsubscriptions/PersistentSubscriptionManagementTests.java index 0fe9ce63..c87a2d23 100644 --- a/src/test/java/io/kurrent/dbclient/persistentsubscriptions/PersistentSubscriptionManagementTests.java +++ b/src/test/java/io/kurrent/dbclient/persistentsubscriptions/PersistentSubscriptionManagementTests.java @@ -3,11 +3,15 @@ import io.kurrent.dbclient.*; import com.fasterxml.jackson.databind.json.JsonMapper; import org.junit.jupiter.api.*; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.*; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @SuppressWarnings("unchecked") @@ -29,7 +33,7 @@ default void testListPersistentSubscriptions() throws Throwable { List subs = client.listAll().get(); int count = 0; - for (PersistentSubscriptionInfo info: subs) { + for (PersistentSubscriptionInfo info : subs) { if (info.getEventSource().equals(streamA) || info.getEventSource().equals(streamB)) { count++; } @@ -122,7 +126,7 @@ default void testGetPersistentSubscriptionInfoToAll() throws Throwable { @Test @Order(6) - default void testGetPersistentSubscriptionInfoNotExisting() throws Throwable { + default void testGetPersistentSubscriptionInfoNotExisting() throws Throwable { KurrentDBPersistentSubscriptionsClient client = getDefaultPersistentSubscriptionClient(); Optional result = client.getInfoToStream(generateName(), generateName()).get(); @@ -147,6 +151,7 @@ default void testReplayParkedMessages() throws Throwable { client.subscribeToStream(streamName, groupName, new PersistentSubscriptionListener() { int count = 0; + @Override public void onEvent(PersistentSubscription subscription, int retryCount, ResolvedEvent event) { if (count < 2) @@ -206,6 +211,7 @@ default void testReplayParkedMessagesToAll() throws Throwable { client.subscribeToAll(groupName, new PersistentSubscriptionListener() { int count = 0; + @Override public void onEvent(PersistentSubscription subscription, int retryCount, ResolvedEvent event) { if (count < 2 && event.getOriginalEvent().getStreamId().equals(streamName)) @@ -268,9 +274,9 @@ default void testEncoding() throws Throwable { break; } - Assertions.assertTrue(info.isPresent()); - Assertions.assertEquals(info.get().getEventSource(), streamName); - Assertions.assertEquals(info.get().getGroupName(), groupName); + Assertions.assertTrue(info.isPresent()); + Assertions.assertEquals(info.get().getEventSource(), streamName); + Assertions.assertEquals(info.get().getGroupName(), groupName); } @Test @@ -279,4 +285,53 @@ default void testRestartSubsystem() throws Throwable { KurrentDBPersistentSubscriptionsClient client = getDefaultPersistentSubscriptionClient(); client.restartSubsystem().get(); } + + @ParameterizedTest + @ArgumentsSource(NamedConsumerStrategyProvider.class) + default void testCreatePersistentSubscriptionToAllWithConsumerStrategies(NamedConsumerStrategy strategy) throws Throwable { + KurrentDBPersistentSubscriptionsClient client = getDefaultPersistentSubscriptionClient(); + String groupName = String.format("/foo/%s/group", generateName()); + + CreatePersistentSubscriptionToAllOptions options = CreatePersistentSubscriptionToAllOptions.get() + .namedConsumerStrategy(strategy); + + client.createToAll(groupName, options).get(); + + Optional result = client.getInfoToAll(groupName).get(); + Assertions.assertTrue(result.isPresent(), "Subscription should be created"); + + Assertions.assertEquals(groupName, result.get().getGroupName()); + Assertions.assertEquals(strategy.toString(), result.get().getSettings().getNamedConsumerStrategy().toString()); + } + + @ParameterizedTest + @ArgumentsSource(NamedConsumerStrategyProvider.class) + default void testCreatePersistentSubscriptionToStreamWithConsumerStrategies(NamedConsumerStrategy strategy) throws Throwable { + KurrentDBPersistentSubscriptionsClient client = getDefaultPersistentSubscriptionClient(); + String streamName = String.format("/foo/%s/stream", generateName()); + String groupName = String.format("/foo/%s/group", generateName()); + + CreatePersistentSubscriptionToStreamOptions options = CreatePersistentSubscriptionToStreamOptions.get() + .namedConsumerStrategy(strategy); + + client.createToStream(streamName, groupName, options).get(); + + Optional result = client.getInfoToStream(streamName, groupName).get(); + Assertions.assertTrue(result.isPresent(), "Subscription should be created"); + + Assertions.assertEquals(groupName, result.get().getGroupName()); + Assertions.assertEquals(strategy.toString(), result.get().getSettings().getNamedConsumerStrategy().toString()); + } } + +class NamedConsumerStrategyProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(ExtensionContext context) { + return Stream.of( + Arguments.of(NamedConsumerStrategy.DISPATCH_TO_SINGLE), + Arguments.of(NamedConsumerStrategy.ROUND_ROBIN), + Arguments.of(NamedConsumerStrategy.PINNED), + Arguments.of(NamedConsumerStrategy.PINNED_BY_CORRELATION) + ); + } +} \ No newline at end of file