Sufast/Documentation
CRUD Operations
Complete CRUD API Example
Build a full-featured REST API with Create, Read, Update, and Delete operations using Sufast.
This example demonstrates a complete REST API for managing users and posts. It includes validation, error handling, and proper HTTP status codes.
Complete CRUD Application
Full-featured API with users, posts, and relationships
from sufast import App, Request
from typing import Optional, List, Dict, Any
from datetime import datetime
import uuid
app = App()
# In-memory databases (use real database in production)
users_db: Dict[str, Dict[str, Any]] = {}
posts_db: Dict[str, Dict[str, Any]] = {}
# Helper functions
def generate_id() -> str:
return str(uuid.uuid4())
def get_current_timestamp() -> str:
return datetime.now().isoformat()
def validate_email(email: str) -> bool:
import re
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None
def user_exists(user_id: str) -> bool:
return user_id in users_db
def email_exists(email: str, exclude_user_id: Optional[str] = None) -> bool:
for uid, user in users_db.items():
if uid != exclude_user_id and user["email"] == email:
return True
return False
# ============================================================================
# USERS CRUD OPERATIONS
# ============================================================================
@app.get("/users")
def list_users(
page: int = 1,
limit: int = 10,
search: Optional[str] = None,
sort_by: str = "created_at",
sort_order: str = "desc"
):
"""List all users with pagination and search"""
# Validate parameters
if page < 1:
return {"error": "Page must be >= 1"}, 400
if limit < 1 or limit > 100:
return {"error": "Limit must be between 1 and 100"}, 400
if sort_order not in ["asc", "desc"]:
return {"error": "Sort order must be 'asc' or 'desc'"}, 400
# Get all users
users = list(users_db.values())
# Apply search filter
if search:
search_lower = search.lower()
users = [
user for user in users
if search_lower in user["name"].lower() or search_lower in user["email"].lower()
]
# Sort users
reverse = sort_order == "desc"
if sort_by == "name":
users.sort(key=lambda x: x["name"], reverse=reverse)
elif sort_by == "email":
users.sort(key=lambda x: x["email"], reverse=reverse)
else: # created_at
users.sort(key=lambda x: x["created_at"], reverse=reverse)
# Apply pagination
total = len(users)
start = (page - 1) * limit
end = start + limit
paginated_users = users[start:end]
return {
"users": paginated_users,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"pages": (total + limit - 1) // limit,
"has_next": end < total,
"has_prev": page > 1
}
}
@app.get("/users/:user_id")
def get_user(user_id: str, include_posts: bool = False):
"""Get a specific user by ID"""
if not user_exists(user_id):
return {"error": "User not found"}, 404
user = users_db[user_id].copy()
# Include user's posts if requested
if include_posts:
user_posts = [post for post in posts_db.values() if post["author_id"] == user_id]
user["posts"] = user_posts
user["posts_count"] = len(user_posts)
return {"user": user}
@app.post("/users")
async def create_user(request: Request):
"""Create a new user"""
try:
data = await request.json()
except:
return {"error": "Invalid JSON format"}, 400
# Validate required fields
required_fields = ["name", "email"]
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
return {"error": f"Missing required fields: {', '.join(missing_fields)}"}, 400
# Validate email format
if not validate_email(data["email"]):
return {"error": "Invalid email format"}, 400
# Check if email already exists
if email_exists(data["email"]):
return {"error": "Email already exists"}, 409
# Validate optional fields
age = data.get("age")
if age is not None:
if not isinstance(age, int) or age < 0 or age > 150:
return {"error": "Age must be between 0 and 150"}, 400
# Create user
user_id = generate_id()
user = {
"id": user_id,
"name": data["name"].strip(),
"email": data["email"].lower().strip(),
"age": age,
"bio": data.get("bio", "").strip(),
"created_at": get_current_timestamp(),
"updated_at": get_current_timestamp()
}
users_db[user_id] = user
return {"user": user}, 201
@app.put("/users/:user_id")
async def update_user(user_id: str, request: Request):
"""Update an entire user (replace)"""
if not user_exists(user_id):
return {"error": "User not found"}, 404
try:
data = await request.json()
except:
return {"error": "Invalid JSON format"}, 400
# Validate required fields
required_fields = ["name", "email"]
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
return {"error": f"Missing required fields: {', '.join(missing_fields)}"}, 400
# Validate email format
if not validate_email(data["email"]):
return {"error": "Invalid email format"}, 400
# Check if email already exists (excluding current user)
if email_exists(data["email"], user_id):
return {"error": "Email already exists"}, 409
# Validate optional fields
age = data.get("age")
if age is not None:
if not isinstance(age, int) or age < 0 or age > 150:
return {"error": "Age must be between 0 and 150"}, 400
# Update user (keep original created_at)
original_user = users_db[user_id]
updated_user = {
"id": user_id,
"name": data["name"].strip(),
"email": data["email"].lower().strip(),
"age": age,
"bio": data.get("bio", "").strip(),
"created_at": original_user["created_at"],
"updated_at": get_current_timestamp()
}
users_db[user_id] = updated_user
return {"user": updated_user}
@app.patch("/users/:user_id")
async def patch_user(user_id: str, request: Request):
"""Partially update a user"""
if not user_exists(user_id):
return {"error": "User not found"}, 404
try:
data = await request.json()
except:
return {"error": "Invalid JSON format"}, 400
if not data:
return {"error": "No data provided"}, 400
user = users_db[user_id].copy()
# Update provided fields
if "name" in data:
user["name"] = data["name"].strip()
if "email" in data:
if not validate_email(data["email"]):
return {"error": "Invalid email format"}, 400
if email_exists(data["email"], user_id):
return {"error": "Email already exists"}, 409
user["email"] = data["email"].lower().strip()
if "age" in data:
age = data["age"]
if age is not None:
if not isinstance(age, int) or age < 0 or age > 150:
return {"error": "Age must be between 0 and 150"}, 400
user["age"] = age
if "bio" in data:
user["bio"] = data["bio"].strip()
user["updated_at"] = get_current_timestamp()
users_db[user_id] = user
return {"user": user}
@app.delete("/users/:user_id")
def delete_user(user_id: str):
"""Delete a user and all their posts"""
if not user_exists(user_id):
return {"error": "User not found"}, 404
# Delete user's posts
user_posts = [post_id for post_id, post in posts_db.items() if post["author_id"] == user_id]
for post_id in user_posts:
del posts_db[post_id]
# Delete user
del users_db[user_id]
return {"message": "User and associated posts deleted successfully"}, 204
# ============================================================================
# POSTS CRUD OPERATIONS
# ============================================================================
@app.get("/posts")
def list_posts(
page: int = 1,
limit: int = 10,
author_id: Optional[str] = None,
search: Optional[str] = None,
sort_by: str = "created_at",
sort_order: str = "desc"
):
"""List all posts with pagination and filtering"""
# Validate parameters
if page < 1:
return {"error": "Page must be >= 1"}, 400
if limit < 1 or limit > 100:
return {"error": "Limit must be between 1 and 100"}, 400
if sort_order not in ["asc", "desc"]:
return {"error": "Sort order must be 'asc' or 'desc'"}, 400
# Get all posts
posts = list(posts_db.values())
# Filter by author
if author_id:
if not user_exists(author_id):
return {"error": "Author not found"}, 404
posts = [post for post in posts if post["author_id"] == author_id]
# Apply search filter
if search:
search_lower = search.lower()
posts = [
post for post in posts
if search_lower in post["title"].lower() or search_lower in post["content"].lower()
]
# Sort posts
reverse = sort_order == "desc"
if sort_by == "title":
posts.sort(key=lambda x: x["title"], reverse=reverse)
else: # created_at
posts.sort(key=lambda x: x["created_at"], reverse=reverse)
# Apply pagination
total = len(posts)
start = (page - 1) * limit
end = start + limit
paginated_posts = posts[start:end]
# Add author information to each post
for post in paginated_posts:
if post["author_id"] in users_db:
author = users_db[post["author_id"]]
post["author"] = {
"id": author["id"],
"name": author["name"],
"email": author["email"]
}
return {
"posts": paginated_posts,
"pagination": {
"page": page,
"limit": limit,
"total": total,
"pages": (total + limit - 1) // limit,
"has_next": end < total,
"has_prev": page > 1
}
}
@app.get("/posts/:post_id")
def get_post(post_id: str):
"""Get a specific post by ID"""
if post_id not in posts_db:
return {"error": "Post not found"}, 404
post = posts_db[post_id].copy()
# Add author information
if post["author_id"] in users_db:
author = users_db[post["author_id"]]
post["author"] = {
"id": author["id"],
"name": author["name"],
"email": author["email"]
}
return {"post": post}
@app.post("/posts")
async def create_post(request: Request):
"""Create a new post"""
try:
data = await request.json()
except:
return {"error": "Invalid JSON format"}, 400
# Validate required fields
required_fields = ["title", "content", "author_id"]
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
return {"error": f"Missing required fields: {', '.join(missing_fields)}"}, 400
# Validate author exists
if not user_exists(data["author_id"]):
return {"error": "Author not found"}, 404
# Validate content length
if len(data["title"].strip()) < 1:
return {"error": "Title cannot be empty"}, 400
if len(data["content"].strip()) < 1:
return {"error": "Content cannot be empty"}, 400
# Create post
post_id = generate_id()
post = {
"id": post_id,
"title": data["title"].strip(),
"content": data["content"].strip(),
"author_id": data["author_id"],
"tags": data.get("tags", []),
"published": data.get("published", True),
"created_at": get_current_timestamp(),
"updated_at": get_current_timestamp()
}
posts_db[post_id] = post
# Add author information to response
if post["author_id"] in users_db:
author = users_db[post["author_id"]]
post["author"] = {
"id": author["id"],
"name": author["name"],
"email": author["email"]
}
return {"post": post}, 201
@app.put("/posts/:post_id")
async def update_post(post_id: str, request: Request):
"""Update an entire post (replace)"""
if post_id not in posts_db:
return {"error": "Post not found"}, 404
try:
data = await request.json()
except:
return {"error": "Invalid JSON format"}, 400
# Validate required fields
required_fields = ["title", "content"]
missing_fields = [field for field in required_fields if field not in data]
if missing_fields:
return {"error": f"Missing required fields: {', '.join(missing_fields)}"}, 400
# Validate content length
if len(data["title"].strip()) < 1:
return {"error": "Title cannot be empty"}, 400
if len(data["content"].strip()) < 1:
return {"error": "Content cannot be empty"}, 400
# Update post (keep original author and created_at)
original_post = posts_db[post_id]
updated_post = {
"id": post_id,
"title": data["title"].strip(),
"content": data["content"].strip(),
"author_id": original_post["author_id"], # Cannot change author
"tags": data.get("tags", []),
"published": data.get("published", True),
"created_at": original_post["created_at"],
"updated_at": get_current_timestamp()
}
posts_db[post_id] = updated_post
return {"post": updated_post}
@app.patch("/posts/:post_id")
async def patch_post(post_id: str, request: Request):
"""Partially update a post"""
if post_id not in posts_db:
return {"error": "Post not found"}, 404
try:
data = await request.json()
except:
return {"error": "Invalid JSON format"}, 400
if not data:
return {"error": "No data provided"}, 400
post = posts_db[post_id].copy()
# Update provided fields
if "title" in data:
if len(data["title"].strip()) < 1:
return {"error": "Title cannot be empty"}, 400
post["title"] = data["title"].strip()
if "content" in data:
if len(data["content"].strip()) < 1:
return {"error": "Content cannot be empty"}, 400
post["content"] = data["content"].strip()
if "tags" in data:
post["tags"] = data["tags"]
if "published" in data:
post["published"] = data["published"]
post["updated_at"] = get_current_timestamp()
posts_db[post_id] = post
return {"post": post}
@app.delete("/posts/:post_id")
def delete_post(post_id: str):
"""Delete a post"""
if post_id not in posts_db:
return {"error": "Post not found"}, 404
del posts_db[post_id]
return {"message": "Post deleted successfully"}, 204
# ============================================================================
# UTILITY ENDPOINTS
# ============================================================================
@app.get("/")
def root():
"""API information"""
return {
"name": "Sufast CRUD API",
"version": "1.0.0",
"description": "Complete CRUD API example with users and posts",
"endpoints": {
"users": "/users",
"posts": "/posts",
"docs": "/docs"
},
"stats": {
"total_users": len(users_db),
"total_posts": len(posts_db)
}
}
@app.get("/health")
def health_check():
"""Health check endpoint"""
return {
"status": "healthy",
"timestamp": get_current_timestamp(),
"database": {
"users": len(users_db),
"posts": len(posts_db)
}
}
# ============================================================================
# SEED DATA (for testing)
# ============================================================================
def seed_data():
"""Add some sample data for testing"""
# Create sample users
user1_id = generate_id()
user2_id = generate_id()
users_db[user1_id] = {
"id": user1_id,
"name": "Alice Johnson",
"email": "alice@example.com",
"age": 28,
"bio": "Software developer and tech enthusiast",
"created_at": "2024-01-01T10:00:00",
"updated_at": "2024-01-01T10:00:00"
}
users_db[user2_id] = {
"id": user2_id,
"name": "Bob Smith",
"email": "bob@example.com",
"age": 35,
"bio": "Product manager with 10+ years experience",
"created_at": "2024-01-02T11:00:00",
"updated_at": "2024-01-02T11:00:00"
}
# Create sample posts
post1_id = generate_id()
post2_id = generate_id()
post3_id = generate_id()
posts_db[post1_id] = {
"id": post1_id,
"title": "Getting Started with Sufast",
"content": "Sufast is an amazing Python web framework that combines the simplicity of Python with the performance of Rust...",
"author_id": user1_id,
"tags": ["python", "web", "framework"],
"published": True,
"created_at": "2024-01-03T12:00:00",
"updated_at": "2024-01-03T12:00:00"
}
posts_db[post2_id] = {
"id": post2_id,
"title": "Building REST APIs",
"content": "REST APIs are the backbone of modern web applications. In this post, we'll explore best practices...",
"author_id": user1_id,
"tags": ["api", "rest", "backend"],
"published": True,
"created_at": "2024-01-04T13:00:00",
"updated_at": "2024-01-04T13:00:00"
}
posts_db[post3_id] = {
"id": post3_id,
"title": "Product Management in Tech",
"content": "Product management is a crucial role in technology companies. Here's what I've learned...",
"author_id": user2_id,
"tags": ["product", "management", "tech"],
"published": True,
"created_at": "2024-01-05T14:00:00",
"updated_at": "2024-01-05T14:00:00"
}
# Initialize with sample data
seed_data()
if __name__ == "__main__":
print("Starting Sufast CRUD API...")
print("Available endpoints:")
print(" GET /users")
print(" POST /users")
print(" ...etc")