@leaveye
2022-02-19T08:24:57.000000Z
字数 5400
阅读 1011
@标签: nng api sample
pair协议实现了点对点模式,其中对等点之间的关系是一对一的。
通常,如果没有对等方能够接收到消息,则在尝试发送消息时此模式将阻塞。
- 虽然看似可靠,涉及 nng_device 或 raw 模式套接字的底层实现可能会丢弃消息。
- 仅当需要与历史应用保持线路(wire)协议兼容时,才应使用版本 0 。
- 版本 1 的“多角恋”(polyamorous)模式将被废弃。
https://github.com/nanomsg/nng/blob/master/tests/cplusplus_pair.cc
// ...
#include "nng/nng.h"
#include "nng/protocol/pair1/pair.h"
#include <cstring>
#include <iostream>
#define SOCKET_ADDRESS "inproc://c++"
// Round-trip smoke test for the pair protocol.
//
// Opens two pair sockets, connects them through SOCKET_ADDRESS, sends "ABC"
// one way and "DEF" the other way, and verifies the received bytes.
//
// Any failure is reported by throwing a C string (either the text returned by
// nng_strerror() or a literal). Nothing in this function catches it, so a
// failure terminates the program.
//
// The opener is nng_pair1_open() so that it matches both the included header
// ("nng/protocol/pair1/pair.h") and the NNG_HAVE_PAIR1 guard below.
int
main(int argc, char **argv)
{
	(void) argc;
	(void) argv;

#if defined(NNG_HAVE_PAIR1)
	// Throws the NNG error text when a call reports a non-zero status.
	const auto check = [](int status) {
		if (status != 0) {
			throw nng_strerror(status);
		}
	};

	// Payloads are sent including their terminating NUL, hence 4 bytes.
	constexpr size_t payload_len = 4;

	nng_socket s1;
	nng_socket s2;
	char       buf[8];
	size_t     sz;

	check(nng_pair1_open(&s1));
	check(nng_pair1_open(&s2));
	check(nng_listen(s1, SOCKET_ADDRESS, nullptr, 0));
	check(nng_dial(s2, SOCKET_ADDRESS, nullptr, 0));

	// s2 -> s1
	check(nng_send(s2, const_cast<char *>("ABC"), payload_len, 0));
	sz = sizeof(buf); // in: capacity of buf, out: bytes received
	check(nng_recv(s1, buf, &sz, 0));
	if ((sz != payload_len) ||
	    (std::memcmp(buf, "ABC", payload_len) != 0)) {
		throw "Contents did not match";
	}

	// s1 -> s2
	check(nng_send(s1, const_cast<char *>("DEF"), payload_len, 0));
	sz = sizeof(buf);
	check(nng_recv(s2, buf, &sz, 0));
	if ((sz != payload_len) ||
	    (std::memcmp(buf, "DEF", payload_len) != 0)) {
		throw "Contents did not match";
	}

	check(nng_close(s1));
	check(nng_close(s2));

	std::cout << "Pass." << std::endl;
#else
	std::cout << "Skipped (protocol unconfigured)." << std::endl;
#endif
	return (0);
}
发布者/订阅者模式的协议。
发布者发布所有数据。线路无附加头。
- 只发送不接收
- 一直可写
订阅者接收所有数据,遇到数据头与关心“主题”匹配则可以返回给接口。
“主题” 是一个定长字节块。可使用 nng_setopt_string()
(字符串,含 NUL 结束符)或 nng_setopt()
(长度由参数另行指定)函数,配合 NNG_OPT_SUB_SUBSCRIBE
选项,来设置。
- 可订阅多个“主题”,任一匹配即可
nng_socket_set_bool(s, NNG_OPT_SUB_PREFNEW, true)
可以指定接收队列满时的处理逻辑。默认true
表示清除最旧数据腾出空间,false
表示拒绝新数据进入缓存。
以下示例由代码片段直接拼合而成,未经测试。
constexpr const char *url = "ipc:///path/to/the/node";
std::thread t1([]() {
nng_socket sock;
nng_msg* msg;
nng_pub0_open(&sock);
nng_listen(sock, url, nullptr, 0);
// wait for client connected
sleep(1);
char bufX[0x400] = "X:abcdefg...";
char bufY[0x400] = "Y:zxcvm...";
for (;;) {
nng_msg* msg;
nng_msg_alloc(&msg,size);
nng_msg_clear(msg);
nng_msg_append(msg,bufX,sizeof(bufX));
nng_sendmsg(sock,msg,0);
nng_msg_alloc(&msg,size);
nng_msg_clear(msg);
nng_msg_append(msg,bufY,sizeof(bufX));
nng_sendmsg(sock,msg,0);
// nng_send() works too
}
nng_close(sock);
});
// Subscriber thread: opens a sub socket, subscribes to the "X:" topic, dials
// `url`, then receives and discards messages in a loop. The thread stops and
// closes the socket as soon as any NNG call fails.
std::thread t2([]() {
	nng_socket sock;
	if (nng_sub0_open(&sock) != 0) {
		return;
	}
	if (nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "X:", 2) != 0) {
		nng_close(sock);
		return;
	}

	// wait for server started
	sleep(1);

	if (nng_dial(sock, url, nullptr, 0) != 0) {
		nng_close(sock);
		return;
	}

	for (;;) {
		nng_msg *msg = nullptr;
		// nng_recv() works too
		// Only free the message when the receive succeeded; on failure
		// `msg` was never handed to us.
		if (nng_recvmsg(sock, &msg, 0) != 0) {
			break;
		}
		// will only get "X:..." message
		nng_msg_free(msg);
	}
	nng_close(sock);
});

