From c20f40216a314839fd482ba37558df599942c44f Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Sun, 23 Feb 2025 18:47:39 -0800 Subject: [PATCH] [fix][concurrentworkflow with entire history output with all of the agent's outputs --- concurrent_example.py | 69 +++++++ duo_agent.py => examples/duo_agent.py | 0 pyproject.toml | 2 +- swarms/structs/__init__.py | 2 - swarms/structs/concurrent_workflow.py | 24 ++- swarms/structs/swarm_router.py | 49 ++--- swarms/structs/swarms_api.py | 248 +++++++++++++++++++++----- swarms_api_client.py | 21 ++- swarms_api_example.py | 15 ++ 9 files changed, 351 insertions(+), 79 deletions(-) create mode 100644 concurrent_example.py rename duo_agent.py => examples/duo_agent.py (100%) diff --git a/concurrent_example.py b/concurrent_example.py new file mode 100644 index 00000000..70f11a13 --- /dev/null +++ b/concurrent_example.py @@ -0,0 +1,69 @@ +import os + + +from swarms import Agent, ConcurrentWorkflow + +# Fetch the OpenAI API key from the environment variable +api_key = os.getenv("OPENAI_API_KEY") + +# Initialize agents for different roles +delaware_ccorp_agent = Agent( + agent_name="Delaware-CCorp-Hiring-Agent", + system_prompt=""" + Create a comprehensive hiring description for a Delaware C Corporation, + including all relevant laws and regulations, such as the Delaware General + Corporation Law (DGCL) and the Delaware Corporate Law. Ensure the description + covers the requirements for hiring employees, contractors, and officers, + including the necessary paperwork, tax obligations, and benefits. Also, + outline the procedures for compliance with Delaware's employment laws, + including anti-discrimination laws, workers' compensation, and unemployment + insurance. Provide guidance on how to navigate the complexities of Delaware's + corporate law and ensure that all hiring practices are in compliance with + state and federal regulations. + """, + model_name="gpt-4o", + max_loops=1, +) + +indian_foreign_agent = Agent( + agent_name="Indian-Foreign-Hiring-Agent", + system_prompt=""" + Create a comprehensive hiring description for an Indian or foreign country, + including all relevant laws and regulations, such as the Indian Contract Act, + the Indian Labour Laws, and the Foreign Exchange Management Act (FEMA). + Ensure the description covers the requirements for hiring employees, + contractors, and officers, including the necessary paperwork, tax obligations, + and benefits. Also, outline the procedures for compliance with Indian and + foreign employment laws, including anti-discrimination laws, workers' + compensation, and unemployment insurance. Provide guidance on how to navigate + the complexities of Indian and foreign corporate law and ensure that all hiring + practices are in compliance with state and federal regulations. Consider the + implications of hiring foreign nationals and the requirements for obtaining + necessary visas and work permits. + """, + model_name="gpt-4o", + max_loops=1, +) + +# List of agents and corresponding tasks +agents = [delaware_ccorp_agent, indian_foreign_agent] +task = """ + Create a comprehensive hiring description for an Agent Engineer, including + required skills and responsibilities. Ensure the description covers the + necessary technical expertise, such as proficiency in AI/ML frameworks, + programming languages, and data structures. Outline the key responsibilities, + including designing and developing AI agents, integrating with existing systems, + and ensuring scalability and performance. + """ + +# Run agents with tasks concurrently +swarm = ConcurrentWorkflow( + agents=agents, + return_str_on=True, +) + +print( + swarm.run( + task="what is the best state to incorporate a company in the USA?" + ) +) diff --git a/duo_agent.py b/examples/duo_agent.py similarity index 100% rename from duo_agent.py rename to examples/duo_agent.py diff --git a/pyproject.toml b/pyproject.toml index ad771f0a..9cad3e10 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "swarms" -version = "7.4.2" +version = "7.4.4" description = "Swarms - TGSC" license = "MIT" authors = ["Kye Gomez "] diff --git a/swarms/structs/__init__.py b/swarms/structs/__init__.py index d6a63fb4..dd0dce3b 100644 --- a/swarms/structs/__init__.py +++ b/swarms/structs/__init__.py @@ -75,7 +75,6 @@ from swarms.structs.swarming_architectures import ( from swarms.structs.swarms_api import ( SwarmsAPIClient, - SwarmResponse, SwarmRequest, SwarmAuthenticationError, SwarmAPIError, @@ -149,7 +148,6 @@ __all__ = [ "MemeAgentGenerator", "ModelRouter", "SwarmsAPIClient", - "SwarmResponse", "SwarmRequest", "SwarmAuthenticationError", "SwarmAPIError", diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index 0a86e676..cab91b56 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -12,6 +12,7 @@ from swarms.structs.base_swarm import BaseSwarm from swarms.utils.file_processing import create_file_in_folder import concurrent.futures from swarms.utils.loguru_logger import initialize_logger +from swarms.structs.conversation import Conversation from swarms.structs.swarm_id_generator import generate_swarm_id logger = initialize_logger(log_folder="concurrent_workflow") @@ -109,7 +110,6 @@ class ConcurrentWorkflow(BaseSwarm): agent_responses: list = [], auto_generate_prompts: bool = False, max_workers: int = None, - user_interface: bool = True, *args, **kwargs, ): @@ -131,11 +131,12 @@ class ConcurrentWorkflow(BaseSwarm): self.agent_responses = agent_responses self.auto_generate_prompts = auto_generate_prompts self.max_workers = max_workers or os.cpu_count() - self.user_interface = user_interface self.tasks = [] # Initialize tasks list self.reliability_check() + self.conversation = Conversation() + def disable_agent_prints(self): for agent in self.agents: agent.no_print = False @@ -249,10 +250,22 @@ class ConcurrentWorkflow(BaseSwarm): f"Running concurrent workflow with {len(self.agents)} agents." ) - def run_agent(agent: Agent, task: str) -> AgentOutputSchema: + self.conversation.add( + "user", + task, + ) + + def run_agent( + agent: Agent, task: str, img: str = None + ) -> AgentOutputSchema: start_time = datetime.now() try: - output = agent.run(task) + output = agent.run(task=task, img=img) + + self.conversation.add( + agent.agent_name, + output, + ) except Exception as e: logger.error( f"Error running agent {agent.agent_name}: {e}" @@ -300,6 +313,9 @@ class ConcurrentWorkflow(BaseSwarm): return self.transform_metadata_schema_to_str( self.output_schema ) + + elif self.return_entire_history: + return self.conversation.return_history_as_string() else: return self.output_schema.model_dump_json(indent=4) diff --git a/swarms/structs/swarm_router.py b/swarms/structs/swarm_router.py index c0f0ab89..d760f9c8 100644 --- a/swarms/structs/swarm_router.py +++ b/swarms/structs/swarm_router.py @@ -1,7 +1,7 @@ import os import uuid from datetime import datetime -from typing import Any, Callable, Dict, List, Literal, Union +from typing import Any, Callable, Dict, List, Literal, Optional, Union from pydantic import BaseModel, Field @@ -10,6 +10,8 @@ from swarms.structs.agent import Agent from swarms.structs.concurrent_workflow import ConcurrentWorkflow from swarms.structs.csv_to_agent import AgentLoader from swarms.structs.groupchat import GroupChat +from swarms.structs.hiearchical_swarm import HierarchicalSwarm +from swarms.structs.majority_voting import MajorityVoting from swarms.structs.mixture_of_agents import MixtureOfAgents from swarms.structs.multi_agent_orchestrator import MultiAgentRouter from swarms.structs.rearrange import AgentRearrange @@ -17,9 +19,6 @@ from swarms.structs.sequential_workflow import SequentialWorkflow from swarms.structs.spreadsheet_swarm import SpreadSheetSwarm from swarms.structs.swarm_matcher import swarm_matcher from swarms.utils.loguru_logger import initialize_logger -from swarms.structs.hiearchical_swarm import HierarchicalSwarm -from swarms.structs.majority_voting import MajorityVoting - logger = initialize_logger(log_folder="swarm_router") @@ -48,13 +47,17 @@ class SwarmLog(BaseModel): A Pydantic model to capture log entries. """ - id: str = Field(default_factory=lambda: str(uuid.uuid4())) - timestamp: datetime = Field(default_factory=datetime.utcnow) - level: str - message: str - swarm_type: SwarmType - task: str = "" - metadata: Dict[str, Any] = Field(default_factory=dict) + id: Optional[str] = Field( + default_factory=lambda: str(uuid.uuid4()) + ) + timestamp: Optional[datetime] = Field( + default_factory=datetime.utcnow + ) + level: Optional[str] = None + message: Optional[str] = None + swarm_type: Optional[SwarmType] = None + task: Optional[str] = "" + metadata: Optional[Dict[str, Any]] = Field(default_factory=dict) documents: List[Document] = [] @@ -148,7 +151,7 @@ class SwarmRouter: speaker_fn: callable = None, load_agents_from_csv: bool = False, csv_file_path: str = None, - return_entire_history: bool = False, + return_entire_history: bool = True, *args, **kwargs, ): @@ -382,7 +385,7 @@ class SwarmRouter: agents=self.agents, max_loops=self.max_loops, auto_save=self.autosave, - return_str_on=self.return_json, + return_str_on=self.return_entire_history, *args, **kwargs, ) @@ -435,18 +438,18 @@ class SwarmRouter: self.swarm = self._create_swarm(task, *args, **kwargs) try: - self._log( - "info", - f"Running task on {self.swarm_type} swarm with task: {task}", - ) + # self._log( + # "info", + # f"Running task on {self.swarm_type} swarm with task: {task}", + # ) result = self.swarm.run(task=task, *args, **kwargs) - self._log( - "success", - f"Task completed successfully on {self.swarm_type} swarm", - task=task, - metadata={"result": str(result)}, - ) + # self._log( + # "success", + # f"Task completed successfully on {self.swarm_type} swarm", + # task=task, + # metadata={"result": str(result)}, + # ) return result except Exception as e: self._log( diff --git a/swarms/structs/swarms_api.py b/swarms/structs/swarms_api.py index e570dff0..3353b119 100644 --- a/swarms/structs/swarms_api.py +++ b/swarms/structs/swarms_api.py @@ -7,45 +7,108 @@ from swarms.utils.loguru_logger import initialize_logger from pydantic import BaseModel, Field from tenacity import retry, stop_after_attempt, wait_exponential from swarms.structs.swarm_router import SwarmType +from typing import Any logger = initialize_logger(log_folder="swarms_api") class AgentInput(BaseModel): - agent_name: Optional[str] = Field(None, description="Agent Name", max_length=100) - description: Optional[str] = Field(None, description="Description", max_length=500) + agent_name: Optional[str] = Field( + None, + description="The name of the agent, limited to 100 characters.", + max_length=100, + ) + description: Optional[str] = Field( + None, + description="A detailed description of the agent's purpose and capabilities, up to 500 characters.", + max_length=500, + ) system_prompt: Optional[str] = Field( - None, description="System Prompt", max_length=500 + None, + description="The initial prompt or instructions given to the agent, up to 500 characters.", + max_length=500, ) model_name: Optional[str] = Field( - "gpt-4o", description="Model Name", max_length=500 + "gpt-4o", + description="The name of the model used by the agent, limited to 500 characters.", + max_length=500, ) auto_generate_prompt: Optional[bool] = Field( - False, description="Auto Generate Prompt" + False, + description="Indicates whether the agent should automatically generate prompts.", + ) + max_tokens: Optional[int] = Field( + 8192, + description="The maximum number of tokens the agent can use in its responses.", + ) + temperature: Optional[float] = Field( + 0.5, + description="Controls the randomness of the agent's responses; higher values result in more random outputs.", + ) + role: Optional[str] = Field( + "worker", + description="The role assigned to the agent, such as 'worker' or 'manager'.", + ) + max_loops: Optional[int] = Field( + 1, + description="The maximum number of iterations the agent is allowed to perform.", ) - max_tokens: Optional[int] = Field(None, description="Max Tokens") - temperature: Optional[float] = Field(0.5, description="Temperature") - role: Optional[str] = Field("worker", description="Role") - max_loops: Optional[int] = Field(1, description="Max Loops") class SwarmRequest(BaseModel): - name: Optional[str] = Field(None, description="Swarm Name", max_length=100) - description: Optional[str] = Field(None, description="Description", max_length=500) - agents: Optional[List[AgentInput]] = Field(None, description="Agents") - max_loops: Optional[int] = Field(None, description="Max Loops") - swarm_type: Optional[SwarmType] = Field(None, description="Swarm Type") - rearrange_flow: Optional[str] = Field(None, description="Flow") - task: Optional[str] = Field(None, description="Task") - img: Optional[str] = Field(None, description="Img") - return_history: Optional[bool] = Field(True, description="Return History") - rules: Optional[str] = Field(None, description="Rules") - -class SwarmResponse(BaseModel): - swarm_id: str - status: str - result: Optional[str] - error: Optional[str] + name: Optional[str] = Field( + "swarms-01", + description="The name of the swarm, limited to 100 characters.", + max_length=100, + ) + description: Optional[str] = Field( + None, + description="A comprehensive description of the swarm's objectives and scope, up to 500 characters.", + max_length=500, + ) + agents: Optional[List[AgentInput]] = Field( + None, + description="A list of agents that are part of the swarm.", + ) + max_loops: Optional[int] = Field( + 1, + description="The maximum number of iterations the swarm can execute.", + ) + swarm_type: Optional[SwarmType] = Field( + None, + description="The type of swarm, defining its operational structure and behavior.", + ) + rearrange_flow: Optional[str] = Field( + None, + description="The flow or sequence in which agents are rearranged during the swarm's operation.", + ) + task: Optional[str] = Field( + None, + description="The specific task or objective the swarm is designed to accomplish.", + ) + img: Optional[str] = Field( + None, + description="A URL to an image associated with the swarm, if applicable.", + ) + return_history: Optional[bool] = Field( + True, + description="Determines whether the full history of the swarm's operations should be returned.", + ) + rules: Optional[str] = Field( + None, + description="Any specific rules or guidelines that the swarm should follow.", + ) + output_type: Optional[str] = Field( + "str", + description="The format in which the swarm's output should be returned, such as 'str', 'json', or 'dict'.", + ) + + +# class SwarmResponse(BaseModel): +# swarm_id: str +# status: str +# result: Optional[str] +# error: Optional[str] class HealthResponse(BaseModel): @@ -94,6 +157,9 @@ class SwarmsAPIClient: self.api_key = api_key or os.getenv("SWARMS_API_KEY") if not self.api_key: + logger.error( + "API key not provided and SWARMS_API_KEY env var not found" + ) raise SwarmAuthenticationError( "API key not provided and SWARMS_API_KEY env var not found" ) @@ -110,6 +176,10 @@ class SwarmsAPIClient: "Content-Type": "application/json", }, ) + logger.info( + "SwarmsAPIClient initialized with base_url: {}", + self.base_url, + ) @retry( stop=stop_after_attempt(3), @@ -125,13 +195,17 @@ class SwarmsAPIClient: Returns: HealthResponse object or formatted output """ + logger.info("Performing health check") try: response = self.client.get(f"{self.base_url}/health") response.raise_for_status() health_response = HealthResponse(**response.json()) - return self.format_output(health_response, self.format_type) + logger.info("Health check successful") + return self.format_output( + health_response, self.format_type + ) except httpx.HTTPError as e: - logger.error(f"Health check failed: {str(e)}") + logger.error("Health check failed: {}", str(e)) raise SwarmAPIError(f"Health check failed: {str(e)}") @retry( @@ -139,9 +213,7 @@ class SwarmsAPIClient: wait=wait_exponential(multiplier=1, min=4, max=10), reraise=True, ) - async def run( - self, swarm_request: SwarmRequest - ) -> SwarmResponse: + async def arun(self, swarm_request: SwarmRequest) -> Any: """Create and run a new swarm. Args: @@ -151,26 +223,78 @@ class SwarmsAPIClient: Returns: SwarmResponse object or formatted output """ + logger.info( + "Creating and running a new swarm with request: {}", + swarm_request, + ) try: response = self.client.post( f"{self.base_url}/v1/swarm/completions", json=swarm_request.model_dump(), ) response.raise_for_status() - swarm_response = SwarmResponse(**response.json()) - return self.format_output(swarm_response, self.format_type) + logger.info("Swarm creation and run successful") + return self.format_output( + response.json(), self.format_type + ) except httpx.HTTPStatusError as e: if e.response.status_code == 401: + logger.error("Invalid API key") raise SwarmAuthenticationError("Invalid API key") elif e.response.status_code == 422: + logger.error("Invalid request parameters") raise SwarmValidationError( "Invalid request parameters" ) - logger.error(f"Swarm creation failed: {str(e)}") + logger.error("Swarm creation failed: {}", str(e)) raise SwarmAPIError(f"Swarm creation failed: {str(e)}") except Exception as e: logger.error( - f"Unexpected error during swarm creation: {str(e)}" + "Unexpected error during swarm creation: {}", str(e) + ) + raise + + @retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=4, max=10), + reraise=True, + ) + def run(self, swarm_request: SwarmRequest) -> Any: + """Create and run a new swarm. + + Args: + swarm_request: SwarmRequest object containing the swarm configuration + output_format: Desired output format ('pydantic', 'json', 'dict') + + Returns: + SwarmResponse object or formatted output + """ + logger.info( + "Creating and running a new swarm with request: {}", + swarm_request, + ) + try: + response = self.client.post( + f"{self.base_url}/v1/swarm/completions", + json=swarm_request.model_dump(), + ) + print(response.json()) + logger.info("Swarm creation and run successful") + return response.json() + except httpx.HTTPStatusError as e: + if e.response.status_code == 401: + logger.error("Invalid API key") + raise SwarmAuthenticationError("Invalid API key") + elif e.response.status_code == 422: + logger.error("Invalid request parameters") + raise SwarmValidationError( + "Invalid request parameters" + ) + logger.error("Swarm creation failed: {}", str(e)) + raise SwarmAPIError(f"Swarm creation failed: {str(e)}") + except Exception as e: + logger.error( + "Unexpected error during swarm creation: {}", str(e) ) raise @@ -181,7 +305,7 @@ class SwarmsAPIClient: ) async def run_batch( self, swarm_requests: List[SwarmRequest] - ) -> List[SwarmResponse]: + ) -> List[Any]: """Create and run multiple swarms in batch. Args: @@ -191,36 +315,54 @@ class SwarmsAPIClient: Returns: List of SwarmResponse objects or formatted outputs """ + logger.info( + "Creating and running batch swarms with requests: {}", + swarm_requests, + ) try: response = self.client.post( f"{self.base_url}/v1/swarm/batch/completions", json=[req.model_dump() for req in swarm_requests], ) response.raise_for_status() - swarm_responses = [SwarmResponse(**resp) for resp in response.json()] - return [self.format_output(resp, self.format_type) for resp in swarm_responses] + logger.info("Batch swarm creation and run successful") + return [ + self.format_output(resp, self.format_type) + for resp in response.json() + ] except httpx.HTTPStatusError as e: if e.response.status_code == 401: + logger.error("Invalid API key") raise SwarmAuthenticationError("Invalid API key") elif e.response.status_code == 422: + logger.error("Invalid request parameters") raise SwarmValidationError( "Invalid request parameters" ) - logger.error(f"Batch swarm creation failed: {str(e)}") + logger.error("Batch swarm creation failed: {}", str(e)) raise SwarmAPIError( f"Batch swarm creation failed: {str(e)}" ) except Exception as e: logger.error( - f"Unexpected error during batch swarm creation: {str(e)}" + "Unexpected error during batch swarm creation: {}", + str(e), ) raise - + def get_logs(self): - response = self.client.get(f"{self.base_url}/v1/swarm/logs") - response.raise_for_status() - logs = response.json() - return self.format_output(logs, self.format_type) + logger.info("Retrieving logs") + try: + response = self.client.get( + f"{self.base_url}/v1/swarm/logs" + ) + response.raise_for_status() + logs = response.json() + logger.info("Logs retrieved successfully") + return self.format_output(logs, self.format_type) + except httpx.HTTPError as e: + logger.error("Failed to retrieve logs: {}", str(e)) + raise SwarmAPIError(f"Failed to retrieve logs: {str(e)}") def format_output(self, data, output_format: str): """Format the output based on the specified format. @@ -232,18 +374,32 @@ class SwarmsAPIClient: Returns: Formatted data """ + logger.info( + "Formatting output with format: {}", output_format + ) if output_format == "json": - return data.model_dump_json(indent=4) if isinstance(data, BaseModel) else json.dumps(data) + return ( + data.model_dump_json(indent=4) + if isinstance(data, BaseModel) + else json.dumps(data) + ) elif output_format == "dict": - return data.model_dump() if isinstance(data, BaseModel) else data + return ( + data.model_dump() + if isinstance(data, BaseModel) + else data + ) return data # Default to returning the pydantic model def close(self): """Close the HTTP client.""" + logger.info("Closing HTTP client") self.client.close() async def __aenter__(self): + logger.info("Entering async context") return self async def __aexit__(self, exc_type, exc_val, exc_tb): + logger.info("Exiting async context") self.close() diff --git a/swarms_api_client.py b/swarms_api_client.py index 9f92aadf..0b241777 100644 --- a/swarms_api_client.py +++ b/swarms_api_client.py @@ -1,3 +1,4 @@ +import json from swarms.structs.swarms_api import ( SwarmsAPIClient, SwarmRequest, @@ -10,16 +11,25 @@ agents = [ agent_name="Medical Researcher", description="Conducts medical research and analysis", system_prompt="You are a medical researcher specializing in clinical studies.", + max_loops=1, + model_name="gpt-4o", + role="worker", ), AgentInput( agent_name="Medical Diagnostician", description="Provides medical diagnoses based on symptoms and test results", system_prompt="You are a medical diagnostician with expertise in identifying diseases.", + max_loops=1, + model_name="gpt-4o", + role="worker", ), AgentInput( agent_name="Pharmaceutical Expert", description="Advises on pharmaceutical treatments and drug interactions", system_prompt="You are a pharmaceutical expert knowledgeable about medications and their effects.", + max_loops=1, + model_name="gpt-4o", + role="worker", ), ] @@ -29,10 +39,15 @@ swarm_request = SwarmRequest( agents=agents, max_loops=1, swarm_type="ConcurrentWorkflow", + output_type="str", + return_history=True, + task="What is the cause of the common cold?", ) -client = SwarmsAPIClient(api_key=os.getenv("SWARMS_API_KEY")) +client = SwarmsAPIClient( + api_key=os.getenv("SWARMS_API_KEY"), format_type="json" +) -response = client.create_swarm(swarm_request) +response = client.run(swarm_request) -print(response) +print(json.dumps(response, indent=4)) diff --git a/swarms_api_example.py b/swarms_api_example.py index 630ff377..ce9cab29 100644 --- a/swarms_api_example.py +++ b/swarms_api_example.py @@ -49,6 +49,8 @@ def run_single_swarm(): "max_loops": 1, "swarm_type": "ConcurrentWorkflow", "task": "Analyze current market trends in tech sector", + "output_type": "str", + "return_history": True, } response = requests.post( @@ -63,6 +65,19 @@ def run_single_swarm(): return json.dumps(output, indent=4) +def get_logs(): + response = requests.get( + f"{BASE_URL}/v1/swarm/logs", headers=headers + ) + output = response.json() + return json.dumps(output, indent=4) + + if __name__ == "__main__": result = run_single_swarm() + print("Swarm Result:") print(result) + + # print("Swarm Logs:") + # logs = get_logs() + # print(logs)