From d7fa0689f9f64439321d1e9f3b8d8ae9155385ab Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Thu, 17 Nov 2016 15:43:43 -0500 Subject: [PATCH] TT#5566 rudimentary wav file output Change-Id: Icdc97a9dc849bba6ba6add12d0bdd17f8b7712cd --- daemon/Makefile | 4 +- daemon/crypto.c | 1 + daemon/media_socket.c | 1 + daemon/rtp.c | 55 +------- daemon/rtp.h | 11 +- daemon/rtplib.c | 1 + lib/.gitignore | 1 + {daemon => lib}/compat.h | 0 lib/rtplib.c | 74 ++++++++++ lib/rtplib.h | 22 +++ {daemon => lib}/str.h | 0 recording-daemon/.ycm_extra_conf.py | 1 + recording-daemon/Makefile | 9 +- recording-daemon/decoder.c | 200 +++++++++++++++++++++++++++ recording-daemon/decoder.h | 16 +++ recording-daemon/metafile.c | 48 +++++-- recording-daemon/packet.c | 181 ++++++++++++++++++++++++ recording-daemon/packet.h | 10 ++ recording-daemon/pcre.c | 18 +-- recording-daemon/pcre.h | 2 +- recording-daemon/{aux.c => recaux.c} | 2 +- recording-daemon/{aux.h => recaux.h} | 4 +- recording-daemon/rtplib.c | 1 + recording-daemon/stream.c | 31 +++-- recording-daemon/types.h | 56 +++++++- 25 files changed, 639 insertions(+), 110 deletions(-) create mode 120000 daemon/rtplib.c create mode 100644 lib/.gitignore rename {daemon => lib}/compat.h (100%) create mode 100644 lib/rtplib.c create mode 100644 lib/rtplib.h rename {daemon => lib}/str.h (100%) create mode 100644 recording-daemon/decoder.c create mode 100644 recording-daemon/decoder.h create mode 100644 recording-daemon/packet.c create mode 100644 recording-daemon/packet.h rename recording-daemon/{aux.c => recaux.c} (94%) rename recording-daemon/{aux.h => recaux.h} (87%) create mode 120000 recording-daemon/rtplib.c diff --git a/daemon/Makefile b/daemon/Makefile index 06d7c41fd..a1a2c02ad 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -7,7 +7,7 @@ CFLAGS+= `pkg-config --cflags zlib` CFLAGS+= `pkg-config --cflags openssl` CFLAGS+= `pkg-config --cflags libevent_pthreads` CFLAGS+= `pcre-config --cflags` -CFLAGS+= -I../kernel-module/ +CFLAGS+= -I. -I../kernel-module/ -I../lib/ CFLAGS+= -D_GNU_SOURCE ifeq ($(RTPENGINE_VERSION),) @@ -68,7 +68,7 @@ endif SRCS= main.c kernel.c poller.c aux.c control_tcp.c streambuf.c call.c control_udp.c redis.c \ bencode.c cookie_cache.c udp_listener.c control_ng.c sdp.c str.c stun.c rtcp.c \ crypto.c rtp.c call_interfaces.c dtls.c log.c cli.c graphite.c ice.c socket.c \ - media_socket.c rtcp_xr.c homer.c recording.c + media_socket.c rtcp_xr.c homer.c recording.c rtplib.c OBJS= $(SRCS:.c=.o) diff --git a/daemon/crypto.c b/daemon/crypto.c index 4caaf0c50..2861d256a 100644 --- a/daemon/crypto.c +++ b/daemon/crypto.c @@ -12,6 +12,7 @@ #include "rtp.h" #include "rtcp.h" #include "log.h" +#include "rtplib.h" diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 8caadabc6..46caf99bc 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -19,6 +19,7 @@ #include "log_funcs.h" #include "poller.h" #include "recording.h" +#include "rtplib.h" #ifndef PORT_RANDOM_MIN diff --git a/daemon/rtp.c b/daemon/rtp.c index 561651879..cbe1cef4f 100644 --- a/daemon/rtp.c +++ b/daemon/rtp.c @@ -8,14 +8,7 @@ #include "str.h" #include "crypto.h" #include "log.h" - - - - -struct rtp_extension { - u_int16_t undefined; - u_int16_t length; -} __attribute__ ((packed)); +#include "rtplib.h" @@ -89,52 +82,6 @@ error: return -1; } -int rtp_payload(struct rtp_header **out, str *p, const str *s) { - struct rtp_header *rtp; - struct rtp_extension *ext; - const char *err; - - err = "short packet (header)"; - if (s->len < sizeof(*rtp)) - goto error; - - rtp = (void *) s->s; - err = "invalid header version"; - if ((rtp->v_p_x_cc & 0xc0) != 0x80) /* version 2 */ - goto error; - - if (!p) - goto done; - - *p = *s; - /* fixed header */ - str_shift(p, sizeof(*rtp)); - /* csrc list */ - err = "short packet (CSRC list)"; - if (str_shift(p, (rtp->v_p_x_cc & 0xf) * 4)) - goto error; - - if ((rtp->v_p_x_cc & 0x10)) { - /* extension */ - err = "short packet (extension header)"; - if (p->len < sizeof(*ext)) - goto error; - ext = (void *) p->s; - err = "short packet (header extensions)"; - if (str_shift(p, 4 + ntohs(ext->length) * 4)) - goto error; - } - -done: - *out = rtp; - - return 0; - -error: - ilog(LOG_WARNING | LOG_FLAG_LIMIT, "Error parsing RTP header: %s", err); - return -1; -} - static u_int64_t packet_index(struct crypto_context *c, struct rtp_header *rtp) { u_int16_t seq; diff --git a/daemon/rtp.h b/daemon/rtp.h index d4a230843..86d84006c 100644 --- a/daemon/rtp.h +++ b/daemon/rtp.h @@ -9,15 +9,7 @@ struct crypto_context; - -struct rtp_header { - unsigned char v_p_x_cc; - unsigned char m_pt; - u_int16_t seq_num; - u_int32_t timestamp; - u_int32_t ssrc; - u_int32_t csrc[]; -} __attribute__ ((packed)); +struct rtp_header; struct rtp_payload_type { unsigned int payload_type; @@ -31,7 +23,6 @@ struct rtp_payload_type { -int rtp_payload(struct rtp_header **out, str *p, const str *s); const struct rtp_payload_type *rtp_payload_type(unsigned int, GHashTable *); int rtp_avp2savp(str *, struct crypto_context *); diff --git a/daemon/rtplib.c b/daemon/rtplib.c new file mode 120000 index 000000000..e6640de16 --- /dev/null +++ b/daemon/rtplib.c @@ -0,0 +1 @@ +../lib/rtplib.c \ No newline at end of file diff --git a/lib/.gitignore b/lib/.gitignore new file mode 100644 index 000000000..5761abcfd --- /dev/null +++ b/lib/.gitignore @@ -0,0 +1 @@ +*.o diff --git a/daemon/compat.h b/lib/compat.h similarity index 100% rename from daemon/compat.h rename to lib/compat.h diff --git a/lib/rtplib.c b/lib/rtplib.c new file mode 100644 index 000000000..1ee8ea3a7 --- /dev/null +++ b/lib/rtplib.c @@ -0,0 +1,74 @@ +#include "rtplib.h" +#include +#include "str.h" +#include "log.h" + + + +struct rtp_extension { + u_int16_t undefined; + u_int16_t length; +} __attribute__ ((packed)); + + + + + +int rtp_payload(struct rtp_header **out, str *p, const str *s) { + struct rtp_header *rtp; + struct rtp_extension *ext; + const char *err; + + err = "short packet (header)"; + if (s->len < sizeof(*rtp)) + goto error; + + rtp = (void *) s->s; + err = "invalid header version"; + if ((rtp->v_p_x_cc & 0xc0) != 0x80) /* version 2 */ + goto error; + + if (!p) + goto done; + + *p = *s; + /* fixed header */ + str_shift(p, sizeof(*rtp)); + /* csrc list */ + err = "short packet (CSRC list)"; + if (str_shift(p, (rtp->v_p_x_cc & 0xf) * 4)) + goto error; + + if ((rtp->v_p_x_cc & 0x10)) { + /* extension */ + err = "short packet (extension header)"; + if (p->len < sizeof(*ext)) + goto error; + ext = (void *) p->s; + err = "short packet (header extensions)"; + if (str_shift(p, 4 + ntohs(ext->length) * 4)) + goto error; + } + +done: + *out = rtp; + + return 0; + +error: + ilog(LOG_WARNING | LOG_FLAG_LIMIT, "Error parsing RTP header: %s", err); + return -1; +} + + +int rtp_padding(struct rtp_header *header, str *payload) { + if (!(header->v_p_x_cc & 0x20)) + return 0; // no padding + if (payload->len == 0) + return -1; + unsigned int padding = (unsigned char) payload->s[payload->len - 1]; + if (payload->len < padding) + return -1; + payload->len -= padding; + return 0; +} diff --git a/lib/rtplib.h b/lib/rtplib.h new file mode 100644 index 000000000..54b61df60 --- /dev/null +++ b/lib/rtplib.h @@ -0,0 +1,22 @@ +#ifndef _RTPLIB_H_ +#define _RTPLIB_H_ + +#include +#include "str.h" + + +struct rtp_header { + unsigned char v_p_x_cc; + unsigned char m_pt; + uint16_t seq_num; + uint32_t timestamp; + uint32_t ssrc; + uint32_t csrc[]; +} __attribute__ ((packed)); + + +int rtp_payload(struct rtp_header **out, str *p, const str *s); +int rtp_padding(struct rtp_header *header, str *payload); + + +#endif diff --git a/daemon/str.h b/lib/str.h similarity index 100% rename from daemon/str.h rename to lib/str.h diff --git a/recording-daemon/.ycm_extra_conf.py b/recording-daemon/.ycm_extra_conf.py index 25eb311e7..0d613498b 100644 --- a/recording-daemon/.ycm_extra_conf.py +++ b/recording-daemon/.ycm_extra_conf.py @@ -18,6 +18,7 @@ flags = [ '-fno-strict-aliasing', '-I/usr/include/glib-2.0', '-I/usr/lib/x86_64-linux-gnu/glib-2.0/include', +'-I../lib/', '-pthread', '-D_GNU_SOURCE', '-D__DEBUG=1', diff --git a/recording-daemon/Makefile b/recording-daemon/Makefile index 5341fe470..d603948a3 100644 --- a/recording-daemon/Makefile +++ b/recording-daemon/Makefile @@ -1,12 +1,12 @@ TARGET= rtpengine-recording CC?=gcc -CFLAGS= -g -Wall -pthread +CFLAGS= -g -Wall -pthread -I. -I../lib/ CFLAGS+= -std=c99 CFLAGS+= -D_GNU_SOURCE -D_POSIX_SOURCE -D_POSIX_C_SOURCE CFLAGS+= `pkg-config --cflags glib-2.0` CFLAGS+= `pkg-config --cflags gthread-2.0` -#CFLAGS+= `pcre-config --cflags` +CFLAGS+= `pcre-config --cflags` CFLAGS+= `pkg-config --cflags libavcodec` CFLAGS+= `pkg-config --cflags libavformat` CFLAGS+= `pkg-config --cflags libavutil` @@ -20,7 +20,7 @@ endif LDFLAGS= -lm LDFLAGS+= `pkg-config --libs glib-2.0` LDFLAGS+= `pkg-config --libs gthread-2.0` -#LDFLAGS+= `pcre-config --libs` +LDFLAGS+= `pcre-config --libs` LDFLAGS+= `pkg-config --libs libavcodec` LDFLAGS+= `pkg-config --libs libavformat` LDFLAGS+= `pkg-config --libs libavutil` @@ -35,7 +35,8 @@ ifneq ($(DBG),yes) endif endif -SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c aux.c +SRCS= epoll.c garbage.c inotify.c main.c metafile.c stream.c recaux.c rtplib.c packet.c \ + decoder.c OBJS= $(SRCS:.c=.o) diff --git a/recording-daemon/decoder.c b/recording-daemon/decoder.c new file mode 100644 index 000000000..1f0aea0a0 --- /dev/null +++ b/recording-daemon/decoder.c @@ -0,0 +1,200 @@ +#include "decoder.h" +#include +#include +#include +#include +#include "types.h" +#include "log.h" + + +struct decoder_s { + AVCodecContext *avcctx; + AVPacket avpkt; + AVFrame *frame; + unsigned long rtp_ts; + uint64_t pts; +}; + + +struct output_s { + AVCodecContext *avcctx; + AVFormatContext *fmtctx; + AVStream *avst; + AVPacket avpkt; +}; + + +decoder_t *decoder_new(unsigned int payload_type, const char *payload_str) { + decoder_t *ret = g_slice_alloc0(sizeof(*ret)); + + // XXX error reporting + AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_PCM_ALAW); + ret->avcctx = avcodec_alloc_context3(codec); + if (!ret->avcctx) + goto err; + ret->avcctx->channels = 1; + ret->avcctx->sample_rate = 8000; + int i = avcodec_open2(ret->avcctx, codec, NULL); + if (i) + goto err; + + av_init_packet(&ret->avpkt); + ret->frame = av_frame_alloc(); + if (!ret->frame) + goto err; + + ret->pts = (uint64_t) -1LL; + ret->rtp_ts = (unsigned long) -1L; + + return ret; + +err: + decoder_close(ret); + return NULL; +} + + +static int output_add(output_t *output, AVFrame *frame) { + if (!output) + return -1; + +#if LIBAVCODEC_VERSION_INT >= AV_VERSION_INT(57, 0, 0) + int ret = avcodec_send_frame(output->avcctx, frame); + dbg("send frame ret %i", ret); + if (ret) + return -1; + + ret = avcodec_receive_packet(output->avcctx, &output->avpkt); + dbg("receive packet ret %i", ret); + if (ret) + return -1; +#else + int got_packet = 0; + int ret = avcodec_encode_audio2(output->avcctx, &output->avpkt, frame, &got_packet); + dbg("encode frame ret %i, got packet %i", ret, got_packet); + if (!got_packet) + return 0; +#endif + + av_write_frame(output->fmtctx, &output->avpkt); + + return 0; +} + + +int decoder_input(decoder_t *dec, const str *data, unsigned long ts, output_t *output) { + if (G_UNLIKELY(!dec)) + return -1; + + if (G_UNLIKELY(dec->rtp_ts == (unsigned long) -1L)) { + // initialize pts + dec->pts = 0; + } + else { + // shift pts according to rtp ts shift + dec->pts += (ts - dec->rtp_ts) * output->avst->time_base.num * 8000 / output->avst->time_base.den; + } + dec->rtp_ts = ts; + + dec->avpkt.data = (unsigned char *) data->s; + dec->avpkt.size = data->len; + dec->avpkt.pts = dec->pts; + +#if LIBAVCODEC_VERSION_INT >= AV_VERSION_INT(57, 0, 0) + int ret = avcodec_send_packet(dec->avcctx, &dec->avpkt); + dbg("send packet ret %i", ret); + if (ret) + return -1; + + ret = avcodec_receive_frame(dec->avcctx, dec->frame); + dbg("receive frame ret %i", ret); + if (ret) + return -1; +#else + int got_frame = 0; + int ret = avcodec_decode_audio4(dec->avcctx, dec->frame, &got_frame, &dec->avpkt); + dbg("decode frame ret %i, got frame %i", ret, got_frame); + if (!got_frame) + return 0; +#endif + + dec->frame->pts = dec->frame->pkt_pts; + + output_add(output, dec->frame); + + return 0; +} + + +output_t *output_new(const char *filename) { + output_t *ret = g_slice_alloc0(sizeof(*ret)); + + // XXX error reporting + ret->fmtctx = avformat_alloc_context(); + if (!ret->fmtctx) + goto err; + ret->fmtctx->oformat = av_guess_format("wav", NULL, NULL); // XXX better way? + if (!ret->fmtctx->oformat) + goto err; + + AVCodec *codec = avcodec_find_encoder(AV_CODEC_ID_PCM_S16LE); + // XXX error handling + ret->avst = avformat_new_stream(ret->fmtctx, codec); + if (!ret->avst) + goto err; +#if LIBAVCODEC_VERSION_INT >= AV_VERSION_INT(57, 0, 0) + ret->avcctx = avcodec_alloc_context3(codec); + if (!ret->avcctx) + goto err; +#else + ret->avcctx = ret->avst->codec; +#endif + + ret->avcctx->channels = 1; + ret->avcctx->sample_rate = 8000; + ret->avcctx->sample_fmt = AV_SAMPLE_FMT_S16; + ret->avcctx->time_base = (AVRational){8000,1}; + ret->avst->time_base = ret->avcctx->time_base; + +#if LIBAVCODEC_VERSION_INT >= AV_VERSION_INT(57, 0, 0) + avcodec_parameters_from_context(ret->avst->codecpar, ret->avcctx); +#endif + + int i = avcodec_open2(ret->avcctx, codec, NULL); + if (i) + goto err; + i = avio_open(&ret->fmtctx->pb, filename, AVIO_FLAG_WRITE); + if (i < 0) + goto err; + i = avformat_write_header(ret->fmtctx, NULL); + if (i) + goto err; + + av_init_packet(&ret->avpkt); + + return ret; + +err: + output_close(ret); + return NULL; +} + + +void decoder_close(decoder_t *dec) { + if (!dec) + return; + avcodec_free_context(&dec->avcctx); + av_frame_free(&dec->frame); + g_slice_free1(sizeof(*dec), dec); +} + + +void output_close(output_t *output) { + if (!output) + return; + av_write_trailer(output->fmtctx); + avcodec_close(output->avcctx); + avio_closep(&output->fmtctx->pb); + avformat_free_context(output->fmtctx); + g_slice_free1(sizeof(*output), output); +} diff --git a/recording-daemon/decoder.h b/recording-daemon/decoder.h new file mode 100644 index 000000000..5ae47dfc4 --- /dev/null +++ b/recording-daemon/decoder.h @@ -0,0 +1,16 @@ +#ifndef _DECODER_H_ +#define _DECODER_H_ + +#include "types.h" +#include "str.h" + + +decoder_t *decoder_new(unsigned int payload_type, const char *payload_str); +int decoder_input(decoder_t *, const str *, unsigned long ts, output_t *); +void decoder_close(decoder_t *); + +output_t *output_new(const char *filename); +void output_close(output_t *); + + +#endif diff --git a/recording-daemon/metafile.c b/recording-daemon/metafile.c index b42e74675..c3c660e89 100644 --- a/recording-daemon/metafile.c +++ b/recording-daemon/metafile.c @@ -10,7 +10,8 @@ #include "stream.h" #include "garbage.h" #include "main.h" -#include "aux.h" +#include "recaux.h" +#include "packet.h" static pthread_mutex_t metafiles_lock = PTHREAD_MUTEX_INITIALIZER; @@ -32,6 +33,7 @@ static void meta_free(void *ptr) { } g_ptr_array_free(mf->streams, TRUE); g_slice_free1(sizeof(*mf), mf); + g_hash_table_destroy(mf->ssrc_hash); } @@ -65,6 +67,15 @@ static void meta_rtp_payload_type(metafile_t *mf, unsigned long mnum, unsigned i char *payload_type) { dbg("payload type in media %lu num %u is %s", mnum, payload_num, payload_type); + + if (payload_num >= 128) { + ilog(LOG_ERR, "Payload type number %u is invalid", payload_num); + return; + } + + pthread_mutex_lock(&mf->payloads_lock); + mf->payload_types[payload_num] = g_string_chunk_insert(mf->gsc, payload_type); + pthread_mutex_unlock(&mf->payloads_lock); } @@ -86,23 +97,37 @@ static void meta_section(metafile_t *mf, char *section, char *content, unsigned } -void metafile_change(char *name) { +// returns mf locked +static metafile_t *metafile_get(char *name) { // get or create metafile metadata pthread_mutex_lock(&metafiles_lock); metafile_t *mf = g_hash_table_lookup(metafiles, name); - if (!mf) { - dbg("allocating metafile info for %s", name); - mf = g_slice_alloc0(sizeof(*mf)); - mf->gsc = g_string_chunk_new(0); - mf->name = g_string_chunk_insert(mf->gsc, name); - pthread_mutex_init(&mf->lock, NULL); - mf->streams = g_ptr_array_new(); - g_hash_table_insert(metafiles, mf->name, mf); - } + if (mf) + goto out; + + dbg("allocating metafile info for %s", name); + mf = g_slice_alloc0(sizeof(*mf)); + mf->gsc = g_string_chunk_new(0); + mf->name = g_string_chunk_insert(mf->gsc, name); + pthread_mutex_init(&mf->lock, NULL); + pthread_mutex_init(&mf->payloads_lock, NULL); + mf->streams = g_ptr_array_new(); + mf->ssrc_hash = g_hash_table_new_full(g_direct_hash, g_direct_equal, NULL, ssrc_free); + + g_hash_table_insert(metafiles, mf->name, mf); + +out: // switch locks pthread_mutex_lock(&mf->lock); pthread_mutex_unlock(&metafiles_lock); + return mf; +} + + +void metafile_change(char *name) { + metafile_t *mf = metafile_get(name); + char fnbuf[PATH_MAX]; snprintf(fnbuf, sizeof(fnbuf), "%s/%s", SPOOL_DIR, name); @@ -131,6 +156,7 @@ void metafile_change(char *name) { close(fd); // process contents of metadata file + // XXX use "str" type? char *head = s->str; char *endp = s->str + s->len; while (head < endp) { diff --git a/recording-daemon/packet.c b/recording-daemon/packet.c new file mode 100644 index 000000000..90d8c7eb4 --- /dev/null +++ b/recording-daemon/packet.c @@ -0,0 +1,181 @@ +#include "packet.h" +#include +#include +#include +#include +#include +#include "types.h" +#include "log.h" +#include "rtplib.h" +#include "str.h" +#include "decoder.h" + + +static int packet_cmp(const void *A, const void *B, void *dummy) { + const packet_t *a = A, *b = B; + + if (a->seq < b->seq) + return -1; + if (a->seq > b->seq) + return 1; + return 0; +} + + +static void packet_free(void *p) { + packet_t *packet = p; + if (!packet) + return; + free(packet->buffer); + g_slice_free1(sizeof(*packet), packet); +} + + +void ssrc_free(void *p) { + ssrc_t *s = p; + g_tree_destroy(s->packets); + output_close(s->output); + for (int i = 0; i < G_N_ELEMENTS(s->decoders); i++) + decoder_close(s->decoders[i]); + g_slice_free1(sizeof(*s), s); +} + + +// mf must be unlocked; returns ssrc locked +static ssrc_t *ssrc_get(metafile_t *mf, unsigned long ssrc) { + pthread_mutex_lock(&mf->lock); + ssrc_t *ret = g_hash_table_lookup(mf->ssrc_hash, GUINT_TO_POINTER(ssrc)); + if (ret) + goto out; + + ret = g_slice_alloc0(sizeof(*ret)); + pthread_mutex_init(&ret->lock, NULL); + ret->metafile = mf; + ret->ssrc = ssrc; + ret->packets = g_tree_new_full(packet_cmp, NULL, NULL, packet_free); + ret->seq = -1; + + char buf[256]; + snprintf(buf, sizeof(buf), "%s-%08lx.wav", mf->parent, ssrc); + ret->output = output_new(buf); + + g_hash_table_insert(mf->ssrc_hash, GUINT_TO_POINTER(ssrc), ret); + +out: + pthread_mutex_lock(&ret->lock); + pthread_mutex_unlock(&mf->lock); + return ret; +} + + +static gboolean ssrc_tree_get_first(void *key, void *val, void *data) { + packet_t **out = data; + *out = val; + return TRUE; +} + + +// ssrc is locked and must be unlocked when returning +static void ssrc_run(ssrc_t *ssrc) { + while (1) { + // inspect first packet to see if seq is correct + packet_t *first = NULL; + g_tree_foreach(ssrc->packets, ssrc_tree_get_first, &first); + if (!first) + break; + if (first->seq != ssrc->seq) + break; // need to wait for more + + // determine payload type and run decoder + unsigned int payload_type = first->rtp->m_pt & 0x7f; + metafile_t *mf = ssrc->metafile; + pthread_mutex_lock(&mf->payloads_lock); + char *payload_str = mf->payload_types[payload_type]; + pthread_mutex_unlock(&mf->payloads_lock); + + dbg("processing packet seq %i, payload type is %s", first->seq, payload_str); + g_tree_steal(ssrc->packets, first); + + // check if we have a decoder for this payload type yet + if (!ssrc->decoders[payload_type]) + ssrc->decoders[payload_type] = decoder_new(payload_type, payload_str); + // XXX error handling + + decoder_input(ssrc->decoders[payload_type], &first->payload, ntohl(first->rtp->timestamp), + ssrc->output); + + packet_free(first); + dbg("packets left in queue: %i", g_tree_nnodes(ssrc->packets)); + ssrc->seq = (ssrc->seq + 1) & 0xffff; + } + + pthread_mutex_unlock(&ssrc->lock); +} + + +// stream is unlocked, buf is malloc'd +void packet_process(stream_t *stream, unsigned char *buf, unsigned len) { + packet_t *packet = g_slice_alloc0(sizeof(*packet)); + packet->buffer = buf; // handing it over + + // XXX more checking here + str bufstr; + str_init_len(&bufstr, packet->buffer, len); + packet->ip = (void *) bufstr.s; + // XXX kernel already does this - add metadata? + if (packet->ip->version == 4) { + if (str_shift(&bufstr, packet->ip->ihl << 2)) + goto err; + } + else { + packet->ip = NULL; + packet->ip6 = (void *) bufstr.s; + if (str_shift(&bufstr, sizeof(*packet->ip6))) + goto err; + } + + packet->udp = (void *) bufstr.s; + str_shift(&bufstr, sizeof(*packet->udp)); + + if (rtp_payload(&packet->rtp, &packet->payload, &bufstr)) + goto err; + if (rtp_padding(packet->rtp, &packet->payload)) + goto err; + + packet->seq = ntohs(packet->rtp->seq_num); + dbg("packet parsed successfully, seq %u", packet->seq); + + // insert into ssrc queue + ssrc_t *ssrc = ssrc_get(stream->metafile, ntohl(packet->rtp->ssrc)); + + // check seq for dupes + if (G_UNLIKELY(ssrc->seq == -1)) { + // first packet we see + ssrc->seq = packet->seq; + goto seq_ok; + } + + int diff = packet->seq - ssrc->seq; + if (diff >= 0x8000) + goto dupe; + if (diff < 0 && diff > -0x8000) + goto dupe; + // seq ok - fall thru +seq_ok: + if (g_tree_lookup(ssrc->packets, packet)) + goto dupe; + g_tree_insert(ssrc->packets, packet, packet); + + // got a new packet, run the decoder + ssrc_run(ssrc); + return; + +dupe: + dbg("skipping dupe packet (new seq %i prev seq %i)", packet->seq, ssrc->seq); + pthread_mutex_unlock(&ssrc->lock); + return; + +err: + ilog(LOG_WARN, "Failed to parse packet headers"); + packet_free(packet); +} diff --git a/recording-daemon/packet.h b/recording-daemon/packet.h new file mode 100644 index 000000000..c261189b3 --- /dev/null +++ b/recording-daemon/packet.h @@ -0,0 +1,10 @@ +#ifndef _PACKET_H_ +#define _PACKET_H_ + +#include "types.h" + +void ssrc_free(void *p); + +void packet_process(stream_t *, unsigned char *, unsigned len); + +#endif diff --git a/recording-daemon/pcre.c b/recording-daemon/pcre.c index fff425a17..d94837cc4 100644 --- a/recording-daemon/pcre.c +++ b/recording-daemon/pcre.c @@ -3,12 +3,12 @@ #include "log.h" -void pcre_build(pcre_t *out, const char *pattern) { - const char *errptr; - int erroff; - - out->re = pcre_compile(pattern, PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL); - if (!out->re) - die("Failed to compile PCRE '%s': %s (at %i)", pattern, errptr, erroff); - out->extra = pcre_study(out->re, 0, &errptr); -} +//void pcre_build(pcre_t *out, const char *pattern) { +// const char *errptr; +// int erroff; +// +// out->re = pcre_compile(pattern, PCRE_DOLLAR_ENDONLY | PCRE_DOTALL, &errptr, &erroff, NULL); +// if (!out->re) +// die("Failed to compile PCRE '%s': %s (at %i)", pattern, errptr, erroff); +// out->extra = pcre_study(out->re, 0, &errptr); +//} diff --git a/recording-daemon/pcre.h b/recording-daemon/pcre.h index f982340a7..3c63c2f49 100644 --- a/recording-daemon/pcre.h +++ b/recording-daemon/pcre.h @@ -3,6 +3,6 @@ #include "types.h" -void pcre_build(pcre_t *out, const char *pattern); +//void pcre_build(pcre_t *out, const char *pattern); #endif diff --git a/recording-daemon/aux.c b/recording-daemon/recaux.c similarity index 94% rename from recording-daemon/aux.c rename to recording-daemon/recaux.c index 67bd59502..01b25405e 100644 --- a/recording-daemon/aux.c +++ b/recording-daemon/recaux.c @@ -1,4 +1,4 @@ -#include "aux.h" +#include "recaux.h" #include #include diff --git a/recording-daemon/aux.h b/recording-daemon/recaux.h similarity index 87% rename from recording-daemon/aux.h rename to recording-daemon/recaux.h index b00d6240b..6fd887cd2 100644 --- a/recording-daemon/aux.h +++ b/recording-daemon/recaux.h @@ -1,5 +1,5 @@ -#ifndef _AUX_H_ -#define _AUX_H_ +#ifndef _RECAUX_H_ +#define _RECAUX_H_ extern int __thread __sscanf_hack_var; diff --git a/recording-daemon/rtplib.c b/recording-daemon/rtplib.c new file mode 120000 index 000000000..e6640de16 --- /dev/null +++ b/recording-daemon/rtplib.c @@ -0,0 +1 @@ +../lib/rtplib.c \ No newline at end of file diff --git a/recording-daemon/stream.c b/recording-daemon/stream.c index 3900177ed..0ae9ab2b7 100644 --- a/recording-daemon/stream.c +++ b/recording-daemon/stream.c @@ -4,11 +4,12 @@ #include #include #include -#include +#include #include "metafile.h" #include "epoll.h" #include "log.h" #include "main.h" +#include "packet.h" // stream is locked @@ -35,8 +36,17 @@ static void stream_handler(handler_t *handler) { if (stream->fd == -1) goto out; - char buf[65535]; - int ret = read(stream->fd, buf, sizeof(buf)); + static const int maxbuflen = 65535; + static const int alloclen = maxbuflen +#ifdef AV_INPUT_BUFFER_PADDING_SIZE + + AV_INPUT_BUFFER_PADDING_SIZE +#endif +#ifdef FF_INPUT_BUFFER_PADDING_SIZE + + FF_INPUT_BUFFER_PADDING_SIZE +#endif + ; + unsigned char *buf = malloc(alloclen); + int ret = read(stream->fd, buf, maxbuflen); if (ret == 0) { ilog(LOG_INFO, "EOF on stream %s", stream->name); stream_close(stream); @@ -48,6 +58,11 @@ static void stream_handler(handler_t *handler) { goto out; } + // got a packet + pthread_mutex_unlock(&stream->lock); + packet_process(stream, buf, ret); + return; + out: pthread_mutex_unlock(&stream->lock); } @@ -66,6 +81,7 @@ static stream_t *stream_get(metafile_t *mf, unsigned long id) { pthread_mutex_init(&ret->lock, NULL); ret->fd = -1; ret->id = id; + ret->metafile = mf; out: return ret; @@ -89,15 +105,6 @@ void stream_open(metafile_t *mf, unsigned long id, char *name) { return; } - stream->avinf = av_find_input_format("rtp"); - ilog(LOG_DEBUG, "avinf %p", stream->avinf); - - stream->avfctx = avformat_alloc_context(); - unsigned char *buf = av_malloc(1024); // ? - stream->avfctx->pb = avio_alloc_context(buf, 1024, 1, NULL, NULL, NULL, NULL); - int ret = avformat_open_input(&stream->avfctx, "", stream->avinf, NULL); - ilog(LOG_DEBUG, "ret %i avfctx %p", ret, stream->avfctx); - // add to epoll stream->handler.ptr = stream; stream->handler.func = stream_handler; diff --git a/recording-daemon/types.h b/recording-daemon/types.h index 2fabc61e1..12694778e 100644 --- a/recording-daemon/types.h +++ b/recording-daemon/types.h @@ -6,38 +6,86 @@ #include #include #include -#include +#include "str.h" +struct iphdr; +struct ip6_hdr; +struct udphdr; +struct rtp_header; + + +struct handler_s; typedef struct handler_s handler_t; +struct metafile_s; +typedef struct metafile_s metafile_t; +struct decoder_s; +typedef struct decoder_s decoder_t; +struct output_s; +typedef struct output_s output_t; + typedef void handler_func(handler_t *); + struct handler_s { handler_func *func; void *ptr; }; + struct stream_s { pthread_mutex_t lock; char *name; + metafile_t *metafile; unsigned long id; int fd; handler_t handler; - AVInputFormat *avinf; - AVFormatContext *avfctx; }; typedef struct stream_s stream_t; + +struct packet_s { + void *buffer; + // pointers into buffer + struct iphdr *ip; + struct ip6_hdr *ip6; + struct udphdr *udp; + int seq; + struct rtp_header *rtp; + str payload; + +}; +typedef struct packet_s packet_t; + + +struct ssrc_s { + pthread_mutex_t lock; + metafile_t *metafile; + unsigned long ssrc; + GTree *packets; // contains packet_t objects + int seq; // next expected seq + decoder_t *decoders[128]; + output_t *output; +}; +typedef struct ssrc_s ssrc_t; + + struct metafile_s { pthread_mutex_t lock; char *name; char *parent; char *call_id; off_t pos; + GStringChunk *gsc; // XXX limit max size + GPtrArray *streams; + GHashTable *ssrc_hash; // contains ssrc_t objects + + pthread_mutex_t payloads_lock; + char *payload_types[128]; }; -typedef struct metafile_s metafile_t; + // struct pcre_s { // pcre *re;