parent
2643e98925
commit
cb77ac4e2b
@@ -1,114 +0,0 @@
Here are 20 tools the individual worker swarm nodes can use:

1. Write File Tool: Create a new file and write content to it.
2. Read File Tool: Open and read the content of an existing file.
3. Copy File Tool: Duplicate a file.
4. Delete File Tool: Remove a file.
5. Rename File Tool: Rename a file.
6. Web Search Tool: Use a web search engine (like Google or DuckDuckGo) to find information.
7. API Call Tool: Make requests to APIs.
8. Process CSV Tool: Load a CSV file and perform operations on it using pandas.
9. Create Directory Tool: Create a new directory.
10. List Directory Tool: List all the files in a directory.
11. Install Package Tool: Install Python packages using pip.
12. Code Compilation Tool: Compile and run code in different languages.
13. System Command Tool: Execute system commands.
14. Image Processing Tool: Perform operations on images (resizing, cropping, etc.).
15. PDF Processing Tool: Read, write, and manipulate PDF files.
16. Text Processing Tool: Perform text processing operations like tokenization, stemming, etc.
17. Email Sending Tool: Send emails.
18. Database Query Tool: Execute SQL queries on a database.
19. Data Scraping Tool: Scrape data from web pages.
20. Version Control Tool: Perform Git operations.

The architecture for these tools involves creating a base `Tool` class that each specific tool extends. The base class defines the common properties and methods that all tools share.

The pseudocode for each tool follows a similar structure:

```
Class ToolNameTool extends Tool:
    Define properties specific to the tool

    Method run:
        Perform the specific action of the tool
        Return the result
```
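
To make that concrete, here is one minimal way the base class could look in Python. The `Tool` name and its members are illustrative assumptions for this sketch, not an existing library API:

```python
from abc import ABC, abstractmethod


class Tool(ABC):
    """Minimal base class that each specific tool extends (illustrative sketch)."""

    name: str = "tool"
    description: str = "Base tool."

    @abstractmethod
    def run(self, *args, **kwargs) -> str:
        """Perform the tool's action and return the result as a string."""
        ...
```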

Here's an example of how you might define the WriteFileTool:
```python
import os

from langchain.tools import BaseTool


class WriteFileTool(BaseTool):
    name: str = "write_file"
    description: str = "Create a new file and write content to it."
    # BaseTool is a pydantic model, so root_dir is declared as a field
    # rather than assigned inside a hand-written __init__.
    root_dir: str

    def _run(self, file_name: str, content: str) -> str:
        """Creates a new file and writes the content."""
        try:
            with open(os.path.join(self.root_dir, file_name), "w") as f:
                f.write(content)
            return f"Successfully wrote to {file_name}"
        except Exception as e:
            return f"Error: {e}"
```

This tool takes the file name and the content to be written as parameters, writes the content to a file under the configured root directory, and returns a success message; if anything goes wrong, it returns the error message instead. Because `BaseTool` is a pydantic model, `root_dir` is declared as a field rather than set in a custom `__init__`. You would follow a similar process to create the other tools.
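
For instance, you might construct and invoke it like this (the directory and inputs are illustrative; `run` with a dict is LangChain's calling convention for multi-argument tools):

```python
write_file = WriteFileTool(root_dir="./data")
result = write_file.run({"file_name": "notes.txt", "content": "hello"})
print(result)  # "Successfully wrote to notes.txt"
```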

For completing browser-based tasks, you can use web automation tools. These tools allow you to interact with a browser as if a human user were driving it. Here are 20 tasks that individual worker swarm nodes can handle:

1. Open Browser Tool: Open a web browser.
2. Close Browser Tool: Close the web browser.
3. Navigate To URL Tool: Navigate to a specific URL.
4. Fill Form Tool: Fill in a web form with provided data.
5. Submit Form Tool: Submit a filled form.
6. Click Button Tool: Click a button on a webpage.
7. Hover Over Element Tool: Hover over a specific element on a webpage.
8. Scroll Page Tool: Scroll up or down a webpage.
9. Navigate Back Tool: Navigate back to the previous page.
10. Navigate Forward Tool: Navigate forward to the next page.
11. Refresh Page Tool: Refresh the current page.
12. Switch Tab Tool: Switch between tabs in a browser.
13. Capture Screenshot Tool: Capture a screenshot of the current page.
14. Download File Tool: Download a file from a webpage.
15. Send Email Tool: Send an email using a web-based email service.
16. Login Tool: Log in to a website using provided credentials.
17. Search Website Tool: Perform a search on a website.
18. Extract Text Tool: Extract text from a webpage.
19. Extract Image Tool: Extract image(s) from a webpage.
20. Browser Session Management Tool: Handle creation, usage, and deletion of browser sessions.

You would typically use a library like Selenium, Puppeteer, or Playwright to automate these tasks. Here's an example of how you might define the FillFormTool using Selenium in Python:

```python
from selenium import webdriver
from selenium.webdriver.common.by import By

from langchain.tools import BaseTool


class FillFormTool(BaseTool):
    name: str = "fill_form"
    description: str = "Fill in a web form with provided data."

    def _run(self, url: str, field_dict: dict) -> str:
        """Opens the page at url and fills the form with the data in field_dict."""
        driver = webdriver.Firefox()
        try:
            driver.get(url)
            for field_name, field_value in field_dict.items():
                # Selenium 4 removed find_element_by_name in favour of By locators.
                element = driver.find_element(By.NAME, field_name)
                element.send_keys(field_value)
            return "Form filled successfully."
        except Exception as e:
            return f"Error: {e}"
        finally:
            driver.quit()
```

In this tool, `url` is the page containing the form, and `field_dict` is a dictionary whose keys are the names of the form fields and whose values are the data to fill into each field. The tool navigates to the page, locates each field by name, and types the provided data into it.

Please note that in a real scenario you would manage the browser session more carefully (for example, reusing one driver across calls instead of launching a fresh browser per invocation), explicitly wait for pages to load, and handle exceptions more thoroughly. This is a simplified example for illustrative purposes.
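
One way to address the session-management concern (tool 20 in the list above) is a small context manager that guarantees the driver is always closed. This is a sketch, not part of Selenium or LangChain:

```python
from contextlib import contextmanager

from selenium import webdriver


@contextmanager
def browser_session(headless: bool = True):
    """Yield a Firefox driver and guarantee it is quit afterwards."""
    options = webdriver.FirefoxOptions()
    if headless:
        options.add_argument("-headless")
    driver = webdriver.Firefox(options=options)
    try:
        yield driver
    finally:
        driver.quit()
```

Tools can then do `with browser_session() as driver: driver.get(...)` and never leak a browser process, even when an exception escapes.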
@@ -1,200 +0,0 @@
import asyncio
import os
from contextlib import contextmanager
from typing import Optional

import pandas as pd
import torch
from langchain.agents import tool
from langchain.agents.agent_toolkits.pandas.base import (
    create_pandas_dataframe_agent,
)
from langchain.chains.qa_with_sources.loading import (
    BaseCombineDocumentsChain,
)
from langchain.docstore.document import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.tools import BaseTool
from PIL import Image
from pydantic import Field
from transformers import (
    BlipForQuestionAnswering,
    BlipProcessor,
)

from swarms.utils.logger import logger

ROOT_DIR = "./data/"


@contextmanager
def pushd(new_dir):
    """Context manager for changing the current working directory."""
    prev_dir = os.getcwd()
    os.chdir(new_dir)
    try:
        yield
    finally:
        os.chdir(prev_dir)


@tool
def process_csv(
    llm,
    csv_file_path: str,
    instructions: str,
    output_path: Optional[str] = None,
) -> str:
    """Process a CSV with pandas in a limited REPL.\
 Only use this after writing data to disk as a csv file.\
 Any figures must be saved to disk to be viewed by the human.\
 Instructions should be written in natural language, not code. Assume the dataframe is already loaded."""
    with pushd(ROOT_DIR):
        try:
            df = pd.read_csv(csv_file_path)
        except Exception as e:
            return f"Error: {e}"
        agent = create_pandas_dataframe_agent(
            llm, df, max_iterations=30, verbose=False
        )
        if output_path is not None:
            instructions += f" Save output to disk at {output_path}"
        try:
            result = agent.run(instructions)
            return result
        except Exception as e:
            return f"Error: {e}"
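
# Illustrative behaviour: process_csv resolves csv_file_path relative to
# ROOT_DIR (via pushd), loads it into a dataframe, and hands the natural-
# language instructions to a pandas agent, e.g. "Report the mean of the
# revenue column". Errors are returned as strings rather than raised, so a
# calling agent can recover and retry.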


async def async_load_playwright(url: str) -> str:
    """Load the specified URL using Playwright and parse it with BeautifulSoup."""
    from bs4 import BeautifulSoup
    from playwright.async_api import async_playwright

    results = ""
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        try:
            page = await browser.new_page()
            await page.goto(url)

            page_source = await page.content()
            soup = BeautifulSoup(page_source, "html.parser")

            # Drop non-visible elements before extracting text.
            for script in soup(["script", "style"]):
                script.extract()

            text = soup.get_text()
            lines = (line.strip() for line in text.splitlines())
            chunks = (
                phrase.strip() for line in lines for phrase in line.split(" ")
            )
            results = "\n".join(chunk for chunk in chunks if chunk)
        except Exception as e:
            results = f"Error: {e}"
        await browser.close()
    return results


def run_async(coro):
    """Run a coroutine to completion from synchronous code."""
    event_loop = asyncio.get_event_loop()
    return event_loop.run_until_complete(coro)
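
# Note: on newer Python versions, asyncio.get_event_loop() is deprecated when
# no loop is running; asyncio.run(coro) is the usual replacement in that case.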


@tool
def browse_web_page(url: str) -> str:
    """Verbose way to scrape a whole webpage. Likely to cause issues parsing."""
    return run_async(async_load_playwright(url))


def _get_text_splitter():
    return RecursiveCharacterTextSplitter(
        # Set a really small chunk size, just to show.
        chunk_size=500,
        chunk_overlap=20,
        length_function=len,
    )


class WebpageQATool(BaseTool):
    name = "query_webpage"
    description = (
        "Browse a webpage and retrieve the information relevant to the"
        " question."
    )
    text_splitter: RecursiveCharacterTextSplitter = Field(
        default_factory=_get_text_splitter
    )
    qa_chain: BaseCombineDocumentsChain

    def _run(self, url: str, question: str) -> str:
        """Useful for browsing websites and scraping the text information."""
        result = browse_web_page.run(url)
        docs = [Document(page_content=result, metadata={"source": url})]
        web_docs = self.text_splitter.split_documents(docs)
        results = []
        # TODO: Handle this with a MapReduceChain
        for i in range(0, len(web_docs), 4):
            input_docs = web_docs[i : i + 4]
            window_result = self.qa_chain(
                {"input_documents": input_docs, "question": question},
                return_only_outputs=True,
            )
            results.append(f"Response from window {i} - {window_result}")
        results_docs = [
            Document(page_content="\n".join(results), metadata={"source": url})
        ]
        return self.qa_chain(
            {"input_documents": results_docs, "question": question},
            return_only_outputs=True,
        )

    async def _arun(self, url: str, question: str) -> str:
        raise NotImplementedError
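
# Illustrative construction (llm is assumed to be a LangChain LLM handle):
#   from langchain.chains.qa_with_sources.loading import load_qa_with_sources_chain
#   query_website_tool = WebpageQATool(qa_chain=load_qa_with_sources_chain(llm))
#   query_website_tool.run({"url": "https://example.com", "question": "What is this page about?"})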


class EdgeGPTTool:
    # Initialize the custom tool
    def __init__(
        self,
        model,
        name="EdgeGPTTool",
        description="Tool that uses EdgeGPTModel to generate responses",
    ):
        # This class does not extend BaseTool, so store the metadata directly;
        # calling super().__init__ with kwargs on a plain object would raise.
        self.name = name
        self.description = description
        self.model = model

    def _run(self, prompt):
        return self.model(prompt)


@tool
def VQAinference(inputs):
    """Answer a question about an image (VQA multi-modal worker agent).

    Useful when you need an answer to a question based on an image,
    like: what is the background color of the last image, how many cats
    are in this figure, what is in this figure. The input to this tool
    should be a comma separated string of two, representing the
    image_path and the question.
    """
    device = "cuda:0"
    torch_dtype = torch.float16 if "cuda" in device else torch.float32
    processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base")
    model = BlipForQuestionAnswering.from_pretrained(
        "Salesforce/blip-vqa-base", torch_dtype=torch_dtype
    ).to(device)

    image_path, question = inputs.split(",")
    raw_image = Image.open(image_path).convert("RGB")
    inputs = processor(raw_image, question, return_tensors="pt").to(
        device, torch_dtype
    )
    out = model.generate(**inputs)
    answer = processor.decode(out[0], skip_special_tokens=True)

    logger.debug(
        f"\nProcessed VisualQuestionAnswering, Input Image: {image_path}, Input"
        f" Question: {question}, Output Answer: {answer}"
    )

    return answer
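
# Illustrative call (hypothetical path and question, format per the docstring):
#   VQAinference.run("photos/cats.png,how many cats are in this image?")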
@@ -1,284 +0,0 @@
import os
import uuid

import numpy as np
import torch
from diffusers import (
    EulerAncestralDiscreteScheduler,
    StableDiffusionInpaintPipeline,
    StableDiffusionInstructPix2PixPipeline,
    StableDiffusionPipeline,
)
from PIL import Image
from transformers import (
    BlipForConditionalGeneration,
    BlipForQuestionAnswering,
    BlipProcessor,
    CLIPSegForImageSegmentation,
    CLIPSegProcessor,
)

from swarms.prompts.prebuild.multi_modal_prompts import IMAGE_PROMPT
from swarms.tools.tool import tool
from swarms.utils.logger import logger
from swarms.utils.main import BaseHandler, get_new_image_name


class MaskFormer:
    def __init__(self, device):
        print("Initializing MaskFormer to %s" % device)
        self.device = device
        self.processor = CLIPSegProcessor.from_pretrained(
            "CIDAS/clipseg-rd64-refined"
        )
        self.model = CLIPSegForImageSegmentation.from_pretrained(
            "CIDAS/clipseg-rd64-refined"
        ).to(device)

    def inference(self, image_path, text):
        """Segment the region described by text; returns a PIL mask or None."""
        threshold = 0.5
        min_area = 0.02
        padding = 20
        original_image = Image.open(image_path)
        image = original_image.resize((512, 512))
        inputs = self.processor(
            text=text, images=image, padding="max_length", return_tensors="pt"
        ).to(self.device)
        with torch.no_grad():
            outputs = self.model(**inputs)
        # Threshold the sigmoid of the logits into a boolean mask.
        mask = torch.sigmoid(outputs[0]).squeeze().cpu().numpy() > threshold
        # Bail out if the matched region covers too little of the image.
        area_ratio = len(np.argwhere(mask)) / (mask.shape[0] * mask.shape[1])
        if area_ratio < min_area:
            return None
        # Dilate the mask by `padding` pixels around every positive cell.
        true_indices = np.argwhere(mask)
        mask_array = np.zeros_like(mask, dtype=bool)
        for idx in true_indices:
            padded_slice = tuple(
                slice(max(0, i - padding), i + padding + 1) for i in idx
            )
            mask_array[padded_slice] = True
        visual_mask = (mask_array * 255).astype(np.uint8)
        image_mask = Image.fromarray(visual_mask)
        return image_mask.resize(original_image.size)


class ImageEditing:
    def __init__(self, device):
        print("Initializing ImageEditing to %s" % device)
        self.device = device
        self.mask_former = MaskFormer(device=self.device)
        self.revision = "fp16" if "cuda" in device else None
        self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
        self.inpaint = StableDiffusionInpaintPipeline.from_pretrained(
            "runwayml/stable-diffusion-inpainting",
            revision=self.revision,
            torch_dtype=self.torch_dtype,
        ).to(device)

    @tool(
        name="Remove Something From The Photo",
        description=(
            "useful when you want to remove an object or something from the"
            " photo from its description or location. The input to this tool"
            " should be a comma separated string of two, representing the"
            " image_path and the object that needs to be removed. "
        ),
    )
    def inference_remove(self, inputs):
        # Removal is implemented as replacement with "background".
        image_path, to_be_removed_txt = inputs.split(",")
        return self.inference_replace(
            f"{image_path},{to_be_removed_txt},background"
        )

    @tool(
        name="Replace Something From The Photo",
        description=(
            "useful when you want to replace an object from the object"
            " description or location with another object from its description."
            " The input to this tool should be a comma separated string of"
            " three, representing the image_path, the object to be replaced,"
            " and the object to replace it with. "
        ),
    )
    def inference_replace(self, inputs):
        image_path, to_be_replaced_txt, replace_with_txt = inputs.split(",")
        original_image = Image.open(image_path)
        original_size = original_image.size
        mask_image = self.mask_former.inference(image_path, to_be_replaced_txt)
        updated_image = self.inpaint(
            prompt=replace_with_txt,
            image=original_image.resize((512, 512)),
            mask_image=mask_image.resize((512, 512)),
        ).images[0]
        updated_image_path = get_new_image_name(
            image_path, func_name="replace-something"
        )
        updated_image = updated_image.resize(original_size)
        updated_image.save(updated_image_path)

        logger.debug(
            f"\nProcessed ImageEditing, Input Image: {image_path}, Replace"
            f" {to_be_replaced_txt} to {replace_with_txt}, Output Image:"
            f" {updated_image_path}"
        )

        return updated_image_path
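
# Illustrative call (hypothetical paths): ImageEditing("cuda:0").inference_replace(
#     "images/dog.png,dog,cat")
# segments the "dog" region via MaskFormer, inpaints it with "cat", and
# returns the new image path produced by get_new_image_name.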


class InstructPix2Pix:
    def __init__(self, device):
        print("Initializing InstructPix2Pix to %s" % device)
        self.device = device
        self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
        self.pipe = StableDiffusionInstructPix2PixPipeline.from_pretrained(
            "timbrooks/instruct-pix2pix",
            safety_checker=None,
            torch_dtype=self.torch_dtype,
        ).to(device)
        self.pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(
            self.pipe.scheduler.config
        )

    @tool(
        name="Instruct Image Using Text",
        description=(
            "useful when you want the style of the image to be like the"
            " text. like: make it look like a painting. or make it like a"
            " robot. The input to this tool should be a comma separated string"
            " of two, representing the image_path and the text. "
        ),
    )
    def inference(self, inputs):
        """Change style of image."""
        logger.debug("===> Starting InstructPix2Pix Inference")
        # Only the first comma separates path from text; the text may contain commas.
        image_path, text = inputs.split(",")[0], ",".join(inputs.split(",")[1:])
        original_image = Image.open(image_path)
        image = self.pipe(
            text,
            image=original_image,
            num_inference_steps=40,
            image_guidance_scale=1.2,
        ).images[0]
        updated_image_path = get_new_image_name(image_path, func_name="pix2pix")
        image.save(updated_image_path)

        logger.debug(
            f"\nProcessed InstructPix2Pix, Input Image: {image_path}, Instruct"
            f" Text: {text}, Output Image: {updated_image_path}"
        )

        return updated_image_path


class Text2Image:
    def __init__(self, device):
        print("Initializing Text2Image to %s" % device)
        self.device = device
        self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
        self.pipe = StableDiffusionPipeline.from_pretrained(
            "runwayml/stable-diffusion-v1-5", torch_dtype=self.torch_dtype
        )
        self.pipe.to(device)
        self.a_prompt = "best quality, extremely detailed"
        self.n_prompt = (
            "longbody, lowres, bad anatomy, bad hands, missing fingers, extra"
            " digit, fewer digits, cropped, worst quality, low quality"
        )

    @tool(
        name="Generate Image From User Input Text",
        description=(
            "useful when you want to generate an image from a user input text"
            " and save it to a file. like: generate an image of an object or"
            " something, or generate an image that includes some objects. The"
            " input to this tool should be a string, representing the text used"
            " to generate image. "
        ),
    )
    def inference(self, text):
        image_filename = os.path.join("image", str(uuid.uuid4())[0:8] + ".png")
        prompt = text + ", " + self.a_prompt
        image = self.pipe(prompt, negative_prompt=self.n_prompt).images[0]
        image.save(image_filename)

        logger.debug(
            f"\nProcessed Text2Image, Input Text: {text}, Output Image:"
            f" {image_filename}"
        )

        return image_filename
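
# Note: inference saves into a relative "image" directory, which must already
# exist; PIL's image.save raises FileNotFoundError otherwise. Creating it up
# front, e.g. os.makedirs("image", exist_ok=True), avoids that failure mode.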


class VisualQuestionAnswering:
    def __init__(self, device):
        print("Initializing VisualQuestionAnswering to %s" % device)
        self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
        self.device = device
        self.processor = BlipProcessor.from_pretrained(
            "Salesforce/blip-vqa-base"
        )
        self.model = BlipForQuestionAnswering.from_pretrained(
            "Salesforce/blip-vqa-base", torch_dtype=self.torch_dtype
        ).to(self.device)

    @tool(
        name="Answer Question About The Image",
        description=(
            "useful when you need an answer for a question based on an image."
            " like: what is the background color of the last image, how many"
            " cats in this figure, what is in this figure. The input to this"
            " tool should be a comma separated string of two, representing the"
            " image_path and the question"
        ),
    )
    def inference(self, inputs):
        image_path, question = inputs.split(",")
        raw_image = Image.open(image_path).convert("RGB")
        inputs = self.processor(raw_image, question, return_tensors="pt").to(
            self.device, self.torch_dtype
        )
        out = self.model.generate(**inputs)
        answer = self.processor.decode(out[0], skip_special_tokens=True)

        logger.debug(
            f"\nProcessed VisualQuestionAnswering, Input Image: {image_path},"
            f" Input Question: {question}, Output Answer: {answer}"
        )

        return answer


class ImageCaptioning(BaseHandler):
    def __init__(self, device):
        print("Initializing ImageCaptioning to %s" % device)
        self.device = device
        self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
        self.processor = BlipProcessor.from_pretrained(
            "Salesforce/blip-image-captioning-base"
        )
        self.model = BlipForConditionalGeneration.from_pretrained(
            "Salesforce/blip-image-captioning-base",
            torch_dtype=self.torch_dtype,
        ).to(self.device)

    def handle(self, filename: str):
        # Downscale so the longest side is at most 512px, overwriting in place.
        img = Image.open(filename)
        width, height = img.size
        ratio = min(512 / width, 512 / height)
        width_new, height_new = (round(width * ratio), round(height * ratio))
        img = img.resize((width_new, height_new))
        img = img.convert("RGB")
        img.save(filename, "PNG")
        print(f"Resize image from {width}x{height} to {width_new}x{height_new}")

        inputs = self.processor(Image.open(filename), return_tensors="pt").to(
            self.device, self.torch_dtype
        )
        out = self.model.generate(**inputs)
        description = self.processor.decode(out[0], skip_special_tokens=True)
        print(
            f"\nProcessed ImageCaptioning, Input Image: {filename}, Output"
            f" Text: {description}"
        )

        return IMAGE_PROMPT.format(filename=filename, description=description)
@@ -1,45 +0,0 @@
from typing import Any, Callable, Dict, List

from swarms.tools.tool import tool

# A ToolBuilder turns a config object into a concrete tool; a FuncToolBuilder
# is a zero-argument factory that returns a ToolBuilder.
ToolBuilder = Callable[[Any], tool]
FuncToolBuilder = Callable[[], ToolBuilder]


class ToolsRegistry:
    def __init__(self) -> None:
        self.tools: Dict[str, FuncToolBuilder] = {}

    def register(self, tool_name: str, tool_builder: FuncToolBuilder):
        # Renamed the parameter from `tool` to avoid shadowing the imported tool.
        print(f"will register {tool_name}")
        self.tools[tool_name] = tool_builder

    def build(self, tool_name, config):
        ret = self.tools[tool_name]()(config)
        if isinstance(ret, tool):
            return ret
        raise ValueError(
            "Tool builder {} did not return a Tool instance".format(tool_name)
        )

    def list_tools(self) -> List[str]:
        return list(self.tools.keys())


tools_registry = ToolsRegistry()


def register(tool_name):
    def decorator(tool_builder: FuncToolBuilder):
        tools_registry.register(tool_name, tool_builder)
        return tool_builder

    return decorator


def build_tool(tool_name: str, config: Any) -> tool:
    print(f"will build {tool_name}")
    return tools_registry.build(tool_name, config)


def list_tools() -> List[str]:
    return tools_registry.list_tools()
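
# Illustrative registration flow (the builder body is hypothetical; build()
# calls the registered factory, then passes config to the returned builder):
#
# @register("greet")
# def greet_builder():
#     def build(config):
#         ...  # construct and return a tool instance from config
#     return build
#
# greet_tool = build_tool("greet", config={})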