Mirror of https://github.com/ghndrx/file-transformer-s3.git (synced 2026-02-10 06:45:05 +00:00)
Initial commit: File Transformer S3 project with React dashboard and Knative functions
functions/transform/app.py (new file, 473 lines added)
@@ -0,0 +1,473 @@
import os
import uuid
import json
import tempfile
import logging
from datetime import datetime
from typing import Dict, Any, Optional, List
from pathlib import Path

from flask import Flask, request, jsonify
import psycopg2
from psycopg2.extras import RealDictCursor
from minio import Minio
from minio.error import S3Error
import structlog

# File processing imports
import PyPDF2
from docx import Document
import pandas as pd
from PIL import Image
import io

# Configure structured logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

app = Flask(__name__)

# Configuration
MINIO_ENDPOINT = os.getenv('MINIO_ENDPOINT', 'localhost:9000')
MINIO_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY', 'minioadmin')
MINIO_SECRET_KEY = os.getenv('MINIO_SECRET_KEY', 'minioadmin123')
MINIO_BUCKET_NAME = os.getenv('MINIO_BUCKET_NAME', 'file-transformer-bucket')
MINIO_USE_SSL = os.getenv('MINIO_USE_SSL', 'false').lower() == 'true'

POSTGRES_URL = os.getenv('POSTGRES_URL', 'postgresql://file_user:secure_password_123@localhost:5432/file_transformer')

# Initialize MinIO client
minio_client = Minio(
    MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=MINIO_USE_SSL
)
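
# Illustrative sketch (not part of the original commit): the handlers below assume that
# MINIO_BUCKET_NAME already exists. A minimal startup check could look like the helper
# below; ensure_bucket() is a hypothetical name and is never called in this module.
def ensure_bucket() -> None:
    """Create the configured bucket if it does not exist yet (illustrative only)."""
    if not minio_client.bucket_exists(MINIO_BUCKET_NAME):
        minio_client.make_bucket(MINIO_BUCKET_NAME)
        logger.info("Created MinIO bucket", bucket=MINIO_BUCKET_NAME)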

def get_db_connection():
    """Create a database connection."""
    return psycopg2.connect(POSTGRES_URL)

def get_file_from_minio(object_key: str) -> bytes:
    """Download file from MinIO."""
    response = None
    try:
        response = minio_client.get_object(MINIO_BUCKET_NAME, object_key)
        return response.read()
    except S3Error as e:
        logger.error("Failed to get file from MinIO", object_key=object_key, error=str(e))
        raise
    finally:
        # Release the underlying HTTP connection back to the pool.
        if response is not None:
            response.close()
            response.release_conn()

def upload_file_to_minio(file_data: bytes, object_key: str) -> bool:
    """Upload file to MinIO."""
    try:
        # put_object expects a readable stream, so wrap the raw bytes.
        minio_client.put_object(
            MINIO_BUCKET_NAME,
            object_key,
            io.BytesIO(file_data),
            length=len(file_data)
        )
        return True
    except S3Error as e:
        logger.error("Failed to upload file to MinIO", object_key=object_key, error=str(e))
        return False

def extract_text_from_pdf(file_data: bytes) -> str:
    """Extract text from PDF file."""
    try:
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(file_data))
        text = ""
        for page in pdf_reader.pages:
            text += page.extract_text() + "\n"
        return text.strip()
    except Exception as e:
        logger.error("PDF text extraction failed", error=str(e))
        raise

def extract_text_from_docx(file_data: bytes) -> str:
    """Extract text from DOCX file."""
    try:
        doc = Document(io.BytesIO(file_data))
        text = ""
        for paragraph in doc.paragraphs:
            text += paragraph.text + "\n"
        return text.strip()
    except Exception as e:
        logger.error("DOCX text extraction failed", error=str(e))
        raise

def convert_csv_to_json(file_data: bytes) -> List[Dict[str, Any]]:
    """Convert CSV to JSON format."""
    try:
        df = pd.read_csv(io.BytesIO(file_data))
        return df.to_dict('records')
    except Exception as e:
        logger.error("CSV to JSON conversion failed", error=str(e))
        raise

def convert_excel_to_json(file_data: bytes) -> List[Dict[str, Any]]:
    """Convert Excel to JSON format."""
    try:
        df = pd.read_excel(io.BytesIO(file_data))
        return df.to_dict('records')
    except Exception as e:
        logger.error("Excel to JSON conversion failed", error=str(e))
        raise
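
# Illustrative note (not from the original commit): convert_csv_to_json() returns one dict
# per row via pandas' to_dict('records'). For example, a CSV body of
#     name,qty
#     widget,2
# comes back roughly as [{'name': 'widget', 'qty': 2}], which /transform then serializes
# with json.dumps(..., indent=2) before uploading the .json output.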

def resize_image(file_data: bytes, width: int, height: int) -> bytes:
    """Resize image to specified dimensions."""
    try:
        image = Image.open(io.BytesIO(file_data))
        resized_image = image.resize((width, height), Image.Resampling.LANCZOS)

        output = io.BytesIO()
        resized_image.save(output, format=image.format or 'JPEG')
        return output.getvalue()
    except Exception as e:
        logger.error("Image resize failed", error=str(e))
        raise

def convert_image_format(file_data: bytes, target_format: str) -> bytes:
    """Convert image to a different format."""
    try:
        image = Image.open(io.BytesIO(file_data))

        # Pillow expects 'JPEG' (not 'JPG'), and JPEG cannot store an alpha
        # channel, so flatten transparent images before saving.
        fmt = target_format.upper()
        if fmt == 'JPG':
            fmt = 'JPEG'
        if fmt == 'JPEG' and image.mode in ('RGBA', 'LA', 'P'):
            image = image.convert('RGB')

        output = io.BytesIO()
        image.save(output, format=fmt)
        return output.getvalue()
    except Exception as e:
        logger.error("Image format conversion failed", error=str(e))
        raise

def create_transformation_record(file_id: str, transformation_type: str, config: Dict[str, Any]) -> str:
    """Create transformation record in database."""
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                INSERT INTO transformations (
                    file_id, transformation_type, input_path, status, config, created_at
                ) VALUES (%s, %s, %s, %s, %s, %s)
                RETURNING id
            """, (
                file_id,
                transformation_type,
                f"files/{file_id}",
                'pending',
                json.dumps(config),
                datetime.utcnow()
            ))

            transformation_id = cur.fetchone()['id']
            conn.commit()
            return str(transformation_id)
    except Exception as e:
        conn.rollback()
        logger.error("Failed to create transformation record", error=str(e))
        raise
    finally:
        conn.close()

def update_transformation_status(transformation_id: str, status: str, result: Optional[Dict[str, Any]] = None, error_message: Optional[str] = None):
    """Update transformation status in database."""
    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            if status == 'processing':
                cur.execute("""
                    UPDATE transformations
                    SET status = %s, started_at = %s
                    WHERE id = %s
                """, (status, datetime.utcnow(), transformation_id))
            elif status == 'completed':
                cur.execute("""
                    UPDATE transformations
                    SET status = %s, completed_at = %s, result = %s
                    WHERE id = %s
                """, (status, datetime.utcnow(), json.dumps(result), transformation_id))
            elif status == 'failed':
                cur.execute("""
                    UPDATE transformations
                    SET status = %s, completed_at = %s, error_message = %s
                    WHERE id = %s
                """, (status, datetime.utcnow(), error_message, transformation_id))

            conn.commit()
    except Exception as e:
        conn.rollback()
        logger.error("Failed to update transformation status", error=str(e))
        raise
    finally:
        conn.close()

def get_file_info(file_id: str) -> Optional[Dict[str, Any]]:
    """Get file information from database."""
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT id, filename, file_type, mime_type, object_key, status
                FROM files
                WHERE id = %s
            """, (file_id,))

            file_record = cur.fetchone()
            return dict(file_record) if file_record else None
    except Exception as e:
        logger.error("Failed to get file info", error=str(e))
        return None
    finally:
        conn.close()
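
# Illustrative reference (assumption, not part of the original commit): the queries in this
# module touch the columns listed below. Types and constraints are not shown here; the real
# migrations live elsewhere in the repository.
#
#   files:            id, filename, file_type, mime_type, object_key, status,
#                     transformation_type, processed_at
#   transformations:  id, file_id, transformation_type, input_path, output_path, status,
#                     config, result, error_message, started_at, completed_at, created_at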

@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint."""
    return jsonify({'status': 'healthy', 'service': 'file-transform'})

@app.route('/transform', methods=['POST'])
def transform_file():
    """Handle file transformation request."""
    try:
        data = request.get_json()
        if not data:
            return jsonify({'error': 'No data provided'}), 400

        file_id = data.get('file_id')
        transformation_type = data.get('transformation_type')
        config = data.get('config', {})

        if not file_id or not transformation_type:
            return jsonify({'error': 'file_id and transformation_type are required'}), 400

        # Get file information
        file_info = get_file_info(file_id)
        if not file_info:
            return jsonify({'error': 'File not found'}), 404

        if file_info['status'] == 'deleted':
            return jsonify({'error': 'File has been deleted'}), 400

        # Create transformation record
        transformation_id = create_transformation_record(file_id, transformation_type, config)

        # Update status to processing
        update_transformation_status(transformation_id, 'processing')

        logger.info("Starting transformation",
                    file_id=file_id,
                    transformation_id=transformation_id,
                    transformation_type=transformation_type)

        try:
            # Get file from MinIO
            file_data = get_file_from_minio(file_info['object_key'])

            # Perform transformation based on type
            result = None
            output_data = None

            if transformation_type == 'extract_text':
                if file_info['file_type'] == 'pdf':
                    result = extract_text_from_pdf(file_data)
                elif file_info['file_type'] in ['docx', 'doc']:
                    result = extract_text_from_docx(file_data)
                else:
                    raise ValueError(f"Text extraction not supported for file type: {file_info['file_type']}")

                # Save extracted text as new file
                output_filename = f"{Path(file_info['filename']).stem}_extracted.txt"
                output_object_key = f"transformations/{transformation_id}/{output_filename}"
                output_data = result.encode('utf-8')

            elif transformation_type == 'csv_to_json':
                if file_info['file_type'] != 'csv':
                    raise ValueError("CSV to JSON conversion only supports CSV files")

                result = convert_csv_to_json(file_data)
                output_filename = f"{Path(file_info['filename']).stem}.json"
                output_object_key = f"transformations/{transformation_id}/{output_filename}"
                output_data = json.dumps(result, indent=2).encode('utf-8')

            elif transformation_type == 'excel_to_json':
                if file_info['file_type'] not in ['xlsx', 'xls']:
                    raise ValueError("Excel to JSON conversion only supports Excel files")

                result = convert_excel_to_json(file_data)
                output_filename = f"{Path(file_info['filename']).stem}.json"
                output_object_key = f"transformations/{transformation_id}/{output_filename}"
                output_data = json.dumps(result, indent=2).encode('utf-8')

            elif transformation_type == 'resize_image':
                if not file_info['mime_type'].startswith('image/'):
                    raise ValueError("Image resize only supports image files")

                width = config.get('width', 800)
                height = config.get('height', 600)
                output_data = resize_image(file_data, width, height)

                # Path.suffix already includes the leading dot (e.g. ".png")
                output_filename = f"{Path(file_info['filename']).stem}_resized{Path(file_info['filename']).suffix}"
                output_object_key = f"transformations/{transformation_id}/{output_filename}"

            elif transformation_type == 'convert_image':
                if not file_info['mime_type'].startswith('image/'):
                    raise ValueError("Image conversion only supports image files")

                target_format = config.get('format', 'JPEG')
                output_data = convert_image_format(file_data, target_format)

                output_filename = f"{Path(file_info['filename']).stem}.{target_format.lower()}"
                output_object_key = f"transformations/{transformation_id}/{output_filename}"

            else:
                raise ValueError(f"Unsupported transformation type: {transformation_type}")

            # Upload transformed file to MinIO
            if output_data:
                if not upload_file_to_minio(output_data, output_object_key):
                    raise Exception("Failed to upload transformed file")

            # Update transformation as completed
            update_transformation_status(transformation_id, 'completed', {
                'output_object_key': output_object_key,
                'output_filename': output_filename,
                'result': result if isinstance(result, (str, list, dict)) else None
            })

            # Update file status
            conn = get_db_connection()
            with conn.cursor() as cur:
                cur.execute("""
                    UPDATE files
                    SET status = 'transformed', transformation_type = %s, processed_at = %s
                    WHERE id = %s
                """, (transformation_type, datetime.utcnow(), file_id))
                conn.commit()
            conn.close()

            response_data = {
                'transformation_id': transformation_id,
                'file_id': file_id,
                'transformation_type': transformation_type,
                'status': 'completed',
                'output_object_key': output_object_key,
                'output_filename': output_filename,
                'completed_at': datetime.utcnow().isoformat()
            }

            logger.info("Transformation completed",
                        transformation_id=transformation_id,
                        file_id=file_id)

            return jsonify(response_data), 200

        except Exception as e:
            error_message = str(e)
            logger.error("Transformation failed",
                         transformation_id=transformation_id,
                         file_id=file_id,
                         error=error_message)

            # Update transformation as failed
            update_transformation_status(transformation_id, 'failed', error_message=error_message)

            return jsonify({
                'transformation_id': transformation_id,
                'file_id': file_id,
                'status': 'failed',
                'error': error_message
            }), 500

    except Exception as e:
        logger.error("Transform request error", error=str(e))
        return jsonify({'error': 'Internal server error'}), 500
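
# Example requests accepted by /transform (illustrative; the file ids and values are made up):
#
#   POST /transform  {"file_id": "<uuid>", "transformation_type": "extract_text"}
#   POST /transform  {"file_id": "<uuid>", "transformation_type": "resize_image",
#                     "config": {"width": 1024, "height": 768}}
#   POST /transform  {"file_id": "<uuid>", "transformation_type": "convert_image",
#                     "config": {"format": "png"}}
#
# On success the handler returns 200 with the transformation_id and the output_object_key,
# which points at transformations/<transformation_id>/ in the MinIO bucket.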

@app.route('/transformations/<transformation_id>', methods=['GET'])
def get_transformation_status(transformation_id: str):
    """Get transformation status and details."""
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT id, file_id, transformation_type, input_path, output_path,
                       status, config, result, error_message, started_at, completed_at, created_at
                FROM transformations
                WHERE id = %s
            """, (transformation_id,))

            transformation = cur.fetchone()
            if not transformation:
                return jsonify({'error': 'Transformation not found'}), 404

            return jsonify(dict(transformation)), 200
    except Exception as e:
        logger.error("Error fetching transformation", error=str(e))
        return jsonify({'error': 'Internal server error'}), 500
    finally:
        # Guard against the connection never having been opened.
        if conn is not None:
            conn.close()
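
# Illustrative response shape for GET /transformations/<id> (values invented):
#   {"id": "...", "file_id": "...", "transformation_type": "csv_to_json", "status": "completed",
#    "config": {...}, "result": {...}, "error_message": null,
#    "started_at": "...", "completed_at": "...", "created_at": "..."}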

@app.route('/transformations/<transformation_id>/retry', methods=['POST'])
def retry_transformation(transformation_id: str):
    """Retry a failed transformation."""
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            # Get transformation details (status is needed for the check below)
            cur.execute("""
                SELECT file_id, transformation_type, config, status
                FROM transformations
                WHERE id = %s
            """, (transformation_id,))

            transformation = cur.fetchone()
            if not transformation:
                return jsonify({'error': 'Transformation not found'}), 404

            if transformation['status'] != 'failed':
                return jsonify({'error': 'Only failed transformations can be retried'}), 400

            # Reset transformation status
            cur.execute("""
                UPDATE transformations
                SET status = 'pending', started_at = NULL, completed_at = NULL,
                    error_message = NULL, result = NULL
                WHERE id = %s
            """, (transformation_id,))
            conn.commit()

        # Trigger new transformation
        transform_data = {
            'file_id': transformation['file_id'],
            'transformation_type': transformation['transformation_type'],
            'config': transformation['config'] or {}
        }

        # Call transform endpoint internally
        with app.test_client() as client:
            response = client.post('/transform', json=transform_data)
            return response.get_json(), response.status_code

    except Exception as e:
        logger.error("Error retrying transformation", error=str(e))
        return jsonify({'error': 'Internal server error'}), 500
    finally:
        if conn is not None:
            conn.close()

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=False)
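
# Quick local smoke test (illustrative sketch, not part of the original commit):
#
#     with app.test_client() as client:
#         assert client.get('/health').get_json()['status'] == 'healthy'
#
# A real deployment would call the Knative service over HTTP instead.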