Sufast/Documentation
Authentication System
JWT Authentication Example
Complete authentication system with JWT tokens, password hashing, and role-based access control.
Security Note: This example uses secure practices including password hashing, JWT tokens, and proper session management. Never store passwords in plain text!
Complete Authentication System
Full-featured auth with JWT, password hashing, and RBAC
auth_system.py (Python)
from sufast import App, Request, Response
from typing import Optional, Dict, Any, List
from datetime import datetime, timedelta, timezone
import functools
import jwt
import bcrypt
import uuid
import re
import os

app = App()

# Configuration
# NOTE: the fallback key only lets the demo start without setup; always set
# the SECRET_KEY environment variable for a real deployment.
SECRET_KEY = os.getenv("SECRET_KEY", "your-super-secret-key-change-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7

# In-memory databases (use real database in production)
users_db: Dict[str, Dict[str, Any]] = {}
refresh_tokens_db: Dict[str, Dict[str, Any]] = {}


# User roles
class UserRole:
    ADMIN = "admin"
    USER = "user"
    MODERATOR = "moderator"


# Every role value the API accepts.
_VALID_ROLES = (UserRole.ADMIN, UserRole.MODERATOR, UserRole.USER)

# The dot before the top-level domain is escaped so it matches a literal ".".
_EMAIL_PATTERN = re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}')

# (pattern that must be found, message returned when it is not), checked in order.
_PASSWORD_RULES = (
    (r'[A-Z]', "Password must contain at least one uppercase letter"),
    (r'[a-z]', "Password must contain at least one lowercase letter"),
    (r'\d', "Password must contain at least one digit"),
    (r'[!@#$%^&*(),.?":{}|<>]', "Password must contain at least one special character"),
)


# Helper functions
def generate_id() -> str:
    """Return a new random UUID4 as a string."""
    return str(uuid.uuid4())


def get_current_timestamp() -> str:
    """Return the current local time as an ISO 8601 string."""
    return datetime.now().isoformat()


def validate_email(email: str) -> bool:
    """Return True when *email* has the shape ``local@domain.tld``."""
    return _EMAIL_PATTERN.fullmatch(email) is not None


def validate_password(password: str) -> tuple[bool, str]:
    """Validate password strength.

    Returns ``(True, "Password is valid")`` on success, otherwise ``False``
    and the message of the first rule that failed.
    """
    if len(password) < 8:
        return False, "Password must be at least 8 characters long"
    for pattern, message in _PASSWORD_RULES:
        if not re.search(pattern, password):
            return False, message
    return True, "Password is valid"


def hash_password(password: str) -> str:
    """Hash password using bcrypt with a freshly generated salt."""
    salt = bcrypt.gensalt()
    hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
    return hashed.decode('utf-8')


def verify_password(password: str, hashed: str) -> bool:
    """Verify password against a bcrypt hash produced by ``hash_password``."""
    return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))


def create_access_token(data: Dict[str, Any]) -> str:
    """Create JWT access token.

    The claims in *data* are copied, then ``exp`` (now plus
    ACCESS_TOKEN_EXPIRE_MINUTES) and ``type="access"`` are added.
    """
    to_encode = data.copy()
    # Timezone-aware UTC replaces the deprecated, naive datetime.utcnow().
    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire, "type": "access"})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def create_refresh_token(user_id: str) -> str:
    """Create JWT refresh token and record it in ``refresh_tokens_db``.

    The stored record is what lets a refresh token be revoked on logout.
    """
    token_id = generate_id()
    expire = datetime.now(timezone.utc) + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)

    # Store refresh token in database
    refresh_tokens_db[token_id] = {
        "user_id": user_id,
        "expires_at": expire.isoformat(),
        "created_at": get_current_timestamp(),
    }

    to_encode = {
        "token_id": token_id,
        "user_id": user_id,
        "exp": expire,
        "type": "refresh",
    }
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def verify_token(token: str) -> Optional[Dict[str, Any]]:
    """Verify and decode JWT token.

    Returns the decoded payload, or None when the token is expired,
    malformed, or has a bad signature.
    """
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.InvalidTokenError:
        # PyJWT's base class for decode failures; ExpiredSignatureError is a
        # subclass. The module has no ``JWTError`` attribute.
        return None


