From e2b961e9c0e06b123f54cc531755ef48d96e2df6 Mon Sep 17 00:00:00 2001 From: cauvang32 Date: Sat, 29 Nov 2025 23:00:34 +0700 Subject: [PATCH] Refactor OpenAI utility functions: enhance descriptions, add parameters, and improve clarity - Updated descriptions for functions to provide clearer guidance on usage. - Added detailed parameter descriptions for better understanding. - Introduced new function `remove_background` for background removal. - Adjusted parameter requirements and constraints across various functions. - Improved overall consistency and readability of the code. --- config/image_config.json | 266 +++++++ requirements.txt | 5 +- src/config/config.py | 263 ++++--- src/database/db_handler.py | 122 +++- src/module/message_handler.py | 119 ++- src/utils/image_utils.py | 1279 +++++++++++++++++++-------------- src/utils/openai_utils.py | 110 +-- 7 files changed, 1484 insertions(+), 680 deletions(-) create mode 100644 config/image_config.json diff --git a/config/image_config.json b/config/image_config.json new file mode 100644 index 0000000..55356a6 --- /dev/null +++ b/config/image_config.json @@ -0,0 +1,266 @@ +{ + "_comment": "Image Generation Configuration - Add/modify models here", + "_version": "2.0.0", + + "settings": { + "default_model": "flux", + "default_upscale_model": "clarity", + "default_background_removal_model": "bria", + "connection_timeout": 120, + "max_retries": 3, + "retry_delay": 2, + "output_format": "WEBP", + "output_quality": 95 + }, + + "image_models": { + "flux": { + "model_id": "runware:101@1", + "name": "FLUX.1", + "description": "High-quality FLUX model for general image generation", + "default_width": 1024, + "default_height": 1024, + "min_width": 512, + "min_height": 512, + "max_width": 2048, + "max_height": 2048, + "step_size": 64, + "default_steps": 30, + "default_cfg_scale": 7.5, + "supports_negative_prompt": true, + "max_images": 4, + "category": "general" + }, + "flux-dev": { + "model_id": "runware:100@1", + "name": "FLUX.1 
Dev", + "description": "FLUX.1 Development version with more creative outputs", + "default_width": 1024, + "default_height": 1024, + "min_width": 512, + "min_height": 512, + "max_width": 2048, + "max_height": 2048, + "step_size": 64, + "default_steps": 25, + "default_cfg_scale": 7.0, + "supports_negative_prompt": true, + "max_images": 4, + "category": "general" + }, + "flux-fill": { + "model_id": "runware:102@1", + "name": "FLUX Fill", + "description": "FLUX model optimized for inpainting and editing", + "default_width": 1024, + "default_height": 1024, + "min_width": 512, + "min_height": 512, + "max_width": 2048, + "max_height": 2048, + "step_size": 64, + "default_steps": 30, + "default_cfg_scale": 7.5, + "supports_negative_prompt": true, + "max_images": 4, + "category": "editing" + }, + "sdxl": { + "model_id": "civitai:101055@128078", + "name": "Stable Diffusion XL", + "description": "Stable Diffusion XL for detailed, high-resolution images", + "default_width": 1024, + "default_height": 1024, + "min_width": 512, + "min_height": 512, + "max_width": 2048, + "max_height": 2048, + "step_size": 64, + "default_steps": 30, + "default_cfg_scale": 7.0, + "supports_negative_prompt": true, + "max_images": 4, + "category": "general" + }, + "realistic": { + "model_id": "civitai:4201@130072", + "name": "Realistic Vision", + "description": "Photorealistic image generation", + "default_width": 768, + "default_height": 768, + "min_width": 512, + "min_height": 512, + "max_width": 1536, + "max_height": 1536, + "step_size": 64, + "default_steps": 35, + "default_cfg_scale": 7.5, + "supports_negative_prompt": true, + "max_images": 4, + "category": "realistic" + }, + "anime": { + "model_id": "civitai:4384@128713", + "name": "Anime Style", + "description": "Anime and illustration style images", + "default_width": 768, + "default_height": 768, + "min_width": 512, + "min_height": 512, + "max_width": 1536, + "max_height": 1536, + "step_size": 64, + "default_steps": 28, + 
"default_cfg_scale": 7.0, + "supports_negative_prompt": true, + "max_images": 4, + "category": "anime" + }, + "dreamshaper": { + "model_id": "civitai:4384@128713", + "name": "DreamShaper", + "description": "Creative and artistic image generation", + "default_width": 768, + "default_height": 768, + "min_width": 512, + "min_height": 512, + "max_width": 1536, + "max_height": 1536, + "step_size": 64, + "default_steps": 30, + "default_cfg_scale": 7.0, + "supports_negative_prompt": true, + "max_images": 4, + "category": "artistic" + } + }, + + "upscale_models": { + "clarity": { + "model_id": "runware:500@1", + "name": "Clarity", + "description": "High-quality clarity upscaling", + "supported_factors": [2, 4], + "max_input_size": 2048, + "max_output_size": 4096, + "supports_prompts": true + }, + "ccsr": { + "model_id": "runware:501@1", + "name": "CCSR", + "description": "Content-consistent super-resolution upscaling", + "supported_factors": [2, 4], + "max_input_size": 2048, + "max_output_size": 4096, + "supports_prompts": true + }, + "sd-latent": { + "model_id": "runware:502@1", + "name": "SD Latent Upscaler", + "description": "Stable Diffusion latent space upscaling", + "supported_factors": [2], + "max_input_size": 2048, + "max_output_size": 4096, + "supports_prompts": true + }, + "swinir": { + "model_id": "runware:503@1", + "name": "SwinIR", + "description": "Fast and efficient SwinIR upscaling (supports 4x)", + "supported_factors": [2, 4], + "max_input_size": 2048, + "max_output_size": 4096, + "supports_prompts": false + } + }, + + "background_removal_models": { + "bria": { + "model_id": "runware:110@1", + "name": "Bria RMBG 2.0", + "description": "High-quality background removal by Bria", + "supports_alpha_matting": false + }, + "rembg": { + "model_id": "runware:109@1", + "name": "RemBG 1.4", + "description": "Classic RemBG with alpha matting support", + "supports_alpha_matting": true + }, + "birefnet-base": { + "model_id": "runware:112@1", + "name": "BiRefNet Base", 
+ "description": "BiRefNet base model for background removal", + "supports_alpha_matting": false + }, + "birefnet-general": { + "model_id": "runware:112@5", + "name": "BiRefNet General", + "description": "BiRefNet general purpose model", + "supports_alpha_matting": false + }, + "birefnet-portrait": { + "model_id": "runware:112@10", + "name": "BiRefNet Portrait", + "description": "BiRefNet optimized for portraits", + "supports_alpha_matting": false + } + }, + + "controlnet_models": { + "flux-canny": { + "model_id": "runware:25@1", + "name": "FLUX Canny", + "description": "Edge detection control for FLUX models", + "architecture": "flux" + }, + "flux-depth": { + "model_id": "runware:27@1", + "name": "FLUX Depth", + "description": "Depth map control for FLUX models", + "architecture": "flux" + }, + "flux-pose": { + "model_id": "runware:29@1", + "name": "FLUX Pose", + "description": "Pose control for FLUX models", + "architecture": "flux" + }, + "sdxl-canny": { + "model_id": "runware:20@1", + "name": "SDXL Canny", + "description": "Edge detection control for SDXL models", + "architecture": "sdxl" + }, + "sd15-canny": { + "model_id": "civitai:38784@44716", + "name": "SD 1.5 Canny", + "description": "Edge detection control for SD 1.5 models", + "architecture": "sd15" + }, + "sd15-lineart": { + "model_id": "civitai:38784@44877", + "name": "SD 1.5 Line Art", + "description": "Line art control for SD 1.5 models", + "architecture": "sd15" + } + }, + + "default_negative_prompts": { + "general": "blurry, distorted, low quality, watermark, signature, text, bad anatomy, deformed", + "realistic": "cartoon, anime, illustration, painting, drawing, bad anatomy, deformed, blurry, low quality", + "anime": "realistic, photo, 3d render, bad anatomy, deformed hands, extra fingers, blurry", + "artistic": "bad quality, low resolution, blurry, watermark, signature" + }, + + "aspect_ratios": { + "1:1": {"width": 1024, "height": 1024, "description": "Square"}, + "16:9": {"width": 1344, 
"height": 768, "description": "Landscape Wide"}, + "9:16": {"width": 768, "height": 1344, "description": "Portrait Tall"}, + "4:3": {"width": 1152, "height": 896, "description": "Landscape"}, + "3:4": {"width": 896, "height": 1152, "description": "Portrait"}, + "3:2": {"width": 1248, "height": 832, "description": "Photo Landscape"}, + "2:3": {"width": 832, "height": 1248, "description": "Photo Portrait"}, + "21:9": {"width": 1536, "height": 640, "description": "Ultrawide"} + } +} diff --git a/requirements.txt b/requirements.txt index d252df8..f687597 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,12 +1,13 @@ discord.py openai motor -pymongo +pymongo[srv] +dnspython>=2.0.0 pypdf beautifulsoup4 requests aiohttp -runware +runware>=0.4.33 python-dotenv matplotlib pandas diff --git a/src/config/config.py b/src/config/config.py index 08eb07d..7630ec6 100644 --- a/src/config/config.py +++ b/src/config/config.py @@ -1,9 +1,34 @@ import os +import json +from pathlib import Path from dotenv import load_dotenv # Load environment variables from .env file load_dotenv() +# ==================== IMAGE CONFIGURATION ==================== +# Load image configuration from JSON file +def load_image_config() -> dict: + """Load image configuration from JSON file""" + config_paths = [ + Path(__file__).parent.parent.parent / "config" / "image_config.json", + Path(__file__).parent.parent / "config" / "image_config.json", + Path("config/image_config.json"), + ] + + for config_path in config_paths: + if config_path.exists(): + try: + with open(config_path, 'r') as f: + return json.load(f) + except Exception as e: + print(f"Warning: Error loading image config from {config_path}: {e}") + + return {} + +# Load image config once at module import +_IMAGE_CONFIG = load_image_config() + # Bot statuses STATUSES = [ "Powered by openai/gpt-4o!", @@ -79,6 +104,61 @@ MODEL_OPTIONS = [ "openai/o4-mini" ] +# ==================== IMAGE GENERATION MODELS ==================== +# Models are loaded 
# ==================== IMAGE GENERATION MODELS ====================
# Models are loaded from config/image_config.json.
# Edit that file to add/modify image models; the literals below are only the
# built-in fallbacks used when a section is missing or unusable.


