[cleanup][un-used tool files]

pull/767/head
Kye Gomez 2 months ago
parent e0dd2d4578
commit 630a7826b8

@@ -24,7 +24,7 @@ def bootup():
):
logger.disable("")
logging.disable(logging.CRITICAL)
else:
logger.enable("")

@@ -1,5 +0,0 @@
from swarms.tools.prebuilt.math_eval import math_eval
__all__ = [
"math_eval",
]

@@ -1,82 +0,0 @@
import os
import requests
from typing import List, Dict
def check_bing_api_key():
    # os.getenv does not raise on a missing variable, and `raise None`
    # is itself a TypeError; check for the key explicitly instead.
    key = os.getenv("BING_API_KEY")
    if key is None:
        raise EnvironmentError(
            "BING_API_KEY environment variable is not set"
        )
    return key
def parse_and_merge_logs(logs: List[Dict[str, str]]) -> str:
"""
Parses logs and merges them into a single string for input to an LLM.
Parameters:
logs (List[Dict[str, str]]): A list of dictionaries where each dictionary represents a log entry.
Returns:
str: A single string containing all log entries concatenated.
"""
merged_logs = ""
for log in logs:
log_entries = [
f"{key}: {value}" for key, value in log.items()
]
log_string = "\n".join(log_entries)
merged_logs += log_string + "\n\n"
return merged_logs.strip()
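# A minimal sketch of the merged-log format, using hypothetical entries:
# logs = [
#     {"title": "Release notes", "url": "https://example.com/notes"},
#     {"title": "Changelog", "url": "https://example.com/changelog"},
# ]
# print(parse_and_merge_logs(logs))
# # title: Release notes
# # url: https://example.com/notes
# #
# # title: Changelog
# # url: https://example.com/changelog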
def fetch_web_articles_bing_api(
    query: str = None,
) -> str:
    """
    Fetches four articles from the Bing Web Search API for the given query.
    Parameters:
        query (str): The search query to retrieve articles.
    Returns:
        str: The article details, merged into a single string by parse_and_merge_logs.
    """
subscription_key = check_bing_api_key()
url = "https://api.bing.microsoft.com/v7.0/search"
headers = {"Ocp-Apim-Subscription-Key": subscription_key}
params = {"q": query, "count": 4, "mkt": "en-US"}
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
search_results = response.json()
articles = []
for i, result in enumerate(
search_results.get("webPages", {}).get("value", [])
):
article_info = {
"query": query,
"url": result.get("url"),
"title": result.get("name"),
"publishedDate": result.get("dateLastCrawled"),
"author": (
result.get("provider")[0]["name"]
if result.get("provider")
else "Unknown"
),
"id": str(i + 1), # Generating a simple unique ID
}
articles.append(article_info)
articles = parse_and_merge_logs(articles)
return articles
# out = fetch_web_articles_bing_api("swarms ai github")
# print(out)

@@ -1,231 +0,0 @@
import os
import subprocess
import sys
from loguru import logger
try:
    import tiktoken
except ImportError:
    print("tiktoken not found, installing...")
    # Install with the current interpreter so the package lands in the
    # environment that is actually running this code.
    subprocess.run([sys.executable, "-m", "pip", "install", "tiktoken"])
    import tiktoken
import concurrent.futures
from typing import List
class TikTokenizer:
def __init__(
self,
model_name: str = "o200k_base",
):
"""
Initializes a TikTokenizer object.
Args:
model_name (str, optional): The name of the model to use for tokenization. Defaults to "gpt-4o".
"""
try:
self.model_name = model_name
self.encoding = tiktoken.get_encoding(model_name)
except Exception as e:
raise ValueError(
f"Failed to initialize tokenizer with model '{model_name}': {str(e)}"
)
    def encode(self, string: str) -> List[int]:
        """
        Tokenizes a text string.
        Args:
            string (str): The input text string.
        Returns:
            List[int]: The token ids for the text string.
        """
        return self.encoding.encode(string)
    def decode(self, tokens: List[int]) -> str:
        """
        Detokenizes a list of token ids.
        Args:
            tokens (List[int]): The input token ids.
        Returns:
            str: The decoded text string.
        """
        return self.encoding.decode(tokens)
    def count_tokens(self, string: str) -> int:
        """
        Returns the number of tokens in a text string.
        Args:
            string (str): The input text string.
        Returns:
            int: The number of tokens in the text string.
        """
        # Split the string into chunks for parallel processing.
        # Note: tokens that span a chunk boundary may be counted slightly
        # differently than when encoding the whole string at once.
        chunks = [
            string[i : i + 1000] for i in range(0, len(string), 1000)
        ]
        # Sum the per-chunk counts returned by the worker threads instead
        # of mutating a shared counter, which would be a data race.
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=10
        ) as executor:
            counts = executor.map(
                lambda chunk: len(self.encoding.encode(chunk)), chunks
            )
            total = sum(counts)
        return total
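# A minimal usage sketch (assumes tiktoken provides the "o200k_base" encoding):
# tokenizer = TikTokenizer()
# tokens = tokenizer.encode("Hello, world!")
# print(tokenizer.decode(tokens))  # "Hello, world!"
# print(tokenizer.count_tokens("Hello, world!"))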
class CodeExecutor:
"""
A class to execute Python code and return the output as a string.
The class also logs the input and output using loguru and stores the outputs
in a folder called 'artifacts'.
Methods:
execute(code: str) -> str:
Executes the given Python code and returns the output.
"""
def __init__(
self,
max_output_length: int = 1000,
artifacts_directory: str = "artifacts",
language: str = "python3",
) -> None:
"""
Initializes the CodeExecutor class and sets up the logging.
"""
self.max_output_length = max_output_length
self.artifacts_dir = artifacts_directory
self.language = language
os.makedirs(self.artifacts_dir, exist_ok=True)
self.setup_logging()
self.tokenizer = TikTokenizer()
def setup_logging(self) -> None:
"""
Sets up the loguru logger with colorful output.
"""
logger.add(
os.path.join(self.artifacts_dir, "code_execution.log"),
format="{time} {level} {message}",
level="DEBUG",
)
logger.info(
"Logger initialized and artifacts directory set up."
)
def format_code(self, code: str) -> str:
"""
Formats the given Python code using black.
Args:
code (str): The Python code to format.
Returns:
str: The formatted Python code.
Raises:
ValueError: If the code cannot be formatted.
"""
try:
import black
formatted_code = black.format_str(
code, mode=black.FileMode()
)
return formatted_code
except Exception as e:
logger.error(f"Error formatting code: {e}")
raise ValueError(f"Error formatting code: {e}") from e
def execute(self, code: str) -> str:
"""
Executes the given Python code and returns the output.
Args:
code (str): The Python code to execute.
Returns:
str: The output of the executed code.
Raises:
RuntimeError: If there is an error during the execution of the code.
"""
try:
formatted_code = self.format_code(code)
logger.info(f"Executing code:\n{formatted_code}")
completed_process = subprocess.run(
[self.language, "-c", formatted_code],
capture_output=True,
text=True,
check=True,
)
output = completed_process.stdout
logger.info(f"Code output:\n{output}")
            token_count = self.tokenizer.count_tokens(output)
            logger.debug(f"Output token count: {token_count}")
            if (
                self.max_output_length
                and token_count > self.max_output_length
            ):
                logger.warning(
                    f"Output exceeds {self.max_output_length} tokens. Truncating output."
                )
                # Truncate by character count as a cheap approximation
                # of the token limit.
                output = output[: self.max_output_length] + "..."
return output
except subprocess.CalledProcessError as e:
logger.error(f"Error executing code: {e.stderr}")
raise RuntimeError(
f"Error executing code: {e.stderr}"
) from e
# # Example usage:
# if __name__ == "__main__":
# executor = CodeExecutor(max_output_length=300)
# code = """
# import requests
# from typing import Any
# def fetch_financial_news(api_key: str, query: str, num_articles: int) -> Any:
# try:
# url = f"https://newsapi.org/v2/everything?q={query}&apiKey={api_key}"
# response = requests.get(url)
# response.raise_for_status()
# return response.json()
# except requests.RequestException as e:
# print(f"Request Error: {e}")
# raise
# except ValueError as e:
# print(f"Value Error: {e}")
# raise
# api_key = ""
# result = fetch_financial_news(api_key, query="Nvidia news", num_articles=5)
# print(result)
# """
# result = executor.execute(code)
# print(result)

@@ -1,232 +0,0 @@
import queue
import subprocess
import threading
import time
import traceback
from swarms.utils.loguru_logger import logger
class SubprocessCodeInterpreter:
    """
    SubprocessCodeInterpreter is a base class for code interpreters that run code in a subprocess.
    Attributes:
        start_cmd (str): The command to start the subprocess. Should be a string that can be split by spaces.
        process (subprocess.Popen): The subprocess that is running the code.
        debug_mode (bool): Whether to print debug statements.
        output_queue (queue.Queue): A queue that is filled with output from the subprocess.
        done (threading.Event): An event that is set when the subprocess is done running code.
    """
def __init__(
self,
start_cmd: str = "python3",
debug_mode: bool = False,
max_retries: int = 3,
verbose: bool = False,
retry_count: int = 0,
*args,
**kwargs,
):
self.process = None
self.start_cmd = start_cmd
self.debug_mode = debug_mode
self.max_retries = max_retries
self.verbose = verbose
self.retry_count = retry_count
self.output_queue = queue.Queue()
self.done = threading.Event()
    def detect_active_line(self, line):
        """Detect whether the line marks the currently active line of code.
        Args:
            line (str): A line of subprocess output.
        Returns:
            The active line indicator, or None. The base implementation always returns None; subclasses override this.
        """
        return None
    def detect_end_of_execution(self, line):
        """Detect whether the line marks the end of execution.
        Args:
            line (str): A line of subprocess output.
        Returns:
            A truthy value if execution has finished, else None. The base implementation always returns None; subclasses override this.
        """
        return None
    def line_postprocessor(self, line):
        """Post-process a line of subprocess output.
        Args:
            line (str): A line of subprocess output.
        Returns:
            str | None: The processed line, or None to discard it entirely.
        """
        return line
def preprocess_code(self, code):
"""
This needs to insert an end_of_execution marker of some kind,
which can be detected by detect_end_of_execution.
Optionally, add active line markers for detect_active_line.
"""
return code
def terminate(self):
"""terminate the subprocess"""
self.process.terminate()
def start_process(self):
"""start the subprocess"""
if self.process:
self.terminate()
logger.info(
f"Starting subprocess with command: {self.start_cmd}"
)
self.process = subprocess.Popen(
self.start_cmd.split(),
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=0,
universal_newlines=True,
)
threading.Thread(
target=self.handle_stream_output,
args=(self.process.stdout, False),
daemon=True,
).start()
threading.Thread(
target=self.handle_stream_output,
args=(self.process.stderr, True),
daemon=True,
).start()
return self.process
    def run(self, code: str):
        """Run the code in the subprocess.
        Args:
            code (str): The code to execute.
        Yields:
            dict: Output chunks such as {"output": ...} or {"active_line": ...}.
        """
# Setup
logger.info("Running code in subprocess")
try:
code = self.preprocess_code(code)
if not self.process:
self.start_process()
except BaseException:
yield {"output": traceback.format_exc()}
return
while self.retry_count <= self.max_retries:
if self.debug_mode:
print(f"Running code:\n{code}\n---")
self.done.clear()
try:
self.process.stdin.write(code + "\n")
self.process.stdin.flush()
break
except BaseException:
if self.retry_count != 0:
# For UX, I like to hide this if it happens once. Obviously feels better to not see errors
# Most of the time it doesn't matter, but we should figure out why it happens frequently with:
# applescript
yield {"output": traceback.format_exc()}
yield {
"output": (
"Retrying..."
f" ({self.retry_count}/{self.max_retries})"
)
}
yield {"output": "Restarting process."}
self.start_process()
self.retry_count += 1
if self.retry_count > self.max_retries:
yield {
"output": (
"Maximum retries reached. Could not"
" execute code."
)
}
return
while True:
if not self.output_queue.empty():
yield self.output_queue.get()
else:
time.sleep(0.1)
try:
output = self.output_queue.get(
timeout=0.3
) # Waits for 0.3 seconds
yield output
except queue.Empty:
if self.done.is_set():
# Try to yank 3 more times from it... maybe there's something in there...
# (I don't know if this actually helps. Maybe we just need to yank 1 more time)
for _ in range(3):
if not self.output_queue.empty():
yield self.output_queue.get()
time.sleep(0.2)
break
    def handle_stream_output(self, stream, is_error_stream):
        """Read lines from a subprocess stream and route them to the output queue.
        Args:
            stream (IO[str]): The subprocess stdout or stderr stream.
            is_error_stream (bool): Whether the stream is stderr.
        """
for line in iter(stream.readline, ""):
if self.debug_mode:
print(f"Received output line:\n{line}\n---")
line = self.line_postprocessor(line)
if line is None:
continue # `line = None` is the postprocessor's signal to discard completely
if self.detect_active_line(line):
active_line = self.detect_active_line(line)
self.output_queue.put({"active_line": active_line})
elif self.detect_end_of_execution(line):
self.output_queue.put({"active_line": None})
time.sleep(0.1)
self.done.set()
elif is_error_stream and "KeyboardInterrupt" in line:
self.output_queue.put({"output": "KeyboardInterrupt"})
time.sleep(0.1)
self.done.set()
else:
self.output_queue.put({"output": line})
# interpreter = SubprocessCodeInterpreter()
# interpreter.start_cmd = "python3"
# # run() is a generator, so iterate over it to stream the output chunks:
# for chunk in interpreter.run("""
# print("hello")
# print("world")
# """):
#     print(chunk)
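# A minimal subclass sketch (hypothetical marker string, not part of the
# original file). The base class never sets `done`, so a concrete
# interpreter must emit an end-of-execution marker in preprocess_code and
# recognize it in detect_end_of_execution, or run() will loop forever:
# class PythonInterpreter(SubprocessCodeInterpreter):
#     END_MARKER = "##end_of_execution##"
#
#     def preprocess_code(self, code):
#         # Print the marker after the user code so the subprocess
#         # signals completion on stdout.
#         return f"{code}\nprint('{self.END_MARKER}')"
#
#     def detect_end_of_execution(self, line):
#         return self.END_MARKER in line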

@@ -1,61 +0,0 @@
import functools
import logging
def math_eval(func1, func2):
    """Math evaluation decorator.
    Calls both func1 and func2 with the decorated function's arguments,
    logs a warning if the outputs differ, and returns both results.
    Args:
        func1 (Callable): The reference ("ground truth") function.
        func2 (Callable): The candidate function to compare against func1.
    Example:
        >>> @math_eval(ground_truth, generated_func)
        ... def test_func(x):
        ...     return x
        >>> result1, result2 = test_func(5)
        >>> print(f"Result from ground_truth: {result1}")
        >>> print(f"Result from generated_func: {result2}")
    """
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
result1 = func1(*args, **kwargs)
except Exception as e:
logging.error(f"Error in func1: {e}")
result1 = None
try:
result2 = func2(*args, **kwargs)
except Exception as e:
logging.error(f"Error in func2: {e}")
result2 = None
if result1 != result2:
logging.warning(
f"Outputs do not match: {result1} != {result2}"
)
return result1, result2
return wrapper
return decorator
# def ground_truth(x):
# return x * 2
# def generated_func(x):
# return x - 10
# @math_eval(ground_truth, generated_func)
# def test_func(x):
# return x
# result1, result2 = test_func(5)
# print(f"Result from ground_truth: {result1}")
# print(f"Result from generated_func: {result2}")

@@ -13,7 +13,6 @@ from swarms.utils.file_processing import (
zip_folders,
)
from swarms.utils.markdown_message import display_markdown_message
from swarms.tools.prebuilt.math_eval import math_eval
from swarms.utils.parse_code import extract_code_from_markdown
from swarms.utils.pdf_to_text import pdf_to_text
from swarms.utils.try_except_wrapper import try_except_wrapper
@@ -32,7 +31,6 @@ __all__ = [
"create_file_in_folder",
"zip_folders",
"display_markdown_message",
"math_eval",
"extract_code_from_markdown",
"pdf_to_text",
"try_except_wrapper",
