FastAPI For Beginners 2024

First API Code:

from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def read_root():
    # Root route handler: always answers with the same fixed greeting,
    # which is sent back to the client as a JSON object.
    greeting = {"Hello": "World"}
    return greeting

Consolidated Code:

from fastapi import FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

app = FastAPI()

# 1. Endpoint with a dynamic parameter in the URL
@app.get("/hello/{name}")
def read_root(name: str):
    # `name` is filled from the `{name}` segment of the route path.
    # It is already a str, so it is returned as-is instead of being
    # re-wrapped in a redundant f-string.
    return {"Hello": name}

# 2. Endpoint that reads `name` from a query parameter (e.g. /get-name?name=Alice)
@app.get("/get-name")
def get_name(name: str):
    # The route path has no `{name}` placeholder, so `name` arrives as a
    # required query-string parameter (e.g. /get-name?name=Alice).
    # It is already a str, so no f-string wrapper is needed.
    return {"name": name}

# 3. Endpoint returning structured JSON data
@app.get("/data")
def get_data():
    # Fixed sample payload mixing a string, an integer, a list and a boolean,
    # so the response shows how each Python type is rendered as JSON.
    cities = ["New York", "Paris", "Tokyo"]
    profile = dict(name="John Doe", age=30, cities=cities, is_active=True)
    return profile

# 4. Endpoint returning a custom status code and message
@app.get("/test-code")
def return_code():
    # Demonstrates overriding the default status: the response object is built
    # by hand so the HTTP status code (500 here) can be set explicitly.
    body = {"message": "Oops! Something Wrong Happened, try again later."}
    return JSONResponse(content=body, status_code=500)

# 5. Endpoint for user creation with validation using Pydantic
class User(BaseModel):
    # Pydantic model describing a user payload; the constraints below are
    # enforced when an instance is created from incoming data.

    # Required (`...` means "no default"); length must be 3 to 50 characters.
    username: str = Field(..., min_length=3, max_length=50)
    # Required, but only checked to be a string: the address format itself
    # is not validated.
    email: str
    # Required; must satisfy 18 <= age < 100 (`ge` is inclusive, `lt` exclusive).
    age: int = Field(..., ge=18, lt=100)

@app.post("/users")
def create_user(user: User):
    # `user` is typed as a pydantic model, so FastAPI reads it from the JSON
    # request body and validates it before this function runs.
    # Nothing is stored here: the validated model is simply echoed back.
    return user

CORS Code:

from fastapi.middleware.cors import CORSMiddleware

# NOTE(review): this snippet only imports CORSMiddleware; it presumably relies
# on `FastAPI` having been imported by an earlier snippet. Confirm when running
# it on its own.
app = FastAPI()

# Origins (scheme + host + port) whose browser pages may call this API.
# Matching is exact, so each scheme/port combination must be listed separately.
origins = [
    "http://localhost",          # localhost on the default HTTP port only
    "http://localhost:8080",    # localhost on port 8080 (e.g. a dev server)
    "https://kukucourses.com" # the Kuku Courses site, over HTTPS
]

# Register the CORS middleware on the app
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,       # Only the origins listed above are allowed
    allow_credentials=True,      # Let browsers send cookies / Authorization headers
    allow_methods=["*"],         # Accept every HTTP method (GET, POST, PUT, DELETE, etc.)
    allow_headers=["*"],         # Accept every request header, including custom ones
)

Synchronous execution

from fastapi import FastAPI
import time

app = FastAPI()

# Simulating a slow operation
def make_coffee(delay: float = 2):
    """Simulate a slow, blocking coffee-making step.

    Args:
        delay: Seconds to block before finishing. Defaults to 2, the
            duration that used to be hard-coded; pass 0 to skip the wait
            (for example in tests).

    Returns:
        The fixed status string "☕ Coffee ready!".
    """
    time.sleep(delay)  # Blocks the calling thread for the whole duration
    return "☕ Coffee ready!"

