Browse Source

MT#55283 Adding support for NG trace to Homer

Support is desired for Kamailio/Rtpengine traffic via UDP.
Adding homer-disable-rtcp-stats and homer-enable-ng config params to
separately control sending to Homer each traffic type. By default rtcp
is `on` when homer parameter is configured. NG is by default disabled.

closes #1802

Change-Id: Ib68fb133cffc5d8945f9b6bf60bab3e80fab9630
pull/1819/head
Lucian Balaceanu 2 years ago
committed by Richard Fuchs
parent
commit
7905811c53
11 changed files with 182 additions and 32 deletions
  1. +94
    -7
      daemon/control_ng.c
  2. +10
    -5
      daemon/control_udp.c
  3. +6
    -6
      daemon/cookie_cache.c
  4. +5
    -7
      daemon/homer.c
  5. +9
    -0
      daemon/main.c
  6. +2
    -2
      daemon/rtcp.c
  7. +23
    -2
      docs/rtpengine.md
  8. +1
    -0
      include/control_ng.h
  9. +26
    -2
      include/cookie_cache.h
  10. +3
    -1
      include/homer.h
  11. +3
    -0
      include/main.h

+ 94
- 7
daemon/control_ng.c View File

@ -20,6 +20,7 @@
#include "statistics.h"
#include "streambuf.h"
#include "str.h"
#include "homer.h"
#include "tcp_listener.h"
#include "main.h"
@ -28,6 +29,7 @@ mutex_t tcp_connections_lock;
GHashTable *rtpe_cngs_hash;
GHashTable *tcp_connections_hash;
static struct cookie_cache ng_cookie_cache;
static bool trace_ng = false;
const char magic_load_limit_strings[__LOAD_LIMIT_MAX][64] = {
[LOAD_LIMIT_MAX_SESSIONS] = "Parallel session limit reached",
@ -62,6 +64,67 @@ const char *ng_command_strings_short[NGC_COUNT] = {
"Pub", "SubReq", "SubAns", "Unsub",
};
typedef struct ng_ctx {
str callid;
int command;
str cookie;
bool should_trace;
const endpoint_t *sin_ep;
const endpoint_t *local_ep;
} ng_ctx;
#define CH(func, ...) do { \
if (trace_ng) \
func( __VA_ARGS__); \
} while (0)
void init_ng_tracing(void) {
if (rtpe_config.homer_ng_on && has_homer())
trace_ng = true;
}
static GString *create_homer_msg(str *cookie, str *data) {
GString *msg = g_string_sized_new(cookie->len + 1 + data->len);
g_string_append_printf(msg, "%.*s %.*s", STR_FMT(cookie), STR_FMT(data));
return msg;
}
static bool should_trace_msg(enum ng_command command) {
switch (command) {
case NGC_PING:
return false;
default:
return true;
}
}
static void homer_fill_values(ng_ctx *hctx, str *callid, int command) {
if (hctx) {
hctx->command = command;
hctx->callid = *callid;
}
}
static void homer_trace_msg_in(ng_ctx *hctx, str *data) {
if (hctx) {
hctx->should_trace = should_trace_msg(hctx->command);
if (hctx->should_trace) {
struct timeval tv;
gettimeofday(&tv, NULL);
GString *msg = create_homer_msg(&hctx->cookie, data);
homer_send(msg, &hctx->callid, hctx->sin_ep, hctx->local_ep, &tv, rtpe_config.homer_ng_capt_proto);
}
}
}
static void homer_trace_msg_out(ng_ctx *hctx, str *data) {
if (hctx && hctx->should_trace) {
struct timeval tv;
gettimeofday(&tv, NULL);
GString *msg = create_homer_msg(&hctx->cookie, data);
homer_send(msg, &hctx->callid, hctx->local_ep, hctx->sin_ep, &tv, rtpe_config.homer_ng_capt_proto);
}
}
static void pretty_print(bencode_item_t *el, GString *s) {
bencode_item_t *chld;
@ -148,7 +211,7 @@ ng_buffer *ng_buffer_new(struct obj *ref) {
return ngbuf;
}
static void control_ng_process_payload(str *reply, str *data, const endpoint_t *sin, char *addr, struct obj *ref,
static void control_ng_process_payload(ng_ctx *hctx, str *reply, str *data, const endpoint_t *sin, char *addr, struct obj *ref,
struct ng_buffer **ngbufp)
{
bencode_item_t *dict, *resp;
@ -328,6 +391,9 @@ static void control_ng_process_payload(str *reply, str *data, const endpoint_t *
errstr = "Unrecognized command";
}
CH(homer_fill_values, hctx, &callid, command);
CH(homer_trace_msg_in, hctx, data);
// stop command timer
gettimeofday(&cmd_stop, NULL);
//print command duration
@ -389,6 +455,7 @@ send_resp:
release_closed_sockets();
log_info_pop_until(&callid);
CH(homer_trace_msg_out ,hctx, reply);
}
int control_ng_process(str *buf, const endpoint_t *sin, char *addr, const sockaddr_t *local,
@ -408,19 +475,39 @@ int control_ng_process(str *buf, const endpoint_t *sin, char *addr, const sockad
*data.s++ = '\0';
data.len--;
str *cached = cookie_cache_lookup(&ng_cookie_cache, &cookie);
cache_entry *cached = cookie_cache_lookup(&ng_cookie_cache, &cookie);
if (cached) {
ilogs(control, LOG_INFO, "Detected command from %s as a duplicate", addr);
cb(&cookie, cached, sin, local, p1);
free(cached);
ng_ctx hctx = {.sin_ep = sin,
.local_ep = p1 ? &(((socket_t*)p1)->local) : NULL,
.cookie = cookie,
.command = cached->command,
.callid = *cached->callid,
.should_trace = should_trace_msg(cached->command)};
CH(homer_trace_msg_in, &hctx, &data);
cb(&cookie, cached->reply, sin, local, p1);
CH(homer_trace_msg_out, &hctx, cached->reply);
cache_entry_free(cached);
return 0;
}
str reply;
g_autoptr(ng_buffer) ngbuf = NULL;
control_ng_process_payload(&reply, &data, sin, addr, ref, &ngbuf);
ng_ctx hctx = {.sin_ep = sin,
.local_ep = p1 ? &(((socket_t*)p1)->local) : NULL,
.cookie = cookie,
.command = -1};
control_ng_process_payload(trace_ng ? &hctx : NULL,
&reply, &data, sin, addr, ref, &ngbuf);
cb(&cookie, &reply, sin, local, p1);
cookie_cache_insert(&ng_cookie_cache, &cookie, &reply);
cache_entry ce = {.reply = &reply, .command = hctx.command, .callid = &hctx.callid};
cookie_cache_insert(&ng_cookie_cache, &cookie, &ce);
return 0;
}
@ -432,7 +519,7 @@ int control_ng_process_plain(str *data, const endpoint_t *sin, char *addr, const
g_autoptr(ng_buffer) ngbuf = NULL;
str reply;
control_ng_process_payload(&reply, data, sin, addr, ref, &ngbuf);
control_ng_process_payload(NULL, &reply, data, sin, addr, ref, &ngbuf);
cb(NULL, &reply, sin, local, p1);
return 0;


+ 10
- 5
daemon/control_udp.c View File

@ -26,7 +26,8 @@ static void control_udp_incoming(struct obj *obj, struct udp_buffer *udp_buf) {
char **out;
struct iovec iov[10];
unsigned int iovlen;
str cookie, *reply;
str cookie, *reply = NULL;
cache_entry *ce;
pcre2_match_data *md = pcre2_match_data_create(30, NULL);
ret = pcre2_match(u->parse_re, (PCRE2_SPTR8) udp_buf->str.s, udp_buf->str.len, 0, 0, md, NULL);
@ -71,11 +72,12 @@ static void control_udp_incoming(struct obj *obj, struct udp_buffer *udp_buf) {
pcre2_substring_list_get(md, (PCRE2_UCHAR ***) &out, NULL);
str_init(&cookie, (void *) out[RE_UDP_COOKIE]);
reply = cookie_cache_lookup(&u->cookie_cache, &cookie);
if (reply) {
ce = cookie_cache_lookup(&u->cookie_cache, &cookie);
if (ce) {
reply = ce->reply;
ilogs(control, LOG_INFO, "Detected command from udp:%s as a duplicate", udp_buf->addr);
socket_sendto_from(udp_buf->listener, reply->s, reply->len, &udp_buf->sin, &udp_buf->local_addr);
free(reply);
cache_entry_free(ce);
goto out;
}
@ -122,7 +124,10 @@ static void control_udp_incoming(struct obj *obj, struct udp_buffer *udp_buf) {
if (reply) {
socket_sendto_from(udp_buf->listener, reply->s, reply->len, &udp_buf->sin, &udp_buf->local_addr);
cookie_cache_insert(&u->cookie_cache, &cookie, reply);
str callid = STR_NULL;
cache_entry new_ce = {.reply = reply, .callid = &callid};
cookie_cache_insert(&u->cookie_cache, &cookie, &new_ce);
free(reply);
}
else


+ 6
- 6
daemon/cookie_cache.c View File

@ -10,7 +10,7 @@
INLINE void cookie_cache_state_init(struct cookie_cache_state *s) {
s->in_use = g_hash_table_new(str_hash, str_equal);
s->cookies = g_hash_table_new_full(str_hash, str_equal, free, free);
s->cookies = g_hash_table_new_full(str_hash, str_equal, free, cache_entry_free);
}
INLINE void cookie_cache_state_cleanup(struct cookie_cache_state *s) {
g_hash_table_destroy(s->cookies);
@ -34,8 +34,8 @@ static void __cookie_cache_check_swap(struct cookie_cache *c) {
}
}
str *cookie_cache_lookup(struct cookie_cache *c, const str *s) {
str *ret;
cache_entry *cookie_cache_lookup(struct cookie_cache *c, const str *s) {
cache_entry *ret;
mutex_lock(&c->lock);
@ -46,7 +46,7 @@ restart:
if (!ret)
ret = g_hash_table_lookup(c->old.cookies, s);
if (ret) {
ret = str_dup(ret);
ret = cache_entry_dup(ret);
mutex_unlock(&c->lock);
return ret;
}
@ -67,11 +67,11 @@ restart:
return NULL;
}
void cookie_cache_insert(struct cookie_cache *c, const str *s, const str *r) {
void cookie_cache_insert(struct cookie_cache *c, const str *s, const struct cache_entry *entry) {
mutex_lock(&c->lock);
g_hash_table_remove(c->current.in_use, s);
g_hash_table_remove(c->old.in_use, s);
g_hash_table_replace(c->current.cookies, str_dup(s), str_dup(r));
g_hash_table_replace(c->current.cookies, str_dup(s), cache_entry_dup(entry));
g_hash_table_remove(c->old.cookies, s);
cond_broadcast(&c->cond);
mutex_unlock(&c->lock);


+ 5
- 7
daemon/homer.c View File

@ -41,7 +41,7 @@ static struct homer_sender *main_homer_sender;
static int send_hepv3 (GString *s, const str *id, int, const endpoint_t *src, const endpoint_t *dst,
const struct timeval *);
const struct timeval *, int hep_capture_proto);
// state handlers
static int __established(struct homer_sender *hs);
@ -203,7 +203,7 @@ void homer_sender_init(const endpoint_t *ep, int protocol, int capture_id) {
// takes over the GString
int homer_send(GString *s, const str *id, const endpoint_t *src,
const endpoint_t *dst, const struct timeval *tv)
const endpoint_t *dst, const struct timeval *tv, int hep_capture_proto)
{
if (!main_homer_sender)
goto out;
@ -214,7 +214,7 @@ int homer_send(GString *s, const str *id, const endpoint_t *src,
ilog(LOG_DEBUG, "JSON to send to Homer: '"STR_FORMAT"'", G_STR_FMT(s));
if (send_hepv3(s, id, main_homer_sender->capture_id, src, dst, tv))
if (send_hepv3(s, id, main_homer_sender->capture_id, src, dst, tv, hep_capture_proto))
goto out;
mutex_lock(&main_homer_sender->lock);
@ -318,11 +318,9 @@ struct hep_generic {
typedef struct hep_generic hep_generic_t;
#define PROTO_RTCP_JSON 0x05
// modifies the GString in place
static int send_hepv3 (GString *s, const str *id, int capt_id, const endpoint_t *src, const endpoint_t *dst,
const struct timeval *tv)
const struct timeval *tv, int hep_capture_proto)
{
struct hep_generic *hg=NULL;
@ -417,7 +415,7 @@ static int send_hepv3 (GString *s, const str *id, int capt_id, const endpoint_t
/* Protocol TYPE */
hg->proto_t.chunk.vendor_id = htons(0x0000);
hg->proto_t.chunk.type_id = htons(0x000b);
hg->proto_t.data = PROTO_RTCP_JSON;
hg->proto_t.data = hep_capture_proto;
hg->proto_t.chunk.length = htons(sizeof(hg->proto_t));
/* Capture ID */


+ 9
- 0
daemon/main.c View File

@ -86,6 +86,7 @@ struct rtpengine_config rtpe_config = {
.interfaces = G_QUEUE_INIT,
.homer_protocol = SOCK_DGRAM,
.homer_id = 2001,
.homer_ng_capt_proto = 0x3d, // first available value in HEP proto specification
.port_min = 30000,
.port_max = 40000,
.redis_db = -1,
@ -581,6 +582,9 @@ static void options(int *argc, char ***argv) {
{ "homer", 0, 0, G_OPTION_ARG_STRING, &homerp, "Address of Homer server for RTCP stats","IP46|HOSTNAME:PORT"},
{ "homer-protocol",0,0,G_OPTION_ARG_STRING, &homerproto, "Transport protocol for Homer (default udp)", "udp|tcp" },
{ "homer-id", 0, 0, G_OPTION_ARG_INT, &rtpe_config.homer_id, "'Capture ID' to use within the HEP protocol", "INT" },
{ "homer-disable-rtcp-stats", 0, 0, G_OPTION_ARG_NONE, &rtpe_config.homer_rtcp_off, "Disable RTCP stats tracing to Homer (enabled by default if homer server enabled)", NULL },
{ "homer-enable-ng", 0, 0, G_OPTION_ARG_NONE, &rtpe_config.homer_ng_on, "Enable NG tracing to Homer", NULL },
{ "homer-ng-capture-proto", 0, 0, G_OPTION_ARG_INT, &rtpe_config.homer_ng_capt_proto, "'Capture protocol type' to use within the HEP protocol (default is 0x3d). Further used by the Homer capture and UI.", "UINT8" },
{ "recording-dir", 0, 0, G_OPTION_ARG_STRING, &rtpe_config.spooldir, "Directory for storing pcap and metadata files", "FILE" },
{ "recording-method",0, 0, G_OPTION_ARG_STRING, &rtpe_config.rec_method, "Strategy for call recording", "pcap|proc|all" },
{ "recording-format",0, 0, G_OPTION_ARG_STRING, &rtpe_config.rec_format, "File format for stored pcap files", "raw|eth" },
@ -801,6 +805,9 @@ static void options(int *argc, char ***argv) {
die("Invalid protocol '%s' (--homer-protocol)", homerproto);
}
if (rtpe_config.homer_ng_capt_proto <0 || rtpe_config.homer_ng_capt_proto > 255)
die("Invalid homer-ng-capture-proto value");
if (rtpe_config.default_tos < 0 || rtpe_config.default_tos > 255)
die("Invalid TOS value");
@ -1094,6 +1101,7 @@ static void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) {
ini_rtpe_cfg->redis_num_threads = rtpe_config.redis_num_threads;
ini_rtpe_cfg->homer_protocol = rtpe_config.homer_protocol;
ini_rtpe_cfg->homer_id = rtpe_config.homer_id;
ini_rtpe_cfg->homer_ng_capt_proto = rtpe_config.homer_ng_capt_proto;
ini_rtpe_cfg->no_fallback = rtpe_config.no_fallback;
ini_rtpe_cfg->port_min = rtpe_config.port_min;
ini_rtpe_cfg->port_max = rtpe_config.port_max;
@ -1332,6 +1340,7 @@ static void create_everything(void) {
homer_sender_init(&rtpe_config.homer_ep, rtpe_config.homer_protocol, rtpe_config.homer_id);
rtcp_init(); // must come after Homer init
init_ng_tracing(); // must come after Homer init
gettimeofday(&rtpe_latest_graphite_interval_start, NULL);


+ 2
- 2
daemon/rtcp.c View File

@ -1132,7 +1132,7 @@ static void homer_finish(struct rtcp_process_ctx *ctx, call_t *c, const endpoint
str_sanitize(ctx->json);
g_string_append(ctx->json, " }");
if (ctx->json->len > ctx->json_init_len + 2)
homer_send(ctx->json, &c->callid, src, dst, tv);
homer_send(ctx->json, &c->callid, src, dst, tv, PROTO_RTCP_JSON);
else
g_string_free(ctx->json, TRUE);
ctx->json = NULL;
@ -1407,7 +1407,7 @@ static void transcode_sr_wrap(struct rtcp_process_ctx *ctx, struct sender_report
void rtcp_init(void) {
rtcp_handlers.logging = _log_facility_rtcp ? &log_handlers : &dummy_handlers;
rtcp_handlers.homer = has_homer() ? &homer_handlers : &dummy_handlers;
rtcp_handlers.homer = has_homer() && !rtpe_config.homer_rtcp_off ? &homer_handlers : &dummy_handlers;
}


+ 23
- 2
docs/rtpengine.md View File

@ -658,6 +658,8 @@ call to inject-DTMF won't be sent to __\-\-dtmf-log-dest=__ or __\-\-listen-tcp-
capture server.
The transport is HEP version 3 and payload format is JSON.
This argument takes an IP address and a port number as value.
Also enables sending the control NG traffic to a capturing agent.
Payload format does not apply in this case.
- __\-\-homer-protocol=udp__\|__tcp__
@ -669,6 +671,25 @@ call to inject-DTMF won't be sent to __\-\-dtmf-log-dest=__ or __\-\-listen-tcp-
different sources of capture data.
This ID can be specified using this argument.
- __\-\-homer-disable-rtcp-stats__
Disables the default behaviour that RTCP stats are sent when homer
parameter is set. Sending of RTCP and NG are as such decoupled.
- __\-\-homer-enable-ng__
Enables sending control NG packages to a Homer capturing software. The
capturing agent part is not officialy supported OOTB, but it can be
achieved with Kamailio by using the config. For this feature to work one
has to set at least the homer parameter.
- __\-\-homer-ng-capture-proto=__*INT*
The HEP protocol used by Homer contains a "Capture protocol type" UINT8
used by the capturing agent and UI to make further processing. Some values
are registered, but currently 0x3d values onwards are free.
Default value is 0x3d (61).
- __\-\-recording-dir=__*FILE*
An optional argument to specify a path to a directory where PCAP recording
@ -1157,9 +1178,9 @@ call to inject-DTMF won't be sent to __\-\-dtmf-log-dest=__ or __\-\-listen-tcp-
- __\-\-dtls-mtu=__*INT*
Set DTLS MTU to enable fragmenting of large DTLS packets. Defaults to 1200.
Minimum value is 576 as the internet protocol requires that hosts must be able to
Minimum value is 576 as the internet protocol requires that hosts must be able to
process IP datagrams of at least 576 bytes (for IPv4) or 1280 bytes (for IPv6).
This does not preclude link layers with an MTU smaller than this minimum MTU from
This does not preclude link layers with an MTU smaller than this minimum MTU from
conveying IP data. Internet IPv4 path MTU is 68 bytes.
- __\-\-mqtt-host=__*HOST*\|*IP*


+ 1
- 0
include/control_ng.h View File

@ -76,6 +76,7 @@ int control_ng_process(str *buf, const endpoint_t *sin, char *addr, const sockad
void (*cb)(str *, str *, const endpoint_t *, const sockaddr_t *, void *), void *p1, struct obj *);
int control_ng_process_plain(str *buf, const endpoint_t *sin, char *addr, const sockaddr_t *local,
void (*cb)(str *, str *, const endpoint_t *, const sockaddr_t *, void *), void *p1, struct obj *);
void init_ng_tracing(void);
ng_buffer *ng_buffer_new(struct obj *ref);


+ 26
- 2
include/cookie_cache.h View File

@ -12,6 +12,30 @@ struct cookie_cache_state {
GHashTable *cookies;
};
typedef struct cache_entry {
str *reply;
str *callid;
int command;
} cache_entry;
INLINE cache_entry *cache_entry_dup(const cache_entry *s) {
if (!s)
return NULL;
cache_entry *r;
r = malloc(sizeof(*r));
r->reply = str_dup(s->reply);
r->command = s->command;
r->callid = str_dup(s->callid);
return r;
}
INLINE void cache_entry_free(void *p) {
cache_entry *s = p;
if (!s)
return;
free(s->reply);
free(s->callid);
free(s);
}
struct cookie_cache {
mutex_t lock;
cond_t cond;
@ -20,8 +44,8 @@ struct cookie_cache {
};
void cookie_cache_init(struct cookie_cache *);
str *cookie_cache_lookup(struct cookie_cache *, const str *);
void cookie_cache_insert(struct cookie_cache *, const str *, const str *);
cache_entry *cookie_cache_lookup(struct cookie_cache *, const str *);
void cookie_cache_insert(struct cookie_cache *, const str *, const struct cache_entry *);
void cookie_cache_remove(struct cookie_cache *, const str *);
void cookie_cache_cleanup(struct cookie_cache *);


+ 3
- 1
include/homer.h View File

@ -3,9 +3,11 @@
#include "socket.h"
#define PROTO_RTCP_JSON 0x05
void homer_sender_init(const endpoint_t *, int, int);
int homer_send(GString *, const str *, const endpoint_t *, const endpoint_t *,
const struct timeval *tv);
const struct timeval *, int);
int has_homer(void);
#endif

+ 3
- 0
include/main.h View File

@ -71,6 +71,9 @@ struct rtpengine_config {
endpoint_t homer_ep;
int homer_protocol;
int homer_id;
int homer_ng_capt_proto;
gboolean homer_rtcp_off;
gboolean homer_ng_on;
gboolean no_fallback;
gboolean reject_invalid_sdp;
gboolean save_interface_ports;


Loading…
Cancel
Save