You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
129 lines
3.8 KiB
129 lines
3.8 KiB
from dotenv import load_dotenv
|
|
|
|
load_dotenv() # take environment variables from .env.
|
|
|
|
import asyncio
|
|
import subprocess
|
|
import platform
|
|
import os
|
|
import shutil
|
|
|
|
from .logs import setup_logging
|
|
from .logs import logger
|
|
|
|
setup_logging()
|
|
|
|
# dmesg process created at boot time
|
|
dmesg_proc = None
|
|
|
|
|
|
def get_kernel_messages():
|
|
"""
|
|
Is this the way to do this?
|
|
"""
|
|
current_platform = platform.system()
|
|
|
|
if current_platform == "Darwin":
|
|
process = subprocess.Popen(
|
|
["syslog"], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
|
|
)
|
|
output, _ = process.communicate()
|
|
return output.decode("utf-8")
|
|
elif current_platform == "Linux":
|
|
log_path = get_dmesg_log_path()
|
|
with open(log_path, 'r') as file:
|
|
return file.read()
|
|
else:
|
|
logger.info("Unsupported platform.")
|
|
|
|
|
|
def get_dmesg_log_path():
|
|
"""
|
|
Check for the existence of a readable dmesg log file and return its path.
|
|
Create an accessible path if not found.
|
|
"""
|
|
if os.access('/var/log/dmesg', os.F_OK | os.R_OK):
|
|
return '/var/log/dmesg'
|
|
|
|
global dmesg_proc
|
|
dmesg_log_path = '/tmp/dmesg'
|
|
if dmesg_proc:
|
|
return dmesg_log_path
|
|
|
|
logger.info("Created /tmp/dmesg.")
|
|
subprocess.run(['touch', dmesg_log_path])
|
|
dmesg_path = shutil.which('dmesg')
|
|
if dmesg_path:
|
|
logger.info(f"Writing to {dmesg_log_path} from dmesg.")
|
|
dmesg_proc = subprocess.Popen([dmesg_path, '--follow'], text=True, stdout=subprocess.PIPE)
|
|
subprocess.Popen(['tee', dmesg_log_path], text=True, stdin=dmesg_proc.stdout, stdout=subprocess.DEVNULL)
|
|
|
|
return dmesg_log_path
|
|
|
|
|
|
def custom_filter(message):
|
|
# Check for {TO_INTERPRETER{ message here }TO_INTERPRETER} pattern
|
|
if "{TO_INTERPRETER{" in message and "}TO_INTERPRETER}" in message:
|
|
start = message.find("{TO_INTERPRETER{") + len("{TO_INTERPRETER{")
|
|
end = message.find("}TO_INTERPRETER}", start)
|
|
return message[start:end]
|
|
# Check for USB mention
|
|
# elif 'USB' in message:
|
|
# return message
|
|
# # Check for network related keywords
|
|
# elif any(keyword in message for keyword in ['network', 'IP', 'internet', 'LAN', 'WAN', 'router', 'switch']) and "networkStatusForFlags" not in message:
|
|
|
|
# return message
|
|
else:
|
|
return None
|
|
|
|
|
|
last_messages = ""
|
|
|
|
|
|
def check_filtered_kernel():
|
|
messages = get_kernel_messages()
|
|
if messages is None:
|
|
return "" # Handle unsupported platform or error in fetching kernel messages
|
|
|
|
global last_messages
|
|
messages.replace(last_messages, "")
|
|
messages = messages.split("\n")
|
|
|
|
filtered_messages = []
|
|
for message in messages:
|
|
if custom_filter(message):
|
|
filtered_messages.append(message)
|
|
|
|
return "\n".join(filtered_messages)
|
|
|
|
|
|
async def put_kernel_messages_into_queue(queue):
|
|
while True:
|
|
text = check_filtered_kernel()
|
|
if text:
|
|
if isinstance(queue, asyncio.Queue):
|
|
await queue.put({"role": "computer", "type": "console", "start": True})
|
|
await queue.put(
|
|
{
|
|
"role": "computer",
|
|
"type": "console",
|
|
"format": "output",
|
|
"content": text,
|
|
}
|
|
)
|
|
await queue.put({"role": "computer", "type": "console", "end": True})
|
|
else:
|
|
queue.put({"role": "computer", "type": "console", "start": True})
|
|
queue.put(
|
|
{
|
|
"role": "computer",
|
|
"type": "console",
|
|
"format": "output",
|
|
"content": text,
|
|
}
|
|
)
|
|
queue.put({"role": "computer", "type": "console", "end": True})
|
|
|
|
await asyncio.sleep(5)
|