From eff45e88e688af95940e69383a93c7d052f2b55f Mon Sep 17 00:00:00 2001
From: Kye <kye@apacmediasolutions.com>
Date: Fri, 21 Jul 2023 18:54:42 -0400
Subject: [PATCH] clean up

---
 swarms/agents/tools/main.py | 420 ++++++++++++++++++------------------
 1 file changed, 210 insertions(+), 210 deletions(-)

diff --git a/swarms/agents/tools/main.py b/swarms/agents/tools/main.py
index 59a88e1c..5fcb4f16 100644
--- a/swarms/agents/tools/main.py
+++ b/swarms/agents/tools/main.py
@@ -240,236 +240,236 @@ class ToolsFactory:
 
 
 
-##########################################+>  SYS
-import signal
-from typing import Optional, Tuple
-
-from ptrace.debugger import (
-    NewProcessEvent,
-    ProcessExecution,
-    ProcessExit,
-    ProcessSignal,
-    PtraceDebugger,
-    PtraceProcess,
-)
-from ptrace.func_call import FunctionCallOptions
-from ptrace.syscall import PtraceSyscall
-from ptrace.tools import signal_to_exitcode
-
-
-class SyscallTimeoutException(Exception):
-    def __init__(self, pid: int, *args) -> None:
-        super().__init__(f"deadline exceeded while waiting syscall for {pid}", *args)
-
-
-class SyscallTracer:
-    def __init__(self, pid: int):
-        self.debugger: PtraceDebugger = PtraceDebugger()
-        self.pid: int = pid
-        self.process: PtraceProcess = None
-
-    def is_waiting(self, syscall: PtraceSyscall) -> bool:
-        if syscall.name.startswith("wait"):
-            return True
-        return False
-
-    def attach(self):
-        self.process = self.debugger.addProcess(self.pid, False)
-
-    def detach(self):
-        self.process.detach()
-        self.debugger.quit()
-
-    def set_timer(self, timeout: int):
-        def handler(signum, frame):
-            raise SyscallTimeoutException(self.process.pid)
-
-        signal.signal(signal.SIGALRM, handler)
-        signal.alarm(timeout)
-
-    def reset_timer(self):
-        signal.alarm(0)
-
-    def wait_syscall_with_timeout(self, timeout: int):
-        self.set_timer(timeout)
-        self.process.waitSyscall()
-        self.reset_timer()
-
-    def wait_until_stop_or_exit(self) -> Tuple[Optional[int], str]:
-        self.process.syscall()
-        exitcode = None
-        reason = ""
-        while True:
-            if not self.debugger:
-                break
-
-            try:
-                self.wait_syscall_with_timeout(30)
-            except ProcessExit as event:
-                if event.exitcode is not None:
-                    exitcode = event.exitcode
-                continue
-            except ProcessSignal as event:
-                event.process.syscall(event.signum)
-                exitcode = signal_to_exitcode(event.signum)
-                reason = event.reason
-                continue
-            except NewProcessEvent as event:
-                continue
-            except ProcessExecution as event:
-                continue
-            except Exception as e:
-                reason = str(e)
-                break
-
-            syscall = self.process.syscall_state.event(
-                FunctionCallOptions(
-                    write_types=False,
-                    write_argname=False,
-                    string_max_length=300,
-                    replace_socketcall=True,
-                    write_address=False,
-                    max_array_count=20,
-                )
-            )
-
-            self.process.syscall()
-
-            if syscall is None:
-                continue
+# ##########################################+>  SYS
+# import signal
+# from typing import Optional, Tuple
+
+# from ptrace.debugger import (
+#     NewProcessEvent,
+#     ProcessExecution,
+#     ProcessExit,
+#     ProcessSignal,
+#     PtraceDebugger,
+#     PtraceProcess,
+# )
+# from ptrace.func_call import FunctionCallOptions
+# from ptrace.syscall import PtraceSyscall
+# from ptrace.tools import signal_to_exitcode
+
+
+# class SyscallTimeoutException(Exception):
+#     def __init__(self, pid: int, *args) -> None:
+#         super().__init__(f"deadline exceeded while waiting syscall for {pid}", *args)
+
+
+# class SyscallTracer:
+#     def __init__(self, pid: int):
+#         self.debugger: PtraceDebugger = PtraceDebugger()
+#         self.pid: int = pid
+#         self.process: PtraceProcess = None
+
+#     def is_waiting(self, syscall: PtraceSyscall) -> bool:
+#         if syscall.name.startswith("wait"):
+#             return True
+#         return False
+
+#     def attach(self):
+#         self.process = self.debugger.addProcess(self.pid, False)
+
+#     def detach(self):
+#         self.process.detach()
+#         self.debugger.quit()
+
+#     def set_timer(self, timeout: int):
+#         def handler(signum, frame):
+#             raise SyscallTimeoutException(self.process.pid)
+
+#         signal.signal(signal.SIGALRM, handler)
+#         signal.alarm(timeout)
+
+#     def reset_timer(self):
+#         signal.alarm(0)
+
+#     def wait_syscall_with_timeout(self, timeout: int):
+#         self.set_timer(timeout)
+#         self.process.waitSyscall()
+#         self.reset_timer()
+
+#     def wait_until_stop_or_exit(self) -> Tuple[Optional[int], str]:
+#         self.process.syscall()
+#         exitcode = None
+#         reason = ""
+#         while True:
+#             if not self.debugger:
+#                 break
+
+#             try:
+#                 self.wait_syscall_with_timeout(30)
+#             except ProcessExit as event:
+#                 if event.exitcode is not None:
+#                     exitcode = event.exitcode
+#                 continue
+#             except ProcessSignal as event:
+#                 event.process.syscall(event.signum)
+#                 exitcode = signal_to_exitcode(event.signum)
+#                 reason = event.reason
+#                 continue
+#             except NewProcessEvent as event:
+#                 continue
+#             except ProcessExecution as event:
+#                 continue
+#             except Exception as e:
+#                 reason = str(e)
+#                 break
+
+#             syscall = self.process.syscall_state.event(
+#                 FunctionCallOptions(
+#                     write_types=False,
+#                     write_argname=False,
+#                     string_max_length=300,
+#                     replace_socketcall=True,
+#                     write_address=False,
+#                     max_array_count=20,
+#                 )
+#             )
 
