diff --git a/docs/api/appending-events.md b/docs/api/appending-events.md index f471a71d..6b00046a 100644 --- a/docs/api/appending-events.md +++ b/docs/api/appending-events.md @@ -245,4 +245,148 @@ AppendToStreamOptions options = AppendToStreamOptions.get() client.appendToStream("some-stream", options, eventData) .get(); -``` \ No newline at end of file +``` + +## Append to multiple streams + +::: note +This feature is only available in KurrentDB 25.1 and later. +::: + +You can append events to multiple streams in a single atomic operation. Either all streams are updated, or the entire operation fails. + +The `multiStreamAppend` method accepts a collection of `AppendStreamRequest` objects and returns a `MultiAppendWriteResult`. Each `AppendStreamRequest` contains: + +- **streamName** - The name of the stream +- **expectedState** - The expected state of the stream for optimistic concurrency control +- **events** - A collection of `EventData` objects to append + +The operation returns a `MultiAppendWriteResult` that contains either: +- A list of `AppendStreamSuccess` objects if all streams were successfully updated +- A list of `AppendStreamFailure` objects if any streams failed to update + +::: warning +Event metadata in `EventData` must be valid JSON objects. This requirement will +be removed in a future major release. +::: + +Here's a basic example of appending events to multiple streams: + +```java +JsonMapper mapper = new JsonMapper(); + +Map metadata = new HashMap<>(); +metadata.put("timestamp", Instant.now().toString()); + metadata.put("source", "OrderProcessingSystem"); +metadata.put("version", 1.0); + +byte[] metadataBytes = mapper.writeValueAsBytes(metadata); + +EventData orderEvent = EventData + .builderAsJson("OrderCreated", mapper.writeValueAsBytes(new OrderCreated("12345", 99.99))) + .metadataAsBytes(metadataBytes) + .build(); + +EventData inventoryEvent = EventData + .builderAsJson("ProductPurchased", mapper.writeValueAsBytes(new ProductPurchased("ABC123", 2, 19.99))) + .metadataAsBytes(metadataBytes) + .build(); + +List requests = Arrays.asList( + new AppendStreamRequest( + "order-stream-1", + Collections.singletonList(orderEvent).iterator(), + StreamState.any() + ), + new AppendStreamRequest( + "product-stream-1", + Collections.singletonList(inventoryEvent).iterator(), + StreamState.any() + ) +); + +MultiAppendWriteResult result = client.multiStreamAppend(requests.iterator()).get(); + +if (result.getSuccesses().isPresent()) + result.getSuccesses().get().forEach(success -> { + System.out.println(success.getStreamName() + " updated at " + success.getPosition()); + }); +``` + +If the operation doesn't succeed, you can handle the failures as follows: + +```java +if (result.getFailures().isPresent()) { + MultiAppendErrorVisitor visitor = new MultiAppendErrorVisitor(); + result.getFailures().get().forEach(failure -> { + failure.visit(visitor); + + if (visitor.wasWrongExpectedRevisionVisited()) { + System.out.println("Wrong revision for stream: " + failure.getStreamName()); + } else if (visitor.wasStreamDeletedVisited()) { + System.out.println("Stream deleted: " + failure.getStreamName()); + } else if (visitor.wasAccessDenied()) { + System.out.println("Access denied: " + failure.getStreamName()); + } else if (visitor.wasTransactionMaxSizeExceeded()) { + System.out.println("Transaction too large: " + failure.getStreamName()); + } else { + System.out.println("Unknown error: " + failure.getStreamName()); + } + }); +} +``` + +::: details Click here to see the implementaton of `MultiAppendErrorVisitor` + +```java +class MultiAppendErrorVisitor implements MultiAppendStreamErrorVisitor { + private boolean wrongExpectedRevisionVisited = false; + private boolean streamDeletedVisited = false; + private boolean transactionMaxSizeExceeded = false; + private boolean accessDenied = false; + private long actualRevision = -1; + + @Override + public void onAccessDenied(ErrorDetails.AccessDenied detail) { + this.accessDenied = true; + } + + @Override + public void onWrongExpectedRevision(long streamRevision) { + this.wrongExpectedRevisionVisited = true; + this.actualRevision = streamRevision; + } + + @Override + public void onStreamDeleted() { + this.streamDeletedVisited = true; + } + + @Override + public void onTransactionMaxSizeExceeded(int maxSize) { + this.transactionMaxSizeExceeded = true; + } + + public boolean wasWrongExpectedRevisionVisited() { + return wrongExpectedRevisionVisited; + } + + public boolean wasStreamDeletedVisited() { + return streamDeletedVisited; + } + + public boolean wasAccessDenied() { + return accessDenied; + } + + public boolean wasTransactionMaxSizeExceeded() { + return transactionMaxSizeExceeded; + } + + public long getActualRevision() { + return actualRevision; + } +} +``` + +:::