[关闭]
@leaveye 2025-10-02T15:28:53.000000Z 字数 5985 阅读 105

数据所沟通

shujusuo rtp srtp problem


SRTP

测试现象

  1. 建立视音频通讯,启动发送和接收端(先后无关)
  2. 保持一小段时间(2分钟以上,时间短不出异常)
  3. 停止接收端(保持则不出异常)
  4. 再次开启接收端
  5. 接收侧一直报错 err 7 ,授权失败

这里面:

涉及的调用流程

sequenceDiagram participant P as 编码器 actor U as 用户/平台 box 解码器 participant A as 控制源/SIP代理 participant X as 传输服务 participant B as srtp 上下文 participant D as 解码服务 end U -) A: 启动接收
步骤1、4 A -->> X: startRx() activate X X ->> B: once_call() %% activate X Note right of X: srtp_init()
atexit(srtp_shutdown) X ->> B: set policy X ->> B: srtp_create() loop 数据接收中 P -) X: rtp packet X ->> B: srtp_unprotect() X -) D: raw data end U -) A: 停止接收
步骤3 A -->> X: stopRx() X -->> B: srtp_dealloc() %% deactivate X deactivate X

在用的核心代码样本

  1. class underlying_policy : public srtp_policy_t {
  2. public:
  3. underlying_policy(const Side side, const Type type, const string &key) : srtp_policy_t(), _side(side), _type(type), _key_stuff(key.begin(), key.end()) {
  4. static once_flag once{};
  5. call_once(once, [] {
  6. const auto err = srtp_init();
  7. if (err) throw srtp_error(err, "srtp_init failed");
  8. // warnx("... srtp_init done");
  9. atexit([]() {
  10. // warnx("... srtp_shutdown calling");
  11. const auto err_ = srtp_shutdown();
  12. if (err_) throw srtp_error(err_, "srtp_shutdown failed");
  13. });
  14. });
  15. void(*init_policy)(srtp_crypto_policy_t*);
  16. switch (type) {
  17. case SM4_ECB_128_HMAC_SM3_80:
  18. init_policy = srtp_crypto_policy_set_sdt_soft_sm4_ecb_hmac_soft_sm3;
  19. break;
  20. case SM4_CTR_128_HMAC_SM3_80:
  21. init_policy = srtp_crypto_policy_set_sdt_sw_soft_sm4_ctr_hmac_soft_sm3;
  22. break;
  23. default:
  24. throw invalid_argument("invalid type");
  25. }
  26. switch (side) {
  27. case INBOUND: {
  28. // warnx("... from demo/recv");
  29. this->ssrc.type = ssrc_any_inbound;
  30. this->key = _key_stuff.data();
  31. this->window_size = 1024;
  32. this->allow_repeat_tx = 0;
  33. init_policy(&this->rtp);
  34. } break;
  35. case OUTBOUND: {
  36. // warnx("... from demo/send");
  37. init_policy(&this->rtp);
  38. this->ssrc.type = ssrc_any_outbound;
  39. this->key = _key_stuff.data();
  40. this->window_size = 1024;
  41. this->allow_repeat_tx = 0;
  42. } break;
  43. default:
  44. throw invalid_argument("invalid side");
  45. }
  46. }
  47. ...
  48. };
  49. class underlying_cryptor {
  50. public:
  51. ~underlying_cryptor() {
  52. const auto dummy = _ctx; _ctx = nullptr;
  53. const auto err = srtp_dealloc(dummy);
  54. if (err) errx(err, "srtp_dealloc failed");
  55. // warnx("... srtp_dealloc done");
  56. }
  57. explicit underlying_cryptor(underlying_policy policy) : _policy(std::move(policy)) {
  58. const auto err = srtp_create(&_ctx, &_policy);
  59. if (err) throw srtp_error(err, "srtp_create failed");
  60. warnx("... srtp_create done %p", _ctx);
  61. }
  62. BufferPtr process_unsafe(const void *buf, const size_t len) {
  63. static const auto show_buf = [](const void *buf, const size_t len) {
  64. const auto repr = to_hex({(const char *)buf, len});
  65. warnx("... %zu: %s", len, repr.c_str());
  66. };
  67. _counter.run++;
  68. switch (_policy.side()) {
  69. case INBOUND: {
  70. // warnx("... from demo/recv");
  71. // return unprotect(buf, len);
  72. const auto out = make_shared<Buffer>((const char *) buf, len);
  73. const auto out_buf = const_cast<char*>(out->data());
  74. int out_len = out->size();
  75. const auto err = srtp_unprotect(_ctx, out_buf, &out_len);
  76. if (err) {
  77. _counter.err++;
  78. throw srtp_error(err, "srtp_unprotect failed");
  79. }
  80. const auto real_len = check_padding(out_buf, out_len);
  81. if (!real_len) {
  82. return {};
  83. }
  84. out->resize(real_len);
  85. _counter.ok++;
  86. return out;
  87. }
  88. case OUTBOUND: {
  89. // warnx("... from demo/send");
  90. const auto policy_data = _policy.show();
  91. // warnx("... policy: %s", policy_data.c_str());
  92. // return protect(buf, len);
  93. const auto out = make_shared<Buffer>();
  94. out->resize(len + Cryptor::CRYPT_OVERHEAD_SIZE);
  95. const auto out_buf = const_cast<char*>(out->data());
  96. memcpy(out_buf, buf, len);
  97. // show_buf(out_buf, len);
  98. int out_len = make_padding(out_buf, len);
  99. // show_buf(out_buf, out_len);
  100. const auto err = srtp_protect(_ctx, out_buf, &out_len);
  101. if (err) {
  102. _counter.err++;
  103. throw srtp_error(err, "srtp_protect failed");
  104. }
  105. // show_buf(out_buf, out_len);
  106. out->resize(out_len);
  107. _counter.ok++;
  108. return out;
  109. }
  110. default:
  111. _counter.err++;
  112. throw srtp_error("invalid side");
  113. }
  114. }
  115. BufferPtr process(const void *buf, const size_t len) {
  116. try {
  117. return process_unsafe(buf, len);
  118. } catch (srtp_error &e) {
  119. warnx("ERROR: process failed: %s", e.what());
  120. throw;
  121. } catch (...) {
  122. warnx("ERROR: process failed: uncatched error");
  123. throw;
  124. }
  125. }
  126. private:
  127. static int check_padding(void *buf, const uint len) {
  128. if (len < 12) return 0;
  129. auto data = (uint8_t *) buf;
  130. const auto q = data + len;
  131. data += 12;
  132. const uint8_t pad_len = q[-1] & 0x1F;
  133. if (q - data < pad_len) return 0;
  134. for (auto p = q - pad_len; p + 1 < q; p++)
  135. if (*p != 0xAF)
  136. return 0;
  137. return len - pad_len;
  138. }
  139. // 在用的版本。如前所说还没用扩展包,偏移写死。
  140. static int make_padding(void *buf, const uint len) {
  141. assert(len > 12 && "rtp packet must be gt 12");
  142. auto data = (uint8_t *) buf;
  143. auto p = data + len;
  144. data += 12;
  145. const uint8_t pad_len = 16 - (p - data) % 16;
  146. auto q = p + pad_len;
  147. *--q = pad_len & 0x1F;
  148. while (p < q) *p++ = 0xAF;
  149. assert(p == q && "not correctly finished");
  150. return len + pad_len;
  151. }
  152. // 用于对比的原始函数
  153. static void SetEncPad(uint8_t *buf, const uint len, uint8_t *pad_len) {
  154. *pad_len = 16 - (len % 16);
  155. for (int i = 0; i < *pad_len - 1; i++)
  156. buf[len + i] = 0xaf;
  157. buf[len + *pad_len - 1] = *pad_len & 0x1F;
  158. }
  159. ...
  160. };
  161. struct PacketEvent : UdpReader::PacketEvent {
  162. rtp_packet_t packet{};
  163. using UdpReader::PacketEvent::PacketEvent;
  164. explicit PacketEvent(const Options &options, const UdpReader::PacketEvent &evt) : UdpReader::PacketEvent(evt) {
  165. if (!is_acceptable(options.peer, evt.sender)) throw rtp_stream_failure("not mine packet");
  166. auto buf = evt.buffer;
  167. auto len = evt.length;
  168. // 当前端口复用未启用。这个分支不进入
  169. if (options.is_mux()) {
  170. if (len < 4) throw rtp_mux_failure("RTP mux packet too short");
  171. // ReSharper disable CppCStyleCast
  172. const auto mux_id = ntohl(*(uint32_t*)buf);
  173. if (mux_id != (uint32_t)options.mux_id) throw rtp_mux_failure("RTP mux ID mismatch");
  174. buf = (uint8_t*)buf + 4, len -= 4;
  175. // ReSharper restore CppCStyleCast
  176. }
  177. // 现在启用了这个分支
  178. vic::BufferPtr save_ptr{};
  179. if (options.is_crypt()) {
  180. const auto cryptor = options.cryptor;
  181. if (!cryptor) throw rtp_crypto_error("no SRTP cryptor we can use");
  182. try {
  183. save_ptr = cryptor->process(buf, len);
  184. if (!save_ptr) throw rtp_crypto_error("SRTP decryption failed.");
  185. } catch (vic::srtp_error &e) {
  186. // RX_TRACE("... decrypt %zu bytes", len);
  187. // RX_TRACEH(buf, len);
  188. throw rtp_crypto_error("SRTP decryption failed. " + string{e.what()});
  189. }
  190. buf = &save_ptr->at(0);
  191. len = save_ptr->size();
  192. }
  193. if (rtp_packet_deserialize(&packet, buf, len) < 0) throw rtp_parse_failure("rtp_packet_deserialize() failed");
  194. do {
  195. const auto &rtp = packet.rtp;
  196. // batch check. return on failure, or accept it by break.
  197. if (rtp.pt == options.payload) break; // accept without SSRC check now
  198. if (!options.is_fec()) throw rtp_stream_failure("not mine packet");
  199. if (rtp.pt == *options.fec_payload) break; // accept without SSRC check now
  200. throw rtp_stream_failure("not mine packet");
  201. } while (false);
  202. // 数据通过解析才认为是这一路媒体的包继续后续处理。包括FEC等现在未启用的逻辑。
  203. }
  204. uint16_t seq() const { return packet.rtp.seq; }
  205. };
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注