def get_current_user(request: Request) -> Optional[Dict[str, Any]]:
    """Extract current user from the request's ``Bearer`` access token.

    Returns the user record from ``users_db``, or None when the header is
    missing, the token is invalid or not an access token, or the user is
    unknown.
    """
    auth_header = request.headers.get("authorization")
    prefix = "Bearer "
    if not auth_header or not auth_header.startswith(prefix):
        return None

    token = auth_header[len(prefix):].strip()
    payload = verify_token(token)
    if not payload or payload.get("type") != "access":
        return None

    user_id = payload.get("sub")
    if not user_id:
        return None
    return users_db.get(user_id)


def require_auth(request: Request) -> Optional[Response]:
    """Middleware helper to require authentication.

    On success stores the user on ``request.state.current_user`` and returns
    None; otherwise returns a 401 response for the handler to return.
    """
    user = get_current_user(request)
    if not user:
        return Response(
            content={"error": "Authentication required"},
            status_code=401,
            headers={"WWW-Authenticate": "Bearer"},
        )
    request.state.current_user = user
    return None


def require_role(required_role: str):
    """Decorator to require specific role (admins always pass).

    The wrapped handler receives ``request`` as its first positional
    argument, so handlers must declare ``request`` first.
    """
    def decorator(func):
        # wraps() keeps each handler's own __name__ and docstring instead of
        # every decorated route being called "wrapper".
        @functools.wraps(func)
        async def wrapper(request: Request, *args, **kwargs):
            auth_response = require_auth(request)
            if auth_response:
                return auth_response

            user = request.state.current_user
            if user["role"] not in (required_role, UserRole.ADMIN):
                return Response(
                    content={"error": f"Role '{required_role}' required"},
                    status_code=403,
                )

            return await func(request, *args, **kwargs)
        return wrapper
    return decorator


async def _read_json_object(request: Request) -> Optional[Dict[str, Any]]:
    """Return the request body as a dict, or None if it is not a JSON object."""
    try:
        data = await request.json()
    except Exception:
        return None
    return data if isinstance(data, dict) else None


