# Mirror of https://github.com/ghndrx/file-transformer-s3.git
# Synced 2026-02-10 06:45:05 +00:00 (287 lines, 9.9 KiB, Python)
import os
|
|
import uuid
|
|
import hashlib
|
|
import magic
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Optional
|
|
|
|
from flask import Flask, request, jsonify
|
|
from werkzeug.utils import secure_filename
|
|
import psycopg2
|
|
from psycopg2.extras import RealDictCursor
|
|
from minio import Minio
|
|
from minio.error import S3Error
|
|
import structlog
|
|
|
|
# Configure structured logging
#
# structlog's stdlib integration hands every event to the standard `logging`
# module, and `filter_by_level` drops any event the underlying stdlib logger
# is not enabled for. The root logger defaults to WARNING, so without the
# basicConfig() call below every `logger.info(...)` in this module would be
# silently discarded. The format is the bare message because JSONRenderer
# already produces the complete log line. basicConfig() is a no-op when the
# root logger has already been given handlers by the hosting process.
logging.basicConfig(format="%(message)s", level=logging.INFO)

structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        # Must stay last: it turns the event dict into the final JSON string.
        structlog.processors.JSONRenderer(),
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

# Module-level logger used by every function below.
logger = structlog.get_logger()
|
|
|
|
# Flask application object; the route decorators in this module register their handlers on it.
app = Flask(__name__)
|
|
|
|
# Configuration
#
# Each setting is read from the process environment once, at import time, and
# falls back to the literal default when the variable is unset.
# NOTE(review): the fallback access key, secret key and database password are
# hard-coded below — confirm that real deployments always override them.
MINIO_ENDPOINT = os.environ.get('MINIO_ENDPOINT', 'localhost:9000')
MINIO_ACCESS_KEY = os.environ.get('MINIO_ACCESS_KEY', 'minioadmin')
MINIO_SECRET_KEY = os.environ.get('MINIO_SECRET_KEY', 'minioadmin123')
MINIO_BUCKET_NAME = os.environ.get('MINIO_BUCKET_NAME', 'file-transformer-bucket')
# Only the exact string "true" (compared case-insensitively) enables TLS.
MINIO_USE_SSL = os.environ.get('MINIO_USE_SSL', 'false').lower() == 'true'

POSTGRES_URL = os.environ.get(
    'POSTGRES_URL',
    'postgresql://file_user:secure_password_123@localhost:5432/file_transformer',
)
|
|
|
|
# Initialize MinIO client
# A single client object is built at import time from the settings above and
# shared by every function in this module that talks to object storage.
minio_client = Minio(
    MINIO_ENDPOINT,
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=MINIO_USE_SSL,
)
|
|
|
|
def get_db_connection():
    """Open and return a new psycopg2 connection to ``POSTGRES_URL``.

    A fresh connection is created on every call. The callers in this module
    close the connection themselves once they are done with it.
    """
    connection = psycopg2.connect(POSTGRES_URL)
    return connection
|
|
|
|
def calculate_file_hash(file_data: bytes) -> str:
    """Return the SHA-256 digest of ``file_data`` as a hexadecimal string."""
    digest = hashlib.sha256()
    digest.update(file_data)
    return digest.hexdigest()
|
|
|
|
def get_file_metadata(file_data: bytes, filename: str) -> Dict[str, Any]:
    """Describe an uploaded file.

    Args:
        file_data: Complete file content.
        filename: Name of the file; only its extension is used.

    Returns:
        A dict with the keys ``mime_type`` (detected from the content by
        ``magic``), ``file_size`` (length of ``file_data``), ``file_type``
        (lower-cased text after the last ``.`` in ``filename``, or ``''`` when
        the name contains no dot) and ``checksum`` (SHA-256 hex digest).
    """
    # rpartition yields an empty separator when the name contains no dot.
    _, dot, suffix = filename.rpartition('.')
    extension = suffix.lower() if dot else ''

    detected_mime = magic.from_buffer(file_data, mime=True)

    return {
        'mime_type': detected_mime,
        'file_size': len(file_data),
        'file_type': extension,
        'checksum': calculate_file_hash(file_data),
    }
|
|
|
|
def save_file_to_database(file_data: bytes, filename: str, object_key: str, metadata: Dict[str, Any]) -> str:
    """Insert a row describing an uploaded file into the ``files`` table.

    Args:
        file_data: Raw file content. Not used by this function.
        filename: Stored in both the ``filename`` and ``original_filename`` columns.
        object_key: Stored in both the ``file_path`` and ``object_key`` columns.
        metadata: Must provide ``file_size``, ``file_type``, ``mime_type`` and
            ``checksum``.

    Returns:
        The ``id`` of the inserted row, converted to ``str``.

    Raises:
        Exception: Any failure is rolled back, logged and re-raised unchanged.
    """
    insert_sql = """
        INSERT INTO files (
            filename, original_filename, file_path, file_size,
            file_type, mime_type, bucket_name, object_key,
            checksum, status, created_at
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        RETURNING id
    """

    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            # Positional values, in the same order as the column list above.
            row_values = (
                filename,
                filename,
                object_key,
                metadata['file_size'],
                metadata['file_type'],
                metadata['mime_type'],
                MINIO_BUCKET_NAME,
                object_key,
                metadata['checksum'],
                'uploaded',
                datetime.utcnow(),
            )
            cur.execute(insert_sql, row_values)

            inserted_row = cur.fetchone()
            new_id = inserted_row['id']
            conn.commit()
            return str(new_id)
    except Exception as e:
        conn.rollback()
        logger.error("Database error", error=str(e))
        raise
    finally:
        conn.close()
