diff --git a/daemon/call.c b/daemon/call.c index 11231a465..f0e58729c 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -841,8 +841,6 @@ static struct endpoint_map *__hunt_endpoint_map(struct call_media *media, unsign static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigned int num_ports, const struct endpoint *ep, const sdp_ng_flags *flags, bool always_reuse) { - stream_fd *sfd; - socket_intf_list_q intf_sockets = TYPED_GQUEUE_INIT; unsigned int want_interfaces = __media_want_interfaces(media); if (num_ports > 16) @@ -876,39 +874,15 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne t_queue_init(&em->intf_sfds); t_queue_push_tail(&media->endpoint_maps, em); em->num_ports = num_ports; - - if (!get_consecutive_ports(&intf_sockets, num_ports, want_interfaces, media)) - return NULL; - media->endpoint_map = em; - __C_DBG("allocating stream_fds for %u ports", num_ports); - MEDIA_CLEAR(media, PUBLIC); - - struct socket_intf_list *il; - while ((il = t_queue_pop_head(&intf_sockets))) { - if (il->list.length != num_ports) - goto next_il; - + // populate intf_sfds list + const struct logical_intf *log = media->logical_intf; + const local_intf_list *intf = log->list.head; + for (unsigned int i = 0; i < want_interfaces && intf; i++, intf = intf->next) { struct sfd_intf_list *em_il = g_new0(__typeof(*em_il), 1); - em_il->local_intf = il->local_intf; + em_il->local_intf = intf->data; t_queue_push_tail(&em->intf_sfds, em_il); - - struct socket_port_link *spl; - while ((spl = t_queue_pop_head(&il->list))) { - set_tos(&spl->socket, media->call->tos); - if (media->call->cpu_affinity >= 0) { - if (socket_cpu_affinity(&spl->socket, media->call->cpu_affinity)) - ilog(LOG_ERR | LOG_FLAG_LIMIT, "Failed to set socket CPU " - "affinity: %s", strerror(errno)); - } - sfd = stream_fd_new(spl, media->call, il->local_intf); - t_queue_push_tail(&em_il->list, sfd); // not referenced - g_free(spl); - } - -next_il: - free_socket_intf_list(il); } return em; @@ -972,8 +946,6 @@ static bool __wildcard_endpoint_map(struct call_media *media, unsigned int num_p if (!em) return false; - __assign_stream_fds(media, &em->intf_sfds); - return true; } @@ -1310,9 +1282,6 @@ static bool __init_streams(struct call_media *A, const struct stream_params *sp, } bf_copy_same(&a->ps_flags, &A->media_flags, SHARED_FLAG_ICE); - if (!__init_stream(a)) - return false; - /* RTCP */ if (!MEDIA_ISSET(A, RTCP_MUX)) PS_CLEAR(a, RTCP); @@ -1352,9 +1321,6 @@ static bool __init_streams(struct call_media *A, const struct stream_params *sp, PS_CLEAR(a, ZERO_ADDR); - if (!__init_stream(a)) - return false; - recording_setup_stream(ax); // RTP recording_setup_stream(a); // RTCP @@ -2629,6 +2595,12 @@ static void monologue_media_start(struct call_monologue *ml) { if (!media) continue; + for (__auto_type l = media->streams.head; l; l = l->next) + __init_stream(l->data); + + if (media->bundle && media->bundle != media && MEDIA_ISSET(media, BUNDLE_ONLY)) + continue; + ice_start(media->ice_agent); call_media_state_machine(media); } @@ -3719,6 +3691,59 @@ static void monologue_bundle_accept(struct call_monologue *ml, sdp_ng_flags *fla monologue_bundle_set_fds(ml); } +static bool media_open_ports(struct call_media *media) { + struct endpoint_map *em = media->endpoint_map; + const str *label = &media->call->callid; + + if (media->bundle && media->bundle != media && MEDIA_ISSET(media, BUNDLE_ONLY)) + return true; // no ports needed, will be shared from bundle head + + for (__auto_type l = em->intf_sfds.head; l; l = l->next) { + struct sfd_intf_list *em_il = l->data; + if (em_il->list.length) + continue; // not empty, we can use these + + __C_DBG("allocating stream_fds for %u ports", em->num_ports); + MEDIA_CLEAR(media, PUBLIC); + + socket_port_q q = TYPED_GQUEUE_INIT; // XXX use some sort of intrusive list for this + if (!get_consecutive_ports(&q, em->num_ports, em_il->local_intf, label)) + return false; + + struct socket_port_link *spl; + while ((spl = t_queue_pop_head(&q))) { + set_tos(&spl->socket, media->call->tos); + if (media->call->cpu_affinity >= 0) { + if (socket_cpu_affinity(&spl->socket, media->call->cpu_affinity)) + ilog(LOG_ERR | LOG_FLAG_LIMIT, "Failed to set socket CPU " + "affinity: %s", strerror(errno)); + } + __auto_type sfd = stream_fd_new(spl, media->call, em_il->local_intf); + t_queue_push_tail(&em_il->list, sfd); // not referenced + g_free(spl); // XXX eliminate this + } + } + + __assign_stream_fds(media, &media->endpoint_map->intf_sfds); + + return true; +} + +static bool monologue_open_ports(struct call_monologue *ml) { + for (unsigned int i = 0; i < ml->medias->len; i++) { + __auto_type media = ml->medias->pdata[i]; + if (!media) + continue; + if (!media->endpoint_map) + continue; // disabled media + + if (!media_open_ports(media)) + return false; + } + + return true; +} + /* called with call->master_lock held in W */ int monologue_offer_answer(struct call_monologue *monologues[2], sdp_streams_q *streams, sdp_ng_flags *flags) @@ -3936,15 +3961,13 @@ int monologue_offer_answer(struct call_monologue *monologues[2], sdp_streams_q * * assign the ports to the streams */ em = __get_endpoint_map(receiver_media, num_ports_this, &sp->rtp_endpoint, flags, sender_media->bundle ? true : false); - if (!em) { + if (!em) goto error_ports; - } if (flags->disable_jb && receiver_media->call) CALL_SET(receiver_media->call, DISABLE_JB); __num_media_streams(receiver_media, num_ports_this); - __assign_stream_fds(receiver_media, &em->intf_sfds); if (__num_media_streams(sender_media, num_ports_other)) { /* new streams created on OTHER side. normally only happens in @@ -3965,6 +3988,12 @@ int monologue_offer_answer(struct call_monologue *monologues[2], sdp_streams_q * monologue_bundle_offer(receiver_ml, flags); monologue_bundle_check_consistency(receiver_ml); monologue_bundle_check_heads(receiver_ml); + + if (!monologue_open_ports(receiver_ml)) + goto error_ports; + if (!monologue_open_ports(sender_ml)) + goto error_ports; + monologue_bundle_set_fds(receiver_ml); monologue_bundle_set_sinks(sender_ml); monologue_bundle_set_sinks(receiver_ml); @@ -4315,7 +4344,6 @@ int monologue_publish(struct call_monologue *ml, sdp_streams_q *streams, sdp_ng_ return -1; // XXX error - no ports __num_media_streams(media, num_ports); - __assign_stream_fds(media, &em->intf_sfds); // XXX this should be covered by __update_init_subscribers ? if (!__init_streams(media, sp, flags)) @@ -4324,6 +4352,7 @@ int monologue_publish(struct call_monologue *ml, sdp_streams_q *streams, sdp_ng_ ice_update(media->ice_agent, sp, false); } + monologue_open_ports(ml); monologue_media_start(ml); return 0; @@ -4409,7 +4438,6 @@ static int monologue_subscribe_request1(struct call_monologue *src_ml, struct ca return -1; // XXX error - no ports __num_media_streams(dst_media, num_ports); - __assign_stream_fds(dst_media, &em->intf_sfds); if (!__init_streams(dst_media, NULL, flags)) return -1; @@ -4417,6 +4445,8 @@ static int monologue_subscribe_request1(struct call_monologue *src_ml, struct ca update_init_subscribers(src_media, NULL, NULL, flags->opmode); } + monologue_open_ports(dst_ml); + return 0; } /* called with call->master_lock held in W */ @@ -6133,7 +6163,8 @@ int call_delete_branch_by_id(const str *callid, const str *branch, struct call_media *call_make_transform_media(struct call_monologue *ml, const str *type, enum media_type type_id, const str *media_id, const endpoint_t *remote, const str *interface) { - struct call_media *ret = call_get_media(ml, type, type_id, media_id, false, ml->medias->len + 1, str_ht_null()); + struct call_media *ret = call_get_media(ml, type, type_id, media_id, false, ml->medias->len + 1, + str_ht_null()); if (!media_id->len) generate_mid(ret, ret->unique_id); @@ -6146,10 +6177,12 @@ struct call_media *call_make_transform_media(struct call_monologue *ml, const st struct endpoint_map *em = __get_endpoint_map(ret, 1, remote, NULL, true); if (!em) - return false; + return NULL; __num_media_streams(ret, 1); - __assign_stream_fds(ret, &em->intf_sfds); + + if (!media_open_ports(ret)) + return NULL; return ret; } @@ -6166,6 +6199,9 @@ bool monologue_transform(struct call_monologue *ml, sdp_ng_flags *flags, medias_ __auto_type m = call_make_transform_media(ml, &media->type, codec_get_type(&media->type), &media->id, &media->destination, &flags->interface); + if (!m) + return false; + media->id = m->media_id; t_queue_push_tail(out_q, m); diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 15ffb9714..e357cf0ad 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -1378,57 +1378,12 @@ fail: return false; } -/* puts a list of "struct intf_list" into "out", containing socket_t list */ -bool get_consecutive_ports(socket_intf_list_q *out, unsigned int num_ports, unsigned int num_intfs, - struct call_media *media) +bool get_consecutive_ports(socket_port_q *out, unsigned int num_ports, struct local_intf *loc, + const str *label) { - struct socket_intf_list *il; - struct local_intf *loc; - const struct logical_intf *log = media->logical_intf; - const str *label = &media->call->callid; /* call's callid */ - - /* - // debug locals of logical incerface - char ip[100]; - for (l = log->list.head; l; l = l->next) { - loc = l->data; - inet_ntop(loc->spec->local_address.addr.family->af, &loc->spec->local_address.addr.u, ip, sizeof(ip)); - } - ilog(LOG_DEBUG, ""); - */ - - for (auto_iter(l, log->list.head); l; l = l->next) { - if (out->length >= num_intfs) - break; - - loc = l->data; - - il = g_new0(__typeof(*il), 1); - il->local_intf = loc; - t_queue_push_tail(out, il); - if (G_LIKELY(__get_consecutive_ports(&il->list, num_ports, loc->spec, label))) { - // success - found available ports on local interfaces, so far - continue; - } else { - // fail - did not found available ports on at least one local interface - goto error_ports; - } - } - - return true; - -error_ports: - ilog(LOG_ERR, "Failed to get %d consecutive ports on all locals of logical '"STR_FORMAT"'", - num_ports, STR_FMT(&log->name)); - - // free all ports alloc'ed so far for the previous local interfaces - while ((il = t_queue_pop_head(out))) { - free_socket_intf_list(il); - } - - return false; - + return __get_consecutive_ports(out, num_ports, loc->spec, label); } + void free_socket_intf_list(struct socket_intf_list *il) { struct socket_port_link *spl; diff --git a/include/media_socket.h b/include/media_socket.h index a663e3bd5..318b2ce92 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -385,8 +385,11 @@ bool is_local_endpoint(const struct intf_address *addr, unsigned int port); struct socket_port_link get_specific_port(unsigned int port, struct intf_spec *spec, const str *label); -bool get_consecutive_ports(socket_intf_list_q *out, unsigned int num_ports, unsigned int num_intfs, - struct call_media *media); + +__attribute__((nonnull(1, 3, 4))) +bool get_consecutive_ports(socket_port_q *out, unsigned int num_ports, struct local_intf *, + const str *label); + stream_fd *stream_fd_new(struct socket_port_link *, call_t *call, struct local_intf *lif); stream_fd *stream_fd_lookup(const endpoint_t *); void stream_fd_release(stream_fd *); diff --git a/t/auto-daemon-tests-bundle.pl b/t/auto-daemon-tests-bundle.pl index ef07b09ac..f9429f763 100755 --- a/t/auto-daemon-tests-bundle.pl +++ b/t/auto-daemon-tests-bundle.pl @@ -3361,10 +3361,10 @@ if ($ENV{RTPENGINE_EXTENDED_TESTS}) { is($resp->{interfaces}[0]{name}, 'default', 'intf found'); is($resp->{interfaces}[0]{address}, '203.0.113.1', 'address found'); - is($resp->{interfaces}[0]{ports}{used}, 192, 'port usage'); + is($resp->{interfaces}[0]{ports}{used}, 166, 'port usage'); is($resp->{interfaces}[1]{name}, 'default', 'intf found'); is($resp->{interfaces}[1]{address}, '2001:db8:4321::1', 'address found'); - is($resp->{interfaces}[1]{ports}{used}, 4, 'port usage'); + is($resp->{interfaces}[1]{ports}{used}, 2, 'port usage'); } diff --git a/t/auto-daemon-tests.pl b/t/auto-daemon-tests.pl index 600591487..99ebd327b 100755 --- a/t/auto-daemon-tests.pl +++ b/t/auto-daemon-tests.pl @@ -26227,6 +26227,8 @@ a=candidate:ICEBASE 1 UDP 2130706175 2001:db8:4321::1 PORT typ host a=end-of-candidates SDP + + new_call; $resp = rtpe_req('offer', 'SDP with just \n', { 'from-tag' => ft(), SDP => "v=0\no=- 1545997027 1 IN IP4 198.51.101.40\ns=tester\nt=0 0\nm=audio 3000 RTP/AVP 0 8\nc=IN IP4 198.51.100.1\na=foobar\n" } );