4 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
9b52890840 Add comprehensive quick start guide and final documentation for enhanced features
Co-authored-by: cauvang32 <113093128+cauvang32@users.noreply.github.com>
2025-07-29 14:08:49 +00:00
copilot-swe-agent[bot]
f323e7b6b2 Implement comprehensive bot enhancements: smart model selection, user preferences, conversation management, and enhanced file processing
Co-authored-by: cauvang32 <113093128+cauvang32@users.noreply.github.com>
2025-07-29 14:07:19 +00:00
copilot-swe-agent[bot]
0bc074be14 Initial analysis and improvement planning for Discord bot
Co-authored-by: cauvang32 <113093128+cauvang32@users.noreply.github.com>
2025-07-29 13:51:30 +00:00
copilot-swe-agent[bot]
3a2a0d5c3e Initial plan 2025-07-29 13:44:57 +00:00
19 changed files with 2704 additions and 109 deletions

32
.gitignore vendored
View File

@@ -1,14 +1,18 @@
test.py
.env
chat_history.db
bot_copy.py
__pycache__/bot.cpython-312.pyc
tests/__pycache__/test_bot.cpython-312.pyc
.vscode/settings.json
chatgpt.zip
response.txt
.venv
venv
temp_charts
.idea
temp_data_files
test.py
.env
chat_history.db
bot_copy.py
__pycache__/
*/__pycache__/
**/__pycache__/
*.pyc
tests/__pycache__/test_bot.cpython-312.pyc
.vscode/settings.json
chatgpt.zip
response.txt
.venv
venv
temp_charts
.idea
temp_data_files
logs/

202
ENHANCEMENTS.md Normal file
View File

@@ -0,0 +1,202 @@
# Enhanced Features Documentation
## Overview
This update introduces several significant enhancements to the ChatGPT Discord Bot to improve user experience, functionality, and personalization.
## New Features
### 1. 🧠 Smart Model Selection
Automatically suggests the best AI model based on the type of task being requested.
**Commands:**
- `/smart_model <task>` - Get model recommendations for specific tasks
**Features:**
- Analyzes user input to detect task types (coding, reasoning, creative, analysis, etc.)
- Suggests optimal models for each task type
- Respects user preferences while providing alternatives
- Provides explanations for model recommendations
**Task Types Detected:**
- **Reasoning**: Math problems, logic, step-by-step analysis → `openai/o1-preview`, `openai/o1`
- **Coding**: Programming, debugging, code review → `openai/gpt-4o`, `openai/o1-preview`
- **Creative**: Writing, stories, marketing content → `openai/gpt-4o`, `openai/gpt-4o-mini`
- **Analysis**: Data analysis, research, insights → `openai/gpt-4o`, `openai/o1-preview`
- **General**: Quick questions, casual chat → `openai/gpt-4o-mini`, `openai/gpt-4o`
### 2. ⚙️ User Preferences System
Comprehensive personalization system allowing users to customize bot behavior.
**Commands:**
- `/preferences view` - See all current settings
- `/preferences set <setting> <value>` - Update a specific setting
- `/preferences reset` - Reset to default settings
**Available Preferences:**
- `preferred_model` - Default AI model for responses
- `auto_model_selection` - Enable/disable smart model selection
- `response_style` - balanced, concise, detailed
- `show_model_suggestions` - Show model selection explanations
- `enable_conversation_summary` - Auto-summarize long conversations
- `max_response_length` - short, medium, long
- `language` - Response language (auto-detect or specific)
- `timezone` - For reminders and timestamps
- `code_execution_allowed` - Allow/block code execution
- `image_generation_style` - Style preferences for image generation
- `notification_reminders` - Enable/disable reminder notifications
- `analytics_opt_in` - Allow usage analytics collection
### 3. 📊 Conversation Management
Intelligent conversation context management with automatic summarization.
**Commands:**
- `/conversation_stats` - View conversation statistics and health
**Features:**
- Automatic conversation summarization when context gets too long
- Token usage tracking and optimization
- Context preservation while maintaining performance
- Configurable summarization preferences
**How it Works:**
- Monitors conversation length and token usage
- Automatically creates summaries of older messages when needed
- Preserves recent context while condensing historical information
- Maintains conversation continuity across long sessions
### 4. 📁 Enhanced File Processing
Expanded file type support with intelligent processing for various document formats.
**Commands:**
- `/process_file <file>` - Process and analyze various file types
**Supported File Types:**
- **Documents**: .txt, .md, .docx (if python-docx installed)
- **Presentations**: .pptx (if python-pptx installed)
- **Data**: .csv, .xlsx, .xls, .json, .yaml, .yml
- **Code**: .py, .js, .html, .css, .xml
- **Logs**: .log files with error/warning analysis
**Features:**
- Intelligent content extraction and analysis
- Metadata generation (file stats, structure analysis)
- Content summarization and insights
- Error handling for corrupted or invalid files
- File size and format validation
### 5. 🔍 Enhanced Help System
Improved help and feature discovery system.
**Commands:**
- `/help_enhanced [category]` - Detailed help with categories
**Categories:**
- **New Features** - Overview of latest enhancements
- **AI Models** - Guide to model selection and capabilities
- **Preferences** - How to customize your experience
- **File Processing** - Supported formats and usage
- **All Commands** - Complete command reference
- **Tips & Tricks** - Power user features and best practices
## Integration with Existing Features
### Enhanced Message Processing
- Smart model selection is integrated into normal chat flow
- User preferences are automatically applied to all interactions
- Conversation summarization works transparently in the background
- File processing handles both new formats and existing PDF/image support
### Backward Compatibility
- All existing commands and features remain unchanged
- New features are opt-in and don't interfere with current workflows
- Default settings maintain current behavior for existing users
- Progressive enhancement approach ensures smooth transition
## Performance Improvements
### Caching and Optimization
- User preferences are cached for faster access
- Conversation summaries reduce token usage
- Smart model selection prevents unnecessary API calls
- File processing is optimized for large documents
### Database Enhancements
- New `user_preferences` collection for settings storage
- Improved caching system with instance-level cache management
- Better error handling and fallback mechanisms
## Technical Implementation
### New Utilities
- `model_selector.py` - Smart model recommendation engine
- `user_preferences.py` - Comprehensive preferences management
- `conversation_manager.py` - Conversation summarization and optimization
- `enhanced_file_processor.py` - Multi-format file processing
### Enhanced Components
- Updated `commands.py` with new slash commands
- Enhanced `message_handler.py` with integrated smart features
- Improved `db_handler.py` with preferences support
- Extended test suite with `test_enhancements.py`
### Security and Privacy
- All new features respect existing blacklist/whitelist systems
- User preferences are stored securely and can be reset
- File processing includes size limits and validation
- Analytics opt-in ensures user privacy control
## Usage Examples
### Smart Model Selection
```
User: "Help me solve this complex math equation step by step"
Bot: 🧠 Smart Model Selection: Switched to `openai/o1-preview` for this task.
💡 Reason: Optimized for reasoning tasks
```
### Preferences Configuration
```
/preferences set response_style detailed
/preferences set auto_model_selection true
/preferences set preferred_model openai/gpt-4o
```
### File Processing
```
Upload: resume.docx
Bot: 📄 File Analysis: resume.docx
📊 File Info:
• Type: DOCX
• Size: 45.2 KB
• Paragraphs: 23
• Tables: 2
```
## Migration and Deployment
### For Existing Users
- No action required - all existing functionality preserved
- Gradual feature adoption through natural discovery
- Optional preference configuration for enhanced experience
### For Administrators
- New preferences collection will be created automatically
- No database migration required
- Enhanced logging and monitoring capabilities
- New admin commands for preference management
## Future Enhancements
This foundation enables future improvements:
- Advanced analytics and usage insights
- Plugin system for custom user extensions
- Integration with external productivity tools
- Enhanced collaboration features
- More sophisticated AI agent behaviors
## Support and Documentation
- Use `/help_enhanced` for interactive guidance
- Check `/conversation_stats` for usage monitoring
- Use `/preferences view` to review current settings
- All features include comprehensive error handling and user feedback

162
QUICK_START.md Normal file
View File

@@ -0,0 +1,162 @@
# Quick Start Guide for Enhanced Features
## 🚀 New Commands Overview
### Smart Model Selection
```
/smart_model task: "Help me debug this Python code"
# Output: Suggests openai/gpt-4o for coding tasks with explanation
```
### User Preferences
```
/preferences view
# Shows all your current settings
/preferences set preferred_model openai/gpt-4o
# Sets your default model
/preferences set response_style detailed
# Makes responses more comprehensive
/preferences set auto_model_selection true
# Enables automatic model switching based on task type
```
### Conversation Management
```
/conversation_stats
# Shows: 45 messages, 12,500 tokens, needs summarization: No
```
### Enhanced File Processing
```
# Upload any supported file and use:
/process_file
# Supports: .docx, .pptx, .json, .yaml, .py, .js, .md, .log, etc.
```
### Enhanced Help
```
/help_enhanced category: New Features
# Interactive help with categories:
# - New Features, AI Models, Preferences, File Processing, Tips & Tricks
```
## 🔧 Quick Setup for New Features
### For Users
1. Start using the bot normally - all existing features work as before
2. Try `/help_enhanced` to discover new capabilities
3. Use `/preferences view` to see customization options
4. Set your preferences: `/preferences set response_style detailed`
5. Upload different file types to see enhanced processing
### For Administrators
1. No additional setup required - new features are automatically available
2. New MongoDB collection `user_preferences` will be created automatically
3. All existing admin commands work unchanged
4. Monitor usage with enhanced logging
## 💡 Pro Tips
### Getting the Most from Smart Model Selection
- Let the bot auto-select models by enabling: `/preferences set auto_model_selection true`
- For coding tasks, the bot will automatically use `openai/gpt-4o` or `openai/o1-preview`
- For complex reasoning, it switches to `openai/o1-preview` or `openai/o1`
- For quick questions, it uses the efficient `openai/gpt-4o-mini`
### Optimizing Your Experience
- Set response style based on your needs:
- `balanced`: Good for most situations
- `concise`: Quick, to-the-point responses
- `detailed`: Comprehensive, in-depth answers
- Enable conversation summaries to maintain context in long chats
- Use `/conversation_stats` to monitor token usage and conversation health
### File Processing Power
- Upload Word documents for content analysis and summarization
- Process code files to get syntax analysis and documentation
- Analyze JSON/YAML files for structure insights
- Upload log files for error detection and analysis
## 🛠 Dependencies for Optional Features
Some enhanced file processing features require additional packages:
```bash
# For Word document processing
pip install python-docx
# For PowerPoint processing
pip install python-pptx
# For Markdown processing with enhanced features
pip install markdown beautifulsoup4
# For YAML processing
pip install pyyaml
```
These are optional - the bot works without them, but installs them for enhanced capabilities.
## 🔍 Troubleshooting
### If Smart Model Selection Isn't Working
- Check `/preferences view` to ensure `auto_model_selection` is `true`
- Make sure you're using a supported model in your preferences
- Try `/smart_model` command directly to test the feature
### If Preferences Aren't Saving
- Ensure MongoDB connection is working
- Check bot logs for database errors
- Try `/preferences reset` and set preferences again
### If File Processing Fails
- Check file size (max 10MB for `/process_file`)
- Verify file type is supported with the error message
- For PDF files, use the regular file upload (existing feature)
## 📊 Monitoring and Analytics
### User Analytics (if opted in)
- Token usage tracking per user
- Model selection patterns
- Feature adoption metrics
- Conversation length statistics
### Admin Monitoring
- Enhanced logging for all new features
- Database performance metrics
- User preference distributions
- Error rates and handling
## 🔐 Privacy and Security
### What's Stored
- User preferences (customizable settings)
- Conversation summaries (when enabled)
- File processing metadata (temporary)
- Usage statistics (if opted in)
### What's Protected
- All existing blacklist/whitelist protections apply
- User preferences are private to each user
- File content is processed temporarily and not permanently stored
- Analytics can be disabled via preferences
### Data Control
- Users can reset preferences anytime: `/preferences reset`
- Conversation summaries can be disabled: `/preferences set enable_conversation_summary false`
- Analytics can be opted out: `/preferences set analytics_opt_in false`
## 🚀 What's Next
The enhanced architecture enables future improvements:
- Advanced AI agents for specialized tasks
- Integration with external productivity tools
- Plugin system for custom user extensions
- Enhanced collaboration features
- More sophisticated analytics and insights
Try the new features and provide feedback to help guide future development!

