From cbf6ad7041cd95489f6273fd023e75679b4d52c7 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 19 Aug 2020 10:01:02 -0400 Subject: [PATCH] TT#88951 abstractise CLI handling and writing Change-Id: I004cbc4a8690b11822ab54f34af01557b87c6a02 --- daemon/cli.c | 543 ++++++++++++++++++++++++++------------------------ include/cli.h | 9 + 2 files changed, 288 insertions(+), 264 deletions(-) diff --git a/daemon/cli.c b/daemon/cli.c index 3f1845a96..ddfcff923 100644 --- a/daemon/cli.c +++ b/daemon/cli.c @@ -32,64 +32,64 @@ #include "rtpengine_config.h" -typedef void (*cli_handler_func)(str *, struct streambuf *); +typedef void (*cli_handler_func)(str *, struct cli_writer *); typedef struct { const char *cmd; cli_handler_func handler; } cli_handler_t; -static void cli_incoming_list(str *instr, struct streambuf *replybuffer); -static void cli_incoming_set(str *instr, struct streambuf *replybuffer); -static void cli_incoming_params(str *instr, struct streambuf *replybuffer); -static void cli_incoming_terminate(str *instr, struct streambuf *replybuffer); -static void cli_incoming_ksadd(str *instr, struct streambuf *replybuffer); -static void cli_incoming_ksrm(str *instr, struct streambuf *replybuffer); -static void cli_incoming_kslist(str *instr, struct streambuf *replybuffer); - -static void cli_incoming_set_maxopenfiles(str *instr, struct streambuf *replybuffer); -static void cli_incoming_set_maxsessions(str *instr, struct streambuf *replybuffer); -static void cli_incoming_set_maxcpu(str *instr, struct streambuf *replybuffer); -static void cli_incoming_set_maxload(str *instr, struct streambuf *replybuffer); -static void cli_incoming_set_maxbw(str *instr, struct streambuf *replybuffer); -static void cli_incoming_set_timeout(str *instr, struct streambuf *replybuffer); -static void cli_incoming_set_silenttimeout(str *instr, struct streambuf *replybuffer); -static void cli_incoming_set_offertimeout(str *instr, struct streambuf *replybuffer); -static void cli_incoming_set_finaltimeout(str *instr, struct streambuf *replybuffer); -static void cli_incoming_set_loglevel(str *instr, struct streambuf *replybuffer); -static void cli_incoming_set_redisallowederrors(str *instr, struct streambuf *replybuffer); -static void cli_incoming_set_redisdisabletime(str *instr, struct streambuf *replybuffer); -static void cli_incoming_set_redisconnecttimeout(str *instr, struct streambuf *replybuffer); -static void cli_incoming_set_rediscmdtimeout(str *instr, struct streambuf *replybuffer); -static void cli_incoming_set_controltos(str *instr, struct streambuf *replybuffer); - -static void cli_incoming_params_start(str *instr, struct streambuf *replybuffer); -static void cli_incoming_params_current(str *instr, struct streambuf *replybuffer); -static void cli_incoming_params_diff(str *instr, struct streambuf *replybuffer); -static void cli_incoming_params_revert(str *instr, struct streambuf *replybuffer); - -static void cli_incoming_list_numsessions(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_maxsessions(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_maxcpu(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_maxload(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_maxbw(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_maxopenfiles(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_totals(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_counters(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_sessions(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_timeout(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_silenttimeout(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_offertimeout(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_finaltimeout(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_loglevel(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_loglevel(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_redisallowederrors(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_redisdisabletime(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_redisconnecttimeout(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_rediscmdtimeout(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_controltos(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_interfaces(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_jsonstats(str *instr, struct streambuf *replybuffer); -static void cli_incoming_list_transcoders(str *instr, struct streambuf *replybuffer); +static void cli_incoming_list(str *instr, struct cli_writer *cw); +static void cli_incoming_set(str *instr, struct cli_writer *cw); +static void cli_incoming_params(str *instr, struct cli_writer *cw); +static void cli_incoming_terminate(str *instr, struct cli_writer *cw); +static void cli_incoming_ksadd(str *instr, struct cli_writer *cw); +static void cli_incoming_ksrm(str *instr, struct cli_writer *cw); +static void cli_incoming_kslist(str *instr, struct cli_writer *cw); + +static void cli_incoming_set_maxopenfiles(str *instr, struct cli_writer *cw); +static void cli_incoming_set_maxsessions(str *instr, struct cli_writer *cw); +static void cli_incoming_set_maxcpu(str *instr, struct cli_writer *cw); +static void cli_incoming_set_maxload(str *instr, struct cli_writer *cw); +static void cli_incoming_set_maxbw(str *instr, struct cli_writer *cw); +static void cli_incoming_set_timeout(str *instr, struct cli_writer *cw); +static void cli_incoming_set_silenttimeout(str *instr, struct cli_writer *cw); +static void cli_incoming_set_offertimeout(str *instr, struct cli_writer *cw); +static void cli_incoming_set_finaltimeout(str *instr, struct cli_writer *cw); +static void cli_incoming_set_loglevel(str *instr, struct cli_writer *cw); +static void cli_incoming_set_redisallowederrors(str *instr, struct cli_writer *cw); +static void cli_incoming_set_redisdisabletime(str *instr, struct cli_writer *cw); +static void cli_incoming_set_redisconnecttimeout(str *instr, struct cli_writer *cw); +static void cli_incoming_set_rediscmdtimeout(str *instr, struct cli_writer *cw); +static void cli_incoming_set_controltos(str *instr, struct cli_writer *cw); + +static void cli_incoming_params_start(str *instr, struct cli_writer *cw); +static void cli_incoming_params_current(str *instr, struct cli_writer *cw); +static void cli_incoming_params_diff(str *instr, struct cli_writer *cw); +static void cli_incoming_params_revert(str *instr, struct cli_writer *cw); + +static void cli_incoming_list_numsessions(str *instr, struct cli_writer *cw); +static void cli_incoming_list_maxsessions(str *instr, struct cli_writer *cw); +static void cli_incoming_list_maxcpu(str *instr, struct cli_writer *cw); +static void cli_incoming_list_maxload(str *instr, struct cli_writer *cw); +static void cli_incoming_list_maxbw(str *instr, struct cli_writer *cw); +static void cli_incoming_list_maxopenfiles(str *instr, struct cli_writer *cw); +static void cli_incoming_list_totals(str *instr, struct cli_writer *cw); +static void cli_incoming_list_counters(str *instr, struct cli_writer *cw); +static void cli_incoming_list_sessions(str *instr, struct cli_writer *cw); +static void cli_incoming_list_timeout(str *instr, struct cli_writer *cw); +static void cli_incoming_list_silenttimeout(str *instr, struct cli_writer *cw); +static void cli_incoming_list_offertimeout(str *instr, struct cli_writer *cw); +static void cli_incoming_list_finaltimeout(str *instr, struct cli_writer *cw); +static void cli_incoming_list_loglevel(str *instr, struct cli_writer *cw); +static void cli_incoming_list_loglevel(str *instr, struct cli_writer *cw); +static void cli_incoming_list_redisallowederrors(str *instr, struct cli_writer *cw); +static void cli_incoming_list_redisdisabletime(str *instr, struct cli_writer *cw); +static void cli_incoming_list_redisconnecttimeout(str *instr, struct cli_writer *cw); +static void cli_incoming_list_rediscmdtimeout(str *instr, struct cli_writer *cw); +static void cli_incoming_list_controltos(str *instr, struct cli_writer *cw); +static void cli_incoming_list_interfaces(str *instr, struct cli_writer *cw); +static void cli_incoming_list_jsonstats(str *instr, struct cli_writer *cw); +static void cli_incoming_list_transcoders(str *instr, struct cli_writer *cw); static const cli_handler_t cli_top_handlers[] = { { "list", cli_incoming_list }, @@ -155,18 +155,18 @@ static const cli_handler_t cli_params_handlers[] = { }; static void cli_handler_do(const cli_handler_t *handlers, str *instr, - struct streambuf *replybuffer) + struct cli_writer *cw) { const cli_handler_t *h; for (h = handlers; h->cmd; h++) { if (str_shift_cmp(instr, h->cmd)) continue; - h->handler(instr, replybuffer); + h->handler(instr, cw); return; } - streambuf_printf(replybuffer, "%s:%s\n", "Unknown or incomplete command:", instr->s); + cw->cw_printf(cw, "%s:%s\n", "Unknown or incomplete command:", instr->s); } static void destroy_own_foreign_calls(unsigned int foreign_call, unsigned int uint_keyspace_db) { @@ -235,12 +235,12 @@ static void destroy_keyspace_foreign_calls(unsigned int uint_keyspace_db) { destroy_own_foreign_calls(CT_FOREIGN_CALL, uint_keyspace_db); } -static void cli_incoming_params_start(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_params_start(str *instr, struct cli_writer *cw) { int count = 0; GList *s; struct intf_config *ifa; - streambuf_printf(replybuffer, "log-level = %d\ntable = %d\nmax-sessions = %d\ntimeout = %d\nsilent-timeout = %d\n" + cw->cw_printf(cw, "log-level = %d\ntable = %d\nmax-sessions = %d\ntimeout = %d\nsilent-timeout = %d\n" "final-timeout = %d\noffer-timeout = %d\n" "delete-delay = %d\nredis-expires = %d\ntos = %d\ncontrol-tos = %d\ngraphite-interval = %d\nredis-num-threads = %d\n" "homer-protocol = %d\nhomer-id = %d\nno-fallback = %d\nport-min = %d\nport-max = %d\nredis = %s:%d/%d\n" @@ -266,31 +266,31 @@ static void cli_incoming_params_start(str *instr, struct streambuf *replybuffer) for(s = initial_rtpe_config.interfaces.head; s ; s = s->next) { ifa = s->data; - streambuf_printf(replybuffer,"interface[%d] = %s\\%s \n", count, ifa->name.s, sockaddr_print_buf(&(ifa->local_address.addr))); + cw->cw_printf(cw,"interface[%d] = %s\\%s \n", count, ifa->name.s, sockaddr_print_buf(&(ifa->local_address.addr))); ++count; } count=0; for (s = initial_rtpe_config.redis_subscribed_keyspaces.head; s ; s = s->next) { - streambuf_printf(replybuffer,"keyspace[%d] = %d \n", count, GPOINTER_TO_UINT(s->data)); + cw->cw_printf(cw,"keyspace[%d] = %d \n", count, GPOINTER_TO_UINT(s->data)); ++count; } - streambuf_printf(replybuffer, "b2b_url = %s\nredis-auth = %s\nredis-write-auth = %s\nrecording-dir = %s\nrecording-method = %s\n" + cw->cw_printf(cw, "b2b_url = %s\nredis-auth = %s\nredis-write-auth = %s\nrecording-dir = %s\nrecording-method = %s\n" "recording-format = %s\niptables-chain = %s\n", initial_rtpe_config.b2b_url, initial_rtpe_config.redis_auth, initial_rtpe_config.redis_write_auth, initial_rtpe_config.spooldir, initial_rtpe_config.rec_method, initial_rtpe_config.rec_format, initial_rtpe_config.iptables_chain); - streambuf_printf(replybuffer,"listen-tcp = %s:%d\nlisten-udp = %s:%d\nlisten-ng = %s:%d\nlisten-cli = %s:%d\n", + cw->cw_printf(cw,"listen-tcp = %s:%d\nlisten-udp = %s:%d\nlisten-ng = %s:%d\nlisten-cli = %s:%d\n", sockaddr_print_buf(&initial_rtpe_config.tcp_listen_ep.address), initial_rtpe_config.tcp_listen_ep.port, sockaddr_print_buf(&initial_rtpe_config.udp_listen_ep.address), initial_rtpe_config.udp_listen_ep.port, sockaddr_print_buf(&initial_rtpe_config.ng_listen_ep.address), initial_rtpe_config.ng_listen_ep.port, sockaddr_print_buf(&initial_rtpe_config.cli_listen_ep.address), initial_rtpe_config.cli_listen_ep.port); } -static void cli_incoming_params_current(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_params_current(str *instr, struct cli_writer *cw) { int count = 0; GList *c; struct intf_config *ifa; - streambuf_printf(replybuffer, "log-level = %d\ntable = %d\nmax-sessions = %d\ntimeout = %d\nsilent-timeout = %d\n" + cw->cw_printf(cw, "log-level = %d\ntable = %d\nmax-sessions = %d\ntimeout = %d\nsilent-timeout = %d\n" "final-timeout = %d\noffer-timeout = %d\n" "delete-delay = %d\nredis-expires = %d\ntos = %d\ncontrol-tos = %d\ngraphite-interval = %d\nredis-num-threads = %d\n" "homer-protocol = %d\nhomer-id = %d\nno-fallback = %d\nport-min = %d\nport-max = %d\nredis-db = %d\n" @@ -313,26 +313,26 @@ static void cli_incoming_params_current(str *instr, struct streambuf *replybuffe for(c = rtpe_config.interfaces.head; c ; c = c->next) { ifa = c->data; - streambuf_printf(replybuffer,"interface[%d] = %s\\%s \n", count, ifa->name.s, sockaddr_print_buf(&(ifa->local_address.addr))); + cw->cw_printf(cw,"interface[%d] = %s\\%s \n", count, ifa->name.s, sockaddr_print_buf(&(ifa->local_address.addr))); ++count; } count=0; for (c = rtpe_config.redis_subscribed_keyspaces.head; c ; c = c->next) { - streambuf_printf(replybuffer,"keyspace[%d] = %d \n", count, GPOINTER_TO_UINT(c->data)); + cw->cw_printf(cw,"keyspace[%d] = %d \n", count, GPOINTER_TO_UINT(c->data)); ++count; } - streambuf_printf(replybuffer, "b2b_url = %s\nredis-auth = %s\nredis-write-auth = %s\nrecording-dir = %s\nrecording-method = %s\n" + cw->cw_printf(cw, "b2b_url = %s\nredis-auth = %s\nredis-write-auth = %s\nrecording-dir = %s\nrecording-method = %s\n" "recording-format = %s\niptables-chain = %s\n", rtpe_config.b2b_url, rtpe_config.redis_auth, rtpe_config.redis_write_auth, rtpe_config.spooldir, rtpe_config.rec_method, rtpe_config.rec_format, rtpe_config.iptables_chain); - streambuf_printf(replybuffer,"listen-tcp = %s:%d\nlisten-udp = %s:%d\nlisten-ng = %s:%d\nlisten-cli = %s:%d\n", + cw->cw_printf(cw,"listen-tcp = %s:%d\nlisten-udp = %s:%d\nlisten-ng = %s:%d\nlisten-cli = %s:%d\n", sockaddr_print_buf(&rtpe_config.tcp_listen_ep.address), rtpe_config.tcp_listen_ep.port, sockaddr_print_buf(&rtpe_config.udp_listen_ep.address), rtpe_config.udp_listen_ep.port, sockaddr_print_buf(&rtpe_config.ng_listen_ep.address), rtpe_config.ng_listen_ep.port, sockaddr_print_buf(&rtpe_config.cli_listen_ep.address), rtpe_config.cli_listen_ep.port); } -static void int_diff_print_sz(long long start_param, void* current_param, size_t sz, char* param, struct streambuf *replybuffer, char* option) { +static void int_diff_print_sz(long long start_param, void* current_param, size_t sz, char* param, struct cli_writer *cw, char* option) { long long cur_param; if (sz == sizeof(int)) @@ -346,7 +346,7 @@ static void int_diff_print_sz(long long start_param, void* current_param, size_t if(start_param != cur_param) { if (strcmp(option, "diff") == 0) { - streambuf_printf(replybuffer, "%s: %lld => %lld\n", param, start_param, cur_param); + cw->cw_printf(cw, "%s: %lld => %lld\n", param, start_param, cur_param); } else if(strcmp(option, "revert") == 0) { if (sz == sizeof(int)) *(int *) current_param = start_param; @@ -360,9 +360,9 @@ static void int_diff_print_sz(long long start_param, void* current_param, size_t #define int_diff_print(struct_member, option_string) \ int_diff_print_sz((long long) initial_rtpe_config.struct_member, (void *) &rtpe_config.struct_member, sizeof(rtpe_config.struct_member), \ - option_string, replybuffer, option) + option_string, cw, option) -static void cli_incoming_diff_or_revert(struct streambuf *replybuffer, char* option) { +static void cli_incoming_diff_or_revert(struct cli_writer *cw, char* option) { int_diff_print(common.log_level, "log-level"); int_diff_print(max_sessions, "max-sessions"); int_diff_print(cpu_limit, "max-cpu"); @@ -378,28 +378,28 @@ static void cli_incoming_diff_or_revert(struct streambuf *replybuffer, char* opt int_diff_print(redis_connect_timeout, "redis_connect_timeout-db"); } -static void cli_incoming_params_diff(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_params_diff(str *instr, struct cli_writer *cw) { - cli_incoming_diff_or_revert(replybuffer, "diff"); + cli_incoming_diff_or_revert(cw, "diff"); } -static void cli_incoming_params_revert(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_params_revert(str *instr, struct cli_writer *cw) { - cli_incoming_diff_or_revert(replybuffer, "revert"); + cli_incoming_diff_or_revert(cw, "revert"); } -static void cli_incoming_list_counters(str *instr, struct streambuf *replybuffer) { - streambuf_printf(replybuffer, "\nCurrent per-second counters:\n\n"); - streambuf_printf(replybuffer, " Packets per second :%" PRIu64 "\n", +static void cli_incoming_list_counters(str *instr, struct cli_writer *cw) { + cw->cw_printf(cw, "\nCurrent per-second counters:\n\n"); + cw->cw_printf(cw, " Packets per second :%" PRIu64 "\n", atomic64_get(&rtpe_stats.packets)); - streambuf_printf(replybuffer, " Bytes per second :%" PRIu64 "\n", + cw->cw_printf(cw, " Bytes per second :%" PRIu64 "\n", atomic64_get(&rtpe_stats.bytes)); - streambuf_printf(replybuffer, " Errors per second :%" PRIu64 "\n", + cw->cw_printf(cw, " Errors per second :%" PRIu64 "\n", atomic64_get(&rtpe_stats.errors)); } -static void cli_incoming_list_totals(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_totals(str *instr, struct cli_writer *cw) { AUTO_CLEANUP_INIT(GQueue *metrics, statistics_free_metrics, statistics_gather_metrics()); for (GList *l = metrics->head; l; l = l->next) { @@ -408,111 +408,111 @@ static void cli_incoming_list_totals(str *instr, struct streambuf *replybuffer) continue; if (m->value_long) { if (!strcmp(m->descr, "")) - streambuf_printf(replybuffer, "%s\n", m->value_long); + cw->cw_printf(cw, "%s\n", m->value_long); else - streambuf_printf(replybuffer, " %-48s:%s\n", m->descr, m->value_long); + cw->cw_printf(cw, " %-48s:%s\n", m->descr, m->value_long); } else - streambuf_printf(replybuffer, "%s\n", m->descr); + cw->cw_printf(cw, "%s\n", m->descr); } } -static void cli_incoming_list_numsessions(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_numsessions(str *instr, struct cli_writer *cw) { rwlock_lock_r(&rtpe_callhash_lock); - streambuf_printf(replybuffer, "Current sessions own: "UINT64F"\n", g_hash_table_size(rtpe_callhash) - atomic64_get(&rtpe_stats.foreign_sessions)); - streambuf_printf(replybuffer, "Current sessions foreign: "UINT64F"\n", atomic64_get(&rtpe_stats.foreign_sessions)); - streambuf_printf(replybuffer, "Current sessions total: %i\n", g_hash_table_size(rtpe_callhash)); + cw->cw_printf(cw, "Current sessions own: "UINT64F"\n", g_hash_table_size(rtpe_callhash) - atomic64_get(&rtpe_stats.foreign_sessions)); + cw->cw_printf(cw, "Current sessions foreign: "UINT64F"\n", atomic64_get(&rtpe_stats.foreign_sessions)); + cw->cw_printf(cw, "Current sessions total: %i\n", g_hash_table_size(rtpe_callhash)); rwlock_unlock_r(&rtpe_callhash_lock); - streambuf_printf(replybuffer, "Current transcoded media: "UINT64F"\n", atomic64_get(&rtpe_stats.transcoded_media)); + cw->cw_printf(cw, "Current transcoded media: "UINT64F"\n", atomic64_get(&rtpe_stats.transcoded_media)); } -static void cli_incoming_list_maxsessions(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_maxsessions(str *instr, struct cli_writer *cw) { /* don't lock anything while reading the value */ - streambuf_printf(replybuffer, "Maximum sessions configured on rtpengine: %d\n", rtpe_config.max_sessions); + cw->cw_printf(cw, "Maximum sessions configured on rtpengine: %d\n", rtpe_config.max_sessions); return ; } -static void cli_incoming_list_maxcpu(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_maxcpu(str *instr, struct cli_writer *cw) { /* don't lock anything while reading the value */ - streambuf_printf(replybuffer, "Maximum CPU usage configured on rtpengine: %.1f\n", (double) rtpe_config.cpu_limit / 100.0); + cw->cw_printf(cw, "Maximum CPU usage configured on rtpengine: %.1f\n", (double) rtpe_config.cpu_limit / 100.0); return ; } -static void cli_incoming_list_maxload(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_maxload(str *instr, struct cli_writer *cw) { /* don't lock anything while reading the value */ - streambuf_printf(replybuffer, "Maximum load average configured on rtpengine: %.2f\n", (double) rtpe_config.load_limit / 100.0); + cw->cw_printf(cw, "Maximum load average configured on rtpengine: %.2f\n", (double) rtpe_config.load_limit / 100.0); return ; } -static void cli_incoming_list_maxbw(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_maxbw(str *instr, struct cli_writer *cw) { /* don't lock anything while reading the value */ - streambuf_printf(replybuffer, "Maximum bandwidth configured on rtpengine: %" PRIu64 "\n", + cw->cw_printf(cw, "Maximum bandwidth configured on rtpengine: %" PRIu64 "\n", rtpe_config.bw_limit); return ; } -static void cli_incoming_list_maxopenfiles(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_maxopenfiles(str *instr, struct cli_writer *cw) { struct rlimit rlim; pid_t pid = getpid(); if (getrlimit(RLIMIT_NOFILE, &rlim) == -1) { - streambuf_printf(replybuffer, "Fail getting rtpengine configured limits; cat /proc/%u/limits\n", pid); + cw->cw_printf(cw, "Fail getting rtpengine configured limits; cat /proc/%u/limits\n", pid); return ; } if (rlim.rlim_cur == RLIM_INFINITY) { - streambuf_printf(replybuffer, "Maximum open-files configured on rtpengine: infinite; cat /proc/%u/limits\n", pid); + cw->cw_printf(cw, "Maximum open-files configured on rtpengine: infinite; cat /proc/%u/limits\n", pid); } else { - streambuf_printf(replybuffer, "Maximum open-files configured on rtpengine: %lld; cat /proc/%u/limits\n", (long long) rlim.rlim_cur, pid); + cw->cw_printf(cw, "Maximum open-files configured on rtpengine: %lld; cat /proc/%u/limits\n", (long long) rlim.rlim_cur, pid); } return ; } -static void cli_incoming_list_timeout(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_timeout(str *instr, struct cli_writer *cw) { rwlock_lock_r(&rtpe_config.config_lock); /* don't lock anything while reading the value */ - streambuf_printf(replybuffer, "TIMEOUT=%u\n", rtpe_config.timeout); + cw->cw_printf(cw, "TIMEOUT=%u\n", rtpe_config.timeout); rwlock_unlock_r(&rtpe_config.config_lock); return ; } -static void cli_incoming_list_silenttimeout(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_silenttimeout(str *instr, struct cli_writer *cw) { rwlock_lock_r(&rtpe_config.config_lock); /* don't lock anything while reading the value */ - streambuf_printf(replybuffer, "SILENT_TIMEOUT=%u\n", rtpe_config.silent_timeout); + cw->cw_printf(cw, "SILENT_TIMEOUT=%u\n", rtpe_config.silent_timeout); rwlock_unlock_r(&rtpe_config.config_lock); return ; } -static void cli_incoming_list_finaltimeout(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_finaltimeout(str *instr, struct cli_writer *cw) { rwlock_lock_r(&rtpe_config.config_lock); /* don't lock anything while reading the value */ - streambuf_printf(replybuffer, "FINAL_TIMEOUT=%u\n", rtpe_config.final_timeout); + cw->cw_printf(cw, "FINAL_TIMEOUT=%u\n", rtpe_config.final_timeout); rwlock_unlock_r(&rtpe_config.config_lock); return ; } -static void cli_incoming_list_offertimeout(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_offertimeout(str *instr, struct cli_writer *cw) { rwlock_lock_r(&rtpe_config.config_lock); /* don't lock anything while reading the value */ - streambuf_printf(replybuffer, "OFFER_TIMEOUT=%u\n", rtpe_config.offer_timeout); + cw->cw_printf(cw, "OFFER_TIMEOUT=%u\n", rtpe_config.offer_timeout); rwlock_unlock_r(&rtpe_config.config_lock); return ; } -static void cli_incoming_list_callid(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_callid(str *instr, struct cli_writer *cw) { struct call *c = 0; struct call_monologue *ml; struct call_media *md; @@ -524,18 +524,18 @@ static void cli_incoming_list_callid(str *instr, struct streambuf *replybuffer) char *local_addr; if (instr->len == 0) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return; } c = call_get(instr); if (!c) { - streambuf_printf(replybuffer, "\nCall Id not found (%s).\n\n", instr->s); + cw->cw_printf(cw, "\nCall Id not found (%s).\n\n", instr->s); return; } - streambuf_printf(replybuffer, + cw->cw_printf(cw, "\ncallid: %s\ndeletionmark: %s\ncreated: %i\nproxy: %s\ntos: %u\nlast_signal: %llu\n" "redis_keyspace: %i\nforeign: %s\n\n", c->callid.s, c->ml_deleted ? "yes" : "no", (int) c->created.tv_sec, c->created_from, @@ -551,7 +551,7 @@ static void cli_incoming_list_callid(str *instr, struct streambuf *replybuffer) timeval_subtract(&tim_result_duration, &now, &ml->started); - streambuf_printf(replybuffer, "--- Tag '" STR_FORMAT "', type: %s, label '" STR_FORMAT "', " + cw->cw_printf(cw, "--- Tag '" STR_FORMAT "', type: %s, label '" STR_FORMAT "', " "branch '" STR_FORMAT "', " "callduration " "%ld.%06ld, in dialogue with '" STR_FORMAT "'\n", @@ -568,14 +568,14 @@ static void cli_incoming_list_callid(str *instr, struct streambuf *replybuffer) const struct rtp_payload_type *rtp_pt = __rtp_stats_codec(md); - streambuf_printf(replybuffer, "------ Media #%u (" STR_FORMAT " over %s) using ", + cw->cw_printf(cw, "------ Media #%u (" STR_FORMAT " over %s) using ", md->index, STR_FMT(&md->type), md->protocol ? md->protocol->name : "(unknown)"); if (!rtp_pt) - streambuf_printf(replybuffer, "unknown codec\n"); + cw->cw_printf(cw, "unknown codec\n"); else - streambuf_printf(replybuffer, STR_FORMAT "\n", STR_FMT(&rtp_pt->encoding_with_params)); + cw->cw_printf(cw, STR_FORMAT "\n", STR_FMT(&rtp_pt->encoding_with_params)); for (o = md->streams.head; o; o = o->next) { ps = o->data; @@ -586,7 +586,7 @@ static void cli_incoming_list_callid(str *instr, struct streambuf *replybuffer) local_addr = ps->selected_sfd ? sockaddr_print_buf(&ps->selected_sfd->socket.local.address) : "0.0.0.0"; - streambuf_printf(replybuffer, "-------- Port %15s:%-5u <> %15s:%-5u%s, SSRC %" PRIx32 ", " + cw->cw_printf(cw, "-------- Port %15s:%-5u <> %15s:%-5u%s, SSRC %" PRIx32 ", " "" UINT64F " p, " UINT64F " b, " UINT64F " e, " UINT64F " ts", local_addr, (unsigned int) (ps->selected_sfd ? ps->selected_sfd->socket.local.port : 0), @@ -599,22 +599,22 @@ static void cli_incoming_list_callid(str *instr, struct streambuf *replybuffer) atomic64_get(&ps->last_packet)); #if RE_HAS_MEASUREDELAY if (PS_ISSET(ps, RTP) || !PS_ISSET(ps, RTCP)) - streambuf_printf(replybuffer, ", %.9f delay_min, %.9f delay_avg, %.9f delay_max", + cw->cw_printf(cw, ", %.9f delay_min, %.9f delay_avg, %.9f delay_max", (double) ps->stats.delay_min / 1000000, (double) ps->stats.delay_avg / 1000000, (double) ps->stats.delay_max / 1000000); #endif - streambuf_printf(replybuffer, "\n"); + cw->cw_printf(cw, "\n"); } } } - streambuf_printf(replybuffer, "\n"); + cw->cw_printf(cw, "\n"); rwlock_unlock_w(&c->master_lock); // because of call_get(..) obj_put(c); } -static void cli_incoming_list_sessions(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_sessions(str *instr, struct cli_writer *cw) { GHashTableIter iter; gpointer key, value; str *ptrkey; @@ -626,14 +626,14 @@ static void cli_incoming_list_sessions(str *instr, struct streambuf *replybuffer static const char* LIST_FOREIGN = "foreign"; if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return; } rwlock_lock_r(&rtpe_callhash_lock); if (g_hash_table_size(rtpe_callhash)==0) { - streambuf_printf(replybuffer, "No sessions on this media relay.\n"); + cw->cw_printf(cw, "No sessions on this media relay.\n"); rwlock_unlock_r(&rtpe_callhash_lock); return; } @@ -664,7 +664,7 @@ static void cli_incoming_list_sessions(str *instr, struct streambuf *replybuffer break; } - streambuf_printf(replybuffer, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s | redis_keyspace:%i | foreign:%s\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created.tv_sec, call->created_from, call->redis_hosted_db, IS_FOREIGN_CALL(call)?"yes":"no"); + cw->cw_printf(cw, "callid: %60s | deletionmark:%4s | created:%12i | proxy:%s | redis_keyspace:%i | foreign:%s\n", ptrkey->s, call->ml_deleted?"yes":"no", (int)call->created.tv_sec, call->created_from, call->redis_hosted_db, IS_FOREIGN_CALL(call)?"yes":"no"); } rwlock_unlock_r(&rtpe_callhash_lock); @@ -672,21 +672,21 @@ static void cli_incoming_list_sessions(str *instr, struct streambuf *replybuffer ; } else if (str_cmp(instr, LIST_OWN) == 0) { if (!found_own) { - streambuf_printf(replybuffer, "No own sessions on this media relay.\n"); + cw->cw_printf(cw, "No own sessions on this media relay.\n"); } } else if (str_cmp(instr, LIST_FOREIGN) == 0) { if (!found_foreign) { - streambuf_printf(replybuffer, "No foreign sessions on this media relay.\n"); + cw->cw_printf(cw, "No foreign sessions on this media relay.\n"); } } else { // list session for callid - cli_incoming_list_callid(instr, replybuffer); + cli_incoming_list_callid(instr, cw); } return; } -static void cli_incoming_set_maxopenfiles(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_set_maxopenfiles(str *instr, struct cli_writer *cw) { unsigned long open_files_num; pid_t pid; char *endptr; @@ -695,7 +695,7 @@ static void cli_incoming_set_maxopenfiles(str *instr, struct streambuf *replybuf unsigned long min_open_files_num = (1 << 16); if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return; } @@ -703,30 +703,30 @@ static void cli_incoming_set_maxopenfiles(str *instr, struct streambuf *replybuf open_files_num = strtoul(instr->s, &endptr, 10); if ((errno == ERANGE && (open_files_num == ULONG_MAX)) || (errno != 0 && open_files_num == 0)) { - streambuf_printf(replybuffer, "Fail setting open_files to %s; errno=%d\n", instr->s, errno); + cw->cw_printf(cw, "Fail setting open_files to %s; errno=%d\n", instr->s, errno); return; } else if (endptr == instr->s) { - streambuf_printf(replybuffer, "Fail setting open_files to %s; no digists found\n", instr->s); + cw->cw_printf(cw, "Fail setting open_files to %s; no digists found\n", instr->s); return; } else if (open_files_num < min_open_files_num) { - streambuf_printf(replybuffer, "Fail setting open_files to %lu; can't set it under %lu\n", open_files_num, min_open_files_num); + cw->cw_printf(cw, "Fail setting open_files to %lu; can't set it under %lu\n", open_files_num, min_open_files_num); return; } else if (rlim(RLIMIT_NOFILE, open_files_num) == -1){ - streambuf_printf(replybuffer, "Fail setting open_files to %lu; errno = %d\n", open_files_num, errno); + cw->cw_printf(cw, "Fail setting open_files to %lu; errno = %d\n", open_files_num, errno); return; } else { pid = getpid(); - streambuf_printf(replybuffer, "Success setting open_files to %lu; cat /proc/%u/limits\n", open_files_num, pid); + cw->cw_printf(cw, "Success setting open_files to %lu; cat /proc/%u/limits\n", open_files_num, pid); } } -static void cli_incoming_set_maxsessions(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_set_maxsessions(str *instr, struct cli_writer *cw) { long maxsessions_num; int disabled = -1; char *endptr; if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return; } @@ -734,34 +734,34 @@ static void cli_incoming_set_maxsessions(str *instr, struct streambuf *replybuff maxsessions_num = strtol(instr->s, &endptr, 10); if ((errno == ERANGE && (maxsessions_num == LONG_MAX || maxsessions_num == LONG_MIN)) || (errno != 0 && maxsessions_num == 0)) { - streambuf_printf(replybuffer, "Fail setting maxsessions to %s; errno=%d\n", instr->s, errno); + cw->cw_printf(cw, "Fail setting maxsessions to %s; errno=%d\n", instr->s, errno); return; } else if (endptr == instr->s) { - streambuf_printf(replybuffer, "Fail setting maxsessions to %s; no digists found\n", instr->s); + cw->cw_printf(cw, "Fail setting maxsessions to %s; no digists found\n", instr->s); return; } else if (maxsessions_num < disabled) { - streambuf_printf(replybuffer, "Fail setting maxsessions to %ld; either positive or -1 values allowed\n", maxsessions_num); + cw->cw_printf(cw, "Fail setting maxsessions to %ld; either positive or -1 values allowed\n", maxsessions_num); } else if (maxsessions_num == disabled) { rwlock_lock_w(&rtpe_config.config_lock); rtpe_config.max_sessions = maxsessions_num; rwlock_unlock_w(&rtpe_config.config_lock); - streambuf_printf(replybuffer, "Success setting maxsessions to %ld; disable feature\n", maxsessions_num); + cw->cw_printf(cw, "Success setting maxsessions to %ld; disable feature\n", maxsessions_num); } else { rwlock_lock_w(&rtpe_config.config_lock); rtpe_config.max_sessions = maxsessions_num; rwlock_unlock_w(&rtpe_config.config_lock); - streambuf_printf(replybuffer, "Success setting maxsessions to %ld\n", maxsessions_num); + cw->cw_printf(cw, "Success setting maxsessions to %ld\n", maxsessions_num); } return; } // XXX lots of code duplication, unify those set functions -static void cli_incoming_set_maxcpu(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_set_maxcpu(str *instr, struct cli_writer *cw) { char *endptr; if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return; } @@ -769,26 +769,26 @@ static void cli_incoming_set_maxcpu(str *instr, struct streambuf *replybuffer) { double num = strtod(instr->s, &endptr); if ((errno == ERANGE && (num == HUGE_VAL || num == -HUGE_VAL)) || (errno != 0 && num == 0) || isnan(num) || !isfinite(num)) { - streambuf_printf(replybuffer, "Fail setting maxcpu to %s; errno=%d\n", instr->s, errno); + cw->cw_printf(cw, "Fail setting maxcpu to %s; errno=%d\n", instr->s, errno); return; } else if (endptr == instr->s) { - streambuf_printf(replybuffer, "Fail setting maxcpu to %s; no digists found\n", instr->s); + cw->cw_printf(cw, "Fail setting maxcpu to %s; no digists found\n", instr->s); return; } else { rwlock_lock_w(&rtpe_config.config_lock); rtpe_config.cpu_limit = num * 100; rwlock_unlock_w(&rtpe_config.config_lock); - streambuf_printf(replybuffer, "Success setting maxcpu to %.1f\n", num); + cw->cw_printf(cw, "Success setting maxcpu to %.1f\n", num); } return; } -static void cli_incoming_set_maxload(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_set_maxload(str *instr, struct cli_writer *cw) { char *endptr; if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return; } @@ -796,26 +796,26 @@ static void cli_incoming_set_maxload(str *instr, struct streambuf *replybuffer) double num = strtod(instr->s, &endptr); if ((errno == ERANGE && (num == HUGE_VAL || num == -HUGE_VAL)) || (errno != 0 && num == 0) || isnan(num) || !isfinite(num)) { - streambuf_printf(replybuffer, "Fail setting maxload to %s; errno=%d\n", instr->s, errno); + cw->cw_printf(cw, "Fail setting maxload to %s; errno=%d\n", instr->s, errno); return; } else if (endptr == instr->s) { - streambuf_printf(replybuffer, "Fail setting maxload to %s; no digists found\n", instr->s); + cw->cw_printf(cw, "Fail setting maxload to %s; no digists found\n", instr->s); return; } else { rwlock_lock_w(&rtpe_config.config_lock); rtpe_config.load_limit = num * 100; rwlock_unlock_w(&rtpe_config.config_lock); - streambuf_printf(replybuffer, "Success setting maxload to %.2f\n", num); + cw->cw_printf(cw, "Success setting maxload to %.2f\n", num); } return; } -static void cli_incoming_set_maxbw(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_set_maxbw(str *instr, struct cli_writer *cw) { char *endptr; if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return; } @@ -823,27 +823,27 @@ static void cli_incoming_set_maxbw(str *instr, struct streambuf *replybuffer) { uint64_t num = strtoull(instr->s, &endptr, 10); if ((errno == ERANGE && (num == ULLONG_MAX)) || (errno != 0 && num == 0) ) { - streambuf_printf(replybuffer, "Fail setting maxbw to %s; errno=%d\n", instr->s, errno); + cw->cw_printf(cw, "Fail setting maxbw to %s; errno=%d\n", instr->s, errno); return; } else if (endptr == instr->s) { - streambuf_printf(replybuffer, "Fail setting maxbw to %s; no digists found\n", instr->s); + cw->cw_printf(cw, "Fail setting maxbw to %s; no digists found\n", instr->s); return; } else { rwlock_lock_w(&rtpe_config.config_lock); rtpe_config.bw_limit = num * 100; rwlock_unlock_w(&rtpe_config.config_lock); - streambuf_printf(replybuffer, "Success setting maxbw to %" PRIu64 "\n", num); + cw->cw_printf(cw, "Success setting maxbw to %" PRIu64 "\n", num); } return; } -static void cli_incoming_set_gentimeout(str *instr, struct streambuf *replybuffer, int *conf_timeout) { +static void cli_incoming_set_gentimeout(str *instr, struct cli_writer *cw, int *conf_timeout) { long timeout_num; char *endptr; if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return; } @@ -851,65 +851,65 @@ static void cli_incoming_set_gentimeout(str *instr, struct streambuf *replybuffe timeout_num = strtol(instr->s, &endptr, 10); if ((errno == ERANGE && (timeout_num == ULONG_MAX)) || (errno != 0 && timeout_num == 0) || timeout_num < 0 || timeout_num >= INT_MAX) { - streambuf_printf(replybuffer, "Fail setting timeout to %s; errno=%d\n", instr->s, errno); + cw->cw_printf(cw, "Fail setting timeout to %s; errno=%d\n", instr->s, errno); return; } else if (endptr == instr->s) { - streambuf_printf(replybuffer, "Fail setting timeout to %s; no digists found\n", instr->s); + cw->cw_printf(cw, "Fail setting timeout to %s; no digists found\n", instr->s); return; } else { rwlock_lock_w(&rtpe_config.config_lock); *conf_timeout = (int) timeout_num; rwlock_unlock_w(&rtpe_config.config_lock); - streambuf_printf(replybuffer, "Success setting timeout to %lu\n", timeout_num); + cw->cw_printf(cw, "Success setting timeout to %lu\n", timeout_num); } } -static void cli_incoming_set_timeout(str *instr, struct streambuf *replybuffer) { - cli_incoming_set_gentimeout(instr, replybuffer, &rtpe_config.timeout); +static void cli_incoming_set_timeout(str *instr, struct cli_writer *cw) { + cli_incoming_set_gentimeout(instr, cw, &rtpe_config.timeout); } -static void cli_incoming_set_silenttimeout(str *instr, struct streambuf *replybuffer) { - cli_incoming_set_gentimeout(instr, replybuffer, &rtpe_config.silent_timeout); +static void cli_incoming_set_silenttimeout(str *instr, struct cli_writer *cw) { + cli_incoming_set_gentimeout(instr, cw, &rtpe_config.silent_timeout); } -static void cli_incoming_set_finaltimeout(str *instr, struct streambuf *replybuffer) { - cli_incoming_set_gentimeout(instr, replybuffer, &rtpe_config.final_timeout); +static void cli_incoming_set_finaltimeout(str *instr, struct cli_writer *cw) { + cli_incoming_set_gentimeout(instr, cw, &rtpe_config.final_timeout); } -static void cli_incoming_set_offertimeout(str *instr, struct streambuf *replybuffer) { - cli_incoming_set_gentimeout(instr, replybuffer, &rtpe_config.offer_timeout); +static void cli_incoming_set_offertimeout(str *instr, struct cli_writer *cw) { + cli_incoming_set_gentimeout(instr, cw, &rtpe_config.offer_timeout); } -static void cli_incoming_list(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list(str *instr, struct cli_writer *cw) { if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return; } - cli_handler_do(cli_list_handlers, instr, replybuffer); + cli_handler_do(cli_list_handlers, instr, cw); } -static void cli_incoming_set(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_set(str *instr, struct cli_writer *cw) { if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return; } - cli_handler_do(cli_set_handlers, instr, replybuffer); + cli_handler_do(cli_set_handlers, instr, cw); } -static void cli_incoming_params(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_params(str *instr, struct cli_writer *cw) { if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return; } - cli_handler_do(cli_params_handlers, instr, replybuffer); + cli_handler_do(cli_params_handlers, instr, cw); } -static void cli_incoming_terminate(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_terminate(str *instr, struct cli_writer *cw) { struct call* c=0; struct call_monologue *ml; GList *i; if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return; } @@ -923,7 +923,7 @@ static void cli_incoming_terminate(str *instr, struct streambuf *replybuffer) { // update cli ilog(LOG_INFO,"All calls terminated by operator."); - streambuf_printf(replybuffer, "%s\n", "All calls terminated by operator."); + cw->cw_printf(cw, "%s\n", "All calls terminated by operator."); return; @@ -934,7 +934,7 @@ static void cli_incoming_terminate(str *instr, struct streambuf *replybuffer) { // update cli ilog(LOG_INFO,"All own calls terminated by operator."); - streambuf_printf(replybuffer, "%s\n", "All own calls terminated by operator."); + cw->cw_printf(cw, "%s\n", "All own calls terminated by operator."); return; @@ -945,7 +945,7 @@ static void cli_incoming_terminate(str *instr, struct streambuf *replybuffer) { // update cli ilog(LOG_INFO,"All foreign calls terminated by operator."); - streambuf_printf(replybuffer, "%s\n", "All foreign calls terminated by operator."); + cw->cw_printf(cw, "%s\n", "All foreign calls terminated by operator."); return; } @@ -954,7 +954,7 @@ static void cli_incoming_terminate(str *instr, struct streambuf *replybuffer) { c = call_get(instr); if (!c) { - streambuf_printf(replybuffer, "\nCall Id not found (%s).\n\n",instr->s); + cw->cw_printf(cw, "\nCall Id not found (%s).\n\n",instr->s); return; } @@ -966,7 +966,7 @@ static void cli_incoming_terminate(str *instr, struct streambuf *replybuffer) { } } - streambuf_printf(replybuffer, "\nCall Id (%s) successfully terminated by operator.\n\n",instr->s); + cw->cw_printf(cw, "\nCall Id (%s) successfully terminated by operator.\n\n",instr->s); ilog(LOG_WARN, "Call Id (%s) successfully terminated by operator.",instr->s); rwlock_unlock_w(&c->master_lock); @@ -975,12 +975,12 @@ static void cli_incoming_terminate(str *instr, struct streambuf *replybuffer) { obj_put(c); } -static void cli_incoming_ksadd(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_ksadd(str *instr, struct cli_writer *cw) { unsigned long uint_keyspace_db; char *endptr; if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return; } @@ -988,29 +988,29 @@ static void cli_incoming_ksadd(str *instr, struct streambuf *replybuffer) { uint_keyspace_db = strtoul(instr->s, &endptr, 10); if ((errno == ERANGE && (uint_keyspace_db == ULONG_MAX)) || (errno != 0 && uint_keyspace_db == 0)) { - streambuf_printf(replybuffer, "Fail adding keyspace %s to redis notifications; errono=%d\n", instr->s, errno); + cw->cw_printf(cw, "Fail adding keyspace %s to redis notifications; errono=%d\n", instr->s, errno); } else if (endptr == instr->s) { - streambuf_printf(replybuffer, "Fail adding keyspace %s to redis notifications; no digists found\n", instr->s); + cw->cw_printf(cw, "Fail adding keyspace %s to redis notifications; no digists found\n", instr->s); } else { rwlock_lock_w(&rtpe_config.config_lock); if (!g_queue_find(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db))) { g_queue_push_tail(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)); redis_notify_subscribe_action(SUBSCRIBE_KEYSPACE, uint_keyspace_db); - streambuf_printf(replybuffer, "Success adding keyspace %lu to redis notifications.\n", uint_keyspace_db); + cw->cw_printf(cw, "Success adding keyspace %lu to redis notifications.\n", uint_keyspace_db); } else { - streambuf_printf(replybuffer, "Keyspace %lu is already among redis notifications.\n", uint_keyspace_db); + cw->cw_printf(cw, "Keyspace %lu is already among redis notifications.\n", uint_keyspace_db); } rwlock_unlock_w(&rtpe_config.config_lock); } } -static void cli_incoming_ksrm(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_ksrm(str *instr, struct cli_writer *cw) { GList *l; unsigned long uint_keyspace_db; char *endptr; if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return; } @@ -1019,45 +1019,52 @@ static void cli_incoming_ksrm(str *instr, struct streambuf *replybuffer) { rwlock_lock_w(&rtpe_config.config_lock); if ((errno == ERANGE && (uint_keyspace_db == ULONG_MAX)) || (errno != 0 && uint_keyspace_db == 0)) { - streambuf_printf(replybuffer, "Fail removing keyspace %s to redis notifications; errono=%d\n", instr->s, errno); + cw->cw_printf(cw, "Fail removing keyspace %s to redis notifications; errono=%d\n", instr->s, errno); } else if (endptr == instr->s) { - streambuf_printf(replybuffer, "Fail removing keyspace %s to redis notifications; no digists found\n", instr->s); + cw->cw_printf(cw, "Fail removing keyspace %s to redis notifications; no digists found\n", instr->s); } else if ((l = g_queue_find(&rtpe_config.redis_subscribed_keyspaces, GUINT_TO_POINTER(uint_keyspace_db)))) { // remove this keyspace redis_notify_subscribe_action(UNSUBSCRIBE_KEYSPACE, uint_keyspace_db); g_queue_remove(&rtpe_config.redis_subscribed_keyspaces, l->data); - streambuf_printf(replybuffer, "Successfully unsubscribed from keyspace %lu.\n", uint_keyspace_db); + cw->cw_printf(cw, "Successfully unsubscribed from keyspace %lu.\n", uint_keyspace_db); // destroy foreign calls for this keyspace destroy_keyspace_foreign_calls(uint_keyspace_db); // update cli - streambuf_printf(replybuffer, "Successfully removed all foreign calls for keyspace %lu.\n", uint_keyspace_db); + cw->cw_printf(cw, "Successfully removed all foreign calls for keyspace %lu.\n", uint_keyspace_db); } else { - streambuf_printf(replybuffer, "Keyspace %lu is not among redis notifications.\n", uint_keyspace_db); + cw->cw_printf(cw, "Keyspace %lu is not among redis notifications.\n", uint_keyspace_db); } rwlock_unlock_w(&rtpe_config.config_lock); } -static void cli_incoming_kslist(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_kslist(str *instr, struct cli_writer *cw) { GList *l; - streambuf_printf(replybuffer, "\nSubscribed-on keyspaces:\n"); + cw->cw_printf(cw, "\nSubscribed-on keyspaces:\n"); rwlock_lock_r(&rtpe_config.config_lock); for (l = rtpe_config.redis_subscribed_keyspaces.head; l; l = l->next) { - streambuf_printf(replybuffer, "%u ", GPOINTER_TO_UINT(l->data)); + cw->cw_printf(cw, "%u ", GPOINTER_TO_UINT(l->data)); } rwlock_unlock_r(&rtpe_config.config_lock); - streambuf_printf(replybuffer, "\n"); + cw->cw_printf(cw, "\n"); } static void cli_incoming(struct streambuf_stream *s) { ilog(LOG_INFO, "New cli connection from %s", s->addr); } +static void cli_streambuf_printf(struct cli_writer *cw, const char *fmt, ...) { + va_list va; + va_start(va, fmt); + streambuf_vprintf(cw->ptr, fmt, va); + va_end(va); +} + static void cli_stream_readable(struct streambuf_stream *s) { static const int MAXINPUT = 1024; char *inbuf; @@ -1075,13 +1082,21 @@ static void cli_stream_readable(struct streambuf_stream *s) { ilog(LOG_INFO, "Got CLI command: %s%s%s", FMT_M(inbuf)); str_init(&instr, inbuf); - cli_handler_do(cli_top_handlers, &instr, s->outbuf); + struct cli_writer cw = { + .cw_printf = cli_streambuf_printf, + .ptr = s->outbuf, + }; + cli_handle(&instr, &cw); free(inbuf); streambuf_stream_shutdown(s); log_info_clear(); } +void cli_handle(str *instr, struct cli_writer *cw) { + cli_handler_do(cli_top_handlers, instr, cw); +} + static void cli_free(void *p) { struct cli *c = p; streambuf_listener_shutdown(&c->listeners[0]); @@ -1128,40 +1143,40 @@ fail: return NULL; } -static void cli_incoming_list_loglevel(str *instr, struct streambuf *replybuffer) { - streambuf_printf(replybuffer, "%i\n", get_log_level()); +static void cli_incoming_list_loglevel(str *instr, struct cli_writer *cw) { + cw->cw_printf(cw, "%i\n", get_log_level()); } -static void cli_incoming_set_loglevel(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_set_loglevel(str *instr, struct cli_writer *cw) { int nl; if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return; } nl = atoi(instr->s); if (nl < 1 || nl > 7) { - streambuf_printf(replybuffer, "Invalid log level '%s', must be number between 1 and 7\n", + cw->cw_printf(cw, "Invalid log level '%s', must be number between 1 and 7\n", instr->s); return; } g_atomic_int_set(&rtpe_config.common.log_level, nl); - streambuf_printf(replybuffer, "Success setting loglevel to %i\n", nl); + cw->cw_printf(cw, "Success setting loglevel to %i\n", nl); } -static void cli_incoming_list_redisallowederrors(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_redisallowederrors(str *instr, struct cli_writer *cw) { rwlock_lock_r(&rtpe_config.config_lock); - streambuf_printf(replybuffer, "%d\n", rtpe_config.redis_allowed_errors); + cw->cw_printf(cw, "%d\n", rtpe_config.redis_allowed_errors); rwlock_unlock_r(&rtpe_config.config_lock); } -static void cli_incoming_set_redisallowederrors(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_set_redisallowederrors(str *instr, struct cli_writer *cw) { long allowed_errors; char *endptr; if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return; } @@ -1172,28 +1187,28 @@ static void cli_incoming_set_redisallowederrors(str *instr, struct streambuf *re rtpe_config.redis_allowed_errors = allowed_errors; rwlock_unlock_w(&rtpe_config.config_lock); - streambuf_printf(replybuffer, "Success setting redis-allowed-errors to %ld\n", allowed_errors); + cw->cw_printf(cw, "Success setting redis-allowed-errors to %ld\n", allowed_errors); } -static void cli_incoming_list_redisdisabletime(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_redisdisabletime(str *instr, struct cli_writer *cw) { rwlock_lock_r(&rtpe_config.config_lock); - streambuf_printf(replybuffer, "%d\n", rtpe_config.redis_disable_time); + cw->cw_printf(cw, "%d\n", rtpe_config.redis_disable_time); rwlock_unlock_r(&rtpe_config.config_lock); } -static void cli_incoming_set_redisdisabletime(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_set_redisdisabletime(str *instr, struct cli_writer *cw) { long seconds; char *endptr; if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return; } errno = 0; seconds = strtol(instr->s, &endptr, 10); if (seconds < 0) { - streambuf_printf(replybuffer, "Invalid redis-disable-time value %ld, must be >= 0\n", seconds); + cw->cw_printf(cw, "Invalid redis-disable-time value %ld, must be >= 0\n", seconds); return; } @@ -1201,64 +1216,64 @@ static void cli_incoming_set_redisdisabletime(str *instr, struct streambuf *repl rtpe_config.redis_disable_time = seconds; rwlock_unlock_w(&rtpe_config.config_lock); - streambuf_printf(replybuffer, "Success setting redis-disable-time to %ld\n", seconds); + cw->cw_printf(cw, "Success setting redis-disable-time to %ld\n", seconds); } -static void cli_incoming_list_redisconnecttimeout(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_redisconnecttimeout(str *instr, struct cli_writer *cw) { rwlock_lock_r(&rtpe_config.config_lock); - streambuf_printf(replybuffer, "%d\n", rtpe_config.redis_connect_timeout); + cw->cw_printf(cw, "%d\n", rtpe_config.redis_connect_timeout); rwlock_unlock_r(&rtpe_config.config_lock); } -static void cli_incoming_set_redisconnecttimeout(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_set_redisconnecttimeout(str *instr, struct cli_writer *cw) { long timeout; char *endptr; if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return ; } errno = 0; timeout = strtol(instr->s, &endptr, 10); if (timeout <= 0) { - streambuf_printf(replybuffer, "Invalid redis-connect-timeout value %ld, must be > 0\n", timeout); + cw->cw_printf(cw, "Invalid redis-connect-timeout value %ld, must be > 0\n", timeout); return; } rwlock_lock_w(&rtpe_config.config_lock); rtpe_config.redis_connect_timeout = timeout; rwlock_unlock_w(&rtpe_config.config_lock); - streambuf_printf(replybuffer, "Success setting redis-connect-timeout to %ld\n", timeout); + cw->cw_printf(cw, "Success setting redis-connect-timeout to %ld\n", timeout); } -static void cli_incoming_list_rediscmdtimeout(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_rediscmdtimeout(str *instr, struct cli_writer *cw) { rwlock_lock_r(&rtpe_config.config_lock); - streambuf_printf(replybuffer, "%d\n", rtpe_config.redis_cmd_timeout); + cw->cw_printf(cw, "%d\n", rtpe_config.redis_cmd_timeout); rwlock_unlock_r(&rtpe_config.config_lock); } -static void cli_incoming_set_rediscmdtimeout(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_set_rediscmdtimeout(str *instr, struct cli_writer *cw) { long timeout; char *endptr; int fail = 0; if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return; } errno = 0; timeout = strtol(instr->s, &endptr, 10); if (timeout < 0) { - streambuf_printf(replybuffer, "Invalid redis-cmd-timeout value %ld, must be >= 0\n", timeout); + cw->cw_printf(cw, "Invalid redis-cmd-timeout value %ld, must be >= 0\n", timeout); return; } rwlock_lock_w(&rtpe_config.config_lock); if (rtpe_config.redis_cmd_timeout == timeout) { rwlock_unlock_w(&rtpe_config.config_lock); - streambuf_printf(replybuffer, "Success setting redis-cmd-timeout to %ld\n", timeout); + cw->cw_printf(cw, "Success setting redis-cmd-timeout to %ld\n", timeout); return; } rtpe_config.redis_cmd_timeout = timeout; @@ -1266,61 +1281,61 @@ static void cli_incoming_set_rediscmdtimeout(str *instr, struct streambuf *reply if (timeout == 0) { - streambuf_printf(replybuffer, "Warning: Setting redis-cmd-timeout to 0 (no timeout) will require a redis reconnect\n"); + cw->cw_printf(cw, "Warning: Setting redis-cmd-timeout to 0 (no timeout) will require a redis reconnect\n"); if (rtpe_redis && redis_reconnect(rtpe_redis)) { - streambuf_printf(replybuffer, "Failed reconnecting to redis\n"); + cw->cw_printf(cw, "Failed reconnecting to redis\n"); fail = 1; } if (rtpe_redis && redis_reconnect(rtpe_redis_write)) { - streambuf_printf(replybuffer, "Failed reconnecting to redis-write\n"); + cw->cw_printf(cw, "Failed reconnecting to redis-write\n"); fail = 1; } if (rtpe_redis && redis_reconnect(rtpe_redis_notify)) { - streambuf_printf(replybuffer, "Failed reconnecting to redis-notify\n"); + cw->cw_printf(cw, "Failed reconnecting to redis-notify\n"); fail = 1; } } else { if (rtpe_redis && redis_set_timeout(rtpe_redis, timeout)) { - streambuf_printf(replybuffer, "Failed setting redis-cmd-timeout for redis %ld\n", timeout); + cw->cw_printf(cw, "Failed setting redis-cmd-timeout for redis %ld\n", timeout); fail = 1; } if (rtpe_redis_write && redis_set_timeout(rtpe_redis_write, timeout)) { - streambuf_printf(replybuffer, "Failed setting redis-cmd-timeout for redis-write %ld\n", timeout); + cw->cw_printf(cw, "Failed setting redis-cmd-timeout for redis-write %ld\n", timeout); fail = 1; } if (rtpe_redis_notify && redis_set_timeout(rtpe_redis_notify, timeout)) { - streambuf_printf(replybuffer, "Failed setting redis-cmd-timeout for redis-notify %ld\n", timeout); + cw->cw_printf(cw, "Failed setting redis-cmd-timeout for redis-notify %ld\n", timeout); fail = 1; } } if (!fail) - streambuf_printf(replybuffer, "Success setting redis-cmd-timeout to %ld\n", timeout); + cw->cw_printf(cw, "Success setting redis-cmd-timeout to %ld\n", timeout); } -static void cli_incoming_list_interfaces(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_interfaces(str *instr, struct cli_writer *cw) { for (GList *l = all_local_interfaces.head; l; l = l->next) { struct local_intf *lif = l->data; // only show first-order interface entries: socket families must match if (lif->logical->preferred_family != lif->spec->local_address.addr.family) continue; - streambuf_printf(replybuffer, "Interface '%s' address '%s' (%s)\n", lif->logical->name.s, + cw->cw_printf(cw, "Interface '%s' address '%s' (%s)\n", lif->logical->name.s, sockaddr_print_buf(&lif->spec->local_address.addr), lif->spec->local_address.addr.family->name); - streambuf_printf(replybuffer, " Port range: %5u - %5u\n", + cw->cw_printf(cw, " Port range: %5u - %5u\n", lif->spec->port_pool.min, lif->spec->port_pool.max); unsigned int f = g_atomic_int_get(&lif->spec->port_pool.free_ports); unsigned int l = g_atomic_int_get(&lif->spec->port_pool.last_used); unsigned int r = lif->spec->port_pool.max - lif->spec->port_pool.min + 1; - streambuf_printf(replybuffer, " Ports used: %5u / %5u (%5.1f%%)\n", + cw->cw_printf(cw, " Ports used: %5u / %5u (%5.1f%%)\n", r - f, r, (double) (r - f) * 100.0 / r); - streambuf_printf(replybuffer, " Last port used: %5u\n", + cw->cw_printf(cw, " Last port used: %5u\n", l); } } -static void cli_incoming_list_jsonstats(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_jsonstats(str *instr, struct cli_writer *cw) { AUTO_CLEANUP_INIT(GQueue *metrics, statistics_free_metrics, statistics_gather_metrics()); for (GList *l = metrics->head; l; l = l->next) { @@ -1329,35 +1344,35 @@ static void cli_incoming_list_jsonstats(str *instr, struct streambuf *replybuffe continue; if (m->is_follow_up) - streambuf_printf(replybuffer, ","); + cw->cw_printf(cw, ","); if (m->value_short) - streambuf_printf(replybuffer, "\"%s\":%s", m->label, m->value_short); + cw->cw_printf(cw, "\"%s\":%s", m->label, m->value_short); else if (m->is_bracket) - streambuf_printf(replybuffer, "%s", m->label); + cw->cw_printf(cw, "%s", m->label); else - streambuf_printf(replybuffer, "\"%s\":", m->label); + cw->cw_printf(cw, "\"%s\":", m->label); } } -static void cli_incoming_list_transcoders(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_transcoders(str *instr, struct cli_writer *cw) { mutex_lock(&rtpe_codec_stats_lock); GList *chains = g_hash_table_get_keys(rtpe_codec_stats); if (!chains) - streambuf_printf(replybuffer, "No stats entries\n"); + cw->cw_printf(cw, "No stats entries\n"); else { int last_tv_sec = rtpe_now.tv_sec - 1; unsigned int idx = last_tv_sec & 1; for (GList *l = chains; l; l = l->next) { char *chain = l->data; struct codec_stats *stats_entry = g_hash_table_lookup(rtpe_codec_stats, chain); - streambuf_printf(replybuffer, "%s: %i transcoders\n", chain, g_atomic_int_get(&stats_entry->num_transcoders)); + cw->cw_printf(cw, "%s: %i transcoders\n", chain, g_atomic_int_get(&stats_entry->num_transcoders)); if (g_atomic_int_get(&stats_entry->last_tv_sec[idx]) != last_tv_sec) continue; - streambuf_printf(replybuffer, " " UINT64F " packets/s\n", atomic64_get(&stats_entry->packets_input[idx])); - streambuf_printf(replybuffer, " " UINT64F " bytes/s\n", atomic64_get(&stats_entry->bytes_input[idx])); - streambuf_printf(replybuffer, " " UINT64F " samples/s\n", atomic64_get(&stats_entry->pcm_samples[idx])); + cw->cw_printf(cw, " " UINT64F " packets/s\n", atomic64_get(&stats_entry->packets_input[idx])); + cw->cw_printf(cw, " " UINT64F " bytes/s\n", atomic64_get(&stats_entry->bytes_input[idx])); + cw->cw_printf(cw, " " UINT64F " samples/s\n", atomic64_get(&stats_entry->pcm_samples[idx])); } } @@ -1366,26 +1381,26 @@ static void cli_incoming_list_transcoders(str *instr, struct streambuf *replybuf g_list_free(chains); } -static void cli_incoming_list_controltos(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_list_controltos(str *instr, struct cli_writer *cw) { rwlock_lock_r(&rtpe_config.config_lock); - streambuf_printf(replybuffer, "%d\n", rtpe_config.control_tos); + cw->cw_printf(cw, "%d\n", rtpe_config.control_tos); rwlock_unlock_r(&rtpe_config.config_lock); } -static void cli_incoming_set_controltos(str *instr, struct streambuf *replybuffer) { +static void cli_incoming_set_controltos(str *instr, struct cli_writer *cw) { long tos; char *endptr; int i; if (str_shift(instr, 1)) { - streambuf_printf(replybuffer, "%s\n", "More parameters required."); + cw->cw_printf(cw, "%s\n", "More parameters required."); return ; } errno = 0; tos = strtol(instr->s, &endptr, 10); if (tos < 0 || tos > 255) { - streambuf_printf(replybuffer, "Invalid control-tos value %ld, must be between 0 and 255\n", tos); + cw->cw_printf(cw, "Invalid control-tos value %ld, must be between 0 and 255\n", tos); return; } @@ -1399,5 +1414,5 @@ static void cli_incoming_set_controltos(str *instr, struct streambuf *replybuffe } } - streambuf_printf(replybuffer, "Success setting redis-connect-timeout to %ld\n", tos); + cw->cw_printf(cw, "Success setting redis-connect-timeout to %ld\n", tos); } diff --git a/include/cli.h b/include/cli.h index 7f6264d55..58ee45a3b 100644 --- a/include/cli.h +++ b/include/cli.h @@ -13,6 +13,15 @@ struct cli { struct streambuf_listener listeners[2]; }; +struct cli_writer; + +struct cli_writer { + void (*cw_printf)(struct cli_writer *, const char *, ...) __attribute__ ((format (printf, 2, 3))); + void *ptr; +}; + struct cli *cli_new(struct poller *p, endpoint_t *); +void cli_handle(str *instr, struct cli_writer *); + #endif /* CLI_UDP_H_ */