code interpreter update

pull/663/head
Occupying-Mars 2 months ago
parent b5b86e347f
commit 2e5ad057e0

@ -4,6 +4,7 @@ import threading
import time
import traceback
from swarms.utils.loguru_logger import logger
from swarms.utils.terminal_output import terminal, OutputChunk
class SubprocessCodeInterpreter:
@ -39,6 +40,7 @@ class SubprocessCodeInterpreter:
self.retry_count = retry_count
self.output_queue = queue.Queue()
self.done = threading.Event()
self.line_postprocessor = lambda x: x
def detect_active_line(self, line):
"""Detect if the line is an active line
@ -193,34 +195,83 @@ class SubprocessCodeInterpreter:
break
def handle_stream_output(self, stream, is_error_stream):
"""Handle the output from the subprocess
"""Handle the output from the subprocess with enhanced formatting
Args:
stream (_type_): _description_
is_error_stream (bool): _description_
stream (_type_): The stream to read from
is_error_stream (bool): Whether this is an error stream
"""
for line in iter(stream.readline, ""):
if self.debug_mode:
print(f"Received output line:\n{line}\n---")
terminal.status_panel(f"Debug: {line}", "info")
line = self.line_postprocessor(line)
if line is None:
continue # `line = None` is the postprocessor's signal to discard completely
continue
chunk_type = "error" if is_error_stream else "text"
if self.detect_active_line(line):
active_line = self.detect_active_line(line)
self.output_queue.put({"active_line": active_line})
self.output_queue.put(OutputChunk(
content=f"Active line: {active_line}",
type="info",
metadata={"active_line": active_line}
))
elif self.detect_end_of_execution(line):
self.output_queue.put({"active_line": None})
self.output_queue.put(OutputChunk(
content="Execution completed",
type="success"
))
time.sleep(0.1)
self.done.set()
elif is_error_stream and "KeyboardInterrupt" in line:
self.output_queue.put({"output": "KeyboardInterrupt"})
self.output_queue.put(OutputChunk(
content="KeyboardInterrupt",
type="warning"
))
time.sleep(0.1)
self.done.set()
else:
self.output_queue.put({"output": line})
self.output_queue.put(OutputChunk(
content=line,
type=chunk_type
))
def run_code(self, code: str) -> str:
"""Run code with enhanced output handling"""
try:
process = subprocess.Popen(
code,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# Start output handling threads
stdout_thread = threading.Thread(target=self.handle_stream_output, args=(process.stdout, False))
stderr_thread = threading.Thread(target=self.handle_stream_output, args=(process.stderr, True))
stdout_thread.start()
stderr_thread.start()
# Handle streaming output
terminal.handle_stream(self.output_queue, self.done)
# Wait for completion
stdout_thread.join()
stderr_thread.join()
process.wait()
if process.returncode != 0:
terminal.status_panel(f"Process exited with code {process.returncode}", "error")
return "Code execution completed"
except Exception as e:
terminal.status_panel(f"Error executing code: {str(e)}", "error")
raise
# interpreter = SubprocessCodeInterpreter()

Loading…
Cancel
Save