import os
import uuid
import json
import math
import mimetypes
import tempfile
import logging
from datetime import datetime
from typing import Dict, Any, Optional, List, Tuple
from pathlib import Path

from flask import Flask, request, jsonify
import psycopg2
from psycopg2.extras import RealDictCursor
from minio import Minio
from minio.error import S3Error
import structlog

# File processing imports
import PyPDF2
from docx import Document
import pandas as pd
from PIL import Image
import io

# structlog is wired through the standard-library ``logging`` module
# (filter_by_level / add_logger_name / LoggerFactory).  Without a root handler
# and level, the stdlib default of WARNING applies and every ``logger.info``
# call below is silently dropped, so configure the root logger explicitly.
logging.basicConfig(format="%(message)s", level=logging.INFO)

# Configure structured logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

app = Flask(__name__)

# Configuration
# NOTE(review): the fallback credentials below look like development defaults;
# make sure real deployments always override them via the environment.
MINIO_ENDPOINT = os.getenv('MINIO_ENDPOINT', 'localhost:9000')
MINIO_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY', 'minioadmin')
MINIO_SECRET_KEY = os.getenv('MINIO_SECRET_KEY', 'minioadmin123')
MINIO_BUCKET_NAME = os.getenv('MINIO_BUCKET_NAME', 'file-transformer-bucket')
MINIO_USE_SSL = os.getenv('MINIO_USE_SSL', 'false').lower() == 'true'

POSTGRES_URL = os.getenv('POSTGRES_URL', 'postgresql://file_user:secure_password_123@localhost:5432/file_transformer')

# Common user-supplied aliases mapped onto the format names Pillow registers.
_IMAGE_FORMAT_ALIASES = {'JPG': 'JPEG', 'TIF': 'TIFF'}

# Initialize MinIO client
minio_client = Minio(
    MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=MINIO_USE_SSL
)


def get_db_connection():
    """Create a new PostgreSQL connection from ``POSTGRES_URL``.

    Callers own the connection and must close it.
    """
    return psycopg2.connect(POSTGRES_URL)


def get_file_from_minio(object_key: str) -> bytes:
    """Download ``object_key`` from the configured bucket and return its bytes.

    Raises:
        S3Error: if MinIO reports an error (logged before re-raising).
    """
    response = None
    try:
        response = minio_client.get_object(MINIO_BUCKET_NAME, object_key)
        return response.read()
    except S3Error as e:
        logger.error("Failed to get file from MinIO", object_key=object_key, error=str(e))
        raise
    finally:
        # The SDK returns a pooled HTTP response; it must be closed and its
        # connection released, or the connection pool leaks on every download.
        if response is not None:
            response.close()
            response.release_conn()


def upload_file_to_minio(file_data: bytes, object_key: str,
                         content_type: str = 'application/octet-stream') -> bool:
    """Upload ``file_data`` to ``object_key`` in the configured bucket.

    Args:
        file_data: Raw bytes to store.
        object_key: Destination object name.
        content_type: MIME type recorded on the object (defaults to the SDK's
            own default, ``application/octet-stream``).

    Returns:
        True on success, False if MinIO reported an ``S3Error`` (logged).
    """
    try:
        minio_client.put_object(
            MINIO_BUCKET_NAME,
            object_key,
            # put_object expects a stream with .read(), not a bytes object.
            io.BytesIO(file_data),
            length=len(file_data),
            content_type=content_type,
        )
        return True
    except S3Error as e:
        logger.error("Failed to upload file to MinIO", object_key=object_key, error=str(e))
        return False


def extract_text_from_pdf(file_data: bytes) -> str:
    """Return the text of every PDF page, one page per line block, stripped."""
    try:
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(file_data))
        # Some PyPDF2 versions return None for pages without a text layer;
        # treat those as empty instead of failing with a TypeError.
        pages = [page.extract_text() or "" for page in pdf_reader.pages]
        return "\n".join(pages).strip()
    except Exception as e:
        logger.error("PDF text extraction failed", error=str(e))
        raise