def make_toast(delay: float = 3):
    """Simulate a slow, blocking toast-making step.

    Args:
        delay: Seconds to block before finishing. Defaults to 3, the
            duration that used to be hard-coded; pass 0 to skip the wait
            (for example in tests).

    Returns:
        The fixed status string "🍞 Toast ready!".
    """
    time.sleep(delay)  # Blocks the calling thread for the whole duration
    return "🍞 Toast ready!"

@app.get("/prepare-slow")
def prepare_breakfast_slow():
    # perf_counter() is a monotonic, high-resolution clock intended for
    # measuring elapsed time; time.time() can jump when the system clock
    # is adjusted and would then report a wrong duration.
    start_time = time.perf_counter()

    # The two steps run one after the other, so their durations add up.
    coffee = make_coffee()
    toast = make_toast()

    total_time = time.perf_counter() - start_time

    return {
        "meals": [coffee, toast],
        "time_taken": f"{total_time:.1f} seconds",
    }

Asynchronous execution

from fastapi import FastAPI
import asyncio
import time

app = FastAPI()

# Asynchronous way to simulate a slow operation
async def async_make_coffee(delay: float = 2):
    """Simulate a slow coffee-making step without blocking the event loop.

    Args:
        delay: Seconds to wait before finishing. Defaults to 2, the
            duration that used to be hard-coded; pass 0 to skip the wait
            (for example in tests).

    Returns:
        The fixed status string "☕ Coffee ready!".
    """
    # asyncio.sleep() yields control, so other coroutines can run meanwhile.
    await asyncio.sleep(delay)
    return "☕ Coffee ready!"

async def async_make_toast(delay: float = 3):
    """Simulate a slow toast-making step without blocking the event loop.

    Args:
        delay: Seconds to wait before finishing. Defaults to 3, the
            duration that used to be hard-coded; pass 0 to skip the wait
            (for example in tests).

    Returns:
        The fixed status string "🍞 Toast ready!".
    """
    # asyncio.sleep() yields control, so other coroutines can run meanwhile.
    await asyncio.sleep(delay)
    return "🍞 Toast ready!"

# Asynchronous endpoint
@app.get("/breakfast-fast")
async def prepare_breakfast_fast():
    # perf_counter() is a monotonic clock intended for measuring elapsed time;
    # time.time() can jump when the system clock is adjusted.
    start_time = time.perf_counter()

    # Calling the async helpers only creates coroutine objects; gather() is
    # what runs them concurrently. Results come back in argument order, so
    # the total wait is roughly the longer of the two steps, not their sum.
    coffee, toast = await asyncio.gather(async_make_coffee(), async_make_toast())

    total_time = time.perf_counter() - start_time
    return {
        "meals": [coffee, toast],
        "time_taken": f"{total_time:.1f} seconds",
    }

Cache Optimization

from fastapi import FastAPI
import time

app = FastAPI()

# Simulate a slow database query
def get_movie_from_database(movie_name: str, delay: float = 2):
    """Simulate a slow, blocking database lookup for a movie.

    Args:
        movie_name: Name of the movie to "look up".
        delay: Seconds to block before answering. Defaults to 2, the
            duration that used to be hard-coded; pass 0 to skip the wait
            (for example in tests).

    Returns:
        The string "Found movie: <movie_name>".
    """
    time.sleep(delay)  # Blocks the calling thread for the whole duration
    return f"Found movie: {movie_name}"

# Version 1: Without caching (slow every time)
@app.get("/movie/slow/{movie_name}")
async def get_movie_slow(movie_name: str):
    """Get movie without using cache"""
    # Imported here because this example's import block does not provide it.
    from fastapi.concurrency import run_in_threadpool

    # perf_counter() is a monotonic clock intended for measuring elapsed time.
    start_time = time.perf_counter()

    # get_movie_from_database() blocks with time.sleep(). Calling it directly
    # inside an `async def` would freeze the event loop, and with it every
    # other request, so it runs on a worker thread instead. This request is
    # still slow on every call; only the rest of the server stays responsive.
    result = await run_in_threadpool(get_movie_from_database, movie_name)

    total_time = time.perf_counter() - start_time

    return {
        "result": result,
        "time_taken": f"{total_time:.1f} seconds",
        "from_cache": False,
    }

