🚧 Prototype Notice

This project (sufast) is currently a dummy prototype.
Only static routes are working at the moment.
Dynamic routing and full features are under development.
Thank you for understanding! 🙏

Sufast/Documentation
Database Integration

Database Integration Examples

Learn how to integrate Sufast with popular databases including PostgreSQL, MongoDB, and SQLite.

PostgreSQL Integration
Async PostgreSQL with connection pooling and transactions

Installation & Setup

Installationbash
# Install required packages
pip install asyncpg psycopg2-binary

# Environment variables
export DATABASE_URL="postgresql://user:password@localhost:5432/myapp"

Basic PostgreSQL Example

postgresql_example.pypython
1from sufast import App, Request
2import asyncpg
3import os
4from typing import Optional
5
6app = App()
7
8# Database configuration
9DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://user:pass@localhost/myapp")
10
11class DatabaseManager:
12 def __init__(self):
13 self.pool = None
14
15 async def connect(self):
16 """Create database connection pool."""
17 self.pool = await asyncpg.create_pool(DATABASE_URL)
18 print("✅ Database pool created")
19
20 async def disconnect(self):
21 """Close database connection pool."""
22 if self.pool:
23 await self.pool.close()
24 print("✅ Database pool closed")
25
26 async def fetch(self, query: str, *args):
27 """Fetch multiple rows."""
28 async with self.pool.acquire() as conn:
29 return await conn.fetch(query, *args)
30
31 async def fetchrow(self, query: str, *args):
32 """Fetch a single row."""
33 async with self.pool.acquire() as conn:
34 return await conn.fetchrow(query, *args)
35
36# Initialize database manager
37db = DatabaseManager()
38
39@app.on_event("startup")
40async def startup():
41 """Initialize database connection on startup."""
42 await db.connect()
43
44@app.on_event("shutdown")
45async def shutdown():
46 """Close database connection on shutdown."""
47 await db.disconnect()
48
49# Example endpoints
50@app.get("/users")
51async def list_users(page: int = 1, limit: int = 10):
52 """List users with pagination."""
53 offset = (page - 1) * limit
54
55 # Count total users
56 total = await db.fetchval("SELECT COUNT(*) FROM users")
57
58 # Get users with pagination
59 rows = await db.fetch(
60 "SELECT id, name, email, created_at FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2",
61 limit, offset
62 )
63 users = [dict(row) for row in rows]
64
65 return {
66 "users": users,
67 "pagination": {
68 "page": page,
69 "limit": limit,
70 "total": total,
71 "pages": (total + limit - 1) // limit
72 }
73 }
74
75@app.get("/users/{user_id}")
76async def get_user(user_id: str):
77 """Get user by ID."""
78 user_row = await db.fetchrow(
79 "SELECT id, name, email, created_at FROM users WHERE id = $1",
80 user_id
81 )
82
83 if not user_row:
84 return {"error": "User not found"}, 404
85
86 return {"user": dict(user_row)}
87
88@app.post("/users")
89async def create_user(request: Request):
90 """Create a new user."""
91 data = await request.json()
92
93 # Validate required fields
94 if not data.get("name") or not data.get("email"):
95 return {"error": "Name and email are required"}, 400
96
97 try:
98 user_row = await db.fetchrow(
99 """
100 INSERT INTO users (name, email)
101 VALUES ($1, $2)
102 RETURNING id, name, email, created_at
103 """,
104 data["name"], data["email"]
105 )
106
107 return {"user": dict(user_row)}, 201
108
109 except asyncpg.UniqueViolationError:
110 return {"error": "Email already exists"}, 409
111
112if __name__ == "__main__":
113 app.run(host="0.0.0.0", port=8080)
Best Practices

Connection Management

  • • Use connection pooling for production
  • • Handle startup/shutdown events properly
  • • Implement proper error handling
  • • Use environment variables for configuration

Performance Tips

  • • Use async database drivers
  • • Implement pagination for large datasets
  • • Add database indexes for frequently queried fields
  • • Use transactions for data consistency