From c82f4f9b5a775a7dbb749ce2dcf92c0df59af980 Mon Sep 17 00:00:00 2001 From: Kye Date: Fri, 23 Feb 2024 10:57:26 -0800 Subject: [PATCH] [FEAT][Improved Logging] --- example.py | 3 +- playground/structs/majority_voting.py | 6 +- pyproject.toml | 3 +- requirements.txt | 1 + swarms/models/__init__.py | 51 +-- swarms/models/cog_vlm.py | 530 ++++++++++++++++++++++ swarms/models/fire_function.py | 87 ++++ swarms/models/test_fire_function.py | 41 ++ swarms/structs/agent.py | 307 ++++++++----- swarms/structs/long_swarm.py | 34 +- swarms/structs/majority_voting.py | 43 +- swarms/utils/code_interpreter.py | 8 +- swarms/utils/disable_logging.py | 9 +- tests/models/test_fire_function_caller.py | 45 ++ 14 files changed, 985 insertions(+), 183 deletions(-) create mode 100644 swarms/models/cog_vlm.py create mode 100644 swarms/models/fire_function.py create mode 100644 swarms/models/test_fire_function.py create mode 100644 tests/models/test_fire_function_caller.py diff --git a/example.py b/example.py index f2b11586..1a83d232 100644 --- a/example.py +++ b/example.py @@ -3,7 +3,7 @@ from swarms import Agent, OpenAIChat ## Initialize the workflow agent = Agent( llm=OpenAIChat(), - max_loops="auto", + max_loops=1, autosave=True, dashboard=False, streaming_on=True, @@ -14,3 +14,4 @@ agent = Agent( agent( "Find a chick fil a equivalent in hayes valley" ) + diff --git a/playground/structs/majority_voting.py b/playground/structs/majority_voting.py index cd8de04a..149fd587 100644 --- a/playground/structs/majority_voting.py +++ b/playground/structs/majority_voting.py @@ -4,9 +4,9 @@ from swarms import Agent, MajorityVoting, OpenAIChat llm = OpenAIChat() # Initialize the agents -agent1 = Agent(llm=llm, max_loops=1) -agent2 = Agent(llm=llm, max_loops=1) -agent3 = Agent(llm=llm, max_loops=1) +agent1 = Agent(agent_name="worker-1", llm=llm, max_loops=1) +agent2 = Agent(agent_name="worker-2", llm=llm, max_loops=1) +agent3 = Agent(agent_name="worker3", llm=llm, max_loops=1) # Initialize the majority voting diff --git a/pyproject.toml b/pyproject.toml index 7e05820b..3247cc64 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "4.1.6" +version = "4.1.7" description = "Swarms - Pytorch" license = "MIT" authors = ["Kye Gomez "] @@ -51,6 +51,7 @@ httpx = "0.24.1" tiktoken = "0.4.0" attrs = "22.2.0" ratelimit = "2.2.1" +loguru = "*" cohere = "4.24" huggingface-hub = "*" pydantic = "1.10.12" diff --git a/requirements.txt b/requirements.txt index e582fa25..25fffdfb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,6 +16,7 @@ sentencepiece==0.1.98 requests_mock pypdf==4.0.1 accelerate==0.22.0 +loguru chromadb tensorflow optimum diff --git a/swarms/models/__init__.py b/swarms/models/__init__.py index 0826e245..8981f70e 100644 --- a/swarms/models/__init__.py +++ b/swarms/models/__init__.py @@ -1,16 +1,23 @@ -############################################ LLMs from swarms.models.anthropic import Anthropic # noqa: E402 - -# 3############ Embedding models from swarms.models.base_embedding_model import BaseEmbeddingModel from swarms.models.base_llm import AbstractLLM # noqa: E402 - -################# MultiModal Models from swarms.models.base_multimodal_model import ( BaseMultiModalModel, -) # noqa: E402 +) + +# noqa: E402 from swarms.models.biogpt import BioGPT # noqa: E402 from swarms.models.clipq import CLIPQ # noqa: E402 + +# from swarms.models.dalle3 import Dalle3 +# from swarms.models.distilled_whisperx import DistilWhisperModel # noqa: E402 +# from swarms.models.whisperx_model import WhisperX # noqa: E402 +# from swarms.models.kosmos_two import Kosmos # noqa: E402 +# from swarms.models.cog_agent import CogAgent # noqa: E402 +## Function calling models +from swarms.models.fire_function import ( + FireFunctionCaller, +) from swarms.models.fuyu import Fuyu # noqa: E402 from swarms.models.gemini import Gemini # noqa: E402 from swarms.models.gigabind import Gigabind # noqa: E402 @@ -20,9 +27,9 @@ from swarms.models.idefics import Idefics # noqa: E402 from swarms.models.kosmos_two import Kosmos # noqa: E402 from swarms.models.layoutlm_document_qa import ( LayoutLMDocumentQA, -) # noqa: E402 +) -# from swarms.models.vip_llava import VipLlavaMultiModal # noqa: E402 +# noqa: E402 from swarms.models.llava import LavaMultiModal # noqa: E402 from swarms.models.mistral import Mistral # noqa: E402 from swarms.models.mixtral import Mixtral # noqa: E402 @@ -32,18 +39,18 @@ from swarms.models.openai_models import ( AzureOpenAI, OpenAI, OpenAIChat, -) # noqa: E402 +) + +# noqa: E402 from swarms.models.openai_tts import OpenAITTS # noqa: E402 from swarms.models.petals import Petals # noqa: E402 from swarms.models.qwen import QwenVLMultiModal # noqa: E402 from swarms.models.roboflow_model import RoboflowMultiModal from swarms.models.sam_supervision import SegmentAnythingMarkGenerator - -##### Utils from swarms.models.sampling_params import ( SamplingParams, SamplingType, -) # noqa: E402 +) from swarms.models.timm import TimmModel # noqa: E402 # from swarms.models.modelscope_pipeline import ModelScopePipeline @@ -62,26 +69,19 @@ from swarms.models.types import ( # noqa: E402 ) from swarms.models.ultralytics_model import ( UltralyticsModel, -) # noqa: E402 +) + +# noqa: E402 from swarms.models.vilt import Vilt # noqa: E402 from swarms.models.wizard_storytelling import ( WizardLLMStoryTeller, -) # noqa: E402 +) +# noqa: E402 # from swarms.models.vllm import vLLM # noqa: E402 from swarms.models.zephyr import Zephyr # noqa: E402 from swarms.models.zeroscope import ZeroscopeTTV # noqa: E402 -# from swarms.models.dalle3 import Dalle3 -# from swarms.models.distilled_whisperx import DistilWhisperModel # noqa: E402 -# from swarms.models.whisperx_model import WhisperX # noqa: E402 -# from swarms.models.kosmos_two import Kosmos # noqa: E402 -# from swarms.models.cog_agent import CogAgent # noqa: E402 - - -################# Tokenizers - - __all__ = [ "AbstractLLM", "Anthropic", @@ -100,7 +100,6 @@ __all__ = [ "HuggingfaceLLM", "MPT7B", "WizardLLMStoryTeller", - # "GPT4Vision", # "Dalle3", # "DistilWhisperModel", "GPT4VisionAPI", @@ -118,7 +117,6 @@ __all__ = [ "TogetherLLM", "TimmModel", "UltralyticsModel", - # "VipLlavaMultiModal", "LavaMultiModal", "QwenVLMultiModal", "CLIPQ", @@ -129,4 +127,5 @@ __all__ = [ "SegmentAnythingMarkGenerator", "SamplingType", "SamplingParams", + "FireFunctionCaller", ] diff --git a/swarms/models/cog_vlm.py b/swarms/models/cog_vlm.py new file mode 100644 index 00000000..14b99b60 --- /dev/null +++ b/swarms/models/cog_vlm.py @@ -0,0 +1,530 @@ +import base64 +import os +import time +from io import BytesIO +from typing import List, Literal, Optional, Tuple, Union + +import torch +from PIL import Image +from pydantic import BaseModel, Field +from transformers import ( + AutoModelForCausalLM, + LlamaTokenizer, + TextIteratorStreamer, +) + +from swarms.models.base_multimodal_model import BaseMultiModalModel +from swarms.utils.logger import logger + +MODEL_PATH = "THUDM/cogvlm-chat-hf" +TOKENIZER_PATH = "lmsys/vicuna-7b-v1.5" +DEVICE = "cuda" if torch.cuda.is_available() else "cpu" +QUANT_ENABLED = False + + +class ImageUrl(BaseModel): + url: str + + +class TextContent(BaseModel): + type: Literal["text"] + text: str + + +class ImageUrlContent(BaseModel): + type: Literal["image_url"] + image_url: ImageUrl + + +ContentItem = Union[TextContent, ImageUrlContent] + + +class ChatMessageInput(BaseModel): + role: Literal["user", "assistant", "system"] + content: Union[str, List[ContentItem]] + name: Optional[str] = None + + +class ChatMessageResponse(BaseModel): + role: Literal["assistant"] + content: str = None + name: Optional[str] = None + + +class DeltaMessage(BaseModel): + role: Optional[Literal["user", "assistant", "system"]] = None + content: Optional[str] = None + + +class ChatCompletionRequest(BaseModel): + model: str + messages: List[ChatMessageInput] + temperature: Optional[float] = 0.8 + top_p: Optional[float] = 0.8 + max_tokens: Optional[int] = None + stream: Optional[bool] = False + # Additional parameters + repetition_penalty: Optional[float] = 1.0 + + +class ChatCompletionResponseChoice(BaseModel): + index: int + message: ChatMessageResponse + + +class ChatCompletionResponseStreamChoice(BaseModel): + index: int + delta: DeltaMessage + + +class UsageInfo(BaseModel): + prompt_tokens: int = 0 + total_tokens: int = 0 + completion_tokens: Optional[int] = 0 + + +class ChatCompletionResponse(BaseModel): + model: str + object: Literal["chat.completion", "chat.completion.chunk"] + choices: List[ + Union[ + ChatCompletionResponseChoice, + ChatCompletionResponseStreamChoice, + ] + ] + created: Optional[int] = Field( + default_factory=lambda: int(time.time()) + ) + usage: Optional[UsageInfo] = None + + +# async def create_chat_completion(request: ChatCompletionRequest): +# global model, tokenizer + +# gen_params = dict( +# messages=request.messages, +# temperature=request.temperature, +# top_p=request.top_p, +# max_tokens=request.max_tokens or 1024, +# echo=False, +# stream=request.stream, +# ) + +# # if request.stream: +# # predict(request.model, gen_params) +# # response = generate_cogvlm(model, tokenizer, gen_params) + +# usage = UsageInfo() + +# message = ChatMessageResponse( +# role="assistant", +# content=response["text"], +# ) +# logger.debug(f"==== message ====\n{message}") +# choice_data = ChatCompletionResponseChoice( +# index=0, +# message=message, +# ) +# task_usage = UsageInfo.model_validate(response["usage"]) +# for usage_key, usage_value in task_usage.model_dump().items(): +# setattr( +# usage, usage_key, getattr(usage, usage_key) + usage_value +# ) +# return ChatCompletionResponse( +# model=request.model, +# choices=[choice_data], +# object="chat.completion", +# usage=usage, +# ) + + +class CogVLMMultiModal(BaseMultiModalModel): + """ + Initializes the CogVLM model. + + Args: + model_name (str): The path or name of the pre-trained model. + tokenizer (str): The path or name of the tokenizer. + device (str): The device to run the model on. + quantize (bool): Whether to enable quantization. + torch_type (str): The torch data type to use. + temperature (float): The temperature for sampling. + top_p (float): The top-p value for sampling. + max_tokens (int): The maximum number of tokens to generate. + echo (bool): Whether to echo the input text. + stream (bool): Whether to stream the output. + repetition_penalty (float): The repetition penalty for sampling. + do_sample (bool): Whether to use sampling during generation. + *args: Additional positional arguments. + **kwargs: Additional keyword arguments. + + Methods: + run: Generates a response using the CogVLM model. + generate_stream_cogvlm: Generates a stream of responses using the CogVLM model in inference mode. + process_history_and_images: Processes history messages to extract text, identify the last user query, and convert base64 encoded image URLs to PIL images. + + Example: + >>> model = CogVLMMultiModal() + >>> response = model("Describe this image with meticlous details.", "https://example.com/image.jpg") + >>> print(response) + """ + + def __init__( + self, + model_name: str = MODEL_PATH, + tokenizer: str = TOKENIZER_PATH, + device: str = DEVICE, + quantize: bool = QUANT_ENABLED, + torch_type: str = "float16", + temperature: float = 0.5, + top_p: float = 0.9, + max_tokens: int = 3500, + echo: bool = False, + stream: bool = False, + repetition_penalty: float = 1.0, + do_sample: bool = True, + *args, + **kwargs, + ): + super().__init__() + self.model_name = model_name + self.device = device + self.tokenizer = tokenizer + self.device = device + self.quantize = quantize + self.torch_type = torch_type + self.temperature = temperature + self.top_p = top_p + self.max_tokens = max_tokens + self.echo = echo + self.stream = stream + self.repetition_penalty = repetition_penalty + self.do_sample = do_sample + + if os.environ.get("QUANT_ENABLED"): + pass + else: + with torch.cuda.device(device): + __, total_bytes = torch.cuda.mem_get_info() + total_gb = total_bytes / (1 << 30) + if total_gb < 40: + pass + else: + pass + + torch.cuda.empty_cache() + + self.tokenizer = LlamaTokenizer.from_pretrained( + tokenizer, trust_remote_code=True + ) + + if ( + torch.cuda.is_available() + and torch.cuda.get_device_capability()[0] >= 8 + ): + torch_type = torch.bfloat16 + else: + torch_type = torch.float16 + + print( + "========Use torch type as:{} with device:{}========\n\n" + .format(torch_type, device) + ) + + if "cuda" in device: + if QUANT_ENABLED: + self.model = AutoModelForCausalLM.from_pretrained( + model_name, + load_in_4bit=True, + trust_remote_code=True, + torch_dtype=torch_type, + low_cpu_mem_usage=True, + *args, + **kwargs, + ).eval() + else: + self.model = ( + AutoModelForCausalLM.from_pretrained( + model_name, + load_in_4bit=False, + trust_remote_code=True, + torch_dtype=torch_type, + low_cpu_mem_usage=True, + *args, + **kwargs, + ) + .to(device) + .eval() + ) + + else: + self.model = ( + AutoModelForCausalLM.from_pretrained( + model_name, + trust_remote_code=True, + *args, + **kwargs, + ) + .float() + .to(device) + .eval() + ) + + def run(self, task: str, img: str, *args, **kwargs): + """ + Generates a response using the CogVLM model. It processes the chat history and image data, if any, + and then invokes the model to generate a response. + """ + messages = [task] + + params = dict( + messages=messages, + temperature=self.temperature, + repitition_penalty=self.repetition_penalty, + top_p=self.top_p, + max_new_tokens=self.max_tokens, + ) + + for response in self.generate_stream_cogvlm(params): + pass + + return response + + @torch.inference_mode() + def generate_stream_cogvlm( + self, + params: dict, + ): + """ + Generates a stream of responses using the CogVLM model in inference mode. + It's optimized to handle continuous input-output interactions with the model in a streaming manner. + """ + messages = params["messages"] + temperature = float(params.get("temperature", 1.0)) + repetition_penalty = float( + params.get("repetition_penalty", 1.0) + ) + top_p = float(params.get("top_p", 1.0)) + max_new_tokens = int(params.get("max_tokens", 256)) + query, history, image_list = self.process_history_and_images( + messages + ) + + logger.debug(f"==== request ====\n{query}") + + input_by_model = self.model.build_conversation_input_ids( + self.tokenizer, + query=query, + history=history, + images=[image_list[-1]], + ) + inputs = { + "input_ids": ( + input_by_model["input_ids"] + .unsqueeze(0) + .to(self.device) + ), + "token_type_ids": ( + input_by_model["token_type_ids"] + .unsqueeze(0) + .to(self.device) + ), + "attention_mask": ( + input_by_model["attention_mask"] + .unsqueeze(0) + .to(self.device) + ), + "images": [ + [ + input_by_model["images"][0] + .to(self.device) + .to(self.torch_type) + ] + ], + } + if ( + "cross_images" in input_by_model + and input_by_model["cross_images"] + ): + inputs["cross_images"] = [ + [ + input_by_model["cross_images"][0] + .to(self.device) + .to(self.torch_type) + ] + ] + + input_echo_len = len(inputs["input_ids"][0]) + streamer = TextIteratorStreamer( + tokenizer=self.tokenizer, + timeout=60.0, + skip_promptb=True, + skip_special_tokens=True, + ) + gen_kwargs = { + "repetition_penalty": repetition_penalty, + "max_new_tokens": max_new_tokens, + "do_sample": True if temperature > 1e-5 else False, + "top_p": top_p if temperature > 1e-5 else 0, + "streamer": streamer, + } + if temperature > 1e-5: + gen_kwargs["temperature"] = temperature + + total_len = 0 + generated_text = "" + with torch.no_grad(): + self.model.generate(**inputs, **gen_kwargs) + for next_text in streamer: + generated_text += next_text + yield { + "text": generated_text, + "usage": { + "prompt_tokens": input_echo_len, + "completion_tokens": ( + total_len - input_echo_len + ), + "total_tokens": total_len, + }, + } + ret = { + "text": generated_text, + "usage": { + "prompt_tokens": input_echo_len, + "completion_tokens": total_len - input_echo_len, + "total_tokens": total_len, + }, + } + yield ret + + def process_history_and_images( + self, + messages: List[ChatMessageInput], + ) -> Tuple[ + Optional[str], + Optional[List[Tuple[str, str]]], + Optional[List[Image.Image]], + ]: + """ + Process history messages to extract text, identify the last user query, + and convert base64 encoded image URLs to PIL images. + + Args: + messages(List[ChatMessageInput]): List of ChatMessageInput objects. + return: A tuple of three elements: + - The last user query as a string. + - Text history formatted as a list of tuples for the model. + - List of PIL Image objects extracted from the messages. + """ + formatted_history = [] + image_list = [] + last_user_query = "" + + for i, message in enumerate(messages): + role = message.role + content = message.content + + # Extract text content + if isinstance(content, list): # text + text_content = " ".join( + item.text + for item in content + if isinstance(item, TextContent) + ) + else: + text_content = content + + # Extract image data + if isinstance(content, list): # image + for item in content: + if isinstance(item, ImageUrlContent): + image_url = item.image_url.url + if image_url.startswith( + "data:image/jpeg;base64," + ): + base64_encoded_image = image_url.split( + "data:image/jpeg;base64," + )[1] + image_data = base64.b64decode( + base64_encoded_image + ) + image = Image.open( + BytesIO(image_data) + ).convert("RGB") + image_list.append(image) + + # Format history + if role == "user": + if i == len(messages) - 1: + last_user_query = text_content + else: + formatted_history.append((text_content, "")) + elif role == "assistant": + if formatted_history: + if formatted_history[-1][1] != "": + assert False, ( + "the last query is answered. answer" + f" again. {formatted_history[-1][0]}," + f" {formatted_history[-1][1]}," + f" {text_content}" + ) + formatted_history[-1] = ( + formatted_history[-1][0], + text_content, + ) + else: + assert False, "assistant reply before user" + else: + assert False, f"unrecognized role: {role}" + + return last_user_query, formatted_history, image_list + + async def predict(self, params: dict): + """ + Handle streaming predictions. It continuously generates responses for a given input stream. + This is particularly useful for real-time, continuous interactions with the model. + """ + + choice_data = ChatCompletionResponseStreamChoice( + index=0, + delta=DeltaMessage(role="assistant"), + finish_reason=None, + ) + chunk = ChatCompletionResponse( + model=self.model_name, + choices=[choice_data], + object="chat.completion.chunk", + ) + yield "{}".format(chunk.model_dump_json(exclude_unset=True)) + + previous_text = "" + for new_response in self.generate_stream_cogvlm(params): + decoded_unicode = new_response["text"] + delta_text = decoded_unicode[len(previous_text) :] + previous_text = decoded_unicode + delta = DeltaMessage( + content=delta_text, + role="assistant", + ) + choice_data = ChatCompletionResponseStreamChoice( + index=0, + delta=delta, + ) + chunk = ChatCompletionResponse( + model=self.model_name, + choices=[choice_data], + object="chat.completion.chunk", + ) + yield "{}".format( + chunk.model_dump_json(exclude_unset=True) + ) + choice_data = ChatCompletionResponseStreamChoice( + index=0, + delta=DeltaMessage(), + ) + chunk = ChatCompletionResponse( + model=self.model_name, + choices=[choice_data], + object="chat.completion.chunk", + ) + yield "{}".format(chunk.model_dump_json(exclude_unset=True)) diff --git a/swarms/models/fire_function.py b/swarms/models/fire_function.py new file mode 100644 index 00000000..6803d822 --- /dev/null +++ b/swarms/models/fire_function.py @@ -0,0 +1,87 @@ +from transformers import AutoModelForCausalLM, AutoTokenizer +import json +from swarms.models.base_llm import AbstractLLM +from typing import Any + + +class FireFunctionCaller(AbstractLLM): + """ + A class that represents a caller for the FireFunction model. + + Args: + model_name (str): The name of the model to be used. + device (str): The device to be used. + function_spec (Any): The specification of the function. + max_tokens (int): The maximum number of tokens. + system_prompt (str): The system prompt. + *args: Variable length argument list. + **kwargs: Arbitrary keyword arguments. + + Methods: + run(self, task: str, *args, **kwargs) -> None: Run the function with the given task and arguments. + + Examples: + >>> fire_function_caller = FireFunctionCaller() + >>> fire_function_caller.run("Add 2 and 3") + """ + + def __init__( + self, + model_name: str = "fireworks-ai/firefunction-v1", + device: str = "cuda", + function_spec: Any = None, + max_tokens: int = 3000, + system_prompt: str = "You are a helpful assistant with access to functions. Use them if required.", + *args, + **kwargs, + ): + super().__init__(model_name, device) + self.model_name = model_name + self.device = device + self.fucntion_spec = function_spec + self.max_tokens = max_tokens + self.system_prompt = system_prompt + + self.model = AutoModelForCausalLM.from_pretrained( + model_name, device_map="auto", *args, **kwargs + ) + self.tokenizer = AutoTokenizer.from_pretrained(model_name) + + self.functions = json.dumps(function_spec, indent=4) + + def run(self, task: str, *args, **kwargs): + """ + Run the function with the given task and arguments. + + Args: + task (str): The task to be performed. + *args: Variable length argument list. + **kwargs: Arbitrary keyword arguments. + + Returns: + None + """ + messages = [ + {"role": "functions", "content": self.functions}, + { + "role": "system", + "content": self.system_prompt, + }, + { + "role": "user", + "content": task, + }, + ] + + model_inputs = self.tokenizer.apply_chat_template( + messages, return_tensors="pt" + ).to(self.model.device) + + generated_ids = self.model.generate( + model_inputs, + max_new_tokens=self.max_tokens, + *args, + **kwargs, + ) + decoded = self.tokenizer.batch_decode(generated_ids) + print(decoded[0]) diff --git a/swarms/models/test_fire_function.py b/swarms/models/test_fire_function.py new file mode 100644 index 00000000..e28d64ee --- /dev/null +++ b/swarms/models/test_fire_function.py @@ -0,0 +1,41 @@ +import pytest +from unittest.mock import MagicMock +from swarms.models.fire_function import FireFunctionCaller + +def test_fire_function_caller_run(mocker): + # Create mock model and tokenizer + model = MagicMock() + tokenizer = MagicMock() + mocker.patch.object(FireFunctionCaller, 'model', model) + mocker.patch.object(FireFunctionCaller, 'tokenizer', tokenizer) + + # Create mock task and arguments + task = "Add 2 and 3" + args = (2, 3) + kwargs = {} + + # Create mock generated_ids and decoded output + generated_ids = [1, 2, 3] + decoded_output = "5" + model.generate.return_value = generated_ids + tokenizer.batch_decode.return_value = [decoded_output] + + # Create FireFunctionCaller instance + fire_function_caller = FireFunctionCaller() + + # Run the function + fire_function_caller.run(task, *args, **kwargs) + + # Assert model.generate was called with the correct inputs + model.generate.assert_called_once_with( + tokenizer.apply_chat_template.return_value, + max_new_tokens=fire_function_caller.max_tokens, + *args, + **kwargs, + ) + + # Assert tokenizer.batch_decode was called with the correct inputs + tokenizer.batch_decode.assert_called_once_with(generated_ids) + + # Assert the decoded output is printed + assert decoded_output in mocker.patch.object(print, 'call_args_list') diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 95c01c79..be5b4402 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -3,10 +3,12 @@ import json import logging import os import random +import sys import time import uuid from typing import Any, Callable, Dict, List, Optional, Tuple +from loguru import logger from termcolor import colored from swarms.memory.base_vectordb import AbstractVectorDatabase @@ -22,7 +24,8 @@ from swarms.tools.exec_tool import execute_tool_by_name from swarms.tools.tool import BaseTool from swarms.utils.code_interpreter import SubprocessCodeInterpreter from swarms.utils.data_to_text import data_to_text -from swarms.utils.logger import logger + +# from swarms.utils.logger import logger from swarms.utils.parse_code import extract_code_from_markdown from swarms.utils.pdf_to_text import pdf_to_text from swarms.utils.token_count_tiktoken import limit_tokens_from_string @@ -51,10 +54,18 @@ def agent_id(): return str(uuid.uuid4()) +# Task ID generator def task_id(): + """ + Generate a unique task ID. + + Returns: + str: A string representation of a UUID. + """ return str(uuid.uuid4()) +# Step ID generator def step_id(): return str(uuid.uuid1()) @@ -194,6 +205,10 @@ class Agent: callback: Optional[Callable] = None, metadata: Optional[Dict[str, Any]] = None, callbacks: Optional[List[Callable]] = None, + logger_handler: Any = sys.stderr, + search_algorithm: Optional[Callable] = None, + logs_to_filename: Optional[str] = None, + evaluator: Optional[Callable] = None, *args, **kwargs, ): @@ -243,9 +258,14 @@ class Agent: self.callback = callback self.metadata = metadata self.callbacks = callbacks + self.logger_handler = logger_handler + self.search_algorithm = search_algorithm + self.logs_to_filename = logs_to_filename + self.evaluator = evaluator # The max_loops will be set dynamically if the dynamic_loop if self.dynamic_loops: + logger.info("Dynamic loops enabled") self.max_loops = "auto" # If multimodal = yes then set the sop to the multimodal sop @@ -260,7 +280,9 @@ class Agent: self.feedback = [] # Initialize the code executor - self.code_executor = SubprocessCodeInterpreter() + self.code_executor = SubprocessCodeInterpreter( + debug_mode=True, + ) # If the preset stopping token is enabled then set the stopping token to the preset stopping token if preset_stopping_token: @@ -279,11 +301,12 @@ class Agent: self.get_docs_from_doc_folders() # If tokenizer and context length exists then: - if self.tokenizer and self.context_length: - self.truncate_history() + # if self.tokenizer and self.context_length: + # self.truncate_history() - if verbose: - logger.setLevel(logging.DEBUG) + # If verbose is enabled then set the logger level to info + # if verbose: + # logger.setLevel(logging.INFO) # If tools are provided then set the tool prompt by adding to sop if self.tools: @@ -308,6 +331,21 @@ class Agent: # Step cache self.step_cache = [] + # Set the logger handler + if logger_handler: + logger.add( + f"{self.agent_name}.log", + level="INFO", + colorize=True, + format=( + "{time} {message}" + ), + backtrace=True, + diagnose=True, + ) + + # logger.info("Creating Agent {}".format(self.agent_name)) + def set_system_prompt(self, system_prompt: str): """Set the system prompt""" self.system_prompt = system_prompt @@ -342,6 +380,7 @@ class Agent: if hasattr(self.llm, "temperature"): # Randomly change the temperature attribute of self.llm object self.llm.temperature = random.uniform(0.0, 1.0) + logger.info(f"Temperature: {self.llm.temperature}") else: # Use a default temperature self.llm.temperature = 0.7 @@ -359,6 +398,7 @@ class Agent: def add_task_to_memory(self, task: str): """Add the task to the memory""" try: + logger.info(f"Adding task to memory: {task}") self.short_memory.add(f"{self.user_name}: {task}") except Exception as error: print( @@ -370,6 +410,7 @@ class Agent: def add_message_to_memory(self, message: str): """Add the message to the memory""" try: + logger.info(f"Adding message to memory: {message}") self.short_memory.add( role=self.agent_name, content=message ) @@ -590,12 +631,26 @@ class Agent: # Log each step step = Step( - input=task, - task_id=task_id, - step_id=step_id, - output=response, + input=str(task), + task_id=str(task_id), + step_id=str(step_id), + output=str(response), + status="running", ) + if self.evaluator: + evaluated_response = self.evaluator( + response + ) + + out = ( + f"Response: {response}\nEvaluated" + f" Response: {evaluated_response}" + ) + out = self.short_memory.add( + "Evaluator", out + ) + # Check to see if stopping token is in the output to stop the loop if self.stopping_token: if self._check_stopping_condition( @@ -679,20 +734,6 @@ class Agent: """ self.run(task, img, *args, **kwargs) - def _run(self, **kwargs: Any) -> str: - """Run the agent on a task - - Returns: - str: _description_ - """ - try: - task = self.format_prompt(**kwargs) - response, history = self._generate(task, task) - logging.info(f"Message history: {history}") - return response - except Exception as error: - print(colored(f"Error running agent: {error}", "red")) - def agent_history_prompt( self, history: str = None, @@ -710,13 +751,12 @@ class Agent: if self.sop: system_prompt = self.system_prompt agent_history_prompt = f""" - SYSTEM_PROMPT: {system_prompt} + role: system + {system_prompt} Follow this standard operating procedure (SOP) to complete tasks: {self.sop} - ----------------- - ################ CHAT HISTORY #################### {history} """ return agent_history_prompt @@ -758,6 +798,7 @@ class Agent: Returns: _type_: _description_ """ + logger.info(f"Adding memory: {message}") return self.short_memory.add( role=self.agent_name, content=message ) @@ -770,10 +811,12 @@ class Agent: tasks (List[str]): A list of tasks to run. """ try: + logger.info(f"Running concurrent tasks: {tasks}") task_coroutines = [ self.run_async(task, **kwargs) for task in tasks ] completed_tasks = await asyncio.gather(*task_coroutines) + logger.info(f"Completed tasks: {completed_tasks}") return completed_tasks except Exception as error: print( @@ -789,6 +832,7 @@ class Agent: def bulk_run(self, inputs: List[Dict[str, Any]]) -> List[str]: try: """Generate responses for multiple input sets.""" + logger.info(f"Running bulk tasks: {inputs}") return [self.run(**input_data) for input_data in inputs] except Exception as error: print(colored(f"Error running bulk run: {error}", "red")) @@ -881,6 +925,7 @@ class Agent: """ try: + logger.info(f"Running a single step: {task}") # Generate the response using lm response = self.llm(task, **kwargs) @@ -960,6 +1005,7 @@ class Agent: """ + logger.info(f"Adding response filter: {filter_word}") self.reponse_filters.append(filter_word) def apply_reponse_filters(self, response: str) -> str: @@ -967,6 +1013,9 @@ class Agent: Apply the response filters to the response """ + logger.info( + f"Applying response filters to response: {response}" + ) for word in self.response_filters: response = response.replace(word, "[FILTERED]") return response @@ -978,11 +1027,13 @@ class Agent: response = agent.filtered_run("Generate a report on finance") print(response) """ + logger.info(f"Running filtered task: {task}") raw_response = self.run(task) return self.apply_response_filters(raw_response) def interactive_run(self, max_loops: int = 5) -> None: """Interactive run mode""" + logger.info("Running in interactive mode") response = input("Start the cnversation") for i in range(max_loops): @@ -992,28 +1043,6 @@ class Agent: # Get user input response = input("You: ") - def streamed_generation(self, prompt: str) -> str: - """ - Stream the generation of the response - - Args: - prompt (str): The prompt to use - - Example: - # Feature 4: Streamed generation - response = agent.streamed_generation("Generate a report on finance") - print(response) - - """ - tokens = list(prompt) - response = "" - for token in tokens: - time.sleep(0.1) - response += token - print(token, end="", flush=True) - print() - return response - def save_state(self, file_path: str) -> None: """ Saves the current state of the agent to a JSON file, including the llm parameters. @@ -1025,6 +1054,7 @@ class Agent: >>> agent.save_state('saved_flow.json') """ try: + logger.info(f"Saving agent state to: {file_path}") state = { "agent_id": str(self.id), "agent_name": self.agent_name, @@ -1045,6 +1075,7 @@ class Agent: "autosave": self.autosave, "saved_state_path": self.saved_state_path, "max_loops": self.max_loops, + # "StepCache": self.step_cache, } with open(file_path, "w") as f: @@ -1104,54 +1135,55 @@ class Agent: >>> agent.run("Continue with the task") """ - with open(file_path) as f: - state = json.load(f) - - # Restore other saved attributes - self.id = state.get("agent_id", self.id) - self.agent_name = state.get("agent_name", self.agent_name) - self.agent_description = state.get( - "agent_description", self.agent_description - ) - self.system_prompt = state.get( - "system_prompt", self.system_prompt - ) - self.sop = state.get("sop", self.sop) - self.short_memory = state.get("short_memory", []) - self.max_loops = state.get("max_loops", 5) - self.loop_interval = state.get("loop_interval", 1) - self.retry_attempts = state.get("retry_attempts", 3) - self.retry_interval = state.get("retry_interval", 1) - self.interactive = state.get("interactive", False) - - print(f"Agent state loaded from {file_path}") + try: + with open(file_path) as f: + state = json.load(f) + + # Restore other saved attributes + self.id = state.get("agent_id", self.id) + self.agent_name = state.get("agent_name", self.agent_name) + self.agent_description = state.get( + "agent_description", self.agent_description + ) + self.system_prompt = state.get( + "system_prompt", self.system_prompt + ) + self.sop = state.get("sop", self.sop) + self.short_memory = state.get("short_memory", []) + self.max_loops = state.get("max_loops", 5) + self.loop_interval = state.get("loop_interval", 1) + self.retry_attempts = state.get("retry_attempts", 3) + self.retry_interval = state.get("retry_interval", 1) + self.interactive = state.get("interactive", False) + + print(f"Agent state loaded from {file_path}") + except Exception as error: + print( + colored(f"Error loading agent state: {error}", "red") + ) def retry_on_failure( - self, function, retries: int = 3, retry_delay: int = 1 + self, + function: callable, + retries: int = 3, + retry_delay: int = 1, ): """Retry wrapper for LLM calls.""" - attempt = 0 - while attempt < retries: - try: - return function() - except Exception as error: - logging.error(f"Error generating response: {error}") - attempt += 1 - time.sleep(retry_delay) - raise Exception("All retry attempts failed") - - def generate_reply(self, history: str, **kwargs) -> str: - """ - Generate a response based on initial or task - """ - prompt = f""" - - SYSTEM_PROMPT: {self.system_prompt} - - History: {history} - """ - response = self.llm(prompt, **kwargs) - return {"role": self.agent_name, "content": response} + try: + logger.info(f"Retrying function: {function}") + attempt = 0 + while attempt < retries: + try: + return function() + except Exception as error: + logging.error( + f"Error generating response: {error}" + ) + attempt += 1 + time.sleep(retry_delay) + raise Exception("All retry attempts failed") + except Exception as error: + print(colored(f"Error retrying function: {error}", "red")) def update_system_prompt(self, system_prompt: str): """Upddate the system message""" @@ -1181,9 +1213,13 @@ class Agent: """ text -> parse_code by looking for code inside 6 backticks `````-> run_code """ - parsed_code = extract_code_from_markdown(code) - run_code = self.code_executor.run(parsed_code) - return run_code + try: + logger.info(f"Running code: {code}") + parsed_code = extract_code_from_markdown(code) + run_code = self.code_executor.run(parsed_code) + return run_code + except Exception as error: + logger.debug(f"Error running code: {error}") def pdf_connector(self, pdf: str = None): """Transforms the pdf into text @@ -1194,9 +1230,13 @@ class Agent: Returns: _type_: _description_ """ - pdf = pdf or self.pdf_path - text = pdf_to_text(pdf) - return text + try: + pdf = pdf or self.pdf_path + text = pdf_to_text(pdf) + return text + except Exception as error: + print(f"Error connecting to the pdf: {error}") + raise error def pdf_chunker(self, text: str = None, num_limits: int = 1000): """Chunk the pdf into sentences @@ -1220,12 +1260,15 @@ class Agent: Returns: _type_: _description_ """ - for doc in docs: - data = data_to_text(doc) + try: + for doc in docs: + data = data_to_text(doc) - return self.short_memory.add( - role=self.user_name, content=data - ) + return self.short_memory.add( + role=self.user_name, content=data + ) + except Exception as error: + print(colored(f"Error ingesting docs: {error}", "red")) def ingest_pdf(self, pdf: str): """Ingest the pdf into the memory @@ -1236,22 +1279,37 @@ class Agent: Returns: _type_: _description_ """ - text = pdf_to_text(pdf) - return self.short_memory.add( - role=self.user_name, content=text - ) + try: + logger.info(f"Ingesting pdf: {pdf}") + text = pdf_to_text(pdf) + return self.short_memory.add( + role=self.user_name, content=text + ) + except Exception as error: + print(colored(f"Error ingesting pdf: {error}", "red")) def receieve_mesage(self, name: str, message: str): """Receieve a message""" - message = f"{name}: {message}" - return self.short_memory.add(role=name, content=message) + try: + message = f"{name}: {message}" + return self.short_memory.add(role=name, content=message) + except Exception as error: + print(colored(f"Error receiving message: {error}", "red")) def send_agent_message( self, agent_name: str, message: str, *args, **kwargs ): """Send a message to the agent""" - message = f"{agent_name}: {message}" - return self.run(message, *args, **kwargs) + try: + logger.info(f"Sending agent message: {message}") + message = f"{agent_name}: {message}" + return self.run(message, *args, **kwargs) + except Exception as error: + print( + colored( + f"Error sending agent message: {error}", "red" + ) + ) def truncate_history(self): """ @@ -1278,13 +1336,22 @@ class Agent: def get_docs_from_doc_folders(self): """Get the docs from the files""" - # Get the list of files then extract them and add them to the memory - files = os.listdir(self.docs_folder) + try: + logger.info("Getting docs from doc folders") + # Get the list of files then extract them and add them to the memory + files = os.listdir(self.docs_folder) - # Extract the text from the files - for file in files: - text = data_to_text(file) + # Extract the text from the files + for file in files: + text = data_to_text(file) - return self.short_memory.add( - role=self.user_name, content=text - ) + return self.short_memory.add( + role=self.user_name, content=text + ) + except Exception as error: + print( + colored( + f"Error getting docs from doc folders: {error}", + "red", + ) + ) diff --git a/swarms/structs/long_swarm.py b/swarms/structs/long_swarm.py index 80d301cb..e24a3e08 100644 --- a/swarms/structs/long_swarm.py +++ b/swarms/structs/long_swarm.py @@ -13,7 +13,7 @@ class LongContextSwarmLeader: - agents (List[Agent]): The agents in the swarm. - prompt_template_json (str): The SOP template in JSON format. - return_parsed (bool): Whether to return the parsed output. - + """ def __init__( @@ -30,17 +30,16 @@ class LongContextSwarmLeader: self.agents = agents self.prompt_template_json = prompt_template_json self.return_parsed = return_parsed - + # Create an instance of the Agent class self.agent = Agent( llm=llm, system_prompt=None, - sop=self.prompt_template_json, - *args, - **kwargs + sop=self.prompt_template_json, + *args, + **kwargs, ) - def prep_schema(self, task: str, *args, **kwargs): """ Returns a formatted string containing the metadata of all agents in the swarm. @@ -71,16 +70,15 @@ class LongContextSwarmLeader: """ for agent in self.agents: - prompt += f"Member Name: {agent.ai_name}\nMember ID: {agent.id}\nMember Description: {agent.description}\n\n" - + prompt += ( + f"Member Name: {agent.ai_name}\nMember ID:" + f" {agent.id}\nMember Description:" + f" {agent.description}\n\n" + ) + return prompt - - - def prep_schema_second( - self, - task_description: str, - task: str - ): + + def prep_schema_second(self, task_description: str, task: str): prompt = f""" You are the leader of a team of {len(self.agents)} members. Your team will need to collaborate to @@ -115,7 +113,6 @@ class LongContextSwarmLeader: """ return prompt - def run(self, task: str, *args, **kwargs): """ @@ -131,12 +128,13 @@ class LongContextSwarmLeader: """ task = self.prep_schema(task) out = self.agent.run(task, *args, **kwargs) - + if self.return_parsed: out = extract_code_from_markdown(out) - + return out + # class LongContextSwarm(BaseSwarm): # def __init__( # self, diff --git a/swarms/structs/majority_voting.py b/swarms/structs/majority_voting.py index 05539ecf..fc4f8018 100644 --- a/swarms/structs/majority_voting.py +++ b/swarms/structs/majority_voting.py @@ -7,7 +7,21 @@ from typing import Any, List from swarms.structs.agent import Agent from swarms.structs.conversation import Conversation -from swarms.utils.logger import logger +from loguru import logger +import sys + + +# Configure loguru logger with advanced settings +logger.remove() +logger.add( + sys.stderr, + colorize=True, + format="{time} {message}", + backtrace=True, + diagnose=True, + enqueue=True, + catch=True, +) def extract_last_python_code_block(text): @@ -157,11 +171,18 @@ class MajorityVoting: time_enabled=True, *args, **kwargs ) - # # Configure logging - # self.logging.basicConfig( - # level=logging.INFO, - # format="%(asctime)s - %(levelname)s - %(message)s", - # ) + # If autosave is enabled, save the conversation to a file + if self.autosave: + self.conversation.save() + + # Log the agents + logger.info("Initializing majority voting system") + # Length of agents + logger.info(f"Number of agents: {len(self.agents)}") + logger.info( + "Agents:" + f" {', '.join(agent.agent_name for agent in self.agents)}" + ) def run(self, task: str, *args, **kwargs) -> List[Any]: """ @@ -176,10 +197,11 @@ class MajorityVoting: List[Any]: The majority vote. """ - # Route to each agent if self.concurrent: with concurrent.futures.ThreadPoolExecutor() as executor: + # Log the agents + logger.info("Running agents concurrently") futures = [ executor.submit(agent.run, task, *args) for agent in self.agents @@ -191,6 +213,7 @@ class MajorityVoting: ) ] elif self.multithreaded: + logger.info("Running agents using multithreading") with concurrent.futures.ThreadPoolExecutor() as executor: results = [ executor.submit(agent.run, task, *args) @@ -198,6 +221,7 @@ class MajorityVoting: ] results = [future.result() for future in results] elif self.multiprocess: + logger.info("Running agents using multiprocessing") with Pool() as pool: results = pool.starmap( Agent.run, @@ -218,6 +242,8 @@ class MajorityVoting: # Add responses to conversation and log them for agent, response in zip(self.agents, results): + logger.info(f"[{agent.agent_id}][{response}]") + response = ( response if isinstance(response, list) else [response] ) @@ -227,6 +253,9 @@ class MajorityVoting: # Perform majority voting on the conversation majority_vote = majority_voting(self.conversation.responses) + # Log the majority vote + logger.info(f"Majority vote: {majority_vote}") + # If an output parser is provided, parse the output if self.output_parser: majority_vote = self.output_parser( diff --git a/swarms/utils/code_interpreter.py b/swarms/utils/code_interpreter.py index e3850250..a586a1eb 100644 --- a/swarms/utils/code_interpreter.py +++ b/swarms/utils/code_interpreter.py @@ -20,10 +20,12 @@ class SubprocessCodeInterpreter: Example: """ - def __init__(self): - self.start_cmd = "" + def __init__( + self, + start_cmd: str = "", + debug_mode: bool = False, + ): self.process = None - self.debug_mode = False self.output_queue = queue.Queue() self.done = threading.Event() diff --git a/swarms/utils/disable_logging.py b/swarms/utils/disable_logging.py index 368c85bf..a5ad63cf 100644 --- a/swarms/utils/disable_logging.py +++ b/swarms/utils/disable_logging.py @@ -1,13 +1,9 @@ import logging import os -import sys import warnings def disable_logging(): - log_file = open("errors.txt", "w") - sys.stderr = log_file - warnings.filterwarnings("ignore", category=UserWarning) # disable tensorflow warnings @@ -29,6 +25,11 @@ def disable_logging(): "numexpr", "git", "wandb.docker.auth", + "langchain", + "distutils", + "urllib3", + "elasticsearch", + "packaging", ]: logger = logging.getLogger(logger_name) logger.setLevel(logging.ERROR) diff --git a/tests/models/test_fire_function_caller.py b/tests/models/test_fire_function_caller.py new file mode 100644 index 00000000..703417a7 --- /dev/null +++ b/tests/models/test_fire_function_caller.py @@ -0,0 +1,45 @@ +from unittest.mock import MagicMock + + +from swarms.models.fire_function import FireFunctionCaller + + +def test_fire_function_caller_run(mocker): + # Create mock model and tokenizer + model = MagicMock() + tokenizer = MagicMock() + mocker.patch.object(FireFunctionCaller, "model", model) + mocker.patch.object(FireFunctionCaller, "tokenizer", tokenizer) + + # Create mock task and arguments + task = "Add 2 and 3" + args = (2, 3) + kwargs = {} + + # Create mock generated_ids and decoded output + generated_ids = [1, 2, 3] + decoded_output = "5" + model.generate.return_value = generated_ids + tokenizer.batch_decode.return_value = [decoded_output] + + # Create FireFunctionCaller instance + fire_function_caller = FireFunctionCaller() + + # Run the function + fire_function_caller.run(task, *args, **kwargs) + + # Assert model.generate was called with the correct inputs + model.generate.assert_called_once_with( + tokenizer.apply_chat_template.return_value, + max_new_tokens=fire_function_caller.max_tokens, + *args, + **kwargs, + ) + + # Assert tokenizer.batch_decode was called with the correct inputs + tokenizer.batch_decode.assert_called_once_with(generated_ids) + + # Assert the decoded output is printed + assert decoded_output in mocker.patch.object( + print, "call_args_list" + )