Browse Source

MT#56750 Reworked port allocation

Introduce a reworked port allocation in RTPEngine.
The goal of this rework is to:
- simplify the logic of handling free/engaged ports
- eliminate a bottle neck begotten by overcomplicated logic
- potentially resolve the issue with "ran out of ports"
  under heavy loading, when still there must be ports left
  in the ports pool

Change-Id: Ifd2b1565611dd3b86c474a1ea5507fc6152fc212
pull/1642/head
Donat Zenichev 3 years ago
parent
commit
3444febebc
2 changed files with 279 additions and 102 deletions
  1. +275
    -102
      daemon/media_socket.c
  2. +4
    -0
      include/media_socket.h

+ 275
- 102
daemon/media_socket.c View File

@ -658,7 +658,56 @@ int is_local_endpoint(const struct intf_address *addr, unsigned int port) {
return 0; return 0;
} }
/* Append a list of free ports within min-max range */
static void __append_free_ports_to_int(struct intf_spec *spec) {
unsigned int ports_amount, count;
GQueue * free_ports_q = &spec->port_pool.free_ports_q;
GHashTable ** free_ports_ht = &spec->port_pool.free_ports_ht;
if (!*free_ports_ht)
*free_ports_ht = g_hash_table_new(g_direct_hash, g_direct_equal);
if (spec->port_pool.max < spec->port_pool.min) {
ilog(LOG_WARNING, "Ports range: max value cannot be less than min");
return;
}
/* range of possible ports */
ports_amount = spec->port_pool.max - spec->port_pool.min + 1;
count = ports_amount;
if (ports_amount == 0) {
ilog(LOG_WARNING, "Ports range: there must be at least 1 port in the range");
return;
}
int port_values[ports_amount];
/* create an array to store the initial values within the range */
for (int i = 0; i < ports_amount; i++)
port_values[i] = spec->port_pool.min + i;
/* generate N random numbers within the given range without duplicates,
* using the rolling dice algorithm */
for (int i = 0; i < ports_amount; i++)
{
int j = ssl_random() % count;
int value = port_values[j];
mutex_lock(&spec->port_pool.free_list_lock);
g_queue_push_tail(free_ports_q, GUINT_TO_POINTER(value));
/* store this new GList as value into the hash table */
GList * l = free_ports_q->tail;
/* The value retrieved from the hash table would then point
* into the queue for quick removal */
g_hash_table_replace(*free_ports_ht, GUINT_TO_POINTER(value), l);
mutex_unlock(&spec->port_pool.free_list_lock);
port_values[j] = port_values[count - 1];
count--;
}
}
// called during single-threaded startup only // called during single-threaded startup only
static void __add_intf_rr_1(struct logical_intf *lif, str *name_base, sockfamily_t *fam) { static void __add_intf_rr_1(struct logical_intf *lif, str *name_base, sockfamily_t *fam) {
struct logical_intf key = {0,}; struct logical_intf key = {0,};
@ -711,13 +760,18 @@ static void __interface_append(struct intf_config *ifa, sockfamily_t *fam, bool
} }
spec = g_hash_table_lookup(__intf_spec_addr_type_hash, &ifa->local_address); spec = g_hash_table_lookup(__intf_spec_addr_type_hash, &ifa->local_address);
if (!spec) { if (!spec) {
spec = g_slice_alloc0(sizeof(*spec)); spec = g_slice_alloc0(sizeof(*spec));
spec->local_address = ifa->local_address; spec->local_address = ifa->local_address;
spec->port_pool.min = ifa->port_min; spec->port_pool.min = ifa->port_min;
spec->port_pool.max = ifa->port_max; spec->port_pool.max = ifa->port_max;
spec->port_pool.free_ports = spec->port_pool.max - spec->port_pool.min + 1;
mutex_init(&spec->port_pool.free_list_lock); mutex_init(&spec->port_pool.free_list_lock);
/* pre-fill the range of used ports */
__append_free_ports_to_int(spec);
g_hash_table_insert(__intf_spec_addr_type_hash, &spec->local_address, spec); g_hash_table_insert(__intf_spec_addr_type_hash, &spec->local_address, spec);
} }
@ -809,59 +863,25 @@ struct local_intf *get_any_interface_address(const struct logical_intf *lif, soc
} }
static int get_port(socket_t *r, unsigned int port, struct intf_spec *spec, const str *label) {
struct port_pool *pp;
__C_DBG("attempting to open port %u", port);
pp = &spec->port_pool;
if (bit_array_set(pp->ports_used, port)) {
__C_DBG("port %d in use", port);
return -1;
}
__C_DBG("port %d locked", port);
/**
* Opens a socket for a given port value and edits the iptables accordingly.
* It doesn't provide a port selection logic.
*/
static int add_socket(socket_t *r, unsigned int port, struct intf_spec *spec, const str *label) {
__C_DBG("An attempt to open a socket for port: '%u'", port);
if (open_socket(r, SOCK_DGRAM, port, &spec->local_address.addr)) { if (open_socket(r, SOCK_DGRAM, port, &spec->local_address.addr)) {
__C_DBG("couldn't open port %d", port);
bit_array_clear(pp->ports_used, port);
__C_DBG("Can't open a socket for port: '%d'", port);
return -1; return -1;
} }
iptables_add_rule(r, label); iptables_add_rule(r, label);
socket_timestamping(r); socket_timestamping(r);
g_atomic_int_dec_and_test(&pp->free_ports);
__C_DBG("%d free ports remaining on interface %s", pp->free_ports,
sockaddr_print_buf(&spec->local_address.addr));
__C_DBG("A socket successfully bound for port: '%u'", port);
return 0; return 0;
} }
static void release_port_now(socket_t *r, struct intf_spec *spec) {
unsigned int port = r->local.port;
struct port_pool *pp = &spec->port_pool;
__C_DBG("trying to release port %u", port);
if (close_socket(r) == 0) {
__C_DBG("port %u is released", port);
iptables_del_rule(r);
bit_array_clear(pp->ports_used, port);
g_atomic_int_inc(&pp->free_ports);
if ((port & 1) == 0) {
mutex_lock(&pp->free_list_lock);
if (!bit_array_isset(pp->free_list_used, port)) {
g_queue_push_tail(&pp->free_list, GUINT_TO_POINTER(port));
bit_array_set(pp->free_list_used, port);
}
mutex_unlock(&pp->free_list_lock);
}
} else {
__C_DBG("port %u is NOT released", port);
}
}
/**
* Pushing ports into the `ports_to_release` queue.
*/
static void release_port(socket_t *r, struct intf_spec *spec) { static void release_port(socket_t *r, struct intf_spec *spec) {
if (!r->local.port || r->fd == -1) if (!r->local.port || r->fd == -1)
return; return;
@ -875,6 +895,34 @@ static void free_port(socket_t *r, struct intf_spec *spec) {
release_port(r, spec); release_port(r, spec);
g_slice_free1(sizeof(*r), r); g_slice_free1(sizeof(*r), r);
} }
/**
* Logic responsible for devastating the `ports_to_release` queue.
* It's being called by main poller.
*/
static void release_port_now(socket_t *r, struct intf_spec *spec) {
unsigned int port = r->local.port;
struct port_pool *pp = &spec->port_pool;
GQueue * free_ports_q = &pp->free_ports_q;
GHashTable * free_ports_ht = pp->free_ports_ht;
__C_DBG("trying to release port %u", port);
if (close_socket(r) == 0) {
__C_DBG("A socket for the '%u' has been closed", port);
iptables_del_rule(r);
/* first return the engaged port back */
mutex_lock(&pp->free_list_lock);
g_queue_push_tail(free_ports_q, GUINT_TO_POINTER(port));
GList * l = free_ports_q->tail;
g_hash_table_replace(free_ports_ht, GUINT_TO_POINTER(port), l);
mutex_unlock(&pp->free_list_lock);
} else {
ilog(LOG_WARNING, "Unable to close the socket for port '%u'", port);
}
}
void release_closed_sockets(void) { void release_closed_sockets(void) {
struct late_port_release *lpr; struct late_port_release *lpr;
while ((lpr = g_queue_pop_head(&ports_to_release))) { while ((lpr = g_queue_pop_head(&ports_to_release))) {
@ -883,91 +931,212 @@ void release_closed_sockets(void) {
} }
} }
/* puts list of socket_t into "out" */
/**
* Puts a list of socket_t objects into "out".
*
* @param num_ports, number of ports we have to engage (1 - rtcp-mux / 2 - one RTP and one RTCP)
* @param wanted_start_port, a pre-defined port (if given), if not given must be 0
* @param spec, interface specifications
* @param out, a list of sockets for this particular session (not a global list)
*/
int __get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int wanted_start_port, int __get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int wanted_start_port,
struct intf_spec *spec, const str *label) struct intf_spec *spec, const str *label)
{ {
int i, cycle = 0;
socket_t *sk;
int port;
struct port_pool *pp;
unsigned int allocation_attempts = 0, available_ports = 0, additional_port = 0, port = 0;
socket_t * sk;
GQueue ports_to_engage = G_QUEUE_INIT; /* usually it's only one RTCP port, theoreticaly can be more */
if (num_ports == 0)
return 0;
struct port_pool * pp = &spec->port_pool; /* port pool for a given local interface */
GQueue * free_ports_q;
GHashTable * free_ports_ht;
pp = &spec->port_pool;
if (num_ports == 0) {
ilog(LOG_ERR, "Number of ports to be engaged is '%d', can't handle it like that",
num_ports);
goto fail;
}
__C_DBG("wanted_start_port=%d", wanted_start_port);
/* for the wanted port only one port can be engaged */
if (num_ports > 1 && wanted_start_port > 0) {
ilog(LOG_ERR, "A specific port value is requested, but ports to be engaged > 1");
goto fail;
}
if (wanted_start_port > 0) {
port = wanted_start_port;
__C_DBG("port=%d", port);
} else {
port = g_atomic_int_get(&pp->last_used);
__C_DBG("before randomization port=%d", port);
#if PORT_RANDOM_MIN && PORT_RANDOM_MAX
port += PORT_RANDOM_MIN + (ssl_random() % (PORT_RANDOM_MAX - PORT_RANDOM_MIN));
#endif
__C_DBG("after randomization port=%d", port);
free_ports_q = &pp->free_ports_q;
free_ports_ht = pp->free_ports_ht;
// debug msg if port is in the given interval
if (bit_array_isset(pp->ports_used, port)) {
__C_DBG("port %d is USED in port pool", port);
mutex_lock(&pp->free_list_lock);
unsigned int fport = GPOINTER_TO_UINT(g_queue_pop_head(&pp->free_list));
if (fport)
bit_array_clear(pp->free_list_used, fport);
mutex_unlock(&pp->free_list_lock);
if (fport) {
port = fport;
__C_DBG("Picked port %u from free list", port);
}
/* a presence of free lists data is critical for us */
if (!(free_ports_q && free_ports_q->head) || !free_ports_ht) {
ilog(LOG_ERR, "Failure while trying to get a list of free ports");
goto fail;
}
/* specifically requested port */
if (wanted_start_port > 0) {
ilog(LOG_DEBUG, "A specific port value is requested, wanted_start_port: '%d'", wanted_start_port);
mutex_lock(&pp->free_list_lock);
GList *l = g_hash_table_lookup(free_ports_ht, GUINT_TO_POINTER(wanted_start_port));
if (!l) {
/* if engaged already, just select any other (so default logic) */
ilog(LOG_WARN, "This requested port has been already engaged, can't take it.");
wanted_start_port = 0; /* take what is proposed by FIFO instead */
} else { } else {
__C_DBG("port %d is NOT USED in port pool", port);
/* we got the port, and we are sure it wasn't engaged */
g_queue_delete_link(free_ports_q, l);
g_hash_table_remove(free_ports_ht, GUINT_TO_POINTER(wanted_start_port));
port = wanted_start_port;
} }
mutex_unlock(&pp->free_list_lock);
}
/* make sure we have ports to be used */
mutex_lock(&pp->free_list_lock);
available_ports = g_queue_get_length(free_ports_q);
mutex_unlock(&pp->free_list_lock);
if (!available_ports && wanted_start_port == 0) {
ilog(LOG_ERR, "Empty ports queue, no more ports left for usage");
goto fail;
} }
while (1) {
__C_DBG("cycle=%d, port=%d", cycle, port);
/* if there is only 1 port left, and it's not rtcp-mux, then
* it makes no sence to conitnue - ran out ports */
if (num_ports > 1 && wanted_start_port == 0 && available_ports == 1) {
ilog(LOG_ERR, "Ran out of ports, can't engage an additional port (for RTCP)");
goto fail;
}
/* Here we try to bind a port to a socket being opened.
*
* cycling here unless:
* - for non rtcp-mux: unless we engage two sequential ports, where RTP port is even
* and the both ports a socket can be opened (get_port())
* - for rtcp-mux: unless we get a socket opened for it (get_port())
* - theoretically more than 2 ports can be requested.
*/
while (1)
{
new_cycle:
if (++allocation_attempts > available_ports) {
ilog(LOG_ERR, "Failure while trying to bind a port to the socket");
goto fail;
}
if (!wanted_start_port) { if (!wanted_start_port) {
if (port < pp->min)
port = pp->min;
if (num_ports > 1 && (port & 1))
port++;
/* For cases with no rtcp-mux: RTP must be an even port,
* and RTCP port is the one next to that.
*/
/* Now only get first possible port for RTP.
* Then additionally make sure that the RTCP port can also be engaged, if needed.
*/
mutex_lock(&pp->free_list_lock);
port = GPOINTER_TO_UINT(g_queue_pop_head(free_ports_q)); /* RTP */
if (!port) {
mutex_unlock(&pp->free_list_lock);
ilog(LOG_ERR, "Failure while trying to get a port from the list");
goto fail;
}
g_hash_table_remove(free_ports_ht, GUINT_TO_POINTER(port)); /* RTP */
mutex_unlock(&pp->free_list_lock);
/* ports for RTP must be even, if there is an additional port for RTCP */
if (num_ports > 1 && (port & 1)) {
/* try again */
mutex_lock(&pp->free_list_lock);
g_queue_push_tail(free_ports_q, GUINT_TO_POINTER(port)); /* return port for RTP back */
GList * l = free_ports_q->tail;
g_hash_table_replace(free_ports_ht, GUINT_TO_POINTER(port), l);
mutex_unlock(&pp->free_list_lock);
goto new_cycle;
}
/* find additional ports, usually it's only RTCP */
additional_port = port;
for (int i = 1; i < num_ports; i++)
{
additional_port++;
mutex_lock(&pp->free_list_lock);
GList *l = g_hash_table_lookup(free_ports_ht, GUINT_TO_POINTER(additional_port));
if (!l) {
/* try again */
g_queue_push_tail(free_ports_q, GUINT_TO_POINTER(port)); /* return port for RTP back */
GList * l = free_ports_q->tail;
g_hash_table_replace(free_ports_ht, GUINT_TO_POINTER(port), l);
mutex_unlock(&pp->free_list_lock);
/* check if we managed to enagage anything in previous for-cycles */
while ((additional_port = GPOINTER_TO_UINT(g_queue_pop_head(&ports_to_engage))))
{
mutex_lock(&pp->free_list_lock);
g_queue_push_tail(free_ports_q, GUINT_TO_POINTER(additional_port));
GList * l = free_ports_q->tail;
g_hash_table_replace(free_ports_ht, GUINT_TO_POINTER(additional_port), l);
mutex_unlock(&pp->free_list_lock);
}
goto new_cycle;
} else {
/* engage this port right away */
g_queue_delete_link(free_ports_q, l);
g_hash_table_remove(free_ports_ht, GUINT_TO_POINTER(additional_port));
mutex_unlock(&pp->free_list_lock);
/* track for which additional ports, we have to open sockets */
g_queue_push_tail(&ports_to_engage, GUINT_TO_POINTER(additional_port));
}
}
} }
for (i = 0; i < num_ports; i++) {
ilog(LOG_DEBUG, "Trying to bind the socket for RTP/RTCP ports (allocation attempt = '%d')",
allocation_attempts);
/* at this point we consider all things before as successfull. Now just add the RTP port */
g_queue_push_head(&ports_to_engage, GUINT_TO_POINTER(port));
while ((port = GPOINTER_TO_UINT(g_queue_pop_head(&ports_to_engage))))
{
ilog(LOG_DEBUG, "Trying to bind the socket for port = '%d'", port);
sk = g_slice_alloc0(sizeof(*sk)); sk = g_slice_alloc0(sizeof(*sk));
// fd=0 is a valid file descriptor that may be closed
// accidentally by free_port if previously bounded
sk->fd = -1; sk->fd = -1;
g_queue_push_tail(out, sk); g_queue_push_tail(out, sk);
if (!wanted_start_port && port > pp->max) {
port = 0;
cycle++;
/* if not possible to engage this socket try to re-allocate again */
if (add_socket(sk, port, spec, label)) {
/* if something has been left in `ports_to_engage`, release it right away */
while ((port = GPOINTER_TO_UINT(g_queue_pop_head(&ports_to_engage))))
{
mutex_lock(&pp->free_list_lock);
g_queue_push_tail(free_ports_q, GUINT_TO_POINTER(port));
GList * l = free_ports_q->tail;
g_hash_table_replace(free_ports_ht, GUINT_TO_POINTER(port), l);
mutex_unlock(&pp->free_list_lock);
}
/* ports already bound to a socket, will be freed by `free_port()` */
goto release_restart; goto release_restart;
} }
if (get_port(sk, port++, spec, label))
goto release_restart;
} }
/* success */
break; break;
release_restart: release_restart:
/* release all previously engaged sockets */
while ((sk = g_queue_pop_head(out))) while ((sk = g_queue_pop_head(out)))
free_port(sk, spec);
free_port(sk, spec); /* engaged ports will be released here */
if (cycle >= 2 || wanted_start_port > 0)
/* do not re-try for specifically wanted ports */
if (wanted_start_port > 0)
goto fail; goto fail;
ilog(LOG_DEBUG, "Something already keeps this port, trying to take another port(s)");
} }
/* success */ /* success */
g_atomic_int_set(&pp->last_used, port);
__C_DBG("Opened ports %u.. on interface %s for media relay",
ilog(LOG_DEBUG, "Opened a socket on port '%u' (on interface '%s') for a media relay",
((socket_t *) out->head->data)->local.port, sockaddr_print_buf(&spec->local_address.addr)); ((socket_t *) out->head->data)->local.port, sockaddr_print_buf(&spec->local_address.addr));
return 0; return 0;
@ -984,7 +1153,7 @@ int get_consecutive_ports(GQueue *out, unsigned int num_ports, unsigned int num_
struct intf_list *il; struct intf_list *il;
struct local_intf *loc; struct local_intf *loc;
const struct logical_intf *log = media->logical_intf; const struct logical_intf *log = media->logical_intf;
const str *label = &media->call->callid;
const str *label = &media->call->callid; /* call's callid */
/* /*
// debug locals of logical incerface // debug locals of logical incerface
@ -2978,7 +3147,11 @@ void interfaces_free(void) {
for (GList *l = ll; l; l = l->next) { for (GList *l = ll; l; l = l->next) {
struct intf_spec *spec = l->data; struct intf_spec *spec = l->data;
struct port_pool *pp = &spec->port_pool; struct port_pool *pp = &spec->port_pool;
g_queue_clear(&pp->free_list);
g_queue_clear(&pp->free_list); /* TODO: deprecate it */
if (pp->free_ports_ht) {
g_hash_table_destroy(pp->free_ports_ht);
}
g_queue_clear(&pp->free_ports_q);
g_slice_free1(sizeof(*spec), spec); g_slice_free1(sizeof(*spec), spec);
} }
g_list_free(ll); g_list_free(ll);


+ 4
- 0
include/media_socket.h View File

@ -87,7 +87,11 @@ struct port_pool {
mutex_t free_list_lock; mutex_t free_list_lock;
GQueue free_list; GQueue free_list;
BIT_ARRAY_DECLARE(free_list_used, 0x10000); BIT_ARRAY_DECLARE(free_list_used, 0x10000);
GQueue free_ports_q; /* for getting the next free port */
GHashTable * free_ports_ht; /* for a lookup, if the port is used */
}; };
struct intf_address { struct intf_address {
socktype_t *type; socktype_t *type;


Loading…
Cancel
Save