View File

@@ -6,6 +6,10 @@ Welcome to **ChatGPT Discord Bot**! This bot provides a powerful AI assistant fo
## Features
- **Advanced AI Conversations**: Uses OpenAI's latest models (including openai/gpt-4o) for natural language interactions
- **🆕 Smart Model Selection**: Automatically suggests the best AI model based on task type (coding, reasoning, creative, etc.)
- **🆕 User Preferences**: Comprehensive personalization system for customizing bot behavior and settings
- **🆕 Conversation Management**: Intelligent context management with automatic summarization for long conversations
- **🆕 Enhanced File Processing**: Support for Word docs, PowerPoint, code files, and many more formats beyond PDF/CSV
- **Image Generation**: Creates custom images from text prompts using Runware's API
- **Data Analysis**: Analyzes CSV and Excel files with visualizations (distributions, correlations, box plots, etc.)
- **Code Interpretation**: Executes Python code for calculations and data processing
@@ -15,6 +19,7 @@ Welcome to **ChatGPT Discord Bot**! This bot provides a powerful AI assistant fo
- **Web Scraping**: Extracts and summarizes content from websites
- **PDF Analysis**: Processes and analyzes PDF documents
- **User Statistics**: Tracks token usage and model selection per user
- **🆕 Enhanced Help System**: Interactive help with feature discovery and detailed guides
- **Dockerized Deployment**: Ready for easy deployment with Docker
- **Automated CI/CD**: Integrated with GitHub Actions
@@ -121,14 +126,55 @@ To get started, ensure you have:
## Usage
Once the bot is running, it connects to Discord using credentials from `.env`. Available features include:
## 🆕 What's New - Enhanced Features
This bot now includes several powerful enhancements to improve your experience:
### 🧠 Smart Model Selection
The bot automatically analyzes your request and suggests the best AI model:
- **Coding tasks** → `openai/gpt-4o` or `openai/o1-preview`
- **Complex reasoning** → `openai/o1-preview` or `openai/o1`
- **Creative writing** → `openai/gpt-4o` or `openai/gpt-4o-mini`
- **Quick questions** → `openai/gpt-4o-mini`
### ⚙️ Personalization
Customize your bot experience with `/preferences`:
- Set your preferred AI model
- Choose response style (balanced, concise, detailed)
- Enable/disable automatic features
- Configure language and timezone preferences
### 📊 Conversation Management
- Automatic conversation summarization for long chats
- Token usage monitoring with `/conversation_stats`
- Smart context management to maintain conversation quality
### 📁 Enhanced File Processing
Process many more file types with `/process_file`:
- **Documents**: Word (.docx), PowerPoint (.pptx), Markdown
- **Data**: JSON, YAML, advanced CSV/Excel analysis
- **Code**: Python, JavaScript, HTML, CSS, and more
- **Logs**: Error analysis and insights
For detailed information, see [ENHANCEMENTS.md](ENHANCEMENTS.md) or use `/help_enhanced` in Discord.
### Text Commands
- **Normal chat**: Ping the bot with a question or send a DM to start a conversation
- **🆕 Smart Model Selection**: The bot automatically suggests the best AI model for your task
- **Image Generation**: `/generate prompt: "A futuristic cityscape"`
- **Web Content**: `/web url: "https://example.com"`
- **Google Search**: `/search prompt: "latest news in Vietnam"`
- **🆕 Personalization**: `/preferences set response_style detailed` to customize your experience
- **🆕 File Processing**: `/process_file` to analyze Word docs, PowerPoint, code files, and more
- **🆕 Smart Help**: `/help_enhanced` for detailed feature discovery and guides
- **User Statistics**: `/user_stat` - Get your token usage and model information
- **🆕 Conversation Stats**: `/conversation_stats` - Monitor your conversation health and token usage
### Advanced Features
- **🆕 Smart Model Selection**: Automatically chooses the optimal AI model based on your task type
- **🆕 User Preferences**: Customize response style, enable/disable features, set default models
- **🆕 Conversation Management**: Automatic summarization of long conversations to maintain context
- **🆕 Enhanced File Support**: Process Word documents, PowerPoint presentations, code files, JSON, YAML, and more
- **Data Analysis**: Upload CSV or Excel files for automatic analysis and visualization
- **Code Execution**: The bot can execute Python code to solve problems or create visualizations
- **Reminders**: Ask the bot to set reminders like "Remind me to check email in 30 minutes"

View File

@@ -0,0 +1,6 @@
2025-07-29 13:49:40,549 - code_utils - INFO - Initialized data directory at /home/runner/work/ChatGPT-Discord-Bot/ChatGPT-Discord-Bot/src/temp_data_files
2025-07-29 13:49:40,549 - code_utils - INFO - Initialized data directory at /home/runner/work/ChatGPT-Discord-Bot/ChatGPT-Discord-Bot/src/temp_data_files
2025-07-29 13:49:40,674 - root - INFO - Database handler initialized
2025-07-29 13:49:55,768 - root - INFO - Database handler initialized
2025-07-29 13:50:10,797 - root - INFO - Database handler initialized
2025-07-29 13:50:25,825 - root - INFO - Database handler initialized

View File

