Add Selenium support for web scraping and enhance token counting

2025-03-17 17:19:06 +07:00
parent 7b7abc96a3
commit 07b51dcfd7
3 changed files with 171 additions and 52 deletions


@@ -9,4 +9,5 @@ requests>=2.31.0
 aiohttp>=3.9.0
 runware>=0.2.0
 python-dotenv>=1.0.0
-webdriver-manager
+selenium
+undetected-chromedriver
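undetected-chromedriver manages its own chromedriver binary, which would explain webdriver-manager being dropped above. A minimal smoke test for the new dependencies (a sketch, not part of this commit):

import undetected_chromedriver as uc

options = uc.ChromeOptions()
options.add_argument('--headless')
driver = uc.Chrome(options=options)
try:
    driver.get("https://example.com")
    print(driver.title)  # expect "Example Domain" if the driver works
finally:
    driver.quit()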


@@ -1,11 +1,59 @@
import requests
import json
import re
import time
import logging
from bs4 import BeautifulSoup
from typing import Dict, List, Any, Optional, Tuple
from src.config.config import GOOGLE_API_KEY, GOOGLE_CX
import tiktoken # Add tiktoken for token counting
# Add new imports for Selenium
import undetected_chromedriver as uc
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, WebDriverException
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global driver variable to reuse browser instance
_driver = None
def get_chrome_driver(headless=True):
"""
Initialize or return existing undetected Chrome driver
Args:
headless (bool): Whether to run Chrome in headless mode
Returns:
WebDriver: Undetected Chrome WebDriver instance
"""
global _driver
try:
if _driver is None:
options = uc.ChromeOptions()
if headless:
options.add_argument('--headless')
options.add_argument('--disable-gpu')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--disable-extensions')
options.add_argument('--disable-popup-blocking')
options.add_argument('--blink-settings=imagesEnabled=false') # Disable images for faster loading
_driver = uc.Chrome(options=options)
_driver.set_page_load_timeout(30) # 30 second timeout
logger.info("Chrome driver initialized")
return _driver
except Exception as e:
logger.error(f"Failed to initialize Chrome driver: {str(e)}")
return None
def google_custom_search(query: str, num_results: int = 5, max_tokens: int = 4000) -> dict:
"""
Perform a Google search using the Google Custom Search API and scrape content
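The get_chrome_driver function defined above is a lazy singleton: every scrape shares the one browser held in _driver. A quick illustration (a sketch; the module name web_search is hypothetical, since the file path is not shown in this view):

# Repeated calls reuse the same Chrome process (module name hypothetical)
from web_search import get_chrome_driver

d1 = get_chrome_driver()
d2 = get_chrome_driver()
assert d1 is d2  # _driver is created once, then handed back on every call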
@@ -93,8 +141,8 @@ def scrape_multiple_links(urls: List[str], max_tokens: int = 4000) -> Tuple[str,
         if not url:
             continue
 
-        # Get content from this URL
-        content, token_count = scrape_web_content_with_count(url, return_token_count=True)
+        # Get content from this URL using enhanced scraping
+        content, token_count = scrape_web_content_with_count(url, max_tokens=max_tokens, return_token_count=True)
 
         # Skip failed scrapes
         if content.startswith("Failed"):
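The change in this hunk threads the caller's max_tokens down into each per-URL scrape rather than letting every page fall back to the 4000-token default. Rough usage (a sketch; the module name web_search is hypothetical):

from web_search import scrape_multiple_links

# The 2000-token budget now reaches each individual scrape
combined, used_urls = scrape_multiple_links(
    ["https://example.com", "https://example.org"], max_tokens=2000)
print(used_urls)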
@@ -137,9 +185,61 @@ def scrape_multiple_links(urls: List[str], max_tokens: int = 4000) -> Tuple[str,
     return combined_content, used_urls
 
+def scrape_with_selenium(url: str, timeout=30) -> str:
+    """
+    Scrape content from a webpage using Selenium with Undetected Chrome Driver
+
+    Args:
+        url (str): URL of the webpage to scrape
+        timeout (int): Timeout in seconds for page load
+
+    Returns:
+        str: The scraped text content or error message
+    """
+    driver = get_chrome_driver()
+    if not driver:
+        return f"Failed to scrape {url}: Chrome driver not available"
+    try:
+        logger.info(f"Navigating to {url} with Selenium")
+        driver.get(url)
+
+        # Wait for page content to load
+        WebDriverWait(driver, timeout).until(
+            EC.presence_of_element_located((By.TAG_NAME, "body"))
+        )
+
+        # Wait a bit for JavaScript to render content
+        time.sleep(2)
+
+        # Get page content
+        page_source = driver.page_source
+        soup = BeautifulSoup(page_source, 'html.parser')
+
+        # Remove unwanted elements
+        for element in soup(["script", "style", "header", "footer", "nav"]):
+            element.extract()
+
+        # Get text and clean it
+        text = soup.get_text(separator='\n')
+
+        # Clean up text: remove extra whitespace and empty lines
+        lines = (line.strip() for line in text.splitlines())
+        text = '\n'.join(line for line in lines if line)
+
+        return text
+    except TimeoutException:
+        return f"Failed to scrape {url}: Page load timeout"
+    except WebDriverException as e:
+        return f"Failed to scrape {url}: Browser error - {str(e)}"
+    except Exception as e:
+        return f"Failed to process content from {url}: {str(e)}"
+
 def scrape_web_content_with_count(url: str, max_tokens: int = 4000, return_token_count: bool = False) -> Any:
     """
     Scrape content from a webpage and return with token count if needed.
+    Uses Selenium first, falls back to requests if Selenium fails.
 
     Args:
         url (str): URL of the webpage to scrape
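Note that scrape_with_selenium above signals failure in-band, returning a string that starts with "Failed"; the fallback logic in the next hunk keys on exactly that prefix. Direct use would look like this (a sketch; module name web_search hypothetical):

from web_search import scrape_with_selenium

text = scrape_with_selenium("https://example.com")
if text.startswith("Failed"):
    print(text)  # in-band error message, caller decides how to recover
else:
    print(text[:200])  # cleaned page text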
@@ -157,58 +257,65 @@ def scrape_web_content_with_count(url: str, max_tokens: int = 4000, return_token
         message = f"Failed to scrape: The URL {url} cannot be scraped (unsupported format)."
         return (message, 0) if return_token_count else message
 
-    try:
-        # Add user agent to mimic a browser
-        headers = {
-            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
-        }
-        response = requests.get(url, headers=headers, timeout=10)
-        response.raise_for_status()
-
-        # Parse the content with BeautifulSoup
-        soup = BeautifulSoup(response.text, 'html.parser')
-
-        # Remove script and style elements
-        for script in soup(["script", "style", "header", "footer", "nav"]):
-            script.extract()
-
-        # Get the text content
-        text = soup.get_text(separator='\n')
-
-        # Clean up text: remove extra whitespace and empty lines
-        lines = (line.strip() for line in text.splitlines())
-        text = '\n'.join(line for line in lines if line)
-
-        # Count tokens
-        token_count = 0
-        try:
-            # Use cl100k_base encoder which is used by most recent models
-            encoding = tiktoken.get_encoding("cl100k_base")
-            tokens = encoding.encode(text)
-            token_count = len(tokens)
-
-            # Truncate if token count exceeds max_tokens and we're not returning token count
-            if len(tokens) > max_tokens and not return_token_count:
-                truncated_tokens = tokens[:max_tokens]
-                text = encoding.decode(truncated_tokens)
-                text += "...\n[Content truncated due to token limit]"
-        except ImportError:
-            # Fallback to character-based estimation
-            token_count = len(text) // 4  # Rough estimate: 1 token ≈ 4 characters
-            if len(text) > max_tokens * 4 and not return_token_count:
-                text = text[:max_tokens * 4] + "...\n[Content truncated due to length]"
-
-        if return_token_count:
-            return text, token_count
-        return text
-    except requests.exceptions.RequestException as e:
-        message = f"Failed to scrape {url}: {str(e)}"
-        return (message, 0) if return_token_count else message
-    except Exception as e:
-        message = f"Failed to process content from {url}: {str(e)}"
-        return (message, 0) if return_token_count else message
+    # Try with Selenium first
+    content = scrape_with_selenium(url)
+
+    # If Selenium failed, fall back to requests
+    if content.startswith("Failed"):
+        logger.info(f"Selenium scraping failed for {url}, falling back to requests")
+        try:
+            # Add user agent to mimic a browser
+            headers = {
+                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
+            }
+            response = requests.get(url, headers=headers, timeout=10)
+            response.raise_for_status()
+
+            # Parse the content with BeautifulSoup
+            soup = BeautifulSoup(response.text, 'html.parser')
+
+            # Remove script and style elements
+            for script in soup(["script", "style", "header", "footer", "nav"]):
+                script.extract()
+
+            # Get the text content
+            text = soup.get_text(separator='\n')
+
+            # Clean up text: remove extra whitespace and empty lines
+            lines = (line.strip() for line in text.splitlines())
+            content = '\n'.join(line for line in lines if line)
+        except requests.exceptions.RequestException as e:
+            message = f"Failed to scrape {url}: {str(e)}"
+            return (message, 0) if return_token_count else message
+        except Exception as e:
+            message = f"Failed to process content from {url}: {str(e)}"
+            return (message, 0) if return_token_count else message
+
+    # Count tokens
+    token_count = 0
+    try:
+        # Use cl100k_base encoder which is used by most recent models
+        encoding = tiktoken.get_encoding("cl100k_base")
+        tokens = encoding.encode(content)
+        token_count = len(tokens)
+
+        # Truncate if token count exceeds max_tokens
+        if len(tokens) > max_tokens:
+            truncated_tokens = tokens[:max_tokens]
+            content = encoding.decode(truncated_tokens)
+            content += "...\n[Content truncated due to token limit]"
+            token_count = max_tokens
+    except ImportError:
+        # Fallback to character-based estimation
+        token_count = len(content) // 4  # Rough estimate: 1 token ≈ 4 characters
+        if len(content) > max_tokens * 4:
+            content = content[:max_tokens * 4] + "...\n[Content truncated due to length]"
+            token_count = max_tokens
+
+    if return_token_count:
+        return content, token_count
+    return content
 
 # Keep the original scrape_web_content function for backward compatibility
 def scrape_web_content(url: str, max_tokens: int = 4000) -> str:
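The reworked token handling also clamps the reported token_count to max_tokens after truncation, so callers see the size of what was actually returned. The tiktoken calls are the library's standard API; here is the same count-then-truncate step in isolation (a sketch; truncate_to_budget is an illustrative name, not part of the commit):

import tiktoken

def truncate_to_budget(text: str, max_tokens: int = 4000) -> tuple:
    # cl100k_base is the encoder the module uses for recent models
    encoding = tiktoken.get_encoding("cl100k_base")
    tokens = encoding.encode(text)
    if len(tokens) <= max_tokens:
        return text, len(tokens)
    truncated = encoding.decode(tokens[:max_tokens])
    return truncated + "...\n[Content truncated due to token limit]", max_tokens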
@@ -223,3 +330,14 @@ def scrape_web_content(url: str, max_tokens: int = 4000) -> str:
         str: The scraped text content or error message
     """
     return scrape_web_content_with_count(url, max_tokens)
+
+def cleanup_resources():
+    """Close the browser when the application is shutting down"""
+    global _driver
+    if _driver:
+        try:
+            _driver.quit()
+            _driver = None
+            logger.info("Chrome driver closed")
+        except Exception as e:
+            logger.error(f"Error closing Chrome driver: {str(e)}")