下载
中文
注册

消息发布

基本原理

普通消息发布的关键接口调用流程如图1所示。

图1 普通消息发布接口调用流程
  1. 资源初始化。

    用户在调用OpenHiva接口之前,需要先调用OpenHiva::Init接口进行初始化,返回值为0代表初始化成功,否则失败。

    初始化动作主要包含节点名注册、线程组创建、资源申请等动作。
    • 节点名注册:
      • 不同节点的节点名称不能重复,否则会导致后面启动的节点初始化失败。
      • 如果初始化失败,节点不能正常执行发布或者订阅动作。
    • 线程组创建:

      调用OpenHiva::Init接口时需传入线程组参数,因发布者不需要设置回调函数,因此可传入空线程组用于初始化。

  2. 注册发布信息。
    1. 用户在注册发布信息之前,必须要先创建节点句柄OpenHiva::Node n。
    2. 再使用创建的节点句柄n调用CreatePublisher接口向DataMaster注册要发布的Topic。CreatePublisher接口调用完成后,会返回Publisher对象。
    3. 最后通过调用Publisher对象的Ready接口,判断发布者是否创建成功。

    调用CreatePublisher接口时,OpenHiva内部会创建内存池、发布队列,并将Topic名称、发布队列ID等信息注册到中心节点(DataMaster)上,同时将发布队列ID与订阅该Topic的订阅队列ID绑定。

  3. 发布消息。

    用户使用步驟2中返回的Publisher对象调用OpenHiva::Publisher::Publish接口,将数据填充到发布队列并发布消息。若该Topic存在订阅者,队列调度器会根据步驟2中的队列绑定关系,将消息从发布队列搬运到与之绑定的订阅队列中,然后事件调度器会唤醒订阅者的工作线程处理订阅队列里的消息。

    • 如果此时队列绑定关系未创建,则发布队列或者对应的数据块满之后,OpenHiva会根据CreatePublisher接口入参topicOptions.overWriteFlag决定如何处理:
      • overWriteFlag=False(默认值),表示不写覆盖。此时系统会丢弃想入队的新数据,保留旧数据。
      • overWriteFlag=True,表示写覆盖。此时系统会丢弃队列中的旧数据,存入新数据。
    • 根据CreatePublisher接口入参topicOptions.queueFCFlag的取值不同,OpenHiva内部消息出队时的处理不同:
      • queueFCFlag=False(默认值):表示不开启队列流控。此时接口入参queueTTL无效,消息出队时不会丢弃队列中的消息。
      • queueFCFlag=True:表示开启队列流控。此时接口入参queueTTL有效(默认值为1000ms),消息出队时会丢弃队列中存储时间超过queueTTL值的消息。如果队列中的消息存储时间都超过了queueTTL,则消息出队时会保留最新一条消息,然后丢弃其他的。
  4. 资源释放。
    进程结束前需释放相应资源,定义的OpenHiva接口将无法使用。
    • 异常分支中,需主动调用OpenHiva::Shutdown接口释放资源,包括注销队列ID/Topic等信息、释放对应的内存块等。
    • 主线程中,需主动调用OpenHiva::WaitForShutdown接口释放资源,防止主线程提前退出。当进程异常终止时,WaitForShutdown接口内部会自动调用Shutdown接口进行资源清理。

示例代码

代码中公共的、相对稳定的参数(如队列大小queueSize),可以通过配置管理模块进行统一管理,方便后续维护,具体请参见配置管理

普通消息发布的关键步骤代码示例如下,仅供参考:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#include <string>
#include <iostream>
#include <memory>
#include <sstream>
#include "open/publisher.h"
#include "open/init.h"
#include "open/node.h"
#include "std_msgs/include/StringMessage.h"

// FixMessage是用户自定义函数,用于处理发布消息
void FixMessage(Hiva::StdMsgs::StringMessage &msg)
{
    std::stringstream ss;
    ss << "talker: ==>Hello World " << count;
    msg.stringData = ss.str();          // 将字符串内容赋值给消息,具体视用户消息结构而定
    HIVA_WARN("%s", msg.data.c_str());
}

int main(int argc, char **argv)
{
    // 1. 资源初始化
    uint32_t queueSize = 10;          // 数据发布队列长度,最大128
    uint32_t maxMsgSize = 20000;     // 每帧消息的最大大小,单位字节
    uint32_t blockNum = 20;         // 共享内存池中block的个数,默认为queueSize * 2,最大256
    uint32_t queueSize = 10U;      // 队列长度
    bool overwrite = true;         // 是否写覆盖
    bool queueFCFlag = false;     // 是否开启队列流控 
    uint16_t queueTTL = 0U;       // 流控的时间差
    std::string topicName = "/chatter";  // 发布者发布的话题名
    std::string nodeName = "/talker";     // 发布者节点名
    // 定义线程组
    std::vector<OpenHiva::ScheduleGroup> threadGroup;
    
    
    
    
    // 调用资源初始化接口
   OpenHiva::Init(argc, argv, threadGroup);  
    HIVA_EVENT("Talker init ok!");

    // 2. 注册发布消息
    // 构造Node对象
    OpenHiva::Node n(nodeName);
    // 构造TopicOptions
    OpenHiva::TopicOptions topicOps;
    topicOps.BuildGroupName(testGroup.groupName)
            .BuildMessageTraits<Hiva::StdMsgs::StringMessage>()
            .BuildShmOptions(maxMsgSize, blockNum)
            .BuildQueueOptions(queueSize, overwrite, queueFCFlag, queueTTL)
            .BuildTopicName(topicName);
    // 通过NodeHandle对象调用CreatePublisher接口发布Topic,返回Publisher对象
    std::shared_ptr<OpenHiva::Publisher> chatterPub = n.CreatePublisher<Hiva::StdMsgs::StringMessage>(topicName, topicOps);  
    // 判断Publisher对象是否构造成功。若失败,调用Shutdown函数释放资源
    if ((chatterPub == nullptr) || !chatterPub->Ready()) {
        HIVA_ERROR("chatterPub Ok() Fail!");
        OpenHiva::Shutdown(); 
        return 0;
    }
    HIVA_INFO("Publisher creat ok");

    // 3. 发布消息
    int count = 0;    
    // 判断Hiva节点状态。当节点是使能状态,返回true;当节点是shutdown或初始化失败状态,返回false,收发包均不能正确进行
    while (OpenHiva::Ready()) {  
        Hiva::StdMsgs::StringMessage msg;  //定义要发布的消息
        FixMessage(msg);                  // 用户对将要发布消息预处理
        chatterPub.Publish(msg);        //发布消息到发布队列
        sleep(1);
        ++count;
    }

    // 4. 资源释放
    // 阻塞式调用,防止进程主动退出,在对进程执行ctrl+c或kill-2时函数会立刻返回
    OpenHiva::WaitForShutdown();  
    return 0;
}