"""Copy documents' S3 objects under a destination prefix and update MySQL.

For each row fetched from the configured table, the script copies the S3
object to the destination bucket under a new key and then stores that new
key back in the row's ``S3Key`` column. Settings are read from
``config.json`` in the current working directory.
"""

import json
import os
import sys

import boto3
import mysql.connector

# --- Configuration Loading ---
CONFIG_FILE = 'config.json'


def load_config():
    """Load and return the parsed contents of ``CONFIG_FILE``.

    Prints an error message and exits the process with status 1 when the
    file is missing or cannot be decoded as JSON.
    """
    if not os.path.exists(CONFIG_FILE):
        print(f"Error: Configuration file '{CONFIG_FILE}' not found.")
        sys.exit(1)

    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        print(f"Error decoding JSON in config file: {e}")
        sys.exit(1)


# --- S3 Helper Functions ---
def generate_new_s3_key(old_key, prefix):
    """Return ``old_key`` with the destination ``prefix`` prepended.

    The original path structure is preserved: exactly one ``/`` separates a
    non-empty prefix from the key, and leading slashes on ``old_key`` are
    stripped.

    Args:
        old_key: Existing S3 key.
        prefix: Destination key prefix; ``None`` or ``''`` means no prefix.

    Returns:
        The new key, or ``None`` when ``old_key`` is not a non-empty string.
    """
    if not isinstance(old_key, str) or not old_key:
        return None

    # A missing prefix must not be formatted into the key as the text "None".
    prefixed_path = prefix or ''
    if prefixed_path and not prefixed_path.endswith('/'):
        prefixed_path += '/'

    return f"{prefixed_path}{old_key.lstrip('/')}"


def move_s3_object(s3_client, source_bucket, source_key, dest_bucket,
                   dest_key, *, delete_source=False):
    """Copy an S3 object to its new location, optionally deleting the source.

    Args:
        s3_client: boto3 S3 client used for the requests.
        source_bucket: Bucket that currently holds the object.
        source_key: Current key of the object.
        dest_bucket: Bucket to copy the object into.
        dest_key: Key to store the copy under.
        delete_source: When true, delete the source object after a successful
            copy (a real "move"). Defaults to False, i.e. copy only, so the
            source object is left in place.

    Returns:
        True when every requested step succeeded, False otherwise. Errors are
        printed, not raised.
    """
    copy_source = {'Bucket': source_bucket, 'Key': source_key}

    try:
        print(f" -> Copying '{source_key}' to '{dest_key}' in '{dest_bucket}'...")
        # 1. Copy the object
        s3_client.copy_object(
            CopySource=copy_source,
            Bucket=dest_bucket,
            Key=dest_key,
        )
        print(" -> Copy object successful.")

        if delete_source:
            print(f" -> Deleting original object from '{source_bucket}/{source_key}'...")
            # 2. Delete the original object
            s3_client.delete_object(Bucket=source_bucket, Key=source_key)
            print(" -> Delete object successful.")

        return True
    except Exception as e:
        # Deliberately broad: any failure for one object is reported and the
        # caller moves on to the next document.
        print(f" -> S3 Move FAILED for {source_key}: {e}")
        return False


# --- MySQL Helper Functions ---
def _split_db_config(db_config_full):
    """Split the MySQL config into (table name, connection keyword args).

    The 'table' key is not a valid connection parameter, so it is removed
    from the mapping that gets passed to ``mysql.connector.connect``.
    """
    table_name = db_config_full['table']
    connection_config = {
        k: v for k, v in db_config_full.items() if k != 'table'
    }
    return table_name, connection_config


def fetch_documents(db_config_full, limit=10):
    """Fetch (ID, S3Key) rows from the configured table, newest upload first.

    Args:
        db_config_full: MySQL settings including the 'table' key; all other
            keys are passed to ``mysql.connector.connect``.
        limit: Maximum number of rows to fetch, or ``None`` for no limit.
            Defaults to 10.

    Returns:
        The rows returned by ``cursor.fetchall()``, or an empty list when a
        MySQL error occurs (the error is printed).
    """
    table_name, connection_config = _split_db_config(db_config_full)

    # The table name comes from the local config file and cannot be bound as
    # a query parameter, so it is interpolated into the statement.
    query = f"SELECT ID, S3Key FROM {table_name} order by UploadedAt desc"
    if limit is not None:
        query += f" limit {int(limit)}"

    conn = None
    cursor = None
    try:
        # Connect using only valid connection parameters
        conn = mysql.connector.connect(**connection_config)
        cursor = conn.cursor()

        print(f"Fetching document IDs and S3Keys from table: {table_name}...")
        cursor.execute(query)
        documents = cursor.fetchall()

        print(f"Found {len(documents)} documents to process.")
        return documents
    except mysql.connector.Error as err:
        print(f"MySQL Error: {err}")
        return []
    finally:
        if cursor:
            cursor.close()
        if conn and conn.is_connected():
            conn.close()


