From 2fc54462debdeaa6aea747d92d27bffe874202e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Ned=C5=BEibovi=C4=87?= Date: Mon, 1 Feb 2021 11:16:43 -0500 Subject: [PATCH] TT#14008 Squashed commit of the following: MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit closes #1180 commit 023f6cbc9e0b595d1d02116d38d39358fbb9ee49 Merge: ae82034d 86e287d1 Author: Richard Fuchs Date: Mon Feb 1 11:16:04 2021 -0500 Merge branch 'control_ng_tcp' of https://github.com/enreached/rtpengine into enreached-control_ng_tcp Change-Id: I628dcfd30e901dbee97b00bee0b6bba0cb29826f commit 86e287d1173c7c6a2333801bdbc31d960f08140b Author: Damir Nedžibović Date: Mon Feb 1 16:38:53 2021 +0100 Update the readme file. commit 35f5d9e1519bbe2250fa5189fcb7bdfc9130899c Author: Damir Nedžibović Date: Mon Feb 1 16:20:09 2021 +0100 Update docs. commit 336deb4c674a02b212e0aec17cf2b8827202e187 Author: Damir Nedžibović Date: Mon Feb 1 14:28:07 2021 +0100 Replace hex values with chars. commit 37c86a8fa00c3c1daf8c662003265d1611725c40 Merge: 17a10b96 5cd53ef1 Author: Damir Nedžibović Date: Mon Feb 1 13:51:06 2021 +0100 Merge branch 'control_ng_tcp' of github.com:enreached/rtpengine into control_ng_tcp commit 17a10b96ad5f38ff5f3c0b6ce5f39fcb1c218d82 Author: Damir Nedžibović Date: Thu Jan 28 18:09:10 2021 +0100 Add missing include. commit adfddefae33c9ba6075f9a04310be29088622139 Author: Damir Nedžibović Date: Thu Jan 28 18:05:44 2021 +0100 Implementation of control-ng via TCP. commit 5cd53ef1c901679a3746e08330ca14c14c1b3b83 Merge: 1bd3a8fc b28ab075 Author: Damir Nedžibović Date: Mon Feb 1 13:49:50 2021 +0100 Merge branch 'control_ng_tcp' of github.com:enreached/rtpengine into control_ng_tcp commit 1bd3a8fc6dd2a0549208df0e9329c6a3ec322c5b Author: Richard Fuchs Date: Fri Jan 29 10:20:02 2021 -0500 TT#14008 fix AEAD kernel API for < 4.2 Untested whether it actually works closes #1176 Change-Id: If6398632ac62525a673b844cfb4ce842a8aa0346 commit 71a222d7a8e39630621f46fbb7e8c28e40eca4f0 Author: Richard Fuchs Date: Fri Jan 29 13:07:25 2021 -0500 TT#14008 improve log output for stray packets Change-Id: Ic4b03928b279aade761de3ba1646b5c27318e6a3 commit b28ab07532679d4d952f335113b8f40a0d92b233 Merge: 813a4f1c 6e4373af Author: Damir Nedžibović Date: Fri Jan 29 09:21:31 2021 +0100 Merge branch 'control_ng_tcp' of github.com:enreached/rtpengine into control_ng_tcp commit 813a4f1caa3f0859401b2aedcd047283a0f0ca8c Author: Damir Nedžibović Date: Thu Jan 28 18:09:10 2021 +0100 Add missing include. commit 0c87a19c557efc3cc2fce59c5431a2a717d3a505 Author: Damir Nedžibović Date: Thu Jan 28 18:05:44 2021 +0100 Implementation of control-ng via TCP. commit 6e4373affb998f0e583243e649e89096e9b0a242 Merge: fbf74bfe 7799f23a Author: Damir Nedžibović Date: Fri Jan 29 09:19:46 2021 +0100 Merge branch 'control_ng_tcp' of github.com:enreached/rtpengine into control_ng_tcp commit fbf74bfe2d584323ecaf8aadd13b1cfdd8b59e02 Author: Richard Fuchs Date: Wed Jan 27 10:18:14 2021 -0500 TT#14008 fix possible segfault closes #1172 Change-Id: I94bb52c290c2032073e54528283660f03e694033 commit 1a5bcc09052594a36237936df2c20e756f307d0c Author: Richard Fuchs Date: Thu Jan 28 14:49:43 2021 -0500 TT#109251 fix redis restore segfault Change-Id: I501a47b065e7b8ff28a3ac157c0ce567f228557f commit b38b49fd60c7378dde45f853f9f11f89ba7f7d50 Author: Richard Fuchs Date: Thu Jan 28 13:44:44 2021 -0500 TT#102450 added tests for Ia9fa96cf Change-Id: Ic9728e12a012335c30c5640ac0b5c88e39ad24ed commit f33877bfe3135f5dbf4340e07a9dccd7f6ee680d Author: Richard Fuchs Date: Thu Jan 28 13:43:46 2021 -0500 TT#102450 fix some timestamping issues Make sure G.722 rate multiplies is applied in the right places Don't trust encoders to return proper timestamps, but instead track them explicitly based on frame duration Change-Id: Ia9fa96cf662da97159fa170c3a3f37516889e1bd commit 39a25b954dadfedc92dc1a7aa0df223fc261af3b Author: Richard Fuchs Date: Thu Jan 28 14:07:53 2021 -0500 TT#106101 mem leak fix for 554034eb7e Change-Id: I9c410211580d8513a203a29f898970a78175d08b commit 11d11aed907d60e435ca81c7694d0468e45a76e5 Author: Richard Fuchs Date: Thu Jan 28 13:41:28 2021 -0500 TT#14008 clean up some tests Looks like packet order can be an issue in some cases Change-Id: Ib8fb8c553c9d0f2919b24dda1e15e5a23832c619 commit 7799f23aa52a8341991057d0c2d1c59d22c60ae3 Merge: ba7ee9d6 e191e16c Author: Damir Nedžibović Date: Fri Jan 29 09:18:24 2021 +0100 Merge branch 'master' of https://github.com/sipwise/rtpengine into control_ng_tcp commit ba7ee9d6b1e9dc96c7430da9512d7be9ebea32f9 Merge: f805d881 ffe187f1 Author: Damir Nedžibović Date: Thu Jan 28 18:15:58 2021 +0100 Merge branch 'control_ng_tcp' of github.com:enreached/rtpengine into control_ng_tcp commit f805d881fa64cb581f14b11e7096e57b25ce670a Author: Damir Nedžibović Date: Thu Jan 28 18:09:10 2021 +0100 Add missing include. commit c548a3ca4be494532a8c14c55a5dcae69751572a Author: Damir Nedžibović Date: Thu Jan 28 18:05:44 2021 +0100 Implementation of control-ng via TCP. commit ffe187f1166c22f5d204c97534e786fdbfc10052 Author: Damir Nedžibović Date: Thu Jan 28 18:05:44 2021 +0100 Implementation of control-ng via TCP. Change-Id: I2acf208fcff1fa9aa790c31f1097ad9b4b7c3238 --- README.md | 7 +++ daemon/bencode.c | 111 ++++++++++++++++++++++++++++++++- daemon/control_ng.c | 145 +++++++++++++++++++++++++++++++++++++++++-- daemon/main.c | 17 ++++- daemon/rtpengine.pod | 8 ++- include/bencode.h | 4 +- include/control_ng.h | 5 +- include/main.h | 1 + t/Makefile | 4 +- 9 files changed, 287 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 03b50bd20..2dc4ce742 100644 --- a/README.md +++ b/README.md @@ -1961,6 +1961,13 @@ command. Sample return dictionary: "result": "ok" } +The *tcp-ng* Control Protocol +========================= + +*rtpengine* also has support for *ng* control protocol where transport is +TCP (If enabled in the config via the --listen-tcp-ng option). Everything +said for UDP based *ng* protocol counts for TCP variant too. + HTTP/WebSocket support ====================== diff --git a/daemon/bencode.c b/daemon/bencode.c index 4b4eb12e5..e6d1ae2be 100644 --- a/daemon/bencode.c +++ b/daemon/bencode.c @@ -5,7 +5,7 @@ #include #include #include - +#include /* set to 0 for alloc debugging, e.g. through valgrind */ #define BENCODE_MIN_BUFFER_PIECE_LEN 512 @@ -703,3 +703,112 @@ void bencode_buffer_destroy_add(bencode_buffer_t *buf, free_func_t func, void *p li->next = buf->free_list; buf->free_list = li; } + +static int __bencode_string(const char *s, int offset, int len) { + int pos; + unsigned long long sl; + char *end; + + for (pos = offset + 1; s[pos] != ':' && isdigit(s[pos]) && pos < len; ++pos); + if (pos == len) + return -1; + + sl = strtoul(s + offset + 1, &end, 10); + if (s + offset + 1 == end || end != s + pos) + return -2; + + if (pos + sl > len) + return -1; + + return pos + sl + 1; +} + +static int __bencode_integer(const char *s, int offset, int len) { + int pos; + + if (s[offset + 1] == '-') { + if (offset + 3 < len && s[offset + 2] == '0' && s[offset + 3] == 'e') { + return -2; + } + ++offset; + } + + if (s[offset + 1] == 'e') + return -2; + + if (s[offset + 1] == '0') { + if (offset + 2 < len && s[offset + 2] == 'e') + return offset + 3; + return -2; + } + + for (pos = offset + 1; s[pos] != 'e' && pos < len; ++pos) { + if (s[pos] < '0' || s[pos] > '9') + return -2; + } + + if (pos == len) + return -1; + + return pos + 1; +} + +static int __bencode_next(const char *s, int offset, int len); + +static int __bencode_list(const char *s, int offset, int len) { + for (++offset; s[offset] != 'e' && offset < len;) { + offset = __bencode_next(s, offset, len); + if (offset < 0) + return offset; + } + + if (offset == len) + return -1; + + return offset + 1; +} + +static int __bencode_dictionary(const char *s, int offset, int len) { + for (++offset; s[offset] != 'e' && offset < len;) { + offset = __bencode_string(s, offset - 1, len); + if (offset < 0) + return offset; + offset = __bencode_next(s, offset, len); + if (offset < 0) + return offset; + } + + if (offset == len) + return -1; + + return offset + 1; +} + +static int __bencode_next(const char *s, int offset, int len) { + if (offset >= len) + return -1; + switch(s[offset]) { + case 'i': + return __bencode_integer(s, offset, len); + case 'l': + return __bencode_list(s, offset, len); + case 'd': + return __bencode_dictionary(s, offset, len); + case '0': + case '1': + case '2': + case '3': + case '4': + case '5': + case '6': + case '7': + case '8': + case '9': + return __bencode_string(s, offset - 1, len); + } + return -2; +} + +int bencode_valid(const char *s, int len) { + return __bencode_next(s, 0, len); +} diff --git a/daemon/control_ng.c b/daemon/control_ng.c index 91a7cbfa2..b6807bf39 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -1,5 +1,6 @@ #include "control_ng.h" +#include #include #include #include @@ -16,10 +17,14 @@ #include "log_funcs.h" #include "main.h" #include "statistics.h" - +#include "streambuf.h" +#include "str.h" +#include "tcp_listener.h" mutex_t rtpe_cngs_lock; +mutex_t tcp_connections_lock; GHashTable *rtpe_cngs_hash; +GHashTable *tcp_connections_hash; struct control_ng *rtpe_control_ng; static struct cookie_cache ng_cookie_cache; @@ -42,7 +47,6 @@ const char *ng_command_strings_short[NGC_COUNT] = { "PlayDTMF", "Stats", }; - static void timeval_update_request_time(struct request_time *request, const struct timeval *offer_diff) { // lock offers mutex_lock(&request->lock); @@ -390,7 +394,6 @@ out: return funcret; } - static void control_ng_send(str *cookie, str *body, const endpoint_t *sin, void *p1) { socket_t *ul = p1; struct iovec iov[3]; @@ -408,13 +411,88 @@ static void control_ng_send(str *cookie, str *body, const endpoint_t *sin, void socket_sendiov(ul, iov, iovlen, sin); } - static void control_ng_incoming(struct obj *obj, struct udp_buffer *udp_buf) { control_ng_process(&udp_buf->str, &udp_buf->sin, udp_buf->addr, control_ng_send, udp_buf->listener, &udp_buf->obj); } +static void control_incoming(struct streambuf_stream *s) { + ilog(LOG_INFO, "New TCP control ng connection from %s", s->addr); + mutex_lock(&tcp_connections_lock); + g_hash_table_insert(tcp_connections_hash, s->addr, s); + mutex_unlock(&tcp_connections_lock); + ilog(LOG_DEBUG, "TCP connections map size: %d", g_hash_table_size(tcp_connections_hash)); +} + +static void control_closed(struct streambuf_stream *s) { + ilog(LOG_INFO, "TCP control ng connection from %s is closing", s->addr); + mutex_lock(&tcp_connections_lock); + g_hash_table_remove(tcp_connections_hash, s->addr); + mutex_unlock(&tcp_connections_lock); + ilog(LOG_DEBUG, "TCP connections map size: %d", g_hash_table_size(tcp_connections_hash)); +} + +static str *chunk_message(struct streambuf *b) { + char *p = NULL; + int len, to_del, bsize; + str *ret = NULL; + + mutex_lock(&b->lock); + + for (;;) { + if (b->eof) + break; + + p = memchr(b->buf->str, ' ', b->buf->len); + if (!p) + break; + + len = p - b->buf->str; + if (len == b->buf->len) + break; + + ++p; /* bencode dictionary here */ + bsize = bencode_valid(p, b->buf->str + b->buf->len - p); + if (bsize < 0) + break; /* not enough data to parse bencoded dictionary */ + + p += bsize; + len = p - b->buf->str; + to_del = len; + + ret = str_alloc(len); + memcpy(ret->s, b->buf->str, len); + ret->len = len; + g_string_erase(b->buf, 0, to_del); + + break; + } + + mutex_unlock(&b->lock); + return ret; +} + +static void control_stream_readable(struct streambuf_stream *s) { + str *data; + + ilog(LOG_DEBUG, "Got %ld bytes from %s", s->inbuf->buf->len, s->addr); + while ((data = chunk_message(s->inbuf))) { + ilog(LOG_DEBUG, "Got control ng message from %s", s->addr); + control_ng_process(data, &s->sock.remote, s->addr, control_ng_send, &s->sock, s->parent); + free(data); + } + + if (streambuf_bufsize(s->inbuf) > 1024) { + ilog(LOG_WARNING, "Buffer length exceeded in control connection from %s", s->addr); + goto close; + } + + return; + + close: + streambuf_stream_close(s); +} void control_ng_free(void *p) { struct control_ng *c = p; @@ -433,9 +511,12 @@ void control_ng_free(void *p) { poller_del_item(c->poller, c->udp_listeners[1].fd); close_socket(&c->udp_listeners[0]); close_socket(&c->udp_listeners[1]); + streambuf_listener_shutdown(&c->tcp_listeners[0]); + streambuf_listener_shutdown(&c->tcp_listeners[1]); + if (tcp_connections_hash) + g_hash_table_destroy(tcp_connections_hash); } - struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, unsigned char tos) { struct control_ng *c; @@ -463,9 +544,63 @@ struct control_ng *control_ng_new(struct poller *p, endpoint_t *ep, unsigned cha fail2: obj_put(c); return NULL; +} + +struct control_ng *control_ng_tcp_new(struct poller *p, endpoint_t *ep, struct control_ng *ctrl_ng) { + if (!p) + return NULL; + + if (!ctrl_ng) { + ctrl_ng = obj_alloc0("control_ng", sizeof(*ctrl_ng), NULL); + ctrl_ng->udp_listeners[0].fd = -1; + ctrl_ng->udp_listeners[1].fd = -1; + } + + ctrl_ng->poller = p; + + if (streambuf_listener_init(&ctrl_ng->tcp_listeners[0], p, ep, + control_incoming, control_stream_readable, + control_closed, + NULL, + &ctrl_ng->obj)) { + ilog(LOG_ERR, "Failed to open TCP control port: %s", strerror(errno)); + goto fail; + } + if (ipv46_any_convert(ep)) { + if (streambuf_listener_init(&ctrl_ng->tcp_listeners[1], p, ep, + control_incoming, control_stream_readable, + control_closed, + NULL, + &ctrl_ng->obj)) { + ilog(LOG_ERR, "Failed to open TCP control port: %s", strerror(errno)); + goto fail; + } + } + + tcp_connections_hash = g_hash_table_new(g_str_hash, g_str_equal); + mutex_init(&tcp_connections_lock); + return ctrl_ng; + +fail: + obj_put(ctrl_ng); + return NULL; +} + +static void notify_tcp_client(gpointer key, gpointer value, gpointer user_data) { + struct streambuf_stream *s = (struct streambuf_stream *)value; + str *to_send = (str *)user_data; + char cookie_buf[16]; + str cookie = STR_CONST_INIT_LEN(cookie_buf, sizeof(cookie_buf)); + rand_hex_str(cookie_buf, sizeof(cookie_buf) / 2); + control_ng_send(&cookie, to_send, &s->sock.remote, &s->sock); } +void notify_ng_tcp_clients(str *data) { + mutex_lock(&tcp_connections_lock); + g_hash_table_foreach(tcp_connections_hash, notify_tcp_client, data); + mutex_unlock(&tcp_connections_lock); +} void control_ng_init() { mutex_init(&rtpe_cngs_lock); diff --git a/daemon/main.c b/daemon/main.c index db3ba905f..bec3d985f 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -368,6 +368,7 @@ static void options(int *argc, char ***argv) { AUTO_CLEANUP_GBUF(listenps); AUTO_CLEANUP_GBUF(listenudps); AUTO_CLEANUP_GBUF(listenngs); + AUTO_CLEANUP_GBUF(listenngtcps); AUTO_CLEANUP_GBUF(listencli); AUTO_CLEANUP_GBUF(graphitep); AUTO_CLEANUP_GBUF(graphite_prefix_s); @@ -400,6 +401,7 @@ static void options(int *argc, char ***argv) { { "listen-tcp", 'l', 0, G_OPTION_ARG_STRING, &listenps, "TCP port to listen on", "[IP:]PORT" }, { "listen-udp", 'u', 0, G_OPTION_ARG_STRING, &listenudps, "UDP port to listen on", "[IP46|HOSTNAME:]PORT" }, { "listen-ng", 'n', 0, G_OPTION_ARG_STRING, &listenngs, "UDP port to listen on, NG protocol", "[IP46|HOSTNAME:]PORT" }, + { "listen-tcp-ng", 'N', 0, G_OPTION_ARG_STRING, &listenngtcps, "TCP port to listen on, NG protocol", "[IP46|HOSTNAME:]PORT" }, { "listen-cli", 'c', 0, G_OPTION_ARG_STRING, &listencli, "UDP port to listen on, CLI", "[IP46|HOSTNAME:]PORT" }, { "graphite", 'g', 0, G_OPTION_ARG_STRING, &graphitep, "Address of the graphite server", "IP46|HOSTNAME:PORT" }, { "graphite-interval", 'G', 0, G_OPTION_ARG_INT, &rtpe_config.graphite_interval, "Graphite send interval in seconds", "INT" }, @@ -504,8 +506,8 @@ static void options(int *argc, char ***argv) { if (!if_a) die("Missing option --interface"); - if (!listenps && !listenudps && !listenngs) - die("Missing option --listen-tcp, --listen-udp or --listen-ng"); + if (!listenps && !listenudps && !listenngs && !listenngtcps) + die("Missing option --listen-tcp, --listen-udp or --listen-ng or --listen-tcp-ng"); struct ifaddrs *ifas; if (getifaddrs(&ifas)) { @@ -552,6 +554,10 @@ static void options(int *argc, char ***argv) { if (endpoint_parse_any_getaddrinfo(&rtpe_config.ng_listen_ep, listenngs)) die("Invalid IP or port '%s' (--listen-ng)", listenngs); } + if (listenngtcps) { + if (endpoint_parse_any_getaddrinfo(&rtpe_config.ng_tcp_listen_ep, listenngtcps)) + die("Invalid IP or port '%s' (--listen-tcp-ng)", listenngtcps); + } if (listencli) {if (endpoint_parse_any_getaddrinfo(&rtpe_config.cli_listen_ep, listencli)) die("Invalid IP or port '%s' (--listen-cli)", listencli); @@ -787,6 +793,7 @@ void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) { ini_rtpe_cfg->tcp_listen_ep = rtpe_config.tcp_listen_ep; ini_rtpe_cfg->udp_listen_ep = rtpe_config.udp_listen_ep; ini_rtpe_cfg->ng_listen_ep = rtpe_config.ng_listen_ep; + ini_rtpe_cfg->ng_tcp_listen_ep = rtpe_config.ng_tcp_listen_ep; ini_rtpe_cfg->cli_listen_ep = rtpe_config.cli_listen_ep; ini_rtpe_cfg->redis_ep = rtpe_config.redis_ep; ini_rtpe_cfg->redis_write_ep = rtpe_config.redis_write_ep; @@ -957,6 +964,12 @@ no_kernel: die("Failed to open UDP control connection port"); } + if (rtpe_config.ng_tcp_listen_ep.port) { + rtpe_control_ng = control_ng_tcp_new(rtpe_poller, &rtpe_config.ng_tcp_listen_ep, rtpe_control_ng); + if (!rtpe_control_ng) + die("Failed to open TCP control connection port"); + } + rtpe_cli = NULL; if (rtpe_config.cli_listen_ep.port) { interfaces_exclude_port(rtpe_config.cli_listen_ep.port); diff --git a/daemon/rtpengine.pod b/daemon/rtpengine.pod index de0ac489b..8460a3dd0 100644 --- a/daemon/rtpengine.pod +++ b/daemon/rtpengine.pod @@ -4,7 +4,7 @@ rtpengine - NGCP proxy for RTP and other UDP based media traffic =head1 SYNOPSIS -B B<--interface>=I... B<--listen-tcp>|B<--listen-udp>|B<--listen-ng>=I... [I