@leaveye
2025-10-02T15:28:53.000000Z
字数 5985
阅读 105
shujusuo rtp srtp problem
这里面:
class underlying_policy : public srtp_policy_t {public:underlying_policy(const Side side, const Type type, const string &key) : srtp_policy_t(), _side(side), _type(type), _key_stuff(key.begin(), key.end()) {static once_flag once{};call_once(once, [] {const auto err = srtp_init();if (err) throw srtp_error(err, "srtp_init failed");// warnx("... srtp_init done");atexit([]() {// warnx("... srtp_shutdown calling");const auto err_ = srtp_shutdown();if (err_) throw srtp_error(err_, "srtp_shutdown failed");});});void(*init_policy)(srtp_crypto_policy_t*);switch (type) {case SM4_ECB_128_HMAC_SM3_80:init_policy = srtp_crypto_policy_set_sdt_soft_sm4_ecb_hmac_soft_sm3;break;case SM4_CTR_128_HMAC_SM3_80:init_policy = srtp_crypto_policy_set_sdt_sw_soft_sm4_ctr_hmac_soft_sm3;break;default:throw invalid_argument("invalid type");}switch (side) {case INBOUND: {// warnx("... from demo/recv");this->ssrc.type = ssrc_any_inbound;this->key = _key_stuff.data();this->window_size = 1024;this->allow_repeat_tx = 0;init_policy(&this->rtp);} break;case OUTBOUND: {// warnx("... from demo/send");init_policy(&this->rtp);this->ssrc.type = ssrc_any_outbound;this->key = _key_stuff.data();this->window_size = 1024;this->allow_repeat_tx = 0;} break;default:throw invalid_argument("invalid side");}}...};class underlying_cryptor {public:~underlying_cryptor() {const auto dummy = _ctx; _ctx = nullptr;const auto err = srtp_dealloc(dummy);if (err) errx(err, "srtp_dealloc failed");// warnx("... srtp_dealloc done");}explicit underlying_cryptor(underlying_policy policy) : _policy(std::move(policy)) {const auto err = srtp_create(&_ctx, &_policy);if (err) throw srtp_error(err, "srtp_create failed");warnx("... srtp_create done %p", _ctx);}BufferPtr process_unsafe(const void *buf, const size_t len) {static const auto show_buf = [](const void *buf, const size_t len) {const auto repr = to_hex({(const char *)buf, len});warnx("... %zu: %s", len, repr.c_str());};_counter.run++;switch (_policy.side()) {case INBOUND: {// warnx("... from demo/recv");// return unprotect(buf, len);const auto out = make_shared<Buffer>((const char *) buf, len);const auto out_buf = const_cast<char*>(out->data());int out_len = out->size();const auto err = srtp_unprotect(_ctx, out_buf, &out_len);if (err) {_counter.err++;throw srtp_error(err, "srtp_unprotect failed");}const auto real_len = check_padding(out_buf, out_len);if (!real_len) {return {};}out->resize(real_len);_counter.ok++;return out;}case OUTBOUND: {// warnx("... from demo/send");const auto policy_data = _policy.show();// warnx("... policy: %s", policy_data.c_str());// return protect(buf, len);const auto out = make_shared<Buffer>();out->resize(len + Cryptor::CRYPT_OVERHEAD_SIZE);const auto out_buf = const_cast<char*>(out->data());memcpy(out_buf, buf, len);// show_buf(out_buf, len);int out_len = make_padding(out_buf, len);// show_buf(out_buf, out_len);const auto err = srtp_protect(_ctx, out_buf, &out_len);if (err) {_counter.err++;throw srtp_error(err, "srtp_protect failed");}// show_buf(out_buf, out_len);out->resize(out_len);_counter.ok++;return out;}default:_counter.err++;throw srtp_error("invalid side");}}BufferPtr process(const void *buf, const size_t len) {try {return process_unsafe(buf, len);} catch (srtp_error &e) {warnx("ERROR: process failed: %s", e.what());throw;} catch (...) {warnx("ERROR: process failed: uncatched error");throw;}}private:static int check_padding(void *buf, const uint len) {if (len < 12) return 0;auto data = (uint8_t *) buf;const auto q = data + len;data += 12;const uint8_t pad_len = q[-1] & 0x1F;if (q - data < pad_len) return 0;for (auto p = q - pad_len; p + 1 < q; p++)if (*p != 0xAF)return 0;return len - pad_len;}// 在用的版本。如前所说还没用扩展包,偏移写死。static int make_padding(void *buf, const uint len) {assert(len > 12 && "rtp packet must be gt 12");auto data = (uint8_t *) buf;auto p = data + len;data += 12;const uint8_t pad_len = 16 - (p - data) % 16;auto q = p + pad_len;*--q = pad_len & 0x1F;while (p < q) *p++ = 0xAF;assert(p == q && "not correctly finished");return len + pad_len;}// 用于对比的原始函数static void SetEncPad(uint8_t *buf, const uint len, uint8_t *pad_len) {*pad_len = 16 - (len % 16);for (int i = 0; i < *pad_len - 1; i++)buf[len + i] = 0xaf;buf[len + *pad_len - 1] = *pad_len & 0x1F;}...};struct PacketEvent : UdpReader::PacketEvent {rtp_packet_t packet{};using UdpReader::PacketEvent::PacketEvent;explicit PacketEvent(const Options &options, const UdpReader::PacketEvent &evt) : UdpReader::PacketEvent(evt) {if (!is_acceptable(options.peer, evt.sender)) throw rtp_stream_failure("not mine packet");auto buf = evt.buffer;auto len = evt.length;// 当前端口复用未启用。这个分支不进入if (options.is_mux()) {if (len < 4) throw rtp_mux_failure("RTP mux packet too short");// ReSharper disable CppCStyleCastconst auto mux_id = ntohl(*(uint32_t*)buf);if (mux_id != (uint32_t)options.mux_id) throw rtp_mux_failure("RTP mux ID mismatch");buf = (uint8_t*)buf + 4, len -= 4;// ReSharper restore CppCStyleCast}// 现在启用了这个分支vic::BufferPtr save_ptr{};if (options.is_crypt()) {const auto cryptor = options.cryptor;if (!cryptor) throw rtp_crypto_error("no SRTP cryptor we can use");try {save_ptr = cryptor->process(buf, len);if (!save_ptr) throw rtp_crypto_error("SRTP decryption failed.");} catch (vic::srtp_error &e) {// RX_TRACE("... decrypt %zu bytes", len);// RX_TRACEH(buf, len);throw rtp_crypto_error("SRTP decryption failed. " + string{e.what()});}buf = &save_ptr->at(0);len = save_ptr->size();}if (rtp_packet_deserialize(&packet, buf, len) < 0) throw rtp_parse_failure("rtp_packet_deserialize() failed");do {const auto &rtp = packet.rtp;// batch check. return on failure, or accept it by break.if (rtp.pt == options.payload) break; // accept without SSRC check nowif (!options.is_fec()) throw rtp_stream_failure("not mine packet");if (rtp.pt == *options.fec_payload) break; // accept without SSRC check nowthrow rtp_stream_failure("not mine packet");} while (false);// 数据通过解析才认为是这一路媒体的包继续后续处理。包括FEC等现在未启用的逻辑。}uint16_t seq() const { return packet.rtp.seq; }};