From a63541c61953446a7868138b4ee9d477ea30262e Mon Sep 17 00:00:00 2001
From: Sambhav Dixit <94298612+sambhavnoobcoder@users.noreply.github.com>
Date: Mon, 28 Oct 2024 22:42:17 +0530
Subject: [PATCH] Reverting to pulled agent.py with new changes

---
 swarms/structs/agent.py | 221 +++++++++++++++++++++++-----------------
 1 file changed, 129 insertions(+), 92 deletions(-)

diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py
index 1a4d8769..2d07f106 100644
--- a/swarms/structs/agent.py
+++ b/swarms/structs/agent.py
@@ -57,11 +57,7 @@ from clusterops import (
     execute_with_cpu_cores,
 )
 from swarms.agents.ape_agent import auto_generate_prompt
-from dataclasses import asdict
-
-
-
-
+import yaml
 
 # Utils
 # Custom stopping condition
@@ -785,8 +781,6 @@ class Agent:
                 or loop_count < self.max_loops
             ):
                 loop_count += 1
-                # Log step start
-                current_step_id = f"step_{loop_count}_{uuid.uuid4().hex}"
                 self.loop_count_print(loop_count, self.max_loops)
                 print("\n")
 
@@ -805,21 +799,29 @@ class Agent:
                 while attempt < self.retry_attempts and not success:
                     try:
                         if self.long_term_memory is not None:
-                            logger.info("Querying long term memory...")
+                            logger.info(
+                                "Querying long term memory..."
+                            )
                             self.memory_query(task_prompt)
-
+
                         # Generate response using LLM
                         response_args = (
-                            (task_prompt, *args) if img is None else (task_prompt, img, *args)
+                            (task_prompt, *args)
+                            if img is None
+                            else (task_prompt, img, *args)
+                        )
+                        response = self.call_llm(
+                            *response_args, **kwargs
                         )
-                        response = self.call_llm(*response_args, **kwargs)
-
-                        # Log step metadata
-                        step_meta = self.log_step_metadata(loop_count, task_prompt, response)
 
                         # Check if response is a dictionary and has 'choices' key
-                        if isinstance(response, dict) and 'choices' in response:
-                            response = response['choices'][0]['message']['content']
+                        if (
+                            isinstance(response, dict)
+                            and "choices" in response
+                        ):
+                            response = response["choices"][0][
+                                "message"
+                            ]["content"]
                         elif isinstance(response, str):
                             # If response is already a string, use it as is
                             pass
@@ -827,48 +829,37 @@ class Agent:
                             raise ValueError(
                                 f"Unexpected response format: {type(response)}"
                             )
-
+
                         # Check and execute tools
                         if self.tools is not None:
-                            tool_result = self.parse_and_execute_tools(response)
-                            if tool_result:
-                                self.update_tool_usage(
-                                    step_meta["step_id"],
-                                    tool_result["tool"],
-                                    tool_result["args"],
-                                    tool_result["response"]
-                                )
-
-
-                        # Update agent output history
-                        self.agent_output.full_history = self.short_memory.return_history_as_string()
-
+                            print(
+                                f"self.tools is not None: {response}"
+                            )
+                            self.parse_and_execute_tools(response)
+
                         # Log the step metadata
                         logged = self.log_step_metadata(
-                            loop_count,
-                            task_prompt,
-                            response
+                            loop_count, task_prompt, response
                         )
                         logger.info(logged)
-
+
                         # Convert to a str if the response is not a str
                         response = self.llm_output_parser(response)
-
+
                         # Print
                         if self.streaming_on is True:
                             self.stream_response(response)
                         else:
                             print(response)
-
+
                         # Add the response to the memory
                         self.short_memory.add(
-                            role=self.agent_name,
-                            content=response
+                            role=self.agent_name, content=response
                        )
-
+
                         # Add to all responses
                         all_responses.append(response)
-
+
                         # TODO: Implement reliability check
                         if self.tools is not None:
                             # self.parse_function_call_and_execute(response)
@@ -997,12 +988,25 @@
             ]
 
             # return self.agent_output_type(all_responses)
-
-            if self.output_type == "json":
-                return asdict(self.agent_output)
-            else:
+            # More flexible output types
+            if self.output_type == "string":
                 return concat_strings(all_responses)
-
+            elif self.output_type == "list":
+                return all_responses
+            elif self.output_type == "json":
+                return self.agent_output.model_dump_json(indent=4)
+            elif self.output_type == "csv":
+                return self.dict_to_csv(
+                    self.agent_output.model_dump()
+                )
+            elif self.output_type == "dict":
+                return self.agent_output.model_dump()
+            elif self.output_type == "yaml":
+                return yaml.safe_dump(self.agent_output.model_dump(), sort_keys=False)
+            else:
+                raise ValueError(
+                    f"Invalid output type: {self.output_type}"
+                )
 
         except Exception as error:
             logger.info(
@@ -1011,20 +1015,79 @@
             raise error
 
     def __call__(
-        self, task: str = None, img: str = None, *args, **kwargs
-    ):
+        self,
+        task: Optional[str] = None,
+        img: Optional[str] = None,
+        is_last: bool = False,
+        device: str = "cpu",  # gpu
+        device_id: int = 0,
+        all_cores: bool = True,
+        *args,
+        **kwargs,
+    ) -> Any:
         """Call the agent
 
         Args:
-            task (str): _description_
-            img (str, optional): _description_. Defaults to None.
+            task (Optional[str]): The task to be performed. Defaults to None.
+            img (Optional[str]): The image to be processed. Defaults to None.
+            is_last (bool): Indicates if this is the last task. Defaults to False.
+            device (str): The device to use for execution. Defaults to "cpu".
+            device_id (int): The ID of the GPU to use if device is set to "gpu". Defaults to 0.
+            all_cores (bool): If True, uses all available CPU cores. Defaults to True.
         """
         try:
-            return self.run(task, img, *args, **kwargs)
+            if task is not None:
+                return self.run(
+                    task=task,
+                    is_last=is_last,
+                    device=device,
+                    device_id=device_id,
+                    all_cores=all_cores,
+                    *args,
+                    **kwargs,
+                )
+            elif img is not None:
+                return self.run(
+                    img=img,
+                    is_last=is_last,
+                    device=device,
+                    device_id=device_id,
+                    all_cores=all_cores,
+                    *args,
+                    **kwargs,
+                )
+            else:
+                raise ValueError(
+                    "Either 'task' or 'img' must be provided."
+                )
         except Exception as error:
             logger.error(f"Error calling agent: {error}")
             raise error
 
+    def dict_to_csv(self, data: dict) -> str:
+        """
+        Convert a dictionary to a CSV string.
+
+        Args:
+            data (dict): The dictionary to convert.
+
+        Returns:
+            str: The CSV string representation of the dictionary.
+        """
+        import csv
+        import io
+
+        output = io.StringIO()
+        writer = csv.writer(output)
+
+        # Write header
+        writer.writerow(data.keys())
+
+        # Write values
+        writer.writerow(data.values())
+
+        return output.getvalue()
+
     def parse_and_execute_tools(self, response: str, *args, **kwargs):
@@ -1883,42 +1946,36 @@ class Agent:
         """Parse the output from the LLM"""
         try:
             if isinstance(response, dict):
-                if 'choices' in response:
-                    return response['choices'][0]['message']['content']
+                if "choices" in response:
+                    return response["choices"][0]["message"][
+                        "content"
+                    ]
                 else:
-                    return json.dumps(response)  # Convert dict to string
+                    return json.dumps(
+                        response
+                    )  # Convert dict to string
             elif isinstance(response, str):
                 return response
             else:
-                return str(response)  # Convert any other type to string
+                return str(
+                    response
+                )  # Convert any other type to string
         except Exception as e:
             logger.error(f"Error parsing LLM output: {e}")
-            return str(response)  # Return string representation as fallback
+            return str(
+                response
+            )  # Return string representation as fallback
 
     def log_step_metadata(
         self, loop: int, task: str, response: str
     ) -> Step:
-        """Log metadata for each step of agent execution."""
-        # Generate unique step ID
-        step_id = f"step_{loop}_{uuid.uuid4().hex}"
-
-        # Calculate token usage
+        # # # Step Metadata
         # full_memory = self.short_memory.return_history_as_string()
         # prompt_tokens = self.tokenizer.count_tokens(full_memory)
        # completion_tokens = self.tokenizer.count_tokens(response)
-        # total_tokens = prompt_tokens + completion_tokens
-        total_tokens=self.tokenizer.count_tokens(task) + self.tokenizer.count_tokens(response),
-
-        # Create memory usage tracking
-        memory_usage = {
-            "short_term": len(self.short_memory.messages),
-            "long_term": self.long_term_memory.count if hasattr(self, 'long_term_memory') else 0
-        }
+        # self.tokenizer.count_tokens(prompt_tokens + completion_tokens)
 
         step_log = Step(
-            step_id=step_id,
-            time=time.time(),
-            tokens = total_tokens,
             response=AgentChatCompletionResponse(
                 id=self.agent_id,
                 agent_name=self.agent_name,
@@ -1933,34 +1990,14 @@ class Agent:
                 ),
                 # usage=UsageInfo(
                 #     prompt_tokens=prompt_tokens,
-                #     completion_tokens=completion_tokens,
                 #     total_tokens=total_tokens,
+                #     total_tokens=total_tokens,
+                #     completion_tokens=completion_tokens,
                 # ),
-                tool_calls=[],
-                memory_usage=memory_usage
             ),
         )
 
-        # Update total tokens if agent_output exists
-        if hasattr(self, 'agent_output'):
-            self.agent_output.total_tokens += step.response.total_tokens
-
-
-        # Add step to agent output tracking
         self.step_pool.append(step_log)
-
-
-    def update_tool_usage(self, step_id: str, tool_name: str, tool_args: dict, tool_response: Any):
-        """Update tool usage information for a specific step."""
-        for step in self.agent_output.steps:
-            if step.step_id == step_id:
-                step.response.tool_calls.append({
-                    "tool": tool_name,
-                    "arguments": tool_args,
-                    "response": str(tool_response)
-                })
-                break
-
     def _serialize_callable(
         self, attr_value: Callable
    ) -> Dict[str, Any]: