|
|
|
@ -19,7 +19,6 @@ from dotenv import load_dotenv
|
|
|
|
|
from loguru import logger
|
|
|
|
|
|
|
|
|
|
from swarms import Agent
|
|
|
|
|
from swarms.tools.base_tool import BaseTool
|
|
|
|
|
from stagehand import Stagehand, StagehandConfig
|
|
|
|
|
|
|
|
|
|
load_dotenv()
|
|
|
|
@ -81,222 +80,283 @@ class BrowserState:
|
|
|
|
|
browser_state = BrowserState()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class NavigateTool(BaseTool):
|
|
|
|
|
"""Tool for navigating to URLs in the browser."""
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
super().__init__(
|
|
|
|
|
name="navigate_browser",
|
|
|
|
|
description="Navigate to a URL in the browser. Input should be a valid URL starting with http:// or https://",
|
|
|
|
|
verbose=True,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def run(self, url: str) -> str:
|
|
|
|
|
"""Navigate to the specified URL."""
|
|
|
|
|
return asyncio.run(self._async_run(url))
|
|
|
|
|
|
|
|
|
|
async def _async_run(self, url: str) -> str:
|
|
|
|
|
try:
|
|
|
|
|
await browser_state.init_browser()
|
|
|
|
|
page = await browser_state.get_page()
|
|
|
|
|
|
|
|
|
|
# Ensure URL has protocol
|
|
|
|
|
if not url.startswith(("http://", "https://")):
|
|
|
|
|
url = f"https://{url}"
|
|
|
|
|
|
|
|
|
|
await page.goto(url)
|
|
|
|
|
return f"Successfully navigated to {url}"
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"Navigation error: {str(e)}")
|
|
|
|
|
return f"Failed to navigate to {url}: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ActTool(BaseTool):
|
|
|
|
|
"""Tool for performing actions on web pages."""
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
super().__init__(
|
|
|
|
|
name="browser_act",
|
|
|
|
|
description=(
|
|
|
|
|
"Perform an action on the current web page using natural language. "
|
|
|
|
|
"Examples: 'click the submit button', 'type hello@example.com in the email field', "
|
|
|
|
|
"'scroll down', 'press Enter'"
|
|
|
|
|
),
|
|
|
|
|
verbose=True,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def run(self, action: str) -> str:
|
|
|
|
|
"""Perform the specified action."""
|
|
|
|
|
return asyncio.run(self._async_run(action))
|
|
|
|
|
|
|
|
|
|
async def _async_run(self, action: str) -> str:
|
|
|
|
|
try:
|
|
|
|
|
await browser_state.init_browser()
|
|
|
|
|
page = await browser_state.get_page()
|
|
|
|
|
|
|
|
|
|
result = await page.act(action)
|
|
|
|
|
return f"Action performed: {action}. Result: {result}"
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"Action error: {str(e)}")
|
|
|
|
|
return f"Failed to perform action '{action}': {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ExtractTool(BaseTool):
|
|
|
|
|
"""Tool for extracting data from web pages."""
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
super().__init__(
|
|
|
|
|
name="browser_extract",
|
|
|
|
|
description=(
|
|
|
|
|
"Extract information from the current web page using natural language. "
|
|
|
|
|
"Examples: 'extract all email addresses', 'get the main article text', "
|
|
|
|
|
"'find all product prices', 'extract the page title and meta description'"
|
|
|
|
|
),
|
|
|
|
|
verbose=True,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def run(self, query: str) -> str:
|
|
|
|
|
"""Extract information based on the query."""
|
|
|
|
|
return asyncio.run(self._async_run(query))
|
|
|
|
|
|
|
|
|
|
async def _async_run(self, query: str) -> str:
|
|
|
|
|
try:
|
|
|
|
|
await browser_state.init_browser()
|
|
|
|
|
page = await browser_state.get_page()
|
|
|
|
|
|
|
|
|
|
extracted = await page.extract(query)
|
|
|
|
|
|
|
|
|
|
# Convert to JSON string for agent consumption
|
|
|
|
|
if isinstance(extracted, (dict, list)):
|
|
|
|
|
return json.dumps(extracted, indent=2)
|
|
|
|
|
else:
|
|
|
|
|
return str(extracted)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"Extraction error: {str(e)}")
|
|
|
|
|
return f"Failed to extract '{query}': {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ObserveTool(BaseTool):
|
|
|
|
|
"""Tool for observing elements on web pages."""
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
super().__init__(
|
|
|
|
|
name="browser_observe",
|
|
|
|
|
description=(
|
|
|
|
|
"Observe and find elements on the current web page using natural language. "
|
|
|
|
|
"Returns information about elements including their selectors. "
|
|
|
|
|
"Examples: 'find the search box', 'locate the submit button', "
|
|
|
|
|
"'find all navigation links'"
|
|
|
|
|
),
|
|
|
|
|
verbose=True,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def run(self, query: str) -> str:
|
|
|
|
|
"""Observe elements based on the query."""
|
|
|
|
|
return asyncio.run(self._async_run(query))
|
|
|
|
|
|
|
|
|
|
async def _async_run(self, query: str) -> str:
|
|
|
|
|
try:
|
|
|
|
|
await browser_state.init_browser()
|
|
|
|
|
page = await browser_state.get_page()
|
|
|
|
|
|
|
|
|
|
observations = await page.observe(query)
|
|
|
|
|
|
|
|
|
|
# Format observations for readability
|
|
|
|
|
result = []
|
|
|
|
|
for obs in observations:
|
|
|
|
|
result.append(
|
|
|
|
|
{
|
|
|
|
|
"description": obs.description,
|
|
|
|
|
"selector": obs.selector,
|
|
|
|
|
"method": obs.method,
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return json.dumps(result, indent=2)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"Observation error: {str(e)}")
|
|
|
|
|
return f"Failed to observe '{query}': {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ScreenshotTool(BaseTool):
|
|
|
|
|
"""Tool for taking screenshots of the current page."""
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
super().__init__(
|
|
|
|
|
name="browser_screenshot",
|
|
|
|
|
description="Take a screenshot of the current web page. Optionally provide a filename.",
|
|
|
|
|
verbose=True,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def run(self, filename: str = "screenshot.png") -> str:
|
|
|
|
|
"""Take a screenshot."""
|
|
|
|
|
return asyncio.run(self._async_run(filename))
|
|
|
|
|
|
|
|
|
|
async def _async_run(self, filename: str) -> str:
|
|
|
|
|
try:
|
|
|
|
|
await browser_state.init_browser()
|
|
|
|
|
page = await browser_state.get_page()
|
|
|
|
|
|
|
|
|
|
# Ensure .png extension
|
|
|
|
|
if not filename.endswith(".png"):
|
|
|
|
|
filename += ".png"
|
|
|
|
|
|
|
|
|
|
# Get the underlying Playwright page
|
|
|
|
|
playwright_page = page.page
|
|
|
|
|
await playwright_page.screenshot(path=filename)
|
|
|
|
|
|
|
|
|
|
return f"Screenshot saved to {filename}"
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"Screenshot error: {str(e)}")
|
|
|
|
|
return f"Failed to take screenshot: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CloseBrowserTool(BaseTool):
|
|
|
|
|
"""Tool for closing the browser."""
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
super().__init__(
|
|
|
|
|
name="close_browser",
|
|
|
|
|
description="Close the browser when done with automation tasks",
|
|
|
|
|
verbose=True,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def run(self, *args) -> str:
|
|
|
|
|
"""Close the browser."""
|
|
|
|
|
return asyncio.run(self._async_run())
|
|
|
|
|
def navigate_browser(url: str) -> str:
|
|
|
|
|
"""
|
|
|
|
|
Navigate to a URL in the browser.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
url (str): The URL to navigate to. Should be a valid URL starting with http:// or https://.
|
|
|
|
|
If no protocol is provided, https:// will be added automatically.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
str: Success message with the URL navigated to, or error message if navigation fails
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
RuntimeError: If browser initialization fails
|
|
|
|
|
Exception: If navigation to the URL fails
|
|
|
|
|
|
|
|
|
|
Example:
|
|
|
|
|
>>> result = navigate_browser("https://example.com")
|
|
|
|
|
>>> print(result)
|
|
|
|
|
"Successfully navigated to https://example.com"
|
|
|
|
|
|
|
|
|
|
>>> result = navigate_browser("google.com")
|
|
|
|
|
>>> print(result)
|
|
|
|
|
"Successfully navigated to https://google.com"
|
|
|
|
|
"""
|
|
|
|
|
return asyncio.run(_navigate_browser_async(url))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _navigate_browser_async(url: str) -> str:
|
|
|
|
|
"""Async implementation of navigate_browser."""
|
|
|
|
|
try:
|
|
|
|
|
await browser_state.init_browser()
|
|
|
|
|
page = await browser_state.get_page()
|
|
|
|
|
|
|
|
|
|
# Ensure URL has protocol
|
|
|
|
|
if not url.startswith(("http://", "https://")):
|
|
|
|
|
url = f"https://{url}"
|
|
|
|
|
|
|
|
|
|
await page.goto(url)
|
|
|
|
|
return f"Successfully navigated to {url}"
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"Navigation error: {str(e)}")
|
|
|
|
|
return f"Failed to navigate to {url}: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def browser_act(action: str) -> str:
|
|
|
|
|
"""
|
|
|
|
|
Perform an action on the current web page using natural language.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
action (str): Natural language description of the action to perform.
|
|
|
|
|
Examples: 'click the submit button', 'type hello@example.com in the email field',
|
|
|
|
|
'scroll down', 'press Enter', 'select option from dropdown'
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
str: JSON formatted string with action result and status information
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
RuntimeError: If browser is not initialized or page is not available
|
|
|
|
|
Exception: If the action cannot be performed on the current page
|
|
|
|
|
|
|
|
|
|
Example:
|
|
|
|
|
>>> result = browser_act("click the submit button")
|
|
|
|
|
>>> print(result)
|
|
|
|
|
"Action performed: click the submit button. Result: clicked successfully"
|
|
|
|
|
|
|
|
|
|
>>> result = browser_act("type hello@example.com in the email field")
|
|
|
|
|
>>> print(result)
|
|
|
|
|
"Action performed: type hello@example.com in the email field. Result: text entered"
|
|
|
|
|
"""
|
|
|
|
|
return asyncio.run(_browser_act_async(action))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _browser_act_async(action: str) -> str:
|
|
|
|
|
"""Async implementation of browser_act."""
|
|
|
|
|
try:
|
|
|
|
|
await browser_state.init_browser()
|
|
|
|
|
page = await browser_state.get_page()
|
|
|
|
|
|
|
|
|
|
result = await page.act(action)
|
|
|
|
|
return f"Action performed: {action}. Result: {result}"
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"Action error: {str(e)}")
|
|
|
|
|
return f"Failed to perform action '{action}': {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def browser_extract(query: str) -> str:
|
|
|
|
|
"""
|
|
|
|
|
Extract information from the current web page using natural language.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
query (str): Natural language description of what information to extract.
|
|
|
|
|
Examples: 'extract all email addresses', 'get the main article text',
|
|
|
|
|
'find all product prices', 'extract the page title and meta description'
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
str: JSON formatted string containing the extracted information, or error message if extraction fails
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
RuntimeError: If browser is not initialized or page is not available
|
|
|
|
|
Exception: If extraction fails due to page content or parsing issues
|
|
|
|
|
|
|
|
|
|
Example:
|
|
|
|
|
>>> result = browser_extract("extract all email addresses")
|
|
|
|
|
>>> print(result)
|
|
|
|
|
'["contact@example.com", "support@example.com"]'
|
|
|
|
|
|
|
|
|
|
>>> result = browser_extract("get the main article text")
|
|
|
|
|
>>> print(result)
|
|
|
|
|
'{"title": "Article Title", "content": "Article content..."}'
|
|
|
|
|
"""
|
|
|
|
|
return asyncio.run(_browser_extract_async(query))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _browser_extract_async(query: str) -> str:
|
|
|
|
|
"""Async implementation of browser_extract."""
|
|
|
|
|
try:
|
|
|
|
|
await browser_state.init_browser()
|
|
|
|
|
page = await browser_state.get_page()
|
|
|
|
|
|
|
|
|
|
extracted = await page.extract(query)
|
|
|
|
|
|
|
|
|
|
# Convert to JSON string for agent consumption
|
|
|
|
|
if isinstance(extracted, (dict, list)):
|
|
|
|
|
return json.dumps(extracted, indent=2)
|
|
|
|
|
else:
|
|
|
|
|
return str(extracted)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"Extraction error: {str(e)}")
|
|
|
|
|
return f"Failed to extract '{query}': {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def browser_observe(query: str) -> str:
|
|
|
|
|
"""
|
|
|
|
|
Observe and find elements on the current web page using natural language.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
query (str): Natural language description of elements to find.
|
|
|
|
|
Examples: 'find the search box', 'locate the submit button',
|
|
|
|
|
'find all navigation links', 'observe form elements'
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
str: JSON formatted string containing information about found elements including
|
|
|
|
|
their descriptions, selectors, and interaction methods
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
RuntimeError: If browser is not initialized or page is not available
|
|
|
|
|
Exception: If observation fails due to page structure or element detection issues
|
|
|
|
|
|
|
|
|
|
Example:
|
|
|
|
|
>>> result = browser_observe("find the search box")
|
|
|
|
|
>>> print(result)
|
|
|
|
|
'[{"description": "Search input field", "selector": "#search", "method": "input"}]'
|
|
|
|
|
|
|
|
|
|
>>> result = browser_observe("locate the submit button")
|
|
|
|
|
>>> print(result)
|
|
|
|
|
'[{"description": "Submit button", "selector": "button[type=submit]", "method": "click"}]'
|
|
|
|
|
"""
|
|
|
|
|
return asyncio.run(_browser_observe_async(query))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _browser_observe_async(query: str) -> str:
|
|
|
|
|
"""Async implementation of browser_observe."""
|
|
|
|
|
try:
|
|
|
|
|
await browser_state.init_browser()
|
|
|
|
|
page = await browser_state.get_page()
|
|
|
|
|
|
|
|
|
|
observations = await page.observe(query)
|
|
|
|
|
|
|
|
|
|
# Format observations for readability
|
|
|
|
|
result = []
|
|
|
|
|
for obs in observations:
|
|
|
|
|
result.append(
|
|
|
|
|
{
|
|
|
|
|
"description": obs.description,
|
|
|
|
|
"selector": obs.selector,
|
|
|
|
|
"method": obs.method,
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
async def _async_run(self) -> str:
|
|
|
|
|
try:
|
|
|
|
|
await browser_state.close()
|
|
|
|
|
return "Browser closed successfully"
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"Close browser error: {str(e)}")
|
|
|
|
|
return f"Failed to close browser: {str(e)}"
|
|
|
|
|
return json.dumps(result, indent=2)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"Observation error: {str(e)}")
|
|
|
|
|
return f"Failed to observe '{query}': {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def browser_screenshot(filename: str = "screenshot.png") -> str:
|
|
|
|
|
"""
|
|
|
|
|
Take a screenshot of the current web page.
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
filename (str, optional): The filename to save the screenshot to.
|
|
|
|
|
Defaults to "screenshot.png".
|
|
|
|
|
.png extension will be added automatically if not provided.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
str: Success message with the filename where screenshot was saved,
|
|
|
|
|
or error message if screenshot fails
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
RuntimeError: If browser is not initialized or page is not available
|
|
|
|
|
Exception: If screenshot capture or file saving fails
|
|
|
|
|
|
|
|
|
|
Example:
|
|
|
|
|
>>> result = browser_screenshot()
|
|
|
|
|
>>> print(result)
|
|
|
|
|
"Screenshot saved to screenshot.png"
|
|
|
|
|
|
|
|
|
|
>>> result = browser_screenshot("page_capture.png")
|
|
|
|
|
>>> print(result)
|
|
|
|
|
"Screenshot saved to page_capture.png"
|
|
|
|
|
"""
|
|
|
|
|
return asyncio.run(_browser_screenshot_async(filename))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _browser_screenshot_async(filename: str) -> str:
|
|
|
|
|
"""Async implementation of browser_screenshot."""
|
|
|
|
|
try:
|
|
|
|
|
await browser_state.init_browser()
|
|
|
|
|
page = await browser_state.get_page()
|
|
|
|
|
|
|
|
|
|
# Ensure .png extension
|
|
|
|
|
if not filename.endswith(".png"):
|
|
|
|
|
filename += ".png"
|
|
|
|
|
|
|
|
|
|
# Get the underlying Playwright page
|
|
|
|
|
playwright_page = page.page
|
|
|
|
|
await playwright_page.screenshot(path=filename)
|
|
|
|
|
|
|
|
|
|
return f"Screenshot saved to {filename}"
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"Screenshot error: {str(e)}")
|
|
|
|
|
return f"Failed to take screenshot: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def close_browser() -> str:
|
|
|
|
|
"""
|
|
|
|
|
Close the browser when done with automation tasks.
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
str: Success message if browser is closed successfully,
|
|
|
|
|
or error message if closing fails
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
Exception: If browser closing process encounters errors
|
|
|
|
|
|
|
|
|
|
Example:
|
|
|
|
|
>>> result = close_browser()
|
|
|
|
|
>>> print(result)
|
|
|
|
|
"Browser closed successfully"
|
|
|
|
|
"""
|
|
|
|
|
return asyncio.run(_close_browser_async())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _close_browser_async() -> str:
|
|
|
|
|
"""Async implementation of close_browser."""
|
|
|
|
|
try:
|
|
|
|
|
await browser_state.close()
|
|
|
|
|
return "Browser closed successfully"
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"Close browser error: {str(e)}")
|
|
|
|
|
return f"Failed to close browser: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Example usage
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
# Create browser automation tools
|
|
|
|
|
navigate_tool = NavigateTool()
|
|
|
|
|
act_tool = ActTool()
|
|
|
|
|
extract_tool = ExtractTool()
|
|
|
|
|
observe_tool = ObserveTool()
|
|
|
|
|
screenshot_tool = ScreenshotTool()
|
|
|
|
|
close_browser_tool = CloseBrowserTool()
|
|
|
|
|
|
|
|
|
|
# Create a Swarms agent with browser tools
|
|
|
|
|
browser_agent = Agent(
|
|
|
|
|
agent_name="BrowserAutomationAgent",
|
|
|
|
|
model_name="gpt-4o-mini",
|
|
|
|
|
max_loops=1,
|
|
|
|
|
tools=[
|
|
|
|
|
navigate_tool,
|
|
|
|
|
act_tool,
|
|
|
|
|
extract_tool,
|
|
|
|
|
observe_tool,
|
|
|
|
|
screenshot_tool,
|
|
|
|
|
close_browser_tool,
|
|
|
|
|
navigate_browser,
|
|
|
|
|
browser_act,
|
|
|
|
|
browser_extract,
|
|
|
|
|
browser_observe,
|
|
|
|
|
browser_screenshot,
|
|
|
|
|
close_browser,
|
|
|
|
|
],
|
|
|
|
|
system_prompt="""You are a web browser automation specialist. You can:
|
|
|
|
|
1. Navigate to websites using the navigate_browser tool
|
|
|
|
|