Reverting to puled agent.py with new changes

pull/615/head
Sambhav Dixit 2 months ago committed by GitHub
parent e100b468bc
commit a63541c619
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -57,11 +57,7 @@ from clusterops import (
execute_with_cpu_cores,
)
from swarms.agents.ape_agent import auto_generate_prompt
from dataclasses import asdict
import yaml
# Utils
# Custom stopping condition
@ -785,8 +781,6 @@ class Agent:
or loop_count < self.max_loops
):
loop_count += 1
# Log step start
current_step_id = f"step_{loop_count}_{uuid.uuid4().hex}"
self.loop_count_print(loop_count, self.max_loops)
print("\n")
@ -805,21 +799,29 @@ class Agent:
while attempt < self.retry_attempts and not success:
try:
if self.long_term_memory is not None:
logger.info("Querying long term memory...")
logger.info(
"Querying long term memory..."
)
self.memory_query(task_prompt)
# Generate response using LLM
response_args = (
(task_prompt, *args) if img is None else (task_prompt, img, *args)
(task_prompt, *args)
if img is None
else (task_prompt, img, *args)
)
response = self.call_llm(
*response_args, **kwargs
)
response = self.call_llm(*response_args, **kwargs)
# Log step metadata
step_meta = self.log_step_metadata(loop_count, task_prompt, response)
# Check if response is a dictionary and has 'choices' key
if isinstance(response, dict) and 'choices' in response:
response = response['choices'][0]['message']['content']
if (
isinstance(response, dict)
and "choices" in response
):
response = response["choices"][0][
"message"
]["content"]
elif isinstance(response, str):
# If response is already a string, use it as is
pass
@ -827,48 +829,37 @@ class Agent:
raise ValueError(
f"Unexpected response format: {type(response)}"
)
# Check and execute tools
if self.tools is not None:
tool_result = self.parse_and_execute_tools(response)
if tool_result:
self.update_tool_usage(
step_meta["step_id"],
tool_result["tool"],
tool_result["args"],
tool_result["response"]
)
# Update agent output history
self.agent_output.full_history = self.short_memory.return_history_as_string()
print(
f"self.tools is not None: {response}"
)
self.parse_and_execute_tools(response)
# Log the step metadata
logged = self.log_step_metadata(
loop_count,
task_prompt,
response
loop_count, task_prompt, response
)
logger.info(logged)
# Convert to a str if the response is not a str
response = self.llm_output_parser(response)
# Print
if self.streaming_on is True:
self.stream_response(response)
else:
print(response)
# Add the response to the memory
self.short_memory.add(
role=self.agent_name,
content=response
role=self.agent_name, content=response
)
# Add to all responses
all_responses.append(response)
# TODO: Implement reliability check
if self.tools is not None:
# self.parse_function_call_and_execute(response)
@ -997,12 +988,25 @@ class Agent:
]
# return self.agent_output_type(all_responses)
if self.output_type == "json":
return asdict(self.agent_output)
else:
# More flexible output types
if self.output_type == "string":
return concat_strings(all_responses)
elif self.output_type == "list":
return all_responses
elif self.output_type == "json":
return self.agent_output.model_dump_json(indent=4)
elif self.output_type == "csv":
return self.dict_to_csv(
self.agent_output.model_dump()
)
elif self.output_type == "dict":
return self.agent_output.model_dump()
elif self.output_type == "yaml":
return yaml.safe_dump(self.agent_output.model_dump(), sort_keys=False)
else:
raise ValueError(
f"Invalid output type: {self.output_type}"
)
except Exception as error:
logger.info(
@ -1011,20 +1015,79 @@ class Agent:
raise error
def __call__(
self, task: str = None, img: str = None, *args, **kwargs
):
self,
task: Optional[str] = None,
img: Optional[str] = None,
is_last: bool = False,
device: str = "cpu", # gpu
device_id: int = 0,
all_cores: bool = True,
*args,
**kwargs,
) -> Any:
"""Call the agent
Args:
task (str): _description_
img (str, optional): _description_. Defaults to None.
task (Optional[str]): The task to be performed. Defaults to None.
img (Optional[str]): The image to be processed. Defaults to None.
is_last (bool): Indicates if this is the last task. Defaults to False.
device (str): The device to use for execution. Defaults to "cpu".
device_id (int): The ID of the GPU to use if device is set to "gpu". Defaults to 0.
all_cores (bool): If True, uses all available CPU cores. Defaults to True.
"""
try:
return self.run(task, img, *args, **kwargs)
if task is not None:
return self.run(
task=task,
is_last=is_last,
device=device,
device_id=device_id,
all_cores=all_cores,
*args,
**kwargs,
)
elif img is not None:
return self.run(
img=img,
is_last=is_last,
device=device,
device_id=device_id,
all_cores=all_cores,
*args,
**kwargs,
)
else:
raise ValueError(
"Either 'task' or 'img' must be provided."
)
except Exception as error:
logger.error(f"Error calling agent: {error}")
raise error
def dict_to_csv(self, data: dict) -> str:
"""
Convert a dictionary to a CSV string.
Args:
data (dict): The dictionary to convert.
Returns:
str: The CSV string representation of the dictionary.
"""
import csv
import io
output = io.StringIO()
writer = csv.writer(output)
# Write header
writer.writerow(data.keys())
# Write values
writer.writerow(data.values())
return output.getvalue()
def parse_and_execute_tools(self, response: str, *args, **kwargs):
# Extract json from markdown
# response = extract_code_from_markdown(response)
@ -1883,42 +1946,36 @@ class Agent:
"""Parse the output from the LLM"""
try:
if isinstance(response, dict):
if 'choices' in response:
return response['choices'][0]['message']['content']
if "choices" in response:
return response["choices"][0]["message"][
"content"
]
else:
return json.dumps(response) # Convert dict to string
return json.dumps(
response
) # Convert dict to string
elif isinstance(response, str):
return response
else:
return str(response) # Convert any other type to string
return str(
response
) # Convert any other type to string
except Exception as e:
logger.error(f"Error parsing LLM output: {e}")
return str(response) # Return string representation as fallback
return str(
response
) # Return string representation as fallback
def log_step_metadata(
self, loop: int, task: str, response: str
) -> Step:
"""Log metadata for each step of agent execution."""
# Generate unique step ID
step_id = f"step_{loop}_{uuid.uuid4().hex}"
# Calculate token usage
# # # Step Metadata
# full_memory = self.short_memory.return_history_as_string()
# prompt_tokens = self.tokenizer.count_tokens(full_memory)
# completion_tokens = self.tokenizer.count_tokens(response)
# total_tokens = prompt_tokens + completion_tokens
total_tokens=self.tokenizer.count_tokens(task) + self.tokenizer.count_tokens(response),
# Create memory usage tracking
memory_usage = {
"short_term": len(self.short_memory.messages),
"long_term": self.long_term_memory.count if hasattr(self, 'long_term_memory') else 0
}
# self.tokenizer.count_tokens(prompt_tokens + completion_tokens)
step_log = Step(
step_id=step_id,
time=time.time(),
tokens = total_tokens,
response=AgentChatCompletionResponse(
id=self.agent_id,
agent_name=self.agent_name,
@ -1933,34 +1990,14 @@ class Agent:
),
# usage=UsageInfo(
# prompt_tokens=prompt_tokens,
# completion_tokens=completion_tokens,
# total_tokens=total_tokens,
# completion_tokens=completion_tokens,
# ),
tool_calls=[],
memory_usage=memory_usage
),
)
# Update total tokens if agent_output exists
if hasattr(self, 'agent_output'):
self.agent_output.total_tokens += step.response.total_tokens
# Add step to agent output tracking
self.step_pool.append(step_log)
def update_tool_usage(self, step_id: str, tool_name: str, tool_args: dict, tool_response: Any):
"""Update tool usage information for a specific step."""
for step in self.agent_output.steps:
if step.step_id == step_id:
step.response.tool_calls.append({
"tool": tool_name,
"arguments": tool_args,
"response": str(tool_response)
})
break
def _serialize_callable(
self, attr_value: Callable
) -> Dict[str, Any]:

Loading…
Cancel
Save