def extract_text_from_docx(file_data: bytes) -> str:
    """Return the text of every paragraph in a DOCX document, stripped.

    Only body paragraphs are read (tables, headers and footers are not).
    """
    try:
        doc = Document(io.BytesIO(file_data))
        return "\n".join(paragraph.text for paragraph in doc.paragraphs).strip()
    except Exception as e:
        logger.error("DOCX text extraction failed", error=str(e))
        raise


def _json_safe_value(value: Any) -> Any:
    """Convert one DataFrame cell into a value ``json.dumps`` emits as valid JSON.

    Missing values (NaN/NaT/NA) and non-finite floats become None, and
    date/time-like values become ISO-8601 strings.
    """
    if value is None:
        return None
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    if isinstance(value, float) and not math.isfinite(value):
        return None
    if hasattr(value, 'isoformat'):
        return value.isoformat()
    return value


def _dataframe_to_records(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Turn a DataFrame into a list of JSON-serializable row dicts."""
    return [
        {column: _json_safe_value(value) for column, value in row.items()}
        for row in df.to_dict('records')
    ]


def convert_csv_to_json(file_data: bytes) -> List[Dict[str, Any]]:
    """Parse CSV bytes into a list of JSON-serializable row dicts."""
    try:
        df = pd.read_csv(io.BytesIO(file_data))
        return _dataframe_to_records(df)
    except Exception as e:
        logger.error("CSV to JSON conversion failed", error=str(e))
        raise


def convert_excel_to_json(file_data: bytes) -> List[Dict[str, Any]]:
    """Parse the first sheet of an Excel workbook into JSON-serializable row dicts."""
    try:
        df = pd.read_excel(io.BytesIO(file_data))
        return _dataframe_to_records(df)
    except Exception as e:
        logger.error("Excel to JSON conversion failed", error=str(e))
        raise


def resize_image(file_data: bytes, width: int, height: int) -> bytes:
    """Resize an image to exactly ``width`` x ``height`` pixels.

    The output keeps the source image's format, falling back to JPEG when
    Pillow could not detect one.
    """
    try:
        image = Image.open(io.BytesIO(file_data))
        resized_image = image.resize((width, height), Image.Resampling.LANCZOS)
        output = io.BytesIO()
        resized_image.save(output, format=image.format or 'JPEG')
        return output.getvalue()
    except Exception as e:
        logger.error("Image resize failed", error=str(e))
        raise


def convert_image_format(file_data: bytes, target_format: str) -> bytes:
    """Re-encode an image in ``target_format`` (case-insensitive; 'jpg'/'tif' accepted)."""
    try:
        image = Image.open(io.BytesIO(file_data))
        pil_format = target_format.upper()
        pil_format = _IMAGE_FORMAT_ALIASES.get(pil_format, pil_format)
        # JPEG cannot store alpha/palette modes; Pillow would raise
        # "cannot write mode RGBA as JPEG" without this conversion.
        if pil_format == 'JPEG' and image.mode not in ('RGB', 'L', 'CMYK'):
            image = image.convert('RGB')
        output = io.BytesIO()
        image.save(output, format=pil_format)
        return output.getvalue()
    except Exception as e:
        logger.error("Image format conversion failed", error=str(e))
        raise


def create_transformation_record(file_id: str, transformation_type: str, config: Dict[str, Any]) -> str:
    """Insert a 'pending' transformation row and return its id as a string."""
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                INSERT INTO transformations (
                    file_id, transformation_type, input_path, status, config, created_at
                ) VALUES (%s, %s, %s, %s, %s, %s)
                RETURNING id
            """, (
                file_id,
                transformation_type,
                f"files/{file_id}",
                'pending',
                json.dumps(config),
                datetime.utcnow()
            ))
            transformation_id = cur.fetchone()['id']
        conn.commit()
        return str(transformation_id)
    except Exception as e:
        conn.rollback()
        logger.error("Failed to create transformation record", error=str(e))
        raise
    finally:
        conn.close()


