Skip to content

[DEV-865] Add support for PinnedByCorrelation consumer strategy in persistent subscription #341

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

abstract class AbstractCreatePersistentSubscription<TPos, TSettings extends PersistentSubscriptionSettings> {
Expand All @@ -31,6 +32,8 @@ protected Persistent.CreateReq.Settings.Builder createSettings(){
@SuppressWarnings({"unchecked", "deprecation"})
public CompletableFuture execute() {
return this.client.runWithArgs(args -> {
Optional<ServerVersion> serverVersion = args.getServerVersion();

CompletableFuture result = new CompletableFuture();
PersistentSubscriptionsGrpc.PersistentSubscriptionsStub client =
GrpcUtils.configureStub(PersistentSubscriptionsGrpc.newStub(args.getChannel()), this.client.getSettings(), this.options);
Expand All @@ -56,8 +59,15 @@ public CompletableFuture execute() {
settingsBuilder.setNamedConsumerStrategy(Persistent.CreateReq.ConsumerStrategy.RoundRobin);
} else if (settings.getNamedConsumerStrategy().isPinned()) {
settingsBuilder.setNamedConsumerStrategy(Persistent.CreateReq.ConsumerStrategy.Pinned);
} else if (settings.getNamedConsumerStrategy().isPinnedByCorrelation()) {
if (serverVersion.get().isGreaterThan(21, 10, 0)) {
settingsBuilder.setConsumerStrategy(settings.getNamedConsumerStrategy().toString());
} else {
logger.error("Consumer strategy: '{}' is only available on server 21.10.1 and above", NamedConsumerStrategy.PINNED_BY_CORRELATION);
throw new UnsupportedFeatureException();
}
} else {
logger.error(String.format("Unsupported named consumer strategy: '%s'", settings.getNamedConsumerStrategy().toString()));
logger.error("Unsupported named consumer strategy: '{}'", settings.getNamedConsumerStrategy().toString());
throw new UnsupportedFeatureException();
}

Expand Down
13 changes: 13 additions & 0 deletions src/main/java/io/kurrent/dbclient/NamedConsumerStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public class NamedConsumerStrategy {
*/
public static final NamedConsumerStrategy PINNED = new NamedConsumerStrategy("Pinned");

/**
* This is similar to the Pinned strategy, but instead of using the source stream id to bucket the messages, it distributes the events based on the event's correlationId.
*/
public static final NamedConsumerStrategy PINNED_BY_CORRELATION = new NamedConsumerStrategy("PinnedByCorrelation");

NamedConsumerStrategy(String value) {
this.value = value;
}
Expand All @@ -53,6 +58,14 @@ public boolean isPinned() {
return isNamed("Pinned");
}


/**
* Checks if it's a <i>PinnedByCorrelation</i> strategy.
*/
public boolean isPinnedByCorrelation() {
return isNamed("PinnedByCorrelation");
}

/**
* Checks if the strategy's name matches the string passed as a parameter.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
import io.kurrent.dbclient.*;
import com.fasterxml.jackson.databind.json.JsonMapper;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.*;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@SuppressWarnings("unchecked")
Expand All @@ -29,7 +33,7 @@ default void testListPersistentSubscriptions() throws Throwable {
List<PersistentSubscriptionInfo> subs = client.listAll().get();

int count = 0;
for (PersistentSubscriptionInfo info: subs) {
for (PersistentSubscriptionInfo info : subs) {
if (info.getEventSource().equals(streamA) || info.getEventSource().equals(streamB)) {
count++;
}
Expand Down Expand Up @@ -122,7 +126,7 @@ default void testGetPersistentSubscriptionInfoToAll() throws Throwable {

@Test
@Order(6)
default void testGetPersistentSubscriptionInfoNotExisting() throws Throwable {
default void testGetPersistentSubscriptionInfoNotExisting() throws Throwable {
KurrentDBPersistentSubscriptionsClient client = getDefaultPersistentSubscriptionClient();
Optional<PersistentSubscriptionToStreamInfo> result = client.getInfoToStream(generateName(), generateName()).get();

Expand All @@ -147,6 +151,7 @@ default void testReplayParkedMessages() throws Throwable {

client.subscribeToStream(streamName, groupName, new PersistentSubscriptionListener() {
int count = 0;

@Override
public void onEvent(PersistentSubscription subscription, int retryCount, ResolvedEvent event) {
if (count < 2)
Expand Down Expand Up @@ -206,6 +211,7 @@ default void testReplayParkedMessagesToAll() throws Throwable {

client.subscribeToAll(groupName, new PersistentSubscriptionListener() {
int count = 0;

@Override
public void onEvent(PersistentSubscription subscription, int retryCount, ResolvedEvent event) {
if (count < 2 && event.getOriginalEvent().getStreamId().equals(streamName))
Expand Down Expand Up @@ -268,9 +274,9 @@ default void testEncoding() throws Throwable {
break;
}

Assertions.assertTrue(info.isPresent());
Assertions.assertEquals(info.get().getEventSource(), streamName);
Assertions.assertEquals(info.get().getGroupName(), groupName);
Assertions.assertTrue(info.isPresent());
Assertions.assertEquals(info.get().getEventSource(), streamName);
Assertions.assertEquals(info.get().getGroupName(), groupName);
}

@Test
Expand All @@ -279,4 +285,53 @@ default void testRestartSubsystem() throws Throwable {
KurrentDBPersistentSubscriptionsClient client = getDefaultPersistentSubscriptionClient();
client.restartSubsystem().get();
}

@ParameterizedTest
@ArgumentsSource(NamedConsumerStrategyProvider.class)
default void testCreatePersistentSubscriptionToAllWithConsumerStrategies(NamedConsumerStrategy strategy) throws Throwable {
KurrentDBPersistentSubscriptionsClient client = getDefaultPersistentSubscriptionClient();
String groupName = String.format("/foo/%s/group", generateName());

CreatePersistentSubscriptionToAllOptions options = CreatePersistentSubscriptionToAllOptions.get()
.namedConsumerStrategy(strategy);

client.createToAll(groupName, options).get();

Optional<PersistentSubscriptionToAllInfo> result = client.getInfoToAll(groupName).get();
Assertions.assertTrue(result.isPresent(), "Subscription should be created");

Assertions.assertEquals(groupName, result.get().getGroupName());
Assertions.assertEquals(strategy.toString(), result.get().getSettings().getNamedConsumerStrategy().toString());
}

@ParameterizedTest
@ArgumentsSource(NamedConsumerStrategyProvider.class)
default void testCreatePersistentSubscriptionToStreamWithConsumerStrategies(NamedConsumerStrategy strategy) throws Throwable {
KurrentDBPersistentSubscriptionsClient client = getDefaultPersistentSubscriptionClient();
String streamName = String.format("/foo/%s/stream", generateName());
String groupName = String.format("/foo/%s/group", generateName());

CreatePersistentSubscriptionToStreamOptions options = CreatePersistentSubscriptionToStreamOptions.get()
.namedConsumerStrategy(strategy);

client.createToStream(streamName, groupName, options).get();

Optional<PersistentSubscriptionToStreamInfo> result = client.getInfoToStream(streamName, groupName).get();
Assertions.assertTrue(result.isPresent(), "Subscription should be created");

Assertions.assertEquals(groupName, result.get().getGroupName());
Assertions.assertEquals(strategy.toString(), result.get().getSettings().getNamedConsumerStrategy().toString());
}
}

class NamedConsumerStrategyProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
return Stream.of(
Arguments.of(NamedConsumerStrategy.DISPATCH_TO_SINGLE),
Arguments.of(NamedConsumerStrategy.ROUND_ROBIN),
Arguments.of(NamedConsumerStrategy.PINNED),
Arguments.of(NamedConsumerStrategy.PINNED_BY_CORRELATION)
);
}
}
Loading