# Simple in-memory cache: movie name -> result string returned by the "database".
# A dict gives O(1) lookups and stores the real result, so a cache hit returns
# exactly what the database produced instead of re-building the string by hand.
movie_cache = {}

# Upper bound on cached entries. Keys come from the request path, so without a
# limit a client could grow the cache indefinitely.
MOVIE_CACHE_MAX_SIZE = 1024

# Version 2: With simple caching (fast after the first time)
@app.get("/movie/fast/{movie_name}")
async def get_movie_fast(movie_name: str):
    """Get movie using simple cache"""
    # Imported here because this example's import block does not provide it.
    from fastapi.concurrency import run_in_threadpool

    # perf_counter() is a monotonic clock intended for measuring elapsed time.
    start_time = time.perf_counter()

    from_cache = movie_name in movie_cache
    if from_cache:
        result = movie_cache[movie_name]
    else:
        # The lookup blocks with time.sleep(); run it on a worker thread so
        # the event loop keeps serving other requests in the meantime.
        result = await run_in_threadpool(get_movie_from_database, movie_name)

        # Dicts keep insertion order, so the first key is the oldest entry;
        # evict it when the cache is full and this is a genuinely new key.
        if movie_name not in movie_cache and len(movie_cache) >= MOVIE_CACHE_MAX_SIZE:
            movie_cache.pop(next(iter(movie_cache)))
        movie_cache[movie_name] = result

    total_time = time.perf_counter() - start_time

    return {
        "result": result,
        "time_taken": f"{total_time:.1f} seconds",
        "from_cache": from_cache,
    }

Optimization Without Background Tasks

from fastapi import FastAPI
import time

app = FastAPI()

# Simulate slow operations
def send_welcome_email(email: str, delay: float = 3):
    """Simulate sending a welcome email (blocking).

    Args:
        email: Address the simulated email is sent to.
        delay: Seconds to block before finishing. Defaults to 3, the
            duration that used to be hard-coded; pass 0 to skip the wait.

    Returns:
        None. A confirmation line is printed to stdout.
    """
    time.sleep(delay)  # Simulate the time spent sending the email
    print(f"Welcome email sent to {email}")

def update_analytics(email: str, delay: float = 2):
    """Simulate recording a signup in an analytics system (blocking).

    Args:
        email: Address of the user the update is recorded for.
        delay: Seconds to block before finishing. Defaults to 2, the
            duration that used to be hard-coded; pass 0 to skip the wait.

    Returns:
        None. A confirmation line is printed to stdout.
    """
    time.sleep(delay)  # Simulate the time spent updating analytics
    print(f"Analytics updated for {email}")

def notify_team(email: str, delay: float = 1):
    """Simulate notifying the team about a new user (blocking).

    Args:
        email: Address of the newly signed-up user.
        delay: Seconds to block before finishing. Defaults to 1, the
            duration that used to be hard-coded; pass 0 to skip the wait.

    Returns:
        None. A confirmation line is printed to stdout.
    """
    time.sleep(delay)  # Simulate the time spent notifying the team
    print(f"Team notified about new user: {email}")

# Endpoint without background tasks
@app.post("/signup-slow")
async def signup_without_background_tasks(email: str):
    # Imported here because this example's import block does not provide it.
    from fastapi.concurrency import run_in_threadpool

    # perf_counter() is a monotonic clock intended for measuring elapsed time.
    start_time = time.perf_counter()

    # The helpers block with time.sleep(). Calling them directly inside an
    # `async def` would freeze the event loop and every other request with it.
    # Each step is awaited in turn on a worker thread: this request is still
    # slow because the steps stay sequential, but the server remains responsive.
    for step in (send_welcome_email, update_analytics, notify_team):
        await run_in_threadpool(step, email)

    total_time = time.perf_counter() - start_time
    return {
        "message": "Signup complete! (But it was slow 🐢)",
        "email": email,
        "time_taken": f"{total_time:.1f} seconds",
    }

