[关闭]
@leaveye 2022-02-19T08:24:57.000000Z 字数 5400 阅读 1011

nng usage

@标签: nng api sample


目录

pair 协议

说明

https://nng.nanomsg.org/man/v1.3.2/nng_pair.7.html

pair协议实现了点对点模式,其中对等点之间的关系是一对一的。

通常,如果没有对等方能够接收到消息,则在尝试发送消息时此模式将阻塞。

  • 虽然看似可靠,涉及 nng_device 或 raw 模式套接字的底层实现可能会丢弃消息。
  • 如果为了与历史应用线路兼容,只应使用版本 0 。
  • 版本 1 的“多角恋”特性将被废弃。

例子

https://github.com/nanomsg/nng/blob/master/tests/cplusplus_pair.cc

  1. // ...
  2. #include "nng/nng.h"
  3. #include "nng/protocol/pair1/pair.h"
  4. #include <cstring>
  5. #include <iostream>
  6. #define SOCKET_ADDRESS "inproc://c++"
  7. int
  8. main(int argc, char **argv)
  9. {
  10. #if defined(NNG_HAVE_PAIR1)
  11. nng_socket s1;
  12. nng_socket s2;
  13. int rv;
  14. size_t sz;
  15. char buf[8];
  16. (void) argc;
  17. (void) argv;
  18. if ((rv = nng_pair0_open(&s1)) != 0) {
  19. throw nng_strerror(rv);
  20. }
  21. if ((rv = nng_pair0_open(&s2)) != 0) {
  22. throw nng_strerror(rv);
  23. }
  24. if ((rv = nng_listen(s1, SOCKET_ADDRESS, NULL, 0)) != 0) {
  25. throw nng_strerror(rv);
  26. }
  27. if ((rv = nng_dial(s2, SOCKET_ADDRESS, NULL, 0)) != 0) {
  28. throw nng_strerror(rv);
  29. }
  30. if ((rv = nng_send(s2, (void *) "ABC", 4, 0)) != 0) {
  31. throw nng_strerror(rv);
  32. }
  33. sz = sizeof(buf);
  34. if ((rv = nng_recv(s1, buf, &sz, 0)) != 0) {
  35. throw nng_strerror(rv);
  36. }
  37. if ((sz != 4) || (memcmp(buf, "ABC", 4) != 0)) {
  38. throw "Contents did not match";
  39. }
  40. if ((rv = nng_send(s1, (void *) "DEF", 4, 0)) != 0) {
  41. throw nng_strerror(rv);
  42. }
  43. sz = sizeof(buf);
  44. if ((rv = nng_recv(s2, buf, &sz, 0)) != 0) {
  45. throw nng_strerror(rv);
  46. }
  47. if ((sz != 4) || (memcmp(buf, "DEF", 4) != 0)) {
  48. throw "Contents did not match";
  49. }
  50. if ((rv = nng_close(s1)) != 0) {
  51. throw nng_strerror(rv);
  52. }
  53. if ((rv = nng_close(s2)) != 0) {
  54. throw nng_strerror(rv);
  55. }
  56. std::cout << "Pass." << std::endl;
  57. #else
  58. (void) argc;
  59. (void) argv;
  60. std::cout << "Skipped (protocol unconfigured)." << std::endl;
  61. #endif
  62. return (0);
  63. }

pubsub 协议

说明

发布者/订阅者模式的协议。

pub - 发布者

https://nng.nanomsg.org/man/v1.3.2/nng_pub.7.html

发布者发布所有数据。线路无附加头。

  • 只发送不接收
  • 一直可写

sub - 订阅者

https://nng.nanomsg.org/man/v1.3.2/nng_sub.7.html

订阅者接收所有数据,遇到数据头与关心“主题”匹配则可以返回给接口。

“主题” 是一个定长字节块。可使用 nng_setopt_string() (字符串,含 NUL 结束符)或 nng_setopt()(长度额外生成)函数,NNG_OPT_SUB_SUBSCRIBE 选项,来设置。

  • 可订阅多个“主题”,任一匹配即可
  • nng_socket_set_bool(s, NNG_OPT_SUB_PREFNEW, true) 可以指定接收队列满时的处理逻辑。默认 true 表示清除最旧数据腾出空间,false 表示拒绝新数据进入缓存。

例子

简短版

直接片段重组,未测试

  1. constexpr const char *url = "ipc:///path/to/the/node";
  2. std::thread t1([]() {
  3. nng_socket sock;
  4. nng_msg* msg;
  5. nng_pub0_open(&sock);
  6. nng_listen(sock, url, nullptr, 0);
  7. // wait for client connected
  8. sleep(1);
  9. char bufX[0x400] = "X:abcdefg...";
  10. char bufY[0x400] = "Y:zxcvm...";
  11. for (;;) {
  12. nng_msg* msg;
  13. nng_msg_alloc(&msg,size);
  14. nng_msg_clear(msg);
  15. nng_msg_append(msg,bufX,sizeof(bufX));
  16. nng_sendmsg(sock,msg,0);
  17. nng_msg_alloc(&msg,size);
  18. nng_msg_clear(msg);
  19. nng_msg_append(msg,bufY,sizeof(bufX));
  20. nng_sendmsg(sock,msg,0);
  21. // nng_send() works too
  22. }
  23. nng_close(sock);
  24. });
  25. std::thread t2([]() {
  26. nng_socket sock;
  27. nng_msg* msg;
  28. nng_sub0_open(&sock);
  29. nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "X:", 2);
  30. //wait for server started
  31. sleep(1);
  32. nng_dial(sock, url, nullptr, 0);
  33. for (;;) {
  34. nng_recvmsg(sock,&msg,0);
  35. // nng_recv() works too
  36. // will only get "X:..." message
  37. nng_msg_free(msg);
  38. }
  39. nng_close(sock);
  40. });
  41. t1.join();
  42. t2.join();

