diff --git a/daemon/media_socket.c b/daemon/media_socket.c index adb01d2a7..b8b6ce1bb 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -682,7 +682,7 @@ int is_local_endpoint(const struct intf_address *addr, unsigned int port) { return 0; } -static void release_reserved_port(struct port_pool *pp, ports_q *); +static void release_reserved_port(struct port_pool *pp, ports_q *, unsigned int port); static void reserve_additional_port_links(ports_q *ret, struct port_pool *pp, unsigned int port) { for (__auto_type l = pp->overlaps.head; l; l = l->next) { @@ -707,7 +707,7 @@ bail: // Oops. Some spec didn't have the port available. Probably a race condition. // Return everything to its place and report failure by resetting the output // list to empty. - release_reserved_port(pp, ret); + release_reserved_port(pp, ret, port); } /** @@ -739,20 +739,10 @@ static ports_q reserve_port(struct port_pool *pp, unsigned int port) { /** * This function just releases reserved port number, it doesn't provide any binding/unbinding. */ -static void release_reserved_port(struct port_pool *pp, ports_q *list) { - // the list contains links in order: - // first port for port pool - // first port for first overlap pool - // first port for second overlap pool - // first port ... - // second port for port pool - // second port for first overlap pool - // ... - +static void release_reserved_port(struct port_pool *pp, ports_q *list, unsigned int port) { while (list->length) { // remove top link from list, which belongs to our port pool __auto_type link = t_queue_pop_head_link(list); - unsigned int port = GPOINTER_TO_UINT(link->data); { LOCK(&pp->free_list_lock); @@ -779,6 +769,16 @@ static void release_reserved_port(struct port_pool *pp, ports_q *list) { } } } + +static void release_reserved_ports(socket_port_q *ports) { + while (ports->length) { + __auto_type p = t_queue_pop_head(ports); + if (p->links.length) + release_reserved_port(p->pp, &p->links, GPOINTER_TO_UINT(p->links.head->data)); + g_free(p); + } +} + /* Append a list of free ports within the min-max range */ static void __append_free_ports_to_int(struct intf_spec *spec) { unsigned int ports_amount, count; @@ -1108,7 +1108,7 @@ static void release_port_now(socket_t *r, ports_q *list, struct port_pool *pp) { iptables_del_rule(r); /* first return the engaged port back */ - release_reserved_port(pp, list); + release_reserved_port(pp, list, port); } else { ilog(LOG_WARNING, "Unable to close the socket for port '%u'", port); } @@ -1148,6 +1148,34 @@ void append_thread_lpr_to_glob_lpr(void) { mutex_unlock(&ports_to_release_glob_lock); } +static struct socket_port_link get_one_port_link(unsigned int port, struct intf_spec *spec) { + __auto_type links = reserve_port(&spec->port_pool, port); + return (struct socket_port_link) { .links = links, .pp = &spec->port_pool, .socket = { .fd = -1 }}; +} + +static struct socket_port_link get_any_port_link(struct intf_spec *spec) { + struct socket_port_link ret = { .pp = &spec->port_pool, .socket = { .fd = -1 } }; + struct port_pool *pp = &spec->port_pool; + unsigned int port; + + { + // get/reserve port and its primary port link + LOCK(&pp->free_list_lock); + __auto_type port_link = t_queue_pop_head_link(&pp->free_ports_q); + if (!port_link) + return ret; + + port = GPOINTER_TO_UINT(port_link->data); + free_ports_link(pp, port) = NULL; + + t_queue_push_tail_link(&ret.links, port_link); + } + + reserve_additional_port_links(&ret.links, &spec->port_pool, port); + + return ret; +} + /** * Puts a list of `socket_t` objects into the `out`. * @@ -1160,8 +1188,6 @@ bool __get_consecutive_ports(socket_port_q *out, unsigned int num_ports, unsigne struct intf_spec *spec, const str *label) { unsigned int allocation_attempts = 0, available_ports = 0, additional_port = 0, port = 0; - ports_q all_ports = TYPED_GQUEUE_INIT; - ports_q ports_to_engage = TYPED_GQUEUE_INIT; /* usually it's only one RTCP port, theoretically can be more */ struct port_pool * pp = &spec->port_pool; /* port pool for a given local interface */ ports_q *free_ports_q; @@ -1189,14 +1215,17 @@ bool __get_consecutive_ports(socket_port_q *out, unsigned int num_ports, unsigne /* specifically requested port */ if (wanted_start_port > 0) { ilog(LOG_DEBUG, "A specific port value is requested, wanted_start_port: '%d'", wanted_start_port); - all_ports = reserve_port(pp, wanted_start_port); - if (!all_ports.length) { + __auto_type spl = get_one_port_link(wanted_start_port, spec); + if (!spl.links.length) { /* if engaged already, just select any other (so default logic) */ ilog(LOG_WARN, "This requested port has been already engaged, can't take it."); wanted_start_port = 0; /* take what is proposed by FIFO instead */ } else { /* we got the port, and we are sure it wasn't engaged */ port = wanted_start_port; + __auto_type splp = g_new(struct socket_port_link, 1); + *splp = spl; + t_queue_push_tail(out, splp); } } @@ -1241,76 +1270,63 @@ new_cycle: /* Now only get first possible port for RTP. * Then additionally make sure that the RTCP port can also be engaged, if needed. */ - mutex_lock(&pp->free_list_lock); - __auto_type port_link = t_queue_pop_head_link(free_ports_q); - - if (!port_link) { - mutex_unlock(&pp->free_list_lock); + __auto_type spl = get_any_port_link(spec); + if (!spl.links.length) { ilog(LOG_ERR, "Failure while trying to get a port from the list"); goto fail; } - port = GPOINTER_TO_UINT(port_link->data); /* RTP */ - free_ports_link(pp, port) = NULL; - mutex_unlock(&pp->free_list_lock); - - t_queue_push_tail_link(&all_ports, port_link); + port = GPOINTER_TO_UINT(spl.links.head->data); /* RTP */ /* ports for RTP must be even, if there is an additional port for RTCP */ if (num_ports > 1 && (port & 1)) { /* return port for RTP back and try again */ - release_reserved_port(pp, &all_ports); - goto new_cycle; + release_reserved_port(pp, &spl.links, port); + continue; } + __auto_type splp = g_new(struct socket_port_link, 1); + *splp = spl; + t_queue_push_tail(out, splp); + /* find additional ports, usually it's only RTCP */ additional_port = port; for (int i = 1; i < num_ports; i++) { additional_port++; - __auto_type add_port = reserve_port(pp, additional_port); + spl = get_one_port_link(additional_port, spec); - if (!add_port.length) { - /* return port for RTP back and try again */ - release_reserved_port(pp, &all_ports); - /* return additional ports back */ - release_reserved_port(pp, &ports_to_engage); + if (!spl.links.length) { + /* return previously reserved ports and try again */ + release_reserved_ports(out); + /* return additional port back */ + release_reserved_port(pp, &spl.links, additional_port); goto new_cycle; } /* engage this port right away */ /* track for which additional ports, we have to open sockets */ - t_queue_move(&ports_to_engage, &add_port); + splp = g_new(struct socket_port_link, 1); + *splp = spl; + t_queue_push_tail(out, splp); } } ilog(LOG_DEBUG, "Trying to bind the socket for RTP/RTCP ports (allocation attempt = '%d')", allocation_attempts); - /* at this point we consider all things before as successfull. Now just add the RTP port */ - t_queue_move(&all_ports, &ports_to_engage); + /* at this point we consider all things before as successful */ - struct socket_port_link *spl; - while (all_ports.length) { - __auto_type port_link = t_queue_pop_head_link(&all_ports); - port = GPOINTER_TO_UINT(port_link->data); + for (__auto_type l = out->head; l; l = l->next) { + __auto_type spl = l->data; + port = GPOINTER_TO_UINT(spl->links.head->data); ilog(LOG_DEBUG, "Trying to bind the socket for port = '%d'", port); - spl = g_new0(struct socket_port_link, 1); - spl->socket.fd = -1; - spl->pp = pp; - t_queue_push_tail_link(&spl->links, port_link); - t_queue_push_tail(out, spl); - // append other links belonging to the same port - while (all_ports.length && GPOINTER_TO_UINT(t_queue_peek_head(&all_ports)) == port) { - port_link = t_queue_pop_head_link(&all_ports); - t_queue_push_tail_link(&spl->links, port_link); - } /* if not possible to engage this socket, try to reallocate it again */ if (!add_socket(&spl->socket, port, spec, label)) { /* if something has been left in the `ports_to_engage` queue, release it right away */ - release_reserved_port(pp, &all_ports); + release_reserved_ports(out); /* ports which are already bound to a socket, will be freed by `free_port()` */ goto release_restart; } @@ -1320,10 +1336,6 @@ new_cycle: break; release_restart: - /* release all previously engaged sockets */ - while ((spl = t_queue_pop_head(out))) - free_port(spl); /* engaged ports will be released here */ - /* do not re-try for specifically wanted ports */ if (wanted_start_port > 0) goto fail; @@ -1332,8 +1344,10 @@ release_restart: } /* success */ - ilog(LOG_DEBUG, "Opened a socket on port '%u' (on interface '%s') for a media relay", - ((socket_t *) out->head->data)->local.port, sockaddr_print_buf(&spec->local_address.addr)); + ilog(LOG_DEBUG, "Opened %u socket(s) from port '%u' (on interface '%s') for a media relay", + num_ports, + out->head->data->socket.local.port, + sockaddr_print_buf(&spec->local_address.addr)); return true; fail: