从自动驾驶到机器人用Fast DDS 2.15.x构建你的第一个实时数据发布订阅系统附完整C代码在机器人导航和工业物联网场景中毫秒级的延迟差异可能意味着机械臂能否精准抓取工件或是AGV小车能否及时避开障碍物。传统通信方案如ROS 1.0的TCP/UDP传输在面对高频率传感器数据时常常力不从心这正是我们选择Fast DDS作为通信中间件的原因——它不仅是自动驾驶领域的事实标准更在工业场景中展现出惊人的实时性能。1. 环境配置与Fast DDS安装1.1 系统需求与依赖准备在Ubuntu 20.04 LTS上部署Fast DDS 2.15.x需要确保系统满足以下条件编译器GCC 9.3或Clang 10C14支持内存至少2GB空闲内存编译时需求存储5GB可用磁盘空间安装基础依赖项sudo apt update sudo apt install -y cmake g python3-pip wget git sudo apt install -y libasio-dev libtinyxml2-dev libssl-dev提示工业现场设备若使用ARM架构需交叉编译时添加-DCMAKE_TOOLCHAIN_FILE参数指定工具链1.2 源码编译安装从GitHub拉取最新稳定版并编译git clone --recursive https://github.com/eProsima/Fast-DDS.git -b v2.15.0 mkdir Fast-DDS/build cd Fast-DDS/build cmake -DCMAKE_BUILD_TYPERelease -DTHIRDPARTYON .. make -j$(nproc) sudo make install验证安装成功的快速测试// test_installation.cpp #include fastdds/dds/domain/DomainParticipantFactory.hpp int main() { eprosima::fastdds::dds::DomainParticipantFactory::get_instance(); std::cout Fast DDS 2.15.0 安装成功! std::endl; return 0; }编译运行g test_installation.cpp -o test -lfastdds ./test2. 构建第一个DDS通信系统2.1 定义数据类型IDL文件创建sensor_data.idl文件定义激光雷达点云数据结构module sensor_msgs { struct PointCloud { unsigned long sensor_id; sequencefloat x_coordinates; sequencefloat y_coordinates; sequencefloat z_coordinates; double timestamp; }; };使用fastddsgen生成类型支持代码fastddsgen -replace sensor_data.idl这将生成sensor_msgs/PointCloud.hpp数据结构定义sensor_msgs/PointCloud.cxx序列化实现sensor_msgs/PointCloudPubSubTypes.hpp发布订阅支持类2.2 配置QoS策略针对工业机械臂控制场景的QoS配置示例// 可靠传输配置适用于控制指令 eprosima::fastdds::dds::ReliabilityQosPolicy reliability; reliability.kind eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; reliability.max_blocking_time.seconds 1; // 历史深度配置保留最新10条消息 eprosima::fastdds::dds::HistoryQosPolicy history; history.kind eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS; history.depth 10; // 传输优先级设置UDP优先于SHM eprosima::fastdds::dds::TransportPriorityQosPolicy transport_priority; transport_priority.value 100;2.3 完整发布者实现// publisher.cpp #include fastdds/dds/domain/DomainParticipantFactory.hpp #include sensor_msgs/PointCloudPubSubTypes.h class SensorPublisher { public: SensorPublisher() : participant_(nullptr), publisher_(nullptr), topic_(nullptr), writer_(nullptr) { // 1. 创建DomainParticipant eprosima::fastdds::dds::DomainParticipantQos pqos; pqos.name(IndustrialRobot_Participant); participant_ eprosima::fastdds::dds::DomainParticipantFactory::get_instance()-create_participant(0, pqos); // 2. 注册数据类型 eprosima::fastdds::dds::TypeSupport type(new sensor_msgs::PointCloudPubSubType()); type.register_type(participant_); // 3. 创建Publisher publisher_ participant_-create_publisher(eprosima::fastdds::dds::PUBLISHER_QOS_DEFAULT); // 4. 配置Topic QoS eprosima::fastdds::dds::TopicQos tqos; tqos.history().kind eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS; tqos.history().depth 30; // 5. 创建Topic topic_ participant_-create_topic(RobotArmSensorData, type.get_type_name(), tqos); // 6. 配置DataWriter QoS eprosima::fastdds::dds::DataWriterQos wqos; wqos.reliability().kind eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; wqos.durability().kind eprosima::fastdds::dds::TRANSIENT_LOCAL_DURABILITY_QOS; // 7. 创建DataWriter writer_ publisher_-create_datawriter(topic_, wqos); } void publish_sample(uint32_t point_count) { sensor_msgs::PointCloud cloud; cloud.sensor_id(2024); cloud.timestamp(std::chrono::duration_caststd::chrono::milliseconds( std::chrono::system_clock::now().time_since_epoch()).count() / 1000.0); // 生成模拟点云数据 for (uint32_t i 0; i point_count; i) { cloud.x_coordinates().push_back(static_castfloat(rand()) / RAND_MAX * 10.0f); cloud.y_coordinates().push_back(static_castfloat(rand()) / RAND_MAX * 10.0f); cloud.z_coordinates().push_back(static_castfloat(rand()) / RAND_MAX * 2.0f - 1.0f); } writer_-write(cloud); std::cout 发布 point_count 个点云数据 cloud.timestamp() std::endl; } private: eprosima::fastdds::dds::DomainParticipant* participant_; eprosima::fastdds::dds::Publisher* publisher_; eprosima::fastdds::dds::Topic* topic_; eprosima::fastdds::dds::DataWriter* writer_; };3. 订阅者实现与数据接收3.1 完整订阅者代码// subscriber.cpp #include fastdds/dds/subscriber/DataReaderListener.hpp #include sensor_msgs/PointCloudPubSubTypes.h class SensorSubscriber : public eprosima::fastdds::dds::DataReaderListener { public: SensorSubscriber() : participant_(nullptr), subscriber_(nullptr), topic_(nullptr), reader_(nullptr) { // 1. 创建DomainParticipant participant_ eprosima::fastdds::dds::DomainParticipantFactory::get_instance()-create_participant(0); // 2. 注册数据类型 eprosima::fastdds::dds::TypeSupport type(new sensor_msgs::PointCloudPubSubType()); type.register_type(participant_); // 3. 创建Subscriber subscriber_ participant_-create_subscriber(eprosima::fastdds::dds::SUBSCRIBER_QOS_DEFAULT); // 4. 创建Topic topic_ participant_-create_topic(RobotArmSensorData, type.get_type_name(), eprosima::fastdds::dds::TOPIC_QOS_DEFAULT); // 5. 配置DataReader QoS eprosima::fastdds::dds::DataReaderQos rqos; rqos.reliability().kind eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; rqos.durability().kind eprosima::fastdds::dds::TRANSIENT_LOCAL_DURABILITY_QOS; // 6. 创建DataReader并设置监听器 reader_ subscriber_-create_datareader(topic_, rqos, this); } // 数据到达回调 void on_data_available(eprosima::fastdds::dds::DataReader* reader) override { sensor_msgs::PointCloud cloud; eprosima::fastdds::dds::SampleInfo info; if (reader-take_next_sample(cloud, info) ReturnCode_t::RETCODE_OK) { if (info.valid_data) { std::cout 收到传感器 cloud.sensor_id() 的数据: cloud.x_coordinates().size() 个点 cloud.timestamp() std::endl; } } } private: eprosima::fastdds::dds::DomainParticipant* participant_; eprosima::fastdds::dds::Subscriber* subscriber_; eprosima::fastdds::dds::Topic* topic_; eprosima::fastdds::dds::DataReader* reader_; };3.2 编译与运行测试创建CMakeLists.txt构建文件cmake_minimum_required(VERSION 3.16) project(fastdds_demo) find_package(fastdds REQUIRED) add_executable(publisher publisher.cpp sensor_msgs/PointCloud.cxx sensor_msgs/PointCloudPubSubTypes.cxx) target_link_libraries(publisher fastdds) add_executable(subscriber subscriber.cpp sensor_msgs/PointCloud.cxx sensor_msgs/PointCloudPubSubTypes.cxx) target_link_libraries(subscriber fastdds)运行测试两个终端分别执行# 终端1 - 订阅者 ./subscriber # 终端2 - 发布者 ./publisher4. 性能优化实战技巧4.1 共享内存传输配置对于同一主机内的进程通信共享内存(SHM)传输可降低延迟至微秒级// 在DomainParticipantQos中配置 eprosima::fastdds::dds::DomainParticipantQos pqos; // 启用SHM传输 auto shm_transport std::make_sharedeprosima::fastdds::rtps::SharedMemTransportDescriptor(); pqos.transport().user_transports.push_back(shm_transport); // 禁用UDPv4仅用于测试 pqos.transport().use_builtin_transports false;4.2 零拷贝配置优化通过预分配内存池减少动态内存分配eprosima::fastdds::dds::DataWriterQos wqos; // 配置资源限制 wqos.resource_limits().max_samples 1000; wqos.resource_limits().max_instances 10; wqos.resource_limits().max_samples_per_instance 100; // 启用内存池 wqos.endpoint().history_memory_policy eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;4.3 多网卡环境配置工业现场常有多个网络接口需指定通信网卡// 创建自定义UDP传输 auto udp_transport std::make_sharedeprosima::fastdds::rtps::UDPv4TransportDescriptor(); udp_transport-interfaceWhiteList.push_back(192.168.1.100); // 指定工业以太网IP // 应用到QoS pqos.transport().user_transports.push_back(udp_transport);4.4 实时性监控指标通过内置统计模块监控通信性能// 启用统计模块 eprosima::fastdds::dds::PropertyPolicy properties; properties.properties().emplace_back(fastdds.statistics, HISTORY_LATENCY_TOPIC;NETWORK_LATENCY_TOPIC); // 应用到DomainParticipant pqos.properties(properties);获取统计数据的Python示例# statistics_monitor.py from fastdds import DomainParticipantFactory, Statistics factory DomainParticipantFactory.get_instance() participant factory.lookup_participant(0) stats Statistics(participant) print(f当前延迟: {stats.network_latency.mean()} ms) print(f吞吐量: {stats.throughput.bytes_per_second / 1024:.2f} KB/s)