def _config_section(key: str, fallback: dict) -> dict:
    """Return section ``key`` of the loaded image config, or ``fallback``.

    ``fallback`` is returned when the section is absent, ``null``, empty, or
    not a JSON object, so every exported table is a non-empty dict.
    """
    section = _IMAGE_CONFIG.get(key)
    if isinstance(section, dict) and section:
        return section
    return fallback


IMAGE_MODELS = _config_section("image_models", {
    "flux": {
        "model_id": "runware:101@1",
        "name": "FLUX.1",
        "description": "High-quality image generation with FLUX",
        "default_width": 1024,
        "default_height": 1024,
        "max_width": 2048,
        "max_height": 2048,
        "supports_negative_prompt": True
    }
})

# Upscale models from config
UPSCALE_MODELS = _config_section("upscale_models", {
    "clarity": {
        "model_id": "runware:500@1",
        "name": "Clarity",
        "supported_factors": [2, 4]
    }
})

# Background removal models from config
BACKGROUND_REMOVAL_MODELS = _config_section("background_removal_models", {
    "bria": {
        "model_id": "runware:110@1",
        "name": "Bria RMBG 2.0"
    }
})

# Image settings from config
IMAGE_SETTINGS = _config_section("settings", {
    "default_model": "flux",
    "default_upscale_model": "clarity",
    "default_background_removal_model": "bria"
})

# Default image model. A key that does not exist in IMAGE_MODELS (e.g. a typo
# in the JSON) is replaced here, at import time, instead of surfacing later as
# a failed lookup.
DEFAULT_IMAGE_MODEL = IMAGE_SETTINGS.get("default_model", "flux")
if DEFAULT_IMAGE_MODEL not in IMAGE_MODELS:
    _fallback_model = "flux" if "flux" in IMAGE_MODELS else next(iter(IMAGE_MODELS))
    print(
        f"Warning: default image model {DEFAULT_IMAGE_MODEL!r} is not defined "
        f"in image_models; using {_fallback_model!r}"
    )
    DEFAULT_IMAGE_MODEL = _fallback_model

# Default negative prompts by category
DEFAULT_NEGATIVE_PROMPTS = _config_section("default_negative_prompts", {
    "general": "blurry, distorted, low quality, watermark, signature, text, bad anatomy, deformed"
})

