diff --git a/README.md b/README.md index 3c6fd7f70..b3971e5f5 100644 --- a/README.md +++ b/README.md @@ -1414,6 +1414,22 @@ Optionally included keys are: been requested for transcoding. Note that not all codecs support all packetization intervals. +* `supports` + + Contains a list of strings. Each string indicates support for an additional feature + that the controlling SIP proxy supports. Currently defined values are: + + * `load limit` + + Indicates support for an extension to the *ng* protocol to facilitate certain load + balancing mechanisms. If *rtpengine* is configured with certain session or load + limit options enabled (such as the `max-sessions` option), then normally *rtpengine* + would reply with an error to an `offer` if one of the limits is exceeded. If support + for the `load limit` extension is indicated, then instead of replying with an error, + *rtpengine* responds with the string `load limit` in the `result` key of the response + dictionary. The response dictionary may also contain the optional key `message` with + an explanatory string. No other key is required in the response dictionary. + An example of a complete `offer` request dictionary could be (SDP body abbreviated): diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index b644f4f66..c1b3c84f7 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -17,6 +17,7 @@ #include "str.h" #include "control_tcp.h" #include "control_udp.h" +#include "control_ng.h" #include "rtp.h" #include "ice.h" #include "recording.h" @@ -558,6 +559,10 @@ static void call_ng_flags_replace(struct sdp_ng_flags *out, str *s, void *dummy) ilog(LOG_WARN, "Unknown 'replace' flag encountered: '" STR_FORMAT "'", STR_FMT(s)); } +static void call_ng_flags_supports(struct sdp_ng_flags *out, str *s, void *dummy) { + if (!str_cmp(s, "load limit")) + out->supports_load_limit = 1; +} static void call_ng_flags_codec_list(struct sdp_ng_flags *out, str *s, void *qp) { str *s_copy; s_copy = g_slice_alloc(sizeof(*s_copy)); @@ -646,6 +651,7 @@ static void call_ng_process_flags(struct sdp_ng_flags *out, bencode_item_t *inpu call_ng_flags_list(out, input, "flags", call_ng_flags_flags, NULL); call_ng_flags_list(out, input, "replace", call_ng_flags_replace, NULL); + call_ng_flags_list(out, input, "supports", call_ng_flags_supports, NULL); diridx = 0; if ((list = bencode_dictionary_get_expect(input, "direction", BENCODE_LIST))) { @@ -714,13 +720,38 @@ static void call_ng_free_flags(struct sdp_ng_flags *flags) { g_queue_clear_full(&flags->codec_transcode, str_slice_free); } +static enum load_limit_reasons call_offer_session_limit(void) { + enum load_limit_reasons ret = LOAD_LIMIT_NONE; + + rwlock_lock_r(&rtpe_config.config_lock); + if (rtpe_config.max_sessions>=0) { + rwlock_lock_r(&rtpe_callhash_lock); + if (g_hash_table_size(rtpe_callhash) - + atomic64_get(&rtpe_stats.foreign_sessions) >= rtpe_config.max_sessions) + { + /* foreign calls can't get rejected + * total_rejected_sess applies only to "own" sessions */ + atomic64_inc(&rtpe_totalstats.total_rejected_sess); + atomic64_inc(&rtpe_totalstats_interval.total_rejected_sess); + ilog(LOG_ERROR, "Parallel session limit reached (%i)",rtpe_config.max_sessions); + + ret = LOAD_LIMIT_MAX_SESSIONS; + } + rwlock_unlock_r(&rtpe_callhash_lock); + } + + rwlock_unlock_r(&rtpe_config.config_lock); + + return ret; +} + static const char *call_offer_answer_ng(bencode_item_t *input, bencode_item_t *output, enum call_opmode opmode, const char* addr, const endpoint_t *sin) { str sdp, fromtag, totag = STR_NULL, callid, viabranch; str label = STR_NULL; - char *errstr; + const char *errstr; GQueue parsed = G_QUEUE_INIT; GQueue streams = G_QUEUE_INIT; struct call *call; @@ -744,12 +775,24 @@ static const char *call_offer_answer_ng(bencode_item_t *input, bencode_dictionary_get_str(input, "via-branch", &viabranch); bencode_dictionary_get_str(input, "label", &label); - if (sdp_parse(&sdp, &parsed)) - return "Failed to parse SDP"; - call_ng_process_flags(&flags, input); flags.opmode = opmode; + if (opmode == OP_OFFER) { + enum load_limit_reasons limit = call_offer_session_limit(); + if (limit != LOAD_LIMIT_NONE) { + if (!flags.supports_load_limit) + errstr = "Parallel session limit reached"; // legacy protocol + else + errstr = magic_load_limit_strings[limit]; + goto out; + } + } + + errstr = "Failed to parse SDP"; + if (sdp_parse(&sdp, &parsed)) + goto out; + if (flags.loop_protect && sdp_is_duplicate(&parsed)) { ilog(LOG_INFO, "Ignoring message as SDP has already been processed by us"); bencode_dictionary_add_str(output, "sdp", &sdp); @@ -874,25 +917,6 @@ out: const char *call_offer_ng(bencode_item_t *input, bencode_item_t *output, const char* addr, const endpoint_t *sin) { - rwlock_lock_r(&rtpe_config.config_lock); - if (rtpe_config.max_sessions>=0) { - rwlock_lock_r(&rtpe_callhash_lock); - if (g_hash_table_size(rtpe_callhash) - - atomic64_get(&rtpe_stats.foreign_sessions) >= rtpe_config.max_sessions) { - rwlock_unlock_r(&rtpe_callhash_lock); - /* foreign calls can't get rejected - * total_rejected_sess applies only to "own" sessions */ - atomic64_inc(&rtpe_totalstats.total_rejected_sess); - atomic64_inc(&rtpe_totalstats_interval.total_rejected_sess); - ilog(LOG_ERROR, "Parallel session limit reached (%i)",rtpe_config.max_sessions); - - rwlock_unlock_r(&rtpe_config.config_lock); - return "Parallel session limit reached"; - } - rwlock_unlock_r(&rtpe_callhash_lock); - } - - rwlock_unlock_r(&rtpe_config.config_lock); return call_offer_answer_ng(input, output, OP_OFFER, addr, sin); } diff --git a/daemon/call_interfaces.h b/daemon/call_interfaces.h index d58c1f52f..7e60a7837 100644 --- a/daemon/call_interfaces.h +++ b/daemon/call_interfaces.h @@ -58,6 +58,7 @@ struct sdp_ng_flags { reset:1, record_call:1, loop_protect:1, + supports_load_limit:1, dtls_off:1, sdes_off:1, sdes_unencrypted_srtp:1, diff --git a/daemon/control_ng.c b/daemon/control_ng.c index d73c267b6..aabc43b83 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -20,6 +20,10 @@ mutex_t rtpe_cngs_lock; GHashTable *rtpe_cngs_hash; struct control_ng *rtpe_control_ng; +const char magic_load_limit_strings[__LOAD_LIMIT_MAX][64] = { + [LOAD_LIMIT_MAX_SESSIONS] = "Parallel session limit reached", +}; + static void timeval_update_request_time(struct request_time *request, const struct timeval *offer_diff) { // lock offers @@ -112,7 +116,7 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin struct control_ng *c = (void *) obj; bencode_buffer_t bencbuf; bencode_item_t *dict, *resp; - str cmd, cookie, data, reply, *to_send, callid; + str cmd = STR_NULL, cookie, data, reply, *to_send, callid; const char *errstr, *resultstr; struct iovec iov[3]; unsigned int iovlen; @@ -262,11 +266,18 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin goto send_resp; err_send: - ilog(LOG_WARNING, "Protocol error in packet from %s: %s ["STR_FORMAT"]", addr, errstr, STR_FMT(&data)); - bencode_dictionary_add_string(resp, "result", "error"); - bencode_dictionary_add_string(resp, "error-reason", errstr); - g_atomic_int_inc(&cur->errors); - cmd = STR_NULL; + if (errstr < magic_load_limit_strings[0] || errstr > magic_load_limit_strings[__LOAD_LIMIT_MAX-1]) { + ilog(LOG_WARNING, "Protocol error in packet from %s: %s ["STR_FORMAT"]", + addr, errstr, STR_FMT(&data)); + bencode_dictionary_add_string(resp, "result", "error"); + bencode_dictionary_add_string(resp, "error-reason", errstr); + g_atomic_int_inc(&cur->errors); + cmd = STR_NULL; + } + else { + bencode_dictionary_add_string(resp, "result", "load limit"); + bencode_dictionary_add_string(resp, "message", errstr); + } send_resp: bencode_collapse_str(resp, &reply); diff --git a/daemon/control_ng.h b/daemon/control_ng.h index c0dbb3af7..deea2928a 100644 --- a/daemon/control_ng.h +++ b/daemon/control_ng.h @@ -35,4 +35,12 @@ extern mutex_t rtpe_cngs_lock; extern GHashTable *rtpe_cngs_hash; extern struct control_ng *rtpe_control_ng; +enum load_limit_reasons { + LOAD_LIMIT_NONE = -1, + LOAD_LIMIT_MAX_SESSIONS = 0, + + __LOAD_LIMIT_MAX +}; +extern const char magic_load_limit_strings[__LOAD_LIMIT_MAX][64]; + #endif diff --git a/utils/rtpengine-ng-client b/utils/rtpengine-ng-client index aa89907c4..35785aeff 100755 --- a/utils/rtpengine-ng-client +++ b/utils/rtpengine-ng-client @@ -53,6 +53,8 @@ GetOptions( 'ptime=i' => \$options{'ptime'}, 'flags=s@' => \$options{'flags'}, 'codec-options-flat' => \$options{'codec options flag'}, + 'flags=s@' => \$options{'flags'}, + 'supports=s@' => \$options{'supports'}, ) or die; my $cmd = shift(@ARGV) or die; @@ -71,7 +73,7 @@ for my $x (split(/,/, 'trust address,symmetric,asymmetric,force,strict source,me for my $x (split(/,/, 'origin,session connection')) { defined($options{'replace-' . $x}) and push(@{$packet{replace}}, $x); } -for my $x (split(/,/, 'rtcp-mux,SDES')) { +for my $x (split(/,/, 'rtcp-mux,SDES,supports')) { $packet{$x} = $options{$x} if defined($options{$x}) && ref($options{$x}) eq 'ARRAY'; }