Optimization With Background Tasks

from fastapi import FastAPI, BackgroundTasks
import time

app = FastAPI()

# Simulate slow operations
def send_welcome_email(email: str, delay: float = 3):
    """Simulate sending a welcome email (blocking).

    Args:
        email: Address the simulated email is sent to.
        delay: Seconds to block before finishing. Defaults to 3, the
            duration that used to be hard-coded; pass 0 to skip the wait.

    Returns:
        None. A confirmation line is printed to stdout.
    """
    time.sleep(delay)  # Simulate the time spent sending the email
    print(f"Welcome email sent to {email}")

def update_analytics(email: str, delay: float = 2):
    """Simulate recording a signup in an analytics system (blocking).

    Args:
        email: Address of the user the update is recorded for.
        delay: Seconds to block before finishing. Defaults to 2, the
            duration that used to be hard-coded; pass 0 to skip the wait.

    Returns:
        None. A confirmation line is printed to stdout.
    """
    time.sleep(delay)  # Simulate the time spent updating analytics
    print(f"Analytics updated for {email}")

def notify_team(email: str, delay: float = 1):
    """Simulate notifying the team about a new user (blocking).

    Args:
        email: Address of the newly signed-up user.
        delay: Seconds to block before finishing. Defaults to 1, the
            duration that used to be hard-coded; pass 0 to skip the wait.

    Returns:
        None. A confirmation line is printed to stdout.
    """
    time.sleep(delay)  # Simulate the time spent notifying the team
    print(f"Team notified about new user: {email}")

# Endpoint with background tasks
@app.post("/signup-fast")
async def signup_with_background_tasks(email: str, background_tasks: BackgroundTasks):
    # perf_counter() is a monotonic clock intended for measuring elapsed time.
    start_time = time.perf_counter()

    # add_task() only queues the work; the helpers are not executed here,
    # which is why the measured time below stays close to zero.
    for task in (send_welcome_email, update_analytics, notify_team):
        background_tasks.add_task(task, email)

    total_time = time.perf_counter() - start_time
    return {
        "message": "Signup complete! That was instant! 🚀",
        "email": email,
        "time_taken": f"{total_time:.1f} seconds",
    }

Swagger Documentation & Tags

from fastapi import FastAPI

# Create the FastAPI instance with metadata.
# `title` and `description` are shown at the top of the generated API docs.
app = FastAPI(
    title="Shopping Mall API",
    description="A super simple API with products, users and orders",
    docs_url="/kunal"  # Serve the Swagger UI at /kunal instead of the default /docs
)

# Dummy in-memory data: each collection is a list of plain dictionaries
products = [
    {"id": 1, "name": "Laptop", "price": 999.99},
    {"id": 2, "name": "Phone", "price": 499.99},
    {"id": 3, "name": "Headphones", "price": 99.99}
]

# Placeholder addresses on the reserved example.com domain. The previous
# values were the literal text "[email protected]", an artifact left by
# email obfuscation on a web page, not a real address.
users = [
    {"id": 1, "username": "john_doe", "email": "john_doe@example.com"},
    {"id": 2, "username": "jane_smith", "email": "jane_smith@example.com"}
]

# `user_id` refers to an entry in `users`; `items` holds product names.
orders = [
    {"id": 1, "user_id": 1, "items": ["Laptop", "Headphones"], "total": 1099.98},
    {"id": 2, "user_id": 2, "items": ["Phone"], "total": 499.99}
]

# Product Routes or Endpoints
@app.get(
    "/products/",
    tags=["Products"],
    description="Get all products in the store"
)
def get_products():
    # Returns the module-level `products` list unchanged (no filtering or paging).
    return products


@app.get(
    "/products/{product_id}",
    tags=["Products"],
    description="Get a single product by its ID"
)
def get_product(product_id: int):
    # Imported here because this example's import block only provides FastAPI.
    from fastapi import HTTPException

    # First product whose id matches, or None when there is no such product.
    match = next((item for item in products if item["id"] == product_id), None)
    if match is None:
        # Report a missing resource with HTTP 404 rather than a 200 response
        # carrying an error message; the body becomes {"detail": "..."}.
        raise HTTPException(status_code=404, detail="Product not found")
    return match