def update_transformation_status(transformation_id: str, status: str,
                                 result: Optional[Dict[str, Any]] = None,
                                 error_message: Optional[str] = None):
    """Move a transformation row to ``status`` and stamp the matching timestamp.

    Args:
        transformation_id: Row id in ``transformations``.
        status: One of 'processing', 'completed' or 'failed'.
        result: For 'completed', the JSON-serializable result payload; its
            'output_object_key' entry is also stored in ``output_path``.
        error_message: For 'failed', the error text to store.

    Raises:
        ValueError: for an unknown ``status`` (previously a silent no-op).
        Exception: any database error (rolled back and logged first).
    """
    now = datetime.utcnow()
    if status == 'processing':
        query = """
            UPDATE transformations
            SET status = %s, started_at = %s
            WHERE id = %s
        """
        params = (status, now, transformation_id)
    elif status == 'completed':
        query = """
            UPDATE transformations
            SET status = %s, completed_at = %s, result = %s, output_path = %s
            WHERE id = %s
        """
        output_path = (result or {}).get('output_object_key')
        params = (status, now, json.dumps(result), output_path, transformation_id)
    elif status == 'failed':
        query = """
            UPDATE transformations
            SET status = %s, completed_at = %s, error_message = %s
            WHERE id = %s
        """
        params = (status, now, error_message, transformation_id)
    else:
        raise ValueError(f"Unknown transformation status: {status}")

    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(query, params)
        conn.commit()
    except Exception as e:
        conn.rollback()
        logger.error("Failed to update transformation status", error=str(e))
        raise
    finally:
        conn.close()


def get_file_info(file_id: str) -> Optional[Dict[str, Any]]:
    """Return the ``files`` row for ``file_id`` as a dict, or None if absent.

    Database errors are logged and re-raised, so an outage surfaces as a
    server error instead of being reported as "file not found".
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT id, filename, file_type, mime_type, object_key, status
                FROM files
                WHERE id = %s
            """, (file_id,))
            file_record = cur.fetchone()
        return dict(file_record) if file_record else None
    except Exception as e:
        logger.error("Failed to get file info", file_id=file_id, error=str(e))
        raise
    finally:
        conn.close()


def _mark_file_transformed(file_id: str, transformation_type: str) -> None:
    """Set a ``files`` row to 'transformed' with the applied transformation type."""
    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute("""
                UPDATE files
                SET status = 'transformed', transformation_type = %s, processed_at = %s
                WHERE id = %s
            """, (transformation_type, datetime.utcnow(), file_id))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def _positive_int(value: Any, name: str) -> int:
    """Coerce a config value to a positive int or raise a descriptive ValueError."""
    try:
        number = int(value)
    except (TypeError, ValueError):
        raise ValueError(f"{name} must be a positive integer, got {value!r}") from None
    if number <= 0:
        raise ValueError(f"{name} must be a positive integer, got {value!r}")
    return number