-            if syscall.result:
-                continue
+#             self.process.syscall()
 
-        self.reset_timer()
+#             if syscall is None:
+#                 continue
 
-        return exitcode, reason
-    ##########################################+> SYS CALL END
+#             if syscall.result:
+#                 continue
 
+#         self.reset_timer()
 
+#         return exitcode, reason
+#     ##########################################+> SYS CALL END
 
-############### => st dout.py
 
-import os
-import time
-import subprocess
-from datetime import datetime
-from typing import Callable, Literal, Optional, Union, Tuple
 
-PipeType = Union[Literal["stdout"], Literal["stderr"]]
+# ############### => st dout.py
 
+# import os
+# import time
+# import subprocess
+# from datetime import datetime
+# from typing import Callable, Literal, Optional, Union, Tuple
 
-class StdoutTracer:
-    def __init__(
-        self,
-        process: subprocess.Popen,
-        timeout: int = 30,
-        interval: int = 0.1,
-        on_output: Callable[[PipeType, str], None] = lambda: None,
-    ):
-        self.process: subprocess.Popen = process
-        self.timeout: int = timeout
-        self.interval: int = interval
-        self.last_output: datetime = None
-        self.on_output: Callable[[PipeType, str], None] = on_output
-
-    def nonblock(self):
-        os.set_blocking(self.process.stdout.fileno(), False)
-        os.set_blocking(self.process.stderr.fileno(), False)
-
-    def get_output(self, pipe: PipeType) -> str:
-        output = None
-        if pipe == "stdout":
-            output = self.process.stdout.read()
-        elif pipe == "stderr":
-            output = self.process.stderr.read()
-
-        if output:
-            decoded = output.decode()
-            self.on_output(pipe, decoded)
-            self.last_output = datetime.now()
-            return decoded
-        return ""
-
-    def last_output_passed(self, seconds: int) -> bool:
-        return (datetime.now() - self.last_output).seconds > seconds
-
-    def wait_until_stop_or_exit(self) -> Tuple[Optional[int], str]:
-        self.nonblock()
-        self.last_output = datetime.now()
-        output = ""
-        exitcode = None
-        while True:
-            new_stdout = self.get_output("stdout")
-            if new_stdout:
-                output += new_stdout
-
-            new_stderr = self.get_output("stderr")
-            if new_stderr:
-                output += new_stderr
-
-            if self.process.poll() is not None:
-                exitcode = self.process.poll()
-                break
-
-            if self.last_output_passed(self.timeout):
-                self.process.kill()
-                break
+# PipeType = Union[Literal["stdout"], Literal["stderr"]]
 
-            time.sleep(self.interval)
 
