The documentation you are viewing is for Dapr v1.13 which is an older version of Dapr. For up-to-date documentation, see the latest version.

声明式和编程式订阅方法

了解 Dapr 允许您订阅主题的方法。

Pub/sub API订阅方法

Dapr 应用程序可以通过两种方法订阅发布的主题,这两种方法支持相同的功能:声明式和编程式。

订阅方法 说明
声明式 订阅定义在一个外部文件中。 声明式方法会从您的代码中移除 Dapr 依赖,并允许现有的应用程序订阅 topics,而无需更改代码。
编程式 订阅定义在应用程序代码中。 编程式方法在代码中实现订阅。

以下示例演示了checkout应用程序和orderprocessing应用程序通过orders主题进行pub/sub消息传递。 示例演示了相同的 Dapr 发布/订阅组件,首先以声明方式使用,然后以编程方式使用。

声明式订阅

您可以使用外部组件文件来声明式地订阅主题。 这个示例使用一个名为 subscription.yaml 的YAML组件文件:

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order
spec:
  topic: orders
  routes:
    default: /checkout
  pubsubname: pubsub
scopes:
- orderprocessing
- checkout

这里的订阅称为 order:

  • 使用Pub/sub(发布/订阅)组件pubsub订阅名为orders的主题。
  • route字段设置为将所有主题消息发送到应用程序中的/checkout端点。
  • scopes 字段设置为仅限具有 orderprocessingcheckout 的应用程序访问此订阅。

在运行 Dapr 时,将 YAML 组件文件路径设置为指向组件的 Dapr。


dapr run --app-id myapp --resources-path ./myComponents -- dotnet run

dapr run --app-id myapp --resources-path ./myComponents -- mvn spring-boot:run

dapr run --app-id myapp --resources-path ./myComponents -- python3 app.py

dapr run --app-id myapp --resources-path ./myComponents -- npm start

dapr run --app-id myapp --resources-path ./myComponents -- go run app.go

在 Kubernetes 中,将该组件应用到集群中:

kubectl apply -f subscription.yaml

在您的应用程序代码中,订阅 Dapr pub/sub 组件中指定的主题。


 //Subscribe to a topic 
[HttpPost("checkout")]
public void getCheckout([FromBody] int orderId)
{
    Console.WriteLine("Subscriber received : " + orderId);
}

import io.dapr.client.domain.CloudEvent;

 //Subscribe to a topic
@PostMapping(path = "/checkout")
public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
    return Mono.fromRunnable(() -> {
        try {
            log.info("Subscriber received: " + cloudEvent.getData());
        } 
    });
}

from cloudevents.sdk.event import v1

#Subscribe to a topic 
@app.route('/checkout', methods=['POST'])
def checkout(event: v1.Event) -> None:
    data = json.loads(event.Data())
    logging.info('Subscriber received: ' + str(data))

const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));

// listen to the declarative route
app.post('/checkout', (req, res) => {
  console.log(req.body);
  res.sendStatus(200);
});

//Subscribe to a topic
var sub = &common.Subscription{
	PubsubName: "pubsub",
	Topic:      "orders",
	Route:      "/checkout",
}

func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
	log.Printf("Subscriber received: %s", e.Data)
	return false, nil
}

/checkout 端点与订阅中定义的 route 相匹配,这是 Dapr 将所有主题消息发送至的位置。

编程方式订阅

动态编程方法返回 routes 代码中的 JSON 结构,与声明式方法的 route YAML 结构相对应。

注意: 编程订阅只在应用程序启动时读取一次。 你不能_动态_添加新的编程订阅,仅在编译时添加新的编程订阅。

在下面的示例中,您将在应用程序代码中定义在声明性YAML订阅上方找到的值。


[Topic("pubsub", "orders")]
[HttpPost("/checkout")]
public async Task<ActionResult<Order>>Checkout(Order order, [FromServices] DaprClient daprClient)
{
    // Logic
    return order;
}

or

// Dapr subscription in [Topic] routes orders topic to this route
app.MapPost("/checkout", [Topic("pubsub", "orders")] (Order order) => {
    Console.WriteLine("Subscriber received : " + order);
    return Results.Ok(order);
});

上面定义的这两个处理程序还需要映射到配置 dapr/subscribe 端点。 这是在定义端点时在应用程序启动代码中完成的。

app.UseEndpoints(endpoints =>
{
    endpoints.MapSubscribeHandler();
});

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@Topic(name = "checkout", pubsubName = "pubsub")
@PostMapping(path = "/orders")
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
  return Mono.fromRunnable(() -> {
    try {
      System.out.println("Subscriber received: " + cloudEvent.getData());
      System.out.println("Subscriber received: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  });

@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    subscriptions = [
      {
        'pubsubname': 'pubsub',
        'topic': 'checkout',
        'routes': {
          'rules': [
            {
              'match': 'event.type == "order"',
              'path': '/orders'
            },
          ],
          'default': '/orders'
        }
      }]
    return jsonify(subscriptions)

@app.route('/orders', methods=['POST'])
def ds_subscriber():
    print(request.json, flush=True)
    return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
app.run()

const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));

const port = 3000

app.get('/dapr/subscribe', (req, res) => {
  res.json([
    {
      pubsubname: "pubsub",
      topic: "checkout",
      routes: {
        rules: [
          {
            match: 'event.type == "order"',
            path: '/orders'
          },
        ],
        default: '/products'
      }
    }
  ]);
})

app.post('/orders', (req, res) => {
  console.log(req.body);
  res.sendStatus(200);
});

app.listen(port, () => console.log(`consumer app listening on port ${port}!`))

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

const appPort = 3000

type subscription struct {
	PubsubName string            `json:"pubsubname"`
	Topic      string            `json:"topic"`
	Metadata   map[string]string `json:"metadata,omitempty"`
	Routes     routes            `json:"routes"`
}

type routes struct {
	Rules   []rule `json:"rules,omitempty"`
	Default string `json:"default,omitempty"`
}

type rule struct {
	Match string `json:"match"`
	Path  string `json:"path"`
}

// This handles /dapr/subscribe
func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
	t := []subscription{
		{
			PubsubName: "pubsub",
			Topic:      "checkout",
			Routes: routes{
				Rules: []rule{
					{
						Match: `event.type == "order"`,
						Path:  "/orders",
					},
				},
				Default: "/orders",
			},
		},
	}

	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(t)
}

func main() {
	router := mux.NewRouter().StrictSlash(true)
	router.HandleFunc("/dapr/subscribe", configureSubscribeHandler).Methods("GET")
	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", appPort), router))
}

下一步