完整版

  1. #include <stdio.h>
  2. #include <string.h>
  3. #include <unistd.h>
  4. #include <nng/nng.h>
  5. #include <nng/protocol/pubsub0/pub.h>
  6. int main()
  7. {
  8. nng_socket sock;
  9. int i=0;
  10. int rv;
  11. const char *url="ipc:///var/run/loop/venc";
  12. if((rv = nng_pub0_open(&sock)) != 0)
  13. {
  14. printf("nng_pub0_open error!!! rv=0x%x\n",rv);
  15. return -1;
  16. }
  17. if((rv = nng_listen(sock,url,NULL,0)) != 0)
  18. {
  19. printf("nng_listen error!!! rv=0x%x\n",rv);
  20. return -1;
  21. }
  22. //sleep(1);
  23. char bufX[0x400] = "X:abcdefg7890123456";
  24. //char bufY[0x400] = "A:dgsj123456789.....";
  25. int size = 0x400;
  26. int ret=0;
  27. FILE *test_fp;
  28. int len=1880;
  29. char buf[1900];
  30. char *p = &(buf[2]);
  31. buf[0] = 'X';
  32. buf[1] = ':';
  33. int read_len=0;
  34. test_fp = fopen("test.mp4.ts","r");
  35. while(1)
  36. {
  37. nng_msg* msg;
  38. read_len = fread(p, 1, len, test_fp);
  39. if(read_len == 0)
  40. {
  41. printf("read end!!!,read_len=%d\n",read_len);
  42. break;
  43. }
  44. //printf("read_len=%d\n",read_len);
  45. nng_msg_alloc(&msg,read_len+2);
  46. nng_msg_clear(msg);
  47. nng_msg_append(msg,buf,read_len+2);
  48. char *p = (char *)nng_msg_body(msg);
  49. if(p != NULL)
  50. {
  51. //printf("msg body=%s\n",p);
  52. printf("%d------p size = %x\n",i++,nng_msg_len(msg));
  53. }
  54. ret = nng_sendmsg(sock,msg,0);
  55. if(ret != 0)
  56. {
  57. printf("nng_sendmsg error!!!\n");
  58. nng_msg_free(msg);
  59. }
  60. if(i%10 == 0)
  61. usleep(1000);
  62. /* nng_msg_alloc(&msg,size);
  63. nng_msg_clear(msg);
  64. nng_msg_append(msg,bufX,sizeof(bufX));
  65. char *p = (char *)nng_msg_body(msg);
  66. if(p != NULL)
  67. {
  68. //printf("msg body=%s\n",p);
  69. printf("p size = %x\n",nng_msg_len(msg));
  70. }
  71. ret = nng_sendmsg(sock,msg,0);
  72. if(ret != 0)
  73. {
  74. printf("nng_sendmsg error!!!\n");
  75. nng_msg_free(msg);
  76. }*/
  77. }
  78. read_len = 2;
  79. nng_msg* msg;
  80. nng_msg_alloc(&msg,read_len);
  81. nng_msg_clear(msg);
  82. nng_msg_append(msg,buf,read_len);
  83. ret = nng_sendmsg(sock,msg,0);
  84. if(ret != 0)
  85. {
  86. printf("nng_sendmsg error!!!\n");
  87. nng_msg_free(msg);
  88. }
  89. fclose(test_fp);
  90. }
  1. #include <stdio.h>
  2. #include <string.h>
  3. #include <nng/nng.h>
  4. #include <nng/protocol/pubsub0/sub.h>
  5. int main()
  6. {
  7. const char *url="ipc:///var/run/loop/venc";
  8. int rv;
  9. int i=0;
  10. nng_socket sock;
  11. nng_msg* msg;
  12. rv = nng_sub0_open(&sock);
  13. if(rv != 0)
  14. {
  15. printf("nng_sub0_open error!!! rv=%d\n",rv);
  16. return -1;
  17. }
  18. rv = nng_setopt(sock, NNG_OPT_SUB_SUBSCRIBE, "X:", 2);
  19. if(rv != 0)
  20. {
  21. printf("nng_setopt error!!! rv=%d\n",rv);
  22. return -1;
  23. }
  24. rv = nng_dial(sock, url, NULL, 0);
  25. if(rv != 0)
  26. {
  27. printf("nng_dial error!!! rv=%d\n",rv);
  28. while(rv == 6)
  29. {
  30. rv = nng_dial(sock, url, NULL, 0);
  31. printf("nng_dial !!! rv=%d\n",rv);
  32. }
  33. if(rv != 0)
  34. {
  35. printf("nng_dial error!!! rv=%d\n",rv);
  36. return -1;
  37. }
  38. }
  39. FILE *test_fp = NULL;
  40. test_fp = fopen("rcv_test.mp4.ts","w+");
  41. if(test_fp == NULL)
  42. {
  43. printf("open file rcv_test.mp4.ts error!!!\n");
  44. return -1;
  45. }
  46. while(1)
  47. {
  48. nng_recvmsg(sock,&msg,0);
  49. char *p = (char *)nng_msg_body(msg);
  50. if(p != NULL)
  51. {
  52. //printf("msg body=%s\n",p);
  53. printf("%d------p size = %x\n",i++,nng_msg_len(msg));
  54. }
  55. int len = nng_msg_len(msg)-2;
  56. if(len > 0)
  57. fwrite(p+2,1,len,test_fp);
  58. else
  59. {
  60. printf("end!!!\n");
  61. fclose(test_fp);
  62. }
  63. nng_msg_free(msg);
  64. }
  65. nng_close(sock);
  66. return 0;
  67. }

reqrep 协议

请求/响应形式的协议。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注