// License: Apache 2.0. See LICENSE file in root directory.
// Copyright(c) 2024 Intel Corporation. All Rights Reserved.
#include <realdds/dds-defines.h>
#include <realdds/dds-participant.h>
#include <realdds/topics/device-info-msg.h>
#include <realdds/topics/flexible-msg.h>
#include <realdds/topics/flexible/flexiblePubSubTypes.h>
#include <realdds/topics/image-msg.h>
#include <realdds/topics/imu-msg.h>
#include <realdds/topics/blob-msg.h>
#include <realdds/topics/blob/blobPubSubTypes.h>
#include <realdds/topics/ros2/ros2imagePubSubTypes.h>
#include <realdds/topics/ros2/ros2imuPubSubTypes.h>
#include <realdds/topics/dds-topic-names.h>
#include <realdds/dds-device-broadcaster.h>
#include <realdds/dds-device-server.h>
#include <realdds/dds-stream-server.h>
#include <realdds/dds-stream-profile.h>
#include <realdds/dds-device-watcher.h>
#include <realdds/dds-device.h>
#include <realdds/dds-stream.h>
#include <realdds/dds-guid.h>
#include <realdds/dds-time.h>
#include <realdds/dds-topic.h>
#include <realdds/dds-topic-reader.h>
#include <realdds/dds-topic-writer.h>
#include <realdds/dds-publisher.h>
#include <realdds/dds-subscriber.h>
#include <realdds/dds-log-consumer.h>
#include <realdds/dds-stream-sensor-bridge.h>
#include <realdds/dds-metadata-syncer.h>
#include <rsutils/os/special-folder.h>
#include <rsutils/os/executable-name.h>
#include <rsutils/easylogging/easyloggingpp.h>
#include <rsutils/number/crc32.h>
#include <rsutils/string/from.h>
#include <rsutils/string/nocase.h>
#include <rsutils/json.h>
#include <rsutils/json-config.h>
#include <fastdds/dds/domain/qos/DomainParticipantQos.hpp>
#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/subscriber/SampleInfo.hpp>
#include <fastrtps/types/DynamicType.h>
#include <rsutils/py/pybind11.h>
using rsutils::json;
#define NAME pyrealdds
#define SNAME "pyrealdds"
namespace {
py::list get_vector3( geometry_msgs::msg::Vector3 const & v )
py::list obj( 3 );
obj[0] = py::float_( v.x() );
obj[1] = py::float_( v.y() );
obj[2] = py::float_( v.z() );
return std::move( obj );
void set_vector3( geometry_msgs::msg::Vector3 & v, std::array< double, 3 > const & l )
v.x( l[0] );
v.y( l[1] );
v.z( l[2] );
std::string script_name()
// Returns the name of the python script that's currently running us
return rsutils::os::base_name(
py::module_::import( "__main__" ).attr( "__file__" ).cast< std::string >() );
json load_rs_settings( json const & local_settings )
// Load the realsense configuration file settings
std::string const filename = rsutils::os::get_special_folder( rsutils::os::special_folder::app_data ) + "realsense-config.json";
auto config = rsutils::json_config::load_from_file( filename );
// Load "python"-specific settings
json settings = rsutils::json_config::load_app_settings( config, "python", "context", "config-file" );
// Take the "dds" settings only
settings = settings.nested( "dds" );
// Patch any script-specific settings
// NOTE: this is also accessed by pyrealsense2, where a "context" hierarchy is still used
auto script = script_name();
if( auto script_settings = config.nested( script, "context", "dds" ) )
settings.override( script_settings, "config-file/" + script + "/context" );
// We should always have DDS enabled
if( settings.is_object() )
settings.erase( "enabled" );
// Patch the given local settings into the configuration
settings.override( local_settings, "local settings" );
return settings;
} // namespace
m.doc() = R"pbdoc(
RealSense DDS Server Python Bindings
m.attr( "__version__" ) = "0.1"; // RS2_API_VERSION_STR;
// Configure the same logger as librealsense, and default to only errors by default...
// And set the DDS logger similarly
eprosima::fastdds::dds::Log::RegisterConsumer( realdds::log_consumer::create() );
eprosima::fastdds::dds::Log::SetVerbosity( eprosima::fastdds::dds::Log::Error );
m.def( "debug",
py::arg( "enable" ),
py::arg( "nested-string" ) = "",
py::arg( "logger" ) = LIBREALSENSE_ELPP_ID );
using realdds::dds_guid;
py::class_< dds_guid >( m, "guid" )
.def( py::init<>() )
.def_static( "from_string",
[]( std::string const & raw_guid ) { return realdds::guid_from_string( raw_guid ); } )
.def( "__bool__", []( dds_guid const & self ) { return self != realdds::unknown_guid; } )
.def( "__str__",
[]( dds_guid const & self ) { return rsutils::string::from( realdds::print_guid( (self) ) ).str(); } )
.def( "__repr__",
[]( dds_guid const & self ) { return rsutils::string::from( realdds::print_raw_guid( ( self ) ) ).str(); } )
// Following two (hash and ==) are needed if we want to be able to use guids as dictionary keys
.def( "__hash__",
[]( dds_guid const & self )
return std::hash< std::string >{}(
rsutils::string::from( realdds::print_raw_guid( self ) ) );
} )
.def( py::self == py::self );
using realdds::dds_participant;
using eprosima::fastrtps::types::ReturnCode_t;
py::class_< dds_participant::listener,
std::shared_ptr< dds_participant::listener > // handled with a shared_ptr
listener( m, "participant.listener" );
listener // no ctor: use participant.create_listener()
.def( FN_FWD( dds_participant::listener,
( dds_guid guid, char const * topic_name ),
( dds_guid guid, char const * topic_name ),
callback( guid, topic_name ); ) )
.def( FN_FWD( dds_participant::listener,
( dds_guid guid, char const * topic_name ),
( dds_guid guid, char const * topic_name ),
callback( guid, topic_name ); ) )
.def( FN_FWD( dds_participant::listener,
( dds_guid guid, char const * topic_name ),
( dds_guid guid, char const * topic_name ),
callback( guid, topic_name ); ) )
.def( FN_FWD( dds_participant::listener,
( dds_guid guid, char const * topic_name ),
( dds_guid guid, char const * topic_name ),
callback( guid, topic_name ); ) )
.def( FN_FWD( dds_participant::listener,
( dds_guid guid, char const * name ),
( dds_guid guid, char const * name ),
callback( guid, name ); ) )
.def( FN_FWD( dds_participant::listener,
( dds_guid guid, char const * name ),
( dds_guid guid, char const * name ),
callback( guid, name ); ) )
.def( FN_FWD( dds_participant::listener,
( std::string const & topic_name, std::string const & type_name ),
( char const * topic_name, eprosima::fastrtps::types::DynamicType_ptr dyn_type ),
callback( topic_name, dyn_type->get_name() ); ) );
m.def( "load_rs_settings", &load_rs_settings, "local-settings"_a = json::object() );
m.def( "script_name", &script_name );
py::class_< dds_participant,
std::shared_ptr< dds_participant > // handled with a shared_ptr
participant( m, "participant" );
participant.def( py::init<>() )
.def( "init",
[]( dds_participant & self, json const & local_settings, realdds::dds_domain_id domain_id )
{ self.init( domain_id, script_name(), local_settings ); },
"local-settings"_a = json::object(), "domain-id"_a = -1 )
.def( "init", &dds_participant::init, "domain-id"_a, "participant-name"_a, "local-settings"_a = json::object() )
.def( "is_valid", &dds_participant::is_valid )
.def( "guid", &dds_participant::guid )
.def( "create_guid", &dds_participant::create_guid )
.def( "__bool__", &dds_participant::is_valid )
.def( "name",
[]( dds_participant const & self ) {
eprosima::fastdds::dds::DomainParticipantQos qos;
if( ReturnCode_t::RETCODE_OK == self.get()->get_qos( qos ) )
return std::string( );
return std::string();
} )
.def( "name_from_guid", []( dds_guid const & guid ) { return dds_participant::name_from_guid( guid ); } )
.def( "names", []( dds_participant const & self ) { return self.get()->get_participant_names(); } )
.def( "settings", &dds_participant::settings )
.def( "__repr__",
[]( const dds_participant & self ) {
std::ostringstream os;
os << "<" SNAME ".participant";
if( ! self.is_valid() )
os << " NULL";
eprosima::fastdds::dds::DomainParticipantQos qos;
if( ReturnCode_t::RETCODE_OK == self.get()->get_qos( qos ) )
os << " \"" << << "\"";
os << " " << realdds::print_guid( self.guid() );
os << ">";
return os.str();
} )
.def( "create_listener", []( dds_participant & self ) { return self.create_listener(); } );
// The 'message' submodule will contain all the known message types in realdds.
auto message = m.def_submodule( "message", "all the messages that " SNAME " can work with" );
// The 'topics' submodule will contain pointers to the topic names:
auto topics = m.def_submodule( "topics", "all the topics that " SNAME " knows" );
topics.attr( "separator" ) = realdds::topics::SEPARATOR;
topics.attr( "root" ) = realdds::topics::ROOT;
topics.attr( "device_info" ) = realdds::topics::DEVICE_INFO_TOPIC_NAME;
topics.attr( "device_notification" ) = realdds::topics::NOTIFICATION_TOPIC_NAME;
topics.attr( "device_control" ) = realdds::topics::CONTROL_TOPIC_NAME;
topics.attr( "device_metadata" ) = realdds::topics::METADATA_TOPIC_NAME;
topics.attr( "device_dfu" ) = realdds::topics::DFU_TOPIC_NAME;
using realdds::video_intrinsics;
py::class_< video_intrinsics, std::shared_ptr< video_intrinsics > >( m, "video_intrinsics" )
.def( py::init<>() )
.def_readwrite( "width", &video_intrinsics::width )
.def_readwrite( "height", &video_intrinsics::height )
.def_readwrite( "principal_point_x", &video_intrinsics::principal_point_x )
.def_readwrite( "principal_point_y", &video_intrinsics::principal_point_y )
.def_readwrite( "focal_lenght_x", &video_intrinsics::focal_lenght_x )
.def_readwrite( "focal_lenght_y", &video_intrinsics::focal_lenght_y )
.def_readwrite( "distortion_model", &video_intrinsics::distortion_model )
.def_readwrite( "distortion_coeffs", &video_intrinsics::distortion_coeffs );
using realdds::motion_intrinsics;
py::class_< motion_intrinsics, std::shared_ptr< motion_intrinsics > >( m, "motion_intrinsics" )
.def( py::init<>() )
.def_readwrite( "data", &motion_intrinsics::data )
.def_readwrite( "noise_variances", &motion_intrinsics::noise_variances )
.def_readwrite( "bias_variances", &motion_intrinsics::bias_variances );
using realdds::extrinsics;
py::class_< extrinsics, std::shared_ptr< extrinsics > >( m, "extrinsics" )
.def( py::init<>() )
.def_readwrite( "rotation", &extrinsics::rotation )
.def_readwrite( "translation", &extrinsics::translation );
using realdds::dds_topic;
py::class_< dds_topic, std::shared_ptr< dds_topic > >( m, "topic" )
.def( "get_participant", &dds_topic::get_participant )
.def( "name", []( dds_topic const & self ) { return self->get_name(); } )
.def( "__repr__", []( dds_topic const & self ) {
std::ostringstream os;
os << "<" SNAME ".topic \"" << self->get_name() << "\"";
os << ">";
return os.str();
} );
using durability = eprosima::fastdds::dds::DurabilityQosPolicyKind;
py::enum_< durability >( m, "durability" )
.value( "volatile", eprosima::fastdds::dds::DurabilityQosPolicyKind::VOLATILE_DURABILITY_QOS )
.value( "transient_local", eprosima::fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_LOCAL_DURABILITY_QOS )
.value( "transient", eprosima::fastdds::dds::DurabilityQosPolicyKind::TRANSIENT_DURABILITY_QOS );
using reliability = eprosima::fastdds::dds::ReliabilityQosPolicyKind;
py::enum_< reliability >( m, "reliability" )
.value( "reliable", eprosima::fastdds::dds::ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS )
.value( "best_effort", eprosima::fastdds::dds::ReliabilityQosPolicyKind::BEST_EFFORT_RELIABILITY_QOS );
using reader_qos = realdds::dds_topic_reader::qos;
py::class_< reader_qos >( m, "reader_qos" ) //
.def( "__repr__", []( reader_qos const & self ) {
std::ostringstream os;
os << "<" SNAME ".reader_qos";
os << ">";
return os.str();
} );
using realdds::dds_topic_reader;
py::class_< dds_topic_reader, std::shared_ptr< dds_topic_reader > >( m, "topic_reader" )
.def( py::init< std::shared_ptr< dds_topic > const & >() )
.def( FN_FWD( dds_topic_reader, on_data_available, (dds_topic_reader &), (), callback( self ); ) )
.def( FN_FWD( dds_topic_reader,
(dds_topic_reader &, int),
( eprosima::fastdds::dds::SubscriptionMatchedStatus const & status ),
callback( self, status.current_count_change ); ) )
.def( FN_FWD( dds_topic_reader,
(dds_topic_reader &, int, int),
(eprosima::fastdds::dds::SampleLostStatus const & status),
callback( self, status.total_count, status.total_count_change ); ) )
.def( "topic", &dds_topic_reader::topic )
.def( "run", &dds_topic_reader::run )
.def( "qos", []() { return reader_qos(); } )
.def( "qos", []( reliability r, durability d ) { return reader_qos( r, d ); } );
using writer_qos = realdds::dds_topic_writer::qos;
py::class_< writer_qos >( m, "writer_qos" ) //
.def( "__repr__", []( writer_qos const & self ) {
std::ostringstream os;
os << "<" SNAME ".writer_qos";
os << ">";
return os.str();
} );
using realdds::dds_publisher;
py::class_< dds_publisher, std::shared_ptr< dds_publisher > >( m, "publisher" )
.def( py::init< std::shared_ptr< dds_participant > const & >() );
using realdds::dds_subscriber;
py::class_< dds_subscriber, std::shared_ptr< dds_subscriber > >( m, "subscriber" )
.def( py::init< std::shared_ptr< dds_participant > const & >() );
using realdds::dds_topic_writer;
py::class_< dds_topic_writer, std::shared_ptr< dds_topic_writer > >( m, "topic_writer" )
.def( py::init< std::shared_ptr< dds_topic > const & >() )
.def( "guid", &dds_topic_writer::guid )
.def( FN_FWD( dds_topic_writer,
(dds_topic_writer &, int),
( eprosima::fastdds::dds::PublicationMatchedStatus const & status ),
callback( self, status.current_count_change ); ) )
.def( "topic", &dds_topic_writer::topic )
.def( "run", &dds_topic_writer::run )
.def( "has_readers", &dds_topic_writer::has_readers )
.def( "wait_for_readers", &dds_topic_writer::wait_for_readers )
.def( "wait_for_acks", &dds_topic_writer::wait_for_acks )
.def_static( "qos", []() { return writer_qos(); } )
.def_static( "qos", []( reliability r, durability d ) { return writer_qos( r, d ); } );
// The actual types are declared as functions and not classes: the py::init<> inheritance rules are pretty strict
// and, coupled with shared_ptr usage, are very hard to get around. This is much simpler...
using realdds::topics::device_info;
using realdds::topics::flexible_msg;
typedef std::shared_ptr< dds_topic > flexible_msg_create_topic( std::shared_ptr< dds_participant > const &,
char const * );
py::class_< device_info >( message, "device_info" )
.def( py::init<>() )
.def_static( "create_topic", static_cast< flexible_msg_create_topic * >( &flexible_msg::create_topic ) )
.def_property( "name", &device_info::name, &device_info::set_name )
.def_property( "topic_root", &device_info::topic_root, &device_info::set_topic_root )
.def_property( "serial", &device_info::serial_number, &device_info::set_serial_number )
.def_static( "from_json", &device_info::from_json )
.def( "to_json", &device_info::to_json )
.def( "__repr__",
[]( device_info const & self ) {
std::ostringstream os;
os << "<" SNAME ".device_info";
os << " " << self.to_json();
os << ">";
return os.str();
} );
using realdds::topics::flexible_msg;
py::enum_< flexible_msg::data_format >( message, "flexible_format" )
.value( "json", flexible_msg::data_format::JSON )
.value( "cbor", flexible_msg::data_format::CBOR )
.value( "custom", flexible_msg::data_format::CUSTOM );
using eprosima::fastrtps::rtps::SampleIdentity;
py::class_< SampleIdentity >( message, "sample_identity" ) //
.def( "__repr__",
[]( SampleIdentity const & self )
std::ostringstream os;
os << realdds::print_guid( self.writer_guid() );
os << '.';
os << self.sequence_number();
return os.str();
} );
using eprosima::fastdds::dds::SampleInfo;
py::class_< SampleInfo >( message, "sample_info" ) //
.def( py::init<>() )
.def( "identity", []( SampleInfo const & self ) { return self.sample_identity; } )
.def( "source_timestamp", []( SampleInfo const & self ) { return self.source_timestamp.to_ns(); } )
.def( "reception_timestamp", []( SampleInfo const & self ) { return self.reception_timestamp.to_ns(); } );
using realdds::dds_time;
using realdds::dds_nsec;
py::class_< dds_time >( m, "time" )
.def( py::init<>() )
.def( py::init< int32_t, uint32_t >() ) // sec, nsec
.def( py::init< long double >() ) // inexact (1.001 -> 1.000999999)
.def( py::init( []( dds_nsec ns ) { return realdds::time_from( ns ); } ) ) // exact
.def_readwrite( "seconds", &dds_time::seconds )
.def_readwrite( "nanosec", &dds_time::nanosec )
.def( "to_ns", &dds_time::to_ns )
.def_static( "from_ns", []( dds_nsec ns ) { return realdds::time_from( ns ); } )
.def_static( "from_double", []( long double d ) { return realdds::dds_time( d ); } )
.def( "to_double", &realdds::time_to_double )
.def( "__repr__", &realdds::time_to_string )
.def( pybind11::self == pybind11::self )
.def( pybind11::self != pybind11::self );
// We need a timestamp function that returns timestamps in the same domain as the sample-info timestamps
using realdds::timestr;
m.def( "now", []() { return realdds::now(); } );
py::enum_< timestr::no_suffix_t >( m, "no_suffix_t" );
m.attr( "no_suffix" ) = timestr::no_suffix;
py::enum_< timestr::rel_t >( m, "rel_t" );
m.attr( "rel" ) = timestr::rel;
py::enum_< timestr::abs_t >( m, "abs_t" );
m.attr( "abs" ) = timestr::abs;
m.def( "timestr", []( dds_nsec t ) { return timestr( t ).to_string(); } );
m.def( "timestr", []( dds_nsec t, timestr::no_suffix_t ) { return timestr( t, timestr::no_suffix ).to_string(); } );
m.def( "timestr", []( dds_nsec dt, timestr::rel_t ) { return timestr( dt, timestr::rel ).to_string(); } );
m.def( "timestr", []( dds_nsec dt, timestr::rel_t, timestr::no_suffix_t ) { return timestr( dt, timestr::rel, timestr::no_suffix ).to_string(); } );
m.def( "timestr", []( dds_nsec t1, dds_nsec t2 ) { return timestr( t1, t2 ).to_string(); } );
m.def( "timestr", []( dds_nsec t1, dds_nsec t2, timestr::no_suffix_t ) { return timestr( t1, t2, timestr::no_suffix ).to_string(); } );
m.def( "timestr", []( dds_time t, dds_time start, timestr::no_suffix_t ) { return timestr( t, start, timestr::no_suffix ).to_string(); } );
m.def( "timestr", []( dds_time t, dds_nsec start, timestr::no_suffix_t ) { return timestr( t, start, timestr::no_suffix ).to_string(); } );
m.def( "timestr", []( dds_nsec t, dds_time start, timestr::no_suffix_t ) { return timestr( t, start, timestr::no_suffix ).to_string(); } );
m.def( "timestr", []( dds_time t, timestr::no_suffix_t ) { return timestr( t, timestr::no_suffix ).to_string(); } );
m.def( "timestr", []( dds_time t, dds_time start ) { return timestr( t, start ).to_string(); } );
m.def( "timestr", []( dds_time t, dds_nsec start ) { return timestr( t, start ).to_string(); } );
m.def( "timestr", []( dds_nsec t, dds_time start ) { return timestr( t, start ).to_string(); } );
m.def( "timestr", []( dds_time t ) { return timestr( t ).to_string(); } );
py::class_< flexible_msg >( message, "flexible" )
.def( py::init<>() )
.def( py::init( []( py::dict const & dict, uint32_t version )
{ return flexible_msg( py_to_json( dict ), version ); } ),
"version"_a = 0 )
.def( py::init( []( std::string const & json_string ) {
return flexible_msg( flexible_msg::data_format::JSON, json::parse( json_string ) );
} ) )
.def_static( "create_topic", static_cast< flexible_msg_create_topic * >( &flexible_msg::create_topic ) )
.def_readwrite( "data_format", &flexible_msg::_data_format )
.def_readwrite( "version", &flexible_msg::_version )
.def_readwrite( "data", &flexible_msg::_data )
.def( "invalidate", &flexible_msg::invalidate )
.def( "is_valid", &flexible_msg::is_valid )
.def( "__bool__", &flexible_msg::is_valid )
.def( "__repr__",
[]( flexible_msg const & self ) {
std::ostringstream os;
os << "<" SNAME ".flexible_msg ";
switch( self._data_format )
case flexible_msg::data_format::JSON:
os << "JSON";
case flexible_msg::data_format::CBOR:
os << "CBOR";
case flexible_msg::data_format::CUSTOM:
os << "CUSTOM";
os << "UNKNOWN(" << (int)self._data_format << ")";
os << ' ';
if( ! self.is_valid() )
os << "INVALID";
os << self._data.size();
switch( self._data_format )
case flexible_msg::data_format::JSON:
os << ' ';
os << std::string( (char const *), self._data.size() );
os << ">";
return os.str();
} )
[]( dds_topic_reader & reader, SampleInfo * sample ) {
auto actual_type = reader.topic()->get()->get_type_name();
if( actual_type != flexible_msg::type().getName() )
throw std::runtime_error( "can't initialize raw::flexible from " + actual_type );
flexible_msg data;
if( ! flexible_msg::take_next( reader, &data, sample ) )
assert( ! data.is_valid() );
return data;
py::arg( "reader" ), py::arg( "sample" ) = nullptr,
py::call_guard< py::gil_scoped_release >() )
.def( "json_data", &flexible_msg::json_data )
.def( "json_string",
[]( flexible_msg const & self )
{ return std::string( (char const *), self._data.size() ); } )
.def( "write_to",
[]( flexible_msg & self, dds_topic_writer & writer ) { std::move( self ).write_to( writer ); } );
using image_msg = realdds::topics::image_msg;
py::class_< image_msg, std::shared_ptr< image_msg > >( message, "image" )
.def( py::init<>() )
.def_static( "create_topic", &image_msg::create_topic )
.def_readwrite( "data", &image_msg::raw_data )
.def_readwrite( "width", &image_msg::width )
.def_readwrite( "height", &image_msg::height )
.def_readwrite( "timestamp", &image_msg::timestamp )
.def( "__repr__",
[]( image_msg const & self )
std::ostringstream os;
os << "<" SNAME ".image_msg";
if( self.width > 0 && self.height > 0 )
os << ' ' << self.width << 'x' << self.height;
os << 'x' << (self.raw_data.size() / (self.width * self.height));
os << " @ " << realdds::time_to_string( self.timestamp );
os << ">";
return os.str();
} )
[]( dds_topic_reader & reader, SampleInfo * sample )
auto actual_type = reader.topic()->get()->get_type_name();
if( actual_type != image_msg::type().getName() )
throw std::runtime_error( "can't initialize raw::image from " + actual_type );
image_msg data;
if( ! image_msg::take_next( reader, &data, sample ) )
assert( ! data.is_valid() );
return data;
py::arg( "reader" ),
py::arg( "sample" ) = nullptr,
py::call_guard< py::gil_scoped_release >() )
/*.def("write_to", &image_msg::write_to, py::call_guard< py::gil_scoped_release >())*/;
using blob_msg = realdds::topics::blob_msg;
py::class_< blob_msg, std::shared_ptr< blob_msg > >( message, "blob" )
.def( py::init<>() )
.def( py::init(
[]( py::bytes const & bytes )
auto info = py::buffer( bytes ).request();
auto data = reinterpret_cast< uint8_t const * >( info.ptr );
size_t length = static_cast< size_t >( info.size );
return blob_msg( std::vector< uint8_t >( data, data + length ) );
} ) )
static_cast< std::shared_ptr< dds_topic > ( * )( std::shared_ptr< dds_participant > const &,
std::string const & ) >( &blob_msg::create_topic ) )
.def( "data", []( blob_msg const & self ) { return; } )
.def( "size", []( blob_msg const & self ) { return; } )
.def( "__repr__",
[]( blob_msg const & self )
std::ostringstream os;
os << "<" SNAME ".blob_msg";
os << ' ' <<;
os << ' ' << rsutils::number::calc_crc32(, );
os << ">";
return os.str();
} )
[]( dds_topic_reader & reader, SampleInfo * sample )
auto actual_type = reader.topic()->get()->get_type_name();
if( actual_type != blob_msg::type().getName() )
throw std::runtime_error( "can't initialize blob from " + actual_type );
blob_msg data;
if( ! blob_msg::take_next( reader, &data, sample ) )
assert( ! data.is_valid() );
return data;
py::arg( "reader" ),
py::arg( "sample" ) = nullptr,
py::call_guard< py::gil_scoped_release >() )
.def( "write_to", &blob_msg::write_to, py::call_guard< py::gil_scoped_release >() );
using imu_msg = realdds::topics::imu_msg;
py::class_< imu_msg, std::shared_ptr< imu_msg > >( message, "imu" )
.def( py::init<>() )
.def_static( "create_topic", &imu_msg::create_topic )
[]( imu_msg const & self ) { return get_vector3( self.gyro_data() ); },
[]( imu_msg & self, std::array< double, 3 > const & xyz ) { set_vector3( self.gyro_data(), xyz ); } )
[]( imu_msg const & self ) { return get_vector3( self.accel_data() ); },
[]( imu_msg & self, std::array< double, 3 > const & xyz ) { set_vector3( self.accel_data(), xyz ); } )
[]( imu_msg const & self ) { return self.timestamp(); },
[]( imu_msg & self, dds_time time ) { self.timestamp( time ); } )
.def( "__repr__",
[]( imu_msg const & self )
std::ostringstream os;
os << "<" SNAME ".imu_msg " << self.to_string() << ">";
return os.str();
} )
[]( dds_topic_reader & reader, SampleInfo * sample )
auto actual_type = reader.topic()->get()->get_type_name();
if( actual_type != imu_msg::type().getName() )
throw std::runtime_error( "can't initialize raw::imu from " + actual_type );
imu_msg data;
if( ! imu_msg::take_next( reader, &data, sample ) )
assert( ! data.is_valid() );
return data;
py::arg( "reader" ),
py::arg( "sample" ) = nullptr,
py::call_guard< py::gil_scoped_release >() )
.def( "write_to", &imu_msg::write_to, py::call_guard< py::gil_scoped_release >() );
using realdds::dds_device_broadcaster;
py::class_< dds_device_broadcaster, std::shared_ptr< dds_device_broadcaster > >( m, "device_broadcaster" )
.def( py::init<>( []( std::shared_ptr< dds_publisher > const & publisher, device_info const & device_info )
{ return std::make_shared< dds_device_broadcaster >( publisher, device_info, nullptr ); } ) );
using realdds::dds_option;
py::class_< dds_option, std::shared_ptr< dds_option > >( m, "option" )
.def_static( "from_json", &dds_option::from_json )
.def( "get_name", &dds_option::get_name )
.def( "get_default_value", &dds_option::get_default_value )
.def( "get_description", &dds_option::get_description )
.def( "value_type", &dds_option::value_type )
.def( "is_read_only", &dds_option::is_read_only )
.def( "is_optional", &dds_option::is_optional )
.def( "stream", &dds_option::stream )
.def( "get_value", &dds_option::get_value )
.def( "set_value", &dds_option::set_value )
.def( "get_minimum_value", &dds_option::get_minimum_value )
.def( "get_maximum_value", &dds_option::get_maximum_value )
.def( "get_stepping", &dds_option::get_stepping )
.def( "to_json", []( dds_option const & self ) { return self.to_json(); } )
.def( "__repr__",
[]( dds_option const & self )
std::ostringstream os;
os << '<' << self.get_name();
os << " " << self.get_value();
os << " |";
if( self.is_optional() )
os << " optional";
if( self.is_read_only() )
os << " r/o";
if( self.get_default_value().exists() )
os << " default=" << self.get_default_value();
if( ! self.get_minimum_value().is_null() )
os << " min=" << self.get_minimum_value();
if( ! self.get_maximum_value().is_null() )
os << " max=" << self.get_maximum_value();
if( ! self.get_stepping().is_null() )
os << " step=" << self.get_stepping();
os << ">";
return os.str();
} );
using realdds::dds_video_encoding;
py::class_< dds_video_encoding > video_encoding( m, "video_encoding" );
video_encoding //
.def( py::init<>() )
.def( py::init< std::string const & >() )
.def( "is_valid", &dds_video_encoding::is_valid )
.def( "to_rs2", &dds_video_encoding::to_rs2 )
.def( "from_rs2", &dds_video_encoding::from_rs2 )
.def( "__bool__", &dds_video_encoding::is_valid )
.def( "__repr__", []( dds_video_encoding const & self ) { return self.to_string(); } );
video_encoding.attr( "z16" ) = dds_video_encoding( "16UC1" );
video_encoding.attr( "y8" ) = dds_video_encoding( "mono8" );
video_encoding.attr( "y16" ) = dds_video_encoding( "Y16" ); // todo should be mono16
video_encoding.attr( "byr2" ) = dds_video_encoding( "BYR2" );
video_encoding.attr( "yuyv" ) = dds_video_encoding( "yuv422_yuy2" );
video_encoding.attr( "uyvy" ) = dds_video_encoding( "uyvy" );
video_encoding.attr( "rgb" ) = dds_video_encoding( "rgb8" );
using realdds::dds_stream_profile;
py::class_< dds_stream_profile, std::shared_ptr< dds_stream_profile > > stream_profile_base( m, "stream_profile" );
.def( "frequency", &dds_stream_profile::frequency )
.def( "stream", &dds_stream_profile::stream )
.def( "to_string", &dds_stream_profile::to_string )
.def( "details_to_string", &dds_stream_profile::details_to_string )
.def( "to_json", []( dds_stream_profile const & self ) { return self.to_json().dump(); } )
.def( "__repr__", []( dds_stream_profile const & self ) {
std::ostringstream os;
std::string self_as_string = self.to_string(); // <video 0xUID ...>
std::string type;
auto stream =;
if( stream )
type = std::string( stream->type_string() ) + '_';
os << "<" SNAME "." << type << "stream_profile " << ( self_as_string.c_str() + 1 );
return os.str();
} );
using realdds::dds_video_stream_profile;
py::class_< dds_video_stream_profile, std::shared_ptr< dds_video_stream_profile > >( m, "video_stream_profile", stream_profile_base )
.def( py::init< int16_t, dds_video_encoding, uint16_t, uint16_t >() )
.def( "format", &dds_video_stream_profile::encoding )
.def( "width", &dds_video_stream_profile::width )
.def( "height", &dds_video_stream_profile::height );
using realdds::dds_motion_stream_profile;
py::class_< dds_motion_stream_profile, std::shared_ptr< dds_motion_stream_profile > >( m, "motion_stream_profile", stream_profile_base )
.def( py::init< int16_t >() );
using realdds::dds_stream_base;
py::class_< dds_stream_base, std::shared_ptr< dds_stream_base > > stream_base( m, "stream_base" );
.def( "name", &dds_stream_base::name )
.def( "sensor_name", &dds_stream_base::sensor_name )
.def( "type_string", &dds_stream_base::type_string )
.def( "profiles", &dds_stream_base::profiles )
.def( "enable_metadata", &dds_stream_base::enable_metadata )
.def( "init_profiles", &dds_stream_base::init_profiles )
.def( "init_options", &dds_stream_base::init_options )
.def( "default_profile_index", &dds_stream_base::default_profile_index )
.def( "default_profile", &dds_stream_base::default_profile )
.def( "options", &dds_stream_base::options )
.def( "is_open", &dds_stream_base::is_open )
.def( "is_streaming", &dds_stream_base::is_streaming )
.def( "get_topic", &dds_stream_base::get_topic );
using realdds::dds_stream_server;
py::class_< dds_stream_server, std::shared_ptr< dds_stream_server > > stream_server_base( m, "stream_server", stream_base );
stream_server_base //
.def( "on_readers_changed", &dds_stream_server::on_readers_changed )
.def( "open", &dds_stream_server::open )
.def( "stop_streaming", &dds_stream_server::stop_streaming )
.def( "__repr__",
[]( dds_stream_server const & self )
std::ostringstream os;
os << "<" SNAME "." << self.type_string() << "_stream_server \"" << << "\"";
os << " " << self.profiles().size() << "[" << self.default_profile_index() << "] profiles";
os << '>';
return os.str();
} );
using realdds::dds_video_stream_server;
py::class_< dds_video_stream_server, std::shared_ptr< dds_video_stream_server > >
video_stream_server_base( m, "video_stream_server", stream_server_base );
.def( "set_intrinsics", &dds_video_stream_server::set_intrinsics )
.def( "start_streaming",
[]( dds_video_stream_server & self, dds_video_encoding encoding, int width, int height ) {
self.start_streaming( { encoding, height, width } );
} )
.def( "publish_image",
[]( dds_video_stream_server & self, image_msg const & img )
// We don't have C++ 'std::move' explicit semantics in Python, so we create a copy.
// Notice there's no copy constructor on purpose (!) so we do it manually...
image_msg img_copy;
img_copy.raw_data = img.raw_data;
img_copy.timestamp = img.timestamp;
img_copy.width = img.width;
img_copy.height = img.height;
self.publish_image( std::move( img_copy ) );
} );
using realdds::dds_depth_stream_server;
py::class_< dds_depth_stream_server, std::shared_ptr< dds_depth_stream_server > >( m, "depth_stream_server", video_stream_server_base )
.def( py::init< std::string const &, std::string const & >(), "stream_name"_a, "sensor_name"_a );
using realdds::dds_ir_stream_server;
py::class_< dds_ir_stream_server, std::shared_ptr< dds_ir_stream_server > >( m, "ir_stream_server", video_stream_server_base )
.def( py::init< std::string const&, std::string const& >(), "stream_name"_a, "sensor_name"_a );
using realdds::dds_color_stream_server;
py::class_< dds_color_stream_server, std::shared_ptr< dds_color_stream_server > >( m, "color_stream_server", video_stream_server_base )
.def( py::init< std::string const&, std::string const& >(), "stream_name"_a, "sensor_name"_a );
using realdds::dds_confidence_stream_server;
py::class_< dds_confidence_stream_server, std::shared_ptr< dds_confidence_stream_server > >( m, "confidence_stream_server", video_stream_server_base )
.def( py::init< std::string const&, std::string const& >(), "stream_name"_a, "sensor_name"_a );
using realdds::dds_motion_stream_server;
py::class_< dds_motion_stream_server, std::shared_ptr< dds_motion_stream_server > >( m, "motion_stream_server", stream_server_base )
.def( py::init< std::string const &, std::string const & >(), "stream_name"_a, "sensor_name"_a )
.def( "set_gyro_intrinsics", &dds_motion_stream_server::set_gyro_intrinsics )
.def( "set_accel_intrinsics", &dds_motion_stream_server::set_accel_intrinsics )
.def( "start_streaming", &dds_motion_stream_server::start_streaming );
// To have the python code be able to modify json objects in callbacks, we need to somehow refer to the original
// json object as changes to translated dict will not automatically get picked up!
// We do this thru the [] operator:
// def callback( json ):
// print( json['value'] ) # calls __getitem__
// json['value'] = { 'more': True } # calls __setitem__
struct json_ref { json & j; };
py::class_< json_ref, std::shared_ptr< json_ref > >( m, "json_ref" )
.def( "__getitem__", []( json_ref const & jr, std::string const & key ) { return key ); } )
.def( "__setitem__", []( json_ref & jr, std::string const & key, json const & value ) { jr.j[key] = value; } );
using realdds::dds_device_server;
py::class_< dds_device_server, std::shared_ptr< dds_device_server > >( m, "device_server" )
.def( py::init< std::shared_ptr< dds_participant > const&, std::string const& >() )
.def( "init", &dds_device_server::init )
.def( "guid", &dds_device_server::guid )
.def( "streams",
[]( dds_device_server const & self )
std::vector< std::shared_ptr< dds_stream_server > > streams;
for( auto const & name2stream : self.streams() )
streams.push_back( name2stream.second );
return streams;
} )
[]( dds_device_server & self, json const & j ) { self.publish_notification( j ); },
py::call_guard< py::gil_scoped_release >() )
.def( "publish_metadata", &dds_device_server::publish_metadata, py::call_guard< py::gil_scoped_release >() )
.def( "broadcast", &dds_device_server::broadcast )
.def( "broadcast_disconnect", &dds_device_server::broadcast_disconnect, py::arg( "ack-timeout" ) = dds_time() )
.def( FN_FWD_R( dds_device_server, on_control,
(dds_device_server &, std::string const &, py::object &&, json_ref &&),
( std::string const & id, json const & control, json & reply ),
return callback( self, id, json_to_py( control ), json_ref{ reply } ); ) );
using realdds::dds_stream;
py::class_< dds_stream, std::shared_ptr< dds_stream > > stream_client_base( m, "stream", stream_base );
stream_client_base //
.def( "open", &dds_stream::open )
.def( "close", &dds_stream::close )
.def( "is_open", &dds_stream::is_open )
.def( "start_streaming", &dds_stream::start_streaming )
.def( "stop_streaming", &dds_stream::stop_streaming )
.def( "__repr__", []( dds_stream const & self ) {
std::ostringstream os;
os << "<" SNAME "." << self.type_string() << "_stream \"" << << "\"";
os << " " << self.profiles().size() << "[" << self.default_profile_index() << "] profiles";
os << '>';
return os.str();
} );
using realdds::dds_video_stream;
py::class_< dds_video_stream, std::shared_ptr< dds_video_stream > >
video_stream_client_base( m, "video_stream", stream_client_base );
video_stream_client_base //
.def( "set_intrinsics", &dds_video_stream::set_intrinsics )
.def( FN_FWD( dds_video_stream,
( dds_video_stream &, image_msg && ),
( image_msg && i ),
callback( self, std::move( i ) ); ) );
using realdds::dds_depth_stream;
py::class_< dds_depth_stream, std::shared_ptr< dds_depth_stream > >( m, "depth_stream", video_stream_client_base )
.def( py::init< std::string const &, std::string const & >(), "stream_name"_a, "sensor_name"_a );
using realdds::dds_ir_stream;
py::class_< dds_ir_stream, std::shared_ptr< dds_ir_stream > >( m, "ir_stream", video_stream_client_base )
.def( py::init< std::string const &, std::string const & >(), "stream_name"_a, "sensor_name"_a );
using realdds::dds_color_stream;
py::class_< dds_color_stream, std::shared_ptr< dds_color_stream > >( m, "color_stream", video_stream_client_base )
.def( py::init< std::string const &, std::string const & >(), "stream_name"_a, "sensor_name"_a );
using realdds::dds_confidence_stream;
py::class_< dds_confidence_stream, std::shared_ptr< dds_confidence_stream > >( m, "confidence_stream", video_stream_client_base )
.def( py::init< std::string const &, std::string const & >(), "stream_name"_a, "sensor_name"_a );
using realdds::dds_motion_stream;
py::class_< dds_motion_stream, std::shared_ptr< dds_motion_stream > >
motion_stream_client_base( m, "motion_stream", stream_client_base );
.def( "set_gyro_intrinsics", &dds_motion_stream::set_gyro_intrinsics )
.def( "set_accel_intrinsics", &dds_motion_stream::set_accel_intrinsics );
using subscription = rsutils::subscription;
py::class_< subscription,
std::shared_ptr< subscription > // handled with a shared_ptr
>( m, "subscription" )
.def( "cancel", &subscription::cancel )
.def( "__bool__", []( subscription const & self ) { return self.is_active(); } )
.def( "__repr__",
[]( subscription const & self )
std::ostringstream os;
os << "<" SNAME ".subscription";
if( ! self.is_active() )
os << " cancelled";
os << ">";
return os.str();
} );
using realdds::dds_device;
py::class_< dds_device,
std::shared_ptr< dds_device > // handled with a shared_ptr
>( m, "device" )
.def( py::init< std::shared_ptr< dds_participant > const &, device_info const & >() )
.def( "device_info", &dds_device::device_info )
.def( "participant", &dds_device::participant )
.def( "server_guid", &dds_device::server_guid )
.def( "guid", &dds_device::guid )
.def( "is_ready", &dds_device::is_ready )
.def( "is_online", &dds_device::is_online )
.def( "is_offline", &dds_device::is_offline )
.def( "wait_until_ready",
py::call_guard< py::gil_scoped_release >(),
"timeout-ms"_a = 5000 )
.def( "wait_until_online",
py::call_guard< py::gil_scoped_release >(),
"timeout-ms"_a = 5000 )
.def( "wait_until_offline",
py::call_guard< py::gil_scoped_release >(),
"timeout-ms"_a = 5000 )
.def( "on_metadata_available",
[]( dds_device & self, std::function< void( dds_device &, py::object && ) > callback )
return std::make_shared< subscription >( self.on_metadata_available(
[&self, callback]( std::shared_ptr< const json > const & pj )
{ FN_FWD_CALL( dds_device, "on_metadata_available", callback( self, json_to_py( *pj ) ); ) } ) );
} )
.def( "on_device_log",
[]( dds_device & self, std::function< void( dds_device &, dds_nsec, char, std::string const &, py::object && ) > callback )
return std::make_shared< subscription >( self.on_device_log(
[&self, callback]( dds_nsec timestamp, char type, std::string const & text, json const & data )
{ FN_FWD_CALL( dds_device, "on_device_log", callback( self, timestamp, type, text, json_to_py( data ) ); ) } ) );
} )
.def( "on_notification",
[]( dds_device & self, std::function< void( dds_device &, std::string const &, py::object && ) > callback )
return std::make_shared< subscription >( self.on_notification(
[&self, callback]( std::string const & id, json const & data )
{ FN_FWD_CALL( dds_device, "on_notification", callback( self, id, json_to_py( data ) ); ) } ) );
} )
.def( "n_streams", &dds_device::number_of_streams )
.def( "streams",
[]( dds_device const & self ) {
std::vector< std::shared_ptr< dds_stream > > streams;
[&]( std::shared_ptr< dds_stream > const & stream ) { streams.push_back( stream ); } );
return streams;
} )
.def( "options",
[]( dds_device const & self ) {
std::vector< std::shared_ptr< dds_option > > options;
self.foreach_option( [&]( std::shared_ptr< dds_option > const & option ) { options.push_back( option ); } );
return options;
} )
.def( "set_option_value", &dds_device::set_option_value )
.def( "query_option_value", &dds_device::query_option_value )
[]( dds_device & self, json const & j, bool wait_for_reply )
json reply;
self.send_control( j, wait_for_reply ? &reply : nullptr );
return reply;
py::arg( "json" ), py::arg( "wait-for-reply" ) = false,
py::call_guard< py::gil_scoped_release >() )
.def_static( "check_reply", &dds_device::check_reply )
.def( "__repr__", []( dds_device const & self ) {
std::ostringstream os;
os << "<" SNAME ".device";
os << " " << self.participant()->print( self.guid() );
if( ! self.device_info().name().empty() )
os << " \"" << self.device_info().name() << "\"";
os << " @ " << self.device_info().debug_name();
os << ">";
return os.str();
} );
using realdds::dds_device_watcher;
py::class_< dds_device_watcher >( m, "device_watcher" )
.def( py::init< std::shared_ptr< dds_participant > const & >() )
.def( "start", &dds_device_watcher::start )
.def( "stop", &dds_device_watcher::stop )
.def( "is_stopped", &dds_device_watcher::is_stopped )
.def( FN_FWD( dds_device_watcher,
( dds_device_watcher const &, std::shared_ptr< dds_device > const & ),
( std::shared_ptr< dds_device > const & dev ),
callback( self, dev ); ) )
.def( FN_FWD( dds_device_watcher,
( dds_device_watcher const &, std::shared_ptr< dds_device > const & ),
( std::shared_ptr< dds_device > const & dev ),
callback( self, dev ); ) )
.def( "devices",
[]( dds_device_watcher const & self )
std::vector< std::shared_ptr< dds_device > > devices;
[&]( std::shared_ptr< dds_device > const & dev )
devices.push_back( dev );
return true;
} );
return devices;
} )
.def( "is_device_broadcast", &dds_device_watcher::is_device_broadcast );
using realdds::dds_stream_sensor_bridge;
py::class_< dds_stream_sensor_bridge >( m, "stream_sensor_bridge" )
.def( py::init<>() )
.def( FN_FWD( dds_stream_sensor_bridge,
(std::string const &, std::vector< std::shared_ptr< realdds::dds_stream_profile > > const &),
( std::string const & sensor_name,
std::vector< std::shared_ptr< realdds::dds_stream_profile > > const & profiles ),
callback( sensor_name, profiles ); ) )
.def( FN_FWD( dds_stream_sensor_bridge,
(std::string const &),
( std::string const & sensor_name ),
callback( sensor_name ); ) )
.def( FN_FWD( dds_stream_sensor_bridge,
(std::shared_ptr< dds_stream_server > const &, int),
( std::shared_ptr< dds_stream_server > const & server, int n_readers ),
callback( server, n_readers ); ) )
.def( FN_FWD( dds_stream_sensor_bridge,
(std::string const &),
( std::string const & error_string ),
callback( error_string ); ) )
.def( "init", &dds_stream_sensor_bridge::init )
.def( "reset", &dds_stream_sensor_bridge::reset, py::arg( "by_force" ) = false )
.def( "open", &dds_stream_sensor_bridge::open )
.def( "close", &dds_stream_sensor_bridge::close )
.def( "add_implicit_profiles", &dds_stream_sensor_bridge::add_implicit_profiles )
.def( "commit", &dds_stream_sensor_bridge::commit )
.def( "is_streaming", &dds_stream_sensor_bridge::is_streaming );
// Custom syncer with its own frame management over image_msg
struct dds_metadata_syncer : realdds::dds_metadata_syncer
typedef std::shared_ptr< image_msg > frame_type; // the base case for image_msg (see above)
typedef realdds::dds_metadata_syncer super;
static void frame_releaser( void * frame )
// frame is really a pointer to a shared object that we create on the heap
delete static_cast< frame_type * >( frame );
on_frame_release( frame_releaser );
void enqueue_frame( key_type key, frame_type const & img )
// We need to keep the image alive and somehow point to it at the same time
super::enqueue_frame( key, hold( new std::shared_ptr< image_msg >( img ) ) );
frame_type get_frame( frame_holder const & fh ) const
return *static_cast< frame_type * >( fh.get() );
py::class_< dds_metadata_syncer > metadata_syncer( m, "metadata_syncer" );
metadata_syncer //
.def( py::init<>() )
.def( FN_FWD( dds_metadata_syncer,
( dds_metadata_syncer::frame_type, json const & ),
( dds_metadata_syncer::frame_holder && fh, std::shared_ptr< const json > const & metadata ),
callback( self.get_frame( fh ), metadata ? *metadata : json() ); ) )
.def( FN_FWD( dds_metadata_syncer,
( dds_metadata_syncer::key_type, json const & ),
( dds_metadata_syncer::key_type key, std::shared_ptr< const json > const & metadata ),
callback( key, metadata ? *metadata : json() ); ) )
.def( "enqueue_frame", &dds_metadata_syncer::enqueue_frame )
.def( "enqueue_metadata",
[]( dds_metadata_syncer & self, dds_metadata_syncer::key_type key, json const & j )
{ self.enqueue_metadata( key, std::make_shared< const json >( j ) ); } );
metadata_syncer.attr( "max_frame_queue_size" ) = dds_metadata_syncer::max_frame_queue_size;
metadata_syncer.attr( "max_md_queue_size" ) = dds_metadata_syncer::max_md_queue_size;