"""MongoDB backup agent: dump with ``mongodump``, zip, upload to S3, clean up."""

import json
import logging
import os
import shutil
import subprocess
import sys
from datetime import datetime
from typing import Any, Dict, Optional

try:
    import boto3
    from botocore.exceptions import ClientError, NoCredentialsError
except ImportError:
    # Keep the module importable so check_prerequisites() can report the
    # missing SDK through the logger (and the log file) instead of the script
    # dying with a bare import traceback.
    boto3 = None

    class ClientError(Exception):
        """Placeholder keeping ``except ClientError`` valid without botocore."""

    class NoCredentialsError(Exception):
        """Placeholder keeping ``except NoCredentialsError`` valid without botocore."""


class BackupManager:
    """
    Manages the lifecycle of a MongoDB backup: Dump -> Compress -> Upload -> Cleanup.

    The configuration is a JSON file with two required object sections:

    * ``MONGODB_BACKUP`` -- reads ``MONGO_CONNECTION_STRING`` and the optional
      ``DATABASE_NAME``.
    * ``AWS_S3_CONFIGURATION`` -- reads ``S3_BUCKET_NAME``, ``S3_REGION``
      (default ``us-east-1``), ``S3_FOLDER`` (default ``backups``) and the
      optional ``ACCESS_KEY`` / ``SECRET_KEY``.

    The temporary dump directory and the zip archive are both created in the
    current working directory and removed again by :meth:`cleanup`.
    """

    LOGGER_NAME = "BackupAgent"
    LOG_FILE = "backup_execution.log"
    REQUIRED_SECTIONS = ("MONGODB_BACKUP", "AWS_S3_CONFIGURATION")

    def __init__(self, config_path: str):
        """
        Load the configuration and derive the per-run file names.

        Args:
            config_path: Path to the JSON configuration file.

        Raises:
            SystemExit: With code 1 if the configuration cannot be loaded or
                fails validation (see :meth:`_load_and_validate_config`).
        """
        self.config_path = config_path
        self.timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        self.logger = self._setup_logging()
        self.config = self._load_and_validate_config()

        # Setup path variables
        self.db_name = self.config["MONGODB_BACKUP"].get("DATABASE_NAME")
        self.backup_dir = os.path.join(os.getcwd(), f"temp_backup_{self.timestamp}")
        self.archive_name_base = (
            f"backup_{self.db_name if self.db_name else 'all'}_{self.timestamp}"
        )
        self.archive_path_zip = f"{self.archive_name_base}.zip"

    def _setup_logging(self) -> logging.Logger:
        """
        Return the shared "BackupAgent" logger with console and file output.

        ``logging.getLogger`` hands back the same logger object for the same
        name, so handlers are attached only the first time; otherwise every
        additional ``BackupManager`` instance would duplicate each log line.
        """
        logger = logging.getLogger(self.LOGGER_NAME)
        logger.setLevel(logging.INFO)
        if logger.handlers:
            return logger

        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

        # Console Handler
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

        # File Handler (appends to a log file in the working directory)
        file_handler = logging.FileHandler(self.LOG_FILE)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

        return logger

    def _load_and_validate_config(self) -> Dict[str, Any]:
        """
        Load the JSON config and validate the required sections.

        Returns:
            The parsed configuration dictionary.

        Raises:
            SystemExit: With code 1 if the file is missing, unreadable, not
                valid UTF-8 JSON, not a JSON object, or lacks a required
                section (each required section must itself be an object).
        """
        # Open directly instead of checking os.path.exists first: the file can
        # disappear or be unreadable between the check and the open.
        try:
            with open(self.config_path, "r", encoding="utf-8") as f:
                config = json.load(f)
        except FileNotFoundError:
            self.logger.critical(f"Configuration file '{self.config_path}' not found.")
            sys.exit(1)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            self.logger.critical(f"Invalid JSON format in config file: {e}")
            sys.exit(1)
        except OSError as e:
            self.logger.critical(
                f"Could not read configuration file '{self.config_path}': {e}"
            )
            sys.exit(1)

        # Basic Validation
        if not isinstance(config, dict):
            self.logger.critical("Configuration root must be a JSON object.")
            sys.exit(1)

        for section in self.REQUIRED_SECTIONS:
            if section not in config:
                self.logger.critical(f"Missing required config section: {section}")
                sys.exit(1)
            # Later code calls .get() on each section, so it must be an object.
            if not isinstance(config[section], dict):
                self.logger.critical(f"Config section '{section}' must be a JSON object.")
                sys.exit(1)

        # Security Warning
        if config["AWS_S3_CONFIGURATION"].get("ACCESS_KEY"):
            self.logger.warning(
                "Security Warning: AWS Keys found in plain text config. "
                "Consider using IAM Roles or Environment Variables."
            )

        return config

    def check_prerequisites(self) -> bool:
        """
        Check that everything the pipeline needs is available before starting.

        Returns:
            True if ``mongodump`` is on PATH and ``boto3`` could be imported,
            False otherwise (the reason is logged).
        """
        if shutil.which("mongodump") is None:
            self.logger.error("'mongodump' executable not found in system PATH.")
            return False
        if boto3 is None:
            self.logger.error("'boto3' Python package is not installed.")
            return False
        return True

    def dump_mongodb(self) -> bool:
        """
        Run ``mongodump`` to back up data into ``self.backup_dir``.

        Returns:
            True if ``mongodump`` exited with status 0, False if the connection
            string is missing, the process failed, or it could not be started.
        """
        mongo_config = self.config["MONGODB_BACKUP"]
        mongo_uri = mongo_config.get("MONGO_CONNECTION_STRING")

        if not mongo_uri:
            self.logger.error("MongoDB Connection String is missing.")
            return False

        self.logger.info(
            f"Starting MongoDB backup for: {self.db_name if self.db_name else 'All Databases'}"
        )

        # Argument list (no shell), so the URI is never shell-interpreted.
        # NOTE(review): the URI is still passed as a process argument; if it
        # embeds credentials they may be visible in the process list -- confirm
        # whether that is acceptable on the backup host.
        cmd = ["mongodump", "--uri", mongo_uri, "--out", self.backup_dir]
        if self.db_name:
            cmd.extend(["--db", self.db_name])

        try:
            process = subprocess.run(
                cmd,
                check=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
        except subprocess.CalledProcessError as e:
            self.logger.error(f"MongoDB dump failed with return code {e.returncode}")
            self.logger.error(f"Error details: {e.stderr}")
            return False
        except Exception as e:
            # Stage boundary: report anything else (e.g. the executable could
            # not be started) as a failed dump rather than crashing the run.
            self.logger.error(f"Unexpected error during dump: {e}")
            return False

        self.logger.info("MongoDB dump completed successfully.")
        # Only emitted if the logger level is lowered to DEBUG.
        self.logger.debug(process.stderr)  # mongodump writes logs to stderr usually
        return True

    def compress_backup(self) -> Optional[str]:
        """
        Compress the backup directory into a zip file.

        Returns:
            ``self.archive_path_zip`` on success, or None if the dump directory
            is missing or the archive could not be created.
        """
        self.logger.info(f"Compressing folder: {self.backup_dir}")

        if not os.path.isdir(self.backup_dir):
            self.logger.error(f"Backup directory not found: {self.backup_dir}")
            return None

        try:
            # shutil.make_archive expects the base name without extension
            shutil.make_archive(self.archive_name_base, 'zip', self.backup_dir)

            if not os.path.exists(self.archive_path_zip):
                self.logger.error("Compression finished but file was not found.")
                return None

            size_mb = os.path.getsize(self.archive_path_zip) / (1024 * 1024)
        except Exception as e:
            self.logger.error(f"Compression failed: {e}")
            return None

        self.logger.info(
            f"Compression successful. File: {self.archive_path_zip} ({size_mb:.2f} MB)"
        )
        return self.archive_path_zip

    @staticmethod
    def _build_s3_key(folder: Optional[str], file_name: str) -> str:
        """
        Join an optional folder prefix and a file name into an S3 object key.

        Leading/trailing slashes on ``folder`` are dropped, and an empty or
        ``None`` folder yields just ``file_name``, so keys never start with
        ``/`` or contain ``//``.
        """
        prefix = (folder or "").strip("/")
        return f"{prefix}/{file_name}" if prefix else file_name

    def upload_to_s3(self, file_path: str) -> bool:
        """
        Upload the zip file to AWS S3 using boto3.

        Args:
            file_path: Local path of the archive; its base name becomes the
                last component of the object key.

        Returns:
            True if the upload call completed, False if the bucket name is
            missing, boto3 is unavailable, or the upload raised.
        """
        s3_config = self.config["AWS_S3_CONFIGURATION"]
        bucket_name = s3_config.get("S3_BUCKET_NAME")
        region = s3_config.get("S3_REGION", "us-east-1")
        folder = s3_config.get("S3_FOLDER", "backups")

        if not bucket_name:
            self.logger.error("S3 bucket name is missing in AWS_S3_CONFIGURATION.")
            return False
        if boto3 is None:
            self.logger.error("'boto3' Python package is not installed.")
            return False

        self.logger.info(f"Initializing S3 upload to bucket: {bucket_name}")

        try:
            # "or None": an empty-string key in the config must not be sent as
            # a real credential; None lets boto3 use its default lookup chain.
            s3_client = boto3.client(
                's3',
                aws_access_key_id=s3_config.get("ACCESS_KEY") or None,
                aws_secret_access_key=s3_config.get("SECRET_KEY") or None,
                region_name=region,
            )

            s3_key = self._build_s3_key(folder, os.path.basename(file_path))
            s3_client.upload_file(file_path, bucket_name, s3_key)
        except ClientError as e:
            self.logger.error(f"AWS ClientError: {e}")
            return False
        except NoCredentialsError:
            self.logger.error("AWS Credentials not found or invalid.")
            return False
        except Exception as e:
            self.logger.error(f"Unexpected error during upload: {e}")
            return False

        self.logger.info(f"Upload successful. S3 URI: s3://{bucket_name}/{s3_key}")
        return True

    def cleanup(self):
        """
        Remove local artifacts to save space.

        Best effort: each artifact is handled on its own, so a failure deleting
        the zip does not leave the temporary dump directory behind. Problems
        are logged as warnings and never raised.
        """
        self.logger.info("Starting cleanup of local temporary files...")

        # Remove Zip
        try:
            if os.path.exists(self.archive_path_zip):
                os.remove(self.archive_path_zip)
                self.logger.info(f"Deleted file: {self.archive_path_zip}")
        except Exception as e:
            self.logger.warning(f"Cleanup encountered issues: {e}")

        # Remove Temp Directory
        try:
            if os.path.exists(self.backup_dir):
                shutil.rmtree(self.backup_dir)
                self.logger.info(f"Deleted directory: {self.backup_dir}")
        except Exception as e:
            self.logger.warning(f"Cleanup encountered issues: {e}")

    def _run_stages(self) -> Optional[str]:
        """Run dump, compress and upload in order; return the failed stage name or None."""
        if not self.dump_mongodb():
            return "Dump"

        zip_file = self.compress_backup()
        if not zip_file:
            return "Compression"

        if not self.upload_to_s3(zip_file):
            return "Upload"

        return None

    def run(self):
        """
        Main execution flow.

        Raises:
            SystemExit: With code 1 if a prerequisite is missing or any stage
                fails; the log names the stage that failed.
        """
        self.logger.info("=== Backup Process Started ===")

        if not self.check_prerequisites():
            sys.exit(1)

        try:
            failed_stage = self._run_stages()
        finally:
            # Also runs when a stage is interrupted (e.g. Ctrl-C), so partial
            # dumps and archives are not left on disk.
            self.cleanup()

        if failed_stage is None:
            self.logger.info("=== Backup Process Completed Successfully ===")
            return

        self.logger.error(f"=== Backup Process Failed during {failed_stage} stage ===")
        sys.exit(1)


if __name__ == "__main__":
    # Optional first argument overrides the default config file location.
    CONFIG_FILE = sys.argv[1] if len(sys.argv) > 1 else "config.json"

    agent = BackupManager(CONFIG_FILE)
    agent.run()