This tutorial shows how to build a load balancing worker using FastAPI and deploy it as a Serverless endpoint on Runpod.
Requirements
Before you begin, you’ll need:
- A Runpod account.
- Basic familiarity with Python and REST APIs.
- Docker installed on your local machine.
Step 1: Create a basic FastAPI application
You can download a preconfigured repository containing the completed code for this tutorial on GitHub.
First, let’s create a simple FastAPI application that will serve as our API.
Create a file named app.py:
import os

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

# Create FastAPI app
app = FastAPI()

# Define request models
class GenerationRequest(BaseModel):
    prompt: str
    max_tokens: int = 100
    temperature: float = 0.7

class GenerationResponse(BaseModel):
    generated_text: str

# Global variable to track requests
request_count = 0

# Health check endpoint; required for Runpod to monitor worker health
@app.get("/ping")
async def health_check():
    return {"status": "healthy"}

# Our custom generation endpoint
@app.post("/generate", response_model=GenerationResponse)
async def generate(request: GenerationRequest):
    global request_count
    request_count += 1
    # A simple mock implementation; we'll replace this with an actual model later
    generated_text = f"Response to: {request.prompt} (request #{request_count})"
    return {"generated_text": generated_text}

# A simple endpoint to show request stats
@app.get("/stats")
async def stats():
    return {"total_requests": request_count}

# Run the app when the script is executed
if __name__ == "__main__":
    import uvicorn

    port = int(os.getenv("PORT", "80"))
    print(f"Starting server on port {port}")

    # Start the server
    uvicorn.run(app, host="0.0.0.0", port=port)
This simple application defines the following endpoints:
- A health check endpoint at /ping
- A text generation endpoint at /generate
- A statistics endpoint at /stats
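If you want to sanity-check a request payload on the client side before sending it, a small helper can mirror the defaults that the GenerationRequest model applies on the server. This helper is illustrative only (it isn't part of the worker code), and uses nothing beyond the standard library:

```python
def build_generate_payload(prompt: str, max_tokens: int = 100, temperature: float = 0.7) -> dict:
    """Mirror the server-side GenerationRequest model: require a prompt
    and apply the same defaults for max_tokens and temperature."""
    if not isinstance(prompt, str) or not prompt:
        raise ValueError("prompt must be a non-empty string")
    return {"prompt": prompt, "max_tokens": max_tokens, "temperature": temperature}

payload = build_generate_payload("Hello, world!")
# → {"prompt": "Hello, world!", "max_tokens": 100, "temperature": 0.7}
```

Catching an invalid payload before it leaves the client gives you a clearer error than a 422 response from the endpoint.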
Step 2: Create a Dockerfile
Now, let’s create a Dockerfile to package our application:
FROM nvidia/cuda:12.1.0-base-ubuntu22.04
RUN apt-get update -y \
    && apt-get install -y python3-pip
RUN ldconfig /usr/local/cuda-12.1/compat/
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY app.py .
# Start the handler
CMD ["python3", "app.py"]
You’ll also need to create a requirements.txt file:
fastapi==0.95.1
uvicorn==0.22.0
pydantic==1.10.7
Step 3: Build and push the Docker image
Build and push your Docker image to a container registry:
# Build the image
docker build --platform linux/amd64 -t YOUR_DOCKER_USERNAME/loadbalancer-example:v1.0 .
# Push to Docker Hub
docker push YOUR_DOCKER_USERNAME/loadbalancer-example:v1.0
Step 4: Deploy to Runpod
Now, let’s deploy our application to a Serverless endpoint:
- Go to the Serverless page in the Runpod console.
- Click New Endpoint.
- Click Import from Docker Registry.
- In the Container Image field, enter your Docker image URL:
YOUR_DOCKER_USERNAME/loadbalancer-example:v1.0
Then click Next.
- Give your endpoint a name.
- Under Endpoint Type, select Load Balancer.
- Under GPU Configuration, select at least one GPU type (16 GB or 24 GB GPUs are fine for this example).
- Leave all other settings at their defaults.
- Click Deploy Endpoint.
Step 5: Access your custom API
Once your endpoint is created, you can access your custom APIs at:
https://ENDPOINT_ID.api.runpod.ai/PATH
For example, the load balancing worker we defined in step 1 exposes these endpoints:
- Health check: https://ENDPOINT_ID.api.runpod.ai/ping
- Generate text: https://ENDPOINT_ID.api.runpod.ai/generate
- Get request count: https://ENDPOINT_ID.api.runpod.ai/stats
Try running one or more of these commands, replacing ENDPOINT_ID and RUNPOD_API_KEY with your actual endpoint ID and API key:
curl -X POST "https://ENDPOINT_ID.api.runpod.ai/generate" \
  -H 'Authorization: Bearer RUNPOD_API_KEY' \
  -H "Content-Type: application/json" \
  -d '{"prompt": "Hello, world!"}'
After sending a request, your workers will take some time to initialize. You can track their progress by checking the logs in the Workers tab of your endpoint page.
If you see {"error":"no workers available"} after running the request, your workers did not initialize in time to process it. Running the request again will usually resolve the issue. For production applications, implement a health check with retries before sending requests. See Handling cold start errors for a complete code example.
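As a sketch of that retry pattern, the helper below polls until a health check reports healthy, pausing between attempts. The check function is injected so you can use any HTTP client to hit /ping; the names here are illustrative, not part of Runpod's API:

```python
import time

def wait_for_healthy(check, retries: int = 10, delay: float = 2.0) -> bool:
    """Call `check()` until it returns True or retries are exhausted.

    `check` should perform a GET against https://ENDPOINT_ID.api.runpod.ai/ping
    and return True on a healthy response. Returns True once the worker is
    healthy, False if it never came up within the retry budget.
    """
    for _ in range(retries):
        if check():
            return True
        time.sleep(delay)  # Give workers time to initialize before retrying
    return False
```

Only start sending /generate requests once this returns True.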
Congratulations! You’ve successfully deployed and tested a load balancing endpoint. If you want to use a real model, you can follow the vLLM worker tutorial.
(Optional) Advanced endpoint definitions
For a more complex API, you can define multiple endpoints and organize them logically. Here’s an example of how to structure a more complex API:
from fastapi import FastAPI, HTTPException, Depends, Query
from pydantic import BaseModel
import os

app = FastAPI()

# --- Authentication middleware ---
def verify_api_key(api_key: str = Query(None, alias="api_key")):
    if api_key != os.getenv("API_KEY", "test_key"):
        raise HTTPException(401, "Invalid API key")
    return api_key

# --- Models ---
class TextRequest(BaseModel):
    text: str
    max_length: int = 100

class ImageRequest(BaseModel):
    prompt: str
    width: int = 512
    height: int = 512

# --- Text endpoints ---
@app.post("/v1/text/summarize")
async def summarize(request: TextRequest, api_key: str = Depends(verify_api_key)):
    # Implement text summarization
    return {"summary": f"Summary of: {request.text[:30]}..."}

@app.post("/v1/text/translate")
async def translate(request: TextRequest, target_lang: str, api_key: str = Depends(verify_api_key)):
    # Implement translation
    return {"translation": f"Translation to {target_lang}: {request.text[:30]}..."}

# --- Image endpoints ---
@app.post("/v1/image/generate")
async def generate_image(request: ImageRequest, api_key: str = Depends(verify_api_key)):
    # Implement image generation
    return {"image_url": f"https://example.com/images/{hash(request.prompt)}.jpg"}

# --- Health check ---
@app.get("/ping")
async def health_check():
    return {"status": "healthy"}
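When calling the versioned routes above, note that target_lang and api_key are query parameters, while the TextRequest fields travel in the JSON body. A stdlib-only sketch of assembling such a URL (the helper name is illustrative):

```python
import urllib.parse

def build_translate_url(endpoint_id: str, target_lang: str, api_key: str) -> str:
    """Build the /v1/text/translate URL with its query parameters.

    target_lang and api_key go in the query string; the TextRequest
    fields (text, max_length) belong in the JSON request body instead.
    """
    query = urllib.parse.urlencode({"target_lang": target_lang, "api_key": api_key})
    return f"https://{endpoint_id}.api.runpod.ai/v1/text/translate?{query}"

url = build_translate_url("ENDPOINT_ID", "fr", "test_key")
# → https://ENDPOINT_ID.api.runpod.ai/v1/text/translate?target_lang=fr&api_key=test_key
```

Using urlencode rather than string concatenation ensures values with spaces or special characters are escaped correctly.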
(Optional) WebSocket support
Load balancing endpoints also support WebSocket connections. This section shows how to add a WebSocket endpoint to your worker and connect to it from a client.
Add a WebSocket endpoint
WebSocket endpoints in FastAPI use the @app.websocket() decorator. Add the following to your app.py:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio

app = FastAPI()

# Track active connections
active_ws_connections: list[WebSocket] = []

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Streaming WebSocket endpoint.

    Clients send JSON messages like: {"prompt": "Hello", "max_tokens": 50}
    Server streams responses back and sends {"done": true} when complete.
    """
    await websocket.accept()
    active_ws_connections.append(websocket)
    try:
        while True:
            data = await websocket.receive_json()
            prompt = data.get("prompt", "")
            if not prompt:
                await websocket.send_json({"error": "prompt is required"})
                continue

            # Simulate streaming response (replace with your model)
            words = f"Response to: {prompt}".split()
            for i, word in enumerate(words):
                await websocket.send_json({"token": word, "index": i})
                await asyncio.sleep(0.05)  # Simulate inference latency
            await websocket.send_json({"done": True})
    except WebSocketDisconnect:
        pass
    finally:
        active_ws_connections.remove(websocket)
Connect from a client
When connecting to a WebSocket endpoint on a load balancing worker, you must set the open_timeout parameter to allow time for workers to scale up. The default timeout of 5 seconds is usually not enough.
import asyncio
import json

import websockets

async def connect_to_worker():
    url = "wss://ENDPOINT_ID.api.runpod.ai/ws"
    headers = [("Authorization", "Bearer RUNPOD_API_KEY")]

    # Set open_timeout to allow workers time to scale up (default is ~5s)
    async with websockets.connect(
        url,
        additional_headers=headers,
        open_timeout=60.0,  # Wait up to 60 seconds for connection
    ) as ws:
        # Send a request
        await ws.send(json.dumps({"prompt": "Hello, world!", "max_tokens": 50}))

        # Receive streaming response
        while True:
            response = json.loads(await ws.recv())
            if response.get("done"):
                print("Generation complete")
                break
            print(response.get("token", ""), end=" ")

asyncio.run(connect_to_worker())
If you don’t set open_timeout, connections will fail with a timeout error when workers need to scale up from zero. A value of 60 seconds works for most use cases.
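If you want to be more defensive than a single long open_timeout, you can also retry the connection attempt itself. The sketch below is generic asyncio: the connector is injected, so you could wrap websockets.connect(...) or any other async client in it. The helper name and defaults are illustrative:

```python
import asyncio

async def connect_with_retry(connector, attempts: int = 3, delay: float = 5.0):
    """Try `connector()` up to `attempts` times, sleeping `delay` seconds
    between failures. `connector` is an async callable that returns an open
    connection (e.g. a wrapper around websockets.connect(...)). Raises the
    last error if every attempt fails."""
    last_error = None
    for _ in range(attempts):
        try:
            return await connector()
        except OSError as err:  # TimeoutError is an OSError subclass, so timeouts are covered
            last_error = err
            await asyncio.sleep(delay)  # Give workers time to scale up from zero
    raise last_error
```

Combined with a generous open_timeout, this covers the case where the first worker to spin up fails its initial connection entirely.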
Update requirements.txt
Add the websockets library to your client’s dependencies:
websockets
Troubleshooting
Here are some common issues and methods for troubleshooting:
- No workers available: If your request returns {"error":"no workers available"}, your workers did not initialize in time to process the request. Running the request again will usually fix this issue.
- Worker unhealthy: Check your health endpoint implementation and ensure it’s returning proper status codes.
- API not accessible: If your request returns {"error":"not allowed for QB API"}, verify that your endpoint type is set to “Load Balancer”.
- Port issues: Make sure the PORT environment variable matches the port your application listens on, and that the PORT_HEALTH variable is set to a different port.
- Model errors: Check your model’s requirements and whether it’s compatible with your GPU.
- WebSocket timeout: If WebSocket connections fail with timeout errors, increase the open_timeout parameter in your client code to allow workers time to scale up. See (Optional) WebSocket support for details.
Next steps
Now that you’ve learned how to build a basic load balancing worker, you can try implementing a real model with vLLM.