// Wait for both worker threads to finish.
t1.join();
t2.join();
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <nng/nng.h>
#include <nng/protocol/pubsub0/pub.h>
int main()
{
nng_socket sock;
int i=0;
int rv;
const char *url="ipc:///var/run/loop/venc";
if((rv = nng_pub0_open(&sock)) != 0)
{
printf("nng_pub0_open error!!! rv=0x%x\n",rv);
return -1;
}
if((rv = nng_listen(sock,url,NULL,0)) != 0)
{
printf("nng_listen error!!! rv=0x%x\n",rv);
return -1;
}
//sleep(1);
char bufX[0x400] = "X:abcdefg7890123456";
//char bufY[0x400] = "A:dgsj123456789.....";
int size = 0x400;
int ret=0;
FILE *test_fp;
int len=1880;
char buf[1900];
char *p = &(buf[2]);
buf[0] = 'X';
buf[1] = ':';
int read_len=0;
test_fp = fopen("test.mp4.ts","r");
while(1)
{
nng_msg* msg;
read_len = fread(p, 1, len, test_fp);
if(read_len == 0)
{
printf("read end!!!,read_len=%d\n",read_len);
break;
}
//printf("read_len=%d\n",read_len);
nng_msg_alloc(&msg,read_len+2);
nng_msg_clear(msg);
nng_msg_append(msg,buf,read_len+2);
char *p = (char *)nng_msg_body(msg);
if(p != NULL)
{
//printf("msg body=%s\n",p);
printf("%d------p size = %x\n",i++,nng_msg_len(msg));
}
ret = nng_sendmsg(sock,msg,0);
if(ret != 0)
{
printf("nng_sendmsg error!!!\n");
nng_msg_free(msg);
}
if(i%10 == 0)
usleep(1000);
/* nng_msg_alloc(&msg,size);
nng_msg_clear(msg);
nng_msg_append(msg,bufX,sizeof(bufX));
char *p = (char *)nng_msg_body(msg);
if(p != NULL)
{
//printf("msg body=%s\n",p);
printf("p size = %x\n",nng_msg_len(msg));
}
ret = nng_sendmsg(sock,msg,0);
if(ret != 0)
{
printf("nng_sendmsg error!!!\n");
nng_msg_free(msg);
}*/
}
read_len = 2;
nng_msg* msg;
nng_msg_alloc(&msg,read_len);
nng_msg_clear(msg);
nng_msg_append(msg,buf,read_len);
ret = nng_sendmsg(sock,msg,0);
if(ret != 0)
{
printf("nng_sendmsg error!!!\n");
nng_msg_free(msg);
}
fclose(test_fp);
}
#include <stdio.h>
#include <string.h>
#include <nng/nng.h>
#include <nng/protocol/pubsub0/sub.h>
/*
 * Subscribes to the "X:" topic on the IPC url below and writes the payload of
 * every received message (everything after the 2-byte topic prefix) to
 * "rcv_test.mp4.ts". A message with no payload after the prefix is treated as
 * the end-of-stream marker and stops the loop.
 *
 * Returns 0 after the end marker was received, -1 on any setup or receive
 * error.
 */
int main()
{
    const char *url = "ipc:///var/run/loop/venc";
    const size_t prefix_len = 2; /* length of the "X:" topic prefix */
    nng_socket sock;
    FILE *test_fp = NULL;
    int status = 0;
    int rv;
    int i = 0;

    rv = nng_sub0_open(&sock);
    if (rv != 0)
    {
        printf("nng_sub0_open error!!! rv=%d\n", rv);
        return -1;
    }

    rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "X:", 2);
    if (rv != 0)
    {
        printf("nng_setopt error!!! rv=%d\n", rv);
        nng_close(sock);
        return -1;
    }

    rv = nng_dial(sock, url, NULL, 0);
    if (rv != 0)
    {
        printf("nng_dial error!!! rv=%d\n", rv);
        /* Keep retrying while the connection is refused. */
        while (rv == NNG_ECONNREFUSED)
        {
            /* Pause between attempts instead of spinning on the CPU. */
            nng_msleep(100);
            rv = nng_dial(sock, url, NULL, 0);
            printf("nng_dial !!! rv=%d\n", rv);
        }
        if (rv != 0)
        {
            printf("nng_dial error!!! rv=%d\n", rv);
            nng_close(sock);
            return -1;
        }
    }

    /* Binary mode: the payload is raw stream data, not text. */
    test_fp = fopen("rcv_test.mp4.ts", "wb");
    if (test_fp == NULL)
    {
        printf("open file rcv_test.mp4.ts error!!!\n");
        nng_close(sock);
        return -1;
    }

    for (;;)
    {
        nng_msg *msg = NULL;
        rv = nng_recvmsg(sock, &msg, 0);
        if (rv != 0)
        {
            /* No message was handed over, so there is nothing to free. */
            printf("nng_recvmsg error!!! rv=%d\n", rv);
            status = -1;
            break;
        }

        const char *body = (const char *)nng_msg_body(msg);
        size_t msg_len = nng_msg_len(msg);
        if (body != NULL)
        {
            printf("%d------p size = %zx\n", i++, msg_len);
        }

        /* Compare as size_t so short messages cannot wrap around. */
        if (body == NULL || msg_len <= prefix_len)
        {
            printf("end!!!\n");
            nng_msg_free(msg);
            break;
        }

        if (fwrite(body + prefix_len, 1, msg_len - prefix_len, test_fp)
            != msg_len - prefix_len)
        {
            printf("write file rcv_test.mp4.ts error!!!\n");
        }
        nng_msg_free(msg);
    }

    /* The file is closed exactly once, after the loop has ended. */
    fclose(test_fp);
    nng_close(sock);
    return status;
}
请求/响应形式的协议。