def _public_user(user: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of *user* without the password hash."""
    return {k: v for k, v in user.items() if k != "password_hash"}


def _find_user_by_email(email: str) -> Optional[Dict[str, Any]]:
    """Return the first user whose email equals *email*, or None."""
    return next((u for u in users_db.values() if u["email"] == email), None)


# ============================================================================
# AUTHENTICATION ENDPOINTS
# ============================================================================

@app.post("/auth/register")
async def register(request: Request):
    """Register a new user.

    Returns the new user (without password hash) and a token pair with
    status 201, or an error dict with 400/403/409.
    """
    data = await _read_json_object(request)
    if data is None:
        return {"error": "Invalid JSON format"}, 400

    # Validate required fields
    required_fields = ["email", "password", "name"]
    missing_fields = [field for field in required_fields if field not in data]
    if missing_fields:
        return {"error": f"Missing required fields: {', '.join(missing_fields)}"}, 400
    if not all(isinstance(data[field], str) for field in required_fields):
        return {"error": "Fields email, password and name must be strings"}, 400

    email = data["email"].lower().strip()
    password = data["password"]
    name = data["name"].strip()

    # Validate email format
    if not validate_email(email):
        return {"error": "Invalid email format"}, 400

    # Check if email already exists
    if _find_user_by_email(email) is not None:
        return {"error": "Email already registered"}, 409

    # Validate password strength
    is_valid, message = validate_password(password)
    if not is_valid:
        return {"error": message}, 400

    # Self-registration may only pick an elevated role while the user store
    # is empty (bootstrap); otherwise anyone could register as moderator.
    # Checked before hashing so rejected requests skip the bcrypt cost.
    role = data.get("role", UserRole.USER)
    if role not in _VALID_ROLES:
        return {"error": "Invalid role"}, 400
    if role != UserRole.USER and users_db:
        return {"error": f"Cannot create {role} user"}, 403

    # Create user
    user_id = generate_id()
    now = get_current_timestamp()
    user = {
        "id": user_id,
        "email": email,
        "name": name,
        "password_hash": hash_password(password),
        "role": role,
        "is_active": True,
        "email_verified": False,
        "created_at": now,
        "updated_at": now,
        "last_login": None,
    }
    users_db[user_id] = user

    # Create tokens
    access_token = create_access_token({"sub": user_id, "email": email, "role": role})
    new_refresh_token = create_refresh_token(user_id)

    return {
        "user": _public_user(user),
        "access_token": access_token,
        "refresh_token": new_refresh_token,
        "token_type": "bearer",
    }, 201


@app.post("/auth/login")
async def login(request: Request):
    """Login user with email and password.

    Returns the user and a fresh token pair, or an error dict with 400/401.
    """
    data = await _read_json_object(request)
    if data is None:
        return {"error": "Invalid JSON format"}, 400

    email = data.get("email", "")
    password = data.get("password", "")
    if not isinstance(email, str) or not isinstance(password, str):
        return {"error": "Email and password are required"}, 400

    email = email.lower().strip()
    if not email or not password:
        return {"error": "Email and password are required"}, 400

    # Same message for unknown email and wrong password, so the response
    # does not reveal which one failed.
    user = _find_user_by_email(email)
    if user is None or not verify_password(password, user["password_hash"]):
        return {"error": "Invalid email or password"}, 401

    # Check if user is active
    if not user["is_active"]:
        return {"error": "Account is deactivated"}, 401

    # Update last login (the record is mutated in place inside users_db)
    user["last_login"] = get_current_timestamp()

    # Create tokens
    access_token = create_access_token({"sub": user["id"], "email": email, "role": user["role"]})
    new_refresh_token = create_refresh_token(user["id"])

    return {
        "user": _public_user(user),
        "access_token": access_token,
        "refresh_token": new_refresh_token,
        "token_type": "bearer",
    }


@app.post("/auth/refresh")
async def refresh_token(request: Request):
    """Refresh access token using refresh token.

    The refresh token must decode, be of type "refresh", still be stored in
    ``refresh_tokens_db`` for the same user, and belong to an active user.
    """
    data = await _read_json_object(request)
    if data is None:
        return {"error": "Invalid JSON format"}, 400

    token = data.get("refresh_token")
    if not token:
        return {"error": "Refresh token is required"}, 400

    # Verify refresh token
    payload = verify_token(token)
    if not payload or payload.get("type") != "refresh":
        return {"error": "Invalid refresh token"}, 401

    token_id = payload.get("token_id")
    user_id = payload.get("user_id")

    # Check if refresh token exists in database (i.e. was not revoked)
    stored_token = refresh_tokens_db.get(token_id)
    if stored_token is None:
        return {"error": "Refresh token not found"}, 401
    if stored_token["user_id"] != user_id:
        return {"error": "Invalid refresh token"}, 401

    # Check if user exists and is active
    user = users_db.get(user_id)
    if user is None:
        return {"error": "User not found"}, 401
    if not user["is_active"]:
        return {"error": "Account is deactivated"}, 401

    # Create new access token
    access_token = create_access_token({"sub": user_id, "email": user["email"], "role": user["role"]})

    return {
        "access_token": access_token,
        "token_type": "bearer",
    }


@app.post("/auth/logout")
async def logout(request: Request):
    """Logout user and invalidate the supplied refresh token, if any."""
    auth_response = require_auth(request)
    if auth_response:
        return auth_response

    data = await _read_json_object(request)
    if data is None:
        return {"error": "Invalid JSON format"}, 400

    token = data.get("refresh_token")
    if token:
        payload = verify_token(token)
        current_user_id = request.state.current_user["id"]
        # Only revoke a refresh token that belongs to the caller; otherwise
        # one user could log another user out.
        if (
            payload
            and payload.get("type") == "refresh"
            and payload.get("user_id") == current_user_id
        ):
            refresh_tokens_db.pop(payload.get("token_id"), None)

    return {"message": "Logged out successfully"}


@app.get("/auth/me")
async def get_current_user_info(request: Request):
    """Get current user information."""
    auth_response = require_auth(request)
    if auth_response:
        return auth_response

    return {"user": _public_user(request.state.current_user)}


@app.put("/auth/me")
async def update_current_user(request: Request):
    """Update current user information (``name`` and/or ``email``).

    All supplied fields are validated before any is written, so a rejected
    request leaves the user record untouched.
    """
    auth_response = require_auth(request)
    if auth_response:
        return auth_response

    data = await _read_json_object(request)
    if data is None:
        return {"error": "Invalid JSON format"}, 400

    user = request.state.current_user
    user_id = user["id"]
    updates: Dict[str, Any] = {}

    if "name" in data:
        if not isinstance(data["name"], str):
            return {"error": "Name must be a string"}, 400
        updates["name"] = data["name"].strip()

    if "email" in data:
        if not isinstance(data["email"], str):
            return {"error": "Invalid email format"}, 400
        new_email = data["email"].lower().strip()
        if not validate_email(new_email):
            return {"error": "Invalid email format"}, 400

        # Check if email already exists (excluding current user)
        owner = _find_user_by_email(new_email)
        if owner is not None and owner["id"] != user_id:
            return {"error": "Email already exists"}, 409

        updates["email"] = new_email

    user.update(updates)
    user["updated_at"] = get_current_timestamp()
    users_db[user_id] = user

    return {"user": _public_user(user)}


@app.post("/auth/change-password")
async def change_password(request: Request):
    """Change user password after verifying the current one."""
    auth_response = require_auth(request)
    if auth_response:
        return auth_response

    data = await _read_json_object(request)
    if data is None:
        return {"error": "Invalid JSON format"}, 400

    current_password = data.get("current_password")
    new_password = data.get("new_password")

    # Non-string values are treated like missing ones; they cannot be encoded.
    if not (isinstance(current_password, str) and current_password
            and isinstance(new_password, str) and new_password):
        return {"error": "Current password and new password are required"}, 400

    user = request.state.current_user

    # Verify current password
    if not verify_password(current_password, user["password_hash"]):
        return {"error": "Current password is incorrect"}, 401

    # Validate new password
    is_valid, message = validate_password(new_password)
    if not is_valid:
        return {"error": message}, 400

    # Update password
    user["password_hash"] = hash_password(new_password)
    user["updated_at"] = get_current_timestamp()
    users_db[user["id"]] = user

    return {"message": "Password changed successfully"}


# ============================================================================
# PROTECTED ENDPOINTS (Examples)
# ============================================================================

@app.get("/protected")
async def protected_endpoint(request: Request):
    """Example protected endpoint."""
    auth_response = require_auth(request)
    if auth_response:
        return auth_response

    user = request.state.current_user
    return {
        "message": f"Hello {user['name']}, this is a protected endpoint!",
        "user_id": user["id"],
        "role": user["role"],
    }


@app.get("/admin-only")
@require_role(UserRole.ADMIN)
async def admin_only_endpoint(request: Request):
    """Example admin-only endpoint."""
    user = request.state.current_user
    return {
        "message": f"Hello Admin {user['name']}!",
        "all_users": len(users_db),
        "active_tokens": len(refresh_tokens_db),
    }


@app.get("/moderator-only")
@require_role(UserRole.MODERATOR)
async def moderator_only_endpoint(request: Request):
    """Example moderator-only endpoint (admins are allowed too)."""
    user = request.state.current_user
    return {
        "message": f"Hello Moderator {user['name']}!",
        "user_count": len(users_db),
    }
# ============================================================================
# USER MANAGEMENT (Admin only)
# ============================================================================

@app.get("/admin/users")
@require_role(UserRole.ADMIN)
async def list_all_users(request: Request):
    """List all users (admin only), with password hashes removed."""
    users = [
        {k: v for k, v in user.items() if k != "password_hash"}
        for user in users_db.values()
    ]
    return {"users": users, "total": len(users)}


@app.patch("/admin/users/:user_id")
@require_role(UserRole.ADMIN)
async def update_user_admin(request: Request, user_id: str):
    """Update user (admin only).

    Accepts ``is_active``, ``role`` and ``email_verified``. Every supplied
    field is validated before any is written, so a rejected request leaves
    the record untouched.

    ``request`` is declared first because the ``require_role`` wrapper
    passes it as the first positional argument.
    """
    if user_id not in users_db:
        return {"error": "User not found"}, 404

    try:
        data = await request.json()
    except Exception:
        return {"error": "Invalid JSON format"}, 400
    if not isinstance(data, dict):
        return {"error": "Invalid JSON format"}, 400

    updates = {}

    # Require real booleans: bool("false") is True, so coercing strings
    # could flip a flag the wrong way.
    for flag in ("is_active", "email_verified"):
        if flag in data:
            if not isinstance(data[flag], bool):
                return {"error": f"{flag} must be a boolean"}, 400
            updates[flag] = data[flag]

    if "role" in data:
        if data["role"] not in [UserRole.ADMIN, UserRole.MODERATOR, UserRole.USER]:
            return {"error": "Invalid role"}, 400
        updates["role"] = data["role"]

    user = users_db[user_id]
    user.update(updates)
    user["updated_at"] = get_current_timestamp()
    users_db[user_id] = user

    user_response = {k: v for k, v in user.items() if k != "password_hash"}
    return {"user": user_response}


# ============================================================================
# SEED DATA
# ============================================================================

def create_demo_users():
    """Create demo users for testing.

    Inserts one admin, one regular user and one moderator into ``users_db``.
    Accounts whose email is already present are skipped, so calling this
    again does not create duplicates.
    """
    # (email, display name, plain-text password, role)
    # NOTE: "Mod123!" has 7 characters, below validate_password's 8-character
    # minimum; seeding writes the hash directly and skips that check.
    demo_accounts = [
        ("admin@example.com", "Admin User", "Admin123!", UserRole.ADMIN),
        ("user@example.com", "Regular User", "User123!", UserRole.USER),
        ("moderator@example.com", "Moderator User", "Mod123!", UserRole.MODERATOR),
    ]
    existing_emails = {user["email"] for user in users_db.values()}

    for email, name, password, role in demo_accounts:
        if email in existing_emails:
            continue
        user_id = generate_id()
        now = get_current_timestamp()
        users_db[user_id] = {
            "id": user_id,
            "email": email,
            "name": name,
            "password_hash": hash_password(password),
            "role": role,
            "is_active": True,
            "email_verified": True,
            "created_at": now,
            "updated_at": now,
            "last_login": None,
        }


# Initialize demo data
create_demo_users()

if __name__ == "__main__":
    print("Starting Sufast Authentication API...")
    print("Demo users created:")
    print(" Admin: admin@example.com / Admin123!")
    print(" User: user@example.com / User123!")
    print(" Moderator: moderator@example.com / Mod123!")
    print()
    print("Available endpoints:")
    print(" POST /auth/register")
    print(" POST /auth/login")
    print(" POST /auth/refresh")
    print(" POST /auth/logout")
    print(" GET /auth/me")
    print(" PUT /auth/me")
    print(" POST /auth/change-password")
    print(" GET /protected")
    print(" GET /admin-only")
    print(" GET /moderator-only")
    print(" GET /admin/users")
    print(" PATCH /admin/users/:user_id")

    app.run(host="0.0.0.0", port=8080)