Browse Source

TT#50652 infrastructure to support reading/playing of media files

Change-Id: I8212f5c74bcb9557d41f80ef3a1595f53bafde8a
changes/16/27616/10
Richard Fuchs 7 years ago
parent
commit
83965bdb1f
16 changed files with 280 additions and 7 deletions
  1. +1
    -1
      daemon/Makefile
  2. +6
    -0
      daemon/call.c
  3. +44
    -0
      daemon/call_interfaces.c
  4. +4
    -0
      daemon/control_ng.c
  5. +13
    -3
      daemon/main.c
  6. +141
    -0
      daemon/media_player.c
  7. +3
    -0
      daemon/timerthread.c
  8. +2
    -0
      include/call.h
  9. +1
    -0
      include/call_interfaces.h
  10. +1
    -0
      include/control_ng.h
  11. +1
    -0
      include/main.h
  12. +47
    -0
      include/media_player.h
  13. +9
    -0
      include/timerthread.h
  14. +1
    -0
      t/.gitignore
  15. +4
    -2
      t/Makefile
  16. +2
    -1
      utils/rtpengine-ng-client

+ 1
- 1
daemon/Makefile View File

@ -123,7 +123,7 @@ SRCS= main.c kernel.c poller.c aux.c control_tcp.c call.c control_udp.c redis.c
bencode.c cookie_cache.c udp_listener.c control_ng.strhash.c sdp.strhash.c stun.c rtcp.c \
crypto.c rtp.c call_interfaces.strhash.c dtls.c log.c cli.c graphite.c ice.c \
media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c \
codec.c load.c dtmf.c timerthread.c
codec.c load.c dtmf.c timerthread.c media_player.c
LIBSRCS= loglib.c auxlib.c rtplib.c str.c socket.c streambuf.c ssllib.c
ifeq ($(with_transcoding),yes)
LIBSRCS+= codeclib.c resample.c


+ 6
- 0
daemon/call.c View File

@ -44,6 +44,7 @@
#include "main.h"
#include "graphite.h"
#include "codec.h"
#include "media_player.h"
/* also serves as array index for callstream->peers[] */
@ -2224,6 +2225,11 @@ void call_destroy(struct call *c) {
ice_shutdown(&md->ice_agent);
}
if (ml->player) {
media_player_stop(ml->player);
media_player_put(&ml->player);
}
}
k = g_hash_table_get_values(c->ssrc_hash->ht);


+ 44
- 0
daemon/call_interfaces.c View File

