Dapr Python SDK integration with FastAPI

How to create Dapr Python virtual actors and pubsub with the FastAPI extension

The Dapr Python SDK provides integration with FastAPI using the dapr-ext-fastapi extension.


You can download and install the Dapr FastAPI extension with:

pip install dapr-ext-fastapi

pip install dapr-ext-fastapi-dev


Subscribing to events of different types

import uvicorn
from fastapi import Body, FastAPI
from dapr.ext.fastapi import DaprApp
from pydantic import BaseModel

class RawEventModel(BaseModel):
    body: str

class User(BaseModel):
    id: int
    name = 'Jane Doe'

class CloudEventModel(BaseModel):
    data: User
    datacontenttype: str
    id: str
    pubsubname: str
    source: str
    specversion: str
    topic: str
    traceid: str
    traceparent: str
    tracestate: str
    type: str    
app = FastAPI()
dapr_app = DaprApp(app)

# Allow handling event with any structure (Easiest, but least robust)
# dapr publish --publish-app-id sample --topic any_topic --pubsub pubsub --data '{"id":"7", "desc": "good", "size":"small"}'
@dapr_app.subscribe(pubsub='pubsub', topic='any_topic')
def any_event_handler(event_data = Body()):

# For robustness choose one of the below based on if publisher is using CloudEvents

# Handle events sent with CloudEvents
# dapr publish --publish-app-id sample --topic cloud_topic --pubsub pubsub --data '{"id":"7", "name":"Bob Jones"}'
@dapr_app.subscribe(pubsub='pubsub', topic='cloud_topic')
def cloud_event_handler(event_data: CloudEventModel):

# Handle raw events sent without CloudEvents
# curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/raw_topic?metadata.rawPayload=true -H "Content-Type: application/json" -d '{"body": "345"}'
@dapr_app.subscribe(pubsub='pubsub', topic='raw_topic')
def raw_event_handler(event_data: RawEventModel):


if __name__ == "__main__":
    uvicorn.run(app, host="", port=30212)

Creating an actor

from fastapi import FastAPI
from dapr.ext.fastapi import DaprActor
from demo_actor import DemoActor

app = FastAPI(title=f'{DemoActor.__name__}Service')

# Add Dapr Actor Extension
actor = DaprActor(app)

async def startup_event():
    # Register DemoActor
    await actor.register_actor(DemoActor)

def get_my_data():
    return "{'message': 'myData'}"