# Mirror of https://github.com/ghndrx/file-transformer-s3.git
# (synced 2026-02-10 06:45:05 +00:00; 307 lines, 11 KiB, Python)
import os
|
|
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Optional, Dict, Any
|
|
|
|
from flask import Flask, request, jsonify
|
|
import psycopg2
|
|
from psycopg2.extras import RealDictCursor
|
|
from minio import Minio
|
|
from minio.error import S3Error
|
|
import structlog
|
|
|
|
# Configure structured logging.
#
# structlog hands every event to the standard-library logging machinery
# (``stdlib.LoggerFactory``), and ``filter_by_level`` drops events below the
# stdlib logger's effective level. Left unconfigured, the root logger sits at
# WARNING, so every ``logger.info(...)`` call in this module would be silently
# discarded. Configure stdlib logging once here. The JSONRenderer already
# emits the timestamp and level, so the stdlib format is just the message.
# ``basicConfig`` is a no-op if the host process has already installed
# handlers on the root logger.
logging.basicConfig(format="%(message)s", level=logging.INFO)

structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        # Renders ``exc_info`` passed to a log call into the event as text.
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer(),
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

app = Flask(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
# Configuration, resolved once from the environment at import time.
# ---------------------------------------------------------------------------
# NOTE(review): the fallback credentials below are development defaults that
# live in source control; deployments should always override them via env.

# MinIO / S3-compatible object storage.
MINIO_ENDPOINT = os.environ.get('MINIO_ENDPOINT', 'localhost:9000')
MINIO_ACCESS_KEY = os.environ.get('MINIO_ACCESS_KEY', 'minioadmin')
MINIO_SECRET_KEY = os.environ.get('MINIO_SECRET_KEY', 'minioadmin123')
MINIO_BUCKET_NAME = os.environ.get('MINIO_BUCKET_NAME', 'file-transformer-bucket')
# TLS is enabled only by the (case-insensitive) value "true"; anything else
# disables it.
MINIO_USE_SSL = os.environ.get('MINIO_USE_SSL', 'false').lower() == 'true'

# PostgreSQL connection string handed to psycopg2.
POSTGRES_URL = os.environ.get(
    'POSTGRES_URL',
    'postgresql://file_user:secure_password_123@localhost:5432/file_transformer',
)
|
|
|
|
# Module-wide MinIO client built from the MINIO_* settings; the request
# handlers in this module share this single instance.
_minio_settings = dict(
    access_key=MINIO_ACCESS_KEY,
    secret_key=MINIO_SECRET_KEY,
    secure=MINIO_USE_SSL,
)
minio_client = Minio(MINIO_ENDPOINT, **_minio_settings)
|
|
|
|
def get_db_connection():
    """Open and return a brand-new psycopg2 connection to ``POSTGRES_URL``.

    No pooling is involved: every call opens its own connection, and the
    caller owns it and must close it.
    """
    connection = psycopg2.connect(dsn=POSTGRES_URL)
    return connection
|
|
|
|
def get_file_metadata(file_id: str) -> Optional[Dict[str, Any]]:
    """Fetch one row from the ``files`` table by id.

    Args:
        file_id: Value matched against ``files.id``.

    Returns:
        The row as a plain dict (column name -> value), or ``None`` when no
        row matches *or* the query fails. Failures are logged together with
        the file id and the traceback.

    Raises:
        Whatever ``get_db_connection()`` raises if the connection cannot be
        opened (that call sits outside the ``try``).
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(
                """
                SELECT id, filename, original_filename, file_size, file_type,
                       mime_type, bucket_name, object_key, checksum, status,
                       transformation_type, transformation_config, metadata,
                       created_at, updated_at, processed_at
                FROM files
                WHERE id = %s
                """,
                (file_id,),
            )
            file_record = cur.fetchone()
        return dict(file_record) if file_record else None
    except Exception as e:
        # Keep the "None on failure" contract callers rely on, but record
        # which file failed and the full traceback (rendered by the
        # format_exc_info processor) so the failure is diagnosable.
        logger.error(
            "Failed to get file metadata",
            file_id=file_id,
            error=str(e),
            exc_info=True,
        )
        return None
    finally:
        conn.close()
|
|
|
|
def get_file_transformations(file_id: str) -> list:
    """Return every ``transformations`` row for *file_id*, newest first.

    Each row is converted to a plain dict. A failing query is logged and
    produces an empty list; the connection is closed either way.
    """
    query = """
        SELECT id, transformation_type, input_path, output_path,
               status, config, result, error_message,
               started_at, completed_at, created_at
        FROM transformations
        WHERE file_id = %s
        ORDER BY created_at DESC
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(query, (file_id,))
            return list(map(dict, cur.fetchall()))
    except Exception as exc:
        logger.error("Failed to get file transformations", error=str(exc))
        return []
    finally:
        conn.close()
|
|
|
|
def get_file_access_logs(file_id: str, limit: int = 50) -> list:
    """Return up to *limit* ``file_access_logs`` rows for *file_id*.

    Rows come back newest first (by ``created_at``) as plain dicts. A failing
    query is logged and produces an empty list; the connection is closed
    either way.
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(
                "SELECT id, action, ip_address, user_agent, created_at "
                "FROM file_access_logs "
                "WHERE file_id = %s "
                "ORDER BY created_at DESC "
                "LIMIT %s",
                (file_id, limit),
            )
            entries = [dict(entry) for entry in cur.fetchall()]
        return entries
    except Exception as exc:
        logger.error("Failed to get file access logs", error=str(exc))
        return []
    finally:
        conn.close()
|
|
|
|
def update_file_metadata(file_id: str, metadata: Dict[str, Any]) -> bool:
    """Overwrite ``files.metadata`` for *file_id* and bump ``updated_at``.

    *metadata* is serialized with ``json.dumps`` and replaces the stored value
    wholesale; nothing is merged. ``updated_at`` is set to
    ``datetime.utcnow()`` (a naive UTC timestamp).

    Returns:
        True once the UPDATE is committed. This is also True when no row
        matched *file_id*, since the affected row count is not checked.
        False if serialization, the query or the commit raised; the
        transaction is rolled back in that case.
    """
    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            params = (json.dumps(metadata), datetime.utcnow(), file_id)
            cur.execute(
                "UPDATE files SET metadata = %s, updated_at = %s WHERE id = %s",
                params,
            )
        conn.commit()
    except Exception as exc:
        logger.error("Failed to update file metadata", error=str(exc))
        conn.rollback()
        return False
    else:
        return True
    finally:
        conn.close()
|
|
|
|
def get_storage_stats() -> Dict[str, Any]:
    """Aggregate storage statistics over the ``files`` table.

    Returns:
        A dict with three keys:

        * ``stats``: total file count, total ``file_size`` (0 for an empty
          table) and per-status counts (uploaded / processing / transformed /
          error / deleted). Every row is counted, including ``deleted`` ones.
        * ``file_types``: list of ``{file_type, count}`` dicts for non-deleted
          files, most frequent first.
        * ``recent_uploads``: number of rows whose ``created_at`` falls within
          the last 24 hours of the database's ``NOW()``.

        On any query error the failure is logged and ``{}`` is returned.
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            # Totals and per-status breakdown. SUM() over zero rows yields
            # NULL, so COALESCE keeps total_size numeric for an empty table.
            cur.execute("""
                SELECT COUNT(*) AS total_files,
                       COALESCE(SUM(file_size), 0) AS total_size,
                       COUNT(CASE WHEN status = 'uploaded' THEN 1 END) AS uploaded_files,
                       COUNT(CASE WHEN status = 'processing' THEN 1 END) AS processing_files,
                       COUNT(CASE WHEN status = 'transformed' THEN 1 END) AS transformed_files,
                       COUNT(CASE WHEN status = 'error' THEN 1 END) AS error_files,
                       COUNT(CASE WHEN status = 'deleted' THEN 1 END) AS deleted_files
                FROM files
            """)
            stats = cur.fetchone()

            # File-type distribution, ignoring soft-deleted rows.
            cur.execute("""
                SELECT file_type, COUNT(*) AS count
                FROM files
                WHERE status != 'deleted'
                GROUP BY file_type
                ORDER BY count DESC
            """)
            file_types = cur.fetchall()

            # Uploads in the last 24 hours.
            cur.execute("""
                SELECT COUNT(*) AS recent_uploads
                FROM files
                WHERE created_at >= NOW() - INTERVAL '24 hours'
            """)
            recent = cur.fetchone()

        return {
            'stats': dict(stats),
            'file_types': [dict(ft) for ft in file_types],
            'recent_uploads': recent['recent_uploads'] if recent else 0,
        }
    except Exception as e:
        logger.error("Failed to get storage stats", error=str(e))
        return {}
    finally:
        conn.close()
|
|
|
|
@app.route('/health', methods=['GET'])
def health_check():
    """Liveness probe answering with a static JSON payload.

    No database or MinIO checks are performed here.
    """
    payload = {'status': 'healthy', 'service': 'file-metadata'}
    return jsonify(payload)
|
|
|
|
def _minio_object_exists(bucket: str, key: str) -> bool:
    """Return True if MinIO can stat ``bucket/key``, False on any S3Error.

    Errors that are not S3Error propagate to the caller.
    """
    try:
        minio_client.stat_object(bucket, key)
    except S3Error:
        return False
    return True


@app.route('/files/<file_id>/metadata', methods=['GET'])
def get_file_metadata_endpoint(file_id: str):
    """Return a combined metadata view for one file.

    The JSON body holds the file row, its transformations, its 10 most recent
    access logs, and whether the backing object is present in MinIO.
    Responds 404 whenever get_file_metadata() yields nothing, and 500 on any
    unexpected error.
    """
    try:
        record = get_file_metadata(file_id)
        if not record:
            return jsonify({'error': 'File not found'}), 404

        bucket = record['bucket_name']
        key = record['object_key']

        transformations = get_file_transformations(file_id)
        access_logs = get_file_access_logs(file_id, limit=10)
        in_storage = _minio_object_exists(bucket, key)

        body = {
            'file': record,
            'transformations': transformations,
            'access_logs': access_logs,
            'storage': {
                'minio_exists': in_storage,
                'bucket': bucket,
                'object_key': key,
            },
        }
        return jsonify(body), 200
    except Exception as exc:
        logger.error("Error fetching file metadata", error=str(exc))
        return jsonify({'error': 'Internal server error'}), 500
|
|
|
|
@app.route('/files/<file_id>/metadata', methods=['PUT'])
def update_file_metadata_endpoint(file_id: str):
    """Replace a file's stored metadata with the JSON request body.

    Responses:
        400: body missing, empty, not parseable as JSON, or not a JSON object.
        404: get_file_metadata() found no row for *file_id*.
        500: the update failed, or an unexpected error occurred.
        200: metadata stored.
    """
    try:
        # silent=True: a missing or non-JSON Content-Type, or a malformed
        # body, yields None instead of raising an HTTP exception. The generic
        # handler below would otherwise turn that client error into a 500.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({'error': 'No data provided'}), 400
        # update_file_metadata() is typed to take a mapping; reject arrays and
        # scalars instead of storing them as the file's metadata.
        if not isinstance(data, dict):
            return jsonify({'error': 'Metadata must be a JSON object'}), 400

        # Check that the file exists.
        file_metadata = get_file_metadata(file_id)
        if not file_metadata:
            return jsonify({'error': 'File not found'}), 404

        # Update the metadata.
        success = update_file_metadata(file_id, data)
        if not success:
            return jsonify({'error': 'Failed to update metadata'}), 500

        logger.info("File metadata updated", file_id=file_id)
        return jsonify({'message': 'Metadata updated successfully'}), 200

    except Exception as e:
        logger.error("Error updating file metadata", error=str(e))
        return jsonify({'error': 'Internal server error'}), 500
|
|
|
|
@app.route('/files/<file_id>/transformations', methods=['GET'])
def get_file_transformations_endpoint(file_id: str):
    """List a file's transformations, newest first, as a JSON array.

    Responds 404 when get_file_metadata() yields nothing for *file_id*, and
    500 on any unexpected error.
    """
    try:
        if get_file_metadata(file_id):
            return jsonify(get_file_transformations(file_id)), 200
        return jsonify({'error': 'File not found'}), 404
    except Exception as exc:
        logger.error("Error fetching file transformations", error=str(exc))
        return jsonify({'error': 'Internal server error'}), 500
|
|
|
|
@app.route('/files/<file_id>/access-logs', methods=['GET'])
def get_file_access_logs_endpoint(file_id: str):
    """Return a file's access logs, newest first, as a JSON array.

    Query parameters:
        limit: maximum number of entries. Defaults to 50 (also used when the
            value is not an integer) and is clamped to ``[0, 1000]``.

    Responds 404 when get_file_metadata() yields nothing for *file_id*, and
    500 on any unexpected error.
    """
    max_limit = 1000  # Upper bound on rows a single request may fetch.
    try:
        # Check that the file exists.
        file_metadata = get_file_metadata(file_id)
        if not file_metadata:
            return jsonify({'error': 'File not found'}), 404

        limit = request.args.get('limit', 50, type=int)
        # The value is client-controlled. A negative LIMIT is an SQL error
        # (which the helper would hide as an empty list), and an unbounded
        # one lets a single request pull the entire log history.
        limit = max(0, min(limit, max_limit))

        access_logs = get_file_access_logs(file_id, limit=limit)
        return jsonify(access_logs), 200

    except Exception as e:
        logger.error("Error fetching file access logs", error=str(e))
        return jsonify({'error': 'Internal server error'}), 500
|
|
|
|
@app.route('/stats', methods=['GET'])
def get_stats_endpoint():
    """Return aggregate storage statistics as JSON.

    get_storage_stats() returns a populated dict on success and signals a
    failure by returning ``{}``. An empty result is therefore reported as a
    500 rather than a misleading 200 with an empty body.
    """
    try:
        stats = get_storage_stats()
        if not stats:
            return jsonify({'error': 'Failed to fetch stats'}), 500
        return jsonify(stats), 200

    except Exception as e:
        logger.error("Error fetching stats", error=str(e))
        return jsonify({'error': 'Internal server error'}), 500
|
|
|
|
if __name__ == '__main__':
    # Direct-execution entry point: serve on every interface, port 5000,
    # with Flask debug mode off.
    app.run(debug=False, host='0.0.0.0', port=5000)