From 37813ddf866b03ef0d341ed8d21d5123a892426e Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Thu, 16 Oct 2025 10:24:26 -0400 Subject: [PATCH] MT#63317 rework port opening mechanics Instead of opening local sockets directly during call setup, just create the appropriate objects first, and then open the actual sockets in a second step. Move the opening step down, after the bundle logic has run. This way we can avoid opening sockets that aren't needed. Removes an intermediate step of obtaining consecutive ports. Change-Id: Ib94c871ed6b3a433872afd23cb1aebbd9910b33e --- daemon/call.c | 130 ++++++++++++++++++++++------------ daemon/media_socket.c | 53 ++------------ include/media_socket.h | 7 +- t/auto-daemon-tests-bundle.pl | 4 +- t/auto-daemon-tests.pl | 2 + 5 files changed, 96 insertions(+), 100 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 11231a465..f0e58729c 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -841,8 +841,6 @@ static struct endpoint_map *__hunt_endpoint_map(struct call_media *media, unsign static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigned int num_ports, const struct endpoint *ep, const sdp_ng_flags *flags, bool always_reuse) { - stream_fd *sfd; - socket_intf_list_q intf_sockets = TYPED_GQUEUE_INIT; unsigned int want_interfaces = __media_want_interfaces(media); if (num_ports > 16) @@ -876,39 +874,15 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne t_queue_init(&em->intf_sfds); t_queue_push_tail(&media->endpoint_maps, em); em->num_ports = num_ports; - - if (!get_consecutive_ports(&intf_sockets, num_ports, want_interfaces, media)) - return NULL; - media->endpoint_map = em; - __C_DBG("allocating stream_fds for %u ports", num_ports); - MEDIA_CLEAR(media, PUBLIC); - - struct socket_intf_list *il; - while ((il = t_queue_pop_head(&intf_sockets))) { - if (il->list.length != num_ports) - goto next_il; - + // populate intf_sfds list + const struct logical_intf *log = media->logical_intf; + const local_intf_list *intf = log->list.head; + for (unsigned int i = 0; i < want_interfaces && intf; i++, intf = intf->next) { struct sfd_intf_list *em_il = g_new0(__typeof(*em_il), 1); - em_il->local_intf = il->local_intf; + em_il->local_intf = intf->data; t_queue_push_tail(&em->intf_sfds, em_il); - - struct socket_port_link *spl; - while ((spl = t_queue_pop_head(&il->list))) { - set_tos(&spl->socket, media->call->tos); - if (media->call->cpu_affinity >= 0) { - if (socket_cpu_affinity(&spl->socket, media->call->cpu_affinity)) - ilog(LOG_ERR | LOG_FLAG_LIMIT, "Failed to set socket CPU " - "affinity: %s", strerror(errno)); - } - sfd = stream_fd_new(spl, media->call, il->local_intf); - t_queue_push_tail(&em_il->list, sfd); // not referenced - g_free(spl); - } - -next_il: - free_socket_intf_list(il); } return em; @@ -972,8 +946,6 @@ static bool __wildcard_endpoint_map(struct call_media *media, unsigned int num_p if (!em) return false; - __assign_stream_fds(media, &em->intf_sfds); - return true; } @@ -1310,9 +1282,6 @@ static bool __init_streams(struct call_media *A, const struct stream_params *sp, } bf_copy_same(&a->ps_flags, &A->media_flags, SHARED_FLAG_ICE); - if (!__init_stream(a)) - return false; - /* RTCP */ if (!MEDIA_ISSET(A, RTCP_MUX)) PS_CLEAR(a, RTCP); @@ -1352,9 +1321,6 @@ static bool __init_streams(struct call_media *A, const struct stream_params *sp, PS_CLEAR(a, ZERO_ADDR); - if (!__init_stream(a)) - return false; - recording_setup_stream(ax); // RTP recording_setup_stream(a); // RTCP @@ -2629,6 +2595,12 @@ static void monologue_media_start(struct call_monologue *ml) { if (!media) continue; + for (__auto_type l = media->streams.head; l; l = l->next) + __init_stream(l->data); + + if (media->bundle && media->bundle != media && MEDIA_ISSET(media, BUNDLE_ONLY)) + continue; + ice_start(media->ice_agent); call_media_state_machine(media); } @@ -3719,6 +3691,59 @@ static void monologue_bundle_accept(struct call_monologue *ml, sdp_ng_flags *fla monologue_bundle_set_fds(ml); } +static bool media_open_ports(struct call_media *media) { + struct endpoint_map *em = media->endpoint_map; + const str *label = &media->call->callid; + + if (media->bundle && media->bundle != media && MEDIA_ISSET(media, BUNDLE_ONLY)) + return true; // no ports needed, will be shared from bundle head + + for (__auto_type l = em->intf_sfds.head; l; l = l->next) { + struct sfd_intf_list *em_il = l->data; + if (em_il->list.length) + continue; // not empty, we can use these + + __C_DBG("allocating stream_fds for %u ports", em->num_ports); + MEDIA_CLEAR(media, PUBLIC); + + socket_port_q q = TYPED_GQUEUE_INIT; // XXX use some sort of intrusive list for this + if (!get_consecutive_ports(&q, em->num_ports, em_il->local_intf, label)) + return false; + + struct socket_port_link *spl; + while ((spl = t_queue_pop_head(&q))) { + set_tos(&spl->socket, media->call->tos); + if (media->call->cpu_affinity >= 0) { + if (socket_cpu_affinity(&spl->socket, media->call->cpu_affinity)) + ilog(LOG_ERR | LOG_FLAG_LIMIT, "Failed to set socket CPU " + "affinity: %s", strerror(errno)); + } + __auto_type sfd = stream_fd_new(spl, media->call, em_il->local_intf); + t_queue_push_tail(&em_il->list, sfd); // not referenced + g_free(spl); // XXX eliminate this + } + } + + __assign_stream_fds(media, &media->endpoint_map->intf_sfds); + + return true; +} + +static bool monologue_open_ports(struct call_monologue *ml) { + for (unsigned int i = 0; i < ml->medias->len; i++) { + __auto_type media = ml->medias->pdata[i]; + if (!media) + continue; + if (!media->endpoint_map) + continue; // disabled media + + if (!media_open_ports(media)) + return false; + } + + return true; +} + /* called with call->master_lock held in W */ int monologue_offer_answer(struct call_monologue *monologues[2], sdp_streams_q *streams, sdp_ng_flags *flags) @@ -3936,15 +3961,13 @@ int monologue_offer_answer(struct call_monologue *monologues[2], sdp_streams_q * * assign the ports to the streams */ em = __get_endpoint_map(receiver_media, num_ports_this, &sp->rtp_endpoint, flags, sender_media->bundle ? true : false); - if (!em) { + if (!em) goto error_ports; - } if (flags->disable_jb && receiver_media->call) CALL_SET(receiver_media->call, DISABLE_JB); __num_media_streams(receiver_media, num_ports_this); - __assign_stream_fds(receiver_media, &em->intf_sfds); if (__num_media_streams(sender_media, num_ports_other)) { /* new streams created on OTHER side. normally only happens in @@ -3965,6 +3988,12 @@ int monologue_offer_answer(struct call_monologue *monologues[2], sdp_streams_q * monologue_bundle_offer(receiver_ml, flags); monologue_bundle_check_consistency(receiver_ml); monologue_bundle_check_heads(receiver_ml); + + if (!monologue_open_ports(receiver_ml)) + goto error_ports; + if (!monologue_open_ports(sender_ml)) + goto error_ports; + monologue_bundle_set_fds(receiver_ml); monologue_bundle_set_sinks(sender_ml); monologue_bundle_set_sinks(receiver_ml); @@ -4315,7 +4344,6 @@ int monologue_publish(struct call_monologue *ml, sdp_streams_q *streams, sdp_ng_ return -1; // XXX error - no ports __num_media_streams(media, num_ports); - __assign_stream_fds(media, &em->intf_sfds); // XXX this should be covered by __update_init_subscribers ? if (!__init_streams(media, sp, flags)) @@ -4324,6 +4352,7 @@ int monologue_publish(struct call_monologue *ml, sdp_streams_q *streams, sdp_ng_ ice_update(media->ice_agent, sp, false); } + monologue_open_ports(ml); monologue_media_start(ml); return 0; @@ -4409,7 +4438,6 @@ static int monologue_subscribe_request1(struct call_monologue *src_ml, struct ca return -1; // XXX error - no ports __num_media_streams(dst_media, num_ports); - __assign_stream_fds(dst_media, &em->intf_sfds); if (!__init_streams(dst_media, NULL, flags)) return -1; @@ -4417,6 +4445,8 @@ static int monologue_subscribe_request1(struct call_monologue *src_ml, struct ca update_init_subscribers(src_media, NULL, NULL, flags->opmode); } + monologue_open_ports(dst_ml); + return 0; } /* called with call->master_lock held in W */ @@ -6133,7 +6163,8 @@ int call_delete_branch_by_id(const str *callid, const str *branch, struct call_media *call_make_transform_media(struct call_monologue *ml, const str *type, enum media_type type_id, const str *media_id, const endpoint_t *remote, const str *interface) { - struct call_media *ret = call_get_media(ml, type, type_id, media_id, false, ml->medias->len + 1, str_ht_null()); + struct call_media *ret = call_get_media(ml, type, type_id, media_id, false, ml->medias->len + 1, + str_ht_null()); if (!media_id->len) generate_mid(ret, ret->unique_id); @@ -6146,10 +6177,12 @@ struct call_media *call_make_transform_media(struct call_monologue *ml, const st struct endpoint_map *em = __get_endpoint_map(ret, 1, remote, NULL, true); if (!em) - return false; + return NULL; __num_media_streams(ret, 1); - __assign_stream_fds(ret, &em->intf_sfds); + + if (!media_open_ports(ret)) + return NULL; return ret; } @@ -6166,6 +6199,9 @@ bool monologue_transform(struct call_monologue *ml, sdp_ng_flags *flags, medias_ __auto_type m = call_make_transform_media(ml, &media->type, codec_get_type(&media->type), &media->id, &media->destination, &flags->interface); + if (!m) + return false; + media->id = m->media_id; t_queue_push_tail(out_q, m); diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 15ffb9714..e357cf0ad 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1378,57 +1378,12 @@ fail: return false; } -/* puts a list of "struct intf_list" into "out", containing socket_t list */ -bool get_consecutive_ports(socket_intf_list_q *out, unsigned int num_ports, unsigned int num_intfs, - struct call_media *media) +bool get_consecutive_ports(socket_port_q *out, unsigned int num_ports, struct local_intf *loc, + const str *label) { - struct socket_intf_list *il; - struct local_intf *loc; - const struct logical_intf *log = media->logical_intf; - const str *label = &media->call->callid; /* call's callid */ - - /* - // debug locals of logical incerface - char ip[100]; - for (l = log->list.head; l; l = l->next) { - loc = l->data; - inet_ntop(loc->spec->local_address.addr.family->af, &loc->spec->local_address.addr.u, ip, sizeof(ip)); - } - ilog(LOG_DEBUG, ""); - */ - - for (auto_iter(l, log->list.head); l; l = l->next) { - if (out->length >= num_intfs) - break; - - loc = l->data; - - il = g_new0(__typeof(*il), 1); - il->local_intf = loc; - t_queue_push_tail(out, il); - if (G_LIKELY(__get_consecutive_ports(&il->list, num_ports, loc->spec, label))) { - // success - found available ports on local interfaces, so far - continue; - } else { - // fail - did not found available ports on at least one local interface - goto error_ports; - } - } - - return true; - -error_ports: - ilog(LOG_ERR, "Failed to get %d consecutive ports on all locals of logical '"STR_FORMAT"'", - num_ports, STR_FMT(&log->name)); - - // free all ports alloc'ed so far for the previous local interfaces - while ((il = t_queue_pop_head(out))) { - free_socket_intf_list(il); - } - - return false; - + return __get_consecutive_ports(out, num_ports, loc->spec, label); } + void free_socket_intf_list(struct socket_intf_list *il) { struct socket_port_link *spl; diff --git a/include/media_socket.h b/include/media_socket.h index a663e3bd5..318b2ce92 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -385,8 +385,11 @@ bool is_local_endpoint(const struct intf_address *addr, unsigned int port); struct socket_port_link get_specific_port(unsigned int port, struct intf_spec *spec, const str *label); -bool get_consecutive_ports(socket_intf_list_q *out, unsigned int num_ports, unsigned int num_intfs, - struct call_media *media); + +__attribute__((nonnull(1, 3, 4))) +bool get_consecutive_ports(socket_port_q *out, unsigned int num_ports, struct local_intf *, + const str *label); + stream_fd *stream_fd_new(struct socket_port_link *, call_t *call, struct local_intf *lif); stream_fd *stream_fd_lookup(const endpoint_t *); void stream_fd_release(stream_fd *); diff --git a/t/auto-daemon-tests-bundle.pl b/t/auto-daemon-tests-bundle.pl index ef07b09ac..f9429f763 100755 --- a/t/auto-daemon-tests-bundle.pl +++ b/t/auto-daemon-tests-bundle.pl @@ -3361,10 +3361,10 @@ if ($ENV{RTPENGINE_EXTENDED_TESTS}) { is($resp->{interfaces}[0]{name}, 'default', 'intf found'); is($resp->{interfaces}[0]{address}, '203.0.113.1', 'address found'); - is($resp->{interfaces}[0]{ports}{used}, 192, 'port usage'); + is($resp->{interfaces}[0]{ports}{used}, 166, 'port usage'); is($resp->{interfaces}[1]{name}, 'default', 'intf found'); is($resp->{interfaces}[1]{address}, '2001:db8:4321::1', 'address found'); - is($resp->{interfaces}[1]{ports}{used}, 4, 'port usage'); + is($resp->{interfaces}[1]{ports}{used}, 2, 'port usage'); } diff --git a/t/auto-daemon-tests.pl b/t/auto-daemon-tests.pl index 600591487..99ebd327b 100755 --- a/t/auto-daemon-tests.pl +++ b/t/auto-daemon-tests.pl @@ -26227,6 +26227,8 @@ a=candidate:ICEBASE 1 UDP 2130706175 2001:db8:4321::1 PORT typ host a=end-of-candidates SDP + + new_call; $resp = rtpe_req('offer', 'SDP with just \n', { 'from-tag' => ft(), SDP => "v=0\no=- 1545997027 1 IN IP4 198.51.101.40\ns=tester\nt=0 0\nm=audio 3000 RTP/AVP 0 8\nc=IN IP4 198.51.100.1\na=foobar\n" } );