import os
import io
import uuid
import hashlib
import magic
import logging
from datetime import datetime
from typing import Dict, Any, Optional

from flask import Flask, request, jsonify
from werkzeug.utils import secure_filename
import psycopg2
from psycopg2.extras import RealDictCursor
from minio import Minio
from minio.error import S3Error
import structlog

# Configure structured logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

app = Flask(__name__)

# Configuration
MINIO_ENDPOINT = os.getenv('MINIO_ENDPOINT', 'localhost:9000')
MINIO_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY', 'minioadmin')
MINIO_SECRET_KEY = os.getenv('MINIO_SECRET_KEY', 'minioadmin123')
MINIO_BUCKET_NAME = os.getenv('MINIO_BUCKET_NAME', 'file-transformer-bucket')
MINIO_USE_SSL = os.getenv('MINIO_USE_SSL', 'false').lower() == 'true'
POSTGRES_URL = os.getenv('POSTGRES_URL', 'postgresql://file_user:secure_password_123@localhost:5432/file_transformer')

# Initialize MinIO client
minio_client = Minio(
    MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=MINIO_USE_SSL
)


def get_db_connection():
    """Create a database connection."""
    return psycopg2.connect(POSTGRES_URL)


def calculate_file_hash(file_data: bytes) -> str:
    """Calculate SHA-256 hash of file data."""
    return hashlib.sha256(file_data).hexdigest()


def get_file_metadata(file_data: bytes, filename: str) -> Dict[str, Any]:
    """Extract file metadata including MIME type and size."""
    mime_type = magic.from_buffer(file_data, mime=True)
    file_size = len(file_data)

    # Determine file type from extension
    file_extension = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''

    return {
        'mime_type': mime_type,
        'file_size': file_size,
        'file_type': file_extension,
        'checksum': calculate_file_hash(file_data)
    }


def save_file_to_database(file_data: bytes, filename: str, object_key: str, metadata: Dict[str, Any]) -> str:
    """Save file information to PostgreSQL database."""
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            # Insert file record
            cur.execute("""
                INSERT INTO files (
                    filename, original_filename, file_path, file_size, file_type,
                    mime_type, bucket_name, object_key, checksum, status, created_at
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                RETURNING id
            """, (
                filename, filename, object_key, metadata['file_size'],
                metadata['file_type'], metadata['mime_type'], MINIO_BUCKET_NAME,
                object_key, metadata['checksum'], 'uploaded', datetime.utcnow()
            ))
            file_id = cur.fetchone()['id']
            conn.commit()
            return str(file_id)
    except Exception as e:
        conn.rollback()
        logger.error("Database error", error=str(e))
        raise
    finally:
        conn.close()


def upload_to_minio(file_data: bytes, object_key: str) -> bool:
    """Upload file to MinIO bucket."""
    try:
        # Ensure bucket exists
        if not minio_client.bucket_exists(MINIO_BUCKET_NAME):
            minio_client.make_bucket(MINIO_BUCKET_NAME)
            logger.info("Created bucket", bucket=MINIO_BUCKET_NAME)

        # Upload file; put_object expects a readable stream, so wrap the bytes
        minio_client.put_object(
            MINIO_BUCKET_NAME,
            object_key,
            io.BytesIO(file_data),
            length=len(file_data)
        )
        logger.info("File uploaded to MinIO", bucket=MINIO_BUCKET_NAME, object_key=object_key)
        return True
    except S3Error as e:
        logger.error("MinIO upload error", error=str(e))
        return False


@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint."""
    return jsonify({'status': 'healthy', 'service': 'file-upload'})


@app.route('/upload', methods=['POST'])
def upload_file():
    """Handle file upload request."""
    try:
        # Check if file is present in request
        if 'file' not in request.files:
            return jsonify({'error': 'No file provided'}), 400

        file = request.files['file']
        if file.filename == '':
            return jsonify({'error': 'No file selected'}), 400

        # Read file data
        file_data = file.read()
        if not file_data:
            return jsonify({'error': 'Empty file'}), 400

        # Secure filename and generate object key
        filename = secure_filename(file.filename)
        file_id = str(uuid.uuid4())
        object_key = f"uploads/{file_id}/{filename}"

        # Extract metadata
        metadata = get_file_metadata(file_data, filename)

        logger.info("Processing file upload", filename=filename,
                    size=metadata['file_size'], mime_type=metadata['mime_type'])

        # Upload to MinIO
        if not upload_to_minio(file_data, object_key):
            return jsonify({'error': 'Failed to upload file to storage'}), 500

        # Save to database
        db_file_id = save_file_to_database(file_data, filename, object_key, metadata)

        # Log access
        log_file_access(db_file_id, 'upload', request.remote_addr,
                        request.headers.get('User-Agent'))

        response_data = {
            'file_id': db_file_id,
            'filename': filename,
            'object_key': object_key,
            'file_size': metadata['file_size'],
            'mime_type': metadata['mime_type'],
            'checksum': metadata['checksum'],
            'status': 'uploaded',
            'uploaded_at': datetime.utcnow().isoformat()
        }

        logger.info("File upload completed", file_id=db_file_id, filename=filename)
        return jsonify(response_data), 201
    except Exception as e:
        logger.error("Upload error", error=str(e))
        return jsonify({'error': 'Internal server error'}), 500
cur.execute(""" INSERT INTO file_access_logs (file_id, action, ip_address, user_agent) VALUES (%s, %s, %s, %s) """, (file_id, action, ip_address, user_agent)) conn.commit() except Exception as e: logger.error("Failed to log file access", error=str(e)) conn.rollback() finally: conn.close() @app.route('/files/', methods=['GET']) def get_file_info(file_id: str): """Get file information by ID.""" try: conn = get_db_connection() with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(""" SELECT id, filename, original_filename, file_size, file_type, mime_type, bucket_name, object_key, checksum, status, created_at, updated_at FROM files WHERE id = %s """, (file_id,)) file_record = cur.fetchone() if not file_record: return jsonify({'error': 'File not found'}), 404 return jsonify(dict(file_record)), 200 except Exception as e: logger.error("Error fetching file info", error=str(e)) return jsonify({'error': 'Internal server error'}), 500 finally: conn.close() @app.route('/files/', methods=['DELETE']) def delete_file(file_id: str): """Delete file from storage and database.""" try: conn = get_db_connection() with conn.cursor(cursor_factory=RealDictCursor) as cur: # Get file info cur.execute("SELECT object_key FROM files WHERE id = %s", (file_id,)) file_record = cur.fetchone() if not file_record: return jsonify({'error': 'File not found'}), 404 object_key = file_record['object_key'] # Delete from MinIO try: minio_client.remove_object(MINIO_BUCKET_NAME, object_key) logger.info("File deleted from MinIO", object_key=object_key) except S3Error as e: logger.warning("File not found in MinIO", object_key=object_key, error=str(e)) # Mark as deleted in database cur.execute(""" UPDATE files SET status = 'deleted', deleted_at = %s WHERE id = %s """, (datetime.utcnow(), file_id)) conn.commit() # Log access log_file_access(file_id, 'delete', request.remote_addr, request.headers.get('User-Agent')) return jsonify({'message': 'File deleted successfully'}), 200 except Exception as e: logger.error("Error deleting file", error=str(e)) return jsonify({'error': 'Internal server error'}), 500 finally: conn.close() if __name__ == '__main__': app.run(host='0.0.0.0', port=5000, debug=False)