Browse Source

further split up call.c and rework socket handling

pull/163/head
Richard Fuchs 11 years ago
parent
commit
eb5de58dfc
10 changed files with 1250 additions and 1152 deletions
  1. +89
    -1086
      daemon/call.c
  2. +8
    -7
      daemon/call.h
  3. +4
    -4
      daemon/call_interfaces.c
  4. +1
    -1
      daemon/cli.c
  5. +13
    -13
      daemon/dtls.c
  6. +19
    -10
      daemon/ice.c
  7. +1083
    -12
      daemon/media_socket.c
  8. +17
    -3
      daemon/media_socket.h
  9. +14
    -14
      daemon/sdp.c
  10. +2
    -2
      daemon/stun.c

+ 89
- 1086
daemon/call.c
File diff suppressed because it is too large
View File


+ 8
- 7
daemon/call.h View File

@ -259,7 +259,9 @@ struct stream_params {
struct endpoint_map {
struct endpoint endpoint;
GQueue sfds;
unsigned int num_ports;
const struct logical_intf *logical_intf;
GQueue intf_sfds; /* list of struct intf_list - contains stream_fd list */
int wildcard:1;
};
@ -288,7 +290,8 @@ struct packet_stream {
struct call *call; /* RO */
unsigned int component; /* RO, starts with 1 */
struct stream_fd *sfd; /* LOCK: call->master_lock */
GQueue sfds; /* LOCK: call->master_lock */
struct stream_fd * volatile selected_sfd;
struct packet_stream *rtp_sink; /* LOCK: call->master_lock */
struct packet_stream *rtcp_sink; /* LOCK: call->master_lock */
struct packet_stream *rtcp_sibling; /* LOCK: call->master_lock */
@ -329,7 +332,7 @@ struct call_media {
/* local_address is protected by call->master_lock in W mode, but may
* still be modified if the lock is held in R mode, therefore we use
* atomic ops to access it when holding an R lock. */
const volatile struct local_intf *local_intf;
//const volatile struct local_intf *local_intf;
struct ice_agent *ice_agent;
@ -449,7 +452,6 @@ struct call_monologue *__monologue_create(struct call *call);
void __monologue_tag(struct call_monologue *ml, const str *tag);
void __monologue_viabranch(struct call_monologue *ml, const str *viabranch);
struct stream_fd *__stream_fd_new(socket_t *fd, struct call_media *);
int __get_consecutive_ports(socket_t *array, int array_len, int wanted_start_port, const struct call_media *);
struct packet_stream *__packet_stream_new(struct call *call);
@ -463,12 +465,11 @@ int call_delete_branch(struct callmaster *m, const str *callid, const str *branc
const str *fromtag, const str *totag, bencode_item_t *output, int delete_delay);
void call_destroy(struct call *);
enum call_stream_state call_stream_state_machine(struct packet_stream *);
void call_media_state_machine(struct call_media *m);
void call_media_unkernelize(struct call_media *media);
void kernelize(struct packet_stream *);
int call_stream_address(char *, struct packet_stream *, enum stream_address_format, int *);
int call_stream_address46(char *o, struct packet_stream *ps, enum stream_address_format format,
int *len, struct local_intf *ifa);
int *len, const struct local_intf *ifa);
const struct transport_protocol *transport_protocol(const str *s);


+ 4
- 4
daemon/call_interfaces.c View File

@ -31,7 +31,7 @@ static int call_stream_address_gstring(GString *o, struct packet_stream *ps, enu
int len, ret;
char buf[64]; /* 64 bytes ought to be enough for anybody */
ret = call_stream_address(buf, ps, format, &len);
ret = call_stream_address46(buf, ps, format, &len, NULL);
g_string_append_len(o, buf, len);
return ret;
}
@ -66,7 +66,7 @@ found:
if (format == SAF_TCP)
call_stream_address_gstring(o, ps, format);
port = ps->sfd ? ps->sfd->socket.local.port : 0;
port = ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0;
g_string_append_printf(o, (format == 1) ? "%i " : " %i", port);
if (format == SAF_UDP) {
@ -811,8 +811,8 @@ static void ng_stats_stream(bencode_item_t *list, const struct packet_stream *ps
dict = bencode_list_add_dictionary(list);
if (ps->sfd)
bencode_dictionary_add_integer(dict, "local port", ps->sfd->socket.local.port);
if (ps->selected_sfd)
bencode_dictionary_add_integer(dict, "local port", ps->selected_sfd->socket.local.port);
ng_stats_endpoint(bencode_dictionary_add_dictionary(dict, "endpoint"), &ps->endpoint);
ng_stats_endpoint(bencode_dictionary_add_dictionary(dict, "advertised endpoint"),
&ps->advertised_endpoint);


+ 1
- 1
daemon/cli.c View File

@ -184,7 +184,7 @@ static void cli_incoming_list_callid(char* buffer, int len, struct callmaster* m
printlen = snprintf(replybuffer,(outbufend-replybuffer), "------ Media #%u, port %5u <> %15s:%-5u%s, "
""UINT64F" p, "UINT64F" b, "UINT64F" e, "UINT64F" last_packet\n",
md->index,
(unsigned int) (ps->sfd ? ps->sfd->socket.local.port : 0),
(unsigned int) (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0),
sockaddr_print_buf(&ps->endpoint.address), ps->endpoint.port,
(!PS_ISSET(ps, RTP) && PS_ISSET(ps, RTCP)) ? " (RTCP)" : "",
atomic64_get(&ps->stats.packets),


+ 13
- 13
daemon/dtls.c View File

@ -475,12 +475,12 @@ int dtls_connection_init(struct packet_stream *ps, int active, struct dtls_cert
struct dtls_connection *d;
unsigned long err;
if (!ps || !ps->sfd)
if (!ps || !ps->selected_sfd)
return 0;
__DBG("dtls_connection_init(%i)", active);
d = &ps->sfd->dtls;
d = &ps->selected_sfd->dtls;
if (d->init) {
if ((d->active && active) || (!d->active && !active))
@ -516,7 +516,7 @@ int dtls_connection_init(struct packet_stream *ps, int active, struct dtls_cert
if (!d->r_bio || !d->w_bio)
goto error;
SSL_set_app_data(d->ssl, ps->sfd); /* XXX obj reference here? */
SSL_set_app_data(d->ssl, ps->selected_sfd); /* XXX obj reference here? */
SSL_set_bio(d->ssl, d->r_bio, d->w_bio);
SSL_set_mode(d->ssl, SSL_MODE_ENABLE_PARTIAL_WRITE | SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER);
@ -606,15 +606,15 @@ found:
if (d->active) {
/* we're the client */
crypto_init(&ps->crypto, &client);
crypto_init(&ps->sfd->crypto, &server);
crypto_init(&ps->selected_sfd->crypto, &server);
}
else {
/* we're the server */
crypto_init(&ps->crypto, &server);
crypto_init(&ps->sfd->crypto, &client);
crypto_init(&ps->selected_sfd->crypto, &client);
}
crypto_dump_keys(&ps->crypto, &ps->sfd->crypto);
crypto_dump_keys(&ps->crypto, &ps->selected_sfd->crypto);
return 0;
@ -635,12 +635,12 @@ int dtls(struct packet_stream *ps, const str *s, const endpoint_t *fsin) {
struct msghdr mh;
struct iovec iov;
if (!ps || !ps->sfd)
if (!ps || !ps->selected_sfd)
return 0;
if (!MEDIA_ISSET(ps->media, DTLS))
return 0;
d = &ps->sfd->dtls;
d = &ps->selected_sfd->dtls;
if (s)
__DBG("dtls packet input: len %u %02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x",
@ -664,7 +664,7 @@ int dtls(struct packet_stream *ps, const str *s, const endpoint_t *fsin) {
ret = try_connect(d);
if (ret == -1) {
ilog(LOG_ERROR, "DTLS error on local port %u", ps->sfd->socket.local.port);
ilog(LOG_ERROR, "DTLS error on local port %u", ps->selected_sfd->socket.local.port);
/* fatal error */
dtls_connection_cleanup(d);
return 0;
@ -717,7 +717,7 @@ int dtls(struct packet_stream *ps, const str *s, const endpoint_t *fsin) {
stream_msg_mh_src(ps, &mh);
socket_sendmsg(&ps->sfd->socket, &mh, fsin);
socket_sendmsg(&ps->selected_sfd->socket, &mh, fsin);
return 0;
}
@ -726,12 +726,12 @@ int dtls(struct packet_stream *ps, const str *s, const endpoint_t *fsin) {
void dtls_shutdown(struct packet_stream *ps) {
struct dtls_connection *d;
if (!ps || !ps->sfd)
if (!ps || !ps->selected_sfd)
return;
__DBG("dtls_shutdown");
d = &ps->sfd->dtls;
d = &ps->selected_sfd->dtls;
if (!d->init)
return;
@ -748,7 +748,7 @@ void dtls_shutdown(struct packet_stream *ps) {
}
crypto_reset(&ps->crypto);
crypto_reset(&ps->sfd->crypto);
crypto_reset(&ps->selected_sfd->crypto);
}
void dtls_connection_cleanup(struct dtls_connection *c) {


+ 19
- 10
daemon/ice.c View File

@ -620,7 +620,7 @@ static void __do_ice_check(struct ice_candidate_pair *pair) {
stun_binding_request(&pair->remote_candidate->endpoint, transact, &ag->pwd[0], ag->ufrag,
AGENT_ISSET(ag, CONTROLLING), tie_breaker,
prio, &pair->local_intf->spec->address.addr, &ps->sfd->socket,
prio, &pair->local_intf->spec->address.addr, &ps->selected_sfd->socket,
PAIR_ISSET(pair, TO_USE));
}
@ -711,7 +711,7 @@ static void __do_ice_checks(struct ice_agent *ag) {
/* skip dead streams */
ps = pair->packet_stream;
if (!ps || !ps->sfd)
if (!ps || !ps->selected_sfd)
continue;
if (PAIR_ISSET(pair, FAILED))
continue;
@ -972,10 +972,11 @@ found:
static int __check_valid(struct ice_agent *ag) {
struct call_media *media = ag->media;
struct packet_stream *ps;
GList *l, *k;
GList *l, *k, *m;
GQueue all_compos;
struct ice_candidate_pair *pair;
const struct local_intf *ifa;
// const struct local_intf *ifa;
struct stream_fd *sfd;
__get_complete_valid_pairs(&all_compos, ag);
@ -988,12 +989,13 @@ static int __check_valid(struct ice_agent *ag) {
ilog(LOG_DEBUG, "ICE completed, using pair "PAIR_FORMAT, PAIR_FMT(pair));
AGENT_SET(ag, COMPLETED);
ifa = g_atomic_pointer_get(&media->local_intf);
if (ifa != pair->local_intf
&& g_atomic_pointer_compare_and_exchange(&media->local_intf, ifa,
pair->local_intf))
ilog(LOG_INFO, "ICE negotiated: local interface %s",
sockaddr_print_buf(&pair->local_intf->spec->address.addr));
// XXX restore this
// ifa = g_atomic_pointer_get(&media->local_intf);
// if (ifa != pair->local_intf
// && g_atomic_pointer_compare_and_exchange(&media->local_intf, ifa,
// pair->local_intf))
// ilog(LOG_INFO, "ICE negotiated: local interface %s",
// sockaddr_print_buf(&pair->local_intf->spec->address.addr));
for (l = media->streams.head, k = all_compos.head; l && k; l = l->next, k = k->next) {
ps = l->data;
@ -1006,6 +1008,13 @@ static int __check_valid(struct ice_agent *ag) {
ps->endpoint = pair->remote_candidate->endpoint;
}
mutex_unlock(&ps->out_lock);
for (m = ps->sfds.head; m; m = m->next) {
sfd = m->data;
if (sfd->local_intf != pair->local_intf)
continue;
ps->selected_sfd = sfd;
}
}
call_media_unkernelize(media);


+ 1083
- 12
daemon/media_socket.c
File diff suppressed because it is too large
View File


+ 17
- 3
daemon/media_socket.h View File

@ -49,6 +49,10 @@ struct local_intf {
unsigned int preference; /* starting with 0 */
const struct logical_intf *logical;
};
struct intf_list {
const struct local_intf *local_intf;
GQueue list;
};
struct stream_fd {
struct obj obj;
socket_t socket; /* RO */
@ -67,14 +71,24 @@ struct logical_intf *get_logical_interface(const str *name, sockfamily_t *fam);
struct local_intf *get_interface_address(const struct logical_intf *lif, sockfamily_t *fam);
struct local_intf *get_any_interface_address(const struct logical_intf *lif, sockfamily_t *fam);
int get_port(socket_t *r, unsigned int port, const struct local_intf *lif, const struct call *c);
void release_port(socket_t *r, const struct local_intf *);
void set_tos(int fd, unsigned int tos);
//int get_port(socket_t *r, unsigned int port, const struct local_intf *lif, const struct call *c);
//void release_port(socket_t *r, const struct local_intf *);
void set_tos(socket_t *, unsigned int tos);
int get_consecutive_ports(GQueue *out, unsigned int num_ports, const struct logical_intf *log);
struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct local_intf *lif);
void free_intf_list(struct intf_list *il);
void free_socket_intf_list(struct intf_list *il);
INLINE int open_intf_socket(socket_t *r, unsigned int port, const struct local_intf *lif) {
return open_socket(r, SOCK_DGRAM, port, &lif->spec->address.addr);
}
void kernelize(struct packet_stream *);
void __unkernelize(struct packet_stream *);
void unkernelize(struct packet_stream *);
void __stream_unconfirm(struct packet_stream *);
/* XXX shouldnt be necessary */
INLINE struct local_intf *get_interface_from_address(const struct logical_intf *lif,
const sockaddr_t *addr, socktype_t *type)


+ 14
- 14
daemon/sdp.c View File

@ -1400,7 +1400,7 @@ static int replace_media_port(struct sdp_chopper *chop, struct sdp_media *media,
if (copy_up_to(chop, port))
return -1;
p = ps->sfd ? ps->sfd->socket.local.port : 0;
p = ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0;
chopper_append_printf(chop, "%u", p);
if (skip_over(chop, port))
@ -1415,7 +1415,7 @@ static int replace_consecutive_port_count(struct sdp_chopper *chop, struct sdp_m
int cons;
struct packet_stream *ps_n;
if (media->port_count == 1 || !ps->sfd)
if (media->port_count == 1 || !ps->selected_sfd)
return 0;
for (cons = 1; cons < media->port_count; cons++) {
@ -1423,7 +1423,7 @@ static int replace_consecutive_port_count(struct sdp_chopper *chop, struct sdp_m
if (!j)
goto warn;
ps_n = j->data;
if (ps_n->sfd->socket.local.port != ps->sfd->socket.local.port + cons * 2) {
if (ps_n->selected_sfd->socket.local.port != ps->selected_sfd->socket.local.port + cons * 2) {
warn:
ilog(LOG_WARN, "Failed to handle consecutive ports");
break;
@ -1435,18 +1435,18 @@ warn:
return 0;
}
static int insert_ice_address(struct sdp_chopper *chop, struct packet_stream *ps, struct local_intf *ifa) {
static int insert_ice_address(struct sdp_chopper *chop, struct packet_stream *ps, const struct local_intf *ifa) {
char buf[64];
int len;
call_stream_address46(buf, ps, SAF_ICE, &len, ifa);
chopper_append_dup(chop, buf, len);
chopper_append_printf(chop, " %u", ps->sfd->socket.local.port);
chopper_append_printf(chop, " %u", ps->selected_sfd->socket.local.port);
return 0;
}
static int insert_raddr_rport(struct sdp_chopper *chop, struct packet_stream *ps, struct local_intf *ifa) {
static int insert_raddr_rport(struct sdp_chopper *chop, struct packet_stream *ps, const struct local_intf *ifa) {
char buf[64];
int len;
@ -1454,7 +1454,7 @@ static int insert_raddr_rport(struct sdp_chopper *chop, struct packet_stream *ps
call_stream_address46(buf, ps, SAF_ICE, &len, ifa);
chopper_append_dup(chop, buf, len);
chopper_append_c(chop, " rport ");
chopper_append_printf(chop, "%u", ps->sfd->socket.local.port);
chopper_append_printf(chop, "%u", ps->selected_sfd->socket.local.port);
return 0;
}
@ -1480,7 +1480,7 @@ static int replace_network_address(struct sdp_chopper *chop, struct network_addr
if (!is_addr_unspecified(&flags->parsed_media_address))
len = sprintf(buf, "%s", sockaddr_print_buf(&flags->parsed_media_address));
else
call_stream_address(buf, ps, SAF_NG, &len);
call_stream_address46(buf, ps, SAF_NG, &len, NULL);
chopper_append_dup(chop, buf, len);
if (skip_over(chop, &address->address))
@ -1682,7 +1682,7 @@ out:
static void insert_candidate(struct sdp_chopper *chop, struct packet_stream *ps, unsigned int component,
unsigned int type_pref, unsigned int local_pref, enum ice_candidate_type type,
struct local_intf *ifa)
const struct local_intf *ifa)
{
unsigned long priority;
@ -1703,7 +1703,7 @@ static void insert_candidates(struct sdp_chopper *chop, struct packet_stream *rt
struct sdp_ng_flags *flags, struct sdp_media *sdp_media)
{
GList *l;
struct local_intf *ifa;
const struct local_intf *ifa;
unsigned int pref;
struct call_media *media;
const struct logical_intf *lif;
@ -1728,7 +1728,7 @@ static void insert_candidates(struct sdp_chopper *chop, struct packet_stream *rt
lif = ag ? ag->logical_intf : media->logical_intf;
if (ag && AGENT_ISSET(ag, COMPLETED)) {
ifa = g_atomic_pointer_get(&media->local_intf);
ifa = rtp->selected_sfd->local_intf;
insert_candidate(chop, rtp, 1, type_pref, ifa->preference, cand_type, ifa);
if (rtcp) /* rtcp-mux only possible in answer */
insert_candidate(chop, rtcp, 2, type_pref, ifa->preference, cand_type, ifa);
@ -1940,7 +1940,7 @@ int sdp_replace(struct sdp_chopper *chop, GQueue *sessions, struct call_monologu
assert(j->data == ps_rtcp);
}
if (!sdp_media->port_num || !ps->sfd)
if (!sdp_media->port_num || !ps->selected_sfd)
goto next;
if (MEDIA_ARESET2(call_media, SEND, RECV))
@ -1954,13 +1954,13 @@ int sdp_replace(struct sdp_chopper *chop, GQueue *sessions, struct call_monologu
if (MEDIA_ISSET(call_media, RTCP_MUX) && flags->opmode == OP_ANSWER) {
chopper_append_c(chop, "a=rtcp:");
chopper_append_printf(chop, "%u", ps->sfd->socket.local.port);
chopper_append_printf(chop, "%u", ps->selected_sfd->socket.local.port);
chopper_append_c(chop, "\r\na=rtcp-mux\r\n");
ps_rtcp = NULL;
}
else if (ps_rtcp && !flags->ice_force_relay) {
chopper_append_c(chop, "a=rtcp:");
chopper_append_printf(chop, "%u", ps_rtcp->sfd->socket.local.port);
chopper_append_printf(chop, "%u", ps_rtcp->selected_sfd->socket.local.port);
if (!MEDIA_ISSET(call_media, RTCP_MUX))
chopper_append_c(chop, "\r\n");
else


+ 2
- 2
daemon/stun.c View File

@ -378,7 +378,7 @@ static void stun_error_len(struct packet_stream *ps, const endpoint_t *sin, cons
fingerprint(&mh, &fp);
output_finish_src(&mh, dst);
socket_sendmsg(&ps->sfd->socket, &mh, sin);
socket_sendmsg(&ps->selected_sfd->socket, &mh, sin);
}
#define stun_error(ps, sin, dst, req, code, reason) \
@ -479,7 +479,7 @@ static int stun_binding_success(struct packet_stream *ps, struct header *req, st
fingerprint(&mh, &fp);
output_finish_src(&mh, dst);
socket_sendmsg(&ps->sfd->socket, &mh, sin);
socket_sendmsg(&ps->selected_sfd->socket, &mh, sin);
return 0;
}


Loading…
Cancel
Save