GitHub Repository You can find the project source code on GitHub.
This guide provides detailed, step-by-step instructions on how to use Upstash Workflow with FastAPI. You can also explore the source code for a detailed, end-to-end example and best practices.
Prerequisites
An Upstash QStash API key.
Python and pip installed.
If you haven’t obtained your QStash API key yet, you can do so by signing up for an Upstash account and navigating to your QStash dashboard.
Step 1: Installation
First, create a new directory and set up a virtual environment:
python -m venv venv
source venv/bin/activate
Then, install the Workflow SDK and FastAPI:
pip install fastapi uvicorn upstash-workflow
Create a .env
file in your project root and add your QStash token. This token is used to authenticate your application with the QStash service.
Upstash Workflow is powered by QStash , which requires access to your endpoint to execute workflows. When your app is deployed, QStash will use the app’s URL. However, for local development, you have two main options: use a local QStash server or set up a local tunnel .
Option 1: Local QStash Server
To start the local QStash server, run:
npx @upstash/qstash-cli dev
Once the command runs successfully, you’ll see QSTASH_URL
and QSTASH_TOKEN
values in the console. Add these values to your .env
file:
export QSTASH_URL = "http://127.0.0.1:8080"
export QSTASH_TOKEN =< QSTASH_TOKEN >
This approach allows you to test workflows locally without affecting your billing. However, runs won’t be logged in the Upstash Console.
Option 2: Local Tunnel
Alternatively, you can set up a local tunnel. For this option:
Copy the QSTASH_TOKEN
from the Upstash Console.
Update your .env
file with the following:
export QSTASH_TOKEN = "***"
export UPSTASH_WORKFLOW_URL =< UPSTASH_WORKFLOW_URL >
Replace ***
with your actual QStash token.
Set UPSTASH_WORKFLOW_URL
to the public URL provided by your local tunnel.
Here’s where you can find your QStash token:
Using a local tunnel connects your endpoint to the production QStash, enabling you to view workflow logs in the Upstash Console.
Step 3: Create a Workflow Endpoint
A workflow endpoint allows you to define a set of steps that, together, make up a workflow. Each step contains a piece of business logic that is automatically retried on failure, with easy monitoring via our visual workflow dashboard.
To define a workflow endpoint in a FastAPI project, create a main.py
file that contains your workflow:
from fastapi import FastAPI
from upstash_workflow.fastapi import Serve
app = FastAPI()
serve = Serve(app)
@serve.post ( "/api/workflow" )
async def workflow ( context ) -> None :
async def _step1 () -> None :
print ( "initial step ran" )
await context.run( "initial-step" , _step1)
async def _step2 () -> None :
print ( "second step ran" )
await context.run( "second-step" , _step2)
from fastapi import FastAPI
from upstash_workflow.fastapi import Serve
app = FastAPI()
serve = Serve(app)
@serve.post ( "/api/workflow" )
async def workflow ( context ) -> None :
async def _step1 () -> None :
print ( "initial step ran" )
await context.run( "initial-step" , _step1)
async def _step2 () -> None :
print ( "second step ran" )
await context.run( "second-step" , _step2)
from fastapi import FastAPI
import time
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext
app = FastAPI()
serve = Serve(app)
def some_work ( input : str ) -> str :
return f "processed ' { input } '"
@serve.post ( "/sleep" )
async def sleep ( context : AsyncWorkflowContext[ str ]) -> None :
input = context.request_payload
async def _step1 () -> str :
output = some_work( input )
print ( "step 1 input" , input , "output" , output)
return output
result1: str = await context.run( "step1" , _step1)
await context.sleep_until( "sleep1" , time.time() + 3 )
async def _step2 () -> str :
output = some_work(result1)
print ( "step 2 input" , result1, "output" , output)
return output
result2: str = await context.run( "step2" , _step2)
await context.sleep( "sleep2" , 2 )
async def _step3 () -> None :
output = some_work(result2)
print ( "step 3 input" , result2, "output" , output)
await context.run( "step3" , _step3)
from fastapi import FastAPI
from typing import Dict
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext, CallResponse
app = FastAPI()
serve = Serve(app)
def some_work ( input : str ) -> str :
return f "processed ' { input } '"
@app.post ( "/get-data" )
async def get_data () -> Dict[ str , str ]:
return { "message" : "get data response" }
@serve.post ( "/call" )
async def call ( context : AsyncWorkflowContext[ str ]) -> None :
input = context.request_payload
async def _step1 () -> str :
output = some_work( input )
print ( "step 1 input" , input , "output" , output)
return output
result1: str = await context.run( "step1" , _step1)
response: CallResponse[Dict[ str , str ]] = await context.call(
"get-data" ,
url = f " { context.env.get( 'UPSTASH_WORKFLOW_URL' , 'http://localhost:8000' ) } /get-data" ,
method = "POST" ,
body = { "message" : result1},
)
async def _step2 () -> str :
output = some_work(response.body[ "message" ])
print ( "step 2 input" , response, "output" , output)
return output
await context.run( "step2" , _step2)
from fastapi import FastAPI
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext
app = FastAPI()
serve = Serve(app)
def some_work ( input : str ) -> str :
return f "processed ' { input } '"
@serve.post ( "/auth" )
async def auth ( context : AsyncWorkflowContext[ str ]) -> None :
if context.headers.get( "authentication" ) != "Bearer secret_password" :
print ( "Authentication failed." )
return
async def _step1 () -> str :
return "output 1"
await context.run( "step1" , _step1)
async def _step2 () -> str :
return "output 2"
await context.run( "step2" , _step2)
Step 4: Run the Workflow Endpoint
Don’t forget to source your environment file to set your environment variables:
After setting your live URL as the environment variable or base_url
option, trigger your workflow by first starting your FastAPI app:
uvicorn main:app --reload
and then making a POST request to your workflow endpoint. For each workflow run, a unique workflow run ID is returned:
curl -X POST https://localhost:8000/api/workflow
# result: {"workflowRunId":"wfr_xxxxxx"}
See the documentation on starting a workflow for other ways you can start your workflow.
If you are using a local tunnel, you can use this ID to track the workflow run and see its status in your QStash workflow dashboard. All steps are listed with their statuses, headers, and body for a detailed overview of your workflow from start to finish. Click on a step to see its detailed logs.
Next Steps
Learn how to protect your workflow endpoint from unauthorized access by securing your workflow endpoint .
Explore the source code for a detailed, end-to-end example and best practices.
For setting up and testing your workflows in a local environment, check out our local development guide .