Browse Source

MT#55283 move socket_port_link into stream_fd

This allows us to simplify some function signatures

Change-Id: I58f65735ba84ec7a536b1b170d1ef90e266308f5
pull/1910/head
Richard Fuchs 10 months ago
parent
commit
eeeb2d8641
4 changed files with 28 additions and 35 deletions
  1. +4
    -4
      daemon/call.c
  2. +12
    -14
      daemon/media_socket.c
  3. +7
    -14
      daemon/redis.c
  4. +5
    -3
      include/media_socket.h

+ 4
- 4
daemon/call.c View File

@ -878,7 +878,7 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne
ilog(LOG_ERR | LOG_FLAG_LIMIT, "Failed to set socket CPU " ilog(LOG_ERR | LOG_FLAG_LIMIT, "Failed to set socket CPU "
"affinity: %s", strerror(errno)); "affinity: %s", strerror(errno));
} }
sfd = stream_fd_new(&spl->socket, &spl->links, media->call, il->local_intf);
sfd = stream_fd_new(spl, media->call, il->local_intf);
t_queue_push_tail(&em_il->list, sfd); // not referenced t_queue_push_tail(&em_il->list, sfd); // not referenced
g_free(spl); g_free(spl);
} }
@ -912,9 +912,9 @@ static void __assign_stream_fds(struct call_media *media, sfd_intf_list_q *intf_
if (!sfd) { if (!sfd) {
// create a dummy sfd. needed to hold RTCP crypto context when // create a dummy sfd. needed to hold RTCP crypto context when
// RTCP-mux is in use // RTCP-mux is in use
socket_t sock;
dummy_socket(&sock, &il->local_intf->spec->local_address.addr);
sfd = stream_fd_new(&sock, NULL, media->call, il->local_intf);
struct socket_port_link spl = {0};
dummy_socket(&spl.socket, &il->local_intf->spec->local_address.addr);
sfd = stream_fd_new(&spl, media->call, il->local_intf);
} }
sfd->stream = ps; sfd->stream = ps;


+ 12
- 14
daemon/media_socket.c View File

@ -1064,13 +1064,13 @@ static void release_port_push(void *p) {
__C_DBG("Adding the port '%u' to late-release list", lpr->socket.local.port); __C_DBG("Adding the port '%u' to late-release list", lpr->socket.local.port);
t_queue_push_tail(&ports_to_release, lpr); t_queue_push_tail(&ports_to_release, lpr);
} }
static void release_port_poller(socket_t *r, ports_q *links, struct port_pool *pp, struct poller *poller) {
if (!r->local.port || r->fd == -1)
static void release_port_poller(struct socket_port_link *spl, struct poller *poller) {
if (!spl->socket.local.port || spl->socket.fd == -1)
return; return;
struct late_port_release *lpr = g_slice_alloc(sizeof(*lpr)); struct late_port_release *lpr = g_slice_alloc(sizeof(*lpr));
move_socket(&lpr->socket, r);
lpr->pp = pp;
lpr->pp_links = *links;
move_socket(&lpr->socket, &spl->socket);
lpr->pp = spl->pp;
lpr->pp_links = spl->links;
if (!poller) if (!poller)
release_port_push(lpr); release_port_push(lpr);
else { else {
@ -1078,11 +1078,11 @@ static void release_port_poller(socket_t *r, ports_q *links, struct port_pool *p
rtpe_poller_del_item_callback(poller, lpr->socket.fd, release_port_push, lpr); rtpe_poller_del_item_callback(poller, lpr->socket.fd, release_port_push, lpr);
} }
} }
static void release_port(socket_t *r, ports_q *links, struct port_pool *pp) {
release_port_poller(r, links, pp, NULL);
static void release_port(struct socket_port_link *spl) {
release_port_poller(spl, NULL);
} }
static void free_port(struct socket_port_link *spl) { static void free_port(struct socket_port_link *spl) {
release_port(&spl->socket, &spl->links, spl->pp);
release_port(spl);
g_free(spl); g_free(spl);
} }
/** /**
@ -3198,24 +3198,22 @@ out:
static void stream_fd_free(stream_fd *f) { static void stream_fd_free(stream_fd *f) {
release_port(&f->socket, &f->port_pool_links, &f->local_intf->spec->port_pool);
release_port(&f->spl);
crypto_cleanup(&f->crypto); crypto_cleanup(&f->crypto);
dtls_connection_cleanup(&f->dtls); dtls_connection_cleanup(&f->dtls);
obj_put(f->call); obj_put(f->call);
} }
stream_fd *stream_fd_new(socket_t *fd, ports_q *links, call_t *call, struct local_intf *lif) {
stream_fd *stream_fd_new(struct socket_port_link *spl, call_t *call, struct local_intf *lif) {
stream_fd *sfd; stream_fd *sfd;
struct poller_item pi; struct poller_item pi;
sfd = obj_alloc0(stream_fd, stream_fd_free); sfd = obj_alloc0(stream_fd, stream_fd_free);
sfd->unique_id = t_queue_get_length(&call->stream_fds); sfd->unique_id = t_queue_get_length(&call->stream_fds);
sfd->socket = *fd;
sfd->call = obj_get(call); sfd->call = obj_get(call);
sfd->local_intf = lif; sfd->local_intf = lif;
if (links)
sfd->port_pool_links = *links;
sfd->spl = *spl;
t_queue_push_tail(&call->stream_fds, sfd); /* hand over ref */ t_queue_push_tail(&call->stream_fds, sfd); /* hand over ref */
__C_DBG("stream_fd_new localport=%d", sfd->socket.local.port); __C_DBG("stream_fd_new localport=%d", sfd->socket.local.port);
@ -3264,7 +3262,7 @@ void stream_fd_release(stream_fd *sfd) {
&sfd->socket.local); // releases reference &sfd->socket.local); // releases reference
} }
release_port_poller(&sfd->socket, &sfd->port_pool_links, &sfd->local_intf->spec->port_pool, sfd->poller);
release_port_poller(&sfd->spl, sfd->poller);
} }


