parent
55ff25b8e2
commit
5a0afc537c
@ -1,114 +0,0 @@
|
||||
Here are 20 tools the individual worker swarm nodes can use:
|
||||
|
||||
1. Write File Tool: Create a new file and write content to it.
|
||||
2. Read File Tool: Open and read the content of an existing file.
|
||||
3. Copy File Tool: Duplicate a file.
|
||||
4. Delete File Tool: Remove a file.
|
||||
5. Rename File Tool: Rename a file.
|
||||
6. Web Search Tool: Use a web search engine (like Google or DuckDuckGo) to find information.
|
||||
7. API Call Tool: Make requests to APIs.
|
||||
8. Process CSV Tool: Load a CSV file and perform operations on it using pandas.
|
||||
9. Create Directory Tool: Create a new directory.
|
||||
10. List Directory Tool: List all the files in a directory.
|
||||
11. Install Package Tool: Install Python packages using pip.
|
||||
12. Code Compilation Tool: Compile and run code in different languages.
|
||||
13. System Command Tool: Execute system commands.
|
||||
14. Image Processing Tool: Perform operations on images (resizing, cropping, etc.).
|
||||
15. PDF Processing Tool: Read, write, and manipulate PDF files.
|
||||
16. Text Processing Tool: Perform text processing operations like tokenization, stemming, etc.
|
||||
17. Email Sending Tool: Send emails.
|
||||
18. Database Query Tool: Execute SQL queries on a database.
|
||||
19. Data Scraping Tool: Scrape data from web pages.
|
||||
20. Version Control Tool: Perform Git operations.
|
||||
|
||||
The architecture for these tools involves creating a base `Tool` class that can be extended for each specific tool. The base `Tool` class would define common properties and methods that all tools would use.
|
||||
|
||||
The pseudocode for each tool would follow a similar structure:
|
||||
|
||||
```
|
||||
Class ToolNameTool extends Tool:
|
||||
Define properties specific to the tool
|
||||
|
||||
Method run:
|
||||
Perform the specific action of the tool
|
||||
Return the result
|
||||
```
|
||||
|
||||
Here's an example of how you might define the WriteFileTool:
|
||||
|
||||
```python
|
||||
import os
|
||||
from langchain.tools import BaseTool
|
||||
|
||||
class WriteFileTool(BaseTool):
|
||||
name = "write_file"
|
||||
description = "Create a new file and write content to it."
|
||||
|
||||
def __init__(self, root_dir: str):
|
||||
self.root_dir = root_dir
|
||||
|
||||
def _run(self, file_name: str, content: str) -> str:
|
||||
"""Creates a new file and writes the content."""
|
||||
try:
|
||||
with open(os.path.join(self.root_dir, file_name), 'w') as f:
|
||||
f.write(content)
|
||||
return f"Successfully wrote to {file_name}"
|
||||
except Exception as e:
|
||||
return f"Error: {e}"
|
||||
```
|
||||
|
||||
This tool takes the name of the file and the content to be written as parameters, writes the content to the file in the specified directory, and returns a success message. In case of any error, it returns the error message. You would follow a similar process to create the other tools.
|
||||
|
||||
|
||||
|
||||
|
||||
For completing browser-based tasks, you can use web automation tools. These tools allow you to interact with browsers as if a human user was interacting with it. Here are 20 tasks that individual worker swarm nodes can handle:
|
||||
|
||||
1. Open Browser Tool: Open a web browser.
|
||||
2. Close Browser Tool: Close the web browser.
|
||||
3. Navigate To URL Tool: Navigate to a specific URL.
|
||||
4. Fill Form Tool: Fill in a web form with provided data.
|
||||
5. Submit Form Tool: Submit a filled form.
|
||||
6. Click Button Tool: Click a button on a webpage.
|
||||
7. Hover Over Element Tool: Hover over a specific element on a webpage.
|
||||
8. Scroll Page Tool: Scroll up or down a webpage.
|
||||
9. Navigate Back Tool: Navigate back to the previous page.
|
||||
10. Navigate Forward Tool: Navigate forward to the next page.
|
||||
11. Refresh Page Tool: Refresh the current page.
|
||||
12. Switch Tab Tool: Switch between tabs in a browser.
|
||||
13. Capture Screenshot Tool: Capture a screenshot of the current page.
|
||||
14. Download File Tool: Download a file from a webpage.
|
||||
15. Send Email Tool: Send an email using a web-based email service.
|
||||
16. Login Tool: Log in to a website using provided credentials.
|
||||
17. Search Website Tool: Perform a search on a website.
|
||||
18. Extract Text Tool: Extract text from a webpage.
|
||||
19. Extract Image Tool: Extract image(s) from a webpage.
|
||||
20. Browser Session Management Tool: Handle creation, usage, and deletion of browser sessions.
|
||||
|
||||
You would typically use a library like Selenium, Puppeteer, or Playwright to automate these tasks. Here's an example of how you might define the FillFormTool using Selenium in Python:
|
||||
|
||||
```python
|
||||
from selenium import webdriver
|
||||
from langchain.tools import BaseTool
|
||||
|
||||
class FillFormTool(BaseTool):
|
||||
name = "fill_form"
|
||||
description = "Fill in a web form with provided data."
|
||||
|
||||
def _run(self, field_dict: dict) -> str:
|
||||
"""Fills a web form with the data in field_dict."""
|
||||
try:
|
||||
driver = webdriver.Firefox()
|
||||
|
||||
for field_name, field_value in field_dict.items():
|
||||
element = driver.find_element_by_name(field_name)
|
||||
element.send_keys(field_value)
|
||||
|
||||
return "Form filled successfully."
|
||||
except Exception as e:
|
||||
return f"Error: {e}"
|
||||
```
|
||||
|
||||
In this tool, `field_dict` is a dictionary where the keys are the names of the form fields and the values are the data to be filled in each field. The tool finds each field in the form and fills it with the provided data.
|
||||
|
||||
Please note that in a real scenario, you would need to handle the browser driver session more carefully (like closing the driver when it's not needed anymore), and also handle waiting for the page to load and exceptions more thoroughly. This is a simplified example for illustrative purposes.
|
@ -1,200 +0,0 @@
|
||||
import asyncio
|
||||
import os
|
||||
from contextlib import contextmanager
|
||||
from typing import Optional
|
||||
|
||||
import pandas as pd
|
||||
import torch
|
||||
from langchain.agents import tool
|
||||
from langchain.agents.agent_toolkits.pandas.base import (
|
||||
create_pandas_dataframe_agent,
|
||||
)
|
||||
from langchain.chains.qa_with_sources.loading import (
|
||||
BaseCombineDocumentsChain,
|
||||
)
|
||||
from langchain.docstore.document import Document
|
||||
from langchain.text_splitter import RecursiveCharacterTextSplitter
|
||||
from langchain.tools import BaseTool
|
||||
from PIL import Image
|
||||
from pydantic import Field
|
||||
from transformers import (
|
||||
BlipForQuestionAnswering,
|
||||
BlipProcessor,
|
||||
)
|
||||
|
||||
from swarms.utils.logger import logger
|
||||
|
||||
ROOT_DIR = "./data/"
|
||||
|
||||
|
||||
@contextmanager
|
||||
def pushd(new_dir):
|
||||
"""Context manager for changing the current working directory."""
|
||||
prev_dir = os.getcwd()
|
||||
os.chdir(new_dir)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
os.chdir(prev_dir)
|
||||
|
||||
|
||||
@tool
|
||||
def process_csv(
|
||||
llm,
|
||||
csv_file_path: str,
|
||||
instructions: str,
|
||||
output_path: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Process a CSV by with pandas in a limited REPL.\
|
||||
Only use this after writing data to disk as a csv file.\
|
||||
Any figures must be saved to disk to be viewed by the human.\
|
||||
Instructions should be written in natural language, not code. Assume the dataframe is already loaded."""
|
||||
with pushd(ROOT_DIR):
|
||||
try:
|
||||
df = pd.read_csv(csv_file_path)
|
||||
except Exception as e:
|
||||
return f"Error: {e}"
|
||||
agent = create_pandas_dataframe_agent(
|
||||
llm, df, max_iterations=30, verbose=False
|
||||
)
|
||||
if output_path is not None:
|
||||
instructions += f" Save output to disk at {output_path}"
|
||||
try:
|
||||
result = agent.run(instructions)
|
||||
return result
|
||||
except Exception as e:
|
||||
return f"Error: {e}"
|
||||
|
||||
|
||||
async def async_load_playwright(url: str) -> str:
|
||||
"""Load the specified URLs using Playwright and parse using BeautifulSoup."""
|
||||
from bs4 import BeautifulSoup
|
||||
from playwright.async_api import async_playwright
|
||||
|
||||
results = ""
|
||||
async with async_playwright() as p:
|
||||
browser = await p.chromium.launch(headless=True)
|
||||
try:
|
||||
page = await browser.new_page()
|
||||
await page.goto(url)
|
||||
|
||||
page_source = await page.content()
|
||||
soup = BeautifulSoup(page_source, "html.parser")
|
||||
|
||||
for script in soup(["script", "style"]):
|
||||
script.extract()
|
||||
|
||||
text = soup.get_text()
|
||||
lines = (line.strip() for line in text.splitlines())
|
||||
chunks = (
|
||||
phrase.strip() for line in lines for phrase in line.split(" ")
|
||||
)
|
||||
results = "\n".join(chunk for chunk in chunks if chunk)
|
||||
except Exception as e:
|
||||
results = f"Error: {e}"
|
||||
await browser.close()
|
||||
return results
|
||||
|
||||
|
||||
def run_async(coro):
|
||||
event_loop = asyncio.get_event_loop()
|
||||
return event_loop.run_until_complete(coro)
|
||||
|
||||
|
||||
@tool
|
||||
def browse_web_page(url: str) -> str:
|
||||
"""Verbose way to scrape a whole webpage. Likely to cause issues parsing."""
|
||||
return run_async(async_load_playwright(url))
|
||||
|
||||
|
||||
def _get_text_splitter():
|
||||
return RecursiveCharacterTextSplitter(
|
||||
# Set a really small chunk size, just to show.
|
||||
chunk_size=500,
|
||||
chunk_overlap=20,
|
||||
length_function=len,
|
||||
)
|
||||
|
||||
|
||||
class WebpageQATool(BaseTool):
|
||||
name = "query_webpage"
|
||||
description = (
|
||||
"Browse a webpage and retrieve the information relevant to the"
|
||||
" question."
|
||||
)
|
||||
text_splitter: RecursiveCharacterTextSplitter = Field(
|
||||
default_factory=_get_text_splitter
|
||||
)
|
||||
qa_chain: BaseCombineDocumentsChain
|
||||
|
||||
def _run(self, url: str, question: str) -> str:
|
||||
"""Useful for browsing websites and scraping the text information."""
|
||||
result = browse_web_page.run(url)
|
||||
docs = [Document(page_content=result, metadata={"source": url})]
|
||||
web_docs = self.text_splitter.split_documents(docs)
|
||||
results = []
|
||||
# TODO: Handle this with a MapReduceChain
|
||||
for i in range(0, len(web_docs), 4):
|
||||
input_docs = web_docs[i : i + 4]
|
||||
window_result = self.qa_chain(
|
||||
{"input_documents": input_docs, "question": question},
|
||||
return_only_outputs=True,
|
||||
)
|
||||
results.append(f"Response from window {i} - {window_result}")
|
||||
results_docs = [
|
||||
Document(page_content="\n".join(results), metadata={"source": url})
|
||||
]
|
||||
return self.qa_chain(
|
||||
{"input_documents": results_docs, "question": question},
|
||||
return_only_outputs=True,
|
||||
)
|
||||
|
||||
async def _arun(self, url: str, question: str) -> str:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class EdgeGPTTool:
|
||||
# Initialize the custom tool
|
||||
def __init__(
|
||||
self,
|
||||
model,
|
||||
name="EdgeGPTTool",
|
||||
description="Tool that uses EdgeGPTModel to generate responses",
|
||||
):
|
||||
super().__init__(name=name, description=description)
|
||||
self.model = model
|
||||
|
||||
def _run(self, prompt):
|
||||
return self.model.__call__(prompt)
|
||||
|
||||
|
||||
@tool
|
||||
def VQAinference(self, inputs):
|
||||
"""
|
||||
Answer Question About The Image, VQA Multi-Modal Worker agent
|
||||
description="useful when you need an answer for a question based on an image. "
|
||||
"like: what is the background color of the last image, how many cats in this figure, what is in this figure. "
|
||||
"The input to this tool should be a comma separated string of two, representing the image_path and the question",
|
||||
|
||||
"""
|
||||
device = "cuda:0"
|
||||
torch_dtype = torch.float16 if "cuda" in device else torch.float32
|
||||
processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base")
|
||||
model = BlipForQuestionAnswering.from_pretrained(
|
||||
"Salesforce/blip-vqa-base", torch_dtype=torch_dtype
|
||||
).to(device)
|
||||
|
||||
image_path, question = inputs.split(",")
|
||||
raw_image = Image.open(image_path).convert("RGB")
|
||||
inputs = processor(raw_image, question, return_tensors="pt").to(
|
||||
device, torch_dtype
|
||||
)
|
||||
out = model.generate(**inputs)
|
||||
answer = processor.decode(out[0], skip_special_tokens=True)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed VisualQuestionAnswering, Input Image: {image_path}, Input"
|
||||
f" Question: {question}, Output Answer: {answer}"
|
||||
)
|
||||
|
||||
return answer
|
@ -1,284 +0,0 @@
|
||||
import os
|
||||
import uuid
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
from diffusers import (
|
||||
EulerAncestralDiscreteScheduler,
|
||||
StableDiffusionInpaintPipeline,
|
||||
StableDiffusionInstructPix2PixPipeline,
|
||||
StableDiffusionPipeline,
|
||||
)
|
||||
from PIL import Image
|
||||
from transformers import (
|
||||
BlipForConditionalGeneration,
|
||||
BlipForQuestionAnswering,
|
||||
BlipProcessor,
|
||||
CLIPSegForImageSegmentation,
|
||||
CLIPSegProcessor,
|
||||
)
|
||||
|
||||
from swarms.prompts.prebuild.multi_modal_prompts import IMAGE_PROMPT
|
||||
from swarms.tools.tool import tool
|
||||
from swarms.utils.logger import logger
|
||||
from swarms.utils.main import BaseHandler, get_new_image_name
|
||||
|
||||
|
||||
class MaskFormer:
|
||||
def __init__(self, device):
|
||||
print("Initializing MaskFormer to %s" % device)
|
||||
self.device = device
|
||||
self.processor = CLIPSegProcessor.from_pretrained(
|
||||
"CIDAS/clipseg-rd64-refined"
|
||||
)
|
||||
self.model = CLIPSegForImageSegmentation.from_pretrained(
|
||||
"CIDAS/clipseg-rd64-refined"
|
||||
).to(device)
|
||||
|
||||
def inference(self, image_path, text):
|
||||
threshold = 0.5
|
||||
min_area = 0.02
|
||||
padding = 20
|
||||
original_image = Image.open(image_path)
|
||||
image = original_image.resize((512, 512))
|
||||
inputs = self.processor(
|
||||
text=text, images=image, padding="max_length", return_tensors="pt"
|
||||
).to(self.device)
|
||||
with torch.no_grad():
|
||||
outputs = self.model(**inputs)
|
||||
mask = torch.sigmoid(outputs[0]).squeeze().cpu().numpy() > threshold
|
||||
area_ratio = len(np.argwhere(mask)) / (mask.shape[0] * mask.shape[1])
|
||||
if area_ratio < min_area:
|
||||
return None
|
||||
true_indices = np.argwhere(mask)
|
||||
mask_array = np.zeros_like(mask, dtype=bool)
|
||||
for idx in true_indices:
|
||||
padded_slice = tuple(
|
||||
slice(max(0, i - padding), i + padding + 1) for i in idx
|
||||
)
|
||||
mask_array[padded_slice] = True
|
||||
visual_mask = (mask_array * 255).astype(np.uint8)
|
||||
image_mask = Image.fromarray(visual_mask)
|
||||
return image_mask.resize(original_image.size)
|
||||
|
||||
|
||||
class ImageEditing:
|
||||
def __init__(self, device):
|
||||
print("Initializing ImageEditing to %s" % device)
|
||||
self.device = device
|
||||
self.mask_former = MaskFormer(device=self.device)
|
||||
self.revision = "fp16" if "cuda" in device else None
|
||||
self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
|
||||
self.inpaint = StableDiffusionInpaintPipeline.from_pretrained(
|
||||
"runwayml/stable-diffusion-inpainting",
|
||||
revision=self.revision,
|
||||
torch_dtype=self.torch_dtype,
|
||||
).to(device)
|
||||
|
||||
@tool(
|
||||
name="Remove Something From The Photo",
|
||||
description=(
|
||||
"useful when you want to remove and object or something from the"
|
||||
" photo from its description or location. The input to this tool"
|
||||
" should be a comma separated string of two, representing the"
|
||||
" image_path and the object need to be removed. "
|
||||
),
|
||||
)
|
||||
def inference_remove(self, inputs):
|
||||
image_path, to_be_removed_txt = inputs.split(",")
|
||||
return self.inference_replace(
|
||||
f"{image_path},{to_be_removed_txt},background"
|
||||
)
|
||||
|
||||
@tool(
|
||||
name="Replace Something From The Photo",
|
||||
description=(
|
||||
"useful when you want to replace an object from the object"
|
||||
" description or location with another object from its description."
|
||||
" The input to this tool should be a comma separated string of"
|
||||
" three, representing the image_path, the object to be replaced,"
|
||||
" the object to be replaced with "
|
||||
),
|
||||
)
|
||||
def inference_replace(self, inputs):
|
||||
image_path, to_be_replaced_txt, replace_with_txt = inputs.split(",")
|
||||
original_image = Image.open(image_path)
|
||||
original_size = original_image.size
|
||||
mask_image = self.mask_former.inference(image_path, to_be_replaced_txt)
|
||||
updated_image = self.inpaint(
|
||||
prompt=replace_with_txt,
|
||||
image=original_image.resize((512, 512)),
|
||||
mask_image=mask_image.resize((512, 512)),
|
||||
).images[0]
|
||||
updated_image_path = get_new_image_name(
|
||||
image_path, func_name="replace-something"
|
||||
)
|
||||
updated_image = updated_image.resize(original_size)
|
||||
updated_image.save(updated_image_path)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed ImageEditing, Input Image: {image_path}, Replace"
|
||||
f" {to_be_replaced_txt} to {replace_with_txt}, Output Image:"
|
||||
f" {updated_image_path}"
|
||||
)
|
||||
|
||||
return updated_image_path
|
||||
|
||||
|
||||
class InstructPix2Pix:
|
||||
def __init__(self, device):
|
||||
print("Initializing InstructPix2Pix to %s" % device)
|
||||
self.device = device
|
||||
self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
|
||||
self.pipe = StableDiffusionInstructPix2PixPipeline.from_pretrained(
|
||||
"timbrooks/instruct-pix2pix",
|
||||
safety_checker=None,
|
||||
torch_dtype=self.torch_dtype,
|
||||
).to(device)
|
||||
self.pipe.scheduler = EulerAncestralDiscreteScheduler.from_config(
|
||||
self.pipe.scheduler.config
|
||||
)
|
||||
|
||||
@tool(
|
||||
name="Instruct Image Using Text",
|
||||
description=(
|
||||
"useful when you want to the style of the image to be like the"
|
||||
" text. like: make it look like a painting. or make it like a"
|
||||
" robot. The input to this tool should be a comma separated string"
|
||||
" of two, representing the image_path and the text. "
|
||||
),
|
||||
)
|
||||
def inference(self, inputs):
|
||||
"""Change style of image."""
|
||||
logger.debug("===> Starting InstructPix2Pix Inference")
|
||||
image_path, text = inputs.split(",")[0], ",".join(inputs.split(",")[1:])
|
||||
original_image = Image.open(image_path)
|
||||
image = self.pipe(
|
||||
text,
|
||||
image=original_image,
|
||||
num_inference_steps=40,
|
||||
image_guidance_scale=1.2,
|
||||
).images[0]
|
||||
updated_image_path = get_new_image_name(image_path, func_name="pix2pix")
|
||||
image.save(updated_image_path)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed InstructPix2Pix, Input Image: {image_path}, Instruct"
|
||||
f" Text: {text}, Output Image: {updated_image_path}"
|
||||
)
|
||||
|
||||
return updated_image_path
|
||||
|
||||
|
||||
class Text2Image:
|
||||
def __init__(self, device):
|
||||
print("Initializing Text2Image to %s" % device)
|
||||
self.device = device
|
||||
self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
|
||||
self.pipe = StableDiffusionPipeline.from_pretrained(
|
||||
"runwayml/stable-diffusion-v1-5", torch_dtype=self.torch_dtype
|
||||
)
|
||||
self.pipe.to(device)
|
||||
self.a_prompt = "best quality, extremely detailed"
|
||||
self.n_prompt = (
|
||||
"longbody, lowres, bad anatomy, bad hands, missing fingers, extra"
|
||||
" digit, fewer digits, cropped, worst quality, low quality"
|
||||
)
|
||||
|
||||
@tool(
|
||||
name="Generate Image From User Input Text",
|
||||
description=(
|
||||
"useful when you want to generate an image from a user input text"
|
||||
" and save it to a file. like: generate an image of an object or"
|
||||
" something, or generate an image that includes some objects. The"
|
||||
" input to this tool should be a string, representing the text used"
|
||||
" to generate image. "
|
||||
),
|
||||
)
|
||||
def inference(self, text):
|
||||
image_filename = os.path.join("image", str(uuid.uuid4())[0:8] + ".png")
|
||||
prompt = text + ", " + self.a_prompt
|
||||
image = self.pipe(prompt, negative_prompt=self.n_prompt).images[0]
|
||||
image.save(image_filename)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed Text2Image, Input Text: {text}, Output Image:"
|
||||
f" {image_filename}"
|
||||
)
|
||||
|
||||
return image_filename
|
||||
|
||||
|
||||
class VisualQuestionAnswering:
|
||||
def __init__(self, device):
|
||||
print("Initializing VisualQuestionAnswering to %s" % device)
|
||||
self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
|
||||
self.device = device
|
||||
self.processor = BlipProcessor.from_pretrained(
|
||||
"Salesforce/blip-vqa-base"
|
||||
)
|
||||
self.model = BlipForQuestionAnswering.from_pretrained(
|
||||
"Salesforce/blip-vqa-base", torch_dtype=self.torch_dtype
|
||||
).to(self.device)
|
||||
|
||||
@tool(
|
||||
name="Answer Question About The Image",
|
||||
description=(
|
||||
"useful when you need an answer for a question based on an image."
|
||||
" like: what is the background color of the last image, how many"
|
||||
" cats in this figure, what is in this figure. The input to this"
|
||||
" tool should be a comma separated string of two, representing the"
|
||||
" image_path and the question"
|
||||
),
|
||||
)
|
||||
def inference(self, inputs):
|
||||
image_path, question = inputs.split(",")
|
||||
raw_image = Image.open(image_path).convert("RGB")
|
||||
inputs = self.processor(raw_image, question, return_tensors="pt").to(
|
||||
self.device, self.torch_dtype
|
||||
)
|
||||
out = self.model.generate(**inputs)
|
||||
answer = self.processor.decode(out[0], skip_special_tokens=True)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed VisualQuestionAnswering, Input Image: {image_path},"
|
||||
f" Input Question: {question}, Output Answer: {answer}"
|
||||
)
|
||||
|
||||
return answer
|
||||
|
||||
|
||||
class ImageCaptioning(BaseHandler):
|
||||
def __init__(self, device):
|
||||
print("Initializing ImageCaptioning to %s" % device)
|
||||
self.device = device
|
||||
self.torch_dtype = torch.float16 if "cuda" in device else torch.float32
|
||||
self.processor = BlipProcessor.from_pretrained(
|
||||
"Salesforce/blip-image-captioning-base"
|
||||
)
|
||||
self.model = BlipForConditionalGeneration.from_pretrained(
|
||||
"Salesforce/blip-image-captioning-base",
|
||||
torch_dtype=self.torch_dtype,
|
||||
).to(self.device)
|
||||
|
||||
def handle(self, filename: str):
|
||||
img = Image.open(filename)
|
||||
width, height = img.size
|
||||
ratio = min(512 / width, 512 / height)
|
||||
width_new, height_new = (round(width * ratio), round(height * ratio))
|
||||
img = img.resize((width_new, height_new))
|
||||
img = img.convert("RGB")
|
||||
img.save(filename, "PNG")
|
||||
print(f"Resize image form {width}x{height} to {width_new}x{height_new}")
|
||||
|
||||
inputs = self.processor(Image.open(filename), return_tensors="pt").to(
|
||||
self.device, self.torch_dtype
|
||||
)
|
||||
out = self.model.generate(**inputs)
|
||||
description = self.processor.decode(out[0], skip_special_tokens=True)
|
||||
print(
|
||||
f"\nProcessed ImageCaptioning, Input Image: {filename}, Output"
|
||||
f" Text: {description}"
|
||||
)
|
||||
|
||||
return IMAGE_PROMPT.format(filename=filename, description=description)
|
@ -1,890 +0,0 @@
|
||||
"""Base implementation for tools or skills."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import inspect
|
||||
import warnings
|
||||
from abc import abstractmethod
|
||||
from functools import partial
|
||||
from inspect import signature
|
||||
from typing import (
|
||||
Any,
|
||||
Awaitable,
|
||||
Callable,
|
||||
Dict,
|
||||
List,
|
||||
Optional,
|
||||
Tuple,
|
||||
Type,
|
||||
Union,
|
||||
)
|
||||
|
||||
from langchain.callbacks.base import BaseCallbackManager
|
||||
from langchain.callbacks.manager import (
|
||||
AsyncCallbackManager,
|
||||
AsyncCallbackManagerForToolRun,
|
||||
CallbackManager,
|
||||
CallbackManagerForToolRun,
|
||||
Callbacks,
|
||||
)
|
||||
|
||||
from langchain.load.serializable import Serializable
|
||||
from pydantic import (
|
||||
BaseModel,
|
||||
Extra,
|
||||
Field,
|
||||
create_model,
|
||||
root_validator,
|
||||
validate_arguments,
|
||||
)
|
||||
from langchain.schema.runnable import (
|
||||
Runnable,
|
||||
RunnableConfig,
|
||||
RunnableSerializable,
|
||||
)
|
||||
|
||||
|
||||
class SchemaAnnotationError(TypeError):
|
||||
"""Raised when 'args_schema' is missing or has an incorrect type annotation."""
|
||||
|
||||
|
||||
def _create_subset_model(
|
||||
name: str, model: BaseModel, field_names: list
|
||||
) -> Type[BaseModel]:
|
||||
"""Create a pydantic model with only a subset of model's fields."""
|
||||
fields = {}
|
||||
for field_name in field_names:
|
||||
field = model.__fields__[field_name]
|
||||
fields[field_name] = (field.outer_type_, field.field_info)
|
||||
return create_model(name, **fields) # type: ignore
|
||||
|
||||
|
||||
def _get_filtered_args(
|
||||
inferred_model: Type[BaseModel],
|
||||
func: Callable,
|
||||
) -> dict:
|
||||
"""Get the arguments from a function's signature."""
|
||||
schema = inferred_model.schema()["properties"]
|
||||
valid_keys = signature(func).parameters
|
||||
return {
|
||||
k: schema[k]
|
||||
for k in valid_keys
|
||||
if k not in ("run_manager", "callbacks")
|
||||
}
|
||||
|
||||
|
||||
class _SchemaConfig:
|
||||
"""Configuration for the pydantic model."""
|
||||
|
||||
extra: Any = Extra.forbid
|
||||
arbitrary_types_allowed: bool = True
|
||||
|
||||
|
||||
def create_schema_from_function(
|
||||
model_name: str,
|
||||
func: Callable,
|
||||
) -> Type[BaseModel]:
|
||||
"""Create a pydantic schema from a function's signature.
|
||||
Args:
|
||||
model_name: Name to assign to the generated pydandic schema
|
||||
func: Function to generate the schema from
|
||||
Returns:
|
||||
A pydantic model with the same arguments as the function
|
||||
"""
|
||||
# https://docs.pydantic.dev/latest/usage/validation_decorator/
|
||||
validated = validate_arguments(func, config=_SchemaConfig) # type: ignore
|
||||
inferred_model = validated.model # type: ignore
|
||||
if "run_manager" in inferred_model.__fields__:
|
||||
del inferred_model.__fields__["run_manager"]
|
||||
if "callbacks" in inferred_model.__fields__:
|
||||
del inferred_model.__fields__["callbacks"]
|
||||
# Pydantic adds placeholder virtual fields we need to strip
|
||||
valid_properties = _get_filtered_args(inferred_model, func)
|
||||
return _create_subset_model(
|
||||
f"{model_name}Schema", inferred_model, list(valid_properties)
|
||||
)
|
||||
|
||||
|
||||
class ToolException(Exception):
|
||||
"""An optional exception that tool throws when execution error occurs.
|
||||
|
||||
When this exception is thrown, the agent will not stop working,
|
||||
but will handle the exception according to the handle_tool_error
|
||||
variable of the tool, and the processing result will be returned
|
||||
to the agent as observation, and printed in red on the console.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class BaseTool(RunnableSerializable[Union[str, Dict], Any]):
|
||||
"""Interface swarms tools must implement."""
|
||||
|
||||
def __init_subclass__(cls, **kwargs: Any) -> None:
|
||||
"""Create the definition of the new tool class."""
|
||||
super().__init_subclass__(**kwargs)
|
||||
|
||||
args_schema_type = cls.__annotations__.get("args_schema", None)
|
||||
|
||||
if args_schema_type is not None:
|
||||
if args_schema_type is None or args_schema_type == BaseModel:
|
||||
# Throw errors for common mis-annotations.
|
||||
# TODO: Use get_args / get_origin and fully
|
||||
# specify valid annotations.
|
||||
typehint_mandate = """
|
||||
class ChildTool(BaseTool):
|
||||
...
|
||||
args_schema: Type[BaseModel] = SchemaClass
|
||||
..."""
|
||||
name = cls.__name__
|
||||
raise SchemaAnnotationError(
|
||||
f"Tool definition for {name} must include valid type"
|
||||
" annotations for argument 'args_schema' to behave as"
|
||||
" expected.\nExpected annotation of 'Type[BaseModel]' but"
|
||||
f" got '{args_schema_type}'.\nExpected class looks"
|
||||
f" like:\n{typehint_mandate}"
|
||||
)
|
||||
|
||||
name: str
|
||||
"""The unique name of the tool that clearly communicates its purpose."""
|
||||
description: str
|
||||
"""Used to tell the model how/when/why to use the tool.
|
||||
|
||||
You can provide few-shot examples as a part of the description.
|
||||
"""
|
||||
args_schema: Optional[Type[BaseModel]] = None
|
||||
"""Pydantic model class to validate and parse the tool's input arguments."""
|
||||
return_direct: bool = False
|
||||
"""Whether to return the tool's output directly. Setting this to True means
|
||||
|
||||
that after the tool is called, the AgentExecutor will stop looping.
|
||||
"""
|
||||
verbose: bool = False
|
||||
"""Whether to log the tool's progress."""
|
||||
|
||||
callbacks: Callbacks = Field(default=None, exclude=True)
|
||||
"""Callbacks to be called during tool execution."""
|
||||
callback_manager: Optional[BaseCallbackManager] = Field(
|
||||
default=None, exclude=True
|
||||
)
|
||||
"""Deprecated. Please use callbacks instead."""
|
||||
tags: Optional[List[str]] = None
|
||||
"""Optional list of tags associated with the tool. Defaults to None
|
||||
These tags will be associated with each call to this tool,
|
||||
and passed as arguments to the handlers defined in `callbacks`.
|
||||
You can use these to eg identify a specific instance of a tool with its use case.
|
||||
"""
|
||||
metadata: Optional[Dict[str, Any]] = None
|
||||
"""Optional metadata associated with the tool. Defaults to None
|
||||
This metadata will be associated with each call to this tool,
|
||||
and passed as arguments to the handlers defined in `callbacks`.
|
||||
You can use these to eg identify a specific instance of a tool with its use case.
|
||||
"""
|
||||
|
||||
handle_tool_error: Optional[
|
||||
Union[bool, str, Callable[[ToolException], str]]
|
||||
] = False
|
||||
"""Handle the content of the ToolException thrown."""
|
||||
|
||||
class Config(Serializable.Config):
|
||||
"""Configuration for this pydantic object."""
|
||||
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
@property
|
||||
def is_single_input(self) -> bool:
|
||||
"""Whether the tool only accepts a single input."""
|
||||
keys = {k for k in self.args if k != "kwargs"}
|
||||
return len(keys) == 1
|
||||
|
||||
@property
|
||||
def args(self) -> dict:
|
||||
if self.args_schema is not None:
|
||||
return self.args_schema.schema()["properties"]
|
||||
else:
|
||||
schema = create_schema_from_function(self.name, self._run)
|
||||
return schema.schema()["properties"]
|
||||
|
||||
# --- Runnable ---
|
||||
|
||||
@property
|
||||
def input_schema(self) -> Type[BaseModel]:
|
||||
"""The tool's input schema."""
|
||||
if self.args_schema is not None:
|
||||
return self.args_schema
|
||||
else:
|
||||
return create_schema_from_function(self.name, self._run)
|
||||
|
||||
def invoke(
|
||||
self,
|
||||
input: Union[str, Dict],
|
||||
config: Optional[RunnableConfig] = None,
|
||||
**kwargs: Any,
|
||||
) -> Any:
|
||||
config = config or {}
|
||||
return self.run(
|
||||
input,
|
||||
callbacks=config.get("callbacks"),
|
||||
tags=config.get("tags"),
|
||||
metadata=config.get("metadata"),
|
||||
run_name=config.get("run_name"),
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
async def ainvoke(
|
||||
self,
|
||||
input: Union[str, Dict],
|
||||
config: Optional[RunnableConfig] = None,
|
||||
**kwargs: Any,
|
||||
) -> Any:
|
||||
config = config or {}
|
||||
return await self.arun(
|
||||
input,
|
||||
callbacks=config.get("callbacks"),
|
||||
tags=config.get("tags"),
|
||||
metadata=config.get("metadata"),
|
||||
run_name=config.get("run_name"),
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
# --- Tool ---
|
||||
|
||||
def _parse_input(
|
||||
self,
|
||||
tool_input: Union[str, Dict],
|
||||
) -> Union[str, Dict[str, Any]]:
|
||||
"""Convert tool input to pydantic model."""
|
||||
input_args = self.args_schema
|
||||
if isinstance(tool_input, str):
|
||||
if input_args is not None:
|
||||
key_ = next(iter(input_args.__fields__.keys()))
|
||||
input_args.validate({key_: tool_input})
|
||||
return tool_input
|
||||
else:
|
||||
if input_args is not None:
|
||||
result = input_args.parse_obj(tool_input)
|
||||
return {
|
||||
k: v for k, v in result.dict().items() if k in tool_input
|
||||
}
|
||||
return tool_input
|
||||
|
||||
@root_validator()
|
||||
def raise_deprecation(cls, values: Dict) -> Dict:
|
||||
"""Raise deprecation warning if callback_manager is used."""
|
||||
if values.get("callback_manager") is not None:
|
||||
warnings.warn(
|
||||
"callback_manager is deprecated. Please use callbacks instead.",
|
||||
DeprecationWarning,
|
||||
)
|
||||
values["callbacks"] = values.pop("callback_manager", None)
|
||||
return values
|
||||
|
||||
@abstractmethod
|
||||
def _run(
|
||||
self,
|
||||
*args: Any,
|
||||
**kwargs: Any,
|
||||
) -> Any:
|
||||
"""Use the tool.
|
||||
|
||||
Add run_manager: Optional[CallbackManagerForToolRun] = None
|
||||
to child implementations to enable tracing,
|
||||
"""
|
||||
|
||||
async def _arun(
|
||||
self,
|
||||
*args: Any,
|
||||
**kwargs: Any,
|
||||
) -> Any:
|
||||
"""Use the tool asynchronously.
|
||||
|
||||
Add run_manager: Optional[AsyncCallbackManagerForToolRun] = None
|
||||
to child implementations to enable tracing,
|
||||
"""
|
||||
return await asyncio.get_running_loop().run_in_executor(
|
||||
None,
|
||||
partial(self._run, **kwargs),
|
||||
*args,
|
||||
)
|
||||
|
||||
def _to_args_and_kwargs(
|
||||
self, tool_input: Union[str, Dict]
|
||||
) -> Tuple[Tuple, Dict]:
|
||||
# For backwards compatibility, if run_input is a string,
|
||||
# pass as a positional argument.
|
||||
if isinstance(tool_input, str):
|
||||
return (tool_input,), {}
|
||||
else:
|
||||
return (), tool_input
|
||||
|
||||
def run(
|
||||
self,
|
||||
tool_input: Union[str, Dict],
|
||||
verbose: Optional[bool] = None,
|
||||
start_color: Optional[str] = "green",
|
||||
color: Optional[str] = "green",
|
||||
callbacks: Callbacks = None,
|
||||
*,
|
||||
tags: Optional[List[str]] = None,
|
||||
metadata: Optional[Dict[str, Any]] = None,
|
||||
run_name: Optional[str] = None,
|
||||
**kwargs: Any,
|
||||
) -> Any:
|
||||
"""Run the tool."""
|
||||
parsed_input = self._parse_input(tool_input)
|
||||
if not self.verbose and verbose is not None:
|
||||
verbose_ = verbose
|
||||
else:
|
||||
verbose_ = self.verbose
|
||||
callback_manager = CallbackManager.configure(
|
||||
callbacks,
|
||||
self.callbacks,
|
||||
verbose_,
|
||||
tags,
|
||||
self.tags,
|
||||
metadata,
|
||||
self.metadata,
|
||||
)
|
||||
# TODO: maybe also pass through run_manager is _run supports kwargs
|
||||
new_arg_supported = signature(self._run).parameters.get("run_manager")
|
||||
run_manager = callback_manager.on_tool_start(
|
||||
{"name": self.name, "description": self.description},
|
||||
tool_input if isinstance(tool_input, str) else str(tool_input),
|
||||
color=start_color,
|
||||
name=run_name,
|
||||
**kwargs,
|
||||
)
|
||||
try:
|
||||
tool_args, tool_kwargs = self._to_args_and_kwargs(parsed_input)
|
||||
observation = (
|
||||
self._run(*tool_args, run_manager=run_manager, **tool_kwargs)
|
||||
if new_arg_supported
|
||||
else self._run(*tool_args, **tool_kwargs)
|
||||
)
|
||||
except ToolException as e:
|
||||
if not self.handle_tool_error:
|
||||
run_manager.on_tool_error(e)
|
||||
raise e
|
||||
elif isinstance(self.handle_tool_error, bool):
|
||||
if e.args:
|
||||
observation = e.args[0]
|
||||
else:
|
||||
observation = "Tool execution error"
|
||||
elif isinstance(self.handle_tool_error, str):
|
||||
observation = self.handle_tool_error
|
||||
elif callable(self.handle_tool_error):
|
||||
observation = self.handle_tool_error(e)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Got unexpected type of `handle_tool_error`. Expected"
|
||||
" bool, str or callable. Received:"
|
||||
f" {self.handle_tool_error}"
|
||||
)
|
||||
run_manager.on_tool_end(
|
||||
str(observation), color="red", name=self.name, **kwargs
|
||||
)
|
||||
return observation
|
||||
except (Exception, KeyboardInterrupt) as e:
|
||||
run_manager.on_tool_error(e)
|
||||
raise e
|
||||
else:
|
||||
run_manager.on_tool_end(
|
||||
str(observation), color=color, name=self.name, **kwargs
|
||||
)
|
||||
return observation
|
||||
|
||||
async def arun(
|
||||
self,
|
||||
tool_input: Union[str, Dict],
|
||||
verbose: Optional[bool] = None,
|
||||
start_color: Optional[str] = "green",
|
||||
color: Optional[str] = "green",
|
||||
callbacks: Callbacks = None,
|
||||
*,
|
||||
tags: Optional[List[str]] = None,
|
||||
metadata: Optional[Dict[str, Any]] = None,
|
||||
run_name: Optional[str] = None,
|
||||
**kwargs: Any,
|
||||
) -> Any:
|
||||
"""Run the tool asynchronously."""
|
||||
parsed_input = self._parse_input(tool_input)
|
||||
if not self.verbose and verbose is not None:
|
||||
verbose_ = verbose
|
||||
else:
|
||||
verbose_ = self.verbose
|
||||
callback_manager = AsyncCallbackManager.configure(
|
||||
callbacks,
|
||||
self.callbacks,
|
||||
verbose_,
|
||||
tags,
|
||||
self.tags,
|
||||
metadata,
|
||||
self.metadata,
|
||||
)
|
||||
new_arg_supported = signature(self._arun).parameters.get("run_manager")
|
||||
run_manager = await callback_manager.on_tool_start(
|
||||
{"name": self.name, "description": self.description},
|
||||
tool_input if isinstance(tool_input, str) else str(tool_input),
|
||||
color=start_color,
|
||||
name=run_name,
|
||||
**kwargs,
|
||||
)
|
||||
try:
|
||||
# We then call the tool on the tool input to get an observation
|
||||
tool_args, tool_kwargs = self._to_args_and_kwargs(parsed_input)
|
||||
observation = (
|
||||
await self._arun(
|
||||
*tool_args, run_manager=run_manager, **tool_kwargs
|
||||
)
|
||||
if new_arg_supported
|
||||
else await self._arun(*tool_args, **tool_kwargs)
|
||||
)
|
||||
except ToolException as e:
|
||||
if not self.handle_tool_error:
|
||||
await run_manager.on_tool_error(e)
|
||||
raise e
|
||||
elif isinstance(self.handle_tool_error, bool):
|
||||
if e.args:
|
||||
observation = e.args[0]
|
||||
else:
|
||||
observation = "Tool execution error"
|
||||
elif isinstance(self.handle_tool_error, str):
|
||||
observation = self.handle_tool_error
|
||||
elif callable(self.handle_tool_error):
|
||||
observation = self.handle_tool_error(e)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Got unexpected type of `handle_tool_error`. Expected"
|
||||
" bool, str or callable. Received:"
|
||||
f" {self.handle_tool_error}"
|
||||
)
|
||||
await run_manager.on_tool_end(
|
||||
str(observation), color="red", name=self.name, **kwargs
|
||||
)
|
||||
return observation
|
||||
except (Exception, KeyboardInterrupt) as e:
|
||||
await run_manager.on_tool_error(e)
|
||||
raise e
|
||||
else:
|
||||
await run_manager.on_tool_end(
|
||||
str(observation), color=color, name=self.name, **kwargs
|
||||
)
|
||||
return observation
|
||||
|
||||
def __call__(self, tool_input: str, callbacks: Callbacks = None) -> str:
|
||||
"""Make tool callable."""
|
||||
return self.run(tool_input, callbacks=callbacks)
|
||||
|
||||
|
||||
class Tool(BaseTool):
|
||||
"""Tool that takes in function or coroutine directly."""
|
||||
|
||||
description: str = ""
|
||||
func: Optional[Callable[..., str]]
|
||||
"""The function to run when the tool is called."""
|
||||
coroutine: Optional[Callable[..., Awaitable[str]]] = None
|
||||
"""The asynchronous version of the function."""
|
||||
|
||||
# --- Runnable ---
|
||||
async def ainvoke(
|
||||
self,
|
||||
input: Union[str, Dict],
|
||||
config: Optional[RunnableConfig] = None,
|
||||
**kwargs: Any,
|
||||
) -> Any:
|
||||
if not self.coroutine:
|
||||
# If the tool does not implement async, fall back to default implementation
|
||||
return await asyncio.get_running_loop().run_in_executor(
|
||||
None, partial(self.invoke, input, config, **kwargs)
|
||||
)
|
||||
|
||||
return await super().ainvoke(input, config, **kwargs)
|
||||
|
||||
# --- Tool ---
|
||||
|
||||
@property
|
||||
def args(self) -> dict:
|
||||
"""The tool's input arguments."""
|
||||
if self.args_schema is not None:
|
||||
return self.args_schema.schema()["properties"]
|
||||
# For backwards compatibility, if the function signature is ambiguous,
|
||||
# assume it takes a single string input.
|
||||
return {"tool_input": {"type": "string"}}
|
||||
|
||||
def _to_args_and_kwargs(
|
||||
self, tool_input: Union[str, Dict]
|
||||
) -> Tuple[Tuple, Dict]:
|
||||
"""Convert tool input to pydantic model."""
|
||||
args, kwargs = super()._to_args_and_kwargs(tool_input)
|
||||
# For backwards compatibility. The tool must be run with a single input
|
||||
all_args = list(args) + list(kwargs.values())
|
||||
if len(all_args) != 1:
|
||||
raise ToolException(
|
||||
f"Too many arguments to single-input tool {self.name}. Args:"
|
||||
f" {all_args}"
|
||||
)
|
||||
return tuple(all_args), {}
|
||||
|
||||
def _run(
|
||||
self,
|
||||
*args: Any,
|
||||
run_manager: Optional[CallbackManagerForToolRun] = None,
|
||||
**kwargs: Any,
|
||||
) -> Any:
|
||||
"""Use the tool."""
|
||||
if self.func:
|
||||
new_argument_supported = signature(self.func).parameters.get(
|
||||
"callbacks"
|
||||
)
|
||||
return (
|
||||
self.func(
|
||||
*args,
|
||||
callbacks=run_manager.get_child() if run_manager else None,
|
||||
**kwargs,
|
||||
)
|
||||
if new_argument_supported
|
||||
else self.func(*args, **kwargs)
|
||||
)
|
||||
raise NotImplementedError("Tool does not support sync")
|
||||
|
||||
async def _arun(
|
||||
self,
|
||||
*args: Any,
|
||||
run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
|
||||
**kwargs: Any,
|
||||
) -> Any:
|
||||
"""Use the tool asynchronously."""
|
||||
if self.coroutine:
|
||||
new_argument_supported = signature(self.coroutine).parameters.get(
|
||||
"callbacks"
|
||||
)
|
||||
return (
|
||||
await self.coroutine(
|
||||
*args,
|
||||
callbacks=run_manager.get_child() if run_manager else None,
|
||||
**kwargs,
|
||||
)
|
||||
if new_argument_supported
|
||||
else await self.coroutine(*args, **kwargs)
|
||||
)
|
||||
else:
|
||||
return await asyncio.get_running_loop().run_in_executor(
|
||||
None,
|
||||
partial(self._run, run_manager=run_manager, **kwargs),
|
||||
*args,
|
||||
)
|
||||
|
||||
# TODO: this is for backwards compatibility, remove in future
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
func: Optional[Callable],
|
||||
description: str,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
"""Initialize tool."""
|
||||
super(Tool, self).__init__(
|
||||
name=name, func=func, description=description, **kwargs
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_function(
|
||||
cls,
|
||||
func: Optional[Callable],
|
||||
name: str, # We keep these required to support backwards compatibility
|
||||
description: str,
|
||||
return_direct: bool = False,
|
||||
args_schema: Optional[Type[BaseModel]] = None,
|
||||
coroutine: Optional[
|
||||
Callable[..., Awaitable[Any]]
|
||||
] = None, # This is last for compatibility, but should be after func
|
||||
**kwargs: Any,
|
||||
) -> Tool:
|
||||
"""Initialize tool from a function."""
|
||||
if func is None and coroutine is None:
|
||||
raise ValueError("Function and/or coroutine must be provided")
|
||||
return cls(
|
||||
name=name,
|
||||
func=func,
|
||||
coroutine=coroutine,
|
||||
description=description,
|
||||
return_direct=return_direct,
|
||||
args_schema=args_schema,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
||||
class StructuredTool(BaseTool):
|
||||
"""Tool that can operate on any number of inputs."""
|
||||
|
||||
description: str = ""
|
||||
args_schema: Type[BaseModel] = Field(..., description="The tool schema.")
|
||||
"""The input arguments' schema."""
|
||||
func: Optional[Callable[..., Any]]
|
||||
"""The function to run when the tool is called."""
|
||||
coroutine: Optional[Callable[..., Awaitable[Any]]] = None
|
||||
"""The asynchronous version of the function."""
|
||||
|
||||
# --- Runnable ---
|
||||
async def ainvoke(
|
||||
self,
|
||||
input: Union[str, Dict],
|
||||
config: Optional[RunnableConfig] = None,
|
||||
**kwargs: Any,
|
||||
) -> Any:
|
||||
if not self.coroutine:
|
||||
# If the tool does not implement async, fall back to default implementation
|
||||
return await asyncio.get_running_loop().run_in_executor(
|
||||
None, partial(self.invoke, input, config, **kwargs)
|
||||
)
|
||||
|
||||
return await super().ainvoke(input, config, **kwargs)
|
||||
|
||||
# --- Tool ---
|
||||
|
||||
@property
|
||||
def args(self) -> dict:
|
||||
"""The tool's input arguments."""
|
||||
return self.args_schema.schema()["properties"]
|
||||
|
||||
def _run(
|
||||
self,
|
||||
*args: Any,
|
||||
run_manager: Optional[CallbackManagerForToolRun] = None,
|
||||
**kwargs: Any,
|
||||
) -> Any:
|
||||
"""Use the tool."""
|
||||
if self.func:
|
||||
new_argument_supported = signature(self.func).parameters.get(
|
||||
"callbacks"
|
||||
)
|
||||
return (
|
||||
self.func(
|
||||
*args,
|
||||
callbacks=run_manager.get_child() if run_manager else None,
|
||||
**kwargs,
|
||||
)
|
||||
if new_argument_supported
|
||||
else self.func(*args, **kwargs)
|
||||
)
|
||||
raise NotImplementedError("Tool does not support sync")
|
||||
|
||||
async def _arun(
|
||||
self,
|
||||
*args: Any,
|
||||
run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
|
||||
**kwargs: Any,
|
||||
) -> str:
|
||||
"""Use the tool asynchronously."""
|
||||
if self.coroutine:
|
||||
new_argument_supported = signature(self.coroutine).parameters.get(
|
||||
"callbacks"
|
||||
)
|
||||
return (
|
||||
await self.coroutine(
|
||||
*args,
|
||||
callbacks=run_manager.get_child() if run_manager else None,
|
||||
**kwargs,
|
||||
)
|
||||
if new_argument_supported
|
||||
else await self.coroutine(*args, **kwargs)
|
||||
)
|
||||
return await asyncio.get_running_loop().run_in_executor(
|
||||
None,
|
||||
partial(self._run, run_manager=run_manager, **kwargs),
|
||||
*args,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_function(
|
||||
cls,
|
||||
func: Optional[Callable] = None,
|
||||
coroutine: Optional[Callable[..., Awaitable[Any]]] = None,
|
||||
name: Optional[str] = None,
|
||||
description: Optional[str] = None,
|
||||
return_direct: bool = False,
|
||||
args_schema: Optional[Type[BaseModel]] = None,
|
||||
infer_schema: bool = True,
|
||||
**kwargs: Any,
|
||||
) -> StructuredTool:
|
||||
"""Create tool from a given function.
|
||||
|
||||
A classmethod that helps to create a tool from a function.
|
||||
|
||||
Args:
|
||||
func: The function from which to create a tool
|
||||
coroutine: The async function from which to create a tool
|
||||
name: The name of the tool. Defaults to the function name
|
||||
description: The description of the tool. Defaults to the function docstring
|
||||
return_direct: Whether to return the result directly or as a callback
|
||||
args_schema: The schema of the tool's input arguments
|
||||
infer_schema: Whether to infer the schema from the function's signature
|
||||
**kwargs: Additional arguments to pass to the tool
|
||||
|
||||
Returns:
|
||||
The tool
|
||||
|
||||
Examples:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
def add(a: int, b: int) -> int:
|
||||
\"\"\"Add two numbers\"\"\"
|
||||
return a + b
|
||||
tool = StructuredTool.from_function(add)
|
||||
tool.run(1, 2) # 3
|
||||
"""
|
||||
|
||||
if func is not None:
|
||||
source_function = func
|
||||
elif coroutine is not None:
|
||||
source_function = coroutine
|
||||
else:
|
||||
raise ValueError("Function and/or coroutine must be provided")
|
||||
name = name or source_function.__name__
|
||||
description = description or source_function.__doc__
|
||||
if description is None:
|
||||
raise ValueError(
|
||||
"Function must have a docstring if description not provided."
|
||||
)
|
||||
|
||||
# Description example:
|
||||
# search_api(query: str) - Searches the API for the query.
|
||||
sig = signature(source_function)
|
||||
description = f"{name}{sig} - {description.strip()}"
|
||||
_args_schema = args_schema
|
||||
if _args_schema is None and infer_schema:
|
||||
_args_schema = create_schema_from_function(
|
||||
f"{name}Schema", source_function
|
||||
)
|
||||
return cls(
|
||||
name=name,
|
||||
func=func,
|
||||
coroutine=coroutine,
|
||||
args_schema=_args_schema,
|
||||
description=description,
|
||||
return_direct=return_direct,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
||||
def tool(
|
||||
*args: Union[str, Callable, Runnable],
|
||||
return_direct: bool = False,
|
||||
args_schema: Optional[Type[BaseModel]] = None,
|
||||
infer_schema: bool = True,
|
||||
) -> Callable:
|
||||
"""Make tools out of functions, can be used with or without arguments.
|
||||
|
||||
Args:
|
||||
*args: The arguments to the tool.
|
||||
return_direct: Whether to return directly from the tool rather
|
||||
than continuing the agent loop.
|
||||
args_schema: optional argument schema for user to specify
|
||||
infer_schema: Whether to infer the schema of the arguments from
|
||||
the function's signature. This also makes the resultant tool
|
||||
accept a dictionary input to its `run()` function.
|
||||
|
||||
Requires:
|
||||
- Function must be of type (str) -> str
|
||||
- Function must have a docstring
|
||||
|
||||
Examples:
|
||||
.. code-block:: python
|
||||
|
||||
@tool
|
||||
def search_api(query: str) -> str:
|
||||
# Searches the API for the query.
|
||||
return
|
||||
|
||||
@tool("search", return_direct=True)
|
||||
def search_api(query: str) -> str:
|
||||
# Searches the API for the query.
|
||||
return
|
||||
"""
|
||||
|
||||
def _make_with_name(tool_name: str) -> Callable:
|
||||
def _make_tool(dec_func: Union[Callable, Runnable]) -> BaseTool:
|
||||
if isinstance(dec_func, Runnable):
|
||||
runnable = dec_func
|
||||
|
||||
if runnable.input_schema.schema().get("type") != "object":
|
||||
raise ValueError("Runnable must have an object schema.")
|
||||
|
||||
async def ainvoke_wrapper(
|
||||
callbacks: Optional[Callbacks] = None, **kwargs: Any
|
||||
) -> Any:
|
||||
return await runnable.ainvoke(
|
||||
kwargs, {"callbacks": callbacks}
|
||||
)
|
||||
|
||||
def invoke_wrapper(
|
||||
callbacks: Optional[Callbacks] = None, **kwargs: Any
|
||||
) -> Any:
|
||||
return runnable.invoke(kwargs, {"callbacks": callbacks})
|
||||
|
||||
coroutine = ainvoke_wrapper
|
||||
func = invoke_wrapper
|
||||
schema: Optional[Type[BaseModel]] = runnable.input_schema
|
||||
description = repr(runnable)
|
||||
elif inspect.iscoroutinefunction(dec_func):
|
||||
coroutine = dec_func
|
||||
func = None
|
||||
schema = args_schema
|
||||
description = None
|
||||
else:
|
||||
coroutine = None
|
||||
func = dec_func
|
||||
schema = args_schema
|
||||
description = None
|
||||
|
||||
if infer_schema or args_schema is not None:
|
||||
return StructuredTool.from_function(
|
||||
func,
|
||||
coroutine,
|
||||
name=tool_name,
|
||||
description=description,
|
||||
return_direct=return_direct,
|
||||
args_schema=schema,
|
||||
infer_schema=infer_schema,
|
||||
)
|
||||
# If someone doesn't want a schema applied, we must treat it as
|
||||
# a simple string->string function
|
||||
if func.__doc__ is None:
|
||||
raise ValueError(
|
||||
"Function must have a docstring if "
|
||||
"description not provided and infer_schema is False."
|
||||
)
|
||||
return Tool(
|
||||
name=tool_name,
|
||||
func=func,
|
||||
description=f"{tool_name} tool",
|
||||
return_direct=return_direct,
|
||||
coroutine=coroutine,
|
||||
)
|
||||
|
||||
return _make_tool
|
||||
|
||||
if (
|
||||
len(args) == 2
|
||||
and isinstance(args[0], str)
|
||||
and isinstance(args[1], Runnable)
|
||||
):
|
||||
return _make_with_name(args[0])(args[1])
|
||||
elif len(args) == 1 and isinstance(args[0], str):
|
||||
# if the argument is a string, then we use the string as the tool name
|
||||
# Example usage: @tool("search", return_direct=True)
|
||||
return _make_with_name(args[0])
|
||||
elif len(args) == 1 and callable(args[0]):
|
||||
# if the argument is a function, then we use the function name as the tool name
|
||||
# Example usage: @tool
|
||||
return _make_with_name(args[0].__name__)(args[0])
|
||||
elif len(args) == 0:
|
||||
# if there are no arguments, then we use the function name as the tool name
|
||||
# Example usage: @tool(return_direct=True)
|
||||
def _partial(func: Callable[[str], str]) -> BaseTool:
|
||||
return _make_with_name(func.__name__)(func)
|
||||
|
||||
return _partial
|
||||
else:
|
||||
raise ValueError("Too many arguments for tool decorator")
|
@ -1,45 +0,0 @@
|
||||
from swarms.tools.tool import tool
|
||||
from typing import Dict, Callable, Any, List
|
||||
|
||||
ToolBuilder = Callable[[Any], tool]
|
||||
FuncToolBuilder = Callable[[], ToolBuilder]
|
||||
|
||||
|
||||
class ToolsRegistry:
|
||||
def __init__(self) -> None:
|
||||
self.tools: Dict[str, FuncToolBuilder] = {}
|
||||
|
||||
def register(self, tool_name: str, tool: FuncToolBuilder):
|
||||
print(f"will register {tool_name}")
|
||||
self.tools[tool_name] = tool
|
||||
|
||||
def build(self, tool_name, config):
|
||||
ret = self.tools[tool_name]()(config)
|
||||
if isinstance(ret, tool):
|
||||
return ret
|
||||
raise ValueError(
|
||||
"Tool builder {} did not return a Tool instance".format(tool_name)
|
||||
)
|
||||
|
||||
def list_tools(self) -> List[str]:
|
||||
return list(self.tools.keys())
|
||||
|
||||
|
||||
tools_registry = ToolsRegistry()
|
||||
|
||||
|
||||
def register(tool_name):
|
||||
def decorator(tool: FuncToolBuilder):
|
||||
tools_registry.register(tool_name, tool)
|
||||
return tool
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
def build_tool(tool_name: str, config: Any) -> tool:
|
||||
print(f"will build {tool_name}")
|
||||
return tools_registry.build(tool_name, config)
|
||||
|
||||
|
||||
def list_tools() -> List[str]:
|
||||
return tools_registry.list_tools()
|
@ -1,13 +1,12 @@
|
||||
from swarms.utils.markdown_message import display_markdown_message
|
||||
from swarms.utils.display_markdown import display_markdown_message
|
||||
from swarms.utils.futures import execute_futures_dict
|
||||
from swarms.utils.code_interpreter import SubprocessCodeInterpreter
|
||||
from swarms.utils.parse_code import extract_code_in_backticks_in_string
|
||||
from swarms.utils.pdf_to_text import pdf_to_text
|
||||
from swarms.utils.tool_logging import get_logger
|
||||
|
||||
__all__ = [
|
||||
"display_markdown_message",
|
||||
"execute_futures_dict",
|
||||
"SubprocessCodeInterpreter",
|
||||
"extract_code_in_backticks_in_string",
|
||||
"pdf_to_text",
|
||||
]
|
||||
|
@ -0,0 +1,23 @@
|
||||
from rich import print as rich_print
|
||||
from rich.markdown import Markdown
|
||||
from rich.rule import Rule
|
||||
|
||||
|
||||
def display_markdown_message(message):
|
||||
"""
|
||||
Display markdown message. Works with multiline strings with lots of indentation.
|
||||
Will automatically make single line > tags beautiful.
|
||||
"""
|
||||
|
||||
for line in message.split("\n"):
|
||||
line = line.strip()
|
||||
if line == "":
|
||||
print("")
|
||||
elif line == "---":
|
||||
rich_print(Rule(style="white"))
|
||||
else:
|
||||
rich_print(Markdown(line))
|
||||
|
||||
if "\n" not in message and message.startswith(">"):
|
||||
# Aesthetic choice. For these tags, they need a space below them
|
||||
print("")
|
@ -0,0 +1,78 @@
|
||||
import json
|
||||
import os
|
||||
import requests
|
||||
import numpy as np
|
||||
|
||||
import nltk
|
||||
from nltk.stem import WordNetLemmatizer
|
||||
from nltk.corpus import wordnet, stopwords
|
||||
from nltk.tokenize import word_tokenize
|
||||
|
||||
from swarms.tools.database.utils.db_parser import get_conf
|
||||
from swarms.tools.database.utils.database import DBArgs, Database
|
||||
from swarms.tools.db_diag.anomaly_detection import detect_anomalies
|
||||
from swarms.tools.db_diag.anomaly_detection import prometheus
|
||||
|
||||
from swarms.tools.db_diag.example_generate import bm25
|
||||
|
||||
# match with external knowledge for in-context learning
|
||||
|
||||
class KnowledgeExtraction():
|
||||
|
||||
def __init__(self, file_path, topk=3, keyword_matching_func=bm25):
|
||||
|
||||
# select an attribute in the jsons to embed
|
||||
self.names = {"matched_attr": "cause_name"}
|
||||
self.cause_name = self.names["matched_attr"]
|
||||
|
||||
nltk.download('stopwords')
|
||||
nltk.download('punkt')
|
||||
nltk.download('averaged_perceptron_tagger')
|
||||
nltk.download('wordnet')
|
||||
self.wnl = WordNetLemmatizer()
|
||||
self.keyword_matching_func = keyword_matching_func
|
||||
|
||||
self.topk = topk
|
||||
|
||||
self.corpus, self.preprocessed_corpus, self.matched_attr, self.stop_words = self.knowledge_load(file_path)
|
||||
|
||||
def knowledge_load(self, file_path):
|
||||
|
||||
# file_path = "/swarms/tools/db_diag/root_causes_dbmind.jsonl"
|
||||
with open(str(os.getcwd()) + file_path, 'r') as f:
|
||||
data = json.load(f)
|
||||
self.corpus = [example["desc"] for example in data]
|
||||
self.matched_attr = [example[self.names["matched_attr"]] for example in data]
|
||||
self.stop_words = set(stopwords.words('english'))
|
||||
|
||||
self.preprocessed_corpus = []
|
||||
for c in self.corpus:
|
||||
word_tokens = word_tokenize(c)
|
||||
self.preprocessed_corpus.append([self.wnl.lemmatize(w,pos='n') for w in word_tokens if not w in self.stop_words]) # remove useless words and standardize words
|
||||
|
||||
return self.corpus, self.preprocessed_corpus, self.matched_attr, self.stop_words
|
||||
|
||||
def match(self, detailed_metrics):
|
||||
|
||||
metrics_str = []
|
||||
for metrics in detailed_metrics.keys():
|
||||
metrics = metrics.replace("_"," ")
|
||||
word_tokens = word_tokenize(metrics)
|
||||
metrics_str.extend([self.wnl.lemmatize(w,pos='n') for w in word_tokens if not w in self.stop_words])
|
||||
metrics_str = list(set(metrics_str))
|
||||
|
||||
best_index = self.keyword_matching_func(self.topk, metrics_str, self.preprocessed_corpus)
|
||||
best_docs = [self.corpus[b] for b in best_index]
|
||||
best_names = [self.matched_attr[b] for b in best_index]
|
||||
docs_str = ""
|
||||
print("Best docs: ", best_docs)
|
||||
for i, docs in enumerate(best_docs):
|
||||
docs_str = docs_str + "{}: ".format(best_names[i]) + docs + "\n\n"
|
||||
print("docs_str: ", docs_str)
|
||||
|
||||
return docs_str
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
matcher = KnowledgeExtraction("/root_causes_dbmind.jsonl")
|
||||
print(matcher.match({"memory_resource_contention":123, "node_scrape_collector_duration_seconds": 1293}))
|
@ -0,0 +1,29 @@
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
# from env import DotEnv
|
||||
|
||||
from swarms.utils.main import AbstractUploader
|
||||
|
||||
|
||||
class StaticUploader(AbstractUploader):
|
||||
def __init__(self, server: str, path: Path, endpoint: str):
|
||||
self.server = server
|
||||
self.path = path
|
||||
self.endpoint = endpoint
|
||||
|
||||
@staticmethod
|
||||
def from_settings(path: Path, endpoint: str) -> "StaticUploader":
|
||||
return StaticUploader(os.environ["SERVER"], path, endpoint)
|
||||
|
||||
def get_url(self, uploaded_path: str) -> str:
|
||||
return f"{self.server}/{uploaded_path}"
|
||||
|
||||
def upload(self, filepath: str):
|
||||
relative_path = Path("generated") / filepath.split("/")[-1]
|
||||
file_path = self.path / relative_path
|
||||
os.makedirs(os.path.dirname(file_path), exist_ok=True)
|
||||
shutil.copy(filepath, file_path)
|
||||
endpoint_path = self.endpoint / relative_path
|
||||
return f"{self.server}/{endpoint_path}"
|
@ -0,0 +1,285 @@
|
||||
# coding=utf-8
|
||||
# Copyright 2020 Optuna, Hugging Face
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# swarms.tools copied from Huggingface Transformers
|
||||
""" Logging utilities."""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
from logging import CRITICAL # NOQA
|
||||
from logging import DEBUG # NOQA
|
||||
from logging import ERROR # NOQA
|
||||
from logging import FATAL # NOQA
|
||||
from logging import INFO # NOQA
|
||||
from logging import NOTSET # NOQA
|
||||
from logging import WARN # NOQA
|
||||
from logging import WARNING # NOQA
|
||||
from typing import Optional
|
||||
|
||||
|
||||
_lock = threading.Lock()
|
||||
_default_handler: Optional[logging.Handler] = None
|
||||
|
||||
log_levels = {
|
||||
"debug": logging.DEBUG,
|
||||
"info": logging.INFO,
|
||||
"warning": logging.WARNING,
|
||||
"error": logging.ERROR,
|
||||
"critical": logging.CRITICAL,
|
||||
}
|
||||
|
||||
_default_log_level = logging.INFO
|
||||
|
||||
|
||||
def _get_default_logging_level():
|
||||
"""
|
||||
If SWARMSTOOLS_VERBOSITY env var is set to one of the valid choices return that as the new default level. If it is
|
||||
not - fall back to ``_default_log_level``
|
||||
"""
|
||||
env_level_str = os.getenv("SWARMSTOOLS_VERBOSITY", None)
|
||||
if env_level_str:
|
||||
if env_level_str in log_levels:
|
||||
return log_levels[env_level_str]
|
||||
else:
|
||||
logging.getLogger().warning(
|
||||
f"Unknown option SWARMSTOOLS_VERBOSITY={env_level_str}, "
|
||||
f"has to be one of: { ', '.join(log_levels.keys()) }"
|
||||
)
|
||||
return _default_log_level
|
||||
|
||||
|
||||
def _get_library_name() -> str:
|
||||
|
||||
return __name__.split(".")[0]
|
||||
|
||||
|
||||
def _get_library_root_logger() -> logging.Logger:
|
||||
|
||||
return logging.getLogger(_get_library_name())
|
||||
|
||||
|
||||
def _configure_library_root_logger() -> None:
|
||||
|
||||
global _default_handler
|
||||
|
||||
with _lock:
|
||||
if _default_handler:
|
||||
# This library has already configured the library root logger.
|
||||
return
|
||||
_default_handler = logging.StreamHandler() # Set sys.stderr as stream.
|
||||
_default_handler.flush = sys.stderr.flush
|
||||
formatter = logging.Formatter(
|
||||
"\033[1;31m[%(levelname)s|(SWARMSTools)%(module)s:%(lineno)d]%(asctime)s >> \033[0m %(message)s")
|
||||
_default_handler.setFormatter(formatter)
|
||||
|
||||
# Apply our default configuration to the library root logger.
|
||||
library_root_logger = _get_library_root_logger()
|
||||
library_root_logger.addHandler(_default_handler)
|
||||
library_root_logger.setLevel(_get_default_logging_level())
|
||||
|
||||
|
||||
library_root_logger.propagate = False
|
||||
|
||||
|
||||
def _reset_library_root_logger() -> None:
|
||||
|
||||
global _default_handler
|
||||
|
||||
with _lock:
|
||||
if not _default_handler:
|
||||
return
|
||||
|
||||
library_root_logger = _get_library_root_logger()
|
||||
library_root_logger.removeHandler(_default_handler)
|
||||
library_root_logger.setLevel(logging.NOTSET)
|
||||
_default_handler = None
|
||||
|
||||
|
||||
def get_log_levels_dict():
|
||||
return log_levels
|
||||
|
||||
|
||||
|
||||
def get_verbosity() -> int:
|
||||
"""
|
||||
Return the current level for the 🤗 Transformers's root logger as an int.
|
||||
Returns:
|
||||
:obj:`int`: The logging level.
|
||||
<Tip>
|
||||
🤗 Transformers has following logging levels:
|
||||
- 50: ``transformers.logging.CRITICAL`` or ``transformers.logging.FATAL``
|
||||
- 40: ``transformers.logging.ERROR``
|
||||
- 30: ``transformers.logging.WARNING`` or ``transformers.logging.WARN``
|
||||
- 20: ``transformers.logging.INFO``
|
||||
- 10: ``transformers.logging.DEBUG``
|
||||
</Tip>"""
|
||||
|
||||
_configure_library_root_logger()
|
||||
return _get_library_root_logger().getEffectiveLevel()
|
||||
|
||||
|
||||
def set_verbosity(verbosity: int) -> None:
|
||||
"""
|
||||
Set the verbosity level for the 🤗 Transformers's root logger.
|
||||
Args:
|
||||
verbosity (:obj:`int`):
|
||||
Logging level, e.g., one of:
|
||||
- ``transformers.logging.CRITICAL`` or ``transformers.logging.FATAL``
|
||||
- ``transformers.logging.ERROR``
|
||||
- ``transformers.logging.WARNING`` or ``transformers.logging.WARN``
|
||||
- ``transformers.logging.INFO``
|
||||
- ``transformers.logging.DEBUG``
|
||||
"""
|
||||
|
||||
_configure_library_root_logger()
|
||||
_get_library_root_logger().setLevel(verbosity)
|
||||
|
||||
|
||||
def set_verbosity_info():
|
||||
"""Set the verbosity to the ``INFO`` level."""
|
||||
return set_verbosity(INFO)
|
||||
|
||||
|
||||
def set_verbosity_warning():
|
||||
"""Set the verbosity to the ``WARNING`` level."""
|
||||
return set_verbosity(WARNING)
|
||||
|
||||
|
||||
def set_verbosity_debug():
|
||||
"""Set the verbosity to the ``DEBUG`` level."""
|
||||
return set_verbosity(DEBUG)
|
||||
|
||||
|
||||
def set_verbosity_error():
|
||||
"""Set the verbosity to the ``ERROR`` level."""
|
||||
return set_verbosity(ERROR)
|
||||
|
||||
|
||||
def disable_default_handler() -> None:
|
||||
"""Disable the default handler of the HuggingFace Transformers's root logger."""
|
||||
|
||||
_configure_library_root_logger()
|
||||
|
||||
assert _default_handler is not None
|
||||
_get_library_root_logger().removeHandler(_default_handler)
|
||||
|
||||
|
||||
def enable_default_handler() -> None:
|
||||
"""Enable the default handler of the HuggingFace Transformers's root logger."""
|
||||
|
||||
_configure_library_root_logger()
|
||||
|
||||
assert _default_handler is not None
|
||||
_get_library_root_logger().addHandler(_default_handler)
|
||||
|
||||
|
||||
def add_handler(handler: logging.Handler) -> None:
|
||||
"""adds a handler to the HuggingFace Transformers's root logger."""
|
||||
|
||||
_configure_library_root_logger()
|
||||
|
||||
assert handler is not None
|
||||
_get_library_root_logger().addHandler(handler)
|
||||
|
||||
|
||||
def remove_handler(handler: logging.Handler) -> None:
|
||||
"""removes given handler from the HuggingFace Transformers's root logger."""
|
||||
|
||||
_configure_library_root_logger()
|
||||
|
||||
assert handler is not None and handler not in _get_library_root_logger().handlers
|
||||
_get_library_root_logger().removeHandler(handler)
|
||||
|
||||
|
||||
def disable_propagation() -> None:
|
||||
"""
|
||||
Disable propagation of the library log outputs. Note that log propagation is disabled by default.
|
||||
"""
|
||||
|
||||
_configure_library_root_logger()
|
||||
_get_library_root_logger().propagate = False
|
||||
|
||||
|
||||
def enable_propagation() -> None:
|
||||
"""
|
||||
Enable propagation of the library log outputs. Please disable the HuggingFace Transformers's default handler to
|
||||
prevent double logging if the root logger has been configured.
|
||||
"""
|
||||
|
||||
_configure_library_root_logger()
|
||||
_get_library_root_logger().propagate = True
|
||||
|
||||
|
||||
def enable_explicit_format() -> None:
|
||||
"""
|
||||
Enable explicit formatting for every HuggingFace Transformers's logger. The explicit formatter is as follows:
|
||||
```
|
||||
[LEVELNAME|FILENAME|LINE NUMBER] TIME >> MESSAGE
|
||||
```
|
||||
All handlers currently bound to the root logger are affected by this method.
|
||||
"""
|
||||
handlers = _get_library_root_logger().handlers
|
||||
|
||||
for handler in handlers:
|
||||
formatter = logging.Formatter("[%(levelname)s|%(filename)s:%(lineno)s] %(asctime)s >> %(message)s")
|
||||
handler.setFormatter(formatter)
|
||||
|
||||
|
||||
def reset_format() -> None:
|
||||
"""
|
||||
Resets the formatting for HuggingFace Transformers's loggers.
|
||||
All handlers currently bound to the root logger are affected by this method.
|
||||
"""
|
||||
handlers = _get_library_root_logger().handlers
|
||||
|
||||
for handler in handlers:
|
||||
handler.setFormatter(None)
|
||||
|
||||
|
||||
def warning_advice(self, *args, **kwargs):
|
||||
"""
|
||||
This method is identical to ``logger.warning()``, but if env var TRANSFORMERS_NO_ADVISORY_WARNINGS=1 is set, this
|
||||
warning will not be printed
|
||||
"""
|
||||
no_advisory_warnings = os.getenv("TRANSFORMERS_NO_ADVISORY_WARNINGS", False)
|
||||
if no_advisory_warnings:
|
||||
return
|
||||
self.warning(*args, **kwargs)
|
||||
|
||||
|
||||
logging.Logger.warning_advice = warning_advice
|
||||
|
||||
|
||||
def get_logger(name: Optional[str] = None, verbosity='info') -> logging.Logger:
|
||||
"""
|
||||
Return a logger with the specified name.
|
||||
This function is not supposed to be directly accessed unless you are writing a custom transformers module.
|
||||
"""
|
||||
|
||||
if name is None:
|
||||
name = _get_library_name()
|
||||
|
||||
_configure_library_root_logger()
|
||||
logger = logging.getLogger(name)
|
||||
logger.setLevel(log_levels[verbosity])
|
||||
|
||||
# Set up a file handler to write log messages to a file
|
||||
# file_handler = logging.FileHandler('/Users/xuanhe/Documents/our-paper/instructdb/code/BMTools/swarms.tools/tools/database/my_log_file.log')
|
||||
# file_handler.setLevel(log_levels[verbosity])
|
||||
# logger.addHandler(file_handler)
|
||||
|
||||
return logger
|
Loading…
Reference in new issue