def update_document_key(db_config_full, doc_id, new_s3_key):
    """Set the S3Key column of the row with the given ID and commit.

    Args:
        db_config_full: MySQL settings including the 'table' key; all other
            keys are passed to ``mysql.connector.connect``.
        doc_id: Value of the row's ID column.
        new_s3_key: Key to store in the S3Key column.

    Returns:
        True when the update was committed, False when a MySQL error occurred
        (the error is printed and the transaction rolled back if possible).
    """
    table_name, connection_config = _split_db_config(db_config_full)

    conn = None
    cursor = None
    try:
        # Connect using only valid connection parameters
        conn = mysql.connector.connect(**connection_config)
        cursor = conn.cursor()

        # Values are bound as parameters; only the table name is interpolated.
        update_query = f"UPDATE {table_name} SET S3Key = %s WHERE ID = %s"
        cursor.execute(update_query, (new_s3_key, doc_id))

        # Commit the transaction to apply the changes
        conn.commit()
        print(f" -> DB Update SUCCESS for ID {doc_id}.")
        return True
    except mysql.connector.Error as err:
        print(f" -> DB Update FAILED for ID {doc_id}: {err}. Rolling back.")
        # Only roll back on a live connection: rolling back a dropped one
        # would raise again from inside this handler.
        if conn and conn.is_connected():
            conn.rollback()
        return False
    finally:
        if cursor:
            cursor.close()
        if conn and conn.is_connected():
            conn.close()


# --- Main Migration Logic ---
def main():
    """Execute the S3 migration and database update workflow."""
    config = load_config()

    # Initialize S3 Client
    aws_config = config['aws']
    try:
        s3_client = boto3.client(
            's3',
            aws_access_key_id=aws_config['aws_access_key_id'],
            aws_secret_access_key=aws_config['aws_secret_access_key'],
            region_name=aws_config['aws_region'],
        )
        print("S3 client initialized successfully.")
    except Exception as e:
        print(f"Failed to initialize S3 client: {e}")
        return

    # Fetch Documents
    documents = fetch_documents(config['mysql'])
    if not documents:
        print("No documents found or failed to connect to the database. Exiting.")
        return

    source_bucket = aws_config['source_bucket']
    dest_bucket = aws_config['destination_bucket']
    key_prefix = aws_config['destination_key_prefix']

    success_count = 0
    failure_count = 0

    print("\n--- Starting Document Migration Process ---")

    for doc_id, old_s3_key in documents:
        print(f"\nProcessing Document ID: {doc_id}, Old Key: {old_s3_key}")

        if not old_s3_key:
            print(f" -> Skipping ID {doc_id}: S3Key is empty.")
            failure_count += 1
            continue

        # 1. Generate new key, preserving path structure
        new_s3_key = generate_new_s3_key(old_s3_key, key_prefix)
        if not new_s3_key:
            print(f" -> Skipping ID {doc_id}: Could not generate new key from old key.")
            failure_count += 1
            continue

        print(f" -> Calculated New Key: {new_s3_key}")

        # 2. Transfer the S3 object. move_s3_object is called with its
        # default (copy only), so the source object stays in place.
        move_successful = move_s3_object(
            s3_client, source_bucket, old_s3_key, dest_bucket, new_s3_key
        )
        if not move_successful:
            # The object remains in the source bucket and the DB is untouched.
            print(f"S3 move failed for ID {doc_id}. Object remains in {source_bucket}/{old_s3_key}. DB not updated.")
            failure_count += 1
            continue

        # 3. Update database
        db_update_successful = update_document_key(
            config['mysql'], doc_id, new_s3_key
        )
        if db_update_successful:
            success_count += 1
        else:
            # The object already exists at the destination but the row still
            # holds the old key. Log critical error.
            print(f"CRITICAL: DB update failed for ID {doc_id}. Object is MOVED to {dest_bucket}/{new_s3_key}. Manual DB correction needed.")
            failure_count += 1

    print("\n--- Migration Summary ---")
    print(f"Total documents processed: {len(documents)}")
    print(f"Successful migrations (S3 Move + DB Update): {success_count}")
    print(f"Failed migrations: {failure_count}")


if __name__ == "__main__":
    main()