First API Code:
# Minimal FastAPI application: one GET route at the root path.
from fastapi import FastAPI

app = FastAPI()


@app.get("/")
def read_root():
    """Return the fixed greeting payload for ``GET /``."""
    greeting = {"Hello": "World"}
    return greeting
Consolidated Code:
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

app = FastAPI()


# 1. Endpoint with a dynamic parameter in the URL
@app.get("/hello/{name}")
def read_root(name: str):
    """Echo the ``name`` path segment back under the "Hello" key."""
    greeting = {"Hello": f"{name}"}
    return greeting


# 2. Endpoint to get a name dynamically
@app.get("/get-name")
def get_name(name: str):
    """Echo the ``name`` argument back under the "name" key."""
    payload = {"name": f"{name}"}
    return payload


# 3. Endpoint returning structured JSON data
@app.get("/data")
def get_data():
    """Return a fixed sample record mixing str, int, list and bool values."""
    profile = {"name": "John Doe", "age": 30}
    profile["cities"] = ["New York", "Paris", "Tokyo"]
    profile["is_active"] = True
    return profile


# 4. Endpoint returning a custom status code and message
@app.get("/test-code")
def return_code():
    """Always answer with status 500 and a fixed error message."""
    body = {"message": "Oops! Something Wrong Happened, try again later."}
    return JSONResponse(content=body, status_code=500)


# 5. Endpoint for user creation with validation using Pydantic
class User(BaseModel):
    """Request body accepted by ``POST /users``."""

    # Required, 3 to 50 characters long.
    username: str = Field(..., min_length=3, max_length=50)
    # Plain string; no format constraint is declared here.
    email: str
    # Required, 18 <= age < 100.
    age: int = Field(..., ge=18, lt=100)


@app.post("/users")
def create_user(user: User):
    """Return the validated ``User`` body unchanged."""
    return user
CORS Code:
# FastAPI must be imported here: the original snippet called FastAPI()
# without importing it, which raises NameError when run on its own.
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# Define allowed origins (the URLs that can access your API)
origins = [
    "http://localhost",  # Allow access from localhost
    "http://localhost:8080",  # Allow access from a specific port on localhost
    "https://kukucourses.com",  # Allow access from the Kuku Courses domain
]

# Add CORS middleware to the FastAPI app
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,  # Specify allowed origins
    allow_credentials=True,  # Allow sending cookies or authentication headers
    allow_methods=["*"],  # Allow all HTTP methods (GET, POST, PUT, DELETE, etc.)
    allow_headers=["*"],  # Allow all headers (custom headers, etc.)
)
Synchronous execution
from fastapi import FastAPI
import time

app = FastAPI()


# Simulating a slow operation
def make_coffee():
    """Sleep for 2 seconds, then return the coffee-ready message."""
    time.sleep(2)
    return "☕ Coffee ready!"


def make_toast():
    """Sleep for 3 seconds, then return the toast-ready message."""
    time.sleep(3)
    return "🍞 Toast ready!"


@app.get("/prepare-slow")
def prepare_breakfast_slow():
    """Prepare coffee and then toast, one after the other.

    Returns a dict with the two result messages under "meals" and the
    elapsed wall-clock time, formatted to one decimal, under "time_taken".
    """
    start_time = time.time()

    # List items are evaluated left to right, so coffee still runs
    # to completion before toast starts.
    meals = [make_coffee(), make_toast()]

    total_time = time.time() - start_time
    return {"meals": meals, "time_taken": f"{total_time:.1f} seconds"}
Asynchronous execution
from fastapi import FastAPI
import asyncio
import time

app = FastAPI()


# Asynchronous way to simulate a slow operation
async def async_make_coffee():
    """Await a 2-second sleep, then return the coffee-ready message."""
    await asyncio.sleep(2)
    return "☕ Coffee ready!"


async def async_make_toast():
    """Await a 3-second sleep, then return the toast-ready message."""
    await asyncio.sleep(3)
    return "🍞 Toast ready!"


