下载
中文
注册

总体说明

订阅-发布原理

OpenHiva支持基于话题(Topic)的通信方式,主要包括以下消息类型:
  • 普通消息:提供标准Topic进行消息的订阅、发布。

    消息内存由OpenHiva申请和释放,用户只需关注业务逻辑,代码逻辑相对简单。

    本章将以消息的发布、订阅场景为例,介绍普通消息的使用方式。

  • BufferMessage:提供HivaBuffer进行消息的订阅、发布。

    消息内存由用户申请和释放,并自行管理内存中的数据,消息需要遵循固定的BufferMessage格式。框架会从发布的消息中获取用户申请好的内存地址,在发布过程中如果出错,框架会自动释放该内存。如果消息发布成功,内存将由订阅端自动释放,用户无需关心该内存的释放情况。

    BufferMessage通常适用于图像消息通信场景,尤其是从Lidar、Camera等传感器获取的数据,数据量较大,为减少图像数据的拷贝,使用BufferMessage消息可提高传输性能。

    本章将先以消息发布、订阅的基本场景为例,介绍BufferMessage消息(基本场景)的使用方式;然后以推理场景为例,结合NN Engine,从获取数据、推理、发布推理结果介绍BufferMessage消息(异步推理场景)的使用方式。

    NN Engine实际上是节点内部的实现,首先在APP里面加载推理模型,输入原始Camera数据后,推理的结果支持以Topic形式和BufferMessage的数据格式发布出去。

无论是哪种消息类型,消息发布和订阅的原理基本一致,关键流程如图1所示。其中Talker作为发布者发布话题,Listener作为订阅者订阅话题,两者通过DataMaster(中心节点)建立管道进行通信,QueueScheduler作为消息队列调度器。

图1 消息订阅-发布流程图
  1. 发布端,通过CreatePublisher接口向DataMaster注册待发布Topic,以及Topic对应的TopicOptions属性,如队列深度、最大消息尺寸等。
  2. 订阅端,通过CreateSubscriber接口向DataMaster注册待订阅Topic、回调函数,以及Topic对应的TopicOptions属性,如队列深度、最大消息尺寸,Subscriber分组等。
  3. OpenHiva根据DataMatser接收到的注册队列信息,将发布队列ID/订阅队列ID绑定,并存储在QueueScheduler中。
  4. 发布端,通过Publish接口发布指定Topic的消息。
  5. 订阅端,通过QueueScheduler将发布队列的消息搬运到与之绑定的订阅队列中,并唤醒订阅节点的工作线程。
  6. 订阅端线程调用回调函数处理消息,获取发布的Topic消息。

TopicOptions参数说明

TopicOptions参数是Topic消息一系列属性值的集合,包括话题名称、话题通信方式、队列深度、消息尺寸等。目前支持3种TopicOptions参数配置方法:

TopicOptions配置优先级:yaml文件 > 参数输入 > 默认值。

  • 默认值:在参数未配置的情况下,OpenHiva提供了可行的默认参数值。
  • 参数输入:通过一系列Build接口进行参数配置,具体参见OpenHiva::TopicOptions接口
  • yaml文件:通过yaml文件进行参数配置。配置完成后,需在命令行中使用“__transport=yaml path”参数指定yaml文件路径,使配置文件生效。

    yaml文件配置示例如下,具体参数说明参见表1

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    testTopic:                 
        transport: [Shm, Dsf] 
        groupName: test_group   
        blockSize: 102400      
        blockNum: 10           
        Dsf:
            queueDepth: 5      
            overwrite: false  
            queueFCFlag: false 
            queueTTL: 0      
        Dds:
             history: DDS_QOS_HISTORY_POLICY_KEEP_LAST
             depth: 5
             reliability: DDS_QOS_RELIABILITY_POLICY_BEST_EFFORT
             durability: DDS_QOS_DURABILITY_POLICY_VOLATILE
             deadline:
                 second: 100
                 nanosecond: 
             lifespan:
                 second: 10
                 nanosecond: 10
             liveliness: DDS_QOS_LIVELINESS_POLICY_AUTOMATIC
             lease_duration:
                 second: 
                 nanosecond: 
    
    表1 yaml配置文件参数说明

    参数分类

    参数名

    参数含义

    基础参数

    testTopic

    指明了需要配置参数的话题名。

    transport

    配置话题对应的通信方式,可以指定一个或者多个,需要包含在“[]”内部。

    说明:

    当前支持的通信方式有Dsf、Vfs、Shm、Dds。如果配置的通信方式不合法,在service情况下会默认使用为Shm,管理面会默认使用Vfs,数据面会默认使用Dsf。

    groupName

    指明了Topic所属的group名 。

    blockSize

    指内存块大小。Dsf通信、Shm通信和Dds通信(单设备共享内存通信)会根据blockSize申请内存资源。

    blockNum

    指内存块数量,使用Dsf通信或Shm通信时会在初始化时申请多个内存块,每个都是blockSize大小,但要满足blockSize*blockNum<300M。Dds通信(单设备共享内存通信)中blockSize*blockNum接近或小于消息大小会导致丢包。

    Dsf

    queueDepth

    指通信时缓存消息的队列长度。

    overwrite

    是否写覆盖,其值只能为true或者false:

    • 为true时,接收队列满了后会将队首消息丢弃,新收到的消息会加入队尾。
    • 为false时,接收队列满了后将新收到的消息丢弃。

    queueFCFlag

    是否队列流控,其值只能为true或者false:

    • 为true时,启动队列流控,此时queueTTL参数生效。
    • 为false时,不启动队列流控,queueTTL参数不生效。

    queueTTL

    指队列流控时间(ms),仅当Dsf/queueFCFlag为true时生效。当queueTTL=0,接收队列最多缓存一帧数据;当queueTTL>0,接收队列消息出队时会计算消息从入队到出队的时间差,一旦超出queueTTL范围,该消息会被丢弃。

    Dds

    说明:
    • 发布端与接收端可以单独配置Dds QoS策略,两者需要在策略兼容的情况下才能匹配上。
    • Dds通信节点发现受网络波动影响。在单设备空载情况下,通信底层节点匹配最多需要约500ms时间。默认QoS配置下在节点匹配前发送的报文会被丢掉。
    • 不同策略组合的兼容性说明请参照表2

    history

    设置DDS消息保存策略,支持2种:

    • DDS_QOS_HISTORY_POLICY_KEEP_LAST:保留最多队列长度的个数。
    • DDS_QOS_HISTORY_POLICY_KEEP_ALL:保留全部消息样本。

    depth

    设置队列长度,仅当history设置成DDS_QOS_HISTORY_POLICY_KEEP_LAST时,该参数才能生效。

    reliability

    设置消息可靠性,支持2种:

    • DDS_QOS_RELIABILITY_POLICY_RELIABLE:保证消息被传递。
    • DDS_QOS_RELIABILITY_POLICY_BEST_EFFORT:消息不会重传,不会等待发送到达的确认。

    durability

    设置消息的持久性,支持2种:

    • DDS_QOS_DURABILITY_POLICY_VOLATILE:过往发送的消息都被忽略,只有当订阅者匹配上发布者才会接收消息。
    • DDS_QOS_DURABILITY_POLICY_TRANSIENT_LOCAL:创建新datareader后,历史记录中会补充过去已发送的消息。

    deadline

    设置消息发送到接收方的有效时间。消息被发出来后,接收方检查该有效时间,超时则认为该消息无效而被丢弃。

    • second:设置秒
    • nanosecond:设置纳秒

    总时间为两者时间相加。

    lifespan

    设置消息在系统里的有效时间。

    • second:设置秒
    • nanosecond:设置纳秒

    总时间为两者时间相加。

    liveliness

    设置datareader、datawriter的生命周期检测策略,支持2种:

    • DDS_QOS_LIVELINESS_POLICY_AUTOMATIC:系统自动校验datareader、datawriter有效性。
    • DDS_QOS_LIVELINESS_POLICY_MANUAL_BY_TOPIC:发布端会在lease_duration周期内检测订阅端的有效性,若订阅端没有回复发布端发送的心跳信号则认为该订阅端已失效。

    lease_duration

    订阅端、发布端有效性检测的时间。超过该时候应答心跳信号则认为该订阅端或发布端已失效。

    • second:设置秒
    • nanosecond:设置纳秒

    总时间为两者时间相加。需要与DDS_QOS_LIVELINESS_POLICY_MANUAL_BY_TOPIC配合使用。

    表2 Dds QoS策略兼容性

    策略项

    发布端

    订阅端

    是否兼容

    reliability策略兼容性

    Best effort

    Best effort

    兼容

    Best effort

    Reliable

    不兼容

    Reliable

    Best effort

    兼容

    Reliable

    Reliable

    兼容

    durability策略兼容性

    Volatile

    Volatile

    兼容

    Volatile

    Transient local

    不兼容

    Transient local

    Volatile

    兼容

    Transient local

    Transient local

    兼容

    deadline策略兼容性

    系统默认值(QOS_DURATION_MAX)

    系统默认值(QOS_DURATION_MAX)

    兼容

    系统默认值(QOS_DURATION_MAX)

    x

    不兼容

    x

    系统默认值(QOS_DURATION_MAX)

    兼容

    x

    x

    兼容

    x

    y (y > x)

    兼容

    x

    y (y < x)

    不兼容

    lifespan策略兼容性

    NA

    NA

    NA

    liveliness策略兼容性

    Automatic

    Automatic

    兼容

    Automatic

    Manual by topic

    不兼容

    Manual by topic

    Automatic

    兼容

    Manual by topic

    Manual by topic

    兼容

    lease duration策略兼容性

    系统默认值(QOS_DURATION_MAX)

    系统默认值(QOS_DURATION_MAX)

    兼容

    系统默认值(QOS_DURATION_MAX)

    x

    不兼容

    x

    系统默认值(QOS_DURATION_MAX)

    兼容

    x

    x

    兼容

    x

    y (y > x)

    兼容

    x

    y (y < x)

    不兼容

    说明:假设x、y为有效数值,取值范围为[0, QOS_DURATION_MAX]。