Artem-Darius Weber 1 month ago
commit ea013ce71a

@ -0,0 +1,14 @@
Build:
```bash
docker-compose -f ros2/docker/docker-compose.yml build
```
Up:
```bash
docker-compose -f ros2/docker/docker-compose.yml up
```

@ -0,0 +1,23 @@
# Используем официальный образ ROS 2 Humble
FROM osrf/ros:humble-desktop
# Установка необходимых инструментов
RUN apt-get update && apt-get install -y \
python3-colcon-common-extensions \
python3-pip \
&& apt-get clean
# Создание рабочей директории
WORKDIR /ros2_ws
# Копируем рабочее пространство в контейнер
COPY ros2_ws /ros2_ws
# Устанавливаем зависимости Python
RUN pip3 install -r /ros2_ws/src/my_python_pkg/requirements.txt
# Сборка рабочего пространства
RUN /bin/bash -c "source /opt/ros/humble/setup.bash && colcon build"
# Настройка контейнера для запуска ROS 2
CMD ["/bin/bash", "-c", "source /opt/ros/humble/setup.bash && source install/setup.bash && bash"]

@ -0,0 +1,12 @@
<?xml version="1.0"?>
<package format="3">
<name>my_python_pkg</name>
<version>0.0.1</version>
<description>Пример Python-пакета для ROS 2</description>
<maintainer email="your.email@example.com">Ваше Имя</maintainer>
<license>Apache License 2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
</package>

@ -0,0 +1,23 @@
# /ros2_ws/src/my_python_pkg/setup.py
from setuptools import setup
package_name = 'my_python_pkg'
setup(
name=package_name,
version='0.0.0',
packages=[package_name],
install_requires=['setuptools'],
zip_safe=True,
maintainer='Ваше Имя',
maintainer_email='your.email@example.com',
description='Пример Python-пакета для ROS 2',
license='Apache License 2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'publisher = my_python_pkg.publisher:main',
'subscriber = my_python_pkg.subscriber:main',
],
},
)

@ -0,0 +1,118 @@
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException
class RobotGripper:
def __init__(self, ip, port=502):
"""
Инициализация класса управления захватом робота.
:param ip: IP-адрес устройства
:param port: Порт Modbus TCP (по умолчанию 502)
"""
self.ip = ip
self.port = port
self.client = ModbusTcpClient(ip, port)
self.connected = False
def connect(self):
"""Подключение к захвату"""
try:
self.client.connect()
self.connected = True
except ModbusException as e:
print(f"Ошибка подключения: {e}")
def disconnect(self):
"""Отключение от захвата"""
if self.connected:
self.client.close()
self.connected = False
def initialize(self, full=False):
"""
Инициализация захвата.
:param full: Если True, выполнить полную инициализацию
:return: Ответ устройства
"""
if not self.connected:
raise ModbusException("Нет подключения к захвату.")
value = 0xA5 if full else 0x01
address = 0x0100
response = self.client.write_register(address, value, slave=1)
return response
def set_force(self, force):
"""
Установка силы захвата.
:param force: Значение силы (20-100%)
:return: Ответ устройства
"""
if not 20 <= force <= 100:
raise ValueError("Сила захвата должна быть в диапазоне 20-100%.")
if not self.connected:
raise ModbusException("Нет подключения к захвату.")
address = 0x0101
response = self.client.write_register(address, force, slave=1)
return response
def set_position(self, position):
"""
Установка позиции захвата.
:param position: Значение позиции (0-1000)
:return: Ответ устройства
"""
if not 0 <= position <= 1000:
raise ValueError("Позиция должна быть в диапазоне 0-1000.")
if not self.connected:
raise ModbusException("Нет подключения к захвату.")
address = 0x0103
response = self.client.write_register(address, position, slave=1)
return response
def read_register(self, address):
"""
Чтение значения из регистра.
:param address: Адрес регистра
:return: Значение регистра
"""
if not self.connected:
raise ModbusException("Нет подключения к захвату.")
response = self.client.read_holding_registers(address, count=1, slave=1)
if response.isError():
raise ModbusException(f"Ошибка чтения регистра {address}.")
return response.registers[0]
def get_status(self):
"""
Получение статуса захвата.
:return: Словарь статусов (инициализация, состояние захвата, позиция)
"""
if not self.connected:
raise ModbusException("Нет подключения к захвату.")
status = {
"initialization_state": self.read_register(0x0200),
"gripper_state": self.read_register(0x0201),
"current_position": self.read_register(0x0202)
}
return status
# Пример использования
if __name__ == "__main__":
gripper = RobotGripper("10.27.1.80")
try:
gripper.connect()
gripper.initialize(full=True)
gripper.set_force(50)
gripper.set_position(800)
status = gripper.get_status()
print("Статус захвата:", status)
except Exception as e:
print(f"Ошибка: {e}")
finally:
gripper.disconnect()

