@ -1,8 +1,7 @@
from swarms import Agent
from swarms import Agent
from swarms . tools . mcp_integration import MCPServerSseParams
from swarms . tools . mcp_integration import MCPServerSseParams
from swarms . prompts . finance_agent_sys_prompt import FINANCIAL_AGENT_SYS_PROMPT
# Configure multiple MCP servers
# Configure servers
math_server = MCPServerSseParams (
math_server = MCPServerSseParams (
url = " http://0.0.0.0:6274 " ,
url = " http://0.0.0.0:6274 " ,
headers = { " Content-Type " : " application/json " } ,
headers = { " Content-Type " : " application/json " } ,
@ -17,34 +16,11 @@ calc_server = MCPServerSseParams(
sse_read_timeout = 300.0
sse_read_timeout = 300.0
)
)
# Create specialized agents with different server access
# Create agent with access to both servers
math_agent = Agent (
calculator = Agent (
agent_name = " Math-Specialist " ,
agent_name = " Calculator " ,
agent_description = " Advanced mathematics expert " ,
agent_description = " Math and calculation expert " ,
system_prompt = " You are a mathematics expert. Use available math operations. " ,
system_prompt = " You are a math expert. Use the available calculation tools to solve problems. " ,
max_loops = 1 ,
mcp_servers = [ math_server ] ,
interactive = True ,
streaming_on = True ,
model_name = " gpt-4o-mini "
)
finance_agent = Agent (
agent_name = " Finance-Specialist " ,
agent_description = " Financial calculations expert " ,
system_prompt = FINANCIAL_AGENT_SYS_PROMPT ,
max_loops = 1 ,
mcp_servers = [ calc_server ] ,
interactive = True ,
streaming_on = True ,
model_name = " gpt-4o-mini "
)
# Multi-server agent with access to all operations
super_agent = Agent (
agent_name = " Super-Calculator " ,
agent_description = " Multi-capable calculation expert " ,
system_prompt = " You have access to multiple calculation servers. Use them appropriately. " ,
max_loops = 1 ,
max_loops = 1 ,
mcp_servers = [ math_server , calc_server ] ,
mcp_servers = [ math_server , calc_server ] ,
interactive = True ,
interactive = True ,
@ -52,62 +28,28 @@ super_agent = Agent(
model_name = " gpt-4o-mini "
model_name = " gpt-4o-mini "
)
)
def format_agent_output ( agent_name : str , output : str ) - > str :
return f """
╭ ─ ─ ─ { agent_name } Response ─ ─ ─ ╮
{ output }
╰ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ╯
"""
def main ( ) :
def main ( ) :
print ( " \n Multi- Agent MCP Test Environmen t" )
print ( " \n Multi-Server Calculator Test " )
print ( " Type ' exit ' to quit \n " )
print ( " Type ' exit ' to quit \n " )
print ( " Available commands: " )
print ( " Example commands: " )
print ( " - math: <calculation> (e.g., ' math: add 5 and 3 ' ) " )
print ( " - add 5 and 3 " )
print ( " - finance: <calculation> (e.g., ' finance: calculate compound interest on 1000 at 5 % f or 3 years ' ) " )
print ( " - multiply 4 by 7 \n " )
print ( " - Any other calculation will use the super agent \n " )
while True :
while True :
try :
try :
user_input = input ( " \n Enter your calculation request : " )
user_input = input ( " \n Enter calculation: " )
if user_input . lower ( ) == ' exit ' :
if user_input . lower ( ) == ' exit ' :
break
break
# Route request to appropriate agent based on keywords
response = calculator . run ( user_input )
if ' finance ' in user_input . lower ( ) :
print ( f " \n Calculation result: { response } " )
response = finance_agent . run ( user_input )
print ( " \n Finance Agent Response: " )
print ( " - " * 50 )
print ( f " Response: { response } " )
print ( " - " * 50 )
elif ' math ' in user_input . lower ( ) :
response = math_agent . run ( user_input )
print ( " \n Math Agent Response: " )
print ( " - " * 50 )
print ( f " Response: { response } " )
print ( " - " * 50 )
else :
try :
response = super_agent . run ( user_input )
print ( " \n Super Agent Response: " )
print ( " - " * 50 )
if isinstance ( response , dict ) :
result = response . get ( ' result ' , response )
print ( f " Result: { result } " )
elif hasattr ( response , ' content ' ) :
print ( f " Result: { response . content } " )
else :
print ( f " Result: { response } " )
print ( " - " * 50 )
except Exception as e :
print ( f " Error processing calculation: { str ( e ) } " )
except KeyboardInterrupt :
except KeyboardInterrupt :
print ( " \n Exiting gracefully... " )
print ( " \n Exiting gracefully... " )
break
break
except Exception as e :
except Exception as e :
print ( f " Error processing request : { e } " )
print ( f " Error: { e } " )
if __name__ == " __main__ " :
if __name__ == " __main__ " :
main ( )
main ( )