Создадим простой пример с двумя нодами:
- Первая нода будет публиковать сообщения типа
String
в топик"chatter"
- Вторая нода будет подписываться на этот топик и выводить полученные сообщения
Нода-издатель (Publisher)
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MinimalPublisher(Node):
def __init__(self):
super().__init__('minimal_publisher') # Имя ноды
# Создаем паблишер, который будет отправлять сообщения типа String в топик 'chatter'
self.publisher_ = self.create_publisher(String, 'chatter', 10) # 10 - размер очереди
timer_period = 1.0 # секунды
# Создаем таймер, который будет вызывать функцию timer_callback каждую секунду
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self):
# Создаем сообщение
msg = String()
msg.data = f'Hello World: {self.i}'
# Публикуем сообщение
self.publisher_.publish(msg)
# Выводим информацию о публикации
self.get_logger().info(f'Publishing: "{msg.data}"')
self.i += 1
def main(args=None):
# Инициализация ROS2
rclpy.init(args=args)
# Создаем экземпляр ноды
minimal_publisher = MinimalPublisher()
# Запускаем ноду
rclpy.spin(minimal_publisher)
# Уничтожаем ноду и завершаем работу
minimal_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Нода-подписчик (Subscriber)
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MinimalSubscriber(Node):
def __init__(self):
super().__init__('minimal_subscriber') # Имя ноды
# Создаем подписчика на топик 'chatter', который будет вызывать
# функцию listener_callback при получении сообщения
self.subscription = self.create_subscription(
String, # Тип сообщения
'chatter', # Имя топика
self.listener_callback, # Функция обратного вызова
10) # Размер очереди
self.subscription # Предотвращает предупреждение о неиспользуемой переменной
def listener_callback(self, msg):
# Обрабатываем полученное сообщение
self.get_logger().info(f'I heard: "{msg.data}"')
def main(args=None):
# Инициализация ROS2
rclpy.init(args=args)
# Создаем экземпляр ноды
minimal_subscriber = MinimalSubscriber()
# Запускаем ноду
rclpy.spin(minimal_subscriber)
# Уничтожаем ноду и завершаем работу
minimal_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Как это работает
- Инициализация ROS2:
rclpy.init(args=args)
инициализирует систему ROS2. - Создание нод:
MinimalPublisher
иMinimalSubscriber
— это классы, наследующие от базового классаNode
.- При создании ноды мы задаем ей имя:
super().__init__('minimal_publisher')
.
- Паблишер:
- Создается с помощью
self.create_publisher(String, 'chatter', 10)
, где:String
– тип сообщения'chatter'
– имя топика10
– размер очереди сообщений
- Создается с помощью
- Подписчик:
- Создается с помощью
self.create_subscription(String, 'chatter', self.listener_callback, 10)
, где:String
– тип сообщения'chatter'
— имя топикаself.listener_callback
— функция, которая будет вызвана при получении сообщения10
– размер очереди сообщений
- Создается с помощью
- Обмен сообщениями:
- Паблишер отправляет сообщения через
self.publisher_.publish(msg)
- Подписчик автоматически получает сообщения через функцию обратного вызова
listener_callback
- Паблишер отправляет сообщения через
- Запуск нод:
rclpy.spin(node)
запускает обработку сообщений и колбэков ноды- Нода работает до тех пор, пока не будет вызван
rclpy.shutdown()
Запуск примера
Чтобы запустить этот пример, нужно:
1. Создать пакет ROS2:
source /opt/ros/jazzy/setup.bash
ros2 pkg create --build-type ament_python my_package

2. Поместить код в файлы my_package/my_package/publisher_node.py
Где создадим Python файлы:
# Перейдем в директорию пакета
cd my_package
# Создадим файлы в подкаталоге my_package
touch my_package/publisher_node.py
touch my_package/subscriber_node.py

Добавим код в соответствующие файлы:
nano publisher_node.py
nano subscriber_node.py
3. Настроим точки входа в setup.py
:
Когда мы создаем пакет ROS2 с помощью команды ros2 pkg create
, в корневой директории пакета автоматически создается файл setup.py
. Нужно открыть этот файл в любом текстовом редакторе:
nano setup.py
Так выглядит setup.py
по умолчанию:
from setuptools import find_packages, setup
package_name = 'my_package'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='wtf',
maintainer_email='wtf@todo.todo',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
],
},
)
Нужно добавить или изменить раздел entry_points
в функции setup()
. Найдем этот раздел (если он уже есть) или добавим его перед закрывающей скобкой функции setup()
:
entry_points={
'console_scripts': [
'talker = my_package.publisher_node:main',
'listener = my_package.subscriber_node:main',
],
},
Полный файл setup.py
после редактирования должен выглядеть так:
from setuptools import find_packages, setup
package_name = 'my_package'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='wtf',
maintainer_email='wtf@todo.todo',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'talker = my_package.publisher_node:main',
'listener = my_package.subscriber_node:main',
],
},
)
Что означают эти точки входа
Каждая строка в разделе 'console_scripts'
имеет формат:
'имя_команды = путь_к_модулю:функция'
Где:
имя_команды
— это имя, которое будет использоваться для запуска программы через командную строку (например,ros2 run my_package talker
)путь_к_модулю
— это путь к Python-модулю относительно корня пакетафункция
— это имя функции в модуле, которая будет вызвана при запуске команды
В нашем примере:
talker = my_package.publisher_node:main
означает, что командаros2 run my_package talker
будет вызывать функциюmain()
из файлаmy_package/publisher_node.py
listener = my_package.subscriber_node:main
означает, что командаros2 run my_package listener
будет вызывать функциюmain()
из файлаmy_package/subscriber_node.py
Сохраним файл и соберем пакет
После редактирования сохраним файл и соберем пакет:
# Соберем пакет
colcon build --packages-select my_package
# Загрузим настройки окружения
source install/setup.bash

Запуск
Запустим в первом терминале
# В первом терминале:
source install/setup.bash
ros2 run my_package talker
Запустим во втором терминале:
# Во втором терминале:
source install/setup.bash
export ROS_AUTOMATIC_DISCOVERY_RANGE=LOCALHOST
ros2 run my_package listener
Команда export ROS_AUTOMATIC_DISCOVERY_RANGE=LOCALHOST
используется для ограничения области обнаружения узлов ROS2 только локальным компьютером.

Удаление пакета
Удалим папку с пакетом:
rm -rf my_package
Перестроим рабочее пространство без этого пакета:
colcon build

Основные причины перестройки рабочего пространства
- Обновление индексов пакетов
- ROS2 поддерживает индексы доступных пакетов
- После удаления пакета нужно обновить эти индексы, чтобы система «знала», что пакет больше не доступен
- Обновление зависимостей
- Если другие пакеты зависели от удаленного пакета, они должны быть перестроены
colcon build
проверит зависимости и перестроит только то, что необходимо
- Очистка ссылок
- В директориях
install
иbuild
остаются ссылки на удаленный пакет - Перестройка обновит эти директории, удалив неактуальные ссылки
- В директориях
- Обновление overlay
- ROS2 использует систему overlay для пакетов
- Перестройка обновляет overlay, чтобы удаленный пакет не был доступен при использовании команды
source install/setup.bash
Перестройку можно пропустить, если:
- Удаляем весь рабочий каталог целиком
- Не планируется использовать оставшиеся пакеты из этого рабочего пространства
- Собираемся создать новое рабочее пространство с нуля
***