From 3444febebca07caf5cdfe8acab4462cdd542e1f5 Mon Sep 17 00:00:00 2001 From: Donat Zenichev Date: Fri, 17 Mar 2023 14:40:16 +0100 Subject: [PATCH] MT#56750 Reworked port allocation Introduce a reworked port allocation in RTPEngine. The goal of this rework is to: - simplify the logic of handling free/engaged ports - eliminate a bottle neck begotten by overcomplicated logic - potentially resolve the issue with "ran out of ports" under heavy loading, when still there must be ports left in the ports pool Change-Id: Ifd2b1565611dd3b86c474a1ea5507fc6152fc212 --- daemon/media_socket.c | 377 ++++++++++++++++++++++++++++++----------- include/media_socket.h | 4 + 2 files changed, 279 insertions(+), 102 deletions(-) diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 5be095d3f..a1268a36c 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -658,7 +658,56 @@ int is_local_endpoint(const struct intf_address *addr, unsigned int port) { return 0; } +/* Append a list of free ports within min-max range */ +static void __append_free_ports_to_int(struct intf_spec *spec) { + unsigned int ports_amount, count; + GQueue * free_ports_q = &spec->port_pool.free_ports_q; + GHashTable ** free_ports_ht = &spec->port_pool.free_ports_ht; + + if (!*free_ports_ht) + *free_ports_ht = g_hash_table_new(g_direct_hash, g_direct_equal); + + if (spec->port_pool.max < spec->port_pool.min) { + ilog(LOG_WARNING, "Ports range: max value cannot be less than min"); + return; + } + + /* range of possible ports */ + ports_amount = spec->port_pool.max - spec->port_pool.min + 1; + count = ports_amount; + + if (ports_amount == 0) { + ilog(LOG_WARNING, "Ports range: there must be at least 1 port in the range"); + return; + } + + int port_values[ports_amount]; + + /* create an array to store the initial values within the range */ + for (int i = 0; i < ports_amount; i++) + port_values[i] = spec->port_pool.min + i; + + /* generate N random numbers within the given range without duplicates, + * using the rolling dice algorithm */ + for (int i = 0; i < ports_amount; i++) + { + int j = ssl_random() % count; + int value = port_values[j]; + + mutex_lock(&spec->port_pool.free_list_lock); + g_queue_push_tail(free_ports_q, GUINT_TO_POINTER(value)); + /* store this new GList as value into the hash table */ + GList * l = free_ports_q->tail; + /* The value retrieved from the hash table would then point + * into the queue for quick removal */ + g_hash_table_replace(*free_ports_ht, GUINT_TO_POINTER(value), l); + mutex_unlock(&spec->port_pool.free_list_lock); + + port_values[j] = port_values[count - 1]; + count--; + } +} // called during single-threaded startup only static void __add_intf_rr_1(struct logical_intf *lif, str *name_base, sockfamily_t *fam) { struct logical_intf key = {0,}; @@ -711,13 +760,18 @@ static void __interface_append(struct intf_config *ifa, sockfamily_t *fam, bool } spec = g_hash_table_lookup(__intf_spec_addr_type_hash, &ifa->local_address); + if (!spec) { spec = g_slice_alloc0(sizeof(*spec)); spec->local_address = ifa->local_address; spec->port_pool.min = ifa->port_min; spec->port_pool.max = ifa->port_max; - spec->port_pool.free_ports = spec->port_pool.max - spec->port_pool.min + 1; + mutex_init(&spec->port_pool.free_list_lock); + + /* pre-fill the range of used ports */ + __append_free_ports_to_int(spec); + g_hash_table_insert(__intf_spec_addr_type_hash, &spec->local_address, spec); } @@ -809,59 +863,25 @@ struct local_intf *get_any_interface_address(const struct logical_intf *lif, soc } - -static int get_port(socket_t *r, unsigned int port, struct intf_spec *spec, const str *label) { - struct port_pool *pp; - - __C_DBG("attempting to open port %u", port); - - pp = &spec->port_pool; - - if (bit_array_set(pp->ports_used, port)) { - __C_DBG("port %d in use", port); - return -1; - } - __C_DBG("port %d locked", port); +/** + * Opens a socket for a given port value and edits the iptables accordingly. + * It doesn't provide a port selection logic. + */ +static int add_socket(socket_t *r, unsigned int port, struct intf_spec *spec, const str *label) { + __C_DBG("An attempt to open a socket for port: '%u'", port); if (open_socket(r, SOCK_DGRAM, port, &spec->local_address.addr)) { - __C_DBG("couldn't open port %d", port); - bit_array_clear(pp->ports_used, port); + __C_DBG("Can't open a socket for port: '%d'", port); return -1; } - iptables_add_rule(r, label); socket_timestamping(r); - - g_atomic_int_dec_and_test(&pp->free_ports); - __C_DBG("%d free ports remaining on interface %s", pp->free_ports, - sockaddr_print_buf(&spec->local_address.addr)); - + __C_DBG("A socket successfully bound for port: '%u'", port); return 0; } - -static void release_port_now(socket_t *r, struct intf_spec *spec) { - unsigned int port = r->local.port; - struct port_pool *pp = &spec->port_pool; - - __C_DBG("trying to release port %u", port); - - if (close_socket(r) == 0) { - __C_DBG("port %u is released", port); - iptables_del_rule(r); - bit_array_clear(pp->ports_used, port); - g_atomic_int_inc(&pp->free_ports); - if ((port & 1) == 0) { - mutex_lock(&pp->free_list_lock); - if (!bit_array_isset(pp->free_list_used, port)) { - g_queue_push_tail(&pp->free_list, GUINT_TO_POINTER(port)); - bit_array_set(pp->free_list_used, port); - } - mutex_unlock(&pp->free_list_lock); - } - } else { - __C_DBG("port %u is NOT released", port); - } -} +/** + * Pushing ports into the `ports_to_release` queue. + */ static void release_port(socket_t *r, struct intf_spec *spec) { if (!r->local.port || r->fd == -1) return; @@ -875,6 +895,34 @@ static void free_port(socket_t *r, struct intf_spec *spec) { release_port(r, spec); g_slice_free1(sizeof(*r), r); } +/** + * Logic responsible for devastating the `ports_to_release` queue. + * It's being called by main poller. + */ +static void release_port_now(socket_t *r, struct intf_spec *spec) { + unsigned int port = r->local.port; + struct port_pool *pp = &spec->port_pool; + + GQueue * free_ports_q = &pp->free_ports_q; + GHashTable * free_ports_ht = pp->free_ports_ht; + + __C_DBG("trying to release port %u", port); + + if (close_socket(r) == 0) { + __C_DBG("A socket for the '%u' has been closed", port); + + iptables_del_rule(r); + + /* first return the engaged port back */ + mutex_lock(&pp->free_list_lock); + g_queue_push_tail(free_ports_q, GUINT_TO_POINTER(port)); + GList * l = free_ports_q->tail; + g_hash_table_replace(free_ports_ht, GUINT_TO_POINTER(port), l); + mutex_unlock(&pp->free_list_lock); + } else { + ilog(LOG_WARNING, "Unable to close the socket for port '%u'", port); + } +} void release_closed_sockets(void) { struct late_port_release *lpr; while ((lpr = g_queue_pop_head(&ports_to_release))) { @@ -883,91 +931,212 @@ void release_closed_sockets(void) { } } - - -/* puts list of socket_t into "out" */ +/** + * Puts a list of socket_t objects into "out". + * + * @param num_ports, number of ports we have to engage (1 - rtcp-mux / 2 - one RTP and one RTCP) + * @param wanted_start_port, a pre-defined port (if given), if not given must be 0 + * @param spec, interface specifications + * @param out, a list of sockets for this particular session (not a global list) + */ int __get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int wanted_start_port, struct intf_spec *spec, const str *label) { - int i, cycle = 0; - socket_t *sk; - int port; - struct port_pool *pp; + unsigned int allocation_attempts = 0, available_ports = 0, additional_port = 0, port = 0; + socket_t * sk; + GQueue ports_to_engage = G_QUEUE_INIT; /* usually it's only one RTCP port, theoreticaly can be more */ - if (num_ports == 0) - return 0; + struct port_pool * pp = &spec->port_pool; /* port pool for a given local interface */ + GQueue * free_ports_q; + GHashTable * free_ports_ht; - pp = &spec->port_pool; + if (num_ports == 0) { + ilog(LOG_ERR, "Number of ports to be engaged is '%d', can't handle it like that", + num_ports); + goto fail; + } - __C_DBG("wanted_start_port=%d", wanted_start_port); + /* for the wanted port only one port can be engaged */ + if (num_ports > 1 && wanted_start_port > 0) { + ilog(LOG_ERR, "A specific port value is requested, but ports to be engaged > 1"); + goto fail; + } - if (wanted_start_port > 0) { - port = wanted_start_port; - __C_DBG("port=%d", port); - } else { - port = g_atomic_int_get(&pp->last_used); - __C_DBG("before randomization port=%d", port); -#if PORT_RANDOM_MIN && PORT_RANDOM_MAX - port += PORT_RANDOM_MIN + (ssl_random() % (PORT_RANDOM_MAX - PORT_RANDOM_MIN)); -#endif - __C_DBG("after randomization port=%d", port); + free_ports_q = &pp->free_ports_q; + free_ports_ht = pp->free_ports_ht; - // debug msg if port is in the given interval - if (bit_array_isset(pp->ports_used, port)) { - __C_DBG("port %d is USED in port pool", port); - mutex_lock(&pp->free_list_lock); - unsigned int fport = GPOINTER_TO_UINT(g_queue_pop_head(&pp->free_list)); - if (fport) - bit_array_clear(pp->free_list_used, fport); - mutex_unlock(&pp->free_list_lock); - if (fport) { - port = fport; - __C_DBG("Picked port %u from free list", port); - } + /* a presence of free lists data is critical for us */ + if (!(free_ports_q && free_ports_q->head) || !free_ports_ht) { + ilog(LOG_ERR, "Failure while trying to get a list of free ports"); + goto fail; + } + + /* specifically requested port */ + if (wanted_start_port > 0) { + ilog(LOG_DEBUG, "A specific port value is requested, wanted_start_port: '%d'", wanted_start_port); + mutex_lock(&pp->free_list_lock); + GList *l = g_hash_table_lookup(free_ports_ht, GUINT_TO_POINTER(wanted_start_port)); + if (!l) { + /* if engaged already, just select any other (so default logic) */ + ilog(LOG_WARN, "This requested port has been already engaged, can't take it."); + wanted_start_port = 0; /* take what is proposed by FIFO instead */ } else { - __C_DBG("port %d is NOT USED in port pool", port); + /* we got the port, and we are sure it wasn't engaged */ + g_queue_delete_link(free_ports_q, l); + g_hash_table_remove(free_ports_ht, GUINT_TO_POINTER(wanted_start_port)); + port = wanted_start_port; } + mutex_unlock(&pp->free_list_lock); + } + + /* make sure we have ports to be used */ + mutex_lock(&pp->free_list_lock); + available_ports = g_queue_get_length(free_ports_q); + mutex_unlock(&pp->free_list_lock); + + if (!available_ports && wanted_start_port == 0) { + ilog(LOG_ERR, "Empty ports queue, no more ports left for usage"); + goto fail; } - while (1) { - __C_DBG("cycle=%d, port=%d", cycle, port); + /* if there is only 1 port left, and it's not rtcp-mux, then + * it makes no sence to conitnue - ran out ports */ + if (num_ports > 1 && wanted_start_port == 0 && available_ports == 1) { + ilog(LOG_ERR, "Ran out of ports, can't engage an additional port (for RTCP)"); + goto fail; + } + + /* Here we try to bind a port to a socket being opened. + * + * cycling here unless: + * - for non rtcp-mux: unless we engage two sequential ports, where RTP port is even + * and the both ports a socket can be opened (get_port()) + * - for rtcp-mux: unless we get a socket opened for it (get_port()) + * - theoretically more than 2 ports can be requested. + */ + while (1) + { +new_cycle: + if (++allocation_attempts > available_ports) { + ilog(LOG_ERR, "Failure while trying to bind a port to the socket"); + goto fail; + } + if (!wanted_start_port) { - if (port < pp->min) - port = pp->min; - if (num_ports > 1 && (port & 1)) - port++; + /* For cases with no rtcp-mux: RTP must be an even port, + * and RTCP port is the one next to that. + */ + + /* Now only get first possible port for RTP. + * Then additionally make sure that the RTCP port can also be engaged, if needed. + */ + mutex_lock(&pp->free_list_lock); + port = GPOINTER_TO_UINT(g_queue_pop_head(free_ports_q)); /* RTP */ + + if (!port) { + mutex_unlock(&pp->free_list_lock); + ilog(LOG_ERR, "Failure while trying to get a port from the list"); + goto fail; + } + g_hash_table_remove(free_ports_ht, GUINT_TO_POINTER(port)); /* RTP */ + mutex_unlock(&pp->free_list_lock); + + /* ports for RTP must be even, if there is an additional port for RTCP */ + if (num_ports > 1 && (port & 1)) { + /* try again */ + mutex_lock(&pp->free_list_lock); + g_queue_push_tail(free_ports_q, GUINT_TO_POINTER(port)); /* return port for RTP back */ + GList * l = free_ports_q->tail; + g_hash_table_replace(free_ports_ht, GUINT_TO_POINTER(port), l); + mutex_unlock(&pp->free_list_lock); + goto new_cycle; + } + + /* find additional ports, usually it's only RTCP */ + additional_port = port; + for (int i = 1; i < num_ports; i++) + { + additional_port++; + + mutex_lock(&pp->free_list_lock); + GList *l = g_hash_table_lookup(free_ports_ht, GUINT_TO_POINTER(additional_port)); + + if (!l) { + /* try again */ + g_queue_push_tail(free_ports_q, GUINT_TO_POINTER(port)); /* return port for RTP back */ + GList * l = free_ports_q->tail; + g_hash_table_replace(free_ports_ht, GUINT_TO_POINTER(port), l); + mutex_unlock(&pp->free_list_lock); + + /* check if we managed to enagage anything in previous for-cycles */ + while ((additional_port = GPOINTER_TO_UINT(g_queue_pop_head(&ports_to_engage)))) + { + mutex_lock(&pp->free_list_lock); + g_queue_push_tail(free_ports_q, GUINT_TO_POINTER(additional_port)); + GList * l = free_ports_q->tail; + g_hash_table_replace(free_ports_ht, GUINT_TO_POINTER(additional_port), l); + mutex_unlock(&pp->free_list_lock); + } + goto new_cycle; + + } else { + /* engage this port right away */ + g_queue_delete_link(free_ports_q, l); + g_hash_table_remove(free_ports_ht, GUINT_TO_POINTER(additional_port)); + mutex_unlock(&pp->free_list_lock); + + /* track for which additional ports, we have to open sockets */ + g_queue_push_tail(&ports_to_engage, GUINT_TO_POINTER(additional_port)); + } + } } - for (i = 0; i < num_ports; i++) { + ilog(LOG_DEBUG, "Trying to bind the socket for RTP/RTCP ports (allocation attempt = '%d')", + allocation_attempts); + + /* at this point we consider all things before as successfull. Now just add the RTP port */ + g_queue_push_head(&ports_to_engage, GUINT_TO_POINTER(port)); + + while ((port = GPOINTER_TO_UINT(g_queue_pop_head(&ports_to_engage)))) + { + ilog(LOG_DEBUG, "Trying to bind the socket for port = '%d'", port); sk = g_slice_alloc0(sizeof(*sk)); - // fd=0 is a valid file descriptor that may be closed - // accidentally by free_port if previously bounded sk->fd = -1; g_queue_push_tail(out, sk); - if (!wanted_start_port && port > pp->max) { - port = 0; - cycle++; + /* if not possible to engage this socket try to re-allocate again */ + if (add_socket(sk, port, spec, label)) { + /* if something has been left in `ports_to_engage`, release it right away */ + while ((port = GPOINTER_TO_UINT(g_queue_pop_head(&ports_to_engage)))) + { + mutex_lock(&pp->free_list_lock); + g_queue_push_tail(free_ports_q, GUINT_TO_POINTER(port)); + GList * l = free_ports_q->tail; + g_hash_table_replace(free_ports_ht, GUINT_TO_POINTER(port), l); + mutex_unlock(&pp->free_list_lock); + } + /* ports already bound to a socket, will be freed by `free_port()` */ goto release_restart; } - - if (get_port(sk, port++, spec, label)) - goto release_restart; } + + /* success */ break; release_restart: + /* release all previously engaged sockets */ while ((sk = g_queue_pop_head(out))) - free_port(sk, spec); + free_port(sk, spec); /* engaged ports will be released here */ - if (cycle >= 2 || wanted_start_port > 0) + /* do not re-try for specifically wanted ports */ + if (wanted_start_port > 0) goto fail; + + ilog(LOG_DEBUG, "Something already keeps this port, trying to take another port(s)"); } /* success */ - g_atomic_int_set(&pp->last_used, port); - - __C_DBG("Opened ports %u.. on interface %s for media relay", + ilog(LOG_DEBUG, "Opened a socket on port '%u' (on interface '%s') for a media relay", ((socket_t *) out->head->data)->local.port, sockaddr_print_buf(&spec->local_address.addr)); return 0; @@ -984,7 +1153,7 @@ int get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int num_ struct intf_list *il; struct local_intf *loc; const struct logical_intf *log = media->logical_intf; - const str *label = &media->call->callid; + const str *label = &media->call->callid; /* call's callid */ /* // debug locals of logical incerface @@ -2978,7 +3147,11 @@ void interfaces_free(void) { for (GList *l = ll; l; l = l->next) { struct intf_spec *spec = l->data; struct port_pool *pp = &spec->port_pool; - g_queue_clear(&pp->free_list); + g_queue_clear(&pp->free_list); /* TODO: deprecate it */ + if (pp->free_ports_ht) { + g_hash_table_destroy(pp->free_ports_ht); + } + g_queue_clear(&pp->free_ports_q); g_slice_free1(sizeof(*spec), spec); } g_list_free(ll); diff --git a/include/media_socket.h b/include/media_socket.h index 0ba80b7ce..22ba1a252 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -87,7 +87,11 @@ struct port_pool { mutex_t free_list_lock; GQueue free_list; + BIT_ARRAY_DECLARE(free_list_used, 0x10000); + + GQueue free_ports_q; /* for getting the next free port */ + GHashTable * free_ports_ht; /* for a lookup, if the port is used */ }; struct intf_address { socktype_t *type;