From 83965bdb1f84890794fed39284d4f0849011265f Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 27 Feb 2019 16:55:48 -0500 Subject: [PATCH] TT#50652 infrastructure to support reading/playing of media files Change-Id: I8212f5c74bcb9557d41f80ef3a1595f53bafde8a --- daemon/Makefile | 2 +- daemon/call.c | 6 ++ daemon/call_interfaces.c | 44 ++++++++++++ daemon/control_ng.c | 4 ++ daemon/main.c | 16 ++++- daemon/media_player.c | 141 ++++++++++++++++++++++++++++++++++++++ daemon/timerthread.c | 3 + include/call.h | 2 + include/call_interfaces.h | 1 + include/control_ng.h | 1 + include/main.h | 1 + include/media_player.h | 47 +++++++++++++ include/timerthread.h | 9 +++ t/.gitignore | 1 + t/Makefile | 6 +- utils/rtpengine-ng-client | 3 +- 16 files changed, 280 insertions(+), 7 deletions(-) create mode 100644 daemon/media_player.c create mode 100644 include/media_player.h diff --git a/daemon/Makefile b/daemon/Makefile index ddbfb4d07..ba51810ac 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -123,7 +123,7 @@ SRCS= main.c kernel.c poller.c aux.c control_tcp.c call.c control_udp.c redis.c bencode.c cookie_cache.c udp_listener.c control_ng.strhash.c sdp.strhash.c stun.c rtcp.c \ crypto.c rtp.c call_interfaces.strhash.c dtls.c log.c cli.c graphite.c ice.c \ media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c \ - codec.c load.c dtmf.c timerthread.c + codec.c load.c dtmf.c timerthread.c media_player.c LIBSRCS= loglib.c auxlib.c rtplib.c str.c socket.c streambuf.c ssllib.c ifeq ($(with_transcoding),yes) LIBSRCS+= codeclib.c resample.c diff --git a/daemon/call.c b/daemon/call.c index 6ce545685..723810320 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -44,6 +44,7 @@ #include "main.h" #include "graphite.h" #include "codec.h" +#include "media_player.h" /* also serves as array index for callstream->peers[] */ @@ -2224,6 +2225,11 @@ void call_destroy(struct call *c) { ice_shutdown(&md->ice_agent); } + + if (ml->player) { + media_player_stop(ml->player); + media_player_put(&ml->player); + } } k = g_hash_table_get_values(c->ssrc_hash->ht); diff --git a/daemon/call_interfaces.c b/daemon/call_interfaces.c index aca279061..e25edebbb 100644 --- a/daemon/call_interfaces.c +++ b/daemon/call_interfaces.c @@ -27,6 +27,7 @@ #include "streambuf.h" #include "main.h" #include "load.h" +#include "media_player.h" static pcre *info_re; @@ -1731,6 +1732,49 @@ out: } +const char *call_play_media_ng(bencode_item_t *input, bencode_item_t *output) { + str callid, fromtag, file; + struct call *call; + struct call_monologue *monologue; + const char *err = NULL; + + if (!bencode_dictionary_get_str(input, "call-id", &callid)) + return "No call-id in message"; + call = call_get_opmode(&callid, OP_OTHER); + if (!call) + return "Unknown call-id"; + + err = "No participant party specified"; + if (bencode_dictionary_get_str(input, "from-tag", &fromtag)) { + monologue = call_get_mono_dialogue(call, &fromtag, NULL, NULL); + err = "Unknown monologue from-tag"; + if (!monologue) + goto out; + } + else + goto out; + + if (!monologue->player) + monologue->player = media_player_new(monologue); + + err = "No media file specified"; + if (bencode_dictionary_get_str(input, "file", &file)) { + err = "Failed to start media playback from file"; + if (media_player_play_file(monologue->player, &file)) + goto out; + } + else + goto out; + + err = NULL; + +out: + rwlock_unlock_w(&call->master_lock); + obj_put(call); + return err; +} + + int call_interfaces_init() { const char *errptr; int erroff; diff --git a/daemon/control_ng.c b/daemon/control_ng.c index 39f56dbd3..8225f8aac 100644 --- a/daemon/control_ng.c +++ b/daemon/control_ng.c @@ -243,6 +243,10 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin errstr = call_unblock_media_ng(dict, resp); g_atomic_int_inc(&cur->unblock_media); break; + case CSH_LOOKUP("play media"): + errstr = call_play_media_ng(dict, resp); + g_atomic_int_inc(&cur->play_media); + break; default: errstr = "Unrecognized command"; } diff --git a/daemon/main.c b/daemon/main.c index c5cf23f1b..378c579fd 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -42,6 +42,7 @@ #include "codeclib.h" #include "load.h" #include "ssllib.h" +#include "media_player.h" @@ -67,6 +68,7 @@ struct rtpengine_config rtpe_config = { .redis_connect_timeout = 1000, .rec_method = "pcap", .rec_format = "raw", + .media_num_threads = -1, }; @@ -341,6 +343,7 @@ static void options(int *argc, char ***argv) { { "log-format", 0, 0, G_OPTION_ARG_STRING, &log_format, "Log prefix format", "default|parsable"}, { "xmlrpc-format",'x', 0, G_OPTION_ARG_INT, &rtpe_config.fmt, "XMLRPC timeout request format to use. 0: SEMS DI, 1: call-id only, 2: Kamailio", "INT" }, { "num-threads", 0, 0, G_OPTION_ARG_INT, &rtpe_config.num_threads, "Number of worker threads to create", "INT" }, + { "media-num-threads", 0, 0, G_OPTION_ARG_INT, &rtpe_config.media_num_threads, "Number of worker threads for media playback", "INT" }, { "delete-delay", 'd', 0, G_OPTION_ARG_INT, &rtpe_config.delete_delay, "Delay for deleting a session from memory.", "INT" }, { "sip-source", 0, 0, G_OPTION_ARG_NONE, &sip_source, "Use SIP source address by default", NULL }, { "dtls-passive", 0, 0, G_OPTION_ARG_NONE, &dtls_passive_def,"Always prefer DTLS passive role", NULL }, @@ -559,6 +562,7 @@ void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) { ini_rtpe_cfg->redis_write_db = rtpe_config.redis_write_db; ini_rtpe_cfg->no_redis_required = rtpe_config.no_redis_required; ini_rtpe_cfg->num_threads = rtpe_config.num_threads; + ini_rtpe_cfg->media_num_threads = rtpe_config.media_num_threads; ini_rtpe_cfg->fmt = rtpe_config.fmt; ini_rtpe_cfg->log_format = rtpe_config.log_format; ini_rtpe_cfg->redis_allowed_errors = rtpe_config.redis_allowed_errors; @@ -617,6 +621,7 @@ static void init_everything(void) { abort(); statistics_init(); codeclib_init(0); + media_player_init(); } @@ -744,7 +749,7 @@ no_kernel: int main(int argc, char **argv) { - int idx=0; + int idx; early_init(); options(&argc, &argv); @@ -776,9 +781,14 @@ int main(int argc, char **argv) { service_notify("READY=1\n"); - for (;idx +#include +#include +#include "obj.h" +#include "log.h" +#include "timerthread.h" +#include "call.h" +#include "str.h" + + + +static struct timerthread media_player_thread; + + + +// appropriate lock must be held +static void media_player_shutdown(struct media_player *mp) { + ilog(LOG_DEBUG, "shutting down media_player"); + timerthread_obj_deschedule(&mp->tt_obj); + avformat_free_context(mp->fmtctx); + mp->fmtctx = NULL; +} + + +void media_player_stop(struct media_player *mp) { + media_player_shutdown(mp); +} + + +static void __media_player_free(void *p) { + struct media_player *mp = p; + + ilog(LOG_DEBUG, "freeing media_player"); + + media_player_shutdown(mp); + mutex_destroy(&mp->lock); + obj_put(mp->call); +} + + +// call->master_lock held in W +struct media_player *media_player_new(struct call_monologue *ml) { + ilog(LOG_DEBUG, "creating media_player"); + + struct media_player *mp = obj_alloc0("media_player", sizeof(*mp), __media_player_free); + + mp->tt_obj.tt = &media_player_thread; + mutex_init(&mp->lock); + mp->call = obj_get(ml->call); + mp->ml = ml; + + av_init_packet(&mp->pkt); + mp->pkt.data = NULL; + mp->pkt.size = 0; + + return mp; +} + + +// appropriate lock must be held +static void media_player_read_packet(struct media_player *mp) { + int ret = av_read_frame(mp->fmtctx, &mp->pkt); + if (ret < 0) { + if (ret == AVERROR_EOF) { + ilog(LOG_DEBUG, "EOF reading from media stream"); + return; + } + ilog(LOG_ERR, "Error while reading from media stream"); + return; + } + + if (!mp->fmtctx->streams) { + ilog(LOG_ERR, "No AVStream present in format context"); + goto out; + } + + AVStream *avs = mp->fmtctx->streams[0]; + if (!avs) { + ilog(LOG_ERR, "No AVStream present in format context"); + goto out; + } + + long long us_dur = mp->pkt.duration * 1000000LL * avs->time_base.num / avs->time_base.den; + ilog(LOG_DEBUG, "read media packet: duration %llu (%lli us), time_base %i/%i", + (unsigned long long) mp->pkt.duration, us_dur, + avs->time_base.num, avs->time_base.den); + + timeval_add_usec(&mp->next_run, us_dur); + timerthread_obj_schedule_abs(&mp->tt_obj, &mp->next_run); + +out: + av_packet_unref(&mp->pkt); +} + + +// call->master_lock held in W +int media_player_play_file(struct media_player *mp, const str *file) { + media_player_shutdown(mp); + + char file_s[PATH_MAX]; + snprintf(file_s, sizeof(file_s), STR_FORMAT, STR_FMT(file)); + + int ret = avformat_open_input(&mp->fmtctx, file_s, NULL, NULL); + if (ret < 0) + return -1; + + // start playback now + + mp->next_run = rtpe_now; + media_player_read_packet(mp); + + return 0; +} + + +static void media_player_run(void *ptr) { + struct media_player *mp = ptr; + struct call *call = mp->call; + + ilog(LOG_DEBUG, "running scheduled media_player"); + + rwlock_lock_r(&call->master_lock); + mutex_lock(&mp->lock); + + media_player_read_packet(mp); + + mutex_unlock(&mp->lock); + rwlock_unlock_r(&call->master_lock); +} + + +void media_player_init(void) { + timerthread_init(&media_player_thread, media_player_run); +} + + +void media_player_loop(void *p) { + ilog(LOG_DEBUG, "media_player_loop"); + timerthread_run(&media_player_thread); +} diff --git a/daemon/timerthread.c b/daemon/timerthread.c index 86e6ccca5..0b02ceb90 100644 --- a/daemon/timerthread.c +++ b/daemon/timerthread.c @@ -60,6 +60,9 @@ void timerthread_obj_schedule_abs_nl(struct timerthread_obj *tt_obj, const struc if (!tt_obj) return; + ilog(LOG_DEBUG, "scheduling timer object at %llu.%06lu", (unsigned long long) tv->tv_sec, + (unsigned long) tv->tv_usec); + struct timerthread *tt = tt_obj->tt; if (tt_obj->next_check.tv_sec && timeval_cmp(&tt_obj->next_check, tv) <= 0) return; /* already scheduled sooner */ diff --git a/include/call.h b/include/call.h index e9ebab8a4..2acc90944 100644 --- a/include/call.h +++ b/include/call.h @@ -198,6 +198,7 @@ struct ice_agent; struct ssrc_hash; struct codec_handler; struct rtp_payload_type; +struct media_player; typedef bencode_buffer_t call_buffer_t; @@ -361,6 +362,7 @@ struct call_monologue { struct call_monologue *active_dialogue; GQueue medias; GHashTable *media_ids; + struct media_player *player; int block_dtmf:1; int block_media:1; diff --git a/include/call_interfaces.h b/include/call_interfaces.h index edbc6ada1..6fea0c36e 100644 --- a/include/call_interfaces.h +++ b/include/call_interfaces.h @@ -108,6 +108,7 @@ const char *call_block_dtmf_ng(bencode_item_t *, bencode_item_t *); const char *call_unblock_dtmf_ng(bencode_item_t *, bencode_item_t *); const char *call_block_media_ng(bencode_item_t *, bencode_item_t *); const char *call_unblock_media_ng(bencode_item_t *, bencode_item_t *); +const char *call_play_media_ng(bencode_item_t *, bencode_item_t *); void ng_call_stats(struct call *call, const str *fromtag, const str *totag, bencode_item_t *output, struct call_stats *totals); diff --git a/include/control_ng.h b/include/control_ng.h index ef8b3489f..59fde9ef7 100644 --- a/include/control_ng.h +++ b/include/control_ng.h @@ -25,6 +25,7 @@ struct control_ng_stats { int unblock_dtmf; int block_media; int unblock_media; + int play_media; int errors; }; diff --git a/include/main.h b/include/main.h index 1636bfd12..86f759fb7 100644 --- a/include/main.h +++ b/include/main.h @@ -65,6 +65,7 @@ struct rtpengine_config { char *redis_auth; char *redis_write_auth; int num_threads; + int media_num_threads; char *spooldir; char *rec_method; char *rec_format; diff --git a/include/media_player.h b/include/media_player.h new file mode 100644 index 000000000..e77aee420 --- /dev/null +++ b/include/media_player.h @@ -0,0 +1,47 @@ +#ifndef _MEDIA_PLAYER_H_ +#define _MEDIA_PLAYER_H_ + + +#include +#include +#include "auxlib.h" +#include "timerthread.h" +#include "str.h" + + + +struct call; +struct call_monologue; + + +struct media_player { + struct timerthread_obj tt_obj; + mutex_t lock; + struct call *call; + struct call_monologue *ml; + + struct timeval next_run; + + AVFormatContext *fmtctx; + AVPacket pkt; +}; + + +struct media_player *media_player_new(struct call_monologue *); +int media_player_play_file(struct media_player *, const str *); +void media_player_stop(struct media_player *); + +void media_player_init(void); +void media_player_loop(void *); + + + +INLINE void media_player_put(struct media_player **mp) { + if (!*mp) + return; + obj_put(&(*mp)->tt_obj); + *mp = NULL; +} + + +#endif diff --git a/include/timerthread.h b/include/timerthread.h index 7ea4f299b..3f19d9bb6 100644 --- a/include/timerthread.h +++ b/include/timerthread.h @@ -30,4 +30,13 @@ void timerthread_obj_schedule_abs_nl(struct timerthread_obj *, const struct time void timerthread_obj_deschedule(struct timerthread_obj *); +INLINE void timerthread_obj_schedule_abs(struct timerthread_obj *tt_obj, const struct timeval *tv) { + if (!tt_obj) + return; + mutex_lock(&tt_obj->tt->lock); + timerthread_obj_schedule_abs_nl(tt_obj, tv); + mutex_unlock(&tt_obj->tt->lock); +} + + #endif diff --git a/t/.gitignore b/t/.gitignore index e377840f1..bab3807d2 100644 --- a/t/.gitignore +++ b/t/.gitignore @@ -47,3 +47,4 @@ dtmf.c const_str_hash-test.strhash tests-preload.so timerthread.c +media_player.c diff --git a/t/Makefile b/t/Makefile index 8d24eeab4..a853bf0c0 100644 --- a/t/Makefile +++ b/t/Makefile @@ -65,7 +65,8 @@ endif LIBSRCS+= codeclib.c resample.c socket.c streambuf.c DAEMONSRCS+= codec.c call.c ice.c kernel.c media_socket.c stun.c bencode.c poller.c \ dtls.c recording.c statistics.c rtcp.c redis.c iptables.c graphite.c \ - cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c + cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c \ + media_player.c HASHSRCS+= call_interfaces.c control_ng.c sdp.c endif @@ -116,7 +117,8 @@ transcode-test: transcode-test.o $(COMMONOBJS) codeclib.o resample.o codec.o ssr kernel.o media_socket.o stun.o bencode.o socket.o poller.o dtls.o recording.o statistics.o \ rtcp.o redis.o iptables.o graphite.o call_interfaces.strhash.o sdp.strhash.o rtp.o crypto.o \ control_ng.strhash.o \ - streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o + streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \ + media_player.o payload-tracker-test: payload-tracker-test.o $(COMMONOBJS) ssrc.o aux.o auxlib.o rtp.o crypto.o codeclib.o \ resample.o diff --git a/utils/rtpengine-ng-client b/utils/rtpengine-ng-client index 58306179b..9a3a5bfbd 100755 --- a/utils/rtpengine-ng-client +++ b/utils/rtpengine-ng-client @@ -66,13 +66,14 @@ GetOptions( 'generate-mid' => \$options{'generate mid'}, 'fragment' => \$options{'fragment'}, 'original-sendrecv' => \$options{'original sendrecv'}, + 'file=s' => \$options{'file'}, ) or die; my $cmd = shift(@ARGV) or die; my %packet = (command => $cmd); -for my $x (split(/,/, 'from-tag,to-tag,call-id,transport protocol,media address,ICE,address family,DTLS,via-branch,media address,ptime,xmlrpc-callback,metadata,address')) { +for my $x (split(/,/, 'from-tag,to-tag,call-id,transport protocol,media address,ICE,address family,DTLS,via-branch,media address,ptime,xmlrpc-callback,metadata,address,file')) { defined($options{$x}) and $packet{$x} = \$options{$x}; } for my $x (split(/,/, 'TOS,delete-delay')) {