diff --git a/swarms/tools/prebuilt/code_interpreter.py b/swarms/tools/prebuilt/code_interpreter.py index d26b555e..63b266db 100644 --- a/swarms/tools/prebuilt/code_interpreter.py +++ b/swarms/tools/prebuilt/code_interpreter.py @@ -4,6 +4,7 @@ import threading import time import traceback from swarms.utils.loguru_logger import logger +from swarms.utils.terminal_output import terminal, OutputChunk class SubprocessCodeInterpreter: @@ -39,6 +40,7 @@ class SubprocessCodeInterpreter: self.retry_count = retry_count self.output_queue = queue.Queue() self.done = threading.Event() + self.line_postprocessor = lambda x: x def detect_active_line(self, line): """Detect if the line is an active line @@ -193,34 +195,83 @@ class SubprocessCodeInterpreter: break def handle_stream_output(self, stream, is_error_stream): - """Handle the output from the subprocess + """Handle the output from the subprocess with enhanced formatting Args: - stream (_type_): _description_ - is_error_stream (bool): _description_ + stream (_type_): The stream to read from + is_error_stream (bool): Whether this is an error stream """ for line in iter(stream.readline, ""): if self.debug_mode: - print(f"Received output line:\n{line}\n---") + terminal.status_panel(f"Debug: {line}", "info") line = self.line_postprocessor(line) if line is None: - continue # `line = None` is the postprocessor's signal to discard completely + continue + chunk_type = "error" if is_error_stream else "text" + if self.detect_active_line(line): active_line = self.detect_active_line(line) - self.output_queue.put({"active_line": active_line}) + self.output_queue.put(OutputChunk( + content=f"Active line: {active_line}", + type="info", + metadata={"active_line": active_line} + )) elif self.detect_end_of_execution(line): - self.output_queue.put({"active_line": None}) + self.output_queue.put(OutputChunk( + content="Execution completed", + type="success" + )) time.sleep(0.1) self.done.set() elif is_error_stream and "KeyboardInterrupt" in line: - self.output_queue.put({"output": "KeyboardInterrupt"}) + self.output_queue.put(OutputChunk( + content="KeyboardInterrupt", + type="warning" + )) time.sleep(0.1) self.done.set() else: - self.output_queue.put({"output": line}) + self.output_queue.put(OutputChunk( + content=line, + type=chunk_type + )) + + def run_code(self, code: str) -> str: + """Run code with enhanced output handling""" + try: + process = subprocess.Popen( + code, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + + # Start output handling threads + stdout_thread = threading.Thread(target=self.handle_stream_output, args=(process.stdout, False)) + stderr_thread = threading.Thread(target=self.handle_stream_output, args=(process.stderr, True)) + stdout_thread.start() + stderr_thread.start() + + # Handle streaming output + terminal.handle_stream(self.output_queue, self.done) + + # Wait for completion + stdout_thread.join() + stderr_thread.join() + process.wait() + + if process.returncode != 0: + terminal.status_panel(f"Process exited with code {process.returncode}", "error") + + return "Code execution completed" + + except Exception as e: + terminal.status_panel(f"Error executing code: {str(e)}", "error") + raise # interpreter = SubprocessCodeInterpreter()