Former-commit-id: eff45e88e6
huggingface
Kye 2 years ago
parent 122fd955b0
commit f5c10eb36c

@ -240,236 +240,236 @@ class ToolsFactory:
##########################################+> SYS
import signal
from typing import Optional, Tuple
from ptrace.debugger import (
NewProcessEvent,
ProcessExecution,
ProcessExit,
ProcessSignal,
PtraceDebugger,
PtraceProcess,
)
from ptrace.func_call import FunctionCallOptions
from ptrace.syscall import PtraceSyscall
from ptrace.tools import signal_to_exitcode
class SyscallTimeoutException(Exception):
def __init__(self, pid: int, *args) -> None:
super().__init__(f"deadline exceeded while waiting syscall for {pid}", *args)
class SyscallTracer:
def __init__(self, pid: int):
self.debugger: PtraceDebugger = PtraceDebugger()
self.pid: int = pid
self.process: PtraceProcess = None
def is_waiting(self, syscall: PtraceSyscall) -> bool:
if syscall.name.startswith("wait"):
return True
return False
def attach(self):
self.process = self.debugger.addProcess(self.pid, False)
def detach(self):
self.process.detach()
self.debugger.quit()
def set_timer(self, timeout: int):
def handler(signum, frame):
raise SyscallTimeoutException(self.process.pid)
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout)
def reset_timer(self):
signal.alarm(0)
def wait_syscall_with_timeout(self, timeout: int):
self.set_timer(timeout)
self.process.waitSyscall()
self.reset_timer()
def wait_until_stop_or_exit(self) -> Tuple[Optional[int], str]:
self.process.syscall()
exitcode = None
reason = ""
while True:
if not self.debugger:
break
try:
self.wait_syscall_with_timeout(30)
except ProcessExit as event:
if event.exitcode is not None:
exitcode = event.exitcode
continue
except ProcessSignal as event:
event.process.syscall(event.signum)
exitcode = signal_to_exitcode(event.signum)
reason = event.reason
continue
except NewProcessEvent as event:
continue
except ProcessExecution as event:
continue
except Exception as e:
reason = str(e)
break
syscall = self.process.syscall_state.event(
FunctionCallOptions(
write_types=False,
write_argname=False,
string_max_length=300,
replace_socketcall=True,
write_address=False,
max_array_count=20,
)
)
self.process.syscall()
if syscall is None:
continue
# ##########################################+> SYS
# import signal
# from typing import Optional, Tuple
# from ptrace.debugger import (
# NewProcessEvent,
# ProcessExecution,
# ProcessExit,
# ProcessSignal,
# PtraceDebugger,
# PtraceProcess,
# )
# from ptrace.func_call import FunctionCallOptions
# from ptrace.syscall import PtraceSyscall
# from ptrace.tools import signal_to_exitcode
# class SyscallTimeoutException(Exception):
# def __init__(self, pid: int, *args) -> None:
# super().__init__(f"deadline exceeded while waiting syscall for {pid}", *args)
# class SyscallTracer:
# def __init__(self, pid: int):
# self.debugger: PtraceDebugger = PtraceDebugger()
# self.pid: int = pid
# self.process: PtraceProcess = None
# def is_waiting(self, syscall: PtraceSyscall) -> bool:
# if syscall.name.startswith("wait"):
# return True
# return False
# def attach(self):
# self.process = self.debugger.addProcess(self.pid, False)
# def detach(self):
# self.process.detach()
# self.debugger.quit()
# def set_timer(self, timeout: int):
# def handler(signum, frame):
# raise SyscallTimeoutException(self.process.pid)
# signal.signal(signal.SIGALRM, handler)
# signal.alarm(timeout)
# def reset_timer(self):
# signal.alarm(0)
# def wait_syscall_with_timeout(self, timeout: int):
# self.set_timer(timeout)
# self.process.waitSyscall()
# self.reset_timer()
# def wait_until_stop_or_exit(self) -> Tuple[Optional[int], str]:
# self.process.syscall()
# exitcode = None
# reason = ""
# while True:
# if not self.debugger:
# break
# try:
# self.wait_syscall_with_timeout(30)
# except ProcessExit as event:
# if event.exitcode is not None:
# exitcode = event.exitcode
# continue
# except ProcessSignal as event:
# event.process.syscall(event.signum)
# exitcode = signal_to_exitcode(event.signum)
# reason = event.reason
# continue
# except NewProcessEvent as event:
# continue
# except ProcessExecution as event:
# continue
# except Exception as e:
# reason = str(e)
# break
# syscall = self.process.syscall_state.event(
# FunctionCallOptions(
# write_types=False,
# write_argname=False,
# string_max_length=300,
# replace_socketcall=True,
# write_address=False,
# max_array_count=20,
# )
# )
if syscall.result:
continue
# self.process.syscall()
self.reset_timer()
# if syscall is None:
# continue
return exitcode, reason
##########################################+> SYS CALL END
# if syscall.result:
# continue
# self.reset_timer()
# return exitcode, reason
# ##########################################+> SYS CALL END
############### => st dout.py
import os
import time
import subprocess
from datetime import datetime
from typing import Callable, Literal, Optional, Union, Tuple
PipeType = Union[Literal["stdout"], Literal["stderr"]]
# ############### => st dout.py
# import os
# import time
# import subprocess
# from datetime import datetime
# from typing import Callable, Literal, Optional, Union, Tuple
class StdoutTracer:
def __init__(
self,
process: subprocess.Popen,
timeout: int = 30,
interval: int = 0.1,
on_output: Callable[[PipeType, str], None] = lambda: None,
):
self.process: subprocess.Popen = process
self.timeout: int = timeout
self.interval: int = interval
self.last_output: datetime = None
self.on_output: Callable[[PipeType, str], None] = on_output
def nonblock(self):
os.set_blocking(self.process.stdout.fileno(), False)
os.set_blocking(self.process.stderr.fileno(), False)
def get_output(self, pipe: PipeType) -> str:
output = None
if pipe == "stdout":
output = self.process.stdout.read()
elif pipe == "stderr":
output = self.process.stderr.read()
if output:
decoded = output.decode()
self.on_output(pipe, decoded)
self.last_output = datetime.now()
return decoded
return ""
def last_output_passed(self, seconds: int) -> bool:
return (datetime.now() - self.last_output).seconds > seconds
def wait_until_stop_or_exit(self) -> Tuple[Optional[int], str]:
self.nonblock()
self.last_output = datetime.now()
output = ""
exitcode = None
while True:
new_stdout = self.get_output("stdout")
if new_stdout:
output += new_stdout
new_stderr = self.get_output("stderr")
if new_stderr:
output += new_stderr
if self.process.poll() is not None:
exitcode = self.process.poll()
break
if self.last_output_passed(self.timeout):
self.process.kill()
break
# PipeType = Union[Literal["stdout"], Literal["stderr"]]
time.sleep(self.interval)
return (exitcode, output)
# class StdoutTracer:
# def __init__(
# self,
# process: subprocess.Popen,
# timeout: int = 30,
# interval: int = 0.1,
# on_output: Callable[[PipeType, str], None] = lambda: None,
# ):
# self.process: subprocess.Popen = process
# self.timeout: int = timeout
# self.interval: int = interval
# self.last_output: datetime = None
# self.on_output: Callable[[PipeType, str], None] = on_output
# def nonblock(self):
# os.set_blocking(self.process.stdout.fileno(), False)
# os.set_blocking(self.process.stderr.fileno(), False)
# def get_output(self, pipe: PipeType) -> str:
# output = None
# if pipe == "stdout":
# output = self.process.stdout.read()
# elif pipe == "stderr":
# output = self.process.stderr.read()
# if output:
# decoded = output.decode()
# self.on_output(pipe, decoded)
# self.last_output = datetime.now()
# return decoded
# return ""
# def last_output_passed(self, seconds: int) -> bool:
# return (datetime.now() - self.last_output).seconds > seconds
# def wait_until_stop_or_exit(self) -> Tuple[Optional[int], str]:
# self.nonblock()
# self.last_output = datetime.now()
# output = ""
# exitcode = None
# while True:
# new_stdout = self.get_output("stdout")
# if new_stdout:
# output += new_stdout
# new_stderr = self.get_output("stderr")
# if new_stderr:
# output += new_stderr
# if self.process.poll() is not None:
# exitcode = self.process.poll()
# break
# if self.last_output_passed(self.timeout):
# self.process.kill()
# break
# time.sleep(self.interval)
# return (exitcode, output)
################## => stdout end
import os
import subprocess
import time
from datetime import datetime
from typing import Dict, List
# import os
# import subprocess
# import time
# from datetime import datetime
# from typing import Dict, List
from swarms.utils.main import ANSI, Color, Style # test
# from swarms.utils.main import ANSI, Color, Style # test
class Terminal(BaseToolSet):
def __init__(self):
self.sessions: Dict[str, List[SyscallTracer]] = {}
# class Terminal(BaseToolSet):
# def __init__(self):
# self.sessions: Dict[str, List[SyscallTracer]] = {}
@tool(
name="Terminal",
description="Executes commands in a terminal."
"If linux errno occurs, we have to solve the problem with the terminal. "
"Input must be one valid command. "
"Output will be any output from running that command.",
scope=ToolScope.SESSION,
)
def execute(self, commands: str, get_session: SessionGetter) -> str:
session, _ = get_session()
# @tool(
# name="Terminal",
# description="Executes commands in a terminal."
# "If linux errno occurs, we have to solve the problem with the terminal. "
# "Input must be one valid command. "
# "Output will be any output from running that command.",
# scope=ToolScope.SESSION,
# )
# def execute(self, commands: str, get_session: SessionGetter) -> str:
# session, _ = get_session()
try:
process = subprocess.Popen(
commands,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
logger.info(ANSI("Realtime Terminal Output").to(Color.magenta()) + ": ")
output = ""
tracer = StdoutTracer(
process,
on_output=lambda p, o: logger.info(
ANSI(p).to(Style.dim()) + " " + o.strip("\n")
),
)
exitcode, output = tracer.wait_until_stop_or_exit()
except Exception as e:
output = str(e)
# try:
# process = subprocess.Popen(
# commands,
# shell=True,
# stdout=subprocess.PIPE,
# stderr=subprocess.PIPE,
# )
# logger.info(ANSI("Realtime Terminal Output").to(Color.magenta()) + ": ")
# output = ""
# tracer = StdoutTracer(
# process,
# on_output=lambda p, o: logger.info(
# ANSI(p).to(Style.dim()) + " " + o.strip("\n")
# ),
# )
# exitcode, output = tracer.wait_until_stop_or_exit()
# except Exception as e:
# output = str(e)
logger.debug(
f"\nProcessed Terminal, Input Commands: {commands} "
f"Output Answer: {output}"
)
return output
# logger.debug(
# f"\nProcessed Terminal, Input Commands: {commands} "
# f"Output Answer: {output}"
# )
# return output
# if __name__ == "__main__":

Loading…
Cancel
Save