@ -27,6 +27,7 @@
#include "streambuf.h"
#include "main.h"
#include "load.h"
#include "media_player.h"
static pcre *info_re;
@ -1731,6 +1732,49 @@ out:
}
const char *call_play_media_ng(bencode_item_t *input, bencode_item_t *output) {
str callid, fromtag, file;
struct call *call;
struct call_monologue *monologue;
const char *err = NULL;
if (!bencode_dictionary_get_str(input, "call-id", &callid))
return "No call-id in message";
call = call_get_opmode(&callid, OP_OTHER);
if (!call)
return "Unknown call-id";
err = "No participant party specified";
if (bencode_dictionary_get_str(input, "from-tag", &fromtag)) {
monologue = call_get_mono_dialogue(call, &fromtag, NULL, NULL);
err = "Unknown monologue from-tag";
if (!monologue)
goto out;
}
else
goto out;
if (!monologue->player)
monologue->player = media_player_new(monologue);
err = "No media file specified";
if (bencode_dictionary_get_str(input, "file", &file)) {
err = "Failed to start media playback from file";
if (media_player_play_file(monologue->player, &file))
goto out;
}
else
goto out;
err = NULL;
out:
rwlock_unlock_w(&call->master_lock);
obj_put(call);
return err;
}
int call_interfaces_init() {
const char *errptr;
int erroff;


+ 4
- 0
daemon/control_ng.c View File

@ -243,6 +243,10 @@ static void control_ng_incoming(struct obj *obj, str *buf, const endpoint_t *sin
errstr = call_unblock_media_ng(dict, resp);
g_atomic_int_inc(&cur->unblock_media);
break;
case CSH_LOOKUP("play media"):
errstr = call_play_media_ng(dict, resp);
g_atomic_int_inc(&cur->play_media);
break;
default:
errstr = "Unrecognized command";
}


+ 13
- 3
daemon/main.c View File

@ -42,6 +42,7 @@
#include "codeclib.h"
#include "load.h"
#include "ssllib.h"
#include "media_player.h"
@ -67,6 +68,7 @@ struct rtpengine_config rtpe_config = {
.redis_connect_timeout = 1000,
.rec_method = "pcap",
.rec_format = "raw",
.media_num_threads = -1,
};
@ -341,6 +343,7 @@ static void options(int *argc, char ***argv) {
{ "log-format", 0, 0, G_OPTION_ARG_STRING, &log_format, "Log prefix format", "default|parsable"},
{ "xmlrpc-format",'x', 0, G_OPTION_ARG_INT, &rtpe_config.fmt, "XMLRPC timeout request format to use. 0: SEMS DI, 1: call-id only, 2: Kamailio", "INT" },
{ "num-threads", 0, 0, G_OPTION_ARG_INT, &rtpe_config.num_threads, "Number of worker threads to create", "INT" },
{ "media-num-threads", 0, 0, G_OPTION_ARG_INT, &rtpe_config.media_num_threads, "Number of worker threads for media playback", "INT" },
{ "delete-delay", 'd', 0, G_OPTION_ARG_INT, &rtpe_config.delete_delay, "Delay for deleting a session from memory.", "INT" },
{ "sip-source", 0, 0, G_OPTION_ARG_NONE, &sip_source, "Use SIP source address by default", NULL },
{ "dtls-passive", 0, 0, G_OPTION_ARG_NONE, &dtls_passive_def,"Always prefer DTLS passive role", NULL },
@ -559,6 +562,7 @@ void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) {
ini_rtpe_cfg->redis_write_db = rtpe_config.redis_write_db;
ini_rtpe_cfg->no_redis_required = rtpe_config.no_redis_required;
ini_rtpe_cfg->num_threads = rtpe_config.num_threads;
ini_rtpe_cfg->media_num_threads = rtpe_config.media_num_threads;
ini_rtpe_cfg->fmt = rtpe_config.fmt;
ini_rtpe_cfg->log_format = rtpe_config.log_format;
ini_rtpe_cfg->redis_allowed_errors = rtpe_config.redis_allowed_errors;
@ -617,6 +621,7 @@ static void init_everything(void) {
abort();
statistics_init();
codeclib_init(0);
media_player_init();
}
@ -744,7 +749,7 @@ no_kernel:
int main(int argc, char **argv) {
int idx=0;
int idx;
early_init();
options(&argc, &argv);
@ -776,9 +781,14 @@ int main(int argc, char **argv) {
service_notify("READY=1\n");
for (;idx<rtpe_config.num_threads;++idx) {
for (idx = 0; idx < rtpe_config.num_threads; ++idx)
thread_create_detach_prio(poller_loop, rtpe_poller, rtpe_config.scheduling, rtpe_config.priority);
}
if (rtpe_config.media_num_threads < 0)
rtpe_config.media_num_threads = rtpe_config.num_threads;
for (idx = 0; idx < rtpe_config.media_num_threads; ++idx)
thread_create_detach_prio(media_player_loop, NULL, rtpe_config.scheduling, rtpe_config.priority);
while (!rtpe_shutdown) {
usleep(100000);


+ 141
- 0
daemon/media_player.c View File

@ -0,0 +1,141 @@
#include "media_player.h"
#include <glib.h>
#include <libavformat/avformat.h>
#include <libavcodec/avcodec.h>
#include "obj.h"
#include "log.h"
#include "timerthread.h"
#include "call.h"
#include "str.h"
static struct timerthread media_player_thread;
// appropriate lock must be held
static void media_player_shutdown(struct media_player *mp) {
ilog(LOG_DEBUG, "shutting down media_player");
timerthread_obj_deschedule(&mp->tt_obj);
avformat_free_context(mp->fmtctx);
mp->fmtctx = NULL;
}
void media_player_stop(struct media_player *mp) {
media_player_shutdown(mp);
}
static void __media_player_free(void *p) {
struct media_player *mp = p;
ilog(LOG_DEBUG, "freeing media_player");
media_player_shutdown(mp);
mutex_destroy(&mp->lock);
obj_put(mp->call);
}
// call->master_lock held in W
struct media_player *media_player_new(struct call_monologue *ml) {
ilog(LOG_DEBUG, "creating media_player");
struct media_player *mp = obj_alloc0("media_player", sizeof(*mp), __media_player_free);
mp->tt_obj.tt = &media_player_thread;
mutex_init(&mp->lock);
mp->call = obj_get(ml->call);
mp->ml = ml;
av_init_packet(&mp->pkt);
mp->pkt.data = NULL;
mp->pkt.size = 0;
return mp;
}
// appropriate lock must be held
static void media_player_read_packet(struct media_player *mp) {
int ret = av_read_frame(mp->fmtctx, &mp->pkt);
if (ret < 0) {
if (ret == AVERROR_EOF) {
ilog(LOG_DEBUG, "EOF reading from media stream");
return;
}
ilog(LOG_ERR, "Error while reading from media stream");
return;
}
if (!mp->fmtctx->streams) {
ilog(LOG_ERR, "No AVStream present in format context");
goto out;
}
AVStream *avs = mp->fmtctx->streams[0];
if (!avs) {
ilog(LOG_ERR, "No AVStream present in format context");
goto out;
}
long long us_dur = mp->pkt.duration * 1000000LL * avs->time_base.num / avs->time_base.den;
ilog(LOG_DEBUG, "read media packet: duration %llu (%lli us), time_base %i/%i",
(unsigned long long) mp->pkt.duration, us_dur,
avs->time_base.num, avs->time_base.den);
timeval_add_usec(&mp->next_run, us_dur);
timerthread_obj_schedule_abs(&mp->tt_obj, &mp->next_run);
out:
av_packet_unref(&mp->pkt);
}
// call->master_lock held in W
int media_player_play_file(struct media_player *mp, const str *file) {
media_player_shutdown(mp);
char file_s[PATH_MAX];
snprintf(file_s, sizeof(file_s), STR_FORMAT, STR_FMT(file));
int ret = avformat_open_input(&mp->fmtctx, file_s, NULL, NULL);
if (ret < 0)
return -1;
// start playback now
mp->next_run = rtpe_now;
media_player_read_packet(mp);
return 0;
}
static void media_player_run(void *ptr) {
struct media_player *mp = ptr;
struct call *call = mp->call;
ilog(LOG_DEBUG, "running scheduled media_player");
rwlock_lock_r(&call->master_lock);
mutex_lock(&mp->lock);
media_player_read_packet(mp);
mutex_unlock(&mp->lock);
rwlock_unlock_r(&call->master_lock);
}
void media_player_init(void) {
timerthread_init(&media_player_thread, media_player_run);
}
void media_player_loop(void *p) {
ilog(LOG_DEBUG, "media_player_loop");
timerthread_run(&media_player_thread);
}

+ 3
- 0
daemon/timerthread.c View File

@ -60,6 +60,9 @@ void timerthread_obj_schedule_abs_nl(struct timerthread_obj *tt_obj, const struc
if (!tt_obj)
return;
ilog(LOG_DEBUG, "scheduling timer object at %llu.%06lu", (unsigned long long) tv->tv_sec,
(unsigned long) tv->tv_usec);
struct timerthread *tt = tt_obj->tt;
if (tt_obj->next_check.tv_sec && timeval_cmp(&tt_obj->next_check, tv) <= 0)
return; /* already scheduled sooner */


+ 2
- 0
include/call.h View File

@ -198,6 +198,7 @@ struct ice_agent;
struct ssrc_hash;
struct codec_handler;
struct rtp_payload_type;
struct media_player;
typedef bencode_buffer_t call_buffer_t;
@ -361,6 +362,7 @@ struct call_monologue {
struct call_monologue *active_dialogue;
GQueue medias;
GHashTable *media_ids;
struct media_player *player;
int block_dtmf:1;
int block_media:1;


+ 1
- 0
include/call_interfaces.h View File

@ -108,6 +108,7 @@ const char *call_block_dtmf_ng(bencode_item_t *, bencode_item_t *);
const char *call_unblock_dtmf_ng(bencode_item_t *, bencode_item_t *);
const char *call_block_media_ng(bencode_item_t *, bencode_item_t *);
const char *call_unblock_media_ng(bencode_item_t *, bencode_item_t *);
const char *call_play_media_ng(bencode_item_t *, bencode_item_t *);
void ng_call_stats(struct call *call, const str *fromtag, const str *totag, bencode_item_t *output,
struct call_stats *totals);


+ 1
- 0
include/control_ng.h View File

@ -25,6 +25,7 @@ struct control_ng_stats {
int unblock_dtmf;
int block_media;
int unblock_media;
int play_media;
int errors;
};


+ 1
- 0
include/main.h View File

@ -65,6 +65,7 @@ struct rtpengine_config {
char *redis_auth;
char *redis_write_auth;
int num_threads;
int media_num_threads;
char *spooldir;
char *rec_method;
char *rec_format;


+ 47
- 0
include/media_player.h View File

@ -0,0 +1,47 @@
#ifndef _MEDIA_PLAYER_H_
#define _MEDIA_PLAYER_H_
#include <libavformat/avformat.h>
#include <libavcodec/avcodec.h>
#include "auxlib.h"
#include "timerthread.h"
#include "str.h"
struct call;
struct call_monologue;
struct media_player {
struct timerthread_obj tt_obj;
mutex_t lock;
struct call *call;
struct call_monologue *ml;
struct timeval next_run;
AVFormatContext *fmtctx;
AVPacket pkt;
};
struct media_player *media_player_new(struct call_monologue *);
int media_player_play_file(struct media_player *, const str *);
void media_player_stop(struct media_player *);
void media_player_init(void);
void media_player_loop(void *);
INLINE void media_player_put(struct media_player **mp) {
if (!*mp)
return;
obj_put(&(*mp)->tt_obj);
*mp = NULL;
}
#endif

+ 9
- 0
include/timerthread.h View File

@ -30,4 +30,13 @@ void timerthread_obj_schedule_abs_nl(struct timerthread_obj *, const struct time
void timerthread_obj_deschedule(struct timerthread_obj *);
INLINE void timerthread_obj_schedule_abs(struct timerthread_obj *tt_obj, const struct timeval *tv) {
if (!tt_obj)
return;
mutex_lock(&tt_obj->tt->lock);
timerthread_obj_schedule_abs_nl(tt_obj, tv);
mutex_unlock(&tt_obj->tt->lock);
}
#endif

+ 1
- 0
t/.gitignore View File

@ -47,3 +47,4 @@ dtmf.c
const_str_hash-test.strhash
tests-preload.so
timerthread.c
media_player.c

+ 4
- 2
t/Makefile View File

@ -65,7 +65,8 @@ endif
LIBSRCS+= codeclib.c resample.c socket.c streambuf.c
DAEMONSRCS+= codec.c call.c ice.c kernel.c media_socket.c stun.c bencode.c poller.c \
dtls.c recording.c statistics.c rtcp.c redis.c iptables.c graphite.c \
cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c
cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c \
media_player.c
HASHSRCS+= call_interfaces.c control_ng.c sdp.c
endif
@ -116,7 +117,8 @@ transcode-test: transcode-test.o $(COMMONOBJS) codeclib.o resample.o codec.o ssr
kernel.o media_socket.o stun.o bencode.o socket.o poller.o dtls.o recording.o statistics.o \
rtcp.o redis.o iptables.o graphite.o call_interfaces.strhash.o sdp.strhash.o rtp.o crypto.o \
control_ng.strhash.o \
streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o
streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \
media_player.o
payload-tracker-test: payload-tracker-test.o $(COMMONOBJS) ssrc.o aux.o auxlib.o rtp.o crypto.o codeclib.o \
resample.o


+ 2
- 1
utils/rtpengine-ng-client View File

@ -66,13 +66,14 @@ GetOptions(
'generate-mid' => \$options{'generate mid'},
'fragment' => \$options{'fragment'},
'original-sendrecv' => \$options{'original sendrecv'},
'file=s' => \$options{'file'},
) or die;
my $cmd = shift(@ARGV) or die;
my %packet = (command => $cmd);
for my $x (split(/,/, 'from-tag,to-tag,call-id,transport protocol,media address,ICE,address family,DTLS,via-branch,media address,ptime,xmlrpc-callback,metadata,address')) {
for my $x (split(/,/, 'from-tag,to-tag,call-id,transport protocol,media address,ICE,address family,DTLS,via-branch,media address,ptime,xmlrpc-callback,metadata,address,file')) {
defined($options{$x}) and $packet{$x} = \$options{$x};
}
for my $x (split(/,/, 'TOS,delete-delay')) {


Loading…
Cancel
Save