diff --git a/daemon/call.c b/daemon/call.c index 68929770f..ecd36721b 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -871,16 +871,17 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne em_il->local_intf = il->local_intf; t_queue_push_tail(&em->intf_sfds, em_il); - socket_t *sock; - while ((sock = t_queue_pop_head(&il->list))) { - set_tos(sock, media->call->tos); + struct socket_port_link *spl; + while ((spl = t_queue_pop_head(&il->list))) { + set_tos(&spl->socket, media->call->tos); if (media->call->cpu_affinity >= 0) { - if (socket_cpu_affinity(sock, media->call->cpu_affinity)) + if (socket_cpu_affinity(&spl->socket, media->call->cpu_affinity)) ilog(LOG_ERR | LOG_FLAG_LIMIT, "Failed to set socket CPU " "affinity: %s", strerror(errno)); } - sfd = stream_fd_new(sock, media->call, il->local_intf); + sfd = stream_fd_new(&spl->socket, spl->link, media->call, il->local_intf); t_queue_push_tail(&em_il->list, sfd); // not referenced + g_free(spl); } next_il: @@ -912,9 +913,9 @@ static void __assign_stream_fds(struct call_media *media, sfd_intf_list_q *intf_ if (!sfd) { // create a dummy sfd. needed to hold RTCP crypto context when // RTCP-mux is in use - socket_t *sock = g_slice_alloc(sizeof(*sock)); - dummy_socket(sock, &il->local_intf->spec->local_address.addr); - sfd = stream_fd_new(sock, media->call, il->local_intf); + socket_t sock; + dummy_socket(&sock, &il->local_intf->spec->local_address.addr); + sfd = stream_fd_new(&sock, NULL, media->call, il->local_intf); } sfd->stream = ps; diff --git a/daemon/media_socket.c b/daemon/media_socket.c index fdb9a5dac..2d5972b4f 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -85,6 +85,7 @@ struct packet_handler_ctx { struct late_port_release { socket_t socket; struct port_pool *pp; + ports_list *pp_link; }; struct interface_stats_interval { struct interface_stats_block stats; @@ -682,19 +683,18 @@ int is_local_endpoint(const struct intf_address *addr, unsigned int port) { /** * This function just (globally) reserves a port number, it doesn't provide any binding/unbinding. */ -static void reserve_port(struct port_pool *pp, - ports_list *value_looked_up, unsigned int port) { - - t_queue_delete_link(&pp->free_ports_q, value_looked_up); +static void reserve_port(struct port_pool *pp, ports_list *link) { + t_queue_unlink(&pp->free_ports_q, link); + unsigned int port = GPOINTER_TO_UINT(link->data); free_ports_link(pp, port) = NULL; } /** * This function just releases reserved port number, it doesn't provide any binding/unbinding. */ -static void release_reserved_port(struct port_pool *pp, unsigned int port) { - t_queue_push_tail(&pp->free_ports_q, GUINT_TO_POINTER(port)); - __auto_type l = pp->free_ports_q.tail; - free_ports_link(pp, port) = l; +static void release_reserved_port(struct port_pool *pp, ports_list *link) { + t_queue_push_tail_link(&pp->free_ports_q, link); + unsigned int port = GPOINTER_TO_UINT(link->data); + free_ports_link(pp, port) = link; } /* Append a list of free ports within the min-max range */ static void __append_free_ports_to_int(struct intf_spec *spec) { @@ -907,8 +907,10 @@ void interfaces_exclude_port(unsigned int port) { mutex_lock(&pp->free_list_lock); __auto_type ll = free_ports_link(pp, port); - if (ll) - reserve_port(pp, ll, port); + if (ll) { + reserve_port(pp, ll); + t_list_free(ll); + } mutex_unlock(&pp->free_list_lock); } } @@ -961,12 +963,13 @@ static void release_port_push(void *p) { __C_DBG("Adding the port '%u' to late-release list", lpr->socket.local.port); t_queue_push_tail(&ports_to_release, lpr); } -static void release_port_poller(socket_t *r, struct port_pool *pp, struct poller *poller) { +static void release_port_poller(socket_t *r, ports_list *link, struct port_pool *pp, struct poller *poller) { if (!r->local.port || r->fd == -1) return; struct late_port_release *lpr = g_slice_alloc(sizeof(*lpr)); move_socket(&lpr->socket, r); lpr->pp = pp; + lpr->pp_link = link; if (!poller) release_port_push(lpr); else { @@ -974,18 +977,18 @@ static void release_port_poller(socket_t *r, struct port_pool *pp, struct poller rtpe_poller_del_item_callback(poller, lpr->socket.fd, release_port_push, lpr); } } -static void release_port(socket_t *r, struct port_pool *pp) { - release_port_poller(r, pp, NULL); +static void release_port(socket_t *r, ports_list *link, struct port_pool *pp) { + release_port_poller(r, link, pp, NULL); } -static void free_port(socket_t *r, struct port_pool *pp) { - release_port(r, pp); - g_slice_free1(sizeof(*r), r); +static void free_port(struct socket_port_link *spl, struct port_pool *pp) { + release_port(&spl->socket, spl->link, pp); + g_free(spl); } /** * Logic responsible for devastating the `ports_to_release` queue. * It's being called by main poller. */ -static void release_port_now(socket_t *r, struct port_pool *pp) { +static void release_port_now(socket_t *r, ports_list *link, struct port_pool *pp) { unsigned int port = r->local.port; __C_DBG("Trying to release the port '%u'", port); @@ -997,7 +1000,7 @@ static void release_port_now(socket_t *r, struct port_pool *pp) { /* first return the engaged port back */ mutex_lock(&pp->free_list_lock); - release_reserved_port(pp, port); + release_reserved_port(pp, link); mutex_unlock(&pp->free_list_lock); } else { ilog(LOG_WARNING, "Unable to close the socket for port '%u'", port); @@ -1022,7 +1025,7 @@ enum thread_looper_action release_closed_sockets(void) { mutex_unlock(&ports_to_release_glob_lock); while ((lpr = t_queue_pop_head(&ports_left))) { - release_port_now(&lpr->socket, lpr->pp); + release_port_now(&lpr->socket, lpr->pp_link, lpr->pp); g_slice_free1(sizeof(*lpr), lpr); } } @@ -1046,12 +1049,12 @@ void append_thread_lpr_to_glob_lpr(void) { * @param spec, interface specifications * @param out, a list of sockets for this particular session (not a global list) */ -int __get_consecutive_ports(socket_q *out, unsigned int num_ports, unsigned int wanted_start_port, +int __get_consecutive_ports(socket_port_q *out, unsigned int num_ports, unsigned int wanted_start_port, struct intf_spec *spec, const str *label) { unsigned int allocation_attempts = 0, available_ports = 0, additional_port = 0, port = 0; - socket_t * sk; - GQueue ports_to_engage = G_QUEUE_INIT; /* usually it's only one RTCP port, theoretically can be more */ + ports_list *port_link = NULL; + ports_q ports_to_engage = TYPED_GQUEUE_INIT; /* usually it's only one RTCP port, theoretically can be more */ struct port_pool * pp = &spec->port_pool; /* port pool for a given local interface */ ports_q *free_ports_q; @@ -1080,14 +1083,14 @@ int __get_consecutive_ports(socket_q *out, unsigned int num_ports, unsigned int if (wanted_start_port > 0) { ilog(LOG_DEBUG, "A specific port value is requested, wanted_start_port: '%d'", wanted_start_port); mutex_lock(&pp->free_list_lock); - __auto_type l = free_ports_link(pp, wanted_start_port); - if (!l) { + port_link = free_ports_link(pp, wanted_start_port); + if (!port_link) { /* if engaged already, just select any other (so default logic) */ ilog(LOG_WARN, "This requested port has been already engaged, can't take it."); wanted_start_port = 0; /* take what is proposed by FIFO instead */ } else { /* we got the port, and we are sure it wasn't engaged */ - reserve_port(pp, l, wanted_start_port); + reserve_port(pp, port_link); port = wanted_start_port; } mutex_unlock(&pp->free_list_lock); @@ -1135,21 +1138,23 @@ new_cycle: * Then additionally make sure that the RTCP port can also be engaged, if needed. */ mutex_lock(&pp->free_list_lock); - port = GPOINTER_TO_UINT(t_queue_pop_head(free_ports_q)); /* RTP */ + port_link = t_queue_pop_head_link(free_ports_q); - if (!port) { + if (!port_link) { mutex_unlock(&pp->free_list_lock); ilog(LOG_ERR, "Failure while trying to get a port from the list"); goto fail; } - free_ports_link(pp, port) = NULL; /* RTP */ + + port = GPOINTER_TO_UINT(port_link->data); /* RTP */ + free_ports_link(pp, port) = NULL; mutex_unlock(&pp->free_list_lock); /* ports for RTP must be even, if there is an additional port for RTCP */ if (num_ports > 1 && (port & 1)) { /* return port for RTP back and try again */ mutex_lock(&pp->free_list_lock); - release_reserved_port(pp, port); + release_reserved_port(pp, port_link); mutex_unlock(&pp->free_list_lock); goto new_cycle; } @@ -1161,31 +1166,30 @@ new_cycle: additional_port++; mutex_lock(&pp->free_list_lock); - __auto_type l = additional_port <= pp->max ? free_ports_link(pp, additional_port) : NULL; + __auto_type add_link = additional_port <= pp->max ? free_ports_link(pp, additional_port) : NULL; - if (!l) { + if (!add_link) { /* return port for RTP back and try again */ - release_reserved_port(pp, port); + release_reserved_port(pp, port_link); mutex_unlock(&pp->free_list_lock); /* check if we managed to enagage anything in previous for-cycles */ - while ((additional_port = GPOINTER_TO_UINT(g_queue_pop_head(&ports_to_engage)))) + while ((add_link = t_queue_pop_head_link(&ports_to_engage))) { mutex_lock(&pp->free_list_lock); /* return additional ports back */ - release_reserved_port(pp, additional_port); + release_reserved_port(pp, add_link); mutex_unlock(&pp->free_list_lock); } goto new_cycle; + } - } else { - /* engage this port right away */ - reserve_port(pp, l, additional_port); - mutex_unlock(&pp->free_list_lock); + /* engage this port right away */ + reserve_port(pp, add_link); + mutex_unlock(&pp->free_list_lock); - /* track for which additional ports, we have to open sockets */ - g_queue_push_tail(&ports_to_engage, GUINT_TO_POINTER(additional_port)); - } + /* track for which additional ports, we have to open sockets */ + t_queue_push_tail_link(&ports_to_engage, add_link); } } @@ -1193,22 +1197,25 @@ new_cycle: allocation_attempts); /* at this point we consider all things before as successfull. Now just add the RTP port */ - g_queue_push_head(&ports_to_engage, GUINT_TO_POINTER(port)); + t_queue_push_head_link(&ports_to_engage, port_link); - while ((port = GPOINTER_TO_UINT(g_queue_pop_head(&ports_to_engage)))) + struct socket_port_link *spl; + while ((port_link = t_queue_pop_head_link(&ports_to_engage))) { + port = GPOINTER_TO_UINT(port_link->data); ilog(LOG_DEBUG, "Trying to bind the socket for port = '%d'", port); - sk = g_slice_alloc0(sizeof(*sk)); - sk->fd = -1; - t_queue_push_tail(out, sk); + spl = g_new0(struct socket_port_link, 1); + spl->socket.fd = -1; + spl->link = port_link; + t_queue_push_tail(out, spl); /* if not possible to engage this socket, try to reallocate it again */ - if (!add_socket(sk, port, spec, label)) { + if (!add_socket(&spl->socket, port, spec, label)) { /* if something has been left in the `ports_to_engage` queue, release it right away */ - while ((port = GPOINTER_TO_UINT(g_queue_pop_head(&ports_to_engage)))) + while ((port_link = t_queue_pop_head(&ports_to_engage))) { mutex_lock(&pp->free_list_lock); - release_reserved_port(pp, port); + release_reserved_port(pp, port_link); mutex_unlock(&pp->free_list_lock); } /* ports which are already bound to a socket, will be freed by `free_port()` */ @@ -1221,8 +1228,8 @@ new_cycle: release_restart: /* release all previously engaged sockets */ - while ((sk = t_queue_pop_head(out))) - free_port(sk, pp); /* engaged ports will be released here */ + while ((spl = t_queue_pop_head(out))) + free_port(spl, pp); /* engaged ports will be released here */ /* do not re-try for specifically wanted ports */ if (wanted_start_port > 0) @@ -1294,10 +1301,10 @@ error_ports: } void free_socket_intf_list(struct socket_intf_list *il) { - socket_t *sock; + struct socket_port_link *spl; - while ((sock = t_queue_pop_head(&il->list))) - free_port(sock, &il->local_intf->spec->port_pool); + while ((spl = t_queue_pop_head(&il->list))) + free_port(spl, &il->local_intf->spec->port_pool); g_slice_free1(sizeof(*il), il); } void free_sfd_intf_list(struct sfd_intf_list *il) { @@ -3110,14 +3117,14 @@ out: static void stream_fd_free(stream_fd *f) { - release_port(&f->socket, &f->local_intf->spec->port_pool); + release_port(&f->socket, f->port_pool_link, &f->local_intf->spec->port_pool); crypto_cleanup(&f->crypto); dtls_connection_cleanup(&f->dtls); obj_put(f->call); } -stream_fd *stream_fd_new(socket_t *fd, call_t *call, struct local_intf *lif) { +stream_fd *stream_fd_new(socket_t *fd, ports_list *link, call_t *call, struct local_intf *lif) { stream_fd *sfd; struct poller_item pi; @@ -3126,8 +3133,8 @@ stream_fd *stream_fd_new(socket_t *fd, call_t *call, struct local_intf *lif) { sfd->socket = *fd; sfd->call = obj_get(call); sfd->local_intf = lif; + sfd->port_pool_link = link; t_queue_push_tail(&call->stream_fds, sfd); /* hand over ref */ - g_slice_free1(sizeof(*fd), fd); /* moved into sfd, thus free */ __C_DBG("stream_fd_new localport=%d", sfd->socket.local.port); @@ -3175,7 +3182,7 @@ void stream_fd_release(stream_fd *sfd) { &sfd->socket.local); // releases reference } - release_port_poller(&sfd->socket, &sfd->local_intf->spec->port_pool, sfd->poller); + release_port_poller(&sfd->socket, sfd->port_pool_link, &sfd->local_intf->spec->port_pool, sfd->poller); } diff --git a/daemon/redis.c b/daemon/redis.c index 4478ca71f..6bc365ad1 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -1381,10 +1381,11 @@ static int redis_sfds(call_t *c, struct redis_list *sfds) { sockfamily_t *fam; struct logical_intf *lif; struct local_intf *loc; - socket_q q = TYPED_GQUEUE_INIT; + socket_port_q q = TYPED_GQUEUE_INIT; unsigned int loc_uid; stream_fd *sfd; socket_t *sock; + socket_t local_sock; int port, fd; const char *err; @@ -1419,21 +1420,28 @@ static int redis_sfds(call_t *c, struct redis_list *sfds) { if (!loc) goto err; + struct socket_port_link *spl = NULL; + ports_list *link = NULL; + if (fd != -1) { err = "failed to open ports"; if (__get_consecutive_ports(&q, 1, port, loc->spec, &c->callid)) goto err; err = "no port returned"; - sock = t_queue_pop_head(&q); - if (!sock) + spl = t_queue_pop_head(&q); + if (!spl) goto err; + sock = &spl->socket; + link = spl->link; set_tos(sock, c->tos); } else { - sock = g_slice_alloc(sizeof(*sock)); + sock = &local_sock; dummy_socket(sock, &loc->spec->local_address.addr); } - sfd = stream_fd_new(sock, c, loc); + sfd = stream_fd_new(sock, link, c, loc); + if (spl) + g_free(spl); if (redis_hash_get_sdes_params1(&sfd->crypto.params, rh, "") == -1) return -1; diff --git a/include/media_socket.h b/include/media_socket.h index 9d98cefe2..36bdcd6d6 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -88,6 +88,11 @@ struct logical_intf { typedef void port_t; TYPED_GQUEUE(ports, port_t) +struct socket_port_link { + socket_t socket; + ports_list *link; +}; + struct port_pool { unsigned int min, max; @@ -161,6 +166,8 @@ void interface_sampled_rate_stats_destroy(struct interface_sampled_rate_stats *) struct interface_stats_block *interface_sampled_rate_stats_get(struct interface_sampled_rate_stats *s, struct local_intf *lif, long long *time_diff_us); +TYPED_GQUEUE(socket_port, struct socket_port_link) + struct local_intf { struct intf_spec *spec; struct intf_address advertised_address; @@ -172,13 +179,13 @@ struct local_intf { }; struct socket_intf_list { struct local_intf *local_intf; - socket_q list; + socket_port_q list; }; struct sfd_intf_list { struct local_intf *local_intf; stream_fd_q list; }; -TYPED_GQUEUE(socket_intf_list, struct socket_intf_list) +TYPED_GQUEUE(socket_intf_list, struct socket_intf_list) /* RO */ TYPED_GQUEUE(sfd_intf_list, struct sfd_intf_list) /** @@ -205,6 +212,7 @@ struct stream_fd { unsigned int unique_id; /* RO */ socket_t socket; /* RO */ struct local_intf *local_intf; /* RO */ + ports_list *port_pool_link; /* RO */ /* stream_fd object holds a reference to the call it belongs to. * Which in turn holds references to all stream_fd objects it contains, @@ -213,7 +221,7 @@ struct stream_fd { * The call is only released when it has been dissociated from all stream_fd objects, * which happens during call teardown. */ - call_t *call; /* RO */ + call_t *call; /* RO */ struct packet_stream *stream; /* LOCK: call->master_lock */ struct crypto_context crypto; /* IN direction, LOCK: stream->in_lock */ struct dtls_connection dtls; /* LOCK: stream->in_lock */ @@ -280,13 +288,10 @@ struct local_intf *get_any_interface_address(const struct logical_intf *lif, soc void interfaces_exclude_port(unsigned int port); int is_local_endpoint(const struct intf_address *addr, unsigned int port); -//int get_port(socket_t *r, unsigned int port, const struct local_intf *lif, const call_t *c); -//void release_port(socket_t *r, const struct local_intf *); - -int __get_consecutive_ports(socket_q *out, unsigned int num_ports, unsigned int wanted_start_port, +int __get_consecutive_ports(socket_port_q *out, unsigned int num_ports, unsigned int wanted_start_port, struct intf_spec *spec, const str *); int get_consecutive_ports(socket_intf_list_q *out, unsigned int num_ports, unsigned int num_intfs, struct call_media *media); -stream_fd *stream_fd_new(socket_t *fd, call_t *call, struct local_intf *lif); +stream_fd *stream_fd_new(socket_t *fd, ports_list *link, call_t *call, struct local_intf *lif); stream_fd *stream_fd_lookup(const endpoint_t *); void stream_fd_release(stream_fd *); enum thread_looper_action release_closed_sockets(void);