def _apply_transformation(file_info: Dict[str, Any], transformation_type: str,
                          config: Dict[str, Any],
                          file_data: bytes) -> Tuple[Any, bytes, str, str]:
    """Run one transformation in memory.

    Returns:
        ``(result, output_data, output_filename, content_type)`` where
        ``result`` is the structured result stored in the database (None for
        image transformations).

    Raises:
        ValueError: for unsupported types or input/file-type mismatches.
    """
    filename = Path(file_info['filename'])
    file_type = (file_info.get('file_type') or '').lower()
    mime_type = file_info.get('mime_type') or ''

    if transformation_type == 'extract_text':
        if file_type == 'pdf':
            result = extract_text_from_pdf(file_data)
        elif file_type in ('docx', 'doc'):
            # NOTE(review): python-docx only reads OOXML (.docx); legacy binary
            # .doc files are expected to fail inside extract_text_from_docx.
            result = extract_text_from_docx(file_data)
        else:
            raise ValueError(f"Text extraction not supported for file type: {file_info['file_type']}")
        return (result, result.encode('utf-8'),
                f"{filename.stem}_extracted.txt", 'text/plain; charset=utf-8')

    if transformation_type == 'csv_to_json':
        if file_type != 'csv':
            raise ValueError("CSV to JSON conversion only supports CSV files")
        result = convert_csv_to_json(file_data)
        return (result, json.dumps(result, indent=2).encode('utf-8'),
                f"{filename.stem}.json", 'application/json')

    if transformation_type == 'excel_to_json':
        if file_type not in ('xlsx', 'xls'):
            raise ValueError("Excel to JSON conversion only supports Excel files")
        result = convert_excel_to_json(file_data)
        return (result, json.dumps(result, indent=2).encode('utf-8'),
                f"{filename.stem}.json", 'application/json')

    if transformation_type == 'resize_image':
        if not mime_type.startswith('image/'):
            raise ValueError("Image resize only supports image files")
        width = _positive_int(config.get('width', 800), 'width')
        height = _positive_int(config.get('height', 600), 'height')
        output_data = resize_image(file_data, width, height)
        # Path.suffix already includes the leading dot (e.g. '.png').
        return None, output_data, f"{filename.stem}_resized{filename.suffix}", mime_type

    if transformation_type == 'convert_image':
        if not mime_type.startswith('image/'):
            raise ValueError("Image conversion only supports image files")
        target_format = str(config.get('format', 'JPEG'))
        output_data = convert_image_format(file_data, target_format)
        output_filename = f"{filename.stem}.{target_format.lower()}"
        content_type = mimetypes.guess_type(output_filename)[0] or 'application/octet-stream'
        return None, output_data, output_filename, content_type

    raise ValueError(f"Unsupported transformation type: {transformation_type}")


def _run_transformation(transformation_id: str, file_id: Any, file_info: Dict[str, Any],
                        transformation_type: str, config: Dict[str, Any]):
    """Execute an already-recorded transformation and build the HTTP response.

    Errors while marking the record 'processing' propagate to the calling
    view. Errors during the transformation itself are stored on the row and
    reported as a 500 that includes the error message.
    """
    update_transformation_status(transformation_id, 'processing')

    logger.info("Starting transformation",
                file_id=file_id,
                transformation_id=transformation_id,
                transformation_type=transformation_type)

    try:
        file_data = get_file_from_minio(file_info['object_key'])

        result, output_data, output_filename, content_type = _apply_transformation(
            file_info, transformation_type, config, file_data)
        output_object_key = f"transformations/{transformation_id}/{output_filename}"

        # Upload even empty output (e.g. a PDF without a text layer) so the
        # reported output_object_key always exists.
        if not upload_file_to_minio(output_data, output_object_key, content_type):
            raise RuntimeError("Failed to upload transformed file")

        update_transformation_status(transformation_id, 'completed', {
            'output_object_key': output_object_key,
            'output_filename': output_filename,
            'result': result if isinstance(result, (str, list, dict)) else None
        })

        _mark_file_transformed(file_id, transformation_type)

        response_data = {
            'transformation_id': transformation_id,
            'file_id': file_id,
            'transformation_type': transformation_type,
            'status': 'completed',
            'output_object_key': output_object_key,
            'output_filename': output_filename,
            'completed_at': datetime.utcnow().isoformat()
        }

        logger.info("Transformation completed",
                    transformation_id=transformation_id,
                    file_id=file_id)

        return jsonify(response_data), 200

    except Exception as e:
        error_message = str(e)
        logger.error("Transformation failed",
                     transformation_id=transformation_id,
                     file_id=file_id,
                     error=error_message)

        update_transformation_status(transformation_id, 'failed', error_message=error_message)

        return jsonify({
            'transformation_id': transformation_id,
            'file_id': file_id,
            'status': 'failed',
            'error': error_message
        }), 500


@app.route('/health', methods=['GET'])
def health_check():
    """Liveness probe; does not touch MinIO or PostgreSQL."""
    return jsonify({'status': 'healthy', 'service': 'file-transform'})


