diff --git a/docs/examples/agent_stream.md b/docs/examples/agent_stream.md index 79c0a8ef..58bc41d4 100644 --- a/docs/examples/agent_stream.md +++ b/docs/examples/agent_stream.md @@ -46,6 +46,71 @@ response = agent.run("Tell me a detailed story about humanity colonizing the sta print(response) ``` +## Streaming with Tools Execution + +Swarms also supports real-time streaming while executing tools, providing immediate feedback on both the thinking process and tool execution results: + +```python +from swarms import Agent + +def get_weather(location: str, units: str = "celsius") -> str: + """ + Get the current weather for a location. + + Args: + location (str): The city/location to get weather for + units (str): Temperature units (celsius or fahrenheit) + + Returns: + str: Weather information + """ + weather_data = { + "New York": {"temperature": "22°C", "condition": "sunny", "humidity": "65%"}, + "London": {"temperature": "15°C", "condition": "cloudy", "humidity": "80%"}, + "Tokyo": {"temperature": "28°C", "condition": "rainy", "humidity": "90%"}, + } + + location_key = location.title() + if location_key in weather_data: + data = weather_data[location_key] + temp = data["temperature"] + if units == "fahrenheit" and "°C" in temp: + celsius = int(temp.replace("°C", "")) + fahrenheit = (celsius * 9/5) + 32 + temp = f"{fahrenheit}°F" + + return f"Weather in {location}: {temp}, {data['condition']}, humidity: {data['humidity']}" + else: + return f"Weather data not available for {location}" + +# Create agent with streaming and tool support +agent = Agent( + model_name="gpt-4o", + max_loops=1, + verbose=True, + streaming_on=True, # Enable streaming + print_on=True, # Enable pretty printing + tools=[get_weather], # Add tools +) + +# This will stream both the reasoning and tool execution results +agent.run("What is the weather in Tokyo? ") +``` + +### Key Features of Streaming with Tools: + +- **Real-time tool execution**: See tool calls happen as they're invoked +- **Streaming responses**: Get immediate feedback on the agent's reasoning +- **Tool result integration**: Watch how tools results are incorporated into the final response +- **Interactive debugging**: Monitor the complete workflow from thought to action + +### Best Practices: + +1. **Set appropriate max_loops**: Use `max_loops=1` for simple tasks or higher values for complex multi-step operations +2. **Enable verbose mode**: Use `verbose=True` to see detailed tool execution logs +3. **Use print_on for UI**: Enable `print_on=True` for better visual streaming experience +4. **Monitor performance**: Streaming with tools may be slower due to real-time processing + ## Connect With Us If you'd like technical support, join our Discord below and stay updated on our Twitter for new updates! diff --git a/examples/streaming_with_tools.py b/examples/streaming_with_tools.py new file mode 100644 index 00000000..ee88a4c8 --- /dev/null +++ b/examples/streaming_with_tools.py @@ -0,0 +1,44 @@ +from swarms import Agent + +def get_weather(location: str, units: str = "celsius") -> str: + """ + Get the current weather for a location. + + Args: + location (str): The city/location to get weather for + units (str): Temperature units (celsius or fahrenheit) + + Returns: + str: Weather information + """ + # Simulated weather data + weather_data = { + "New York": {"temperature": "22°C", "condition": "sunny", "humidity": "65%"}, + "London": {"temperature": "15°C", "condition": "cloudy", "humidity": "80%"}, + "Tokyo": {"temperature": "28°C", "condition": "rainy", "humidity": "90%"}, + } + + location_key = location.title() + if location_key in weather_data: + data = weather_data[location_key] + temp = data["temperature"] + if units == "fahrenheit" and "°C" in temp: + # Convert to Fahrenheit for demo + celsius = int(temp.replace("°C", "")) + fahrenheit = (celsius * 9/5) + 32 + temp = f"{fahrenheit}°F" + + return f"Weather in {location}: {temp}, {data['condition']}, humidity: {data['humidity']}" + else: + return f"Weather data not available for {location}" + +agent = Agent( + model_name="gpt-4o", + max_loops=1, + verbose=True, + streaming_on=True, + print_on=True, + tools=[get_weather], +) + +agent.run("What is the weather in Tokyo? ") \ No newline at end of file diff --git a/swarms/structs/agent.py b/swarms/structs/agent.py index 1c2ff95d..e617b4ec 100644 --- a/swarms/structs/agent.py +++ b/swarms/structs/agent.py @@ -1174,20 +1174,24 @@ class Agent: **kwargs, ) - # If streaming is enabled, then don't print the response - - # Parse the response from the agent with the output type - if exists(self.tools_list_dictionary): - if isinstance(response, BaseModel): - response = response.model_dump() + # Handle streaming response with tools + if self.streaming_on and exists(self.tools_list_dictionary) and hasattr(response, "__iter__") and not isinstance(response, str): + response = self.tool_struct.handle_streaming_with_tools( + response=response, + llm=self.llm, + agent_name=self.agent_name, + print_on=self.print_on + ) + else: + # Parse the response from the agent with the output type + if exists(self.tools_list_dictionary): + if isinstance(response, BaseModel): + response = response.model_dump() - # Parse the response from the agent with the output type - response = self.parse_llm_output(response) - - self.short_memory.add( - role=self.agent_name, - content=response, - ) + response = self.parse_llm_output(response) + + if isinstance(response, str) and response.strip(): + self.short_memory.add(role=self.agent_name, content=response) # Print if self.print_on is True: @@ -1197,13 +1201,18 @@ class Agent: f"[Structured Output] [Time: {time.strftime('%H:%M:%S')}] \n\n {json.dumps(response, indent=4)}", loop_count, ) - elif self.streaming_on: - pass - else: + elif self.streaming_on and isinstance(response, dict) and response.get("choices"): + # Handle streaming tool calls structured output + tool_calls = response.get("choices", [{}])[0].get("message", {}).get("tool_calls", []) + if tool_calls: + self.pretty_print( + f"[Structured Output] [Time: {time.strftime('%H:%M:%S')}] \n\n {json.dumps(tool_calls, indent=4)}", + loop_count, + ) + elif not self.streaming_on: self.pretty_print( response, loop_count ) - # Check and execute callable tools if exists(self.tools): self.tool_execution_retry( @@ -2207,11 +2216,19 @@ class Agent: raise ValueError("Response is required.") try: - # Stream and print the response token by token - for token in response.split(): - print(token, end=" ", flush=True) - time.sleep(delay) - print() # Ensure a newline after streaming + # Use centralized string streaming from wrapper + if hasattr(self.llm, "handle_string_streaming"): + self.llm.handle_string_streaming( + response=response, + print_on=self.print_on, + delay=delay, + ) + else: + # Fallback to original implementation if wrapper doesn't support it + for token in response.split(): + print(token, end=" ", flush=True) + time.sleep(delay) + print() # Ensure a newline after streaming except Exception as e: print(f"An error occurred during streaming: {e}") @@ -2424,86 +2441,32 @@ class Agent: del kwargs["is_last"] try: - # Set streaming parameter in LLM if streaming is enabled - if self.streaming_on and hasattr(self.llm, "stream"): - original_stream = self.llm.stream + # Special handling for streaming with tools - need raw stream for parsing + if self.streaming_on and exists(self.tools_list_dictionary): + original_stream = getattr(self.llm, 'stream', False) self.llm.stream = True - - if img is not None: - streaming_response = self.llm.run( - task=task, img=img, *args, **kwargs - ) - else: - streaming_response = self.llm.run( - task=task, *args, **kwargs - ) - - # If we get a streaming response, handle it with the new streaming panel - if hasattr( - streaming_response, "__iter__" - ) and not isinstance(streaming_response, str): - # Check if streaming_callback is provided (for ConcurrentWorkflow dashboard integration) - if streaming_callback is not None: - # Real-time callback streaming for dashboard integration - chunks = [] - for chunk in streaming_response: - if ( - hasattr(chunk, "choices") - and chunk.choices[0].delta.content - ): - content = chunk.choices[ - 0 - ].delta.content - chunks.append(content) - # Call the streaming callback with the new chunk - streaming_callback(content) - complete_response = "".join(chunks) - # Check print_on parameter for different streaming behaviors - elif self.print_on is False: - # Silent streaming - no printing, just collect chunks - chunks = [] - for chunk in streaming_response: - if ( - hasattr(chunk, "choices") - and chunk.choices[0].delta.content - ): - content = chunk.choices[ - 0 - ].delta.content - chunks.append(content) - complete_response = "".join(chunks) + + try: + if img is not None: + stream_response = self.llm.run(task=task, img=img, *args, **kwargs) else: - # Collect chunks for conversation saving - collected_chunks = [] - - def on_chunk_received(chunk: str): - """Callback to collect chunks as they arrive""" - collected_chunks.append(chunk) - # Optional: Save each chunk to conversation in real-time - # This creates a more detailed conversation history - if self.verbose: - logger.debug( - f"Streaming chunk received: {chunk[:50]}..." - ) - - # Use the streaming panel to display and collect the response - complete_response = formatter.print_streaming_panel( - streaming_response, - title=f"🤖 Agent: {self.agent_name} Loops: {current_loop}", - style=None, # Use random color like non-streaming approach - collect_chunks=True, - on_chunk_callback=on_chunk_received, - ) - - # Restore original stream setting - self.llm.stream = original_stream - - # Return the complete response for further processing - return complete_response - else: - # Restore original stream setting + stream_response = self.llm.run(task=task, *args, **kwargs) + return stream_response + finally: self.llm.stream = original_stream - return streaming_response + + # Use centralized streaming logic from wrapper if streaming is enabled (no tools) + elif self.streaming_on and hasattr(self.llm, "run_with_streaming"): + return self.llm.run_with_streaming( + task=task, + img=img, + streaming_callback=streaming_callback, + title=f"Agent: {self.agent_name} Loops: {current_loop}", + print_on=self.print_on, + verbose=self.verbose, + *args, + **kwargs, + ) else: args = { "task": task, @@ -2908,9 +2871,18 @@ class Agent: try: temp_llm = self.temp_llm_instance_for_tool_summary() - summary = temp_llm.run( - task=self.short_memory.get_str() - ) + # Use centralized streaming logic for MCP tool summary + if self.streaming_on: + summary = temp_llm.run_with_streaming( + task=self.short_memory.get_str(), + title=f"Agent: {self.agent_name} - MCP Tool Summary", + style="cyan", + print_on=self.print_on, + verbose=self.verbose, + ) + else: + summary = temp_llm.run(task=self.short_memory.get_str()) + except Exception as e: logger.error( f"Error calling LLM after MCP tool execution: {e}" @@ -2918,7 +2890,7 @@ class Agent: # Fallback: provide a default summary summary = "I successfully executed the MCP tool and retrieved the information above." - if self.print_on is True: + if self.print_on and not self.streaming_on: self.pretty_print(summary, loop_count=current_loop) # Add to the memory @@ -2935,7 +2907,7 @@ class Agent: temperature=self.temperature, max_tokens=self.max_tokens, system_prompt=self.system_prompt, - stream=False, # Always disable streaming for tool summaries + stream=self.streaming_on, tools_list_dictionary=None, parallel_tool_calls=False, base_url=self.llm_base_url, @@ -3000,12 +2972,26 @@ class Agent: """ ) + # Use centralized streaming logic for tool summary + if self.streaming_on: + tool_response = temp_llm.run_tool_summary_with_streaming( + tool_results=str(output), + agent_name=self.agent_name, + print_on=self.print_on, + verbose=self.verbose, + ) + else: + tool_response = temp_llm.run( + f"Please analyze and summarize the following tool execution output:\n\n{output}" + ) + + # Add the tool response to memory self.short_memory.add( role=self.agent_name, content=tool_response, ) - if self.print_on is True: + if self.print_on and not self.streaming_on: self.pretty_print( tool_response, loop_count, diff --git a/swarms/tools/base_tool.py b/swarms/tools/base_tool.py index af08f11e..1b8192f3 100644 --- a/swarms/tools/base_tool.py +++ b/swarms/tools/base_tool.py @@ -16,6 +16,7 @@ from swarms.tools.pydantic_to_json import ( ) from swarms.tools.tool_parse_exec import parse_and_execute_json from swarms.utils.loguru_logger import initialize_logger +from loguru import logger as loguru_logger logger = initialize_logger(log_folder="base_tool") @@ -3063,3 +3064,77 @@ class BaseTool(BaseModel): ) return function_calls + + def handle_streaming_with_tools( + self, + response: Any, + llm: Any, + agent_name: str = "agent", + print_on: bool = True + ) -> Union[str, Dict[str, Any]]: + """ + Simplified streaming response handler with tool support. + + Args: + response: Streaming response object + llm: Language model instance + agent_name: Name of the agent + print_on: Whether to print streaming output + + Returns: + Union[str, Dict[str, Any]]: Processed response (text or tool calls) + """ + # Validate response + if not response: + logger.warning("Empty streaming response received") + return "" + + if not hasattr(response, "__iter__"): + logger.warning("Non-iterable response received for streaming") + return str(response) if response else "" + if hasattr(llm, 'parse_streaming_chunks_with_tools'): + text_response, tool_calls = llm.parse_streaming_chunks_with_tools( + stream=response, + agent_name=agent_name, + print_on=print_on, + verbose=self.verbose + ) + + if tool_calls: + formatted_calls = [] + for tc in tool_calls: + if tc and tc.get("name"): + args = tc.get("input") or tc.get("arguments", {}) + if isinstance(args, str): + try: + args = json.loads(args) + except json.JSONDecodeError as e: + print(f"Warning: Failed to parse tool arguments for {tc.get('name')}: {e}") + args = {"error": f"JSON parse failed: {e}", "raw": args} + + formatted_calls.append({ + "type": "function", + "function": {"name": tc["name"], "arguments": json.dumps(args)}, + "id": tc.get("id") + }) + + + return {"choices": [{"message": {"tool_calls": formatted_calls}}]} if formatted_calls else text_response + + return text_response + else: + # Simple fallback streaming + chunks = [] + try: + for chunk in response: + if hasattr(chunk, "choices") and chunk.choices and chunk.choices[0].delta.content: + content = chunk.choices[0].delta.content + chunks.append(content) + if print_on: + print(content, end="", flush=True) + if print_on and chunks: + print() + return "".join(chunks) + except Exception as e: + logger.error(f"Error in fallback streaming for agent {agent_name}: {e}") + return "".join(chunks) if chunks else "" diff --git a/swarms/utils/litellm_wrapper.py b/swarms/utils/litellm_wrapper.py index ff0ae728..eb9da1b5 100644 --- a/swarms/utils/litellm_wrapper.py +++ b/swarms/utils/litellm_wrapper.py @@ -1,3 +1,5 @@ +import traceback +from typing import Optional, Callable import asyncio import base64 import traceback @@ -340,6 +342,7 @@ class LiteLLM: # Store other types of runtime_args for debugging completion_params["runtime_args"] = runtime_args + def output_for_tools(self, response: any): if self.mcp_call is True: out = response.choices[0].message.tool_calls[0].function @@ -648,6 +651,338 @@ class LiteLLM: f"Model {self.model_name} does not support vision" ) + def _collect_streaming_chunks(self, streaming_response, callback=None): + """Helper method to collect chunks from streaming response.""" + chunks = [] + for chunk in streaming_response: + if hasattr(chunk, "choices") and chunk.choices[0].delta.content: + content = chunk.choices[0].delta.content + chunks.append(content) + if callback: + callback(content) + return "".join(chunks) + + def _handle_streaming_response( + self, + streaming_response, + title: str = "LLM Response", + style: Optional[str] = None, + streaming_callback: Optional[Callable[[str], None]] = None, + print_on: bool = True, + verbose: bool = False, + ) -> str: + """ + Centralized streaming response handler for all streaming scenarios. + + Args: + streaming_response: The streaming response object + title: Title for the streaming panel + style: Style for the panel (optional) + streaming_callback: Callback for real-time streaming + print_on: Whether to print the streaming output + verbose: Whether to enable verbose logging + + Returns: + str: The complete response string + """ + # Non-streaming response - return as is + if not (hasattr(streaming_response, "__iter__") and not isinstance(streaming_response, str)): + return streaming_response + + # Handle callback streaming + if streaming_callback is not None: + return self._collect_streaming_chunks(streaming_response, streaming_callback) + + # Handle silent streaming + if not print_on: + return self._collect_streaming_chunks(streaming_response) + + # Handle formatted streaming with panel + from swarms.utils.formatter import formatter + from loguru import logger + + collected_chunks = [] + def on_chunk_received(chunk: str): + collected_chunks.append(chunk) + if verbose: + logger.debug(f"Streaming chunk received: {chunk[:50]}...") + + return formatter.print_streaming_panel( + streaming_response, + title=title, + style=style, + collect_chunks=True, + on_chunk_callback=on_chunk_received, + ) + + def run_with_streaming( + self, + task: str, + img: Optional[str] = None, + audio: Optional[str] = None, + streaming_callback: Optional[Callable[[str], None]] = None, + title: str = "LLM Response", + style: Optional[str] = None, + print_on: bool = True, + verbose: bool = False, + *args, + **kwargs, + ) -> str: + """ + Run LLM with centralized streaming handling. + + Args: + task: The task/prompt to send to the LLM + img: Optional image input + audio: Optional audio input + streaming_callback: Callback for real-time streaming + title: Title for streaming panel + style: Style for streaming panel + print_on: Whether to print streaming output + verbose: Whether to enable verbose logging + + Returns: + str: The complete response + """ + original_stream = self.stream + self.stream = True + + try: + # Build kwargs for run method + run_kwargs = {"task": task, **kwargs} + if img is not None: + run_kwargs["img"] = img + if audio is not None: + run_kwargs["audio"] = audio + + response = self.run(*args, **run_kwargs) + + return self._handle_streaming_response( + response, + title=title, + style=style, + streaming_callback=streaming_callback, + print_on=print_on, + verbose=verbose, + ) + finally: + self.stream = original_stream + + def run_tool_summary_with_streaming( + self, + tool_results: str, + agent_name: str = "Agent", + print_on: bool = True, + verbose: bool = False, + *args, + **kwargs, + ) -> str: + """ + Run tool summary with streaming support. + + Args: + tool_results: The tool execution results to summarize + agent_name: Name of the agent for the panel title + print_on: Whether to print streaming output + verbose: Whether to enable verbose logging + + Returns: + str: The complete summary response + """ + return self.run_with_streaming( + task=f"Please analyze and summarize the following tool execution output:\n\n{tool_results}", + title=f"Agent: {agent_name} - Tool Summary", + style="green", + print_on=print_on, + verbose=verbose, + *args, + **kwargs, + ) + + def handle_string_streaming( + self, + response: str, + print_on: bool = True, + delay: float = 0.01, + ) -> None: + """ + Handle streaming for string responses by simulating streaming output. + + Args: + response: The string response to stream + print_on: Whether to print the streaming output + delay: Delay between characters for streaming effect + """ + if not (print_on and response): + return + + import time + for char in response: + print(char, end="", flush=True) + if delay > 0: + time.sleep(delay) + print() # Newline at the end + + def _process_anthropic_chunk(self, chunk, current_tool_call, tool_call_buffer, tool_calls_in_stream, print_on, verbose): + """Process Anthropic-style streaming chunks.""" + import json + from loguru import logger + + chunk_type = getattr(chunk, 'type', None) + full_text_response = "" + + if chunk_type == 'content_block_start' and hasattr(chunk, 'content_block') and chunk.content_block.type == 'tool_use': + tool_name = chunk.content_block.name + if print_on: + print(f"\nTool Call: {tool_name}...", flush=True) + current_tool_call = {"id": chunk.content_block.id, "name": tool_name, "input": ""} + tool_call_buffer = "" + + elif chunk_type == 'content_block_delta' and hasattr(chunk, 'delta'): + if chunk.delta.type == 'input_json_delta': + tool_call_buffer += chunk.delta.partial_json + elif chunk.delta.type == 'text_delta': + text_chunk = chunk.delta.text + full_text_response += text_chunk + if print_on: + print(text_chunk, end="", flush=True) + + elif chunk_type == 'content_block_stop' and current_tool_call: + try: + tool_input = json.loads(tool_call_buffer) + current_tool_call["input"] = tool_input + tool_calls_in_stream.append(current_tool_call) + except json.JSONDecodeError as e: + logger.error(f"Failed to parse tool arguments: {tool_call_buffer}. Error: {e}") + # Store the raw buffer for debugging + current_tool_call["input"] = {"raw_buffer": tool_call_buffer, "error": str(e)} + tool_calls_in_stream.append(current_tool_call) + current_tool_call = None + tool_call_buffer = "" + + return full_text_response, current_tool_call, tool_call_buffer + + def _process_openai_chunk(self, chunk, tool_calls_in_stream, print_on, verbose): + """Process OpenAI-style streaming chunks.""" + import json + full_text_response = "" + + if not (hasattr(chunk, 'choices') and chunk.choices): + return full_text_response + + choice = chunk.choices[0] + if not (hasattr(choice, 'delta') and choice.delta): + return full_text_response + + delta = choice.delta + + # Handle text content + if hasattr(delta, 'content') and delta.content: + text_chunk = delta.content + full_text_response += text_chunk + if print_on: + print(text_chunk, end="", flush=True) + + # Handle tool calls in streaming chunks + if hasattr(delta, 'tool_calls') and delta.tool_calls: + for tool_call in delta.tool_calls: + tool_index = getattr(tool_call, 'index', 0) + + # Ensure we have enough slots in the list + while len(tool_calls_in_stream) <= tool_index: + tool_calls_in_stream.append(None) + + if hasattr(tool_call, 'function') and tool_call.function: + func = tool_call.function + + # Create new tool call if slot is empty and we have a function name + if tool_calls_in_stream[tool_index] is None and hasattr(func, 'name') and func.name: + if print_on: + print(f"\nTool Call: {func.name}...", flush=True) + tool_calls_in_stream[tool_index] = { + "id": getattr(tool_call, 'id', f"call_{tool_index}"), + "name": func.name, + "arguments": "" + } + + # Accumulate arguments + if tool_calls_in_stream[tool_index] and hasattr(func, 'arguments') and func.arguments is not None: + tool_calls_in_stream[tool_index]["arguments"] += func.arguments + + if verbose: + logger.debug(f"Accumulated arguments for {tool_calls_in_stream[tool_index].get('name', 'unknown')}: '{tool_calls_in_stream[tool_index]['arguments']}'") + + # Try to parse if we have complete JSON + try: + args_dict = json.loads(tool_calls_in_stream[tool_index]["arguments"]) + tool_calls_in_stream[tool_index]["input"] = args_dict + tool_calls_in_stream[tool_index]["arguments_complete"] = True + if verbose: + logger.info(f"Complete tool call for {tool_calls_in_stream[tool_index]['name']} with args: {args_dict}") + except json.JSONDecodeError: + # Continue accumulating - JSON might be incomplete + if verbose: + logger.debug(f"Incomplete JSON for {tool_calls_in_stream[tool_index].get('name', 'unknown')}: {tool_calls_in_stream[tool_index]['arguments'][:100]}...") + + return full_text_response + + def parse_streaming_chunks_with_tools( + self, + stream, + agent_name: str = "Agent", + print_on: bool = True, + verbose: bool = False, + ) -> tuple: + """ + Parse streaming chunks and extract both text and tool calls. + + Args: + stream: The streaming response object + agent_name: Name of the agent for printing + print_on: Whether to print streaming output + verbose: Whether to enable verbose logging + + Returns: + tuple: (full_text_response, tool_calls_list) + """ + full_text_response = "" + tool_calls_in_stream = [] + current_tool_call = None + tool_call_buffer = "" + + if print_on: + print(f"{agent_name}: ", end="", flush=True) + + # Process streaming chunks in real-time + try: + for chunk in stream: + if verbose: + logger.debug(f"Processing streaming chunk: {type(chunk)}") + + # Try Anthropic-style processing first + anthropic_result = self._process_anthropic_chunk( + chunk, current_tool_call, tool_call_buffer, tool_calls_in_stream, print_on, verbose + ) + if anthropic_result[0]: # If text was processed + text_chunk, current_tool_call, tool_call_buffer = anthropic_result + full_text_response += text_chunk + continue + + # If not Anthropic, try OpenAI-style processing + openai_text = self._process_openai_chunk(chunk, tool_calls_in_stream, print_on, verbose) + if openai_text: + full_text_response += openai_text + except Exception as e: + logger.error(f"Error processing streaming chunks: {e}") + if print_on: + print(f"\n[Streaming Error: {e}]") + return full_text_response, tool_calls_in_stream + + if print_on: + print() # Newline after streaming text + + return full_text_response, tool_calls_in_stream + def run( self, task: str, @@ -840,6 +1175,11 @@ class LiteLLM: .message.tool_calls[0] .function.arguments ) + # Standard completion + response = await acompletion(**completion_params) + + print(response) + return response elif self.return_all is True: return response.model_dump() elif "gemini" in self.model_name.lower():