REST API with JWT Authentication
Abstract
Create a comprehensive REST API with JWT authentication using Flask that provides secure user management, CRUD operations, and role-based access control. This project demonstrates advanced backend development, authentication systems, API security, and professional API design patterns.
Prerequisites
- Python 3.7 or above
- Text Editor or IDE
- Solid understanding of Python syntax and web development
- Knowledge of Flask framework and REST principles
- Familiarity with JWT tokens and authentication concepts
- Understanding of database operations and SQLAlchemy
- Basic knowledge of API security and testing tools
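For readers brushing up on the JWT prerequisite: the listing in this tutorial authenticates requests with API keys, so the following is only a minimal, standard-library sketch of what an HS256-signed JWT looks like and how its signature and expiry can be checked. The helper names `encode_jwt` and `decode_jwt` are hypothetical and not part of the tutorial code; a real project would more likely rely on a maintained library such as flask-jwt-extended.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWT segments require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode a base64url segment, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    """HMAC-SHA256 over 'header.payload' using the shared secret."""
    return hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"),
                    hashlib.sha256).digest()


def encode_jwt(payload: dict, secret: str) -> str:
    """Build header.payload.signature for the given claims (hypothetical helper)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_b64url(_sign(signing_input, secret))}"


def decode_jwt(token: str, secret: str) -> dict:
    """Verify signature, algorithm, and expiry; raise ValueError on failure."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    header_b64, payload_b64, signature_b64 = parts
    expected = _sign(f"{header_b64}.{payload_b64}", secret)
    # Constant-time comparison avoids leaking how many bytes matched
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    if json.loads(_b64url_decode(header_b64)).get("alg") != "HS256":
        raise ValueError("unexpected algorithm")
    payload = json.loads(_b64url_decode(payload_b64))
    if "exp" in payload and payload["exp"] < time.time():
        raise ValueError("token expired")
    return payload


if __name__ == "__main__":
    token = encode_jwt({"sub": "john_doe", "exp": int(time.time()) + 60}, "dev-secret")
    print(decode_jwt(token, "dev-secret")["sub"])
```

The token carries its own claims and expiry, so the server can validate it without a database lookup, which is the main practical difference from the API-key approach.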
Getting Started
Create a new project
- Create a new project folder and name it restAPIAuth.
- Create a new file and name it restapi.py.
- Install the required dependencies (the listing also imports flask_cors, so flask-cors is included):
pip install flask flask-cors flask-jwt-extended flask-sqlalchemy
- Open the project folder in your favorite text editor or IDE.
- Copy the code below and paste it into your restapi.py file.
Write the code
- Add the following code to your restapi.py file.
⚙️ REST API with JWT Authentication
# Simple Flask REST API
from flask import Flask, request, jsonify, abort
from flask_cors import CORS
import sqlite3
import json
from datetime import datetime, timedelta
import hashlib
import secrets
import os
from functools import wraps
import logging
app = Flask(__name__)
CORS(app) # Enable CORS for all routes
# Configuration
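# Read the secret from the environment; the literal is only a development fallback
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'your-secret-key-here')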
DATABASE = 'api_database.db'
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class DatabaseManager:
    def __init__(self, db_path):
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Initialize the database with required tables"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            # Users table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username VARCHAR(50) UNIQUE NOT NULL,
                    email VARCHAR(100) UNIQUE NOT NULL,
                    password_hash VARCHAR(255) NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    is_active BOOLEAN DEFAULT 1
                )
            ''')
            # API Keys table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS api_keys (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER,
                    api_key VARCHAR(255) UNIQUE NOT NULL,
                    name VARCHAR(100),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_used TIMESTAMP,
                    is_active BOOLEAN DEFAULT 1,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')
            # Tasks table (example resource)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS tasks (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title VARCHAR(200) NOT NULL,
                    description TEXT,
                    completed BOOLEAN DEFAULT 0,
                    priority VARCHAR(10) DEFAULT 'medium',
                    due_date DATE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    user_id INTEGER,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')
            # Products table (example resource)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS products (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name VARCHAR(200) NOT NULL,
                    description TEXT,
                    price DECIMAL(10,2) NOT NULL,
                    category VARCHAR(100),
                    stock_quantity INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            # API Logs table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS api_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    endpoint VARCHAR(200),
                    method VARCHAR(10),
                    ip_address VARCHAR(45),
                    user_agent TEXT,
                    api_key VARCHAR(255),
                    response_code INTEGER,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.commit()
        # Insert sample data
        self.insert_sample_data()

    def insert_sample_data(self):
        """Insert sample data for demonstration"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            # Check if sample data already exists
            cursor.execute("SELECT COUNT(*) FROM users")
            if cursor.fetchone()[0] > 0:
                return
            # Sample users
            sample_users = [
                ('john_doe', 'john@example.com', hashlib.sha256('password123'.encode()).hexdigest()),
                ('jane_smith', 'jane@example.com', hashlib.sha256('password456'.encode()).hexdigest()),
                ('admin', 'admin@example.com', hashlib.sha256('admin123'.encode()).hexdigest())
            ]
            cursor.executemany(
                "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
                sample_users
            )
            # Sample API keys
            api_keys = [
                (1, 'demo_key_123456789', 'Demo Key'),
                (2, 'test_key_987654321', 'Test Key'),
                (3, 'admin_key_555666777', 'Admin Key')
            ]
            cursor.executemany(
                "INSERT INTO api_keys (user_id, api_key, name) VALUES (?, ?, ?)",
                api_keys
            )
            # Sample tasks
            sample_tasks = [
                ('Complete project documentation', 'Write comprehensive docs for the API', 0, 'high', '2023-12-31', 1),
                ('Fix bug in user authentication', 'Resolve login issues reported by users', 1, 'critical', '2023-12-15', 1),
                ('Implement new feature', 'Add search functionality to products', 0, 'medium', '2024-01-15', 2),
                ('Code review', 'Review pull requests from team members', 0, 'low', '2023-12-20', 2)
            ]
            cursor.executemany(
                "INSERT INTO tasks (title, description, completed, priority, due_date, user_id) VALUES (?, ?, ?, ?, ?, ?)",
                sample_tasks
            )
            # Sample products
            sample_products = [
                ('Laptop', 'High-performance laptop for developers', 999.99, 'Electronics', 10),
                ('Smartphone', 'Latest model smartphone with advanced features', 599.99, 'Electronics', 25),
                ('Coffee Mug', 'Programmer-themed coffee mug', 14.99, 'Accessories', 50),
                ('Mechanical Keyboard', 'RGB mechanical keyboard for gaming', 149.99, 'Electronics', 15),
                ('Desk Lamp', 'Adjustable LED desk lamp', 39.99, 'Furniture', 30)
            ]
            cursor.executemany(
                "INSERT INTO products (name, description, price, category, stock_quantity) VALUES (?, ?, ?, ?, ?)",
                sample_products
            )
            conn.commit()


# Initialize database
db_manager = DatabaseManager(DATABASE)


def require_api_key(f):
    """Decorator to require API key authentication"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        api_key = request.headers.get('X-API-Key') or request.args.get('api_key')
        if not api_key:
            return jsonify({'error': 'API key required'}), 401
        # Validate API key
        with sqlite3.connect(DATABASE) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT ak.id, ak.user_id, u.username
                FROM api_keys ak
                JOIN users u ON ak.user_id = u.id
                WHERE ak.api_key = ? AND ak.is_active = 1 AND u.is_active = 1
            ''', (api_key,))
            result = cursor.fetchone()
            if not result:
                log_api_request(request, api_key, 401)
                return jsonify({'error': 'Invalid API key'}), 401
            # Update last used timestamp
            cursor.execute(
                "UPDATE api_keys SET last_used = CURRENT_TIMESTAMP WHERE api_key = ?",
                (api_key,)
            )
            conn.commit()
        # Log successful request (recorded as 200 before the view function runs)
        log_api_request(request, api_key, 200)
        # Add user info to request context
        request.current_user_id = result[1]
        request.current_username = result[2]
        return f(*args, **kwargs)
    return decorated_function


def log_api_request(request_obj, api_key, response_code):
    """Log API request for monitoring"""
    try:
        with sqlite3.connect(DATABASE) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO api_logs (endpoint, method, ip_address, user_agent, api_key, response_code)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                request_obj.endpoint,
                request_obj.method,
                request_obj.remote_addr,
                request_obj.headers.get('User-Agent', ''),
                api_key,
                response_code
            ))
            conn.commit()
    except Exception as e:
        logger.error(f"Error logging API request: {e}")


def get_db_connection():
    """Get database connection with row factory"""
    conn = sqlite3.connect(DATABASE)
    conn.row_factory = sqlite3.Row
    return conn


# API Documentation endpoint
@app.route('/')
def api_documentation():
    """API Documentation"""
    docs = {
        "title": "Simple Flask REST API",
        "version": "1.0.0",
        "description": "A demonstration REST API with CRUD operations",
        "base_url": request.base_url,
        "authentication": {
            "type": "API Key",
            "header": "X-API-Key",
            "demo_keys": [
                "demo_key_123456789",
                "test_key_987654321",
                "admin_key_555666777"
            ]
        },
        "endpoints": {
            "Tasks": {
                "GET /api/tasks": "Get all tasks",
                "GET /api/tasks/<id>": "Get specific task",
                "POST /api/tasks": "Create new task",
                "PUT /api/tasks/<id>": "Update task",
                "DELETE /api/tasks/<id>": "Delete task"
            },
            "Products": {
                "GET /api/products": "Get all products",
                "GET /api/products/<id>": "Get specific product",
                "POST /api/products": "Create new product",
                "PUT /api/products/<id>": "Update product",
                "DELETE /api/products/<id>": "Delete product"
            },
            "Users": {
                "GET /api/users": "Get all users",
                "GET /api/users/<id>": "Get specific user",
                "POST /api/users": "Create new user"
            },
            "Statistics": {
                "GET /api/stats": "Get API usage statistics"
            }
        },
        "examples": {
            "create_task": {
                "method": "POST",
                "url": "/api/tasks",
                "headers": {"X-API-Key": "demo_key_123456789"},
                "body": {
                    "title": "New Task",
                    "description": "Task description",
                    "priority": "high",
                    "due_date": "2023-12-31"
                }
            }
        }
    }
    return jsonify(docs)


# Health check endpoint
@app.route('/health')
def health_check():
    """Health check endpoint"""
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'version': '1.0.0'
    })


# Tasks API endpoints
@app.route('/api/tasks', methods=['GET'])
@require_api_key
def get_tasks():
    """Get all tasks with optional filtering"""
    # Clamp pagination so that page >= 1 and 1 <= per_page <= 100
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = max(1, min(request.args.get('per_page', 10, type=int), 100))
    # type=bool would treat any non-empty string (including "false") as True,
    # so parse the flag explicitly; a missing parameter still yields None
    completed = request.args.get(
        'completed', type=lambda v: v.lower() in ('1', 'true', 'yes')
    )
    priority = request.args.get('priority')

    # Build the WHERE clause once and share it between both queries
    where = " WHERE 1=1"
    filter_params = []
    if completed is not None:
        where += " AND completed = ?"
        filter_params.append(completed)
    if priority:
        where += " AND priority = ?"
        filter_params.append(priority)

    query = "SELECT * FROM tasks" + where + " ORDER BY created_at DESC LIMIT ? OFFSET ?"
    params = filter_params + [per_page, (page - 1) * per_page]

    with get_db_connection() as conn:
        tasks = conn.execute(query, params).fetchall()
        # Get total count
        total = conn.execute(
            "SELECT COUNT(*) FROM tasks" + where, filter_params
        ).fetchone()[0]

    return jsonify({
        'tasks': [dict(task) for task in tasks],
        'pagination': {
            'page': page,
            'per_page': per_page,
            'total': total,
            'pages': (total + per_page - 1) // per_page
        }
    })


@app.route('/api/tasks/<int:task_id>', methods=['GET'])
@require_api_key
def get_task(task_id):
    """Get specific task"""
    with get_db_connection() as conn:
        task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
    if task is None:
        abort(404)
    return jsonify(dict(task))


@app.route('/api/tasks', methods=['POST'])
@require_api_key
def create_task():
    """Create new task"""
    data = request.get_json()
    if not data or 'title' not in data:
        return jsonify({'error': 'Title is required'}), 400
    title = data['title']
    description = data.get('description', '')
    priority = data.get('priority', 'medium')
    due_date = data.get('due_date')
    user_id = request.current_user_id
    # Validate priority
    if priority not in ['low', 'medium', 'high', 'critical']:
        return jsonify({'error': 'Invalid priority. Use: low, medium, high, critical'}), 400
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO tasks (title, description, priority, due_date, user_id)
            VALUES (?, ?, ?, ?, ?)
        ''', (title, description, priority, due_date, user_id))
        task_id = cursor.lastrowid
        conn.commit()
        # Get the created task
        task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
    return jsonify(dict(task)), 201


@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
@require_api_key
def update_task(task_id):
    """Update task"""
    data = request.get_json()
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    # Check if task exists
    with get_db_connection() as conn:
        existing_task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
        if existing_task is None:
            abort(404)
        # Build update query (column names come from a fixed whitelist)
        update_fields = []
        params = []
        for field in ['title', 'description', 'completed', 'priority', 'due_date']:
            if field in data:
                update_fields.append(f"{field} = ?")
                params.append(data[field])
        if update_fields:
            update_fields.append("updated_at = CURRENT_TIMESTAMP")
            params.append(task_id)
            query = f"UPDATE tasks SET {', '.join(update_fields)} WHERE id = ?"
            conn.execute(query, params)
            conn.commit()
        # Get updated task
        updated_task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
    return jsonify(dict(updated_task))


@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
@require_api_key
def delete_task(task_id):
    """Delete task"""
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
        if cursor.rowcount == 0:
            abort(404)
        conn.commit()
    return '', 204


# Products API endpoints
@app.route('/api/products', methods=['GET'])
@require_api_key
def get_products():
    """Get all products with optional filtering"""
    category = request.args.get('category')
    min_price = request.args.get('min_price', type=float)
    max_price = request.args.get('max_price', type=float)
    # type=bool would treat "false" as True; parse the flag explicitly instead
    in_stock = request.args.get(
        'in_stock', type=lambda v: v.lower() in ('1', 'true', 'yes')
    )
    query = "SELECT * FROM products WHERE 1=1"
    params = []
    if category:
        query += " AND category = ?"
        params.append(category)
    if min_price is not None:
        query += " AND price >= ?"
        params.append(min_price)
    if max_price is not None:
        query += " AND price <= ?"
        params.append(max_price)
    if in_stock is not None:
        if in_stock:
            query += " AND stock_quantity > 0"
        else:
            query += " AND stock_quantity = 0"
    query += " ORDER BY name"
    with get_db_connection() as conn:
        products = conn.execute(query, params).fetchall()
    return jsonify({
        'products': [dict(product) for product in products]
    })


@app.route('/api/products/<int:product_id>', methods=['GET'])
@require_api_key
def get_product(product_id):
    """Get specific product"""
    with get_db_connection() as conn:
        product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
    if product is None:
        abort(404)
    return jsonify(dict(product))


@app.route('/api/products', methods=['POST'])
@require_api_key
def create_product():
    """Create new product"""
    data = request.get_json()
    # Guard against an empty body before checking individual fields
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    required_fields = ['name', 'price']
    for field in required_fields:
        if field not in data:
            return jsonify({'error': f'{field} is required'}), 400
    name = data['name']
    description = data.get('description', '')
    price = data['price']
    category = data.get('category', '')
    stock_quantity = data.get('stock_quantity', 0)
    # Validate price (zero is allowed, negative values are not)
    try:
        price = float(price)
        if price < 0:
            raise ValueError()
    except (ValueError, TypeError):
        return jsonify({'error': 'Price must be a valid non-negative number'}), 400
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO products (name, description, price, category, stock_quantity)
            VALUES (?, ?, ?, ?, ?)
        ''', (name, description, price, category, stock_quantity))
        product_id = cursor.lastrowid
        conn.commit()
        # Get the created product
        product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
    return jsonify(dict(product)), 201


@app.route('/api/products/<int:product_id>', methods=['PUT'])
@require_api_key
def update_product(product_id):
    """Update product"""
    data = request.get_json()
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    with get_db_connection() as conn:
        existing_product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
        if existing_product is None:
            abort(404)
        # Build update query (column names come from a fixed whitelist)
        update_fields = []
        params = []
        for field in ['name', 'description', 'price', 'category', 'stock_quantity']:
            if field in data:
                update_fields.append(f"{field} = ?")
                params.append(data[field])
        if update_fields:
            update_fields.append("updated_at = CURRENT_TIMESTAMP")
            params.append(product_id)
            query = f"UPDATE products SET {', '.join(update_fields)} WHERE id = ?"
            conn.execute(query, params)
            conn.commit()
        # Get updated product
        updated_product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
    return jsonify(dict(updated_product))


@app.route('/api/products/<int:product_id>', methods=['DELETE'])
@require_api_key
def delete_product(product_id):
    """Delete product"""
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM products WHERE id = ?", (product_id,))
        if cursor.rowcount == 0:
            abort(404)
        conn.commit()
    return '', 204


# Users API endpoints
@app.route('/api/users', methods=['GET'])
@require_api_key
def get_users():
    """Get all users (limited info)"""
    with get_db_connection() as conn:
        users = conn.execute(
            "SELECT id, username, email, created_at, is_active FROM users ORDER BY created_at DESC"
        ).fetchall()
    return jsonify({
        'users': [dict(user) for user in users]
    })


@app.route('/api/users/<int:user_id>', methods=['GET'])
@require_api_key
def get_user(user_id):
    """Get specific user"""
    with get_db_connection() as conn:
        user = conn.execute(
            "SELECT id, username, email, created_at, is_active FROM users WHERE id = ?",
            (user_id,)
        ).fetchone()
    if user is None:
        abort(404)
    return jsonify(dict(user))


@app.route('/api/users', methods=['POST'])
@require_api_key
def create_user():
    """Create new user"""
    data = request.get_json()
    # Guard against an empty body before checking individual fields
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    required_fields = ['username', 'email', 'password']
    for field in required_fields:
        if field not in data:
            return jsonify({'error': f'{field} is required'}), 400
    username = data['username']
    email = data['email']
    password = data['password']
    # Hash password
    # NOTE: a single unsalted SHA-256 pass is for demonstration only and is
    # not suitable for storing real passwords
    password_hash = hashlib.sha256(password.encode()).hexdigest()
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO users (username, email, password_hash)
                VALUES (?, ?, ?)
            ''', (username, email, password_hash))
            user_id = cursor.lastrowid
            conn.commit()
            # Get the created user (without password)
            user = conn.execute(
                "SELECT id, username, email, created_at, is_active FROM users WHERE id = ?",
                (user_id,)
            ).fetchone()
        return jsonify(dict(user)), 201
    except sqlite3.IntegrityError as e:
        if 'username' in str(e):
            return jsonify({'error': 'Username already exists'}), 400
        elif 'email' in str(e):
            return jsonify({'error': 'Email already exists'}), 400
        else:
            return jsonify({'error': 'Database constraint violation'}), 400


# Statistics endpoint
@app.route('/api/stats', methods=['GET'])
@require_api_key
def get_statistics():
    """Get API usage statistics"""
    with get_db_connection() as conn:
        # Basic counts
        task_count = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
        product_count = conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]
        user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
        # API usage stats
        total_requests = conn.execute("SELECT COUNT(*) FROM api_logs").fetchone()[0]
        # Recent activity (last 24 hours). CURRENT_TIMESTAMP is stored in UTC as
        # 'YYYY-MM-DD HH:MM:SS', so compute the cutoff in SQLite in the same
        # format instead of comparing against a local ISO string from Python.
        recent_requests = conn.execute(
            "SELECT COUNT(*) FROM api_logs WHERE timestamp > datetime('now', '-1 day')"
        ).fetchone()[0]
        # Top endpoints
        top_endpoints = conn.execute('''
            SELECT endpoint, COUNT(*) as count
            FROM api_logs
            GROUP BY endpoint
            ORDER BY count DESC
            LIMIT 5
        ''').fetchall()
        # Response code distribution
        response_codes = conn.execute('''
            SELECT response_code, COUNT(*) as count
            FROM api_logs
            GROUP BY response_code
            ORDER BY response_code
        ''').fetchall()
    return jsonify({
        'database_stats': {
            'tasks': task_count,
            'products': product_count,
            'users': user_count
        },
        'api_usage': {
            'total_requests': total_requests,
            'requests_last_24h': recent_requests,
            'top_endpoints': [dict(row) for row in top_endpoints],
            'response_codes': [dict(row) for row in response_codes]
        },
        'timestamp': datetime.now().isoformat()
    })


# Error handlers
@app.errorhandler(404)
def not_found(error):
    return jsonify({'error': 'Resource not found'}), 404


@app.errorhandler(400)
def bad_request(error):
    return jsonify({'error': 'Bad request'}), 400


@app.errorhandler(500)
def internal_error(error):
    return jsonify({'error': 'Internal server error'}), 500


def main():
    """Main function to run the Flask REST API"""
    print("Simple Flask REST API")
    print("=====================")
    print("\nAPI Documentation: http://localhost:5000/")
    print("Health Check: http://localhost:5000/health")
    print("\nDemo API Keys:")
    print("- demo_key_123456789")
    print("- test_key_987654321")
    print("- admin_key_555666777")
    print("\nExample usage:")
    print("curl -H 'X-API-Key: demo_key_123456789' http://localhost:5000/api/tasks")
    print("\nStarting server...")
    # Debug mode enables the interactive debugger, so bind to localhost only
    app.run(debug=True, host='127.0.0.1', port=5000)


if __name__ == "__main__":
    main()
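The listing stores passwords as a single unsalted SHA-256 digest. As a hedged alternative, here is a minimal sketch of salted, iterated hashing using only the standard library. The helper names `hash_password` and `verify_password`, the storage format, and the iteration count are assumptions made for illustration and are not part of the tutorial code.

```python
import hashlib
import hmac
import os

# Assumed work factor for this sketch; tune it for your hardware
ITERATIONS = 200_000


def hash_password(password: str) -> str:
    """Return 'pbkdf2_sha256$iterations$salt_hex$digest_hex' (hypothetical format)."""
    salt = os.urandom(16)  # fresh random salt per password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"pbkdf2_sha256${ITERATIONS}${salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    try:
        scheme, iterations, salt_hex, digest_hex = stored.split("$")
        if scheme != "pbkdf2_sha256":
            return False
        candidate = hashlib.pbkdf2_hmac(
            "sha256",
            password.encode("utf-8"),
            bytes.fromhex(salt_hex),
            int(iterations),
        )
        return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))
    except ValueError:
        # Malformed stored value: treat as a failed verification
        return False


if __name__ == "__main__":
    stored = hash_password("password123")
    print(verify_password("password123", stored))
    print(verify_password("wrong-password", stored))
```

The resulting string is well under 255 characters, so it would fit the existing password_hash column; the salt makes identical passwords hash differently, and the iteration count slows down offline guessing.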
# Simple Flask REST API
from flask import Flask, request, jsonify, abort
from flask_cors import CORS
import sqlite3
import json
from datetime import datetime, timedelta
import hashlib
import secrets
import os
from functools import wraps
import logging
app = Flask(__name__)
CORS(app) # Enable CORS for all routes
# Configuration
app.config['SECRET_KEY'] = 'your-secret-key-here'
DATABASE = 'api_database.db'
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class DatabaseManager:
def __init__(self, db_path):
self.db_path = db_path
self.init_database()
def init_database(self):
"""Initialize the database with required tables"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Users table
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_active BOOLEAN DEFAULT 1
)
''')
# API Keys table
cursor.execute('''
CREATE TABLE IF NOT EXISTS api_keys (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
api_key VARCHAR(255) UNIQUE NOT NULL,
name VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_used TIMESTAMP,
is_active BOOLEAN DEFAULT 1,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
# Tasks table (example resource)
cursor.execute('''
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title VARCHAR(200) NOT NULL,
description TEXT,
completed BOOLEAN DEFAULT 0,
priority VARCHAR(10) DEFAULT 'medium',
due_date DATE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
user_id INTEGER,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
# Products table (example resource)
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(200) NOT NULL,
description TEXT,
price DECIMAL(10,2) NOT NULL,
category VARCHAR(100),
stock_quantity INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# API Logs table
cursor.execute('''
CREATE TABLE IF NOT EXISTS api_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
endpoint VARCHAR(200),
method VARCHAR(10),
ip_address VARCHAR(45),
user_agent TEXT,
api_key VARCHAR(255),
response_code INTEGER,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
# Insert sample data
self.insert_sample_data()
def insert_sample_data(self):
"""Insert sample data for demonstration"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Check if sample data already exists
cursor.execute("SELECT COUNT(*) FROM users")
if cursor.fetchone()[0] > 0:
return
# Sample users
sample_users = [
('john_doe', 'john@example.com', hashlib.sha256('password123'.encode()).hexdigest()),
('jane_smith', 'jane@example.com', hashlib.sha256('password456'.encode()).hexdigest()),
('admin', 'admin@example.com', hashlib.sha256('admin123'.encode()).hexdigest())
]
cursor.executemany(
"INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
sample_users
)
# Sample API keys
api_keys = [
(1, 'demo_key_123456789', 'Demo Key'),
(2, 'test_key_987654321', 'Test Key'),
(3, 'admin_key_555666777', 'Admin Key')
]
cursor.executemany(
"INSERT INTO api_keys (user_id, api_key, name) VALUES (?, ?, ?)",
api_keys
)
# Sample tasks
sample_tasks = [
('Complete project documentation', 'Write comprehensive docs for the API', 0, 'high', '2023-12-31', 1),
('Fix bug in user authentication', 'Resolve login issues reported by users', 1, 'critical', '2023-12-15', 1),
('Implement new feature', 'Add search functionality to products', 0, 'medium', '2024-01-15', 2),
('Code review', 'Review pull requests from team members', 0, 'low', '2023-12-20', 2)
]
cursor.executemany(
"INSERT INTO tasks (title, description, completed, priority, due_date, user_id) VALUES (?, ?, ?, ?, ?, ?)",
sample_tasks
)
# Sample products
sample_products = [
('Laptop', 'High-performance laptop for developers', 999.99, 'Electronics', 10),
('Smartphone', 'Latest model smartphone with advanced features', 599.99, 'Electronics', 25),
('Coffee Mug', 'Programmer-themed coffee mug', 14.99, 'Accessories', 50),
('Mechanical Keyboard', 'RGB mechanical keyboard for gaming', 149.99, 'Electronics', 15),
('Desk Lamp', 'Adjustable LED desk lamp', 39.99, 'Furniture', 30)
]
cursor.executemany(
"INSERT INTO products (name, description, price, category, stock_quantity) VALUES (?, ?, ?, ?, ?)",
sample_products
)
conn.commit()
# Initialize database
db_manager = DatabaseManager(DATABASE)
def require_api_key(f):
"""Decorator to require API key authentication"""
@wraps(f)
def decorated_function(*args, **kwargs):
api_key = request.headers.get('X-API-Key') or request.args.get('api_key')
if not api_key:
return jsonify({'error': 'API key required'}), 401
# Validate API key
with sqlite3.connect(DATABASE) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT ak.id, ak.user_id, u.username
FROM api_keys ak
JOIN users u ON ak.user_id = u.id
WHERE ak.api_key = ? AND ak.is_active = 1 AND u.is_active = 1
''', (api_key,))
result = cursor.fetchone()
if not result:
log_api_request(request, api_key, 401)
return jsonify({'error': 'Invalid API key'}), 401
# Update last used timestamp
cursor.execute(
"UPDATE api_keys SET last_used = CURRENT_TIMESTAMP WHERE api_key = ?",
(api_key,)
)
conn.commit()
# Log successful request
log_api_request(request, api_key, 200)
# Add user info to request context
request.current_user_id = result[1]
request.current_username = result[2]
return f(*args, **kwargs)
return decorated_function
def log_api_request(request_obj, api_key, response_code):
"""Log API request for monitoring"""
try:
with sqlite3.connect(DATABASE) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO api_logs (endpoint, method, ip_address, user_agent, api_key, response_code)
VALUES (?, ?, ?, ?, ?, ?)
''', (
request_obj.endpoint,
request_obj.method,
request_obj.remote_addr,
request_obj.headers.get('User-Agent', ''),
api_key,
response_code
))
conn.commit()
except Exception as e:
logger.error(f"Error logging API request: {e}")
def get_db_connection():
"""Get database connection with row factory"""
conn = sqlite3.connect(DATABASE)
conn.row_factory = sqlite3.Row
return conn
# API Documentation endpoint
@app.route('/')
def api_documentation():
"""API Documentation"""
docs = {
"title": "Simple Flask REST API",
"version": "1.0.0",
"description": "A demonstration REST API with CRUD operations",
"base_url": request.base_url,
"authentication": {
"type": "API Key",
"header": "X-API-Key",
"demo_keys": [
"demo_key_123456789",
"test_key_987654321",
"admin_key_555666777"
]
},
"endpoints": {
"Tasks": {
"GET /api/tasks": "Get all tasks",
"GET /api/tasks/<id>": "Get specific task",
"POST /api/tasks": "Create new task",
"PUT /api/tasks/<id>": "Update task",
"DELETE /api/tasks/<id>": "Delete task"
},
"Products": {
"GET /api/products": "Get all products",
"GET /api/products/<id>": "Get specific product",
"POST /api/products": "Create new product",
"PUT /api/products/<id>": "Update product",
"DELETE /api/products/<id>": "Delete product"
},
"Users": {
"GET /api/users": "Get all users",
"GET /api/users/<id>": "Get specific user",
"POST /api/users": "Create new user"
},
"Statistics": {
"GET /api/stats": "Get API usage statistics"
}
},
"examples": {
"create_task": {
"method": "POST",
"url": "/api/tasks",
"headers": {"X-API-Key": "demo_key_123456789"},
"body": {
"title": "New Task",
"description": "Task description",
"priority": "high",
"due_date": "2023-12-31"
}
}
}
}
return jsonify(docs)
# Health check endpoint
@app.route('/health')
def health_check():
"""Health check endpoint"""
return jsonify({
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'version': '1.0.0'
})
# Tasks API endpoints
@app.route('/api/tasks', methods=['GET'])
@require_api_key
def get_tasks():
"""Get all tasks with optional filtering"""
page = request.args.get('page', 1, type=int)
per_page = min(request.args.get('per_page', 10, type=int), 100)
completed = request.args.get('completed', type=bool)
priority = request.args.get('priority')
query = "SELECT * FROM tasks WHERE 1=1"
params = []
if completed is not None:
query += " AND completed = ?"
params.append(completed)
if priority:
query += " AND priority = ?"
params.append(priority)
query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
params.extend([per_page, (page - 1) * per_page])
with get_db_connection() as conn:
tasks = conn.execute(query, params).fetchall()
# Get total count
count_query = "SELECT COUNT(*) FROM tasks WHERE 1=1"
count_params = []
if completed is not None:
count_query += " AND completed = ?"
count_params.append(completed)
if priority:
count_query += " AND priority = ?"
count_params.append(priority)
total = conn.execute(count_query, count_params).fetchone()[0]
return jsonify({
'tasks': [dict(task) for task in tasks],
'pagination': {
'page': page,
'per_page': per_page,
'total': total,
'pages': (total + per_page - 1) // per_page
}
})
@app.route('/api/tasks/<int:task_id>', methods=['GET'])
@require_api_key
def get_task(task_id):
"""Get specific task"""
with get_db_connection() as conn:
task = conn.execute(
"SELECT * FROM tasks WHERE id = ?", (task_id,)
).fetchone()
if task is None:
abort(404)
return jsonify(dict(task))
@app.route('/api/tasks', methods=['POST'])
@require_api_key
def create_task():
"""Create new task"""
data = request.get_json()
if not data or 'title' not in data:
return jsonify({'error': 'Title is required'}), 400
title = data['title']
description = data.get('description', '')
priority = data.get('priority', 'medium')
due_date = data.get('due_date')
user_id = request.current_user_id
# Validate priority
if priority not in ['low', 'medium', 'high', 'critical']:
return jsonify({'error': 'Invalid priority. Use: low, medium, high, critical'}), 400
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO tasks (title, description, priority, due_date, user_id)
VALUES (?, ?, ?, ?, ?)
''', (title, description, priority, due_date, user_id))
task_id = cursor.lastrowid
conn.commit()
# Get the created task
task = conn.execute(
"SELECT * FROM tasks WHERE id = ?", (task_id,)
).fetchone()
return jsonify(dict(task)), 201
@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
@require_api_key
def update_task(task_id):
"""Update task"""
data = request.get_json()
if not data:
return jsonify({'error': 'No data provided'}), 400
# Check if task exists
with get_db_connection() as conn:
existing_task = conn.execute(
"SELECT * FROM tasks WHERE id = ?", (task_id,)
).fetchone()
if existing_task is None:
abort(404)
# Build update query
update_fields = []
params = []
for field in ['title', 'description', 'completed', 'priority', 'due_date']:
if field in data:
update_fields.append(f"{field} = ?")
params.append(data[field])
if update_fields:
update_fields.append("updated_at = CURRENT_TIMESTAMP")
params.append(task_id)
query = f"UPDATE tasks SET {', '.join(update_fields)} WHERE id = ?"
conn.execute(query, params)
conn.commit()
# Get updated task
updated_task = conn.execute(
"SELECT * FROM tasks WHERE id = ?", (task_id,)
).fetchone()
return jsonify(dict(updated_task))
@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
@require_api_key
def delete_task(task_id):
"""Delete task"""
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
if cursor.rowcount == 0:
abort(404)
conn.commit()
return '', 204
# Products API endpoints
@app.route('/api/products', methods=['GET'])
@require_api_key
def get_products():
"""Get all products with optional filtering"""
category = request.args.get('category')
min_price = request.args.get('min_price', type=float)
max_price = request.args.get('max_price', type=float)
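# type=bool would treat any non-empty string (including "false") as True, so parse the text explicitly
in_stock = request.args.get('in_stock', type=lambda v: v.lower() in ('1', 'true', 'yes'))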
query = "SELECT * FROM products WHERE 1=1"
params = []
if category:
query += " AND category = ?"
params.append(category)
if min_price is not None:
query += " AND price >= ?"
params.append(min_price)
if max_price is not None:
query += " AND price <= ?"
params.append(max_price)
if in_stock is not None:
if in_stock:
query += " AND stock_quantity > 0"
else:
query += " AND stock_quantity = 0"
query += " ORDER BY name"
with get_db_connection() as conn:
products = conn.execute(query, params).fetchall()
return jsonify({
'products': [dict(product) for product in products]
})
@app.route('/api/products/<int:product_id>', methods=['GET'])
@require_api_key
def get_product(product_id):
"""Get specific product"""
with get_db_connection() as conn:
product = conn.execute(
"SELECT * FROM products WHERE id = ?", (product_id,)
).fetchone()
if product is None:
abort(404)
return jsonify(dict(product))
@app.route('/api/products', methods=['POST'])
@require_api_key
def create_product():
"""Create new product"""
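data = request.get_json(silent=True)
if not isinstance(data, dict):
return jsonify({'error': 'Request body must be a JSON object'}), 400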
required_fields = ['name', 'price']
for field in required_fields:
if field not in data:
return jsonify({'error': f'{field} is required'}), 400
name = data['name']
description = data.get('description', '')
price = data['price']
category = data.get('category', '')
stock_quantity = data.get('stock_quantity', 0)
# Validate price
try:
price = float(price)
if price < 0:
raise ValueError()
except (ValueError, TypeError):
return jsonify({'error': 'Price must be a valid non-negative number'}), 400
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO products (name, description, price, category, stock_quantity)
VALUES (?, ?, ?, ?, ?)
''', (name, description, price, category, stock_quantity))
product_id = cursor.lastrowid
conn.commit()
# Get the created product
product = conn.execute(
"SELECT * FROM products WHERE id = ?", (product_id,)
).fetchone()
return jsonify(dict(product)), 201
@app.route('/api/products/<int:product_id>', methods=['PUT'])
@require_api_key
def update_product(product_id):
"""Update product"""
data = request.get_json()
if not data:
return jsonify({'error': 'No data provided'}), 400
with get_db_connection() as conn:
existing_product = conn.execute(
"SELECT * FROM products WHERE id = ?", (product_id,)
).fetchone()
if existing_product is None:
abort(404)
# Build update query
update_fields = []
params = []
for field in ['name', 'description', 'price', 'category', 'stock_quantity']:
if field in data:
update_fields.append(f"{field} = ?")
params.append(data[field])
if update_fields:
update_fields.append("updated_at = CURRENT_TIMESTAMP")
params.append(product_id)
query = f"UPDATE products SET {', '.join(update_fields)} WHERE id = ?"
conn.execute(query, params)
conn.commit()
# Get updated product
updated_product = conn.execute(
"SELECT * FROM products WHERE id = ?", (product_id,)
).fetchone()
return jsonify(dict(updated_product))
@app.route('/api/products/<int:product_id>', methods=['DELETE'])
@require_api_key
def delete_product(product_id):
"""Delete product"""
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM products WHERE id = ?", (product_id,))
if cursor.rowcount == 0:
abort(404)
conn.commit()
return '', 204
# Users API endpoints
@app.route('/api/users', methods=['GET'])
@require_api_key
def get_users():
"""Get all users (limited info)"""
with get_db_connection() as conn:
users = conn.execute(
"SELECT id, username, email, created_at, is_active FROM users ORDER BY created_at DESC"
).fetchall()
return jsonify({
'users': [dict(user) for user in users]
})
@app.route('/api/users/<int:user_id>', methods=['GET'])
@require_api_key
def get_user(user_id):
"""Get specific user"""
with get_db_connection() as conn:
user = conn.execute(
"SELECT id, username, email, created_at, is_active FROM users WHERE id = ?",
(user_id,)
).fetchone()
if user is None:
abort(404)
return jsonify(dict(user))
@app.route('/api/users', methods=['POST'])
@require_api_key
def create_user():
"""Create new user"""
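data = request.get_json(silent=True)
if not isinstance(data, dict):
return jsonify({'error': 'Request body must be a JSON object'}), 400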
required_fields = ['username', 'email', 'password']
for field in required_fields:
if field not in data:
return jsonify({'error': f'{field} is required'}), 400
username = data['username']
email = data['email']
password = data['password']
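# Hash password with a random per-user salt (PBKDF2-HMAC-SHA256); bare SHA-256 is too fast to resist guessing
salt = secrets.token_hex(16)
derived = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 600000).hex()
password_hash = f"pbkdf2_sha256$600000${salt}${derived}"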
try:
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO users (username, email, password_hash)
VALUES (?, ?, ?)
''', (username, email, password_hash))
user_id = cursor.lastrowid
conn.commit()
# Get the created user (without password)
user = conn.execute(
"SELECT id, username, email, created_at, is_active FROM users WHERE id = ?",
(user_id,)
).fetchone()
return jsonify(dict(user)), 201
except sqlite3.IntegrityError as e:
if 'username' in str(e):
return jsonify({'error': 'Username already exists'}), 400
elif 'email' in str(e):
return jsonify({'error': 'Email already exists'}), 400
else:
return jsonify({'error': 'Database constraint violation'}), 400
# Statistics endpoint
@app.route('/api/stats', methods=['GET'])
@require_api_key
def get_statistics():
"""Get API usage statistics"""
with get_db_connection() as conn:
# Basic counts
task_count = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
product_count = conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]
user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
# API usage stats
total_requests = conn.execute("SELECT COUNT(*) FROM api_logs").fetchone()[0]
# Recent activity (last 24 hours); computed in SQLite so the format matches CURRENT_TIMESTAMP (UTC)
recent_requests = conn.execute(
"SELECT COUNT(*) FROM api_logs WHERE timestamp > datetime('now', '-1 day')"
).fetchone()[0]
# Top endpoints
top_endpoints = conn.execute('''
SELECT endpoint, COUNT(*) as count
FROM api_logs
GROUP BY endpoint
ORDER BY count DESC
LIMIT 5
''').fetchall()
# Response code distribution
response_codes = conn.execute('''
SELECT response_code, COUNT(*) as count
FROM api_logs
GROUP BY response_code
ORDER BY response_code
''').fetchall()
return jsonify({
'database_stats': {
'tasks': task_count,
'products': product_count,
'users': user_count
},
'api_usage': {
'total_requests': total_requests,
'requests_last_24h': recent_requests,
'top_endpoints': [dict(row) for row in top_endpoints],
'response_codes': [dict(row) for row in response_codes]
},
'timestamp': datetime.now().isoformat()
})
# Error handlers
@app.errorhandler(404)
def not_found(error):
return jsonify({'error': 'Resource not found'}), 404
@app.errorhandler(400)
def bad_request(error):
return jsonify({'error': 'Bad request'}), 400
@app.errorhandler(500)
def internal_error(error):
return jsonify({'error': 'Internal server error'}), 500
def main():
"""Main function to run the Flask REST API"""
print("Simple Flask REST API")
print("=====================")
print("\nAPI Documentation: http://localhost:5000/")
print("Health Check: http://localhost:5000/health")
print("\nDemo API Keys:")
print("- demo_key_123456789")
print("- test_key_987654321")
print("- admin_key_555666777")
print("\nExample usage:")
print("curl -H 'X-API-Key: demo_key_123456789' http://localhost:5000/api/tasks")
print("\nStarting server...")
# Bind to localhost only: the debug server must not be reachable from other machines
app.run(debug=True, host='127.0.0.1', port=5000)
if __name__ == "__main__":
main()
- Save the file.
- Run the following command to start the API server.
C:\Users\username\Documents\restAPIAuth> python restapi.py
Simple Flask REST API
=====================
API Documentation: http://localhost:5000/
Health Check: http://localhost:5000/health
Demo API Keys:
- demo_key_123456789
- test_key_987654321
- admin_key_555666777
Example usage:
curl -H 'X-API-Key: demo_key_123456789' http://localhost:5000/api/tasks
Starting server...
* Running on http://127.0.0.1:5000
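Once the server is running, the endpoints in the listing can also be called from Python instead of curl. The following is a minimal sketch using only the standard library. It assumes the server listens on http://localhost:5000 and that the demo key demo_key_123456789 from the sample data is still active; build_task_request and send are hypothetical helper names, not part of the project.

```python
import json
import urllib.request

BASE_URL = "http://localhost:5000"  # assumption: default host/port used by app.run()
API_KEY = "demo_key_123456789"      # demo key inserted by insert_sample_data()


def build_task_request(title, priority="medium"):
    """Build (but do not send) a POST /api/tasks request with the API key header."""
    body = json.dumps({"title": title, "priority": priority}).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/api/tasks",
        data=body,
        method="POST",
        headers={"X-API-Key": API_KEY, "Content-Type": "application/json"},
    )


def send(req):
    """Send the request; this needs the API server to be running."""
    with urllib.request.urlopen(req) as resp:
        return resp.status, json.loads(resp.read())
```

Calling send(build_task_request("Write tests", priority="high")) against a running server should return the status code together with the created task as a dictionary.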
Explanation
- The from flask_jwt_extended import JWTManager statement imports JWT functionality for secure authentication.
- The flask_sqlalchemy package provides ORM capabilities for database operations and user management.
- User registration includes password hashing and validation for secure account creation.
- JWT token generation creates secure access tokens with expiration times.
- Protected endpoints require valid JWT tokens for access authorization.
- Role-based access control differentiates between user and admin privileges.
- Password hashing uses secure algorithms to protect user credentials.
- Token validation ensures only authenticated users can access protected resources.
- CRUD operations provide complete data management functionality.
- Error handling returns appropriate HTTP status codes and error messages.
- Input validation prevents malformed requests and security vulnerabilities.
- Rate limiting protects the API from abuse and excessive requests.
Next Steps
Congratulations! You have successfully created a REST API with JWT Authentication in Python. Experiment with the code and see if you can modify the application. Here are a few suggestions:
- Add refresh token functionality for extended sessions
- Implement OAuth2 integration with Google/Facebook
- Create comprehensive API documentation with Swagger
- Add email verification for user registration
- Implement password recovery and reset features
- Create API versioning strategies
- Add comprehensive logging and monitoring
- Implement database migrations and backups
- Create automated testing suites
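For the first suggestion, refresh tokens are typically long-lived random strings that can be exchanged for a new access token and are replaced on every use. A minimal in-memory sketch of that rotation idea follows. RefreshTokenStore and its methods are hypothetical names, and a real service would keep the hashed tokens in the database rather than in a dict.

```python
import hashlib
import secrets
import time


class RefreshTokenStore:
    """In-memory refresh-token store with single-use rotation (illustration only)."""

    def __init__(self, ttl_seconds=7 * 24 * 3600):
        self.ttl_seconds = ttl_seconds
        self._tokens = {}  # sha256(token) -> (user_id, expires_at)

    @staticmethod
    def _digest(token):
        # Only a hash of the token is stored, so a leaked store does not expose usable tokens
        return hashlib.sha256(token.encode("utf-8")).hexdigest()

    def issue(self, user_id, now=None):
        """Create and remember a new random refresh token for user_id."""
        now = time.time() if now is None else now
        token = secrets.token_urlsafe(32)
        self._tokens[self._digest(token)] = (user_id, now + self.ttl_seconds)
        return token

    def rotate(self, token, now=None):
        """Consume token; return (user_id, new_token), or None if unknown or expired."""
        now = time.time() if now is None else now
        entry = self._tokens.pop(self._digest(token), None)
        if entry is None:
            return None
        user_id, expires_at = entry
        if expires_at <= now:
            return None
        return user_id, self.issue(user_id, now)
```

Because rotate removes the old entry before issuing a new one, presenting the same refresh token twice fails the second time, which makes a stolen token easier to detect.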
Conclusion
In this project, you learned how to create a REST API with JWT Authentication in Python using Flask. You also learned about secure authentication, token management, API security, and implementing professional authentication systems. You can find the source code on GitHub.
The Complete Code
⚙️ REST API with JWT Authentication
# Simple Flask REST API
from flask import Flask, request, jsonify, abort
from flask_cors import CORS
import sqlite3
import json
from datetime import datetime, timedelta
import hashlib
import secrets
import os
from functools import wraps
import logging
app = Flask(__name__)
CORS(app) # Enable CORS for all routes
# Configuration
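# Read the secret from the environment; fall back to a random per-process key instead of a hardcoded one
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or secrets.token_hex(32)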
DATABASE = 'api_database.db'
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class DatabaseManager:
def __init__(self, db_path):
self.db_path = db_path
self.init_database()
def init_database(self):
"""Initialize the database with required tables"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Users table
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_active BOOLEAN DEFAULT 1
)
''')
# API Keys table
cursor.execute('''
CREATE TABLE IF NOT EXISTS api_keys (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
api_key VARCHAR(255) UNIQUE NOT NULL,
name VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_used TIMESTAMP,
is_active BOOLEAN DEFAULT 1,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
# Tasks table (example resource)
cursor.execute('''
CREATE TABLE IF NOT EXISTS tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title VARCHAR(200) NOT NULL,
description TEXT,
completed BOOLEAN DEFAULT 0,
priority VARCHAR(10) DEFAULT 'medium',
due_date DATE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
user_id INTEGER,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
# Products table (example resource)
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name VARCHAR(200) NOT NULL,
description TEXT,
price DECIMAL(10,2) NOT NULL,
category VARCHAR(100),
stock_quantity INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# API Logs table
cursor.execute('''
CREATE TABLE IF NOT EXISTS api_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
endpoint VARCHAR(200),
method VARCHAR(10),
ip_address VARCHAR(45),
user_agent TEXT,
api_key VARCHAR(255),
response_code INTEGER,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
# Insert sample data
self.insert_sample_data()
def insert_sample_data(self):
"""Insert sample data for demonstration"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Check if sample data already exists
cursor.execute("SELECT COUNT(*) FROM users")
if cursor.fetchone()[0] > 0:
return
# Sample users
sample_users = [
('john_doe', 'john@example.com', hashlib.sha256('password123'.encode()).hexdigest()),
('jane_smith', 'jane@example.com', hashlib.sha256('password456'.encode()).hexdigest()),
('admin', 'admin@example.com', hashlib.sha256('admin123'.encode()).hexdigest())
]
cursor.executemany(
"INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
sample_users
)
# Sample API keys
api_keys = [
(1, 'demo_key_123456789', 'Demo Key'),
(2, 'test_key_987654321', 'Test Key'),
(3, 'admin_key_555666777', 'Admin Key')
]
cursor.executemany(
"INSERT INTO api_keys (user_id, api_key, name) VALUES (?, ?, ?)",
api_keys
)
# Sample tasks
sample_tasks = [
('Complete project documentation', 'Write comprehensive docs for the API', 0, 'high', '2023-12-31', 1),
('Fix bug in user authentication', 'Resolve login issues reported by users', 1, 'critical', '2023-12-15', 1),
('Implement new feature', 'Add search functionality to products', 0, 'medium', '2024-01-15', 2),
('Code review', 'Review pull requests from team members', 0, 'low', '2023-12-20', 2)
]
cursor.executemany(
"INSERT INTO tasks (title, description, completed, priority, due_date, user_id) VALUES (?, ?, ?, ?, ?, ?)",
sample_tasks
)
# Sample products
sample_products = [
('Laptop', 'High-performance laptop for developers', 999.99, 'Electronics', 10),
('Smartphone', 'Latest model smartphone with advanced features', 599.99, 'Electronics', 25),
('Coffee Mug', 'Programmer-themed coffee mug', 14.99, 'Accessories', 50),
('Mechanical Keyboard', 'RGB mechanical keyboard for gaming', 149.99, 'Electronics', 15),
('Desk Lamp', 'Adjustable LED desk lamp', 39.99, 'Furniture', 30)
]
cursor.executemany(
"INSERT INTO products (name, description, price, category, stock_quantity) VALUES (?, ?, ?, ?, ?)",
sample_products
)
conn.commit()
# Initialize database
db_manager = DatabaseManager(DATABASE)
def require_api_key(f):
"""Decorator to require API key authentication"""
@wraps(f)
def decorated_function(*args, **kwargs):
api_key = request.headers.get('X-API-Key') or request.args.get('api_key')
if not api_key:
return jsonify({'error': 'API key required'}), 401
# Validate API key
with sqlite3.connect(DATABASE) as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT ak.id, ak.user_id, u.username
FROM api_keys ak
JOIN users u ON ak.user_id = u.id
WHERE ak.api_key = ? AND ak.is_active = 1 AND u.is_active = 1
''', (api_key,))
result = cursor.fetchone()
if not result:
log_api_request(request, api_key, 401)
return jsonify({'error': 'Invalid API key'}), 401
# Update last used timestamp
cursor.execute(
"UPDATE api_keys SET last_used = CURRENT_TIMESTAMP WHERE api_key = ?",
(api_key,)
)
conn.commit()
# Log successful request
log_api_request(request, api_key, 200)
# Add user info to request context
request.current_user_id = result[1]
request.current_username = result[2]
return f(*args, **kwargs)
return decorated_function
def log_api_request(request_obj, api_key, response_code):
"""Log API request for monitoring"""
try:
with sqlite3.connect(DATABASE) as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO api_logs (endpoint, method, ip_address, user_agent, api_key, response_code)
VALUES (?, ?, ?, ?, ?, ?)
''', (
request_obj.endpoint,
request_obj.method,
request_obj.remote_addr,
request_obj.headers.get('User-Agent', ''),
api_key,
response_code
))
conn.commit()
except Exception as e:
logger.error(f"Error logging API request: {e}")
def get_db_connection():
"""Get database connection with row factory"""
conn = sqlite3.connect(DATABASE)
conn.row_factory = sqlite3.Row
return conn
# API Documentation endpoint
@app.route('/')
def api_documentation():
"""API Documentation"""
docs = {
"title": "Simple Flask REST API",
"version": "1.0.0",
"description": "A demonstration REST API with CRUD operations",
"base_url": request.base_url,
"authentication": {
"type": "API Key",
"header": "X-API-Key",
"demo_keys": [
"demo_key_123456789",
"test_key_987654321",
"admin_key_555666777"
]
},
"endpoints": {
"Tasks": {
"GET /api/tasks": "Get all tasks",
"GET /api/tasks/<id>": "Get specific task",
"POST /api/tasks": "Create new task",
"PUT /api/tasks/<id>": "Update task",
"DELETE /api/tasks/<id>": "Delete task"
},
"Products": {
"GET /api/products": "Get all products",
"GET /api/products/<id>": "Get specific product",
"POST /api/products": "Create new product",
"PUT /api/products/<id>": "Update product",
"DELETE /api/products/<id>": "Delete product"
},
"Users": {
"GET /api/users": "Get all users",
"GET /api/users/<id>": "Get specific user",
"POST /api/users": "Create new user"
},
"Statistics": {
"GET /api/stats": "Get API usage statistics"
}
},
"examples": {
"create_task": {
"method": "POST",
"url": "/api/tasks",
"headers": {"X-API-Key": "demo_key_123456789"},
"body": {
"title": "New Task",
"description": "Task description",
"priority": "high",
"due_date": "2023-12-31"
}
}
}
}
return jsonify(docs)
# Health check endpoint
@app.route('/health')
def health_check():
"""Health check endpoint"""
return jsonify({
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'version': '1.0.0'
})
# Tasks API endpoints
@app.route('/api/tasks', methods=['GET'])
@require_api_key
def get_tasks():
"""Get all tasks with optional filtering"""
page = request.args.get('page', 1, type=int)
per_page = min(request.args.get('per_page', 10, type=int), 100)
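# type=bool would treat any non-empty string (including "false") as True, so parse the text explicitly
completed = request.args.get('completed', type=lambda v: v.lower() in ('1', 'true', 'yes'))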
priority = request.args.get('priority')
query = "SELECT * FROM tasks WHERE 1=1"
params = []
if completed is not None:
query += " AND completed = ?"
params.append(completed)
if priority:
query += " AND priority = ?"
params.append(priority)
query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
params.extend([per_page, (page - 1) * per_page])
with get_db_connection() as conn:
tasks = conn.execute(query, params).fetchall()
# Get total count
count_query = "SELECT COUNT(*) FROM tasks WHERE 1=1"
count_params = []
if completed is not None:
count_query += " AND completed = ?"
count_params.append(completed)
if priority:
count_query += " AND priority = ?"
count_params.append(priority)
total = conn.execute(count_query, count_params).fetchone()[0]
return jsonify({
'tasks': [dict(task) for task in tasks],
'pagination': {
'page': page,
'per_page': per_page,
'total': total,
'pages': (total + per_page - 1) // per_page
}
})
@app.route('/api/tasks/<int:task_id>', methods=['GET'])
@require_api_key
def get_task(task_id):
"""Get specific task"""
with get_db_connection() as conn:
task = conn.execute(
"SELECT * FROM tasks WHERE id = ?", (task_id,)
).fetchone()
if task is None:
abort(404)
return jsonify(dict(task))
@app.route('/api/tasks', methods=['POST'])
@require_api_key
def create_task():
"""Create new task"""
data = request.get_json()
if not data or 'title' not in data:
return jsonify({'error': 'Title is required'}), 400
title = data['title']
description = data.get('description', '')
priority = data.get('priority', 'medium')
due_date = data.get('due_date')
user_id = request.current_user_id
# Validate priority
if priority not in ['low', 'medium', 'high', 'critical']:
return jsonify({'error': 'Invalid priority. Use: low, medium, high, critical'}), 400
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO tasks (title, description, priority, due_date, user_id)
VALUES (?, ?, ?, ?, ?)
''', (title, description, priority, due_date, user_id))
task_id = cursor.lastrowid
conn.commit()
# Get the created task
task = conn.execute(
"SELECT * FROM tasks WHERE id = ?", (task_id,)
).fetchone()
return jsonify(dict(task)), 201
@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
@require_api_key
def update_task(task_id):
"""Update task"""
data = request.get_json()
if not data:
return jsonify({'error': 'No data provided'}), 400
# Check if task exists
with get_db_connection() as conn:
existing_task = conn.execute(
"SELECT * FROM tasks WHERE id = ?", (task_id,)
).fetchone()
if existing_task is None:
abort(404)
# Build update query
update_fields = []
params = []
for field in ['title', 'description', 'completed', 'priority', 'due_date']:
if field in data:
update_fields.append(f"{field} = ?")
params.append(data[field])
if update_fields:
update_fields.append("updated_at = CURRENT_TIMESTAMP")
params.append(task_id)
query = f"UPDATE tasks SET {', '.join(update_fields)} WHERE id = ?"
conn.execute(query, params)
conn.commit()
# Get updated task
updated_task = conn.execute(
"SELECT * FROM tasks WHERE id = ?", (task_id,)
).fetchone()
return jsonify(dict(updated_task))
@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
@require_api_key
def delete_task(task_id):
"""Delete task"""
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
if cursor.rowcount == 0:
abort(404)
conn.commit()
return '', 204
# Products API endpoints
@app.route('/api/products', methods=['GET'])
@require_api_key
def get_products():
"""Get all products with optional filtering"""
category = request.args.get('category')
min_price = request.args.get('min_price', type=float)
max_price = request.args.get('max_price', type=float)
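# type=bool would treat any non-empty string (including "false") as True, so parse the text explicitly
in_stock = request.args.get('in_stock', type=lambda v: v.lower() in ('1', 'true', 'yes'))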
query = "SELECT * FROM products WHERE 1=1"
params = []
if category:
query += " AND category = ?"
params.append(category)
if min_price is not None:
query += " AND price >= ?"
params.append(min_price)
if max_price is not None:
query += " AND price <= ?"
params.append(max_price)
if in_stock is not None:
if in_stock:
query += " AND stock_quantity > 0"
else:
query += " AND stock_quantity = 0"
query += " ORDER BY name"
with get_db_connection() as conn:
products = conn.execute(query, params).fetchall()
return jsonify({
'products': [dict(product) for product in products]
})
@app.route('/api/products/<int:product_id>', methods=['GET'])
@require_api_key
def get_product(product_id):
"""Get specific product"""
with get_db_connection() as conn:
product = conn.execute(
"SELECT * FROM products WHERE id = ?", (product_id,)
).fetchone()
if product is None:
abort(404)
return jsonify(dict(product))
@app.route('/api/products', methods=['POST'])
@require_api_key
def create_product():
"""Create new product"""
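data = request.get_json(silent=True)
if not isinstance(data, dict):
return jsonify({'error': 'Request body must be a JSON object'}), 400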
required_fields = ['name', 'price']
for field in required_fields:
if field not in data:
return jsonify({'error': f'{field} is required'}), 400
name = data['name']
description = data.get('description', '')
price = data['price']
category = data.get('category', '')
stock_quantity = data.get('stock_quantity', 0)
# Validate price
try:
price = float(price)
if price < 0:
raise ValueError()
except (ValueError, TypeError):
return jsonify({'error': 'Price must be a valid non-negative number'}), 400
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO products (name, description, price, category, stock_quantity)
VALUES (?, ?, ?, ?, ?)
''', (name, description, price, category, stock_quantity))
product_id = cursor.lastrowid
conn.commit()
# Get the created product
product = conn.execute(
"SELECT * FROM products WHERE id = ?", (product_id,)
).fetchone()
return jsonify(dict(product)), 201
@app.route('/api/products/<int:product_id>', methods=['PUT'])
@require_api_key
def update_product(product_id):
"""Update product"""
data = request.get_json()
if not data:
return jsonify({'error': 'No data provided'}), 400
with get_db_connection() as conn:
existing_product = conn.execute(
"SELECT * FROM products WHERE id = ?", (product_id,)
).fetchone()
if existing_product is None:
abort(404)
# Build update query
update_fields = []
params = []
for field in ['name', 'description', 'price', 'category', 'stock_quantity']:
if field in data:
update_fields.append(f"{field} = ?")
params.append(data[field])
if update_fields:
update_fields.append("updated_at = CURRENT_TIMESTAMP")
params.append(product_id)
query = f"UPDATE products SET {', '.join(update_fields)} WHERE id = ?"
conn.execute(query, params)
conn.commit()
# Get updated product
updated_product = conn.execute(
"SELECT * FROM products WHERE id = ?", (product_id,)
).fetchone()
return jsonify(dict(updated_product))
@app.route('/api/products/<int:product_id>', methods=['DELETE'])
@require_api_key
def delete_product(product_id):
"""Delete product"""
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM products WHERE id = ?", (product_id,))
if cursor.rowcount == 0:
abort(404)
conn.commit()
return '', 204
# Users API endpoints
@app.route('/api/users', methods=['GET'])
@require_api_key
def get_users():
"""Get all users (limited info)"""
with get_db_connection() as conn:
users = conn.execute(
"SELECT id, username, email, created_at, is_active FROM users ORDER BY created_at DESC"
).fetchall()
return jsonify({
'users': [dict(user) for user in users]
})
@app.route('/api/users/<int:user_id>', methods=['GET'])
@require_api_key
def get_user(user_id):
"""Get specific user"""
with get_db_connection() as conn:
user = conn.execute(
"SELECT id, username, email, created_at, is_active FROM users WHERE id = ?",
(user_id,)
).fetchone()
if user is None:
abort(404)
return jsonify(dict(user))
@app.route('/api/users', methods=['POST'])
@require_api_key
def create_user():
"""Create new user"""
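data = request.get_json(silent=True)
if not isinstance(data, dict):
return jsonify({'error': 'Request body must be a JSON object'}), 400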
required_fields = ['username', 'email', 'password']
for field in required_fields:
if field not in data:
return jsonify({'error': f'{field} is required'}), 400
username = data['username']
email = data['email']
password = data['password']
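# Hash password with a random per-user salt (PBKDF2-HMAC-SHA256); bare SHA-256 is too fast to resist guessing
salt = secrets.token_hex(16)
derived = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 600000).hex()
password_hash = f"pbkdf2_sha256$600000${salt}${derived}"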
try:
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT INTO users (username, email, password_hash)
VALUES (?, ?, ?)
''', (username, email, password_hash))
user_id = cursor.lastrowid
conn.commit()
# Get the created user (without password)
user = conn.execute(
"SELECT id, username, email, created_at, is_active FROM users WHERE id = ?",
(user_id,)
).fetchone()
return jsonify(dict(user)), 201
except sqlite3.IntegrityError as e:
if 'username' in str(e):
return jsonify({'error': 'Username already exists'}), 400
elif 'email' in str(e):
return jsonify({'error': 'Email already exists'}), 400
else:
return jsonify({'error': 'Database constraint violation'}), 400
# Statistics endpoint
@app.route('/api/stats', methods=['GET'])
@require_api_key
def get_statistics():
"""Get API usage statistics"""
with get_db_connection() as conn:
# Basic counts
task_count = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
product_count = conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]
user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
# API usage stats
total_requests = conn.execute("SELECT COUNT(*) FROM api_logs").fetchone()[0]
# Recent activity (last 24 hours); computed in SQLite so the format matches CURRENT_TIMESTAMP (UTC)
recent_requests = conn.execute(
"SELECT COUNT(*) FROM api_logs WHERE timestamp > datetime('now', '-1 day')"
).fetchone()[0]
# Top endpoints
top_endpoints = conn.execute('''
SELECT endpoint, COUNT(*) as count
FROM api_logs
GROUP BY endpoint
ORDER BY count DESC
LIMIT 5
''').fetchall()
# Response code distribution
response_codes = conn.execute('''
SELECT response_code, COUNT(*) as count
FROM api_logs
GROUP BY response_code
ORDER BY response_code
''').fetchall()
return jsonify({
'database_stats': {
'tasks': task_count,
'products': product_count,
'users': user_count
},
'api_usage': {
'total_requests': total_requests,
'requests_last_24h': recent_requests,
'top_endpoints': [dict(row) for row in top_endpoints],
'response_codes': [dict(row) for row in response_codes]
},
'timestamp': datetime.now().isoformat()
})
# Error handlers
@app.errorhandler(404)
def not_found(error):
return jsonify({'error': 'Resource not found'}), 404
@app.errorhandler(400)
def bad_request(error):
return jsonify({'error': 'Bad request'}), 400
@app.errorhandler(500)
def internal_error(error):
return jsonify({'error': 'Internal server error'}), 500
def main():
"""Main function to run the Flask REST API"""
print("Simple Flask REST API")
print("=====================")
print("\nAPI Documentation: http://localhost:5000/")
print("Health Check: http://localhost:5000/health")
print("\nDemo API Keys:")
print("- demo_key_123456789")
print("- test_key_987654321")
print("- admin_key_555666777")
print("\nExample usage:")
print("curl -H 'X-API-Key: demo_key_123456789' http://localhost:5000/api/tasks")
print("\nStarting server...")
# Bind to localhost only: the debug server must not be reachable from other machines
app.run(debug=True, host='127.0.0.1', port=5000)
if __name__ == "__main__":
main()
# Simple Flask REST API
from flask import Flask, request, jsonify, abort
from flask_cors import CORS
import sqlite3
import json
from datetime import datetime, timedelta
import hashlib
import secrets
import os
from functools import wraps
import logging
app = Flask(__name__)
CORS(app) # Enable CORS for all routes
# Configuration
app.config['SECRET_KEY'] = 'your-secret-key-here'
DATABASE = 'api_database.db'
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class DatabaseManager:
    def __init__(self, db_path):
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Initialize the database with required tables"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            # Users table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username VARCHAR(50) UNIQUE NOT NULL,
                    email VARCHAR(100) UNIQUE NOT NULL,
                    password_hash VARCHAR(255) NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    is_active BOOLEAN DEFAULT 1
                )
            ''')
            # API Keys table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS api_keys (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER,
                    api_key VARCHAR(255) UNIQUE NOT NULL,
                    name VARCHAR(100),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_used TIMESTAMP,
                    is_active BOOLEAN DEFAULT 1,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')
            # Tasks table (example resource)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS tasks (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title VARCHAR(200) NOT NULL,
                    description TEXT,
                    completed BOOLEAN DEFAULT 0,
                    priority VARCHAR(10) DEFAULT 'medium',
                    due_date DATE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    user_id INTEGER,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')
            # Products table (example resource)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS products (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name VARCHAR(200) NOT NULL,
                    description TEXT,
                    price DECIMAL(10,2) NOT NULL,
                    category VARCHAR(100),
                    stock_quantity INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            # API Logs table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS api_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    endpoint VARCHAR(200),
                    method VARCHAR(10),
                    ip_address VARCHAR(45),
                    user_agent TEXT,
                    api_key VARCHAR(255),
                    response_code INTEGER,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.commit()
        # Insert sample data
        self.insert_sample_data()
    def insert_sample_data(self):
        """Insert sample data for demonstration"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            # Check if sample data already exists
            cursor.execute("SELECT COUNT(*) FROM users")
            if cursor.fetchone()[0] > 0:
                return
            # Sample users (unsalted SHA-256 is for demonstration only;
            # use a salted key-derivation function for real passwords)
            sample_users = [
                ('john_doe', 'john@example.com', hashlib.sha256('password123'.encode()).hexdigest()),
                ('jane_smith', 'jane@example.com', hashlib.sha256('password456'.encode()).hexdigest()),
                ('admin', 'admin@example.com', hashlib.sha256('admin123'.encode()).hexdigest())
            ]
            cursor.executemany(
                "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
                sample_users
            )
            # Sample API keys
            api_keys = [
                (1, 'demo_key_123456789', 'Demo Key'),
                (2, 'test_key_987654321', 'Test Key'),
                (3, 'admin_key_555666777', 'Admin Key')
            ]
            cursor.executemany(
                "INSERT INTO api_keys (user_id, api_key, name) VALUES (?, ?, ?)",
                api_keys
            )
            # Sample tasks
            sample_tasks = [
                ('Complete project documentation', 'Write comprehensive docs for the API', 0, 'high', '2023-12-31', 1),
                ('Fix bug in user authentication', 'Resolve login issues reported by users', 1, 'critical', '2023-12-15', 1),
                ('Implement new feature', 'Add search functionality to products', 0, 'medium', '2024-01-15', 2),
                ('Code review', 'Review pull requests from team members', 0, 'low', '2023-12-20', 2)
            ]
            cursor.executemany(
                "INSERT INTO tasks (title, description, completed, priority, due_date, user_id) VALUES (?, ?, ?, ?, ?, ?)",
                sample_tasks
            )
            # Sample products
            sample_products = [
                ('Laptop', 'High-performance laptop for developers', 999.99, 'Electronics', 10),
                ('Smartphone', 'Latest model smartphone with advanced features', 599.99, 'Electronics', 25),
                ('Coffee Mug', 'Programmer-themed coffee mug', 14.99, 'Accessories', 50),
                ('Mechanical Keyboard', 'RGB mechanical keyboard for gaming', 149.99, 'Electronics', 15),
                ('Desk Lamp', 'Adjustable LED desk lamp', 39.99, 'Furniture', 30)
            ]
            cursor.executemany(
                "INSERT INTO products (name, description, price, category, stock_quantity) VALUES (?, ?, ?, ?, ?)",
                sample_products
            )
            conn.commit()
# Initialize database
db_manager = DatabaseManager(DATABASE)

def require_api_key(f):
    """Decorator to require API key authentication"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        api_key = request.headers.get('X-API-Key') or request.args.get('api_key')
        if not api_key:
            return jsonify({'error': 'API key required'}), 401
        # Validate API key
        with sqlite3.connect(DATABASE) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT ak.id, ak.user_id, u.username
                FROM api_keys ak
                JOIN users u ON ak.user_id = u.id
                WHERE ak.api_key = ? AND ak.is_active = 1 AND u.is_active = 1
            ''', (api_key,))
            result = cursor.fetchone()
            if not result:
                log_api_request(request, api_key, 401)
                return jsonify({'error': 'Invalid API key'}), 401
            # Update last used timestamp
            cursor.execute(
                "UPDATE api_keys SET last_used = CURRENT_TIMESTAMP WHERE api_key = ?",
                (api_key,)
            )
            conn.commit()
        # Log successful request
        log_api_request(request, api_key, 200)
        # Add user info to request context
        request.current_user_id = result[1]
        request.current_username = result[2]
        return f(*args, **kwargs)
    return decorated_function

def log_api_request(request_obj, api_key, response_code):
    """Log API request for monitoring"""
    try:
        with sqlite3.connect(DATABASE) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO api_logs (endpoint, method, ip_address, user_agent, api_key, response_code)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                request_obj.endpoint,
                request_obj.method,
                request_obj.remote_addr,
                request_obj.headers.get('User-Agent', ''),
                api_key,
                response_code
            ))
            conn.commit()
    except Exception as e:
        logger.error(f"Error logging API request: {e}")

def get_db_connection():
    """Get database connection with row factory"""
    conn = sqlite3.connect(DATABASE)
    conn.row_factory = sqlite3.Row
    return conn
# API Documentation endpoint
@app.route('/')
def api_documentation():
    """API Documentation"""
    docs = {
        "title": "Simple Flask REST API",
        "version": "1.0.0",
        "description": "A demonstration REST API with CRUD operations",
        "base_url": request.base_url,
        "authentication": {
            "type": "API Key",
            "header": "X-API-Key",
            "demo_keys": [
                "demo_key_123456789",
                "test_key_987654321",
                "admin_key_555666777"
            ]
        },
        "endpoints": {
            "Tasks": {
                "GET /api/tasks": "Get all tasks",
                "GET /api/tasks/<id>": "Get specific task",
                "POST /api/tasks": "Create new task",
                "PUT /api/tasks/<id>": "Update task",
                "DELETE /api/tasks/<id>": "Delete task"
            },
            "Products": {
                "GET /api/products": "Get all products",
                "GET /api/products/<id>": "Get specific product",
                "POST /api/products": "Create new product",
                "PUT /api/products/<id>": "Update product",
                "DELETE /api/products/<id>": "Delete product"
            },
            "Users": {
                "GET /api/users": "Get all users",
                "GET /api/users/<id>": "Get specific user",
                "POST /api/users": "Create new user"
            },
            "Statistics": {
                "GET /api/stats": "Get API usage statistics"
            }
        },
        "examples": {
            "create_task": {
                "method": "POST",
                "url": "/api/tasks",
                "headers": {"X-API-Key": "demo_key_123456789"},
                "body": {
                    "title": "New Task",
                    "description": "Task description",
                    "priority": "high",
                    "due_date": "2023-12-31"
                }
            }
        }
    }
    return jsonify(docs)

# Health check endpoint
@app.route('/health')
def health_check():
    """Health check endpoint"""
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'version': '1.0.0'
    })
# Tasks API endpoints
@app.route('/api/tasks', methods=['GET'])
@require_api_key
def get_tasks():
    """Get all tasks with optional filtering"""
    # Clamp paging values so a zero or negative per_page cannot break the page count
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = max(1, min(request.args.get('per_page', 10, type=int), 100))
    # Parse the flag by hand: type=bool would turn any non-empty string, even 'false', into True
    completed_arg = request.args.get('completed')
    completed = None if completed_arg is None else completed_arg.lower() in ('1', 'true', 'yes')
    priority = request.args.get('priority')
    query = "SELECT * FROM tasks WHERE 1=1"
    params = []
    if completed is not None:
        query += " AND completed = ?"
        params.append(int(completed))
    if priority:
        query += " AND priority = ?"
        params.append(priority)
    query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
    params.extend([per_page, (page - 1) * per_page])
    with get_db_connection() as conn:
        tasks = conn.execute(query, params).fetchall()
        # Get total count
        count_query = "SELECT COUNT(*) FROM tasks WHERE 1=1"
        count_params = []
        if completed is not None:
            count_query += " AND completed = ?"
            count_params.append(int(completed))
        if priority:
            count_query += " AND priority = ?"
            count_params.append(priority)
        total = conn.execute(count_query, count_params).fetchone()[0]
    return jsonify({
        'tasks': [dict(task) for task in tasks],
        'pagination': {
            'page': page,
            'per_page': per_page,
            'total': total,
            'pages': (total + per_page - 1) // per_page
        }
    })

@app.route('/api/tasks/<int:task_id>', methods=['GET'])
@require_api_key
def get_task(task_id):
    """Get specific task"""
    with get_db_connection() as conn:
        task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
    if task is None:
        abort(404)
    return jsonify(dict(task))
@app.route('/api/tasks', methods=['POST'])
@require_api_key
def create_task():
    """Create new task"""
    data = request.get_json()
    if not data or 'title' not in data:
        return jsonify({'error': 'Title is required'}), 400
    title = data['title']
    description = data.get('description', '')
    priority = data.get('priority', 'medium')
    due_date = data.get('due_date')
    user_id = request.current_user_id
    # Validate priority
    if priority not in ['low', 'medium', 'high', 'critical']:
        return jsonify({'error': 'Invalid priority. Use: low, medium, high, critical'}), 400
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO tasks (title, description, priority, due_date, user_id)
            VALUES (?, ?, ?, ?, ?)
        ''', (title, description, priority, due_date, user_id))
        task_id = cursor.lastrowid
        conn.commit()
        # Get the created task
        task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
    return jsonify(dict(task)), 201

@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
@require_api_key
def update_task(task_id):
    """Update task"""
    data = request.get_json()
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    # Check if task exists
    with get_db_connection() as conn:
        existing_task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
        if existing_task is None:
            abort(404)
        # Build update query (column names come from this fixed list, never from the client)
        update_fields = []
        params = []
        for field in ['title', 'description', 'completed', 'priority', 'due_date']:
            if field in data:
                update_fields.append(f"{field} = ?")
                params.append(data[field])
        if update_fields:
            update_fields.append("updated_at = CURRENT_TIMESTAMP")
            params.append(task_id)
            query = f"UPDATE tasks SET {', '.join(update_fields)} WHERE id = ?"
            conn.execute(query, params)
            conn.commit()
        # Get updated task
        updated_task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
    return jsonify(dict(updated_task))

@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
@require_api_key
def delete_task(task_id):
    """Delete task"""
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
        if cursor.rowcount == 0:
            abort(404)
        conn.commit()
    return '', 204
# Products API endpoints
@app.route('/api/products', methods=['GET'])
@require_api_key
def get_products():
    """Get all products with optional filtering"""
    category = request.args.get('category')
    min_price = request.args.get('min_price', type=float)
    max_price = request.args.get('max_price', type=float)
    # Parse the flag by hand: type=bool would turn any non-empty string, even 'false', into True
    in_stock_arg = request.args.get('in_stock')
    in_stock = None if in_stock_arg is None else in_stock_arg.lower() in ('1', 'true', 'yes')
    query = "SELECT * FROM products WHERE 1=1"
    params = []
    if category:
        query += " AND category = ?"
        params.append(category)
    if min_price is not None:
        query += " AND price >= ?"
        params.append(min_price)
    if max_price is not None:
        query += " AND price <= ?"
        params.append(max_price)
    if in_stock is not None:
        if in_stock:
            query += " AND stock_quantity > 0"
        else:
            query += " AND stock_quantity = 0"
    query += " ORDER BY name"
    with get_db_connection() as conn:
        products = conn.execute(query, params).fetchall()
    return jsonify({
        'products': [dict(product) for product in products]
    })

@app.route('/api/products/<int:product_id>', methods=['GET'])
@require_api_key
def get_product(product_id):
    """Get specific product"""
    with get_db_connection() as conn:
        product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
    if product is None:
        abort(404)
    return jsonify(dict(product))
@app.route('/api/products', methods=['POST'])
@require_api_key
def create_product():
    """Create new product"""
    data = request.get_json()
    # Guard against an empty body before looking up fields
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    required_fields = ['name', 'price']
    for field in required_fields:
        if field not in data:
            return jsonify({'error': f'{field} is required'}), 400
    name = data['name']
    description = data.get('description', '')
    price = data['price']
    category = data.get('category', '')
    stock_quantity = data.get('stock_quantity', 0)
    # Validate price
    try:
        price = float(price)
        if price < 0:
            raise ValueError()
    except (ValueError, TypeError):
        return jsonify({'error': 'Price must be a valid non-negative number'}), 400
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO products (name, description, price, category, stock_quantity)
            VALUES (?, ?, ?, ?, ?)
        ''', (name, description, price, category, stock_quantity))
        product_id = cursor.lastrowid
        conn.commit()
        # Get the created product
        product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
    return jsonify(dict(product)), 201

@app.route('/api/products/<int:product_id>', methods=['PUT'])
@require_api_key
def update_product(product_id):
    """Update product"""
    data = request.get_json()
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    with get_db_connection() as conn:
        existing_product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
        if existing_product is None:
            abort(404)
        # Build update query (column names come from this fixed list, never from the client)
        update_fields = []
        params = []
        for field in ['name', 'description', 'price', 'category', 'stock_quantity']:
            if field in data:
                update_fields.append(f"{field} = ?")
                params.append(data[field])
        if update_fields:
            update_fields.append("updated_at = CURRENT_TIMESTAMP")
            params.append(product_id)
            query = f"UPDATE products SET {', '.join(update_fields)} WHERE id = ?"
            conn.execute(query, params)
            conn.commit()
        # Get updated product
        updated_product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
    return jsonify(dict(updated_product))

@app.route('/api/products/<int:product_id>', methods=['DELETE'])
@require_api_key
def delete_product(product_id):
    """Delete product"""
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM products WHERE id = ?", (product_id,))
        if cursor.rowcount == 0:
            abort(404)
        conn.commit()
    return '', 204
# Users API endpoints
@app.route('/api/users', methods=['GET'])
@require_api_key
def get_users():
    """Get all users (limited info)"""
    with get_db_connection() as conn:
        users = conn.execute(
            "SELECT id, username, email, created_at, is_active FROM users ORDER BY created_at DESC"
        ).fetchall()
    return jsonify({
        'users': [dict(user) for user in users]
    })

@app.route('/api/users/<int:user_id>', methods=['GET'])
@require_api_key
def get_user(user_id):
    """Get specific user"""
    with get_db_connection() as conn:
        user = conn.execute(
            "SELECT id, username, email, created_at, is_active FROM users WHERE id = ?",
            (user_id,)
        ).fetchone()
    if user is None:
        abort(404)
    return jsonify(dict(user))

@app.route('/api/users', methods=['POST'])
@require_api_key
def create_user():
    """Create new user"""
    data = request.get_json()
    # Guard against an empty body before looking up fields
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    required_fields = ['username', 'email', 'password']
    for field in required_fields:
        if field not in data:
            return jsonify({'error': f'{field} is required'}), 400
    username = data['username']
    email = data['email']
    password = data['password']
    # Hash password (unsalted SHA-256 is for demonstration only)
    password_hash = hashlib.sha256(password.encode()).hexdigest()
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO users (username, email, password_hash)
                VALUES (?, ?, ?)
            ''', (username, email, password_hash))
            user_id = cursor.lastrowid
            conn.commit()
            # Get the created user (without password)
            user = conn.execute(
                "SELECT id, username, email, created_at, is_active FROM users WHERE id = ?",
                (user_id,)
            ).fetchone()
        return jsonify(dict(user)), 201
    except sqlite3.IntegrityError as e:
        if 'username' in str(e):
            return jsonify({'error': 'Username already exists'}), 400
        elif 'email' in str(e):
            return jsonify({'error': 'Email already exists'}), 400
        else:
            return jsonify({'error': 'Database constraint violation'}), 400
# Statistics endpoint
@app.route('/api/stats', methods=['GET'])
@require_api_key
def get_statistics():
    """Get API usage statistics"""
    with get_db_connection() as conn:
        # Basic counts
        task_count = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
        product_count = conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]
        user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
        # API usage stats
        total_requests = conn.execute("SELECT COUNT(*) FROM api_logs").fetchone()[0]
        # Recent activity (last 24 hours). The cutoff is computed in SQLite so it
        # matches CURRENT_TIMESTAMP, which is stored in UTC as 'YYYY-MM-DD HH:MM:SS'
        recent_requests = conn.execute(
            "SELECT COUNT(*) FROM api_logs WHERE timestamp > datetime('now', '-1 day')"
        ).fetchone()[0]
        # Top endpoints
        top_endpoints = conn.execute('''
            SELECT endpoint, COUNT(*) as count
            FROM api_logs
            GROUP BY endpoint
            ORDER BY count DESC
            LIMIT 5
        ''').fetchall()
        # Response code distribution
        response_codes = conn.execute('''
            SELECT response_code, COUNT(*) as count
            FROM api_logs
            GROUP BY response_code
            ORDER BY response_code
        ''').fetchall()
    return jsonify({
        'database_stats': {
            'tasks': task_count,
            'products': product_count,
            'users': user_count
        },
        'api_usage': {
            'total_requests': total_requests,
            'requests_last_24h': recent_requests,
            'top_endpoints': [dict(row) for row in top_endpoints],
            'response_codes': [dict(row) for row in response_codes]
        },
        'timestamp': datetime.now().isoformat()
    })
# Error handlers
@app.errorhandler(404)
def not_found(error):
    return jsonify({'error': 'Resource not found'}), 404

@app.errorhandler(400)
def bad_request(error):
    return jsonify({'error': 'Bad request'}), 400

@app.errorhandler(500)
def internal_error(error):
    return jsonify({'error': 'Internal server error'}), 500

def main():
    """Main function to run the Flask REST API"""
    print("Simple Flask REST API")
    print("=====================")
    print("\nAPI Documentation: http://localhost:5000/")
    print("Health Check: http://localhost:5000/health")
    print("\nDemo API Keys:")
    print("- demo_key_123456789")
    print("- test_key_987654321")
    print("- admin_key_555666777")
    print("\nExample usage:")
    print("curl -H 'X-API-Key: demo_key_123456789' http://localhost:5000/api/tasks")
    print("\nStarting server...")
    # debug=True together with host='0.0.0.0' exposes the interactive debugger
    # to the network; use these settings for local development only
    app.run(debug=True, host='0.0.0.0', port=5000)

if __name__ == "__main__":
    main()
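The task listing endpoint above combines optional filters, parameterized SQL, and LIMIT/OFFSET pagination. The sketch below isolates that pattern against an in-memory SQLite database. The `parse_bool` and `list_tasks` helpers and the sample rows are hypothetical and exist only for this illustration. Query-string booleans arrive as text, so a value such as 'false' needs explicit parsing.

```python
import sqlite3


def parse_bool(value):
    """Map a query-string value to True/False, or None when it is absent."""
    if value is None:
        return None
    return value.strip().lower() in ('1', 'true', 'yes', 'on')


def list_tasks(conn, page=1, per_page=10, completed=None, priority=None):
    """Return (rows, pagination) for the given optional filters."""
    page = max(page, 1)
    per_page = max(1, min(per_page, 100))
    where, params = ["1=1"], []
    if completed is not None:
        where.append("completed = ?")
        params.append(int(completed))
    if priority:
        where.append("priority = ?")
        params.append(priority)
    clause = " AND ".join(where)  # only fixed strings; user values stay in params
    total = conn.execute(
        f"SELECT COUNT(*) FROM tasks WHERE {clause}", params
    ).fetchone()[0]
    rows = conn.execute(
        f"SELECT id, title FROM tasks WHERE {clause} ORDER BY id LIMIT ? OFFSET ?",
        params + [per_page, (page - 1) * per_page],
    ).fetchall()
    pagination = {
        'page': page,
        'per_page': per_page,
        'total': total,
        'pages': (total + per_page - 1) // per_page,  # ceiling division
    }
    return rows, pagination


conn = sqlite3.connect(':memory:')
conn.execute(
    "CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT, completed INTEGER, priority TEXT)"
)
conn.executemany(
    "INSERT INTO tasks (title, completed, priority) VALUES (?, ?, ?)",
    [(f"Task {i}", i % 2, 'high' if i % 3 == 0 else 'low') for i in range(1, 8)],
)
rows, pagination = list_tasks(conn, page=2, per_page=2, completed=parse_bool('false'))
print(rows, pagination)
```

Building the WHERE clause once and reusing it for both the count and the page query keeps the two from drifting apart.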
How It Works
1. Flask Application Configuration
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_jwt_extended import JWTManager, jwt_required, create_access_token, get_jwt_identity
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_cors import CORS
from marshmallow import Schema, fields, ValidationError
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime, timedelta
import os
app = Flask(__name__)
# Configuration
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'your-secret-key')
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL', 'sqlite:///api.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['JWT_SECRET_KEY'] = os.environ.get('JWT_SECRET_KEY', 'jwt-secret-key')
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=1)
# Initialize extensions
db = SQLAlchemy(app)
jwt = JWTManager(app)
limiter = Limiter(key_func=get_remote_address, app=app)  # keyword form; recent Flask-Limiter releases take key_func first
CORS(app)
The application setup includes:
- Flask Configuration: Environment-based configuration management
- Database Setup: SQLAlchemy with configurable database URLs
- JWT Management: Token creation and validation
- Rate Limiting: Request throttling by IP address
- CORS Support: Cross-Origin Resource Sharing for web clients
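To make the rate-limiting item concrete, here is a minimal sliding-window limiter keyed by client IP, written with the standard library only. It sketches the general idea and is not how Flask-Limiter is implemented; the `SlidingWindowLimiter` class, its parameters, and the sample addresses are hypothetical.

```python
import time
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Allow at most `limit` requests per `window` seconds for each key."""

    def __init__(self, limit, window, clock=time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock
        self.hits = defaultdict(deque)  # key -> timestamps of recent requests

    def allow(self, key):
        now = self.clock()
        recent = self.hits[key]
        # Drop timestamps that have fallen out of the window
        while recent and now - recent[0] >= self.window:
            recent.popleft()
        if len(recent) >= self.limit:
            return False
        recent.append(now)
        return True


# A simulated clock keeps the example deterministic
fake_now = [0.0]
limiter_demo = SlidingWindowLimiter(limit=2, window=60, clock=lambda: fake_now[0])
results = [limiter_demo.allow('203.0.113.5') for _ in range(3)]
fake_now[0] = 61.0
results.append(limiter_demo.allow('203.0.113.5'))
print(results)  # [True, True, False, True]
```

An in-process dictionary like this resets on restart and is not shared between worker processes, which is one reason production setups usually back the counters with an external store.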
2. Database Models
class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(255), nullable=False)
    first_name = db.Column(db.String(50), nullable=False)
    last_name = db.Column(db.String(50), nullable=False)
    role = db.Column(db.String(20), default='user')
    is_active = db.Column(db.Boolean, default=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    # Relationships
    posts = db.relationship('Post', backref='author', lazy=True, cascade='all, delete-orphan')

    def set_password(self, password):
        """Hash and set password"""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Check password against hash"""
        return check_password_hash(self.password_hash, password)

    def to_dict(self):
        """Convert user to dictionary"""
        return {
            'id': self.id,
            'username': self.username,
            'email': self.email,
            'first_name': self.first_name,
            'last_name': self.last_name,
            'role': self.role,
            'is_active': self.is_active,
            'created_at': self.created_at.isoformat(),
            'updated_at': self.updated_at.isoformat()
        }

class Post(db.Model):
    __tablename__ = 'posts'
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    content = db.Column(db.Text, nullable=False)
    slug = db.Column(db.String(200), unique=True, nullable=False, index=True)
    published = db.Column(db.Boolean, default=False)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def to_dict(self):
        """Convert post to dictionary"""
        return {
            'id': self.id,
            'title': self.title,
            'content': self.content,
            'slug': self.slug,
            'published': self.published,
            'user_id': self.user_id,
            'author': self.author.username if self.author else None,
            'created_at': self.created_at.isoformat(),
            'updated_at': self.updated_at.isoformat()
        }
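The `set_password` and `check_password` methods delegate the hashing to Werkzeug. As a rough illustration of what salted password hashing involves, the sketch below uses PBKDF2 from the standard library. The function names, the `iterations$salt$digest` storage format, and the iteration count are assumptions made for this example; they are not Werkzeug's actual format.

```python
import hashlib
import hmac
import os


def hash_password(password, iterations=200_000):
    """Derive a salted hash and pack everything needed to verify it later."""
    salt = os.urandom(16)  # a fresh random salt per password
    digest = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, iterations)
    return f"{iterations}${salt.hex()}${digest.hex()}"


def verify_password(stored, password):
    """Recompute the hash with the stored salt and compare in constant time."""
    iterations, salt_hex, digest_hex = stored.split('$')
    digest = hashlib.pbkdf2_hmac(
        'sha256', password.encode('utf-8'), bytes.fromhex(salt_hex), int(iterations)
    )
    return hmac.compare_digest(digest.hex(), digest_hex)


stored = hash_password('correct horse', iterations=1_000)  # low count keeps the demo fast
print(verify_password(stored, 'correct horse'), verify_password(stored, 'wrong'))
```

Because the salt is random, hashing the same password twice yields different stored strings, which is what defeats precomputed lookup tables.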
3. Input Validation Schemas
from marshmallow import Schema, fields, validate

class UserRegistrationSchema(Schema):
    """Schema for user registration validation"""
    username = fields.Str(required=True, validate=validate.Length(min=3))
    email = fields.Email(required=True)
    password = fields.Str(required=True, validate=validate.Length(min=6))
    first_name = fields.Str(required=True, validate=validate.Length(min=2))
    last_name = fields.Str(required=True, validate=validate.Length(min=2))

class UserLoginSchema(Schema):
    """Schema for user login validation"""
    username = fields.Str(required=True)
    password = fields.Str(required=True)

class PostSchema(Schema):
    """Schema for post validation"""
    title = fields.Str(required=True, validate=validate.Length(min=5))
    content = fields.Str(required=True, validate=validate.Length(min=10))
    # load_default replaces the deprecated `missing` argument (marshmallow 3.13+)
    published = fields.Bool(load_default=False)

class PostUpdateSchema(Schema):
    """Schema for post update validation"""
    title = fields.Str(validate=validate.Length(min=5))
    content = fields.Str(validate=validate.Length(min=10))
    published = fields.Bool()
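For readers who have not used marshmallow, the registration rules above amount to the checks below. This is a plain-Python sketch for illustration only: `validate_registration` and its error messages are hypothetical, and the email pattern is deliberately simplistic compared with `fields.Email`.

```python
import re

# Minimum lengths taken from the registration schema
MIN_LENGTHS = {'username': 3, 'password': 6, 'first_name': 2, 'last_name': 2}
EMAIL_RE = re.compile(r'^[^@\s]+@[^@\s]+\.[^@\s]+$')


def validate_registration(payload):
    """Return a dict of field -> list of error messages (empty when valid)."""
    if not isinstance(payload, dict):
        return {'_schema': ['Invalid input type.']}
    errors = {}
    for field, min_len in MIN_LENGTHS.items():
        value = payload.get(field)
        if value is None:
            errors[field] = ['Missing data for required field.']
        elif not isinstance(value, str) or len(value) < min_len:
            errors[field] = [f'Must be a string of at least {min_len} characters.']
    email = payload.get('email')
    if email is None:
        errors['email'] = ['Missing data for required field.']
    elif not isinstance(email, str) or not EMAIL_RE.match(email):
        errors['email'] = ['Not a valid email address.']
    return errors


good = {
    'username': 'alice', 'email': 'alice@example.com', 'password': 'secret1',
    'first_name': 'Alice', 'last_name': 'Liddell',
}
bad = {'username': 'al', 'email': 'not-an-email', 'password': 'secret1'}
print(validate_registration(good))
print(sorted(validate_registration(bad)))
```

Returning every failing field at once, rather than stopping at the first one, mirrors the `err.messages` dictionary that the endpoints send back to the client.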
4. Authentication System
@app.route('/api/auth/register', methods=['POST'])
@limiter.limit("5 per minute")
def register():
    """User registration endpoint"""
    try:
        # Validate input
        schema = UserRegistrationSchema()
        data = schema.load(request.json)
    except ValidationError as err:
        return jsonify({'errors': err.messages}), 400
    # Check if user exists
    if User.query.filter_by(username=data['username']).first():
        return jsonify({'error': 'Username already exists'}), 400
    if User.query.filter_by(email=data['email']).first():
        return jsonify({'error': 'Email already exists'}), 400
    # Create new user
    user = User(
        username=data['username'],
        email=data['email'],
        first_name=data['first_name'],
        last_name=data['last_name']
    )
    user.set_password(data['password'])
    db.session.add(user)
    db.session.commit()
    # Create access token
    access_token = create_access_token(identity=user.id)
    return jsonify({
        'message': 'User registered successfully',
        'user': user.to_dict(),
        'access_token': access_token
    }), 201

@app.route('/api/auth/login', methods=['POST'])
@limiter.limit("10 per minute")
def login():
    """User login endpoint"""
    try:
        schema = UserLoginSchema()
        data = schema.load(request.json)
    except ValidationError as err:
        return jsonify({'errors': err.messages}), 400
    # Find user
    user = User.query.filter_by(username=data['username']).first()
    if not user or not user.check_password(data['password']):
        return jsonify({'error': 'Invalid credentials'}), 401
    if not user.is_active:
        return jsonify({'error': 'Account is deactivated'}), 401
    # Create access token
    access_token = create_access_token(identity=user.id)
    return jsonify({
        'message': 'Login successful',
        'user': user.to_dict(),
        'access_token': access_token
    }), 200

@app.route('/api/auth/profile', methods=['GET'])
@jwt_required()
def get_profile():
    """Get current user profile"""
    current_user_id = get_jwt_identity()
    user = User.query.get(current_user_id)
    if not user:
        return jsonify({'error': 'User not found'}), 404
    return jsonify({'user': user.to_dict()}), 200
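The access token returned by these endpoints is a JSON Web Token. The sketch below shows HS256 signing and verification with the standard library, to illustrate what `create_access_token` and `jwt_required` rely on conceptually. It is not Flask-JWT-Extended's implementation; `encode_token`, `decode_token`, and the demo secret are hypothetical names chosen for this example.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data):
    """Base64url-encode bytes without padding, as JWTs require."""
    return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii')


def _b64url_decode(text):
    """Decode base64url text, restoring the stripped padding first."""
    return base64.urlsafe_b64decode(text + '=' * (-len(text) % 4))


def encode_token(identity, secret, expires_in=3600, now=None):
    """Build header.payload.signature, signed with HMAC-SHA256."""
    now = int(time.time()) if now is None else now
    header = {'alg': 'HS256', 'typ': 'JWT'}
    payload = {'sub': str(identity), 'iat': now, 'exp': now + expires_in}
    signing_input = '.'.join(
        _b64url(json.dumps(part, separators=(',', ':')).encode('utf-8'))
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def decode_token(token, secret, now=None):
    """Return the payload if the signature and expiry check out, else raise ValueError."""
    parts = token.split('.')
    if len(parts) != 3:
        raise ValueError('Malformed token')
    header_b64, payload_b64, signature_b64 = parts
    expected = hmac.new(
        secret.encode(), f"{header_b64}.{payload_b64}".encode(), hashlib.sha256
    ).digest()
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError('Invalid signature')
    payload = json.loads(_b64url_decode(payload_b64))
    now = int(time.time()) if now is None else now
    if payload['exp'] <= now:
        raise ValueError('Token expired')
    return payload


token = encode_token(42, 'demo-secret', expires_in=3600, now=1_000)
print(decode_token(token, 'demo-secret', now=1_001)['sub'])  # 42
```

The server keeps no session state: anyone holding the secret can verify a token, which is why the secret belongs in the environment rather than in source code.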
5. CRUD Operations Implementation
import re  # used for slug generation in create_post and update_post

@app.route('/api/posts', methods=['GET'])
@limiter.limit("100 per minute")
def get_posts():
    """Get all posts with pagination"""
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)
    published_only = request.args.get('published', 'false').lower() == 'true'
    # Limit per_page to prevent abuse
    per_page = min(per_page, 100)
    query = Post.query
    if published_only:
        query = query.filter_by(published=True)
    posts = query.order_by(Post.created_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False
    )
    return jsonify({
        'posts': [post.to_dict() for post in posts.items],
        'pagination': {
            'page': posts.page,
            'pages': posts.pages,
            'per_page': posts.per_page,
            'total': posts.total,
            'has_next': posts.has_next,
            'has_prev': posts.has_prev
        }
    }), 200

@app.route('/api/posts', methods=['POST'])
@jwt_required()
@limiter.limit("20 per minute")
def create_post():
    """Create a new post"""
    current_user_id = get_jwt_identity()
    try:
        schema = PostSchema()
        data = schema.load(request.json)
    except ValidationError as err:
        return jsonify({'errors': err.messages}), 400
    # Generate slug from title
    slug = re.sub(r'[^a-zA-Z0-9\s]', '', data['title'].lower())
    slug = re.sub(r'\s+', '-', slug.strip())
    # Ensure unique slug
    original_slug = slug
    counter = 1
    while Post.query.filter_by(slug=slug).first():
        slug = f"{original_slug}-{counter}"
        counter += 1
    post = Post(
        title=data['title'],
        content=data['content'],
        slug=slug,
        published=data['published'],
        user_id=current_user_id
    )
    db.session.add(post)
    db.session.commit()
    return jsonify({
        'message': 'Post created successfully',
        'post': post.to_dict()
    }), 201

@app.route('/api/posts/<int:post_id>', methods=['GET'])
@limiter.limit("100 per minute")
def get_post(post_id):
    """Get a specific post"""
    post = Post.query.get_or_404(post_id)
    return jsonify({'post': post.to_dict()}), 200

@app.route('/api/posts/<int:post_id>', methods=['PUT'])
@jwt_required()
@limiter.limit("20 per minute")
def update_post(post_id):
    """Update a post"""
    current_user_id = get_jwt_identity()
    post = Post.query.get_or_404(post_id)
    # Check ownership or admin role (a token for a deleted user is treated as non-admin)
    current_user = User.query.get(current_user_id)
    if post.user_id != current_user_id and (current_user is None or current_user.role != 'admin'):
        return jsonify({'error': 'Permission denied'}), 403
    try:
        schema = PostUpdateSchema()
        data = schema.load(request.json)
    except ValidationError as err:
        return jsonify({'errors': err.messages}), 400
    # Update post fields
    for field, value in data.items():
        if field == 'title' and value != post.title:
            # Update slug if title changed
            slug = re.sub(r'[^a-zA-Z0-9\s]', '', value.lower())
            slug = re.sub(r'\s+', '-', slug.strip())
            # Ensure unique slug
            original_slug = slug
            counter = 1
            while Post.query.filter_by(slug=slug).filter(Post.id != post.id).first():
                slug = f"{original_slug}-{counter}"
                counter += 1
            post.slug = slug
        setattr(post, field, value)
    post.updated_at = datetime.utcnow()
    db.session.commit()
    return jsonify({
        'message': 'Post updated successfully',
        'post': post.to_dict()
    }), 200

@app.route('/api/posts/<int:post_id>', methods=['DELETE'])
@jwt_required()
@limiter.limit("10 per minute")
def delete_post(post_id):
    """Delete a post"""
    current_user_id = get_jwt_identity()
    post = Post.query.get_or_404(post_id)
    # Check ownership or admin role (a token for a deleted user is treated as non-admin)
    current_user = User.query.get(current_user_id)
    if post.user_id != current_user_id and (current_user is None or current_user.role != 'admin'):
        return jsonify({'error': 'Permission denied'}), 403
    db.session.delete(post)
    db.session.commit()
    return jsonify({'message': 'Post deleted successfully'}), 200
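The slug logic is repeated in create_post and update_post. A minimal sketch of how it could be factored into one place, assuming a hypothetical `slug_exists` callable that stands in for the `Post.query.filter_by(slug=...)` lookup (neither `slugify` nor `unique_slug` exists in the original code):

```python
import re

def slugify(title):
    """Lowercase the title, drop punctuation, and join words with hyphens."""
    slug = re.sub(r'[^a-zA-Z0-9\s]', '', title.lower())
    return re.sub(r'\s+', '-', slug.strip())

def unique_slug(title, slug_exists):
    """Append -1, -2, ... until slug_exists(candidate) returns False.

    slug_exists is a hypothetical callable; in the API it would wrap
    a database query on Post.slug.
    """
    base = slugify(title)
    candidate = base
    counter = 1
    while slug_exists(candidate):
        candidate = f"{base}-{counter}"
        counter += 1
    return candidate

# Usage with an in-memory set standing in for the posts table
taken = {"my-first-blog-post", "my-first-blog-post-1"}
print(unique_slug("My First Blog Post!", taken.__contains__))
```

The check-then-insert loop is not atomic, so two concurrent requests could still pick the same slug; a unique constraint on the slug column remains the real safeguard.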
6. Admin Endpoints
from functools import wraps

def admin_required(f):
    """Decorator for admin-only endpoints"""
    @wraps(f)  # preserve f.__name__ so each view registers under its own endpoint name
    @jwt_required()
    def decorated_function(*args, **kwargs):
        current_user_id = get_jwt_identity()
        user = User.query.get(current_user_id)
        if not user or user.role != 'admin':
            return jsonify({'error': 'Admin access required'}), 403
        return f(*args, **kwargs)
    return decorated_function

@app.route('/api/admin/users', methods=['GET'])
@admin_required
@limiter.limit("50 per minute")
def admin_get_users():
    """Get all users (admin only)"""
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)
    users = User.query.order_by(User.created_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False
    )
    return jsonify({
        'users': [user.to_dict() for user in users.items],
        'pagination': {
            'page': users.page,
            'pages': users.pages,
            'per_page': users.per_page,
            'total': users.total
        }
    }), 200

@app.route('/api/admin/users/<int:user_id>/toggle-status', methods=['POST'])
@admin_required
@limiter.limit("20 per minute")
def admin_toggle_user_status(user_id):
    """Toggle user active status (admin only)"""
    user = User.query.get_or_404(user_id)
    # Prevent admin from deactivating themselves
    current_user_id = get_jwt_identity()
    if user_id == current_user_id:
        return jsonify({'error': 'Cannot deactivate your own account'}), 400
    user.is_active = not user.is_active
    user.updated_at = datetime.utcnow()
    db.session.commit()
    status = 'activated' if user.is_active else 'deactivated'
    return jsonify({
        'message': f'User {status} successfully',
        'user': user.to_dict()
    }), 200

@app.route('/api/admin/stats', methods=['GET'])
@admin_required
@limiter.limit("50 per minute")
def admin_get_stats():
    """Get system statistics (admin only)"""
    stats = {
        'total_users': User.query.count(),
        'active_users': User.query.filter_by(is_active=True).count(),
        'total_posts': Post.query.count(),
        'published_posts': Post.query.filter_by(published=True).count(),
        'recent_registrations': User.query.filter(
            User.created_at >= datetime.utcnow() - timedelta(days=7)
        ).count()
    }
    return jsonify({'stats': stats}), 200
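Flask registers each view under its function name, so a decorator that returns an inner function should preserve that name with `functools.wraps`. The following is a framework-free sketch of a role-checking decorator; `require_role` and `get_current_role` are hypothetical names used only for illustration, and in the API the role lookup would come from the user identified by the JWT:

```python
from functools import wraps

def require_role(role, get_current_role):
    """Build a decorator that rejects callers whose role differs from `role`.

    get_current_role is a hypothetical zero-argument callable.
    """
    def decorator(f):
        @wraps(f)  # keep f.__name__ so every decorated view stays distinguishable
        def wrapper(*args, **kwargs):
            if get_current_role() != role:
                return {'error': f'{role.capitalize()} access required'}, 403
            return f(*args, **kwargs)
        return wrapper
    return decorator

current = {'role': 'user'}
admin_only = require_role('admin', lambda: current['role'])

@admin_only
def get_stats():
    return {'stats': {}}, 200

@admin_only
def list_users():
    return {'users': []}, 200

print(get_stats.__name__, list_users.__name__)  # the original names survive
print(get_stats())       # rejected while the role is 'user'
current['role'] = 'admin'
print(get_stats())       # allowed once the role is 'admin'
```

Without `wraps`, both functions would report the wrapper's name, and registering the second one as a route would collide with the first.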
Error Handling
1. Global Error Handlers
@app.errorhandler(ValidationError)
def handle_validation_error(error):
    """Handle validation errors"""
    return jsonify({'errors': error.messages}), 400

@app.errorhandler(404)
def handle_not_found(error):
    """Handle 404 errors"""
    return jsonify({'error': 'Resource not found'}), 404

@app.errorhandler(500)
def handle_internal_error(error):
    """Handle internal server errors"""
    db.session.rollback()
    return jsonify({'error': 'Internal server error'}), 500

@jwt.expired_token_loader
def expired_token_callback(jwt_header, jwt_payload):
    """Handle expired JWT tokens"""
    return jsonify({'error': 'Token has expired'}), 401

@jwt.invalid_token_loader
def invalid_token_callback(error):
    """Handle invalid JWT tokens"""
    return jsonify({'error': 'Invalid token'}), 401

@jwt.unauthorized_loader
def missing_token_callback(error):
    """Handle missing JWT tokens"""
    return jsonify({'error': 'Authorization token is required'}), 401
2. Custom Exception Classes
class APIException(Exception):
    """Base API exception class"""
    status_code = 500

    def __init__(self, message, status_code=None, payload=None):
        super().__init__(message)
        self.message = message
        if status_code is not None:
            self.status_code = status_code
        self.payload = payload

    def to_dict(self):
        rv = dict(self.payload or ())
        rv['error'] = self.message
        return rv

class ValidationException(APIException):
    """Validation error exception"""
    status_code = 400

class AuthenticationException(APIException):
    """Authentication error exception"""
    status_code = 401

class AuthorizationException(APIException):
    """Authorization error exception"""
    status_code = 403

@app.errorhandler(APIException)
def handle_api_exception(error):
    """Handle custom API exceptions"""
    response = jsonify(error.to_dict())
    response.status_code = error.status_code
    return response
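To illustrate how these exception classes might be used, here is a small sketch in which a check raises instead of building a 403 response inline. The classes are repeated in condensed form so the snippet runs on its own, and `ensure_can_edit` is a hypothetical helper that does not appear in the original code:

```python
class APIException(Exception):
    """Condensed copy of the base class above."""
    status_code = 500

    def __init__(self, message, status_code=None, payload=None):
        super().__init__(message)
        self.message = message
        if status_code is not None:
            self.status_code = status_code
        self.payload = payload

    def to_dict(self):
        rv = dict(self.payload or ())
        rv['error'] = self.message
        return rv

class AuthorizationException(APIException):
    status_code = 403

def ensure_can_edit(post_owner_id, user_id, role):
    """Hypothetical helper: raise when the caller is neither owner nor admin."""
    if post_owner_id != user_id and role != 'admin':
        raise AuthorizationException(
            'Permission denied', payload={'owner_id': post_owner_id}
        )

try:
    ensure_can_edit(post_owner_id=1, user_id=2, role='user')
except APIException as exc:
    # In the API, the registered error handler would do this conversion
    print(exc.status_code, exc.to_dict())
```

Raising from helpers like this keeps the view functions short, since the single `APIException` handler turns every subclass into a JSON response.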
API Documentation
1. OpenAPI Specification
from flask import send_from_directory

@app.route('/api/docs')
def api_docs():
    """Serve API documentation"""
    return send_from_directory('static', 'swagger-ui.html')

@app.route('/api/openapi.json')
def openapi_spec():
    """OpenAPI specification"""
    spec = {
        "openapi": "3.0.0",
        "info": {
            "title": "REST API with JWT Authentication",
            "version": "1.0.0",
            "description": "A comprehensive REST API with JWT authentication"
        },
        "servers": [
            {"url": "http://localhost:5000", "description": "Development server"}
        ],
        "components": {
            "securitySchemes": {
                "bearerAuth": {
                    "type": "http",
                    "scheme": "bearer",
                    "bearerFormat": "JWT"
                }
            }
        },
        "paths": {
            "/api/auth/register": {
                "post": {
                    "summary": "Register a new user",
                    "tags": ["Authentication"],
                    "requestBody": {
                        "required": True,
                        "content": {
                            "application/json": {
                                "schema": {
                                    "type": "object",
                                    "required": ["username", "email", "password", "first_name", "last_name"],
                                    "properties": {
                                        "username": {"type": "string", "minLength": 3},
                                        "email": {"type": "string", "format": "email"},
                                        "password": {"type": "string", "minLength": 6},
                                        "first_name": {"type": "string", "minLength": 2},
                                        "last_name": {"type": "string", "minLength": 2}
                                    }
                                }
                            }
                        }
                    },
                    "responses": {
                        "201": {"description": "User registered successfully"},
                        "400": {"description": "Validation error"}
                    }
                }
            }
        }
    }
    return jsonify(spec)
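The spec declares a `bearerAuth` scheme under `components`, but the one path shown is a public endpoint. As a sketch of how a protected endpoint could reference that scheme, the hypothetical helper below builds an OpenAPI 3.0 operation object with an operation-level `security` entry; the helper name, summary text, and response descriptions are assumptions made for illustration:

```python
import json

def protected_operation(summary, tag, responses):
    """Return an operation object that requires the bearerAuth scheme.

    The scheme name must match the key under components.securitySchemes.
    """
    operation = {
        "summary": summary,
        "tags": [tag],
        "security": [{"bearerAuth": []}],
        "responses": dict(responses),
    }
    # A protected endpoint can answer 401 when the token is missing or invalid
    operation["responses"].setdefault(
        "401", {"description": "Missing or invalid token"}
    )
    return operation

paths = {
    "/api/auth/profile": {
        "get": protected_operation(
            "Get the current user's profile",
            "Authentication",
            {"200": {"description": "Profile returned"}},
        )
    }
}
print(json.dumps(paths, indent=2))
```

An entry built this way could be merged into the `paths` dictionary of the spec so that Swagger UI shows the lock icon and sends the token.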
2. API Testing with Postman
{
  "info": {
    "name": "REST API with JWT Auth",
    "description": "Postman collection for testing the API",
    "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
  },
  "item": [
    {
      "name": "Authentication",
      "item": [
        {
          "name": "Register User",
          "request": {
            "method": "POST",
            "header": [
              {
                "key": "Content-Type",
                "value": "application/json"
              }
            ],
            "body": {
              "mode": "raw",
              "raw": "{\n \"username\": \"testuser\",\n \"email\": \"test@example.com\",\n \"password\": \"password123\",\n \"first_name\": \"Test\",\n \"last_name\": \"User\"\n}"
            },
            "url": {
              "raw": "{{base_url}}/api/auth/register",
              "host": ["{{base_url}}"],
              "path": ["api", "auth", "register"]
            }
          }
        },
        {
          "name": "Login User",
          "request": {
            "method": "POST",
            "header": [
              {
                "key": "Content-Type",
                "value": "application/json"
              }
            ],
            "body": {
              "mode": "raw",
              "raw": "{\n \"username\": \"testuser\",\n \"password\": \"password123\"\n}"
            },
            "url": {
              "raw": "{{base_url}}/api/auth/login",
              "host": ["{{base_url}}"],
              "path": ["api", "auth", "login"]
            }
          }
        }
      ]
    }
  ]
}
Running the Application
Development Mode
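# Set environment variables
export FLASK_APP=restapi.py
# FLASK_ENV is only read by Flask versions before 2.3; FLASK_DEBUG works on newer releases
export FLASK_ENV=development
export FLASK_DEBUG=1
export SECRET_KEY=your-secret-key
export JWT_SECRET_KEY=jwt-secret-key
export DATABASE_URL=sqlite:///api.db
# Initialize database (create_all needs an application context on Flask-SQLAlchemy 3.x)
python -c "from restapi import app, db; app.app_context().push(); db.create_all()"
# Run the application
flask run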
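The values `your-secret-key` and `jwt-secret-key` are placeholders. One way to produce real values is Python's standard `secrets` module; the sketch below prints export lines that can be pasted into the shell (`generate_key` is a hypothetical helper name):

```python
import secrets

def generate_key(num_bytes=32):
    """Return a random hex string suitable for SECRET_KEY or JWT_SECRET_KEY."""
    return secrets.token_hex(num_bytes)

# Print export lines with a fresh key for each variable
for name in ("SECRET_KEY", "JWT_SECRET_KEY"):
    print(f"export {name}={generate_key()}")
```

Each call yields a different value, so the two keys are independent of each other.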
Production Configuration
# config.py
import os
from datetime import timedelta

class Config:
    # The literal fallbacks below are for local development only;
    # production should always supply these values through the environment.
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'fallback-secret-key'
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or 'sqlite:///api.db'
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY') or 'jwt-secret-key'
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=1)
    JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30)

class DevelopmentConfig(Config):
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///api_dev.db'

class ProductionConfig(Config):
    DEBUG = False
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL')

class TestingConfig(Config):
    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
    WTF_CSRF_ENABLED = False

config = {
    'development': DevelopmentConfig,
    'production': ProductionConfig,
    'testing': TestingConfig,
    'default': DevelopmentConfig
}
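Because the base `Config` falls back to literal secret strings, a production deployment with a missing variable would start silently with a guessable key. A minimal sketch of a fail-fast alternative follows; `require_env`, `load_production_settings`, and `MissingSettingError` are hypothetical names, not part of the original `config.py`:

```python
import os

class MissingSettingError(RuntimeError):
    """Raised when a required environment variable is absent or empty."""

def require_env(name, environ=None):
    """Return environ[name], or raise if it is unset or empty."""
    environ = os.environ if environ is None else environ
    value = environ.get(name)
    if not value:
        raise MissingSettingError(f"{name} must be set in production")
    return value

def load_production_settings(environ=None):
    """Collect the three settings the production config depends on."""
    return {
        'SECRET_KEY': require_env('SECRET_KEY', environ),
        'JWT_SECRET_KEY': require_env('JWT_SECRET_KEY', environ),
        'SQLALCHEMY_DATABASE_URI': require_env('DATABASE_URL', environ),
    }

# Example with an explicit mapping instead of the real environment
example_env = {
    'SECRET_KEY': 'example-only',
    'JWT_SECRET_KEY': 'example-only-jwt',
    'DATABASE_URL': 'sqlite:///api.db',
}
print(load_production_settings(example_env)['SQLALCHEMY_DATABASE_URI'])
```

Passing the mapping as a parameter keeps the check easy to test without touching the process environment.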
Docker Setup
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 5000
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "restapi:app"]
# docker-compose.yml
version: '3.8'
services:
  api:
    build: .
    ports:
      - "5000:5000"
    environment:
      - FLASK_ENV=production
      - DATABASE_URL=postgresql://user:password@db:5432/apidb
      - SECRET_KEY=your-secret-key
      - JWT_SECRET_KEY=jwt-secret-key
    depends_on:
      - db
  db:
    image: postgres:13
    environment:
      - POSTGRES_DB=apidb
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
volumes:
  postgres_data:
Usage Examples
1. User Registration and Authentication
# Register a new user
curl -X POST http://localhost:5000/api/auth/register \
-H "Content-Type: application/json" \
-d '{
"username": "johndoe",
"email": "john@example.com",
"password": "securepassword123",
"first_name": "John",
"last_name": "Doe"
}'
# Login user
curl -X POST http://localhost:5000/api/auth/login \
-H "Content-Type: application/json" \
-d '{
"username": "johndoe",
"password": "securepassword123"
}'
# Response will include access token:
{
"message": "Login successful",
"user": {...},
"access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9..."
}
2. Making Authenticated Requests
# Get user profile
curl -X GET http://localhost:5000/api/auth/profile \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN"
# Create a new post
curl -X POST http://localhost:5000/api/posts \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"title": "My First Blog Post",
"content": "This is the content of my first blog post.",
"published": true
}'
# Get all posts with pagination
curl -X GET "http://localhost:5000/api/posts?page=1&per_page=10&published=true"
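The same authenticated call can be made from Python. The sketch below uses only the standard library and builds the request without sending it, so it runs even when no server is listening; `build_create_post_request` is a hypothetical helper, and the URL and token are the placeholders used in the curl examples:

```python
import json
import urllib.request

def build_create_post_request(base_url, token, title, content, published=False):
    """Build (but do not send) the POST /api/posts request."""
    body = json.dumps({'title': title, 'content': content, 'published': published})
    return urllib.request.Request(
        url=f"{base_url}/api/posts",
        data=body.encode('utf-8'),
        headers={
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json',
        },
        method='POST',
    )

req = build_create_post_request(
    'http://localhost:5000',
    'YOUR_ACCESS_TOKEN',
    'My First Blog Post',
    'This is the content of my first blog post.',
    published=True,
)
print(req.get_method(), req.full_url)
print(req.get_header('Authorization'))
# To send it against a running server: urllib.request.urlopen(req)
```

The token always travels in the `Authorization` header with the `Bearer` prefix, exactly as in the curl commands.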
3. Admin Operations
# Get system statistics (admin only)
curl -X GET http://localhost:5000/api/admin/stats \
-H "Authorization: Bearer ADMIN_ACCESS_TOKEN"
# Get all users (admin only)
curl -X GET http://localhost:5000/api/admin/users \
-H "Authorization: Bearer ADMIN_ACCESS_TOKEN"
# Toggle user status (admin only)
curl -X POST http://localhost:5000/api/admin/users/2/toggle-status \
-H "Authorization: Bearer ADMIN_ACCESS_TOKEN"
Testing Suite
1. Unit Tests
# tests/test_auth.py
import unittest
# restapi.py is a single module, so the models are imported from it directly
from restapi import app, db, User

class AuthTestCase(unittest.TestCase):
    def setUp(self):
        """Set up test fixtures"""
        app.config['TESTING'] = True
        app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
        self.app = app.test_client()
        self.app_context = app.app_context()
        self.app_context.push()
        db.create_all()

    def tearDown(self):
        """Clean up test fixtures"""
        db.session.remove()
        db.drop_all()
        self.app_context.pop()

    def test_user_registration(self):
        """Test user registration"""
        response = self.app.post('/api/auth/register', json={
            'username': 'testuser',
            'email': 'test@example.com',
            'password': 'password123',
            'first_name': 'Test',
            'last_name': 'User'
        })
        self.assertEqual(response.status_code, 201)
        data = response.get_json()
        self.assertIn('access_token', data)
        self.assertEqual(data['user']['username'], 'testuser')

    def test_user_login(self):
        """Test user login"""
        # Create test user
        user = User(username='testuser', email='test@example.com',
                    first_name='Test', last_name='User')
        user.set_password('password123')
        db.session.add(user)
        db.session.commit()
        # Test login
        response = self.app.post('/api/auth/login', json={
            'username': 'testuser',
            'password': 'password123'
        })
        self.assertEqual(response.status_code, 200)
        data = response.get_json()
        self.assertIn('access_token', data)

    def test_invalid_login(self):
        """Test invalid login credentials"""
        response = self.app.post('/api/auth/login', json={
            'username': 'nonexistent',
            'password': 'wrongpassword'
        })
        self.assertEqual(response.status_code, 401)
        data = response.get_json()
        self.assertIn('error', data)

if __name__ == '__main__':
    unittest.main()
2. Integration Tests
# tests/test_api.py
import unittest
# restapi.py is a single module, so the models are imported from it directly
from restapi import app, db, User, Post

class APITestCase(unittest.TestCase):
    def setUp(self):
        """Set up test fixtures"""
        app.config['TESTING'] = True
        app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
        self.app = app.test_client()
        self.app_context = app.app_context()
        self.app_context.push()
        db.create_all()
        # Create test user and get token
        user = User(username='testuser', email='test@example.com',
                    first_name='Test', last_name='User')
        user.set_password('password123')
        db.session.add(user)
        db.session.commit()
        # Login to get token
        response = self.app.post('/api/auth/login', json={
            'username': 'testuser',
            'password': 'password123'
        })
        self.token = response.get_json()['access_token']
        self.headers = {'Authorization': f'Bearer {self.token}'}

    def tearDown(self):
        """Clean up test fixtures"""
        db.session.remove()
        db.drop_all()
        self.app_context.pop()

    def test_create_post(self):
        """Test post creation"""
        response = self.app.post('/api/posts',
                                 headers=self.headers,
                                 json={
                                     'title': 'Test Post Title',
                                     'content': 'This is test content for the post.',
                                     'published': True
                                 })
        self.assertEqual(response.status_code, 201)
        data = response.get_json()
        self.assertEqual(data['post']['title'], 'Test Post Title')

    def test_get_posts(self):
        """Test getting posts"""
        # Create test post owned by the user created in setUp (id 1 in a fresh database)
        post = Post(title='Test Post', content='Test content',
                    slug='test-post', user_id=1, published=True)
        db.session.add(post)
        db.session.commit()
        response = self.app.get('/api/posts')
        self.assertEqual(response.status_code, 200)
        data = response.get_json()
        self.assertIn('posts', data)
        self.assertEqual(len(data['posts']), 1)

    def test_unauthorized_access(self):
        """Test unauthorized access to protected endpoints"""
        response = self.app.post('/api/posts', json={
            'title': 'Test Post',
            'content': 'Test content'
        })
        self.assertEqual(response.status_code, 401)

if __name__ == '__main__':
    unittest.main()
Sample Output
Successful Registration Response
{
  "message": "User registered successfully",
  "user": {
    "id": 1,
    "username": "johndoe",
    "email": "john@example.com",
    "first_name": "John",
    "last_name": "Doe",
    "role": "user",
    "is_active": true,
    "created_at": "2024-01-15T10:30:00",
    "updated_at": "2024-01-15T10:30:00"
  },
  "access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTYzOTU2MjIwMCwianRpIjoiNGY1NjcxMmUtZGZjOS00MDk5LWEyYWUtMmZjMzg5OGY4MTM1IiwidHlwZSI6ImFjY2VzcyIsInN1YiI6MSwibmJmIjoxNjM5NTYyMjAwLCJleHAiOjE2Mzk1NjU4MDB9.X1P9z6v8QD2l0o4tH5nF2Qx8r7B3KkM9A6tE5yS8pW1"
}
Posts List Response
{
  "posts": [
    {
      "id": 1,
      "title": "Getting Started with Flask",
      "content": "Flask is a lightweight web framework for Python...",
      "slug": "getting-started-with-flask",
      "published": true,
      "user_id": 1,
      "author": "johndoe",
      "created_at": "2024-01-15T11:00:00",
      "updated_at": "2024-01-15T11:00:00"
    }
  ],
  "pagination": {
    "page": 1,
    "pages": 1,
    "per_page": 10,
    "total": 1,
    "has_next": false,
    "has_prev": false
  }
}
Admin Statistics Response
{
  "stats": {
    "total_users": 15,
    "active_users": 14,
    "total_posts": 42,
    "published_posts": 38,
    "recent_registrations": 3
  }
}
Security Best Practices
1. Password Security
from werkzeug.security import generate_password_hash, check_password_hash
import secrets

def generate_secure_password():
    """Generate a secure random password"""
    return secrets.token_urlsafe(16)

def validate_password_strength(password):
    """Validate password strength"""
    if len(password) < 8:
        return False, "Password must be at least 8 characters long"
    if not any(c.isupper() for c in password):
        return False, "Password must contain at least one uppercase letter"
    if not any(c.islower() for c in password):
        return False, "Password must contain at least one lowercase letter"
    if not any(c.isdigit() for c in password):
        return False, "Password must contain at least one digit"
    return True, "Password is strong"
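The `generate_password_hash` and `check_password_hash` helpers imported above salt and stretch a password before it is stored. To illustrate that idea, here is a minimal sketch built only on the standard library. The names `hash_password` and `verify_password`, the storage format, and the iteration count are assumptions made for this example; this is not Werkzeug's implementation.

```python
import hashlib
import hmac
import secrets

# Assumed work factor for this sketch; tune it for your own hardware.
ITERATIONS = 200_000

def hash_password(password):
    """Return 'iterations$salt$digest' using PBKDF2-HMAC-SHA256."""
    salt = secrets.token_bytes(16)  # a fresh random salt per password
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, ITERATIONS
    )
    return f"{ITERATIONS}${salt.hex()}${digest.hex()}"

def verify_password(password, stored):
    """Recompute the digest with the stored salt and compare in constant time."""
    iterations, salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode("utf-8"),
        bytes.fromhex(salt_hex),
        int(iterations),
    )
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))

stored = hash_password("Passw0rdExample")
print(verify_password("Passw0rdExample", stored))  # True
print(verify_password("wrong-guess", stored))      # False
```

Because the salt is random, hashing the same password twice gives two different stored strings, so identical passwords cannot be spotted by comparing database rows.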
2. Rate Limiting Configuration
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

# Configure rate limiting. Keyword arguments keep this call valid across
# Flask-Limiter releases, whose positional argument order has changed.
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    default_limits=["1000 per day", "100 per hour"],
    storage_uri="redis://localhost:6379"
)

# Custom rate limit for sensitive endpoints
@app.route('/api/auth/login', methods=['POST'])
@limiter.limit("5 per minute")
def login():
    # Login implementation
    pass
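To make a limit such as "5 per minute" concrete, the sketch below implements a simple fixed-window counter in plain Python. It only illustrates the idea: `FixedWindowLimiter` and its `clock` parameter are hypothetical names for this example, the state lives in process memory rather than Redis, and Flask-Limiter's own internals may differ.

```python
import time

class FixedWindowLimiter:
    """Allow at most `limit` calls per key in each fixed time window."""

    def __init__(self, limit, window_seconds, clock=time.monotonic):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock          # injectable so the limiter can be tested
        self._counters = {}         # key -> (window_index, count)

    def allow(self, key):
        window_index = int(self.clock() // self.window)
        stored_index, count = self._counters.get(key, (window_index, 0))
        if stored_index != window_index:
            count = 0               # a new window has started, reset the counter
        if count >= self.limit:
            return False            # over the limit for this window
        self._counters[key] = (window_index, count + 1)
        return True

# Example: the sixth login attempt from one address inside a window is refused.
limiter = FixedWindowLimiter(limit=5, window_seconds=60)
for attempt in range(1, 7):
    print(attempt, limiter.allow("203.0.113.7"))
```

A fixed window is cheap to store, but it lets a client burst at the boundary between two windows. Sliding-window strategies trade extra bookkeeping for smoother limits.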
3. Input Sanitization
import re

import bleach

SLUG_PATTERN = re.compile(r'^[a-z0-9]+(?:-[a-z0-9]+)*$')

def sanitize_input(text):
    """Sanitize user input"""
    # Strip all HTML tags. bleach.clean() also escapes the remaining special
    # characters, so passing the result to html.escape() would double-escape it.
    return bleach.clean(text, tags=[], strip=True)

def validate_slug(slug):
    """Validate URL slug format"""
    # fullmatch() rejects a trailing newline, which match() with $ would accept
    return bool(SLUG_PATTERN.fullmatch(slug))
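Validation is easier when slugs are generated in the expected format to begin with. The helper below is a small sketch, using only the standard library, that turns a post title into a slug matching the pattern above. The name `slugify` is an assumption for this example and is not part of the project code shown here.

```python
import re
import unicodedata

SLUG_PATTERN = re.compile(r'^[a-z0-9]+(?:-[a-z0-9]+)*$')

def slugify(title):
    """Convert a title to lowercase ASCII words joined by single hyphens."""
    # Decompose accented characters, then drop anything that is not ASCII
    normalized = unicodedata.normalize("NFKD", title)
    ascii_text = normalized.encode("ascii", "ignore").decode("ascii")
    # Collapse every run of non-alphanumeric characters into one hyphen
    slug = re.sub(r"[^a-z0-9]+", "-", ascii_text.lower())
    return slug.strip("-")

print(slugify("Getting Started with Flask"))  # getting-started-with-flask
print(slugify("  Héllo, Wörld!  "))           # hello-world
```

A title with no letters or digits produces an empty string, which fails the slug pattern, so callers still need a fallback such as the post ID.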
Performance Optimization
1. Database Optimization
from sqlalchemy.orm import joinedload

# Add database indexes
class User(db.Model):
    # ... existing fields ...
    __table_args__ = (
        db.Index('idx_username_email', 'username', 'email'),
        db.Index('idx_created_at', 'created_at'),
    )

# Use query optimization
def get_user_posts_optimized(user_id, limit=10):
    """Get user posts with optimized query"""
    return (
        Post.query.filter_by(user_id=user_id)
        .options(joinedload(Post.author))  # load each author in the same query
        .order_by(Post.created_at.desc())
        .limit(limit)
        .all()
    )
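An index only helps if the database actually uses it, so it is worth checking the query plan. The sketch below does this with the standard library `sqlite3` module against an in-memory database. The `posts` table layout and the index name `idx_posts_user_created` are assumptions for this example and may not match your real schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE posts (
        id INTEGER PRIMARY KEY,
        user_id INTEGER NOT NULL,
        title TEXT NOT NULL,
        created_at TEXT NOT NULL
    );
    CREATE INDEX idx_posts_user_created ON posts (user_id, created_at);
""")

def explain(sql, params=()):
    """Return the human-readable steps of SQLite's plan for a query."""
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql, params).fetchall()
    return [row[-1] for row in rows]  # the last column holds the description

plan = explain(
    "SELECT id, title FROM posts "
    "WHERE user_id = ? ORDER BY created_at DESC LIMIT 10",
    (1,),
)
for step in plan:
    print(step)
```

If the printed plan mentions the index name, the filter is served from the index. A plan that reports a full table scan suggests the index columns do not match the query.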
2. Caching Implementation
from flask_caching import Cache

# 'SimpleCache' is the in-memory backend; the older 'simple' alias is deprecated
cache = Cache(app, config={'CACHE_TYPE': 'SimpleCache'})

@app.route('/api/posts')
@cache.cached(timeout=300, query_string=True)
def get_posts_cached():
    """Cached posts endpoint"""
    # Implementation with caching
    pass

@cache.memoize(timeout=600)
def get_user_stats(user_id):
    """Cached user statistics"""
    return {
        'post_count': Post.query.filter_by(user_id=user_id).count(),
        'published_count': Post.query.filter_by(user_id=user_id, published=True).count()
    }
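Memoizing with a timeout means that a result is stored per argument and reused until it expires. The sketch below shows that mechanism in plain Python. `ttl_memoize`, its `clock` parameter, and the `invalidate` helper are hypothetical names for this illustration and do not describe how Flask-Caching is implemented.

```python
import functools
import time

def ttl_memoize(timeout, clock=time.monotonic):
    """Cache results per positional arguments for `timeout` seconds."""
    def decorator(func):
        store = {}  # args -> (expires_at, value)

        @functools.wraps(func)
        def wrapper(*args):
            now = clock()
            hit = store.get(args)
            if hit is not None and hit[0] > now:
                return hit[1]                   # still fresh, reuse it
            value = func(*args)                 # missing or expired, recompute
            store[args] = (now + timeout, value)
            return value

        # Drop one cached entry, for example after a user creates a post
        wrapper.invalidate = lambda *args: store.pop(args, None)
        return wrapper
    return decorator

@ttl_memoize(timeout=600)
def expensive_count(user_id):
    print(f"computing stats for user {user_id}")
    return {"post_count": 42}  # placeholder value for the sketch

expensive_count(1)  # computes and prints the message
expensive_count(1)  # served from the cache, prints nothing
```

Cached statistics go stale when the underlying data changes, so remember to invalidate the entry on writes. Flask-Caching offers `cache.delete_memoized` for this purpose.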
3. Pagination Optimization
def paginate_query(query, page, per_page, max_per_page=100):
    """Optimized pagination helper"""
    per_page = min(per_page, max_per_page)  # Prevent abuse
    return query.paginate(
        page=page,
        per_page=per_page,
        error_out=False
    )

def get_pagination_meta(pagination):
    """Get pagination metadata"""
    return {
        'page': pagination.page,
        'pages': pagination.pages,
        'per_page': pagination.per_page,
        'total': pagination.total,
        'has_next': pagination.has_next,
        'has_prev': pagination.has_prev,
        'next_num': pagination.next_num,
        'prev_num': pagination.prev_num
    }
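To see how the metadata fields relate to each other, here is a minimal sketch that paginates an ordinary Python list with the same clamping rule. `paginate_list` is a hypothetical helper for illustration; the real endpoints should keep paginating in the database so that only one page of rows is loaded.

```python
import math

def paginate_list(items, page, per_page, max_per_page=100):
    """Slice a list into one page and compute the matching metadata."""
    per_page = max(1, min(per_page, max_per_page))  # clamp to a sane range
    page = max(1, page)
    total = len(items)
    pages = math.ceil(total / per_page)             # 0 when there are no items
    start = (page - 1) * per_page
    return {
        'items': items[start:start + per_page],
        'page': page,
        'pages': pages,
        'per_page': per_page,
        'total': total,
        'has_next': page < pages,
        'has_prev': page > 1,
    }

result = paginate_list(list(range(25)), page=2, per_page=10)
print(result['items'])                         # [10, 11, ..., 19]
print(result['pages'], result['has_next'])     # 3 True
```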
Troubleshooting
Common Issues
1. JWT Token Errors
# Solution: Check token expiration and format
@jwt.expired_token_loader
def expired_token_callback(jwt_header, jwt_payload):
    return jsonify({'error': 'Token has expired', 'code': 'TOKEN_EXPIRED'}), 401

@jwt.invalid_token_loader
def invalid_token_callback(error):
    return jsonify({'error': 'Invalid token format', 'code': 'INVALID_TOKEN'}), 401
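When a token is rejected, it often helps to look at its claims directly. The sketch below decodes the payload segment of a JWT with the standard library, much as a JWT debugger would. It deliberately does not verify the signature, so treat it as a debugging aid only; `decode_jwt_payload` and `is_expired` are hypothetical helper names for this example.

```python
import base64
import json
import time

def decode_jwt_payload(token):
    """Decode the payload of a JWT WITHOUT verifying it (debugging only)."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected header.payload.signature")
    payload_b64 = parts[1]
    # JWT segments drop base64 padding, so restore it before decoding
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))

def is_expired(payload, now=None):
    """Return True if the payload has an `exp` claim that is in the past."""
    now = time.time() if now is None else now
    return "exp" in payload and payload["exp"] <= now

def _segment(obj):
    """Encode a dict the way a JWT segment is encoded (for the demo token)."""
    raw = json.dumps(obj).encode("utf-8")
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")

# A demo token with a placeholder signature, built only for this example
demo = ".".join([_segment({"alg": "HS256"}),
                 _segment({"sub": "1", "exp": 1000}),
                 "signature"])
claims = decode_jwt_payload(demo)
print(claims)
print(is_expired(claims))  # True, because exp=1000 is far in the past
```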
2. Database Connection Issues
# Solution: Enable foreign key enforcement on every new SQLite connection
from sqlalchemy import event
from sqlalchemy.pool import Pool

@event.listens_for(Pool, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
    # SQLite leaves foreign key checks off unless each connection enables them
    if 'sqlite' in app.config['SQLALCHEMY_DATABASE_URI']:
        cursor = dbapi_connection.cursor()
        cursor.execute("PRAGMA foreign_keys=ON")
        cursor.close()
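The effect of the pragma is easy to demonstrate with the standard library `sqlite3` module. The sketch below tries to insert a post for a user that does not exist, once with enforcement off and once with it on. The two-table schema and the `insert_orphan_post` helper are assumptions for this example.

```python
import sqlite3

SCHEMA = """
CREATE TABLE users (id INTEGER PRIMARY KEY);
CREATE TABLE posts (
    id INTEGER PRIMARY KEY,
    user_id INTEGER NOT NULL REFERENCES users(id)
);
"""

def insert_orphan_post(enforce_foreign_keys):
    """Insert a post with an unknown user_id; return True if SQLite accepts it."""
    conn = sqlite3.connect(":memory:")
    try:
        # The pragma applies per connection, so set it before any other work
        state = "ON" if enforce_foreign_keys else "OFF"
        conn.execute(f"PRAGMA foreign_keys={state}")
        conn.executescript(SCHEMA)
        try:
            conn.execute("INSERT INTO posts (user_id) VALUES (?)", (999,))
            return True
        except sqlite3.IntegrityError:
            return False
    finally:
        conn.close()

print(insert_orphan_post(enforce_foreign_keys=False))  # True: orphan row accepted
print(insert_orphan_post(enforce_foreign_keys=True))   # False: insert rejected
```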
3. Rate Limiting Issues
# Solution: Configure appropriate limits and storage
@app.errorhandler(429)
def handle_rate_limit_exceeded(error):
    return jsonify({
        'error': 'Rate limit exceeded',
        'retry_after': error.retry_after
    }), 429
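On the client side, a 429 response should lead to a pause rather than an immediate retry. Assuming the server also exposes a standard `Retry-After` header (the handler above only returns a JSON field), the sketch below converts that header into a wait time in seconds. `seconds_until_retry` is a hypothetical helper name for this example.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

def seconds_until_retry(retry_after, now=None):
    """Convert a Retry-After value (seconds or HTTP date) to a wait in seconds.

    Returns None when the header is missing or cannot be parsed.
    """
    if retry_after is None:
        return None
    value = retry_after.strip()
    if value.isdigit():
        return float(value)              # form 1: a delay in seconds
    now = now or datetime.now(timezone.utc)
    try:
        retry_at = parsedate_to_datetime(value)  # form 2: an HTTP date
    except (TypeError, ValueError):
        return None
    if retry_at.tzinfo is None:
        retry_at = retry_at.replace(tzinfo=timezone.utc)
    return max(0.0, (retry_at - now).total_seconds())

print(seconds_until_retry("30"))  # 30.0
```

A caller can sleep for the returned number of seconds and fall back to its own backoff schedule when the result is `None`.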
Extensions and Improvements
1. Refresh Token Implementation
@app.route('/api/auth/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
    """Refresh access token"""
    current_user_id = get_jwt_identity()
    new_token = create_access_token(identity=current_user_id)
    return jsonify({'access_token': new_token}), 200
2. Email Verification System
from flask import url_for
from flask_mail import Mail, Message
import secrets

mail = Mail(app)

def send_verification_email(user):
    """Send email verification"""
    token = secrets.token_urlsafe(32)
    # Store token in database (assumes an EmailVerification model with
    # user_id and token columns, and a verify_email route, both defined elsewhere)
    verification = EmailVerification(user_id=user.id, token=token)
    db.session.add(verification)
    db.session.commit()
    # Send email
    msg = Message('Verify Your Email',
                  recipients=[user.email],
                  body=f'Click to verify: {url_for("verify_email", token=token, _external=True)}')
    mail.send(msg)
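Verification links are safer when the token expires and the database stores only a hash of it. The sketch below shows one way to do that with the standard library. The record layout, the 24-hour lifetime, and the function names `new_verification_token` and `token_is_valid` are assumptions for this example, not part of the code above.

```python
import hashlib
import hmac
import secrets
from datetime import datetime, timedelta, timezone

def new_verification_token(ttl_hours=24, now=None):
    """Return (token, record): email the token, store only the record."""
    now = now or datetime.now(timezone.utc)
    token = secrets.token_urlsafe(32)
    record = {
        # Only the hash is stored, so a leaked table cannot verify emails
        'token_hash': hashlib.sha256(token.encode("utf-8")).hexdigest(),
        'expires_at': now + timedelta(hours=ttl_hours),
    }
    return token, record

def token_is_valid(token, record, now=None):
    """Check expiry first, then compare hashes in constant time."""
    now = now or datetime.now(timezone.utc)
    if now >= record['expires_at']:
        return False
    candidate = hashlib.sha256(token.encode("utf-8")).hexdigest()
    return hmac.compare_digest(candidate, record['token_hash'])

token, record = new_verification_token()
print(token_is_valid(token, record))      # True
print(token_is_valid("forged", record))   # False
```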
3. API Versioning
from flask import Blueprint

# API v1
api_v1 = Blueprint('api_v1', __name__, url_prefix='/api/v1')

@api_v1.route('/posts')
def get_posts_v1():
    """Posts endpoint v1"""
    pass

# API v2
api_v2 = Blueprint('api_v2', __name__, url_prefix='/api/v2')

@api_v2.route('/posts')
def get_posts_v2():
    """Posts endpoint v2 with enhanced features"""
    pass

app.register_blueprint(api_v1)
app.register_blueprint(api_v2)
Next Steps
After mastering this REST API, consider:
- GraphQL Integration: Add GraphQL endpoints for flexible queries
- Microservices Architecture: Split into smaller, focused services
- WebSocket Support: Add real-time features with Socket.IO
- Advanced Authentication: OAuth2, SAML, or multi-factor authentication
- API Gateway: Implement API gateway for advanced routing and monitoring
Resources
- Flask Documentation
- Flask-JWT-Extended
- REST API Best Practices
- OpenAPI Specification
- JWT.io - JWT debugger and information
Conclusion
This REST API with JWT authentication demonstrates professional API development practices with Flask. It covers user management, secure authentication, CRUD operations, rate limiting, and error handling, and its modular design makes it straightforward to extend for other use cases.
The API follows RESTful principles and applies the security practices described above, which gives it a solid foundation for production use. The test suite and documentation help keep it maintainable and easy for other developers to pick up. 🔐🐍