diff --git a/src/main/java/io/kurrent/dbclient/DynamicValueMapper.java b/src/main/java/io/kurrent/dbclient/DynamicValueMapper.java new file mode 100644 index 00000000..af4415bf --- /dev/null +++ b/src/main/java/io/kurrent/dbclient/DynamicValueMapper.java @@ -0,0 +1,120 @@ +package io.kurrent.dbclient; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import com.google.protobuf.Duration; +import io.kurrentdb.protocol.DynamicValue; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZonedDateTime; +import java.util.Collections; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Utility class for converting Java objects to DynamicValue protobuf messages. + */ +public class DynamicValueMapper { + private static final JsonMapper objectMapper = new JsonMapper(); + + /** + * Converts JSON byte array metadata to a Map of DynamicValue objects. + * + * @param jsonMetadata the source metadata as JSON bytes + * @return a map with DynamicValue objects + */ + public static Map mapJsonToDynamicValueMap(byte[] jsonMetadata) { + if (jsonMetadata == null || jsonMetadata.length == 0) + return Collections.emptyMap(); + + try { + Map metadata = objectMapper.readValue(jsonMetadata, new TypeReference>() { + }); + return mapToDynamicValueMap(metadata); + } catch (Exception e) { + return Collections.emptyMap(); + } + } + + /** + * Converts a Map of metadata to a Map of DynamicValue objects. + * + * @param metadata the source metadata map + * @return a map with DynamicValue objects + */ + public static Map mapToDynamicValueMap(Map metadata) { + if (metadata == null) { + return Collections.emptyMap(); + } + + return metadata.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> mapToDynamicValue(entry.getValue()) + )); + } + + /** + * Converts a Java object to a DynamicValue protobuf message. + * + * @param source the source object + * @return the corresponding DynamicValue + */ + public static DynamicValue mapToDynamicValue(Object source) { + if (source == null) { + return DynamicValue.newBuilder() + .setNullValue(com.google.protobuf.NullValue.NULL_VALUE) + .build(); + } + + DynamicValue.Builder builder = DynamicValue.newBuilder(); + + if (source instanceof String) { + return builder.setStringValue((String) source).build(); + } else if (source instanceof Boolean) { + return builder.setBooleanValue((Boolean) source).build(); + } else if (source instanceof Integer) { + return builder.setInt32Value((Integer) source).build(); + } else if (source instanceof Long) { + return builder.setInt64Value((Long) source).build(); + } else if (source instanceof Float) { + return builder.setFloatValue((Float) source).build(); + } else if (source instanceof Double) { + return builder.setDoubleValue((Double) source).build(); + } else if (source instanceof Instant) { + Instant instant = (Instant) source; + return builder.setTimestampValue(Timestamp.newBuilder() + .setSeconds(instant.getEpochSecond()) + .setNanos(instant.getNano()) + .build()).build(); + } else if (source instanceof LocalDateTime) { + LocalDateTime localDateTime = (LocalDateTime) source; + Instant instant = localDateTime.atZone(java.time.ZoneOffset.UTC).toInstant(); + return builder.setTimestampValue(Timestamp.newBuilder() + .setSeconds(instant.getEpochSecond()) + .setNanos(instant.getNano()) + .build()).build(); + } else if (source instanceof ZonedDateTime) { + ZonedDateTime zonedDateTime = (ZonedDateTime) source; + Instant instant = zonedDateTime.toInstant(); + return builder.setTimestampValue(Timestamp.newBuilder() + .setSeconds(instant.getEpochSecond()) + .setNanos(instant.getNano()) + .build()).build(); + } else if (source instanceof java.time.Duration) { + java.time.Duration duration = (java.time.Duration) source; + return builder.setDurationValue(Duration.newBuilder() + .setSeconds(duration.getSeconds()) + .setNanos(duration.getNano()) + .build()).build(); + } else if (source instanceof byte[]) { + return builder.setBytesValue(ByteString.copyFrom((byte[]) source)).build(); + } else { + // For any other type, convert to string + return builder.setStringValue(source.toString()).build(); + } + } +} diff --git a/src/main/java/io/kurrent/dbclient/EventData.java b/src/main/java/io/kurrent/dbclient/EventData.java index ee94fe96..864ce854 100644 --- a/src/main/java/io/kurrent/dbclient/EventData.java +++ b/src/main/java/io/kurrent/dbclient/EventData.java @@ -98,3 +98,4 @@ public static EventDataBuilder builderAsBinary(UUID eventId, String eventType, b } } + diff --git a/src/main/java/io/kurrent/dbclient/EventDataBuilder.java b/src/main/java/io/kurrent/dbclient/EventDataBuilder.java index fde65402..76bd05ed 100644 --- a/src/main/java/io/kurrent/dbclient/EventDataBuilder.java +++ b/src/main/java/io/kurrent/dbclient/EventDataBuilder.java @@ -1,8 +1,5 @@ package io.kurrent.dbclient; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.json.JsonMapper; - import java.util.UUID; /** diff --git a/src/main/java/io/kurrent/dbclient/KurrentDBClient.java b/src/main/java/io/kurrent/dbclient/KurrentDBClient.java index 3cbea889..2dc0c679 100644 --- a/src/main/java/io/kurrent/dbclient/KurrentDBClient.java +++ b/src/main/java/io/kurrent/dbclient/KurrentDBClient.java @@ -75,7 +75,7 @@ public CompletableFuture appendToStream(String streamName, AppendTo return new AppendToStream(this.getGrpcClient(), streamName, events, options).execute(); } - public CompletableFuture multiAppend(AppendToStreamOptions options, Iterator requests) { + public CompletableFuture multiStreamAppend(Iterator requests) { return new MultiStreamAppend(this.getGrpcClient(), requests).execute(); } diff --git a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java index 9c434a7b..21195be3 100644 --- a/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java +++ b/src/main/java/io/kurrent/dbclient/MultiStreamAppend.java @@ -12,6 +12,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; class MultiStreamAppend { @@ -42,22 +43,29 @@ private CompletableFuture append(WorkItemArgs args) { while (this.requests.hasNext()) { AppendStreamRequest request = this.requests.next(); io.kurrentdb.protocol.streams.v2.AppendStreamRequest.Builder builder = io.kurrentdb.protocol.streams.v2.AppendStreamRequest.newBuilder() + .setExpectedRevision(request.getExpectedState().toRawLong()) .setStream(request.getStreamName()); while (request.getEvents().hasNext()) { EventData event = request.getEvents().next(); - builder.addRecords(AppendRecord.newBuilder() + AppendRecord.Builder recordBuilder = AppendRecord.newBuilder() .setData(ByteString.copyFrom(event.getEventData())) - .setRecordId(event.getEventId().toString()) - .putProperties(SystemMetadataKeys.DATA_FORMAT, DynamicValue - .newBuilder() - .setStringValue(ContentTypeMapper.toSchemaDataFormat(event.getContentType())) - .build()) - .putProperties(SystemMetadataKeys.SCHEMA_NAME, DynamicValue - .newBuilder() - .setStringValue(event.getEventType()) - .build()) + .setRecordId(event.getEventId().toString()) + .putProperties(SystemMetadataKeys.DATA_FORMAT, DynamicValue + .newBuilder() + .setStringValue(ContentTypeMapper.toSchemaDataFormat(event.getContentType())) + .build()) + .putProperties(SystemMetadataKeys.SCHEMA_NAME, DynamicValue + .newBuilder() + .setStringValue(event.getEventType()) .build()); + + if (event.getUserMetadata() != null) { + Map userMetadataProperties = DynamicValueMapper.mapJsonToDynamicValueMap(event.getUserMetadata()); + recordBuilder.putAllProperties(userMetadataProperties); + } + + builder.addRecords(recordBuilder.build()); } requestStream.onNext(builder.build()); diff --git a/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java b/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java index 122ce7ca..8244d34b 100644 --- a/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java +++ b/src/test/java/io/kurrent/dbclient/MultiStreamAppendTests.java @@ -4,11 +4,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; +import java.io.IOException; +import java.time.Instant; +import java.util.*; import java.util.concurrent.ExecutionException; +@SuppressWarnings("rawtypes") public class MultiStreamAppendTests implements ConnectionAware { static private Database database; static private Logger logger; @@ -35,7 +36,7 @@ public static void cleanup() { } @Test - public void testMultiStreamAppend() throws ExecutionException, InterruptedException { + public void testMultiStreamAppend() throws ExecutionException, InterruptedException, IOException { KurrentDBClient client = getDefaultClient(); Optional version = client.getServerVersion().get(); @@ -45,18 +46,64 @@ public void testMultiStreamAppend() throws ExecutionException, InterruptedExcept "Multi-stream append is not supported server versions below 25.0.0" ); - List requests = new ArrayList<>(); + // Arrange + String streamName1 = generateName(); + String streamName2 = generateName(); - List events = new ArrayList<>(); - for (int i = 0; i < 10; i++) - events.add(EventData.builderAsBinary("created", new byte[0]).build()); + Map metadata = new HashMap<>(); + metadata.put("stringProperty", "hello world"); + metadata.put("intProperty", 42); + metadata.put("longProperty", 9876543210L); + metadata.put("booleanProperty", true); + metadata.put("doubleProperty", 3.14159); + metadata.put("nullProperty", null); + metadata.put("timestampProperty", Instant.now().toString()); - requests.add(new AppendStreamRequest("foobar", events.iterator(), StreamState.any())); - requests.add(new AppendStreamRequest("baz", events.iterator(), StreamState.any())); + byte[] metadataBytes = mapper.writeValueAsBytes(metadata); + + EventData event1 = EventData.builderAsJson("event-a", "{\"data\":\"test1\"}".getBytes()) + .metadataAsBytes(metadataBytes) + .build(); - MultiAppendWriteResult result = client.multiAppend(AppendToStreamOptions.get(), requests.iterator()).get(); + EventData event2 = EventData.builderAsBinary("event-b", new byte[0]).build(); + List events1 = Collections.singletonList(event1); + List events2 = Collections.singletonList(event2); + + List requests = Arrays.asList( + new AppendStreamRequest(streamName1, events1.iterator(), StreamState.noStream()), + new AppendStreamRequest(streamName2, events2.iterator(), StreamState.noStream()) + ); + + // Act + MultiAppendWriteResult result = client.multiStreamAppend(requests.iterator()).get(); + + // Assert Assertions.assertTrue(result.getSuccesses().isPresent()); + Assertions.assertFalse(result.getSuccesses().get().isEmpty()); + + List readEvents1 = client.readStream(streamName1, ReadStreamOptions.get()).get().getEvents(); + Assertions.assertEquals(1, readEvents1.size()); + + ResolvedEvent readEvent1 = readEvents1.get(0); + Assertions.assertEquals(event1.getEventType(), readEvent1.getEvent().getEventType()); + + byte[] readMetadata = readEvent1.getEvent().getUserMetadata(); + Assertions.assertNotNull(readMetadata); + Assertions.assertTrue(readMetadata.length > 0); + + Map deserializedMetadata = mapper.readValue(readMetadata, Map.class); + Assertions.assertEquals(metadata.get("stringProperty"), deserializedMetadata.get("stringProperty")); + Assertions.assertEquals(metadata.get("intProperty"), deserializedMetadata.get("intProperty")); + Assertions.assertEquals(metadata.get("longProperty"), ((Number) deserializedMetadata.get("longProperty")).longValue()); + Assertions.assertEquals(metadata.get("booleanProperty"), deserializedMetadata.get("booleanProperty")); + Assertions.assertEquals((Double) metadata.get("doubleProperty"), ((Number) deserializedMetadata.get("doubleProperty")).doubleValue(), 0.00001); + Assertions.assertEquals(metadata.get("timestampProperty"), deserializedMetadata.get("timestampProperty")); + Assertions.assertNull(deserializedMetadata.get("nullProperty")); + + List readEvents2 = client.readStream(streamName2, ReadStreamOptions.get()).get().getEvents(); + Assertions.assertEquals(1, readEvents2.size()); + Assertions.assertEquals(event2.getEventType(), readEvents2.get(0).getEvent().getEventType()); } @Test @@ -80,9 +127,136 @@ public void testMultiStreamAppendWhenUnsupported() throws ExecutionException, In ExecutionException e = Assertions.assertThrows( ExecutionException.class, - () -> client.multiAppend(AppendToStreamOptions.get(), requests.iterator()).get()); + () -> client.multiStreamAppend(requests.iterator()).get()); Assertions.assertInstanceOf(UnsupportedOperationException.class, e.getCause()); } -} + @Test + public void testMultiStreamAppendStreamRevisionConflict() throws ExecutionException, InterruptedException { + KurrentDBClient client = getDefaultClient(); + + Optional version = client.getServerVersion().get(); + + Assumptions.assumeTrue( + version.isPresent() && version.get().isGreaterOrEqualThan(25, 0), + "Multi-stream append is not supported server versions below 25.0.0" + ); + + // Arrange + String streamName = generateName(); + + EventData event1 = EventData.builderAsJson("event-1", "{}".getBytes()).build(); + EventData event2 = EventData.builderAsJson("event-2", "{}".getBytes()).build(); + EventData event3 = EventData.builderAsJson("event-3", "{}".getBytes()).build(); + + client.appendToStream( + streamName, + AppendToStreamOptions.get().streamState(StreamState.noStream()), + event1, event2, event3 + ).get(); + + ResolvedEvent lastEvent = client.readStream(streamName, ReadStreamOptions.get().maxCount(1).fromEnd().backwards()).get().getEvents().get(0); + + List requests = Collections.singletonList( + new AppendStreamRequest( + streamName, + Collections.singletonList(EventData.builderAsBinary("event-4", "{}".getBytes()).build()).iterator(), + StreamState.noStream() + ) + ); + + // Act + MultiAppendWriteResult result = client.multiStreamAppend(requests.iterator()).get(); + + // Assert + Assertions.assertTrue(result.getFailures().isPresent()); + Assertions.assertFalse(result.getFailures().get().isEmpty()); + + AppendStreamFailure failure = result.getFailures().get().get(0); + Assertions.assertEquals(streamName, failure.getStreamName()); + + MultiAppendErrorVisitor visitor = new MultiAppendErrorVisitor(); + failure.visit(visitor); + + Assertions.assertTrue(visitor.wasWrongExpectedRevisionVisited()); + Assertions.assertEquals(lastEvent.getOriginalEvent().getRevision(), visitor.getActualRevision()); + } + + @Test + public void testMultiStreamAppendStreamDeleted() throws ExecutionException, InterruptedException { + KurrentDBClient client = getDefaultClient(); + + Optional version = client.getServerVersion().get(); + + Assumptions.assumeTrue( + version.isPresent() && version.get().isGreaterOrEqualThan(25, 0), + "Multi-stream append is not supported server versions below 25.0.0" + ); + + // Arrange + String streamName = generateName(); + + EventData event1 = EventData.builderAsJson("event-1", "{}".getBytes()).build(); + + client.appendToStream( + streamName, + AppendToStreamOptions.get().streamState(StreamState.noStream()), + event1 + ).get(); + + client.tombstoneStream(streamName).get(); + + List requests = Collections.singletonList( + new AppendStreamRequest( + streamName, + Collections.singletonList(EventData.builderAsBinary("event-2", "{}".getBytes()).build()).iterator(), + StreamState.noStream() + ) + ); + + // Act + MultiAppendWriteResult result = client.multiStreamAppend(requests.iterator()).get(); + + // Assert + Assertions.assertTrue(result.getFailures().isPresent()); + Assertions.assertFalse(result.getFailures().get().isEmpty()); + + AppendStreamFailure failure = result.getFailures().get().get(0); + Assertions.assertEquals(streamName, failure.getStreamName()); + + MultiAppendErrorVisitor visitor = new MultiAppendErrorVisitor(); + failure.visit(visitor); + + Assertions.assertTrue(visitor.wasStreamDeletedVisited()); + } + + private static class MultiAppendErrorVisitor implements MultiAppendStreamErrorVisitor { + private boolean wrongExpectedRevisionVisited = false; + private boolean streamDeletedVisited = false; + private long actualRevision = -1; + + @Override + public void onWrongExpectedRevision(long streamRevision) { + this.wrongExpectedRevisionVisited = true; + this.actualRevision = streamRevision; + } + + @Override + public void onStreamDeleted() { + this.streamDeletedVisited = true; + } + + public boolean wasWrongExpectedRevisionVisited() { + return wrongExpectedRevisionVisited; + } + + public boolean wasStreamDeletedVisited() { + return streamDeletedVisited; + } + + public long getActualRevision() { + return actualRevision; + } + } +}