# Aspect ratios from config
ASPECT_RATIOS = _config_section("aspect_ratios", {
    "1:1": {"width": 1024, "height": 1024},
    "16:9": {"width": 1344, "height": 768},
    "9:16": {"width": 768, "height": 1344}
})
-Tools: -- google_search: real-time info, fact-checking, news -- scrape_webpage: extract/analyze webpage content -- execute_python_code: Python code execution with AUTO-INSTALL packages & file access -- image_suite: generate/edit/upscale/create portraits -- reminders: schedule/retrieve user reminders -- web_search_multi: parallel searches for comprehensive research +TOOLS: +1. google_search(query) - Web search for current info +2. scrape_webpage(url) - Extract webpage content +3. execute_python_code(code) - Run Python, packages auto-install. Use load_file('file_id') for user files. Save outputs to files. +4. set_reminder(content, time) / get_reminders() - Manage reminders -🐍 Code Interpreter (execute_python_code): -⚠️ CRITICAL: Packages AUTO-INSTALL when imported! ALWAYS import what you need - installation is automatic. +═══════════════════════════════════════════════════════════════ +IMAGE GENERATION & EDITING TOOLS +═══════════════════════════════════════════════════════════════ -βœ… Approved: pandas, numpy, matplotlib, seaborn, scikit-learn, tensorflow, pytorch, plotly, opencv, scipy, statsmodels, pillow, openpyxl, geopandas, folium, xgboost, lightgbm, bokeh, altair, and 80+ more. +5. generate_image(prompt, model, num_images, width, height, aspect_ratio, negative_prompt, steps, cfg_scale, seed) + Create images from text descriptions. 
+ + MODELS (use model parameter): + β€’ "flux" - FLUX.1 (default, best quality, 1024x1024) + β€’ "flux-dev" - FLUX.1 Dev (more creative outputs) + β€’ "sdxl" - Stable Diffusion XL (detailed, high-res) + β€’ "realistic" - Realistic Vision (photorealistic) + β€’ "anime" - Anime/illustration style + β€’ "dreamshaper" - Creative/artistic style + + ASPECT RATIOS (use aspect_ratio parameter): + β€’ "1:1" - Square (1024x1024) + β€’ "16:9" - Landscape wide (1344x768) + β€’ "9:16" - Portrait tall (768x1344) + β€’ "4:3" - Landscape (1152x896) + β€’ "3:4" - Portrait (896x1152) + β€’ "3:2" - Photo landscape (1248x832) + β€’ "2:3" - Photo portrait (832x1248) + β€’ "21:9" - Ultrawide (1536x640) + + Examples: + generate_image("a dragon in a forest", "flux", 1) + generate_image({"prompt": "sunset beach", "model": "realistic", "aspect_ratio": "16:9"}) + generate_image({"prompt": "anime girl", "model": "anime", "width": 768, "height": 1024}) -πŸ“‚ File Access: When users upload files, you'll receive the file_id in the conversation context (e.g., "File ID: abc123_xyz"). Use load_file('file_id') to access them. The function auto-detects file types: -- CSV/TSV β†’ pandas DataFrame -- Excel (.xlsx, .xls) β†’ pandas ExcelFile object (use .sheet_names and .parse('Sheet1')) -- JSON β†’ dict or DataFrame -- Images β†’ PIL Image object -- Text β†’ string content -- And 200+ more formats... +6. generate_image_with_refiner(prompt, model, num_images) + Generate high-quality images using SDXL with refiner for better details. 
+ Best for: detailed artwork, complex scenes + Example: generate_image_with_refiner("detailed fantasy castle", "sdxl", 1) -πŸ“Š Excel Files: load_file() returns ExcelFile object for multi-sheet support: - excel_file = load_file('file_id') - sheets = excel_file.sheet_names # Get all sheet names - df = excel_file.parse('Sheet1') # Read specific sheet - # Or: df = pd.read_excel(excel_file, sheet_name='Sheet1') - # Check if sheet has data: if not df.empty and len(df.columns) > 0 +7. upscale_image(image_url, scale_factor, model) + Enlarge images to higher resolution. + + UPSCALE MODELS: + β€’ "clarity" - High-quality clarity upscaling (default) + β€’ "ccsr" - Content-consistent super-resolution + β€’ "sd-latent" - SD latent space upscaling + β€’ "swinir" - Fast SwinIR (supports 4x) + + SCALE FACTORS: 2 or 4 (depending on model) + + Requires: User must provide an image URL first + Example: upscale_image("https://example.com/image.jpg", 2, "clarity") -⚠️ IMPORTANT: -- If load_file() fails, error lists available file IDs - use the correct one -- Always check if DataFrames are empty before operations like .describe() -- Excel files may have empty sheets - skip or handle them gracefully +8. remove_background(image_url, model) / edit_image(image_url, "remove_background") + Remove background from images (outputs PNG with transparency). + + BACKGROUND REMOVAL MODELS: + β€’ "bria" - Bria RMBG 2.0 (default, high quality) + β€’ "rembg" - RemBG 1.4 (classic, supports alpha matting) + β€’ "birefnet-base" - BiRefNet base model + β€’ "birefnet-general" - BiRefNet general purpose + β€’ "birefnet-portrait" - BiRefNet optimized for portraits + + Requires: User must provide an image URL first + Example: remove_background("https://example.com/photo.jpg", "bria") -πŸ’Ύ Output Files: ALL generated files (CSV, images, JSON, text, plots, etc.) are AUTO-CAPTURED and sent to user. Files stored for 48h (configurable). Just create files - they're automatically shared! +9. 
photo_maker(prompt, input_images, style, strength, num_images) + Generate images based on reference photos (identity preservation). + + Parameters: + β€’ prompt: Text description of desired output + β€’ input_images: List of reference image URLs + β€’ style: Style to apply (default: "No style") + β€’ strength: Reference influence 0-100 (default: 40) + + Requires: User must provide reference images first + Example: photo_maker({"prompt": "professional headshot", "input_images": ["url1", "url2"], "style": "Photographic"}) -βœ… DO: -- Import packages directly (auto-installs!) -- Use load_file('file_id') with the EXACT file_id from context -- Check if DataFrames are empty: if not df.empty and len(df.columns) > 0 -- Handle errors gracefully (empty sheets, missing data, etc.) -- Create output files with descriptive names -- Generate visualizations (plt.savefig, etc.) -- Return multiple files (data + plots + reports) +10. image_to_text(image_url) + Generate text description/caption from an image. + Use for: Understanding image content, accessibility, OCR-like tasks + Example: image_to_text("https://example.com/image.jpg") -❌ DON'T: -- Check if packages are installed -- Use install_packages parameter -- Print large datasets (create CSV instead) -- Manually handle file paths -- Guess file_ids - use the exact ID from the upload message +11. enhance_prompt(prompt, num_versions, max_length) + Improve prompts for better image generation results. + Returns multiple enhanced versions of your prompt. + Example: enhance_prompt("cat on roof", 3, 200) -Example: -```python -import pandas as pd -import seaborn as sns # Auto-installs! 
-import matplotlib.pyplot as plt +═══════════════════════════════════════════════════════════════ +USAGE GUIDELINES +═══════════════════════════════════════════════════════════════ -# Load user's file (file_id from upload message: "File ID: 123456_abc") -data = load_file('123456_abc') # Auto-detects type +WHEN TO USE EACH TOOL: +β€’ "create/draw/generate/make an image of X" β†’ generate_image +β€’ "high quality/detailed image" β†’ generate_image_with_refiner +β€’ "remove/delete background" β†’ remove_background (pass 'latest_image') +β€’ "make image bigger/larger/upscale" β†’ upscale_image (pass 'latest_image') +β€’ "create image like this/based on this photo" β†’ photo_maker (pass ['latest_image']) +β€’ "what's in this image/describe image" β†’ image_to_text (pass 'latest_image') +β€’ "improve this prompt" β†’ enhance_prompt -# For Excel files: -if hasattr(data, 'sheet_names'): # It's an ExcelFile - for sheet in data.sheet_names: - df = data.parse(sheet) - if not df.empty and len(df.columns) > 0: - # Process non-empty sheets - summary = df.describe() - summary.to_csv(f'{sheet}_summary.csv') -else: # It's already a DataFrame (CSV, etc.) - df = data - summary = df.describe() - summary.to_csv('summary_stats.csv') - -# Create visualization -if not df.empty: - sns.heatmap(df.corr(), annot=True) - plt.savefig('correlation_plot.png') - -# Everything is automatically sent to user! 
-``` - -Smart Usage: -- Chain tools: searchβ†’scrapeβ†’analyze for deep research -- Auto-suggest relevant tools based on user intent -- Create multiple outputs (CSV, plots, reports) in one execution -- Use execute_python_code for ALL data analysis (replaces old analyze_data_file tool) - -Rules: -- One clarifying question if ambiguous -- Prioritize answers over details -- Cite sources: (Title – URL) -- Use execute_python_code for complex math & data analysis -- Never invent sources -- Code fences for equations (no LaTeX) -- Return image URLs with brief descriptions""" +IMPORTANT NOTES: +β€’ For image tools (upscale, remove_background, photo_maker, image_to_text), when user uploads an image, pass 'latest_image' as the image_url parameter - the system automatically uses their most recent uploaded image +β€’ You don't need to extract or copy image URLs - just use 'latest_image' +β€’ Default model is "flux" - best for general use +β€’ Use "realistic" for photos, "anime" for illustrations +β€’ For math/data analysis β†’ use execute_python_code instead +β€’ Always cite sources (Title–URL) when searching web""" SEARCH_PROMPT = "Research Assistant with Google Search access. Synthesize search results into accurate answers. Prioritize credible sources, compare perspectives, acknowledge limitations, cite sources. Structure responses logically." 
# Install a process-wide dnspython resolver that ignores the system resolver
# configuration (configure=False) and queries a fixed list of public servers.
try:
    import dns.resolver

    _public_resolver = dns.resolver.Resolver(configure=False)
    dns.resolver.default_resolver = _public_resolver
    _public_resolver.nameservers = ['8.8.8.8', '8.8.4.4', '1.1.1.1']
    _public_resolver.lifetime = 15.0  # total seconds allowed per DNS lookup
except ImportError:
    # dnspython is optional: without it the custom resolver is simply skipped.
    logging.warning("dnspython not installed, using system DNS resolver")
except Exception as e:
    logging.warning(f"Could not configure custom DNS resolver: {e}")
async def _retry_operation(self, operation, *args, **kwargs):
    """Await ``operation(*args, **kwargs)``, retrying transient failures.

    An exception counts as transient when its lower-cased message contains
    one of the markers below; any other exception propagates immediately.
    The wait between attempts grows linearly (2s, 4s, ... capped at 10s).
    ``self.max_retries`` is the total number of attempts; values below 1 are
    treated as 1 so the operation always runs at least once.

    Returns:
        Whatever ``operation`` returns on its first successful attempt.

    Raises:
        The non-transient exception as soon as it occurs, or the last
        transient exception once every attempt has failed.
    """
    retryable_markers = (
        'timeout', 'connection', 'socket', 'dns', 'try again',
        'network', 'errno -3', 'gaierror', 'nodename', 'servname',
        'temporary failure', 'name resolution', 'unreachable',
        'reset by peer', 'broken pipe', 'not connected',
    )
    attempts = max(1, self.max_retries)
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return await operation(*args, **kwargs)
        except Exception as e:
            error_str = str(e).lower()
            if not any(marker in error_str for marker in retryable_markers):
                # Non-retryable error, raise immediately
                raise
            last_error = e
            if attempt == attempts:
                # No further attempt follows, so do not sleep or announce a retry.
                break
            wait_time = min(attempt * 2, 10)  # Linear backoff: 2s, 4s, 6s, 8s, 10s (max)
            logging.warning(f"Database operation failed (attempt {attempt}/{attempts}): {e}. Retrying in {wait_time}s...")
            await asyncio.sleep(wait_time)
    # All retries exhausted
    logging.error(f"Database operation failed after {attempts} attempts: {last_error}")
    raise last_error


async def ensure_connected(self) -> bool:
    """Ping the server until it answers, holding the connection lock.

    Returns ``True`` immediately when ``self._connected`` is already set.
    Otherwise issues up to ``self.max_retries`` pings (at least one) with the
    same linear backoff as ``_retry_operation`` and sets ``self._connected``
    on the first success.

    Returns:
        ``True`` once a ping succeeds, ``False`` when every attempt failed.
    """
    async with self._connection_lock:
        if self._connected:
            return True

        attempts = max(1, self.max_retries)
        for attempt in range(1, attempts + 1):
            try:
                await self.client.admin.command('ping')
            except Exception as e:
                if attempt == attempts:
                    # Last attempt: report it, but do not wait for a retry that never comes.
                    logging.warning(f"Database connection attempt {attempt}/{attempts} failed: {e}")
                    break
                wait_time = min(attempt * 2, 10)
                logging.warning(f"Database connection attempt {attempt}/{attempts} failed: {e}. Retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)
            else:
                self._connected = True
                logging.info("Database connection established successfully")
                return True

        logging.error("Failed to establish database connection after all retries")
        return False


async def check_connection(self) -> bool:
    """Ping the server once (10s limit) and record the result.

    Never raises: a timeout or any other exception is logged and reported as
    ``False``. Failures whose message looks like a DNS hiccup are logged at
    debug level only. ``self._connected`` is updated to match the result.
    """
    dns_markers = ('errno -3', 'try again', 'dns', 'gaierror')
    try:
        # Use a short timeout for the ping operation
        await asyncio.wait_for(
            self.client.admin.command('ping'),
            timeout=10.0
        )
    except asyncio.TimeoutError:
        logging.warning("Database ping timed out")
        self._connected = False
        return False
    except Exception as e:
        error_str = str(e).lower()
        # Don't log DNS resolution failures as errors (they're often transient)
        if any(err in error_str for err in dns_markers):
            logging.debug(f"Transient database connection check failed (DNS): {e}")
        else:
            logging.error(f"Database connection check failed: {e}")
        self._connected = False
        return False
    self._connected = True
    return True
async def _get_latest_image_url_from_db(self, user_id: int) -> "Optional[str]":
    """Return the most recent image URL stored in the user's history.

    Reads the history through ``self.db.get_history(user_id)`` and scans it
    newest-first, both across messages and across the parts of each message.
    The first ``image_url`` part holding a non-empty string URL wins.
    Messages or parts with an unexpected shape are skipped, so one malformed
    entry cannot hide an older valid image.

    Returns:
        The URL, or ``None`` when no image is found or the lookup fails
        (failures are logged, never raised).
    """
    try:
        # Get history from database
        history = await self.db.get_history(user_id)

        for msg in reversed(history or []):
            content = msg.get('content') if isinstance(msg, dict) else None
            if not isinstance(content, list):
                continue
            for item in reversed(content):
                if not isinstance(item, dict) or item.get('type') != 'image_url':
                    continue
                image_url_data = item.get('image_url')
                url = image_url_data.get('url') if isinstance(image_url_data, dict) else None
                if isinstance(url, str) and url:
                    logging.info(f"Found latest image URL from database: {url[:80]}...")
                    return url
        return None
    except Exception as e:
        logging.error(f"Error getting latest image URL from database: {e}")
        return None
Please upload an image first."}) + result = await self.image_generator.image_to_text(args) return result except Exception as e: @@ -2096,15 +2144,82 @@ print("\\n=== Correlation Analysis ===") async def _upscale_image(self, args: Dict[str, Any]): """Upscale an image""" try: + # Check if model passed "latest_image" - use stored URL + image_url = args.get("image_url", "") + if image_url == "latest_image" or not image_url: + user_id = self._find_user_id_from_current_task() + if user_id: + # Try in-memory first (from current session), then database + if user_id in self.user_latest_image_url: + args["image_url"] = self.user_latest_image_url[user_id] + logging.info(f"Using in-memory image URL for upscale") + else: + db_url = await self._get_latest_image_url_from_db(user_id) + if db_url: + args["image_url"] = db_url + logging.info(f"Using database image URL for upscale") + else: + return json.dumps({"error": "No image found. Please upload an image first."}) + else: + return json.dumps({"error": "No image found. 
async def _remove_background(self, args: Dict[str, Any]):
    """Remove the background of an image via the image generator.

    When ``args["image_url"]`` is empty, missing, or the literal placeholder
    ``"latest_image"``, it is replaced in ``args`` by the current user's most
    recent image: first the in-memory entry in ``self.user_latest_image_url``,
    otherwise the result of ``self._get_latest_image_url_from_db``.

    Returns:
        The value returned by ``self.image_generator.remove_background(args)``,
        or a JSON string with an ``"error"`` key when no image can be
        resolved or any exception occurs.
    """
    try:
        requested_url = args.get("image_url", "")
        needs_lookup = requested_url == "latest_image" or not requested_url

        if needs_lookup:
            user_id = self._find_user_id_from_current_task()
            if not user_id:
                return json.dumps({"error": "No image found. Please upload an image first."})

            # The in-memory entry (current session) wins; the database is the fallback.
            if user_id in self.user_latest_image_url:
                args["image_url"] = self.user_latest_image_url[user_id]
                logging.info("Using in-memory image URL for background removal")
            else:
                stored_url = await self._get_latest_image_url_from_db(user_id)
                if not stored_url:
                    return json.dumps({"error": "No image found. Please upload an image first."})
                args["image_url"] = stored_url
                logging.info("Using database image URL for background removal")

        return await self.image_generator.remove_background(args)
    except Exception as e:
        logging.error(f"Error in background removal: {str(e)}")
        return json.dumps({"error": f"Background removal failed: {str(e)}"})
+""" + import io import aiohttp import logging import tempfile import os -import time +import json import uuid -from typing import List, Dict, Any, Optional +from pathlib import Path +from typing import List, Dict, Any, Optional, Union from runware import ( Runware, IImageInference, @@ -16,344 +24,586 @@ from runware import ( IPhotoMaker ) + +def load_image_config() -> Dict[str, Any]: + """Load image configuration from JSON file""" + config_paths = [ + Path(__file__).parent.parent.parent / "config" / "image_config.json", + Path(__file__).parent.parent / "config" / "image_config.json", + Path("config/image_config.json"), + Path("image_config.json") + ] + + for config_path in config_paths: + if config_path.exists(): + try: + with open(config_path, 'r') as f: + config = json.load(f) + logging.info(f"Loaded image config from {config_path}") + return config + except Exception as e: + logging.error(f"Error loading image config from {config_path}: {e}") + + logging.warning("Image config file not found, using defaults") + return get_default_config() + + +def get_default_config() -> Dict[str, Any]: + """Return default configuration if config file is not found""" + return { + "settings": { + "default_model": "flux", + "default_upscale_model": "clarity", + "default_background_removal_model": "bria", + "connection_timeout": 120, + "max_retries": 3, + "retry_delay": 2, + "output_format": "WEBP", + "output_quality": 95 + }, + "image_models": { + "flux": { + "model_id": "runware:101@1", + "name": "FLUX.1", + "description": "High-quality FLUX model", + "default_width": 1024, + "default_height": 1024, + "max_width": 2048, + "max_height": 2048, + "default_steps": 30, + "default_cfg_scale": 7.5, + "supports_negative_prompt": True, + "max_images": 4 + } + }, + "upscale_models": { + "clarity": { + "model_id": "runware:500@1", + "name": "Clarity", + "supported_factors": [2, 4] + } + }, + "background_removal_models": { + "bria": { + "model_id": "runware:110@1", + "name": "Bria RMBG 2.0" 
+ } + }, + "default_negative_prompts": { + "general": "blurry, distorted, low quality, watermark, signature, text, bad anatomy, deformed" + } + } + + +# Global config - loaded once at module import +IMAGE_CONFIG = load_image_config() + + class ImageGenerator: - def __init__(self, api_key: str): + """ + Image generation and manipulation using Runware API. + + Features: + - Text-to-image generation with multiple models + - Image upscaling with various algorithms + - Background removal + - Image captioning (image-to-text) + - Prompt enhancement + - PhotoMaker for reference-based generation + + Configuration is loaded from config/image_config.json + """ + + def __init__(self, api_key: str = None): """ Initialize the image generator with the Runware API key. Args: - api_key: API key for Runware + api_key: API key for Runware (optional - can use RUNWARE_API_KEY env var) """ - # Use the API key if provided, otherwise Runware will read from environment - if api_key and api_key != "fake_key" and api_key != "test_key": + self.config = IMAGE_CONFIG + self.settings = self.config.get("settings", {}) + + # Initialize Runware client + if api_key and api_key not in ("fake_key", "test_key", ""): self.runware = Runware(api_key=api_key) else: - # Let Runware read from RUNWARE_API_KEY environment variable self.runware = Runware() - self.connected = False - - async def ensure_connected(self): - """Ensure connection to Runware API is established""" - if not self.connected: - await self.runware.connect() - self.connected = True - async def generate_image(self, args, num_images: int = 1, negative_prompt: str = "blurry, distorted, low quality"): + self.connected = False + self._connection_retries = 0 + self._max_retries = self.settings.get("max_retries", 3) + + logging.info(f"ImageGenerator initialized with {len(self.get_available_models())} models") + + def get_available_models(self) -> Dict[str, Dict]: + """Get all available image generation models""" + return 
self.config.get("image_models", {}) + + def get_model_info(self, model_key: str) -> Optional[Dict]: + """Get information about a specific model""" + models = self.get_available_models() + return models.get(model_key) + + def get_upscale_models(self) -> Dict[str, Dict]: + """Get all available upscale models""" + return self.config.get("upscale_models", {}) + + def get_background_removal_models(self) -> Dict[str, Dict]: + """Get all available background removal models""" + return self.config.get("background_removal_models", {}) + + def get_default_negative_prompt(self, category: str = "general") -> str: + """Get default negative prompt for a category""" + prompts = self.config.get("default_negative_prompts", {}) + return prompts.get(category, prompts.get("general", "blurry, low quality")) + + def get_aspect_ratio_dimensions(self, aspect_ratio: str) -> Optional[Dict]: + """Get dimensions for an aspect ratio""" + ratios = self.config.get("aspect_ratios", {}) + return ratios.get(aspect_ratio) + + async def ensure_connected(self) -> bool: + """Ensure connection to Runware API is established with retry logic""" + if self.connected: + return True + + max_retries = self._max_retries + retry_delay = self.settings.get("retry_delay", 2) + + for attempt in range(max_retries): + try: + await self.runware.connect() + self.connected = True + self._connection_retries = 0 + logging.info("Successfully connected to Runware API") + return True + except Exception as e: + self._connection_retries += 1 + if attempt < max_retries - 1: + wait_time = retry_delay * (attempt + 1) + logging.warning(f"Runware connection attempt {attempt + 1}/{max_retries} failed: {e}. 
Retrying in {wait_time}s...") + import asyncio + await asyncio.sleep(wait_time) + else: + logging.error(f"Failed to connect to Runware API after {max_retries} attempts: {e}") + return False + + return False + + async def disconnect(self): + """Disconnect from Runware API""" + if self.connected: + try: + await self.runware.disconnect() + self.connected = False + logging.info("Disconnected from Runware API") + except Exception as e: + logging.warning(f"Error disconnecting from Runware: {e}") + + async def generate_image( + self, + args: Union[str, Dict], + model: str = None, + num_images: int = 1, + negative_prompt: str = None, + width: int = None, + height: int = None, + steps: int = None, + cfg_scale: float = None, + seed: int = None, + aspect_ratio: str = None + ) -> Dict[str, Any]: """ - Generate images based on a text prompt + Generate images based on a text prompt. Args: args: Either a string prompt or dict containing prompt and options + model: Model key from config (e.g., "flux", "sdxl", "anime") num_images: Number of images to generate (max 4) negative_prompt: Things to avoid in the generated image + width: Image width (overrides model default) + height: Image height (overrides model default) + steps: Number of inference steps + cfg_scale: Classifier-free guidance scale + seed: Random seed for reproducibility + aspect_ratio: Aspect ratio key (e.g., "16:9", "1:1") Returns: Dict with generated images or error information """ - # Handle both string and dict input for backward compatibility + # Parse input arguments if isinstance(args, dict): prompt = args.get('prompt', '') + model = args.get('model', model) num_images = args.get('num_images', num_images) negative_prompt = args.get('negative_prompt', negative_prompt) + width = args.get('width', width) + height = args.get('height', height) + steps = args.get('steps', steps) + cfg_scale = args.get('cfg_scale', cfg_scale) + seed = args.get('seed', seed) + aspect_ratio = args.get('aspect_ratio', aspect_ratio) else: 
- prompt = str(args) # Ensure it's a string - - num_images = min(num_images, 4) + prompt = str(args) + + # Get model configuration + model = model or self.settings.get("default_model", "flux") + model_config = self.get_model_info(model) + + if not model_config: + logging.warning(f"Model '{model}' not found, using default") + model = self.settings.get("default_model", "flux") + model_config = self.get_model_info(model) or {} + + model_id = model_config.get("model_id", "runware:101@1") + + # Handle aspect ratio + if aspect_ratio: + ratio_dims = self.get_aspect_ratio_dimensions(aspect_ratio) + if ratio_dims: + width = width or ratio_dims.get("width") + height = height or ratio_dims.get("height") + + # Apply defaults from model config + width = width or model_config.get("default_width", 1024) + height = height or model_config.get("default_height", 1024) + steps = steps or model_config.get("default_steps", 30) + cfg_scale = cfg_scale or model_config.get("default_cfg_scale", 7.5) + max_images = model_config.get("max_images", 4) + num_images = min(num_images, max_images) + + # Ensure dimensions are within limits and divisible by 64 + max_width = model_config.get("max_width", 2048) + max_height = model_config.get("max_height", 2048) + min_width = model_config.get("min_width", 512) + min_height = model_config.get("min_height", 512) + step_size = model_config.get("step_size", 64) + + width = max(min_width, min(width, max_width)) + height = max(min_height, min(height, max_height)) + width = (width // step_size) * step_size + height = (height // step_size) * step_size + + # Get negative prompt + if negative_prompt is None: + category = model_config.get("category", "general") + negative_prompt = self.get_default_negative_prompt(category) try: - # Ensure connection is established - await self.ensure_connected() + if not await self.ensure_connected(): + return { + "success": False, + "error": "Failed to connect to image generation API", + "prompt": prompt, + "image_urls": [], + 
"image_count": 0 + } - # Configure request for Runware - request_image = IImageInference( - positivePrompt=prompt, - numberResults=num_images, - model="runware:5@1", # Specify the model - negativePrompt=negative_prompt, - height=512, - width=512, - ) + # Build request parameters + request_params = { + "positivePrompt": prompt, + "model": model_id, + "numberResults": num_images, + "width": width, + "height": height, + "steps": steps, + "CFGScale": cfg_scale, + "outputFormat": self.settings.get("output_format", "WEBP") + } - # Generate images + if model_config.get("supports_negative_prompt", True) and negative_prompt: + request_params["negativePrompt"] = negative_prompt + + if seed is not None: + request_params["seed"] = seed + + request_image = IImageInference(**request_params) images = await self.runware.imageInference(requestImage=request_image) result = { "success": True, "prompt": prompt, - "image_urls": [], # Only URLs for API response + "model": model, + "model_name": model_config.get("name", model), + "image_urls": [], + "image_count": 0, + "width": width, + "height": height + } + + if images: + for image in images: + if hasattr(image, 'imageURL') and image.imageURL: + result["image_urls"].append(image.imageURL) + elif hasattr(image, 'imageDataURI') and image.imageDataURI: + result["image_urls"].append(image.imageDataURI) + + result["image_count"] = len(result["image_urls"]) + + if result["image_count"] > 0: + logging.info(f"Generated {result['image_count']} images with {model} for: {prompt[:50]}...") + else: + logging.warning(f"Image generation succeeded but no images received for: {prompt[:50]}...") + + return result + + except Exception as e: + logging.error(f"Error in generate_image: {e}") + return { + "success": False, + "error": str(e), + "prompt": prompt, + "model": model, + "image_urls": [], + "image_count": 0 + } + + async def upscale_image( + self, + args: Union[str, Dict], + scale_factor: int = 2, + model: str = None + ) -> Dict[str, Any]: + """ + 
Upscale an image to higher resolution. + + Args: + args: Image URL or dict with image_url/image_data and options + scale_factor: Upscale factor (2 or 4) + model: Upscale model key (e.g., "clarity", "swinir") + + Returns: + Dict with upscaled image information + """ + image_data = None + if isinstance(args, dict): + image_url = args.get('image_url', '') + scale_factor = args.get('scale_factor', scale_factor) + model = args.get('model', model) + else: + image_url = str(args) + + # Validate URL + is_valid, error_msg = self._validate_url(image_url) + if not is_valid: + return { + "success": False, + "error": f"Invalid image URL: {error_msg}", + "image_urls": [], + "image_count": 0 + } + + model = model or self.settings.get("default_upscale_model", "clarity") + upscale_models = self.get_upscale_models() + model_config = upscale_models.get(model, {}) + + if not model_config: + model = self.settings.get("default_upscale_model", "clarity") + model_config = upscale_models.get(model, {}) + + model_id = model_config.get("model_id", "runware:500@1") + supported_factors = model_config.get("supported_factors", [2, 4]) + + if scale_factor not in supported_factors: + scale_factor = supported_factors[0] if supported_factors else 2 + + try: + if not await self.ensure_connected(): + return { + "success": False, + "error": "Failed to connect to image processing API", + "image_urls": [], + "image_count": 0 + } + + # Pass URL directly to Runware API (it handles downloading) + logging.info(f"Sending image URL to Runware upscale API: {image_url}") + upscale_payload = IImageUpscale( + inputImage=image_url, + upscaleFactor=scale_factor, + model=model_id + ) + + upscaled_images = await self.runware.imageUpscale(upscaleGanPayload=upscale_payload) + + result = { + "success": True, + "original_url": image_url, + "scale_factor": scale_factor, + "model": model, + "model_name": model_config.get("name", model), + "image_urls": [], "image_count": 0 } - # Process generated images - handle different 
response formats - if images: - # Extract image URLs based on response structure - image_urls = [] - - # Case 1: Response is a direct list/iterable of image objects - if hasattr(images, '__iter__') and not hasattr(images, 'images'): - for image in images: - if hasattr(image, 'imageURL'): - image_urls.append(image.imageURL) - - # Case 2: Response has an 'images' attribute with URLs - elif hasattr(images, 'images') and images.images: - image_urls = images.images - - # Update result with image info - result["image_count"] = len(image_urls) - result["image_urls"] = image_urls # Only URLs in result - - # For Discord display, we'll download images separately in message handler + if upscaled_images: + for image in upscaled_images: + if hasattr(image, 'imageURL') and image.imageURL: + result["image_urls"].append(image.imageURL) + elif hasattr(image, 'imageSrc') and image.imageSrc: + result["image_urls"].append(image.imageSrc) + + result["image_count"] = len(result["image_urls"]) - # Log success or failure if result["image_count"] > 0: - logging.info(f"Generated {result['image_count']} images for prompt: {prompt[:50]}...") + logging.info(f"Successfully upscaled image by {scale_factor}x with {model}") else: - logging.warning(f"Image generation succeeded but no images were received for prompt: {prompt[:50]}...") + logging.warning("Upscaling succeeded but no images returned") return result except Exception as e: - error_message = f"Error in generate_image: {str(e)}" - logging.error(error_message) + logging.error(f"Error in upscale_image: {e}") return { - "success": False, + "success": False, "error": str(e), - "prompt": prompt, - "image_urls": [], # Include empty image_urls even in error case + "original_url": image_url, + "image_urls": [], "image_count": 0 } - async def edit_image(self, args, operation: str = "remove_background"): + async def remove_background( + self, + args: Union[str, Dict], + model: str = None + ) -> Dict[str, Any]: """ - Edit an image using various 
operations like background removal + Remove background from an image. Args: - args: Either a string image_url or dict containing image_url and options - operation: Type of edit operation (currently supports 'remove_background') + args: Image URL or dict with image_url/image_data and options + model: Background removal model key Returns: - Dict with edited image information + Dict with processed image information """ - # Handle both string and dict input for backward compatibility if isinstance(args, dict): image_url = args.get('image_url', '') - operation = args.get('operation', operation) + model = args.get('model', model) else: - image_url = str(args) # Ensure it's a string - + image_url = str(args) + + # Validate URL + is_valid, error_msg = self._validate_url(image_url) + if not is_valid: + return { + "success": False, + "error": f"Invalid image URL: {error_msg}", + "image_urls": [], + "image_count": 0 + } + + model = model or self.settings.get("default_background_removal_model", "bria") + bg_models = self.get_background_removal_models() + model_config = bg_models.get(model, {}) + + if not model_config: + model = self.settings.get("default_background_removal_model", "bria") + model_config = bg_models.get(model, {}) + + model_id = model_config.get("model_id", "runware:110@1") + try: - # Ensure connection is established - await self.ensure_connected() - - # Download the image first - image_data = None - async with aiohttp.ClientSession() as session: - async with session.get(image_url) as resp: - if resp.status != 200: - return { - "success": False, - "error": f"Failed to download image, status: {resp.status}", - "operation": operation - } - image_data = await resp.read() - - if operation == "remove_background": - # Import the necessary class from runware - from runware import IImageBackgroundRemoval - - # Create a temporary file to store the image - import tempfile - with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file: - 
temp_file.write(image_data) - temp_path = temp_file.name - - try: - # Configure background removal request - background_removal_payload = IImageBackgroundRemoval( - image_initiator=temp_path - ) - - # Process the image - processed_images = await self.runware.imageBackgroundRemoval( - removeImageBackgroundPayload=background_removal_payload - ) - - # Clean up temporary file - try: - os.remove(temp_path) - except: - pass - - # Handle the response - result = { - "success": True, - "operation": operation, - "original_url": image_url, - "image_urls": [] - } - - # Extract image URLs from response - if processed_images: - for image in processed_images: - if hasattr(image, 'imageURL'): - result["image_urls"].append(image.imageURL) - - result["image_count"] = len(result["image_urls"]) - - if result["image_count"] > 0: - logging.info(f"Successfully removed background from image") - else: - logging.warning("Background removal succeeded but no images were returned") - - return result - - except Exception as e: - logging.error(f"Error in background removal: {str(e)}") - return { - "success": False, - "error": f"Error in background removal: {str(e)}", - "operation": operation - } - else: + if not await self.ensure_connected(): return { "success": False, - "error": f"Unsupported edit operation: {operation}", - "operation": operation + "error": "Failed to connect to image processing API", + "image_urls": [], + "image_count": 0 } + + # Pass URL directly to Runware API (it handles downloading) + logging.info(f"Sending image URL to Runware background removal API: {image_url}") + bg_removal_payload = IImageBackgroundRemoval( + inputImage=image_url, + model=model_id, + outputFormat="PNG" + ) + + processed_images = await self.runware.imageBackgroundRemoval( + removeImageBackgroundPayload=bg_removal_payload + ) + + result = { + "success": True, + "original_url": image_url, + "operation": "remove_background", + "model": model, + "model_name": model_config.get("name", model), + 
"image_urls": [], + "image_count": 0 + } + + if processed_images: + for image in processed_images: + if hasattr(image, 'imageURL') and image.imageURL: + result["image_urls"].append(image.imageURL) + + result["image_count"] = len(result["image_urls"]) + + if result["image_count"] > 0: + logging.info(f"Successfully removed background with {model}") + else: + logging.warning("Background removal succeeded but no images returned") + + return result except Exception as e: - error_message = f"Error in edit_image: {str(e)}" - logging.error(error_message) + logging.error(f"Error in remove_background: {e}") return { - "success": False, + "success": False, "error": str(e), + "original_url": image_url, + "operation": "remove_background", + "image_urls": [], + "image_count": 0 + } + + async def edit_image(self, args, operation: str = "remove_background") -> Dict[str, Any]: + """Edit an image - backward compatibility alias""" + if isinstance(args, dict): + operation = args.get('operation', operation) + + if operation == "remove_background": + return await self.remove_background(args) + else: + return { + "success": False, + "error": f"Unsupported operation: {operation}", "operation": operation, "image_urls": [], "image_count": 0 } - async def enhance_prompt(self, args, num_versions: int = 3, max_length: int = 64) -> Dict[str, Any]: - """ - Enhance a text prompt with AI to create more detailed/creative versions - - Args: - args: Either a string prompt or dict containing prompt and options - num_versions: Number of enhanced versions to generate - max_length: Maximum length of each enhanced prompt - - Returns: - Dict with enhanced prompt information - """ - # Handle both string and dict input for backward compatibility - if isinstance(args, dict): - prompt = args.get('prompt', '') - num_versions = args.get('num_versions', num_versions) - max_length = args.get('max_length', max_length) - else: - prompt = str(args) # Ensure it's a string - - try: - # Ensure connection is established 
- await self.ensure_connected() - - # Configure prompt enhancement request - prompt_enhancer = IPromptEnhance( - prompt=prompt, - promptVersions=num_versions, - promptMaxLength=max_length, - ) - - # Get enhanced prompts - enhanced_prompts = await self.runware.promptEnhance(promptEnhancer=prompt_enhancer) - - result = { - "success": True, - "original_prompt": prompt, - "enhanced_prompts": [], - "prompt_count": 0 - } - - # Extract enhanced prompts from the response - if enhanced_prompts: - for enhanced_prompt in enhanced_prompts: - if hasattr(enhanced_prompt, 'text') and enhanced_prompt.text: - result["enhanced_prompts"].append(enhanced_prompt.text) - - result["prompt_count"] = len(result["enhanced_prompts"]) - - # Log success or failure - if result["prompt_count"] > 0: - logging.info(f"Generated {result['prompt_count']} enhanced prompts for: {prompt[:50]}...") - else: - logging.warning(f"Prompt enhancement succeeded but no prompts were received") - - return result - - except Exception as e: - error_message = f"Error in enhance_prompt: {str(e)}" - logging.error(error_message) - return { - "success": False, - "error": str(e), - "original_prompt": prompt, - "enhanced_prompts": [], - "prompt_count": 0 - } - - async def image_to_text(self, args) -> Dict[str, Any]: - """ - Convert an image to a text description - - Args: - args: Either a string image_url or dict containing image_url - - Returns: - Dict with image caption information - """ - # Handle both string and dict input for backward compatibility + async def image_to_text(self, args: Union[str, Dict]) -> Dict[str, Any]: + """Generate a text caption/description from an image.""" if isinstance(args, dict): image_url = args.get('image_url', '') else: - image_url = str(args) # Ensure it's a string - + image_url = str(args) + try: - # Ensure connection is established - await self.ensure_connected() + if not await self.ensure_connected(): + return { + "success": False, + "error": "Failed to connect to image processing 
API", + "caption": "" + } - # Download the image first - image_data = None - async with aiohttp.ClientSession() as session: - async with session.get(image_url) as resp: - if resp.status != 200: - return { - "success": False, - "error": f"Failed to download image, status: {resp.status}" - } - image_data = await resp.read() + image_data = await self._download_image(image_url) + if image_data is None: + return { + "success": False, + "error": "Failed to download input image", + "caption": "" + } + + temp_path = await self._save_temp_image(image_data) - # Create a temporary file to store the image - import tempfile - with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file: - temp_file.write(image_data) - temp_path = temp_file.name - try: - # Configure image caption request - request_image_to_text = IImageCaption( - image_initiator=temp_path - ) - - # Get image caption - image_caption = await self.runware.imageCaption( - requestImageToText=request_image_to_text - ) - - # Clean up temporary file - try: - os.remove(temp_path) - except: - pass + caption_request = IImageCaption(inputImage=temp_path) + caption_result = await self.runware.imageCaption(requestImageToText=caption_request) result = { "success": True, @@ -361,35 +611,23 @@ class ImageGenerator: "caption": "" } - # Extract caption from the response - if image_caption and hasattr(image_caption, 'text'): - result["caption"] = image_caption.text + if caption_result: + if hasattr(caption_result, 'text'): + result["caption"] = caption_result.text + elif isinstance(caption_result, list) and len(caption_result) > 0: + if hasattr(caption_result[0], 'text'): + result["caption"] = caption_result[0].text - # Log success or failure if result["caption"]: - logging.info(f"Generated caption for image: {result['caption'][:50]}...") - else: - logging.warning(f"Image caption generation succeeded but no text was received") + logging.info(f"Generated caption: {result['caption'][:50]}...") return result - except 
Exception as e: - # Clean up temporary file - try: - os.remove(temp_path) - except: - pass - - logging.error(f"Error in image captioning: {str(e)}") - return { - "success": False, - "error": f"Error in image captioning: {str(e)}", - "image_url": image_url - } + finally: + await self._cleanup_temp_file(temp_path) except Exception as e: - error_message = f"Error in image_to_text: {str(e)}" - logging.error(error_message) + logging.error(f"Error in image_to_text: {e}") return { "success": False, "error": str(e), @@ -397,293 +635,282 @@ class ImageGenerator: "caption": "" } - async def upscale_image(self, args, scale_factor: int = 4) -> Dict[str, Any]: - """ - Upscale an image to a higher resolution - - Args: - args: Either a string image_url or dict containing image_url and options - scale_factor: Factor by which to upscale the image (2-4) - - Returns: - Dict with upscaled image information - """ - # Handle both string and dict input for backward compatibility - if isinstance(args, dict): - image_url = args.get('image_url', '') - scale_factor = args.get('scale_factor', scale_factor) - else: - image_url = str(args) # Ensure it's a string - - # Ensure scale factor is within valid range - scale_factor = max(2, min(scale_factor, 4)) - - try: - # Ensure connection is established - await self.ensure_connected() - - # Download the image first - image_data = None - async with aiohttp.ClientSession() as session: - async with session.get(image_url) as resp: - if resp.status != 200: - return { - "success": False, - "error": f"Failed to download image, status: {resp.status}", - "image_urls": [], - "image_count": 0 - } - image_data = await resp.read() - - # Create a temporary file to store the image - import tempfile - with tempfile.NamedTemporaryFile(delete=False, suffix='.jpg') as temp_file: - temp_file.write(image_data) - temp_path = temp_file.name - - try: - # Configure upscale request - upscale_payload = IImageUpscale( - inputImage=temp_path, - upscaleFactor=scale_factor - ) - 
- # Get upscaled images - upscaled_images = await self.runware.imageUpscale( - upscaleGanPayload=upscale_payload - ) - - # Clean up temporary file - try: - os.remove(temp_path) - except: - pass - - result = { - "success": True, - "original_url": image_url, - "scale_factor": scale_factor, - "image_urls": [], - "image_count": 0 - } - - # Extract image URLs from response - if upscaled_images: - for image in upscaled_images: - if hasattr(image, 'imageSrc'): - result["image_urls"].append(image.imageSrc) - - result["image_count"] = len(result["image_urls"]) - - # Log success or failure - if result["image_count"] > 0: - logging.info(f"Successfully upscaled image by factor {scale_factor}") - else: - logging.warning(f"Image upscaling succeeded but no images were returned") - - return result - - except Exception as e: - # Clean up temporary file - try: - os.remove(temp_path) - except: - pass - - logging.error(f"Error in image upscaling: {str(e)}") - return { - "success": False, - "error": f"Error in image upscaling: {str(e)}", - "image_urls": [], - "image_count": 0 - } - - except Exception as e: - error_message = f"Error in upscale_image: {str(e)}" - logging.error(error_message) - return { - "success": False, - "error": str(e), - "original_url": image_url, - "image_urls": [], - "image_count": 0 - } - - async def photo_maker(self, args, style: str = "No style", - strength: int = 40, steps: int = 35, num_images: int = 1, - height: int = 512, width: int = 512) -> Dict[str, Any]: - """ - Generate images based on reference photos and a text prompt - - Args: - args: Either a dict containing prompt, input_images and options, or just prompt string - style: Style to apply to the generated image - strength: Strength of the input images' influence (0-100) - steps: Number of generation steps - num_images: Number of images to generate - height: Output image height - width: Output image width - - Returns: - Dict with generated image information - """ - # Handle both string and dict input 
for backward compatibility + async def enhance_prompt( + self, + args: Union[str, Dict], + num_versions: int = 3, + max_length: int = 200 + ) -> Dict[str, Any]: + """Enhance a text prompt with AI for better image generation results.""" if isinstance(args, dict): prompt = args.get('prompt', '') - input_images = args.get('input_images', []) + num_versions = args.get('num_versions', num_versions) + max_length = args.get('max_length', max_length) + else: + prompt = str(args) + + try: + if not await self.ensure_connected(): + return { + "success": False, + "error": "Failed to connect to API", + "enhanced_prompts": [], + "prompt_count": 0 + } + + enhance_request = IPromptEnhance( + prompt=prompt, + promptVersions=num_versions, + promptMaxLength=max_length + ) + + enhanced = await self.runware.promptEnhance(promptEnhancer=enhance_request) + + result = { + "success": True, + "original_prompt": prompt, + "enhanced_prompts": [], + "prompt_count": 0 + } + + if enhanced: + for item in enhanced: + if hasattr(item, 'text') and item.text: + result["enhanced_prompts"].append(item.text) + + result["prompt_count"] = len(result["enhanced_prompts"]) + + if result["prompt_count"] > 0: + logging.info(f"Generated {result['prompt_count']} enhanced prompts") + + return result + + except Exception as e: + logging.error(f"Error in enhance_prompt: {e}") + return { + "success": False, + "error": str(e), + "original_prompt": prompt, + "enhanced_prompts": [], + "prompt_count": 0 + } + + async def photo_maker( + self, + args: Union[str, Dict], + input_images: List[str] = None, + style: str = "No style", + strength: int = 40, + steps: int = 35, + num_images: int = 1, + width: int = 1024, + height: int = 1024 + ) -> Dict[str, Any]: + """Generate images based on reference photos and a text prompt.""" + if isinstance(args, dict): + prompt = args.get('prompt', '') + input_images = args.get('input_images', input_images or []) style = args.get('style', style) strength = args.get('strength', strength) 
steps = args.get('steps', steps) num_images = args.get('num_images', num_images) - height = args.get('height', height) width = args.get('width', width) + height = args.get('height', height) else: - prompt = str(args) # Ensure it's a string - input_images = [] # Default empty list - + prompt = str(args) + input_images = input_images or [] + try: - # Ensure connection is established - await self.ensure_connected() + if not await self.ensure_connected(): + return { + "success": False, + "error": "Failed to connect to API", + "image_urls": [], + "image_count": 0 + } - # Configure request for photo maker - request_photo = IPhotoMaker( + photo_request = IPhotoMaker( positivePrompt=prompt, - steps=steps, - numberResults=num_images, - height=height, - width=width, + inputImages=input_images, style=style, strength=strength, - outputFormat="WEBP", - includeCost=True, - taskUUID=str(uuid.uuid4()), - inputImages=input_images, + steps=steps, + numberResults=num_images, + width=width, + height=height, + outputFormat=self.settings.get("output_format", "WEBP"), + taskUUID=str(uuid.uuid4()) ) - # Generate photos - photos = await self.runware.photoMaker(requestPhotoMaker=request_photo) + photos = await self.runware.photoMaker(requestPhotoMaker=photo_request) result = { "success": True, "prompt": prompt, + "style": style, "image_urls": [], "image_count": 0 } - # Extract image URLs from response if photos: for photo in photos: - if hasattr(photo, 'imageURL'): + if hasattr(photo, 'imageURL') and photo.imageURL: result["image_urls"].append(photo.imageURL) result["image_count"] = len(result["image_urls"]) - # Log success or failure if result["image_count"] > 0: - logging.info(f"Generated {result['image_count']} photos with PhotoMaker for prompt: {prompt[:50]}...") - else: - logging.warning(f"PhotoMaker succeeded but no images were received for prompt: {prompt[:50]}...") + logging.info(f"Generated {result['image_count']} photos with PhotoMaker") return result - + except Exception as e: - 
error_message = f"Error in photo_maker: {str(e)}" - logging.error(error_message) + logging.error(f"Error in photo_maker: {e}") return { - "success": False, + "success": False, "error": str(e), "prompt": prompt, "image_urls": [], "image_count": 0 } - async def generate_image_with_refiner(self, args, num_images: int = 1, - negative_prompt: str = "blurry, distorted, low quality", - model: str = "civitai:101055@128078", - refiner_start_step: int = 20) -> Dict[str, Any]: - """ - Generate images with a refiner model for better quality - - Args: - args: Either a string prompt or dict containing prompt and options - num_images: Number of images to generate (max 4) - negative_prompt: Things to avoid in the generated image - model: Model to use for generation - refiner_start_step: Step at which to start refining - - Returns: - Dict with generated images or error information - """ - # Handle both string and dict input for backward compatibility + async def generate_image_with_refiner( + self, + args: Union[str, Dict], + model: str = "sdxl", + num_images: int = 1, + negative_prompt: str = None + ) -> Dict[str, Any]: + """Generate high-quality images with refiner model.""" if isinstance(args, dict): - prompt = args.get('prompt', '') - num_images = args.get('num_images', num_images) - negative_prompt = args.get('negative_prompt', negative_prompt) + args['model'] = args.get('model', model) else: - prompt = str(args) # Ensure it's a string - - num_images = min(num_images, 4) + args = { + 'prompt': str(args), + 'model': model, + 'num_images': num_images, + 'negative_prompt': negative_prompt + } + + return await self.generate_image(args) + + # ================== Helper Methods ================== + + def _validate_url(self, url: str) -> tuple[bool, str]: + """Validate if a string is a valid image URL""" + if not url or not isinstance(url, str): + return False, "No URL provided" + + url = url.strip() + + # Check for valid URL scheme + if not url.startswith(('http://', 'https://')): + 
return False, f"Invalid URL scheme. URL must start with http:// or https://. Got: {url[:50]}..." + + # Check for common image extensions or known image hosts + image_extensions = ('.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.tiff') + image_hosts = ('cdn.discordapp.com', 'media.discordapp.net', 'i.imgur.com', + 'imgur.com', 'cloudinary.com', 'unsplash.com', 'pexels.com', + 'runware.ai', 'replicate.delivery') + + url_lower = url.lower() + has_image_ext = any(ext in url_lower for ext in image_extensions) + is_image_host = any(host in url_lower for host in image_hosts) + + # URLs with query params might not have extension visible + if not has_image_ext and not is_image_host and '?' not in url: + logging.warning(f"URL may not be an image: {url[:100]}") + + return True, "OK" + + async def _download_image(self, url: str) -> Optional[bytes]: + """Download image from URL with validation and Discord CDN support""" + # Validate URL first + is_valid, error_msg = self._validate_url(url) + if not is_valid: + logging.error(f"Invalid image URL: {error_msg}") + return None + + url = url.strip() try: - # Ensure connection is established - await self.ensure_connected() - - # Configure request for Runware with refiner functionality - # Note: Refiner functionality may vary based on Runware SDK version - request_image = IImageInference( - positivePrompt=prompt, - numberResults=num_images, - model=model, - negativePrompt=negative_prompt, - height=512, - width=512, - # Add refiner parameters directly if supported by the SDK - ) - - # Generate images - images = await self.runware.imageInference(requestImage=request_image) - - result = { - "success": True, - "prompt": prompt, - "image_urls": [], - "image_count": 0 + headers = { + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', + 'Accept': 'image/*,*/*', + 'Accept-Language': 'en-US,en;q=0.9', } - # Process generated images - if images: - # Extract image 
URLs based on response structure - image_urls = [] - - # Case 1: Response is a direct list/iterable of image objects - if hasattr(images, '__iter__') and not hasattr(images, 'images'): - for image in images: - if hasattr(image, 'imageURL'): - image_urls.append(image.imageURL) - - # Case 2: Response has an 'images' attribute with URLs - elif hasattr(images, 'images') and images.images: - image_urls = images.images - - # Update result with image info - result["image_count"] = len(image_urls) - result["image_urls"] = image_urls + # For Discord CDN URLs, add bot authorization if available + if 'cdn.discordapp.com' in url or 'media.discordapp.net' in url: + try: + from src.config.config import DISCORD_TOKEN + if DISCORD_TOKEN: + headers['Authorization'] = f'Bot {DISCORD_TOKEN}' + logging.debug("Using Discord bot token for CDN access") + except ImportError: + pass - # Log success or failure - if result["image_count"] > 0: - logging.info(f"Generated {result['image_count']} refined images for prompt: {prompt[:50]}...") - else: - logging.warning(f"Refined image generation succeeded but no images were received for prompt: {prompt[:50]}...") - - return result - + async with aiohttp.ClientSession() as session: + async with session.get(url, timeout=aiohttp.ClientTimeout(total=30), headers=headers) as resp: + if resp.status == 200: + content_type = resp.headers.get('Content-Type', '') + if not content_type.startswith('image/') and 'octet-stream' not in content_type: + logging.warning(f"Response may not be an image. Content-Type: {content_type}") + return await resp.read() + elif resp.status == 404: + logging.error(f"Image not found (404). URL: {url[:100]}...") + return None + elif resp.status == 403: + logging.error(f"Access denied (403). The image URL may have expired or requires re-uploading. 
URL: {url[:100]}...") + return None + else: + logging.error(f"Failed to download image: HTTP {resp.status} for {url[:100]}...") + return None + except aiohttp.ClientError as e: + logging.error(f"Network error downloading image: {e}") + return None except Exception as e: - error_message = f"Error in generate_image_with_refiner: {str(e)}" - logging.error(error_message) - return { - "success": False, - "error": str(e), - "prompt": prompt, - "image_urls": [], - "image_count": 0 - } \ No newline at end of file + logging.error(f"Error downloading image: {e}") + return None + + async def _save_temp_image(self, image_data: bytes, suffix: str = '.jpg') -> str: + """Save image data to temporary file""" + with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file: + temp_file.write(image_data) + return temp_file.name + + async def _cleanup_temp_file(self, file_path: str): + """Clean up temporary file""" + try: + if os.path.exists(file_path): + os.remove(file_path) + except Exception as e: + logging.warning(f"Failed to clean up temp file {file_path}: {e}") + + def list_models(self) -> str: + """Get a formatted string listing all available models""" + models = self.get_available_models() + lines = ["**Available Image Models:**"] + for key, config in models.items(): + name = config.get("name", key) + desc = config.get("description", "") + lines.append(f"• `{key}` - {name}: {desc}") + return "\n".join(lines) + + def list_upscale_models(self) -> str: + """Get a formatted string listing all upscale models""" + models = self.get_upscale_models() + lines = ["**Available Upscale Models:**"] + for key, config in models.items(): + name = config.get("name", key) + factors = config.get("supported_factors", [2]) + lines.append(f"• `{key}` - {name} (factors: {factors})") + return "\n".join(lines) + + def reload_config(self): + """Reload configuration from file""" + global IMAGE_CONFIG + IMAGE_CONFIG = load_image_config() + self.config = IMAGE_CONFIG + self.settings = 
self.config.get("settings", {}) + logging.info("Image configuration reloaded") diff --git a/src/utils/openai_utils.py b/src/utils/openai_utils.py index c5e8a89..68cb04e 100644 --- a/src/utils/openai_utils.py +++ b/src/utils/openai_utils.py @@ -28,12 +28,11 @@ def get_tools_for_model() -> List[Dict[str, Any]]: "type": "function", "function": { "name": "edit_image", - "description": "Edit images (remove background). Returns URLs.", + "description": "Remove background from an image. Requires image_url from user's uploaded image or a web URL.", "parameters": { "type": "object", "properties": { - "image_url": {"type": "string"}, - "operation": {"type": "string", "enum": ["remove_background"]} + "image_url": {"type": "string", "description": "URL of the image to edit"} }, "required": ["image_url"] } @@ -43,12 +42,12 @@ def get_tools_for_model() -> List[Dict[str, Any]]: "type": "function", "function": { "name": "enhance_prompt", - "description": "Create enhanced prompt versions.", + "description": "Improve and expand a prompt for better image generation results", "parameters": { "type": "object", "properties": { - "prompt": {"type": "string"}, - "num_versions": {"type": "integer", "minimum": 1, "maximum": 5} + "prompt": {"type": "string", "description": "The prompt to enhance"}, + "num_versions": {"type": "integer", "maximum": 5, "description": "Number of enhanced versions"} }, "required": ["prompt"] } @@ -58,10 +57,10 @@ def get_tools_for_model() -> List[Dict[str, Any]]: "type": "function", "function": { "name": "image_to_text", - "description": "Convert image to text.", + "description": "Generate a text description/caption of an image or extract text via OCR. 
When user uploads an image, pass 'latest_image' as image_url - the system will use the most recent uploaded image.", "parameters": { "type": "object", - "properties": {"image_url": {"type": "string"}}, + "properties": {"image_url": {"type": "string", "description": "Pass 'latest_image' to use the user's most recently uploaded image"}}, "required": ["image_url"] } } @@ -70,12 +69,13 @@ def get_tools_for_model() -> List[Dict[str, Any]]: "type": "function", "function": { "name": "upscale_image", - "description": "Upscale image resolution. Returns URLs.", + "description": "Enlarge/upscale an image to higher resolution. When user uploads an image and wants to upscale it, pass 'latest_image' as the image_url - the system will use the most recent uploaded image.", "parameters": { "type": "object", "properties": { - "image_url": {"type": "string"}, - "scale_factor": {"type": "integer", "enum": [2, 3, 4]} + "image_url": {"type": "string", "description": "Pass 'latest_image' to use the user's most recently uploaded image"}, + "scale_factor": {"type": "integer", "enum": [2, 4], "description": "Scale factor (2 or 4)"}, + "model": {"type": "string", "enum": ["clarity", "ccsr", "sd-latent", "swinir"], "description": "Upscale model to use"} }, "required": ["image_url"] } @@ -85,14 +85,15 @@ def get_tools_for_model() -> List[Dict[str, Any]]: "type": "function", "function": { "name": "photo_maker", - "description": "Generate images from reference photos. Returns URLs.", + "description": "Generate new images based on reference photos. 
When user uploads an image and wants to use it as reference, pass ['latest_image'] as input_images - the system will use the most recent uploaded image.", "parameters": { "type": "object", "properties": { - "prompt": {"type": "string"}, - "input_images": {"type": "array", "items": {"type": "string"}}, - "strength": {"type": "integer", "minimum": 1, "maximum": 100}, - "num_images": {"type": "integer", "minimum": 1, "maximum": 4} + "prompt": {"type": "string", "description": "Description of the desired output image"}, + "input_images": {"type": "array", "items": {"type": "string"}, "description": "Pass ['latest_image'] to use the user's most recently uploaded image"}, + "style": {"type": "string", "description": "Style to apply (e.g., 'Photographic', 'Cinematic', 'Anime')"}, + "strength": {"type": "integer", "minimum": 0, "maximum": 100, "description": "Reference image influence (0-100)"}, + "num_images": {"type": "integer", "maximum": 4, "description": "Number of images to generate"} }, "required": ["prompt", "input_images"] } @@ -102,28 +103,44 @@ def get_tools_for_model() -> List[Dict[str, Any]]: "type": "function", "function": { "name": "generate_image_with_refiner", - "description": "Generate high-quality images. Returns URLs.", + "description": "Generate high-quality refined images with extra detail using SDXL refiner. 
Best for detailed artwork.", "parameters": { "type": "object", "properties": { - "prompt": {"type": "string"}, - "num_images": {"type": "integer", "minimum": 1, "maximum": 4}, - "negative_prompt": {"type": "string"} + "prompt": {"type": "string", "description": "Detailed description of the image to generate"}, + "model": {"type": "string", "enum": ["sdxl", "flux", "realistic"], "description": "Base model to use"}, + "num_images": {"type": "integer", "maximum": 4, "description": "Number of images to generate"}, + "negative_prompt": {"type": "string", "description": "Things to avoid in the image"} }, "required": ["prompt"] } } }, + { + "type": "function", + "function": { + "name": "remove_background", + "description": "Remove background from an image. When user uploads an image and wants to remove its background, pass 'latest_image' as the image_url - the system will use the most recent uploaded image.", + "parameters": { + "type": "object", + "properties": { + "image_url": {"type": "string", "description": "Pass 'latest_image' to use the user's most recently uploaded image"}, + "model": {"type": "string", "enum": ["bria", "rembg", "birefnet-base", "birefnet-general", "birefnet-portrait"], "description": "Background removal model"} + }, + "required": ["image_url"] + } + } + }, { "type": "function", "function": { "name": "google_search", - "description": "Search web for current information.", + "description": "Search the web for current information", "parameters": { "type": "object", "properties": { "query": {"type": "string"}, - "num_results": {"type": "integer", "minimum": 1, "maximum": 10} + "num_results": {"type": "integer", "maximum": 10} }, "required": ["query"] } @@ -133,10 +150,10 @@ def get_tools_for_model() -> List[Dict[str, Any]]: "type": "function", "function": { "name": "scrape_webpage", - "description": "Extract content from webpage.", + "description": "Extract and read content from a webpage URL", "parameters": { "type": "object", - "properties": 
{"url": {"type": "string"}}, + "properties": {"url": {"type": "string", "description": "The webpage URL to scrape"}}, "required": ["url"] } } @@ -145,12 +162,20 @@ def get_tools_for_model() -> List[Dict[str, Any]]: "type": "function", "function": { "name": "generate_image", - "description": "Generate images from text. Returns URLs.", + "description": "Create/generate images from text. Models: flux (best), flux-dev, sdxl, realistic (photos), anime, dreamshaper. Supports aspect ratios.", "parameters": { "type": "object", "properties": { - "prompt": {"type": "string"}, - "num_images": {"type": "integer", "minimum": 1, "maximum": 4} + "prompt": {"type": "string", "description": "Detailed description of the image to create"}, + "model": {"type": "string", "enum": ["flux", "flux-dev", "sdxl", "realistic", "anime", "dreamshaper"], "description": "Model to use for generation"}, + "num_images": {"type": "integer", "maximum": 4, "description": "Number of images (1-4)"}, + "aspect_ratio": {"type": "string", "enum": ["1:1", "16:9", "9:16", "4:3", "3:4", "3:2", "2:3", "21:9"], "description": "Aspect ratio preset"}, + "width": {"type": "integer", "description": "Custom width (512-2048, divisible by 64)"}, + "height": {"type": "integer", "description": "Custom height (512-2048, divisible by 64)"}, + "negative_prompt": {"type": "string", "description": "Things to avoid in the image"}, + "steps": {"type": "integer", "minimum": 10, "maximum": 50, "description": "Inference steps (more = higher quality)"}, + "cfg_scale": {"type": "number", "minimum": 1, "maximum": 20, "description": "Guidance scale (higher = more prompt adherence)"}, + "seed": {"type": "integer", "description": "Random seed for reproducibility"} }, "required": ["prompt"] } @@ -160,33 +185,12 @@ def get_tools_for_model() -> List[Dict[str, Any]]: "type": "function", "function": { "name": "execute_python_code", - "description": """Execute Python with AUTO-INSTALL. 
Packages (pandas, numpy, matplotlib, seaborn, sklearn, plotly, opencv, etc.) install automatically when imported. Just use 'import' normally. Generated files (CSV, images, JSON) auto-captured and sent to user (stored 48h). Load user files: load_file('file_id'). Example: import pandas as pd; df=load_file('id'); df.to_csv('out.csv')""", + "description": "Run Python code. Packages auto-install. Use load_file('file_id') for user files. Output files auto-sent to user.", "parameters": { "type": "object", "properties": { - "code": { - "type": "string", - "description": "Python code to execute. Import any approved package - they auto-install!" - }, - "input_data": { - "type": "string", - "description": "Optional input data (DEPRECATED - use load_file() in code instead)" - }, - "install_packages": { - "type": "array", - "items": {"type": "string"}, - "description": "OPTIONAL: Pre-install packages. Usually not needed as packages auto-install on import." - }, - "enable_visualization": { - "type": "boolean", - "description": "DEPRECATED: Just use plt.savefig() to create images" - }, - "timeout": { - "type": "integer", - "minimum": 1, - "maximum": 300, - "description": "Execution timeout in seconds (default: 60)" - } + "code": {"type": "string", "description": "Python code to execute"}, + "timeout": {"type": "integer", "maximum": 300, "description": "Timeout in seconds"} }, "required": ["code"] } @@ -196,7 +200,7 @@ def get_tools_for_model() -> List[Dict[str, Any]]: "type": "function", "function": { "name": "set_reminder", - "description": "Set user reminder with flexible time formats.", + "description": "Set reminder", "parameters": { "type": "object", "properties": { @@ -211,7 +215,7 @@ def get_tools_for_model() -> List[Dict[str, Any]]: "type": "function", "function": { "name": "get_reminders", - "description": "Get user reminders list.", + "description": "List reminders", "parameters": {"type": "object", "properties": {}} } }