@leaveye
2025-09-25T04:04:49.000000Z
字数 14023
阅读 128
debug ss528 net udp rtp
// git-version: da48d0cclass RtpWriterIo : public ... {// 仅列举相关成员mutex _fifo_swap_mutex{};shared_ptr<list<BufferPtr>> _fifo_swap{}; // to swap from user thread to worker threadatomic_bool _worker_alive{};shared_ptr<thread> _worker_thread{};// debug only membersstruct {atomic_uint swap_in{};atomic_uint swap_out{};atomic_uint pending_count{};atomic_uint pending_size{};...} watchdog_checkpoint{}; // 简化发送线程里用作 metrics}constexpr uint64_t MAX_GAP = 800;constexpr uint64_t TARGET_PPMS = 2;uint64_t now_ms(loop *loop) {(void) loop;// ::timeval tv{};// gettimeofday(&tv, nullptr);// return static_cast<uint64_t>((int64_t) tv.tv_sec * 1000) + static_cast<uint64_t>(tv.tv_usec) / 1000;using namespace chrono;return static_cast<uint64_t>(duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count());// loop->update();// return loop->now().count();}void RtpWriterIo::open(const Config &config, const SockAddr4 &peer_addr, const SockAddr4 &bind_addr) {... // 参数载入,必要资源初始化if (simple_writer) { // 在简化发送线程的逻辑里,simple_writer = true_worker_alive = true;_worker_thread = std::make_shared<thread>([this]() { main(); }); // 发送线程初始化io.update_timer = get_resource<timer_handle>(L);io.update_timer->on<timer_event>([&](timer_event &, timer_handle &h) {char message[0x80] = "", *p = message, *q = (&message)[1];const auto &metrics = io.watchdog_checkpoint;p += snprintf(p, q - p, " %u>swap>%u", metrics.swap_in.load(), metrics.swap_out.load());p += snprintf(p, q - p, " pending %u/%u", metrics.pending_size.load(), metrics.pending_count.load());RX_MDEBUG(io.logger, "%s: ... wakeup @%lu ...%s", io.description.c_str(), now_ms(&h.parent()), message);});io.update_timer->start(timer_handle::time{1000}, timer_handle::time{200});return;}... 
// 旧逻辑的初始化代码}void RtpWriterIo::main() {static const auto make_socket = [](const RtpWriterIo &io) -> int {const auto m = io.logger;const auto hint = io.description.c_str();const auto fd = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK, IPPROTO_UDP);if (fd < 0) {RX_XMFATAL(EXIT_FAILURE, m, "%s: create socket failed: %d %s", hint, errno, strerror(errno));}const int on = 1;if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) {RX_MWARN(m, "%s: setsockopt(SO_REUSEADDR) failed: %d %s", hint, errno, strerror(errno));}if (bind(fd, reinterpret_cast<const sockaddr*>(&io.local), sizeof(io.local)) < 0) {::close(fd);RX_XMFATAL(EXIT_FAILURE, m, "%s: bind(%s) failed: %d %s", hint, to_string(io.local).c_str(), errno, strerror(errno));}if (is_multicast(io.peer)) {ip_mreq mreq{};mreq.imr_multiaddr = io.peer.sin_addr;mreq.imr_interface = io.local.sin_addr;if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {RX_MWARN(m, "%s: setsockopt(IP_ADD_MEMBERSHIP) failed: %d %s", hint, errno, strerror(errno));}const int loop = 0;if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop)) < 0) {RX_MWARN(m, "%s: setsockopt(IP_MULTICAST_LOOP) failed: %d %s", hint, errno, strerror(errno));}const int ttl = 64;if (setsockopt(fd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)) < 0) {RX_MWARN(m, "%s: setsockopt(IP_MULTICAST_TTL) failed: %d %s", hint, errno, strerror(errno));}}RX_MNOTICE(m, "%s: socket ready: %s", hint, to_string(io.peer).c_str());return fd;};const unique_ptr<int, void(*)(int*)> fd_closer(new int, [](int *p) { ::close(*p); delete p; });int fd = *fd_closer = make_socket(*this);const auto direct_sender = [this, &fd](BufferPtr &&buf) -> int {const auto rv = sendto(fd, buf->buf(), buf->len(), MSG_DONTWAIT, reinterpret_cast<const sockaddr*>(&peer), sizeof(peer));if (rv < 0) return -errno;return rv;};const auto desc = description.c_str();RX_INFO("%s: sender loop enter", desc);while (_worker_alive) {ts_updated = loop->now().count();const auto no_gap 
= iter(direct_sender);if (no_gap) continue;usleep(1000);}RX_INFO("%s: sender loop exit", desc);}bool RtpWriterIo::iter(const function<int(BufferPtr&&)> &sender) {const auto desc = description.c_str();auto &metrics = watchdog_checkpoint;thread_local uint max_packet_size = 0;auto no_gap = false;decltype(_fifo_swap) q_swap{};if (_fifo_swap) {auto guard = make_shared<lock_guard<mutex>>(_fifo_swap_mutex);q_swap = std::move(_fifo_swap);}auto &q_pending = fifo_pending;if (!q_swap || q_swap->empty()) {metrics.swap_out = 0;} else {no_gap = true;for (const auto &buf : *q_swap) {if (max_packet_size < buf->len())max_packet_size = buf->len();}const auto old_pending_count = q_pending.size();q_pending.splice(q_pending.end(), *std::move(q_swap));RX_MDEBUG(logger, "%s: swap in %zu packets, pending %zu", desc, q_pending.size() - old_pending_count, q_pending.size());metrics.swap_out = q_pending.size() - old_pending_count;// packets got, to check overflow}const auto ts_send_0 = now_ms(loop.get());thread_local uint64_t ts_send_last = 0;const auto ts_send_interval = !ts_send_last ? 
MAX_GAP / 4 : std::min(ts_send_0 - ts_send_last, MAX_GAP);auto want_write_bytes = max_packet_size * TARGET_PPMS * ts_send_interval;ts_send_last = ts_send_0;int rv = 0, n = 0, err = 0, sz = 0;while (!q_pending.empty()) {if (enable_rate_limit && want_write_bytes < q_pending.front()->len()) {no_gap = false;break;}++n, no_gap = true;BufferPtr buf = std::move(q_pending.front());q_pending.pop_front();const auto len = buf->len();rv = sender(std::move(buf));if (rv < 0) {++err;RX_MERROR(logger, "%s: send packet failed: %d", desc, rv);if (rv == -EAGAIN) no_gap = false;} else {sz += len;want_write_bytes -= len;}}const auto ts_send_1 = now_ms(loop.get());{char result[0x80] = "", *p = result, *q = (&result)[1];if (n) {const auto ts_send_elapsed = ts_send_1 - ts_send_0;p += snprintf(p, q - p, " in %lums (%lu~%lu)", ts_send_elapsed, ts_send_0, ts_send_1);if (err && n > err) p += snprintf(p, q - p, " ok %d", n - err);if (n > err) p += snprintf(p, q - p, " (%d bytes)", sz);if (err) p += snprintf(p, q - p, " error %d", err);if (!q_pending.empty()) p += snprintf(p, q - p, " pending %zu", q_pending.size());RX_MDEBUG(logger, "%s: sent %d %s%s", desc, n, n == 1 ? "packet" : "packets", result);} else if (q_pending.empty()) {RX_MDEBUG(logger, "%s: nothing to send", desc);} else {RX_MDEBUG(logger, "%s: skip send for rate control", desc);}}return no_gap;}void RtpStream::stopWriter() {auto &io = writer_;if (io.simple_writer) {io._worker_alive = false;std::move(io._worker_thread)->join();rtp_payload_encode_destroy(io.media); io.media = {};return;} else ... // 旧逻辑的释放代码}
接收侧 tcpdump 查看抓包样本。得到发送端的端口:v=39563, a=46657
20:41:22.269050 IP 192.168.80.60.39563 > 192.168.80.70.6666: UDP, length 1170
20:41:22.285755 IP 192.168.80.60.39563 > 192.168.80.70.6666: UDP, length 1400
20:41:22.285794 IP 192.168.80.60.39563 > 192.168.80.70.6666: UDP, length 1400
20:41:22.285803 IP 192.168.80.60.39563 > 192.168.80.70.6666: UDP, length 1400
20:41:22.285811 IP 192.168.80.60.39563 > 192.168.80.70.6666: UDP, length 90
20:41:22.287878 IP 192.168.80.60.46657 > 192.168.80.70.9660: UDP, length 507
20:41:22.302380 IP 192.168.80.60.39563 > 192.168.80.70.6666: UDP, length 1400
20:41:22.302415 IP 192.168.80.60.39563 > 192.168.80.70.6666: UDP, length 1400
20:41:22.302424 IP 192.168.80.60.39563 > 192.168.80.70.6666: UDP, length 1365
20:41:22.308916 IP 192.168.80.60.46657 > 192.168.80.70.9660: UDP, length 522
在发送侧通过 netstat 可以看到,发送缓存(Send-Q)时有累积。
>>>> 此时已经拔掉了 70 的网线~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:29> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:29> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:30> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:30> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:30> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:30> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:30> udp 0 2304 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:30> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:30> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:30> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:30> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:30> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:31> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:31> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:31> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:31> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:31> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:31> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:31> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:31> udp 0 1280 192.168.80.60:46657 
0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:32> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:32> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:32> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:32> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:32> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:32> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:32> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:32> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:32> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:32> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:33> udp 0 4608 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:33> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:33> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:33> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer>>>> 此时插上了 70 的网线~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:36> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:36> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:36> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:36> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:36> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:36> udp 0 0 
192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:36> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:36> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:37> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:37> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:37> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:37> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer~ # netstat -aup 2>/dev/null | sed -n "/:\(39563\|46657\)/s|^|`date +%T`> |p"03:49:37> udp 0 0 192.168.80.60:39563 0.0.0.0:* 1875/transfer03:49:37> udp 0 0 192.168.80.60:46657 0.0.0.0:* 1875/transfer
缺省状态下:
~ # ulimit
unlimited
~ # grep ^ /proc/sys/net/core/rmem_*
/proc/sys/net/core/rmem_default:212992
/proc/sys/net/core/rmem_max:212992
~ # transfer # 筛选后的输出
udp_socket: fd=35 bind(192.168.80.70:7804) rxbuf get=212992 set=212992 get=425984
udp_socket: fd=28 bind(192.168.80.70:7804) rxbuf get=212992 set=15360000 get=425984
udp_socket: fd=35 bind(192.168.80.70:7802) rxbuf get=212992 set=425984 get=425984
进行配置后:
# 配置
ulimit -n 999999
ulimit -c unlimited
(cd /proc/sys/net/core
echo 4097152 >rmem_default
echo 16777216 >rmem_max
#echo 4097152 >wmem_default
echo 16777216 >wmem_max
)
~ # ulimit
unlimited
~ # grep ^ /proc/sys/net/core/rmem_*
/proc/sys/net/core/rmem_default:4097152
/proc/sys/net/core/rmem_max:16777216
~ # transfer # 筛选后的输出,几次不同的 RCVBUFSZ 参数
udp_socket: fd=28 bind(192.168.80.70:7804) rxbuf get=4097152 set=15360000 get=30720000
udp_socket: fd=35 bind(192.168.80.70:7804) rxbuf get=4097152 set=30720000 get=33554432
udp_socket: fd=34 bind(192.168.80.70:7804) rxbuf get=4097152 set=33554432 get=33554432
通过 RCVBUFSZ 参数来调节接收缓存(SO_RCVBUF)的设置:
explicit udp_socket(const socket_address &addr) : _addr(addr) {const auto fd = socket(AF_INET, SOCK_DGRAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0);if (fd < 0) throw c_runtime_error("socket() failed.");sockaddr_storage addr_store{};uv_ip4_addr(addr.ip.c_str(), addr.port, reinterpret_cast<sockaddr_in*>(&addr_store));if (bind(fd, reinterpret_cast<const sockaddr*>(&addr_store), sizeof(addr_store)) < 0) {const auto err = errno;close(fd);throw c_runtime_error("bind() failed.", err);}message_builder msg{};try {int rxbuf{-1};socklen_t len{};len = sizeof(rxbuf);if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rxbuf, &len) < 0)throw c_runtime_error("getsockopt() failed.");msg.putf(" get=%d", rxbuf);rxbuf = atoi(rx_loadenv(RCVBUFSZ));len = sizeof(rxbuf);if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rxbuf, len) < 0)throw c_runtime_error("setsockopt() failed.");msg.putf(" set=%d", rxbuf);len = sizeof(rxbuf);if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rxbuf, &len) < 0)throw c_runtime_error("getsockopt() failed.");msg.putf(" get=%d", rxbuf);} catch (c_runtime_error &e) {msg.putf(" %s", e.what());}RX_INFO("udp_socket: fd=%d bind(%s) rxbuf %s", fd, addr.str().c_str(), msg.str(1));_fd = fd;}
in RtpStream.cc, RtpWriterIo::open()
io.udp_link = get_resource<udp_handle>(L);io.udp_link->bind(reinterpret_cast<sockaddr&>(io.local), udp_handle::udp_flags::REUSEADDR);if (is_multicast(io.peer)) {io.udp_link->multicast_membership(to_string(io.peer.sin_addr), to_string(io.local.sin_addr), udp_handle::membership::JOIN_GROUP);io.udp_link->multicast_loop(false);io.udp_link->multicast_ttl(64);}
in RtpStream.cc, rtp_handle::make_udp()
auto udp = get_resource<udp_handle>(_loop, [local, peer](close_event &, udp_handle &h) {RX_TRACE("!!! udp %p close !!! %s <- %s", &h, local.str().c_str(), peer.str().c_str());h.reset();});RX_TRACE("!!! udp %p open !!! %s <- %s", udp.get(), local.str().c_str(), peer.str().c_str());udp->bind(local, udp_handle::udp_flags::REUSEADDR);if (is_multicast(peer.ip)) {udp->multicast_membership(peer.ip, local.ip, membership::JOIN_GROUP);}udp->recv_buffer_size(buffer_size);RX_NOTICE("udp rcvbufsz=%d want=%d arg=%d", udp->recv_buffer_size(), buffer_size, max_kbps);// 01:52:35.106004 NOTICE | udp rcvbufsz=30720000 want=15360000 arg=20000 <<XFER/RtpStream.cc:191