@leaveye
2022-02-19T08:24:57.000000Z
字数 5400
阅读 1272
@标签: nng api sample
pair协议实现了点对点模式,其中对等点之间的关系是一对一的。
通常,如果没有对等方能够接收到消息,则在尝试发送消息时此模式将阻塞。
- 虽然看似可靠,涉及 nng_device 或 raw 模式套接字的底层实现可能会丢弃消息。
- 如果为了与历史应用线路兼容,只应使用版本 0 。
- 版本 1 的“多角恋”特性将被废弃。
https://github.com/nanomsg/nng/blob/master/tests/cplusplus_pair.cc
// ...#include "nng/nng.h"#include "nng/protocol/pair1/pair.h"#include <cstring>#include <iostream>#define SOCKET_ADDRESS "inproc://c++"intmain(int argc, char **argv){#if defined(NNG_HAVE_PAIR1)nng_socket s1;nng_socket s2;int rv;size_t sz;char buf[8];(void) argc;(void) argv;if ((rv = nng_pair0_open(&s1)) != 0) {throw nng_strerror(rv);}if ((rv = nng_pair0_open(&s2)) != 0) {throw nng_strerror(rv);}if ((rv = nng_listen(s1, SOCKET_ADDRESS, NULL, 0)) != 0) {throw nng_strerror(rv);}if ((rv = nng_dial(s2, SOCKET_ADDRESS, NULL, 0)) != 0) {throw nng_strerror(rv);}if ((rv = nng_send(s2, (void *) "ABC", 4, 0)) != 0) {throw nng_strerror(rv);}sz = sizeof(buf);if ((rv = nng_recv(s1, buf, &sz, 0)) != 0) {throw nng_strerror(rv);}if ((sz != 4) || (memcmp(buf, "ABC", 4) != 0)) {throw "Contents did not match";}if ((rv = nng_send(s1, (void *) "DEF", 4, 0)) != 0) {throw nng_strerror(rv);}sz = sizeof(buf);if ((rv = nng_recv(s2, buf, &sz, 0)) != 0) {throw nng_strerror(rv);}if ((sz != 4) || (memcmp(buf, "DEF", 4) != 0)) {throw "Contents did not match";}if ((rv = nng_close(s1)) != 0) {throw nng_strerror(rv);}if ((rv = nng_close(s2)) != 0) {throw nng_strerror(rv);}std::cout << "Pass." << std::endl;#else(void) argc;(void) argv;std::cout << "Skipped (protocol unconfigured)." << std::endl;#endifreturn (0);}
发布者/订阅者模式的协议。
发布者发布所有数据。线路无附加头。
- 只发送不接收
- 一直可写
订阅者接收所有数据,遇到数据头与关心“主题”匹配则可以返回给接口。
“主题” 是一个定长字节块。可使用 nng_setopt_string() (字符串,含 NUL 结束符)或 nng_setopt()(长度额外生成)函数,NNG_OPT_SUB_SUBSCRIBE 选项,来设置。
- 可订阅多个“主题”,任一匹配即可
nng_socket_set_bool(s, NNG_OPT_SUB_PREFNEW, true)可以指定接收队列满时的处理逻辑。默认true表示清除最旧数据腾出空间,false表示拒绝新数据进入缓存。
直接片段重组,未测试
constexpr const char *url = "ipc:///path/to/the/node";std::thread t1([]() {nng_socket sock;nng_msg* msg;nng_pub0_open(&sock);nng_listen(sock, url, nullptr, 0);// wait for client connectedsleep(1);char bufX[0x400] = "X:abcdefg...";char bufY[0x400] = "Y:zxcvm...";for (;;) {nng_msg* msg;nng_msg_alloc(&msg,size);nng_msg_clear(msg);nng_msg_append(msg,bufX,sizeof(bufX));nng_sendmsg(sock,msg,0);nng_msg_alloc(&msg,size);nng_msg_clear(msg);nng_msg_append(msg,bufY,sizeof(bufX));nng_sendmsg(sock,msg,0);// nng_send() works too}nng_close(sock);});std::thread t2([]() {nng_socket sock;nng_msg* msg;nng_sub0_open(&sock);nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "X:", 2);//wait for server startedsleep(1);nng_dial(sock, url, nullptr, 0);for (;;) {nng_recvmsg(sock,&msg,0);// nng_recv() works too// will only get "X:..." messagenng_msg_free(msg);}nng_close(sock);});t1.join();t2.join();
#include <stdio.h>#include <string.h>#include <unistd.h>#include <nng/nng.h>#include <nng/protocol/pubsub0/pub.h>int main(){nng_socket sock;int i=0;int rv;const char *url="ipc:///var/run/loop/venc";if((rv = nng_pub0_open(&sock)) != 0){printf("nng_pub0_open error!!! rv=0x%x\n",rv);return -1;}if((rv = nng_listen(sock,url,NULL,0)) != 0){printf("nng_listen error!!! rv=0x%x\n",rv);return -1;}//sleep(1);char bufX[0x400] = "X:abcdefg7890123456";//char bufY[0x400] = "A:dgsj123456789.....";int size = 0x400;int ret=0;FILE *test_fp;int len=1880;char buf[1900];char *p = &(buf[2]);buf[0] = 'X';buf[1] = ':';int read_len=0;test_fp = fopen("test.mp4.ts","r");while(1){nng_msg* msg;read_len = fread(p, 1, len, test_fp);if(read_len == 0){printf("read end!!!,read_len=%d\n",read_len);break;}//printf("read_len=%d\n",read_len);nng_msg_alloc(&msg,read_len+2);nng_msg_clear(msg);nng_msg_append(msg,buf,read_len+2);char *p = (char *)nng_msg_body(msg);if(p != NULL){//printf("msg body=%s\n",p);printf("%d------p size = %x\n",i++,nng_msg_len(msg));}ret = nng_sendmsg(sock,msg,0);if(ret != 0){printf("nng_sendmsg error!!!\n");nng_msg_free(msg);}if(i%10 == 0)usleep(1000);/* nng_msg_alloc(&msg,size);nng_msg_clear(msg);nng_msg_append(msg,bufX,sizeof(bufX));char *p = (char *)nng_msg_body(msg);if(p != NULL){//printf("msg body=%s\n",p);printf("p size = %x\n",nng_msg_len(msg));}ret = nng_sendmsg(sock,msg,0);if(ret != 0){printf("nng_sendmsg error!!!\n");nng_msg_free(msg);}*/}read_len = 2;nng_msg* msg;nng_msg_alloc(&msg,read_len);nng_msg_clear(msg);nng_msg_append(msg,buf,read_len);ret = nng_sendmsg(sock,msg,0);if(ret != 0){printf("nng_sendmsg error!!!\n");nng_msg_free(msg);}fclose(test_fp);}
#include <stdio.h>#include <string.h>#include <nng/nng.h>#include <nng/protocol/pubsub0/sub.h>int main(){const char *url="ipc:///var/run/loop/venc";int rv;int i=0;nng_socket sock;nng_msg* msg;rv = nng_sub0_open(&sock);if(rv != 0){printf("nng_sub0_open error!!! rv=%d\n",rv);return -1;}rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "X:", 2);if(rv != 0){printf("nng_setopt error!!! rv=%d\n",rv);return -1;}rv = nng_dial(sock, url, NULL, 0);if(rv != 0){printf("nng_dial error!!! rv=%d\n",rv);while(rv == 6){rv = nng_dial(sock, url, NULL, 0);printf("nng_dial !!! rv=%d\n",rv);}if(rv != 0){printf("nng_dial error!!! rv=%d\n",rv);return -1;}}FILE *test_fp = NULL;test_fp = fopen("rcv_test.mp4.ts","w+");if(test_fp == NULL){printf("open file rcv_test.mp4.ts error!!!\n");return -1;}while(1){nng_recvmsg(sock,&msg,0);char *p = (char *)nng_msg_body(msg);if(p != NULL){//printf("msg body=%s\n",p);printf("%d------p size = %x\n",i++,nng_msg_len(msg));}int len = nng_msg_len(msg)-2;if(len > 0)fwrite(p+2,1,len,test_fp);else{printf("end!!!\n");fclose(test_fp);}nng_msg_free(msg);}nng_close(sock);return 0;}
请求/响应形式的协议。