@@ -6,11 +6,15 @@ import io
import asyncio
from typing import Optional, Dict, List, Any, Callable
from src.config.config import MODEL_OPTIONS, PDF_ALLOWED_MODELS
from src.utils.image_utils import ImageGenerator
from src.utils.web_utils import google_custom_search, scrape_web_content
from src.utils.pdf_utils import process_pdf, send_response
from src.utils.openai_utils import prepare_file_from_path
from src.config.config import MODEL_OPTIONS, PDF_ALLOWED_MODELS
from src.utils.image_utils import ImageGenerator
from src.utils.web_utils import google_custom_search, scrape_web_content
from src.utils.pdf_utils import process_pdf, send_response
from src.utils.openai_utils import prepare_file_from_path
from src.utils.model_selector import model_selector
from src.utils.user_preferences import UserPreferences
from src.utils.conversation_manager import ConversationSummarizer
from src.utils.enhanced_file_processor import enhanced_file_processor
# Dictionary to keep track of user requests and their cooldowns
user_requests = {}
@@ -27,7 +31,11 @@ def setup_commands(bot: commands.Bot, db_handler, openai_client, image_generator
openai_client: OpenAI client instance
image_generator: Image generator instance
"""
tree = bot.tree
tree = bot.tree
# Initialize enhancement utilities
user_prefs_manager = UserPreferences(db_handler)
conversation_summarizer = ConversationSummarizer(openai_client, db_handler)
def check_blacklist():
"""Decorator to check if a user is blacklisted before executing a command."""
@@ -356,20 +364,29 @@ def setup_commands(bot: commands.Bot, db_handler, openai_client, image_generator
await process_request(interaction, process_user_stat)
@tree.command(name="help", description="Display a list of available commands.")
@check_blacklist()
async def help_command(interaction: discord.Interaction):
"""Sends a list of available commands to the user."""
help_message = (
"**Available commands:**\n"
"/choose_model - Select which AI model to use for responses (openai/gpt-4o, openai/gpt-4o-mini, openai/o1-preview, openai/o1-mini).\n"
"/search `<query>` - Search Google and send results to the AI model.\n"
"/web `<url>` - Scrape a webpage and send the data to the AI model.\n"
"/generate `<prompt>` - Generate an image from a text prompt.\n"
"/reset - Reset your chat history.\n"
"/user_stat - Get information about your input tokens, output tokens, and current model.\n"
"/help - Display this help message.\n"
)
@tree.command(name="help", description="Display a list of available commands.")
@check_blacklist()
async def help_command(interaction: discord.Interaction):
"""Sends a list of available commands to the user."""
help_message = (
"**Available commands:**\n"
"/choose_model - Select which AI model to use for responses (openai/gpt-4o, openai/gpt-4o-mini, openai/o1-preview, openai/o1-mini).\n"
"/search `<query>` - Search Google and send results to the AI model.\n"
"/web `<url>` - Scrape a webpage and send the data to the AI model.\n"
"/generate `<prompt>` - Generate an image from a text prompt.\n"
"/reset - Reset your chat history.\n"
"/user_stat - Get information about your input tokens, output tokens, and current model.\n"
"/help - Display this help message.\n"
"\n"
"**🆕 New Enhanced Features:**\n"
"/smart_model `<task>` - Get AI model recommendations for your task.\n"
"/preferences `<action>` - Manage your personal settings and preferences.\n"
"/conversation_stats - View your conversation statistics and health.\n"
"/process_file `<file>` - Process various file types (Word, Excel, code, etc.).\n"
"/help_enhanced - Detailed help with feature discovery.\n"
"\n"
"💡 **Try `/help_enhanced` for detailed guides and tips!**"
)
await interaction.response.send_message(help_message, ephemeral=True)
@tree.command(name="stop", description="Stop any process or queue of the user. Admins can stop other users' tasks by providing their ID.")
@@ -459,39 +476,497 @@ def setup_commands(bot: commands.Bot, db_handler, openai_client, image_generator
except ValueError:
await interaction.response.send_message("Invalid user ID. Please provide a valid Discord user ID.", ephemeral=True)
# Helper function to stop user tasks
async def stop_user_tasks(user_id: int):
"""Stop all tasks for a specific user."""
logging.info(f"Stopping all tasks for user {user_id}")
# Cancel all active tasks in user_tasks
if user_id in user_tasks:
for task in user_tasks[user_id]:
try:
task.cancel()
logging.info(f"Cancelled task for user {user_id}")
except Exception as e:
logging.error(f"Error cancelling task: {str(e)}")
user_tasks[user_id] = []
# Clear any queued requests
if user_id in user_requests:
queue_size = user_requests[user_id]['queue'].qsize()
while not user_requests[user_id]['queue'].empty():
try:
user_requests[user_id]['queue'].get_nowait()
user_requests[user_id]['queue'].task_done()
except Exception as e:
logging.error(f"Error clearing queue: {str(e)}")
logging.info(f"Cleared {queue_size} queued requests for user {user_id}")
# Also notify the message handler to stop any running PDF processes
# This is important for PDF batch processing which might be running in separate tasks
try:
# Import here to avoid circular imports
from src.module.message_handler import MessageHandler
if hasattr(MessageHandler, 'stop_user_tasks'):
await MessageHandler.stop_user_tasks(user_id)
logging.info(f"Called MessageHandler.stop_user_tasks for user {user_id}")
except Exception as e:
logging.error(f"Error stopping message handler tasks: {str(e)}")
# Helper function to stop user tasks
async def stop_user_tasks(user_id: int):
"""Stop all tasks for a specific user."""
logging.info(f"Stopping all tasks for user {user_id}")
# Cancel all active tasks in user_tasks
if user_id in user_tasks:
for task in user_tasks[user_id]:
try:
task.cancel()
logging.info(f"Cancelled task for user {user_id}")
except Exception as e:
logging.error(f"Error cancelling task: {str(e)}")
user_tasks[user_id] = []
# Clear any queued requests
if user_id in user_requests:
queue_size = user_requests[user_id]['queue'].qsize()
while not user_requests[user_id]['queue'].empty():
try:
user_requests[user_id]['queue'].get_nowait()
user_requests[user_id]['queue'].task_done()
except Exception as e:
logging.error(f"Error clearing queue: {str(e)}")
logging.info(f"Cleared {queue_size} queued requests for user {user_id}")
# Also notify the message handler to stop any running PDF processes
# This is important for PDF batch processing which might be running in separate tasks
try:
# Import here to avoid circular imports
from src.module.message_handler import MessageHandler
if hasattr(MessageHandler, 'stop_user_tasks'):
await MessageHandler.stop_user_tasks(user_id)
logging.info(f"Called MessageHandler.stop_user_tasks for user {user_id}")
except Exception as e:
logging.error(f"Error stopping message handler tasks: {str(e)}")
# ==================== NEW ENHANCED COMMANDS ====================
@tree.command(name="smart_model", description="Get AI model suggestions based on your task type.")
@app_commands.describe(task="Describe what you want to do")
@check_blacklist()
async def smart_model(interaction: discord.Interaction, task: str):
"""Suggest the best AI model for a specific task."""
await interaction.response.defer(thinking=True, ephemeral=True)
try:
user_id = interaction.user.id
user_prefs = await user_prefs_manager.get_user_preferences(user_id)
# Get model suggestion
suggestion = model_selector.suggest_model_with_alternatives(
task,
user_prefs.get('preferred_model')
)
# Format response
response_lines = [
f"**🎯 Smart Model Suggestion for:** `{task[:100]}{'...' if len(task) > 100 else ''}`",
"",
f"**📋 Task Type Detected:** `{suggestion['task_type'].title()}`",
"",
f"**🤖 Recommended Model:** `{suggestion['suggested_model']}`",
f"**💡 Why:** {suggestion['reason']}",
f"**📝 Details:** {suggestion['explanation']}",
""
]
if suggestion['alternatives']:
response_lines.append("**🔄 Alternative Models:**")
for alt in suggestion['alternatives']:
response_lines.append(f" • `{alt['model']}` - {alt['explanation']}")
response_lines.append("")
response_lines.extend([
"*💡 Tip: Use `/preferences set preferred_model` to set a default model*",
"*🔧 Use `/choose_model` to select a model for your conversations*"
])
await interaction.followup.send("\n".join(response_lines), ephemeral=True)
except Exception as e:
await interaction.followup.send(f"Error analyzing task: {str(e)}", ephemeral=True)
@tree.command(name="preferences", description="Manage your personal bot preferences and settings.")
@app_commands.describe(
action="Action to perform",
setting="Setting to modify",
value="New value for the setting"
)
@app_commands.choices(action=[
app_commands.Choice(name="view", value="view"),
app_commands.Choice(name="set", value="set"),
app_commands.Choice(name="reset", value="reset")
])
@check_blacklist()
async def preferences(
interaction: discord.Interaction,
action: str,
setting: str = None,
value: str = None
):
"""Manage user preferences."""
await interaction.response.defer(thinking=True, ephemeral=True)
try:
user_id = interaction.user.id
if action == "view":
prefs = await user_prefs_manager.get_user_preferences(user_id)
formatted_prefs = user_prefs_manager.format_preferences_display(prefs)
await interaction.followup.send(formatted_prefs, ephemeral=True)
elif action == "set":
if not setting or not value:
await interaction.followup.send(
"❌ Please provide both setting and value.\n"
"Example: `/preferences set response_style detailed`\n\n"
"Available settings:\n"
"• `preferred_model` - Your default AI model\n"
"• `response_style` - balanced, concise, detailed\n"
"• `auto_model_selection` - true, false\n"
"• `show_model_suggestions` - true, false\n"
"• `enable_conversation_summary` - true, false\n"
"• `max_response_length` - short, medium, long\n"
"• `language` - auto, en, es, fr, de, etc.\n"
"• `timezone` - Your timezone (e.g., America/New_York)",
ephemeral=True
)
return
# Convert string values to appropriate types
if value.lower() in ['true', 'false']:
value = value.lower() == 'true'
success = await user_prefs_manager.set_preference(user_id, setting, value)
if success:
await interaction.followup.send(
f"✅ Successfully updated `{setting}` to `{value}`",
ephemeral=True
)
else:
await interaction.followup.send(
f"❌ Failed to update setting. Please check the setting name and value.",
ephemeral=True
)
elif action == "reset":
success = await user_prefs_manager.reset_preferences(user_id)
if success:
await interaction.followup.send(
"✅ Your preferences have been reset to defaults.",
ephemeral=True
)
else:
await interaction.followup.send(
"❌ Failed to reset preferences.",
ephemeral=True
)
except Exception as e:
logging.error(f"Error in preferences command: {str(e)}")
await interaction.followup.send(f"❌ Error managing preferences: {str(e)}", ephemeral=True)
@tree.command(name="conversation_stats", description="Get statistics about your current conversation.")
@check_blacklist()
async def conversation_stats(interaction: discord.Interaction):
"""Show conversation statistics."""
await interaction.response.defer(thinking=True, ephemeral=True)
try:
user_id = interaction.user.id
history = await db_handler.get_history(user_id)
if not history:
await interaction.followup.send(
"📊 **Conversation Statistics**\n\n"
"No conversation history found. Start chatting to see statistics!",
ephemeral=True
)
return
stats = await conversation_summarizer.get_conversation_stats(history)
response_lines = [
"📊 **Your Conversation Statistics**",
"",
f"💬 **Total Messages:** {stats['total_messages']}",
f"👤 **Your Messages:** {stats['user_messages']}",
f"🤖 **Bot Responses:** {stats['assistant_messages']}",
f"📝 **Summaries:** {stats['summary_messages']}",
"",
f"🔤 **Token Usage:** {stats['total_tokens']:,} tokens",
f"📏 **Context Limit:** {stats['token_limit']:,} tokens",
"",
f"📊 **Status:** {'⚠️ Needs summarization' if stats['needs_summary'] else '✅ Within limits'}",
"",
"*💡 Long conversations are automatically summarized to maintain context quality*"
]
await interaction.followup.send("\n".join(response_lines), ephemeral=True)
except Exception as e:
logging.error(f"Error getting conversation stats: {str(e)}")
await interaction.followup.send(f"❌ Error getting statistics: {str(e)}", ephemeral=True)
@tree.command(name="process_file", description="Process and analyze various file types (documents, data, code).")
@app_commands.describe(file="Upload a file to process and analyze")
@check_blacklist()
async def process_file(interaction: discord.Interaction, file: discord.Attachment):
"""Process various file types with enhanced capabilities."""
await interaction.response.defer(thinking=True)
async def process_uploaded_file(interaction: discord.Interaction, file: discord.Attachment):
try:
# Check if file type is supported
if not enhanced_file_processor.is_supported(file.filename):
supported_types = ", ".join(enhanced_file_processor.get_supported_extensions())
await interaction.followup.send(
f"❌ **Unsupported file type:** `{file.filename}`\n\n"
f"**Supported types:** {supported_types}\n\n"
"*💡 Tip: For PDF files, use the regular file upload feature*"
)
return
# Check file size (limit to 10MB)
if file.size > 10 * 1024 * 1024: # 10MB
await interaction.followup.send(
f"❌ **File too large:** {file.size / (1024*1024):.1f}MB\n"
"Maximum supported size: 10MB"
)
return
# Download and process the file
import tempfile
with tempfile.NamedTemporaryFile(delete=False, suffix=Path(file.filename).suffix) as tmp_file:
await file.save(tmp_file.name)
# Process the file
result = await enhanced_file_processor.process_file(tmp_file.name, file.filename)
# Clean up
os.unlink(tmp_file.name)
if not result['success']:
await interaction.followup.send(f"❌ **Processing failed:** {result['error']}")
return
# Format the response
metadata = result['metadata']
content = result['content']
response_lines = [
f"📄 **File Analysis: {metadata['filename']}**",
"",
f"📊 **File Info:**",
f" • Type: `{metadata['type'].upper()}`",
f" • Size: `{metadata['size_human']}`",
f" • Processor: `{metadata['processor']}`",
""
]
# Add type-specific metadata
if metadata['type'] == 'csv':
response_lines.extend([
f"📈 **Data Info:**",
f" • Rows: `{metadata['rows']}`",
f" • Columns: `{metadata['columns']}`",
f" • Has null values: `{'Yes' if metadata['has_null_values'] else 'No'}`",
""
])
elif metadata['type'] == 'excel':
response_lines.extend([
f"📊 **Excel Info:**",
f" • Sheets: `{metadata['sheet_count']}`",
f" • Sheet names: `{', '.join(metadata['sheets'])}`",
""
])
elif metadata['type'] == 'code':
response_lines.extend([
f"💻 **Code Info:**",
f" • Language: `{metadata['language']}`",
f" • Total lines: `{metadata['total_lines']}`",
f" • Code lines: `{metadata['code_lines']}`",
f" • Comment lines: `{metadata['comment_lines']}`",
""
])
response_text = "\n".join(response_lines)
# Send response with file content
if len(content) <= 1500: # Show content directly if short
response_text += f"**📝 Content Preview:**\n```\n{content[:1500]}\n```"
await interaction.followup.send(response_text)
else:
# Send as file attachment if too long
content_file = io.BytesIO(content.encode('utf-8'))
discord_file = discord.File(content_file, filename=f"processed_{file.filename}.txt")
response_text += "*📎 Full content attached as file*"
await interaction.followup.send(response_text, file=discord_file)
except Exception as e:
logging.error(f"Error processing file {file.filename}: {str(e)}")
await interaction.followup.send(f"❌ **Error processing file:** {str(e)}")
await process_request(interaction, process_uploaded_file, file)
@tree.command(name="help_enhanced", description="Discover advanced features and get detailed help.")
@app_commands.describe(category="Help category to explore")
@app_commands.choices(category=[
app_commands.Choice(name="🆕 New Features", value="new"),
app_commands.Choice(name="🤖 AI Models", value="models"),
app_commands.Choice(name="⚙️ Preferences", value="preferences"),
app_commands.Choice(name="📁 File Processing", value="files"),
app_commands.Choice(name="🔧 All Commands", value="all"),
app_commands.Choice(name="💡 Tips & Tricks", value="tips")
])
@check_blacklist()
async def help_enhanced(interaction: discord.Interaction, category: str = "all"):
"""Enhanced help system with feature discovery."""
await interaction.response.defer(thinking=True, ephemeral=True)
try:
if category == "new":
help_text = """
🆕 **New Enhanced Features**
**🧠 Smart Model Selection**
• `/smart_model` - Get AI model recommendations based on your task
• Automatically suggests the best model for coding, analysis, creative tasks, etc.
**⚙️ Personalization**
• `/preferences` - Customize your bot experience
• Set preferred models, response styles, and behavior
• Auto-conversation summarization to maintain context
**📊 Analytics**
• `/conversation_stats` - See your conversation statistics
• Track token usage and conversation health
**📁 Enhanced File Processing**
• `/process_file` - Support for Word docs, Excel, PowerPoint, code files, etc.
• Better analysis and content extraction
**💡 Smarter Conversations**
• Automatic conversation summarization for long chats
• Better context management and memory
"""
elif category == "models":
help_text = """
🤖 **AI Model Guide**
**🧠 Reasoning Models (Best for complex problems):**
• `openai/o1-preview` - Advanced reasoning and step-by-step problem solving
• `openai/o1` - Enhanced reasoning for analytical tasks
• `openai/o1-mini` - Fast reasoning for structured problems
**🎯 Balanced Models (Great for everything):**
• `openai/gpt-4o` - Excellent for coding, analysis, creativity
• `openai/gpt-4o-mini` - Fast and efficient for general tasks
**⚡ Speed Models:**
• `openai/gpt-4.1-mini` - Compact with great performance
• `openai/gpt-4.1-nano` - Ultra-fast for simple tasks
**🔧 How to Choose:**
• Use `/smart_model` to get suggestions based on your task
• Set a default with `/preferences set preferred_model <model>`
• Use `/choose_model` to select for current conversation
"""
elif category == "preferences":
help_text = """
⚙️ **Preferences System**
**📋 Available Settings:**
• `preferred_model` - Your default AI model
• `auto_model_selection` - Enable smart model suggestions
• `response_style` - balanced, concise, detailed
• `show_model_suggestions` - Show why a model was chosen
• `enable_conversation_summary` - Auto-summarize long chats
• `max_response_length` - short, medium, long
• `language` - Response language (auto-detect or specific)
• `timezone` - For reminders and timestamps
**🔧 Commands:**
• `/preferences view` - See all your settings
• `/preferences set <setting> <value>` - Change a setting
• `/preferences reset` - Reset to defaults
**💡 Examples:**
• `/preferences set response_style detailed`
• `/preferences set preferred_model openai/gpt-4o`
• `/preferences set auto_model_selection true`
"""
elif category == "files":
help_text = f"""
📁 **Enhanced File Processing**
**📄 Supported File Types:**
{', '.join([f'`{ext}`' for ext in enhanced_file_processor.get_supported_extensions()])}
**🔧 Features:**
• **Documents:** Word (.docx), PowerPoint (.pptx), PDF
• **Data:** CSV, Excel (.xlsx), JSON, YAML
• **Code:** Python, JavaScript, HTML, CSS, and more
• **Text:** Markdown, plain text, logs
**💡 How to Use:**
• `/process_file` - Upload and analyze any supported file
• Regular file upload - For PDF analysis (existing feature)
• Drag & drop files in chat for automatic processing
**📊 What You Get:**
• Content extraction and analysis
• Metadata and statistics
• Structure analysis for data files
• Code metrics for programming files
"""
elif category == "tips":
help_text = """
💡 **Tips & Tricks**
**🎯 Getting Better Results:**
• Be specific about your task type for better model suggestions
• Use `/smart_model` before complex tasks to get the right model
• Set preferences once to customize your experience
**⚡ Efficiency Tips:**
• Enable auto-model selection for optimal performance
• Use conversation summaries for long discussions
• Check `/conversation_stats` to monitor context usage
**🔧 Power User Features:**
• Combine multiple file types in analysis
• Use preferences to match your workflow
• Try different response styles for different tasks
**📊 Monitoring:**
• Use `/user_stat` for token usage tracking
• Check `/conversation_stats` for conversation health
• Monitor your preferences with `/preferences view`
**🎨 Creative Workflows:**
• Use detailed response style for creative writing
• Try different models for different creative tasks
• Experiment with image generation prompts
"""
else: # "all"
help_text = """
🔧 **Complete Command Reference**
**💬 Core Chat Commands:**
• `/choose_model` - Select AI model for responses
• `/search <query>` - Google search with AI analysis
• `/web <url>` - Scrape and analyze web content
• `/generate <prompt>` - Generate images from text
• `/reset` - Clear conversation history
**🆕 Enhanced Features:**
• `/smart_model <task>` - Get model recommendations
• `/preferences <action>` - Manage personal settings
• `/conversation_stats` - View conversation analytics
• `/process_file <file>` - Analyze various file types
• `/help_enhanced` - This detailed help system
**📊 Statistics & Info:**
• `/user_stat` - Your token usage and current model
• `/help` - Basic command list
**👑 Admin Commands:**
• `/whitelist_add/remove` - Manage PDF whitelist
• `/blacklist_add/remove` - Manage user access
• `/stop` - Stop user processes
**💡 Pro Tips:**
• Upload files directly in chat for automatic processing
• Use @ mentions to get the bot's attention
• Try different models for different types of tasks
• Set your preferences for a personalized experience
"""
await interaction.followup.send(help_text, ephemeral=True)
except Exception as e:
logging.error(f"Error in help_enhanced: {str(e)}")
await interaction.followup.send("❌ Error loading help content.", ephemeral=True)

Binary file not shown.

View File

@@ -6,34 +6,35 @@ from datetime import datetime, timedelta
import logging
import re
class DatabaseHandler:
# Class-level cache for database results
_cache = {}
_cache_expiry = {}
_cache_lock = asyncio.Lock()
def __init__(self, mongodb_uri: str):
"""Initialize database connection with optimized settings"""
# Set up a connection pool with sensible timeouts
self.client = AsyncIOMotorClient(
mongodb_uri,
maxIdleTimeMS=45000,
connectTimeoutMS=10000,
serverSelectionTimeoutMS=15000,
waitQueueTimeoutMS=5000,
socketTimeoutMS=30000,
retryWrites=True
class DatabaseHandler:
def __init__(self, mongodb_uri: str):
"""Initialize database connection with optimized settings"""
# Instance-level cache for database results
self.cache = {}
self.cache_expiry = {}
self.cache_lock = asyncio.Lock()
# Set up a connection pool with sensible timeouts
self.client = AsyncIOMotorClient(
mongodb_uri,
maxIdleTimeMS=45000,
connectTimeoutMS=10000,
serverSelectionTimeoutMS=15000,
waitQueueTimeoutMS=5000,
socketTimeoutMS=30000,
retryWrites=True
)
self.db = self.client['chatgpt_discord_bot'] # Database name
# Collections
self.users_collection = self.db.users
self.history_collection = self.db.history
self.admin_collection = self.db.admin
self.blacklist_collection = self.db.blacklist
self.whitelist_collection = self.db.whitelist
self.logs_collection = self.db.logs
self.reminders_collection = self.db.reminders
# Collections
self.users_collection = self.db.users
self.history_collection = self.db.history
self.admin_collection = self.db.admin
self.blacklist_collection = self.db.blacklist
self.whitelist_collection = self.db.whitelist
self.logs_collection = self.db.logs
self.reminders_collection = self.db.reminders
self.user_preferences_collection = self.db.user_preferences # New collection for preferences
logging.info("Database handler initialized")
@@ -42,20 +43,20 @@ class DatabaseHandler:
"""Get result from cache or execute fetch_func if not cached/expired"""
current_time = datetime.now()
# Check if we have a cached result that's still valid
async with self._cache_lock:
if (cache_key in self._cache and
cache_key in self._cache_expiry and
current_time < self._cache_expiry[cache_key]):
return self._cache[cache_key]
# Not in cache or expired, fetch new result
result = await fetch_func()
# Cache the new result
async with self._cache_lock:
self._cache[cache_key] = result
self._cache_expiry[cache_key] = current_time + timedelta(seconds=expiry_seconds)
# Check if we have a cached result that's still valid
async with self.cache_lock:
if (cache_key in self.cache and
cache_key in self.cache_expiry and
current_time < self.cache_expiry[cache_key]):
return self.cache[cache_key]
# Not in cache or expired, fetch new result
result = await fetch_func()
# Cache the new result
async with self.cache_lock:
self.cache[cache_key] = result
self.cache_expiry[cache_key] = current_time + timedelta(seconds=expiry_seconds)
return result

View File

@@ -15,10 +15,14 @@ import base64
import traceback
import tiktoken
from datetime import datetime, timedelta
from pathlib import Path
from src.utils.openai_utils import process_tool_calls, prepare_messages_for_api, get_tools_for_model
from src.utils.pdf_utils import process_pdf, send_response
from src.utils.code_utils import extract_code_blocks
from src.utils.reminder_utils import ReminderManager
from src.utils.model_selector import model_selector
from src.utils.user_preferences import UserPreferences
from src.utils.conversation_manager import ConversationSummarizer
from src.config.config import PDF_ALLOWED_MODELS, MODEL_TOKEN_LIMITS, DEFAULT_TOKEN_LIMIT
# Global task and rate limiting tracking
@@ -81,6 +85,10 @@ class MessageHandler:
# Initialize reminder manager
self.reminder_manager = ReminderManager(bot, db_handler)
# Initialize enhancement utilities
self.user_prefs_manager = UserPreferences(db_handler)
self.conversation_summarizer = ConversationSummarizer(openai_client, db_handler)
# Tool mapping for API integration
self.tool_mapping = {
"google_search": self._google_search,
@@ -823,6 +831,63 @@ class MessageHandler:
if isinstance(current_message, str):
current_message = {"role": "user", "content": current_message}
# ===========================================
# ENHANCED FEATURES INTEGRATION
# ===========================================
# Get user preferences
user_prefs = await self.user_prefs_manager.get_user_preferences(user_id)
# Smart model selection if enabled
if user_prefs.get('auto_model_selection', True):
# Extract text content for analysis
user_text = ""
if isinstance(current_message.get('content'), list):
for part in current_message['content']:
if isinstance(part, dict) and part.get('type') == 'text':
user_text += part.get('text', '') + " "
elif isinstance(current_message.get('content'), str):
user_text = current_message['content']
# Get smart model suggestion
if user_text.strip():
suggested_model, reason = model_selector.suggest_model(
user_text,
user_prefs.get('preferred_model')
)
# Use suggested model if different from current and user hasn't explicitly set one
if (suggested_model != model and
not user_prefs.get('preferred_model') and
suggested_model in ['openai/gpt-4o', 'openai/gpt-4o-mini', 'openai/o1-preview', 'openai/o1-mini']):
old_model = model
model = suggested_model
# Optionally notify user about model switch
if user_prefs.get('show_model_suggestions', True):
try:
await message.channel.send(
f"🧠 **Smart Model Selection:** Switched to `{model}` for this task.\n"
f"💡 **Reason:** {reason}\n"
f"*Use `/preferences set auto_model_selection false` to disable this feature*",
ephemeral=True
)
except:
# If ephemeral fails, just log it
logging.info(f"Auto-selected model {model} for user {user_id}: {reason}")
# Conversation management with summarization
if user_prefs.get('enable_conversation_summary', True):
# Manage conversation length and summarize if needed
history = await self.conversation_summarizer.manage_conversation_length(user_id, history)
# Update history in database with managed version
await self.db.save_history(user_id, history)
# ===========================================
# ORIGINAL PROCESSING LOGIC (Enhanced)
# ===========================================
# Process messages based on the model's capabilities
messages_for_api = []

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,258 @@
"""
Conversation Summarization Utility
Manages conversation context by creating smart summaries when conversations get too long.
"""
import logging
import tiktoken
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime, timezone
class ConversationSummarizer:
"""Handles conversation summarization for better context management."""
def __init__(self, openai_client, db_handler):
self.client = openai_client
self.db = db_handler
self.logger = logging.getLogger(__name__)
self.encoding = tiktoken.get_encoding("o200k_base")
# Configuration
self.max_context_tokens = 6000 # When to start summarizing
self.summary_target_tokens = 2000 # Target size for summary
self.min_messages_to_summarize = 4 # Minimum messages before summarizing
def count_tokens(self, text: str) -> int:
"""Count tokens in text."""
try:
return len(self.encoding.encode(str(text)))
except Exception:
# Fallback estimation
return len(str(text)) // 4
def count_conversation_tokens(self, messages: List[Dict[str, Any]]) -> int:
"""Count total tokens in conversation history."""
total_tokens = 0
for message in messages:
content = message.get('content', '')
if isinstance(content, list):
# Handle multimodal content
for part in content:
if isinstance(part, dict) and 'text' in part:
total_tokens += self.count_tokens(part['text'])
else:
total_tokens += self.count_tokens(str(content))
return total_tokens
async def should_summarize(self, messages: List[Dict[str, Any]]) -> bool:
"""
Determine if conversation should be summarized.
Args:
messages: Conversation history
Returns:
bool: Whether to summarize
"""
if len(messages) < self.min_messages_to_summarize:
return False
token_count = self.count_conversation_tokens(messages)
return token_count > self.max_context_tokens
async def create_summary(self, messages: List[Dict[str, Any]], user_id: int) -> Optional[str]:
"""
Create a summary of the conversation.
Args:
messages: Conversation history to summarize
user_id: User ID for context
Returns:
Optional[str]: Summary of the conversation
"""
try:
if len(messages) < 2:
return None
# Prepare messages for summarization
conversation_text = self._format_messages_for_summary(messages)
# Create summary prompt
summary_prompt = """Please create a concise summary of this conversation that preserves:
1. Key topics discussed
2. Important decisions or conclusions reached
3. Ongoing context that might be relevant for future messages
4. User preferences or specific requests mentioned
Keep the summary under 500 words and focus on information that would help continue the conversation naturally.
Conversation to summarize:
""" + conversation_text
# Get user's preferred model for summarization (prefer efficient models)
user_prefs = await self.db.get_user_model(user_id)
summary_model = "openai/gpt-4o-mini" # Use efficient model for summaries
response = await self.client.chat.completions.create(
model=summary_model,
messages=[
{"role": "system", "content": "You are a helpful assistant that creates concise, informative conversation summaries."},
{"role": "user", "content": summary_prompt}
],
temperature=0.3,
max_tokens=800
)
summary = response.choices[0].message.content
self.logger.info(f"Created conversation summary for user {user_id} ({len(messages)} messages)")
return summary
except Exception as e:
self.logger.error(f"Error creating conversation summary: {str(e)}")
return None
def _format_messages_for_summary(self, messages: List[Dict[str, Any]]) -> str:
"""Format messages for summarization."""
formatted_lines = []
for i, message in enumerate(messages):
role = message.get('role', 'unknown')
content = message.get('content', '')
# Handle multimodal content
if isinstance(content, list):
content_parts = []
for part in content:
if isinstance(part, dict):
if 'text' in part:
content_parts.append(part['text'])
elif 'type' in part:
content_parts.append(f"[{part['type']} content]")
content = " ".join(content_parts)
# Truncate very long messages
if len(str(content)) > 1000:
content = str(content)[:1000] + "... [truncated]"
formatted_lines.append(f"{role.upper()}: {content}")
return "\n\n".join(formatted_lines)
async def manage_conversation_length(self, user_id: int, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Manage conversation length by summarizing when needed.
Args:
user_id: User ID
messages: Current conversation history
Returns:
List[Dict[str, Any]]: Managed conversation history
"""
try:
# Check if summarization is needed
if not await self.should_summarize(messages):
return messages
# Check user preferences
try:
from src.utils.user_preferences import UserPreferences
user_prefs_manager = UserPreferences(self.db)
prefs = await user_prefs_manager.get_user_preferences(user_id)
if not prefs.get('enable_conversation_summary', True):
# User disabled summarization, just trim older messages
return self._trim_messages(messages)
except Exception:
# If preferences system fails, continue with summarization
pass
# Find split point (keep recent messages, summarize older ones)
recent_tokens = 0
split_index = len(messages)
# Work backwards to find where to split
for i in range(len(messages) - 1, -1, -1):
message_tokens = self.count_tokens(str(messages[i].get('content', '')))
if recent_tokens + message_tokens > self.summary_target_tokens:
split_index = i + 1
break
recent_tokens += message_tokens
# Don't summarize if we'd only be summarizing a few messages
if split_index >= len(messages) - 2:
return self._trim_messages(messages)
# Split conversation
messages_to_summarize = messages[:split_index]
recent_messages = messages[split_index:]
# Create summary
summary = await self.create_summary(messages_to_summarize, user_id)
if summary:
# Create new conversation starting with summary
summary_message = {
"role": "system",
"content": f"[Conversation Summary] {summary}",
"timestamp": datetime.now(timezone.utc).isoformat(),
"type": "summary"
}
managed_messages = [summary_message] + recent_messages
self.logger.info(f"Summarized {len(messages_to_summarize)} messages into summary for user {user_id}")
return managed_messages
else:
# Fallback to simple trimming if summarization fails
return self._trim_messages(messages)
except Exception as e:
self.logger.error(f"Error managing conversation length: {str(e)}")
return self._trim_messages(messages)
def _trim_messages(self, messages: List[Dict[str, Any]], max_messages: int = 20) -> List[Dict[str, Any]]:
"""
Simple fallback: trim to recent messages.
Args:
messages: Messages to trim
max_messages: Maximum number of messages to keep
Returns:
List[Dict[str, Any]]: Trimmed messages
"""
if len(messages) <= max_messages:
return messages
# Keep the most recent messages
return messages[-max_messages:]
async def get_conversation_stats(self, messages: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Get statistics about the conversation.
Args:
messages: Conversation messages
Returns:
Dict[str, Any]: Conversation statistics
"""
total_messages = len(messages)
total_tokens = self.count_conversation_tokens(messages)
user_messages = [m for m in messages if m.get('role') == 'user']
assistant_messages = [m for m in messages if m.get('role') == 'assistant']
summary_messages = [m for m in messages if m.get('type') == 'summary']
return {
"total_messages": total_messages,
"user_messages": len(user_messages),
"assistant_messages": len(assistant_messages),
"summary_messages": len(summary_messages),
"total_tokens": total_tokens,
"needs_summary": total_tokens > self.max_context_tokens,
"token_limit": self.max_context_tokens
}

View File

@@ -0,0 +1,551 @@
"""
Enhanced File Processing Utilities
Supports multiple file types for better document processing capabilities.
"""
import logging
import os
import tempfile
import io
import json
from typing import Dict, List, Any, Optional, Tuple
from pathlib import Path
# Additional imports for file processing
try:
import docx
from docx import Document
DOCX_AVAILABLE = True
except ImportError:
DOCX_AVAILABLE = False
try:
from pptx import Presentation
PPTX_AVAILABLE = True
except ImportError:
PPTX_AVAILABLE = False
try:
import markdown
from bs4 import BeautifulSoup
MARKDOWN_AVAILABLE = True
except ImportError:
MARKDOWN_AVAILABLE = False
class EnhancedFileProcessor:
"""Enhanced file processing with support for multiple formats."""
def __init__(self):
self.logger = logging.getLogger(__name__)
# Supported file extensions and their processors
self.processors = {
# Document formats
'.txt': self._process_text_file,
'.md': self._process_markdown_file,
'.json': self._process_json_file,
'.csv': self._process_csv_file,
'.xlsx': self._process_excel_file,
'.xls': self._process_excel_file,
# Code files
'.py': self._process_code_file,
'.js': self._process_code_file,
'.html': self._process_html_file,
'.css': self._process_code_file,
'.xml': self._process_xml_file,
'.yaml': self._process_yaml_file,
'.yml': self._process_yaml_file,
# Log files
'.log': self._process_log_file,
}
# Add conditional processors
if DOCX_AVAILABLE:
self.processors['.docx'] = self._process_docx_file
if PPTX_AVAILABLE:
self.processors['.pptx'] = self._process_pptx_file
def get_supported_extensions(self) -> List[str]:
"""Get list of supported file extensions."""
return list(self.processors.keys())
def is_supported(self, filename: str) -> bool:
"""Check if file type is supported."""
ext = Path(filename).suffix.lower()
return ext in self.processors
async def process_file(self, file_path: str, original_name: str = None) -> Dict[str, Any]:
"""
Process a file and extract its content and metadata.
Args:
file_path (str): Path to the file
original_name (str): Original filename
Returns:
Dict[str, Any]: Processing result
"""
try:
if not os.path.exists(file_path):
return {
"success": False,
"error": "File not found",
"content": None,
"metadata": {}
}
# Determine file extension
filename = original_name or os.path.basename(file_path)
ext = Path(filename).suffix.lower()
if ext not in self.processors:
return {
"success": False,
"error": f"Unsupported file type: {ext}",
"content": None,
"metadata": {"filename": filename, "extension": ext}
}
# Get file stats
file_stats = os.stat(file_path)
metadata = {
"filename": filename,
"extension": ext,
"size_bytes": file_stats.st_size,
"size_human": self._format_file_size(file_stats.st_size),
"processor": self.processors[ext].__name__
}
# Process the file
processor = self.processors[ext]
content, extra_metadata = await processor(file_path, filename)
metadata.update(extra_metadata)
return {
"success": True,
"error": None,
"content": content,
"metadata": metadata
}
except Exception as e:
self.logger.error(f"Error processing file {file_path}: {str(e)}")
return {
"success": False,
"error": str(e),
"content": None,
"metadata": {"filename": filename if 'filename' in locals() else "unknown"}
}
async def _process_text_file(self, file_path: str, filename: str) -> Tuple[str, Dict]:
"""Process plain text files."""
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
metadata = {
"lines": len(content.splitlines()),
"characters": len(content),
"words": len(content.split()),
"type": "text"
}
return content, metadata
except Exception as e:
raise Exception(f"Error reading text file: {str(e)}")
async def _process_markdown_file(self, file_path: str, filename: str) -> Tuple[str, Dict]:
"""Process Markdown files."""
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
md_content = f.read()
# Convert to HTML if markdown library is available
html_content = None
if MARKDOWN_AVAILABLE:
try:
html_content = markdown.markdown(md_content, extensions=['tables', 'fenced_code'])
if BeautifulSoup:
soup = BeautifulSoup(html_content, 'html.parser')
headers = [h.get_text() for h in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])]
links = [a.get('href') for a in soup.find_all('a', href=True)]
else:
headers = []
links = []
except Exception:
headers = []
links = []
else:
headers = []
links = []
metadata = {
"lines": len(md_content.splitlines()),
"characters": len(md_content),
"words": len(md_content.split()),
"type": "markdown",
"headers": headers[:10], # Limit to first 10 headers
"links": links[:20], # Limit to first 20 links
"has_html": html_content is not None
}
# Return both markdown and HTML if available
if html_content:
content = f"# Markdown Content:\n{md_content}\n\n# HTML Preview:\n{html_content}"
else:
content = md_content
return content, metadata
except Exception as e:
raise Exception(f"Error reading markdown file: {str(e)}")
async def _process_json_file(self, file_path: str, filename: str) -> Tuple[str, Dict]:
"""Process JSON files."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
raw_content = f.read()
json_data = json.loads(raw_content)
# Format JSON for better readability
formatted_json = json.dumps(json_data, indent=2, ensure_ascii=False)
metadata = {
"type": "json",
"structure": self._analyze_json_structure(json_data),
"size_formatted": len(formatted_json),
"is_valid": True
}
return formatted_json, metadata
except json.JSONDecodeError as e:
# Return raw content if JSON is invalid
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
raw_content = f.read()
metadata = {
"type": "json",
"is_valid": False,
"error": str(e)
}
return raw_content, metadata
async def _process_csv_file(self, file_path: str, filename: str) -> Tuple[str, Dict]:
"""Process CSV files."""
try:
import pandas as pd
# Read CSV with pandas for better analysis
df = pd.read_csv(file_path, encoding='utf-8', errors='ignore', nrows=100) # Limit to first 100 rows
# Generate summary
content_lines = [
f"CSV File Analysis: {filename}",
f"Shape: {df.shape[0]} rows, {df.shape[1]} columns",
"",
"Columns:",
*[f" - {col} ({str(df[col].dtype)})" for col in df.columns],
"",
"First 10 rows:",
df.head(10).to_string(),
"",
"Data Info:",
str(df.describe(include='all').to_string()) if not df.empty else "No data to describe"
]
metadata = {
"type": "csv",
"rows": len(df),
"columns": len(df.columns),
"column_names": list(df.columns),
"data_types": {col: str(dtype) for col, dtype in df.dtypes.items()},
"memory_usage": df.memory_usage(deep=True).sum(),
"has_null_values": df.isnull().any().any()
}
return "\n".join(content_lines), metadata
except Exception as e:
# Fallback to simple text processing
return await self._process_text_file(file_path, filename)
async def _process_excel_file(self, file_path: str, filename: str) -> Tuple[str, Dict]:
"""Process Excel files."""
try:
import pandas as pd
# Read Excel file
excel_file = pd.ExcelFile(file_path)
sheet_names = excel_file.sheet_names
content_lines = [f"Excel File Analysis: {filename}", ""]
metadata = {
"type": "excel",
"sheets": sheet_names,
"sheet_count": len(sheet_names)
}
# Process each sheet (limit to first 3 sheets)
for i, sheet_name in enumerate(sheet_names[:3]):
df = pd.read_excel(file_path, sheet_name=sheet_name, nrows=50)
content_lines.extend([
f"Sheet: {sheet_name}",
f"Shape: {df.shape[0]} rows, {df.shape[1]} columns",
f"Columns: {', '.join(df.columns)}",
"",
"First 5 rows:",
df.head(5).to_string(),
"",
"---",
""
])
metadata[f"sheet_{i}"] = {
"name": sheet_name,
"rows": len(df),
"columns": len(df.columns),
"column_names": list(df.columns)
}
return "\n".join(content_lines), metadata
except Exception as e:
raise Exception(f"Error processing Excel file: {str(e)}")
async def _process_docx_file(self, file_path: str, filename: str) -> Tuple[str, Dict]:
"""Process Word documents."""
if not DOCX_AVAILABLE:
raise Exception("python-docx library not available")
try:
doc = Document(file_path)
# Extract text content
content_lines = []
paragraph_count = 0
for paragraph in doc.paragraphs:
if paragraph.text.strip():
content_lines.append(paragraph.text)
paragraph_count += 1
# Extract tables
table_count = len(doc.tables)
if table_count > 0:
content_lines.append("\n--- TABLES ---")
for i, table in enumerate(doc.tables[:3]): # Limit to first 3 tables
content_lines.append(f"\nTable {i+1}:")
for row in table.rows[:5]: # First 5 rows
row_text = " | ".join([cell.text for cell in row.cells])
content_lines.append(row_text)
content = "\n".join(content_lines)
metadata = {
"type": "docx",
"paragraphs": paragraph_count,
"tables": table_count,
"words": len(content.split()),
"characters": len(content)
}
return content, metadata
except Exception as e:
raise Exception(f"Error processing Word document: {str(e)}")
async def _process_pptx_file(self, file_path: str, filename: str) -> Tuple[str, Dict]:
"""Process PowerPoint presentations."""
if not PPTX_AVAILABLE:
raise Exception("python-pptx library not available")
try:
prs = Presentation(file_path)
content_lines = [f"PowerPoint Presentation: {filename}", ""]
slide_count = 0
for i, slide in enumerate(prs.slides):
slide_count += 1
content_lines.append(f"--- Slide {i+1} ---")
for shape in slide.shapes:
if hasattr(shape, "text") and shape.text.strip():
content_lines.append(shape.text)
content_lines.append("")
content = "\n".join(content_lines)
metadata = {
"type": "pptx",
"slides": slide_count,
"words": len(content.split()),
"characters": len(content)
}
return content, metadata
except Exception as e:
raise Exception(f"Error processing PowerPoint file: {str(e)}")
async def _process_code_file(self, file_path: str, filename: str) -> Tuple[str, Dict]:
"""Process code files."""
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
# Basic code analysis
lines = content.splitlines()
non_empty_lines = [line for line in lines if line.strip()]
comment_lines = [line for line in lines if line.strip().startswith(('#', '//', '/*', '*', '<!--'))]
metadata = {
"type": "code",
"language": Path(filename).suffix[1:], # Remove the dot
"total_lines": len(lines),
"code_lines": len(non_empty_lines),
"comment_lines": len(comment_lines),
"characters": len(content)
}
return content, metadata
except Exception as e:
raise Exception(f"Error reading code file: {str(e)}")
async def _process_html_file(self, file_path: str, filename: str) -> Tuple[str, Dict]:
"""Process HTML files."""
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
html_content = f.read()
# Extract text content if BeautifulSoup is available
text_content = html_content
if BeautifulSoup:
try:
soup = BeautifulSoup(html_content, 'html.parser')
text_content = soup.get_text()
# Extract metadata
title = soup.find('title')
meta_tags = soup.find_all('meta')
links = soup.find_all('a', href=True)
metadata = {
"type": "html",
"title": title.text if title else "No title",
"meta_count": len(meta_tags),
"link_count": len(links),
"characters": len(text_content),
"has_parsed_content": True
}
except Exception:
metadata = {"type": "html", "has_parsed_content": False}
else:
metadata = {"type": "html", "has_parsed_content": False}
# Return both HTML and text content
content = f"HTML Content:\n{html_content}\n\nExtracted Text:\n{text_content}"
return content, metadata
except Exception as e:
raise Exception(f"Error reading HTML file: {str(e)}")
async def _process_xml_file(self, file_path: str, filename: str) -> Tuple[str, Dict]:
"""Process XML files."""
return await self._process_text_file(file_path, filename)
async def _process_yaml_file(self, file_path: str, filename: str) -> Tuple[str, Dict]:
"""Process YAML files."""
try:
import yaml
with open(file_path, 'r', encoding='utf-8') as f:
yaml_content = f.read()
yaml_data = yaml.safe_load(yaml_content)
# Format YAML for better readability
formatted_yaml = yaml.dump(yaml_data, default_flow_style=False, allow_unicode=True)
metadata = {
"type": "yaml",
"structure": self._analyze_json_structure(yaml_data), # Same analysis as JSON
"is_valid": True
}
return formatted_yaml, metadata
except Exception:
# Fallback to text processing
return await self._process_text_file(file_path, filename)
async def _process_log_file(self, file_path: str, filename: str) -> Tuple[str, Dict]:
"""Process log files."""
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
lines = f.readlines()
# Analyze log patterns (basic)
error_lines = [line for line in lines if any(keyword in line.lower() for keyword in ['error', 'exception', 'fail'])]
warning_lines = [line for line in lines if any(keyword in line.lower() for keyword in ['warn', 'warning'])]
# Show last 50 lines for recent activity
recent_lines = lines[-50:] if len(lines) > 50 else lines
content = "".join(recent_lines)
metadata = {
"type": "log",
"total_lines": len(lines),
"error_lines": len(error_lines),
"warning_lines": len(warning_lines),
"showing_recent": len(recent_lines)
}
return content, metadata
except Exception as e:
raise Exception(f"Error reading log file: {str(e)}")
def _analyze_json_structure(self, data: Any, max_depth: int = 3) -> Dict[str, Any]:
"""Analyze JSON structure."""
if max_depth <= 0:
return {"type": type(data).__name__, "truncated": True}
if isinstance(data, dict):
return {
"type": "object",
"keys": list(data.keys())[:10], # First 10 keys
"key_count": len(data.keys()),
"sample_values": {k: self._analyze_json_structure(v, max_depth-1)
for k, v in list(data.items())[:3]} # First 3 key-value pairs
}
elif isinstance(data, list):
return {
"type": "array",
"length": len(data),
"sample_items": [self._analyze_json_structure(item, max_depth-1)
for item in data[:3]] if data else []
}
else:
return {"type": type(data).__name__, "value": str(data)[:100]}
def _format_file_size(self, size_bytes: int) -> str:
"""Format file size in human-readable format."""
for unit in ['B', 'KB', 'MB', 'GB']:
if size_bytes < 1024.0:
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024.0
return f"{size_bytes:.1f} TB"
# Global instance
enhanced_file_processor = EnhancedFileProcessor()

219
src/utils/model_selector.py Normal file
View File

@@ -0,0 +1,219 @@
"""
Model Selection Utility
Automatically suggests the best AI model based on task type and content analysis.
"""
import re
import logging
from typing import Dict, List, Optional, Tuple
from src.config.config import MODEL_OPTIONS
# Task type patterns and their optimal models
TASK_PATTERNS = {
# Reasoning and complex problem solving
"reasoning": {
"patterns": [
r"solve|calculate|compute|analyze|reason|logic|problem|proof|mathematics?|math|equation",
r"step.by.step|think|explain why|how does|what is the relationship",
r"algorithm|optimization|strategy|plan|approach"
],
"models": ["openai/o1-preview", "openai/o1", "openai/o1-mini", "openai/gpt-4o"]
},
# Code and programming tasks
"coding": {
"patterns": [
r"code|program|script|function|class|debug|refactor|implement",
r"python|javascript|java|c\+\+|html|css|sql|api|framework",
r"bug|error|exception|syntax|compile|deploy|test",
r"```.*```", # Code blocks
r"github|repository|pull request|commit"
],
"models": ["openai/gpt-4o", "openai/o1-preview", "openai/gpt-4o-mini"]
},
# Creative and content generation
"creative": {
"patterns": [
r"write|create|generate|compose|story|poem|article|blog",
r"creative|imagination|fiction|narrative|character|plot",
r"lyrics|song|script|dialogue|monologue",
r"marketing|advertisement|slogan|copy|content"
],
"models": ["openai/gpt-4o", "openai/gpt-4o-mini"]
},
# Data analysis and research
"analysis": {
"patterns": [
r"analyze|analysis|data|statistics|chart|graph|visualization",
r"research|study|findings|conclusions|insights|trends",
r"compare|contrast|evaluate|assess|review|examine",
r"csv|excel|spreadsheet|dataset|metrics|kpi"
],
"models": ["openai/gpt-4o", "openai/o1-preview", "openai/gpt-4o-mini"]
},
# Quick questions and general chat
"general": {
"patterns": [
r"^(hi|hello|hey|what|who|when|where|how|why|can you|please|thanks?)",
r"quick question|simple|brief|short answer|tldr|summary"
],
"models": ["openai/gpt-4o-mini", "openai/gpt-4o"]
},
# Translation and language tasks
"language": {
"patterns": [
r"translate|translation|language|français|español|deutsch|italiano|中文|日本語|한국어",
r"grammar|spelling|proofreading|correct|fix|improve writing"
],
"models": ["openai/gpt-4o", "openai/gpt-4o-mini"]
},
# Image and visual tasks
"visual": {
"patterns": [
r"image|picture|photo|visual|draw|sketch|art|design",
r"generate image|create image|make picture|visualize"
],
"models": ["openai/gpt-4o", "openai/gpt-4o-mini"] # For image generation prompts
}
}
class ModelSelector:
"""Intelligent model selection based on task analysis."""
def __init__(self):
self.logger = logging.getLogger(__name__)
def analyze_task_type(self, content: str) -> str:
"""
Analyze the content to determine the primary task type.
Args:
content (str): The user's input content
Returns:
str: The detected task type
"""
if not content or not isinstance(content, str):
return "general"
content_lower = content.lower()
task_scores = {}
# Score each task type based on pattern matches
for task_type, config in TASK_PATTERNS.items():
score = 0
for pattern in config["patterns"]:
matches = len(re.findall(pattern, content_lower, re.IGNORECASE))
score += matches
# Bonus for longer matches
if score > 0:
score += len(content_lower) / 1000 # Small bonus for longer content
task_scores[task_type] = score
# Return the task type with the highest score
if not task_scores or max(task_scores.values()) == 0:
return "general"
best_task = max(task_scores, key=task_scores.get)
self.logger.debug(f"Task analysis: {task_scores}, selected: {best_task}")
return best_task
def suggest_model(self, content: str, user_preference: Optional[str] = None) -> Tuple[str, str]:
"""
Suggest the best model for the given content.
Args:
content (str): The user's input content
user_preference (Optional[str]): User's preferred model if any
Returns:
Tuple[str, str]: (suggested_model, reason)
"""
# If user has a strong preference and it's available, respect it
if user_preference and user_preference in MODEL_OPTIONS:
return user_preference, f"Using your preferred model: {user_preference}"
# Analyze the task type
task_type = self.analyze_task_type(content)
# Get the recommended models for this task type
if task_type in TASK_PATTERNS:
recommended_models = TASK_PATTERNS[task_type]["models"]
# Find the first available model from recommendations
for model in recommended_models:
if model in MODEL_OPTIONS:
reason = f"Optimized for {task_type} tasks"
return model, reason
# Fallback to default model
default_model = "openai/gpt-4o-mini" # Fast and cost-effective default
return default_model, "Default model for general tasks"
def get_model_explanation(self, model: str) -> str:
"""
Get a user-friendly explanation of what the model is best for.
Args:
model (str): The model name
Returns:
str: Human-readable explanation
"""
explanations = {
"openai/o1-preview": "🧠 Best for complex reasoning, mathematics, and step-by-step problem solving",
"openai/o1": "🧠 Advanced reasoning model for complex analytical tasks",
"openai/o1-mini": "⚡ Fast reasoning model for structured problem solving",
"openai/gpt-4o": "🎯 Balanced model excellent for all tasks including coding, analysis, and creativity",
"openai/gpt-4o-mini": "⚡ Fast and efficient model for general conversations and quick tasks",
"openai/gpt-4.1": "💪 Enhanced model with improved capabilities",
"openai/gpt-4.1-mini": "🚀 Compact version with great performance",
"openai/gpt-4.1-nano": "⚡ Ultra-fast model for simple tasks",
"openai/o3-mini": "🔧 Specialized model for focused tasks",
"openai/o3": "🔬 Advanced model for specialized analysis",
"openai/o4-mini": "🚀 Next-generation compact model"
}
return explanations.get(model, f"AI model: {model}")
def suggest_model_with_alternatives(self, content: str, user_preference: Optional[str] = None) -> Dict[str, any]:
"""
Suggest a model with alternatives and explanations.
Args:
content (str): The user's input content
user_preference (Optional[str]): User's preferred model
Returns:
Dict containing suggestion details
"""
primary_model, reason = self.suggest_model(content, user_preference)
task_type = self.analyze_task_type(content)
# Get alternative models for this task
alternatives = []
if task_type in TASK_PATTERNS:
for model in TASK_PATTERNS[task_type]["models"]:
if model != primary_model and model in MODEL_OPTIONS:
alternatives.append({
"model": model,
"explanation": self.get_model_explanation(model)
})
return {
"suggested_model": primary_model,
"reason": reason,
"task_type": task_type,
"explanation": self.get_model_explanation(primary_model),
"alternatives": alternatives[:2] # Limit to 2 alternatives
}
# Global instance
model_selector = ModelSelector()

View File

@@ -0,0 +1,280 @@
"""
User Preferences System
Manages user-specific preferences and settings for enhanced personalization.
"""
import logging
from typing import Dict, Any, Optional, List
from datetime import datetime, timezone
class UserPreferences:
"""Manages user preferences and settings."""
def __init__(self, db_handler):
self.db = db_handler
self.logger = logging.getLogger(__name__)
# Default preferences
self.default_preferences = {
"preferred_model": None, # Let auto-selection work by default
"auto_model_selection": True, # Enable smart model selection
"response_style": "balanced", # balanced, concise, detailed
"language": "auto", # auto-detect or specific language
"timezone": "UTC",
"show_model_suggestions": True, # Show why a model was chosen
"enable_conversation_summary": True,
"max_response_length": "medium", # short, medium, long
"code_execution_allowed": True,
"image_generation_style": "default",
"notification_reminders": True,
"analytics_opt_in": True, # Allow usage analytics
"theme": "default", # For future UI customization
"created_at": None,
"updated_at": None
}
async def get_user_preferences(self, user_id: int) -> Dict[str, Any]:
"""
Get user preferences, creating defaults if none exist.
Args:
user_id (int): Discord user ID
Returns:
Dict[str, Any]: User preferences
"""
try:
cache_key = f"user_prefs_{user_id}"
async def fetch_preferences():
user_prefs = await self.db.db.user_preferences.find_one({'user_id': user_id})
if user_prefs:
# Merge with defaults to ensure all keys exist
prefs = self.default_preferences.copy()
prefs.update(user_prefs.get('preferences', {}))
return prefs
else:
# Create default preferences
new_prefs = self.default_preferences.copy()
new_prefs['created_at'] = datetime.now(timezone.utc)
new_prefs['updated_at'] = datetime.now(timezone.utc)
await self.db.db.user_preferences.update_one(
{'user_id': user_id},
{'$set': {'preferences': new_prefs}},
upsert=True
)
return new_prefs
return await self.db._get_cached_result(cache_key, fetch_preferences, 300) # 5 min cache
except Exception as e:
self.logger.error(f"Error getting user preferences for {user_id}: {str(e)}")
return self.default_preferences.copy()
async def update_user_preferences(self, user_id: int, preferences: Dict[str, Any]) -> bool:
"""
Update user preferences.
Args:
user_id (int): Discord user ID
preferences (Dict[str, Any]): Preferences to update
Returns:
bool: Success status
"""
try:
# Get current preferences
current_prefs = await self.get_user_preferences(user_id)
# Update with new preferences
current_prefs.update(preferences)
current_prefs['updated_at'] = datetime.now(timezone.utc)
# Validate preferences
validated_prefs = self._validate_preferences(current_prefs)
# Save to database
await self.db.db.user_preferences.update_one(
{'user_id': user_id},
{'$set': {'preferences': validated_prefs}},
upsert=True
)
# Clear cache
cache_key = f"user_prefs_{user_id}"
if cache_key in self.db.cache:
del self.db.cache[cache_key]
self.logger.info(f"Updated preferences for user {user_id}")
return True
except Exception as e:
self.logger.error(f"Error updating preferences for user {user_id}: {str(e)}")
return False
def _validate_preferences(self, preferences: Dict[str, Any]) -> Dict[str, Any]:
"""
Validate and sanitize user preferences.
Args:
preferences (Dict[str, Any]): Raw preferences
Returns:
Dict[str, Any]: Validated preferences
"""
validated = {}
# Validate each preference
for key, value in preferences.items():
if key == "preferred_model":
# Validate model exists in available models
from src.config.config import MODEL_OPTIONS
if value is None or value in MODEL_OPTIONS:
validated[key] = value
else:
validated[key] = None
elif key == "response_style":
if value in ["balanced", "concise", "detailed"]:
validated[key] = value
else:
validated[key] = "balanced"
elif key == "max_response_length":
if value in ["short", "medium", "long"]:
validated[key] = value
else:
validated[key] = "medium"
elif key == "image_generation_style":
if value in ["default", "artistic", "realistic", "cartoon"]:
validated[key] = value
else:
validated[key] = "default"
elif key in ["auto_model_selection", "show_model_suggestions", "enable_conversation_summary",
"code_execution_allowed", "notification_reminders", "analytics_opt_in"]:
# Handle string representations of booleans
if isinstance(value, str):
validated[key] = value.lower() in ['true', '1', 'yes', 'on']
else:
validated[key] = bool(value)
elif key in ["language", "timezone", "theme"]:
validated[key] = str(value) if value else self.default_preferences[key]
elif key in ["created_at", "updated_at"]:
validated[key] = value # Keep as-is for datetime objects
else:
# Unknown preference, keep default
if key in self.default_preferences:
validated[key] = self.default_preferences[key]
# Ensure all default keys exist
for key, default_value in self.default_preferences.items():
if key not in validated:
validated[key] = default_value
return validated
async def get_preference(self, user_id: int, preference_key: str) -> Any:
"""
Get a specific preference value.
Args:
user_id (int): Discord user ID
preference_key (str): Preference key to get
Returns:
Any: Preference value
"""
preferences = await self.get_user_preferences(user_id)
return preferences.get(preference_key, self.default_preferences.get(preference_key))
async def set_preference(self, user_id: int, preference_key: str, value: Any) -> bool:
"""
Set a specific preference value.
Args:
user_id (int): Discord user ID
preference_key (str): Preference key to set
value (Any): New preference value
Returns:
bool: Success status
"""
return await self.update_user_preferences(user_id, {preference_key: value})
async def reset_preferences(self, user_id: int) -> bool:
"""
Reset user preferences to defaults.
Args:
user_id (int): Discord user ID
Returns:
bool: Success status
"""
try:
default_prefs = self.default_preferences.copy()
default_prefs['created_at'] = datetime.now(timezone.utc)
default_prefs['updated_at'] = datetime.now(timezone.utc)
await self.db.db.user_preferences.update_one(
{'user_id': user_id},
{'$set': {'preferences': default_prefs}},
upsert=True
)
# Clear cache
cache_key = f"user_prefs_{user_id}"
if cache_key in self.db.cache:
del self.db.cache[cache_key]
self.logger.info(f"Reset preferences for user {user_id}")
return True
except Exception as e:
self.logger.error(f"Error resetting preferences for user {user_id}: {str(e)}")
return False
def format_preferences_display(self, preferences: Dict[str, Any]) -> str:
"""
Format preferences for display to user.
Args:
preferences (Dict[str, Any]): User preferences
Returns:
str: Formatted preference display
"""
display_lines = [
"**Your Current Preferences:**",
"",
f"🤖 **Model Settings:**",
f" • Preferred Model: `{preferences.get('preferred_model', 'Auto-select')}`",
f" • Auto Model Selection: `{'' if preferences.get('auto_model_selection') else ''}`",
f" • Show Model Suggestions: `{'' if preferences.get('show_model_suggestions') else ''}`",
"",
f"💬 **Response Settings:**",
f" • Response Style: `{preferences.get('response_style', 'balanced').title()}`",
f" • Max Response Length: `{preferences.get('max_response_length', 'medium').title()}`",
f" • Language: `{preferences.get('language', 'auto')}`",
"",
f"🔧 **Feature Settings:**",
f" • Code Execution: `{'' if preferences.get('code_execution_allowed') else ''}`",
f" • Conversation Summary: `{'' if preferences.get('enable_conversation_summary') else ''}`",
f" • Reminder Notifications: `{'' if preferences.get('notification_reminders') else ''}`",
"",
f"🎨 **Creative Settings:**",
f" • Image Generation Style: `{preferences.get('image_generation_style', 'default').title()}`",
"",
f"📊 **Privacy Settings:**",
f" • Usage Analytics: `{'' if preferences.get('analytics_opt_in') else ''}`",
"",
f"*Use `/preferences set` to modify these settings*"
]
return "\n".join(display_lines)

326
tests/test_enhancements.py Normal file
View File

@@ -0,0 +1,326 @@
"""
Tests for enhanced Discord bot features
"""
import unittest
import asyncio
from unittest.mock import MagicMock, AsyncMock, patch
import sys
import os
# Add parent directory to path for imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.utils.model_selector import ModelSelector, model_selector
from src.utils.user_preferences import UserPreferences
from src.utils.conversation_manager import ConversationSummarizer
from src.utils.enhanced_file_processor import EnhancedFileProcessor, enhanced_file_processor
class TestModelSelector(unittest.TestCase):
"""Test smart model selection functionality."""
def setUp(self):
self.selector = ModelSelector()
def test_analyze_task_type_coding(self):
"""Test detection of coding tasks."""
coding_prompts = [
"Write a Python function to calculate fibonacci",
"Help me debug this JavaScript code",
"Create a REST API in Node.js"
]
for prompt in coding_prompts:
task_type = self.selector.analyze_task_type(prompt)
self.assertEqual(task_type, "coding", f"Failed for prompt: {prompt}")
def test_analyze_task_type_reasoning(self):
"""Test detection of reasoning tasks."""
reasoning_prompts = [
"Solve this math problem step by step",
"Analyze the logic behind this algorithm",
"What is the relationship between these variables?"
]
for prompt in reasoning_prompts:
task_type = self.selector.analyze_task_type(prompt)
self.assertEqual(task_type, "reasoning", f"Failed for prompt: {prompt}")
def test_analyze_task_type_creative(self):
"""Test detection of creative tasks."""
creative_prompts = [
"Write a story about a dragon",
"Create a marketing slogan for our product",
"Generate a poem about nature"
]
for prompt in creative_prompts:
task_type = self.selector.analyze_task_type(prompt)
self.assertEqual(task_type, "creative", f"Failed for prompt: {prompt}")
def test_suggest_model_for_coding(self):
"""Test model suggestion for coding tasks."""
model, reason = self.selector.suggest_model("Write a Python function to sort a list")
self.assertIn("openai/gpt-4o", model)
self.assertIn("coding", reason.lower())
def test_suggest_model_for_reasoning(self):
"""Test model suggestion for reasoning tasks."""
model, reason = self.selector.suggest_model("Solve this complex mathematical proof step by step")
self.assertIn("o1", model) # Should suggest o1 family for reasoning
self.assertIn("reasoning", reason.lower())
def test_suggest_model_with_preference(self):
"""Test that user preference is respected."""
preferred_model = "openai/gpt-4o-mini"
model, reason = self.selector.suggest_model("Any task", preferred_model)
self.assertEqual(model, preferred_model)
self.assertIn("preferred", reason.lower())
def test_get_model_explanation(self):
"""Test model explanations."""
explanation = self.selector.get_model_explanation("openai/gpt-4o")
self.assertIsInstance(explanation, str)
self.assertTrue(len(explanation) > 10)
class TestUserPreferences(unittest.IsolatedAsyncioTestCase):
"""Test user preferences system."""
def setUp(self):
# Mock database handler
self.mock_db = MagicMock()
self.mock_db.db = MagicMock()
self.mock_db.db.user_preferences = MagicMock()
self.mock_db._get_cached_result = AsyncMock()
self.mock_db.cache = {}
self.prefs_manager = UserPreferences(self.mock_db)
async def test_get_default_preferences(self):
"""Test getting default preferences for new user."""
# Mock no existing preferences
self.mock_db._get_cached_result.return_value = self.prefs_manager.default_preferences.copy()
prefs = await self.prefs_manager.get_user_preferences(12345)
# Should return default preferences
self.assertEqual(prefs['auto_model_selection'], True)
self.assertEqual(prefs['response_style'], 'balanced')
self.assertIsNone(prefs['preferred_model'])
async def test_update_preferences(self):
"""Test updating user preferences."""
# Mock existing preferences
self.mock_db._get_cached_result.return_value = self.prefs_manager.default_preferences.copy()
self.mock_db.db.user_preferences.update_one = AsyncMock()
# Update a preference
success = await self.prefs_manager.update_user_preferences(12345, {
'response_style': 'detailed',
'preferred_model': 'openai/gpt-4o'
})
self.assertTrue(success)
self.mock_db.db.user_preferences.update_one.assert_called_once()
async def test_validate_preferences(self):
"""Test preference validation."""
invalid_prefs = {
'response_style': 'invalid_style',
'preferred_model': 'invalid_model',
'auto_model_selection': 'false' # String instead of boolean
}
validated = self.prefs_manager._validate_preferences(invalid_prefs)
# Should fall back to defaults for invalid values
self.assertEqual(validated['response_style'], 'balanced')
self.assertIsNone(validated['preferred_model'])
self.assertFalse(validated['auto_model_selection']) # String 'false' should become boolean False
def test_format_preferences_display(self):
"""Test preference display formatting."""
prefs = self.prefs_manager.default_preferences.copy()
prefs['preferred_model'] = 'openai/gpt-4o'
display = self.prefs_manager.format_preferences_display(prefs)
self.assertIsInstance(display, str)
self.assertIn('openai/gpt-4o', display)
self.assertIn('Preferences', display)
class TestConversationSummarizer(unittest.IsolatedAsyncioTestCase):
"""Test conversation summarization functionality."""
def setUp(self):
# Mock OpenAI client
self.mock_client = MagicMock()
self.mock_client.chat = MagicMock()
self.mock_client.chat.completions = MagicMock()
self.mock_client.chat.completions.create = AsyncMock()
# Mock database handler
self.mock_db = MagicMock()
self.mock_db.get_user_model = AsyncMock(return_value="openai/gpt-4o-mini")
# Mock tiktoken to avoid network calls
with patch('tiktoken.get_encoding') as mock_encoding:
mock_encoder = MagicMock()
mock_encoder.encode = MagicMock(return_value=[1, 2, 3, 4]) # Mock 4 tokens
mock_encoding.return_value = mock_encoder
self.summarizer = ConversationSummarizer(self.mock_client, self.mock_db)
def test_count_tokens(self):
"""Test token counting."""
text = "Hello, world! This is a test message."
tokens = self.summarizer.count_tokens(text)
self.assertGreater(tokens, 0)
self.assertIsInstance(tokens, int)
def test_should_summarize_short_conversation(self):
"""Test that short conversations are not summarized."""
short_messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there!"}
]
result = asyncio.run(self.summarizer.should_summarize(short_messages))
self.assertFalse(result)
def test_should_summarize_long_conversation(self):
"""Test that long conversations trigger summarization."""
# Create a long conversation
long_messages = []
long_text = "This is a very long message. " * 100 # Make it long
for i in range(10):
long_messages.append({"role": "user", "content": long_text})
long_messages.append({"role": "assistant", "content": long_text})
result = asyncio.run(self.summarizer.should_summarize(long_messages))
self.assertTrue(result)
async def test_create_summary(self):
"""Test summary creation."""
# Mock OpenAI response
mock_response = MagicMock()
mock_response.choices = [MagicMock()]
mock_response.choices[0].message.content = "This is a test summary."
self.mock_client.chat.completions.create.return_value = mock_response
messages = [
{"role": "user", "content": "Tell me about AI"},
{"role": "assistant", "content": "AI is artificial intelligence..."},
{"role": "user", "content": "What about machine learning?"},
{"role": "assistant", "content": "Machine learning is a subset of AI..."}
]
summary = await self.summarizer.create_summary(messages, 12345)
self.assertIsInstance(summary, str)
self.assertEqual(summary, "This is a test summary.")
self.mock_client.chat.completions.create.assert_called_once()
def test_trim_messages(self):
"""Test message trimming fallback."""
messages = [{"role": "user", "content": f"Message {i}"} for i in range(30)]
trimmed = self.summarizer._trim_messages(messages, max_messages=10)
self.assertEqual(len(trimmed), 10)
self.assertEqual(trimmed[-1]["content"], "Message 29") # Should keep most recent
class TestEnhancedFileProcessor(unittest.IsolatedAsyncioTestCase):
"""Test enhanced file processing functionality."""
def setUp(self):
self.processor = EnhancedFileProcessor()
def test_get_supported_extensions(self):
"""Test getting supported file extensions."""
extensions = self.processor.get_supported_extensions()
self.assertIsInstance(extensions, list)
self.assertIn('.txt', extensions)
self.assertIn('.py', extensions)
self.assertIn('.json', extensions)
def test_is_supported(self):
"""Test file support detection."""
self.assertTrue(self.processor.is_supported('test.txt'))
self.assertTrue(self.processor.is_supported('script.py'))
self.assertTrue(self.processor.is_supported('data.json'))
self.assertFalse(self.processor.is_supported('image.png')) # Not in text processors
async def test_process_text_file(self):
"""Test text file processing."""
# Create a temporary text file
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
f.write("Hello, world!\nThis is a test file.")
temp_path = f.name
try:
result = await self.processor.process_file(temp_path, "test.txt")
self.assertTrue(result['success'])
self.assertIn("Hello, world!", result['content'])
self.assertEqual(result['metadata']['type'], 'text')
self.assertEqual(result['metadata']['lines'], 2)
finally:
os.unlink(temp_path)
async def test_process_json_file(self):
"""Test JSON file processing."""
import tempfile
import json
test_data = {"name": "test", "value": 123, "items": [1, 2, 3]}
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
json.dump(test_data, f)
temp_path = f.name
try:
result = await self.processor.process_file(temp_path, "test.json")
self.assertTrue(result['success'])
self.assertIn('"name": "test"', result['content'])
self.assertEqual(result['metadata']['type'], 'json')
self.assertTrue(result['metadata']['is_valid'])
finally:
os.unlink(temp_path)
async def test_process_unsupported_file(self):
"""Test handling of unsupported file types."""
import tempfile
with tempfile.NamedTemporaryFile(suffix='.unknown', delete=False) as f:
f.write(b"some binary data")
temp_path = f.name
try:
result = await self.processor.process_file(temp_path, "test.unknown")
self.assertFalse(result['success'])
self.assertIn("Unsupported file type", result['error'])
finally:
os.unlink(temp_path)
def test_format_file_size(self):
"""Test file size formatting."""
self.assertEqual(self.processor._format_file_size(1024), "1.0 KB")
self.assertEqual(self.processor._format_file_size(1048576), "1.0 MB")
self.assertEqual(self.processor._format_file_size(500), "500.0 B")
if __name__ == "__main__":
# Run tests
unittest.main()