diff --git a/playground/demos/plant_biologist_swarm/using_concurrent_workflow.py b/playground/demos/plant_biologist_swarm/using_concurrent_workflow.py new file mode 100644 index 00000000..87e6df54 --- /dev/null +++ b/playground/demos/plant_biologist_swarm/using_concurrent_workflow.py @@ -0,0 +1,111 @@ +import os + +from dotenv import load_dotenv +from playground.demos.plant_biologist_swarm.prompts import ( + diagnoser_agent, + disease_detector_agent, + growth_predictor_agent, + harvester_agent, + treatment_recommender_agent, +) + +from swarms import Agent, GPT4VisionAPI, ConcurrentWorkflow + + +# Load the OpenAI API key from the .env file +load_dotenv() + +# Initialize the OpenAI API key +api_key = os.environ.get("OPENAI_API_KEY") + + +# llm = llm, +llm = GPT4VisionAPI( + max_tokens=4000, +) + +# Initialize Diagnoser Agent +diagnoser_agent = Agent( + agent_name="Diagnoser Agent", + system_prompt=diagnoser_agent(), + llm=llm, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + # saved_state_path="diagnoser.json", + multi_modal=True, + autosave=True, +) + +# Initialize Harvester Agent +harvester_agent = Agent( + agent_name="Harvester Agent", + system_prompt=harvester_agent(), + llm=llm, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + # saved_state_path="harvester.json", + multi_modal=True, + autosave=True, +) + +# Initialize Growth Predictor Agent +growth_predictor_agent = Agent( + agent_name="Growth Predictor Agent", + system_prompt=growth_predictor_agent(), + llm=llm, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + # saved_state_path="growth_predictor.json", + multi_modal=True, + autosave=True, +) + +# Initialize Treatment Recommender Agent +treatment_recommender_agent = Agent( + agent_name="Treatment Recommender Agent", + system_prompt=treatment_recommender_agent(), + llm=llm, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + # saved_state_path="treatment_recommender.json", + multi_modal=True, + autosave=True, +) + +# Initialize Disease Detector Agent +disease_detector_agent = Agent( + agent_name="Disease Detector Agent", + system_prompt=disease_detector_agent(), + llm=llm, + max_loops=1, + dashboard=False, + streaming_on=True, + verbose=True, + # saved_state_path="disease_detector.json", + multi_modal=True, + autosave=True, +) +agents = [ + diagnoser_agent, + disease_detector_agent, + treatment_recommender_agent, + growth_predictor_agent, + harvester_agent, +] + + +# Create the Concurrent workflow +workflow = ConcurrentWorkflow( + agents=agents, + max_loops=1, +) + +workflow.run("Diagnose the plant disease.") diff --git a/playground/structs/new_concurrent_workflow.py b/playground/structs/new_concurrent_workflow.py new file mode 100644 index 00000000..e69de29b diff --git a/swarms/structs/concurrent_workflow.py b/swarms/structs/concurrent_workflow.py index 231170ca..61936f6d 100644 --- a/swarms/structs/concurrent_workflow.py +++ b/swarms/structs/concurrent_workflow.py @@ -89,35 +89,70 @@ class ConcurrentWorkflow(BaseWorkflow): """ loop = 0 while loop < self.max_loops: - with concurrent.futures.ThreadPoolExecutor( - max_workers=self.max_workers - ) as executor: - futures = { - executor.submit(task.execute): task - for task in self.task_pool - } - results = [] - - for future in concurrent.futures.as_completed(futures): - task = futures[future] - try: - result = future.result() - if self.print_results: - logger.info(f"Task {task}: {result}") - if self.return_results: - results.append(result) - except Exception as e: - logger.error( - f"Task {task} generated an exception: {e}" - ) - - loop += 1 - if self.stopping_condition and self.stopping_condition( - results - ): + + if self.tasks is not None: + with concurrent.futures.ThreadPoolExecutor( + max_workers=self.max_workers + ) as executor: + futures = { + executor.submit(task.execute): task + for task in self.task_pool + } + results = [] + + for future in concurrent.futures.as_completed(futures): + task = futures[future] + try: + result = future.result() + if self.print_results: + logger.info(f"Task {task}: {result}") + if self.return_results: + results.append(result) + except Exception as e: + logger.error( + f"Task {task} generated an exception: {e}" + ) + + loop += 1 + if self.stopping_condition and self.stopping_condition( + results + ): + break + + elif self.agents is not None: + with concurrent.futures.ThreadPoolExecutor( + max_workers=self.max_workers + ) as executor: + futures = { + executor.submit(agent.run): agent + for agent in self.agents + } + results = [] + + for future in concurrent.futures.as_completed(futures): + agent = futures[future] + try: + result = future.result() + if self.print_results: + logger.info(f"Agent {agent}: {result}") + if self.return_results: + results.append(result) + except Exception as e: + logger.error( + f"Agent {agent} generated an exception: {e}" + ) + + loop += 1 + if self.stopping_condition and self.stopping_condition( + results + ): + break + + else: + logger.warning("No tasks or agents found in the workflow.") break - return results if self.return_results else None + return results if self.return_results else None def list_tasks(self): """Prints a list of the tasks in the workflow."""