[FEAT][Improved Logging]

pull/392/head
Kye 11 months ago
parent afc8df570a
commit c82f4f9b5a

@@ -3,7 +3,7 @@ from swarms import Agent, OpenAIChat
 ## Initialize the workflow
 agent = Agent(
     llm=OpenAIChat(),
-    max_loops="auto",
+    max_loops=1,
     autosave=True,
     dashboard=False,
     streaming_on=True,
@@ -14,3 +14,4 @@ agent = Agent(
 agent(
     "Find a chick fil a equivalent in hayes valley"
 )

@@ -4,9 +4,9 @@ from swarms import Agent, MajorityVoting, OpenAIChat
 llm = OpenAIChat()

 # Initialize the agents
-agent1 = Agent(llm=llm, max_loops=1)
-agent2 = Agent(llm=llm, max_loops=1)
-agent3 = Agent(llm=llm, max_loops=1)
+agent1 = Agent(agent_name="worker-1", llm=llm, max_loops=1)
+agent2 = Agent(agent_name="worker-2", llm=llm, max_loops=1)
+agent3 = Agent(agent_name="worker-3", llm=llm, max_loops=1)

 # Initialize the majority voting

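With the agents now named, the example can be finished by wiring them into the voting swarm. A minimal sketch, assuming the MajorityVoting constructor accepts the agents list and the concurrent flag that the class uses later in this diff:

# Hypothetical continuation of the example above (constructor arguments are assumptions)
majority_voting = MajorityVoting(
    agents=[agent1, agent2, agent3],  # the named workers
    concurrent=True,                  # fan the task out via a thread pool
)
result = majority_voting.run("What is the capital of France?")
print(result)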
@@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api"

 [tool.poetry]
 name = "swarms"
-version = "4.1.6"
+version = "4.1.7"
 description = "Swarms - Pytorch"
 license = "MIT"
 authors = ["Kye Gomez <kye@apac.ai>"]
@@ -51,6 +51,7 @@ httpx = "0.24.1"
 tiktoken = "0.4.0"
 attrs = "22.2.0"
 ratelimit = "2.2.1"
+loguru = "*"
 cohere = "4.24"
 huggingface-hub = "*"
 pydantic = "1.10.12"

@@ -16,6 +16,7 @@ sentencepiece==0.1.98
 requests_mock
 pypdf==4.0.1
 accelerate==0.22.0
+loguru
 chromadb
 tensorflow
 optimum

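Both pyproject.toml and the requirements file now pull in loguru. A minimal sketch of the sink-configuration pattern the rest of this commit builds on (the file name below is illustrative; the Agent writes to a per-agent log file):

from loguru import logger

# Timestamped, colorized records with rich tracebacks, mirroring the Agent handler added in this commit.
logger.add(
    "example.log",  # illustrative sink; the Agent uses f"{agent_name}.log"
    level="INFO",
    colorize=True,
    format="<green>{time}</green> <level>{message}</level>",
    backtrace=True,
    diagnose=True,
)
logger.info("loguru is now available to swarms")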
@@ -1,16 +1,23 @@
-############################################ LLMs
 from swarms.models.anthropic import Anthropic # noqa: E402
-# 3############ Embedding models
 from swarms.models.base_embedding_model import BaseEmbeddingModel
 from swarms.models.base_llm import AbstractLLM # noqa: E402
-################# MultiModal Models
 from swarms.models.base_multimodal_model import (
     BaseMultiModalModel,
-) # noqa: E402
+)
+# noqa: E402
 from swarms.models.biogpt import BioGPT # noqa: E402
 from swarms.models.clipq import CLIPQ # noqa: E402
+# from swarms.models.dalle3 import Dalle3
+# from swarms.models.distilled_whisperx import DistilWhisperModel # noqa: E402
+# from swarms.models.whisperx_model import WhisperX # noqa: E402
+# from swarms.models.kosmos_two import Kosmos # noqa: E402
+# from swarms.models.cog_agent import CogAgent # noqa: E402
+## Function calling models
+from swarms.models.fire_function import (
+    FireFunctionCaller,
+)
 from swarms.models.fuyu import Fuyu # noqa: E402
 from swarms.models.gemini import Gemini # noqa: E402
 from swarms.models.gigabind import Gigabind # noqa: E402
@@ -20,9 +27,9 @@ from swarms.models.idefics import Idefics # noqa: E402
 from swarms.models.kosmos_two import Kosmos # noqa: E402
 from swarms.models.layoutlm_document_qa import (
     LayoutLMDocumentQA,
-) # noqa: E402
-# from swarms.models.vip_llava import VipLlavaMultiModal # noqa: E402
+)
+# noqa: E402
 from swarms.models.llava import LavaMultiModal # noqa: E402
 from swarms.models.mistral import Mistral # noqa: E402
 from swarms.models.mixtral import Mixtral # noqa: E402
@@ -32,18 +39,18 @@ from swarms.models.openai_models import (
     AzureOpenAI,
     OpenAI,
     OpenAIChat,
-) # noqa: E402
+)
+# noqa: E402
 from swarms.models.openai_tts import OpenAITTS # noqa: E402
 from swarms.models.petals import Petals # noqa: E402
 from swarms.models.qwen import QwenVLMultiModal # noqa: E402
 from swarms.models.roboflow_model import RoboflowMultiModal
 from swarms.models.sam_supervision import SegmentAnythingMarkGenerator
-##### Utils
 from swarms.models.sampling_params import (
     SamplingParams,
     SamplingType,
-) # noqa: E402
+)
 from swarms.models.timm import TimmModel # noqa: E402
 # from swarms.models.modelscope_pipeline import ModelScopePipeline
@@ -62,26 +69,19 @@ from swarms.models.types import ( # noqa: E402
 )
 from swarms.models.ultralytics_model import (
     UltralyticsModel,
-) # noqa: E402
+)
+# noqa: E402
 from swarms.models.vilt import Vilt # noqa: E402
 from swarms.models.wizard_storytelling import (
     WizardLLMStoryTeller,
-) # noqa: E402
+)
+# noqa: E402
 # from swarms.models.vllm import vLLM # noqa: E402
 from swarms.models.zephyr import Zephyr # noqa: E402
 from swarms.models.zeroscope import ZeroscopeTTV # noqa: E402
-# from swarms.models.dalle3 import Dalle3
-# from swarms.models.distilled_whisperx import DistilWhisperModel # noqa: E402
-# from swarms.models.whisperx_model import WhisperX # noqa: E402
-# from swarms.models.kosmos_two import Kosmos # noqa: E402
-# from swarms.models.cog_agent import CogAgent # noqa: E402
-################# Tokenizers
 __all__ = [
     "AbstractLLM",
     "Anthropic",
@@ -100,7 +100,6 @@ __all__ = [
     "HuggingfaceLLM",
     "MPT7B",
     "WizardLLMStoryTeller",
-    # "GPT4Vision",
     # "Dalle3",
     # "DistilWhisperModel",
     "GPT4VisionAPI",
@@ -118,7 +117,6 @@ __all__ = [
     "TogetherLLM",
     "TimmModel",
     "UltralyticsModel",
-    # "VipLlavaMultiModal",
     "LavaMultiModal",
     "QwenVLMultiModal",
     "CLIPQ",
@@ -129,4 +127,5 @@ __all__ = [
     "SegmentAnythingMarkGenerator",
     "SamplingType",
     "SamplingParams",
+    "FireFunctionCaller",
 ]

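Because FireFunctionCaller is added to __all__, it becomes importable from swarms.models directly. A short sketch, assuming a CUDA device and that the fireworks-ai/firefunction-v1 weights can be downloaded:

from swarms.models import FireFunctionCaller

caller = FireFunctionCaller(max_tokens=256)
caller.run("Add 2 and 3")  # run() prints the decoded model output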
@@ -0,0 +1,530 @@
import base64
import os
import time
from io import BytesIO
from typing import List, Literal, Optional, Tuple, Union
import torch
from PIL import Image
from pydantic import BaseModel, Field
from transformers import (
AutoModelForCausalLM,
LlamaTokenizer,
TextIteratorStreamer,
)
from swarms.models.base_multimodal_model import BaseMultiModalModel
from swarms.utils.logger import logger
MODEL_PATH = "THUDM/cogvlm-chat-hf"
TOKENIZER_PATH = "lmsys/vicuna-7b-v1.5"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
QUANT_ENABLED = False
class ImageUrl(BaseModel):
url: str
class TextContent(BaseModel):
type: Literal["text"]
text: str
class ImageUrlContent(BaseModel):
type: Literal["image_url"]
image_url: ImageUrl
ContentItem = Union[TextContent, ImageUrlContent]
class ChatMessageInput(BaseModel):
role: Literal["user", "assistant", "system"]
content: Union[str, List[ContentItem]]
name: Optional[str] = None
class ChatMessageResponse(BaseModel):
role: Literal["assistant"]
content: str = None
name: Optional[str] = None
class DeltaMessage(BaseModel):
role: Optional[Literal["user", "assistant", "system"]] = None
content: Optional[str] = None
class ChatCompletionRequest(BaseModel):
model: str
messages: List[ChatMessageInput]
temperature: Optional[float] = 0.8
top_p: Optional[float] = 0.8
max_tokens: Optional[int] = None
stream: Optional[bool] = False
# Additional parameters
repetition_penalty: Optional[float] = 1.0
class ChatCompletionResponseChoice(BaseModel):
index: int
message: ChatMessageResponse
class ChatCompletionResponseStreamChoice(BaseModel):
index: int
delta: DeltaMessage
class UsageInfo(BaseModel):
prompt_tokens: int = 0
total_tokens: int = 0
completion_tokens: Optional[int] = 0
class ChatCompletionResponse(BaseModel):
model: str
object: Literal["chat.completion", "chat.completion.chunk"]
choices: List[
Union[
ChatCompletionResponseChoice,
ChatCompletionResponseStreamChoice,
]
]
created: Optional[int] = Field(
default_factory=lambda: int(time.time())
)
usage: Optional[UsageInfo] = None
# async def create_chat_completion(request: ChatCompletionRequest):
# global model, tokenizer
# gen_params = dict(
# messages=request.messages,
# temperature=request.temperature,
# top_p=request.top_p,
# max_tokens=request.max_tokens or 1024,
# echo=False,
# stream=request.stream,
# )
# # if request.stream:
# # predict(request.model, gen_params)
# # response = generate_cogvlm(model, tokenizer, gen_params)
# usage = UsageInfo()
# message = ChatMessageResponse(
# role="assistant",
# content=response["text"],
# )
# logger.debug(f"==== message ====\n{message}")
# choice_data = ChatCompletionResponseChoice(
# index=0,
# message=message,
# )
# task_usage = UsageInfo.model_validate(response["usage"])
# for usage_key, usage_value in task_usage.model_dump().items():
# setattr(
# usage, usage_key, getattr(usage, usage_key) + usage_value
# )
# return ChatCompletionResponse(
# model=request.model,
# choices=[choice_data],
# object="chat.completion",
# usage=usage,
# )
class CogVLMMultiModal(BaseMultiModalModel):
"""
Initializes the CogVLM model.
Args:
model_name (str): The path or name of the pre-trained model.
tokenizer (str): The path or name of the tokenizer.
device (str): The device to run the model on.
quantize (bool): Whether to enable quantization.
torch_type (str): The torch data type to use.
temperature (float): The temperature for sampling.
top_p (float): The top-p value for sampling.
max_tokens (int): The maximum number of tokens to generate.
echo (bool): Whether to echo the input text.
stream (bool): Whether to stream the output.
repetition_penalty (float): The repetition penalty for sampling.
do_sample (bool): Whether to use sampling during generation.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Methods:
run: Generates a response using the CogVLM model.
generate_stream_cogvlm: Generates a stream of responses using the CogVLM model in inference mode.
process_history_and_images: Processes history messages to extract text, identify the last user query, and convert base64 encoded image URLs to PIL images.
Example:
>>> model = CogVLMMultiModal()
>>> response = model("Describe this image with meticulous details.", "https://example.com/image.jpg")
>>> print(response)
"""
def __init__(
self,
model_name: str = MODEL_PATH,
tokenizer: str = TOKENIZER_PATH,
device: str = DEVICE,
quantize: bool = QUANT_ENABLED,
torch_type: str = "float16",
temperature: float = 0.5,
top_p: float = 0.9,
max_tokens: int = 3500,
echo: bool = False,
stream: bool = False,
repetition_penalty: float = 1.0,
do_sample: bool = True,
*args,
**kwargs,
):
super().__init__()
self.model_name = model_name
self.device = device
self.tokenizer = tokenizer
self.device = device
self.quantize = quantize
self.torch_type = torch_type
self.temperature = temperature
self.top_p = top_p
self.max_tokens = max_tokens
self.echo = echo
self.stream = stream
self.repetition_penalty = repetition_penalty
self.do_sample = do_sample
if os.environ.get("QUANT_ENABLED"):
pass
else:
with torch.cuda.device(device):
__, total_bytes = torch.cuda.mem_get_info()
total_gb = total_bytes / (1 << 30)
if total_gb < 40:
pass
else:
pass
torch.cuda.empty_cache()
self.tokenizer = LlamaTokenizer.from_pretrained(
tokenizer, trust_remote_code=True
)
if (
torch.cuda.is_available()
and torch.cuda.get_device_capability()[0] >= 8
):
torch_type = torch.bfloat16
else:
torch_type = torch.float16
print(
"========Use torch type as:{} with device:{}========\n\n"
.format(torch_type, device)
)
if "cuda" in device:
if QUANT_ENABLED:
self.model = AutoModelForCausalLM.from_pretrained(
model_name,
load_in_4bit=True,
trust_remote_code=True,
torch_dtype=torch_type,
low_cpu_mem_usage=True,
*args,
**kwargs,
).eval()
else:
self.model = (
AutoModelForCausalLM.from_pretrained(
model_name,
load_in_4bit=False,
trust_remote_code=True,
torch_dtype=torch_type,
low_cpu_mem_usage=True,
*args,
**kwargs,
)
.to(device)
.eval()
)
else:
self.model = (
AutoModelForCausalLM.from_pretrained(
model_name,
trust_remote_code=True,
*args,
**kwargs,
)
.float()
.to(device)
.eval()
)
def run(self, task: str, img: str, *args, **kwargs):
"""
Generates a response using the CogVLM model. It processes the chat history and image data, if any,
and then invokes the model to generate a response.
"""
messages = [task]
params = dict(
messages=messages,
temperature=self.temperature,
repetition_penalty=self.repetition_penalty,
top_p=self.top_p,
max_tokens=self.max_tokens,
)
for response in self.generate_stream_cogvlm(params):
pass
return response
@torch.inference_mode()
def generate_stream_cogvlm(
self,
params: dict,
):
"""
Generates a stream of responses using the CogVLM model in inference mode.
It's optimized to handle continuous input-output interactions with the model in a streaming manner.
"""
messages = params["messages"]
temperature = float(params.get("temperature", 1.0))
repetition_penalty = float(
params.get("repetition_penalty", 1.0)
)
top_p = float(params.get("top_p", 1.0))
max_new_tokens = int(params.get("max_tokens", 256))
query, history, image_list = self.process_history_and_images(
messages
)
logger.debug(f"==== request ====\n{query}")
input_by_model = self.model.build_conversation_input_ids(
self.tokenizer,
query=query,
history=history,
images=[image_list[-1]],
)
inputs = {
"input_ids": (
input_by_model["input_ids"]
.unsqueeze(0)
.to(self.device)
),
"token_type_ids": (
input_by_model["token_type_ids"]
.unsqueeze(0)
.to(self.device)
),
"attention_mask": (
input_by_model["attention_mask"]
.unsqueeze(0)
.to(self.device)
),
"images": [
[
input_by_model["images"][0]
.to(self.device)
.to(self.torch_type)
]
],
}
if (
"cross_images" in input_by_model
and input_by_model["cross_images"]
):
inputs["cross_images"] = [
[
input_by_model["cross_images"][0]
.to(self.device)
.to(self.torch_type)
]
]
input_echo_len = len(inputs["input_ids"][0])
streamer = TextIteratorStreamer(
tokenizer=self.tokenizer,
timeout=60.0,
skip_prompt=True,
skip_special_tokens=True,
)
gen_kwargs = {
"repetition_penalty": repetition_penalty,
"max_new_tokens": max_new_tokens,
"do_sample": True if temperature > 1e-5 else False,
"top_p": top_p if temperature > 1e-5 else 0,
"streamer": streamer,
}
if temperature > 1e-5:
gen_kwargs["temperature"] = temperature
total_len = 0
generated_text = ""
with torch.no_grad():
self.model.generate(**inputs, **gen_kwargs)
for next_text in streamer:
generated_text += next_text
yield {
"text": generated_text,
"usage": {
"prompt_tokens": input_echo_len,
"completion_tokens": (
total_len - input_echo_len
),
"total_tokens": total_len,
},
}
ret = {
"text": generated_text,
"usage": {
"prompt_tokens": input_echo_len,
"completion_tokens": total_len - input_echo_len,
"total_tokens": total_len,
},
}
yield ret
def process_history_and_images(
self,
messages: List[ChatMessageInput],
) -> Tuple[
Optional[str],
Optional[List[Tuple[str, str]]],
Optional[List[Image.Image]],
]:
"""
Process history messages to extract text, identify the last user query,
and convert base64 encoded image URLs to PIL images.
Args:
messages(List[ChatMessageInput]): List of ChatMessageInput objects.
return: A tuple of three elements:
- The last user query as a string.
- Text history formatted as a list of tuples for the model.
- List of PIL Image objects extracted from the messages.
"""
formatted_history = []
image_list = []
last_user_query = ""
for i, message in enumerate(messages):
role = message.role
content = message.content
# Extract text content
if isinstance(content, list): # text
text_content = " ".join(
item.text
for item in content
if isinstance(item, TextContent)
)
else:
text_content = content
# Extract image data
if isinstance(content, list): # image
for item in content:
if isinstance(item, ImageUrlContent):
image_url = item.image_url.url
if image_url.startswith(
"data:image/jpeg;base64,"
):
base64_encoded_image = image_url.split(
"data:image/jpeg;base64,"
)[1]
image_data = base64.b64decode(
base64_encoded_image
)
image = Image.open(
BytesIO(image_data)
).convert("RGB")
image_list.append(image)
# Format history
if role == "user":
if i == len(messages) - 1:
last_user_query = text_content
else:
formatted_history.append((text_content, ""))
elif role == "assistant":
if formatted_history:
if formatted_history[-1][1] != "":
assert False, (
"the last query is answered. answer"
f" again. {formatted_history[-1][0]},"
f" {formatted_history[-1][1]},"
f" {text_content}"
)
formatted_history[-1] = (
formatted_history[-1][0],
text_content,
)
else:
assert False, "assistant reply before user"
else:
assert False, f"unrecognized role: {role}"
return last_user_query, formatted_history, image_list
async def predict(self, params: dict):
"""
Handle streaming predictions. It continuously generates responses for a given input stream.
This is particularly useful for real-time, continuous interactions with the model.
"""
choice_data = ChatCompletionResponseStreamChoice(
index=0,
delta=DeltaMessage(role="assistant"),
finish_reason=None,
)
chunk = ChatCompletionResponse(
model=self.model_name,
choices=[choice_data],
object="chat.completion.chunk",
)
yield "{}".format(chunk.model_dump_json(exclude_unset=True))
previous_text = ""
for new_response in self.generate_stream_cogvlm(params):
decoded_unicode = new_response["text"]
delta_text = decoded_unicode[len(previous_text) :]
previous_text = decoded_unicode
delta = DeltaMessage(
content=delta_text,
role="assistant",
)
choice_data = ChatCompletionResponseStreamChoice(
index=0,
delta=delta,
)
chunk = ChatCompletionResponse(
model=self.model_name,
choices=[choice_data],
object="chat.completion.chunk",
)
yield "{}".format(
chunk.model_dump_json(exclude_unset=True)
)
choice_data = ChatCompletionResponseStreamChoice(
index=0,
delta=DeltaMessage(),
)
chunk = ChatCompletionResponse(
model=self.model_name,
choices=[choice_data],
object="chat.completion.chunk",
)
yield "{}".format(chunk.model_dump_json(exclude_unset=True))
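A hedged usage sketch for the class above, following its docstring example. It assumes the file lands at swarms/models/cogvlm.py, that a CUDA device is available, and that the THUDM/cogvlm-chat-hf weights can be fetched:

from swarms.models.cogvlm import CogVLMMultiModal  # assumed module path

model = CogVLMMultiModal(max_tokens=1024, temperature=0.5)
response = model.run(
    "Describe this image with meticulous details.",
    "https://example.com/image.jpg",  # placeholder image reference from the docstring
)
print(response)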

@@ -0,0 +1,87 @@
from transformers import AutoModelForCausalLM, AutoTokenizer
import json
from swarms.models.base_llm import AbstractLLM
from typing import Any
class FireFunctionCaller(AbstractLLM):
"""
A class that represents a caller for the FireFunction model.
Args:
model_name (str): The name of the model to be used.
device (str): The device to be used.
function_spec (Any): The specification of the function.
max_tokens (int): The maximum number of tokens.
system_prompt (str): The system prompt.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Methods:
run(self, task: str, *args, **kwargs) -> None: Run the function with the given task and arguments.
Examples:
>>> fire_function_caller = FireFunctionCaller()
>>> fire_function_caller.run("Add 2 and 3")
"""
def __init__(
self,
model_name: str = "fireworks-ai/firefunction-v1",
device: str = "cuda",
function_spec: Any = None,
max_tokens: int = 3000,
system_prompt: str = "You are a helpful assistant with access to functions. Use them if required.",
*args,
**kwargs,
):
super().__init__(model_name, device)
self.model_name = model_name
self.device = device
self.function_spec = function_spec
self.max_tokens = max_tokens
self.system_prompt = system_prompt
self.model = AutoModelForCausalLM.from_pretrained(
model_name, device_map="auto", *args, **kwargs
)
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.functions = json.dumps(function_spec, indent=4)
def run(self, task: str, *args, **kwargs):
"""
Run the function with the given task and arguments.
Args:
task (str): The task to be performed.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
Returns:
None
"""
messages = [
{"role": "functions", "content": self.functions},
{
"role": "system",
"content": self.system_prompt,
},
{
"role": "user",
"content": task,
},
]
model_inputs = self.tokenizer.apply_chat_template(
messages, return_tensors="pt"
).to(self.model.device)
generated_ids = self.model.generate(
model_inputs,
max_new_tokens=self.max_tokens,
*args,
**kwargs,
)
decoded = self.tokenizer.batch_decode(generated_ids)
print(decoded[0])
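A hedged sketch of calling FireFunctionCaller with a function specification. The OpenAI-style JSON schema below is an assumption; the class only json.dumps whatever spec it is given and hands it to the chat template under the "functions" role:

function_spec = {
    "name": "add",
    "description": "Add two integers.",
    "parameters": {
        "type": "object",
        "properties": {
            "a": {"type": "integer"},
            "b": {"type": "integer"},
        },
        "required": ["a", "b"],
    },
}

caller = FireFunctionCaller(function_spec=function_spec, max_tokens=200)
caller.run("Add 2 and 3")  # prints the decoded completion, ideally a function call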

@@ -0,0 +1,41 @@
import pytest
from unittest.mock import MagicMock
from swarms.models.fire_function import FireFunctionCaller
def test_fire_function_caller_run(mocker, capsys):
# Create mock model and tokenizer
model = MagicMock()
tokenizer = MagicMock()
mocker.patch.object(FireFunctionCaller, 'model', model)
mocker.patch.object(FireFunctionCaller, 'tokenizer', tokenizer)
# Create mock task and arguments
task = "Add 2 and 3"
args = (2, 3)
kwargs = {}
# Create mock generated_ids and decoded output
generated_ids = [1, 2, 3]
decoded_output = "5"
model.generate.return_value = generated_ids
tokenizer.batch_decode.return_value = [decoded_output]
# Create FireFunctionCaller instance
fire_function_caller = FireFunctionCaller()
# Run the function
fire_function_caller.run(task, *args, **kwargs)
# Assert model.generate was called with the correct inputs
model.generate.assert_called_once_with(
tokenizer.apply_chat_template.return_value,
max_new_tokens=fire_function_caller.max_tokens,
*args,
**kwargs,
)
# Assert tokenizer.batch_decode was called with the correct inputs
tokenizer.batch_decode.assert_called_once_with(generated_ids)
# Assert the decoded output is printed
captured = capsys.readouterr()
assert decoded_output in captured.out

@@ -3,10 +3,12 @@ import json
 import logging
 import os
 import random
+import sys
 import time
 import uuid
 from typing import Any, Callable, Dict, List, Optional, Tuple

+from loguru import logger
 from termcolor import colored

 from swarms.memory.base_vectordb import AbstractVectorDatabase
@@ -22,7 +24,8 @@ from swarms.tools.exec_tool import execute_tool_by_name
 from swarms.tools.tool import BaseTool
 from swarms.utils.code_interpreter import SubprocessCodeInterpreter
 from swarms.utils.data_to_text import data_to_text
-from swarms.utils.logger import logger
+
+# from swarms.utils.logger import logger
 from swarms.utils.parse_code import extract_code_from_markdown
 from swarms.utils.pdf_to_text import pdf_to_text
 from swarms.utils.token_count_tiktoken import limit_tokens_from_string
@@ -51,10 +54,18 @@ def agent_id():
     return str(uuid.uuid4())


+# Task ID generator
 def task_id():
+    """
+    Generate a unique task ID.
+
+    Returns:
+        str: A string representation of a UUID.
+    """
     return str(uuid.uuid4())


+# Step ID generator
 def step_id():
     return str(uuid.uuid1())
@@ -194,6 +205,10 @@
         callback: Optional[Callable] = None,
         metadata: Optional[Dict[str, Any]] = None,
         callbacks: Optional[List[Callable]] = None,
+        logger_handler: Any = sys.stderr,
+        search_algorithm: Optional[Callable] = None,
+        logs_to_filename: Optional[str] = None,
+        evaluator: Optional[Callable] = None,
         *args,
         **kwargs,
     ):
@@ -243,9 +258,14 @@
         self.callback = callback
         self.metadata = metadata
         self.callbacks = callbacks
+        self.logger_handler = logger_handler
+        self.search_algorithm = search_algorithm
+        self.logs_to_filename = logs_to_filename
+        self.evaluator = evaluator

         # The max_loops will be set dynamically if the dynamic_loop
         if self.dynamic_loops:
+            logger.info("Dynamic loops enabled")
             self.max_loops = "auto"

         # If multimodal = yes then set the sop to the multimodal sop
@@ -260,7 +280,9 @@
         self.feedback = []

         # Initialize the code executor
-        self.code_executor = SubprocessCodeInterpreter()
+        self.code_executor = SubprocessCodeInterpreter(
+            debug_mode=True,
+        )

         # If the preset stopping token is enabled then set the stopping token to the preset stopping token
         if preset_stopping_token:
@@ -279,11 +301,12 @@
             self.get_docs_from_doc_folders()

         # If tokenizer and context length exists then:
-        if self.tokenizer and self.context_length:
-            self.truncate_history()
+        # if self.tokenizer and self.context_length:
+        #     self.truncate_history()

-        if verbose:
-            logger.setLevel(logging.DEBUG)
+        # If verbose is enabled then set the logger level to info
+        # if verbose:
+        #     logger.setLevel(logging.INFO)

         # If tools are provided then set the tool prompt by adding to sop
         if self.tools:
@@ -308,6 +331,21 @@
         # Step cache
         self.step_cache = []

+        # Set the logger handler
+        if logger_handler:
+            logger.add(
+                f"{self.agent_name}.log",
+                level="INFO",
+                colorize=True,
+                format=(
+                    "<green>{time}</green> <level>{message}</level>"
+                ),
+                backtrace=True,
+                diagnose=True,
+            )
+
+        # logger.info("Creating Agent {}".format(self.agent_name))
+
     def set_system_prompt(self, system_prompt: str):
         """Set the system prompt"""
         self.system_prompt = system_prompt
@@ -342,6 +380,7 @@
         if hasattr(self.llm, "temperature"):
             # Randomly change the temperature attribute of self.llm object
             self.llm.temperature = random.uniform(0.0, 1.0)
+            logger.info(f"Temperature: {self.llm.temperature}")
         else:
             # Use a default temperature
             self.llm.temperature = 0.7
@@ -359,6 +398,7 @@
     def add_task_to_memory(self, task: str):
         """Add the task to the memory"""
         try:
+            logger.info(f"Adding task to memory: {task}")
             self.short_memory.add(f"{self.user_name}: {task}")
         except Exception as error:
             print(
@@ -370,6 +410,7 @@
     def add_message_to_memory(self, message: str):
         """Add the message to the memory"""
         try:
+            logger.info(f"Adding message to memory: {message}")
             self.short_memory.add(
                 role=self.agent_name, content=message
             )
@@ -590,12 +631,26 @@
                 # Log each step
                 step = Step(
-                    input=task,
-                    task_id=task_id,
-                    step_id=step_id,
-                    output=response,
+                    input=str(task),
+                    task_id=str(task_id),
+                    step_id=str(step_id),
+                    output=str(response),
+                    status="running",
                 )
+
+                if self.evaluator:
+                    evaluated_response = self.evaluator(
+                        response
+                    )
+
+                    out = (
+                        f"Response: {response}\nEvaluated"
+                        f" Response: {evaluated_response}"
+                    )
+                    out = self.short_memory.add(
+                        "Evaluator", out
+                    )

                 # Check to see if stopping token is in the output to stop the loop
                 if self.stopping_token:
                     if self._check_stopping_condition(
@@ -679,20 +734,6 @@
         """
         self.run(task, img, *args, **kwargs)

-    def _run(self, **kwargs: Any) -> str:
-        """Run the agent on a task
-
-        Returns:
-            str: _description_
-        """
-        try:
-            task = self.format_prompt(**kwargs)
-            response, history = self._generate(task, task)
-            logging.info(f"Message history: {history}")
-            return response
-        except Exception as error:
-            print(colored(f"Error running agent: {error}", "red"))
-
     def agent_history_prompt(
         self,
         history: str = None,
@@ -710,13 +751,12 @@
         if self.sop:
             system_prompt = self.system_prompt
             agent_history_prompt = f"""
-                SYSTEM_PROMPT: {system_prompt}
+                role: system
+                {system_prompt}

                 Follow this standard operating procedure (SOP) to complete tasks:
                 {self.sop}
-                -----------------
-                ################ CHAT HISTORY ####################

                 {history}
             """
             return agent_history_prompt
@@ -758,6 +798,7 @@
         Returns:
             _type_: _description_
         """
+        logger.info(f"Adding memory: {message}")
         return self.short_memory.add(
             role=self.agent_name, content=message
         )
@@ -770,10 +811,12 @@
             tasks (List[str]): A list of tasks to run.
         """
         try:
+            logger.info(f"Running concurrent tasks: {tasks}")
             task_coroutines = [
                 self.run_async(task, **kwargs) for task in tasks
             ]
             completed_tasks = await asyncio.gather(*task_coroutines)
+            logger.info(f"Completed tasks: {completed_tasks}")
             return completed_tasks
         except Exception as error:
             print(
@@ -789,6 +832,7 @@
     def bulk_run(self, inputs: List[Dict[str, Any]]) -> List[str]:
         try:
             """Generate responses for multiple input sets."""
+            logger.info(f"Running bulk tasks: {inputs}")
             return [self.run(**input_data) for input_data in inputs]
         except Exception as error:
             print(colored(f"Error running bulk run: {error}", "red"))
@@ -881,6 +925,7 @@
         """
         try:
+            logger.info(f"Running a single step: {task}")
             # Generate the response using lm
             response = self.llm(task, **kwargs)
@@ -960,6 +1005,7 @@
         """
+        logger.info(f"Adding response filter: {filter_word}")
         self.reponse_filters.append(filter_word)

     def apply_reponse_filters(self, response: str) -> str:
@@ -967,6 +1013,9 @@
         Apply the response filters to the response
         """
+        logger.info(
+            f"Applying response filters to response: {response}"
+        )
         for word in self.response_filters:
             response = response.replace(word, "[FILTERED]")
         return response
@@ -978,11 +1027,13 @@
         response = agent.filtered_run("Generate a report on finance")
         print(response)
         """
+        logger.info(f"Running filtered task: {task}")
         raw_response = self.run(task)
         return self.apply_response_filters(raw_response)

     def interactive_run(self, max_loops: int = 5) -> None:
         """Interactive run mode"""
+        logger.info("Running in interactive mode")
         response = input("Start the cnversation")

         for i in range(max_loops):
@@ -992,28 +1043,6 @@
             # Get user input
             response = input("You: ")

-    def streamed_generation(self, prompt: str) -> str:
-        """
-        Stream the generation of the response
-
-        Args:
-            prompt (str): The prompt to use
-
-        Example:
-            # Feature 4: Streamed generation
-            response = agent.streamed_generation("Generate a report on finance")
-            print(response)
-        """
-        tokens = list(prompt)
-        response = ""
-        for token in tokens:
-            time.sleep(0.1)
-            response += token
-            print(token, end="", flush=True)
-        print()
-        return response
-
     def save_state(self, file_path: str) -> None:
         """
         Saves the current state of the agent to a JSON file, including the llm parameters.
@@ -1025,6 +1054,7 @@
         >>> agent.save_state('saved_flow.json')
         """
         try:
+            logger.info(f"Saving agent state to: {file_path}")
             state = {
                 "agent_id": str(self.id),
                 "agent_name": self.agent_name,
@@ -1045,6 +1075,7 @@
                 "autosave": self.autosave,
                 "saved_state_path": self.saved_state_path,
                 "max_loops": self.max_loops,
+                # "StepCache": self.step_cache,
             }

             with open(file_path, "w") as f:
@@ -1104,54 +1135,55 @@
         >>> agent.run("Continue with the task")
         """
-        with open(file_path) as f:
-            state = json.load(f)
-
-        # Restore other saved attributes
-        self.id = state.get("agent_id", self.id)
-        self.agent_name = state.get("agent_name", self.agent_name)
-        self.agent_description = state.get(
-            "agent_description", self.agent_description
-        )
-        self.system_prompt = state.get(
-            "system_prompt", self.system_prompt
-        )
-        self.sop = state.get("sop", self.sop)
-        self.short_memory = state.get("short_memory", [])
-        self.max_loops = state.get("max_loops", 5)
-        self.loop_interval = state.get("loop_interval", 1)
-        self.retry_attempts = state.get("retry_attempts", 3)
-        self.retry_interval = state.get("retry_interval", 1)
-        self.interactive = state.get("interactive", False)
-
-        print(f"Agent state loaded from {file_path}")
+        try:
+            with open(file_path) as f:
+                state = json.load(f)
+
+            # Restore other saved attributes
+            self.id = state.get("agent_id", self.id)
+            self.agent_name = state.get("agent_name", self.agent_name)
+            self.agent_description = state.get(
+                "agent_description", self.agent_description
+            )
+            self.system_prompt = state.get(
+                "system_prompt", self.system_prompt
+            )
+            self.sop = state.get("sop", self.sop)
+            self.short_memory = state.get("short_memory", [])
+            self.max_loops = state.get("max_loops", 5)
+            self.loop_interval = state.get("loop_interval", 1)
+            self.retry_attempts = state.get("retry_attempts", 3)
+            self.retry_interval = state.get("retry_interval", 1)
+            self.interactive = state.get("interactive", False)
+
+            print(f"Agent state loaded from {file_path}")
+        except Exception as error:
+            print(
+                colored(f"Error loading agent state: {error}", "red")
+            )

     def retry_on_failure(
-        self, function, retries: int = 3, retry_delay: int = 1
+        self,
+        function: callable,
+        retries: int = 3,
+        retry_delay: int = 1,
     ):
         """Retry wrapper for LLM calls."""
-        attempt = 0
-        while attempt < retries:
-            try:
-                return function()
-            except Exception as error:
-                logging.error(f"Error generating response: {error}")
-                attempt += 1
-                time.sleep(retry_delay)
-        raise Exception("All retry attempts failed")
+        try:
+            logger.info(f"Retrying function: {function}")
+            attempt = 0
+            while attempt < retries:
+                try:
+                    return function()
+                except Exception as error:
+                    logging.error(
+                        f"Error generating response: {error}"
+                    )
+                    attempt += 1
+                    time.sleep(retry_delay)
+            raise Exception("All retry attempts failed")
+        except Exception as error:
+            print(colored(f"Error retrying function: {error}", "red"))

-    def generate_reply(self, history: str, **kwargs) -> str:
-        """
-        Generate a response based on initial or task
-        """
-        prompt = f"""
-
-        SYSTEM_PROMPT: {self.system_prompt}
-
-        History: {history}
-        """
-        response = self.llm(prompt, **kwargs)
-        return {"role": self.agent_name, "content": response}
-
     def update_system_prompt(self, system_prompt: str):
         """Upddate the system message"""
@@ -1181,9 +1213,13 @@
         """
         text -> parse_code by looking for code inside 6 backticks `````-> run_code
         """
-        parsed_code = extract_code_from_markdown(code)
-        run_code = self.code_executor.run(parsed_code)
-        return run_code
+        try:
+            logger.info(f"Running code: {code}")
+            parsed_code = extract_code_from_markdown(code)
+            run_code = self.code_executor.run(parsed_code)
+            return run_code
+        except Exception as error:
+            logger.debug(f"Error running code: {error}")

     def pdf_connector(self, pdf: str = None):
         """Transforms the pdf into text
@@ -1194,9 +1230,13 @@
         Returns:
             _type_: _description_
         """
-        pdf = pdf or self.pdf_path
-        text = pdf_to_text(pdf)
-        return text
+        try:
+            pdf = pdf or self.pdf_path
+            text = pdf_to_text(pdf)
+            return text
+        except Exception as error:
+            print(f"Error connecting to the pdf: {error}")
+            raise error
     def pdf_chunker(self, text: str = None, num_limits: int = 1000):
         """Chunk the pdf into sentences
@@ -1220,12 +1260,15 @@
         Returns:
             _type_: _description_
         """
-        for doc in docs:
-            data = data_to_text(doc)
-
-        return self.short_memory.add(
-            role=self.user_name, content=data
-        )
+        try:
+            for doc in docs:
+                data = data_to_text(doc)
+
+            return self.short_memory.add(
+                role=self.user_name, content=data
+            )
+        except Exception as error:
+            print(colored(f"Error ingesting docs: {error}", "red"))

     def ingest_pdf(self, pdf: str):
         """Ingest the pdf into the memory
@@ -1236,22 +1279,37 @@
         Returns:
             _type_: _description_
         """
-        text = pdf_to_text(pdf)
-        return self.short_memory.add(
-            role=self.user_name, content=text
-        )
+        try:
+            logger.info(f"Ingesting pdf: {pdf}")
+            text = pdf_to_text(pdf)
+            return self.short_memory.add(
+                role=self.user_name, content=text
+            )
+        except Exception as error:
+            print(colored(f"Error ingesting pdf: {error}", "red"))

     def receieve_mesage(self, name: str, message: str):
         """Receieve a message"""
-        message = f"{name}: {message}"
-        return self.short_memory.add(role=name, content=message)
+        try:
+            message = f"{name}: {message}"
+            return self.short_memory.add(role=name, content=message)
+        except Exception as error:
+            print(colored(f"Error receiving message: {error}", "red"))

     def send_agent_message(
         self, agent_name: str, message: str, *args, **kwargs
     ):
         """Send a message to the agent"""
-        message = f"{agent_name}: {message}"
-        return self.run(message, *args, **kwargs)
+        try:
+            logger.info(f"Sending agent message: {message}")
+            message = f"{agent_name}: {message}"
+            return self.run(message, *args, **kwargs)
+        except Exception as error:
+            print(
+                colored(
+                    f"Error sending agent message: {error}", "red"
+                )
+            )

     def truncate_history(self):
         """
@@ -1278,13 +1336,22 @@
     def get_docs_from_doc_folders(self):
         """Get the docs from the files"""
-        # Get the list of files then extract them and add them to the memory
-        files = os.listdir(self.docs_folder)
-
-        # Extract the text from the files
-        for file in files:
-            text = data_to_text(file)
-
-        return self.short_memory.add(
-            role=self.user_name, content=text
-        )
+        try:
+            logger.info("Getting docs from doc folders")
+            # Get the list of files then extract them and add them to the memory
+            files = os.listdir(self.docs_folder)
+
+            # Extract the text from the files
+            for file in files:
+                text = data_to_text(file)
+
+            return self.short_memory.add(
+                role=self.user_name, content=text
+            )
+        except Exception as error:
+            print(
+                colored(
+                    f"Error getting docs from doc folders: {error}",
+                    "red",
+                )
+            )

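A hedged sketch of the new Agent logging and evaluation hooks introduced above. The evaluator is a toy callable; any function that accepts the response string should work, and the log sink is named after agent_name as shown in the diff:

import sys

from swarms import Agent, OpenAIChat


def length_evaluator(response: str) -> str:
    # Toy evaluator: report how long the response is.
    return f"{len(response)} characters"


agent = Agent(
    llm=OpenAIChat(),
    agent_name="logging-demo",
    max_loops=1,
    evaluator=length_evaluator,  # stored as self.evaluator and applied to each step's response
    logger_handler=sys.stderr,  # default handler; also enables the per-agent log file
    logs_to_filename="logging-demo.log",  # stored on the agent; the sink itself is f"{agent_name}.log"
)
agent.run("Summarize the benefits of structured logging.")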
@@ -13,7 +13,7 @@ class LongContextSwarmLeader:
     - agents (List[Agent]): The agents in the swarm.
     - prompt_template_json (str): The SOP template in JSON format.
     - return_parsed (bool): Whether to return the parsed output.
     """

     def __init__(
@@ -30,17 +30,16 @@ class LongContextSwarmLeader:
         self.agents = agents
         self.prompt_template_json = prompt_template_json
         self.return_parsed = return_parsed

-        # Create an instance of the Agent class
         self.agent = Agent(
             llm=llm,
             system_prompt=None,
             sop=self.prompt_template_json,
             *args,
-            **kwargs
+            **kwargs,
         )

     def prep_schema(self, task: str, *args, **kwargs):
         """
         Returns a formatted string containing the metadata of all agents in the swarm.
@@ -71,16 +70,15 @@ class LongContextSwarmLeader:
         """
         for agent in self.agents:
-            prompt += f"Member Name: {agent.ai_name}\nMember ID: {agent.id}\nMember Description: {agent.description}\n\n"
+            prompt += (
+                f"Member Name: {agent.ai_name}\nMember ID:"
+                f" {agent.id}\nMember Description:"
+                f" {agent.description}\n\n"
+            )

         return prompt

-    def prep_schema_second(
-        self,
-        task_description: str,
-        task: str
-    ):
+    def prep_schema_second(self, task_description: str, task: str):
         prompt = f"""
         You are the leader of a team of {len(self.agents)}
         members. Your team will need to collaborate to
@@ -115,7 +113,6 @@ class LongContextSwarmLeader:
         """
         return prompt

-
     def run(self, task: str, *args, **kwargs):
         """
@@ -131,12 +128,13 @@ class LongContextSwarmLeader:
         """
         task = self.prep_schema(task)
         out = self.agent.run(task, *args, **kwargs)
+
         if self.return_parsed:
             out = extract_code_from_markdown(out)
+
         return out


 # class LongContextSwarm(BaseSwarm):
 #     def __init__(
 #         self,

@@ -7,7 +7,21 @@ from typing import Any, List

 from swarms.structs.agent import Agent
 from swarms.structs.conversation import Conversation
-from swarms.utils.logger import logger
+from loguru import logger
+import sys
+
+# Configure loguru logger with advanced settings
+logger.remove()
+logger.add(
+    sys.stderr,
+    colorize=True,
+    format="<green>{time}</green> <level>{message}</level>",
+    backtrace=True,
+    diagnose=True,
+    enqueue=True,
+    catch=True,
+)


 def extract_last_python_code_block(text):
@@ -157,11 +171,18 @@
             time_enabled=True, *args, **kwargs
         )

-        # # Configure logging
-        # self.logging.basicConfig(
-        #     level=logging.INFO,
-        #     format="%(asctime)s - %(levelname)s - %(message)s",
-        # )
+        # If autosave is enabled, save the conversation to a file
+        if self.autosave:
+            self.conversation.save()
+
+        # Log the agents
+        logger.info("Initializing majority voting system")
+        # Length of agents
+        logger.info(f"Number of agents: {len(self.agents)}")
+        logger.info(
+            "Agents:"
+            f" {', '.join(agent.agent_name for agent in self.agents)}"
+        )

     def run(self, task: str, *args, **kwargs) -> List[Any]:
         """
@@ -176,10 +197,11 @@
             List[Any]: The majority vote.
         """
         # Route to each agent
         if self.concurrent:
             with concurrent.futures.ThreadPoolExecutor() as executor:
+                # Log the agents
+                logger.info("Running agents concurrently")
                 futures = [
                     executor.submit(agent.run, task, *args)
                     for agent in self.agents
@@ -191,6 +213,7 @@
                     )
                 ]
         elif self.multithreaded:
+            logger.info("Running agents using multithreading")
             with concurrent.futures.ThreadPoolExecutor() as executor:
                 results = [
                     executor.submit(agent.run, task, *args)
@@ -198,6 +221,7 @@
                 ]
                 results = [future.result() for future in results]
         elif self.multiprocess:
+            logger.info("Running agents using multiprocessing")
             with Pool() as pool:
                 results = pool.starmap(
                     Agent.run,
@@ -218,6 +242,8 @@
         # Add responses to conversation and log them
         for agent, response in zip(self.agents, results):
+            logger.info(f"[{agent.agent_id}][{response}]")
+
             response = (
                 response if isinstance(response, list) else [response]
             )
@@ -227,6 +253,9 @@
         # Perform majority voting on the conversation
         majority_vote = majority_voting(self.conversation.responses)

+        # Log the majority vote
+        logger.info(f"Majority vote: {majority_vote}")
+
         # If an output parser is provided, parse the output
         if self.output_parser:
             majority_vote = self.output_parser(

@@ -20,10 +20,12 @@ class SubprocessCodeInterpreter:
     Example:
     """

-    def __init__(self):
-        self.start_cmd = ""
+    def __init__(
+        self,
+        start_cmd: str = "",
+        debug_mode: bool = False,
+    ):
         self.process = None
-        self.debug_mode = False
         self.output_queue = queue.Queue()
         self.done = threading.Event()

@@ -1,13 +1,9 @@
 import logging
 import os
-import sys
 import warnings


 def disable_logging():
-    log_file = open("errors.txt", "w")
-    sys.stderr = log_file
-
     warnings.filterwarnings("ignore", category=UserWarning)

     # disable tensorflow warnings
@@ -29,6 +25,11 @@ def disable_logging():
         "numexpr",
         "git",
         "wandb.docker.auth",
+        "langchain",
+        "distutils",
+        "urllib3",
+        "elasticsearch",
+        "packaging",
     ]:
         logger = logging.getLogger(logger_name)
         logger.setLevel(logging.ERROR)

@@ -0,0 +1,45 @@
from unittest.mock import MagicMock
from swarms.models.fire_function import FireFunctionCaller
def test_fire_function_caller_run(mocker, capsys):
# Create mock model and tokenizer
model = MagicMock()
tokenizer = MagicMock()
mocker.patch.object(FireFunctionCaller, "model", model)
mocker.patch.object(FireFunctionCaller, "tokenizer", tokenizer)
# Create mock task and arguments
task = "Add 2 and 3"
args = (2, 3)
kwargs = {}
# Create mock generated_ids and decoded output
generated_ids = [1, 2, 3]
decoded_output = "5"
model.generate.return_value = generated_ids
tokenizer.batch_decode.return_value = [decoded_output]
# Create FireFunctionCaller instance
fire_function_caller = FireFunctionCaller()
# Run the function
fire_function_caller.run(task, *args, **kwargs)
# Assert model.generate was called with the correct inputs
model.generate.assert_called_once_with(
tokenizer.apply_chat_template.return_value,
max_new_tokens=fire_function_caller.max_tokens,
*args,
**kwargs,
)
# Assert tokenizer.batch_decode was called with the correct inputs
tokenizer.batch_decode.assert_called_once_with(generated_ids)
# Assert the decoded output is printed
captured = capsys.readouterr()
assert decoded_output in captured.out