- Created `mock_math_server.py` to simulate MCP server behavior - Developed `mock_multi_agent.py` to orchestrate agent–server interaction - Added `mock_integration_test.py` to validate end-to-end data flow - Ensured user input → server request → agent processing → user output pipelinepull/819/head
parent
8d079f6141
commit
a8ad884d27
@ -0,0 +1,38 @@
|
||||
|
||||
import pytest
|
||||
import asyncio
|
||||
from mock_multi_agent import MultiAgentMathSystem
|
||||
import logging
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_multi_agent_math():
|
||||
"""Test the multi-agent math system with various operations"""
|
||||
system = MultiAgentMathSystem()
|
||||
|
||||
test_cases = [
|
||||
"Add 5 and 3",
|
||||
"Multiply 4 by 6",
|
||||
"Divide 10 by 2"
|
||||
]
|
||||
|
||||
for task in test_cases:
|
||||
print(f"\nTesting: {task}")
|
||||
results = await system.process_task(task)
|
||||
|
||||
for result in results:
|
||||
assert "error" not in result, f"Agent {result['agent']} encountered error"
|
||||
assert result["response"] is not None
|
||||
print(f"{result['agent']} response: {result['response']}")
|
||||
|
||||
def test_interactive_system():
|
||||
"""Test the interactive system manually"""
|
||||
try:
|
||||
system = MultiAgentMathSystem()
|
||||
system.run_interactive()
|
||||
except Exception as e:
|
||||
pytest.fail(f"Interactive test failed: {e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
@ -0,0 +1,31 @@
|
||||
|
||||
from fastmcp import FastMCP
|
||||
from typing import Dict, Any
|
||||
import time
|
||||
|
||||
# Initialize MCP server
|
||||
mcp = FastMCP("Math-Mock-Server")
|
||||
|
||||
@mcp.tool()
|
||||
def add(a: int, b: int) -> int:
|
||||
"""Add two numbers together"""
|
||||
time.sleep(0.1) # Simulate processing time
|
||||
return a + b
|
||||
|
||||
@mcp.tool()
|
||||
def multiply(a: int, b: int) -> int:
|
||||
"""Multiply two numbers together"""
|
||||
time.sleep(0.1) # Simulate processing time
|
||||
return a * b
|
||||
|
||||
@mcp.tool()
|
||||
def divide(a: int, b: int) -> float:
|
||||
"""Divide two numbers"""
|
||||
if b == 0:
|
||||
raise ValueError("Cannot divide by zero")
|
||||
time.sleep(0.1) # Simulate processing time
|
||||
return a / b
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Starting Mock Math Server on port 8000...")
|
||||
mcp.run(transport="sse", port=8000, host="0.0.0.0")
|
@ -0,0 +1,76 @@
|
||||
|
||||
import asyncio
|
||||
from swarms import Agent
|
||||
from swarms.tools.mcp_integration import MCPServerSseParams
|
||||
import logging
|
||||
|
||||
class MathAgent:
|
||||
def __init__(self, name: str, server_url: str):
|
||||
self.server = MCPServerSseParams(
|
||||
url=server_url,
|
||||
headers={"Content-Type": "application/json"}
|
||||
)
|
||||
|
||||
self.agent = Agent(
|
||||
agent_name=name,
|
||||
agent_description="Math processing agent",
|
||||
system_prompt=f"You are {name}, a math processing agent. Use the provided tools to solve math problems.",
|
||||
max_loops=1,
|
||||
mcp_servers=[self.server],
|
||||
streaming_on=True
|
||||
)
|
||||
|
||||
async def process(self, task: str):
|
||||
try:
|
||||
response = await self.agent.arun(task)
|
||||
return {
|
||||
"agent": self.agent.agent_name,
|
||||
"task": task,
|
||||
"response": response
|
||||
}
|
||||
except Exception as e:
|
||||
logging.error(f"Error in {self.agent.agent_name}: {str(e)}")
|
||||
return {
|
||||
"agent": self.agent.agent_name,
|
||||
"task": task,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
class MultiAgentMathSystem:
|
||||
def __init__(self):
|
||||
base_url = "http://0.0.0.0:8000"
|
||||
self.agents = [
|
||||
MathAgent("Calculator-1", base_url),
|
||||
MathAgent("Calculator-2", base_url)
|
||||
]
|
||||
|
||||
async def process_task(self, task: str):
|
||||
tasks = [agent.process(task) for agent in self.agents]
|
||||
results = await asyncio.gather(*tasks)
|
||||
return results
|
||||
|
||||
def run_interactive(self):
|
||||
print("\nMulti-Agent Math System")
|
||||
print("Enter 'exit' to quit")
|
||||
|
||||
while True:
|
||||
try:
|
||||
user_input = input("\nEnter a math problem: ")
|
||||
if user_input.lower() == 'exit':
|
||||
break
|
||||
|
||||
results = asyncio.run(self.process_task(user_input))
|
||||
|
||||
print("\nResults:")
|
||||
for result in results:
|
||||
if "error" in result:
|
||||
print(f"\n{result['agent']} encountered an error: {result['error']}")
|
||||
else:
|
||||
print(f"\n{result['agent']} response: {result['response']}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"System error: {str(e)}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
system = MultiAgentMathSystem()
|
||||
system.run_interactive()
|
Loading…
Reference in new issue