# Asynchronous endpoint
@app.get("/breakfast-fast")
async def prepare_breakfast_fast():
    """Prepare coffee and toast concurrently with ``asyncio.gather``.

    Returns a dict with the two result messages (coffee first, toast
    second) under "meals" and the elapsed wall-clock time, formatted to
    one decimal, under "time_taken".
    """
    start_time = time.time()

    # gather() awaits both coroutines together and returns their
    # results in argument order.
    results = await asyncio.gather(async_make_coffee(), async_make_toast())

    total_time = time.time() - start_time
    return {
        "meals": list(results),
        "time_taken": f"{total_time:.1f} seconds",
    }
Cache Optimization
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
import time

app = FastAPI()


# Simulate a slow database query
def get_movie_from_database(movie_name: str):
    """Block for 2 seconds, then return the "Found movie" message."""
    time.sleep(2)  # Pretend it takes 2 seconds to fetch data
    return f"Found movie: {movie_name}"


# Version 1: Without caching (slow every time)
@app.get("/movie/slow/{movie_name}")
async def get_movie_slow(movie_name: str):
    """Get movie without using cache.

    Returns a dict with the lookup "result", the elapsed "time_taken"
    (one decimal, in seconds) and "from_cache" set to False.
    """
    start_time = time.time()

    # The lookup blocks on time.sleep(); calling it directly inside an
    # ``async def`` endpoint would stall the event loop for every other
    # request, so it is handed to a worker thread instead.
    result = await run_in_threadpool(get_movie_from_database, movie_name)

    total_time = time.time() - start_time
    return {
        "result": result,
        "time_taken": f"{total_time:.1f} seconds",
        "from_cache": False,
    }


# Simple cache mapping movie name -> lookup result.
# A dict gives O(1) membership checks and stores the real result, so the
# cached response cannot drift from what the database function returns.
# NOTE: the cache is unbounded and never invalidated; fine for a demo only.
movie_cache = {}


# Version 2: With simple caching (fast after the first time)
@app.get("/movie/fast/{movie_name}")
async def get_movie_fast(movie_name: str):
    """Get movie using simple cache.

    The first request for a name performs the slow lookup and stores the
    result; later requests for the same name are answered from
    ``movie_cache``. Returns a dict with "result", "time_taken" (one
    decimal, in seconds) and "from_cache" (True on a cache hit).
    """
    start_time = time.time()

    from_cache = movie_name in movie_cache
    if from_cache:
        result = movie_cache[movie_name]
    else:
        # Cache miss: run the blocking lookup off the event loop, then
        # remember the result for next time.
        result = await run_in_threadpool(get_movie_from_database, movie_name)
        movie_cache[movie_name] = result

    total_time = time.time() - start_time
    return {
        "result": result,
        "time_taken": f"{total_time:.1f} seconds",
        "from_cache": from_cache,
    }
Optimization Without Background Tasks
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
import time

app = FastAPI()


# Simulate slow operations
def send_welcome_email(email: str):
    """Block for 3 seconds, then print that the welcome email was sent."""
    time.sleep(3)  # Simulate sending email
    print(f"Welcome email sent to {email}")


def update_analytics(email: str):
    """Block for 2 seconds, then print that analytics were updated."""
    time.sleep(2)  # Simulate analytics update
    print(f"Analytics updated for {email}")


def notify_team(email: str):
    """Block for 1 second, then print that the team was notified."""
    time.sleep(1)  # Simulate team notification
    print(f"Team notified about new user: {email}")


# Endpoint without background tasks
@app.post("/signup-slow")
async def signup_without_background_tasks(email: str):
    """Run every signup step before responding.

    The three steps still run one after another, so the caller waits for
    all of them. Each blocking step is executed in a worker thread so that
    this ``async def`` endpoint does not freeze the event loop (and with it
    every other in-flight request) while it sleeps.

    Returns a dict with a fixed "message", the given "email" and the
    elapsed "time_taken" (one decimal, in seconds).
    """
    start_time = time.time()

    # Perform all tasks sequentially, in the original order.
    for step in (send_welcome_email, update_analytics, notify_team):
        await run_in_threadpool(step, email)

    total_time = time.time() - start_time
    return {
        "message": "Signup complete! (But it was slow 🐢)",
        "email": email,
        "time_taken": f"{total_time:.1f} seconds",
    }
Optimization With Background Tasks
from fastapi import FastAPI, BackgroundTasks
import time

app = FastAPI()