# User Routes
@app.get(
    "/users/",
    tags=["Users"],
    description="Get all users"
)
def get_users():
    # Returns the module-level `users` list unchanged (no filtering or paging).
    return users


@app.get(
    "/users/{user_id}",
    tags=["Users"],
    description="Get a single user by their ID"
)
def get_user(user_id: int):
    # Imported here because this example's import block only provides FastAPI.
    from fastapi import HTTPException

    # First user whose id matches, or None when there is no such user.
    match = next((item for item in users if item["id"] == user_id), None)
    if match is None:
        # Report a missing resource with HTTP 404 rather than a 200 response
        # carrying an error message; the body becomes {"detail": "..."}.
        raise HTTPException(status_code=404, detail="User not found")
    return match

# Order Routes
@app.get(
    "/orders/",
    tags=["Orders"],
    description="Get all orders"
)
def get_orders():
    # Returns the module-level `orders` list unchanged (no filtering or paging).
    return orders


@app.get(
    "/orders/{order_id}",
    tags=["Orders"],
    description="Get a single order by its ID"
)
def get_order(order_id: int):
    # Imported here because this example's import block only provides FastAPI.
    from fastapi import HTTPException

    # First order whose id matches, or None when there is no such order.
    match = next((item for item in orders if item["id"] == order_id), None)
    if match is None:
        # Report a missing resource with HTTP 404 rather than a 200 response
        # carrying an error message; the body becomes {"detail": "..."}.
        raise HTTPException(status_code=404, detail="Order not found")
    return match

Swagger Password Protected

import os
import secrets
from typing import List

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.openapi.docs import get_swagger_ui_html
from fastapi.security import HTTPBasic, HTTPBasicCredentials

# Create FastAPI app
app = FastAPI(
    title="Shopping Mall API",
    description="A super simple API with password-protected documentation",
    # Disable the built-in, unauthenticated documentation pages
    docs_url=None,
    redoc_url=None,
    # Also disable the built-in schema route. FastAPI registers it while the
    # app is being constructed, i.e. before any route declared later in this
    # file, and the first matching route wins. Left enabled, it would answer
    # "/openapi.json" without authentication and the protected handler on the
    # same path would never be reached.
    openapi_url=None,
)

# HTTP Basic auth scheme: used as a dependency, it extracts the username and
# password from the request's Authorization header.
security = HTTPBasic()

# Credentials that unlock the documentation pages. They can be overridden with
# the DOCS_USERNAME / DOCS_PASSWORD environment variables so a real deployment
# does not have to keep secrets in source code; the fallbacks keep the
# tutorial defaults working out of the box.
USERNAME = os.environ.get("DOCS_USERNAME", "admin")
PASSWORD = os.environ.get("DOCS_PASSWORD", "secret123")

