feat: Alpha version of a service that computes the follow vector toward the nearest person in the RealSense frame

main
Artem-Darius Weber 2 months ago
parent 732c4fb7f7
commit 6cfbece237

@ -3,6 +3,7 @@
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/agentic" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/agentic/real-sense-object-detection-and-track-service" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/infrastructure" isTestSource="false" />
</content>
<orderEntry type="inheritedJdk" />

@ -1,9 +1,6 @@
# producer.Dockerfile
FROM python:3.9
WORKDIR /app
COPY test_show_responses_from_llm.py .
COPY requirements.txt .
COPY AgenticInterfaces ./AgenticInterfaces
COPY . .
RUN pip install -r requirements.txt
ENV PYTHONPATH=/app
CMD ["python", "test_show_responses_from_llm.py"]

@ -1,8 +1,5 @@
FROM python:3.9
WORKDIR /app
COPY llm_worker.py .
COPY requirements.txt .
COPY AgenticInterfaces ./AgenticInterfaces
COPY . .
RUN pip install -r requirements.txt
ENV PYTHONPATH=/app
CMD ["python", "llm_worker.py"]

@ -1,9 +1,6 @@
# producer.Dockerfile
FROM python:3.9
WORKDIR /app
COPY test_send_llm_requests.py .
COPY requirements.txt .
COPY AgenticInterfaces ./AgenticInterfaces
COPY . .
RUN pip install -r requirements.txt
ENV PYTHONPATH=/app
CMD ["python", "test_send_llm_requests.py"]

@ -0,0 +1,86 @@
import numpy as np
import cv2
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
from Interfaces.RealSense import RealSenseController
from Interfaces.CameraProcessor import CameraProcessor
from ultralytics import YOLO
from datetime import datetime
class TargetFollower:
def __init__(self, detections, annotated_image, camera_position=np.array([0, 0, 0])):
self.detections = detections
self.camera_position = camera_position
self.target = None
        # Save the annotated image to disk
output_path = './annotated_image.jpg'
cv2.imwrite(output_path, annotated_image)
print(f"Annotated image saved to {output_path}")
print("Detections info:")
for info in detections:
print(info)
def select_target(self, class_name, nearest=False):
"""
Выбирает объект по указанному имени класса.
Если nearest=True, выбирается ближайший объект по значению mean_depth.
:param class_name: Имя класса, который нужно выбрать.
:param nearest: Если True, выбирает ближайший объект с данным классом.
:return: Найденный объект или None, если объект не найден.
"""
if nearest:
closest_detection = None
min_depth = float('inf')
for detection in self.detections:
if detection['class_name'] == class_name:
if detection['mean_depth'] < min_depth:
min_depth = detection['mean_depth']
closest_detection = detection
self.target = closest_detection
if self.target is not None:
print(f"Closest target selected: {self.target}")
else:
print(f"Target with class name '{class_name}' not found.")
return self.target
else:
for detection in self.detections:
if detection['class_name'] == class_name:
self.target = detection
print(f"Target selected: {self.target}")
return self.target
print(f"Target with class name '{class_name}' not found.")
return None
def calculate_follow_vector(self):
if self.target is None:
print("Target not selected.")
return None
bbox = self.target['bbox']
target_center_x = (bbox[0] + bbox[2]) / 2
target_center_y = (bbox[1] + bbox[3]) / 2
target_depth = self.target['mean_depth']
target_position = np.array([target_center_x, target_center_y, target_depth])
follow_vector = target_position - self.camera_position
return follow_vector
def get_target_position(self):
if self.target is None:
print("Target not selected.")
return None
bbox = self.target['bbox']
target_center_x = (bbox[0] + bbox[2]) / 2
target_center_y = (bbox[1] + bbox[3]) / 2
target_depth = self.target['mean_depth']
target_position = np.array([target_center_x, target_center_y, target_depth])
return target_position
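Below is a minimal usage sketch of `TargetFollower` with hand-made detections in the dict format produced by `inline_detection()`; the zeroed frame and all coordinates are placeholders, and the import path follows `main.py`.

```python
import numpy as np
from Algorithms.TargetFollower import TargetFollower

# Placeholder detections in the format produced by inline_detection()
detections = [
    {'class_id': 0, 'class_name': 'person', 'bbox': [120, 80, 260, 400], 'mean_depth': 1.8},
    {'class_id': 0, 'class_name': 'person', 'bbox': [400, 90, 520, 380], 'mean_depth': 3.2},
]
annotated = np.zeros((480, 640, 3), dtype=np.uint8)  # stand-in for the annotated frame

follower = TargetFollower(detections, annotated, camera_position=np.array([320, 480, 0]))
follower.select_target('person', nearest=True)  # picks the detection with mean_depth == 1.8
print(follower.calculate_follow_vector())       # vector from the camera position to the bbox centre
```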

@ -0,0 +1,78 @@
import numpy as np
import cv2
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
from Interfaces.RealSense import RealSenseController
from Interfaces.CameraProcessor import CameraProcessor
from Algorithms.TargetFollower import TargetFollower
from ultralytics import YOLO
from datetime import datetime
class Visualization3D:
def __init__(self, camera_position):
self.camera_position = camera_position
def plot_scene(self, detections_info, follow_vector=None):
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
for detection in detections_info:
x1, y1, x2, y2 = detection['bbox']
mean_depth = detection['mean_depth']
class_name = detection['class_name']
width = x2 - x1
height = y2 - y1
depth = 0.05
box_points = [
[x1, y1, mean_depth],
[x2, y1, mean_depth],
[x2, y2, mean_depth],
[x1, y2, mean_depth],
[x1, y1, mean_depth + depth],
[x2, y1, mean_depth + depth],
[x2, y2, mean_depth + depth],
[x1, y2, mean_depth + depth]
]
faces = [
[box_points[0], box_points[1], box_points[5], box_points[4]],
[box_points[3], box_points[2], box_points[6], box_points[7]],
[box_points[0], box_points[3], box_points[7], box_points[4]],
[box_points[1], box_points[2], box_points[6], box_points[5]],
[box_points[0], box_points[1], box_points[2], box_points[3]],
[box_points[4], box_points[5], box_points[6], box_points[7]]
]
box = Poly3DCollection(faces, facecolors='cyan', linewidths=1, edgecolors='r', alpha=0.25)
ax.add_collection3d(box)
ax.text((x1 + x2) / 2, (y1 + y2) / 2, mean_depth + depth, f'{class_name} {mean_depth:.2f}m',
color='blue', fontsize=8)
if follow_vector is not None:
target_position = self.camera_position + follow_vector
ax.quiver(
self.camera_position[0], self.camera_position[1], self.camera_position[2],
follow_vector[0], follow_vector[1], follow_vector[2],
color='red', arrow_length_ratio=0.1
)
ax.text(target_position[0], target_position[1], target_position[2],
"Target", color='red', fontsize=10)
ax.set_xlabel("X")
ax.set_ylabel("Y")
ax.set_zlabel("Depth (m)")
ax.set_title("3D Scene with Follow Vector")
all_x = [d['bbox'][0] for d in detections_info] + [d['bbox'][2] for d in detections_info]
all_y = [d['bbox'][1] for d in detections_info] + [d['bbox'][3] for d in detections_info]
all_z = [d['mean_depth'] for d in detections_info] + [d['mean_depth'] + 0.05 for d in detections_info]
ax.set_xlim(min(all_x) - 50, max(all_x) + 50)
ax.set_ylim(min(all_y) - 50, max(all_y) + 50)
ax.set_zlim(min(all_z) - 0.1, max(all_z) + 0.1)
plt.show()
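A short sketch of driving `Visualization3D` on its own; the module path is an assumption (mirroring `Algorithms.TargetFollower`), and the detection and follow vector are hand-made placeholders.

```python
import numpy as np
from Algorithms.Visualization3D import Visualization3D  # assumed location, next to TargetFollower

detections = [
    {'class_name': 'person', 'bbox': [120, 80, 260, 400], 'mean_depth': 1.8},
]
viz = Visualization3D(camera_position=np.array([320, 480, 0]))
# Follow vector precomputed by hand: bbox centre (190, 240) at 1.8 m minus the camera position
viz.plot_scene(detections, follow_vector=np.array([-130, -240, 1.8]))
```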

@ -0,0 +1,13 @@
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 5000
CMD ["python", "main.py"]

@ -0,0 +1,105 @@
from Interfaces.RealSense import RealSenseController
import cv2
from ultralytics import YOLO
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d.art3d import Poly3DCollection
from datetime import datetime
class CameraProcessor:
def __init__(self, model_path='./models/yolov9e_object_classification.pt'):
self.controller = RealSenseController()
self.model = YOLO(model_path)
def inline_detection(self):
depth_matrix = self.controller.acquisition.get_depth_image()
color_matrix = self.controller.acquisition.get_color_image()
if depth_matrix is None or color_matrix is None:
print("Не удалось получить изображения. Проверьте подключение камеры.")
return [], None
results = self.model(color_matrix)
annotated_image = results[0].plot()
detections_info = []
for detection in results[0].boxes:
x1, y1, x2, y2 = map(int, detection.xyxy[0])
class_id = int(detection.cls[0])
class_name = self.model.names[class_id]
depth_values = depth_matrix[y1:y2, x1:x2]
mean_depth = np.mean(depth_values)
detections_info.append({
'class_id': class_id,
'class_name': class_name,
'bbox': [x1, y1, x2, y2],
'mean_depth': float(mean_depth)
})
cv2.putText(annotated_image, f'{class_name} {mean_depth:.2f}m', (x1, y1 - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
cv2.rectangle(annotated_image, (x1, y1), (x2, y2), (0, 255, 0), 2)
return detections_info, annotated_image
def plot_3d_bounding_boxes(self, detections_info):
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
for detection in detections_info:
x1, y1, x2, y2 = detection['bbox']
mean_depth = detection['mean_depth']
class_name = detection['class_name']
width = x2 - x1
height = y2 - y1
depth = 0.05
box_points = [
[x1, y1, mean_depth],
[x2, y1, mean_depth],
[x2, y2, mean_depth],
[x1, y2, mean_depth],
[x1, y1, mean_depth + depth],
[x2, y1, mean_depth + depth],
[x2, y2, mean_depth + depth],
[x1, y2, mean_depth + depth]
]
faces = [
[box_points[0], box_points[1], box_points[5], box_points[4]],
[box_points[3], box_points[2], box_points[6], box_points[7]],
[box_points[0], box_points[3], box_points[7], box_points[4]],
[box_points[1], box_points[2], box_points[6], box_points[5]],
[box_points[0], box_points[1], box_points[2], box_points[3]],
[box_points[4], box_points[5], box_points[6], box_points[7]]
]
box = Poly3DCollection(faces, facecolors='cyan', linewidths=1, edgecolors='r', alpha=0.25)
ax.add_collection3d(box)
ax.text((x1 + x2) / 2, (y1 + y2) / 2, mean_depth + depth, f'{class_name} {mean_depth:.2f}m',
color='blue', fontsize=8)
ax.set_xlabel("X")
ax.set_ylabel("Y")
ax.set_zlabel("Depth (m)")
ax.set_title("3D Bounding Boxes with Depth Information")
all_x = [d['bbox'][0] for d in detections_info] + [d['bbox'][2] for d in detections_info]
all_y = [d['bbox'][1] for d in detections_info] + [d['bbox'][3] for d in detections_info]
all_z = [d['mean_depth'] for d in detections_info] + [d['mean_depth'] + 0.05 for d in detections_info]
ax.set_xlim(min(all_x) - 50, max(all_x) + 50)
ax.set_ylim(min(all_y) - 50, max(all_y) + 50)
ax.set_zlim(min(all_z) - 50, max(all_z) + 50)
plt.show()
def test_time_delta(self):
start_time = datetime.now()
detections_info, _ = self.inline_detection()
end_time = datetime.now()
delta_time = end_time - start_time
print(f"Время выполнения: {delta_time}")

@ -0,0 +1,45 @@
import base64
import cv2
from ultralytics import YOLO
import numpy as np
from datetime import datetime
class DataProcessor:
def __init__(self, model_path='./models/yolov9e_object_classification.pt'):
self.model = YOLO(model_path)
def __decode_image_from_base64(self, data, color=True):
decoded = base64.b64decode(data)
nparr = np.frombuffer(decoded, np.uint8)
return cv2.imdecode(nparr, cv2.IMREAD_COLOR if color else cv2.IMREAD_GRAYSCALE)
def inline_detection(self, data):
color_matrix = self.__decode_image_from_base64(data['rgb'], color=True)
depth_matrix = self.__decode_image_from_base64(data['depth'], color=False)
if depth_matrix is None or color_matrix is None:
print("Не удалось получить изображения. Проверьте подключение камеры.")
return [], None
results = self.model(color_matrix)
annotated_image = results[0].plot()
detections_info = []
for detection in results[0].boxes:
x1, y1, x2, y2 = map(int, detection.xyxy[0])
class_id = int(detection.cls[0])
class_name = self.model.names[class_id]
depth_values = depth_matrix[y1:y2, x1:x2]
mean_depth = np.mean(depth_values)
detections_info.append({
'class_id': class_id,
'class_name': class_name,
'bbox': [x1, y1, x2, y2],
'mean_depth': float(mean_depth)
})
cv2.putText(annotated_image, f'{class_name} {mean_depth:.2f}m', (x1, y1 - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
cv2.rectangle(annotated_image, (x1, y1), (x2, y2), (0, 255, 0), 2)
return detections_info, annotated_image

@ -0,0 +1,88 @@
import pyrealsense2 as rs
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import cv2
# RS_WIDTH = 1280
# RS_HEIGHT = 720
RS_WIDTH = 640
RS_HEIGHT = 480
class RealSenseInterface:
def noCameraFound(self):
print("RealSense camera not found. Check the connection.")
class ImageAcquisition(RealSenseInterface):
def __init__(self):
try:
self.__pipeline = rs.pipeline()
config = rs.config()
config.enable_stream(rs.stream.depth, RS_WIDTH, RS_HEIGHT, rs.format.z16, 30)
config.enable_stream(rs.stream.color, RS_WIDTH, RS_HEIGHT, rs.format.bgr8, 30)
self.__pipeline.start(config)
        except Exception:
            self.noCameraFound()
            raise SystemExit(1)
def get_depth_image(self):
frames = self.__pipeline.wait_for_frames()
depth_frame = frames.get_depth_frame()
return np.asanyarray(depth_frame.get_data()) if depth_frame else None
def get_color_image(self):
frames = self.__pipeline.wait_for_frames()
color_frame = frames.get_color_frame()
return np.asanyarray(color_frame.get_data()) if color_frame else None
def stop(self):
self.__pipeline.stop()
class ImageDisplay(RealSenseInterface):
def display_images(self, depth_image, color_image):
fig, (ax1, ax2) = plt.subplots(1, 2)
depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)
ax1.imshow(depth_colormap, cmap='jet')
ax2.imshow(color_image)
plt.show()
class ImageSaver(RealSenseInterface):
def save_image(self, image, filename):
cv2.imwrite(filename, image)
class RealSenseController:
def __init__(self):
self.acquisition = ImageAcquisition()
self.display = ImageDisplay()
self.saver = ImageSaver()
def start(self):
print("RealSense 3D camera detected.")
fig, (ax1, ax2) = plt.subplots(1, 2)
depth_im = ax1.imshow(np.zeros((RS_HEIGHT, RS_WIDTH, 3)), cmap='jet')
color_im = ax2.imshow(np.zeros((RS_HEIGHT, RS_WIDTH, 3)), cmap='jet')
def update(frame):
depth_image = self.acquisition.get_depth_image()
color_image = self.acquisition.get_color_image()
if depth_image is not None and color_image is not None:
depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)
depth_im.set_array(depth_colormap)
color_im.set_array(color_image)
self.saver.save_image(color_image, "color_image.png")
ani = FuncAnimation(fig, update, blit=False, cache_frame_data=False)
fig.canvas.manager.window.wm_geometry("+0+0")
plt.show()
self.acquisition.stop()
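A minimal sketch of grabbing one depth/color frame pair through the controller; it assumes the module is importable as `Interfaces.RealSense` (as in `TargetFollower.py`) and that a camera is connected.

```python
from Interfaces.RealSense import RealSenseController

controller = RealSenseController()                 # opens depth + color streams at 640x480
depth = controller.acquisition.get_depth_image()   # uint16 depth frame, or None
color = controller.acquisition.get_color_image()   # BGR color frame, or None
if depth is not None and color is not None:
    controller.display.display_images(depth, color)
    controller.saver.save_image(color, "color_frame.png")
controller.acquisition.stop()
```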

@ -0,0 +1,12 @@
# Object detection and tracking service
This service also registers detected objects in Kafka for the associative-memory subsystem.
### Input data:
```json
{
"rgb": "<base64_encoded_rgb_image>",
"depth": "<base64_encoded_depth_image>"
}
```
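A hedged client sketch for this endpoint (the port follows `docker-compose.yml`; the file names are placeholders and `requests` is assumed to be installed on the client):

```python
import base64
import requests  # assumed available on the client side; not part of the service requirements

def encode_image(path: str) -> str:
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode("ascii")

payload = {
    "rgb": encode_image("color_frame.png"),     # placeholder file names
    "depth": encode_image("depth_frame.png"),
}
response = requests.post("http://localhost:7780/follow-vector", json=payload, timeout=30)
print(response.json())  # detections_info + follow_vector
```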

@ -0,0 +1,12 @@
version: '3.8'
services:
robot-rs-object-target-detection:
build: .
container_name: robot_realsense_object_target_detection
ports:
- "7780:5000"
environment:
- FLASK_ENV=development
volumes:
- .:/app

@ -0,0 +1,47 @@
from flask import Flask, request, jsonify
from flask_cors import CORS
from Interfaces.CameraProcessor import CameraProcessor
from Algorithms.TargetFollower import TargetFollower
from Interfaces.DataProcessor import DataProcessor
import numpy as np
import cv2
import base64
app = Flask(__name__)
CORS(app)
camera_processor = CameraProcessor()
data_processor = DataProcessor()
@app.route('/follow-vector', methods=['POST'])
def follow_vector():
try:
data = request.json
detections_info, annotated_image = data_processor.inline_detection(data)
if not detections_info and annotated_image is None:
return jsonify({'error': 'No detections found.'}), 400
        # Camera reference position: bottom-centre of the image plane at zero depth
image_height, image_width, _ = annotated_image.shape
camera_position = np.array([image_width / 2, image_height, 0])
# todo: add detections_info to the kafka topic
        # Initialize the TargetFollower and compute the follow vector
follower = TargetFollower(detections_info, annotated_image, camera_position=camera_position)
        selected_target = follower.select_target('person', nearest=True)
        if selected_target is None:
            return jsonify({'error': "No 'person' detected in the frame."}), 404
        follow_vector = follower.calculate_follow_vector()
        return jsonify({
            'detections_info': detections_info,
            'follow_vector': follow_vector.tolist()
        })
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
    # Bind to 0.0.0.0 so the containerised service is reachable via the published port
    app.run(host='0.0.0.0', port=5000, debug=True)

@ -0,0 +1,6 @@
flask
flask-cors
opencv-python
ultralytics
matplotlib
numpy