This commit is contained in:
2025-02-26 17:09:06 +07:00
parent 9c260fde71
commit 295bee8727

526
bot.py
View File

@@ -133,7 +133,7 @@ PDF_ALLOWED_MODELS = ["gpt-4o", "gpt-4o-mini"]
# Prompt for different plugins
WEB_SCRAPING_PROMPT = "You are using the Web Scraping Plugin, gathering information from given url. Respond accurately and combine data to provide a clear, insightful summary. "
NORMAL_CHAT_PROMPT = "You're ChatGPT for Discord! You can chat, generate images, and perform searches. Craft responses that are easy to copy directly into Discord chats, without using markdown, code blocks, or extra formatting. When you solving any problems you must remember that: Let's solve this step-by-step. What information do we need to find? What operation might help us solve this? Explain your reasoning and provide the answer."
NORMAL_CHAT_PROMPT = "You're ChatGPT for Discord! You can chat, generate images, and perform searches. Craft responses that are easy to copy directly into Discord chats, without using markdown, code blocks, or extra formatting. When you solving any problems you must remember that: Let's solve this step-by-step. What information do we need to find? What operation might help us solve this? Explain your reasoning and provide the answer. For code_interpreter you must send user the code you used to run in anycase."
SEARCH_PROMPT = "You are using the Google Search Plugin, accessing information from the top 3 Google results link which is the scraped content from these 3 website. Summarize these findings clearly, adding relevant insights to answer the users question."
PDF_ANALYSIS_PROMPT = """You are a PDF Analysis Assistant. Your task is to analyze PDF content thoroughly and effectively. Follow these guidelines:
@@ -512,6 +512,10 @@ def get_tools_for_model():
"type": "string",
"description": "The programming language to use (python or cpp)",
"enum": ["python", "cpp"]
},
"input": {
"type": "string",
"description": "Optional input data for the program (for cin>>, input() functions). All inputs should be on a single line, separated by spaces",
}
},
"required": ["code", "language"]
@@ -525,7 +529,7 @@ def get_tools_for_model():
tool_functions = {
"google_search": lambda args: google_custom_search(args["query"], args.get("num_results", 3)),
"scrape_webpage": lambda args: scrape_web_content(args["url"]),
"code_interpreter": lambda args: execute_code(args["code"], args["language"])
"code_interpreter": lambda args: asyncio.run(execute_code(args["code"], args["language"], input_data=args.get("input", "")))
}
# Process tool calls from the model
@@ -539,44 +543,134 @@ async def process_tool_calls(model_response, messages_history):
for tool_call in model_response.choices[0].message.tool_calls:
if tool_call.type == "function":
function_name = tool_call.function.name
try:
# Parse arguments and call the function
function_args = json.loads(tool_call.function.arguments)
function_to_call = tool_functions.get(function_name)
if function_to_call:
# Execute the function
if function_name == "code_interpreter":
# Properly await the async function
function_result = await execute_code(function_args["code"], function_args["language"])
else:
# For non-async functions
function_result = function_to_call(function_args)
# Safely parse function arguments with proper error handling
try:
function_args = json.loads(tool_call.function.arguments)
except json.JSONDecodeError as e:
error_message = f"Error parsing function arguments: {str(e)}. Raw arguments: {tool_call.function.arguments}"
logging.error(error_message)
# Format result to string if it's a complex object
if isinstance(function_result, (list, dict)):
result_content = json.dumps(function_result)
# Add a dummy object with reasonable defaults for code_interpreter
if function_name == "code_interpreter":
function_args = {
"code": tool_call.function.arguments,
"language": "python" # Default to python if we can't parse
}
else:
# Ensure we have a string and not None
result_content = str(function_result) if function_result is not None else "No content available"
# For other functions, report the error
messages_history.append({
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": error_message
})
continue
# Execute the function
if function_name in tool_functions:
try:
# Special handling for async code_interpreter function
if function_name == "code_interpreter":
try:
# Extract code information - always ensure we have this
code = function_args.get("code", "# No code provided")
language = function_args.get("language", "python")
input_data = function_args.get("input", "")
# Always show the exact code that will be executed
display_message = f"Executing {language} code:\n```{language}\n{code}\n```"
if input_data:
display_message += f"\nWith input:\n```\n{input_data}\n```"
# Execute the code
execution_result = await execute_code(
code,
language,
input_data=input_data
)
# Combine both code display and execution results in one message
combined_message = f"{display_message}\n\n{execution_result}"
# Add as a single tool message
messages_history.append({
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": combined_message
})
# Skip rest of this iteration since we've already added the message
continue
except Exception as e:
# For code_interpreter errors, ensure we still show the code
error_details = str(e)
logging.error(f"Code interpreter error: {error_details}")
# Extract whatever code we can
code = function_args.get("code", "# Code extraction failed")
language = function_args.get("language", "python")
# Show both the code and the error
error_message = f"Code that caused error:\n```{language}\n{code}\n```\n\nError: {error_details}"
messages_history.append({
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": error_message
})
continue
else:
function_response = tool_functions[function_name](function_args)
# Add the function result to messages
# Make sure function_response is never empty
if function_response is None or function_response == "":
function_response = f"Function {function_name} completed successfully with no output. Return code: 0"
# Add the function response to messages
messages_history.append({
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": str(function_response) # Ensure content is a string
})
except Exception as e:
# Log the error and add an error message to the history
error_message = f"Error executing {function_name}: {str(e)}"
logging.error(error_message)
# For code_interpreter, always show the code even on error
if function_name == "code_interpreter":
code = function_args.get("code", "# Code extraction failed")
language = function_args.get("language", "python")
error_message = f"Code that caused error:\n```{language}\n{code}\n```\n\nError: {error_message}"
messages_history.append({
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": error_message
})
else:
# Function not found, add error message
error_message = f"Function {function_name} not found"
logging.error(error_message)
messages_history.append({
"tool_call_id": tool_call.id,
"role": "tool",
"role": "tool",
"name": function_name,
"content": result_content
"content": error_message
})
except Exception as e:
# Handle function call errors
error_message = f"Error calling function {function_name}: {str(e)}"
# Catch-all handler to ensure robustness
error_message = f"Unexpected error processing tool call: {str(e)}"
logging.error(error_message)
messages_history.append({
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"name": function_name if 'function_name' in locals() else "unknown_function",
"content": error_message
})
@@ -680,18 +774,24 @@ async def search(interaction: discord.Interaction, query: str):
formatted_results += f"Content Preview: {result['content'][:500]}...\n"
formatted_results += "\n"
# Prepare messages for the AI model
messages = [
{"role": "system", "content": SEARCH_PROMPT},
{"role": "user", "content": f"User query: {query}\n\n{formatted_results}"}
]
# Prepare messages for the AI model, handling system prompts appropriately
messages = []
if model in ["o1-mini", "o1-preview"]:
# These models don't support system prompts
messages = [
{"role": "user", "content": f"Instructions: {SEARCH_PROMPT}\n\nUser query: {query}\n\n{formatted_results}"}
]
else:
messages = [
{"role": "system", "content": SEARCH_PROMPT},
{"role": "user", "content": f"User query: {query}\n\n{formatted_results}"}
]
# Send to the AI model
response = client.chat.completions.create(
model=model if model in ["gpt-4o", "gpt-4o-mini"] else "gpt-4o",
messages=messages,
temperature=0.5,
max_tokens=4096,
temperature=0.5
)
reply = response.choices[0].message.content
@@ -942,33 +1042,53 @@ async def process_user_message(message: discord.Message):
current_message = {"role": "user", "content": content}
try:
# Prepare messages for API while ensuring token limit
# Process messages based on the model's capabilities
messages_for_api = []
if model in ["gpt-4o", "gpt-4o-mini", "o1", "o1-mini", "o3-mini"]:
# For these models, we keep some history but ensure token limits
# For models that don't support system prompts
if model in ["o1-mini", "o1-preview"]:
# Convert system messages to user instructions
system_content = None
history_without_system = []
# Extract system message content
for msg in history:
if msg["role"] == "system":
system_content = msg.get("content", "")
else:
history_without_system.append(msg)
# Add the system content as a special user message at the beginning
if system_content:
history_without_system.insert(0, {
"role": "user",
"content": f"Instructions for you to follow in this conversation: {system_content}"
})
# Add current message and prepare for API
history_without_system.append(current_message)
messages_for_api = prepare_messages_for_api(history_without_system)
else:
# For models that support system prompts
history.append(current_message)
messages_for_api = prepare_messages_for_api(history)
else:
# For other models, we just use the current message
messages_for_api = prepare_messages_for_api([current_message])
# Determine which models should have tools available
use_tools = model in ["gpt-4o", "gpt-4o-mini", "o1", "o1-mini", "o3-mini"]
# o1-mini and o1-preview do not support tools
use_tools = model in ["gpt-4o", "gpt-4o-mini", "o1", "o3-mini"]
# Prepare API call parameters
api_params = {
"model": model,
"messages": messages_for_api,
"temperature": 0.3 if model in ["gpt-4o", "gpt-4o-mini"] else 1,
"max_tokens": 8096 if model in ["gpt-4o", "gpt-4o-mini"] else 4096,
"top_p": 0.7 if model in ["gpt-4o", "gpt-4o-mini"] else 1
}
# Add tools if using a supported model
if use_tools:
api_params["tools"] = get_tools_for_model()
# Add a typing indicator to show that the bot is processing
async with message.channel.typing():
# Make the initial API call
@@ -998,20 +1118,26 @@ async def process_user_message(message: discord.Message):
reply = response.choices[0].message.content
# Only update history for successful calls for supported models
# Store the response in history for models that support it
if model in ["gpt-4o", "gpt-4o-mini", "o1", "o1-mini", "o3-mini"]:
# Add the final assistant response to history
history.append({"role": "assistant", "content": reply})
await save_history(user_id, history)
if model in ["o1-mini", "o1-preview"]:
# For models without system prompt support, we keep the modified history
if system_content:
# Don't add the system instruction again to history to avoid duplication
modified_history = [msg for msg in history_without_system if not (msg["role"] == "user" and msg["content"].startswith("Instructions for you to follow"))]
modified_history.append({"role": "assistant", "content": reply})
await save_history(user_id, modified_history)
else:
# For models with system prompt support, just append to regular history
history.append({"role": "assistant", "content": reply})
await save_history(user_id, history)
await send_response(message.channel, reply)
except RateLimitError:
await message.channel.send(
"Error: Rate limit exceeded for your model. "
"Please try again later or use /choose_model to change to any models else."
)
except Exception as e:
error_message = f"Error: {str(e)}"
logging.error(f"Error in message processing: {error_message}")
@@ -1163,18 +1289,25 @@ async def process_pdf_batch(model: str, user_prompt: str, batch_content: str, cu
for attempt in range(max_retries):
try:
# Create message without history but with system prompt
# Create message without history but with appropriate prompt handling
trimmed_content = trim_content_to_token_limit(batch_content, 7000) # Leave room for prompt
messages = [
{"role": "system", "content": PDF_ANALYSIS_PROMPT},
{"role": "user", "content": f"{user_prompt}\n\nAnalyze the following content:\n{trimmed_content}"}
]
messages = []
if model in ["o1-mini", "o1-preview"]:
# These models don't support system prompts
messages = [
{"role": "user", "content": f"Instructions: {PDF_ANALYSIS_PROMPT}\n\n{user_prompt}\n\nAnalyze the following content:\n{trimmed_content}"}
]
else:
messages = [
{"role": "system", "content": PDF_ANALYSIS_PROMPT},
{"role": "user", "content": f"{user_prompt}\n\nAnalyze the following content:\n{trimmed_content}"}
]
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=1,
max_tokens=8096
temperature=1
)
reply = response.choices[0].message.content
@@ -1304,8 +1437,12 @@ def trim_history(history):
# Function to send response to the discord channel
async def send_response(channel: discord.TextChannel, reply: str):
"""Sends the reply to the channel, handling long responses."""
# Safety check - ensure reply is not empty
if not reply or not reply.strip():
reply = "I'm sorry, I couldn't generate a proper response. Please try again."
if len(reply) > 2000:
with open("response.txt", "w") as file:
with open("response.txt", "w", encoding="utf-8") as file:
file.write(reply)
await channel.send(
"The response was too long, so it has been saved to a file.",
@@ -1668,7 +1805,7 @@ int main() {
return True, code
async def execute_code(code, language, timeout=10):
async def execute_code(code, language, timeout=10, input_data=""):
"""
Execute code in a sandboxed environment with strict timeout.
@@ -1676,6 +1813,7 @@ async def execute_code(code, language, timeout=10):
code (str): The code to execute.
language (str): 'python' or 'cpp'.
timeout (int): Maximum execution time in seconds.
input_data (str): Optional input data for the program (for input() or cin>>).
Returns:
str: The output of the code execution.
@@ -1687,128 +1825,208 @@ async def execute_code(code, language, timeout=10):
import subprocess
import tempfile
import sys
import logging
# Create temp directory for running code
with tempfile.TemporaryDirectory() as temp_dir:
if language == 'python':
# Execute Python code
file_path = os.path.join(temp_dir, 'code.py')
with open(file_path, 'w') as f:
f.write(code)
try:
# Set process environment to restrict access to the system
env = {
'PYTHONPATH': '', # Prevent access to installed Python modules
'PATH': '', # Restrict access to system commands
'TEMP': temp_dir, # Set temp directory to our controlled directory
'TMP': temp_dir,
}
# Run the code in a subprocess with timeout
proc = await asyncio.create_subprocess_exec(
sys.executable, file_path,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=temp_dir,
env=env,
# Use preexec_fn only on Unix systems
preexec_fn=os.setpgrp if os.name != 'nt' else None
)
# Validate that we have actual code to execute
if not code or not code.strip():
return "Error: No code provided to execute. Return code: 1"
# Basic validation of language
if language not in ["python", "cpp"]:
return f"Error: Unsupported language '{language}'. Please use 'python' or 'cpp'. Return code: 1"
# Validate and prepare input data
if input_data and not isinstance(input_data, str):
try:
input_data = str(input_data)
except Exception as e:
return f"Error: Invalid input data - {str(e)}. Return code: 1"
# Ensure input data ends with newline
if input_data and not input_data.endswith('\n'):
input_data += '\n'
try:
# Create temp directory for running code
with tempfile.TemporaryDirectory() as temp_dir:
if language == 'python':
# Execute Python code
file_path = os.path.join(temp_dir, 'code.py')
with open(file_path, 'w', encoding='utf-8') as f:
f.write(code)
try:
# Additional safety - use a shorter timeout than specified in the code
# to ensure our code terminates first
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
# Set process environment to restrict access to the system
env = {
'PYTHONPATH': '', # Prevent access to installed Python modules
'PATH': '', # Restrict access to system commands
'TEMP': temp_dir, # Set temp directory to our controlled directory
'TMP': temp_dir,
}
# Check for errors
if stderr:
return f"Error:\n```\n{stderr.decode('utf-8')}```"
# Return output
return f"Output:\n```\n{stdout.decode('utf-8')}```"
except asyncio.TimeoutError:
try:
# Kill process differently depending on the OS
if os.name != 'nt': # Unix-like systems
try:
os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
except:
proc.kill()
else: # Windows
proc.kill()
except:
pass
return "Code execution timed out after 10 seconds. Please optimize your code or reduce complexity."
except Exception as e:
return f"An error occurred: {str(e)}"
elif language == 'cpp':
# Execute C++ code
src_path = os.path.join(temp_dir, 'code.cpp')
exe_path = os.path.join(temp_dir, 'code')
if os.name == 'nt': # Windows
exe_path += '.exe'
with open(src_path, 'w') as f:
f.write(code)
try:
# Compile C++ code with restricted options
compile_proc = await asyncio.create_subprocess_exec(
'g++', src_path, '-o', exe_path, '-std=c++17',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
compile_stdout, compile_stderr = await compile_proc.communicate()
if compile_proc.returncode != 0:
return f"Compilation error:\n```\n{compile_stderr.decode('utf-8')}```"
# Execute the compiled program
try:
# Execute in restricted environment
run_proc = await asyncio.create_subprocess_exec(
exe_path,
# Run the code in a subprocess with timeout
proc = await asyncio.create_subprocess_exec(
sys.executable, file_path,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE if input_data else None,
cwd=temp_dir,
env=env,
# Use preexec_fn only on Unix systems
preexec_fn=os.setpgrp if os.name != 'nt' else None
)
try:
# Enforce strict timeout
stdout, stderr = await asyncio.wait_for(run_proc.communicate(), timeout=timeout)
# Additional safety - use a shorter timeout than specified in the code
# to ensure our code terminates first
if input_data:
try:
# Send input data to the process
stdout, stderr = await asyncio.wait_for(
proc.communicate(input_data.encode('utf-8')),
timeout=timeout
)
except Exception as e:
return f"Error processing input data: {str(e)}. Return code: 1"
else:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
# Check for errors
if stderr:
return f"Runtime error:\n```\n{stderr.decode('utf-8')}```"
return f"Output:\n```\n{stdout.decode('utf-8')}```"
stderr_content = stderr.decode('utf-8', errors='replace').strip()
if stderr_content:
return f"Error:\n```\n{stderr_content}```"
# Return output or default message if output is empty
output = stdout.decode('utf-8', errors='replace').strip()
if output:
return f"Output:\n```\n{output}```"
else:
return "Output:\n```\nCode executed successfully with no output. Return code: 0\n```"
except asyncio.TimeoutError:
try:
# Kill process differently depending on the OS
if os.name != 'nt': # Unix-like systems
try:
os.killpg(os.getpgid(run_proc.pid), signal.SIGKILL)
os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
except:
run_proc.kill()
proc.kill()
else: # Windows
run_proc.kill()
proc.kill()
except:
pass
return "Code execution timed out after 10 seconds. Please optimize your code or reduce complexity."
except Exception as e:
return f"An error occurred during execution: {str(e)}"
return f"An error occurred during Python execution: {str(e)}"
except Exception as e:
return f"An error occurred: {str(e)}"
elif language == 'cpp':
# Execute C++ code
src_path = os.path.join(temp_dir, 'code.cpp')
exe_path = os.path.join(temp_dir, 'code')
if os.name == 'nt': # Windows
exe_path += '.exe'
return "Unsupported language. Please use 'python' or 'cpp'."
with open(src_path, 'w', encoding='utf-8') as f:
f.write(code)
try:
# Check if g++ is available
try:
check_proc = await asyncio.create_subprocess_exec(
'g++', '--version',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
await check_proc.communicate()
if check_proc.returncode != 0:
return "Error: C++ compiler (g++) not available. Return code: 1"
except Exception:
return "Error: C++ compiler (g++) not available. Return code: 1"
# Compile C++ code with restricted options
compile_proc = await asyncio.create_subprocess_exec(
'g++', src_path, '-o', exe_path, '-std=c++17',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
compile_stdout, compile_stderr = await compile_proc.communicate()
if compile_proc.returncode != 0:
compile_error = compile_stderr.decode('utf-8', errors='replace').strip()
if compile_error:
return f"Compilation error:\n```\n{compile_error}```"
else:
return "Compilation error: Unknown compilation failure. Return code: 1"
# Execute the compiled program
try:
# Execute in restricted environment
run_proc = await asyncio.create_subprocess_exec(
exe_path,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE if input_data else None,
cwd=temp_dir,
# Use preexec_fn only on Unix systems
preexec_fn=os.setpgrp if os.name != 'nt' else None
)
try:
# Enforce strict timeout
if input_data:
try:
# Send input data to the process
stdout, stderr = await asyncio.wait_for(
run_proc.communicate(input_data.encode('utf-8')),
timeout=timeout
)
except Exception as e:
return f"Error processing input data for C++ program: {str(e)}. Return code: 1"
else:
stdout, stderr = await asyncio.wait_for(run_proc.communicate(), timeout=timeout)
if stderr:
stderr_content = stderr.decode('utf-8', errors='replace').strip()
if stderr_content:
return f"Runtime error:\n```\n{stderr_content}```"
# Return output or default message if output is empty
output = stdout.decode('utf-8', errors='replace').strip()
if output:
return f"Output:\n```\n{output}```"
else:
return "Output:\n```\nCode executed successfully with no output. Return code: 0\n```"
except asyncio.TimeoutError:
try:
# Kill process differently depending on the OS
if os.name != 'nt': # Unix-like systems
try:
os.killpg(os.getpgid(run_proc.pid), signal.SIGKILL)
except:
run_proc.kill()
else: # Windows
run_proc.kill()
except:
pass
return "Code execution timed out after 10 seconds. Please optimize your code or reduce complexity."
except Exception as e:
return f"An error occurred during C++ execution: {str(e)}"
except Exception as e:
return f"An error occurred: {str(e)}"
# Default case for unsupported languages
return "Unsupported language. Please use 'python' or 'cpp'."
except Exception as e:
# Catch-all exception handler to ensure we always return something
error_msg = f"An unexpected error occurred: {str(e)}. Return code: 1"
logging.error(f"Error in execute_code: {error_msg}")
return error_msg
def extract_code_blocks(content):
"""