-        return (exitcode, output)
+# class StdoutTracer:
+#     def __init__(
+#         self,
+#         process: subprocess.Popen,
+#         timeout: int = 30,
+#         interval: int = 0.1,
+#         on_output: Callable[[PipeType, str], None] = lambda: None,
+#     ):
+#         self.process: subprocess.Popen = process
+#         self.timeout: int = timeout
+#         self.interval: int = interval
+#         self.last_output: datetime = None
+#         self.on_output: Callable[[PipeType, str], None] = on_output
+
+#     def nonblock(self):
+#         os.set_blocking(self.process.stdout.fileno(), False)
+#         os.set_blocking(self.process.stderr.fileno(), False)
+
+#     def get_output(self, pipe: PipeType) -> str:
+#         output = None
+#         if pipe == "stdout":
+#             output = self.process.stdout.read()
+#         elif pipe == "stderr":
+#             output = self.process.stderr.read()
+
+#         if output:
+#             decoded = output.decode()
+#             self.on_output(pipe, decoded)
+#             self.last_output = datetime.now()
+#             return decoded
+#         return ""
+
+#     def last_output_passed(self, seconds: int) -> bool:
+#         return (datetime.now() - self.last_output).seconds > seconds
+
+#     def wait_until_stop_or_exit(self) -> Tuple[Optional[int], str]:
+#         self.nonblock()
+#         self.last_output = datetime.now()
+#         output = ""
+#         exitcode = None
+#         while True:
+#             new_stdout = self.get_output("stdout")
+#             if new_stdout:
+#                 output += new_stdout
+
+#             new_stderr = self.get_output("stderr")
+#             if new_stderr:
+#                 output += new_stderr
+
+#             if self.process.poll() is not None:
+#                 exitcode = self.process.poll()
+#                 break
+
+#             if self.last_output_passed(self.timeout):
+#                 self.process.kill()
+#                 break
+
+#             time.sleep(self.interval)
+
+#         return (exitcode, output)
 
 ################## => stdout end
 
-import os
-import subprocess
-import time
-from datetime import datetime
-from typing import Dict, List
+# import os
+# import subprocess
+# import time
+# from datetime import datetime
+# from typing import Dict, List
 
-from swarms.utils.main import ANSI, Color, Style # test
+# from swarms.utils.main import ANSI, Color, Style # test
 
-class Terminal(BaseToolSet):
-    def __init__(self):
-        self.sessions: Dict[str, List[SyscallTracer]] = {}
+# class Terminal(BaseToolSet):
+#     def __init__(self):
+#         self.sessions: Dict[str, List[SyscallTracer]] = {}
 
-    @tool(
-        name="Terminal",
-        description="Executes commands in a terminal."
-        "If linux errno occurs, we have to solve the problem with the terminal. "
-        "Input must be one valid command. "
-        "Output will be any output from running that command.",
-        scope=ToolScope.SESSION,
-    )
-    def execute(self, commands: str, get_session: SessionGetter) -> str:
-        session, _ = get_session()
+#     @tool(
+#         name="Terminal",
+#         description="Executes commands in a terminal."
+#         "If linux errno occurs, we have to solve the problem with the terminal. "
+#         "Input must be one valid command. "
+#         "Output will be any output from running that command.",
+#         scope=ToolScope.SESSION,
+#     )
+#     def execute(self, commands: str, get_session: SessionGetter) -> str:
+#         session, _ = get_session()
 
-        try:
-            process = subprocess.Popen(
-                commands,
-                shell=True,
-                stdout=subprocess.PIPE,
-                stderr=subprocess.PIPE,
-            )
-            logger.info(ANSI("Realtime Terminal Output").to(Color.magenta()) + ": ")
-
-            output = ""
-            tracer = StdoutTracer(
-                process,
-                on_output=lambda p, o: logger.info(
-                    ANSI(p).to(Style.dim()) + " " + o.strip("\n")
-                ),
-            )
-            exitcode, output = tracer.wait_until_stop_or_exit()
-        except Exception as e:
-            output = str(e)
+#         try:
+#             process = subprocess.Popen(
+#                 commands,
+#                 shell=True,
+#                 stdout=subprocess.PIPE,
+#                 stderr=subprocess.PIPE,
+#             )
+#             logger.info(ANSI("Realtime Terminal Output").to(Color.magenta()) + ": ")
+
+#             output = ""
+#             tracer = StdoutTracer(
+#                 process,
+#                 on_output=lambda p, o: logger.info(
+#                     ANSI(p).to(Style.dim()) + " " + o.strip("\n")
+#                 ),
+#             )
+#             exitcode, output = tracer.wait_until_stop_or_exit()
+#         except Exception as e:
+#             output = str(e)
 
-        logger.debug(
-            f"\nProcessed Terminal, Input Commands: {commands} "
-            f"Output Answer: {output}"
-        )
-        return output
+#         logger.debug(
+#             f"\nProcessed Terminal, Input Commands: {commands} "
+#             f"Output Answer: {output}"
+#         )
+#         return output
 
 
 # if __name__ == "__main__":