|
|
|
|
def upload_to_minio(file_data: bytes, object_key: str) -> bool:
    """Upload file to MinIO bucket.

    The bucket named by ``MINIO_BUCKET_NAME`` is created first when it does
    not exist yet.

    Args:
        file_data: Complete file content to store.
        object_key: Key under which the object is stored in the bucket.

    Returns:
        ``True`` when the object was stored, ``False`` when MinIO reported an
        ``S3Error`` (which is logged). Other exception types propagate.
    """
    # Function-scope import: `io` is not among this module's top-level imports.
    import io

    try:
        # Ensure bucket exists
        if not minio_client.bucket_exists(MINIO_BUCKET_NAME):
            minio_client.make_bucket(MINIO_BUCKET_NAME)
            logger.info("Created bucket", bucket=MINIO_BUCKET_NAME)

        # put_object() reads its payload through a file-like object's read()
        # method, so the raw bytes have to be wrapped in a stream; passing the
        # bytes object directly fails before anything is sent.
        minio_client.put_object(
            MINIO_BUCKET_NAME,
            object_key,
            io.BytesIO(file_data),
            length=len(file_data),
        )

        logger.info("File uploaded to MinIO", bucket=MINIO_BUCKET_NAME, object_key=object_key)
        return True
    except S3Error as e:
        logger.error("MinIO upload error", error=str(e))
        return False
|
|
|
|
@app.route('/health', methods=['GET'])
def health_check():
    """Report service status; always answers with the same JSON body."""
    payload = {'status': 'healthy', 'service': 'file-upload'}
    return jsonify(payload)
|
|
|
|
@app.route('/upload', methods=['POST'])
def upload_file():
    """Handle file upload request.

    Expects a multipart form with the file in the ``file`` field. The content
    is stored in MinIO under ``uploads/<uuid>/<filename>``, a row is inserted
    into the database and the access is logged.

    Returns:
        A ``(json, status)`` pair:
        * 400 when the field is missing, has no filename, or the file is empty;
        * 500 when the storage upload fails or any unexpected error occurs;
        * 201 with the file's id, name, key, size, MIME type, checksum,
          status and upload time on success.
    """
    try:
        # Check if file is present in request
        if 'file' not in request.files:
            return jsonify({'error': 'No file provided'}), 400

        file = request.files['file']
        # Covers both '' and None so secure_filename() is never handed None.
        if not file.filename:
            return jsonify({'error': 'No file selected'}), 400

        # Read file data
        file_data = file.read()
        if not file_data:
            return jsonify({'error': 'Empty file'}), 400

        # Secure filename and generate object key
        file_id = str(uuid.uuid4())
        # secure_filename() returns '' for names made up solely of characters
        # it strips (e.g. "..." or non-ASCII-only names). An empty name would
        # produce an object key ending in "/", so fall back to the generated id.
        filename = secure_filename(file.filename) or file_id
        object_key = f"uploads/{file_id}/{filename}"

        # Extract metadata
        metadata = get_file_metadata(file_data, filename)

        logger.info("Processing file upload",
                    filename=filename,
                    size=metadata['file_size'],
                    mime_type=metadata['mime_type'])

        # Upload to MinIO
        if not upload_to_minio(file_data, object_key):
            return jsonify({'error': 'Failed to upload file to storage'}), 500

        # Save to database
        try:
            db_file_id = save_file_to_database(file_data, filename, object_key, metadata)
        except Exception:
            # The object is already in storage but has no database row. Remove
            # it (best effort) so a failed insert does not leave an orphan,
            # then let the outer handler produce the 500 response.
            try:
                minio_client.remove_object(MINIO_BUCKET_NAME, object_key)
            except Exception as cleanup_error:
                logger.warning("Failed to remove orphaned object",
                               object_key=object_key,
                               error=str(cleanup_error))
            raise

        # Log access
        log_file_access(db_file_id, 'upload', request.remote_addr, request.headers.get('User-Agent'))

        response_data = {
            'file_id': db_file_id,
            'filename': filename,
            'object_key': object_key,
            'file_size': metadata['file_size'],
            'mime_type': metadata['mime_type'],
            'checksum': metadata['checksum'],
            'status': 'uploaded',
            'uploaded_at': datetime.utcnow().isoformat()
        }

        logger.info("File upload completed", file_id=db_file_id, filename=filename)
        return jsonify(response_data), 201

    except Exception as e:
        logger.error("Upload error", error=str(e))
        return jsonify({'error': 'Internal server error'}), 500
