🚧 Prototype Notice

This project (sufast) is currently a dummy prototype.
Only static routes are working at the moment.
Dynamic routing and full features are under development.
Thank you for understanding! 🙏

Sufast/Documentation
Authentication System

JWT Authentication Example

Complete authentication system with JWT tokens, password hashing, and role-based access control.

Complete Authentication System
Full-featured auth with JWT, password hashing, and RBAC
auth_system.pypython
1from sufast import App, Request, Response
2from typing import Optional, Dict, Any, List
3from datetime import datetime, timedelta
4import jwt
5import bcrypt
6import uuid
7import re
8import os
9
10app = App()
11
12# Configuration
13SECRET_KEY = os.getenv("SECRET_KEY", "your-super-secret-key-change-in-production")
14ALGORITHM = "HS256"
15ACCESS_TOKEN_EXPIRE_MINUTES = 30
16REFRESH_TOKEN_EXPIRE_DAYS = 7
17
18# In-memory databases (use real database in production)
19users_db: Dict[str, Dict[str, Any]] = {}
20refresh_tokens_db: Dict[str, Dict[str, Any]] = {}
21
22# User roles
23class UserRole:
24 ADMIN = "admin"
25 USER = "user"
26 MODERATOR = "moderator"
27
28# Helper functions
29def generate_id() -> str:
30 return str(uuid.uuid4())
31
32def get_current_timestamp() -> str:
33 return datetime.now().isoformat()
34
35def validate_email(email: str) -> bool:
36 pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+.[a-zA-Z]{2,}$'
37 return re.match(pattern, email) is not None
38
39def validate_password(password: str) -> tuple[bool, str]:
40 """Validate password strength."""
41 if len(password) < 8:
42 return False, "Password must be at least 8 characters long"
43 if not re.search(r'[A-Z]', password):
44 return False, "Password must contain at least one uppercase letter"
45 if not re.search(r'[a-z]', password):
46 return False, "Password must contain at least one lowercase letter"
47 if not re.search(r'\d', password):
48 return False, "Password must contain at least one digit"
49 if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
50 return False, "Password must contain at least one special character"
51 return True, "Password is valid"
52
53def hash_password(password: str) -> str:
54 """Hash password using bcrypt."""
55 salt = bcrypt.gensalt()
56 hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
57 return hashed.decode('utf-8')
58
59def verify_password(password: str, hashed: str) -> bool:
60 """Verify password against hash."""
61 return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
62
63def create_access_token(data: Dict[str, Any]) -> str:
64 """Create JWT access token."""
65 to_encode = data.copy()
66 expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
67 to_encode.update({"exp": expire, "type": "access"})
68 return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
69
70def create_refresh_token(user_id: str) -> str:
71 """Create JWT refresh token."""
72 token_id = generate_id()
73 expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
74
75 # Store refresh token in database
76 refresh_tokens_db[token_id] = {
77 "user_id": user_id,
78 "expires_at": expire.isoformat(),
79 "created_at": get_current_timestamp()
80 }
81
82 to_encode = {
83 "token_id": token_id,
84 "user_id": user_id,
85 "exp": expire,
86 "type": "refresh"
87 }
88 return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
89
90def verify_token(token: str) -> Optional[Dict[str, Any]]:
91 """Verify and decode JWT token."""
92 try:
93 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
94 return payload
95 except jwt.ExpiredSignatureError:
96 return None
97 except jwt.JWTError:
98 return None
99
100def get_current_user(request: Request) -> Optional[Dict[str, Any]]:
101 """Extract current user from request."""
102 auth_header = request.headers.get("authorization")
103 if not auth_header or not auth_header.startswith("Bearer "):
104 return None
105
106 token = auth_header.split(" ")[1]
107 payload = verify_token(token)
108
109 if not payload or payload.get("type") != "access":
110 return None
111
112 user_id = payload.get("sub")
113 if user_id and user_id in users_db:
114 return users_db[user_id]
115
116 return None
117
118def require_auth(request: Request) -> Optional[Response]:
119 """Middleware helper to require authentication."""
120 user = get_current_user(request)
121 if not user:
122 return Response(
123 content={"error": "Authentication required"},
124 status_code=401,
125 headers={"WWW-Authenticate": "Bearer"}
126 )
127 request.state.current_user = user
128 return None
129
130def require_role(required_role: str):
131 """Decorator to require specific role."""
132 def decorator(func):
133 async def wrapper(request: Request, *args, **kwargs):
134 auth_response = require_auth(request)
135 if auth_response:
136 return auth_response
137
138 user = request.state.current_user
139 if user["role"] != required_role and user["role"] != UserRole.ADMIN:
140 return Response(
141 content={"error": f"Role '{required_role}' required"},
142 status_code=403
143 )
144
145 return await func(request, *args, **kwargs)
146 return wrapper
147 return decorator
148
149# ============================================================================
150# AUTHENTICATION ENDPOINTS
151# ============================================================================
152
153@app.post("/auth/register")
154async def register(request: Request):
155 """Register a new user."""
156 try:
157 data = await request.json()
158 except:
159 return {"error": "Invalid JSON format"}, 400
160
161 # Validate required fields
162 required_fields = ["email", "password", "name"]
163 missing_fields = [field for field in required_fields if field not in data]
164 if missing_fields:
165 return {"error": f"Missing required fields: {', '.join(missing_fields)}"}, 400
166
167 email = data["email"].lower().strip()
168 password = data["password"]
169 name = data["name"].strip()
170
171 # Validate email format
172 if not validate_email(email):
173 return {"error": "Invalid email format"}, 400
174
175 # Check if email already exists
176 for user in users_db.values():
177 if user["email"] == email:
178 return {"error": "Email already registered"}, 409
179
180 # Validate password strength
181 is_valid, message = validate_password(password)
182 if not is_valid:
183 return {"error": message}, 400
184
185 # Create user
186 user_id = generate_id()
187 hashed_password = hash_password(password)
188
189 user = {
190 "id": user_id,
191 "email": email,
192 "name": name,
193 "password_hash": hashed_password,
194 "role": data.get("role", UserRole.USER),
195 "is_active": True,
196 "email_verified": False,
197 "created_at": get_current_timestamp(),
198 "updated_at": get_current_timestamp(),
199 "last_login": None
200 }
201
202 # Only admins can create admin users
203 if user["role"] == UserRole.ADMIN:
204 # Check if there are any existing users (first user can be admin)
205 if users_db:
206 return {"error": "Cannot create admin user"}, 403
207
208 users_db[user_id] = user
209
210 # Create tokens
211 access_token = create_access_token({"sub": user_id, "email": email, "role": user["role"]})
212 refresh_token = create_refresh_token(user_id)
213
214 # Remove password hash from response
215 user_response = {k: v for k, v in user.items() if k != "password_hash"}
216
217 return {
218 "user": user_response,
219 "access_token": access_token,
220 "refresh_token": refresh_token,
221 "token_type": "bearer"
222 }, 201
223
224@app.post("/auth/login")
225async def login(request: Request):
226 """Login user with email and password."""
227 try:
228 data = await request.json()
229 except:
230 return {"error": "Invalid JSON format"}, 400
231
232 email = data.get("email", "").lower().strip()
233 password = data.get("password", "")
234
235 if not email or not password:
236 return {"error": "Email and password are required"}, 400
237
238 # Find user by email
239 user = None
240 for u in users_db.values():
241 if u["email"] == email:
242 user = u
243 break
244
245 if not user:
246 return {"error": "Invalid email or password"}, 401
247
248 # Verify password
249 if not verify_password(password, user["password_hash"]):
250 return {"error": "Invalid email or password"}, 401
251
252 # Check if user is active
253 if not user["is_active"]:
254 return {"error": "Account is deactivated"}, 401
255
256 # Update last login
257 user["last_login"] = get_current_timestamp()
258 users_db[user["id"]] = user
259
260 # Create tokens
261 access_token = create_access_token({"sub": user["id"], "email": email, "role": user["role"]})
262 refresh_token = create_refresh_token(user["id"])
263
264 # Remove password hash from response
265 user_response = {k: v for k, v in user.items() if k != "password_hash"}
266
267 return {
268 "user": user_response,
269 "access_token": access_token,
270 "refresh_token": refresh_token,
271 "token_type": "bearer"
272 }
273
274@app.post("/auth/refresh")
275async def refresh_token(request: Request):
276 """Refresh access token using refresh token."""
277 try:
278 data = await request.json()
279 except:
280 return {"error": "Invalid JSON format"}, 400
281
282 refresh_token = data.get("refresh_token")
283 if not refresh_token:
284 return {"error": "Refresh token is required"}, 400
285
286 # Verify refresh token
287 payload = verify_token(refresh_token)
288 if not payload or payload.get("type") != "refresh":
289 return {"error": "Invalid refresh token"}, 401
290
291 token_id = payload.get("token_id")
292 user_id = payload.get("user_id")
293
294 # Check if refresh token exists in database
295 if token_id not in refresh_tokens_db:
296 return {"error": "Refresh token not found"}, 401
297
298 stored_token = refresh_tokens_db[token_id]
299 if stored_token["user_id"] != user_id:
300 return {"error": "Invalid refresh token"}, 401
301
302 # Check if user exists and is active
303 if user_id not in users_db:
304 return {"error": "User not found"}, 401
305
306 user = users_db[user_id]
307 if not user["is_active"]:
308 return {"error": "Account is deactivated"}, 401
309
310 # Create new access token
311 access_token = create_access_token({"sub": user_id, "email": user["email"], "role": user["role"]})
312
313 return {
314 "access_token": access_token,
315 "token_type": "bearer"
316 }
317
318@app.post("/auth/logout")
319async def logout(request: Request):
320 """Logout user and invalidate refresh token."""
321 auth_response = require_auth(request)
322 if auth_response:
323 return auth_response
324
325 try:
326 data = await request.json()
327 except:
328 return {"error": "Invalid JSON format"}, 400
329
330 refresh_token = data.get("refresh_token")
331 if refresh_token:
332 # Verify and remove refresh token
333 payload = verify_token(refresh_token)
334 if payload and payload.get("type") == "refresh":
335 token_id = payload.get("token_id")
336 if token_id in refresh_tokens_db:
337 del refresh_tokens_db[token_id]
338
339 return {"message": "Logged out successfully"}
340
341@app.get("/auth/me")
342async def get_current_user_info(request: Request):
343 """Get current user information."""
344 auth_response = require_auth(request)
345 if auth_response:
346 return auth_response
347
348 user = request.state.current_user
349 user_response = {k: v for k, v in user.items() if k != "password_hash"}
350
351 return {"user": user_response}
352
353@app.put("/auth/me")
354async def update_current_user(request: Request):
355 """Update current user information."""
356 auth_response = require_auth(request)
357 if auth_response:
358 return auth_response
359
360 try:
361 data = await request.json()
362 except:
363 return {"error": "Invalid JSON format"}, 400
364
365 user = request.state.current_user
366 user_id = user["id"]
367
368 # Update allowed fields
369 if "name" in data:
370 user["name"] = data["name"].strip()
371
372 if "email" in data:
373 new_email = data["email"].lower().strip()
374 if not validate_email(new_email):
375 return {"error": "Invalid email format"}, 400
376
377 # Check if email already exists (excluding current user)
378 for u in users_db.values():
379 if u["id"] != user_id and u["email"] == new_email:
380 return {"error": "Email already exists"}, 409
381
382 user["email"] = new_email
383
384 user["updated_at"] = get_current_timestamp()
385 users_db[user_id] = user
386
387 user_response = {k: v for k, v in user.items() if k != "password_hash"}
388 return {"user": user_response}
389
390@app.post("/auth/change-password")
391async def change_password(request: Request):
392 """Change user password."""
393 auth_response = require_auth(request)
394 if auth_response:
395 return auth_response
396
397 try:
398 data = await request.json()
399 except:
400 return {"error": "Invalid JSON format"}, 400
401
402 current_password = data.get("current_password")
403 new_password = data.get("new_password")
404
405 if not current_password or not new_password:
406 return {"error": "Current password and new password are required"}, 400
407
408 user = request.state.current_user
409
410 # Verify current password
411 if not verify_password(current_password, user["password_hash"]):
412 return {"error": "Current password is incorrect"}, 401
413
414 # Validate new password
415 is_valid, message = validate_password(new_password)
416 if not is_valid:
417 return {"error": message}, 400
418
419 # Update password
420 user["password_hash"] = hash_password(new_password)
421 user["updated_at"] = get_current_timestamp()
422 users_db[user["id"]] = user
423
424 return {"message": "Password changed successfully"}
425
426# ============================================================================
427# PROTECTED ENDPOINTS (Examples)
428# ============================================================================
429
430@app.get("/protected")
431async def protected_endpoint(request: Request):
432 """Example protected endpoint."""
433 auth_response = require_auth(request)
434 if auth_response:
435 return auth_response
436
437 user = request.state.current_user
438 return {
439 "message": f"Hello {user['name']}, this is a protected endpoint!",
440 "user_id": user["id"],
441 "role": user["role"]
442 }
443
444@app.get("/admin-only")
445@require_role(UserRole.ADMIN)
446async def admin_only_endpoint(request: Request):
447 """Example admin-only endpoint."""
448 user = request.state.current_user
449 return {
450 "message": f"Hello Admin {user['name']}!",
451 "all_users": len(users_db),
452 "active_tokens": len(refresh_tokens_db)
453 }
454
455@app.get("/moderator-only")
456@require_role(UserRole.MODERATOR)
457async def moderator_only_endpoint(request: Request):
458 """Example moderator-only endpoint."""
459 user = request.state.current_user
460 return {
461 "message": f"Hello Moderator {user['name']}!",
462 "user_count": len(users_db)
463 }
464
465# ============================================================================
466# USER MANAGEMENT (Admin only)
467# ============================================================================
468
469@app.get("/admin/users")
470@require_role(UserRole.ADMIN)
471async def list_all_users(request: Request):
472 """List all users (admin only)."""
473 users = []
474 for user in users_db.values():
475 user_data = {k: v for k, v in user.items() if k != "password_hash"}
476 users.append(user_data)
477
478 return {"users": users, "total": len(users)}
479
480@app.patch("/admin/users/:user_id")
481@require_role(UserRole.ADMIN)
482async def update_user_admin(user_id: str, request: Request):
483 """Update user (admin only)."""
484 if user_id not in users_db:
485 return {"error": "User not found"}, 404
486
487 try:
488 data = await request.json()
489 except:
490 return {"error": "Invalid JSON format"}, 400
491
492 user = users_db[user_id]
493
494 # Update allowed fields
495 if "is_active" in data:
496 user["is_active"] = bool(data["is_active"])
497
498 if "role" in data:
499 if data["role"] in [UserRole.ADMIN, UserRole.MODERATOR, UserRole.USER]:
500 user["role"] = data["role"]
501 else:
502 return {"error": "Invalid role"}, 400
503
504 if "email_verified" in data:
505 user["email_verified"] = bool(data["email_verified"])
506
507 user["updated_at"] = get_current_timestamp()
508 users_db[user_id] = user
509
510 user_response = {k: v for k, v in user.items() if k != "password_hash"}
511 return {"user": user_response}
512
513# ============================================================================
514# SEED DATA
515# ============================================================================
516
517def create_demo_users():
518 """Create demo users for testing."""
519 # Admin user
520 admin_id = generate_id()
521 admin_user = {
522 "id": admin_id,
523 "email": "admin@example.com",
524 "name": "Admin User",
525 "password_hash": hash_password("Admin123!"),
526 "role": UserRole.ADMIN,
527 "is_active": True,
528 "email_verified": True,
529 "created_at": get_current_timestamp(),
530 "updated_at": get_current_timestamp(),
531 "last_login": None
532 }
533 users_db[admin_id] = admin_user
534
535 # Regular user
536 user_id = generate_id()
537 regular_user = {
538 "id": user_id,
539 "email": "user@example.com",
540 "name": "Regular User",
541 "password_hash": hash_password("User123!"),
542 "role": UserRole.USER,
543 "is_active": True,
544 "email_verified": True,
545 "created_at": get_current_timestamp(),
546 "updated_at": get_current_timestamp(),
547 "last_login": None
548 }
549 users_db[user_id] = regular_user
550
551 # Moderator user
552 mod_id = generate_id()
553 moderator_user = {
554 "id": mod_id,
555 "email": "moderator@example.com",
556 "name": "Moderator User",
557 "password_hash": hash_password("Mod123!"),
558 "role": UserRole.MODERATOR,
559 "is_active": True,
560 "email_verified": True,
561 "created_at": get_current_timestamp(),
562 "updated_at": get_current_timestamp(),
563 "last_login": None
564 }
565 users_db[mod_id] = moderator_user
566
567# Initialize demo data
568create_demo_users()
569
570if __name__ == "__main__":
571 print("Starting Sufast Authentication API...")
572 print("Demo users created:")
573 print(" Admin: admin@example.com / Admin123!")
574 print(" User: user@example.com / User123!")
575 print(" Moderator: moderator@example.com / Mod123!")
576 print()
577 print("Available endpoints:")
578 print(" POST /auth/register")
579 print(" POST /auth/login")
580 print(" POST /auth/refresh")
581 print(" POST /auth/logout")
582 print(" GET /auth/me")
583 print(" PUT /auth/me")
584 print(" POST /auth/change-password")
585 print(" GET /protected")
586 print(" GET /admin-only")
587 print(" GET /moderator-only")
588 print(" GET /admin/users")
589 print(" PATCH /admin/users/:user_id")
590
591 app.run(host="0.0.0.0", port=8080)