parent 71133ebdaa
commit 374efe3411
Binary file not shown.
@@ -1,188 +0,0 @@
import asyncio
import os
from contextlib import contextmanager
from typing import Optional

import pandas as pd
import torch
from langchain.agents import tool
from langchain.agents.agent_toolkits.pandas.base import create_pandas_dataframe_agent
from langchain.chains.qa_with_sources.loading import (
    BaseCombineDocumentsChain,
)
from langchain.docstore.document import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.tools import BaseTool
from PIL import Image
from pydantic import Field
from transformers import (
    BlipForQuestionAnswering,
    BlipProcessor,
)

from swarms.utils.logger import logger

ROOT_DIR = "./data/"

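# pushd temporarily switches the process working directory so tool I/O stays inside
# ROOT_DIR, then restores the previous directory on exit.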
@contextmanager
def pushd(new_dir):
    """Context manager for changing the current working directory."""
    prev_dir = os.getcwd()
    os.chdir(new_dir)
    try:
        yield
    finally:
        os.chdir(prev_dir)

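# process_csv loads the CSV inside ROOT_DIR, hands the dataframe to a LangChain
# pandas-dataframe agent, and runs the natural-language instructions against it
# (optionally asking the agent to save its output to `output_path`).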
@tool
def process_csv(
    llm, csv_file_path: str, instructions: str, output_path: Optional[str] = None
) -> str:
    """Process a CSV with pandas in a limited REPL.\
 Only use this after writing data to disk as a csv file.\
 Any figures must be saved to disk to be viewed by the human.\
 Instructions should be written in natural language, not code. Assume the dataframe is already loaded."""
    with pushd(ROOT_DIR):
        try:
            df = pd.read_csv(csv_file_path)
        except Exception as e:
            return f"Error: {e}"
        agent = create_pandas_dataframe_agent(llm, df, max_iterations=30, verbose=False)
        if output_path is not None:
            instructions += f" Save output to disk at {output_path}"
        try:
            result = agent.run(instructions)
            return result
        except Exception as e:
            return f"Error: {e}"

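# async_load_playwright drives headless Chromium via Playwright, strips <script>/<style>
# tags with BeautifulSoup, and collapses the remaining visible text into newline-separated chunks.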
async def async_load_playwright(url: str) -> str:
    """Load the specified URL using Playwright and parse it with BeautifulSoup."""
    from bs4 import BeautifulSoup
    from playwright.async_api import async_playwright

    results = ""
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        try:
            page = await browser.new_page()
            await page.goto(url)

            page_source = await page.content()
            soup = BeautifulSoup(page_source, "html.parser")

            for script in soup(["script", "style"]):
                script.extract()

            text = soup.get_text()
            lines = (line.strip() for line in text.splitlines())
            chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
            results = "\n".join(chunk for chunk in chunks if chunk)
        except Exception as e:
            results = f"Error: {e}"
        await browser.close()
    return results

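# run_async blocks on the current event loop until the coroutine finishes,
# bridging the async scraper to the synchronous @tool interface.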
def run_async(coro):
    event_loop = asyncio.get_event_loop()
    return event_loop.run_until_complete(coro)

@tool
def browse_web_page(url: str) -> str:
    """Verbose way to scrape a whole webpage. Likely to cause issues parsing."""
    return run_async(async_load_playwright(url))

def _get_text_splitter():
    return RecursiveCharacterTextSplitter(
        # Set a really small chunk size, just to show.
        chunk_size=500,
        chunk_overlap=20,
        length_function=len,
    )

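# WebpageQATool scrapes a page, splits it into 500-character chunks, answers the question
# over windows of four chunks at a time, then runs the QA chain once more over the combined
# window answers. Example wiring (hypothetical `llm`, not defined in this file):
#   from langchain.chains.qa_with_sources.loading import load_qa_with_sources_chain
#   query_website_tool = WebpageQATool(qa_chain=load_qa_with_sources_chain(llm))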
class WebpageQATool(BaseTool):
    name = "query_webpage"
    description = (
        "Browse a webpage and retrieve the information relevant to the question."
    )
    text_splitter: RecursiveCharacterTextSplitter = Field(
        default_factory=_get_text_splitter
    )
    qa_chain: BaseCombineDocumentsChain

    def _run(self, url: str, question: str) -> str:
        """Useful for browsing websites and scraping the text information."""
        result = browse_web_page.run(url)
        docs = [Document(page_content=result, metadata={"source": url})]
        web_docs = self.text_splitter.split_documents(docs)
        results = []
        # TODO: Handle this with a MapReduceChain
        for i in range(0, len(web_docs), 4):
            input_docs = web_docs[i : i + 4]
            window_result = self.qa_chain(
                {"input_documents": input_docs, "question": question},
                return_only_outputs=True,
            )
            results.append(f"Response from window {i} - {window_result}")
        results_docs = [
            Document(page_content="\n".join(results), metadata={"source": url})
        ]
        return self.qa_chain(
            {"input_documents": results_docs, "question": question},
            return_only_outputs=True,
        )

    async def _arun(self, url: str, question: str) -> str:
        raise NotImplementedError

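# EdgeGPTTool is a thin wrapper that stores a model instance and proxies prompts to it via _run.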
class EdgeGPTTool:
    # Initialize the custom tool
    def __init__(
        self,
        model,
        name="EdgeGPTTool",
        description="Tool that uses EdgeGPTModel to generate responses",
    ):
        # The class has no base class that accepts these kwargs, so store them directly.
        self.name = name
        self.description = description
        self.model = model

    def _run(self, prompt):
        return self.model.__call__(prompt)

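# VQAinference loads the BLIP VQA checkpoint on every call and expects a single
# "image_path,question" string; the hard-coded device assumes a CUDA GPU is available.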
@tool
def VQAinference(inputs):
    """
    Answer Question About The Image, VQA Multi-Modal Worker agent
    description="useful when you need an answer for a question based on an image. "
    "like: what is the background color of the last image, how many cats in this figure, what is in this figure. "
    "The input to this tool should be a comma separated string of two, representing the image_path and the question",

    """
    device = "cuda:0"
    torch_dtype = torch.float16 if "cuda" in device else torch.float32
    processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base")
    model = BlipForQuestionAnswering.from_pretrained(
        "Salesforce/blip-vqa-base", torch_dtype=torch_dtype
    ).to(device)

    image_path, question = inputs.split(",")
    raw_image = Image.open(image_path).convert("RGB")
    inputs = processor(raw_image, question, return_tensors="pt").to(device, torch_dtype)
    out = model.generate(**inputs)
    answer = processor.decode(out[0], skip_special_tokens=True)

    logger.debug(
        f"\nProcessed VisualQuestionAnswering, Input Image: {image_path}, Input"
        f" Question: {question}, Output Answer: {answer}"
    )

    return answer
@@ -1,271 +0,0 @@
import os
import uuid

import numpy as np
import torch
from diffusers import (
    EulerAncestralDiscreteScheduler,
    StableDiffusionInpaintPipeline,
    StableDiffusionInstructPix2PixPipeline,
    StableDiffusionPipeline,
)
from PIL import Image
from transformers import (
    BlipForConditionalGeneration,
    BlipForQuestionAnswering,
    BlipProcessor,
    CLIPSegForImageSegmentation,
    CLIPSegProcessor,
)

from swarms.prompts.prebuild.multi_modal_prompts import IMAGE_PROMPT
from swarms.tools.tool import tool
from swarms.utils.logger import logger
from swarms.utils.main import BaseHandler, get_new_image_name

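# MaskFormer runs CLIPSeg on a text prompt to produce a binary mask of the described region,
# pads it by 20 px, and returns it as a PIL image (or None if the region covers <2% of the image).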
class MaskFormer:
    def __init__(self, device):
        print("Initializing MaskFormer to %s" % device)
        self.device = device
        self.processor = CLIPSegProcessor.from_pretrained("CIDAS/clipseg-rd64-refined")
        self.model = CLIPSegForImageSegmentation.from_pretrained(
            "CIDAS/clipseg-rd64-refined"
        ).to(device)

    def inference(self, image_path, text):
        threshold = 0.5
        min_area = 0.02
        padding = 20
        original_image = Image.open(image_path)
        image = original_image.resize((512, 512))
        inputs = self.processor(
            text=text, images=image, padding="max_length", return_tensors="pt"
        ).to(self.device)
        with torch.no_grad():
            outputs = self.model(**inputs)
        mask = torch.sigmoid(outputs[0]).squeeze().cpu().numpy() > threshold
        area_ratio = len(np.argwhere(mask)) / (mask.shape[0] * mask.shape[1])
        if area_ratio < min_area:
            return None
        true_indices = np.argwhere(mask)
        mask_array = np.zeros_like(mask, dtype=bool)
        for idx in true_indices:
            padded_slice = tuple(
                slice(max(0, i - padding), i + padding + 1) for i in idx
            )
            mask_array[padded_slice] = True
        visual_mask = (mask_array * 255).astype(np.uint8)
        image_mask = Image.fromarray(visual_mask)
        return image_mask.resize(original_image.size)

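# ImageEditing combines CLIPSeg masking with Stable Diffusion inpainting; "remove" is
# implemented as "replace the masked object with background".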
class ImageEditing:
    def __init__(self, device):
        print("Initializing ImageEditing to %s" % device)
        self.device = device
        self.mask_former = MaskFormer(device=self.device)
        self.revision = "fp16" if "cuda" in device else None
        self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
        self.inpaint = StableDiffusionInpaintPipeline.from_pretrained(
            "runwayml/stable-diffusion-inpainting",
            revision=self.revision,
            torch_dtype=self.torch_dtype,
        ).to(device)

    @tool(
        name="Remove Something From The Photo",
        description=(
            "useful when you want to remove an object or something from the photo "
            "from its description or location. "
            "The input to this tool should be a comma separated string of two, "
            "representing the image_path and the object to be removed. "
        ),
    )
    def inference_remove(self, inputs):
        image_path, to_be_removed_txt = inputs.split(",")
        return self.inference_replace(f"{image_path},{to_be_removed_txt},background")

    @tool(
        name="Replace Something From The Photo",
        description=(
            "useful when you want to replace an object from the object description or"
            " location with another object from its description. The input to this tool"
            " should be a comma separated string of three, representing the image_path,"
            " the object to be replaced, the object to be replaced with "
        ),
    )
    def inference_replace(self, inputs):
        image_path, to_be_replaced_txt, replace_with_txt = inputs.split(",")
        original_image = Image.open(image_path)
        original_size = original_image.size
        mask_image = self.mask_former.inference(image_path, to_be_replaced_txt)
        updated_image = self.inpaint(
            prompt=replace_with_txt,
            image=original_image.resize((512, 512)),
            mask_image=mask_image.resize((512, 512)),
        ).images[0]
        updated_image_path = get_new_image_name(
            image_path, func_name="replace-something"
        )
        updated_image = updated_image.resize(original_size)
        updated_image.save(updated_image_path)

        logger.debug(
            f"\nProcessed ImageEditing, Input Image: {image_path}, Replace"
            f" {to_be_replaced_txt} to {replace_with_txt}, Output Image:"
            f" {updated_image_path}"
        )

        return updated_image_path

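# InstructPix2Pix applies the timbrooks/instruct-pix2pix pipeline so a text instruction
# (e.g. "make it look like a painting") restyles the input image.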
class InstructPix2Pix:
    def __init__(self, device):
        print("Initializing InstructPix2Pix to %s" % device)
        self.device = device
        self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
        self.pipe = StableDiffusionInstructPix2PixPipeline.from_pretrained(
            "timbrooks/instruct-pix2pix",
            safety_checker=None,
            torch_dtype=self.torch_dtype,
        ).to(device)
        self.pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(
            self.pipe.scheduler.config
        )

    @tool(
        name="Instruct Image Using Text",
        description=(
            "useful when you want the style of the image to be like the text. "
            "like: make it look like a painting. or make it like a robot. "
            "The input to this tool should be a comma separated string of two, "
            "representing the image_path and the text. "
        ),
    )
    def inference(self, inputs):
        """Change style of image."""
        logger.debug("===> Starting InstructPix2Pix Inference")
        image_path, text = inputs.split(",")[0], ",".join(inputs.split(",")[1:])
        original_image = Image.open(image_path)
        image = self.pipe(
            text, image=original_image, num_inference_steps=40, image_guidance_scale=1.2
        ).images[0]
        updated_image_path = get_new_image_name(image_path, func_name="pix2pix")
        image.save(updated_image_path)

        logger.debug(
            f"\nProcessed InstructPix2Pix, Input Image: {image_path}, Instruct Text:"
            f" {text}, Output Image: {updated_image_path}"
        )

        return updated_image_path

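# Text2Image generates an image from a prompt with Stable Diffusion v1.5, appending a fixed
# quality-boosting positive prompt and a standard negative prompt, and saves it under ./image/.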
class Text2Image:
    def __init__(self, device):
        print("Initializing Text2Image to %s" % device)
        self.device = device
        self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
        self.pipe = StableDiffusionPipeline.from_pretrained(
            "runwayml/stable-diffusion-v1-5", torch_dtype=self.torch_dtype
        )
        self.pipe.to(device)
        self.a_prompt = "best quality, extremely detailed"
        self.n_prompt = (
            "longbody, lowres, bad anatomy, bad hands, missing fingers, extra digit, "
            "fewer digits, cropped, worst quality, low quality"
        )

    @tool(
        name="Generate Image From User Input Text",
        description=(
            "useful when you want to generate an image from a user input text and save"
            " it to a file. like: generate an image of an object or something, or"
            " generate an image that includes some objects. The input to this tool"
            " should be a string, representing the text used to generate image. "
        ),
    )
    def inference(self, text):
        image_filename = os.path.join("image", str(uuid.uuid4())[0:8] + ".png")
        prompt = text + ", " + self.a_prompt
        image = self.pipe(prompt, negative_prompt=self.n_prompt).images[0]
        image.save(image_filename)

        logger.debug(
            f"\nProcessed Text2Image, Input Text: {text}, Output Image:"
            f" {image_filename}"
        )

        return image_filename

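# VisualQuestionAnswering loads the BLIP VQA model once at construction and answers
# "image_path,question" queries.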
class VisualQuestionAnswering:
    def __init__(self, device):
        print("Initializing VisualQuestionAnswering to %s" % device)
        self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
        self.device = device
        self.processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base")
        self.model = BlipForQuestionAnswering.from_pretrained(
            "Salesforce/blip-vqa-base", torch_dtype=self.torch_dtype
        ).to(self.device)

    @tool(
        name="Answer Question About The Image",
        description=(
            "useful when you need an answer for a question based on an image. like:"
            " what is the background color of the last image, how many cats in this"
            " figure, what is in this figure. The input to this tool should be a comma"
            " separated string of two, representing the image_path and the question"
        ),
    )
    def inference(self, inputs):
        image_path, question = inputs.split(",")
        raw_image = Image.open(image_path).convert("RGB")
        inputs = self.processor(raw_image, question, return_tensors="pt").to(
            self.device, self.torch_dtype
        )
        out = self.model.generate(**inputs)
        answer = self.processor.decode(out[0], skip_special_tokens=True)

        logger.debug(
            f"\nProcessed VisualQuestionAnswering, Input Image: {image_path}, Input"
            f" Question: {question}, Output Answer: {answer}"
        )

        return answer

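# ImageCaptioning rescales the incoming file so its longer side is 512 px, captions it with
# BLIP, and returns the caption embedded in IMAGE_PROMPT for the agent to consume.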
class ImageCaptioning(BaseHandler):
    def __init__(self, device):
        print("Initializing ImageCaptioning to %s" % device)
        self.device = device
        self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
        self.processor = BlipProcessor.from_pretrained(
            "Salesforce/blip-image-captioning-base"
        )
        self.model = BlipForConditionalGeneration.from_pretrained(
            "Salesforce/blip-image-captioning-base", torch_dtype=self.torch_dtype
        ).to(self.device)

    def handle(self, filename: str):
        img = Image.open(filename)
        width, height = img.size
        ratio = min(512 / width, 512 / height)
        width_new, height_new = (round(width * ratio), round(height * ratio))
        img = img.resize((width_new, height_new))
        img = img.convert("RGB")
        img.save(filename, "PNG")
        print(f"Resize image from {width}x{height} to {width_new}x{height_new}")

        inputs = self.processor(Image.open(filename), return_tensors="pt").to(
            self.device, self.torch_dtype
        )
        out = self.model.generate(**inputs)
        description = self.processor.decode(out[0], skip_special_tokens=True)
        print(
            f"\nProcessed ImageCaptioning, Input Image: {filename}, Output Text:"
            f" {description}"
        )

        return IMAGE_PROMPT.format(filename=filename, description=description)