@app.route('/transform', methods=['POST'])
def transform_file():
    """Create and synchronously run a transformation for a stored file.

    Expects a JSON object with ``file_id``, ``transformation_type`` and an
    optional ``config`` object.
    """
    try:
        # silent=True: a missing or malformed JSON body yields None (-> 400)
        # instead of an HTTPException that the handler below would turn into 500.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({'error': 'No data provided'}), 400
        if not isinstance(data, dict):
            return jsonify({'error': 'Request body must be a JSON object'}), 400

        file_id = data.get('file_id')
        transformation_type = data.get('transformation_type')
        config = data.get('config') or {}

        if not file_id or not transformation_type:
            return jsonify({'error': 'file_id and transformation_type are required'}), 400
        if not isinstance(config, dict):
            return jsonify({'error': 'config must be a JSON object'}), 400

        file_info = get_file_info(file_id)
        if not file_info:
            return jsonify({'error': 'File not found'}), 404

        if file_info['status'] == 'deleted':
            return jsonify({'error': 'File has been deleted'}), 400

        transformation_id = create_transformation_record(file_id, transformation_type, config)

        return _run_transformation(transformation_id, file_id, file_info,
                                   transformation_type, config)

    except Exception as e:
        logger.error("Transform request error", error=str(e))
        return jsonify({'error': 'Internal server error'}), 500


@app.route('/transformations/<transformation_id>', methods=['GET'])
def get_transformation_status(transformation_id: str):
    """Return the stored row for one transformation, or 404 if unknown."""
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT id, file_id, transformation_type, input_path, output_path,
                       status, config, result, error_message, started_at,
                       completed_at, created_at
                FROM transformations
                WHERE id = %s
            """, (transformation_id,))
            transformation = cur.fetchone()

        if not transformation:
            return jsonify({'error': 'Transformation not found'}), 404

        return jsonify(dict(transformation)), 200

    except Exception as e:
        logger.error("Error fetching transformation", transformation_id=transformation_id, error=str(e))
        return jsonify({'error': 'Internal server error'}), 500
    finally:
        # conn is None if connecting itself failed.
        if conn is not None:
            conn.close()


def _fetch_transformation(transformation_id: str) -> Optional[Dict[str, Any]]:
    """Load the fields the retry endpoint needs, or None if the row is absent."""
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT file_id, transformation_type, config, status
                FROM transformations
                WHERE id = %s
            """, (transformation_id,))
            row = cur.fetchone()
        return dict(row) if row else None
    finally:
        conn.close()


def _reset_failed_transformation(transformation_id: str) -> bool:
    """Atomically move a 'failed' row back to 'pending'.

    Returns False if the row was not in 'failed' state, e.g. because a
    concurrent retry already claimed it.
    """
    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            cur.execute("""
                UPDATE transformations
                SET status = 'pending', started_at = NULL, completed_at = NULL,
                    error_message = NULL, result = NULL
                WHERE id = %s AND status = 'failed'
            """, (transformation_id,))
            claimed = cur.rowcount == 1
        conn.commit()
        return claimed
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def _decode_config(raw: Any) -> Dict[str, Any]:
    """Normalize a stored config value (dict, JSON text or NULL) to a dict."""
    if isinstance(raw, str):
        raw = json.loads(raw)
    return raw if isinstance(raw, dict) else {}


@app.route('/transformations/<transformation_id>/retry', methods=['POST'])
def retry_transformation(transformation_id: str):
    """Re-run a failed transformation in place, reusing its existing record."""
    try:
        transformation = _fetch_transformation(transformation_id)
        if not transformation:
            return jsonify({'error': 'Transformation not found'}), 404

        if transformation['status'] != 'failed':
            return jsonify({'error': 'Only failed transformations can be retried'}), 400

        file_id = transformation['file_id']
        file_info = get_file_info(file_id)
        if not file_info:
            return jsonify({'error': 'File not found'}), 404
        if file_info['status'] == 'deleted':
            return jsonify({'error': 'File has been deleted'}), 400

        if not _reset_failed_transformation(transformation_id):
            return jsonify({'error': 'Only failed transformations can be retried'}), 400

        return _run_transformation(transformation_id, file_id, file_info,
                                   transformation['transformation_type'],
                                   _decode_config(transformation['config']))

    except Exception as e:
        logger.error("Error retrying transformation", transformation_id=transformation_id, error=str(e))
        return jsonify({'error': 'Internal server error'}), 500


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=False)