You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
swarms/examples/mcp_example/mock_multi_agent.py

76 lines
2.4 KiB

import asyncio
from swarms import Agent
from swarms.tools.mcp_integration import MCPServerSseParams
import logging
class MathAgent:
    """Math-solving wrapper around a swarms ``Agent`` backed by one MCP SSE server.

    Args:
        name: Agent name; also interpolated into the system prompt.
        server_url: URL passed to ``MCPServerSseParams`` for the MCP server.
    """

    def __init__(self, name: str, server_url: str):
        self.server = MCPServerSseParams(
            url=server_url,
            headers={"Content-Type": "application/json"},
        )
        self.agent = Agent(
            agent_name=name,
            agent_description="Math processing agent",
            system_prompt=f"You are {name}, a math processing agent. Use the provided tools to solve math problems.",
            max_loops=1,
            mcp_servers=[self.server],
            streaming_on=False,
            model_name="gpt-4o-mini",
            temperature=0.1,
            max_tokens=1000,
        )

    async def process(self, task: str) -> dict:
        """Run ``task`` through the agent and report the outcome as a dict.

        Args:
            task: The problem text forwarded to ``self.agent.arun``.

        Returns:
            A dict with ``"agent"`` and ``"task"`` keys plus either
            ``"response"`` (the agent's answer converted with ``str``) on
            success, or ``"error"`` (a non-empty description) when the agent
            raised an ``Exception``. Such exceptions are logged, not re-raised.
        """
        try:
            response = await self.agent.arun(task)
        except Exception as e:
            # Some exceptions (e.g. a bare TimeoutError()) stringify to "";
            # fall back to the class name so the error is never blank.
            message = str(e) or type(e).__name__
            logging.error("Error in %s: %s", self.agent.agent_name, message)
            return {
                "agent": self.agent.agent_name,
                "task": task,
                "error": message,
            }
        return {
            "agent": self.agent.agent_name,
            "task": task,
            "response": str(response),
        }
class MultiAgentMathSystem:
    """Interactive console front end that sends math problems to one ``MathAgent``.

    Args:
        base_url: URL of the MCP server given to the calculator agent.
            Defaults to ``DEFAULT_BASE_URL``.
    """

    # NOTE(review): 0.0.0.0 is normally a listen address rather than a connect
    # target; kept as the default for backward compatibility. Pass an explicit
    # ``base_url`` (e.g. a loopback address) if the client cannot reach it.
    DEFAULT_BASE_URL = "http://0.0.0.0:8000"

    def __init__(self, base_url: str = DEFAULT_BASE_URL):
        self.calculator = MathAgent("Calculator", base_url)

    async def process_task(self, task: str) -> list:
        """Send ``task`` to the calculator agent.

        Returns:
            A one-element list holding the dict returned by
            ``self.calculator.process``.
        """
        result = await self.calculator.process(task)
        return [result]  # Keep list format for compatibility

    def run_interactive(self) -> None:
        """Prompt for problems on stdin and print each result until told to stop.

        The loop ends when the user types ``exit`` (case-insensitive,
        surrounding whitespace ignored), when stdin reaches end-of-file, or
        when the user presses Ctrl+C at the prompt. Blank input is skipped.
        Any other ``Exception`` while handling a problem is printed and the
        loop continues.
        """
        print("\nMulti-Agent Math System")
        print("Enter 'exit' to quit")
        while True:
            try:
                user_input = input("\nEnter a math problem: ").strip()
            except (EOFError, KeyboardInterrupt):
                # EOFError is an Exception: routing it through the generic
                # handler below would re-prompt a closed stdin forever.
                print()
                break

            if user_input.lower() == "exit":
                break
            if not user_input:
                # Nothing to solve; prompt again instead of calling the agent.
                continue

            try:
                results = asyncio.run(self.process_task(user_input))
                print("\nResult:")
                result = results[0]  # We now only have one result
                if "error" in result:
                    print(f"\nCalculator encountered an error: {result['error']}")
                else:
                    print(f"\nCalculation: {result['response']}")
            except Exception as e:
                print(f"System error: {str(e)}")
# Script entry point: build the system with its default server URL and start
# the interactive prompt loop.
if __name__ == "__main__":
    system = MultiAgentMathSystem()
    system.run_interactive()