diff --git a/README.md b/README.md index a68884ff7..55d3d7866 100644 --- a/README.md +++ b/README.md @@ -168,12 +168,10 @@ option and which are reproduced below: -f, --foreground Don't fork to background -m, --port-min=INT Lowest port to use for RTP -M, --port-max=INT Highest port to use for RTP - -r, --redis=IP:PORT Connect to Redis database - -R, --redis-db=INT Which Redis DB to use - -z, --redis-read=IP:PORT Connect to Redis read database - -Z, --redis-read-db=INT Which Redis read DB to use - -w, --redis-write=IP:PORT Connect to Redis write database - -W, --redis-write-db=INT Which Redis write DB to use + -r, --redis=[PW@]IP:PORT/INT Connect to Redis database + -w, --redis-write=[PW@]IP:PORT/INT Connect to Redis write database + --redis-num-threads=INT Number of Redis restore threads + -q, --no-redis-required Start even if can't connect to redis databases -b, --b2b-url=STRING XMLRPC URL of B2B UA -L, --log-level=INT Mask log priorities above this level --log-facility=daemon|local0|... Syslog facility to use for logging @@ -334,11 +332,6 @@ The options are described in more detail below. Log to stderr instead of syslog. Only useful in combination with `--foreground`. -* -x, --xmlrpc-format - - Selects the internal format of the XMLRPC callback message for B2BUA call teardown. 0 is for SEMS, - 1 is for a generic format containing the call-ID only. - * --num-threads How many worker threads to create, must be at least one. The default is to create as many threads @@ -361,9 +354,60 @@ The options are described in more detail below. Delete the call from memory after the specified delay from memory. Can be set to zero for immediate call deletion. -* -r, --redis, -R, --redis-db, -z, --redis-read, -Z, --redis-read-db, -w, --redis-write, -W, --redis-write-db, -b, --b2b-url +* -r, --redis + + Connect to specified Redis database (with the given database number) and use it for persistence + storage. The format of this option is `ADDRESS:PORT/DBNUM`, for example `127.0.0.1:6379/12` + to connect to the Redis DB number 12 running on localhost on the default Redis port. + + If the Redis database is protected with an authentication password, the password can be supplied + by prefixing the argument value with the password, separated by an `@` symbol, for example + `foobar@127.0.0.1:6379/12`. Note that this leaves the password visible in the process list, + posing a security risk if untrusted users access the same system. As an alternative, the password + can also be supplied in the shell environment through the environment variable + `RTPENGINE_REDIS_AUTH_PW`. + + On startup, *rtpengine* will read the contents of this database and restore all calls + stored therein. During runtime operation, *rtpengine* will continually update the database's + contents to keep it current, so that in case of a service disruption, the last state can be restored + upon a restart. + + When this option is given, *rtpengine* will delay startup until the Redis database adopts the + master role (but see below). + +* -w, --redis-write + + Configures a second Redis database for write operations. If this option is given in addition to the + first one, then the first database will be used for read operations (i.e. to restore calls from) while + the second one will be used for write operations (to update states in the database). + + For password protected Redis servers, the environment variable for the password is + `RTPENGINE_REDIS_WRITE_AUTH_PW`. + + When both options are given, *rtpengine* will start and use the Redis database regardless of the + database's role (master or slave). + +* --redis-num-threads + + How many redis restore threads to create. The default is four. + +* -q, --no-redis-required + When this paramter is present or NO_REDIS_REQUIRED='yes' or '1' in config file, rtpengine starts even + if there is no initial connection to redis databases(either to -r or to -w or to both redis). - NGCP-specific options + Be aware that if the -r redis can't be initially connected, sessions are not reloaded upon rtpengine startup, + even though rtpengine still starts. + +* -b, --b2b-url + + Enables and sets the URI for an XMLRPC callback to be made when a call is torn down due to packet + timeout. The special code `%%` can be used in place of an IP address, in which case the source address + of the originating request will be used. + +* -x, --xmlrpc-format + + Selects the internal format of the XMLRPC callback message for B2BUA call teardown. 0 is for SEMS, + 1 is for a generic format containing the call-ID only. * -g, --graphite @@ -557,26 +601,6 @@ then the start-up sequence might look like this: With this setup, the SIP proxy can choose which instance of *rtpengine* to talk to and thus which local interface to use by sending its control messages to either port 2223 or port 2224. -REDIS Database interaction -------------------------- - -Rtpengine is able to write call details in redis database and retore the calls from the same database. -To configure the redis parameters have a look on -r/-R, -z/-Z, -w/-W parameters. - -The three REDIS params could be specified in rtpengine's config file: -* REDIS -> specify the redis database for both read and write ops; expected role of redis is checked to be master and won't start if it is not -* REDIS_READ -> specify the redis database for read ops; expected role of redis may be any, master or slave -* REDIS_WRITE -> specify the redis database for write ops; expected role of redis could be any, master or slave - -One can specify combinations of REDIS/REDIS_READ/WRITE at the same time. For example: -* REDIS=IP1 -> will use IP1 for read and write operations -* REDIS=IP1, REDIS_READ=IP2 -> will use IP1 for write and IP2 for read operations -* REDIS=IP1, REDIS_WRITE=IP2 -> will use IP1 for read and IP2 for write operations -* REDIS=IP1, REDIS_READ=IP2, REDIS_WRITE=IP3 -> will use IP2 for read and IP3 for write operations -* REDIS_READ=IP1, REDIS_WRITE=IP2 -> will use IP1 for read and IP2 for write operations -* REDIS_READ=IP1 -> will use IP1 for read operations; write operations are ignored -* REDIS_WRITE=IP1 -> will use IP1 for write operations; read operations are ignored - The *ng* Control Protocol ========================= @@ -717,6 +741,12 @@ Optionally included keys are: This flag is valid only in an `offer` message and is useful when the call has been transferred to a new endpoint without change of `From` or `To` tags. + - `port latching` + + Forces *rtpengine* to retain its local ports during a signalling exchange even when the + remote endpoint changes its port. + + * `replace` Similar to the `flags` list. Controls which parts of the SDP body should be rewritten. diff --git a/daemon/call.c b/daemon/call.c index 0a690f95c..0da0acbf5 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -573,13 +573,8 @@ static void callmaster_timer(void *ptr) { rwlock_unlock_r(&sfd->call->master_lock); - if (update) { - if (m->conf.redis_write) { - redis_update(ps->call, m->conf.redis_write, ANY_REDIS_ROLE, OP_OTHER); - } else if (m->conf.redis) { - redis_update(ps->call, m->conf.redis, MASTER_REDIS_ROLE, OP_OTHER); - } - } + if (update) + redis_update(ps->call, m->conf.redis_write); next: g_hash_table_remove(hlp.addr_sfd, &ep); @@ -689,7 +684,7 @@ static struct call_media *__get_media(struct call_monologue *ml, GList **it, con } static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigned int num_ports, - const struct endpoint *ep) + const struct endpoint *ep, const struct sdp_ng_flags *flags) { GList *l; struct endpoint_map *em; @@ -712,13 +707,17 @@ static struct endpoint_map *__get_endpoint_map(struct call_media *media, unsigne } if (!ep) /* creating wildcard map */ break; - /* handle zero endpoint address */ - if (is_addr_unspecified(&ep->address) || is_addr_unspecified(&em->endpoint.address)) { + + if (flags && flags->port_latching) + /* do nothing - ignore endpoint addresses */ ; + else if (is_addr_unspecified(&ep->address) || is_addr_unspecified(&em->endpoint.address)) { + /* handle zero endpoint address: only compare ports */ if (ep->port != em->endpoint.port) continue; } else if (memcmp(&em->endpoint, ep, sizeof(*ep))) continue; + if (em->num_ports >= num_ports) { if (is_addr_unspecified(&em->endpoint.address)) em->endpoint.address = ep->address; @@ -772,45 +771,51 @@ next_il: } static void __assign_stream_fds(struct call_media *media, GQueue *intf_sfds) { - GList *l, *k, *m; + GList *l, *k; struct packet_stream *ps; - struct stream_fd *sfd; + struct stream_fd *sfd, *intf_sfd; struct intf_list *il; - int first = 1; + int sfd_found; + + for (k = media->streams.head; k; k = k->next) { + ps = k->data; - for (l = intf_sfds->head; l; l = l->next) { - il = l->data; + g_queue_clear(&ps->sfds); + sfd_found = 0; + intf_sfd = NULL; - for (m = il->list.head, k = media->streams.head; m && k; m = m->next, k = k->next) { - sfd = m->data; - ps = k->data; + for (l = intf_sfds->head; l; l = l->next) { + il = l->data; - if (first) - g_queue_clear(&ps->sfds); + sfd = g_queue_peek_nth(&il->list, ps->component - 1); sfd->stream = ps; g_queue_push_tail(&ps->sfds, sfd); - if (!ps->selected_sfd) - ps->selected_sfd = sfd; + if (ps->selected_sfd == sfd) + sfd_found = 1; + if (ps->selected_sfd && sfd->local_intf == ps->selected_sfd->local_intf) + intf_sfd = sfd; + } - /* XXX: - * check whether previous/currect selected_sfd is actually part of - * current sfds list. - * if selected_sfd changes, take previously selected interface into account. - * handle crypto/dtls resets by moving contexts into sfd struct. - * handle ice resets too. - */ + if (!ps->selected_sfd || !sfd_found) { + if (intf_sfd) + ps->selected_sfd = intf_sfd; + else + ps->selected_sfd = g_queue_peek_nth(&ps->sfds, 0); } - first = 0; + /* XXX: + * handle crypto/dtls resets by moving contexts into sfd struct. + * handle ice resets too. + */ } } static int __wildcard_endpoint_map(struct call_media *media, unsigned int num_ports) { struct endpoint_map *em; - em = __get_endpoint_map(media, num_ports, NULL); + em = __get_endpoint_map(media, num_ports, NULL, NULL); if (!em) return -1; @@ -929,7 +934,13 @@ static int __init_stream(struct packet_stream *ps) { crypto_init(&ps->selected_sfd->crypto, &media->sdes_in.params); if (MEDIA_ISSET(media, DTLS) && !PS_ISSET(ps, FALLBACK_RTCP)) { - active = (PS_ISSET(ps, FILLED) && MEDIA_ISSET(media, SETUP_ACTIVE)); + active = dtls_is_active(&ps->selected_sfd->dtls); + // we try to retain our role if possible, but must handle a role switch + if ((active && !MEDIA_ISSET(media, SETUP_ACTIVE)) + || (!active && !MEDIA_ISSET(media, SETUP_PASSIVE))) + active = -1; + if (active == -1) + active = (PS_ISSET(ps, FILLED) && MEDIA_ISSET(media, SETUP_ACTIVE)); dtls_connection_init(ps, active, call->dtls_cert); if (!PS_ISSET(ps, FINGERPRINT_VERIFIED) && media->fingerprint.hash_func @@ -1139,7 +1150,7 @@ static void __generate_crypto(const struct sdp_ng_flags *flags, struct call_medi } if (flags->opmode == OP_OFFER) { - /* we always offer actpass */ + /* we always must offer actpass */ MEDIA_SET(this, SETUP_PASSIVE); MEDIA_SET(this, SETUP_ACTIVE); } @@ -1346,9 +1357,10 @@ static void __tos_change(struct call *call, const struct sdp_ng_flags *flags) { static void __init_interface(struct call_media *media, const str *ifname, int num_ports) { /* we're holding master_lock in W mode here, so we can safely ignore the * atomic ops */ - //struct local_intf *ifa = (void *) media->local_intf; - if (!media->logical_intf /* || !ifa */) + if (!media->logical_intf) + goto get; + if (media->logical_intf->preferred_family != media->desired_family) goto get; if (!ifname || !ifname->s) return; @@ -1381,7 +1393,8 @@ get: } -static void __dtls_logic(const struct sdp_ng_flags *flags, struct call_media *media, +// process received a=setup and related attributes +static void __dtls_logic(const struct sdp_ng_flags *flags, struct call_media *other_media, struct stream_params *sp) { unsigned int tmp; @@ -1397,8 +1410,7 @@ static void __dtls_logic(const struct sdp_ng_flags *flags, struct call_media *me /* Special case: if this is an offer and actpass is being offered (as it should), * we would normally choose to be active. However, if this is a reinvite and we * were passive previously, we should retain this role. */ - if (flags && flags->opmode == OP_OFFER && MEDIA_ISSET(other_media, SETUP_ACTIVE) - && MEDIA_ISSET(other_media, SETUP_PASSIVE) + if (flags && flags->opmode == OP_OFFER && MEDIA_ARESET2(other_media, SETUP_ACTIVE, SETUP_PASSIVE) && (tmp & (MEDIA_FLAG_SETUP_ACTIVE | MEDIA_FLAG_SETUP_PASSIVE)) == MEDIA_FLAG_SETUP_PASSIVE) MEDIA_CLEAR(other_media, SETUP_ACTIVE); @@ -1566,7 +1578,7 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams, if (sp->rtp_endpoint.port) { /* DTLS stuff */ - __dtls_logic(flags, media, other_media, sp); + __dtls_logic(flags, other_media, sp); /* control rtcp-mux */ __rtcp_mux_logic(flags, media, other_media); @@ -1627,7 +1639,7 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams, /* get that many ports for each side, and one packet stream for each port, then * assign the ports to the streams */ - em = __get_endpoint_map(media, num_ports, &sp->rtp_endpoint); + em = __get_endpoint_map(media, num_ports, &sp->rtp_endpoint, flags); if (!em) { goto error_ports; } else { @@ -1806,7 +1818,7 @@ void call_destroy(struct call *c) { struct packet_stream *ps=0, *ps2=0; struct stream_fd *sfd; struct poller *p; - GSList *l; + GList *l; int ret; struct call_monologue *ml; struct call_media *md; @@ -1844,12 +1856,9 @@ void call_destroy(struct call *c) { obj_put(c); if (c->redis_call_responsible) { - if (m->conf.redis_write) { - redis_delete(c, m->conf.redis_write, ANY_REDIS_ROLE); - } else if (m->conf.redis) { - redis_delete(c, m->conf.redis, MASTER_REDIS_ROLE); - } + redis_delete(c, m->conf.redis_write); } + rwlock_lock_w(&c->master_lock); /* at this point, no more packet streams can be added */ @@ -2174,7 +2183,7 @@ int call_stream_address46(char *o, struct packet_stream *ps, enum stream_address else ifa = get_any_interface_address(ps->media->logical_intf, ps->media->desired_family); } - ifa_addr = &ifa->spec->address; + ifa_addr = &ifa->spec->local_address; sink = packet_stream_sink(ps); @@ -2185,7 +2194,7 @@ int call_stream_address46(char *o, struct packet_stream *ps, enum stream_address && !is_trickle_ice_address(&sink->advertised_endpoint)) l += sprintf(o + l, "%s", ifa_addr->addr.family->unspec_string); else - l += sprintf(o + l, "%s", sockaddr_print_buf(&ifa_addr->advertised)); + l += sprintf(o + l, "%s", sockaddr_print_buf(&ifa->advertised_address)); *len = l; return ifa_addr->addr.family->af; @@ -2233,6 +2242,7 @@ static void __call_free(void *p) { g_hash_table_destroy(c->tags); g_hash_table_destroy(c->viabranches); g_queue_clear(&c->medias); + g_queue_clear(&c->endpoint_maps); while (c->streams.head) { ps = g_queue_pop_head(&c->streams); @@ -2705,14 +2715,17 @@ void callmaster_get_all_calls(struct callmaster *m, GQueue *q) { } +#if 0 +// unused +// simplifty redis_write <> redis if put back into use static void calls_dump_iterator(void *key, void *val, void *ptr) { struct call *c = val; struct callmaster *m = c->callmaster; if (m->conf.redis_write) { - redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE, OP_OTHER); + redis_update(c, m->conf.redis_write); } else if (m->conf.redis) { - redis_update(c, m->conf.redis, MASTER_REDIS_ROLE, OP_OTHER); + redis_update(c, m->conf.redis); } } @@ -2721,7 +2734,7 @@ void calls_dump_redis(struct callmaster *m) { return; ilog(LOG_DEBUG, "Start dumping all call data to Redis...\n"); - redis_wipe(m->conf.redis, MASTER_REDIS_ROLE); + redis_wipe(m->conf.redis); g_hash_table_foreach(m->callhash, calls_dump_iterator, NULL); ilog(LOG_DEBUG, "Finished dumping all call data to Redis\n"); } @@ -2731,7 +2744,7 @@ void calls_dump_redis_read(struct callmaster *m) { return; ilog(LOG_DEBUG, "Start dumping all call data to read Redis...\n"); - redis_wipe(m->conf.redis_read, ANY_REDIS_ROLE); + redis_wipe(m->conf.redis_read); g_hash_table_foreach(m->callhash, calls_dump_iterator, NULL); ilog(LOG_DEBUG, "Finished dumping all call data to read Redis\n"); } @@ -2741,10 +2754,11 @@ void calls_dump_redis_write(struct callmaster *m) { return; ilog(LOG_DEBUG, "Start dumping all call data to write Redis...\n"); - redis_wipe(m->conf.redis_write, ANY_REDIS_ROLE); + redis_wipe(m->conf.redis_write); g_hash_table_foreach(m->callhash, calls_dump_iterator, NULL); ilog(LOG_DEBUG, "Finished dumping all call data to write Redis\n"); } +#endif const struct transport_protocol *transport_protocol(const str *s) { int i; diff --git a/daemon/call.h b/daemon/call.h index ec97008c6..f3411b6fe 100644 --- a/daemon/call.h +++ b/daemon/call.h @@ -430,7 +430,6 @@ struct callmaster_config { unsigned int silent_timeout; unsigned int delete_delay; struct redis *redis; - struct redis *redis_read; struct redis *redis_write; struct redis *redis_read_notify; struct event_base *redis_notify_event_base; @@ -440,6 +439,8 @@ struct callmaster_config { enum xmlrpc_format fmt; endpoint_t graphite_ep; int graphite_interval; + + int redis_num_threads; }; struct callmaster { @@ -482,9 +483,9 @@ void callmaster_get_all_calls(struct callmaster *m, GQueue *q); struct timeval add_ongoing_calls_dur_in_interval(struct callmaster *m, struct timeval *iv_start, struct timeval *iv_duration); -void calls_dump_redis(struct callmaster *); -void calls_dump_redis_read(struct callmaster *); -void calls_dump_redis_write(struct callmaster *); +//void calls_dump_redis(struct callmaster *); +//void calls_dump_redis_read(struct callmaster *); +//void calls_dump_redis_write(struct callmaster *); struct call_monologue *__monologue_create(struct call *call); void __monologue_tag(struct call_monologue *ml, const str *tag); void __monologue_viabranch(struct call_monologue *ml, const str *viabranch); diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index 76bac7c53..89debe5fe 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -186,11 +186,7 @@ static str *call_update_lookup_udp(char **out, struct callmaster *m, enum call_o sp.index, sp.index, out[RE_UDP_COOKIE], SAF_UDP); rwlock_unlock_w(&c->master_lock); - if (m->conf.redis_write) { - redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE, opmode); - } else if (m->conf.redis) { - redis_update(c, m->conf.redis, MASTER_REDIS_ROLE, opmode); - } + redis_update(c, m->conf.redis_write); gettimeofday(&(monologue->started), NULL); @@ -338,11 +334,7 @@ out2: rwlock_unlock_w(&c->master_lock); streams_free(&s); - if (m->conf.redis_write) { - redis_update(c, m->conf.redis_write, ANY_REDIS_ROLE, opmode); - } else if (m->conf.redis) { - redis_update(c, m->conf.redis, MASTER_REDIS_ROLE, opmode); - } + redis_update(c, m->conf.redis_write); ilog(LOG_INFO, "Returning to SIP proxy: "STR_FORMAT"", STR_FMT0(ret)); obj_put(c); @@ -557,6 +549,8 @@ static void call_ng_process_flags(struct sdp_ng_flags *out, bencode_item_t *inpu out->reset = 1; else if (it->iov[1].iov_len >= 5 && !memcmp(it->iov[1].iov_base, "SDES-", 5)) ng_sdes_option(out, it, 5); + else if (!bencode_strcmp(it, "port-latching")) + out->port_latching = 1; else ilog(LOG_WARN, "Unknown flag encountered: '"BENCODE_FORMAT"'", BENCODE_FMT(it)); @@ -717,11 +711,7 @@ static const char *call_offer_answer_ng(bencode_item_t *input, struct callmaster ret = sdp_replace(chopper, &parsed, monologue->active_dialogue, &flags); rwlock_unlock_w(&call->master_lock); - if (m->conf.redis_write) { - redis_update(call, m->conf.redis_write, ANY_REDIS_ROLE, opmode); - } else if (m->conf.redis) { - redis_update(call, m->conf.redis, MASTER_REDIS_ROLE, opmode); - } + redis_update(call, m->conf.redis_write); obj_put(call); gettimeofday(&(monologue->started), NULL); diff --git a/daemon/call_interfaces.h b/daemon/call_interfaces.h index 55f1d9a4a..23ee7bea8 100644 --- a/daemon/call_interfaces.h +++ b/daemon/call_interfaces.h @@ -7,6 +7,7 @@ #include "str.h" #include "bencode.h" #include "socket.h" +#include "call.h" @@ -16,6 +17,44 @@ struct callmaster; struct control_stream; struct sockaddr_in6; +struct sdp_ng_flags { + enum call_opmode opmode; + str received_from_family; + str received_from_address; + str media_address; + str transport_protocol_str; + str address_family_str; + const struct transport_protocol *transport_protocol; + sockaddr_t parsed_received_from; + sockaddr_t parsed_media_address; + str direction[2]; + sockfamily_t *address_family; + int tos; + int asymmetric:1, + trust_address:1, + port_latching:1, + replace_origin:1, + replace_sess_conn:1, + ice_remove:1, + ice_force:1, + ice_force_relay:1, + rtcp_mux_offer:1, + rtcp_mux_demux:1, + rtcp_mux_accept:1, + rtcp_mux_reject:1, + strict_source:1, + media_handover:1, + dtls_passive:1, + reset:1, + dtls_off:1, + sdes_off:1, + sdes_unencrypted_srtp:1, + sdes_unencrypted_srtcp:1, + sdes_unauthenticated_srtp:1, + sdes_encrypted_srtp:1, + sdes_encrypted_srtcp:1, + sdes_authenticated_srtp:1; +}; extern int trust_address_def; extern int dtls_passive_def; diff --git a/daemon/cli.c b/daemon/cli.c index a831502a1..02836cee1 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -14,6 +14,7 @@ #include "call.h" #include "cli.h" #include "socket.h" +#include "redis.h" #include "rtpengine_config.h" @@ -349,7 +350,7 @@ static void cli_incoming_list(char* buffer, int len, struct callmaster* m, char* rwlock_lock_r(&m->hashlock); printlen = snprintf(replybuffer, outbufend-replybuffer, "Current sessions (own and foreign) running on rtpengine: %i\n", g_hash_table_size(m->callhash)); ADJUSTLEN(printlen,outbufend,replybuffer); - printlen = snprintf(replybuffer, outbufend-replybuffer, "Current foreign sessions on rtpengine: %i\n", atomic64_get(&m->stats.foreign_sessions)); + printlen = snprintf(replybuffer, outbufend-replybuffer, "Current foreign sessions on rtpengine: "UINT64F"\n", atomic64_get(&m->stats.foreign_sessions)); ADJUSTLEN(printlen,outbufend,replybuffer); rwlock_unlock_r(&m->hashlock); } else if (len>=strlen(LIST_SESSIONS) && strncmp(buffer,LIST_SESSIONS,strlen(LIST_SESSIONS)) == 0) { diff --git a/daemon/dtls.h b/daemon/dtls.h index 17f1541a9..d546310c7 100644 --- a/daemon/dtls.h +++ b/daemon/dtls.h @@ -100,6 +100,14 @@ INLINE int is_dtls(const str *s) { return 0; } +// -1: not initialized, unknown or invalid +// 0 or 1: passive or active +INLINE int dtls_is_active(const struct dtls_connection *d) { + if (!d || !d->init) + return -1; + return d->active ? 1 : 0; +} + diff --git a/daemon/ice.c b/daemon/ice.c index fbf50c544..c822b4782 100644 --- a/daemon/ice.c +++ b/daemon/ice.c @@ -28,7 +28,7 @@ #define PAIR_FORMAT STR_FORMAT":"STR_FORMAT":%lu" #define PAIR_FMT(p) \ - STR_FMT(&(p)->local_intf->spec->ice_foundation), \ + STR_FMT(&(p)->local_intf->ice_foundation), \ STR_FMT(&(p)->remote_candidate->foundation), \ (p)->remote_candidate->component_id @@ -646,7 +646,7 @@ static void __do_ice_check(struct ice_candidate_pair *pair) { ilog(LOG_DEBUG, "Sending %sICE/STUN request for candidate pair "PAIR_FORMAT" from %s to %s", PAIR_ISSET(pair, TO_USE) ? "nominating " : "", - PAIR_FMT(pair), sockaddr_print_buf(&pair->local_intf->spec->address.addr), + PAIR_FMT(pair), sockaddr_print_buf(&pair->local_intf->spec->local_address.addr), endpoint_print_buf(&pair->remote_candidate->endpoint)); stun_binding_request(&pair->remote_candidate->endpoint, transact, &ag->pwd[0], ag->ufrag, @@ -868,7 +868,7 @@ static struct ice_candidate_pair *__learned_candidate(struct ice_agent *ag, stru cand = g_slice_alloc0(sizeof(*cand)); cand->component_id = ps->component; - cand->transport = sfd->local_intf->spec->address.type; // XXX add socket type into socket_t? + cand->transport = sfd->local_intf->spec->local_address.type; // XXX add socket type into socket_t? cand->priority = priority; cand->endpoint = *src; cand->type = ICT_PRFLX; @@ -1050,7 +1050,7 @@ static int __check_valid(struct ice_agent *ag) { ps->selected_sfd = sfd; if (ps->component == 1) ilog(LOG_INFO, "ICE negotiated: local interface %s", - sockaddr_print_buf(&pair->local_intf->spec->address.addr)); + sockaddr_print_buf(&pair->local_intf->spec->local_address.addr)); } } @@ -1210,7 +1210,7 @@ int ice_response(struct stream_fd *sfd, const endpoint_t *src, ilog(LOG_DEBUG, "Received ICE/STUN response code %u for candidate pair "PAIR_FORMAT" from %s to %s", attrs->error_code, PAIR_FMT(pair), endpoint_print_buf(&pair->remote_candidate->endpoint), - sockaddr_print_buf(&ifa->spec->address.addr)); + sockaddr_print_buf(&ifa->spec->local_address.addr)); /* verify endpoints */ err = "ICE/STUN response received, but source address didn't match remote candidate address"; diff --git a/daemon/main.c b/daemon/main.c index 84d129f2c..1b85df939 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -32,11 +32,6 @@ -#define REDIS_MODULE_VERSION "redis/9" - - - - #define die(x...) do { \ fprintf(stderr, x); \ fprintf(stderr, "\n"); \ @@ -65,7 +60,6 @@ endpoint_t ng_listen_ep; endpoint_t cli_listen_ep; endpoint_t graphite_ep; endpoint_t redis_ep; -endpoint_t redis_read_ep; endpoint_t redis_write_ep; static int tos; static int table = -1; @@ -76,8 +70,11 @@ static int port_min = 30000; static int port_max = 40000; static int max_sessions = -1; static int redis_db = -1; -static int redis_read_db = -1; static int redis_write_db = -1; +static int redis_num_threads; +static int no_redis_required; +static char *redis_auth; +static char *redis_write_auth; static char *b2b_url; static enum xmlrpc_format xmlrpc_fmt = XF_SEMS; static int num_threads; @@ -212,8 +209,8 @@ static struct intf_config *if_addr_parse(char *s) { ifa = g_slice_alloc0(sizeof(*ifa)); ifa->name = name; - ifa->address.addr = addr; - ifa->address.advertised = adv; + ifa->local_address.addr = addr; + ifa->advertised_address = adv; ifa->port_min = port_min; ifa->port_max = port_max; @@ -222,6 +219,39 @@ static struct intf_config *if_addr_parse(char *s) { +static int redis_ep_parse(endpoint_t *ep, int *db, char **auth, const char *auth_env, char *str) { + char *sl; + long l; + + sl = strchr(str, '@'); + if (sl) { + *sl = 0; + *auth = str; + str = sl+1; + } + else if ((sl = getenv(auth_env))) + *auth = sl; + + sl = strchr(str, '/'); + if (!sl) + return -1; + *sl = 0; + sl++; + if (!*sl) + return -1; + l = strtol(sl, &sl, 10); + if (*sl != 0) + return -1; + if (l < 0) + return -1; + *db = l; + if (endpoint_parse_any(ep, str)) + return -1; + return 0; +} + + + static void options(int *argc, char ***argv) { char **if_a = NULL; char **iter; @@ -233,7 +263,7 @@ static void options(int *argc, char ***argv) { char *graphitep = NULL; char *graphite_prefix_s = NULL; char *redisps = NULL; - char *redisps_read = NULL, *redisps_write = NULL; + char *redisps_write = NULL; char *log_facility_s = NULL; char *log_facility_cdr_s = NULL; char *log_facility_rtcp_s = NULL; @@ -259,12 +289,10 @@ static void options(int *argc, char ***argv) { { "foreground", 'f', 0, G_OPTION_ARG_NONE, &foreground, "Don't fork to background", NULL }, { "port-min", 'm', 0, G_OPTION_ARG_INT, &port_min, "Lowest port to use for RTP", "INT" }, { "port-max", 'M', 0, G_OPTION_ARG_INT, &port_max, "Highest port to use for RTP", "INT" }, - { "redis", 'r', 0, G_OPTION_ARG_STRING, &redisps, "Connect to Redis database", "IP:PORT" }, - { "redis-db", 'R', 0, G_OPTION_ARG_INT, &redis_db, "Which Redis DB to use", "INT" }, - { "redis-read", 'z', 0, G_OPTION_ARG_STRING, &redisps_read, "Connect to Redis read database", "IP:PORT" }, - { "redis-read-db", 'Z', 0, G_OPTION_ARG_INT, &redis_read_db, "Which Redis read DB to use", "INT" }, - { "redis-write",'w', 0, G_OPTION_ARG_STRING, &redisps_write, "Connect to Redis write database", "IP:PORT" }, - { "redis-write-db", 'W', 0, G_OPTION_ARG_INT, &redis_write_db,"Which Redis write DB to use", "INT" }, + { "redis", 'r', 0, G_OPTION_ARG_STRING, &redisps, "Connect to Redis database", "[PW@]IP:PORT/INT" }, + { "redis-write",'w', 0, G_OPTION_ARG_STRING, &redisps_write, "Connect to Redis write database", "[PW@]IP:PORT/INT" }, + { "redis-num-threads", 0, 0, G_OPTION_ARG_INT, &redis_num_threads, "Number of Redis restore threads", "INT" }, + { "no-redis-required", 'q', 0, G_OPTION_ARG_NONE, &no_redis_required, "Start no matter of redis connection state", NULL }, { "b2b-url", 'b', 0, G_OPTION_ARG_STRING, &b2b_url, "XMLRPC URL of B2B UA" , "STRING" }, { "log-level", 'L', 0, G_OPTION_ARG_INT, (void *)&log_level,"Mask log priorities above this level","INT" }, { "log-facility",0, 0, G_OPTION_ARG_STRING, &log_facility_s, "Syslog facility to use for logging", "daemon|local0|...|local7"}, @@ -276,7 +304,7 @@ static void options(int *argc, char ***argv) { { "delete-delay", 'd', 0, G_OPTION_ARG_INT, &delete_delay, "Delay for deleting a session from memory.", "INT" }, { "sip-source", 0, 0, G_OPTION_ARG_NONE, &sip_source, "Use SIP source address by default", NULL }, { "dtls-passive", 0, 0, G_OPTION_ARG_NONE, &dtls_passive_def,"Always prefer DTLS passive role", NULL }, - { "max-sessions", 0, 0, G_OPTION_ARG_INT, &max_sessions, "Limit of maximum number of sessions", NULL }, + { "max-sessions", 0, 0, G_OPTION_ARG_INT, &max_sessions, "Limit of maximum number of sessions", "INT" }, { NULL, } }; @@ -335,26 +363,14 @@ static void options(int *argc, char ***argv) { if (silent_timeout <= 0) silent_timeout = 3600; - if (redisps) { - if (endpoint_parse_any(&redis_ep, redisps)) - die("Invalid IP or port (--redis)"); - if (redis_db < 0) - die("Must specify Redis DB number (--redis-db) when using Redis"); - } + if (redisps) + if (redis_ep_parse(&redis_ep, &redis_db, &redis_auth, "RTPENGINE_REDIS_AUTH_PW", redisps)) + die("Invalid Redis endpoint [IP:PORT/INT] (--redis)"); - if (redisps_read) { - if (endpoint_parse_any(&redis_read_ep, redisps_read)) - die("Invalid Redis read IP or port (--redis-read)"); - if (redis_read_db < 0) - die("Must specify Redis read DB number (--redis-read-db) when using Redis"); - } - - if (redisps_write) { - if (endpoint_parse_any(&redis_write_ep, redisps_write)) - die("Invalid Redis write IP or port (--redis-write)"); - if (redis_write_db < 0) - die("Must specify Redis write DB number (--redis-write-db) when using Redis"); - } + if (redisps_write) + if (redis_ep_parse(&redis_write_ep, &redis_write_db, &redis_write_auth, + "RTPENGINE_REDIS_WRITE_AUTH_PW", redisps_write)) + die("Invalid Redis endpoint [IP:PORT/INT] (--redis-write)"); if (xmlrpc_fmt > 1) die("Invalid XMLRPC format"); @@ -526,6 +542,15 @@ no_kernel: mc.fmt = xmlrpc_fmt; mc.graphite_ep = graphite_ep; mc.graphite_interval = graphite_interval; + if (redis_num_threads < 1) { +#ifdef _SC_NPROCESSORS_ONLN + redis_num_threads = sysconf( _SC_NPROCESSORS_ONLN ); +#endif + if (redis_num_threads < 1) { + redis_num_threads = REDIS_RESTORE_NUM_THREADS; + } + } + mc.redis_num_threads = redis_num_threads; ct = NULL; if (tcp_listen_ep.port) { @@ -558,22 +583,21 @@ no_kernel: die("Failed to open UDP CLI connection port"); } - if (!is_addr_unspecified(&redis_ep.address)) { - mc.redis = mc.redis_read_notify = redis_new(&redis_ep, redis_db, MASTER_REDIS_ROLE); - if (!mc.redis) - die("Cannot start up without Redis database"); + if (!is_addr_unspecified(&redis_write_ep.address)) { + mc.redis_write = redis_new(&redis_write_ep, redis_write_db, redis_write_auth, ANY_REDIS_ROLE, no_redis_required); + if (!mc.redis_write) + die("Cannot start up without running Redis %s write database! See also NO_REDIS_REQUIRED paramter.", + endpoint_print_buf(&redis_write_ep)); } - if (!is_addr_unspecified(&redis_read_ep.address)) { - mc.redis_read = mc.redis_read_notify = redis_new(&redis_read_ep, redis_read_db, ANY_REDIS_ROLE); - if (!mc.redis_read) - die("Cannot start up without Redis read database"); - } + if (!is_addr_unspecified(&redis_ep.address)) { + mc.redis = mc.redis_read_notify = redis_new(&redis_ep, redis_db, redis_auth, mc.redis_write ? ANY_REDIS_ROLE : MASTER_REDIS_ROLE, no_redis_required); + if (!mc.redis) + die("Cannot start up without running Redis %s database! See also NO_REDIS_REQUIRED paramter.", + endpoint_print_buf(&redis_ep)); - if (!is_addr_unspecified(&redis_write_ep.address)) { - mc.redis_write = mc.redis_read_notify = redis_new(&redis_write_ep, redis_write_db, ANY_REDIS_ROLE); if (!mc.redis_write) - die("Cannot start up without Redis write database"); + mc.redis_write = mc.redis; } ctx->m->conf = mc; @@ -582,24 +606,21 @@ no_kernel: daemonize(); wpidfile(); - // start redis restore timer - gettimeofday(&redis_start, NULL); + if (mc.redis) { + // start redis restore timer + gettimeofday(&redis_start, NULL); - // restore - if (mc.redis_read) { - if (redis_restore(ctx->m, mc.redis_read, ANY_REDIS_ROLE)) - die("Refusing to continue without working Redis read database"); - } else if (mc.redis) { - if (redis_restore(ctx->m, mc.redis, MASTER_REDIS_ROLE)) + // restore + if (redis_restore(ctx->m, mc.redis)) die("Refusing to continue without working Redis database"); - } - // stop redis restore timer - gettimeofday(&redis_stop, NULL); + // stop redis restore timer + gettimeofday(&redis_stop, NULL); - // print redis restore duration - redis_diff += timeval_diff(&redis_stop, &redis_start) / 1000.0; - ilog(LOG_INFO, "Redis restore time = %.0lf ms", redis_diff); + // print redis restore duration + redis_diff += timeval_diff(&redis_stop, &redis_start) / 1000.0; + ilog(LOG_INFO, "Redis restore time = %.0lf ms", redis_diff); + } gettimeofday(&ctx->m->latest_graphite_interval_start, NULL); @@ -620,7 +641,7 @@ int main(int argc, char **argv) { thread_create_detach(sighandler, NULL); thread_create_detach(poller_timer_loop, ctx.p); - if (!is_addr_unspecified(&redis_read_ep.address) || !is_addr_unspecified(&redis_ep.address)) + if (!is_addr_unspecified(&redis_ep.address)) thread_create_detach(redis_notify, ctx.m); if (!is_addr_unspecified(&graphite_ep.address)) diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 5dd7dd798..b9e1e7328 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -239,13 +239,13 @@ static int has_free_ports_loc(struct local_intf *loc, unsigned int num_ports) { if (num_ports > loc->spec->port_pool.free_ports) { ilog(LOG_ERR, "Didn't found %d ports available for %.*s/%s", num_ports, loc->logical->name.len, loc->logical->name.s, - sockaddr_print_buf(&loc->spec->address.addr)); + sockaddr_print_buf(&loc->spec->local_address.addr)); return 0; } __C_DBG("Found %d ports available for %.*s/%s from total of %d free ports", num_ports, loc->logical->name.len, loc->logical->name.s, - sockaddr_print_buf(&loc->spec->address.addr), + sockaddr_print_buf(&loc->spec->local_address.addr), loc->spec->port_pool.free_ports); return 1; @@ -437,28 +437,29 @@ static void __interface_append(struct intf_config *ifa, sockfamily_t *fam) { lif->preferred_family = fam; lif->addr_hash = g_hash_table_new(__addr_type_hash, __addr_type_eq); g_hash_table_insert(__logical_intf_name_family_hash, lif, lif); - if (ifa->address.addr.family == fam) { + if (ifa->local_address.addr.family == fam) { q = __interface_list_for_family(fam); g_queue_push_tail(q, lif); } } - spec = g_hash_table_lookup(__intf_spec_addr_type_hash, &ifa->address); + spec = g_hash_table_lookup(__intf_spec_addr_type_hash, &ifa->local_address); if (!spec) { spec = g_slice_alloc0(sizeof(*spec)); - spec->address = ifa->address; - ice_foundation(&spec->ice_foundation); + spec->local_address = ifa->local_address; spec->port_pool.min = ifa->port_min; spec->port_pool.max = ifa->port_max; spec->port_pool.free_ports = spec->port_pool.max - spec->port_pool.min + 1; - g_hash_table_insert(__intf_spec_addr_type_hash, &spec->address, spec); + g_hash_table_insert(__intf_spec_addr_type_hash, &spec->local_address, spec); } ifc = uid_slice_alloc(ifc, &lif->list); + ice_foundation(&ifc->ice_foundation); + ifc->advertised_address = ifa->advertised_address; ifc->spec = spec; ifc->logical = lif; - g_hash_table_insert(lif->addr_hash, (void *) &ifc->spec->address, ifc); + g_hash_table_insert(lif->addr_hash, (void *) &ifc->spec->local_address, ifc); } void interfaces_init(GQueue *interfaces) { @@ -477,7 +478,7 @@ void interfaces_init(GQueue *interfaces) { /* build primary lists first */ for (l = interfaces->head; l; l = l->next) { ifa = l->data; - __interface_append(ifa, ifa->address.addr.family); + __interface_append(ifa, ifa->local_address.addr.family); } /* then append to each other as lower-preference alternatives */ @@ -485,7 +486,7 @@ void interfaces_init(GQueue *interfaces) { fam = get_socket_family_enum(i); for (l = interfaces->head; l; l = l->next) { ifa = l->data; - if (ifa->address.addr.family == fam) + if (ifa->local_address.addr.family == fam) continue; __interface_append(ifa, fam); } @@ -534,7 +535,7 @@ struct local_intf *get_any_interface_address(const struct logical_intf *lif, soc /* XXX family specific? unify? */ static int get_port6(socket_t *r, unsigned int port, struct intf_spec *spec) { - if (open_socket(r, SOCK_DGRAM, port, &spec->address.addr)) + if (open_socket(r, SOCK_DGRAM, port, &spec->local_address.addr)) return -1; return 0; @@ -563,7 +564,8 @@ static int get_port(socket_t *r, unsigned int port, struct intf_spec *spec) { } g_atomic_int_dec_and_test(&pp->free_ports); - __C_DBG("%d free ports remaining on interface %s", pp->free_ports, sockaddr_print_buf(&spec->address.addr)); + __C_DBG("%d free ports remaining on interface %s", pp->free_ports, + sockaddr_print_buf(&spec->local_address.addr)); return 0; } @@ -662,12 +664,12 @@ release_restart: g_atomic_int_set(&pp->last_used, port); __C_DBG("Opened ports %u.. on interface %s for media relay", - ((socket_t *) out->head->data)->local.port, sockaddr_print_buf(&spec->address.addr)); + ((socket_t *) out->head->data)->local.port, sockaddr_print_buf(&spec->local_address.addr)); return 0; fail: ilog(LOG_ERR, "Failed to get %u consecutive ports on interface %s for media relay", - num_ports, sockaddr_print_buf(&spec->address.addr)); + num_ports, sockaddr_print_buf(&spec->local_address.addr)); return -1; } @@ -1393,13 +1395,8 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { out: ca = sfd->call ? : NULL; - if (ca && update) { - if (ca->callmaster->conf.redis_write) { - redis_update(ca, ca->callmaster->conf.redis, ANY_REDIS_ROLE, OP_OTHER); - } else if (ca->callmaster->conf.redis) { - redis_update(ca, ca->callmaster->conf.redis, MASTER_REDIS_ROLE, OP_OTHER); - } - } + if (ca && update) + redis_update(ca, ca->callmaster->conf.redis_write); done: log_info_clear(); } @@ -1428,6 +1425,7 @@ struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct lo sfd->call = obj_get(call); sfd->local_intf = lif; g_queue_push_tail(&call->stream_fds, sfd); /* hand over ref */ + g_slice_free1(sizeof(*fd), fd); /* moved into sfd, thus free */ __C_DBG("stream_fd_new localport=%d", sfd->socket.local.port); diff --git a/daemon/media_socket.h b/daemon/media_socket.h index 04e96682b..44277dfca 100644 --- a/daemon/media_socket.h +++ b/daemon/media_socket.h @@ -33,22 +33,23 @@ struct port_pool { struct intf_address { socktype_t *type; sockaddr_t addr; - sockaddr_t advertised; }; struct intf_config { str name; - struct intf_address address; + struct intf_address local_address; + sockaddr_t advertised_address; unsigned int port_min, port_max; }; struct intf_spec { - struct intf_address address; - str ice_foundation; + struct intf_address local_address; struct port_pool port_pool; }; struct local_intf { struct intf_spec *spec; + sockaddr_t advertised_address; unsigned int unique_id; /* starting with 0 - serves as preference */ const struct logical_intf *logical; + str ice_foundation; }; struct intf_list { const struct local_intf *local_intf; @@ -88,7 +89,7 @@ void free_intf_list(struct intf_list *il); void free_socket_intf_list(struct intf_list *il); INLINE int open_intf_socket(socket_t *r, unsigned int port, const struct local_intf *lif) { - return open_socket(r, SOCK_DGRAM, port, &lif->spec->address.addr); + return open_socket(r, SOCK_DGRAM, port, &lif->spec->local_address.addr); } void kernelize(struct packet_stream *); diff --git a/daemon/redis.c b/daemon/redis.c index 68a3baa80..d8f09ad63 100644 --- a/daemon/redis.c +++ b/daemon/redis.c @@ -5,6 +5,7 @@ #include #include #include +#include #include #include "redis.h" @@ -75,15 +76,22 @@ static redisReply *redis_get(struct redis *r, int type, const char *fmt, ...) { static int redisCommandNR(redisContext *r, const char *fmt, ...) { va_list ap; redisReply *ret; + int i = 0; va_start(ap, fmt); ret = redisvCommand(r, fmt, ap); va_end(ap); - if (ret) - freeReplyObject(ret); + if (!ret) + return -1; + + if (ret->type == REDIS_REPLY_ERROR) { + i = -1; + ilog(LOG_WARNING, "Redis returned error to command '%s': %s", fmt, ret->str); + } - return ret ? 0 : -1; + freeReplyObject(ret); + return i; } @@ -123,7 +131,7 @@ static void redis_consume(struct redis *r) { /* called with r->lock held if necessary */ -static int redis_connect(struct redis *r, int wait, int role) { +static int redis_connect(struct redis *r, int wait) { struct timeval tv; redisReply *rp; char *s; @@ -141,8 +149,14 @@ static int redis_connect(struct redis *r, int wait, int role) { if (r->ctx->err) goto err2; - if (redisCommandNR(r->ctx, "PING")) - goto err2; + if (r->auth) { + if (redisCommandNR(r->ctx, "AUTH %s", r->auth)) + goto err2; + } + else { + if (redisCommandNR(r->ctx, "PING")) + goto err2; + } if (redisCommandNR(r->ctx, "SELECT %i", r->db)) goto err2; @@ -160,19 +174,23 @@ static int redis_connect(struct redis *r, int wait, int role) { } if (!memcmp(s, "role:master", 9)) { - if (role == MASTER_REDIS_ROLE || role == ANY_REDIS_ROLE) { - ilog(LOG_INFO, "Connected to Redis in master mode"); + if (r->role == MASTER_REDIS_ROLE || r->role == ANY_REDIS_ROLE) { + ilog(LOG_INFO, "Connected to Redis %s in master mode", + endpoint_print_buf(&r->endpoint)); goto done; - } else if (role == SLAVE_REDIS_ROLE) { - ilog(LOG_INFO, "Connected to Redis in master mode, but wanted mode is slave; retrying..."); + } else if (r->role == SLAVE_REDIS_ROLE) { + ilog(LOG_INFO, "Connected to Redis %s in master mode, but wanted mode is slave; retrying...", + endpoint_print_buf(&r->endpoint)); goto next; } } else if (!memcmp(s, "role:slave", 8)) { - if (role == SLAVE_REDIS_ROLE || role == ANY_REDIS_ROLE) { - ilog(LOG_INFO, "Connected to Redis in slave mode"); + if (r->role == SLAVE_REDIS_ROLE || r->role == ANY_REDIS_ROLE) { + ilog(LOG_INFO, "Connected to Redis %s in slave mode", + endpoint_print_buf(&r->endpoint)); goto done; - } else if (role == MASTER_REDIS_ROLE) { - ilog(LOG_INFO, "Connected to Redis in slave mode, but wanted mode is master; retrying..."); + } else if (r->role == MASTER_REDIS_ROLE) { + ilog(LOG_INFO, "Connected to Redis %s in slave mode, but wanted mode is master; retrying...", + endpoint_print_buf(&r->endpoint)); goto next; } } else { @@ -194,23 +212,25 @@ done: err3: freeReplyObject(rp); err2: - if (r->ctx->err) - rlog(LOG_ERR, "Redis error: %s", r->ctx->errstr); - redisFree(r->ctx); - r->ctx = NULL; + if (r->ctx->err) { + rlog(LOG_ERR, "Failed to connect to Redis %s, error: %s", + endpoint_print_buf(&r->endpoint), r->ctx->errstr); + return -1; + } err: - rlog(LOG_ERR, "Failed to connect to master Redis database"); + rlog(LOG_ERR, "Failed to connect to Redis %s", + endpoint_print_buf(&r->endpoint)); return -1; } int str_cut(char *str, int begin, int len) { - int l = strlen(str); + int l = strlen(str); - if (len < 0) len = l - begin; - if (begin + len > l) len = l - begin; - memmove(str + begin, str + begin + len, l - len + 1); + if (len < 0) len = l - begin; + if (begin + len > l) len = l - begin; + memmove(str + begin, str + begin + len, l - len + 1); - return len; + return len; } static void redis_restore_call(struct redis *r, struct callmaster *m, const redisReply *id); @@ -222,11 +242,11 @@ void onRedisNotification(redisAsyncContext *actx, void *reply, void *privdata) { struct call* c; str callid; char db_str[16]; memset(&db_str, 0, 8); - char* pdbstr = db_str; - unsigned char* p = 0; + char *pdbstr = db_str; + char *p = 0; int dbno; - if (!(cm->conf.redis_read) && !(cm->conf.redis)) { + if (!(cm->conf.redis)) { rlog(LOG_ERROR, "A redis notification has been there but role was not 'master' or 'read'"); return; } @@ -317,51 +337,53 @@ err: void redis_notify_event_base_loopbreak(struct callmaster *cm) { event_base_loopbreak(cm->conf.redis_notify_event_base); - redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, NULL, "punsubscribe"); + redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, NULL, "punsubscribe"); } void redis_notify_subscribe_keyspace(struct callmaster *cm, int keyspace) { - char* main_db_str[256]; memset(&main_db_str,0,256); - sprintf(main_db_str,"psubscribe __keyspace@%i*:notifier-*", keyspace); + char main_db_str[256]; + + memset(&main_db_str, 0, 256); + sprintf(main_db_str,"psubscribe __keyspace@%i*:notifier-*", keyspace); - redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, main_db_str); + redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, main_db_str); } void redis_notify_unsubscribe_keyspace(struct callmaster *cm, int keyspace) { - char* main_db_str[256]; memset(&main_db_str,0,256); - sprintf(main_db_str,"punsubscribe __keyspace@%i*:notifier-*", keyspace); + char main_db_str[256]; + + memset(&main_db_str, 0, 256); + sprintf(main_db_str,"punsubscribe __keyspace@%i*:notifier-*", keyspace); - redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, main_db_str); + redisAsyncCommand(cm->conf.redis_notify_async_context, onRedisNotification, (void*)cm, main_db_str); } void redis_notify(void *d) { struct callmaster *cm = d; struct redis *r = 0; - if (cm->conf.redis_read) { - r = cm->conf.redis_read; - } else if (cm->conf.redis) { + if (cm->conf.redis) { r = cm->conf.redis; } else { - rlog(LOG_INFO, "I do not subscribe to redis notifications since redis role is not 'master' or 'read'"); + rlog(LOG_INFO, "I do not subscribe to redis notifications since no REDIS is configured"); return; } - cm->conf.redis_notify_event_base = event_base_new(); + cm->conf.redis_notify_event_base = event_base_new(); - cm->conf.redis_notify_async_context = redisAsyncConnect(r->host, r->endpoint.port); - if (cm->conf.redis_notify_async_context->err) { - rlog(LOG_ERROR, "Redis Notification error: %s\n", cm->conf.redis_notify_async_context->errstr); - return; - } + cm->conf.redis_notify_async_context = redisAsyncConnect(r->host, r->endpoint.port); + if (cm->conf.redis_notify_async_context->err) { + rlog(LOG_ERROR, "Redis Notification error: %s\n", cm->conf.redis_notify_async_context->errstr); + return; + } - redisLibeventAttach(cm->conf.redis_notify_async_context, cm->conf.redis_notify_event_base); + redisLibeventAttach(cm->conf.redis_notify_async_context, cm->conf.redis_notify_event_base); - redis_notify_subscribe_keyspace(cm,r->db); - event_base_dispatch(cm->conf.redis_notify_event_base); + redis_notify_subscribe_keyspace(cm,r->db); + event_base_dispatch(cm->conf.redis_notify_event_base); } -struct redis *redis_new(const endpoint_t *ep, int db, int role) { +struct redis *redis_new(const endpoint_t *ep, int db, const char *auth, enum redis_role role, int no_redis_required) { struct redis *r; r = g_slice_alloc0(sizeof(*r)); @@ -369,11 +391,25 @@ struct redis *redis_new(const endpoint_t *ep, int db, int role) { r->endpoint = *ep; sockaddr_print(&ep->address, r->host, sizeof(r->host)); r->db = db; + r->auth = auth; + r->role = role; + r->state = REDIS_STATE_DISCONNECTED; + r->no_redis_required = no_redis_required; mutex_init(&r->lock); - if (redis_connect(r, 10, role)) + if (redis_connect(r, 10)) { + if (r->no_redis_required) { + rlog(LOG_WARN, "Starting with no initial connection to Redis %s !", + endpoint_print_buf(&r->endpoint)); + return r; + } goto err; + } + // redis is connected + rlog(LOG_INFO, "Established initial connection to Redis %s", + endpoint_print_buf(&r->endpoint)); + r->state = REDIS_STATE_CONNECTED; return r; err: @@ -393,13 +429,38 @@ static void redis_close(struct redis *r) { -/* called with r->lock held if necessary */ -static void redis_check_conn(struct redis *r, int role) { - if (redisCommandNR(r->ctx, "PING") == 0) - return; - rlog(LOG_INFO, "Lost connection to Redis"); - if (redis_connect(r, 1, role)) - abort(); +/* must be called with r->lock held */ +static int redis_check_conn(struct redis *r) { + // try redis connection + if (redisCommandNR(r->ctx, "PING") == 0) { + // redis is connected + // redis_check_conn() executed well + return 0; + } + + // redis is disconnected + if (r->state == REDIS_STATE_CONNECTED) { + rlog(LOG_ERR, "Lost connection to Redis %s", + endpoint_print_buf(&r->endpoint)); + r->state = REDIS_STATE_DISCONNECTED; + } + + // try redis reconnect -> will free current r->ctx + if (redis_connect(r, 1)) { + // redis is disconnected + // redis_check_conn() executed well + return 0; + } + + // redis is connected + if (r->state == REDIS_STATE_DISCONNECTED) { + rlog(LOG_INFO, "RE-Established connection to Redis %s", + endpoint_print_buf(&r->endpoint)); + r->state = REDIS_STATE_CONNECTED; + } + + // redis_check_conn() executed well + return 0; } @@ -1182,7 +1243,6 @@ struct thread_ctx { GQueue r_q; mutex_t r_m; }; -#define RESTORE_NUM_THREADS 4 static void restore_thread(void *call_p, void *ctx_p) { struct thread_ctx *ctx = ctx_p; @@ -1202,7 +1262,7 @@ static void restore_thread(void *call_p, void *ctx_p) { mutex_unlock(&ctx->r_m); } -int redis_restore(struct callmaster *m, struct redis *r, int role) { +int redis_restore(struct callmaster *m, struct redis *r) { redisReply *calls, *call; int i, ret = -1; GThreadPool *gtp; @@ -1214,7 +1274,15 @@ int redis_restore(struct callmaster *m, struct redis *r, int role) { log_level |= LOG_FLAG_RESTORE; rlog(LOG_DEBUG, "Restoring calls from Redis..."); - redis_check_conn(r, role); + + mutex_lock(&r->lock); + redis_check_conn(r); + if (r->state == REDIS_STATE_DISCONNECTED) { + mutex_unlock(&r->lock); + ret = 0; + goto err; + } + mutex_unlock(&r->lock); calls = redis_get(r, REDIS_REPLY_ARRAY, "SMEMBERS calls"); @@ -1226,9 +1294,9 @@ int redis_restore(struct callmaster *m, struct redis *r, int role) { ctx.m = m; mutex_init(&ctx.r_m); g_queue_init(&ctx.r_q); - for (i = 0; i < RESTORE_NUM_THREADS; i++) - g_queue_push_tail(&ctx.r_q, redis_new(&r->endpoint, r->db, role)); - gtp = g_thread_pool_new(restore_thread, &ctx, RESTORE_NUM_THREADS, TRUE, NULL); + for (i = 0; i < m->conf.redis_num_threads; i++) + g_queue_push_tail(&ctx.r_q, redis_new(&r->endpoint, r->db, r->auth, r->role, r->no_redis_required)); + gtp = g_thread_pool_new(restore_thread, &ctx, m->conf.redis_num_threads, TRUE, NULL); for (i = 0; i < calls->elements; i++) { call = calls->element[i]; @@ -1339,8 +1407,7 @@ static void redis_update_dtls_fingerprint(struct redis *r, const char *pref, con */ /* must be called lock-free */ - -void redis_update(struct call *c, struct redis *r, int role, enum call_opmode opmode) { +void redis_update(struct call *c, struct redis *r) { GList *l, *n, *k, *m; struct call_monologue *ml, *ml2; @@ -1355,7 +1422,11 @@ void redis_update(struct call *c, struct redis *r, int role, enum call_opmode op return; mutex_lock(&r->lock); - redis_check_conn(r, role); + redis_check_conn(r); + if (r->state == REDIS_STATE_DISCONNECTED) { + mutex_unlock(&r->lock); + return ; + } rwlock_lock_r(&c->master_lock); @@ -1617,12 +1688,16 @@ err: } /* must be called lock-free */ -void redis_delete(struct call *c, struct redis *r, int role) { +void redis_delete(struct call *c, struct redis *r) { if (!r) return; mutex_lock(&r->lock); - redis_check_conn(r, role); + redis_check_conn(r); + if (r->state == REDIS_STATE_DISCONNECTED) { + mutex_unlock(&r->lock); + return ; + } rwlock_lock_r(&c->master_lock); if (redisCommandNR(r->ctx, "SELECT %i", c->redis_hosted_db)) @@ -1648,12 +1723,16 @@ err: -void redis_wipe(struct redis *r, int role) { +void redis_wipe(struct redis *r) { if (!r) return; mutex_lock(&r->lock); - redis_check_conn(r, role); + redis_check_conn(r); + if (r->state == REDIS_STATE_DISCONNECTED) { + mutex_unlock(&r->lock); + return ; + } redisCommandNR(r->ctx, "DEL calls"); mutex_unlock(&r->lock); } diff --git a/daemon/redis.h b/daemon/redis.h index 462bc7556..3adce6197 100644 --- a/daemon/redis.h +++ b/daemon/redis.h @@ -15,9 +15,19 @@ #include "call.h" -#define MASTER_REDIS_ROLE 0 -#define SLAVE_REDIS_ROLE 1 -#define ANY_REDIS_ROLE 2 +#define REDIS_RESTORE_NUM_THREADS 4 + + +enum redis_role { + MASTER_REDIS_ROLE = 0, + SLAVE_REDIS_ROLE = 1, + ANY_REDIS_ROLE = 2, +}; + +enum redis_state { + REDIS_STATE_DISCONNECTED = 0, + REDIS_STATE_CONNECTED = 1, +}; struct callmaster; struct call; @@ -27,11 +37,16 @@ struct call; struct redis { endpoint_t endpoint; char host[64]; + enum redis_role role; redisContext *ctx; int db; + const char *auth; mutex_t lock; unsigned int pipeline; + + int state; + int no_redis_required; }; struct redis_hash { redisReply *rr; @@ -77,11 +92,11 @@ INLINE gboolean g_hash_table_insert_check(GHashTable *h, gpointer k, gpointer v) void redis_notify(void *d); -struct redis *redis_new(const endpoint_t *, int, int); -int redis_restore(struct callmaster *, struct redis *, int); -void redis_update(struct call *, struct redis *, int, enum call_opmode); -void redis_delete(struct call *, struct redis *, int); -void redis_wipe(struct redis *, int); +struct redis *redis_new(const endpoint_t *, int, const char *, enum redis_role, int no_redis_required); +int redis_restore(struct callmaster *, struct redis *); +void redis_update(struct call *, struct redis *); +void redis_delete(struct call *, struct redis *); +void redis_wipe(struct redis *); void redis_notify_event_base_loopbreak(struct callmaster *cm); void redis_notify_subscribe_keyspace(struct callmaster *cm, int keyspace); void redis_notify_unsubscribe_keyspace(struct callmaster *cm, int keyspace); diff --git a/daemon/rtcp.c b/daemon/rtcp.c index 0eefc908e..bf226c853 100644 --- a/daemon/rtcp.c +++ b/daemon/rtcp.c @@ -480,7 +480,7 @@ void print_rtcp_common(char** cdrbufcur, const pjmedia_rtcp_common *common) { common->p, common->count, common->pt, - common->length, + ntohl(common->length), ntohl(common->ssrc)); } @@ -502,8 +502,8 @@ void print_rtcp_rr(char** cdrbufcur, const pjmedia_rtcp_rr* rr) { *cdrbufcur += sprintf(*cdrbufcur,"ssrc=%u, fraction_lost=%u, packet_loss=%u, last_seq=%u, jitter=%u, last_sr=%u, delay_since_last_sr=%u, ", ntohl(rr->ssrc), - ntohl(rr->fract_lost), - ntohl(packet_loss), + rr->fract_lost, + packet_loss, ntohl(rr->last_seq), ntohl(rr->jitter), ntohl(rr->lsr), diff --git a/daemon/sdp.c b/daemon/sdp.c index a77410636..454fac211 100644 --- a/daemon/sdp.c +++ b/daemon/sdp.c @@ -16,6 +16,7 @@ #include "rtp.h" #include "ice.h" #include "socket.h" +#include "call_interfaces.h" struct network_address { str network_type; @@ -190,6 +191,7 @@ struct sdp_attribute { ATTR_FINGERPRINT, ATTR_SETUP, ATTR_RTPMAP, + ATTR_IGNORE, } attr; union { @@ -520,8 +522,9 @@ static int parse_attribute_crypto(struct sdp_attribute *output) { return 0; error: - ilog(LOG_ERROR, "Failed to parse a=crypto attribute: %s", err); - return -1; + ilog(LOG_ERROR, "Failed to parse a=crypto attribute, ignoring: %s", err); + output->attr = ATTR_IGNORE; + return 0; } static int parse_attribute_rtcp(struct sdp_attribute *output) { @@ -1533,6 +1536,7 @@ static int process_session_attributes(struct sdp_chopper *chop, struct sdp_attri case ATTR_SENDRECV: case ATTR_FINGERPRINT: case ATTR_SETUP: + case ATTR_IGNORE: goto strip; case ATTR_GROUP: @@ -1596,10 +1600,12 @@ static int process_media_attributes(struct sdp_chopper *chop, struct sdp_media * case ATTR_RTCP_MUX: if (flags->ice_force_relay) break; + // fall thru case ATTR_INACTIVE: case ATTR_SENDONLY: case ATTR_RECVONLY: case ATTR_SENDRECV: + case ATTR_IGNORE: goto strip; case ATTR_EXTMAP: @@ -1693,7 +1699,7 @@ static void insert_candidate(struct sdp_chopper *chop, struct stream_fd *sfd, priority = ice_priority_pref(type_pref, local_pref, ps->component); chopper_append_c(chop, "a=candidate:"); - chopper_append_str(chop, &ifa->spec->ice_foundation); + chopper_append_str(chop, &ifa->ice_foundation); chopper_append_printf(chop, " %u UDP %lu ", ps->component, priority); insert_ice_address(chop, sfd); chopper_append_c(chop, " typ "); diff --git a/daemon/sdp.h b/daemon/sdp.h index c8e78facc..ed76f7844 100644 --- a/daemon/sdp.h +++ b/daemon/sdp.h @@ -7,44 +7,6 @@ #include "media_socket.h" -struct sdp_ng_flags { - enum call_opmode opmode; - str received_from_family; - str received_from_address; - str media_address; - str transport_protocol_str; - str address_family_str; - const struct transport_protocol *transport_protocol; - sockaddr_t parsed_received_from; - sockaddr_t parsed_media_address; - str direction[2]; - sockfamily_t *address_family; - int tos; - int asymmetric:1, - trust_address:1, - replace_origin:1, - replace_sess_conn:1, - ice_remove:1, - ice_force:1, - ice_force_relay:1, - rtcp_mux_offer:1, - rtcp_mux_demux:1, - rtcp_mux_accept:1, - rtcp_mux_reject:1, - strict_source:1, - media_handover:1, - dtls_passive:1, - reset:1, - dtls_off:1, - sdes_off:1, - sdes_unencrypted_srtp:1, - sdes_unencrypted_srtcp:1, - sdes_unauthenticated_srtp:1, - sdes_encrypted_srtp:1, - sdes_encrypted_srtcp:1, - sdes_authenticated_srtp:1; -}; - struct sdp_chopper { str *input; int position; diff --git a/daemon/socket.c b/daemon/socket.c index 653411997..3f69074d6 100644 --- a/daemon/socket.c +++ b/daemon/socket.c @@ -160,9 +160,9 @@ static int __ip4_is_specified(const sockaddr_t *a) { } static int __ip6_is_specified(const sockaddr_t *a) { return a->u.ipv6.s6_addr32[0] != 0 - && a->u.ipv6.s6_addr32[1] != 0 - && a->u.ipv6.s6_addr32[2] != 0 - && a->u.ipv6.s6_addr32[3] != 0; + || a->u.ipv6.s6_addr32[1] != 0 + || a->u.ipv6.s6_addr32[2] != 0 + || a->u.ipv6.s6_addr32[3] != 0; } static int __ip4_sockaddr2endpoint(endpoint_t *ep, const void *p) { const struct sockaddr_in *sin = p; diff --git a/daemon/stun.c b/daemon/stun.c index 0a03a03fd..2b52746cc 100644 --- a/daemon/stun.c +++ b/daemon/stun.c @@ -25,6 +25,7 @@ #define STUN_XOR_MAPPED_ADDRESS 0x0020 #define STUN_PRIORITY 0x0024 #define STUN_USE_CANDIDATE 0x0025 +#define STUN_SOFTWARE 0x8022 #define STUN_FINGERPRINT 0x8028 #define STUN_ICE_CONTROLLED 0x8029 #define STUN_ICE_CONTROLLING 0x802a @@ -100,6 +101,11 @@ struct priority { u_int32_t priority; } __attribute__ ((packed)); +struct software { + struct tlv tlv; + char str[128]; +} __attribute__ ((packed)); + @@ -182,7 +188,7 @@ static int stun_attributes(struct stun_attrs *out, str *s, u_int16_t *unknowns, out->priority = ntohl(*((u_int32_t *) attr.s)); break; - case 0x8022: /* software */ + case STUN_SOFTWARE: break; /* ignore but suppress warning message */ case STUN_XOR_MAPPED_ADDRESS: @@ -282,6 +288,8 @@ INLINE void __output_add(struct msghdr *mh, struct tlv *tlv, unsigned int len, u __output_add(mh, &(attr)->tlv, len + sizeof(struct tlv), code, NULL, 0) #define output_add_data(mh, attr, code, data, len) \ __output_add(mh, &(attr)->tlv, sizeof(*(attr)), code, data, len) +#define output_add_data_len_pad(mh, attr, code, data, len) \ + __output_add(mh, &(attr)->tlv, sizeof((attr)->tlv), code, data, len) static void __output_finish(struct msghdr *mh) { @@ -294,6 +302,12 @@ static void output_finish_src(struct msghdr *mh) { __output_finish(mh); } +static void software(struct msghdr *mh, struct software *sw) { + int i; + i = snprintf(sw->str, sizeof(sw->str) - 1, "rtpengine-%s", RTPENGINE_VERSION); + output_add_data_len_pad(mh, sw, STUN_SOFTWARE, sw->str, i); +} + static void fingerprint(struct msghdr *mh, struct fingerprint *fp) { int i; struct iovec *iov; @@ -355,9 +369,11 @@ static void stun_error_len(struct stream_fd *sfd, const endpoint_t *sin, struct fingerprint fp; struct generic aa; struct msghdr mh; - struct iovec iov[7]; /* hdr, ec, reason, aa, attr_cont, mi, fp */ + struct software sw; + struct iovec iov[9]; /* hdr, ec, reason, aa, attr_cont, mi, fp, sw x2 */ output_init(&mh, iov, &hdr, STUN_BINDING_ERROR_RESPONSE, req->transaction); + software(&mh, &sw); ec.codes = htonl(((code / 100) << 8) | (code % 100)); output_add_data(&mh, &ec, STUN_ERROR_CODE, reason, len); @@ -445,11 +461,13 @@ static int stun_binding_success(struct stream_fd *sfd, struct header *req, struc struct msg_integrity mi; struct fingerprint fp; struct msghdr mh; - struct iovec iov[4]; /* hdr, xma, mi, fp */ + struct software sw; + struct iovec iov[6]; /* hdr, xma, mi, fp, sw x2 */ output_init(&mh, iov, &hdr, STUN_BINDING_SUCCESS_RESPONSE, req->transaction); + software(&mh, &sw); - xma.port = htons(sin->port) ^ (STUN_COOKIE >> 16); + xma.port = htons(sin->port ^ (STUN_COOKIE >> 16)); if (sin->address.family->af == AF_INET) { xma.family = htons(0x01); xma.address[0] = sin->address.u.ipv4.s_addr ^ htonl(STUN_COOKIE); @@ -617,7 +635,7 @@ int stun_binding_request(const endpoint_t *dst, u_int32_t transaction[3], str *p { struct header hdr; struct msghdr mh; - struct iovec iov[8]; /* hdr, username x2, ice_controlled/ing, priority, uc, fp, mi */ + struct iovec iov[10]; /* hdr, username x2, ice_controlled/ing, priority, uc, fp, mi, sw x2 */ char username_buf[256]; int i; struct generic un_attr; @@ -626,8 +644,10 @@ int stun_binding_request(const endpoint_t *dst, u_int32_t transaction[3], str *p struct generic uc; struct fingerprint fp; struct msg_integrity mi; + struct software sw; output_init(&mh, iov, &hdr, STUN_BINDING_REQUEST, transaction); + software(&mh, &sw); i = snprintf(username_buf, sizeof(username_buf), STR_FORMAT":"STR_FORMAT, STR_FMT(&ufrags[0]), STR_FMT(&ufrags[1])); diff --git a/debian/control b/debian/control index c9323dba1..c4ee1d931 100644 --- a/debian/control +++ b/debian/control @@ -4,8 +4,7 @@ Priority: extra Maintainer: Sipwise Development Team Build-Depends: debhelper (>= 5), iptables-dev (>= 1.4), - libcurl4-openssl-dev | libcurl4-gnutls-dev | - libcurl3-openssl-dev | libcurl3-gnutls-dev, + libcurl4-openssl-dev | libcurl4-gnutls-dev | libcurl3-openssl-dev | libcurl3-gnutls-dev, libglib2.0-dev (>= 2.30), libhiredis-dev, libevent-2.0-5, @@ -15,7 +14,7 @@ Build-Depends: debhelper (>= 5), libxmlrpc-c3-dev (>= 1.16.07) | libxmlrpc-core-c3-dev (>= 1.16.07), markdown, zlib1g-dev -Standards-Version: 3.9.6 +Standards-Version: 3.9.7 Homepage: http://sipwise.com/ Package: ngcp-rtpengine-daemon diff --git a/debian/ngcp-rtpengine-daemon.default b/debian/ngcp-rtpengine-daemon.default index fab2f2510..161088ed8 100644 --- a/debian/ngcp-rtpengine-daemon.default +++ b/debian/ngcp-rtpengine-daemon.default @@ -17,10 +17,12 @@ TABLE=0 # PORT_MAX=50000 # REDIS=127.0.0.1:6379 # REDIS_DB=1 -# REDIS_READ=127.0.0.1:6379 -# REDIS_READ_DB=1 +# REDIS_AUTH_PW=foobar # REDIS_WRITE=127.0.0.1:6379 # REDIS_WRITE_DB=1 +# REDIS_WRITE_AUTH_PW=foobar +# REDIS_NUM_THREADS=8 +# NO_REDIS_REQUIRED=yes # B2B_URL=http://127.0.0.1:8090/ # LOG_LEVEL=6 # LOG_FACILITY=daemon @@ -30,6 +32,6 @@ TABLE=0 # DELETE_DELAY=30 # GRAPHITE=9006 # GRAPHITE_INTERVAL=60 -# GRAPHITE_PREFIX=myownprefix +# GRAPHITE_PREFIX=myownprefix. # MAX_SESSIONS=5000 # CREATE_IPTABLES_CHAIN=no diff --git a/debian/ngcp-rtpengine-daemon.init b/debian/ngcp-rtpengine-daemon.init index 7fbabc63f..5bf1e891f 100755 --- a/debian/ngcp-rtpengine-daemon.init +++ b/debian/ngcp-rtpengine-daemon.init @@ -62,12 +62,12 @@ fi [ -z "$TOS" ] || OPTIONS="$OPTIONS --tos=$TOS" [ -z "$PORT_MIN" ] || OPTIONS="$OPTIONS --port-min=$PORT_MIN" [ -z "$PORT_MAX" ] || OPTIONS="$OPTIONS --port-max=$PORT_MAX" -[ -z "$REDIS" ] || OPTIONS="$OPTIONS --redis=$REDIS" -[ -z "$REDIS_DB" ] || OPTIONS="$OPTIONS --redis-db=$REDIS_DB" -[ -z "$REDIS_READ" ] || OPTIONS="$OPTIONS --redis-read=$REDIS_READ" -[ -z "$REDIS_READ_DB" ] || OPTIONS="$OPTIONS --redis-read-db=$REDIS_READ_DB" -[ -z "$REDIS_WRITE" ] || OPTIONS="$OPTIONS --redis-write=$REDIS_WRITE" -[ -z "$REDIS_WRITE_DB" ] || OPTIONS="$OPTIONS --redis-write-db=$REDIS_WRITE_DB" +[ -z "$REDIS" -o -z "$REDIS_DB" ] || OPTIONS="$OPTIONS --redis=$REDIS/$REDIS_DB" +[ -z "$REDIS_AUTH_PW" ] || export RTPENGINE_REDIS_AUTH_PW="$REDIS_AUTH_PW" +[ -z "$REDIS_WRITE" -o -z "$REDIS_WRITE_DB" ] || OPTIONS="$OPTIONS --redis-write=$REDIS_WRITE/$REDIS_WRITE_DB" +[ -z "$REDIS_WRITE_AUTH_PW" ] || export RTPENGINE_REDIS_WRITE_AUTH_PW="$REDIS_WRITE_AUTH_PW" +[ -z "$REDIS_NUM_THREADS" ] || OPTIONS="$OPTIONS --redis-num-threads=$REDIS_NUM_THREADS" +[ -z "$NO_REDIS_REQUIRED" -o \( "$NO_REDIS_REQUIRED" != "1" -a "$NO_REDIS_REQUIRED" != "yes" \) ] || OPTIONS="$OPTIONS --no-redis-required" [ -z "$B2B_URL" ] || OPTIONS="$OPTIONS --b2b-url=$B2B_URL" [ -z "$NO_FALLBACK" -o \( "$NO_FALLBACK" != "1" -a "$NO_FALLBACK" != "yes" \) ] || OPTIONS="$OPTIONS --no-fallback" OPTIONS="$OPTIONS --table=$TABLE" diff --git a/el/rtpengine.init b/el/rtpengine.init index 5b584eb59..9bd441446 100644 --- a/el/rtpengine.init +++ b/el/rtpengine.init @@ -108,14 +108,9 @@ build_opts() { OPTS+=" --port-max=$PORT_MAX" fi - if [[ -n "$REDIS" ]] + if [[ -n "$REDIS" -a -n "$REDIS_DB" ]] then - OPTS+=" --redis=$REDIS" - fi - - if [[ -n "$REDIS_DB" ]] - then - OPTS+=" --redis-db=$REDIS_DB" + OPTS+=" --redis=$REDIS/$REDIS_DB" fi if [[ -n "$B2B_URL" ]] diff --git a/tests/simulator-ng.pl b/tests/simulator-ng.pl index 8a93fbc4d..e2894519a 100755 --- a/tests/simulator-ng.pl +++ b/tests/simulator-ng.pl @@ -18,7 +18,7 @@ use SRTP; my ($NUM, $RUNTIME, $STREAMS, $PAYLOAD, $INTERVAL, $RTCP_INTERVAL, $STATS_INTERVAL) = (1000, 30, 1, 160, 20, 5, 5); my ($NODEL, $IP, $IPV6, $KEEPGOING, $REINVITES, $PROTOS, $DEST, $SUITES, $NOENC, $RTCPMUX, $BUNDLE, $LAZY, - $CHANGE_SSRC); + $CHANGE_SSRC, $PORT_LATCHING); GetOptions( 'no-delete' => \$NODEL, 'num-calls=i' => \$NUM, @@ -40,6 +40,7 @@ GetOptions( 'bundle' => \$BUNDLE, 'lazy-params' => \$LAZY, 'change-ssrc' => \$CHANGE_SSRC, + 'port-latching' => \$PORT_LATCHING, ) or die; ($IP || $IPV6) or die("at least one of --local-ip or --local-ipv6 must be given"); @@ -635,6 +636,7 @@ a=rtpmap:111 opus/48000/2 'received from' => [ qw(IP4 127.0.0.1) ], 'rtcp-mux' => ['demux'], }; + $PORT_LATCHING and push(@{$dict->{flags}}, 'port latching'); #$viabranch and $dict->{'via-branch'} = $viabranch; if ($op eq 'offer') { $dict->{'from-tag'} = $$A{tag}; diff --git a/utils/ng-client b/utils/ng-client index a46b5fc0e..c69512fab 100755 --- a/utils/ng-client +++ b/utils/ng-client @@ -43,6 +43,7 @@ GetOptions( 'TOS=i' => \$options{'TOS'}, 'delete-delay=i' => \$options{'delete-delay'}, 'reset' => \$options{'reset'}, + 'port-latching' => \$options{'port latching'}, ) or die; my $cmd = shift(@ARGV) or die; @@ -52,7 +53,7 @@ my %packet = (command => $cmd); for my $x (split(',', 'from-tag,to-tag,call-id,transport protocol,media address,ICE,address family,TOS,DTLS,via-branch,delete-delay')) { defined($options{$x}) and $packet{$x} = $options{$x}; } -for my $x (split(',', 'trust address,symmetric,asymmetric,force,strict source,media handover,sip source address,reset')) { +for my $x (split(',', 'trust address,symmetric,asymmetric,force,strict source,media handover,sip source address,reset,port latching')) { defined($options{$x}) and push(@{$packet{flags}}, $x); } for my $x (split(',', 'origin,session connection')) { diff --git a/utils/srtp-debug-helper b/utils/srtp-debug-helper index 1c0794957..c0c795340 100755 --- a/utils/srtp-debug-helper +++ b/utils/srtp-debug-helper @@ -25,11 +25,14 @@ else { $pack = pack("H*", $pack); } +my $roc = $ARGV[3] // 0; + print("Packet length: " . length($pack) . " bytes\n"); -my ($dec, $roc, $tag, $hmac) = SRTP::decrypt_rtp($cs, $skey, $ssalt, $sauth, 0, $pack); +my ($dec, $roc, $tag, $hmac) = SRTP::decrypt_rtp($cs, $skey, $ssalt, $sauth, $roc, $pack); print("Auth tag from packet: " . unpack("H*", $tag) . "\n"); -print("Computer auth tag: " . unpack("H*", $hmac) . "\n"); +print("Computed auth tag: " . unpack("H*", $hmac) . "\n"); +print("Decoded packet: " . unpack("H*", $dec) . "\n");