The documentation you are viewing is for Dapr v1.13 which is an older version of Dapr. For up-to-date documentation, see the latest version.
Publish and subscribe to bulk messages
alpha
The bulk publish and subscribe APIs are in alpha stage.With the bulk publish and subscribe APIs, you can publish and subscribe to multiple messages in a single request. When writing applications that need to send or receive a large number of messages, using bulk operations allows achieving high throughput by reducing the overall number of requests between the Dapr sidecar, the application, and the underlying pub/sub broker.
Publishing messages in bulk
Restrictions when publishing messages in bulk
The bulk publish API allows you to publish multiple messages to a topic in a single request. It is non-transactional, i.e., from a single bulk request, some messages can succeed and some can fail. If any of the messages fail to publish, the bulk publish operation returns a list of failed messages.
The bulk publish operation also does not guarantee any ordering of messages.
Example
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import java.util.ArrayList;
import java.util.List;
class BulkPublisher {
private static final String PUBSUB_NAME = "my-pubsub-name";
private static final String TOPIC_NAME = "topic-a";
public void publishMessages() {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
// Create a list of messages to publish
List<String> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
String message = String.format("This is message #%d", i);
messages.add(message);
}
// Publish list of messages using the bulk publish API
BulkPublishResponse<String> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages).block();
}
}
}
import { DaprClient } from "@dapr/dapr";
const pubSubName = "my-pubsub-name";
const topic = "topic-a";
async function start() {
const client = new DaprClient();
// Publish multiple messages to a topic.
await client.pubsub.publishBulk(pubSubName, topic, ["message 1", "message 2", "message 3"]);
// Publish multiple messages to a topic with explicit bulk publish messages.
const bulkPublishMessages = [
{
entryID: "entry-1",
contentType: "application/json",
event: { hello: "foo message 1" },
},
{
entryID: "entry-2",
contentType: "application/cloudevents+json",
event: {
specversion: "1.0",
source: "/some/source",
type: "example",
id: "1234",
data: "foo message 2",
datacontenttype: "text/plain"
},
},
{
entryID: "entry-3",
contentType: "text/plain",
event: "foo message 3",
},
];
await client.pubsub.publishBulk(pubSubName, topic, bulkPublishMessages);
}
start().catch((e) => {
console.error(e);
process.exit(1);
});
using System;
using System.Collections.Generic;
using Dapr.Client;
const string PubsubName = "my-pubsub-name";
const string TopicName = "topic-a";
IReadOnlyList<object> BulkPublishData = new List<object>() {
new { Id = "17", Amount = 10m },
new { Id = "18", Amount = 20m },
new { Id = "19", Amount = 30m }
};
using var client = new DaprClientBuilder().Build();
var res = await client.BulkPublishEventAsync(PubsubName, TopicName, BulkPublishData);
if (res == null) {
throw new Exception("null response from dapr");
}
if (res.FailedEntries.Count > 0)
{
Console.WriteLine("Some events failed to be published!");
foreach (var failedEntry in res.FailedEntries)
{
Console.WriteLine("EntryId: " + failedEntry.Entry.EntryId + " Error message: " +
failedEntry.ErrorMessage);
}
}
else
{
Console.WriteLine("Published all events!");
}
import requests
import json
base_url = "http://localhost:3500/v1.0-alpha1/publish/bulk/{}/{}"
pubsub_name = "my-pubsub-name"
topic_name = "topic-a"
payload = [
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"event": "first text message",
"contentType": "text/plain"
},
{
"entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
"event": {
"message": "second JSON message"
},
"contentType": "application/json"
}
]
response = requests.post(base_url.format(pubsub_name, topic_name), json=payload)
print(response.status_code)
package main
import (
"fmt"
"strings"
"net/http"
"io/ioutil"
)
const (
pubsubName = "my-pubsub-name"
topicName = "topic-a"
baseUrl = "http://localhost:3500/v1.0-alpha1/publish/bulk/%s/%s"
)
func main() {
url := fmt.Sprintf(baseUrl, pubsubName, topicName)
method := "POST"
payload := strings.NewReader(`[
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"event": "first text message",
"contentType": "text/plain"
},
{
"entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
"event": {
"message": "second JSON message"
},
"contentType": "application/json"
}
]`)
client := &http.Client {}
req, _ := http.NewRequest(method, url, payload)
req.Header.Add("Content-Type", "application/json")
res, err := client.Do(req)
// ...
}
curl -X POST http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a \
-H 'Content-Type: application/json' \
-d '[
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"event": "first text message",
"contentType": "text/plain"
},
{
"entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
"event": {
"message": "second JSON message"
},
"contentType": "application/json"
},
]'
Invoke-RestMethod -Method Post -ContentType 'application/json' -Uri 'http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a' `
-Body '[
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"event": "first text message",
"contentType": "text/plain"
},
{
"entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
"event": {
"message": "second JSON message"
},
"contentType": "application/json"
},
]'
Subscribing messages in bulk
The bulk subscribe API allows you to subscribe multiple messages from a topic in a single request. As we know from How to: Publish & Subscribe to topics, there are two ways to subscribe to topic(s):
- Declaratively - subscriptions are defined in an external file.
- Programmatically - subscriptions are defined in code.
To Bulk Subscribe to topic(s), we just need to use bulkSubscribe
spec attribute, something like following:
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: order-pub-sub
spec:
topic: orders
routes:
default: /checkout
pubsubname: order-pub-sub
bulkSubscribe:
enabled: true
maxMessagesCount: 100
maxAwaitDurationMs: 40
scopes:
- orderprocessing
- checkout
In the example above, bulkSubscribe
is optional. If you use bulkSubscribe
, then:
enabled
is mandatory and enables or disables bulk subscriptions on this topic- You can optionally configure the max number of messages (
maxMessagesCount
) delivered in a bulk message. Default value ofmaxMessagesCount
for components not supporting bulk subscribe is 100 i.e. for default bulk events between App and Dapr. Please refer How components handle publishing and subscribing to bulk messages. If a component supports bulk subscribe, then default value for this parameter can be found in that component doc. - You can optionally provide the max duration to wait (
maxAwaitDurationMs
) before a bulk message is sent to the app. Default value ofmaxAwaitDurationMs
for components not supporting bulk subscribe is 1000 i.e. for default bulk events between App and Dapr. Please refer How components handle publishing and subscribing to bulk messages. If a component supports bulk subscribe, then default value for this parameter can be found in that component doc.
The application receives an EntryId
associated with each entry (individual message) in the bulk message. This EntryId
must be used by the app to communicate the status of that particular entry. If the app fails to notify on an EntryId
status, it’s considered a RETRY
.
A JSON-encoded payload body with the processing status against each entry needs to be sent:
{
"statuses":
[
{
"entryId": "<entryId1>",
"status": "<status>"
},
{
"entryId": "<entryId2>",
"status": "<status>"
}
]
}
Possible status values:
Status | Description |
---|---|
SUCCESS |
Message is processed successfully |
RETRY |
Message to be retried by Dapr |
DROP |
Warning is logged and message is dropped |
Please refer Expected HTTP Response for Bulk Subscribe for further insights on response.
Example
Please refer following code samples for how to use Bulk Subscribe:
import io.dapr.Topic;
import io.dapr.client.domain.BulkSubscribeAppResponse;
import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
import io.dapr.client.domain.BulkSubscribeMessage;
import io.dapr.client.domain.BulkSubscribeMessageEntry;
import io.dapr.client.domain.CloudEvent;
import io.dapr.springboot.annotations.BulkSubscribe;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import reactor.core.publisher.Mono;
class BulkSubscriber {
@BulkSubscribe()
// @BulkSubscribe(maxMessagesCount = 100, maxAwaitDurationMs = 40)
@Topic(name = "topicbulk", pubsubName = "orderPubSub")
@PostMapping(path = "/topicbulk")
public Mono<BulkSubscribeAppResponse> handleBulkMessage(
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
return Mono.fromCallable(() -> {
List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
try {
CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
e.printStackTrace();
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
}
}
return new BulkSubscribeAppResponse(entries);
});
}
}
import { DaprServer } from "@dapr/dapr";
const pubSubName = "orderPubSub";
const topic = "topicbulk";
const daprHost = process.env.DAPR_HOST || "127.0.0.1";
const daprPort = process.env.DAPR_HTTP_PORT || "3502";
const serverHost = process.env.SERVER_HOST || "127.0.0.1";
const serverPort = process.env.APP_PORT || 5001;
async function start() {
const server = new DaprServer({
serverHost,
serverPort,
clientOptions: {
daprHost,
daprPort,
},
});
// Publish multiple messages to a topic with default config.
await client.pubsub.bulkSubscribeWithDefaultConfig(pubSubName, topic, (data) => console.log("Subscriber received: " + JSON.stringify(data)));
// Publish multiple messages to a topic with specific maxMessagesCount and maxAwaitDurationMs.
await client.pubsub.bulkSubscribeWithConfig(pubSubName, topic, (data) => console.log("Subscriber received: " + JSON.stringify(data)), 100, 40);
}
using Microsoft.AspNetCore.Mvc;
using Dapr.AspNetCore;
using Dapr;
namespace DemoApp.Controllers;
[ApiController]
[Route("[controller]")]
public class BulkMessageController : ControllerBase
{
private readonly ILogger<BulkMessageController> logger;
public BulkMessageController(ILogger<BulkMessageController> logger)
{
this.logger = logger;
}
[BulkSubscribe("messages", 10, 10)]
[Topic("pubsub", "messages")]
public ActionResult<BulkSubscribeAppResponse> HandleBulkMessages([FromBody] BulkSubscribeMessage<BulkMessageModel<BulkMessageModel>> bulkMessages)
{
List<BulkSubscribeAppResponseEntry> responseEntries = new List<BulkSubscribeAppResponseEntry>();
logger.LogInformation($"Received {bulkMessages.Entries.Count()} messages");
foreach (var message in bulkMessages.Entries)
{
try
{
logger.LogInformation($"Received a message with data '{message.Event.Data.MessageData}'");
responseEntries.Add(new BulkSubscribeAppResponseEntry(message.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
}
catch (Exception e)
{
logger.LogError(e.Message);
responseEntries.Add(new BulkSubscribeAppResponseEntry(message.EntryId, BulkSubscribeAppResponseStatus.RETRY));
}
}
return new BulkSubscribeAppResponse(responseEntries);
}
public class BulkMessageModel
{
public string MessageData { get; set; }
}
}
How components handle publishing and subscribing to bulk messages
For event publish/subscribe, two kinds of network transfers are involved.
- From/To App To/From Dapr.
- From/To Dapr To/From Pubsub Broker.
These are the opportunities where optimization is possible. When optimized, Bulk requests are made, which reduce the overall number of calls and thus increases throughput and provides better latency.
On enabling Bulk Publish and/or Bulk Subscribe, the communication between the App and Dapr sidecar (Point 1 above) is optimized for all components.
Optimization from Dapr sidecar to the pub/sub broker depends on a number of factors, for example:
- Broker must inherently support Bulk pub/sub
- The Dapr component must be updated to support the use of bulk APIs provided by the broker
Currently, the following components are updated to support this level of optimization:
Component | Bulk Publish | Bulk Subscribe |
---|---|---|
Kafka | Yes | Yes |
Azure Servicebus | Yes | Yes |
Azure Eventhubs | Yes | Yes |
Demos
Watch the following demos and presentations about bulk pub/sub.
KubeCon Europe 2023 presentation
Dapr Community Call #77 presentation
Related links
- List of supported pub/sub components
- Read the API reference
Feedback
Was this page helpful?
Glad to hear it! Please tell us how we can improve.
Sorry to hear that. Please tell us how we can improve.