# Simulate slow operations
def send_welcome_email(email: str):
    """Sleep for 3 seconds, then print that the welcome email was sent."""
    time.sleep(3)
    print(f"Welcome email sent to {email}")


def update_analytics(email: str):
    """Sleep for 2 seconds, then print that analytics were updated."""
    time.sleep(2)
    print(f"Analytics updated for {email}")


def notify_team(email: str):
    """Sleep for 1 second, then print that the team was notified."""
    time.sleep(1)
    print(f"Team notified about new user: {email}")


# Endpoint with background tasks
@app.post("/signup-fast")
async def signup_with_background_tasks(email: str, background_tasks: BackgroundTasks):
    """Queue the signup steps on ``background_tasks`` instead of running them inline.

    Returns a dict with a fixed "message", the given "email" and the
    time spent inside this handler ("time_taken", one decimal, in seconds).
    """
    start_time = time.time()

    # Register the steps in the same order as the sequential version.
    for step in (send_welcome_email, update_analytics, notify_team):
        background_tasks.add_task(step, email)

    total_time = time.time() - start_time
    response = {
        "message": "Signup complete! That was instant! 🚀",
        "email": email,
        "time_taken": f"{total_time:.1f} seconds",
    }
    return response
Swagger Documentation & Tags
from fastapi import FastAPI, HTTPException

# Create FastAPI instance with metadata
app = FastAPI(
    title="Shopping Mall API",
    description="A super simple API with products, users and orders",
    docs_url="/kunal",  # Custom Swagger docs URL
)

# Dummy data using simple dictionaries
products = [
    {"id": 1, "name": "Laptop", "price": 999.99},
    {"id": 2, "name": "Phone", "price": 499.99},
    {"id": 3, "name": "Headphones", "price": 99.99},
]

# NOTE(review): the email values look like scraper-obfuscated placeholders
# rather than real sample addresses; confirm the intended data.
users = [
    {"id": 1, "username": "john_doe", "email": "[email protected]"},
    {"id": 2, "username": "jane_smith", "email": "[email protected]"},
]

orders = [
    {"id": 1, "user_id": 1, "items": ["Laptop", "Headphones"], "total": 1099.98},
    {"id": 2, "user_id": 2, "items": ["Phone"], "total": 499.99},
]


def _find_by_id(records, record_id, label):
    """Return the record whose "id" equals ``record_id``.

    Raises:
        HTTPException: 404 with detail "<label> not found" when no record
            matches, so clients get a real error status instead of a
            200 response carrying an error message.
    """
    for record in records:
        if record["id"] == record_id:
            return record
    raise HTTPException(status_code=404, detail=f"{label} not found")


# Product Routes or Endpoints
@app.get(
    "/products/",
    tags=["Products"],
    description="Get all products in the store",
)
def get_products():
    """Return the full product list."""
    return products


@app.get(
    "/products/{product_id}",
    tags=["Products"],
    description="Get a single product by its ID",
)
def get_product(product_id: int):
    """Return the product with the given ID, or respond 404 if absent."""
    return _find_by_id(products, product_id, "Product")


# User Routes
@app.get(
    "/users/",
    tags=["Users"],
    description="Get all users",
)
def get_users():
    """Return the full user list."""
    return users


@app.get(
    "/users/{user_id}",
    tags=["Users"],
    description="Get a single user by their ID",
)
def get_user(user_id: int):
    """Return the user with the given ID, or respond 404 if absent."""
    return _find_by_id(users, user_id, "User")


# Order Routes
@app.get(
    "/orders/",
    tags=["Orders"],
    description="Get all orders",
)
def get_orders():
    """Return the full order list."""
    return orders


@app.get(
    "/orders/{order_id}",
    tags=["Orders"],
    description="Get a single order by its ID",
)
def get_order(order_id: int):
    """Return the order with the given ID, or respond 404 if absent."""
    return _find_by_id(orders, order_id, "Order")
Swagger Password Protected
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.openapi.docs import get_swagger_ui_html
from typing import List
import os
import secrets

# Create FastAPI app
app = FastAPI(
    title="Shopping Mall API",
    description="A super simple API with password-protected documentation",
    # Disable default docs
    docs_url=None,
    redoc_url=None,
    # Also disable the built-in schema route. Otherwise FastAPI registers an
    # unauthenticated "/openapi.json" at construction time, and because it is
    # registered first it shadows the protected route defined below.
    openapi_url=None,
)

