|
|
|
@ -9,87 +9,21 @@ import requests
|
|
|
|
|
import platform
|
|
|
|
|
import os
|
|
|
|
|
|
|
|
|
|
class Device:
|
|
|
|
|
def __init__(self, device_type, device_info):
|
|
|
|
|
self.device_type = device_type
|
|
|
|
|
self.device_info = device_info
|
|
|
|
|
|
|
|
|
|
def get_device_info(self):
|
|
|
|
|
info = f"Device Type: {self.device_type}\n"
|
|
|
|
|
for key, value in self.device_info.items():
|
|
|
|
|
info += f"{key}: {value}\n"
|
|
|
|
|
return info
|
|
|
|
|
|
|
|
|
|
def __eq__(self, other):
|
|
|
|
|
if isinstance(other, Device):
|
|
|
|
|
return self.device_type == other.device_type and self.device_info == other.device_info
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_connected_devices():
|
|
|
|
|
"""
|
|
|
|
|
Get all connected devices on macOS using system_profiler
|
|
|
|
|
"""
|
|
|
|
|
devices = []
|
|
|
|
|
usb_output = subprocess.check_output(['system_profiler', 'SPUSBDataType'])
|
|
|
|
|
network_output = subprocess.check_output(['system_profiler', 'SPNetworkDataType'])
|
|
|
|
|
|
|
|
|
|
usb_lines = usb_output.decode('utf-8').split('\n')
|
|
|
|
|
network_lines = network_output.decode('utf-8').split('\n')
|
|
|
|
|
|
|
|
|
|
device_info = {}
|
|
|
|
|
for line in usb_lines:
|
|
|
|
|
if 'Product ID:' in line or 'Serial Number:' in line or 'Manufacturer:' in line:
|
|
|
|
|
key, value = line.strip().split(':')
|
|
|
|
|
device_info[key.strip()] = value.strip()
|
|
|
|
|
if 'Manufacturer:' in line:
|
|
|
|
|
devices.append(Device('USB', device_info))
|
|
|
|
|
device_info = {}
|
|
|
|
|
|
|
|
|
|
for line in network_lines:
|
|
|
|
|
if 'Type:' in line or 'Hardware:' in line or 'BSD Device Name:' in line:
|
|
|
|
|
key, value = line.strip().split(':')
|
|
|
|
|
device_info[key.strip()] = value.strip()
|
|
|
|
|
if 'BSD Device Name:' in line:
|
|
|
|
|
devices.append(Device('Network', device_info))
|
|
|
|
|
device_info = {}
|
|
|
|
|
|
|
|
|
|
return devices
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_kernel_watch_darwin():
|
|
|
|
|
prev_connected_devices = None
|
|
|
|
|
while True:
|
|
|
|
|
messages_to_send = []
|
|
|
|
|
connected_devices = get_connected_devices()
|
|
|
|
|
if prev_connected_devices is not None:
|
|
|
|
|
for device in connected_devices:
|
|
|
|
|
if device not in prev_connected_devices:
|
|
|
|
|
messages_to_send.append(f'New device connected: {device.get_device_info()}')
|
|
|
|
|
for device in prev_connected_devices:
|
|
|
|
|
if device not in connected_devices:
|
|
|
|
|
messages_to_send.append(f'Device disconnected: {device.get_device_info()}')
|
|
|
|
|
|
|
|
|
|
if messages_to_send:
|
|
|
|
|
requests.post('http://localhost:8000/computer', json = {'messages': messages_to_send})
|
|
|
|
|
prev_connected_devices = connected_devices
|
|
|
|
|
|
|
|
|
|
time.sleep(2)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_dmesg(after):
|
|
|
|
|
def get_kernel_messages():
|
|
|
|
|
"""
|
|
|
|
|
Is this the way to do this?
|
|
|
|
|
"""
|
|
|
|
|
messages = []
|
|
|
|
|
with open('/var/log/dmesg', 'r') as file:
|
|
|
|
|
lines = file.readlines()
|
|
|
|
|
for line in lines:
|
|
|
|
|
timestamp = float(line.split(' ')[0].strip('[]'))
|
|
|
|
|
if timestamp > after:
|
|
|
|
|
messages.append(line)
|
|
|
|
|
return messages
|
|
|
|
|
|
|
|
|
|
current_platform = platform.system()
|
|
|
|
|
|
|
|
|
|
if current_platform == "Darwin":
|
|
|
|
|
process = subprocess.Popen(['syslog'], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
|
|
|
|
|
output, _ = process.communicate()
|
|
|
|
|
return output.decode('utf-8')
|
|
|
|
|
elif current_platform == "Linux":
|
|
|
|
|
with open('/var/log/dmesg', 'r') as file:
|
|
|
|
|
return file.read()
|
|
|
|
|
else:
|
|
|
|
|
print("Unsupported platform.")
|
|
|
|
|
|
|
|
|
|
def custom_filter(message):
|
|
|
|
|
# Check for {TO_INTERPRETER{ message here }TO_INTERPRETER} pattern
|
|
|
|
@ -107,12 +41,12 @@ def custom_filter(message):
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_kernel_watch_linux():
|
|
|
|
|
last_timestamp = time.time()
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
last_messages = ""
|
|
|
|
|
while True:
|
|
|
|
|
messages = get_dmesg(after=last_timestamp)
|
|
|
|
|
last_timestamp = time.time()
|
|
|
|
|
messages = get_kernel_messages()
|
|
|
|
|
messages.replace(last_messages, "")
|
|
|
|
|
messages = messages.split("\n")
|
|
|
|
|
|
|
|
|
|
messages_for_core = []
|
|
|
|
|
for message in messages:
|
|
|
|
@ -121,14 +55,7 @@ def run_kernel_watch_linux():
|
|
|
|
|
if messages_for_core:
|
|
|
|
|
port = os.getenv('ASSISTANT_PORT', 8000)
|
|
|
|
|
requests.post(f'http://localhost:{port}/computer', json = {'messages': messages_for_core})
|
|
|
|
|
time.sleep(2)
|
|
|
|
|
|
|
|
|
|
time.sleep(5)
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
current_platform = platform.system()
|
|
|
|
|
if current_platform == "Darwin":
|
|
|
|
|
run_kernel_watch_darwin()
|
|
|
|
|
elif current_platform == "Linux":
|
|
|
|
|
run_kernel_watch_linux()
|
|
|
|
|
else:
|
|
|
|
|
print("Unsupported platform. Exiting.")
|
|
|
|
|
main()
|
|
|
|
|