
REST API with JWT Authentication

Abstract

Create a comprehensive REST API with JWT authentication using Flask that provides secure user management, CRUD operations, and role-based access control. This project demonstrates advanced backend development, authentication systems, API security, and professional API design patterns.
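To make the JWT part concrete, here is a minimal sketch of how an HS256-signed token can be issued and verified using only the Python standard library. The function names (encode_token, decode_token), the secret value, and the "role" claim are illustrative assumptions, not part of the project code. A real project would more likely rely on a maintained library such as flask-jwt-extended.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, the form JWTs use."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode base64url text, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    """HMAC-SHA256 over 'header.payload' with the shared secret."""
    return hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()


def encode_token(claims: dict, secret: str) -> str:
    """Build a signed token of the form header.payload.signature."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    return f"{signing_input}.{_b64url(_sign(signing_input, secret))}"


def decode_token(token: str, secret: str) -> dict:
    """Verify signature and expiry; return the claims or raise ValueError."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError:
        raise ValueError("malformed token") from None
    expected = _sign(f"{header_b64}.{payload_b64}", secret)
    # Constant-time comparison avoids leaking how many bytes matched
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    claims = json.loads(_b64url_decode(payload_b64))
    if claims.get("exp", float("inf")) < time.time():
        raise ValueError("token expired")
    return claims


if __name__ == "__main__":
    # Hypothetical user id and role, valid for one hour
    token = encode_token(
        {"sub": "1", "role": "admin", "exp": int(time.time()) + 3600},
        "change-me",
    )
    print(decode_token(token, "change-me")["role"])
```

A role-based check then reduces to comparing the verified "role" claim against the role an endpoint requires, since the signature guarantees the claim was issued by the server.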

Prerequisites

  • Python 3.7 or above
  • Text Editor or IDE
  • Solid understanding of Python syntax and web development
  • Knowledge of Flask framework and REST principles
  • Familiarity with JWT tokens and authentication concepts
  • Understanding of database operations and SQLAlchemy
  • Basic knowledge of API security and testing tools

Getting Started

Create a new project

  1. Create a new project folder and name it restAPIAuth.
  2. Create a new file and name it restapi.py.
  3. Install required dependencies: pip install flask flask-cors flask-jwt-extended flask-sqlalchemy
  4. Open the project folder in your favorite text editor or IDE.
  5. Copy the code below and paste it into your restapi.py file.
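Before pasting the code, it can help to confirm that the install step worked. The sketch below checks whether each package is importable. The mapping from package name to module name is an assumption based on the usual naming of these packages, and flask_cors is included because the listing imports it.

```python
from importlib.util import find_spec

# Assumed mapping: PyPI package name -> importable module name
REQUIRED_MODULES = {
    "flask": "flask",
    "flask-cors": "flask_cors",
    "flask-jwt-extended": "flask_jwt_extended",
    "flask-sqlalchemy": "flask_sqlalchemy",
}


def missing_packages(required=REQUIRED_MODULES):
    """Return the package names whose modules cannot be found."""
    return [pkg for pkg, module in required.items() if find_spec(module) is None]


if __name__ == "__main__":
    missing = missing_packages()
    if missing:
        print("Missing packages: " + ", ".join(missing))
    else:
        print("All dependencies found.")
```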

Write the code

  1. Add the following code to your restapi.py file.
REST API with JWT Authentication
# Simple Flask REST API
 
from flask import Flask, request, jsonify, abort
from flask_cors import CORS
import sqlite3
import json
from datetime import datetime, timedelta
import hashlib
import secrets
import os
from functools import wraps
import logging
 
app = Flask(__name__)
CORS(app)  # Enable CORS for all routes
 
# Configuration
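# Read the secret from the environment; the literal is a development fallback only
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'your-secret-key-here')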
DATABASE = 'api_database.db'
 
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
 
class DatabaseManager:
    def __init__(self, db_path):
        self.db_path = db_path
        self.init_database()
    
    def init_database(self):
        """Initialize the database with required tables"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            
            # Users table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username VARCHAR(50) UNIQUE NOT NULL,
                    email VARCHAR(100) UNIQUE NOT NULL,
                    password_hash VARCHAR(255) NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    is_active BOOLEAN DEFAULT 1
                )
            ''')
            
            # API Keys table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS api_keys (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER,
                    api_key VARCHAR(255) UNIQUE NOT NULL,
                    name VARCHAR(100),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_used TIMESTAMP,
                    is_active BOOLEAN DEFAULT 1,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')
            
            # Tasks table (example resource)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS tasks (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title VARCHAR(200) NOT NULL,
                    description TEXT,
                    completed BOOLEAN DEFAULT 0,
                    priority VARCHAR(10) DEFAULT 'medium',
                    due_date DATE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    user_id INTEGER,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')
            
            # Products table (example resource)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS products (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name VARCHAR(200) NOT NULL,
                    description TEXT,
                    price DECIMAL(10,2) NOT NULL,
                    category VARCHAR(100),
                    stock_quantity INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            
            # API Logs table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS api_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    endpoint VARCHAR(200),
                    method VARCHAR(10),
                    ip_address VARCHAR(45),
                    user_agent TEXT,
                    api_key VARCHAR(255),
                    response_code INTEGER,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            
            conn.commit()
            
        # Insert sample data
        self.insert_sample_data()
    
    def insert_sample_data(self):
        """Insert sample data for demonstration"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            
            # Check if sample data already exists
            cursor.execute("SELECT COUNT(*) FROM users")
            if cursor.fetchone()[0] > 0:
                return
            
            # Sample users
            sample_users = [
                ('john_doe', 'john@example.com', hashlib.sha256('password123'.encode()).hexdigest()),
                ('jane_smith', 'jane@example.com', hashlib.sha256('password456'.encode()).hexdigest()),
                ('admin', 'admin@example.com', hashlib.sha256('admin123'.encode()).hexdigest())
            ]
            
            cursor.executemany(
                "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
                sample_users
            )
            
            # Sample API keys
            api_keys = [
                (1, 'demo_key_123456789', 'Demo Key'),
                (2, 'test_key_987654321', 'Test Key'),
                (3, 'admin_key_555666777', 'Admin Key')
            ]
            
            cursor.executemany(
                "INSERT INTO api_keys (user_id, api_key, name) VALUES (?, ?, ?)",
                api_keys
            )
            
            # Sample tasks
            sample_tasks = [
                ('Complete project documentation', 'Write comprehensive docs for the API', 0, 'high', '2023-12-31', 1),
                ('Fix bug in user authentication', 'Resolve login issues reported by users', 1, 'critical', '2023-12-15', 1),
                ('Implement new feature', 'Add search functionality to products', 0, 'medium', '2024-01-15', 2),
                ('Code review', 'Review pull requests from team members', 0, 'low', '2023-12-20', 2)
            ]
            
            cursor.executemany(
                "INSERT INTO tasks (title, description, completed, priority, due_date, user_id) VALUES (?, ?, ?, ?, ?, ?)",
                sample_tasks
            )
            
            # Sample products
            sample_products = [
                ('Laptop', 'High-performance laptop for developers', 999.99, 'Electronics', 10),
                ('Smartphone', 'Latest model smartphone with advanced features', 599.99, 'Electronics', 25),
                ('Coffee Mug', 'Programmer-themed coffee mug', 14.99, 'Accessories', 50),
                ('Mechanical Keyboard', 'RGB mechanical keyboard for gaming', 149.99, 'Electronics', 15),
                ('Desk Lamp', 'Adjustable LED desk lamp', 39.99, 'Furniture', 30)
            ]
            
            cursor.executemany(
                "INSERT INTO products (name, description, price, category, stock_quantity) VALUES (?, ?, ?, ?, ?)",
                sample_products
            )
            
            conn.commit()
 
# Initialize database
db_manager = DatabaseManager(DATABASE)
 
def require_api_key(f):
    """Decorator to require API key authentication"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        api_key = request.headers.get('X-API-Key') or request.args.get('api_key')
        
        if not api_key:
            return jsonify({'error': 'API key required'}), 401
        
        # Validate API key
        with sqlite3.connect(DATABASE) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT ak.id, ak.user_id, u.username 
                FROM api_keys ak 
                JOIN users u ON ak.user_id = u.id 
                WHERE ak.api_key = ? AND ak.is_active = 1 AND u.is_active = 1
            ''', (api_key,))
            
            result = cursor.fetchone()
            
            if not result:
                log_api_request(request, api_key, 401)
                return jsonify({'error': 'Invalid API key'}), 401
            
            # Update last used timestamp
            cursor.execute(
                "UPDATE api_keys SET last_used = CURRENT_TIMESTAMP WHERE api_key = ?",
                (api_key,)
            )
            conn.commit()
        
        # Log successful request
        log_api_request(request, api_key, 200)
        
        # Add user info to request context
        request.current_user_id = result[1]
        request.current_username = result[2]
        
        return f(*args, **kwargs)
    
    return decorated_function
 
def log_api_request(request_obj, api_key, response_code):
    """Log API request for monitoring"""
    try:
        with sqlite3.connect(DATABASE) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO api_logs (endpoint, method, ip_address, user_agent, api_key, response_code)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                request_obj.endpoint,
                request_obj.method,
                request_obj.remote_addr,
                request_obj.headers.get('User-Agent', ''),
                api_key,
                response_code
            ))
            conn.commit()
    except Exception as e:
        logger.error(f"Error logging API request: {e}")
 
def get_db_connection():
    """Get database connection with row factory"""
    conn = sqlite3.connect(DATABASE)
    conn.row_factory = sqlite3.Row
    return conn
 
# API Documentation endpoint
@app.route('/')
def api_documentation():
    """API Documentation"""
    docs = {
        "title": "Simple Flask REST API",
        "version": "1.0.0",
        "description": "A demonstration REST API with CRUD operations",
        "base_url": request.base_url,
        "authentication": {
            "type": "API Key",
            "header": "X-API-Key",
            "demo_keys": [
                "demo_key_123456789",
                "test_key_987654321",
                "admin_key_555666777"
            ]
        },
        "endpoints": {
            "Tasks": {
                "GET /api/tasks": "Get all tasks",
                "GET /api/tasks/<id>": "Get specific task",
                "POST /api/tasks": "Create new task",
                "PUT /api/tasks/<id>": "Update task",
                "DELETE /api/tasks/<id>": "Delete task"
            },
            "Products": {
                "GET /api/products": "Get all products",
                "GET /api/products/<id>": "Get specific product",
                "POST /api/products": "Create new product",
                "PUT /api/products/<id>": "Update product",
                "DELETE /api/products/<id>": "Delete product"
            },
            "Users": {
                "GET /api/users": "Get all users",
                "GET /api/users/<id>": "Get specific user",
                "POST /api/users": "Create new user"
            },
            "Statistics": {
                "GET /api/stats": "Get API usage statistics"
            }
        },
        "examples": {
            "create_task": {
                "method": "POST",
                "url": "/api/tasks",
                "headers": {"X-API-Key": "demo_key_123456789"},
                "body": {
                    "title": "New Task",
                    "description": "Task description",
                    "priority": "high",
                    "due_date": "2023-12-31"
                }
            }
        }
    }
    return jsonify(docs)
 
# Health check endpoint
@app.route('/health')
def health_check():
    """Health check endpoint"""
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'version': '1.0.0'
    })
 
# Tasks API endpoints
@app.route('/api/tasks', methods=['GET'])
@require_api_key
def get_tasks():
    """Get all tasks with optional filtering"""
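    # Clamp paging values so page and per_page can never be zero or negative
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = max(1, min(request.args.get('per_page', 10, type=int), 100))
    # type=bool would treat any non-empty string (even "false") as True
    completed = request.args.get(
        'completed', type=lambda v: v.lower() in ('1', 'true', 'yes')
    )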
    priority = request.args.get('priority')
    
    query = "SELECT * FROM tasks WHERE 1=1"
    params = []
    
    if completed is not None:
        query += " AND completed = ?"
        params.append(completed)
    
    if priority:
        query += " AND priority = ?"
        params.append(priority)
    
    query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
    params.extend([per_page, (page - 1) * per_page])
    
    with get_db_connection() as conn:
        tasks = conn.execute(query, params).fetchall()
        
        # Get total count
        count_query = "SELECT COUNT(*) FROM tasks WHERE 1=1"
        count_params = []
        if completed is not None:
            count_query += " AND completed = ?"
            count_params.append(completed)
        if priority:
            count_query += " AND priority = ?"
            count_params.append(priority)
        
        total = conn.execute(count_query, count_params).fetchone()[0]
    
    return jsonify({
        'tasks': [dict(task) for task in tasks],
        'pagination': {
            'page': page,
            'per_page': per_page,
            'total': total,
            'pages': (total + per_page - 1) // per_page
        }
    })
 
@app.route('/api/tasks/<int:task_id>', methods=['GET'])
@require_api_key
def get_task(task_id):
    """Get specific task"""
    with get_db_connection() as conn:
        task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
    
    if task is None:
        abort(404)
    
    return jsonify(dict(task))
 
@app.route('/api/tasks', methods=['POST'])
@require_api_key
def create_task():
    """Create new task"""
    data = request.get_json()
    
    if not data or 'title' not in data:
        return jsonify({'error': 'Title is required'}), 400
    
    title = data['title']
    description = data.get('description', '')
    priority = data.get('priority', 'medium')
    due_date = data.get('due_date')
    user_id = request.current_user_id
    
    # Validate priority
    if priority not in ['low', 'medium', 'high', 'critical']:
        return jsonify({'error': 'Invalid priority. Use: low, medium, high, critical'}), 400
    
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO tasks (title, description, priority, due_date, user_id)
            VALUES (?, ?, ?, ?, ?)
        ''', (title, description, priority, due_date, user_id))
        
        task_id = cursor.lastrowid
        conn.commit()
        
        # Get the created task
        task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
    
    return jsonify(dict(task)), 201
 
@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
@require_api_key
def update_task(task_id):
    """Update task"""
    data = request.get_json()
    
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    
    # Check if task exists
    with get_db_connection() as conn:
        existing_task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
        
        if existing_task is None:
            abort(404)
        
        # Build update query
        update_fields = []
        params = []
        
        for field in ['title', 'description', 'completed', 'priority', 'due_date']:
            if field in data:
                update_fields.append(f"{field} = ?")
                params.append(data[field])
        
        if update_fields:
            update_fields.append("updated_at = CURRENT_TIMESTAMP")
            params.append(task_id)
            
            query = f"UPDATE tasks SET {', '.join(update_fields)} WHERE id = ?"
            conn.execute(query, params)
            conn.commit()
        
        # Get updated task
        updated_task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
    
    return jsonify(dict(updated_task))
 
@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
@require_api_key
def delete_task(task_id):
    """Delete task"""
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
        
        if cursor.rowcount == 0:
            abort(404)
        
        conn.commit()
    
    return '', 204
 
# Products API endpoints
@app.route('/api/products', methods=['GET'])
@require_api_key
def get_products():
    """Get all products with optional filtering"""
    category = request.args.get('category')
    min_price = request.args.get('min_price', type=float)
    max_price = request.args.get('max_price', type=float)
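    # type=bool would treat any non-empty string (even "false") as True
    in_stock = request.args.get(
        'in_stock', type=lambda v: v.lower() in ('1', 'true', 'yes')
    )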
    
    query = "SELECT * FROM products WHERE 1=1"
    params = []
    
    if category:
        query += " AND category = ?"
        params.append(category)
    
    if min_price is not None:
        query += " AND price >= ?"
        params.append(min_price)
    
    if max_price is not None:
        query += " AND price <= ?"
        params.append(max_price)
    
    if in_stock is not None:
        if in_stock:
            query += " AND stock_quantity > 0"
        else:
            query += " AND stock_quantity = 0"
    
    query += " ORDER BY name"
    
    with get_db_connection() as conn:
        products = conn.execute(query, params).fetchall()
    
    return jsonify({
        'products': [dict(product) for product in products]
    })
 
@app.route('/api/products/<int:product_id>', methods=['GET'])
@require_api_key
def get_product(product_id):
    """Get specific product"""
    with get_db_connection() as conn:
        product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
    
    if product is None:
        abort(404)
    
    return jsonify(dict(product))
 
@app.route('/api/products', methods=['POST'])
@require_api_key
def create_product():
    """Create new product"""
    data = request.get_json()
    
    required_fields = ['name', 'price']
    for field in required_fields:
        # Guard against an empty or null JSON body before checking fields
        if not data or field not in data:
            return jsonify({'error': f'{field} is required'}), 400
    
    name = data['name']
    description = data.get('description', '')
    price = data['price']
    category = data.get('category', '')
    stock_quantity = data.get('stock_quantity', 0)
    
    # Validate price
    try:
        price = float(price)
        if price < 0:
            raise ValueError()
    except (ValueError, TypeError):
        return jsonify({'error': 'Price must be a valid non-negative number'}), 400
    
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO products (name, description, price, category, stock_quantity)
            VALUES (?, ?, ?, ?, ?)
        ''', (name, description, price, category, stock_quantity))
        
        product_id = cursor.lastrowid
        conn.commit()
        
        # Get the created product
        product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
    
    return jsonify(dict(product)), 201
 
@app.route('/api/products/<int:product_id>', methods=['PUT'])
@require_api_key
def update_product(product_id):
    """Update product"""
    data = request.get_json()
    
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    
    with get_db_connection() as conn:
        existing_product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
        
        if existing_product is None:
            abort(404)
        
        # Build update query
        update_fields = []
        params = []
        
        for field in ['name', 'description', 'price', 'category', 'stock_quantity']:
            if field in data:
                update_fields.append(f"{field} = ?")
                params.append(data[field])
        
        if update_fields:
            update_fields.append("updated_at = CURRENT_TIMESTAMP")
            params.append(product_id)
            
            query = f"UPDATE products SET {', '.join(update_fields)} WHERE id = ?"
            conn.execute(query, params)
            conn.commit()
        
        # Get updated product
        updated_product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
    
    return jsonify(dict(updated_product))
 
@app.route('/api/products/<int:product_id>', methods=['DELETE'])
@require_api_key
def delete_product(product_id):
    """Delete product"""
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM products WHERE id = ?", (product_id,))
        
        if cursor.rowcount == 0:
            abort(404)
        
        conn.commit()
    
    return '', 204
 
# Users API endpoints
@app.route('/api/users', methods=['GET'])
@require_api_key
def get_users():
    """Get all users (limited info)"""
    with get_db_connection() as conn:
        users = conn.execute(
            "SELECT id, username, email, created_at, is_active FROM users ORDER BY created_at DESC"
        ).fetchall()
    
    return jsonify({
        'users': [dict(user) for user in users]
    })
 
@app.route('/api/users/<int:user_id>', methods=['GET'])
@require_api_key
def get_user(user_id):
    """Get specific user"""
    with get_db_connection() as conn:
        user = conn.execute(
            "SELECT id, username, email, created_at, is_active FROM users WHERE id = ?",
            (user_id,)
        ).fetchone()
    
    if user is None:
        abort(404)
    
    return jsonify(dict(user))
 
@app.route('/api/users', methods=['POST'])
@require_api_key
def create_user():
    """Create new user"""
    data = request.get_json()
    
    required_fields = ['username', 'email', 'password']
    for field in required_fields:
        # Guard against an empty or null JSON body before checking fields
        if not data or field not in data:
            return jsonify({'error': f'{field} is required'}), 400
    
    username = data['username']
    email = data['email']
    password = data['password']
    
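    # Hash password (unsalted SHA-256 is kept here for demonstration only;
    # production code should use a salted, slow hash such as PBKDF2 or bcrypt)
    password_hash = hashlib.sha256(password.encode()).hexdigest()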
    
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO users (username, email, password_hash)
                VALUES (?, ?, ?)
            ''', (username, email, password_hash))
            
            user_id = cursor.lastrowid
            conn.commit()
            
            # Get the created user (without password)
            user = conn.execute(
                "SELECT id, username, email, created_at, is_active FROM users WHERE id = ?",
                (user_id,)
            ).fetchone()
        
        return jsonify(dict(user)), 201
    
    except sqlite3.IntegrityError as e:
        if 'username' in str(e):
            return jsonify({'error': 'Username already exists'}), 400
        elif 'email' in str(e):
            return jsonify({'error': 'Email already exists'}), 400
        else:
            return jsonify({'error': 'Database constraint violation'}), 400
 
# Statistics endpoint
@app.route('/api/stats', methods=['GET'])
@require_api_key
def get_statistics():
    """Get API usage statistics"""
    with get_db_connection() as conn:
        # Basic counts
        task_count = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
        product_count = conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]
        user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
        
        # API usage stats
        total_requests = conn.execute("SELECT COUNT(*) FROM api_logs").fetchone()[0]
        
        # Recent activity (last 24 hours); compare inside SQLite because
        # CURRENT_TIMESTAMP is stored as UTC 'YYYY-MM-DD HH:MM:SS' text
        recent_requests = conn.execute(
            "SELECT COUNT(*) FROM api_logs WHERE timestamp > datetime('now', '-1 day')"
        ).fetchone()[0]
        
        # Top endpoints
        top_endpoints = conn.execute('''
            SELECT endpoint, COUNT(*) as count 
            FROM api_logs 
            GROUP BY endpoint 
            ORDER BY count DESC 
            LIMIT 5
        ''').fetchall()
        
        # Response code distribution
        response_codes = conn.execute('''
            SELECT response_code, COUNT(*) as count 
            FROM api_logs 
            GROUP BY response_code 
            ORDER BY response_code
        ''').fetchall()
    
    return jsonify({
        'database_stats': {
            'tasks': task_count,
            'products': product_count,
            'users': user_count
        },
        'api_usage': {
            'total_requests': total_requests,
            'requests_last_24h': recent_requests,
            'top_endpoints': [dict(row) for row in top_endpoints],
            'response_codes': [dict(row) for row in response_codes]
        },
        'timestamp': datetime.now().isoformat()
    })
 
# Error handlers
@app.errorhandler(404)
def not_found(error):
    return jsonify({'error': 'Resource not found'}), 404
 
@app.errorhandler(400)
def bad_request(error):
    return jsonify({'error': 'Bad request'}), 400
 
@app.errorhandler(500)
def internal_error(error):
    return jsonify({'error': 'Internal server error'}), 500
 
def main():
    """Main function to run the Flask REST API"""
    print("Simple Flask REST API")
    print("=====================")
    print("\nAPI Documentation: http://localhost:5000/")
    print("Health Check: http://localhost:5000/health")
    print("\nDemo API Keys:")
    print("- demo_key_123456789")
    print("- test_key_987654321") 
    print("- admin_key_555666777")
    print("\nExample usage:")
    print("curl -H 'X-API-Key: demo_key_123456789' http://localhost:5000/api/tasks")
    print("\nStarting server...")
    
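    # debug=True enables Werkzeug's interactive debugger, which allows code
    # execution, so bind to localhost rather than all interfaces.
    app.run(debug=True, host='127.0.0.1', port=5000)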
 
if __name__ == "__main__":
    main()
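The listing above stores passwords as unsalted SHA-256 digests, which is quick to brute-force if the database leaks. As a hedged alternative, the sketch below uses salted PBKDF2 from the standard library. The function names, the storage format, and the iteration count are assumptions chosen for illustration, and the resulting string still fits the password_hash VARCHAR(255) column.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 200_000  # assumed work factor; tune it for your hardware


def hash_password(password: str) -> str:
    """Return 'scheme$iterations$salt$digest' using a fresh random salt."""
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, ITERATIONS
    )
    return f"pbkdf2_sha256${ITERATIONS}${salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    try:
        scheme, iterations, salt_hex, digest_hex = stored.split("$")
        if scheme != "pbkdf2_sha256":
            return False
        expected = bytes.fromhex(digest_hex)
        candidate = hashlib.pbkdf2_hmac(
            "sha256",
            password.encode("utf-8"),
            bytes.fromhex(salt_hex),
            int(iterations),
        )
    except ValueError:
        # Malformed stored value: treat it as a failed check
        return False
    return hmac.compare_digest(candidate, expected)


if __name__ == "__main__":
    stored = hash_password("password123")
    print(verify_password("password123", stored))
    print(verify_password("wrong-password", stored))
```

Because each call draws a new salt, hashing the same password twice yields different stored strings, so identical passwords are not visible as identical rows in the users table.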
 
 
def require_api_key(f):
    """Decorator to require API key authentication"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        api_key = request.headers.get('X-API-Key') or request.args.get('api_key')
        
        if not api_key:
            return jsonify({'error': 'API key required'}), 401
        
        # Validate API key
        with sqlite3.connect(DATABASE) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT ak.id, ak.user_id, u.username 
                FROM api_keys ak 
                JOIN users u ON ak.user_id = u.id 
                WHERE ak.api_key = ? AND ak.is_active = 1 AND u.is_active = 1
            ''', (api_key,))
            
            result = cursor.fetchone()
            
            if not result:
                log_api_request(request, api_key, 401)
                return jsonify({'error': 'Invalid API key'}), 401
            
            # Update last used timestamp
            cursor.execute(
                "UPDATE api_keys SET last_used = CURRENT_TIMESTAMP WHERE api_key = ?",
                (api_key,)
            )
            conn.commit()
        
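        # Log the authenticated request. The 200 is recorded before the view runs,
        # so a later 400 or 404 from the view is not reflected in api_logs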
        log_api_request(request, api_key, 200)
        
        # Add user info to request context
        request.current_user_id = result[1]
        request.current_username = result[2]
        
        return f(*args, **kwargs)
    
    return decorated_function
 
def log_api_request(request_obj, api_key, response_code):
    """Log API request for monitoring"""
    try:
        with sqlite3.connect(DATABASE) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO api_logs (endpoint, method, ip_address, user_agent, api_key, response_code)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                request_obj.endpoint,
                request_obj.method,
                request_obj.remote_addr,
                request_obj.headers.get('User-Agent', ''),
                api_key,
                response_code
            ))
            conn.commit()
    except Exception as e:
        logger.error(f"Error logging API request: {e}")
 
def get_db_connection():
    """Get database connection with row factory"""
    conn = sqlite3.connect(DATABASE)
    conn.row_factory = sqlite3.Row
    return conn
 
# API Documentation endpoint
@app.route('/')
def api_documentation():
    """API Documentation"""
    docs = {
        "title": "Simple Flask REST API",
        "version": "1.0.0",
        "description": "A demonstration REST API with CRUD operations",
        "base_url": request.base_url,
        "authentication": {
            "type": "API Key",
            "header": "X-API-Key",
            "demo_keys": [
                "demo_key_123456789",
                "test_key_987654321",
                "admin_key_555666777"
            ]
        },
        "endpoints": {
            "Tasks": {
                "GET /api/tasks": "Get all tasks",
                "GET /api/tasks/<id>": "Get specific task",
                "POST /api/tasks": "Create new task",
                "PUT /api/tasks/<id>": "Update task",
                "DELETE /api/tasks/<id>": "Delete task"
            },
            "Products": {
                "GET /api/products": "Get all products",
                "GET /api/products/<id>": "Get specific product",
                "POST /api/products": "Create new product",
                "PUT /api/products/<id>": "Update product",
                "DELETE /api/products/<id>": "Delete product"
            },
            "Users": {
                "GET /api/users": "Get all users",
                "GET /api/users/<id>": "Get specific user",
                "POST /api/users": "Create new user"
            },
            "Statistics": {
                "GET /api/stats": "Get API usage statistics"
            }
        },
        "examples": {
            "create_task": {
                "method": "POST",
                "url": "/api/tasks",
                "headers": {"X-API-Key": "demo_key_123456789"},
                "body": {
                    "title": "New Task",
                    "description": "Task description",
                    "priority": "high",
                    "due_date": "2023-12-31"
                }
            }
        }
    }
    return jsonify(docs)
 
# Health check endpoint
@app.route('/health')
def health_check():
    """Health check endpoint"""
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'version': '1.0.0'
    })
 
# Tasks API endpoints
@app.route('/api/tasks', methods=['GET'])
@require_api_key
def get_tasks():
    """Get all tasks with optional filtering"""
    page = request.args.get('page', 1, type=int)
    per_page = min(request.args.get('per_page', 10, type=int), 100)
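    # type=bool would turn any non-empty string (including "false") into True,
    # so map the raw query value explicitly; unrecognised values mean "no filter"
    completed_arg = request.args.get('completed', '').lower()
    completed = {'true': True, '1': True, 'false': False, '0': False}.get(completed_arg)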
    priority = request.args.get('priority')
    
    query = "SELECT * FROM tasks WHERE 1=1"
    params = []
    
    if completed is not None:
        query += " AND completed = ?"
        params.append(completed)
    
    if priority:
        query += " AND priority = ?"
        params.append(priority)
    
    query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
    params.extend([per_page, (page - 1) * per_page])
    
    with get_db_connection() as conn:
        tasks = conn.execute(query, params).fetchall()
        
        # Get total count
        count_query = "SELECT COUNT(*) FROM tasks WHERE 1=1"
        count_params = []
        if completed is not None:
            count_query += " AND completed = ?"
            count_params.append(completed)
        if priority:
            count_query += " AND priority = ?"
            count_params.append(priority)
        
        total = conn.execute(count_query, count_params).fetchone()[0]
    
    return jsonify({
        'tasks': [dict(task) for task in tasks],
        'pagination': {
            'page': page,
            'per_page': per_page,
            'total': total,
            'pages': (total + per_page - 1) // per_page
        }
    })
 
@app.route('/api/tasks/<int:task_id>', methods=['GET'])
@require_api_key
def get_task(task_id):
    """Get specific task"""
    with get_db_connection() as conn:
        task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
    
    if task is None:
        abort(404)
    
    return jsonify(dict(task))
 
@app.route('/api/tasks', methods=['POST'])
@require_api_key
def create_task():
    """Create new task"""
    data = request.get_json()
    
    if not data or 'title' not in data:
        return jsonify({'error': 'Title is required'}), 400
    
    title = data['title']
    description = data.get('description', '')
    priority = data.get('priority', 'medium')
    due_date = data.get('due_date')
    user_id = request.current_user_id
    
    # Validate priority
    if priority not in ['low', 'medium', 'high', 'critical']:
        return jsonify({'error': 'Invalid priority. Use: low, medium, high, critical'}), 400
    
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO tasks (title, description, priority, due_date, user_id)
            VALUES (?, ?, ?, ?, ?)
        ''', (title, description, priority, due_date, user_id))
        
        task_id = cursor.lastrowid
        conn.commit()
        
        # Get the created task
        task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
    
    return jsonify(dict(task)), 201
 
@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
@require_api_key
def update_task(task_id):
    """Update task"""
    data = request.get_json()
    
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    
    # Check if task exists
    with get_db_connection() as conn:
        existing_task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
        
        if existing_task is None:
            abort(404)
        
        # Build update query
        update_fields = []
        params = []
        
        for field in ['title', 'description', 'completed', 'priority', 'due_date']:
            if field in data:
                update_fields.append(f"{field} = ?")
                params.append(data[field])
        
        if update_fields:
            update_fields.append("updated_at = CURRENT_TIMESTAMP")
            params.append(task_id)
            
            query = f"UPDATE tasks SET {', '.join(update_fields)} WHERE id = ?"
            conn.execute(query, params)
            conn.commit()
        
        # Get updated task
        updated_task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
    
    return jsonify(dict(updated_task))
 
@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
@require_api_key
def delete_task(task_id):
    """Delete task"""
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
        
        if cursor.rowcount == 0:
            abort(404)
        
        conn.commit()
    
    return '', 204
 
# Products API endpoints
@app.route('/api/products', methods=['GET'])
@require_api_key
def get_products():
    """Get all products with optional filtering"""
    category = request.args.get('category')
    min_price = request.args.get('min_price', type=float)
    max_price = request.args.get('max_price', type=float)
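    # type=bool would turn any non-empty string (including "false") into True,
    # so map the raw query value explicitly; unrecognised values mean "no filter"
    in_stock_arg = request.args.get('in_stock', '').lower()
    in_stock = {'true': True, '1': True, 'false': False, '0': False}.get(in_stock_arg)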
    
    query = "SELECT * FROM products WHERE 1=1"
    params = []
    
    if category:
        query += " AND category = ?"
        params.append(category)
    
    if min_price is not None:
        query += " AND price >= ?"
        params.append(min_price)
    
    if max_price is not None:
        query += " AND price <= ?"
        params.append(max_price)
    
    if in_stock is not None:
        if in_stock:
            query += " AND stock_quantity > 0"
        else:
            query += " AND stock_quantity = 0"
    
    query += " ORDER BY name"
    
    with get_db_connection() as conn:
        products = conn.execute(query, params).fetchall()
    
    return jsonify({
        'products': [dict(product) for product in products]
    })
 
@app.route('/api/products/<int:product_id>', methods=['GET'])
@require_api_key
def get_product(product_id):
    """Get specific product"""
    with get_db_connection() as conn:
        product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
    
    if product is None:
        abort(404)
    
    return jsonify(dict(product))
 
@app.route('/api/products', methods=['POST'])
@require_api_key
def create_product():
    """Create new product"""
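    data = request.get_json()
    
    # Guard against an empty body before checking individual fields
    if not data:
        return jsonify({'error': 'No data provided'}), 400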
    
    required_fields = ['name', 'price']
    for field in required_fields:
        if field not in data:
            return jsonify({'error': f'{field} is required'}), 400
    
    name = data['name']
    description = data.get('description', '')
    price = data['price']
    category = data.get('category', '')
    stock_quantity = data.get('stock_quantity', 0)
    
    # Validate price
    try:
        price = float(price)
        if price < 0:
            raise ValueError()
    except (ValueError, TypeError):
        return jsonify({'error': 'Price must be a valid positive number'}), 400
    
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO products (name, description, price, category, stock_quantity)
            VALUES (?, ?, ?, ?, ?)
        ''', (name, description, price, category, stock_quantity))
        
        product_id = cursor.lastrowid
        conn.commit()
        
        # Get the created product
        product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
    
    return jsonify(dict(product)), 201
 
@app.route('/api/products/<int:product_id>', methods=['PUT'])
@require_api_key
def update_product(product_id):
    """Update product"""
    data = request.get_json()
    
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    
    with get_db_connection() as conn:
        existing_product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
        
        if existing_product is None:
            abort(404)
        
        # Build update query
        update_fields = []
        params = []
        
        for field in ['name', 'description', 'price', 'category', 'stock_quantity']:
            if field in data:
                update_fields.append(f"{field} = ?")
                params.append(data[field])
        
        if update_fields:
            update_fields.append("updated_at = CURRENT_TIMESTAMP")
            params.append(product_id)
            
            query = f"UPDATE products SET {', '.join(update_fields)} WHERE id = ?"
            conn.execute(query, params)
            conn.commit()
        
        # Get updated product
        updated_product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
    
    return jsonify(dict(updated_product))
 
@app.route('/api/products/<int:product_id>', methods=['DELETE'])
@require_api_key
def delete_product(product_id):
    """Delete product"""
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM products WHERE id = ?", (product_id,))
        
        if cursor.rowcount == 0:
            abort(404)
        
        conn.commit()
    
    return '', 204
 
# Users API endpoints
@app.route('/api/users', methods=['GET'])
@require_api_key
def get_users():
    """Get all users (limited info)"""
    with get_db_connection() as conn:
        users = conn.execute(
            "SELECT id, username, email, created_at, is_active FROM users ORDER BY created_at DESC"
        ).fetchall()
    
    return jsonify({
        'users': [dict(user) for user in users]
    })
 
@app.route('/api/users/<int:user_id>', methods=['GET'])
@require_api_key
def get_user(user_id):
    """Get specific user"""
    with get_db_connection() as conn:
        user = conn.execute(
            "SELECT id, username, email, created_at, is_active FROM users WHERE id = ?",
            (user_id,)
        ).fetchone()
    
    if user is None:
        abort(404)
    
    return jsonify(dict(user))
 
@app.route('/api/users', methods=['POST'])
@require_api_key
def create_user():
    """Create new user"""
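    data = request.get_json()
    
    # Guard against an empty body before checking individual fields
    if not data:
        return jsonify({'error': 'No data provided'}), 400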
    
    required_fields = ['username', 'email', 'password']
    for field in required_fields:
        if field not in data:
            return jsonify({'error': f'{field} is required'}), 400
    
    username = data['username']
    email = data['email']
    password = data['password']
    
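    # Hash password (unsalted SHA-256 keeps the demo dependency-free; real
    # credentials need a salted, slow hash such as PBKDF2 or bcrypt)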
    password_hash = hashlib.sha256(password.encode()).hexdigest()
    
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO users (username, email, password_hash)
                VALUES (?, ?, ?)
            ''', (username, email, password_hash))
            
            user_id = cursor.lastrowid
            conn.commit()
            
            # Get the created user (without password)
            user = conn.execute(
                "SELECT id, username, email, created_at, is_active FROM users WHERE id = ?",
                (user_id,)
            ).fetchone()
        
        return jsonify(dict(user)), 201
    
    except sqlite3.IntegrityError as e:
        if 'username' in str(e):
            return jsonify({'error': 'Username already exists'}), 400
        elif 'email' in str(e):
            return jsonify({'error': 'Email already exists'}), 400
        else:
            return jsonify({'error': 'Database constraint violation'}), 400
 
# Statistics endpoint
@app.route('/api/stats', methods=['GET'])
@require_api_key
def get_statistics():
    """Get API usage statistics"""
    with get_db_connection() as conn:
        # Basic counts
        task_count = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
        product_count = conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]
        user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
        
        # API usage stats
        total_requests = conn.execute("SELECT COUNT(*) FROM api_logs").fetchone()[0]
        
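        # Recent activity (last 24 hours). CURRENT_TIMESTAMP is stored as UTC in
        # 'YYYY-MM-DD HH:MM:SS' form, so the cutoff is computed inside SQLite
        # instead of being compared against a local-time ISO string
        recent_requests = conn.execute(
            "SELECT COUNT(*) FROM api_logs WHERE timestamp > datetime('now', '-1 day')"
        ).fetchone()[0]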
        
        # Top endpoints
        top_endpoints = conn.execute('''
            SELECT endpoint, COUNT(*) as count 
            FROM api_logs 
            GROUP BY endpoint 
            ORDER BY count DESC 
            LIMIT 5
        ''').fetchall()
        
        # Response code distribution
        response_codes = conn.execute('''
            SELECT response_code, COUNT(*) as count 
            FROM api_logs 
            GROUP BY response_code 
            ORDER BY response_code
        ''').fetchall()
    
    return jsonify({
        'database_stats': {
            'tasks': task_count,
            'products': product_count,
            'users': user_count
        },
        'api_usage': {
            'total_requests': total_requests,
            'requests_last_24h': recent_requests,
            'top_endpoints': [dict(row) for row in top_endpoints],
            'response_codes': [dict(row) for row in response_codes]
        },
        'timestamp': datetime.now().isoformat()
    })
 
# Error handlers
@app.errorhandler(404)
def not_found(error):
    return jsonify({'error': 'Resource not found'}), 404
 
@app.errorhandler(400)
def bad_request(error):
    return jsonify({'error': 'Bad request'}), 400
 
@app.errorhandler(500)
def internal_error(error):
    return jsonify({'error': 'Internal server error'}), 500
 
def main():
    """Main function to run the Flask REST API"""
    print("Simple Flask REST API")
    print("=====================")
    print("\nAPI Documentation: http://localhost:5000/")
    print("Health Check: http://localhost:5000/health")
    print("\nDemo API Keys:")
    print("- demo_key_123456789")
    print("- test_key_987654321") 
    print("- admin_key_555666777")
    print("\nExample usage:")
    print("curl -H 'X-API-Key: demo_key_123456789' http://localhost:5000/api/tasks")
    print("\nStarting server...")
    
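    # debug=True enables the interactive debugger and host='0.0.0.0' listens on
    # every network interface; use this combination only on a trusted dev machine
    app.run(debug=True, host='0.0.0.0', port=5000)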
 
if __name__ == "__main__":
    main()
 
  1. Save the file.
  2. Run the following command to start the API server.
command
C:\Users\username\Documents\restAPIAuth> python restapi.py
Simple Flask REST API
=====================

API Documentation: http://localhost:5000/
Health Check: http://localhost:5000/health

Demo API Keys:
- demo_key_123456789
- test_key_987654321
- admin_key_555666777

Example usage:
curl -H 'X-API-Key: demo_key_123456789' http://localhost:5000/api/tasks

Starting server...
 * Running on http://127.0.0.1:5000
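
Once the server is running, the API can also be called from Python instead of curl. The sketch below uses only the standard library and assumes the server listens on localhost:5000 with the demo key that the listing seeds. The helper names `build_create_task_request` and `send` are made up for this illustration.

```python
import json
import urllib.request

BASE_URL = "http://localhost:5000"   # assumed address of the local server
API_KEY = "demo_key_123456789"       # demo key seeded by insert_sample_data()


def build_create_task_request(title, priority="medium"):
    """Build (but do not send) a POST /api/tasks request carrying the API key."""
    body = json.dumps({"title": title, "priority": priority}).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/api/tasks",
        data=body,
        method="POST",
        headers={"X-API-Key": API_KEY, "Content-Type": "application/json"},
    )


def send(req):
    """Send the request to a running server and return (status, decoded JSON)."""
    with urllib.request.urlopen(req) as resp:
        return resp.status, json.loads(resp.read().decode("utf-8"))
```

With the server up, `send(build_create_task_request("Write tests", "high"))` should come back with status 201 and the stored task, since `create_task` returns the new row together with that status code.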

Explanation

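  1. The require_api_key decorator authenticates each request: it reads the key from the X-API-Key header (or the api_key query parameter) and returns 401 if the key is missing or invalid. The listing uses API keys rather than flask_jwt_extended tokens.
  2. The standard-library sqlite3 module handles persistence. DatabaseManager creates the users, api_keys, tasks, products, and api_logs tables and seeds them with sample data on first run.
  3. POST /api/users requires username, email, and password, and stores a hash of the password instead of the plain text.
  4. A valid key attaches current_user_id and current_username to the request, so handlers such as create_task can record which user owns a new row.
  5. Every /api/... route is wrapped in @require_api_key; only / and /health are reachable without a key.
  6. All active keys carry the same privileges. Restricting a route to administrators would need an extra role check, which the listing leaves out.
  7. Passwords are hashed with hashlib.sha256 before storage. Unsalted SHA-256 is fast to brute-force, so a salted, deliberately slow scheme (PBKDF2, bcrypt, Argon2) is the better choice for real credentials.
  8. Key validation joins api_keys with users, so a key stops working as soon as either the key or its owner is marked inactive.
  9. CRUD operations cover tasks and products (GET, POST, PUT, DELETE); users can be listed and created.
  10. Error handling returns JSON bodies with matching HTTP status codes: 400 for bad input, 401 for authentication failures, 404 for missing resources, and 500 for server errors.
  11. Input validation checks required fields, allowed priority values, and non-negative prices, and every SQL statement passes values through ? placeholders rather than string formatting.
  12. Each request that presents a key is written to api_logs, which feeds GET /api/stats. The listing does not throttle requests; rate limiting would have to be added on top of this log or with an extension.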
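
To make the password-hashing point concrete, here is a minimal sketch of salted hashing using only the standard library. It is not what the listing does (the listing calls `hashlib.sha256` directly); the `salt$hash` storage format, the iteration count, and the function names are assumptions chosen for illustration.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 200_000  # assumed work factor for this sketch; tune for your hardware


def hash_password(password: str) -> str:
    """Return 'salt$hash' (both hex) using PBKDF2-HMAC-SHA256 with a random salt."""
    salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(digest.hex(), digest_hex)
```

Because every call draws a fresh salt, two users with the same password end up with different stored values, which is what defeats precomputed lookup tables.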

Next Steps

Congratulations! You have successfully created a REST API with JWT Authentication in Python. Experiment with the code and see if you can modify the application. Here are a few suggestions:

  • Add refresh token functionality for extended sessions
  • Implement OAuth2 integration with Google/Facebook
  • Create comprehensive API documentation with Swagger
  • Add email verification for user registration
  • Implement password recovery and reset features
  • Create API versioning strategies
  • Add comprehensive logging and monitoring
  • Implement database migrations and backups
  • Create automated testing suites
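
As a starting point for the refresh-token suggestion, the sketch below shows how an HS256-signed JWT can be created, verified, and exchanged for a new access token. It is written from scratch with the standard library purely to show the mechanics; a real project would normally rely on a maintained library such as flask-jwt-extended. The `type` claim, the lifetimes, and all function names are assumptions of this sketch.

```python
import base64
import hashlib
import hmac
import json
import time

ACCESS_TTL = 15 * 60            # assumed access-token lifetime in seconds
REFRESH_TTL = 30 * 24 * 3600    # assumed refresh-token lifetime in seconds


def _b64url_encode(raw: bytes) -> str:
    """Base64url without padding, as used by JWT."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: bytes) -> bytes:
    return hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()


def create_token(subject: str, token_type: str, ttl_seconds: int, secret: bytes) -> str:
    """Return header.payload.signature with an expiry ttl_seconds from now."""
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {"sub": subject, "type": token_type, "exp": int(time.time()) + ttl_seconds}
    parts = [
        _b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    ]
    signing_input = ".".join(parts)
    return signing_input + "." + _b64url_encode(_sign(signing_input, secret))


def verify_token(token: str, expected_type: str, secret: bytes) -> dict:
    """Return the payload, or raise ValueError if the token is not acceptable."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        signature = _b64url_decode(signature_b64)
        # Only HS256 is ever computed here, whatever the header claims
        expected = _sign(f"{header_b64}.{payload_b64}", secret)
    except ValueError as exc:
        raise ValueError("malformed token") from exc
    if not hmac.compare_digest(expected, signature):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    if payload.get("type") != expected_type:
        raise ValueError("wrong token type")
    if payload.get("exp", 0) <= time.time():
        raise ValueError("token expired")
    return payload


def refresh_access_token(refresh_token: str, secret: bytes) -> str:
    """Exchange a valid refresh token for a new short-lived access token."""
    payload = verify_token(refresh_token, "refresh", secret)
    return create_token(payload["sub"], "access", ACCESS_TTL, secret)
```

Keeping the token type inside the signed payload means a long-lived refresh token cannot be replayed as an access token, and an access token cannot be used to mint new ones.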

Conclusion

In this project, you learned how to create a REST API with JWT Authentication in Python using Flask. You also learned about secure authentication, token management, API security, and implementing professional authentication systems. You can find the source code on GitHub.

The Complete Code

⚙️ REST API with JWT Authentication
# Simple Flask REST API
 
from flask import Flask, request, jsonify, abort
from flask_cors import CORS
import sqlite3
import json
from datetime import datetime, timedelta
import hashlib
import secrets
import os
from functools import wraps
import logging
 
app = Flask(__name__)
CORS(app)  # Enable CORS for all routes
 
# Configuration
app.config['SECRET_KEY'] = 'your-secret-key-here'
DATABASE = 'api_database.db'
 
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
 
class DatabaseManager:
    def __init__(self, db_path):
        self.db_path = db_path
        self.init_database()
    
    def init_database(self):
        """Initialize the database with required tables"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            
            # Users table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username VARCHAR(50) UNIQUE NOT NULL,
                    email VARCHAR(100) UNIQUE NOT NULL,
                    password_hash VARCHAR(255) NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    is_active BOOLEAN DEFAULT 1
                )
            ''')
            
            # API Keys table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS api_keys (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER,
                    api_key VARCHAR(255) UNIQUE NOT NULL,
                    name VARCHAR(100),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_used TIMESTAMP,
                    is_active BOOLEAN DEFAULT 1,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')
            
            # Tasks table (example resource)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS tasks (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title VARCHAR(200) NOT NULL,
                    description TEXT,
                    completed BOOLEAN DEFAULT 0,
                    priority VARCHAR(10) DEFAULT 'medium',
                    due_date DATE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    user_id INTEGER,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')
            
            # Products table (example resource)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS products (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name VARCHAR(200) NOT NULL,
                    description TEXT,
                    price DECIMAL(10,2) NOT NULL,
                    category VARCHAR(100),
                    stock_quantity INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            
            # API Logs table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS api_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    endpoint VARCHAR(200),
                    method VARCHAR(10),
                    ip_address VARCHAR(45),
                    user_agent TEXT,
                    api_key VARCHAR(255),
                    response_code INTEGER,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            
            conn.commit()
            
        # Insert sample data
        self.insert_sample_data()
    
    def insert_sample_data(self):
        """Insert sample data for demonstration"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            
            # Check if sample data already exists
            cursor.execute("SELECT COUNT(*) FROM users")
            if cursor.fetchone()[0] > 0:
                return
            
            # Sample users
            sample_users = [
                ('john_doe', 'john@example.com', hashlib.sha256('password123'.encode()).hexdigest()),
                ('jane_smith', 'jane@example.com', hashlib.sha256('password456'.encode()).hexdigest()),
                ('admin', 'admin@example.com', hashlib.sha256('admin123'.encode()).hexdigest())
            ]
            
            cursor.executemany(
                "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
                sample_users
            )
            
            # Sample API keys
            api_keys = [
                (1, 'demo_key_123456789', 'Demo Key'),
                (2, 'test_key_987654321', 'Test Key'),
                (3, 'admin_key_555666777', 'Admin Key')
            ]
            
            cursor.executemany(
                "INSERT INTO api_keys (user_id, api_key, name) VALUES (?, ?, ?)",
                api_keys
            )
            
            # Sample tasks
            sample_tasks = [
                ('Complete project documentation', 'Write comprehensive docs for the API', 0, 'high', '2023-12-31', 1),
                ('Fix bug in user authentication', 'Resolve login issues reported by users', 1, 'critical', '2023-12-15', 1),
                ('Implement new feature', 'Add search functionality to products', 0, 'medium', '2024-01-15', 2),
                ('Code review', 'Review pull requests from team members', 0, 'low', '2023-12-20', 2)
            ]
            
            cursor.executemany(
                "INSERT INTO tasks (title, description, completed, priority, due_date, user_id) VALUES (?, ?, ?, ?, ?, ?)",
                sample_tasks
            )
            
            # Sample products
            sample_products = [
                ('Laptop', 'High-performance laptop for developers', 999.99, 'Electronics', 10),
                ('Smartphone', 'Latest model smartphone with advanced features', 599.99, 'Electronics', 25),
                ('Coffee Mug', 'Programmer-themed coffee mug', 14.99, 'Accessories', 50),
                ('Mechanical Keyboard', 'RGB mechanical keyboard for gaming', 149.99, 'Electronics', 15),
                ('Desk Lamp', 'Adjustable LED desk lamp', 39.99, 'Furniture', 30)
            ]
            
            cursor.executemany(
                "INSERT INTO products (name, description, price, category, stock_quantity) VALUES (?, ?, ?, ?, ?)",
                sample_products
            )
            
            conn.commit()
 
# Initialize database
db_manager = DatabaseManager(DATABASE)
 
def require_api_key(f):
    """Decorator to require API key authentication"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        api_key = request.headers.get('X-API-Key') or request.args.get('api_key')
        
        if not api_key:
            return jsonify({'error': 'API key required'}), 401
        
        # Validate API key
        with sqlite3.connect(DATABASE) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT ak.id, ak.user_id, u.username 
                FROM api_keys ak 
                JOIN users u ON ak.user_id = u.id 
                WHERE ak.api_key = ? AND ak.is_active = 1 AND u.is_active = 1
            ''', (api_key,))
            
            result = cursor.fetchone()
            
            if not result:
                log_api_request(request, api_key, 401)
                return jsonify({'error': 'Invalid API key'}), 401
            
            # Update last used timestamp
            cursor.execute(
                "UPDATE api_keys SET last_used = CURRENT_TIMESTAMP WHERE api_key = ?",
                (api_key,)
            )
            conn.commit()
        
        # Log the authenticated request (recorded as 200 before the handler
        # runs, so the handler's actual status code may differ)
        log_api_request(request, api_key, 200)
        
        # Add user info to request context
        request.current_user_id = result[1]
        request.current_username = result[2]
        
        return f(*args, **kwargs)
    
    return decorated_function
 
def log_api_request(request_obj, api_key, response_code):
    """Log API request for monitoring"""
    try:
        with sqlite3.connect(DATABASE) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO api_logs (endpoint, method, ip_address, user_agent, api_key, response_code)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                request_obj.endpoint,
                request_obj.method,
                request_obj.remote_addr,
                request_obj.headers.get('User-Agent', ''),
                api_key,
                response_code
            ))
            conn.commit()
    except Exception as e:
        logger.error(f"Error logging API request: {e}")
 
def get_db_connection():
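    """Get database connection with row factory.

    Note: using the connection in a `with` block commits or rolls back
    the transaction but does not close the connection.
    """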
    conn = sqlite3.connect(DATABASE)
    conn.row_factory = sqlite3.Row
    return conn
 
# API Documentation endpoint
@app.route('/')
def api_documentation():
    """API Documentation"""
    docs = {
        "title": "Simple Flask REST API",
        "version": "1.0.0",
        "description": "A demonstration REST API with CRUD operations",
        "base_url": request.base_url,
        "authentication": {
            "type": "API Key",
            "header": "X-API-Key",
            "demo_keys": [
                "demo_key_123456789",
                "test_key_987654321",
                "admin_key_555666777"
            ]
        },
        "endpoints": {
            "Tasks": {
                "GET /api/tasks": "Get all tasks",
                "GET /api/tasks/<id>": "Get specific task",
                "POST /api/tasks": "Create new task",
                "PUT /api/tasks/<id>": "Update task",
                "DELETE /api/tasks/<id>": "Delete task"
            },
            "Products": {
                "GET /api/products": "Get all products",
                "GET /api/products/<id>": "Get specific product",
                "POST /api/products": "Create new product",
                "PUT /api/products/<id>": "Update product",
                "DELETE /api/products/<id>": "Delete product"
            },
            "Users": {
                "GET /api/users": "Get all users",
                "GET /api/users/<id>": "Get specific user",
                "POST /api/users": "Create new user"
            },
            "Statistics": {
                "GET /api/stats": "Get API usage statistics"
            }
        },
        "examples": {
            "create_task": {
                "method": "POST",
                "url": "/api/tasks",
                "headers": {"X-API-Key": "demo_key_123456789"},
                "body": {
                    "title": "New Task",
                    "description": "Task description",
                    "priority": "high",
                    "due_date": "2023-12-31"
                }
            }
        }
    }
    return jsonify(docs)
 
# Health check endpoint
@app.route('/health')
def health_check():
    """Health check endpoint"""
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'version': '1.0.0'
    })
 
# Tasks API endpoints
@app.route('/api/tasks', methods=['GET'])
@require_api_key
def get_tasks():
    """Get all tasks with optional filtering"""
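    # Clamp paging values so OFFSET is never negative and per_page is never 0
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = max(1, min(request.args.get('per_page', 10, type=int), 100))
    # type=bool would treat any non-empty string (even "false") as True
    completed = request.args.get(
        'completed', type=lambda v: v.lower() in ('1', 'true', 'yes')
    )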
    priority = request.args.get('priority')
    
    query = "SELECT * FROM tasks WHERE 1=1"
    params = []
    
    if completed is not None:
        query += " AND completed = ?"
        params.append(completed)
    
    if priority:
        query += " AND priority = ?"
        params.append(priority)
    
    query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
    params.extend([per_page, (page - 1) * per_page])
    
    with get_db_connection() as conn:
        tasks = conn.execute(query, params).fetchall()
        
        # Get total count
        count_query = "SELECT COUNT(*) FROM tasks WHERE 1=1"
        count_params = []
        if completed is not None:
            count_query += " AND completed = ?"
            count_params.append(completed)
        if priority:
            count_query += " AND priority = ?"
            count_params.append(priority)
        
        total = conn.execute(count_query, count_params).fetchone()[0]
    
    return jsonify({
        'tasks': [dict(task) for task in tasks],
        'pagination': {
            'page': page,
            'per_page': per_page,
            'total': total,
            'pages': (total + per_page - 1) // per_page
        }
    })
 
@app.route('/api/tasks/<int:task_id>', methods=['GET'])
@require_api_key
def get_task(task_id):
    """Get specific task"""
    with get_db_connection() as conn:
        task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
    
    if task is None:
        abort(404)
    
    return jsonify(dict(task))
 
@app.route('/api/tasks', methods=['POST'])
@require_api_key
def create_task():
    """Create new task"""
    data = request.get_json()
    
    if not data or 'title' not in data:
        return jsonify({'error': 'Title is required'}), 400
    
    title = data['title']
    description = data.get('description', '')
    priority = data.get('priority', 'medium')
    due_date = data.get('due_date')
    user_id = request.current_user_id
    
    # Validate priority
    if priority not in ['low', 'medium', 'high', 'critical']:
        return jsonify({'error': 'Invalid priority. Use: low, medium, high, critical'}), 400
    
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO tasks (title, description, priority, due_date, user_id)
            VALUES (?, ?, ?, ?, ?)
        ''', (title, description, priority, due_date, user_id))
        
        task_id = cursor.lastrowid
        conn.commit()
        
        # Get the created task
        task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
    
    return jsonify(dict(task)), 201
 
@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
@require_api_key
def update_task(task_id):
    """Update task"""
    data = request.get_json()
    
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    
    # Check if task exists
    with get_db_connection() as conn:
        existing_task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
        
        if existing_task is None:
            abort(404)
        
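        # Validate priority the same way create_task does
        if 'priority' in data and data['priority'] not in ['low', 'medium', 'high', 'critical']:
            return jsonify({'error': 'Invalid priority. Use: low, medium, high, critical'}), 400

        # Build update query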
        update_fields = []
        params = []
        
        for field in ['title', 'description', 'completed', 'priority', 'due_date']:
            if field in data:
                update_fields.append(f"{field} = ?")
                params.append(data[field])
        
        if update_fields:
            update_fields.append("updated_at = CURRENT_TIMESTAMP")
            params.append(task_id)
            
            query = f"UPDATE tasks SET {', '.join(update_fields)} WHERE id = ?"
            conn.execute(query, params)
            conn.commit()
        
        # Get updated task
        updated_task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
    
    return jsonify(dict(updated_task))
 
@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
@require_api_key
def delete_task(task_id):
    """Delete task"""
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
        
        if cursor.rowcount == 0:
            abort(404)
        
        conn.commit()
    
    return '', 204
 
# Products API endpoints
@app.route('/api/products', methods=['GET'])
@require_api_key
def get_products():
    """Get all products with optional filtering"""
    category = request.args.get('category')
    min_price = request.args.get('min_price', type=float)
    max_price = request.args.get('max_price', type=float)
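    # type=bool would treat any non-empty string (even "false") as True
    in_stock = request.args.get(
        'in_stock', type=lambda v: v.lower() in ('1', 'true', 'yes')
    )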
    
    query = "SELECT * FROM products WHERE 1=1"
    params = []
    
    if category:
        query += " AND category = ?"
        params.append(category)
    
    if min_price is not None:
        query += " AND price >= ?"
        params.append(min_price)
    
    if max_price is not None:
        query += " AND price <= ?"
        params.append(max_price)
    
    if in_stock is not None:
        if in_stock:
            query += " AND stock_quantity > 0"
        else:
            query += " AND stock_quantity = 0"
    
    query += " ORDER BY name"
    
    with get_db_connection() as conn:
        products = conn.execute(query, params).fetchall()
    
    return jsonify({
        'products': [dict(product) for product in products]
    })
 
@app.route('/api/products/<int:product_id>', methods=['GET'])
@require_api_key
def get_product(product_id):
    """Get specific product"""
    with get_db_connection() as conn:
        product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
    
    if product is None:
        abort(404)
    
    return jsonify(dict(product))
 
@app.route('/api/products', methods=['POST'])
@require_api_key
def create_product():
    """Create new product"""
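    data = request.get_json(silent=True)

    # Reject a missing or non-object body before checking individual fields
    if not isinstance(data, dict):
        return jsonify({'error': 'A JSON object body is required'}), 400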
    
    required_fields = ['name', 'price']
    for field in required_fields:
        if field not in data:
            return jsonify({'error': f'{field} is required'}), 400
    
    name = data['name']
    description = data.get('description', '')
    price = data['price']
    category = data.get('category', '')
    stock_quantity = data.get('stock_quantity', 0)
    
    # Validate price
    try:
        price = float(price)
        if price < 0:
            raise ValueError()
    except (ValueError, TypeError):
        return jsonify({'error': 'Price must be a valid positive number'}), 400
    
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO products (name, description, price, category, stock_quantity)
            VALUES (?, ?, ?, ?, ?)
        ''', (name, description, price, category, stock_quantity))
        
        product_id = cursor.lastrowid
        conn.commit()
        
        # Get the created product
        product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
    
    return jsonify(dict(product)), 201
 
@app.route('/api/products/<int:product_id>', methods=['PUT'])
@require_api_key
def update_product(product_id):
    """Update product"""
    data = request.get_json()
    
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    
    with get_db_connection() as conn:
        existing_product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
        
        if existing_product is None:
            abort(404)
        
        # Build update query
        update_fields = []
        params = []
        
        for field in ['name', 'description', 'price', 'category', 'stock_quantity']:
            if field in data:
                update_fields.append(f"{field} = ?")
                params.append(data[field])
        
        if update_fields:
            update_fields.append("updated_at = CURRENT_TIMESTAMP")
            params.append(product_id)
            
            query = f"UPDATE products SET {', '.join(update_fields)} WHERE id = ?"
            conn.execute(query, params)
            conn.commit()
        
        # Get updated product
        updated_product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
    
    return jsonify(dict(updated_product))
 
@app.route('/api/products/<int:product_id>', methods=['DELETE'])
@require_api_key
def delete_product(product_id):
    """Delete product"""
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM products WHERE id = ?", (product_id,))
        
        if cursor.rowcount == 0:
            abort(404)
        
        conn.commit()
    
    return '', 204
 
# Users API endpoints
@app.route('/api/users', methods=['GET'])
@require_api_key
def get_users():
    """Get all users (limited info)"""
    with get_db_connection() as conn:
        users = conn.execute(
            "SELECT id, username, email, created_at, is_active FROM users ORDER BY created_at DESC"
        ).fetchall()
    
    return jsonify({
        'users': [dict(user) for user in users]
    })
 
@app.route('/api/users/<int:user_id>', methods=['GET'])
@require_api_key
def get_user(user_id):
    """Get specific user"""
    with get_db_connection() as conn:
        user = conn.execute(
            "SELECT id, username, email, created_at, is_active FROM users WHERE id = ?",
            (user_id,)
        ).fetchone()
    
    if user is None:
        abort(404)
    
    return jsonify(dict(user))
 
@app.route('/api/users', methods=['POST'])
@require_api_key
def create_user():
    """Create new user"""
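    data = request.get_json(silent=True)

    # Reject a missing or non-object body before checking individual fields
    if not isinstance(data, dict):
        return jsonify({'error': 'A JSON object body is required'}), 400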
    
    required_fields = ['username', 'email', 'password']
    for field in required_fields:
        if field not in data:
            return jsonify({'error': f'{field} is required'}), 400
    
    username = data['username']
    email = data['email']
    password = data['password']
    
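    # Hash password with a per-user random salt (PBKDF2-HMAC-SHA256);
    # a single unsalted SHA-256 pass is too fast to resist brute-force attacks
    salt = secrets.token_hex(16)
    derived = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100_000)
    password_hash = f"{salt}${derived.hex()}"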
    
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO users (username, email, password_hash)
                VALUES (?, ?, ?)
            ''', (username, email, password_hash))
            
            user_id = cursor.lastrowid
            conn.commit()
            
            # Get the created user (without password)
            user = conn.execute(
                "SELECT id, username, email, created_at, is_active FROM users WHERE id = ?",
                (user_id,)
            ).fetchone()
        
        return jsonify(dict(user)), 201
    
    except sqlite3.IntegrityError as e:
        if 'username' in str(e):
            return jsonify({'error': 'Username already exists'}), 400
        elif 'email' in str(e):
            return jsonify({'error': 'Email already exists'}), 400
        else:
            return jsonify({'error': 'Database constraint violation'}), 400
 
# Statistics endpoint
@app.route('/api/stats', methods=['GET'])
@require_api_key
def get_statistics():
    """Get API usage statistics"""
    with get_db_connection() as conn:
        # Basic counts
        task_count = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
        product_count = conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]
        user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
        
        # API usage stats
        total_requests = conn.execute("SELECT COUNT(*) FROM api_logs").fetchone()[0]
        
        # Recent activity (last 24 hours); compare inside SQLite so the cutoff
        # uses the same UTC 'YYYY-MM-DD HH:MM:SS' format as CURRENT_TIMESTAMP
        recent_requests = conn.execute(
            "SELECT COUNT(*) FROM api_logs WHERE timestamp > datetime('now', '-1 day')"
        ).fetchone()[0]
        
        # Top endpoints
        top_endpoints = conn.execute('''
            SELECT endpoint, COUNT(*) as count 
            FROM api_logs 
            GROUP BY endpoint 
            ORDER BY count DESC 
            LIMIT 5
        ''').fetchall()
        
        # Response code distribution
        response_codes = conn.execute('''
            SELECT response_code, COUNT(*) as count 
            FROM api_logs 
            GROUP BY response_code 
            ORDER BY response_code
        ''').fetchall()
    
    return jsonify({
        'database_stats': {
            'tasks': task_count,
            'products': product_count,
            'users': user_count
        },
        'api_usage': {
            'total_requests': total_requests,
            'requests_last_24h': recent_requests,
            'top_endpoints': [dict(row) for row in top_endpoints],
            'response_codes': [dict(row) for row in response_codes]
        },
        'timestamp': datetime.now().isoformat()
    })
 
# Error handlers
@app.errorhandler(404)
def not_found(error):
    return jsonify({'error': 'Resource not found'}), 404
 
@app.errorhandler(400)
def bad_request(error):
    return jsonify({'error': 'Bad request'}), 400
 
@app.errorhandler(500)
def internal_error(error):
    return jsonify({'error': 'Internal server error'}), 500
 
def main():
    """Main function to run the Flask REST API"""
    print("Simple Flask REST API")
    print("=====================")
    print("\nAPI Documentation: http://localhost:5000/")
    print("Health Check: http://localhost:5000/health")
    print("\nDemo API Keys:")
    print("- demo_key_123456789")
    print("- test_key_987654321") 
    print("- admin_key_555666777")
    print("\nExample usage:")
    print("curl -H 'X-API-Key: demo_key_123456789' http://localhost:5000/api/tasks")
    print("\nStarting server...")
    
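    # Bind to localhost only: debug mode enables an interactive debugger
    # that must not be reachable from other machines
    app.run(debug=True, host='127.0.0.1', port=5000)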
 
if __name__ == "__main__":
    main()
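
The curl command printed by main() can also be expressed in Python. The following is a minimal sketch of a client using only the standard library; the helper names `build_request` and `call_api` are hypothetical and not part of the listing above, and the base URL and demo key are simply the values the listing prints at startup.

```python
import json
import urllib.request

BASE_URL = "http://localhost:5000"   # address printed by main()
API_KEY = "demo_key_123456789"       # one of the demo keys from the listing


def build_request(path, method="GET", payload=None):
    """Build a request carrying the X-API-Key header.

    If a payload is given, it is JSON-encoded and sent as the body.
    """
    data = None
    headers = {"X-API-Key": API_KEY}
    if payload is not None:
        data = json.dumps(payload).encode("utf-8")
        headers["Content-Type"] = "application/json"
    return urllib.request.Request(
        BASE_URL + path, data=data, headers=headers, method=method
    )


def call_api(path, method="GET", payload=None):
    """Send the request and decode the JSON response.

    Requires the API server to be running; returns None for empty
    bodies such as the 204 response from a DELETE.
    """
    with urllib.request.urlopen(build_request(path, method, payload)) as resp:
        body = resp.read()
        return json.loads(body) if body else None
```

With the server running, `call_api("/api/tasks", "POST", {"title": "New Task", "priority": "high"})` should create a task, and `call_api("/api/tasks?priority=high")` should list matching tasks. A 4xx or 5xx response raises `urllib.error.HTTPError`, so production code would wrap the call in a try/except.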
 
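
Stored password hashes need a matching verification step, which the listing does not include. The following is a minimal sketch of salted hashing and verification with PBKDF2 from the standard library; the function names `hash_password` and `verify_password`, the `salt$hash` storage format, and the iteration count are assumptions for illustration, not part of the original code.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 100_000  # assumed work factor; tune for your hardware


def hash_password(password: str) -> str:
    """Return 'salt$hash' using PBKDF2-HMAC-SHA256 with a random salt."""
    salt = secrets.token_hex(16)
    derived = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), salt.encode(), ITERATIONS
    )
    return f"{salt}${derived.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    salt, _, expected = stored.partition("$")
    derived = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), salt.encode(), ITERATIONS
    )
    return hmac.compare_digest(derived.hex(), expected)
```

Because each call to `hash_password` draws a fresh salt, two users with the same password get different stored values, and `hmac.compare_digest` avoids leaking information through comparison timing.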
REST API with JWT Authentication
# Simple Flask REST API
 
from flask import Flask, request, jsonify, abort
from flask_cors import CORS
import sqlite3
import json
from datetime import datetime, timedelta
import hashlib
import secrets
import os
from functools import wraps
import logging
 
app = Flask(__name__)
CORS(app)  # Enable CORS for all routes
 
# Configuration
app.config['SECRET_KEY'] = 'your-secret-key-here'
DATABASE = 'api_database.db'
 
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
 
class DatabaseManager:
    def __init__(self, db_path):
        self.db_path = db_path
        self.init_database()
    
    def init_database(self):
        """Initialize the database with required tables"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            
            # Users table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username VARCHAR(50) UNIQUE NOT NULL,
                    email VARCHAR(100) UNIQUE NOT NULL,
                    password_hash VARCHAR(255) NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    is_active BOOLEAN DEFAULT 1
                )
            ''')
            
            # API Keys table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS api_keys (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER,
                    api_key VARCHAR(255) UNIQUE NOT NULL,
                    name VARCHAR(100),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_used TIMESTAMP,
                    is_active BOOLEAN DEFAULT 1,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')
            
            # Tasks table (example resource)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS tasks (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    title VARCHAR(200) NOT NULL,
                    description TEXT,
                    completed BOOLEAN DEFAULT 0,
                    priority VARCHAR(10) DEFAULT 'medium',
                    due_date DATE,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    user_id INTEGER,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            ''')
            
            # Products table (example resource)
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS products (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name VARCHAR(200) NOT NULL,
                    description TEXT,
                    price DECIMAL(10,2) NOT NULL,
                    category VARCHAR(100),
                    stock_quantity INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            
            # API Logs table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS api_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    endpoint VARCHAR(200),
                    method VARCHAR(10),
                    ip_address VARCHAR(45),
                    user_agent TEXT,
                    api_key VARCHAR(255),
                    response_code INTEGER,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            
            conn.commit()
            
        # Insert sample data
        self.insert_sample_data()
    
    def insert_sample_data(self):
        """Insert sample data for demonstration"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.cursor()
            
            # Check if sample data already exists
            cursor.execute("SELECT COUNT(*) FROM users")
            if cursor.fetchone()[0] > 0:
                return
            
            # Sample users
            sample_users = [
                ('john_doe', 'john@example.com', hashlib.sha256('password123'.encode()).hexdigest()),
                ('jane_smith', 'jane@example.com', hashlib.sha256('password456'.encode()).hexdigest()),
                ('admin', 'admin@example.com', hashlib.sha256('admin123'.encode()).hexdigest())
            ]
            
            cursor.executemany(
                "INSERT INTO users (username, email, password_hash) VALUES (?, ?, ?)",
                sample_users
            )
            
            # Sample API keys
            api_keys = [
                (1, 'demo_key_123456789', 'Demo Key'),
                (2, 'test_key_987654321', 'Test Key'),
                (3, 'admin_key_555666777', 'Admin Key')
            ]
            
            cursor.executemany(
                "INSERT INTO api_keys (user_id, api_key, name) VALUES (?, ?, ?)",
                api_keys
            )
            
            # Sample tasks
            sample_tasks = [
                ('Complete project documentation', 'Write comprehensive docs for the API', 0, 'high', '2023-12-31', 1),
                ('Fix bug in user authentication', 'Resolve login issues reported by users', 1, 'critical', '2023-12-15', 1),
                ('Implement new feature', 'Add search functionality to products', 0, 'medium', '2024-01-15', 2),
                ('Code review', 'Review pull requests from team members', 0, 'low', '2023-12-20', 2)
            ]
            
            cursor.executemany(
                "INSERT INTO tasks (title, description, completed, priority, due_date, user_id) VALUES (?, ?, ?, ?, ?, ?)",
                sample_tasks
            )
            
            # Sample products
            sample_products = [
                ('Laptop', 'High-performance laptop for developers', 999.99, 'Electronics', 10),
                ('Smartphone', 'Latest model smartphone with advanced features', 599.99, 'Electronics', 25),
                ('Coffee Mug', 'Programmer-themed coffee mug', 14.99, 'Accessories', 50),
                ('Mechanical Keyboard', 'RGB mechanical keyboard for gaming', 149.99, 'Electronics', 15),
                ('Desk Lamp', 'Adjustable LED desk lamp', 39.99, 'Furniture', 30)
            ]
            
            cursor.executemany(
                "INSERT INTO products (name, description, price, category, stock_quantity) VALUES (?, ?, ?, ?, ?)",
                sample_products
            )
            
            conn.commit()
 
# Initialize database
db_manager = DatabaseManager(DATABASE)
 
def require_api_key(f):
    """Decorator to require API key authentication"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        api_key = request.headers.get('X-API-Key') or request.args.get('api_key')
        
        if not api_key:
            return jsonify({'error': 'API key required'}), 401
        
        # Validate API key
        with sqlite3.connect(DATABASE) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT ak.id, ak.user_id, u.username 
                FROM api_keys ak 
                JOIN users u ON ak.user_id = u.id 
                WHERE ak.api_key = ? AND ak.is_active = 1 AND u.is_active = 1
            ''', (api_key,))
            
            result = cursor.fetchone()
            
            if not result:
                log_api_request(request, api_key, 401)
                return jsonify({'error': 'Invalid API key'}), 401
            
            # Update last used timestamp
            cursor.execute(
                "UPDATE api_keys SET last_used = CURRENT_TIMESTAMP WHERE api_key = ?",
                (api_key,)
            )
            conn.commit()
        
        # Log successful request
        log_api_request(request, api_key, 200)
        
        # Add user info to request context
        request.current_user_id = result[1]
        request.current_username = result[2]
        
        return f(*args, **kwargs)
    
    return decorated_function
 
def log_api_request(request_obj, api_key, response_code):
    """Log API request for monitoring"""
    try:
        with sqlite3.connect(DATABASE) as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO api_logs (endpoint, method, ip_address, user_agent, api_key, response_code)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                request_obj.endpoint,
                request_obj.method,
                request_obj.remote_addr,
                request_obj.headers.get('User-Agent', ''),
                api_key,
                response_code
            ))
            conn.commit()
    except Exception as e:
        logger.error(f"Error logging API request: {e}")
 
def get_db_connection():
    """Get database connection with row factory"""
    conn = sqlite3.connect(DATABASE)
    conn.row_factory = sqlite3.Row
    return conn
 
# API Documentation endpoint
@app.route('/')
def api_documentation():
    """API Documentation"""
    docs = {
        "title": "Simple Flask REST API",
        "version": "1.0.0",
        "description": "A demonstration REST API with CRUD operations",
        "base_url": request.base_url,
        "authentication": {
            "type": "API Key",
            "header": "X-API-Key",
            "demo_keys": [
                "demo_key_123456789",
                "test_key_987654321",
                "admin_key_555666777"
            ]
        },
        "endpoints": {
            "Tasks": {
                "GET /api/tasks": "Get all tasks",
                "GET /api/tasks/<id>": "Get specific task",
                "POST /api/tasks": "Create new task",
                "PUT /api/tasks/<id>": "Update task",
                "DELETE /api/tasks/<id>": "Delete task"
            },
            "Products": {
                "GET /api/products": "Get all products",
                "GET /api/products/<id>": "Get specific product",
                "POST /api/products": "Create new product",
                "PUT /api/products/<id>": "Update product",
                "DELETE /api/products/<id>": "Delete product"
            },
            "Users": {
                "GET /api/users": "Get all users",
                "GET /api/users/<id>": "Get specific user",
                "POST /api/users": "Create new user"
            },
            "Statistics": {
                "GET /api/stats": "Get API usage statistics"
            }
        },
        "examples": {
            "create_task": {
                "method": "POST",
                "url": "/api/tasks",
                "headers": {"X-API-Key": "demo_key_123456789"},
                "body": {
                    "title": "New Task",
                    "description": "Task description",
                    "priority": "high",
                    "due_date": "2023-12-31"
                }
            }
        }
    }
    return jsonify(docs)
 
# Health check endpoint
@app.route('/health')
def health_check():
    """Health check endpoint"""
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.now().isoformat(),
        'version': '1.0.0'
    })
 
# Tasks API endpoints
@app.route('/api/tasks', methods=['GET'])
@require_api_key
def get_tasks():
    """Get all tasks with optional filtering"""
    page = request.args.get('page', 1, type=int)
    per_page = min(request.args.get('per_page', 10, type=int), 100)
    completed = request.args.get('completed', type=bool)
    priority = request.args.get('priority')
    
    query = "SELECT * FROM tasks WHERE 1=1"
    params = []
    
    if completed is not None:
        query += " AND completed = ?"
        params.append(completed)
    
    if priority:
        query += " AND priority = ?"
        params.append(priority)
    
    query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
    params.extend([per_page, (page - 1) * per_page])
    
    with get_db_connection() as conn:
        tasks = conn.execute(query, params).fetchall()
        
        # Get total count
        count_query = "SELECT COUNT(*) FROM tasks WHERE 1=1"
        count_params = []
        if completed is not None:
            count_query += " AND completed = ?"
            count_params.append(completed)
        if priority:
            count_query += " AND priority = ?"
            count_params.append(priority)
        
        total = conn.execute(count_query, count_params).fetchone()[0]
    
    return jsonify({
        'tasks': [dict(task) for task in tasks],
        'pagination': {
            'page': page,
            'per_page': per_page,
            'total': total,
            'pages': (total + per_page - 1) // per_page
        }
    })
 
@app.route('/api/tasks/<int:task_id>', methods=['GET'])
@require_api_key
def get_task(task_id):
    """Get specific task"""
    with get_db_connection() as conn:
        task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
    
    if task is None:
        abort(404)
    
    return jsonify(dict(task))
 
@app.route('/api/tasks', methods=['POST'])
@require_api_key
def create_task():
    """Create new task"""
    data = request.get_json()
    
    if not data or 'title' not in data:
        return jsonify({'error': 'Title is required'}), 400
    
    title = data['title']
    description = data.get('description', '')
    priority = data.get('priority', 'medium')
    due_date = data.get('due_date')
    user_id = request.current_user_id
    
    # Validate priority
    if priority not in ['low', 'medium', 'high', 'critical']:
        return jsonify({'error': 'Invalid priority. Use: low, medium, high, critical'}), 400
    
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO tasks (title, description, priority, due_date, user_id)
            VALUES (?, ?, ?, ?, ?)
        ''', (title, description, priority, due_date, user_id))
        
        task_id = cursor.lastrowid
        conn.commit()
        
        # Get the created task
        task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
    
    return jsonify(dict(task)), 201
 
@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
@require_api_key
def update_task(task_id):
    """Update task"""
    data = request.get_json()
    
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    
    # Check if task exists
    with get_db_connection() as conn:
        existing_task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
        
        if existing_task is None:
            abort(404)
        
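        # Validate priority with the same rule used in create_task
        if 'priority' in data and data['priority'] not in ['low', 'medium', 'high', 'critical']:
            return jsonify({'error': 'Invalid priority. Use: low, medium, high, critical'}), 400
        
        # Build update query
        update_fields = []
        params = []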
        
        for field in ['title', 'description', 'completed', 'priority', 'due_date']:
            if field in data:
                update_fields.append(f"{field} = ?")
                params.append(data[field])
        
        if update_fields:
            update_fields.append("updated_at = CURRENT_TIMESTAMP")
            params.append(task_id)
            
            query = f"UPDATE tasks SET {', '.join(update_fields)} WHERE id = ?"
            conn.execute(query, params)
            conn.commit()
        
        # Get updated task
        updated_task = conn.execute(
            "SELECT * FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
    
    return jsonify(dict(updated_task))
 
@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
@require_api_key
def delete_task(task_id):
    """Delete task"""
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
        
        if cursor.rowcount == 0:
            abort(404)
        
        conn.commit()
    
    return '', 204
 
# Products API endpoints
@app.route('/api/products', methods=['GET'])
@require_api_key
def get_products():
    """Get all products with optional filtering"""
    category = request.args.get('category')
    min_price = request.args.get('min_price', type=float)
    max_price = request.args.get('max_price', type=float)
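    # type=bool would turn any non-empty string (including "false") into True,
    # so parse the flag explicitly; None means "do not filter on stock"
    in_stock_arg = request.args.get('in_stock')
    in_stock = None if in_stock_arg is None else in_stock_arg.lower() in ('1', 'true', 'yes')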
    
    query = "SELECT * FROM products WHERE 1=1"
    params = []
    
    if category:
        query += " AND category = ?"
        params.append(category)
    
    if min_price is not None:
        query += " AND price >= ?"
        params.append(min_price)
    
    if max_price is not None:
        query += " AND price <= ?"
        params.append(max_price)
    
    if in_stock is not None:
        if in_stock:
            query += " AND stock_quantity > 0"
        else:
            query += " AND stock_quantity = 0"
    
    query += " ORDER BY name"
    
    with get_db_connection() as conn:
        products = conn.execute(query, params).fetchall()
    
    return jsonify({
        'products': [dict(product) for product in products]
    })
 
@app.route('/api/products/<int:product_id>', methods=['GET'])
@require_api_key
def get_product(product_id):
    """Get specific product"""
    with get_db_connection() as conn:
        product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
    
    if product is None:
        abort(404)
    
    return jsonify(dict(product))
 
@app.route('/api/products', methods=['POST'])
@require_api_key
def create_product():
    """Create new product"""
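    data = request.get_json()
    
    # Guard against an empty or null body before checking required fields
    if not data:
        return jsonify({'error': 'No data provided'}), 400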
    
    required_fields = ['name', 'price']
    for field in required_fields:
        if field not in data:
            return jsonify({'error': f'{field} is required'}), 400
    
    name = data['name']
    description = data.get('description', '')
    price = data['price']
    category = data.get('category', '')
    stock_quantity = data.get('stock_quantity', 0)
    
    # Validate price
    try:
        price = float(price)
        if price < 0:
            raise ValueError()
    except (ValueError, TypeError):
        return jsonify({'error': 'Price must be a valid non-negative number'}), 400
    
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO products (name, description, price, category, stock_quantity)
            VALUES (?, ?, ?, ?, ?)
        ''', (name, description, price, category, stock_quantity))
        
        product_id = cursor.lastrowid
        conn.commit()
        
        # Get the created product
        product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
    
    return jsonify(dict(product)), 201
 
@app.route('/api/products/<int:product_id>', methods=['PUT'])
@require_api_key
def update_product(product_id):
    """Update product"""
    data = request.get_json()
    
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    
    with get_db_connection() as conn:
        existing_product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
        
        if existing_product is None:
            abort(404)
        
        # Build update query
        update_fields = []
        params = []
        
        for field in ['name', 'description', 'price', 'category', 'stock_quantity']:
            if field in data:
                update_fields.append(f"{field} = ?")
                params.append(data[field])
        
        if update_fields:
            update_fields.append("updated_at = CURRENT_TIMESTAMP")
            params.append(product_id)
            
            query = f"UPDATE products SET {', '.join(update_fields)} WHERE id = ?"
            conn.execute(query, params)
            conn.commit()
        
        # Get updated product
        updated_product = conn.execute(
            "SELECT * FROM products WHERE id = ?", (product_id,)
        ).fetchone()
    
    return jsonify(dict(updated_product))
 
@app.route('/api/products/<int:product_id>', methods=['DELETE'])
@require_api_key
def delete_product(product_id):
    """Delete product"""
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM products WHERE id = ?", (product_id,))
        
        if cursor.rowcount == 0:
            abort(404)
        
        conn.commit()
    
    return '', 204
 
# Users API endpoints
@app.route('/api/users', methods=['GET'])
@require_api_key
def get_users():
    """Get all users (limited info)"""
    with get_db_connection() as conn:
        users = conn.execute(
            "SELECT id, username, email, created_at, is_active FROM users ORDER BY created_at DESC"
        ).fetchall()
    
    return jsonify({
        'users': [dict(user) for user in users]
    })
 
@app.route('/api/users/<int:user_id>', methods=['GET'])
@require_api_key
def get_user(user_id):
    """Get specific user"""
    with get_db_connection() as conn:
        user = conn.execute(
            "SELECT id, username, email, created_at, is_active FROM users WHERE id = ?",
            (user_id,)
        ).fetchone()
    
    if user is None:
        abort(404)
    
    return jsonify(dict(user))
 
@app.route('/api/users', methods=['POST'])
@require_api_key
def create_user():
    """Create new user"""
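    data = request.get_json()
    
    # Guard against an empty or null body before checking required fields
    if not data:
        return jsonify({'error': 'No data provided'}), 400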
    
    required_fields = ['username', 'email', 'password']
    for field in required_fields:
        if field not in data:
            return jsonify({'error': f'{field} is required'}), 400
    
    username = data['username']
    email = data['email']
    password = data['password']
    
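    # Hash password. NOTE: unsalted SHA-256 is fast to brute-force and is
    # acceptable only in a demo; prefer a salted, slow hash such as
    # werkzeug.security.generate_password_hash for real deployments.
    password_hash = hashlib.sha256(password.encode()).hexdigest()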
    
    try:
        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO users (username, email, password_hash)
                VALUES (?, ?, ?)
            ''', (username, email, password_hash))
            
            user_id = cursor.lastrowid
            conn.commit()
            
            # Get the created user (without password)
            user = conn.execute(
                "SELECT id, username, email, created_at, is_active FROM users WHERE id = ?",
                (user_id,)
            ).fetchone()
        
        return jsonify(dict(user)), 201
    
    except sqlite3.IntegrityError as e:
        if 'username' in str(e):
            return jsonify({'error': 'Username already exists'}), 400
        elif 'email' in str(e):
            return jsonify({'error': 'Email already exists'}), 400
        else:
            return jsonify({'error': 'Database constraint violation'}), 400
 
# Statistics endpoint
@app.route('/api/stats', methods=['GET'])
@require_api_key
def get_statistics():
    """Get API usage statistics"""
    with get_db_connection() as conn:
        # Basic counts
        task_count = conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]
        product_count = conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]
        user_count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
        
        # API usage stats
        total_requests = conn.execute("SELECT COUNT(*) FROM api_logs").fetchone()[0]
        
        # Recent activity (last 24 hours)
        yesterday = (datetime.now() - timedelta(days=1)).isoformat()
        recent_requests = conn.execute(
            "SELECT COUNT(*) FROM api_logs WHERE timestamp > ?", (yesterday,)
        ).fetchone()[0]
        
        # Top endpoints
        top_endpoints = conn.execute('''
            SELECT endpoint, COUNT(*) as count 
            FROM api_logs 
            GROUP BY endpoint 
            ORDER BY count DESC 
            LIMIT 5
        ''').fetchall()
        
        # Response code distribution
        response_codes = conn.execute('''
            SELECT response_code, COUNT(*) as count 
            FROM api_logs 
            GROUP BY response_code 
            ORDER BY response_code
        ''').fetchall()
    
    return jsonify({
        'database_stats': {
            'tasks': task_count,
            'products': product_count,
            'users': user_count
        },
        'api_usage': {
            'total_requests': total_requests,
            'requests_last_24h': recent_requests,
            'top_endpoints': [dict(row) for row in top_endpoints],
            'response_codes': [dict(row) for row in response_codes]
        },
        'timestamp': datetime.now().isoformat()
    })
 
# Error handlers
@app.errorhandler(404)
def not_found(error):
    return jsonify({'error': 'Resource not found'}), 404
 
@app.errorhandler(400)
def bad_request(error):
    return jsonify({'error': 'Bad request'}), 400
 
@app.errorhandler(500)
def internal_error(error):
    return jsonify({'error': 'Internal server error'}), 500
 
def main():
    """Main function to run the Flask REST API"""
    print("Simple Flask REST API")
    print("=====================")
    print("\nAPI Documentation: http://localhost:5000/")
    print("Health Check: http://localhost:5000/health")
    print("\nDemo API Keys:")
    print("- demo_key_123456789")
    print("- test_key_987654321") 
    print("- admin_key_555666777")
    print("\nExample usage:")
    print("curl -H 'X-API-Key: demo_key_123456789' http://localhost:5000/api/tasks")
    print("\nStarting server...")
    
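    # debug=True enables the interactive debugger and host='0.0.0.0' listens on
    # all network interfaces; use this combination for local development only.
    app.run(debug=True, host='0.0.0.0', port=5000)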
 
if __name__ == "__main__":
    main()
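
The curl command printed by main() can also be issued from Python. The following is a minimal sketch using only the standard library; build_request is a hypothetical helper, and the page and per_page query parameters are assumed to match the names the task listing reads from the query string.

```python
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = "http://localhost:5000"   # address printed by main()
DEMO_KEY = "demo_key_123456789"      # one of the demo keys printed by main()


def build_request(path, api_key=DEMO_KEY, **params):
    """Build a GET request that carries the X-API-Key header (hypothetical helper)."""
    url = BASE_URL + path
    if params:
        # Query parameters such as page/per_page are assumptions about the API
        url += "?" + urlencode(params)
    return Request(url, headers={"X-API-Key": api_key})


req = build_request("/api/tasks", page=1, per_page=5)
print(req.full_url)

# To send it while the server is running:
#   from urllib.request import urlopen
#   with urlopen(req) as resp:
#       print(resp.read().decode())
```

Building the request separately from sending it makes it easy to inspect the URL and headers before the server is involved.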
 

How It Works

1. Flask Application Configuration

restapiauth.py
from flask import Flask, request, jsonify
from flask_sqlalchemy import SQLAlchemy
from flask_jwt_extended import JWTManager, jwt_required, create_access_token, get_jwt_identity
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_cors import CORS
from marshmallow import Schema, fields, ValidationError
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime, timedelta
import os
 
app = Flask(__name__)
 
# Configuration
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'your-secret-key')
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL', 'sqlite:///api.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['JWT_SECRET_KEY'] = os.environ.get('JWT_SECRET_KEY', 'jwt-secret-key')
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=1)
 
# Initialize extensions
db = SQLAlchemy(app)
jwt = JWTManager(app)
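# Keyword arguments keep this call valid across Flask-Limiter releases,
# including 3.x, where key_func is the first positional parameter
limiter = Limiter(key_func=get_remote_address, app=app)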
CORS(app)

The application setup includes:

  • Flask Configuration: Environment-based configuration management
  • Database Setup: SQLAlchemy with configurable database URLs
  • JWT Management: Token creation and validation
  • Rate Limiting: Request throttling by IP address
  • CORS Support: Cross-Origin Resource Sharing for web clients
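
The configuration above reads each setting with os.environ.get and falls back to a hard-coded default. One way to keep those placeholder secrets out of a deployed instance is sketched below. The load_config function and its production flag are assumptions made for illustration and are not part of the tutorial code.

```python
import os
from datetime import timedelta

# Default values used by the tutorial; treated here as unsafe placeholders
PLACEHOLDER_SECRETS = {"your-secret-key", "jwt-secret-key"}


def load_config(env=None, production=False):
    """Read settings from a mapping (os.environ by default) with tutorial defaults."""
    env = os.environ if env is None else env
    config = {
        "SECRET_KEY": env.get("SECRET_KEY", "your-secret-key"),
        "JWT_SECRET_KEY": env.get("JWT_SECRET_KEY", "jwt-secret-key"),
        "SQLALCHEMY_DATABASE_URI": env.get("DATABASE_URL", "sqlite:///api.db"),
        "JWT_ACCESS_TOKEN_EXPIRES": timedelta(hours=1),
    }
    if production:
        # Refuse to start with a placeholder secret (hypothetical policy)
        for name in ("SECRET_KEY", "JWT_SECRET_KEY"):
            if config[name] in PLACEHOLDER_SECRETS:
                raise RuntimeError(f"{name} must be set in the environment")
    return config


dev = load_config(env={})
prod = load_config(env={"SECRET_KEY": "a", "JWT_SECRET_KEY": "b"}, production=True)
```

The resulting dictionary could be passed to app.config.update(); passing env explicitly keeps the function easy to test without touching the real environment.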

2. Database Models

restapiauth.py
class User(db.Model):
    __tablename__ = 'users'
    
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(255), nullable=False)
    first_name = db.Column(db.String(50), nullable=False)
    last_name = db.Column(db.String(50), nullable=False)
    role = db.Column(db.String(20), default='user')
    is_active = db.Column(db.Boolean, default=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    # Relationships
    posts = db.relationship('Post', backref='author', lazy=True, cascade='all, delete-orphan')
    
    def set_password(self, password):
        """Hash and set password"""
        self.password_hash = generate_password_hash(password)
    
    def check_password(self, password):
        """Check password against hash"""
        return check_password_hash(self.password_hash, password)
    
    def to_dict(self):
        """Convert user to dictionary"""
        return {
            'id': self.id,
            'username': self.username,
            'email': self.email,
            'first_name': self.first_name,
            'last_name': self.last_name,
            'role': self.role,
            'is_active': self.is_active,
            'created_at': self.created_at.isoformat(),
            'updated_at': self.updated_at.isoformat()
        }
 
class Post(db.Model):
    __tablename__ = 'posts'
    
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    content = db.Column(db.Text, nullable=False)
    slug = db.Column(db.String(200), unique=True, nullable=False, index=True)
    published = db.Column(db.Boolean, default=False)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    
    def to_dict(self):
        """Convert post to dictionary"""
        return {
            'id': self.id,
            'title': self.title,
            'content': self.content,
            'slug': self.slug,
            'published': self.published,
            'user_id': self.user_id,
            'author': self.author.username if self.author else None,
            'created_at': self.created_at.isoformat(),
            'updated_at': self.updated_at.isoformat()
        }
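
The set_password and check_password methods delegate to Werkzeug, which stores a salted hash rather than the password itself. The idea can be illustrated with the standard library alone. This is a stand-in sketch using PBKDF2-HMAC-SHA256. Werkzeug's own storage format and default algorithm differ and vary by version, and the function names and the iteration count below are assumptions.

```python
import hashlib
import hmac
import os


def hash_password(password, iterations=100_000):
    """Return 'iterations$salt_hex$digest_hex' from salted PBKDF2-HMAC-SHA256."""
    salt = os.urandom(16)  # fresh random salt per password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    return f"{iterations}${salt.hex()}${digest.hex()}"


def verify_password(stored, password):
    """Recompute the digest with the stored salt and compare in constant time."""
    iterations, salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), int(iterations)
    )
    return hmac.compare_digest(digest.hex(), digest_hex)


stored = hash_password("s3cret-pass")
print(verify_password(stored, "s3cret-pass"))
print(verify_password(stored, "wrong"))
```

Because the salt is random, hashing the same password twice yields different stored strings, which is why verification must reuse the salt embedded in the stored value.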

3. Input Validation Schemas

restapiauth.py
from marshmallow import validate  # built-in validators with descriptive error messages
 
class UserRegistrationSchema(Schema):
    """Schema for user registration validation"""
    username = fields.Str(required=True, validate=validate.Length(min=3))
    email = fields.Email(required=True)
    password = fields.Str(required=True, validate=validate.Length(min=6))
    first_name = fields.Str(required=True, validate=validate.Length(min=2))
    last_name = fields.Str(required=True, validate=validate.Length(min=2))
 
class UserLoginSchema(Schema):
    """Schema for user login validation"""
    username = fields.Str(required=True)
    password = fields.Str(required=True)
 
class PostSchema(Schema):
    """Schema for post validation"""
    title = fields.Str(required=True, validate=validate.Length(min=5))
    content = fields.Str(required=True, validate=validate.Length(min=10))
    # load_default replaces the deprecated `missing` argument (marshmallow 3.13+)
    published = fields.Bool(load_default=False)
 
class PostUpdateSchema(Schema):
    """Schema for post update validation"""
    title = fields.Str(validate=validate.Length(min=5))
    content = fields.Str(validate=validate.Length(min=10))
    published = fields.Bool()
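
To make the registration rules concrete, here is a hand-rolled, plain-Python equivalent of what UserRegistrationSchema enforces: every field is required and the text fields have minimum lengths. It is only a sketch. The function name, the error wording, and the simplified email check are assumptions, and marshmallow's own messages and email validation are more thorough.

```python
# Minimum lengths taken from UserRegistrationSchema
MIN_LENGTHS = {"username": 3, "password": 6, "first_name": 2, "last_name": 2}
REQUIRED = ("username", "email", "password", "first_name", "last_name")


def validate_registration(payload):
    """Return a dict of field -> error message; an empty dict means valid."""
    errors = {}
    for field in REQUIRED:
        value = payload.get(field)
        if not isinstance(value, str):
            errors[field] = "Missing or non-string value."
        elif len(value) < MIN_LENGTHS.get(field, 0):
            errors[field] = f"Must be at least {MIN_LENGTHS[field]} characters."
    email = payload.get("email")
    if isinstance(email, str) and "@" not in email:
        # Deliberately simplistic check, for illustration only
        errors["email"] = "Not a valid email address."
    return errors


ok = {
    "username": "ada",
    "email": "ada@example.com",
    "password": "secret1",
    "first_name": "Ada",
    "last_name": "Lovelace",
}
bad = {"username": "ab", "email": "nope", "password": "secret1", "first_name": "Ada"}
print(validate_registration(ok))
print(sorted(validate_registration(bad)))
```

Collecting all errors in one dictionary mirrors the shape of the `errors` object the endpoints return with a 400 status, so a client can report every problem in a single round trip.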

4. Authentication System

restapiauth.py
@app.route('/api/auth/register', methods=['POST'])
@limiter.limit("5 per minute")
def register():
    """User registration endpoint"""
    try:
        # Validate input
        schema = UserRegistrationSchema()
        data = schema.load(request.json)
    except ValidationError as err:
        return jsonify({'errors': err.messages}), 400
    
    # Check if user exists
    if User.query.filter_by(username=data['username']).first():
        return jsonify({'error': 'Username already exists'}), 400
    
    if User.query.filter_by(email=data['email']).first():
        return jsonify({'error': 'Email already exists'}), 400
    
    # Create new user
    user = User(
        username=data['username'],
        email=data['email'],
        first_name=data['first_name'],
        last_name=data['last_name']
    )
    user.set_password(data['password'])
    
    db.session.add(user)
    db.session.commit()
    
    # Create access token
    access_token = create_access_token(identity=user.id)
    
    return jsonify({
        'message': 'User registered successfully',
        'user': user.to_dict(),
        'access_token': access_token
    }), 201
 
@app.route('/api/auth/login', methods=['POST'])
@limiter.limit("10 per minute")
def login():
    """User login endpoint"""
    try:
        schema = UserLoginSchema()
        data = schema.load(request.json)
    except ValidationError as err:
        return jsonify({'errors': err.messages}), 400
    
    # Find user
    user = User.query.filter_by(username=data['username']).first()
    
    if not user or not user.check_password(data['password']):
        return jsonify({'error': 'Invalid credentials'}), 401
    
    if not user.is_active:
        return jsonify({'error': 'Account is deactivated'}), 401
    
    # Create access token
    access_token = create_access_token(identity=user.id)
    
    return jsonify({
        'message': 'Login successful',
        'user': user.to_dict(),
        'access_token': access_token
    }), 200
 
@app.route('/api/auth/profile', methods=['GET'])
@jwt_required()
def get_profile():
    """Get current user profile"""
    current_user_id = get_jwt_identity()
    user = User.query.get(current_user_id)
    
    if not user:
        return jsonify({'error': 'User not found'}), 404
    
    return jsonify({'user': user.to_dict()}), 200
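
The endpoints above hand out a token at registration and login and require it on protected routes. To show what such a token contains, the sketch below signs and verifies a minimal HS256 JWT with the standard library. It is an illustration of the token format, not Flask-JWT-Extended's implementation, which adds further claims and checks. The function names are assumptions, and the one-hour default mirrors JWT_ACCESS_TOKEN_EXPIRES.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def encode_token(identity, secret, expires_in=3600, now=None):
    """Return header.payload.signature signed with HMAC-SHA256."""
    now = int(time.time()) if now is None else now
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {"sub": str(identity), "iat": now, "exp": now + expires_in}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def decode_token(token, secret, now=None):
    """Verify signature and expiry; return the payload or raise ValueError."""
    now = int(time.time()) if now is None else now
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(_b64url(expected), signature):
        raise ValueError("invalid signature")
    payload = json.loads(_b64url_decode(signing_input.split(".")[1]))
    if payload["exp"] <= now:
        raise ValueError("token expired")
    return payload


token = encode_token(42, "jwt-secret-key", expires_in=60, now=1000)
print(decode_token(token, "jwt-secret-key", now=1001)["sub"])
```

The payload is only encoded, not encrypted, so anyone holding the token can read the claims; the signature merely proves that the server issued them.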

5. CRUD Operations Implementation

restapiauth.py
@app.route('/api/posts', methods=['GET'])
@limiter.limit("100 per minute")
def get_posts():
    """Get all posts with pagination"""
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)
    published_only = request.args.get('published', 'false').lower() == 'true'
    
    # Limit per_page to prevent abuse
    per_page = min(per_page, 100)
    
    query = Post.query
    
    if published_only:
        query = query.filter_by(published=True)
    
    posts = query.order_by(Post.created_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False
    )
    
    return jsonify({
        'posts': [post.to_dict() for post in posts.items],
        'pagination': {
            'page': posts.page,
            'pages': posts.pages,
            'per_page': posts.per_page,
            'total': posts.total,
            'has_next': posts.has_next,
            'has_prev': posts.has_prev
        }
    }), 200
 
@app.route('/api/posts', methods=['POST'])
@jwt_required()
@limiter.limit("20 per minute")
def create_post():
    """Create a new post"""
    current_user_id = get_jwt_identity()
    
    try:
        schema = PostSchema()
        data = schema.load(request.json)
    except ValidationError as err:
        return jsonify({'errors': err.messages}), 400
    
    # Generate slug from title
    import re
    slug = re.sub(r'[^a-zA-Z0-9\s]', '', data['title'].lower())
    slug = re.sub(r'\s+', '-', slug.strip())
    
    # Ensure unique slug
    original_slug = slug
    counter = 1
    while Post.query.filter_by(slug=slug).first():
        slug = f"{original_slug}-{counter}"
        counter += 1
    
    post = Post(
        title=data['title'],
        content=data['content'],
        slug=slug,
        published=data['published'],
        user_id=current_user_id
    )
    
    db.session.add(post)
    db.session.commit()
    
    return jsonify({
        'message': 'Post created successfully',
        'post': post.to_dict()
    }), 201
 
@app.route('/api/posts/<int:post_id>', methods=['GET'])
@limiter.limit("100 per minute")
def get_post(post_id):
    """Get a specific post"""
    post = Post.query.get_or_404(post_id)
    return jsonify({'post': post.to_dict()}), 200
 
@app.route('/api/posts/<int:post_id>', methods=['PUT'])
@jwt_required()
@limiter.limit("20 per minute")
def update_post(post_id):
    """Update a post"""
    current_user_id = get_jwt_identity()
    post = Post.query.get_or_404(post_id)
    
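    # Check ownership or admin role
    current_user = User.query.get(current_user_id)
    if current_user is None:
        # The token is valid but the account no longer exists
        return jsonify({'error': 'User not found'}), 404
    if post.user_id != current_user_id and current_user.role != 'admin':
        return jsonify({'error': 'Permission denied'}), 403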
    
    try:
        schema = PostUpdateSchema()
        data = schema.load(request.json)
    except ValidationError as err:
        return jsonify({'errors': err.messages}), 400
    
    # Update post fields
    for field, value in data.items():
        if field == 'title' and value != post.title:
            # Update slug if title changed
            import re
            slug = re.sub(r'[^a-zA-Z0-9\s]', '', value.lower())
            slug = re.sub(r'\s+', '-', slug.strip())
            
            # Ensure unique slug
            original_slug = slug
            counter = 1
            while Post.query.filter_by(slug=slug).filter(Post.id != post.id).first():
                slug = f"{original_slug}-{counter}"
                counter += 1
            
            post.slug = slug
        
        setattr(post, field, value)
    
    post.updated_at = datetime.utcnow()
    db.session.commit()
    
    return jsonify({
        'message': 'Post updated successfully',
        'post': post.to_dict()
    }), 200
 
@app.route('/api/posts/<int:post_id>', methods=['DELETE'])
@jwt_required()
@limiter.limit("10 per minute")
def delete_post(post_id):
    """Delete a post"""
    current_user_id = get_jwt_identity()
    post = Post.query.get_or_404(post_id)
    
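    # Check ownership or admin role
    current_user = User.query.get(current_user_id)
    if current_user is None:
        # The token is valid but the account no longer exists
        return jsonify({'error': 'User not found'}), 404
    if post.user_id != current_user_id and current_user.role != 'admin':
        return jsonify({'error': 'Permission denied'}), 403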
    
    db.session.delete(post)
    db.session.commit()
    
    return jsonify({'message': 'Post deleted successfully'}), 200
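
Both create_post and update_post repeat the same slug logic: normalize the title, then append a counter until the slug is free. A possible way to factor that out is sketched below. The slugify and unique_slug helpers and the is_taken callback are hypothetical names, not part of the tutorial code.

```python
import re


def slugify(title):
    """Lowercase the title, drop punctuation, and join words with hyphens."""
    slug = re.sub(r"[^a-zA-Z0-9\s]", "", title.lower())
    return re.sub(r"\s+", "-", slug.strip())


def unique_slug(title, is_taken):
    """Append -1, -2, ... to the base slug until is_taken(slug) is false."""
    base = slugify(title)
    slug, counter = base, 1
    while is_taken(slug):
        slug = f"{base}-{counter}"
        counter += 1
    return slug


# A set stands in for the database lookup in this example
existing = {"hello-world", "hello-world-1"}
print(unique_slug("Hello, World!", existing.__contains__))  # prints hello-world-2
```

In the application, is_taken could wrap the same query the endpoints already run, for example a function that returns whether Post.query.filter_by(slug=slug).first() finds a row.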

6. Admin Endpoints

restapiauth.py
from functools import wraps
 
def admin_required(f):
    """Decorator for admin-only endpoints"""
    @wraps(f)  # Keep the view's own name so each route gets a distinct endpoint
    @jwt_required()
    def decorated_function(*args, **kwargs):
        current_user_id = get_jwt_identity()
        user = User.query.get(current_user_id)
        
        if not user or user.role != 'admin':
            return jsonify({'error': 'Admin access required'}), 403
        
        return f(*args, **kwargs)
    
    return decorated_function
 
@app.route('/api/admin/users', methods=['GET'])
@admin_required
@limiter.limit("50 per minute")
def admin_get_users():
    """Get all users (admin only)"""
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)
    
    # Limit per_page to prevent abuse, as in get_posts
    per_page = min(per_page, 100)
    
    users = User.query.order_by(User.created_at.desc()).paginate(
        page=page, per_page=per_page, error_out=False
    )
    
    return jsonify({
        'users': [user.to_dict() for user in users.items],
        'pagination': {
            'page': users.page,
            'pages': users.pages,
            'per_page': users.per_page,
            'total': users.total
        }
    }), 200
 
@app.route('/api/admin/users/<int:user_id>/toggle-status', methods=['POST'])
@admin_required
@limiter.limit("20 per minute")
def admin_toggle_user_status(user_id):
    """Toggle user active status (admin only)"""
    user = User.query.get_or_404(user_id)
    
    # Prevent admin from deactivating themselves
    current_user_id = get_jwt_identity()
    if user_id == current_user_id:
        return jsonify({'error': 'Cannot deactivate your own account'}), 400
    
    user.is_active = not user.is_active
    user.updated_at = datetime.utcnow()
    db.session.commit()
    
    status = 'activated' if user.is_active else 'deactivated'
    return jsonify({
        'message': f'User {status} successfully',
        'user': user.to_dict()
    }), 200
 
@app.route('/api/admin/stats', methods=['GET'])
@admin_required
@limiter.limit("50 per minute")
def admin_get_stats():
    """Get system statistics (admin only)"""
    stats = {
        'total_users': User.query.count(),
        'active_users': User.query.filter_by(is_active=True).count(),
        'total_posts': Post.query.count(),
        'published_posts': Post.query.filter_by(published=True).count(),
        'recent_registrations': User.query.filter(
            User.created_at >= datetime.utcnow() - timedelta(days=7)
        ).count()
    }
    
    return jsonify({'stats': stats}), 200
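
Since admin_required wraps several views, the function it returns needs to keep each view's own name. Flask uses the view function's `__name__` as the default endpoint name, and registering two different functions under one endpoint raises an AssertionError at import time. The sketch below uses only the standard library to show the difference `functools.wraps` makes; `plain_decorator` and `wrapping_decorator` are illustrative names.

```python
from functools import wraps


def plain_decorator(f):
    """Decorator that does NOT preserve the wrapped function's metadata."""
    def decorated_function(*args, **kwargs):
        return f(*args, **kwargs)
    return decorated_function


def wrapping_decorator(f):
    """Decorator that copies __name__, __doc__, etc. from the wrapped function."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        return f(*args, **kwargs)
    return decorated_function


def admin_get_users():
    return "users"


def admin_get_stats():
    return "stats"


# Without wraps, both views report the wrapper's name.
plain_names = [plain_decorator(v).__name__ for v in (admin_get_users, admin_get_stats)]
# With wraps, each view keeps its own name.
wrapped_names = [wrapping_decorator(v).__name__ for v in (admin_get_users, admin_get_stats)]

print(plain_names)
print(wrapped_names)
```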

Error Handling

1. Global Error Handlers

restapiauth.py
@app.errorhandler(ValidationError)
def handle_validation_error(error):
    """Handle validation errors"""
    return jsonify({'errors': error.messages}), 400
 
@app.errorhandler(404)
def handle_not_found(error):
    """Handle 404 errors"""
    return jsonify({'error': 'Resource not found'}), 404
 
@app.errorhandler(500)
def handle_internal_error(error):
    """Handle internal server errors"""
    db.session.rollback()
    return jsonify({'error': 'Internal server error'}), 500
 
@jwt.expired_token_loader
def expired_token_callback(jwt_header, jwt_payload):
    """Handle expired JWT tokens"""
    return jsonify({'error': 'Token has expired'}), 401
 
@jwt.invalid_token_loader
def invalid_token_callback(error):
    """Handle invalid JWT tokens"""
    return jsonify({'error': 'Invalid token'}), 401
 
@jwt.unauthorized_loader
def missing_token_callback(error):
    """Handle missing JWT tokens"""
    return jsonify({'error': 'Authorization token is required'}), 401

2. Custom Exception Classes

restapiauth.py
class APIException(Exception):
    """Base API exception class"""
    status_code = 500
    
    def __init__(self, message, status_code=None, payload=None):
        super().__init__(message)
        self.message = message
        if status_code is not None:
            self.status_code = status_code
        self.payload = payload
    
    def to_dict(self):
        rv = dict(self.payload or ())
        rv['error'] = self.message
        return rv
 
class ValidationException(APIException):
    """Validation error exception"""
    status_code = 400
 
class AuthenticationException(APIException):
    """Authentication error exception"""
    status_code = 401
 
class AuthorizationException(APIException):
    """Authorization error exception"""
    status_code = 403
 
@app.errorhandler(APIException)
def handle_api_exception(error):
    """Handle custom API exceptions"""
    response = jsonify(error.to_dict())
    response.status_code = error.status_code
    return response
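
As a usage sketch, a view can raise one of these exceptions instead of building the error response inline. The example below repeats a trimmed copy of the classes so it runs on its own; `ensure_can_modify` is a hypothetical helper, and the `owner_id` payload key is only for illustration.

```python
class APIException(Exception):
    """Trimmed copy of the base class above, repeated so the snippet is runnable."""
    status_code = 500

    def __init__(self, message, status_code=None, payload=None):
        super().__init__(message)
        self.message = message
        if status_code is not None:
            self.status_code = status_code
        self.payload = payload

    def to_dict(self):
        rv = dict(self.payload or ())
        rv['error'] = self.message
        return rv


class AuthorizationException(APIException):
    status_code = 403


def ensure_can_modify(owner_id, user_id, role):
    """Hypothetical helper: raise unless the user owns the resource or is an admin."""
    if owner_id != user_id and role != 'admin':
        raise AuthorizationException('Permission denied', payload={'owner_id': owner_id})


try:
    ensure_can_modify(owner_id=1, user_id=2, role='user')
except APIException as exc:
    # This is what the registered error handler would turn into a JSON response.
    body, status = exc.to_dict(), exc.status_code
    print(status, body)
```

With the handler registered for APIException, the ownership checks in update_post and delete_post could share one helper of this kind.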

API Documentation

1. OpenAPI Specification

restapiauth.py
from flask import send_from_directory
 
@app.route('/api/docs')
def api_docs():
    """Serve API documentation"""
    return send_from_directory('static', 'swagger-ui.html')
 
@app.route('/api/openapi.json')
def openapi_spec():
    """OpenAPI specification"""
    spec = {
        "openapi": "3.0.0",
        "info": {
            "title": "REST API with JWT Authentication",
            "version": "1.0.0",
            "description": "A comprehensive REST API with JWT authentication"
        },
        "servers": [
            {"url": "http://localhost:5000", "description": "Development server"}
        ],
        "components": {
            "securitySchemes": {
                "bearerAuth": {
                    "type": "http",
                    "scheme": "bearer",
                    "bearerFormat": "JWT"
                }
            }
        },
        "paths": {
            "/api/auth/register": {
                "post": {
                    "summary": "Register a new user",
                    "tags": ["Authentication"],
                    "requestBody": {
                        "required": True,
                        "content": {
                            "application/json": {
                                "schema": {
                                    "type": "object",
                                    "required": ["username", "email", "password", "first_name", "last_name"],
                                    "properties": {
                                        "username": {"type": "string", "minLength": 3},
                                        "email": {"type": "string", "format": "email"},
                                        "password": {"type": "string", "minLength": 6},
                                        "first_name": {"type": "string", "minLength": 2},
                                        "last_name": {"type": "string", "minLength": 2}
                                    }
                                }
                            }
                        }
                    },
                    "responses": {
                        "201": {"description": "User registered successfully"},
                        "400": {"description": "Validation error"}
                    }
                }
            }
        }
    }
    
    return jsonify(spec)
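
The spec above declares the bearerAuth scheme but only documents the public registration route. A protected route would also carry a `security` requirement that refers to that scheme. The following is a rough sketch of how such an entry could be built; `protected_operation` is a hypothetical helper, and the response descriptions are assumptions based on the status codes delete_post returns.

```python
import json


def protected_operation(summary, tag, responses):
    """Build an OpenAPI operation object that requires the bearerAuth scheme."""
    return {
        "summary": summary,
        "tags": [tag],
        "security": [{"bearerAuth": []}],
        "responses": responses,
    }


delete_post_operation = protected_operation(
    "Delete a post",
    "Posts",
    {
        "200": {"description": "Post deleted successfully"},
        "401": {"description": "Missing, invalid, or expired token"},
        "403": {"description": "Permission denied"},
        "404": {"description": "Resource not found"},
    },
)
# Path parameters are declared on the operation and use {name} in the path key.
delete_post_operation["parameters"] = [
    {"name": "post_id", "in": "path", "required": True, "schema": {"type": "integer"}}
]

extra_paths = {"/api/posts/{post_id}": {"delete": delete_post_operation}}
print(json.dumps(extra_paths, indent=2))
```

The resulting dictionary could be merged into the `paths` section of the spec returned by openapi_spec.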

2. API Testing with Postman

{
    "info": {
        "name": "REST API with JWT Auth",
        "description": "Postman collection for testing the API",
        "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
    },
    "item": [
        {
            "name": "Authentication",
            "item": [
                {
                    "name": "Register User",
                    "request": {
                        "method": "POST",
                        "header": [
                            {
                                "key": "Content-Type",
                                "value": "application/json"
                            }
                        ],
                        "body": {
                            "mode": "raw",
                            "raw": "{\n    \"username\": \"testuser\",\n    \"email\": \"test@example.com\",\n    \"password\": \"password123\",\n    \"first_name\": \"Test\",\n    \"last_name\": \"User\"\n}"
                        },
                        "url": {
                            "raw": "{{base_url}}/api/auth/register",
                            "host": ["{{base_url}}"],
                            "path": ["api", "auth", "register"]
                        }
                    }
                },
                {
                    "name": "Login User",
                    "request": {
                        "method": "POST",
                        "header": [
                            {
                                "key": "Content-Type",
                                "value": "application/json"
                            }
                        ],
                        "body": {
                            "mode": "raw",
                            "raw": "{\n    \"username\": \"testuser\",\n    \"password\": \"password123\"\n}"
                        },
                        "url": {
                            "raw": "{{base_url}}/api/auth/login",
                            "host": ["{{base_url}}"],
                            "path": ["api", "auth", "login"]
                        }
                    }
                }
            ]
        }
    ]
}

Running the Application

Development Mode

# Set environment variables
export FLASK_APP=restapi.py
export FLASK_DEBUG=1  # replaces FLASK_ENV=development, which Flask 2.3 removed
export SECRET_KEY=your-secret-key
export JWT_SECRET_KEY=jwt-secret-key
export DATABASE_URL=sqlite:///api.db
 
# Initialize database (Flask-SQLAlchemy 3.x needs an application context for create_all)
python -c "from restapi import app, db; app.app_context().push(); db.create_all()"
 
# Run the application
flask run

Production Configuration

restapiauth.py
# config.py
import os
from datetime import timedelta
 
class Config:
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'fallback-secret-key'
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or 'sqlite:///api.db'
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY') or 'jwt-secret-key'
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=1)
    JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30)
 
class DevelopmentConfig(Config):
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///api_dev.db'
 
class ProductionConfig(Config):
    DEBUG = False
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL')
 
class TestingConfig(Config):
    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
    WTF_CSRF_ENABLED = False
 
config = {
    'development': DevelopmentConfig,
    'production': ProductionConfig,
    'testing': TestingConfig,
    'default': DevelopmentConfig
}
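
The Config class falls back to hard-coded secrets when the environment variables are missing, which is convenient in development but risky in production. One possible guard, sketched below with the standard library only, picks the configuration name from an environment variable and refuses to continue when production secrets are absent. `APP_ENV`, `ConfigError`, `select_config_name`, and `check_production_secrets` are all hypothetical names, not part of the original code.

```python
import os

CONFIG_NAMES = ("development", "production", "testing", "default")
REQUIRED_IN_PRODUCTION = ("SECRET_KEY", "JWT_SECRET_KEY", "DATABASE_URL")


class ConfigError(RuntimeError):
    """Raised when the environment cannot produce a safe configuration."""


def select_config_name(environ=os.environ):
    """Return the key to look up in the config dict (APP_ENV is an assumed name)."""
    name = environ.get("APP_ENV", "default")
    if name not in CONFIG_NAMES:
        raise ConfigError(f"Unknown APP_ENV: {name!r}")
    return name


def check_production_secrets(name, environ=os.environ):
    """In production, fail fast instead of silently using fallback secrets."""
    if name != "production":
        return
    missing = [key for key in REQUIRED_IN_PRODUCTION if not environ.get(key)]
    if missing:
        raise ConfigError("Missing environment variables: " + ", ".join(missing))


# A plain dict stands in for os.environ so the sketch has no side effects.
fake_env = {"APP_ENV": "production", "SECRET_KEY": "change-me"}
name = select_config_name(fake_env)
message = None
try:
    check_production_secrets(name, fake_env)
except ConfigError as exc:
    message = str(exc)
print(message)
```

The application would then load `config[name]` only after the check passes.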

Docker Setup

# Dockerfile
FROM python:3.9-slim
 
WORKDIR /app
 
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
 
COPY . .
 
EXPOSE 5000
 
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "restapi:app"]
# docker-compose.yml
version: '3.8'
 
services:
  api:
    build: .
    ports:
      - "5000:5000"
    environment:
      - FLASK_ENV=production
      - DATABASE_URL=postgresql://user:password@db:5432/apidb
      - SECRET_KEY=your-secret-key
      - JWT_SECRET_KEY=jwt-secret-key
    depends_on:
      - db
  
  db:
    image: postgres:13
    environment:
      - POSTGRES_DB=apidb
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
    volumes:
      - postgres_data:/var/lib/postgresql/data
 
volumes:
  postgres_data:

Usage Examples

1. User Registration and Authentication

# Register a new user
curl -X POST http://localhost:5000/api/auth/register \
  -H "Content-Type: application/json" \
  -d '{
    "username": "johndoe",
    "email": "john@example.com", 
    "password": "securepassword123",
    "first_name": "John",
    "last_name": "Doe"
  }'
 
# Login user
curl -X POST http://localhost:5000/api/auth/login \
  -H "Content-Type: application/json" \
  -d '{
    "username": "johndoe",
    "password": "securepassword123"
  }'
 
# Response will include access token:
{
  "message": "Login successful",
  "user": {...},
  "access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9..."
}
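
The access token is three base64url segments (header, payload, signature) joined by dots. The header and payload are only encoded, not encrypted, so a client can read them. A minimal sketch that decodes the header segment shown in the response above follows; it does not verify the signature, and `decode_segment` is an illustrative name.

```python
import base64
import json


def decode_segment(segment):
    """Decode one base64url JWT segment into a dict. No signature check is done."""
    # JWT segments drop the trailing '=' padding, so restore it before decoding.
    padded = segment + "=" * (-len(segment) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


# First segment of the example access_token from the login response.
header = decode_segment("eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9")
print(header)
```

Because anyone holding the token can read the payload the same way, it should never carry secrets such as passwords.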

2. Making Authenticated Requests

# Get user profile
curl -X GET http://localhost:5000/api/auth/profile \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN"
 
# Create a new post
curl -X POST http://localhost:5000/api/posts \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "title": "My First Blog Post",
    "content": "This is the content of my first blog post.",
    "published": true
  }'
 
# Get all posts with pagination
curl -X GET "http://localhost:5000/api/posts?page=1&per_page=10&published=true"
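
A client that wants every post can follow the `has_next` flag in the `pagination` object rather than guessing the page count. The sketch below shows that loop; `fetch_page` is a hypothetical callable that returns the decoded JSON body of GET /api/posts, and `fake_fetch_page` replaces the HTTP call so the example runs without a server.

```python
def iter_posts(fetch_page, per_page=10):
    """Yield every post by requesting pages until pagination.has_next is False."""
    page = 1
    while True:
        body = fetch_page(page, per_page)
        yield from body["posts"]
        if not body["pagination"]["has_next"]:
            break
        page += 1


def fake_fetch_page(page, per_page):
    """Stand-in for the HTTP request, serving five posts from memory."""
    all_posts = [{"id": i} for i in range(1, 6)]
    start = (page - 1) * per_page
    return {
        "posts": all_posts[start:start + per_page],
        "pagination": {"page": page, "has_next": start + per_page < len(all_posts)},
    }


ids = [post["id"] for post in iter_posts(fake_fetch_page, per_page=2)]
print(ids)
```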

3. Admin Operations

# Get system statistics (admin only)
curl -X GET http://localhost:5000/api/admin/stats \
  -H "Authorization: Bearer ADMIN_ACCESS_TOKEN"
 
# Get all users (admin only)
curl -X GET http://localhost:5000/api/admin/users \
  -H "Authorization: Bearer ADMIN_ACCESS_TOKEN"
 
# Toggle user status (admin only)
curl -X POST http://localhost:5000/api/admin/users/2/toggle-status \
  -H "Authorization: Bearer ADMIN_ACCESS_TOKEN"

Testing Suite

1. Unit Tests

restapiauth.py
# tests/test_auth.py
import unittest
# restapi.py is a single module, so the model is imported from it directly
from restapi import app, db, User
 
class AuthTestCase(unittest.TestCase):
    def setUp(self):
        """Set up test fixtures"""
        app.config['TESTING'] = True
        app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
        self.app = app.test_client()
        self.app_context = app.app_context()
        self.app_context.push()
        db.create_all()
    
    def tearDown(self):
        """Clean up test fixtures"""
        db.session.remove()
        db.drop_all()
        self.app_context.pop()
    
    def test_user_registration(self):
        """Test user registration"""
        response = self.app.post('/api/auth/register', json={
            'username': 'testuser',
            'email': 'test@example.com',
            'password': 'password123',
            'first_name': 'Test',
            'last_name': 'User'
        })
        
        self.assertEqual(response.status_code, 201)
        data = response.get_json()
        self.assertIn('access_token', data)
        self.assertEqual(data['user']['username'], 'testuser')
    
    def test_user_login(self):
        """Test user login"""
        # Create test user
        user = User(username='testuser', email='test@example.com',
                   first_name='Test', last_name='User')
        user.set_password('password123')
        db.session.add(user)
        db.session.commit()
        
        # Test login
        response = self.app.post('/api/auth/login', json={
            'username': 'testuser',
            'password': 'password123'
        })
        
        self.assertEqual(response.status_code, 200)
        data = response.get_json()
        self.assertIn('access_token', data)
    
    def test_invalid_login(self):
        """Test invalid login credentials"""
        response = self.app.post('/api/auth/login', json={
            'username': 'nonexistent',
            'password': 'wrongpassword'
        })
        
        self.assertEqual(response.status_code, 401)
        data = response.get_json()
        self.assertIn('error', data)
 
if __name__ == '__main__':
    unittest.main()

2. Integration Tests

restapiauth.py
# tests/test_api.py
import unittest
from restapi import app, db
from restapi.models import User, Post
 
class APITestCase(unittest.TestCase):
    def setUp(self):
        """Set up test fixtures"""
        app.config['TESTING'] = True
        app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
        self.app = app.test_client()
        self.app_context = app.app_context()
        self.app_context.push()
        db.create_all()
        
        # Create test user and get token
        user = User(username='testuser', email='test@example.com',
                   first_name='Test', last_name='User')
        user.set_password('password123')
        db.session.add(user)
        db.session.commit()
        
        # Login to get token
        response = self.app.post('/api/auth/login', json={
            'username': 'testuser',
            'password': 'password123'
        })
        self.token = response.get_json()['access_token']
        self.headers = {'Authorization': f'Bearer {self.token}'}
    
    def tearDown(self):
        """Clean up test fixtures"""
        db.session.remove()
        db.drop_all()
        self.app_context.pop()
    
    def test_create_post(self):
        """Test post creation"""
        response = self.app.post('/api/posts', 
                                headers=self.headers,
                                json={
                                    'title': 'Test Post Title',
                                    'content': 'This is test content for the post.',
                                    'published': True
                                })
        
        self.assertEqual(response.status_code, 201)
        data = response.get_json()
        self.assertEqual(data['post']['title'], 'Test Post Title')
    
    def test_get_posts(self):
        """Test getting posts"""
        # Create test post
        post = Post(title='Test Post', content='Test content',
                   slug='test-post', user_id=1, published=True)
        db.session.add(post)
        db.session.commit()
        
        response = self.app.get('/api/posts')
        self.assertEqual(response.status_code, 200)
        data = response.get_json()
        self.assertIn('posts', data)
        self.assertEqual(len(data['posts']), 1)
    
    def test_unauthorized_access(self):
        """Test unauthorized access to protected endpoints"""
        response = self.app.post('/api/posts', json={
            'title': 'Test Post',
            'content': 'Test content'
        })
        
        self.assertEqual(response.status_code, 401)
 
if __name__ == '__main__':
    unittest.main()

Sample Output

Successful Registration Response

{
  "message": "User registered successfully",
  "user": {
    "id": 1,
    "username": "johndoe",
    "email": "john@example.com",
    "first_name": "John",
    "last_name": "Doe",
    "role": "user",
    "is_active": true,
    "created_at": "2024-01-15T10:30:00",
    "updated_at": "2024-01-15T10:30:00"
  },
  "access_token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJmcmVzaCI6ZmFsc2UsImlhdCI6MTYzOTU2MjIwMCwianRpIjoiNGY1NjcxMmUtZGZjOS00MDk5LWEyYWUtMmZjMzg5OGY4MTM1IiwidHlwZSI6ImFjY2VzcyIsInN1YiI6MSwibmJmIjoxNjM5NTYyMjAwLCJleHAiOjE2Mzk1NjU4MDB9.X1P9z6v8QD2l0o4tH5nF2Qx8r7B3KkM9A6tE5yS8pW1"
}

Posts List Response

{
  "posts": [
    {
      "id": 1,
      "title": "Getting Started with Flask",
      "content": "Flask is a lightweight web framework for Python...",
      "slug": "getting-started-with-flask",
      "published": true,
      "user_id": 1,
      "author": "johndoe",
      "created_at": "2024-01-15T11:00:00",
      "updated_at": "2024-01-15T11:00:00"
    }
  ],
  "pagination": {
    "page": 1,
    "pages": 1,
    "per_page": 10,
    "total": 1,
    "has_next": false,
    "has_prev": false
  }
}

Admin Statistics Response

{
  "stats": {
    "total_users": 15,
    "active_users": 14,
    "total_posts": 42,
    "published_posts": 38,
    "recent_registrations": 3
  }
}

Security Best Practices

1. Password Security

restapiauth.py
from werkzeug.security import generate_password_hash, check_password_hash
import secrets
 
def generate_secure_password():
    """Generate a secure random password"""
    return secrets.token_urlsafe(16)
 
def validate_password_strength(password):
    """Validate password strength"""
    if len(password) < 8:
        return False, "Password must be at least 8 characters long"
    
    if not any(c.isupper() for c in password):
        return False, "Password must contain at least one uppercase letter"
    
    if not any(c.islower() for c in password):
        return False, "Password must contain at least one lowercase letter"
    
    if not any(c.isdigit() for c in password):
        return False, "Password must contain at least one digit"
    
    return True, "Password is strong"
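
The imports above bring in `generate_password_hash` and `check_password_hash`, but the snippet never calls them. As a rough illustration of the idea behind such helpers (a random salt, a slow key-derivation function, and a constant-time comparison), here is a standard-library sketch. The function names `hash_password` and `verify_password`, the `salt$hash` storage format, and the iteration count are assumptions made for this example; this is not Werkzeug's implementation.

```python
import hashlib
import hmac
import secrets

ITERATIONS = 200_000  # assumed work factor for this sketch; tune for your hardware


def hash_password(password: str) -> str:
    """Return 'salt$hash' (both hex) using salted PBKDF2-HMAC-SHA256."""
    salt = secrets.token_bytes(16)  # fresh random salt for every password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password(password: str, stored: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$", 1)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(digest, bytes.fromhex(digest_hex))
```

Because the salt is random, hashing the same password twice gives two different stored strings, and both still verify against the original password.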

2. Rate Limiting Configuration

restapiauth.py
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
 
# Configure rate limiting
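# Pass key_func and app by keyword: Flask-Limiter 3.x no longer
# accepts the app as the first positional argument
limiter = Limiter(
    key_func=get_remote_address,
    app=app,
    default_limits=["1000 per day", "100 per hour"],
    storage_uri="redis://localhost:6379"
)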
 
# Custom rate limit for sensitive endpoints
@app.route('/api/auth/login', methods=['POST'])
@limiter.limit("5 per minute")
def login():
    # Login implementation
    pass
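
To make a limit such as "5 per minute" concrete, the following is a minimal in-memory sketch of a fixed-window counter. The class name `FixedWindowLimiter` and its `clock` parameter are hypothetical and exist only for this illustration; it is not how Flask-Limiter is implemented, and it keeps counts in one process, whereas the Redis storage configured above shares them across workers.

```python
import time


class FixedWindowLimiter:
    """Allow at most `limit` hits per `window` seconds for each key."""

    def __init__(self, limit, window, clock=time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock  # injectable so the behaviour can be tested
        self._hits = {}  # key -> (window index, hits counted in that window)

    def allow(self, key):
        """Record a hit for `key` and report whether it is within the limit."""
        window_index = int(self.clock() // self.window)
        seen_index, count = self._hits.get(key, (window_index, 0))
        if seen_index != window_index:
            count = 0  # a new window has started, so the counter resets
        if count >= self.limit:
            return False
        self._hits[key] = (window_index, count + 1)
        return True
```

With `FixedWindowLimiter(5, 60)`, the sixth call to `allow()` for the same client address inside one minute returns `False`, while other addresses are unaffected.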

3. Input Sanitization

restapiauth.py
import re

import bleach
 
def sanitize_input(text):
    """Sanitize user input"""
    # Strip all HTML tags. bleach.clean() already escapes &, < and >,
    # so a second html.escape() pass would double-escape the text.
    return bleach.clean(text, tags=[], strip=True)
 
def validate_slug(slug):
    """Validate URL slug format"""
    # fullmatch() rejects a trailing newline, which match() with $ accepts
    pattern = r'^[a-z0-9]+(?:-[a-z0-9]+)*$'
    return bool(re.fullmatch(pattern, slug))
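
The slug check above only validates. A companion helper that derives a slug from a post title could look like the sketch below. `slugify` and `SLUG_PATTERN` are hypothetical names introduced for this example, and the ASCII-folding step is one possible policy rather than something the project prescribes.

```python
import re
import unicodedata

# Same shape as the slug pattern used by validate_slug
SLUG_PATTERN = re.compile(r"[a-z0-9]+(?:-[a-z0-9]+)*")


def slugify(title: str) -> str:
    """Turn a title into a lowercase, hyphen-separated ASCII slug."""
    # Fold accented characters to their ASCII base and drop the rest
    ascii_text = (
        unicodedata.normalize("NFKD", title).encode("ascii", "ignore").decode("ascii")
    )
    # Collapse every run of non-alphanumerics into a single hyphen
    return re.sub(r"[^a-z0-9]+", "-", ascii_text.lower()).strip("-")
```

A title made only of punctuation produces an empty string, which the slug pattern rejects, so callers should handle that case (for example by falling back to the post ID).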

Performance Optimization

1. Database Optimization

restapiauth.py
# Add database indexes
class User(db.Model):
    # ... existing fields ...
    
    __table_args__ = (
        db.Index('idx_username_email', 'username', 'email'),
        db.Index('idx_created_at', 'created_at'),
    )
 
# Use query optimization
def get_user_posts_optimized(user_id, limit=10):
    """Get user posts with optimized query"""
    return Post.query.filter_by(user_id=user_id)\
                    .options(db.joinedload(Post.author))\
                    .order_by(Post.created_at.desc())\
                    .limit(limit).all()
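
One way to check whether an index actually serves a query is to ask the database for its plan. The sketch below uses the standard-library `sqlite3` module with a simplified, hypothetical `posts` table and index name (`idx_posts_user_created`); neither is taken from the project's models. The query has the same shape as the one above: filter by `user_id`, order by `created_at`, limit the rows.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE posts (
        id INTEGER PRIMARY KEY,
        user_id INTEGER NOT NULL,
        title TEXT NOT NULL,
        created_at TEXT NOT NULL
    );
    -- Composite index: equality column first, then the sort column
    CREATE INDEX idx_posts_user_created ON posts (user_id, created_at);
    """
)

plan = conn.execute(
    "EXPLAIN QUERY PLAN "
    "SELECT id, title FROM posts WHERE user_id = ? "
    "ORDER BY created_at DESC LIMIT 10",
    (1,),
).fetchall()

# The last column of each plan row is a human-readable description of the step
plan_text = " ".join(row[-1] for row in plan)
print(plan_text)
```

If the index name appears in the printed plan, SQLite is using the index instead of scanning the whole table. Other databases expose similar tooling, for example `EXPLAIN` in PostgreSQL.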

2. Caching Implementation

restapiauth.py
from flask_caching import Cache
 
# 'SimpleCache' is the current name; the short alias 'simple' is deprecated
cache = Cache(app, config={'CACHE_TYPE': 'SimpleCache'})
 
@app.route('/api/posts')
@cache.cached(timeout=300, query_string=True)
def get_posts_cached():
    """Cached posts endpoint"""
    # Implementation with caching
    pass
 
@cache.memoize(timeout=600)
def get_user_stats(user_id):
    """Cached user statistics"""
    # Two COUNT queries per user; the result is cached for 10 minutes
    return {
        'post_count': Post.query.filter_by(user_id=user_id).count(),
        'published_count': Post.query.filter_by(user_id=user_id, published=True).count()
    }
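
Conceptually, memoizing with a timeout means storing each result under its arguments together with an expiry time. The standard-library sketch below illustrates that idea. `ttl_memoize` and its `clock` parameter are hypothetical names for this example; it is not Flask-Caching's implementation and it ignores keyword arguments.

```python
import functools
import time


def ttl_memoize(timeout, clock=time.monotonic):
    """Cache results per positional-argument tuple for `timeout` seconds."""

    def decorator(func):
        store = {}  # args -> (expires_at, value)

        @functools.wraps(func)
        def wrapper(*args):
            now = clock()
            hit = store.get(args)
            if hit is not None and hit[0] > now:
                return hit[1]  # still fresh: skip the expensive call
            value = func(*args)
            store[args] = (now + timeout, value)
            return value

        # Drop one cached entry, e.g. after the underlying data changes
        wrapper.invalidate = lambda *args: store.pop(args, None)
        return wrapper

    return decorator
```

The main design concern is staleness: after a user creates or deletes a post, the cached statistics stay outdated until the entry expires or is invalidated. Flask-Caching provides `cache.delete_memoized` for that purpose.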

3. Pagination Optimization

restapiauth.py
def paginate_query(query, page, per_page, max_per_page=100):
    """Optimized pagination helper"""
    per_page = min(per_page, max_per_page)  # Prevent abuse
    
    return query.paginate(
        page=page,
        per_page=per_page,
        error_out=False
    )
 
def get_pagination_meta(pagination):
    """Get pagination metadata"""
    return {
        'page': pagination.page,
        'pages': pagination.pages,
        'per_page': pagination.per_page,
        'total': pagination.total,
        'has_next': pagination.has_next,
        'has_prev': pagination.has_prev,
        'next_num': pagination.next_num,
        'prev_num': pagination.prev_num
    }
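
The arithmetic behind these metadata fields can be shown without a database. The sketch below computes the same keys from a total row count. `pagination_meta` is a hypothetical helper written for this illustration, and the pagination object returned by Flask-SQLAlchemy may treat edge cases (such as an empty result set) differently.

```python
import math


def pagination_meta(total, page, per_page, max_per_page=100):
    """Compute pagination metadata from a total row count."""
    per_page = max(1, min(per_page, max_per_page))  # clamp to a sane range
    page = max(1, page)
    pages = math.ceil(total / per_page)
    has_prev = page > 1
    has_next = page < pages
    return {
        "page": page,
        "pages": pages,
        "per_page": per_page,
        "total": total,
        "has_next": has_next,
        "has_prev": has_prev,
        "next_num": page + 1 if has_next else None,
        "prev_num": page - 1 if has_prev else None,
    }
```

For example, 42 rows at 10 per page give 5 pages, and page 2 has both a previous and a next page.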

Troubleshooting

Common Issues

1. JWT Token Errors

restapiauth.py
# Solution: Check token expiration and format
@jwt.expired_token_loader
def expired_token_callback(jwt_header, jwt_payload):
    return jsonify({'error': 'Token has expired', 'code': 'TOKEN_EXPIRED'}), 401
 
@jwt.invalid_token_loader  
def invalid_token_callback(error):
    return jsonify({'error': 'Invalid token format', 'code': 'INVALID_TOKEN'}), 401
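
When one of these errors is hard to explain, it can help to look inside the token. The sketch below decodes the payload segment of a JWT and checks its `exp` claim using only the standard library. `decode_jwt_payload` and `is_expired` are hypothetical helpers for debugging: they do not verify the signature, so they must never be used to authenticate a request.

```python
import base64
import json
import time


def decode_jwt_payload(token):
    """Return the claims of a JWT WITHOUT verifying its signature (debugging only)."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("a JWT has three dot-separated parts")
    payload = parts[1]
    payload += "=" * (-len(payload) % 4)  # restore the stripped base64 padding
    return json.loads(base64.urlsafe_b64decode(payload))


def is_expired(claims, now=None):
    """Report whether the 'exp' claim (seconds since the epoch) has passed."""
    now = time.time() if now is None else now
    return "exp" in claims and claims["exp"] <= now
```

A token that does not split into three parts points to a malformed `Authorization` header, while an `exp` in the past points to an expired token.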

2. Database Connection Issues

restapiauth.py
# Solution: enable foreign-key enforcement on every new SQLite connection
# (the pragma is a per-connection setting, so it must run on each connect)
from sqlalchemy import event
from sqlalchemy.pool import Pool
 
@event.listens_for(Pool, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
    if 'sqlite' in app.config['SQLALCHEMY_DATABASE_URI']:
        cursor = dbapi_connection.cursor()
        cursor.execute("PRAGMA foreign_keys=ON")
        cursor.close()
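
The effect of this pragma can be observed with the standard-library `sqlite3` module alone. The sketch below uses a simplified, hypothetical two-table schema (not the project's models) and tries to insert a post that points at a user who does not exist.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys=ON")  # applies to this connection only
conn.executescript(
    """
    CREATE TABLE users (id INTEGER PRIMARY KEY);
    CREATE TABLE posts (
        id INTEGER PRIMARY KEY,
        user_id INTEGER NOT NULL REFERENCES users(id)
    );
    """
)

try:
    # No user with id 999 exists, so this row would be an orphan
    conn.execute("INSERT INTO posts (user_id) VALUES (999)")
    orphan_rejected = False
except sqlite3.IntegrityError:
    orphan_rejected = True

print(orphan_rejected)
```

With the pragma enabled, the insert fails with an integrity error instead of silently storing a dangling reference.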

3. Rate Limiting Issues

restapiauth.py
# Solution: Configure appropriate limits and storage
@app.errorhandler(429)
def handle_rate_limit_exceeded(error):
    return jsonify({
        'error': 'Rate limit exceeded',
        'retry_after': error.retry_after
    }), 429

Extensions and Improvements

1. Refresh Token Implementation

restapiauth.py
@app.route('/api/auth/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
    """Refresh access token"""
    current_user_id = get_jwt_identity()
    new_token = create_access_token(identity=current_user_id)
    return jsonify({'access_token': new_token}), 200

2. Email Verification System

restapiauth.py
from flask import url_for
from flask_mail import Mail, Message
import secrets
 
# Assumes the MAIL_* settings are configured on the app
mail = Mail(app)
 
def send_verification_email(user):
    """Send email verification"""
    token = secrets.token_urlsafe(32)
    # Store token in database
    verification = EmailVerification(user_id=user.id, token=token)
    db.session.add(verification)
    db.session.commit()
    
    # Send email
    msg = Message('Verify Your Email',
                  recipients=[user.email],
                  body=f'Click to verify: {url_for("verify_email", token=token, _external=True)}')
    mail.send(msg)
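
The receiving side of this flow has to look the token up, reject it once it has expired, and make sure it cannot be reused. The sketch below shows that logic with an in-memory dictionary standing in for the `EmailVerification` table. The names `issue_token`, `verify_token`, `_pending`, and the 24-hour lifetime are all assumptions made for this illustration.

```python
import secrets
from datetime import datetime, timedelta, timezone

TOKEN_TTL = timedelta(hours=24)  # assumed token lifetime
_pending = {}  # hypothetical stand-in for the table: token -> (user_id, expires_at)


def issue_token(user_id, now=None):
    """Create a random token for `user_id` and remember when it expires."""
    now = now or datetime.now(timezone.utc)
    token = secrets.token_urlsafe(32)
    _pending[token] = (user_id, now + TOKEN_TTL)
    return token


def verify_token(token, now=None):
    """Return the user id for a valid token, or None. Tokens are single use."""
    now = now or datetime.now(timezone.utc)
    record = _pending.pop(token, None)  # removed on first use, valid or not
    if record is None:
        return None
    user_id, expires_at = record
    return user_id if now < expires_at else None
```

In the real application the lookup and deletion would be database operations, and a successful verification would also mark the user's email as confirmed.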

3. API Versioning

restapiauth.py
from flask import Blueprint
 
# API v1
api_v1 = Blueprint('api_v1', __name__, url_prefix='/api/v1')
 
@api_v1.route('/posts')
def get_posts_v1():
    """Posts endpoint v1"""
    pass
 
# API v2  
api_v2 = Blueprint('api_v2', __name__, url_prefix='/api/v2')
 
@api_v2.route('/posts')
def get_posts_v2():
    """Posts endpoint v2 with enhanced features"""
    pass
 
app.register_blueprint(api_v1)
app.register_blueprint(api_v2)

Next Steps

After mastering this REST API, consider:

  1. GraphQL Integration: Add GraphQL endpoints for flexible queries
  2. Microservices Architecture: Split into smaller, focused services
  3. WebSocket Support: Add real-time features with Socket.IO
  4. Advanced Authentication: OAuth2, SAML, or multi-factor authentication
  5. API Gateway: Implement API gateway for advanced routing and monitoring

Conclusion

This REST API with JWT authentication demonstrates professional API development practices with Flask. It includes comprehensive user management, secure authentication, CRUD operations, rate limiting, and proper error handling. The modular design allows for easy extension and customization for various use cases.

The API follows RESTful principles and applies the security practices covered above (password hashing, rate limiting, and input sanitization), which gives it a solid base for production use. The unit and integration tests, together with the documentation, keep it maintainable for other developers. 🔐🐍