@ -0,0 +1,40 @@
import socket
import math
import time
IP = "192.168.1.201"
PORT = 6000
def get_integer_and_fractional_parts(number):
integer_fractional = str(abs(number)).split('.')
return list(map(int, integer_fractional))
def pad_number(number):
integer_part, fractional_part = get_integer_and_fractional_parts(number)
pad_length = 6
integer_part = str(integer_part)[:pad_length].rjust(pad_length, '0')
fractional_part = str(fractional_part)[:pad_length].ljust(pad_length, '0')
prefix = "+"
if number < 0:
prefix = "-"
return (f"{prefix}{integer_part}.{fractional_part}", 2 + pad_length * 2)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect((IP, PORT))
# data = sock.recv(1024)
# print(data)
while True:
t = time.time()
frequency = 0.1
target = 1 * math.sin(frequency * 2.0 * math.pi * t)
for i in range(6):
padded, _ = pad_number(target)
print(padded)
message = padded.encode('utf-8')
sock.sendall(message)
time.sleep(0.05)

@ -0,0 +1,26 @@
# Используем базовый образ ROS 2 Humble
FROM ros:humble
# Устанавливаем необходимые зависимости
RUN apt-get update && apt-get install -y \
python3-colcon-common-extensions \
python3-pip \
&& rm -rf /var/lib/apt/lists/*
# Устанавливаем Python зависимости
RUN pip3 install pymodbus
# Создаем рабочую директорию
WORKDIR /ros2_ws
# Копируем исходный код
COPY ./src /ros2_ws/src
# Сборка ROS 2 workspace
RUN . /opt/ros/humble/setup.sh && colcon build
# Указываем рабочее окружение
RUN echo "source /ros2_ws/install/setup.bash" >> ~/.bashrc
# Точка входа
CMD ["bash"]

@ -0,0 +1,21 @@
version: "3.8"
services:
manipulator:
build:
context: ../../
dockerfile: ros2/docker/Dockerfile
container_name: manipulator_node
network_mode: host
environment:
- ROS_DOMAIN_ID=0
command: bash -c "source /opt/ros/humble/setup.bash && source /ros2_ws/install/setup.bash && ros2 run robotic_complex manipulator_node"
gripper:
build:
context: ../../
dockerfile: ros2/docker/Dockerfile
container_name: gripper_node
network_mode: host
environment:
- ROS_DOMAIN_ID=0
command: bash -c "source /opt/ros/humble/setup.bash && source /ros2_ws/install/setup.bash && ros2 run robotic_complex gripper_node"

@ -0,0 +1,20 @@
<?xml version="1.0"?>
<package format="2">
<name>robotic_complex</name>
<version>0.0.1</version>
<description>ROS 2 пакет для управления манипулятором и захватом</description>
<maintainer email="your.email@example.com">Ваше Имя</maintainer>
<license>Apache License 2.0</license>
<buildtool_depend>ament_python</buildtool_depend>
<build_type>ament_python</build_type>
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
<exec_depend>geometry_msgs</exec_depend>
<exec_depend>pymodbus</exec_depend>
<export>
</export>
</package>

@ -0,0 +1,94 @@
# robotic_complex/gripper_node.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Int32
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException
class GripperNode(Node):
def __init__(self):
super().__init__('gripper_node')
# Параметры подключения
self.declare_parameter('ip', '10.27.1.80')
self.declare_parameter('port', 502)
self.ip = self.get_parameter('ip').value
self.port = self.get_parameter('port').value
# Подключение к захвату
self.client = ModbusTcpClient(self.ip, self.port)
if not self.client.connect():
self.get_logger().error("Не удалось подключиться к захвату.")
self.destroy_node()
return
else:
self.get_logger().info(f"Подключен к захвату по адресу {self.ip}:{self.port}")
# Инициализация захвата
self.initialize_gripper()
# Подписка на топики
self.subscription_position = self.create_subscription(
Int32,
'gripper/position',
self.position_callback,
10)
self.subscription_force = self.create_subscription(
Int32,
'gripper/force',
self.force_callback,
10)
def initialize_gripper(self):
try:
value = 0xA5 # Полная инициализация
address = 0x0100
self.client.write_register(address, value, unit=1)
self.get_logger().info("Захват инициализирован.")
except ModbusException as e:
self.get_logger().error(f"Ошибка инициализации захвата: {e}")
def position_callback(self, msg):
position = msg.data
if not 0 <= position <= 1000:
self.get_logger().error('Позиция должна быть в диапазоне 0-1000.')
return
try:
address = 0x0103
self.client.write_register(address, position, unit=1)
self.get_logger().info(f"Позиция захвата установлена на {position}.")
except ModbusException as e:
self.get_logger().error(f"Ошибка установки позиции: {e}")
def force_callback(self, msg):
force = msg.data
if not 20 <= force <= 100:
self.get_logger().error('Сила должна быть в диапазоне 20-100.')
return
try:
address = 0x0101
self.client.write_register(address, force, unit=1)
self.get_logger().info(f"Сила захвата установлена на {force}%.")
except ModbusException as e:
self.get_logger().error(f"Ошибка установки силы: {e}")
def destroy_node(self):
self.client.close()
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = GripperNode()
if node:
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

@ -0,0 +1,82 @@
# robotic_complex/manipulator_node.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float64MultiArray
import socket
import time
class ManipulatorNode(Node):
def __init__(self):
super().__init__('manipulator_node')
# Параметры подключения
self.declare_parameter('ip', '192.168.1.201')
self.declare_parameter('port', 6000)
self.ip = self.get_parameter('ip').value
self.port = self.get_parameter('port').value
# Подключение к манипулятору
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.sock.connect((self.ip, self.port))
self.get_logger().info(f"Подключен к манипулятору по адресу {self.ip}:{self.port}")
except Exception as e:
self.get_logger().error(f"Ошибка подключения к манипулятору: {e}")
self.destroy_node()
return
# Подписка на топик с углами
self.subscription = self.create_subscription(
Float64MultiArray,
'manipulator/angles',
self.listener_callback,
10)
self.subscription # предотвращаем предупреждение об отсутствии использования переменной
def listener_callback(self, msg):
if len(msg.data) != 6:
self.get_logger().error('Ожидается массив из 6 углов.')
return
for angle in msg.data:
padded, _ = self.pad_number(angle)
message = padded.encode('utf-8')
try:
self.sock.sendall(message)
time.sleep(0.05)
except Exception as e:
self.get_logger().error(f"Ошибка отправки данных: {e}")
def pad_number(self, number):
integer_part, fractional_part = self.get_integer_and_fractional_parts(number)
pad_length = 6
integer_part = str(integer_part)[:pad_length].rjust(pad_length, '0')
fractional_part = str(fractional_part)[:pad_length].ljust(pad_length, '0')
prefix = "+" if number >= 0 else "-"
return (f"{prefix}{integer_part}.{fractional_part}", 2 + pad_length * 2)
def get_integer_and_fractional_parts(self, number):
integer_fractional = str(abs(number)).split('.')
if len(integer_fractional) == 1:
integer_fractional.append('0')
return list(map(int, integer_fractional))
def destroy_node(self):
self.sock.close()
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = ManipulatorNode()
if node:
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

@ -0,0 +1,22 @@
from setuptools import setup
package_name = 'robotic_complex'
setup(
name=package_name,
version='0.0.1',
packages=[package_name],
install_requires=['setuptools'],
zip_safe=True,
maintainer='Ваше Имя',
maintainer_email='your.email@example.com',
description='ROS 2 пакет для управления манипулятором и захватом',
license='Apache License 2.0',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'manipulator_node = robotic_complex.manipulator_node:main',
'gripper_node = robotic_complex.gripper_node:main',
],
},
)
Loading…
Cancel
Save