Former-commit-id: 267253283e
pull/47/head
parent
ae61d18fb3
commit
1b7be9787f
@ -0,0 +1,133 @@
|
||||
#--------------------------------------> AUTO GPT TOOLS
|
||||
|
||||
# General
|
||||
import os
|
||||
import pandas as pd
|
||||
|
||||
from langchain.agents.agent_toolkits.pandas.base import create_pandas_dataframe_agent
|
||||
from langchain.docstore.document import Document
|
||||
import asyncio
|
||||
|
||||
# Tools
|
||||
from contextlib import contextmanager
|
||||
from typing import Optional
|
||||
from langchain.agents import tool
|
||||
|
||||
ROOT_DIR = "./data/"
|
||||
|
||||
from langchain.tools import BaseTool, DuckDuckGoSearchRun
|
||||
from langchain.text_splitter import RecursiveCharacterTextSplitter
|
||||
|
||||
from pydantic import Field
|
||||
from langchain.chains.qa_with_sources.loading import BaseCombineDocumentsChain
|
||||
|
||||
|
||||
|
||||
@contextmanager
|
||||
def pushd(new_dir):
|
||||
"""Context manager for changing the current working directory."""
|
||||
prev_dir = os.getcwd()
|
||||
os.chdir(new_dir)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
os.chdir(prev_dir)
|
||||
|
||||
@tool
|
||||
def process_csv(
|
||||
llm, csv_file_path: str, instructions: str, output_path: Optional[str] = None
|
||||
) -> str:
|
||||
"""Process a CSV by with pandas in a limited REPL.\
|
||||
Only use this after writing data to disk as a csv file.\
|
||||
Any figures must be saved to disk to be viewed by the human.\
|
||||
Instructions should be written in natural language, not code. Assume the dataframe is already loaded."""
|
||||
with pushd(ROOT_DIR):
|
||||
try:
|
||||
df = pd.read_csv(csv_file_path)
|
||||
except Exception as e:
|
||||
return f"Error: {e}"
|
||||
agent = create_pandas_dataframe_agent(llm, df, max_iterations=30, verbose=False)
|
||||
if output_path is not None:
|
||||
instructions += f" Save output to disk at {output_path}"
|
||||
try:
|
||||
result = agent.run(instructions)
|
||||
return result
|
||||
except Exception as e:
|
||||
return f"Error: {e}"
|
||||
|
||||
|
||||
async def async_load_playwright(url: str) -> str:
|
||||
"""Load the specified URLs using Playwright and parse using BeautifulSoup."""
|
||||
from bs4 import BeautifulSoup
|
||||
from playwright.async_api import async_playwright
|
||||
|
||||
results = ""
|
||||
async with async_playwright() as p:
|
||||
browser = await p.chromium.launch(headless=True)
|
||||
try:
|
||||
page = await browser.new_page()
|
||||
await page.goto(url)
|
||||
|
||||
page_source = await page.content()
|
||||
soup = BeautifulSoup(page_source, "html.parser")
|
||||
|
||||
for script in soup(["script", "style"]):
|
||||
script.extract()
|
||||
|
||||
text = soup.get_text()
|
||||
lines = (line.strip() for line in text.splitlines())
|
||||
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
|
||||
results = "\n".join(chunk for chunk in chunks if chunk)
|
||||
except Exception as e:
|
||||
results = f"Error: {e}"
|
||||
await browser.close()
|
||||
return results
|
||||
|
||||
def run_async(coro):
|
||||
event_loop = asyncio.get_event_loop()
|
||||
return event_loop.run_until_complete(coro)
|
||||
|
||||
@tool
|
||||
def browse_web_page(url: str) -> str:
|
||||
"""Verbose way to scrape a whole webpage. Likely to cause issues parsing."""
|
||||
return run_async(async_load_playwright(url))
|
||||
|
||||
|
||||
def _get_text_splitter():
|
||||
return RecursiveCharacterTextSplitter(
|
||||
# Set a really small chunk size, just to show.
|
||||
chunk_size = 500,
|
||||
chunk_overlap = 20,
|
||||
length_function = len,
|
||||
)
|
||||
|
||||
|
||||
class WebpageQATool(BaseTool):
|
||||
name = "query_webpage"
|
||||
description = "Browse a webpage and retrieve the information relevant to the question."
|
||||
text_splitter: RecursiveCharacterTextSplitter = Field(default_factory=_get_text_splitter)
|
||||
qa_chain: BaseCombineDocumentsChain
|
||||
|
||||
def _run(self, url: str, question: str) -> str:
|
||||
"""Useful for browsing websites and scraping the text information."""
|
||||
result = browse_web_page.run(url)
|
||||
docs = [Document(page_content=result, metadata={"source": url})]
|
||||
web_docs = self.text_splitter.split_documents(docs)
|
||||
results = []
|
||||
# TODO: Handle this with a MapReduceChain
|
||||
for i in range(0, len(web_docs), 4):
|
||||
input_docs = web_docs[i:i+4]
|
||||
window_result = self.qa_chain({"input_documents": input_docs, "question": question}, return_only_outputs=True)
|
||||
results.append(f"Response from window {i} - {window_result}")
|
||||
results_docs = [Document(page_content="\n".join(results), metadata={"source": url})]
|
||||
return self.qa_chain({"input_documents": results_docs, "question": question}, return_only_outputs=True)
|
||||
|
||||
async def _arun(self, url: str, question: str) -> str:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
# query_website_tool = WebpageQATool(qa_chain=load_qa_with_sources_chain(llm))
|
||||
|
||||
# !pip install duckduckgo_search
|
||||
web_search = DuckDuckGoSearchRun()
|
||||
|
@ -0,0 +1,837 @@
|
||||
##########################################+> SYS
|
||||
import signal
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from ptrace.debugger import (
|
||||
NewProcessEvent,
|
||||
ProcessExecution,
|
||||
ProcessExit,
|
||||
ProcessSignal,
|
||||
PtraceDebugger,
|
||||
PtraceProcess,
|
||||
)
|
||||
from ptrace.func_call import FunctionCallOptions
|
||||
from ptrace.syscall import PtraceSyscall
|
||||
from ptrace.tools import signal_to_exitcode
|
||||
|
||||
from swarms.utils.logger import logger
|
||||
from swarms.agents.tools.base import SessionGetter, BaseTool, ToolScope, tool, BaseToolSet
|
||||
|
||||
|
||||
class SyscallTimeoutException(Exception):
|
||||
def __init__(self, pid: int, *args) -> None:
|
||||
super().__init__(f"deadline exceeded while waiting syscall for {pid}", *args)
|
||||
|
||||
|
||||
class SyscallTracer:
|
||||
def __init__(self, pid: int):
|
||||
self.debugger: PtraceDebugger = PtraceDebugger()
|
||||
self.pid: int = pid
|
||||
self.process: PtraceProcess = None
|
||||
|
||||
def is_waiting(self, syscall: PtraceSyscall) -> bool:
|
||||
if syscall.name.startswith("wait"):
|
||||
return True
|
||||
return False
|
||||
|
||||
def attach(self):
|
||||
self.process = self.debugger.addProcess(self.pid, False)
|
||||
|
||||
def detach(self):
|
||||
self.process.detach()
|
||||
self.debugger.quit()
|
||||
|
||||
def set_timer(self, timeout: int):
|
||||
def handler(signum, frame):
|
||||
raise SyscallTimeoutException(self.process.pid)
|
||||
|
||||
signal.signal(signal.SIGALRM, handler)
|
||||
signal.alarm(timeout)
|
||||
|
||||
def reset_timer(self):
|
||||
signal.alarm(0)
|
||||
|
||||
def wait_syscall_with_timeout(self, timeout: int):
|
||||
self.set_timer(timeout)
|
||||
self.process.waitSyscall()
|
||||
self.reset_timer()
|
||||
|
||||
def wait_until_stop_or_exit(self) -> Tuple[Optional[int], str]:
|
||||
self.process.syscall()
|
||||
exitcode = None
|
||||
reason = ""
|
||||
while True:
|
||||
if not self.debugger:
|
||||
break
|
||||
|
||||
try:
|
||||
self.wait_syscall_with_timeout(30)
|
||||
except ProcessExit as event:
|
||||
if event.exitcode is not None:
|
||||
exitcode = event.exitcode
|
||||
continue
|
||||
except ProcessSignal as event:
|
||||
event.process.syscall(event.signum)
|
||||
exitcode = signal_to_exitcode(event.signum)
|
||||
reason = event.reason
|
||||
continue
|
||||
except NewProcessEvent as event:
|
||||
continue
|
||||
except ProcessExecution as event:
|
||||
continue
|
||||
except Exception as e:
|
||||
reason = str(e)
|
||||
break
|
||||
|
||||
syscall = self.process.syscall_state.event(
|
||||
FunctionCallOptions(
|
||||
write_types=False,
|
||||
write_argname=False,
|
||||
string_max_length=300,
|
||||
replace_socketcall=True,
|
||||
write_address=False,
|
||||
max_array_count=20,
|
||||
)
|
||||
)
|
||||
|
||||
self.process.syscall()
|
||||
|
||||
if syscall is None:
|
||||
continue
|
||||
|
||||
if syscall.result:
|
||||
continue
|
||||
|
||||
self.reset_timer()
|
||||
|
||||
return exitcode, reason
|
||||
##########################################+> SYS CALL END
|
||||
|
||||
|
||||
|
||||
############### => st dout.py
|
||||
|
||||
import os
|
||||
import time
|
||||
import subprocess
|
||||
from datetime import datetime
|
||||
from typing import Callable, Literal, Optional, Union, Tuple
|
||||
|
||||
PipeType = Union[Literal["stdout"], Literal["stderr"]]
|
||||
|
||||
|
||||
class StdoutTracer:
|
||||
def __init__(
|
||||
self,
|
||||
process: subprocess.Popen,
|
||||
timeout: int = 30,
|
||||
interval: int = 0.1,
|
||||
on_output: Callable[[PipeType, str], None] = lambda: None,
|
||||
):
|
||||
self.process: subprocess.Popen = process
|
||||
self.timeout: int = timeout
|
||||
self.interval: int = interval
|
||||
self.last_output: datetime = None
|
||||
self.on_output: Callable[[PipeType, str], None] = on_output
|
||||
|
||||
def nonblock(self):
|
||||
os.set_blocking(self.process.stdout.fileno(), False)
|
||||
os.set_blocking(self.process.stderr.fileno(), False)
|
||||
|
||||
def get_output(self, pipe: PipeType) -> str:
|
||||
output = None
|
||||
if pipe == "stdout":
|
||||
output = self.process.stdout.read()
|
||||
elif pipe == "stderr":
|
||||
output = self.process.stderr.read()
|
||||
|
||||
if output:
|
||||
decoded = output.decode()
|
||||
self.on_output(pipe, decoded)
|
||||
self.last_output = datetime.now()
|
||||
return decoded
|
||||
return ""
|
||||
|
||||
def last_output_passed(self, seconds: int) -> bool:
|
||||
return (datetime.now() - self.last_output).seconds > seconds
|
||||
|
||||
def wait_until_stop_or_exit(self) -> Tuple[Optional[int], str]:
|
||||
self.nonblock()
|
||||
self.last_output = datetime.now()
|
||||
output = ""
|
||||
exitcode = None
|
||||
while True:
|
||||
new_stdout = self.get_output("stdout")
|
||||
if new_stdout:
|
||||
output += new_stdout
|
||||
|
||||
new_stderr = self.get_output("stderr")
|
||||
if new_stderr:
|
||||
output += new_stderr
|
||||
|
||||
if self.process.poll() is not None:
|
||||
exitcode = self.process.poll()
|
||||
break
|
||||
|
||||
if self.last_output_passed(self.timeout):
|
||||
self.process.kill()
|
||||
break
|
||||
|
||||
time.sleep(self.interval)
|
||||
|
||||
return (exitcode, output)
|
||||
|
||||
################## => stdout end
|
||||
|
||||
import os
|
||||
import subprocess
|
||||
from typing import Dict, List
|
||||
|
||||
from swarms.utils.main import ANSI, Color, Style # test
|
||||
|
||||
class Terminal(BaseToolSet):
|
||||
def __init__(self):
|
||||
self.sessions: Dict[str, List[SyscallTracer]] = {}
|
||||
|
||||
@tool(
|
||||
name="Terminal",
|
||||
description="Executes commands in a terminal."
|
||||
"If linux errno occurs, we have to solve the problem with the terminal. "
|
||||
"Input must be one valid command. "
|
||||
"Output will be any output from running that command.",
|
||||
scope=ToolScope.SESSION,
|
||||
)
|
||||
def execute(self, commands: str, get_session: SessionGetter) -> str:
|
||||
session, _ = get_session()
|
||||
|
||||
try:
|
||||
process = subprocess.Popen(
|
||||
commands,
|
||||
shell=True,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
)
|
||||
logger.info(ANSI("Realtime Terminal Output").to(Color.magenta()) + ": ")
|
||||
|
||||
output = ""
|
||||
tracer = StdoutTracer(
|
||||
process,
|
||||
on_output=lambda p, o: logger.info(
|
||||
ANSI(p).to(Style.dim()) + " " + o.strip("\n")
|
||||
),
|
||||
)
|
||||
exitcode, output = tracer.wait_until_stop_or_exit()
|
||||
except Exception as e:
|
||||
output = str(e)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed Terminal, Input Commands: {commands} "
|
||||
f"Output Answer: {output}"
|
||||
)
|
||||
return output
|
||||
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# import time
|
||||
|
||||
# o = Terminal().execute(
|
||||
# "sleep 1; echo 1; sleep 2; echo 2; sleep 3; echo 3; sleep 10;",
|
||||
# lambda: ("", None),
|
||||
# )
|
||||
# print(o)
|
||||
|
||||
# time.sleep(10) # see if timer has reset
|
||||
|
||||
|
||||
###################=> EDITOR/VERIFY
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def verify(func):
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
filepath = args[0].filepath
|
||||
except AttributeError:
|
||||
raise Exception("This tool doesn't have filepath. Please check your code.")
|
||||
if not str(Path(filepath).resolve()).startswith(str(Path().resolve())):
|
||||
return "You can't access file outside of playground."
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
#=====================> EDITOR/END VERIFY
|
||||
|
||||
|
||||
###### EDITOR/WRITE.PY
|
||||
|
||||
"""
|
||||
write protocol:
|
||||
|
||||
<filepath>
|
||||
<content>
|
||||
"""
|
||||
|
||||
|
||||
|
||||
class WriteCommand:
|
||||
separator = "\n"
|
||||
|
||||
def __init__(self, filepath: str, content: int):
|
||||
self.filepath: str = filepath
|
||||
self.content: str = content
|
||||
self.mode: str = "w"
|
||||
|
||||
def with_mode(self, mode: str) -> "WriteCommand":
|
||||
self.mode = mode
|
||||
return self
|
||||
|
||||
@verify
|
||||
def execute(self) -> str:
|
||||
dir_path = os.path.dirname(self.filepath)
|
||||
if dir_path:
|
||||
os.makedirs(dir_path, exist_ok=True)
|
||||
with open(self.filepath, self.mode) as f:
|
||||
f.write(self.content)
|
||||
return self.content
|
||||
|
||||
@staticmethod
|
||||
def from_str(command: str) -> "WriteCommand":
|
||||
filepath = command.split(WriteCommand.separator)[0]
|
||||
return WriteCommand(filepath, command[len(filepath) + 1 :])
|
||||
|
||||
|
||||
class CodeWriter:
|
||||
@staticmethod
|
||||
def write(command: str) -> str:
|
||||
return WriteCommand.from_str(command).with_mode("w").execute()
|
||||
|
||||
@staticmethod
|
||||
def append(command: str) -> str:
|
||||
return WriteCommand.from_str(command).with_mode("a").execute()
|
||||
|
||||
#================> END
|
||||
|
||||
|
||||
|
||||
#============================> EDITOR/READ.PY
|
||||
"""
|
||||
read protocol:
|
||||
|
||||
<filepath>|<start line>-<end line>
|
||||
"""
|
||||
class Line:
|
||||
def __init__(self, content: str, line_number: int, depth: int):
|
||||
self.__content: str = content
|
||||
self.__line_number: int = line_number
|
||||
self.__depth: int = depth
|
||||
self.__children: List[Line] = []
|
||||
|
||||
def get_content(self) -> str:
|
||||
return self.__content
|
||||
|
||||
def get_depth(self) -> int:
|
||||
return self.__depth
|
||||
|
||||
def append_child(self, child: "Line") -> None:
|
||||
self.__children.append(child)
|
||||
|
||||
def find_by_lte_depth(self, depth: int) -> List["Line"]:
|
||||
if self.__depth > depth:
|
||||
return []
|
||||
|
||||
lines: List[Line] = [self]
|
||||
for child in self.__children:
|
||||
lines += child.find_by_lte_depth(depth)
|
||||
return lines
|
||||
|
||||
def find_by_content(self, content: str) -> List["Line"]:
|
||||
if content in self.__content:
|
||||
return [self]
|
||||
|
||||
lines: List[Line] = []
|
||||
for child in self.__children:
|
||||
lines += child.find_by_content(content)
|
||||
return lines
|
||||
|
||||
def find_last_lines(self) -> List["Line"]:
|
||||
if len(self.__children) == 0:
|
||||
return [self]
|
||||
else:
|
||||
return [self, *self.__children[-1].find_last_lines()]
|
||||
|
||||
def print(self, depth: int = 0) -> None:
|
||||
print(f"{' ' * depth}{self}", end="")
|
||||
for child in self.__children:
|
||||
child.print(depth + 1)
|
||||
|
||||
def __repr__(self):
|
||||
return f"{self.__line_number}: {self.__content}"
|
||||
|
||||
|
||||
class CodeTree:
|
||||
def __init__(self):
|
||||
self.root: Line = Line("\n", -1, -1)
|
||||
|
||||
def append(self, content: str, line_number: int) -> None:
|
||||
last_lines: List[Line] = self.root.find_last_lines()
|
||||
new_leading_spaces: int = self.__get_leading_spaces(content)
|
||||
|
||||
previous_line: Line = self.root
|
||||
previous_leading_spaces: int = -1
|
||||
for line in last_lines:
|
||||
leading_spaces = self.__get_leading_spaces(line.get_content())
|
||||
if (
|
||||
previous_leading_spaces < new_leading_spaces
|
||||
and new_leading_spaces <= leading_spaces
|
||||
):
|
||||
break
|
||||
previous_line, previous_leading_spaces = line, leading_spaces
|
||||
|
||||
new_line_depth: int = previous_line.get_depth() + 1
|
||||
previous_line.append_child(Line(content, line_number, new_line_depth))
|
||||
|
||||
def find_from_root(self, depth: int) -> List[Line]:
|
||||
return self.root.find_by_lte_depth(depth)
|
||||
|
||||
def find_from_parent(self, depth: int, parent_content: str) -> List[Line]:
|
||||
lines: List[Line] = self.root.find_by_content(parent_content)
|
||||
if len(lines) == 0:
|
||||
return []
|
||||
parent = lines[0]
|
||||
return parent.find_by_lte_depth(depth + parent.get_depth())
|
||||
|
||||
def print(self):
|
||||
print("Code Tree:")
|
||||
print("=================================")
|
||||
self.root.print()
|
||||
print("=================================")
|
||||
|
||||
def __get_leading_spaces(self, content: str) -> int:
|
||||
return len(content) - len(content.lstrip())
|
||||
|
||||
|
||||
class ReadCommand:
|
||||
separator = "|"
|
||||
|
||||
def __init__(self, filepath: str, start: int, end: int):
|
||||
self.filepath: str = filepath
|
||||
self.start: int = start
|
||||
self.end: int = end
|
||||
|
||||
@verify
|
||||
def execute(self) -> str:
|
||||
with open(self.filepath, "r") as f:
|
||||
code = f.readlines()
|
||||
|
||||
if self.start == self.end:
|
||||
code = code[self.start - 1]
|
||||
else:
|
||||
code = "".join(code[self.start - 1 : self.end])
|
||||
return code
|
||||
|
||||
@staticmethod
|
||||
def from_str(command: str) -> "ReadCommand":
|
||||
filepath, line = command.split(ReadCommand.separator)
|
||||
start, end = line.split("-")
|
||||
return ReadCommand(filepath, int(start), int(end))
|
||||
|
||||
|
||||
class SummaryCommand:
|
||||
separator = "|"
|
||||
|
||||
def __init__(self, filepath: str, depth: int, parent_content: Optional[str] = None):
|
||||
self.filepath: str = filepath
|
||||
self.depth: int = depth
|
||||
self.parent_content: Optional[str] = parent_content
|
||||
|
||||
@verify
|
||||
def execute(self) -> str:
|
||||
with open(self.filepath, "r") as f:
|
||||
code = f.readlines()
|
||||
|
||||
code_tree = CodeTree()
|
||||
for i, line in enumerate(code):
|
||||
if line.strip() != "":
|
||||
code_tree.append(line, i + 1)
|
||||
|
||||
if self.parent_content is None:
|
||||
lines = code_tree.find_from_root(self.depth)
|
||||
else:
|
||||
lines = code_tree.find_from_parent(self.depth, self.parent_content)
|
||||
return "".join([str(line) for line in lines])
|
||||
|
||||
@staticmethod
|
||||
def from_str(command: str) -> "SummaryCommand":
|
||||
command_list: List[str] = command.split(SummaryCommand.separator)
|
||||
filepath: str = command_list[0]
|
||||
depth: int = int(command_list[1])
|
||||
parent_content: str | None = command_list[2] if len(command_list) == 3 else None
|
||||
return SummaryCommand(
|
||||
filepath=filepath, depth=depth, parent_content=parent_content
|
||||
)
|
||||
|
||||
|
||||
class CodeReader:
|
||||
@staticmethod
|
||||
def read(command: str) -> str:
|
||||
return ReadCommand.from_str(command).execute()
|
||||
|
||||
@staticmethod
|
||||
def summary(command: str) -> str:
|
||||
return SummaryCommand.from_str(command).execute()
|
||||
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# summary = CodeReader.summary("read.py|1|class ReadCommand:")
|
||||
# print(summary)
|
||||
|
||||
#============================> EDITOR/READ.PY END
|
||||
|
||||
|
||||
|
||||
|
||||
#=================================> EDITOR/PATCH.PY
|
||||
"""
|
||||
patch protocol:
|
||||
|
||||
<filepath>|<line>,<col>|<line>,<col>|<content>
|
||||
---~~~+++===+++~~~---
|
||||
<filepath>|<line>,<col>|<line>,<col>|<content>
|
||||
---~~~+++===+++~~~---
|
||||
...
|
||||
---~~~+++===+++~~~---
|
||||
|
||||
let say original code is:
|
||||
```
|
||||
import requests
|
||||
|
||||
def crawl_news(keyword):
|
||||
url = f"https://www.google.com/search?q={keyword}+news"
|
||||
response = requests.get(url)
|
||||
|
||||
news = []
|
||||
for result in response:
|
||||
news.append(result.text)
|
||||
|
||||
return news
|
||||
```
|
||||
|
||||
and we want to change it to:
|
||||
```
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
def crawl_news(keyword):
|
||||
url = f"https://www.google.com/search?q={keyword}+news"
|
||||
html = requests.get(url).text
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
news_results = soup.find_all("div", class_="BNeawe vvjwJb AP7Wnd")
|
||||
|
||||
news_titles = []
|
||||
for result in news_results:
|
||||
news_titles.append(result.text)
|
||||
|
||||
return news_titles
|
||||
```
|
||||
|
||||
then the command will be:
|
||||
test.py|2,1|2,1|from bs4 import BeautifulSoup
|
||||
|
||||
---~~~+++===+++~~~---
|
||||
test.py|5,5|5,33|html = requests.get(url).text
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
news_results = soup.find_all("div", class_="BNeawe vvjwJb AP7Wnd")
|
||||
---~~~+++===+++~~~---
|
||||
test.py|7,5|9,13|news_titles = []
|
||||
for result in news_results:
|
||||
news_titles
|
||||
---~~~+++===+++~~~---
|
||||
test.py|11,16|11,16|_titles
|
||||
"""
|
||||
|
||||
import re
|
||||
|
||||
|
||||
|
||||
class Position:
|
||||
separator = ","
|
||||
|
||||
def __init__(self, line: int, col: int):
|
||||
self.line: int = line
|
||||
self.col: int = col
|
||||
|
||||
def __str__(self):
|
||||
return f"(Ln {self.line}, Col {self.col})"
|
||||
|
||||
@staticmethod
|
||||
def from_str(pos: str) -> "Position":
|
||||
line, col = pos.split(Position.separator)
|
||||
return Position(int(line) - 1, int(col) - 1)
|
||||
|
||||
|
||||
class PatchCommand:
|
||||
separator = "|"
|
||||
|
||||
def __init__(self, filepath: str, start: Position, end: Position, content: str):
|
||||
self.filepath: str = filepath
|
||||
self.start: Position = start
|
||||
self.end: Position = end
|
||||
self.content: str = content
|
||||
|
||||
def read_lines(self) -> list[str]:
|
||||
with open(self.filepath, "r") as f:
|
||||
lines = f.readlines()
|
||||
return lines
|
||||
|
||||
def write_lines(self, lines: list[str]) -> int:
|
||||
with open(self.filepath, "w") as f:
|
||||
f.writelines(lines)
|
||||
return sum([len(line) for line in lines])
|
||||
|
||||
@verify
|
||||
def execute(self) -> Tuple[int, int]:
|
||||
lines = self.read_lines()
|
||||
before = sum([len(line) for line in lines])
|
||||
|
||||
lines[self.start.line] = (
|
||||
lines[self.start.line][: self.start.col]
|
||||
+ self.content
|
||||
+ lines[self.end.line][self.end.col :]
|
||||
)
|
||||
lines = lines[: self.start.line + 1] + lines[self.end.line + 1 :]
|
||||
|
||||
after = self.write_lines(lines)
|
||||
|
||||
written = len(self.content)
|
||||
deleted = before - after + written
|
||||
|
||||
return written, deleted
|
||||
|
||||
@staticmethod
|
||||
def from_str(command: str) -> "PatchCommand":
|
||||
match = re.search(
|
||||
r"(.*)\|([0-9]*),([0-9]*)\|([0-9]*),([0-9]*)(\||\n)(.*)",
|
||||
command,
|
||||
re.DOTALL,
|
||||
)
|
||||
filepath = match.group(1)
|
||||
start_line = match.group(2)
|
||||
start_col = match.group(3)
|
||||
end_line = match.group(4)
|
||||
end_col = match.group(5)
|
||||
content = match.group(7)
|
||||
return PatchCommand(
|
||||
filepath,
|
||||
Position.from_str(f"{start_line},{start_col}"),
|
||||
Position.from_str(f"{end_line},{end_col}"),
|
||||
content,
|
||||
)
|
||||
|
||||
|
||||
class CodePatcher:
|
||||
separator = "\n---~~~+++===+++~~~---\n"
|
||||
|
||||
@staticmethod
|
||||
def sort_commands(commands: list[PatchCommand]) -> list[PatchCommand]:
|
||||
return sorted(commands, key=lambda c: c.start.line, reverse=True)
|
||||
|
||||
@staticmethod
|
||||
def patch(bulk_command: str) -> Tuple[int, int]:
|
||||
commands = [
|
||||
PatchCommand.from_str(command)
|
||||
for command in bulk_command.split(CodePatcher.separator)
|
||||
if command != ""
|
||||
]
|
||||
commands = CodePatcher.sort_commands(commands)
|
||||
|
||||
written, deleted = 0, 0
|
||||
for command in commands:
|
||||
if command:
|
||||
w, d = command.execute()
|
||||
written += w
|
||||
deleted += d
|
||||
return written, deleted
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
commands = """test.py|2,1|2,1|from bs4 import BeautifulSoup
|
||||
|
||||
---~~~+++===+++~~~---
|
||||
test.py|5,5|5,33|html = requests.get(url).text
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
news_results = soup.find_all("div", class_="BNeawe vvjwJb AP7Wnd")
|
||||
---~~~+++===+++~~~---
|
||||
test.py|7,5|9,13|news_titles = []
|
||||
for result in news_results:
|
||||
news_titles
|
||||
---~~~+++===+++~~~---
|
||||
test.py|11,16|11,16|_titles
|
||||
"""
|
||||
|
||||
example = """import requests
|
||||
|
||||
def crawl_news(keyword):
|
||||
url = f"https://www.google.com/search?q={keyword}+news"
|
||||
response = requests.get(url)
|
||||
|
||||
news = []
|
||||
for result in response:
|
||||
news.append(result.text)
|
||||
|
||||
return news
|
||||
"""
|
||||
testfile = "test.py"
|
||||
with open(testfile, "w") as f:
|
||||
f.write(example)
|
||||
|
||||
patcher = CodePatcher()
|
||||
written, deleted = patcher.patch(commands)
|
||||
print(f"written: {written}, deleted: {deleted}")
|
||||
|
||||
####################### => EDITOR/PATCH.PY
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
###################### EDITOR// INIT.PY
|
||||
|
||||
|
||||
class CodeEditor(BaseToolSet):
|
||||
@tool(
|
||||
name="CodeEditor.READ",
|
||||
description="Read and understand code. "
|
||||
"Input should be filename and line number group. ex. test.py|1-10 "
|
||||
"and the output will be code. ",
|
||||
)
|
||||
def read(self, inputs: str) -> str:
|
||||
try:
|
||||
output = CodeReader.read(inputs)
|
||||
except Exception as e:
|
||||
output = str(e)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed CodeEditor.READ, Input Commands: {inputs} "
|
||||
f"Output Answer: {output}"
|
||||
)
|
||||
return output
|
||||
|
||||
@tool(
|
||||
name="CodeEditor.SUMMARY",
|
||||
description="Summary code. "
|
||||
"Read the code structured into a tree. "
|
||||
"If you set specific line, it will show the code from the specific line. "
|
||||
"Input should be filename, depth, and specific line if you want. ex. test.py|2 or test.py|3|print('hello world') "
|
||||
"and the output will be list of (line number: code). ",
|
||||
)
|
||||
def summary(self, inputs: str) -> str:
|
||||
try:
|
||||
output = CodeReader.summary(inputs)
|
||||
except Exception as e:
|
||||
output = str(e)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed CodeEditor.SUMMARY, Input Commands: {inputs} "
|
||||
f"Output Answer: {output}"
|
||||
)
|
||||
return output
|
||||
|
||||
@tool(
|
||||
name="CodeEditor.APPEND",
|
||||
description="Append code to the existing file. "
|
||||
"If the code is completed, use the Terminal tool to execute it, if not, append the code through the this tool. "
|
||||
"Input should be filename and code to append. "
|
||||
"Input code must be the code that should be appended, NOT whole code. "
|
||||
"ex. test.py\nprint('hello world')\n "
|
||||
"and the output will be last 3 lines.",
|
||||
)
|
||||
def append(self, inputs: str) -> str:
|
||||
try:
|
||||
code = CodeWriter.append(inputs)
|
||||
output = "Last 3 line was:\n" + "\n".join(code.split("\n")[-3:])
|
||||
except Exception as e:
|
||||
output = str(e)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed CodeEditor.APPEND, Input: {inputs} "
|
||||
f"Output Answer: {output}"
|
||||
)
|
||||
return output
|
||||
|
||||
@tool(
|
||||
name="CodeEditor.WRITE",
|
||||
description="Write code to create a new tool. "
|
||||
"If the code is completed, use the Terminal tool to execute it, if not, append the code through the CodeEditor.APPEND tool. "
|
||||
"Input should be formatted like: "
|
||||
"<filename>\n<code>\n\n"
|
||||
"Here is an example: "
|
||||
"test.py\nmessage = 'hello world'\nprint(message)\n"
|
||||
"\n"
|
||||
"The output will be last 3 lines you wrote.",
|
||||
)
|
||||
def write(self, inputs: str) -> str:
|
||||
try:
|
||||
code = CodeWriter.write(inputs.lstrip())
|
||||
output = "Last 3 line was:\n" + "\n".join(code.split("\n")[-3:])
|
||||
except Exception as e:
|
||||
output = str(e)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed CodeEditor.WRITE, Input: {inputs} " f"Output Answer: {output}"
|
||||
)
|
||||
return output
|
||||
|
||||
# @tool(
|
||||
# name="CodeEditor.PATCH",
|
||||
# description="Patch the code to correct the error if an error occurs or to improve it. "
|
||||
# "Input is a list of patches. The patch is separated by {seperator}. ".format(
|
||||
# seperator=CodePatcher.separator.replace("\n", "\\n")
|
||||
# )
|
||||
# + "Each patch has to be formatted like below.\n"
|
||||
# "<filepath>|<start_line>,<start_col>|<end_line>,<end_col>|<new_code>"
|
||||
# "Here is an example. If the original code is:\n"
|
||||
# "print('hello world')\n"
|
||||
# "and you want to change it to:\n"
|
||||
# "print('hi corca')\n"
|
||||
# "then the patch should be:\n"
|
||||
# "test.py|1,8|1,19|hi corca\n"
|
||||
# "Code between start and end will be replaced with new_code. "
|
||||
# "The output will be written/deleted bytes or error message. ",
|
||||
# )
|
||||
def patch(self, patches: str) -> str:
|
||||
try:
|
||||
w, d = CodePatcher.patch(patches)
|
||||
output = f"successfully wrote {w}, deleted {d}"
|
||||
except Exception as e:
|
||||
output = str(e)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed CodeEditor.PATCH, Input Patch: {patches} "
|
||||
f"Output Answer: {output}"
|
||||
)
|
||||
return output
|
||||
|
||||
@tool(
|
||||
name="CodeEditor.DELETE",
|
||||
description="Delete code in file for a new start. "
|
||||
"Input should be filename."
|
||||
"ex. test.py "
|
||||
"Output will be success or error message.",
|
||||
)
|
||||
def delete(self, inputs: str, filepath: str) -> str:
|
||||
try:
|
||||
with open(filepath, "w") as f:
|
||||
f.write("")
|
||||
output = "success"
|
||||
except Exception as e:
|
||||
output = str(e)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed CodeEditor.DELETE, Input filename: {inputs} "
|
||||
f"Output Answer: {output}"
|
||||
)
|
||||
return output
|
||||
|
||||
###################### EDITOR// INIT.PY END
|
||||
|
||||
|
@ -0,0 +1,22 @@
|
||||
from swarms.utils.logger import logger
|
||||
from swarms.agents.tools.base import BaseToolSet, tool, SessionGetter, ToolScope
|
||||
|
||||
class ExitConversation(BaseToolSet):
|
||||
@tool(
|
||||
name="Exit Conversation",
|
||||
description="A tool to exit the conversation. "
|
||||
"Use this when you want to exit the conversation. "
|
||||
"The input should be a message that the conversation is over.",
|
||||
scope=ToolScope.SESSION,
|
||||
)
|
||||
def exit(self, message: str, get_session: SessionGetter) -> str:
|
||||
"""Run the tool."""
|
||||
_, executor = get_session()
|
||||
del executor
|
||||
|
||||
logger.debug("\nProcessed ExitConversation.")
|
||||
|
||||
return message
|
||||
|
||||
|
||||
|
@ -0,0 +1,24 @@
|
||||
######################## ######################################################## file system
|
||||
|
||||
from langchain.agents.agent_toolkits import FileManagementToolkit
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
# We'll make a temporary directory to avoid clutter
|
||||
working_directory = TemporaryDirectory()
|
||||
|
||||
toolkit = FileManagementToolkit(
|
||||
root_dir=str(working_directory.name)
|
||||
) # If you don't provide a root_dir, operations will default to the current working directory
|
||||
toolkit.get_tools()
|
||||
|
||||
file_management_tools = FileManagementToolkit(
|
||||
root_dir=str(working_directory.name),
|
||||
selected_tools=["read_file", "write_file", "list_directory"],
|
||||
).get_tools()
|
||||
|
||||
read_tool, write_tool, list_tool = file_management_tools
|
||||
write_tool.run({"file_path": "example.txt", "text": "Hello World!"})
|
||||
|
||||
# List files in the working directory
|
||||
list_tool.run({})
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,262 @@
|
||||
import os
|
||||
import uuid
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
from diffusers import (
|
||||
EulerAncestralDiscreteScheduler,
|
||||
StableDiffusionInpaintPipeline,
|
||||
StableDiffusionInstructPix2PixPipeline,
|
||||
StableDiffusionPipeline,
|
||||
)
|
||||
from PIL import Image
|
||||
from transformers import (
|
||||
BlipForConditionalGeneration,
|
||||
BlipForQuestionAnswering,
|
||||
BlipProcessor,
|
||||
CLIPSegForImageSegmentation,
|
||||
CLIPSegProcessor,
|
||||
)
|
||||
from swarms.agents.prompts.prompts import IMAGE_PROMPT
|
||||
from swarms.agents.tools.base import tool
|
||||
from swarms.agents.tools.main import BaseToolSet
|
||||
from swarms.utils.logger import logger
|
||||
from swarms.utils.main import BaseHandler, get_new_image_name
|
||||
|
||||
|
||||
class MaskFormer(BaseToolSet):
|
||||
def __init__(self, device):
|
||||
print("Initializing MaskFormer to %s" % device)
|
||||
self.device = device
|
||||
self.processor = CLIPSegProcessor.from_pretrained("CIDAS/clipseg-rd64-refined")
|
||||
self.model = CLIPSegForImageSegmentation.from_pretrained(
|
||||
"CIDAS/clipseg-rd64-refined"
|
||||
).to(device)
|
||||
|
||||
def inference(self, image_path, text):
|
||||
threshold = 0.5
|
||||
min_area = 0.02
|
||||
padding = 20
|
||||
original_image = Image.open(image_path)
|
||||
image = original_image.resize((512, 512))
|
||||
inputs = self.processor(
|
||||
text=text, images=image, padding="max_length", return_tensors="pt"
|
||||
).to(self.device)
|
||||
with torch.no_grad():
|
||||
outputs = self.model(**inputs)
|
||||
mask = torch.sigmoid(outputs[0]).squeeze().cpu().numpy() > threshold
|
||||
area_ratio = len(np.argwhere(mask)) / (mask.shape[0] * mask.shape[1])
|
||||
if area_ratio < min_area:
|
||||
return None
|
||||
true_indices = np.argwhere(mask)
|
||||
mask_array = np.zeros_like(mask, dtype=bool)
|
||||
for idx in true_indices:
|
||||
padded_slice = tuple(
|
||||
slice(max(0, i - padding), i + padding + 1) for i in idx
|
||||
)
|
||||
mask_array[padded_slice] = True
|
||||
visual_mask = (mask_array * 255).astype(np.uint8)
|
||||
image_mask = Image.fromarray(visual_mask)
|
||||
return image_mask.resize(original_image.size)
|
||||
|
||||
|
||||
class ImageEditing(BaseToolSet):
|
||||
def __init__(self, device):
|
||||
print("Initializing ImageEditing to %s" % device)
|
||||
self.device = device
|
||||
self.mask_former = MaskFormer(device=self.device)
|
||||
self.revision = "fp16" if "cuda" in device else None
|
||||
self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
|
||||
self.inpaint = StableDiffusionInpaintPipeline.from_pretrained(
|
||||
"runwayml/stable-diffusion-inpainting",
|
||||
revision=self.revision,
|
||||
torch_dtype=self.torch_dtype,
|
||||
).to(device)
|
||||
|
||||
@tool(
|
||||
name="Remove Something From The Photo",
|
||||
description="useful when you want to remove and object or something from the photo "
|
||||
"from its description or location. "
|
||||
"The input to this tool should be a comma separated string of two, "
|
||||
"representing the image_path and the object need to be removed. ",
|
||||
)
|
||||
def inference_remove(self, inputs):
|
||||
image_path, to_be_removed_txt = inputs.split(",")
|
||||
return self.inference_replace(f"{image_path},{to_be_removed_txt},background")
|
||||
|
||||
@tool(
|
||||
name="Replace Something From The Photo",
|
||||
description="useful when you want to replace an object from the object description or "
|
||||
"location with another object from its description. "
|
||||
"The input to this tool should be a comma separated string of three, "
|
||||
"representing the image_path, the object to be replaced, the object to be replaced with ",
|
||||
)
|
||||
def inference_replace(self, inputs):
|
||||
image_path, to_be_replaced_txt, replace_with_txt = inputs.split(",")
|
||||
original_image = Image.open(image_path)
|
||||
original_size = original_image.size
|
||||
mask_image = self.mask_former.inference(image_path, to_be_replaced_txt)
|
||||
updated_image = self.inpaint(
|
||||
prompt=replace_with_txt,
|
||||
image=original_image.resize((512, 512)),
|
||||
mask_image=mask_image.resize((512, 512)),
|
||||
).images[0]
|
||||
updated_image_path = get_new_image_name(
|
||||
image_path, func_name="replace-something"
|
||||
)
|
||||
updated_image = updated_image.resize(original_size)
|
||||
updated_image.save(updated_image_path)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed ImageEditing, Input Image: {image_path}, Replace {to_be_replaced_txt} to {replace_with_txt}, "
|
||||
f"Output Image: {updated_image_path}"
|
||||
)
|
||||
|
||||
return updated_image_path
|
||||
|
||||
|
||||
class InstructPix2Pix(BaseToolSet):
|
||||
def __init__(self, device):
|
||||
print("Initializing InstructPix2Pix to %s" % device)
|
||||
self.device = device
|
||||
self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
|
||||
self.pipe = StableDiffusionInstructPix2PixPipeline.from_pretrained(
|
||||
"timbrooks/instruct-pix2pix",
|
||||
safety_checker=None,
|
||||
torch_dtype=self.torch_dtype,
|
||||
).to(device)
|
||||
self.pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(
|
||||
self.pipe.scheduler.config
|
||||
)
|
||||
|
||||
@tool(
|
||||
name="Instruct Image Using Text",
|
||||
description="useful when you want to the style of the image to be like the text. "
|
||||
"like: make it look like a painting. or make it like a robot. "
|
||||
"The input to this tool should be a comma separated string of two, "
|
||||
"representing the image_path and the text. ",
|
||||
)
|
||||
def inference(self, inputs):
|
||||
"""Change style of image."""
|
||||
logger.debug("===> Starting InstructPix2Pix Inference")
|
||||
image_path, text = inputs.split(",")[0], ",".join(inputs.split(",")[1:])
|
||||
original_image = Image.open(image_path)
|
||||
image = self.pipe(
|
||||
text, image=original_image, num_inference_steps=40, image_guidance_scale=1.2
|
||||
).images[0]
|
||||
updated_image_path = get_new_image_name(image_path, func_name="pix2pix")
|
||||
image.save(updated_image_path)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed InstructPix2Pix, Input Image: {image_path}, Instruct Text: {text}, "
|
||||
f"Output Image: {updated_image_path}"
|
||||
)
|
||||
|
||||
return updated_image_path
|
||||
|
||||
|
||||
class Text2Image(BaseToolSet):
|
||||
def __init__(self, device):
|
||||
print("Initializing Text2Image to %s" % device)
|
||||
self.device = device
|
||||
self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
|
||||
self.pipe = StableDiffusionPipeline.from_pretrained(
|
||||
"runwayml/stable-diffusion-v1-5", torch_dtype=self.torch_dtype
|
||||
)
|
||||
self.pipe.to(device)
|
||||
self.a_prompt = "best quality, extremely detailed"
|
||||
self.n_prompt = (
|
||||
"longbody, lowres, bad anatomy, bad hands, missing fingers, extra digit, "
|
||||
"fewer digits, cropped, worst quality, low quality"
|
||||
)
|
||||
|
||||
@tool(
|
||||
name="Generate Image From User Input Text",
|
||||
description="useful when you want to generate an image from a user input text and save it to a file. "
|
||||
"like: generate an image of an object or something, or generate an image that includes some objects. "
|
||||
"The input to this tool should be a string, representing the text used to generate image. ",
|
||||
)
|
||||
def inference(self, text):
|
||||
image_filename = os.path.join("image", str(uuid.uuid4())[0:8] + ".png")
|
||||
prompt = text + ", " + self.a_prompt
|
||||
image = self.pipe(prompt, negative_prompt=self.n_prompt).images[0]
|
||||
image.save(image_filename)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed Text2Image, Input Text: {text}, Output Image: {image_filename}"
|
||||
)
|
||||
|
||||
return image_filename
|
||||
|
||||
|
||||
class VisualQuestionAnswering(BaseToolSet):
|
||||
def __init__(self, device):
|
||||
print("Initializing VisualQuestionAnswering to %s" % device)
|
||||
self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
|
||||
self.device = device
|
||||
self.processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base")
|
||||
self.model = BlipForQuestionAnswering.from_pretrained(
|
||||
"Salesforce/blip-vqa-base", torch_dtype=self.torch_dtype
|
||||
).to(self.device)
|
||||
|
||||
@tool(
|
||||
name="Answer Question About The Image",
|
||||
description="useful when you need an answer for a question based on an image. "
|
||||
"like: what is the background color of the last image, how many cats in this figure, what is in this figure. "
|
||||
"The input to this tool should be a comma separated string of two, representing the image_path and the question",
|
||||
)
|
||||
def inference(self, inputs):
|
||||
image_path, question = inputs.split(",")
|
||||
raw_image = Image.open(image_path).convert("RGB")
|
||||
inputs = self.processor(raw_image, question, return_tensors="pt").to(
|
||||
self.device, self.torch_dtype
|
||||
)
|
||||
out = self.model.generate(**inputs)
|
||||
answer = self.processor.decode(out[0], skip_special_tokens=True)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed VisualQuestionAnswering, Input Image: {image_path}, Input Question: {question}, "
|
||||
f"Output Answer: {answer}"
|
||||
)
|
||||
|
||||
return answer
|
||||
|
||||
|
||||
|
||||
class ImageCaptioning(BaseHandler):
|
||||
def __init__(self, device):
|
||||
print("Initializing ImageCaptioning to %s" % device)
|
||||
self.device = device
|
||||
self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
|
||||
self.processor = BlipProcessor.from_pretrained(
|
||||
"Salesforce/blip-image-captioning-base"
|
||||
)
|
||||
self.model = BlipForConditionalGeneration.from_pretrained(
|
||||
"Salesforce/blip-image-captioning-base", torch_dtype=self.torch_dtype
|
||||
).to(self.device)
|
||||
|
||||
def handle(self, filename: str):
|
||||
img = Image.open(filename)
|
||||
width, height = img.size
|
||||
ratio = min(512 / width, 512 / height)
|
||||
width_new, height_new = (round(width * ratio), round(height * ratio))
|
||||
img = img.resize((width_new, height_new))
|
||||
img = img.convert("RGB")
|
||||
img.save(filename, "PNG")
|
||||
print(f"Resize image form {width}x{height} to {width_new}x{height_new}")
|
||||
|
||||
inputs = self.processor(Image.open(filename), return_tensors="pt").to(
|
||||
self.device, self.torch_dtype
|
||||
)
|
||||
out = self.model.generate(**inputs)
|
||||
description = self.processor.decode(out[0], skip_special_tokens=True)
|
||||
print(
|
||||
f"\nProcessed ImageCaptioning, Input Image: {filename}, Output Text: {description}"
|
||||
)
|
||||
|
||||
return IMAGE_PROMPT.format(filename=filename, description=description)
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -0,0 +1,39 @@
|
||||
|
||||
import requests
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
from swarms.agents.tools.base import BaseToolSet
|
||||
from swarms.utils.logger import logger
|
||||
from swarms.agents.tools.base import tool
|
||||
|
||||
|
||||
class RequestsGet(BaseToolSet):
|
||||
@tool(
|
||||
name="Requests Get",
|
||||
description="A portal to the internet. "
|
||||
"Use this when you need to get specific content from a website."
|
||||
"Input should be a url (i.e. https://www.google.com)."
|
||||
"The output will be the text response of the GET request.",
|
||||
)
|
||||
def get(self, url: str) -> str:
|
||||
"""Run the tool."""
|
||||
html = requests.get(url).text
|
||||
soup = BeautifulSoup(html)
|
||||
non_readable_tags = soup.find_all(
|
||||
["script", "style", "header", "footer", "form"]
|
||||
)
|
||||
|
||||
for non_readable_tag in non_readable_tags:
|
||||
non_readable_tag.extract()
|
||||
|
||||
content = soup.get_text("\n", strip=True)
|
||||
|
||||
if len(content) > 300:
|
||||
content = content[:300] + "..."
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed RequestsGet, Input Url: {url} " f"Output Contents: {content}"
|
||||
)
|
||||
|
||||
return content
|
||||
|
Loading…
Reference in new issue