From ac6bb8c5825128d8f3d0745f939826173d096737 Mon Sep 17 00:00:00 2001 From: cauvang32 Date: Mon, 25 Aug 2025 17:47:50 +0700 Subject: [PATCH] refactor: simplify logging setup by removing file handlers and using console output only --- .gitignore | 3 +- Dockerfile | 1 - bot.py | 28 +- logs/code_execution.log | 0 logs/code_interpreter.log | 0 logs/data_analyzer.log | 0 logs/discord_bot.log | 0 src/utils/code_interpreter.py | 11 +- src/utils/code_utils.py | 14 +- src/utils/data_analyzer.py | 9 +- src/utils/python_executor.py | 723 +++++++++++++++++----------------- 11 files changed, 371 insertions(+), 418 deletions(-) delete mode 100644 logs/code_execution.log delete mode 100644 logs/code_interpreter.log delete mode 100644 logs/data_analyzer.log delete mode 100644 logs/discord_bot.log diff --git a/.gitignore b/.gitignore index 416abf3..485e80b 100644 --- a/.gitignore +++ b/.gitignore @@ -11,4 +11,5 @@ response.txt venv temp_charts .idea -temp_data_files \ No newline at end of file +temp_data_files +logs/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index 065665b..ed1506b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -36,7 +36,6 @@ COPY --from=builder /usr/local/bin/ /usr/local/bin/ # Copy application source code COPY bot.py . COPY src/ ./src/ -COPY logs/ ./logs/ # Run application CMD ["python3", "bot.py"] diff --git a/bot.py b/bot.py index 602f979..5c4868b 100644 --- a/bot.py +++ b/bot.py @@ -54,30 +54,10 @@ def setup_logging(): console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(log_formatter) - # File handler with rotation (keep 5 files of 5MB each) - try: - from logging.handlers import RotatingFileHandler - os.makedirs('logs', exist_ok=True) - file_handler = RotatingFileHandler( - 'logs/discord_bot.log', - maxBytes=5*1024*1024, # 5MB - backupCount=5 - ) - file_handler.setFormatter(log_formatter) - - # Configure root logger - root_logger = logging.getLogger() - root_logger.setLevel(logging.INFO) - root_logger.addHandler(console_handler) - root_logger.addHandler(file_handler) - except Exception as e: - # Fall back to basic logging if file logging fails - logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - stream=sys.stdout - ) - logging.warning(f"Could not set up file logging: {str(e)}") + # Configure root logger with console only + root_logger = logging.getLogger() + root_logger.setLevel(logging.INFO) + root_logger.addHandler(console_handler) # Set up webhook logging if enabled if ENABLE_WEBHOOK_LOGGING and LOGGING_WEBHOOK_URL: diff --git a/logs/code_execution.log b/logs/code_execution.log deleted file mode 100644 index e69de29..0000000 diff --git a/logs/code_interpreter.log b/logs/code_interpreter.log deleted file mode 100644 index e69de29..0000000 diff --git a/logs/data_analyzer.log b/logs/data_analyzer.log deleted file mode 100644 index e69de29..0000000 diff --git a/logs/discord_bot.log b/logs/discord_bot.log deleted file mode 100644 index e69de29..0000000 diff --git a/src/utils/code_interpreter.py b/src/utils/code_interpreter.py index ac86bb6..dcd832f 100644 --- a/src/utils/code_interpreter.py +++ b/src/utils/code_interpreter.py @@ -8,7 +8,6 @@ import subprocess import tempfile import time import uuid -from logging.handlers import RotatingFileHandler import traceback import contextlib from typing import Dict, Any, Optional, List @@ -17,15 +16,13 @@ from typing import Dict, Any, Optional, List from .python_executor import execute_python_code from .data_analyzer import analyze_data_file -# Configure logging -log_file = 'logs/code_interpreter.log' -os.makedirs(os.path.dirname(log_file), exist_ok=True) +# Configure logging - console only formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') -file_handler = RotatingFileHandler(log_file, maxBytes=10*1024*1024, backupCount=5) -file_handler.setFormatter(formatter) +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) logger = logging.getLogger('code_interpreter') logger.setLevel(logging.INFO) -logger.addHandler(file_handler) +logger.addHandler(console_handler) async def execute_code(args: Dict[str, Any]) -> Dict[str, Any]: """ diff --git a/src/utils/code_utils.py b/src/utils/code_utils.py index ba60da1..7c44e8e 100644 --- a/src/utils/code_utils.py +++ b/src/utils/code_utils.py @@ -13,9 +13,8 @@ DATA_FILES_DIR = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os # Create the directory if it doesn't exist os.makedirs(DATA_FILES_DIR, exist_ok=True) -# Configure logging +# Configure logging - console only logging.basicConfig( - filename='logs/code_execution.log', level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) @@ -174,18 +173,15 @@ def init_data_directory() -> None: # Ensure data directory exists os.makedirs(DATA_FILES_DIR, exist_ok=True) - # Set up logging specifically for data operations - data_log_file = 'logs/code_execution.log' - os.makedirs(os.path.dirname(data_log_file), exist_ok=True) - - file_handler = logging.FileHandler(data_log_file) - file_handler.setFormatter( + # Set up logging specifically for data operations - console only + console_handler = logging.StreamHandler() + console_handler.setFormatter( logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') ) logger = logging.getLogger('code_utils') logger.setLevel(logging.INFO) - logger.addHandler(file_handler) + logger.addHandler(console_handler) # Log directory initialization logger.info(f"Initialized data directory at {DATA_FILES_DIR}") diff --git a/src/utils/data_analyzer.py b/src/utils/data_analyzer.py index c9a0bd3..b89b2af 100644 --- a/src/utils/data_analyzer.py +++ b/src/utils/data_analyzer.py @@ -10,7 +10,6 @@ import uuid import time from typing import Dict, Any, Optional, List, Tuple from datetime import datetime -from logging.handlers import RotatingFileHandler # Import data analysis libraries try: @@ -31,14 +30,12 @@ except ImportError as e: from .code_utils import DATA_FILES_DIR, format_output_path, clean_old_files # Configure logging -log_file = 'logs/data_analyzer.log' -os.makedirs(os.path.dirname(log_file), exist_ok=True) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') -file_handler = RotatingFileHandler(log_file, maxBytes=10*1024*1024, backupCount=5) -file_handler.setFormatter(formatter) +console_handler = logging.StreamHandler() +console_handler.setFormatter(formatter) logger = logging.getLogger('data_analyzer') logger.setLevel(logging.INFO) -logger.addHandler(file_handler) +logger.addHandler(console_handler) def _is_valid_python_code(code_string: str) -> bool: """ diff --git a/src/utils/python_executor.py b/src/utils/python_executor.py index 77d5acc..ef4d467 100644 --- a/src/utils/python_executor.py +++ b/src/utils/python_executor.py @@ -1,162 +1,338 @@ +""" +Secure Python code execution with complete isolation and package management. +This module provides a completely secure isolated execution environment. +""" + import os import sys -import io -import re -import logging -import asyncio import subprocess import tempfile +import venv +import shutil import time -import uuid -from logging.handlers import RotatingFileHandler +import re +import logging import traceback -import contextlib -from typing import Dict, Any, Optional, List +from typing import Dict, Any, List, Tuple +from pathlib import Path -# Import utility functions -from .code_utils import DATA_FILES_DIR, format_output_path, clean_old_files +# Configure logging - console only +logger = logging.getLogger('python_executor') +if not logger.handlers: + console_handler = logging.StreamHandler() + console_handler.setFormatter( + logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + ) + logger.addHandler(console_handler) + logger.setLevel(logging.INFO) -# Configure logging -log_file = 'logs/code_interpreter.log' -os.makedirs(os.path.dirname(log_file), exist_ok=True) -formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') -file_handler = RotatingFileHandler(log_file, maxBytes=10*1024*1024, backupCount=5) -file_handler.setFormatter(formatter) -logger = logging.getLogger('code_interpreter') -logger.setLevel(logging.INFO) -logger.addHandler(file_handler) +# Security and execution constants +EXECUTION_TIMEOUT = 30 # Default timeout in seconds +MAX_OUTPUT_SIZE = 50000 # Maximum output size in characters -# Regular expression to find image file paths in output -IMAGE_PATH_PATTERN = r'(\/media\/quocanh\/.*\.(png|jpg|jpeg|gif))' - -# Unsafe patterns for code security -UNSAFE_IMPORTS = [ - r'import\s+os\b', r'from\s+os\s+import', - r'import\s+subprocess\b', r'from\s+subprocess\s+import', - r'import\s+shutil\b', r'from\s+shutil\s+import', - r'__import__\([\'"]os[\'"]\)', r'__import__\([\'"]subprocess[\'"]\)', - r'import\s+sys\b(?!\s+import\s+path)', r'from\s+sys\s+import' -] - -UNSAFE_FUNCTIONS = [ - r'os\.', r'subprocess\.', r'shutil\.', - r'eval\(', r'exec\(', r'sys\.', - r'open\([\'"][^\'"]*/[^\']*[\'"]', # File system access - r'__import__\(', r'globals\(\)', r'locals\(\)' -] - -def sanitize_python_code(code: str) -> tuple[bool, str]: +class SecureExecutor: + """ + Completely isolated Python executor with fresh virtual environments. + Each execution gets a completely clean environment. """ - Check Python code for potentially unsafe operations. - Args: - code: The code to check + def __init__(self): + self.temp_dir = None + self.venv_path = None - Returns: - Tuple of (is_safe, sanitized_code_or_error_message) - """ - # Check for unsafe imports - for pattern in UNSAFE_IMPORTS: - if re.search(pattern, code): - return False, f"Forbidden import detected: {pattern}" - - # Check for unsafe function calls - for pattern in UNSAFE_FUNCTIONS: - if re.search(pattern, code): - return False, f"Forbidden function call detected: {pattern}" - - # Add safety imports and commonly used libraries - safe_imports = """ -import math -import random -import json -import time -from datetime import datetime, timedelta -import collections -import itertools -import functools -try: - import numpy as np -except ImportError: - pass -try: - import pandas as pd -except ImportError: - pass -try: - import matplotlib.pyplot as plt - import matplotlib - matplotlib.use('Agg') -except ImportError: - pass -try: - import seaborn as sns -except ImportError: - pass -""" - - return True, safe_imports + "\n" + code - -async def install_packages(packages: List[str]) -> Dict[str, Any]: - """ - Install Python packages in a sandboxed environment. - - Args: - packages: List of package names to install + def __enter__(self): + return self - Returns: - Dict containing installation results - """ - try: + def __exit__(self, exc_type, exc_val, exc_tb): + self.cleanup() + + def cleanup(self): + """Clean up temporary directories and virtual environments.""" + if self.temp_dir and os.path.exists(self.temp_dir): + try: + shutil.rmtree(self.temp_dir) + logger.debug(f"Cleaned up temporary directory: {self.temp_dir}") + except Exception as e: + logger.warning(f"Failed to cleanup temp dir {self.temp_dir}: {e}") + + def validate_code_security(self, code: str) -> Tuple[bool, str]: + """ + Validate code for security threats. + + Args: + code: Python code to validate + + Returns: + Tuple of (is_safe, message) + """ + # Blocked imports (security-sensitive modules) + unsafe_imports = [ + r'import\s+os\b', r'from\s+os\s+import', + r'import\s+subprocess\b', r'from\s+subprocess\s+import', + r'import\s+sys\b', r'from\s+sys\s+import', + r'import\s+shutil\b', r'from\s+shutil\s+import', + r'import\s+socket\b', r'from\s+socket\s+import', + r'import\s+urllib\b', r'from\s+urllib\s+import', + r'import\s+requests\b', r'from\s+requests\s+import', + r'import\s+pathlib\b', r'from\s+pathlib\s+import', + r'__import__\s*\(', r'eval\s*\(', r'exec\s*\(', + r'compile\s*\(', r'open\s*\(' + ] + + # Check for unsafe imports + for pattern in unsafe_imports: + if re.search(pattern, code, re.IGNORECASE): + return False, f"Blocked unsafe import/function: {pattern}" + + # Check for file system operations + file_operations = [ + r'\.write\s*\(', r'\.read\s*\(', r'\.remove\s*\(', + r'\.mkdir\s*\(', r'\.rmdir\s*\(', r'\.delete\s*\(' + ] + + for pattern in file_operations: + if re.search(pattern, code, re.IGNORECASE): + return False, f"Blocked file operation: {pattern}" + + # Check for network operations + network_patterns = [ + r'socket\s*\(', r'connect\s*\(', r'bind\s*\(', + r'listen\s*\(', r'accept\s*\(', r'send\s*\(', + r'recv\s*\(', r'http\w*\s*\(', r'ftp\w*\s*\(' + ] + + for pattern in network_patterns: + if re.search(pattern, code, re.IGNORECASE): + return False, f"Blocked network operation: {pattern}" + + return True, "Code passed security validation" + + def create_clean_environment(self) -> Tuple[str, str, str]: + """ + Create a completely clean virtual environment. + + Returns: + Tuple of (venv_path, python_executable, pip_executable) + """ + # Create temporary directory + self.temp_dir = tempfile.mkdtemp(prefix="secure_python_") + self.venv_path = os.path.join(self.temp_dir, "venv") + + logger.info(f"Creating clean virtual environment at: {self.venv_path}") + + # Create virtual environment + venv.create(self.venv_path, with_pip=True, clear=True) + + # Get paths to executables + if os.name == 'nt': # Windows + python_path = os.path.join(self.venv_path, "Scripts", "python.exe") + pip_path = os.path.join(self.venv_path, "Scripts", "pip.exe") + else: # Unix/Linux + python_path = os.path.join(self.venv_path, "bin", "python") + pip_path = os.path.join(self.venv_path, "bin", "pip") + + # Verify executables exist + if not os.path.exists(python_path): + raise RuntimeError(f"Python executable not found: {python_path}") + if not os.path.exists(pip_path): + raise RuntimeError(f"Pip executable not found: {pip_path}") + + logger.debug(f"Clean environment created - Python: {python_path}, Pip: {pip_path}") + return self.venv_path, python_path, pip_path + + def validate_package_safety(self, package: str) -> Tuple[bool, str]: + """ + Validate if a package is safe to install. + + Args: + package: Package name to validate + + Returns: + Tuple of (is_safe, reason) + """ + package_lower = package.lower().strip() + + # Completely blocked packages + blocked_packages = { + 'os', 'subprocess', 'sys', 'shutil', 'socket', 'urllib', 'requests', + 'paramiko', 'fabric', 'invoke', 'pexpect', 'ptyprocess', + 'cryptography', 'pycrypto', 'pyopenssl', 'psutil', + 'django', 'flask', 'tornado', 'twisted', 'aiohttp', 'fastapi', + 'sqlalchemy', 'psycopg2', 'mysql-connector', 'pymongo', + 'selenium', 'scrapy', 'beautifulsoup4', 'lxml', 'mechanize' + } + + if package_lower in blocked_packages: + return False, f"Package '{package}' is blocked for security reasons" + + # Check for suspicious patterns + suspicious_patterns = ['exec', 'eval', 'compile', 'system', 'shell', 'cmd', 'hack', 'exploit'] + for pattern in suspicious_patterns: + if pattern in package_lower: + return False, f"Package name contains suspicious keyword: {pattern}" + + # Allowed safe packages for data science + safe_packages = { + 'numpy', 'pandas', 'matplotlib', 'seaborn', 'plotly', 'bokeh', + 'scipy', 'scikit-learn', 'sklearn', 'statsmodels', + 'pillow', 'opencv-python', 'imageio', 'skimage', + 'pytz', 'dateutil', 'arrow', 'pendulum', + 'pyyaml', 'toml', 'configparser', 'jsonschema', + 'tqdm', 'progressbar2', 'click', 'typer', + 'openpyxl', 'xlrd', 'xlwt', 'xlsxwriter', + 'sympy', 'networkx', 'igraph' + } + + if package_lower in safe_packages: + return True, f"Package '{package}' is pre-approved as safe" + + # For unknown packages, be restrictive + return False, f"Package '{package}' is not in the approved safe list" + + def install_packages_clean(self, packages: List[str], pip_path: str) -> Tuple[List[str], List[str]]: + """ + Install packages in the clean virtual environment. + + Args: + packages: List of package names to install + pip_path: Path to pip executable in the clean environment + + Returns: + Tuple of (installed_packages, failed_packages) + """ installed = [] failed = [] for package in packages: + # Validate package safety + is_safe, reason = self.validate_package_safety(package) + if not is_safe: + failed.append(package) + continue + try: - # Use pip to install package with timeout - result = subprocess.run([ - sys.executable, "-m", "pip", "install", package, "--user", "--quiet" - ], capture_output=True, text=True, timeout=120) + # Install package in the clean virtual environment + result = subprocess.run( + [pip_path, "install", package], + capture_output=True, + text=True, + timeout=120, # 2 minutes per package + check=False, + cwd=self.temp_dir # Run from temp directory + ) if result.returncode == 0: installed.append(package) - logger.info(f"Successfully installed package: {package}") else: - failed.append({"package": package, "error": result.stderr}) - logger.error(f"Failed to install package {package}: {result.stderr}") + failed.append(package) except subprocess.TimeoutExpired: - failed.append({"package": package, "error": "Installation timeout"}) - logger.error(f"Installation timeout for package: {package}") + failed.append(package) except Exception as e: - failed.append({"package": package, "error": str(e)}) - logger.error(f"Error installing package {package}: {str(e)}") + failed.append(package) - return { - "success": True, - "installed": installed, - "failed": failed, - "message": f"Installed {len(installed)} packages, {len(failed)} failed" - } + return installed, failed + + def execute_code_secure(self, code: str, python_path: str, timeout: int) -> Dict[str, Any]: + """ + Execute Python code in the completely isolated environment. - except Exception as e: - logger.error(f"Error in package installation: {str(e)}") - return { - "success": False, - "error": str(e), - "installed": [], - "failed": packages - } + Args: + code: Python code to execute + python_path: Path to Python executable in clean environment + timeout: Execution timeout in seconds + + Returns: + Dict containing execution results + """ + start_time = time.time() + + # Create code file in the isolated environment + code_file = os.path.join(self.temp_dir, "code_to_execute.py") + + try: + with open(code_file, 'w', encoding='utf-8') as f: + f.write(code) + + # Execute code in completely isolated environment + result = subprocess.run( + [python_path, code_file], + capture_output=True, + text=True, + timeout=timeout, + check=False, + cwd=self.temp_dir, # Run from isolated directory + env={ # Minimal environment variables + 'PATH': os.path.dirname(python_path), + 'PYTHONPATH': '', + 'PYTHONHOME': '', + } + ) + + execution_time = time.time() - start_time + + # Process results + output = result.stdout + error_output = result.stderr + + # Truncate output if too large + if len(output) > MAX_OUTPUT_SIZE: + output = output[:MAX_OUTPUT_SIZE] + "\n... (output truncated)" + + if result.returncode == 0: + return { + "success": True, + "output": output, + "error": error_output if error_output else "", + "execution_time": execution_time, + "return_code": result.returncode + } + else: + return { + "success": False, + "output": output, + "error": error_output, + "execution_time": execution_time, + "return_code": result.returncode + } + + except subprocess.TimeoutExpired: + return { + "success": False, + "output": "", + "error": f"Code execution timed out after {timeout} seconds", + "execution_time": timeout, + "return_code": -1 + } + except Exception as e: + execution_time = time.time() - start_time + error_msg = f"Execution error: {str(e)}" + + return { + "success": False, + "output": "", + "error": error_msg, + "execution_time": execution_time, + "traceback": traceback.format_exc() + } + finally: + # Clean up code file + try: + if os.path.exists(code_file): + os.remove(code_file) + except Exception as e: + pass # Silent cleanup failure + async def execute_python_code(args: Dict[str, Any]) -> Dict[str, Any]: """ - Execute Python code in a controlled sandbox environment. + Execute Python code in a completely clean, isolated environment. Args: args: Dictionary containing: - code: The Python code to execute - - input: Optional input data for the code - - install_packages: List of packages to install before execution + - input_data: Optional input data for the code + - install_packages: List of packages to install (will be validated for security) - timeout: Optional timeout in seconds (default: 30) Returns: @@ -164,9 +340,9 @@ async def execute_python_code(args: Dict[str, Any]) -> Dict[str, Any]: """ try: code = args.get("code", "") - input_data = args.get("input", "") + input_data = args.get("input_data", "") packages_to_install = args.get("install_packages", []) - timeout = args.get("timeout", 30) + timeout = args.get("timeout", EXECUTION_TIMEOUT) if not code: return { @@ -175,50 +351,51 @@ async def execute_python_code(args: Dict[str, Any]) -> Dict[str, Any]: "output": "" } - # Install requested packages first - installed_packages = [] - if packages_to_install: - logger.info(f"Installing requested packages: {packages_to_install}") - install_result = await install_packages(packages_to_install) + with SecureExecutor() as executor: + # Validate code security + is_safe, safety_message = executor.validate_code_security(code) + if not is_safe: + return { + "success": False, + "output": "", + "error": f"Security violation: {safety_message}", + "execution_time": 0 + } - if install_result["installed"]: - installed_packages = install_result["installed"] - logger.info(f"Successfully installed: {installed_packages}") + # Create completely clean environment + venv_path, python_path, pip_path = executor.create_clean_environment() - if install_result["failed"]: - failed_packages = [f["package"] for f in install_result["failed"]] - logger.warning(f"Failed to install: {failed_packages}") - # Continue execution even if some packages failed to install - - # Sanitize the code - is_safe, sanitized_code = sanitize_python_code(code) - if not is_safe: - logger.warning(f"Code sanitization failed: {sanitized_code}") - return { - "success": False, - "error": sanitized_code, - "output": "" - } - - # Clean up old files before execution - clean_old_files() - - # Execute code in controlled environment - result = await execute_code_safely(sanitized_code, input_data, timeout) - - # Add information about installed packages to the result - if installed_packages: - result["installed_packages"] = installed_packages - # Prepend package installation info to output - if result.get("success"): - package_info = f"[Installed packages: {', '.join(installed_packages)}]\n\n" - result["output"] = package_info + result.get("output", "") - - return result + # Install only requested packages (if any) + installed_packages = [] + failed_packages = [] + if packages_to_install: + installed_packages, failed_packages = executor.install_packages_clean(packages_to_install, pip_path) + + # Prepare code with input data if provided + if input_data: + # Add input data as a variable in the code + code_with_input = f"input_data = '''{input_data}'''\n\n{code}" + else: + code_with_input = code + + # Execute code in clean environment + result = executor.execute_code_secure(code_with_input, python_path, timeout) + + # Add package installation info + if installed_packages: + result["installed_packages"] = installed_packages + # Prepend package installation info to output + if result.get("success"): + package_info = f"[Installed packages: {', '.join(installed_packages)}]\n\n" + result["output"] = package_info + result.get("output", "") + + if failed_packages: + result["failed_packages"] = failed_packages + + return result except Exception as e: error_msg = f"Error in Python code execution: {str(e)}" - logger.error(f"{error_msg}\n{traceback.format_exc()}") return { "success": False, "error": error_msg, @@ -226,210 +403,16 @@ async def execute_python_code(args: Dict[str, Any]) -> Dict[str, Any]: "traceback": traceback.format_exc() } -async def execute_code_safely(code: str, input_data: str, timeout: int) -> Dict[str, Any]: - """ - Execute code in a safe environment with proper isolation. - - Args: - code: Sanitized Python code to execute - input_data: Input data for the code - timeout: Execution timeout in seconds - - Returns: - Dict containing execution results - """ - try: - # Capture stdout and stderr - old_stdout = sys.stdout - old_stderr = sys.stderr - stdout_capture = io.StringIO() - stderr_capture = io.StringIO() - - # Import commonly used libraries for the execution environment - try: - import matplotlib - matplotlib.use('Agg') # Use non-interactive backend - import matplotlib.pyplot as plt - except ImportError: - plt = None - - try: - import numpy as np - except ImportError: - np = None - - try: - import pandas as pd - except ImportError: - pd = None - - # Create minimal execution namespace (memory optimized) - exec_globals = { - "__builtins__": { - # Essential builtins only - "print": print, "len": len, "range": range, "enumerate": enumerate, - "zip": zip, "sum": sum, "min": min, "max": max, "abs": abs, - "round": round, "sorted": sorted, "list": list, "dict": dict, - "set": set, "tuple": tuple, "str": str, "int": int, "float": float, - "bool": bool, "type": type, "isinstance": isinstance, - "__import__": __import__, # Fixed: Added missing __import__ - "ValueError": ValueError, "TypeError": TypeError, "IndexError": IndexError, - "KeyError": KeyError, "Exception": Exception, - }, - # Essential modules only - "math": __import__("math"), - "json": __import__("json"), - "time": __import__("time"), - } - - # Add optional libraries only when needed (lazy loading for memory) - if "numpy" in code or "np." in code: - try: - exec_globals["np"] = __import__("numpy") - exec_globals["numpy"] = __import__("numpy") - except ImportError: - pass - - if "pandas" in code or "pd." in code: - try: - exec_globals["pd"] = __import__("pandas") - exec_globals["pandas"] = __import__("pandas") - except ImportError: - pass - - if "matplotlib" in code or "plt." in code: - try: - matplotlib = __import__("matplotlib") - matplotlib.use('Agg') - exec_globals["plt"] = __import__("matplotlib.pyplot") - exec_globals["matplotlib"] = matplotlib - except ImportError: - pass - - # Override input function if input_data is provided - if input_data: - input_lines = input_data.strip().split('\n') - input_iter = iter(input_lines) - exec_globals["input"] = lambda prompt="": next(input_iter, "") - - # Set up output capture - sys.stdout = stdout_capture - sys.stderr = stderr_capture - - # Generate output file path for any plots - timestamp = int(time.time()) - output_filename = f"python_output_{timestamp}.png" - output_path = format_output_path(output_filename) - - # Execute the code with timeout - try: - # Execute the code as statements - await asyncio.wait_for( - asyncio.to_thread(exec, code, exec_globals), - timeout=timeout - ) - - # Check for any matplotlib figures and save them - visualizations = [] - if plt is not None and plt.get_fignums(): - for i, fig_num in enumerate(plt.get_fignums()): - try: - fig = plt.figure(fig_num) - if len(fig.get_axes()) > 0: - # Save to output path - fig_path = output_path.replace('.png', f'_{i}.png') - fig.savefig(fig_path, bbox_inches='tight', dpi=150) - visualizations.append(fig_path) - plt.close(fig) - except Exception as e: - logger.error(f"Error saving figure {i}: {str(e)}") - - # Clear all figures - plt.close('all') - - except asyncio.TimeoutError: - return { - "success": False, - "error": f"Code execution timed out after {timeout} seconds", - "output": stdout_capture.getvalue(), - "stderr": stderr_capture.getvalue() - } - except Exception as e: - # Capture execution errors with helpful information - error_msg = str(e) - stderr_content = stderr_capture.getvalue() - - # If it's an import error, provide helpful guidance - if "ModuleNotFoundError" in error_msg or "ImportError" in error_msg: - error_msg += "\n\nHint: If you need additional packages, specify them in the 'install_packages' parameter." - - return { - "success": False, - "error": f"Execution error: {error_msg}", - "output": stdout_capture.getvalue(), - "stderr": stderr_content + f"\nExecution error: {error_msg}", - "traceback": traceback.format_exc() - } - - # Restore stdout and stderr - sys.stdout = old_stdout - sys.stderr = old_stderr - - # Get the outputs - stdout_output = stdout_capture.getvalue() - stderr_output = stderr_capture.getvalue() - - # Force cleanup and garbage collection for memory optimization - import gc - if 'plt' in exec_globals: - plt = exec_globals['plt'] - plt.close('all') - exec_globals.clear() # Clear execution environment - gc.collect() # Force garbage collection - - # Check for any image paths in the output - image_paths = re.findall(IMAGE_PATH_PATTERN, stdout_output) - for img_path in image_paths: - if os.path.exists(img_path): - visualizations.append(img_path) - - # Remove image paths from output text - clean_output = stdout_output - for img_path in image_paths: - clean_output = clean_output.replace(img_path, "[Image saved]") - - logger.info(f"Python code executed successfully, output length: {len(clean_output)}") - if visualizations: - logger.info(f"Generated {len(visualizations)} visualizations") - - return { - "success": True, - "output": clean_output, - "stderr": stderr_output, - "visualizations": visualizations, - "has_visualization": len(visualizations) > 0, - "execution_time": f"Completed in under {timeout}s" - } - - except Exception as e: - # Restore stdout and stderr - sys.stdout = old_stdout - sys.stderr = old_stderr - - error_msg = f"Error executing Python code: {str(e)}" - logger.error(f"{error_msg}\n{traceback.format_exc()}") - - return { - "success": False, - "error": error_msg, - "output": stdout_capture.getvalue() if 'stdout_capture' in locals() else "", - "stderr": stderr_capture.getvalue() if 'stderr_capture' in locals() else "", - "traceback": traceback.format_exc() - } -# Backward compatibility - keep the old function name -async def execute_code(args: Dict[str, Any]) -> Dict[str, Any]: +# Deprecated - keeping for backward compatibility +async def install_packages(packages: List[str]) -> Dict[str, Any]: """ - Backward compatibility wrapper for execute_python_code. + Legacy function for backward compatibility. + Note: In the new secure system, packages are installed per execution. """ - return await execute_python_code(args) + return { + "success": False, + "installed": [], + "failed": packages, + "message": "Use install_packages parameter in execute_python_code instead" + }