API Examples
This guide provides comprehensive examples of building APIs with Nexios, covering common use cases and best practices.
API Design Best Practices
Before starting, consider these important aspects:
- Use consistent URL patterns
- Implement proper error handling
- Follow REST principles
- Consider API versioning
- Plan for scalability
- Document all endpoints
RESTful API
Basic CRUD Operations
Model Design
When designing models:
- Use appropriate field types
- Add indexes for frequently queried fields
- Implement proper validation
- Consider relationships carefully
- Add timestamps for auditing
python
from nexios.db import Model, Column, types
from datetime import datetime
class User(Model):
id = Column(types.Integer, primary_key=True)
username = Column(types.String, unique=True)
email = Column(types.String, unique=True)
password = Column(types.String)
is_active = Column(types.Boolean, default=True)
created_at = Column(types.DateTime, default=datetime.utcnow)
updated_at = Column(types.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class Meta:
indexes = [
('username',),
('email',)
]
class Post(Model):
id = Column(types.Integer, primary_key=True)
title = Column(types.String)
content = Column(types.Text)
user_id = Column(types.Integer, foreign_key="users.id")
published = Column(types.Boolean, default=False)
created_at = Column(types.DateTime, default=datetime.utcnow)
updated_at = Column(types.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class Meta:
indexes = [
('user_id',),
('published',)
]
python
from pydantic import BaseModel, EmailStr, validator
from datetime import datetime
from typing import Optional
class UserCreate(BaseModel):
username: str
email: EmailStr
password: str
@validator('password')
def password_strength(cls, v):
if len(v) < 8:
raise ValueError('Password must be at least 8 characters')
if not any(c.isupper() for c in v):
raise ValueError('Password must contain at least one uppercase letter')
if not any(c.islower() for c in v):
raise ValueError('Password must contain at least one lowercase letter')
if not any(c.isdigit() for c in v):
raise ValueError('Password must contain at least one number')
return v
class UserUpdate(BaseModel):
username: Optional[str] = None
email: Optional[EmailStr] = None
is_active: Optional[bool] = None
class UserResponse(BaseModel):
id: int
username: str
email: str
is_active: bool
created_at: datetime
updated_at: datetime
class Config:
orm_mode = True
class PostCreate(BaseModel):
title: str
content: str
published: bool = False
@validator('title')
def title_length(cls, v):
if len(v) < 3:
raise ValueError('Title must be at least 3 characters')
return v
class PostResponse(BaseModel):
id: int
title: str
content: str
user_id: int
published: bool
created_at: datetime
updated_at: datetime
class Config:
orm_mode = True
python
from nexios import Router
from nexios.responses import JSONResponse
from nexios.exceptions import HTTPException
from nexios.middleware import RateLimitMiddleware
from typing import List
router = Router(prefix="/api/v1")
# Add rate limiting middleware
router.add_middleware(
RateLimitMiddleware,
rate_limit=100, # requests
time_window=60 # seconds
)
@router.get("/users")
async def list_users(request, response):
"""List all users with pagination.
Query Parameters:
page (int): Page number (default: 1)
limit (int): Items per page (default: 10)
search (str): Search term for username/email
sort (str): Sort field (default: created_at)
order (str): Sort order (asc/desc, default: desc)
Returns:
dict: Paginated list of users with metadata
"""
page = int(request.query_params.get("page", 1))
limit = int(request.query_params.get("limit", 10))
search = request.query_params.get("search", "")
sort = request.query_params.get("sort", "created_at")
order = request.query_params.get("order", "desc")
# Validate pagination parameters
if page < 1 or limit < 1 or limit > 100:
raise HTTPException(400, "Invalid pagination parameters")
# Build query
query = User.query
if search:
query = query.filter(
(User.username.ilike(f"%{search}%")) |
(User.email.ilike(f"%{search}%"))
)
# Apply sorting
if hasattr(User, sort):
sort_field = getattr(User, sort)
if order == "desc":
sort_field = sort_field.desc()
query = query.order_by(sort_field)
# Execute query with pagination
users = await query.paginate(page, limit)
total = await query.count()
return response.json({
"items": [UserResponse.from_orm(u) for u in users],
"total": total,
"page": page,
"pages": (total + limit - 1) // limit,
"has_next": page * limit < total,
"has_prev": page > 1
})
@router.post("/users")
async def create_user(request, response):
"""Create a new user.
Request Body:
username (str): Unique username
email (str): Valid email address
password (str): Secure password
Returns:
UserResponse: Created user data
Raises:
HTTPException: If username/email exists or validation fails
"""
try:
data = UserCreate(**await request.json())
except ValueError as e:
raise HTTPException(400, str(e))
# Check if user exists
if await User.query.filter_by(
username=data.username
).exists():
raise HTTPException(400, "Username taken")
if await User.query.filter_by(
email=data.email
).exists():
raise HTTPException(400, "Email already registered")
# Hash password
data.password = hash_password(data.password)
# Create user
user = await User.create(**data.dict())
return response.json(
UserResponse.from_orm(user),
status_code=201
)
@router.get("/users/{user_id:int}")
async def get_user(request, response):
"""Get user by ID.
Path Parameters:
user_id (int): User ID
Returns:
UserResponse: User data
Raises:
HTTPException: If user not found
"""
user = await User.get_or_404(
request.path_params.user_id
)
return response.json(UserResponse.from_orm(user))
@router.put("/users/{user_id:int}")
async def update_user(request, response):
"""Update user by ID.
Path Parameters:
user_id (int): User ID
Request Body:
username (str, optional): New username
email (str, optional): New email
is_active (bool, optional): Active status
Returns:
UserResponse: Updated user data
Raises:
HTTPException: If user not found or validation fails
"""
user = await User.get_or_404(
request.path_params.user_id
)
try:
data = UserUpdate(**await request.json())
except ValueError as e:
raise HTTPException(400, str(e))
# Check unique constraints
if data.username and data.username != user.username:
if await User.query.filter_by(
username=data.username
).exists():
raise HTTPException(400, "Username taken")
if data.email and data.email != user.email:
if await User.query.filter_by(
email=data.email
).exists():
raise HTTPException(400, "Email already registered")
# Update user
await user.update(**data.dict(exclude_unset=True))
return response.json(UserResponse.from_orm(user))
@router.delete("/users/{user_id:int}")
async def delete_user(request, response):
"""Delete user by ID.
Path Parameters:
user_id (int): User ID
Returns:
None
Raises:
HTTPException: If user not found
"""
user = await User.get_or_404(
request.path_params.user_id
)
await user.delete()
return response.json(None, status_code=204)
Database Operations
When working with databases:
- Always use transactions for multiple operations
- Implement proper error handling
- Use appropriate indexes
- Consider query performance
- Handle concurrent access
Authentication
JWT Authentication
Security Best Practices
For JWT authentication:
- Use strong secret keys
- Implement token rotation
- Set appropriate expiration times
- Validate token claims
- Use secure cookie settings
python
from nexios.auth import JWTAuth
from nexios.exceptions import HTTPException
from datetime import datetime, timedelta
from typing import Optional
class AuthConfig:
SECRET_KEY: str = "your-secret-key" # Use environment variable in production
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE: timedelta = timedelta(minutes=30)
REFRESH_TOKEN_EXPIRE: timedelta = timedelta(days=7)
TOKEN_ROTATION_THRESHOLD: timedelta = timedelta(minutes=5)
auth = JWTAuth(
secret_key=AuthConfig.SECRET_KEY,
algorithm=AuthConfig.ALGORITHM,
access_token_expire=AuthConfig.ACCESS_TOKEN_EXPIRE,
refresh_token_expire=AuthConfig.REFRESH_TOKEN_EXPIRE
)
@router.post("/auth/login")
async def login(request, response):
"""Authenticate user and return tokens.
Request Body:
username (str): Username
password (str): Password
Returns:
dict: Access and refresh tokens
Raises:
HTTPException: If credentials are invalid
"""
try:
data = await request.json()
username = data.get("username")
password = data.get("password")
if not username or not password:
raise HTTPException(400, "Username and password required")
# Validate credentials
user = await User.query.filter_by(
username=username
).first()
if not user or not verify_password(
password, user.password
):
raise HTTPException(
401, "Invalid credentials"
)
# Generate tokens
access_token = auth.create_access_token({
"sub": str(user.id),
"username": user.username,
"exp": datetime.utcnow() + AuthConfig.ACCESS_TOKEN_EXPIRE
})
refresh_token = auth.create_refresh_token({
"sub": str(user.id),
"exp": datetime.utcnow() + AuthConfig.REFRESH_TOKEN_EXPIRE
})
# Set secure cookie for refresh token
response.set_cookie(
"refresh_token",
refresh_token,
httponly=True,
secure=True,
samesite="strict",
max_age=AuthConfig.REFRESH_TOKEN_EXPIRE.seconds
)
return response.json({
"access_token": access_token,
"token_type": "bearer"
})
except Exception as e:
raise HTTPException(500, f"Authentication failed: {str(e)}")
@router.post("/auth/refresh")
async def refresh_token(request, response):
"""Refresh access token using refresh token.
Headers:
Authorization: Bearer <refresh_token>
Returns:
dict: New access token
Raises:
HTTPException: If token is invalid or expired
"""
refresh_token = request.headers.get("Authorization")
if not refresh_token:
raise HTTPException(401, "Missing token")
try:
# Extract token from Bearer
token = refresh_token.split(" ")[1]
# Decode and validate token
payload = auth.decode_refresh_token(token)
# Check if token is about to expire
exp = datetime.fromtimestamp(payload["exp"])
if exp - datetime.utcnow() < AuthConfig.TOKEN_ROTATION_THRESHOLD:
# Generate new refresh token
new_refresh_token = auth.create_refresh_token({
"sub": payload["sub"],
"exp": datetime.utcnow() + AuthConfig.REFRESH_TOKEN_EXPIRE
})
response.set_cookie(
"refresh_token",
new_refresh_token,
httponly=True,
secure=True,
samesite="strict",
max_age=AuthConfig.REFRESH_TOKEN_EXPIRE.seconds
)
# Generate new access token
access_token = auth.create_access_token({
"sub": payload["sub"],
"exp": datetime.utcnow() + AuthConfig.ACCESS_TOKEN_EXPIRE
})
return response.json({
"access_token": access_token,
"token_type": "bearer"
})
except Exception as e:
raise HTTPException(401, f"Invalid token: {str(e)}")
@router.post("/auth/logout")
async def logout(request, response):
"""Logout user by invalidating refresh token.
Returns:
dict: Success message
"""
response.delete_cookie("refresh_token")
return response.json({"message": "Successfully logged out"})
python
from nexios import Depend
from typing import Optional
async def get_current_user(
request,
token: str = Depend(auth.get_token)
) -> User:
"""Get current user from token.
Args:
request: Request object
token: JWT token from Authorization header
Returns:
User: Current user object
Raises:
HTTPException: If token is invalid or user not found
"""
try:
payload = auth.decode_access_token(token)
user = await User.get(int(payload["sub"]))
if not user:
raise HTTPException(401, "User not found")
if not user.is_active:
raise HTTPException(401, "User is inactive")
return user
except Exception as e:
raise HTTPException(401, f"Invalid token: {str(e)}")
@router.get("/users/me")
async def get_current_user_info(
request,
response,
user: User = Depend(get_current_user)
):
"""Get current user information.
Returns:
UserResponse: Current user data
"""
return response.json(UserResponse.from_orm(user))
@router.post("/users/me/change-password")
async def change_password(
request,
response,
user: User = Depend(get_current_user)
):
"""Change user password.
Request Body:
old_password (str): Current password
new_password (str): New password
Returns:
dict: Success message
Raises:
HTTPException: If old password is incorrect or new password is invalid
"""
try:
data = await request.json()
old_password = data.get("old_password")
new_password = data.get("new_password")
if not old_password or not new_password:
raise HTTPException(400, "Both old and new passwords are required")
# Verify old password
if not verify_password(old_password, user.password):
raise HTTPException(400, "Incorrect password")
# Validate new password
if len(new_password) < 8:
raise HTTPException(400, "New password must be at least 8 characters")
# Update password
user.password = hash_password(new_password)
await user.save()
# Invalidate all sessions
await user.invalidate_sessions()
return response.json({"message": "Password changed successfully"})
except Exception as e:
raise HTTPException(400, str(e))
Security Considerations
When implementing authentication:
- Never store plain-text passwords
- Use secure password hashing (bcrypt, Argon2)
- Implement rate limiting for auth endpoints
- Use secure cookie settings
- Implement proper session management
- Consider 2FA for sensitive operations
Error Handling
Implement proper error handling for:
- Invalid credentials
- Expired tokens
- Rate limiting
- Account lockout
- Session management
- Password policies
File Handling
File Upload and Download
python
from nexios.responses import FileResponse
from pathlib import Path
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
@router.post("/files/upload")
async def upload_file(request, response):
"""Upload a file."""
files = await request.files()
if not files:
raise HTTPException(400, "No file uploaded")
file = files["file"]
if not file.filename:
raise HTTPException(400, "No filename")
# Save file
file_path = UPLOAD_DIR / file.filename
await file.save(file_path)
return response.json({
"filename": file.filename,
"size": file_path.stat().st_size
})
@router.get("/files/{filename}")
async def download_file(request, response):
"""Download a file."""
filename = request.path_params.filename
file_path = UPLOAD_DIR / filename
if not file_path.exists():
raise HTTPException(404, "File not found")
return FileResponse(
file_path,
filename=filename,
media_type="application/octet-stream"
)
WebSocket
Real-time Chat
python
from nexios.websockets import WebSocket
from dataclasses import dataclass
from typing import Dict, Set
import json
@dataclass
class ChatRoom:
name: str
clients: Set[WebSocket]
class ChatServer:
def __init__(self):
self.rooms: Dict[str, ChatRoom] = {}
def get_room(self, name: str) -> ChatRoom:
if name not in self.rooms:
self.rooms[name] = ChatRoom(name, set())
return self.rooms[name]
async def broadcast(
self,
room: ChatRoom,
message: dict,
exclude: WebSocket = None
):
for client in room.clients:
if client != exclude:
await client.send_json(message)
chat_server = ChatServer()
@router.websocket("/ws/chat/{room_name}")
async def chat_endpoint(websocket: WebSocket):
await websocket.accept()
try:
# Get room
room_name = websocket.path_params.room_name
room = chat_server.get_room(room_name)
room.clients.add(websocket)
# Announce join
await chat_server.broadcast(
room,
{
"type": "system",
"message": "User joined"
},
websocket
)
# Handle messages
async for message in websocket.iter_json():
await chat_server.broadcast(
room,
{
"type": "message",
"message": message["text"]
},
websocket
)
finally:
room.clients.remove(websocket)
await chat_server.broadcast(
room,
{
"type": "system",
"message": "User left"
}
)
Background Tasks
Task Queue Integration
python
from nexios.background import BackgroundTask
from nexios.email import EmailSender
email_sender = EmailSender()
@router.post("/users")
async def create_user(request, response):
data = UserCreate(**await request.json())
user = await User.create(**data.dict())
# Schedule welcome email
background_task = BackgroundTask(
email_sender.send_email,
to=user.email,
subject="Welcome!",
template="welcome.html",
context={"username": user.username}
)
return response.json(
UserResponse.from_orm(user),
status_code=201,
background=background_task
)
Database Integration
SQLAlchemy Integration
python
from nexios.db import Database, Model
from sqlalchemy import select
from sqlalchemy.orm import joinedload
db = Database("postgresql+asyncpg://user:pass@localhost/db")
@router.get("/posts")
async def list_posts(request, response):
async with db.session() as session:
# Complex query with joins
query = select(Post).options(
joinedload(Post.user)
).order_by(
Post.created_at.desc()
)
# Add filters
if category := request.query_params.get("category"):
query = query.filter(Post.category == category)
# Pagination
page = int(request.query_params.get("page", 1))
limit = int(request.query_params.get("limit", 10))
offset = (page - 1) * limit
# Execute query
result = await session.execute(
query.limit(limit).offset(offset)
)
posts = result.scalars().unique().all()
# Get total
total = await session.scalar(
select(func.count()).select_from(Post)
)
return response.json({
"items": [
PostResponse.from_orm(post)
for post in posts
],
"total": total,
"page": page,
"pages": (total + limit - 1) // limit
})
Error Handling
Custom Error Handlers
python
from nexios.exceptions import HTTPException
from sqlalchemy.exc import IntegrityError
from pydantic import ValidationError
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
return response.json(
{
"error": exc.detail,
"code": exc.status_code
},
status_code=exc.status_code
)
@app.exception_handler(ValidationError)
async def validation_exception_handler(request, exc):
return response.json(
{
"error": "Validation error",
"details": exc.errors()
},
status_code=422
)
@app.exception_handler(IntegrityError)
async def integrity_exception_handler(request, exc):
return response.json(
{
"error": "Database integrity error",
"detail": str(exc.orig)
},
status_code=409
)
@app.exception_handler(Exception)
async def generic_exception_handler(request, exc):
logger.exception("Unhandled error")
return response.json(
{
"error": "Internal server error",
"detail": str(exc) if app.debug else None
},
status_code=500
)
Testing
API Testing
python
from nexios.testing import TestClient
import pytest
@pytest.fixture
async def client():
app = create_test_app()
async with TestClient(app) as client:
yield client
async def test_create_user(client):
response = await client.post(
"/api/v1/users",
json={
"username": "testuser",
"email": "test@example.com",
"password": "password123"
}
)
assert response.status_code == 201
data = response.json()
assert data["username"] == "testuser"
assert "password" not in data
async def test_login(client):
# Create user
await client.post(
"/api/v1/users",
json={
"username": "testuser",
"email": "test@example.com",
"password": "password123"
}
)
# Login
response = await client.post(
"/api/v1/auth/login",
json={
"username": "testuser",
"password": "password123"
}
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert "refresh_token" in data
async def test_protected_route(client):
# Try without token
response = await client.get("/api/v1/users/me")
assert response.status_code == 401
# Login and get token
response = await client.post(
"/api/v1/auth/login",
json={
"username": "testuser",
"password": "password123"
}
)
token = response.json()["access_token"]
# Try with token
response = await client.get(
"/api/v1/users/me",
headers={"Authorization": f"Bearer {token}"}
)
assert response.status_code == 200
OpenAPI Documentation
API Documentation
python
from nexios import NexiosApp
from nexios.openapi import OpenAPIConfig
app = NexiosApp(
title="My API",
version="1.0.0",
description="A sample API",
openapi_config=OpenAPIConfig(
tags=[
{
"name": "users",
"description": "User operations"
},
{
"name": "auth",
"description": "Authentication"
}
],
servers=[
{
"url": "https://api.example.com",
"description": "Production server"
},
{
"url": "http://localhost:8000",
"description": "Development server"
}
],
security_schemes={
"bearerAuth": {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT"
}
}
)
)
@router.get(
"/users/{user_id}",
tags=["users"],
summary="Get user by ID",
description="Retrieve a user by their ID",
responses={
200: {
"description": "User found",
"content": {
"application/json": {
"schema": UserResponse.schema()
}
}
},
404: {
"description": "User not found"
}
}
)
async def get_user(request, response):
"""Get user by ID."""
user = await User.get_or_404(
request.path_params.user_id
)
return response.json(UserResponse.from_orm(user))
Best Practices
API Design
- Use consistent naming conventions
- Implement proper error handling
- Add comprehensive documentation
- Include input validation
- Use appropriate HTTP methods
- Implement rate limiting
- Add authentication where needed
- Use proper status codes
- Include pagination for lists
- Keep endpoints focused
Security Checklist
- [ ] Implement authentication
- [ ] Use HTTPS
- [ ] Validate input
- [ ] Rate limit requests
- [ ] Hash passwords
- [ ] Use secure headers
- [ ] Implement CORS
- [ ] Log security events
- [ ] Regular security updates
- [ ] API key management
Performance Tips
- Use async operations
- Implement caching
- Optimize database queries
- Use connection pooling
- Compress responses
- Batch operations
- Profile endpoints
- Monitor performance
- Use appropriate indexes
- Optimize payload size