refactor: enhance persistent package management and streamline code execution environment

This commit is contained in:
2025-08-26 23:21:28 +07:00
parent b51bcdc3a9
commit eaaef0676a
2 changed files with 231 additions and 72 deletions

View File

@@ -438,8 +438,7 @@ async def call_openai_api(client, messages, model, temperature=0.7, max_tokens=N
"model": model, "model": model,
"messages": messages, "messages": messages,
"max_tokens": max_tokens, "max_tokens": max_tokens,
"tools": tools, "tools": tools
"tool_choice": "required" if tools else None # Force tool usage when available
} }
# Add temperature only for models that support it (exclude GPT-5 family) # Add temperature only for models that support it (exclude GPT-5 family)

View File

@@ -1,6 +1,6 @@
""" """
Secure Python code execution with complete isolation and package management. Secure Python code execution with persistent virtual environment and package management.
This module provides a completely secure isolated execution environment. This module provides secure execution with persistent package storage but clean code execution.
""" """
import os import os
@@ -14,8 +14,10 @@ import time
import re import re
import logging import logging
import traceback import traceback
import json
from typing import Dict, Any, List, Tuple from typing import Dict, Any, List, Tuple
from pathlib import Path from pathlib import Path
from datetime import datetime, timedelta
# Configure logging - console only # Configure logging - console only
logger = logging.getLogger('python_executor') logger = logging.getLogger('python_executor')
@@ -31,15 +33,175 @@ if not logger.handlers:
EXECUTION_TIMEOUT = 30 # Default timeout in seconds EXECUTION_TIMEOUT = 30 # Default timeout in seconds
MAX_OUTPUT_SIZE = 50000 # Maximum output size in characters MAX_OUTPUT_SIZE = 50000 # Maximum output size in characters
# Persistent environment configuration
PACKAGE_CLEANUP_DAYS = 3 # Cleanup packages every 3 days
PERSISTENT_VENV_DIR = Path("/tmp/bot_code_executor")
PACKAGE_CACHE_FILE = PERSISTENT_VENV_DIR / "package_cache.json"
class PersistentPackageManager:
"""
Manages a persistent virtual environment for packages while keeping code execution clean.
Packages persist for 3 days, code files are cleaned up after each execution.
"""
def __init__(self):
self.venv_dir = PERSISTENT_VENV_DIR
self.cache_file = PACKAGE_CACHE_FILE
self.python_path = None
self.pip_path = None
self._setup_paths()
def _setup_paths(self):
"""Setup Python and pip executable paths."""
if os.name == 'nt': # Windows
self.python_path = self.venv_dir / "Scripts" / "python.exe"
self.pip_path = self.venv_dir / "Scripts" / "pip.exe"
else: # Unix/Linux
self.python_path = self.venv_dir / "bin" / "python"
self.pip_path = self.venv_dir / "bin" / "pip"
def _load_package_cache(self) -> Dict[str, Any]:
"""Load package installation cache."""
if not self.cache_file.exists():
return {"packages": {}, "last_cleanup": None}
try:
with open(self.cache_file, 'r') as f:
return json.load(f)
except Exception as e:
logger.warning(f"Failed to load package cache: {e}")
return {"packages": {}, "last_cleanup": None}
def _save_package_cache(self, cache_data: Dict[str, Any]):
"""Save package installation cache."""
try:
self.venv_dir.mkdir(parents=True, exist_ok=True)
with open(self.cache_file, 'w') as f:
json.dump(cache_data, f, indent=2)
except Exception as e:
logger.warning(f"Failed to save package cache: {e}")
def _needs_cleanup(self) -> bool:
"""Check if package cleanup is needed (every 3 days)."""
cache = self._load_package_cache()
last_cleanup = cache.get("last_cleanup")
if not last_cleanup:
return True
try:
last_cleanup_date = datetime.fromisoformat(last_cleanup)
return datetime.now() - last_cleanup_date > timedelta(days=PACKAGE_CLEANUP_DAYS)
except Exception:
return True
async def ensure_venv_ready(self) -> bool:
"""Ensure the persistent virtual environment is ready."""
try:
# Check if cleanup is needed
if self._needs_cleanup():
logger.info("Performing periodic package cleanup...")
await self._cleanup_packages()
return True
# Check if venv exists and is functional
if not self.venv_dir.exists() or not self.python_path.exists():
logger.info("Creating persistent virtual environment for packages...")
await self._create_venv()
return True
# Test if venv is functional
try:
process = await asyncio.create_subprocess_exec(
str(self.python_path), "-c", "import sys; print('OK')",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode != 0 or b'OK' not in stdout:
logger.info("Persistent venv is corrupted, recreating...")
await self._cleanup_packages()
return True
except Exception:
logger.info("Persistent venv test failed, recreating...")
await self._cleanup_packages()
return True
logger.debug("Using existing persistent virtual environment")
return True
except Exception as e:
logger.error(f"Error ensuring venv ready: {e}")
return False
async def _create_venv(self):
"""Create a fresh virtual environment."""
try:
# Remove existing venv if it exists
if self.venv_dir.exists():
shutil.rmtree(self.venv_dir)
# Create new venv
self.venv_dir.mkdir(parents=True, exist_ok=True)
venv.create(str(self.venv_dir), with_pip=True, clear=True)
# Initialize cache
cache_data = {
"packages": {},
"last_cleanup": datetime.now().isoformat()
}
self._save_package_cache(cache_data)
logger.info(f"Created fresh persistent venv at {self.venv_dir}")
except Exception as e:
logger.error(f"Failed to create persistent venv: {e}")
raise
async def _cleanup_packages(self):
"""Cleanup and recreate the virtual environment."""
try:
logger.info("Cleaning up persistent virtual environment...")
# Remove the entire venv directory
if self.venv_dir.exists():
shutil.rmtree(self.venv_dir)
# Create fresh venv
await self._create_venv()
logger.info("Persistent virtual environment cleaned and recreated")
except Exception as e:
logger.error(f"Failed to cleanup packages: {e}")
raise
def is_package_installed(self, package: str) -> bool:
"""Check if a package is already installed in cache."""
cache = self._load_package_cache()
return package.lower() in cache.get("packages", {})
def mark_package_installed(self, package: str):
"""Mark a package as installed in cache."""
cache = self._load_package_cache()
cache["packages"][package.lower()] = {
"installed_at": datetime.now().isoformat(),
"name": package
}
self._save_package_cache(cache)
# Global persistent package manager
package_manager = PersistentPackageManager()
class SecureExecutor: class SecureExecutor:
""" """
Completely isolated Python executor with fresh virtual environments. Secure Python executor that uses persistent packages but cleans up code files.
Each execution gets a completely clean environment. Each execution gets a clean temporary directory but reuses installed packages.
""" """
def __init__(self): def __init__(self):
self.temp_dir = None self.temp_dir = None
self.venv_path = None
def __enter__(self): def __enter__(self):
return self return self
@@ -48,7 +210,7 @@ class SecureExecutor:
self.cleanup() self.cleanup()
def cleanup(self): def cleanup(self):
"""Clean up temporary directories and virtual environments.""" """Clean up temporary directories (code files only)."""
if self.temp_dir and os.path.exists(self.temp_dir): if self.temp_dir and os.path.exists(self.temp_dir):
try: try:
shutil.rmtree(self.temp_dir) shutil.rmtree(self.temp_dir)
@@ -108,39 +270,6 @@ class SecureExecutor:
return True, "Code passed security validation" return True, "Code passed security validation"
def create_clean_environment(self) -> Tuple[str, str, str]:
"""
Create a completely clean virtual environment.
Returns:
Tuple of (venv_path, python_executable, pip_executable)
"""
# Create temporary directory
self.temp_dir = tempfile.mkdtemp(prefix="secure_python_")
self.venv_path = os.path.join(self.temp_dir, "venv")
logger.info(f"Creating clean virtual environment at: {self.venv_path}")
# Create virtual environment
venv.create(self.venv_path, with_pip=True, clear=True)
# Get paths to executables
if os.name == 'nt': # Windows
python_path = os.path.join(self.venv_path, "Scripts", "python.exe")
pip_path = os.path.join(self.venv_path, "Scripts", "pip.exe")
else: # Unix/Linux
python_path = os.path.join(self.venv_path, "bin", "python")
pip_path = os.path.join(self.venv_path, "bin", "pip")
# Verify executables exist
if not os.path.exists(python_path):
raise RuntimeError(f"Python executable not found: {python_path}")
if not os.path.exists(pip_path):
raise RuntimeError(f"Pip executable not found: {pip_path}")
logger.debug(f"Clean environment created - Python: {python_path}, Pip: {pip_path}")
return self.venv_path, python_path, pip_path
def validate_package_safety(self, package: str) -> Tuple[bool, str]: def validate_package_safety(self, package: str) -> Tuple[bool, str]:
""" """
Validate if a package is safe to install. Validate if a package is safe to install.
@@ -190,13 +319,12 @@ class SecureExecutor:
# For unknown packages, be restrictive # For unknown packages, be restrictive
return False, f"Package '{package}' is not in the approved safe list" return False, f"Package '{package}' is not in the approved safe list"
async def install_packages_clean(self, packages: List[str], pip_path: str) -> Tuple[List[str], List[str]]: async def install_packages_persistent(self, packages: List[str]) -> Tuple[List[str], List[str]]:
""" """
Install packages in the clean virtual environment (async to prevent blocking). Install packages in the persistent virtual environment.
Args: Args:
packages: List of package names to install packages: List of package names to install
pip_path: Path to pip executable in the clean environment
Returns: Returns:
Tuple of (installed_packages, failed_packages) Tuple of (installed_packages, failed_packages)
@@ -204,20 +332,30 @@ class SecureExecutor:
installed = [] installed = []
failed = [] failed = []
# Ensure persistent venv is ready
if not await package_manager.ensure_venv_ready():
return [], packages
for package in packages: for package in packages:
# Validate package safety # Validate package safety
is_safe, reason = self.validate_package_safety(package) is_safe, reason = self.validate_package_safety(package)
if not is_safe: if not is_safe:
logger.warning(f"Package '{package}' blocked: {reason}")
failed.append(package) failed.append(package)
continue continue
# Check if already installed
if package_manager.is_package_installed(package):
logger.debug(f"Package '{package}' already installed")
installed.append(package)
continue
try: try:
# Install package in the clean virtual environment using async subprocess # Install package in the persistent virtual environment
process = await asyncio.create_subprocess_exec( process = await asyncio.create_subprocess_exec(
pip_path, "install", package, str(package_manager.pip_path), "install", "--no-cache-dir", package,
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
cwd=self.temp_dir
) )
try: try:
@@ -226,8 +364,11 @@ class SecureExecutor:
if return_code == 0: if return_code == 0:
installed.append(package) installed.append(package)
package_manager.mark_package_installed(package)
logger.info(f"Successfully installed package: {package}")
else: else:
failed.append(package) failed.append(package)
logger.warning(f"Failed to install {package}: {stderr.decode()}")
except asyncio.TimeoutError: except asyncio.TimeoutError:
# Kill the process if it times out # Kill the process if it times out
@@ -237,19 +378,20 @@ class SecureExecutor:
except: except:
pass pass
failed.append(package) failed.append(package)
logger.warning(f"Installation timeout for package: {package}")
except Exception as e: except Exception as e:
failed.append(package) failed.append(package)
logger.warning(f"Error installing {package}: {e}")
return installed, failed return installed, failed
async def execute_code_secure(self, code: str, python_path: str, timeout: int) -> Dict[str, Any]: async def execute_code_secure(self, code: str, timeout: int) -> Dict[str, Any]:
""" """
Execute Python code in the completely isolated environment (async to prevent blocking). Execute Python code using persistent packages but clean temporary directory.
Args: Args:
code: Python code to execute code: Python code to execute
python_path: Path to Python executable in clean environment
timeout: Execution timeout in seconds timeout: Execution timeout in seconds
Returns: Returns:
@@ -257,24 +399,20 @@ class SecureExecutor:
""" """
start_time = time.time() start_time = time.time()
# Create code file in the isolated environment # Create temporary directory for code execution
self.temp_dir = tempfile.mkdtemp(prefix="code_exec_")
code_file = os.path.join(self.temp_dir, "code_to_execute.py") code_file = os.path.join(self.temp_dir, "code_to_execute.py")
try: try:
with open(code_file, 'w', encoding='utf-8') as f: with open(code_file, 'w', encoding='utf-8') as f:
f.write(code) f.write(code)
# Execute code in completely isolated environment using async subprocess # Execute code using persistent Python environment
process = await asyncio.create_subprocess_exec( process = await asyncio.create_subprocess_exec(
python_path, code_file, str(package_manager.python_path), code_file,
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
cwd=self.temp_dir, cwd=self.temp_dir
env={ # Minimal environment variables
'PATH': os.path.dirname(python_path),
'PYTHONPATH': '',
'PYTHONHOME': '',
}
) )
try: try:
@@ -337,17 +475,18 @@ class SecureExecutor:
"traceback": traceback.format_exc() "traceback": traceback.format_exc()
} }
finally: finally:
# Clean up code file # Clean up code file (but keep packages in persistent venv)
try: try:
if os.path.exists(code_file): if os.path.exists(code_file):
os.remove(code_file) os.remove(code_file)
except Exception as e: except Exception:
pass # Silent cleanup failure pass # Silent cleanup failure
async def execute_python_code(args: Dict[str, Any]) -> Dict[str, Any]: async def execute_python_code(args: Dict[str, Any]) -> Dict[str, Any]:
""" """
Execute Python code in a completely clean, isolated environment. Execute Python code using persistent packages but clean code execution.
Packages persist for 3 days, code files are cleaned up after each execution.
Args: Args:
args: Dictionary containing: args: Dictionary containing:
@@ -383,14 +522,11 @@ async def execute_python_code(args: Dict[str, Any]) -> Dict[str, Any]:
"execution_time": 0 "execution_time": 0
} }
# Create completely clean environment # Install packages in persistent environment (if any)
venv_path, python_path, pip_path = executor.create_clean_environment()
# Install only requested packages (if any)
installed_packages = [] installed_packages = []
failed_packages = [] failed_packages = []
if packages_to_install: if packages_to_install:
installed_packages, failed_packages = await executor.install_packages_clean(packages_to_install, pip_path) installed_packages, failed_packages = await executor.install_packages_persistent(packages_to_install)
# Prepare code with input data if provided # Prepare code with input data if provided
if input_data: if input_data:
@@ -399,15 +535,15 @@ async def execute_python_code(args: Dict[str, Any]) -> Dict[str, Any]:
else: else:
code_with_input = code code_with_input = code
# Execute code in clean environment # Execute code using persistent packages
result = await executor.execute_code_secure(code_with_input, python_path, timeout) result = await executor.execute_code_secure(code_with_input, timeout)
# Add package installation info # Add package installation info
if installed_packages: if installed_packages:
result["installed_packages"] = installed_packages result["installed_packages"] = installed_packages
# Prepend package installation info to output # Prepend package installation info to output
if result.get("success"): if result.get("success"):
package_info = f"[Installed packages: {', '.join(installed_packages)}]\n\n" package_info = f"[Using packages: {', '.join(installed_packages)}]\n\n"
result["output"] = package_info + result.get("output", "") result["output"] = package_info + result.get("output", "")
if failed_packages: if failed_packages:
@@ -425,11 +561,35 @@ async def execute_python_code(args: Dict[str, Any]) -> Dict[str, Any]:
} }
# Utility functions for package management
async def force_cleanup_packages():
"""Force cleanup of the persistent package environment."""
logger.info("Forcing cleanup of persistent packages...")
await package_manager._cleanup_packages()
logger.info("Forced package cleanup completed")
def get_package_status() -> Dict[str, Any]:
"""Get status information about the persistent package environment."""
cache = package_manager._load_package_cache()
status = {
"persistent_venv_exists": package_manager.venv_dir.exists(),
"python_executable": str(package_manager.python_path),
"pip_executable": str(package_manager.pip_path),
"installed_packages": cache.get("packages", {}),
"last_cleanup": cache.get("last_cleanup"),
"needs_cleanup": package_manager._needs_cleanup(),
"cleanup_interval_days": PACKAGE_CLEANUP_DAYS
}
return status
# Deprecated - keeping for backward compatibility # Deprecated - keeping for backward compatibility
async def install_packages(packages: List[str]) -> Dict[str, Any]: async def install_packages(packages: List[str]) -> Dict[str, Any]:
""" """
Legacy function for backward compatibility. Legacy function for backward compatibility.
Note: In the new secure system, packages are installed per execution. Note: In the persistent system, packages are managed automatically.
""" """
return { return {
"success": False, "success": False,