Remove auto deploy to server.
This commit is contained in:
239
bot.py
239
bot.py
@@ -102,9 +102,12 @@ MODEL_OPTIONS = [
|
||||
PDF_ALLOWED_MODELS = ["gpt-4o", "gpt-4o-mini"]
|
||||
|
||||
# Prompt for different plugins
|
||||
WEB_SCRAPING_PROMPT = "You are using the Web Scraping Plugin, gathering information from given url. Respond accurately and combine data to provide a clear, insightful summary. "
|
||||
NORMAL_CHAT_PROMPT = "You're ChatGPT for Discord! You can chat, generate images, and perform searches. Craft responses that are easy to copy directly into Discord chats, without using markdown, code blocks, or extra formatting. When you solving any problems you must remember that: Let's solve this step-by-step. What information do we need to find? What operation might help us solve this? Explain your reasoning and provide the answer. For code_interpreter you must send user the code you used to run in anycase."
|
||||
SEARCH_PROMPT = "You are using the Google Search Plugin, accessing information from the top 3 Google results link which is the scraped content from these 3 website. Summarize these findings clearly, adding relevant insights to answer the users question."
|
||||
WEB_SCRAPING_PROMPT = "You are a Web Scraping Assistant. You analyze content from webpages to extract key information. Integrate insights from the scraped content to give comprehensive, fact-based responses. When analyzing web content: 1) Focus on the most relevant information, 2) Cite specific sections when appropriate, 3) Maintain a neutral tone, and 4) Organize information logically. Present your response in a clear, conversational manner suitable for Discord."
|
||||
|
||||
NORMAL_CHAT_PROMPT = "You're ChatGPT for Discord! You have access to powerful tools that can enhance your responses. When appropriate, use: 1) Google Search to find current information, 2) Web Scraping to analyze webpages, 3) Code Interpreter to run and explain code, and 4) Image Generation to create images from text descriptions. When solving problems, follow a step-by-step approach: identify what information is needed, determine which tools might help, and explain your reasoning clearly. For code tasks, always share both the code you're running and its output. Craft responses that are easy to read in Discord without excessive formatting."
|
||||
|
||||
SEARCH_PROMPT = "You are a Research Assistant with access to Google Search results. Your task is to synthesize information from search results to provide accurate, comprehensive answers. When analyzing search results: 1) Prioritize information from credible sources, 2) Compare and contrast different perspectives when available, 3) Acknowledge when information is limited or unclear, and 4) Cite specific sources when presenting facts. Structure your response in a clear, logical manner, focusing on directly answering the user's question while providing relevant context."
|
||||
|
||||
PDF_ANALYSIS_PROMPT = """You are a PDF Analysis Assistant. Your task is to analyze PDF content thoroughly and effectively. Follow these guidelines:
|
||||
|
||||
1. Structure your response clearly and logically
|
||||
@@ -492,6 +495,28 @@ def get_tools_for_model():
|
||||
"required": ["code", "language"]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "generate_image",
|
||||
"description": "Generate images from a text prompt using AI.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"prompt": {
|
||||
"type": "string",
|
||||
"description": "Detailed description of the image you want to generate"
|
||||
},
|
||||
"num_images": {
|
||||
"type": "integer",
|
||||
"description": "Number of images to generate (default: 1, max: 4)",
|
||||
"default": 1
|
||||
}
|
||||
},
|
||||
"required": ["prompt"]
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
return tools
|
||||
@@ -500,15 +525,32 @@ def get_tools_for_model():
|
||||
tool_functions = {
|
||||
"google_search": lambda args: google_custom_search(args["query"], args.get("num_results", 3)),
|
||||
"scrape_webpage": lambda args: scrape_web_content(args["url"]),
|
||||
"code_interpreter": lambda args: asyncio.run(execute_code(args["code"], args["language"], input_data=args.get("input", "")))
|
||||
"code_interpreter": lambda args: execute_code(args["code"], args["language"], input_data=args.get("input", "")),
|
||||
"generate_image": lambda args: generate_ai_image(args["prompt"], args.get("num_images", 1))
|
||||
}
|
||||
|
||||
# Process tool calls from the model
|
||||
async def process_tool_calls(model_response, messages_history):
|
||||
"""Process tool calls returned by the model and add results to message history."""
|
||||
if model_response.choices[0].finish_reason == "tool_calls" and hasattr(model_response.choices[0].message, 'tool_calls'):
|
||||
# Add the model's response to messages
|
||||
messages_history.append(model_response.choices[0].message)
|
||||
# Add the model's response to messages history
|
||||
# Convert ChatCompletionMessage to dictionary to avoid 'get' attribute error
|
||||
model_message = {
|
||||
"role": model_response.choices[0].message.role,
|
||||
"content": model_response.choices[0].message.content,
|
||||
"tool_calls": [
|
||||
{
|
||||
"id": tool_call.id,
|
||||
"type": tool_call.type,
|
||||
"function": {
|
||||
"name": tool_call.function.name,
|
||||
"arguments": tool_call.function.arguments
|
||||
}
|
||||
}
|
||||
for tool_call in model_response.choices[0].message.tool_calls
|
||||
]
|
||||
}
|
||||
messages_history.append(model_message)
|
||||
|
||||
# Process each tool call
|
||||
for tool_call in model_response.choices[0].message.tool_calls:
|
||||
@@ -593,7 +635,66 @@ async def process_tool_calls(model_response, messages_history):
|
||||
"content": error_message
|
||||
})
|
||||
continue
|
||||
# Special handling for generate_image function
|
||||
elif function_name == "generate_image":
|
||||
try:
|
||||
# Extract parameters
|
||||
prompt = function_args.get("prompt", "")
|
||||
num_images = function_args.get("num_images", 4) # Default to 4 images
|
||||
|
||||
# Create an image generation request
|
||||
request_image = IImageInference(
|
||||
positivePrompt=prompt,
|
||||
model="runware:100@1",
|
||||
numberResults=num_images,
|
||||
height=512,
|
||||
width=512
|
||||
)
|
||||
|
||||
# Call the API to get the results
|
||||
images = await runware.imageInference(requestImage=request_image)
|
||||
|
||||
# Check the API's return value
|
||||
if images is None:
|
||||
raise ValueError("API returned None for images")
|
||||
|
||||
# Add a message to history with image information but without URLs
|
||||
# This helps the model know images were generated
|
||||
result_message = f"I've generated {len(images)} image(s) based on your prompt. The images will be displayed separately."
|
||||
|
||||
messages_history.append({
|
||||
"tool_call_id": tool_call.id,
|
||||
"role": "tool",
|
||||
"name": function_name,
|
||||
"content": result_message
|
||||
})
|
||||
|
||||
# Store the image URLs in a special field that will be processed when sending the response
|
||||
image_urls = [image.imageURL for image in images]
|
||||
messages_history.append({
|
||||
"role": "system",
|
||||
"content": "Image Generation Results",
|
||||
"image_urls": image_urls
|
||||
})
|
||||
|
||||
# Skip rest of this iteration since we've already added the message
|
||||
continue
|
||||
|
||||
except Exception as e:
|
||||
# For image generation errors, provide details
|
||||
error_details = str(e)
|
||||
logging.error(f"Image generation error: {error_details}")
|
||||
|
||||
error_message = f"Error generating images from prompt: \"{function_args.get('prompt', '')}\"\n\nError: {error_details}"
|
||||
messages_history.append({
|
||||
"tool_call_id": tool_call.id,
|
||||
"role": "tool",
|
||||
"name": function_name,
|
||||
"content": error_message
|
||||
})
|
||||
continue
|
||||
else:
|
||||
# For non-async functions
|
||||
function_response = tool_functions[function_name](function_args)
|
||||
|
||||
# Make sure function_response is never empty
|
||||
@@ -1060,6 +1161,9 @@ async def process_user_message(message: discord.Message):
|
||||
if use_tools:
|
||||
api_params["tools"] = get_tools_for_model()
|
||||
|
||||
# Flag to track if image generation was used
|
||||
image_generation_used = False
|
||||
|
||||
# Add a typing indicator to show that the bot is processing
|
||||
async with message.channel.typing():
|
||||
# Make the initial API call
|
||||
@@ -1069,9 +1173,34 @@ async def process_user_message(message: discord.Message):
|
||||
)
|
||||
|
||||
# Check if there are any tool calls to process
|
||||
if use_tools:
|
||||
# Send a message indicating that the bot is searching for information if tool calls are present
|
||||
if response.choices[0].finish_reason == "tool_calls":
|
||||
if use_tools and response.choices[0].finish_reason == "tool_calls":
|
||||
# Get tool call information
|
||||
tool_calls = response.choices[0].message.tool_calls
|
||||
tool_messages = {}
|
||||
|
||||
# Determine appropriate messages based on which tools are called
|
||||
for tool_call in tool_calls:
|
||||
if tool_call.function.name == "google_search":
|
||||
tool_messages["google_search"] = True
|
||||
elif tool_call.function.name == "scrape_webpage":
|
||||
tool_messages["scrape_webpage"] = True
|
||||
elif tool_call.function.name == "code_interpreter":
|
||||
tool_messages["code_interpreter"] = True
|
||||
elif tool_call.function.name == "generate_image":
|
||||
tool_messages["generate_image"] = True
|
||||
image_generation_used = True
|
||||
|
||||
# Display appropriate messages based on which tools are being called
|
||||
if tool_messages.get("google_search") or tool_messages.get("scrape_webpage"):
|
||||
await message.channel.send("🌐 Searching the web for relevant information...")
|
||||
|
||||
if tool_messages.get("code_interpreter"):
|
||||
await message.channel.send("💻 Running your code...")
|
||||
|
||||
if tool_messages.get("generate_image"):
|
||||
await message.channel.send("🎨 Generating images for you...")
|
||||
|
||||
if not tool_messages: # If no specific tools are recognized
|
||||
await message.channel.send("🔍 Processing your request...")
|
||||
|
||||
# Process any tool calls and get the updated messages
|
||||
@@ -1102,8 +1231,52 @@ async def process_user_message(message: discord.Message):
|
||||
# For models with system prompt support, just append to regular history
|
||||
history.append({"role": "assistant", "content": reply})
|
||||
await save_history(user_id, history)
|
||||
|
||||
await send_response(message.channel, reply)
|
||||
|
||||
# Check if there are any image URLs to send from the image generation tool
|
||||
image_urls = []
|
||||
for msg in updated_messages if 'updated_messages' in locals() else []:
|
||||
if msg.get('role') == 'system' and 'image_urls' in msg:
|
||||
image_urls = msg.get('image_urls', [])
|
||||
break
|
||||
|
||||
# If image generation was used and we have image URLs, handle specially
|
||||
if image_generation_used and image_urls:
|
||||
# Send the text response first (if it contains useful information besides just mentioning images)
|
||||
text_to_exclude = ["here are the images", "i've generated", "generated for you", "as requested", "based on your prompt"]
|
||||
|
||||
# Check if reply is just about the images or has other content
|
||||
has_other_content = True
|
||||
reply_lower = reply.lower()
|
||||
|
||||
# Check if reply is just about the images
|
||||
for phrase in text_to_exclude:
|
||||
if phrase in reply_lower:
|
||||
has_other_content = False
|
||||
|
||||
# Only send text response if it has additional content
|
||||
if has_other_content:
|
||||
await send_response(message.channel, reply)
|
||||
|
||||
# Download images from URLs and send as attachments
|
||||
image_files = []
|
||||
async with aiohttp.ClientSession() as session:
|
||||
for image_url in image_urls:
|
||||
async with session.get(image_url) as resp:
|
||||
if resp.status == 200:
|
||||
image_files.append(await resp.read())
|
||||
else:
|
||||
logging.error(f"Failed to download image: {image_url} with status {resp.status}")
|
||||
|
||||
# Send images as attachments
|
||||
if image_files:
|
||||
await message.channel.send(
|
||||
files=[discord.File(io.BytesIO(img), filename=f"image_{i}.png")
|
||||
for i, img in enumerate(image_files)]
|
||||
)
|
||||
else:
|
||||
# Normal response without image generation
|
||||
await send_response(message.channel, reply)
|
||||
|
||||
except RateLimitError:
|
||||
await message.channel.send(
|
||||
"Error: Rate limit exceeded for your model. "
|
||||
@@ -1164,7 +1337,7 @@ def count_tokens(text: str) -> int:
|
||||
def trim_content_to_token_limit(content: str, max_tokens: int = 8096) -> str:
|
||||
"""Trim content to stay within token limit while preserving the most recent content."""
|
||||
current_tokens = count_tokens(content)
|
||||
if current_tokens <= max_tokens:
|
||||
if (current_tokens <= max_tokens):
|
||||
return content
|
||||
|
||||
# Split into lines and start removing from the beginning until under limit
|
||||
@@ -2028,6 +2201,48 @@ def extract_code_blocks(content):
|
||||
|
||||
return matches
|
||||
|
||||
async def generate_ai_image(prompt: str, num_images: int = 1) -> str:
|
||||
"""
|
||||
Generate images using AI from a text prompt.
|
||||
|
||||
Args:
|
||||
prompt (str): The text prompt describing the image to generate
|
||||
num_images (int): Number of images to generate (default: 1, max: 4)
|
||||
|
||||
Returns:
|
||||
str: A string containing the URLs of the generated images
|
||||
"""
|
||||
try:
|
||||
# Limit the number of images to maximum 4
|
||||
num_images = min(max(1, num_images), 4)
|
||||
|
||||
# Create an image generation request
|
||||
request_image = IImageInference(
|
||||
positivePrompt=prompt,
|
||||
model="runware:100@1",
|
||||
numberResults=num_images,
|
||||
height=512,
|
||||
width=512
|
||||
)
|
||||
|
||||
# Call the API to get the results
|
||||
images = await runware.imageInference(requestImage=request_image)
|
||||
|
||||
# Check the API's return value
|
||||
if images is None:
|
||||
return "Error: Image generation failed - API returned no results"
|
||||
|
||||
# Format the results with URLs
|
||||
result = f"Generated {len(images)} image(s) from prompt: \"{prompt}\"\n\n"
|
||||
for i, image in enumerate(images):
|
||||
result += f"Image {i+1}: {image.imageURL}\n"
|
||||
|
||||
return result
|
||||
except Exception as e:
|
||||
error_message = f"Error generating images: {str(e)}"
|
||||
logging.error(error_message)
|
||||
return error_message
|
||||
|
||||
# Main bot startup
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
|
||||
|
||||
Reference in New Issue
Block a user