|
|
|
|
def log_file_access(file_id: str, action: str, ip_address: str, user_agent: Optional[str]):
    """Log file access for audit purposes.

    Inserts one row into ``file_access_logs``. This is best effort: every
    failure, including failure to open the database connection, is logged and
    swallowed so that a broken audit log never fails the calling request.

    Args:
        file_id: Id of the file that was accessed.
        action: Name of the operation (callers in this module pass
            ``'upload'`` or ``'delete'``).
        ip_address: Client address of the request.
        user_agent: Value of the request's ``User-Agent`` header, if any.
    """
    # Bound before the try so the handlers below can tell whether a
    # connection was actually opened.
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor() as cur:
            cur.execute("""
                INSERT INTO file_access_logs (file_id, action, ip_address, user_agent)
                VALUES (%s, %s, %s, %s)
            """, (file_id, action, ip_address, user_agent))
            conn.commit()
    except Exception as e:
        logger.error("Failed to log file access", error=str(e))
        if conn is not None:
            try:
                conn.rollback()
            except Exception as rollback_error:
                # A failing rollback must not escape a best-effort logger.
                logger.warning("Rollback failed after access log error", error=str(rollback_error))
    finally:
        if conn is not None:
            conn.close()
|
|
|
|
@app.route('/files/<file_id>', methods=['GET'])
def get_file_info(file_id: str):
    """Get file information by ID.

    Args:
        file_id: Value of the ``<file_id>`` URL segment, matched against
            ``files.id``.

    Returns:
        A ``(json, status)`` pair: 200 with the selected columns of the row,
        404 when no row matches, 500 on any error (which is logged).
    """
    # Bound before the try: if get_db_connection() itself raises, the finally
    # clause must not touch an unbound name, which would replace the JSON 500
    # response with an UnboundLocalError.
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT id, filename, original_filename, file_size, file_type,
                       mime_type, bucket_name, object_key, checksum, status,
                       created_at, updated_at
                FROM files
                WHERE id = %s
            """, (file_id,))

            file_record = cur.fetchone()
            if not file_record:
                return jsonify({'error': 'File not found'}), 404

            return jsonify(dict(file_record)), 200

    except Exception as e:
        logger.error("Error fetching file info", error=str(e))
        return jsonify({'error': 'Internal server error'}), 500
    finally:
        if conn is not None:
            conn.close()
|
|
|
|
@app.route('/files/<file_id>', methods=['DELETE'])
def delete_file(file_id: str):
    """Delete file from storage and database.

    The object is removed from MinIO and the database row is kept but marked
    with ``status = 'deleted'`` and a ``deleted_at`` timestamp.

    Args:
        file_id: Value of the ``<file_id>`` URL segment, matched against
            ``files.id``.

    Returns:
        A ``(json, status)`` pair: 200 on success, 404 when no row matches,
        500 on any error (which is logged).
    """
    # Bound before the try: if get_db_connection() itself raises, the finally
    # clause must not touch an unbound name, which would replace the JSON 500
    # response with an UnboundLocalError.
    conn = None
    try:
        conn = get_db_connection()
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            # Get file info
            cur.execute("SELECT object_key FROM files WHERE id = %s", (file_id,))
            file_record = cur.fetchone()

            if not file_record:
                return jsonify({'error': 'File not found'}), 404

            object_key = file_record['object_key']

            # Delete from MinIO. Any S3Error is treated as non-fatal: it is
            # logged and the row is still marked as deleted below.
            try:
                minio_client.remove_object(MINIO_BUCKET_NAME, object_key)
                logger.info("File deleted from MinIO", object_key=object_key)
            except S3Error as e:
                logger.warning("File not found in MinIO", object_key=object_key, error=str(e))

            # Mark as deleted in database
            cur.execute("""
                UPDATE files
                SET status = 'deleted', deleted_at = %s
                WHERE id = %s
            """, (datetime.utcnow(), file_id))

            conn.commit()

            # Log access
            log_file_access(file_id, 'delete', request.remote_addr, request.headers.get('User-Agent'))

            return jsonify({'message': 'File deleted successfully'}), 200

    except Exception as e:
        logger.error("Error deleting file", error=str(e))
        return jsonify({'error': 'Internal server error'}), 500
    finally:
        if conn is not None:
            conn.close()
|
|
|
|
if __name__ == '__main__':
    # Executed as a script: start Flask's built-in server, listening on every
    # interface at port 5000 with debug mode off.
    server_options = {'host': '0.0.0.0', 'port': 5000, 'debug': False}
    app.run(**server_options)