# Mirror of https://github.com/ghndrx/file-transformer-s3.git (synced 2026-02-10)
import os
import uuid
import json
import tempfile
import logging
import io
from datetime import datetime
from typing import Dict, Any, Optional, List
from pathlib import Path

from flask import Flask, request, jsonify
import psycopg2
from psycopg2.extras import RealDictCursor
from minio import Minio
from minio.error import S3Error
import structlog

# File processing imports
import PyPDF2
from docx import Document
import pandas as pd
from PIL import Image

# Configure structured logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

app = Flask(__name__)

# Configuration
MINIO_ENDPOINT = os.getenv('MINIO_ENDPOINT', 'localhost:9000')
MINIO_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY', 'minioadmin')
MINIO_SECRET_KEY = os.getenv('MINIO_SECRET_KEY', 'minioadmin123')
MINIO_BUCKET_NAME = os.getenv('MINIO_BUCKET_NAME', 'file-transformer-bucket')
MINIO_USE_SSL = os.getenv('MINIO_USE_SSL', 'false').lower() == 'true'

POSTGRES_URL = os.getenv('POSTGRES_URL', 'postgresql://file_user:secure_password_123@localhost:5432/file_transformer')

# Initialize MinIO client
minio_client = Minio(
    MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=MINIO_USE_SSL
)

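# The service assumes MINIO_BUCKET_NAME already exists. If that is not guaranteed by
# the deployment, a minimal startup check along these lines could be used. This is a
# sketch only, not part of the original service, and is never called below; it relies
# on the minio SDK's bucket_exists()/make_bucket() calls.
def ensure_bucket_exists(bucket_name: str = MINIO_BUCKET_NAME) -> None:
    """Create the target bucket if it does not already exist (illustrative helper)."""
    if not minio_client.bucket_exists(bucket_name):
        minio_client.make_bucket(bucket_name)
        logger.info("Created MinIO bucket", bucket=bucket_name)
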
def get_db_connection():
    """Create a database connection."""
    return psycopg2.connect(POSTGRES_URL)

def get_file_from_minio(object_key: str) -> bytes:
    """Download file from MinIO."""
    try:
        response = minio_client.get_object(MINIO_BUCKET_NAME, object_key)
        try:
            return response.read()
        finally:
            # Release the underlying HTTP connection back to the pool
            response.close()
            response.release_conn()
    except S3Error as e:
        logger.error("Failed to get file from MinIO", object_key=object_key, error=str(e))
        raise

def upload_file_to_minio(file_data: bytes, object_key: str) -> bool:
    """Upload file to MinIO."""
    try:
        minio_client.put_object(
            MINIO_BUCKET_NAME,
            object_key,
            io.BytesIO(file_data),  # put_object expects a readable stream, not raw bytes
            length=len(file_data)
        )
        return True
    except S3Error as e:
        logger.error("Failed to upload file to MinIO", object_key=object_key, error=str(e))
        return False

def extract_text_from_pdf(file_data: bytes) -> str:
    """Extract text from PDF file."""
    try:
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(file_data))
        text = ""
        for page in pdf_reader.pages:
            text += page.extract_text() + "\n"
        return text.strip()
    except Exception as e:
        logger.error("PDF text extraction failed", error=str(e))
        raise

def extract_text_from_docx(file_data: bytes) -> str:
    """Extract text from DOCX file."""
    try:
        doc = Document(io.BytesIO(file_data))
        text = ""
        for paragraph in doc.paragraphs:
            text += paragraph.text + "\n"
        return text.strip()
    except Exception as e:
        logger.error("DOCX text extraction failed", error=str(e))
        raise

def convert_csv_to_json(file_data: bytes) -> List[Dict[str, Any]]:
    """Convert CSV to JSON format."""
    try:
        df = pd.read_csv(io.BytesIO(file_data))
        return df.to_dict('records')
    except Exception as e:
        logger.error("CSV to JSON conversion failed", error=str(e))
        raise

def convert_excel_to_json(file_data: bytes) -> List[Dict[str, Any]]:
    """Convert Excel to JSON format."""
    try:
        df = pd.read_excel(io.BytesIO(file_data))
        return df.to_dict('records')
    except Exception as e:
        logger.error("Excel to JSON conversion failed", error=str(e))
        raise

def resize_image(file_data: bytes, width: int, height: int) -> bytes:
    """Resize image to specified dimensions."""
    try:
        image = Image.open(io.BytesIO(file_data))
        resized_image = image.resize((width, height), Image.Resampling.LANCZOS)

        output = io.BytesIO()
        resized_image.save(output, format=image.format or 'JPEG')
        return output.getvalue()
    except Exception as e:
        logger.error("Image resize failed", error=str(e))
        raise

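# Note: resize() above scales to exactly (width, height), which can distort the aspect
# ratio. If proportional scaling were wanted instead, a sketch using Pillow's
# thumbnail() (which shrinks in place while preserving aspect ratio) might look like
# this. Illustrative only; this helper is not called anywhere in the service.
def resize_image_keep_aspect(file_data: bytes, max_width: int, max_height: int) -> bytes:
    """Scale an image down to fit within max_width x max_height (illustrative helper)."""
    image = Image.open(io.BytesIO(file_data))
    image.thumbnail((max_width, max_height), Image.Resampling.LANCZOS)
    output = io.BytesIO()
    image.save(output, format=image.format or 'JPEG')
    return output.getvalue()
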
def convert_image_format(file_data: bytes, target_format: str) -> bytes:
    """Convert image to different format."""
    try:
        image = Image.open(io.BytesIO(file_data))

        fmt = target_format.upper()
        if fmt == 'JPG':
            fmt = 'JPEG'  # Pillow expects 'JPEG', not 'JPG'
        if fmt == 'JPEG' and image.mode in ('RGBA', 'LA', 'P'):
            # JPEG has no alpha channel; flatten to RGB before saving
            image = image.convert('RGB')

        output = io.BytesIO()
        image.save(output, format=fmt)
        return output.getvalue()
    except Exception as e:
        logger.error("Image format conversion failed", error=str(e))
        raise

def create_transformation_record(file_id: str, transformation_type: str, config: Dict[str, Any]) -> str:
    """Create transformation record in database."""
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                INSERT INTO transformations (
                    file_id, transformation_type, input_path, status, config, created_at
                ) VALUES (%s, %s, %s, %s, %s, %s)
                RETURNING id
            """, (
                file_id,
                transformation_type,
                f"files/{file_id}",
                'pending',
                json.dumps(config),
                datetime.utcnow()
            ))

            transformation_id = cur.fetchone()['id']
            conn.commit()
            return str(transformation_id)
    except Exception as e:
        conn.rollback()
        logger.error("Failed to create transformation record", error=str(e))
        raise
    finally:
        conn.close()

def update_transformation_status(transformation_id: str, status: str, result: Optional[Dict[str, Any]] = None, error_message: Optional[str] = None):
    """Update transformation status in database."""
    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            if status == 'processing':
                cur.execute("""
                    UPDATE transformations
                    SET status = %s, started_at = %s
                    WHERE id = %s
                """, (status, datetime.utcnow(), transformation_id))
            elif status == 'completed':
                cur.execute("""
                    UPDATE transformations
                    SET status = %s, completed_at = %s, result = %s
                    WHERE id = %s
                """, (status, datetime.utcnow(), json.dumps(result), transformation_id))
            elif status == 'failed':
                cur.execute("""
                    UPDATE transformations
                    SET status = %s, completed_at = %s, error_message = %s
                    WHERE id = %s
                """, (status, datetime.utcnow(), error_message, transformation_id))

        conn.commit()
    except Exception as e:
        conn.rollback()
        logger.error("Failed to update transformation status", error=str(e))
        raise
    finally:
        conn.close()

def get_file_info(file_id: str) -> Optional[Dict[str, Any]]:
    """Get file information from database."""
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT id, filename, file_type, mime_type, object_key, status
                FROM files
                WHERE id = %s
            """, (file_id,))

            file_record = cur.fetchone()
            return dict(file_record) if file_record else None
    except Exception as e:
        logger.error("Failed to get file info", error=str(e))
        return None
    finally:
        conn.close()

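# The queries in this module assume `files` and `transformations` tables roughly
# shaped as below. Column names are taken from the SQL above; the types and
# constraints are an assumption for illustration, not the project's actual schema:
#
#   CREATE TABLE files (
#       id                   UUID PRIMARY KEY,
#       filename             TEXT NOT NULL,
#       file_type            TEXT,
#       mime_type            TEXT,
#       object_key           TEXT NOT NULL,
#       status               TEXT NOT NULL,
#       transformation_type  TEXT,
#       processed_at         TIMESTAMP
#   );
#
#   CREATE TABLE transformations (
#       id                   UUID PRIMARY KEY DEFAULT gen_random_uuid(),
#       file_id              UUID REFERENCES files(id),
#       transformation_type  TEXT NOT NULL,
#       input_path           TEXT,
#       output_path          TEXT,
#       status               TEXT NOT NULL,
#       config               JSONB,
#       result               JSONB,
#       error_message        TEXT,
#       created_at           TIMESTAMP,
#       started_at           TIMESTAMP,
#       completed_at         TIMESTAMP
#   );
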
@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint."""
    return jsonify({'status': 'healthy', 'service': 'file-transform'})

@app.route('/transform', methods=['POST'])
def transform_file():
    """Handle file transformation request."""
    try:
        data = request.get_json()
        if not data:
            return jsonify({'error': 'No data provided'}), 400

        file_id = data.get('file_id')
        transformation_type = data.get('transformation_type')
        config = data.get('config', {})

        if not file_id or not transformation_type:
            return jsonify({'error': 'file_id and transformation_type are required'}), 400

        # Get file information
        file_info = get_file_info(file_id)
        if not file_info:
            return jsonify({'error': 'File not found'}), 404

        if file_info['status'] == 'deleted':
            return jsonify({'error': 'File has been deleted'}), 400

        # Create transformation record
        transformation_id = create_transformation_record(file_id, transformation_type, config)

        # Update status to processing
        update_transformation_status(transformation_id, 'processing')

        logger.info("Starting transformation",
                    file_id=file_id,
                    transformation_id=transformation_id,
                    transformation_type=transformation_type)
        try:
            # Get file from MinIO
            file_data = get_file_from_minio(file_info['object_key'])

            # Perform transformation based on type
            result = None
            output_data = None

            if transformation_type == 'extract_text':
                if file_info['file_type'] == 'pdf':
                    result = extract_text_from_pdf(file_data)
                elif file_info['file_type'] in ['docx', 'doc']:
                    result = extract_text_from_docx(file_data)
                else:
                    raise ValueError(f"Text extraction not supported for file type: {file_info['file_type']}")

                # Save extracted text as new file
                output_filename = f"{Path(file_info['filename']).stem}_extracted.txt"
                output_object_key = f"transformations/{transformation_id}/{output_filename}"
                output_data = result.encode('utf-8')
            elif transformation_type == 'csv_to_json':
                if file_info['file_type'] != 'csv':
                    raise ValueError("CSV to JSON conversion only supports CSV files")

                result = convert_csv_to_json(file_data)
                output_filename = f"{Path(file_info['filename']).stem}.json"
                output_object_key = f"transformations/{transformation_id}/{output_filename}"
                output_data = json.dumps(result, indent=2).encode('utf-8')

            elif transformation_type == 'excel_to_json':
                if file_info['file_type'] not in ['xlsx', 'xls']:
                    raise ValueError("Excel to JSON conversion only supports Excel files")

                result = convert_excel_to_json(file_data)
                output_filename = f"{Path(file_info['filename']).stem}.json"
                output_object_key = f"transformations/{transformation_id}/{output_filename}"
                output_data = json.dumps(result, indent=2).encode('utf-8')
            elif transformation_type == 'resize_image':
                if not file_info['mime_type'].startswith('image/'):
                    raise ValueError("Image resize only supports image files")

                width = config.get('width', 800)
                height = config.get('height', 600)
                output_data = resize_image(file_data, width, height)

                # Path.suffix already includes the leading dot, so don't add another
                output_filename = f"{Path(file_info['filename']).stem}_resized{Path(file_info['filename']).suffix}"
                output_object_key = f"transformations/{transformation_id}/{output_filename}"

            elif transformation_type == 'convert_image':
                if not file_info['mime_type'].startswith('image/'):
                    raise ValueError("Image conversion only supports image files")

                target_format = config.get('format', 'JPEG')
                output_data = convert_image_format(file_data, target_format)

                output_filename = f"{Path(file_info['filename']).stem}.{target_format.lower()}"
                output_object_key = f"transformations/{transformation_id}/{output_filename}"
            else:
                raise ValueError(f"Unsupported transformation type: {transformation_type}")

            # Upload transformed file to MinIO
            if output_data:
                if not upload_file_to_minio(output_data, output_object_key):
                    raise Exception("Failed to upload transformed file")

            # Update transformation as completed
            update_transformation_status(transformation_id, 'completed', {
                'output_object_key': output_object_key,
                'output_filename': output_filename,
                'result': result if isinstance(result, (str, list, dict)) else None
            })
            # Update file status
            conn = get_db_connection()
            with conn.cursor() as cur:
                cur.execute("""
                    UPDATE files
                    SET status = 'transformed', transformation_type = %s, processed_at = %s
                    WHERE id = %s
                """, (transformation_type, datetime.utcnow(), file_id))
                conn.commit()
            conn.close()
            response_data = {
                'transformation_id': transformation_id,
                'file_id': file_id,
                'transformation_type': transformation_type,
                'status': 'completed',
                'output_object_key': output_object_key,
                'output_filename': output_filename,
                'completed_at': datetime.utcnow().isoformat()
            }

            logger.info("Transformation completed",
                        transformation_id=transformation_id,
                        file_id=file_id)

            return jsonify(response_data), 200
        except Exception as e:
            error_message = str(e)
            logger.error("Transformation failed",
                         transformation_id=transformation_id,
                         file_id=file_id,
                         error=error_message)

            # Update transformation as failed
            update_transformation_status(transformation_id, 'failed', error_message=error_message)

            return jsonify({
                'transformation_id': transformation_id,
                'file_id': file_id,
                'status': 'failed',
                'error': error_message
            }), 500

    except Exception as e:
        logger.error("Transform request error", error=str(e))
        return jsonify({'error': 'Internal server error'}), 500

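# Example request against /transform (values are illustrative; the file_id must be an
# existing row in the `files` table):
#
#   POST /transform
#   {
#       "file_id": "<uuid-of-uploaded-file>",
#       "transformation_type": "resize_image",
#       "config": {"width": 1024, "height": 768}
#   }
#
# Supported transformation_type values, per the branches above: extract_text,
# csv_to_json, excel_to_json, resize_image, convert_image.
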
@app.route('/transformations/<transformation_id>', methods=['GET'])
def get_transformation_status(transformation_id: str):
    """Get transformation status and details."""
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT id, file_id, transformation_type, input_path, output_path,
                       status, config, result, error_message, started_at, completed_at, created_at
                FROM transformations
                WHERE id = %s
            """, (transformation_id,))

            transformation = cur.fetchone()
            if not transformation:
                return jsonify({'error': 'Transformation not found'}), 404

            return jsonify(dict(transformation)), 200
    except Exception as e:
        logger.error("Error fetching transformation", error=str(e))
        return jsonify({'error': 'Internal server error'}), 500
    finally:
        # conn may be unset if the connection itself failed
        if conn:
            conn.close()

@app.route('/transformations/<transformation_id>/retry', methods=['POST'])
def retry_transformation(transformation_id: str):
    """Retry a failed transformation."""
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            # Get transformation details (status is needed for the failed-only check below)
            cur.execute("""
                SELECT file_id, transformation_type, config, status
                FROM transformations
                WHERE id = %s
            """, (transformation_id,))

            transformation = cur.fetchone()
            if not transformation:
                return jsonify({'error': 'Transformation not found'}), 404

            if transformation['status'] != 'failed':
                return jsonify({'error': 'Only failed transformations can be retried'}), 400

            # Reset transformation status
            cur.execute("""
                UPDATE transformations
                SET status = 'pending', started_at = NULL, completed_at = NULL,
                    error_message = NULL, result = NULL
                WHERE id = %s
            """, (transformation_id,))
            conn.commit()

            # Trigger new transformation
            transform_data = {
                'file_id': str(transformation['file_id']),  # keep JSON-serialisable if the column is a UUID
                'transformation_type': transformation['transformation_type'],
                'config': transformation['config'] or {}
            }

            # Call transform endpoint internally
            with app.test_client() as client:
                response = client.post('/transform', json=transform_data)
                return response.get_json(), response.status_code
    except Exception as e:
        logger.error("Error retrying transformation", error=str(e))
        return jsonify({'error': 'Internal server error'}), 500
    finally:
        # conn may be unset if the connection itself failed
        if conn:
            conn.close()

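# Note on retries: the handler above re-submits the stored file_id, transformation_type
# and config to /transform through Flask's test_client, and /transform always inserts a
# fresh row via create_transformation_record(). A retry therefore produces a new
# transformation record rather than re-running the original one in place. Illustrative
# call:
#
#   POST /transformations/<transformation_id>/retry
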
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=False)