[AGENT][LiteLLM FIX] [API FIX]

pull/703/head
Kye Gomez 4 months ago committed by mike dupont
parent bb7e18f654
commit 34ae5a4faf

@ -17,6 +17,10 @@ logging.basicConfig(
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
from typing import Dict, Optional, Tuple
from uuid import UUID
BASE_URL = "http://0.0.0.0:8000/v1"
# Configuration # Configuration
@dataclass @dataclass
@ -132,6 +136,191 @@ class TestRunner:
logger.info(f"\nRunning test: {test_name}") logger.info(f"\nRunning test: {test_name}")
start_time = time.time() start_time = time.time()
<<<<<<< HEAD
=======
def create_test_user(session: TestSession) -> Tuple[bool, str]:
"""Create a test user and store credentials in session."""
logger.info("Creating test user")
try:
response = requests.post(
f"{BASE_URL}/users",
json={"username": f"test_user_{int(time.time())}"},
)
if response.status_code == 200:
data = response.json()
session.user_id = data["user_id"]
session.api_key = data["api_key"]
logger.success(f"Created user with ID: {session.user_id}")
return True, "Success"
else:
logger.error(f"Failed to create user: {response.text}")
return False, response.text
except Exception as e:
logger.exception("Exception during user creation")
return False, str(e)
def create_additional_api_key(
session: TestSession,
) -> Tuple[bool, str]:
"""Test creating an additional API key."""
logger.info("Creating additional API key")
try:
response = requests.post(
f"{BASE_URL}/users/{session.user_id}/api-keys",
headers=session.headers,
json={"name": "Test Key"},
)
if response.status_code == 200:
logger.success("Created additional API key")
return True, response.json()["key"]
else:
logger.error(f"Failed to create API key: {response.text}")
return False, response.text
except Exception as e:
logger.exception("Exception during API key creation")
return False, str(e)
def test_create_agent(
session: TestSession,
) -> Tuple[bool, Optional[UUID]]:
"""Test creating a new agent."""
logger.info("Testing agent creation")
payload = {
"agent_name": f"Test Agent {int(time.time())}",
"system_prompt": "You are a helpful assistant",
"model_name": "gpt-4",
"description": "Test agent",
"tags": ["test", "automated"],
}
try:
response = requests.post(
f"{BASE_URL}/agent", headers=session.headers, json=payload
)
if response.status_code == 200:
agent_id = response.json()["agent_id"]
session.test_agents.append(agent_id)
logger.success(f"Created agent with ID: {agent_id}")
return True, agent_id
else:
logger.error(f"Failed to create agent: {response.text}")
return False, None
except Exception:
logger.exception("Exception during agent creation")
return False, None
def test_list_user_agents(session: TestSession) -> bool:
"""Test listing user's agents."""
logger.info("Testing user agent listing")
try:
response = requests.get(
f"{BASE_URL}/users/me/agents", headers=session.headers
)
if response.status_code == 200:
agents = response.json()
logger.success(f"Found {len(agents)} user agents")
return True
else:
logger.error(
f"Failed to list user agents: {response.text}"
)
return False
except Exception:
logger.exception("Exception during agent listing")
return False
def test_agent_operations(
session: TestSession, agent_id: UUID
) -> bool:
"""Test various operations on an agent."""
logger.info(f"Testing operations for agent {agent_id}")
# Test update
try:
update_response = requests.patch(
f"{BASE_URL}/agent/{agent_id}",
headers=session.headers,
json={
"description": "Updated description",
"tags": ["test", "updated"],
},
)
if update_response.status_code != 200:
logger.error(
f"Failed to update agent: {update_response.text}"
)
return False
# Test metrics
metrics_response = requests.get(
f"{BASE_URL}/agent/{agent_id}/metrics",
headers=session.headers,
)
if metrics_response.status_code != 200:
logger.error(
f"Failed to get agent metrics: {metrics_response.text}"
)
return False
logger.success("Successfully performed agent operations")
return True
except Exception:
logger.exception("Exception during agent operations")
return False
def test_completion(session: TestSession, agent_id: UUID) -> bool:
"""Test running a completion."""
logger.info("Testing completion")
payload = {
"prompt": "What is the weather like today?",
"agent_id": agent_id,
"max_tokens": 100,
}
try:
response = requests.post(
f"{BASE_URL}/agent/completions",
headers=session.headers,
json=payload,
)
if response.status_code == 200:
completion_data = response.json()
print(completion_data)
logger.success(
f"Got completion, used {completion_data['token_usage']['total_tokens']} tokens"
)
return True
else:
logger.error(f"Failed to get completion: {response.text}")
return False
except Exception:
logger.exception("Exception during completion")
return False
def cleanup_test_resources(session: TestSession):
"""Clean up all test resources."""
logger.info("Cleaning up test resources")
# Delete test agents
for agent_id in session.test_agents:
>>>>>>> 68728698 ([AGENT][LiteLLM FIX] [API FIX])
try: try:
test_func() test_func()
self.results["passed"] += 1 self.results["passed"] += 1
@ -281,6 +470,7 @@ Total Time: {self.results['total_time']:.2f}s
if __name__ == "__main__": if __name__ == "__main__":
<<<<<<< HEAD
try: try:
runner = TestRunner() runner = TestRunner()
runner.run_all_tests() runner.run_all_tests()
@ -289,3 +479,7 @@ if __name__ == "__main__":
except Exception as e: except Exception as e:
logger.error(f"Test suite failed: {str(e)}") logger.error(f"Test suite failed: {str(e)}")
logger.exception(e) logger.exception(e)
=======
success = run_test_workflow()
print(success)
>>>>>>> 68728698 ([AGENT][LiteLLM FIX] [API FIX])

@ -9,20 +9,21 @@ agent = Agent(
agent_description="Personal finance advisor agent", agent_description="Personal finance advisor agent",
system_prompt=FINANCIAL_AGENT_SYS_PROMPT system_prompt=FINANCIAL_AGENT_SYS_PROMPT
+ "Output the <DONE> token when you're done creating a portfolio of etfs, index, funds, and more for AI", + "Output the <DONE> token when you're done creating a portfolio of etfs, index, funds, and more for AI",
model_name="gpt-4o", # Use any model from litellm max_loops=1,
max_loops="auto", model_name="gpt-4o",
dynamic_temperature_enabled=True, dynamic_temperature_enabled=True,
user_name="Kye", user_name="Kye",
retry_attempts=3, retry_attempts=3,
streaming_on=True, # streaming_on=True,
context_length=16000, context_length=8192,
return_step_meta=False, return_step_meta=False,
output_type="str", # "json", "dict", "csv" OR "string" "yaml" and output_type="str", # "json", "dict", "csv" OR "string" "yaml" and
auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task auto_generate_prompt=False, # Auto generate prompt for the agent based on name, description, and system prompt, task
max_tokens=16000, # max output tokens max_tokens=4000, # max output tokens
interactive=True, # interactive=True,
stopping_token="<DONE>", stopping_token="<DONE>",
execute_tool=True, saved_state_path="agent_00.json",
interactive=False,
) )
agent.run( agent.run(

@ -1,333 +0,0 @@
import os
from typing import List, Optional
from datetime import datetime
from pydantic import BaseModel, Field
from pydantic.v1 import validator
from loguru import logger
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
)
from swarm_models import OpenAIFunctionCaller, OpenAIChat
from swarms.structs.agent import Agent
from swarms.structs.swarm_router import SwarmRouter
from swarms.structs.agents_available import showcase_available_agents
BOSS_SYSTEM_PROMPT = """
Manage a swarm of worker agents to efficiently serve the user by deciding whether to create new agents or delegate tasks. Ensure operations are efficient and effective.
### Instructions:
1. **Task Assignment**:
- Analyze available worker agents when a task is presented.
- Delegate tasks to existing agents with clear, direct, and actionable instructions if an appropriate agent is available.
- If no suitable agent exists, create a new agent with a fitting system prompt to handle the task.
2. **Agent Creation**:
- Name agents according to the task they are intended to perform (e.g., "Twitter Marketing Agent").
- Provide each new agent with a concise and clear system prompt that includes its role, objectives, and any tools it can utilize.
3. **Efficiency**:
- Minimize redundancy and maximize task completion speed.
- Avoid unnecessary agent creation if an existing agent can fulfill the task.
4. **Communication**:
- Be explicit in task delegation instructions to avoid ambiguity and ensure effective task execution.
- Require agents to report back on task completion or encountered issues.
5. **Reasoning and Decisions**:
- Offer brief reasoning when selecting or creating agents to maintain transparency.
- Avoid using an agent if unnecessary, with a clear explanation if no agents are suitable for a task.
# Output Format
Present your plan in clear, bullet-point format or short concise paragraphs, outlining task assignment, agent creation, efficiency strategies, and communication protocols.
# Notes
- Preserve transparency by always providing reasoning for task-agent assignments and creation.
- Ensure instructions to agents are unambiguous to minimize error.
"""
class AgentConfig(BaseModel):
"""Configuration for an individual agent in a swarm"""
name: str = Field(
description="The name of the agent", example="Research-Agent"
)
description: str = Field(
description="A description of the agent's purpose and capabilities",
example="Agent responsible for researching and gathering information",
)
system_prompt: str = Field(
description="The system prompt that defines the agent's behavior",
example="You are a research agent. Your role is to gather and analyze information...",
)
@validator("name")
def validate_name(cls, v):
if not v.strip():
raise ValueError("Agent name cannot be empty")
return v.strip()
@validator("system_prompt")
def validate_system_prompt(cls, v):
if not v.strip():
raise ValueError("System prompt cannot be empty")
return v.strip()
class SwarmConfig(BaseModel):
"""Configuration for a swarm of cooperative agents"""
name: str = Field(
description="The name of the swarm",
example="Research-Writing-Swarm",
)
description: str = Field(
description="The description of the swarm's purpose and capabilities",
example="A swarm of agents that work together to research topics and write articles",
)
agents: List[AgentConfig] = Field(
description="The list of agents that make up the swarm",
min_items=1,
)
@validator("agents")
def validate_agents(cls, v):
if not v:
raise ValueError("Swarm must have at least one agent")
return v
class AutoSwarmBuilder:
"""A class that automatically builds and manages swarms of AI agents with enhanced error handling."""
def __init__(
self,
name: Optional[str] = None,
description: Optional[str] = None,
verbose: bool = True,
api_key: Optional[str] = None,
model_name: str = "gpt-4",
):
self.name = name or "DefaultSwarm"
self.description = description or "Generic AI Agent Swarm"
self.verbose = verbose
self.agents_pool = []
self.api_key = api_key or os.getenv("OPENAI_API_KEY")
self.model_name = model_name
if not self.api_key:
raise ValueError(
"OpenAI API key must be provided either through initialization or environment variable"
)
logger.info(
"Initialized AutoSwarmBuilder",
extra={
"swarm_name": self.name,
"description": self.description,
"model": self.model_name,
},
)
# Initialize OpenAI chat model
try:
self.chat_model = OpenAIChat(
openai_api_key=self.api_key,
model_name=self.model_name,
temperature=0.1,
)
except Exception as e:
logger.error(
f"Failed to initialize OpenAI chat model: {str(e)}"
)
raise
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
)
def run(self, task: str, image_url: Optional[str] = None) -> str:
"""Run the swarm on a given task with error handling and retries."""
if not task or not task.strip():
raise ValueError("Task cannot be empty")
logger.info("Starting swarm execution", extra={"task": task})
try:
# Create agents for the task
agents = self._create_agents(task, image_url)
if not agents:
raise ValueError(
"No agents were created for the task"
)
# Execute the task through the swarm router
logger.info(
"Routing task through swarm",
extra={"num_agents": len(agents)},
)
output = self.swarm_router(agents, task, image_url)
logger.info("Swarm execution completed successfully")
return output
except Exception as e:
logger.error(
f"Error during swarm execution: {str(e)}",
exc_info=True,
)
raise
def _create_agents(
self, task: str, image_url: Optional[str] = None
) -> List[Agent]:
"""Create the necessary agents for a task with enhanced error handling."""
logger.info("Creating agents for task", extra={"task": task})
try:
model = OpenAIFunctionCaller(
system_prompt=BOSS_SYSTEM_PROMPT,
api_key=self.api_key,
temperature=0.1,
base_model=SwarmConfig,
)
agents_config = model.run(task)
print(f"{agents_config}")
if isinstance(agents_config, dict):
agents_config = SwarmConfig(**agents_config)
# Update swarm configuration
self.name = agents_config.name
self.description = agents_config.description
# Create agents from configuration
agents = []
for agent_config in agents_config.agents:
if isinstance(agent_config, dict):
agent_config = AgentConfig(**agent_config)
agent = self.build_agent(
agent_name=agent_config.name,
agent_description=agent_config.description,
agent_system_prompt=agent_config.system_prompt,
)
agents.append(agent)
# Add available agents showcase to system prompts
agents_available = showcase_available_agents(
name=self.name,
description=self.description,
agents=agents,
)
for agent in agents:
agent.system_prompt += "\n" + agents_available
logger.info(
"Successfully created agents",
extra={"num_agents": len(agents)},
)
return agents
except Exception as e:
logger.error(
f"Error creating agents: {str(e)}", exc_info=True
)
raise
def build_agent(
self,
agent_name: str,
agent_description: str,
agent_system_prompt: str,
) -> Agent:
"""Build a single agent with enhanced error handling."""
logger.info(
"Building agent", extra={"agent_name": agent_name}
)
try:
agent = Agent(
agent_name=agent_name,
description=agent_description,
system_prompt=agent_system_prompt,
llm=self.chat_model,
autosave=True,
dashboard=False,
verbose=self.verbose,
dynamic_temperature_enabled=True,
saved_state_path=f"states/{agent_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
user_name="swarms_corp",
retry_attempts=3,
context_length=200000,
return_step_meta=False,
output_type="str",
streaming_on=False,
auto_generate_prompt=True,
)
return agent
except Exception as e:
logger.error(
f"Error building agent: {str(e)}", exc_info=True
)
raise
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10),
)
def swarm_router(
self,
agents: List[Agent],
task: str,
image_url: Optional[str] = None,
) -> str:
"""Route tasks between agents in the swarm with error handling and retries."""
logger.info(
"Initializing swarm router",
extra={"num_agents": len(agents)},
)
try:
swarm_router_instance = SwarmRouter(
name=self.name,
description=self.description,
agents=agents,
swarm_type="auto",
)
formatted_task = f"{self.name} {self.description} {task}"
result = swarm_router_instance.run(formatted_task)
logger.info("Successfully completed swarm routing")
return result
except Exception as e:
logger.error(
f"Error in swarm router: {str(e)}", exc_info=True
)
raise
swarm = AutoSwarmBuilder(
name="ChipDesign-Swarm",
description="A swarm of specialized AI agents for chip design",
api_key="your-api-key", # Optional if set in environment
model_name="gpt-4", # Optional, defaults to gpt-4
)
result = swarm.run(
"Design a new AI accelerator chip optimized for transformer model inference..."
)

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save