# Setup basic auth
security = HTTPBasic()

# Username and password for docs. Read from the environment so real
# deployments do not keep credentials in source; the defaults keep the
# demo working unchanged when the variables are not set.
USERNAME = os.environ.get("DOCS_USERNAME", "admin")
PASSWORD = os.environ.get("DOCS_PASSWORD", "secret123")


# Security function to check credentials
def get_current_user(credentials: HTTPBasicCredentials = Depends(security)):
    """Validate HTTP Basic credentials and return the username.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Basic`` header when
            the username or password does not match.
    """
    # compare_digest() rejects str arguments containing non-ASCII characters
    # with TypeError, so compare UTF-8 bytes. Both comparisons are always
    # evaluated before being combined.
    correct_username = secrets.compare_digest(
        credentials.username.encode("utf8"), USERNAME.encode("utf8")
    )
    correct_password = secrets.compare_digest(
        credentials.password.encode("utf8"), PASSWORD.encode("utf8")
    )
    if not (correct_username and correct_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Basic"},
        )
    return credentials.username


# Custom docs endpoint with authentication
@app.get("/swagger", include_in_schema=False)
async def get_documentation(username: str = Depends(get_current_user)):
    """Serve the Swagger UI page to authenticated users only."""
    return get_swagger_ui_html(
        openapi_url="/openapi.json",
        title="API Documentation",
        swagger_js_url="https://cdnjs.cloudflare.com/ajax/libs/swagger-ui/4.15.5/swagger-ui-bundle.min.js",
        swagger_css_url="https://cdnjs.cloudflare.com/ajax/libs/swagger-ui/4.15.5/swagger-ui.min.css",
    )


# Protect the OpenAPI JSON schema too
@app.get("/openapi.json", include_in_schema=False)
async def get_openapi_json(username: str = Depends(get_current_user)):
    """Return the generated OpenAPI schema to authenticated users only."""
    return app.openapi()


# --- Dummy Data ---
products = [
    {"id": 1, "name": "Laptop", "price": 999.99},
    {"id": 2, "name": "Phone", "price": 499.99},
    {"id": 3, "name": "Headphones", "price": 99.99},
]

# NOTE(review): the email values look like scraper-obfuscated placeholders
# rather than real sample addresses; confirm the intended data.
users = [
    {"id": 1, "username": "john_doe", "email": "[email protected]"},
    {"id": 2, "username": "jane_smith", "email": "[email protected]"},
]

orders = [
    {"id": 1, "user_id": 1, "items": ["Laptop", "Headphones"], "total": 1099.98},
    {"id": 2, "user_id": 2, "items": ["Phone"], "total": 499.99},
]


def _find_by_id(records, record_id, label):
    """Return the record whose "id" equals ``record_id``.

    Raises:
        HTTPException: 404 with detail "<label> not found" when no record
            matches, so clients get a real error status instead of a
            200 response carrying an error message.
    """
    for record in records:
        if record["id"] == record_id:
            return record
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail=f"{label} not found"
    )


# --- Product Routes ---
@app.get("/products/", tags=["Products"], description="Get all products in the store")
def get_products():
    """Return the full product list."""
    return products


@app.get("/products/{product_id}", tags=["Products"], description="Get a single product by its ID")
def get_product(product_id: int):
    """Return the product with the given ID, or respond 404 if absent."""
    return _find_by_id(products, product_id, "Product")


# --- User Routes ---
@app.get("/users/", tags=["Users"], description="Get all users")
def get_users():
    """Return the full user list."""
    return users


@app.get("/users/{user_id}", tags=["Users"], description="Get a single user by their ID")
def get_user(user_id: int):
    """Return the user with the given ID, or respond 404 if absent."""
    return _find_by_id(users, user_id, "User")


# --- Order Routes ---
@app.get("/orders/", tags=["Orders"], description="Get all orders")
def get_orders():
    """Return the full order list."""
    return orders


@app.get("/orders/{order_id}", tags=["Orders"], description="Get a single order by its ID")
def get_order(order_id: int):
    """Return the order with the given ID, or respond 404 if absent."""
    return _find_by_id(orders, order_id, "Order")