# Security function to check credentials
def get_current_user(credentials: HTTPBasicCredentials = Depends(security)):
    """Validate HTTP Basic credentials against USERNAME / PASSWORD.

    Args:
        credentials: Username and password supplied by the `security`
            dependency.

    Returns:
        The authenticated username.

    Raises:
        HTTPException: 401 with a `WWW-Authenticate: Basic` header when the
            username or the password does not match.
    """
    # compare_digest() accepts str arguments only when they are pure ASCII;
    # a non-ASCII username or password would raise TypeError and surface as
    # an HTTP 500. Comparing UTF-8 bytes accepts any input and keeps the
    # comparison resistant to timing attacks.
    correct_username = secrets.compare_digest(
        credentials.username.encode("utf-8"), USERNAME.encode("utf-8")
    )
    correct_password = secrets.compare_digest(
        credentials.password.encode("utf-8"), PASSWORD.encode("utf-8")
    )
    # Both comparisons are always evaluated before this check, so a wrong
    # username is not rejected any faster than a wrong password.
    if not (correct_username and correct_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            # Asks the browser to show its username/password prompt
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials.username

# Custom docs endpoint with authentication
@app.get("/swagger", include_in_schema=False)
async def get_documentation(username: str = Depends(get_current_user)):
    # `username` is not used in the body: depending on get_current_user is
    # what forces the credential check before the page is served.
    ui_options = {
        "openapi_url": "/openapi.json",
        "title": "API Documentation",
        # Swagger UI assets are loaded from a CDN, pinned to one version
        "swagger_js_url": "https://cdnjs.cloudflare.com/ajax/libs/swagger-ui/4.15.5/swagger-ui-bundle.min.js",
        "swagger_css_url": "https://cdnjs.cloudflare.com/ajax/libs/swagger-ui/4.15.5/swagger-ui.min.css",
    }
    return get_swagger_ui_html(**ui_options)

# Protect the OpenAPI JSON schema too
@app.get("/openapi.json", include_in_schema=False)
async def get_openapi_json(username: str = Depends(get_current_user)):
    # `username` is not used in the body: depending on get_current_user is
    # what forces the credential check before the schema is returned.
    # NOTE(review): FastAPI registers its own schema route at `openapi_url`
    # (default "/openapi.json") when the app is constructed, unless that
    # setting is None. Confirm the app disables it, otherwise the built-in
    # unauthenticated route matches first and this handler is never reached.
    return app.openapi()

# --- Dummy Data ---
# In-memory sample collections: each one is a list of plain dictionaries.
products = [
    {"id": 1, "name": "Laptop", "price": 999.99},
    {"id": 2, "name": "Phone", "price": 499.99},
    {"id": 3, "name": "Headphones", "price": 99.99},
]

# Placeholder addresses on the reserved example.com domain. The previous
# values were the literal text "[email protected]", an artifact left by
# email obfuscation on a web page, not a real address.
users = [
    {"id": 1, "username": "john_doe", "email": "john_doe@example.com"},
    {"id": 2, "username": "jane_smith", "email": "jane_smith@example.com"},
]

# `user_id` refers to an entry in `users`; `items` holds product names.
orders = [
    {"id": 1, "user_id": 1, "items": ["Laptop", "Headphones"], "total": 1099.98},
    {"id": 2, "user_id": 2, "items": ["Phone"], "total": 499.99},
]

# --- Product Routes ---
@app.get("/products/", tags=["Products"], description="Get all products in the store")
def get_products():
    # Returns the module-level `products` list unchanged (no filtering or paging).
    return products

@app.get("/products/{product_id}", tags=["Products"], description="Get a single product by its ID")
def get_product(product_id: int):
    # First product whose id matches, or None when there is no such product.
    match = next((item for item in products if item["id"] == product_id), None)
    if match is None:
        # Report a missing resource with HTTP 404 rather than a 200 response
        # carrying an error message; the body becomes {"detail": "..."}.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Product not found",
        )
    return match

# --- User Routes ---
@app.get("/users/", tags=["Users"], description="Get all users")
def get_users():
    # Returns the module-level `users` list unchanged (no filtering or paging).
    return users

@app.get("/users/{user_id}", tags=["Users"], description="Get a single user by their ID")
def get_user(user_id: int):
    # First user whose id matches, or None when there is no such user.
    match = next((item for item in users if item["id"] == user_id), None)
    if match is None:
        # Report a missing resource with HTTP 404 rather than a 200 response
        # carrying an error message; the body becomes {"detail": "..."}.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )
    return match

# --- Order Routes ---
@app.get("/orders/", tags=["Orders"], description="Get all orders")
def get_orders():
    # Returns the module-level `orders` list unchanged (no filtering or paging).
    return orders

@app.get("/orders/{order_id}", tags=["Orders"], description="Get a single order by its ID")
def get_order(order_id: int):
    # First order whose id matches, or None when there is no such order.
    match = next((item for item in orders if item["id"] == order_id), None)
    if match is None:
        # Report a missing resource with HTTP 404 rather than a 200 response
        # carrying an error message; the body becomes {"detail": "..."}.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Order not found",
        )
    return match