消息发布
基本原理
普通消息发布的关键接口调用流程如图1所示。
- 资源初始化。
用户在调用OpenHiva接口之前,需要先调用OpenHiva::Init接口进行初始化,返回值为0代表初始化成功,否则失败。
- 注册发布信息。
- 用户在注册发布信息之前,必须要先创建节点句柄OpenHiva::Node n。
- 再使用创建的节点句柄n调用CreatePublisher接口向DataMaster注册要发布的Topic。CreatePublisher接口调用完成后,会返回Publisher对象。
- 最后通过调用Publisher对象的Ready接口,判断发布者是否创建成功。
调用CreatePublisher接口时,OpenHiva内部会创建内存池、发布队列,并将Topic名称、发布队列ID等信息注册到中心节点(DataMaster)上,同时将发布队列ID与订阅该Topic的订阅队列ID绑定。
- 发布消息。
用户使用步驟2中返回的Publisher对象调用OpenHiva::Publisher::Publish接口,将数据填充到发布队列并发布消息。若该Topic存在订阅者,队列调度器会根据步驟2中的队列绑定关系,将消息从发布队列搬运到与之绑定的订阅队列中,然后事件调度器会唤醒订阅者的工作线程处理订阅队列里的消息。
- 如果此时队列绑定关系未创建,则发布队列或者对应的数据块满之后,OpenHiva会根据CreatePublisher接口入参topicOptions.overWriteFlag决定如何处理:
- overWriteFlag=False(默认值),表示不写覆盖。此时系统会丢弃想入队的新数据,保留旧数据。
- overWriteFlag=True,表示写覆盖。此时系统会丢弃队列中的旧数据,存入新数据。
- 根据CreatePublisher接口入参topicOptions.queueFCFlag的取值不同,OpenHiva内部消息出队时的处理不同:
- queueFCFlag=False(默认值):表示不开启队列流控。此时接口入参queueTTL无效,消息出队时不会丢弃队列中的消息。
- queueFCFlag=True:表示开启队列流控。此时接口入参queueTTL有效(默认值为1000ms),消息出队时会丢弃队列中存储时间超过queueTTL值的消息。如果队列中的消息存储时间都超过了queueTTL,则消息出队时会保留最新一条消息,然后丢弃其他的。
- 如果此时队列绑定关系未创建,则发布队列或者对应的数据块满之后,OpenHiva会根据CreatePublisher接口入参topicOptions.overWriteFlag决定如何处理:
- 资源释放。
示例代码
普通消息发布的关键步骤代码示例如下,仅供参考:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
#include <string> #include <iostream> #include <memory> #include <sstream> #include "open/publisher.h" #include "open/init.h" #include "open/node.h" #include "std_msgs/include/StringMessage.h" // FixMessage是用户自定义函数,用于处理发布消息 void FixMessage(Hiva::StdMsgs::StringMessage &msg) { std::stringstream ss; ss << "talker: ==>Hello World " << count; msg.stringData = ss.str(); // 将字符串内容赋值给消息,具体视用户消息结构而定 HIVA_WARN("%s", msg.data.c_str()); } int main(int argc, char **argv) { // 1. 资源初始化 uint32_t queueSize = 10; // 数据发布队列长度,最大128 uint32_t maxMsgSize = 20000; // 每帧消息的最大大小,单位字节 uint32_t blockNum = 20; // 共享内存池中block的个数,默认为queueSize * 2,最大256 uint32_t queueSize = 10U; // 队列长度 bool overwrite = true; // 是否写覆盖 bool queueFCFlag = false; // 是否开启队列流控 uint16_t queueTTL = 0U; // 流控的时间差 std::string topicName = "/chatter"; // 发布者发布的话题名 std::string nodeName = "/talker"; // 发布者节点名 // 定义线程组 std::vector<OpenHiva::ScheduleGroup> threadGroup; // 调用资源初始化接口 OpenHiva::Init(argc, argv, threadGroup); HIVA_EVENT("Talker init ok!"); // 2. 注册发布消息 // 构造Node对象 OpenHiva::Node n(nodeName); // 构造TopicOptions OpenHiva::TopicOptions topicOps; topicOps.BuildGroupName(testGroup.groupName) .BuildMessageTraits<Hiva::StdMsgs::StringMessage>() .BuildShmOptions(maxMsgSize, blockNum) .BuildQueueOptions(queueSize, overwrite, queueFCFlag, queueTTL) .BuildTopicName(topicName); // 通过NodeHandle对象调用CreatePublisher接口发布Topic,返回Publisher对象 std::shared_ptr<OpenHiva::Publisher> chatterPub = n.CreatePublisher<Hiva::StdMsgs::StringMessage>(topicName, topicOps); // 判断Publisher对象是否构造成功。若失败,调用Shutdown函数释放资源 if ((chatterPub == nullptr) || !chatterPub->Ready()) { HIVA_ERROR("chatterPub Ok() Fail!"); OpenHiva::Shutdown(); return 0; } HIVA_INFO("Publisher creat ok"); // 3. 发布消息 int count = 0; // 判断Hiva节点状态。当节点是使能状态,返回true;当节点是shutdown或初始化失败状态,返回false,收发包均不能正确进行 while (OpenHiva::Ready()) { Hiva::StdMsgs::StringMessage msg; //定义要发布的消息 FixMessage(msg); // 用户对将要发布消息预处理 chatterPub.Publish(msg); //发布消息到发布队列 sleep(1); ++count; } // 4. 资源释放 // 阻塞式调用,防止进程主动退出,在对进程执行ctrl+c或kill-2时函数会立刻返回 OpenHiva::WaitForShutdown(); return 0; } |
父主题: 普通消息