Skip to content

[DEV-842] Document Multi Stream Append #340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 13, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 145 additions & 1 deletion docs/api/appending-events.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,4 +245,148 @@ AppendToStreamOptions options = AppendToStreamOptions.get()

client.appendToStream("some-stream", options, eventData)
.get();
```
```

## Append to multiple streams

::: note
This feature is only available in KurrentDB 25.1 and later.
:::

You can append events to multiple streams in a single atomic operation. Either all streams are updated, or the entire operation fails.

The `multiStreamAppend` method accepts a collection of `AppendStreamRequest` objects and returns a `MultiAppendWriteResult`. Each `AppendStreamRequest` contains:

- **streamName** - The name of the stream
- **expectedState** - The expected state of the stream for optimistic concurrency control
- **events** - A collection of `EventData` objects to append

The operation returns a `MultiAppendWriteResult` that contains either:
- A list of `AppendStreamSuccess` objects if all streams were successfully updated
- A list of `AppendStreamFailure` objects if any streams failed to update

::: warning
Event metadata in `EventData` must be valid JSON objects. This requirement will
be removed in a future major release.
:::

Here's a basic example of appending events to multiple streams:

```java
JsonMapper mapper = new JsonMapper();

Map<String, Object> metadata = new HashMap<>();
metadata.put("timestamp", Instant.now().toString());
metadata.put("source", "OrderProcessingSystem");
metadata.put("version", 1.0);

byte[] metadataBytes = mapper.writeValueAsBytes(metadata);

EventData orderEvent = EventData
.builderAsJson("OrderCreated", mapper.writeValueAsBytes(new OrderCreated("12345", 99.99)))
.metadataAsBytes(metadataBytes)
.build();

EventData inventoryEvent = EventData
.builderAsJson("ProductPurchased", mapper.writeValueAsBytes(new ProductPurchased("ABC123", 2, 19.99)))
.metadataAsBytes(metadataBytes)
.build();

List<AppendStreamRequest> requests = Arrays.asList(
new AppendStreamRequest(
"order-stream-1",
Collections.singletonList(orderEvent).iterator(),
StreamState.any()
),
new AppendStreamRequest(
"product-stream-1",
Collections.singletonList(inventoryEvent).iterator(),
StreamState.any()
)
);

MultiAppendWriteResult result = client.multiStreamAppend(requests.iterator()).get();

if (result.getSuccesses().isPresent())
result.getSuccesses().get().forEach(success -> {
System.out.println(success.getStreamName() + " updated at " + success.getPosition());
});
```

If the operation doesn't succeed, you can handle the failures as follows:

```java
if (result.getFailures().isPresent()) {
MultiAppendErrorVisitor visitor = new MultiAppendErrorVisitor();
result.getFailures().get().forEach(failure -> {
failure.visit(visitor);

if (visitor.wasWrongExpectedRevisionVisited()) {
System.out.println("Wrong revision for stream: " + failure.getStreamName());
} else if (visitor.wasStreamDeletedVisited()) {
System.out.println("Stream deleted: " + failure.getStreamName());
} else if (visitor.wasAccessDenied()) {
System.out.println("Access denied: " + failure.getStreamName());
} else if (visitor.wasTransactionMaxSizeExceeded()) {
System.out.println("Transaction too large: " + failure.getStreamName());
} else {
System.out.println("Unknown error: " + failure.getStreamName());
}
});
}
```

::: details Click here to see the implementaton of `MultiAppendErrorVisitor`

```java
class MultiAppendErrorVisitor implements MultiAppendStreamErrorVisitor {
private boolean wrongExpectedRevisionVisited = false;
private boolean streamDeletedVisited = false;
private boolean transactionMaxSizeExceeded = false;
private boolean accessDenied = false;
private long actualRevision = -1;

@Override
public void onAccessDenied(ErrorDetails.AccessDenied detail) {
this.accessDenied = true;
}

@Override
public void onWrongExpectedRevision(long streamRevision) {
this.wrongExpectedRevisionVisited = true;
this.actualRevision = streamRevision;
}

@Override
public void onStreamDeleted() {
this.streamDeletedVisited = true;
}

@Override
public void onTransactionMaxSizeExceeded(int maxSize) {
this.transactionMaxSizeExceeded = true;
}

public boolean wasWrongExpectedRevisionVisited() {
return wrongExpectedRevisionVisited;
}

public boolean wasStreamDeletedVisited() {
return streamDeletedVisited;
}

public boolean wasAccessDenied() {
return accessDenied;
}

public boolean wasTransactionMaxSizeExceeded() {
return transactionMaxSizeExceeded;
}

public long getActualRevision() {
return actualRevision;
}
}
```

:::