diff --git a/daemon/Makefile b/daemon/Makefile index 91b7edc7a..7d0b11254 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -127,7 +127,7 @@ SRCS= main.c kernel.c poller.c aux.c control_tcp.c call.c control_udp.c redis.c bencode.c cookie_cache.c udp_listener.c control_ng.strhash.c sdp.strhash.c stun.c rtcp.c \ crypto.c rtp.c call_interfaces.strhash.c dtls.c log.c cli.c graphite.c ice.c \ media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c \ - codec.c load.c dtmf.c timerthread.c media_player.c + codec.c load.c dtmf.c timerthread.c media_player.c jitter_buffer.c LIBSRCS= loglib.c auxlib.c rtplib.c str.c socket.c streambuf.c ssllib.c dtmflib.c ifeq ($(with_transcoding),yes) LIBSRCS+= codeclib.c resample.c diff --git a/daemon/call.c b/daemon/call.c index 62c4d5fa6..7b180343a 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -45,6 +45,7 @@ #include "graphite.h" #include "codec.h" #include "media_player.h" +#include "jitter_buffer.h" /* also serves as array index for callstream->peers[] */ @@ -905,6 +906,9 @@ struct packet_stream *__packet_stream_new(struct call *call) { recording_init_stream(stream); stream->send_timer = send_timer_new(stream); + if (rtpe_config.jb_length) + stream->jb = jitter_buffer_new(call); + return stream; } @@ -2257,6 +2261,7 @@ no_stats_output: ps = l->data; send_timer_put(&ps->send_timer); + jitter_buffer_free(&ps->jb); __unkernelize(ps); dtls_shutdown(ps); ps->selected_sfd = NULL; diff --git a/daemon/jitter_buffer.c b/daemon/jitter_buffer.c new file mode 100644 index 000000000..9482f607e --- /dev/null +++ b/daemon/jitter_buffer.c @@ -0,0 +1,376 @@ +#include "jitter_buffer.h" +#include "timerthread.h" +#include "media_socket.h" +#include "call.h" +#include "codec.h" +#include "main.h" + +#define INITIAL_PACKETS 0x1E +#define CONT_SEQ_COUNT 0x64 +#define CONT_MISS_COUNT 0x0A +#define CONT_INCORRECT_BUFFERING 0x14 + + +static struct timerthread jitter_buffer_thread; + + +void jitter_buffer_init(void) { + timerthread_init(&jitter_buffer_thread, timerthread_queue_run); +} + +// jb is locked +static void reset_jitter_buffer(struct jitter_buffer *jb) { + ilog(LOG_DEBUG, "reset_jitter_buffer"); + + jb->first_send_ts = 0; + jb->first_send.tv_sec = 0; + jb->first_send.tv_usec = 0; + jb->first_seq = 0; + jb->rtptime_delta = 0; + jb->buffer_len = 0; + jb->cont_frames = 0; + jb->cont_miss = 0; + jb->next_exp_seq = 0; + jb->clock_rate = 0; + jb->payload_type = 0; + jb->cont_buff_err = 0; + jb->buf_decremented = 0; + jb->clock_drift_val = 0; + jb->clock_drift_enable = 0; + + jb_packet_free(&jb->p); + + jb->num_resets++; + + //disable jitter buffer in case of more than 2 resets + if(jb->num_resets > 2 && jb->call) + jb->disabled = 1; +} + +static int get_clock_rate(struct media_packet *mp, int payload_type) { + const struct rtp_payload_type *rtp_pt = NULL; + struct jitter_buffer *jb = mp->stream->jb; + int clock_rate = 0; + + if(jb->clock_rate && jb->payload_type == payload_type) + return jb->clock_rate; + + struct codec_handler *transcoder = codec_handler_get(mp->media, payload_type); + if(transcoder) { + if(transcoder->source_pt.payload_type == payload_type) + rtp_pt = &transcoder->source_pt; + if(transcoder->dest_pt.payload_type == payload_type) + rtp_pt = &transcoder->dest_pt; + } + + if(rtp_pt) { + clock_rate = jb->clock_rate = rtp_pt->clock_rate; + jb->payload_type = payload_type; + } + else + ilog(LOG_DEBUG, "clock_rate not present payload_type = %d", payload_type); + + return clock_rate; +} + +static struct jb_packet* get_jb_packet(struct media_packet *mp, const str *s) { + char *buf = malloc(s->len + RTP_BUFFER_HEAD_ROOM + RTP_BUFFER_TAIL_ROOM); + if (!buf) { + ilog(LOG_ERROR, "Failed to allocate memory: %s", strerror(errno)); + return NULL; + } + + struct jb_packet *p = g_slice_alloc0(sizeof(*p)); + + p->buf = buf; + p->mp = *mp; + obj_hold(p->mp.sfd); + + str_init_len(&p->mp.raw, buf + RTP_BUFFER_HEAD_ROOM, s->len); + memcpy(p->mp.raw.s, s->s, s->len); + + if(rtp_payload(&p->mp.rtp, &p->mp.payload, s)) { + jb_packet_free(&p); + return NULL; + } + + return p; +} + +// jb is locked +static void check_buffered_packets(struct jitter_buffer *jb) { + if (g_tree_nnodes(jb->ttq.entries) >= (2* rtpe_config.jb_length)) { + ilog(LOG_DEBUG, "Jitter reset due to buffer overflow"); + reset_jitter_buffer(jb); + } +} + +// jb is locked +static int queue_packet(struct media_packet *mp, struct jb_packet *p) { + struct jitter_buffer *jb = mp->stream->jb; + unsigned long ts = ntohl(mp->rtp->timestamp); + int payload_type = (mp->rtp->m_pt & 0x7f); + int clockrate = get_clock_rate(mp, payload_type); + + if(!clockrate || !jb->first_send.tv_sec) { + ilog(LOG_DEBUG, "Jitter reset due to clockrate"); + reset_jitter_buffer(jb); + return 1; + } + long ts_diff = (uint32_t) ts - (uint32_t) jb->first_send_ts; + int seq_diff = ntohs(mp->rtp->seq_num) - jb->first_seq; + if(!jb->rtptime_delta && seq_diff) { + jb->rtptime_delta = ts_diff/seq_diff; + } + p->ttq_entry.when = jb->first_send; + long long ts_diff_us = + (long long) (ts_diff + (jb->rtptime_delta * jb->buffer_len))* 1000000 / clockrate; + + ts_diff_us += (jb->clock_drift_val * seq_diff); + + if(jb->buf_decremented) { + ts_diff_us += 5000; //add 5ms delta when 2 packets are scheduled around same time + jb->buf_decremented = 0; + } + timeval_add_usec(&p->ttq_entry.when, ts_diff_us); + + ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now); + + if (ts_diff_us > 3000000) { // more than three second, can't be right + jb->first_send.tv_sec = 0; + jb->rtptime_delta = 0; + } + + timerthread_queue_push(&jb->ttq, &p->ttq_entry); + + return 0; +} + +static void handle_clock_drift(struct media_packet *mp) { + ilog(LOG_DEBUG, "handle_clock_drift"); + unsigned long ts = ntohl(mp->rtp->timestamp); + struct jitter_buffer *jb = mp->stream->jb; + int payload_type = (mp->rtp->m_pt & 0x7f); + int clockrate = get_clock_rate(mp, payload_type); + if(!clockrate) { + return; + } + long ts_diff = (uint32_t) ts - (uint32_t) jb->first_send_ts; + int seq_diff = ntohs(mp->rtp->seq_num) - jb->first_seq; + long long ts_diff_us = + (long long) (ts_diff)* 1000000 / clockrate; + struct timeval to_send = jb->first_send; + timeval_add_usec(&to_send, ts_diff_us); + long long time_diff = timeval_diff(&rtpe_now, &to_send); + + jb->clock_drift_val = time_diff/seq_diff; + jb->clock_drift_enable = 0; + jb->cont_buff_err = 0; +} + +int buffer_packet(struct media_packet *mp, const str *s) { + struct jb_packet *p = NULL; + int ret = 1; // must call stream_packet + + mp->stream = mp->sfd->stream; + mp->media = mp->stream->media; + mp->call = mp->sfd->call; + struct call *call = mp->call; + + rwlock_lock_r(&call->master_lock); + + struct jitter_buffer *jb = mp->stream->jb; + if (!jb || jb->disabled) + goto end; + + ilog(LOG_DEBUG, "Handling JB packet on: %s:%d", sockaddr_print_buf(&mp->stream->endpoint.address), + mp->stream->endpoint.port); + + p = get_jb_packet(mp, s); + if (!p) + goto end; + + mp = &p->mp; + + int payload_type = (mp->rtp->m_pt & 0x7f); + + mutex_lock(&jb->lock); + + if(jb->clock_rate && jb->payload_type != payload_type) { //reset in case of payload change + jb->first_send.tv_sec = 0; + jb->rtptime_delta = 0; + } + + if(jb->clock_drift_enable) + handle_clock_drift(mp); + + if (jb->first_send.tv_sec) { + ret = queue_packet(mp,p); + if(!ret && jb->p) { + // push first packet into jitter buffer + queue_packet(&jb->p->mp,jb->p); + jb->p = NULL; + } + } + else { + // store data from first packet and use for successive packets and queue the first packet + unsigned long ts = ntohl(mp->rtp->timestamp); + int payload_type = (mp->rtp->m_pt & 0x7f); + int clockrate = get_clock_rate(mp, payload_type); + if(!clockrate){ + jb->initial_pkts++; + if(jb->initial_pkts > INITIAL_PACKETS) { //Ignore initial Payload Type 126 if any + reset_jitter_buffer(jb); + } + goto end_unlock; + } + + p->ttq_entry.when = jb->first_send = rtpe_now; + jb->first_send_ts = ts; + jb->first_seq = ntohs(mp->rtp->seq_num); + jb->p = p; + ret = 0; + } + + // packet consumed? + if (ret == 0) + p = NULL; + + check_buffered_packets(jb); + +end_unlock: + mutex_unlock(&jb->lock); + +end: + rwlock_unlock_r(&call->master_lock); + if (p) + jb_packet_free(&p); + return ret; +} + +static void increment_buffer(struct jitter_buffer *jb) { + if(jb->buffer_len < rtpe_config.jb_length) + jb->buffer_len++; +} + +static void decrement_buffer(struct jitter_buffer *jb) { + if(jb->buffer_len > 0) { + jb->buffer_len--; + jb->buf_decremented = 1; + } +} + +static int set_jitter_values(struct media_packet *mp) { + int ret=0; + int curr_seq = ntohs(mp->rtp->seq_num); + struct jitter_buffer *jb = mp->stream->jb; + if(jb->next_exp_seq) { + mutex_lock(&jb->lock); + if(curr_seq > jb->next_exp_seq) { + ilog(LOG_DEBUG, "missing seq exp seq =%d, received seq= %d", jb->next_exp_seq, curr_seq); + increment_buffer(jb); + jb->cont_frames = 0; + jb->cont_miss++; + } + else if(curr_seq < jb->next_exp_seq) { //Might be duplicate or sequence already crossed + jb->cont_frames = 0; + jb->cont_miss++; + ret=1; + } + else { + jb->cont_frames++; + jb->cont_miss = 0; + if(jb->cont_frames >= CONT_SEQ_COUNT) { + decrement_buffer(jb); + jb->cont_frames = 0; + ilog(LOG_DEBUG, "Received continous frames Buffer len=%d", jb->buffer_len); + } + } + + if(jb->cont_miss >= CONT_MISS_COUNT) + reset_jitter_buffer(jb); + mutex_unlock(&jb->lock); + } + if(curr_seq >= jb->next_exp_seq) + jb->next_exp_seq = curr_seq + 1; + + int len = g_tree_nnodes(jb->ttq.entries); + + if(len > jb->buffer_len || len < jb->buffer_len) { + jb->cont_buff_err++; + if((jb->cont_buff_err > CONT_INCORRECT_BUFFERING) && rtpe_config.jb_clock_drift) + jb->clock_drift_enable=1; + } + else + jb->cont_buff_err = 0; + + return ret; +} + +static void __jb_send_later(struct timerthread_queue *ttq, void *p) { + struct jb_packet *cp = p; + set_jitter_values(&cp->mp); + play_buffered(p); +}; +// jb and call are locked +static void __jb_send_now(struct timerthread_queue *ttq, void *p) { + struct jitter_buffer *jb = (void *) ttq; + + mutex_unlock(&jb->lock); + rwlock_unlock_r(&jb->call->master_lock); + + __jb_send_later(ttq, p); + + rwlock_lock_r(&jb->call->master_lock); + mutex_lock(&jb->lock); +}; +static void __jb_free(void *p) { + struct jitter_buffer *jb = p; + jitter_buffer_free(&jb); +} +void __jb_packet_free(void *p) { + struct jb_packet *jbp = p; + jb_packet_free(&jbp); +} + +void jitter_buffer_loop(void *p) { + ilog(LOG_DEBUG, "jitter_buffer_loop"); + timerthread_run(&jitter_buffer_thread); +} + +struct jitter_buffer *jitter_buffer_new(struct call *c) { + ilog(LOG_DEBUG, "creating jitter_buffer"); + + struct jitter_buffer *jb = timerthread_queue_new("jitter_buffer", sizeof(*jb), + &jitter_buffer_thread, + __jb_send_now, + __jb_send_later, + __jb_free, __jb_packet_free); + mutex_init(&jb->lock); + jb->call = obj_get(c); + return jb; +} + +void jitter_buffer_free(struct jitter_buffer **jbp) { + if (!jbp || !*jbp) + return; + + ilog(LOG_DEBUG, "freeing jitter_buffer"); + + mutex_destroy(&(*jbp)->lock); + if ((*jbp)->call) + obj_put((*jbp)->call); + g_slice_free1(sizeof(**jbp), *jbp); + *jbp = NULL; +} + +void jb_packet_free(struct jb_packet **jbp) { + if (!jbp || !*jbp) + return; + + free((*jbp)->buf); + if ((*jbp)->mp.sfd) + obj_put((*jbp)->mp.sfd); + g_slice_free1(sizeof(**jbp), *jbp); + *jbp = NULL; +} diff --git a/daemon/main.c b/daemon/main.c index 8e4083756..ca18d0fe0 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -44,6 +44,7 @@ #include "ssllib.h" #include "media_player.h" #include "dtmf.h" +#include "jitter_buffer.h" @@ -376,6 +377,9 @@ static void options(int *argc, char ***argv) { { "mysql-pass", 0, 0, G_OPTION_ARG_STRING, &rtpe_config.mysql_pass,"MySQL connection credentials", "PASSWORD" }, { "mysql-query",0, 0, G_OPTION_ARG_STRING, &rtpe_config.mysql_query,"MySQL select query", "STRING" }, { "endpoint-learning",0,0,G_OPTION_ARG_STRING, &endpoint_learning, "RTP endpoint learning algorithm", "delayed|immediate|off|heuristic" }, + { "jitter-buffer",0, 0, G_OPTION_ARG_INT, &rtpe_config.jb_length, "Size of jitter buffer", "INT" }, + { "jb-clock-drift",0,0, G_OPTION_ARG_NONE, &rtpe_config.jb_clock_drift,"Compensate for source clock drift",NULL }, + { NULL, } }; @@ -564,6 +568,9 @@ static void options(int *argc, char ***argv) { die("Invalid --endpoint-learning option ('%s')", endpoint_learning); } rtpe_config.endpoint_learning = el_config; + + if (rtpe_config.jb_length < 0) + die("Invalid negative jitter buffer size"); } void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) { @@ -635,6 +642,8 @@ void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) { ini_rtpe_cfg->rec_method = g_strdup(rtpe_config.rec_method); ini_rtpe_cfg->rec_format = g_strdup(rtpe_config.rec_format); + ini_rtpe_cfg->jb_length = rtpe_config.jb_length; + ini_rtpe_cfg->jb_clock_drift = rtpe_config.jb_clock_drift; } static void early_init(void) { @@ -670,6 +679,7 @@ static void init_everything(void) { codeclib_init(0); media_player_init(); dtmf_init(); + jitter_buffer_init(); } @@ -847,6 +857,9 @@ int main(int argc, char **argv) { thread_create_detach_prio(media_player_loop, NULL, rtpe_config.scheduling, rtpe_config.priority); #endif thread_create_detach_prio(send_timer_loop, NULL, rtpe_config.scheduling, rtpe_config.priority); + if (rtpe_config.jb_length > 0) + thread_create_detach_prio(jitter_buffer_loop, NULL, rtpe_config.scheduling, + rtpe_config.priority); } diff --git a/daemon/media_socket.c b/daemon/media_socket.c index a29c1993a..b172017eb 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -26,6 +26,7 @@ #include "main.h" #include "codec.h" #include "media_player.h" +#include "jitter_buffer.h" #ifndef PORT_RANDOM_MIN @@ -1947,7 +1948,15 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { ilog(LOG_WARNING, "UDP packet possibly truncated"); str_init_len(&phc.s, buf + RTP_BUFFER_HEAD_ROOM, ret); - ret = stream_packet(&phc); + + if (sfd->stream->jb) { + ret = buffer_packet(&phc.mp, &phc.s); + if (ret == 1) + ret = stream_packet(&phc); + } + else + ret = stream_packet(&phc); + if (G_UNLIKELY(ret < 0)) ilog(LOG_WARNING, "Write error on media socket: %s", strerror(-ret)); else if (phc.update) @@ -2020,3 +2029,13 @@ const struct transport_protocol *transport_protocol(const str *s) { out: return NULL; } + +void play_buffered(struct jb_packet *cp) { + struct packet_handler_ctx phc; + ZERO(phc); + phc.mp = cp->mp; + phc.s = cp->mp.raw; + //phc.buffered_packet = buffered; + stream_packet(&phc); + jb_packet_free(&cp); +} diff --git a/daemon/rtpengine.pod b/daemon/rtpengine.pod index 619dd631f..9831c041e 100644 --- a/daemon/rtpengine.pod +++ b/daemon/rtpengine.pod @@ -671,6 +671,16 @@ seen, that address is used. Otherwise, if a packet with a matching source port (but different address) is seen, that address is used. Otherwise, the source address of any incoming packet seen is used. +=item B<--jitter-buffer=>I + +Size of (incoming) jitter buffer in packets. A value of zero (the default) +disables the jitter buffer. The jitter buffer is currently only implemented for +userspace operation. + +=item B<--jb-clock-drift> + +Enable clock drift compensation for the jitter buffer. + =back =head1 INTERFACES diff --git a/include/call.h b/include/call.h index 022ed8391..3de4534f9 100644 --- a/include/call.h +++ b/include/call.h @@ -193,6 +193,7 @@ struct rtp_payload_type; struct media_player; struct send_timer; struct transport_protocol; +struct jitter_buffer; typedef bencode_buffer_t call_buffer_t; @@ -266,6 +267,7 @@ struct packet_stream { struct ssrc_ctx *ssrc_in, /* LOCK: in_lock */ // XXX eliminate these *ssrc_out; /* LOCK: out_lock */ struct send_timer *send_timer; /* RO */ + struct jitter_buffer *jb; /* RO */ struct stats stats; struct stats kernel_stats; diff --git a/include/jitter_buffer.h b/include/jitter_buffer.h new file mode 100644 index 000000000..83ba9ba68 --- /dev/null +++ b/include/jitter_buffer.h @@ -0,0 +1,57 @@ +#ifndef _JITTER_BUFFER_H_ +#define _JITTER_BUFFER_H_ + +#include "auxlib.h" +#include "socket.h" +#include "timerthread.h" +#include "media_socket.h" +//#include "codec.h" +// +//struct packet_handler_ctx; +struct jb_packet; +struct media_packet; +// +struct jb_packet { + struct timerthread_queue_entry ttq_entry; + char *buf; + struct media_packet mp; + //int buffered; +}; + +struct jitter_buffer { + struct timerthread_queue ttq; + mutex_t lock; + unsigned long first_send_ts; + struct timeval first_send; + unsigned int first_seq; + unsigned int rtptime_delta; + unsigned int next_exp_seq; + unsigned int cont_frames; + unsigned int cont_miss; + unsigned int clock_rate; + unsigned int payload_type; + unsigned int num_resets; + unsigned int initial_pkts; + unsigned int cont_buff_err; + int buffer_len; + int clock_drift_val; + int clock_drift_enable; //flag for buffer overflow underflow + int buf_decremented; + struct jb_packet *p; + struct call *call; + int disabled; +}; + +void jitter_buffer_init(void); + +struct jitter_buffer *jitter_buffer_new(struct call *); +void jitter_buffer_free(struct jitter_buffer **); + +int buffer_packet(struct media_packet *mp, const str *s); +void jb_packet_free(struct jb_packet **jbp); + +//int set_jitter_values(struct media_packet *mp); + +void jitter_buffer_loop(void *p); + +#endif diff --git a/include/main.h b/include/main.h index 1ac8feb41..5ca67d486 100644 --- a/include/main.h +++ b/include/main.h @@ -93,6 +93,8 @@ struct rtpengine_config { char *mysql_query; endpoint_t dtmf_udp_ep; enum endpoint_learning endpoint_learning; + int jb_length; + int jb_clock_drift; }; diff --git a/include/media_socket.h b/include/media_socket.h index 967a96cf9..6e42aee14 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -19,6 +19,7 @@ struct media_packet; struct transport_protocol; struct ssrc_ctx; struct rtpengine_srtp; +struct jb_packet; typedef int rtcp_filter_func(struct media_packet *, GQueue *); typedef int (*rewrite_func)(str *, struct packet_stream *, struct stream_fd *, const endpoint_t *, @@ -174,6 +175,8 @@ const struct streamhandler *determine_handler(const struct transport_protocol *i struct call_media *out_media, int must_recrypt); int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, struct media_packet *mp); const struct transport_protocol *transport_protocol(const str *s); +//void play_buffered(struct packet_stream *sink, struct codec_packet *cp, int buffered); +void play_buffered(struct jb_packet *cp); /* XXX shouldn't be necessary */ /* diff --git a/t/.gitignore b/t/.gitignore index f686cdefb..d0321297f 100644 --- a/t/.gitignore +++ b/t/.gitignore @@ -53,3 +53,4 @@ test-dtmf-detect *-test dtmf_rx_fillin.h *-test.c +jitter_buffer.c diff --git a/t/Makefile b/t/Makefile index 1885435fe..93d4e2e73 100644 --- a/t/Makefile +++ b/t/Makefile @@ -70,7 +70,7 @@ LIBSRCS+= codeclib.c resample.c socket.c streambuf.c dtmflib.c DAEMONSRCS+= codec.c call.c ice.c kernel.c media_socket.c stun.c bencode.c poller.c \ dtls.c recording.c statistics.c rtcp.c redis.c iptables.c graphite.c \ cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c \ - media_player.c + media_player.c jitter_buffer.c HASHSRCS+= call_interfaces.c control_ng.c sdp.c endif @@ -128,7 +128,7 @@ transcode-test: transcode-test.o $(COMMONOBJS) codeclib.o resample.o codec.o ssr rtcp.o redis.o iptables.o graphite.o call_interfaces.strhash.o sdp.strhash.o rtp.o crypto.o \ control_ng.strhash.o \ streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \ - media_player.o dtmflib.o + media_player.o jitter_buffer.o dtmflib.o payload-tracker-test: payload-tracker-test.o $(COMMONOBJS) ssrc.o aux.o auxlib.o rtp.o crypto.o codeclib.o \ resample.o dtmflib.o