+ 7
- 14
daemon/redis.c View File

@ -1382,8 +1382,6 @@ static int redis_sfds(call_t *c, struct redis_list *sfds) {
socket_port_q q = TYPED_GQUEUE_INIT; socket_port_q q = TYPED_GQUEUE_INIT;
unsigned int loc_uid; unsigned int loc_uid;
stream_fd *sfd; stream_fd *sfd;
socket_t *sock;
socket_t local_sock;
int port, fd; int port, fd;
const char *err; const char *err;
@ -1418,28 +1416,23 @@ static int redis_sfds(call_t *c, struct redis_list *sfds) {
if (!loc) if (!loc)
goto err; goto err;
struct socket_port_link *spl = NULL;
ports_q *links = NULL;
if (fd != -1) { if (fd != -1) {
err = "failed to open ports"; err = "failed to open ports";
if (__get_consecutive_ports(&q, 1, port, loc->spec, &c->callid)) if (__get_consecutive_ports(&q, 1, port, loc->spec, &c->callid))
goto err; goto err;
err = "no port returned"; err = "no port returned";
spl = t_queue_pop_head(&q);
struct socket_port_link *spl = t_queue_pop_head(&q);
if (!spl) if (!spl)
goto err; goto err;
sock = &spl->socket;
links = &spl->links;
set_tos(sock, c->tos);
set_tos(&spl->socket, c->tos);
sfd = stream_fd_new(spl, c, loc);
g_free(spl);
} }
else { else {
sock = &local_sock;
dummy_socket(sock, &loc->spec->local_address.addr);
struct socket_port_link spl = {0};
dummy_socket(&spl.socket, &loc->spec->local_address.addr);
sfd = stream_fd_new(&spl, c, loc);
} }
sfd = stream_fd_new(sock, links, c, loc);
if (spl)
g_free(spl);
if (redis_hash_get_sdes_params1(&sfd->crypto.params, rh, "") == -1) if (redis_hash_get_sdes_params1(&sfd->crypto.params, rh, "") == -1)
return -1; return -1;


+ 5
- 3
include/media_socket.h View File

@ -215,9 +215,11 @@ struct stream_fd {
struct obj obj; struct obj obj;
unsigned int unique_id; /* RO */ unsigned int unique_id; /* RO */
socket_t socket; /* RO */
union {
socket_t socket; /* RO - alias */
struct socket_port_link spl; /* RO */
};
struct local_intf *local_intf; /* RO */ struct local_intf *local_intf; /* RO */
ports_q port_pool_links; /* RO */
/* stream_fd object holds a reference to the call it belongs to. /* stream_fd object holds a reference to the call it belongs to.
* Which in turn holds references to all stream_fd objects it contains, * Which in turn holds references to all stream_fd objects it contains,
@ -296,7 +298,7 @@ int is_local_endpoint(const struct intf_address *addr, unsigned int port);
int __get_consecutive_ports(socket_port_q *out, unsigned int num_ports, unsigned int wanted_start_port, int __get_consecutive_ports(socket_port_q *out, unsigned int num_ports, unsigned int wanted_start_port,
struct intf_spec *spec, const str *); struct intf_spec *spec, const str *);
int get_consecutive_ports(socket_intf_list_q *out, unsigned int num_ports, unsigned int num_intfs, struct call_media *media); int get_consecutive_ports(socket_intf_list_q *out, unsigned int num_ports, unsigned int num_intfs, struct call_media *media);
stream_fd *stream_fd_new(socket_t *fd, ports_q *links, call_t *call, struct local_intf *lif);
stream_fd *stream_fd_new(struct socket_port_link *, call_t *call, struct local_intf *lif);
stream_fd *stream_fd_lookup(const endpoint_t *); stream_fd *stream_fd_lookup(const endpoint_t *);
void stream_fd_release(stream_fd *); void stream_fd_release(stream_fd *);
enum thread_looper_action release_closed_sockets(void); enum thread_looper_action release_closed_sockets(void);


Loading…
Cancel
Save