Browse Source

TT#74301 merge jitter buffer PR #834

refactored

closes #834

Change-Id: I174cc6e365af54fb66d2dd78be02c601c5d5d645
changes/49/37049/7
Richard Fuchs 6 years ago
parent
commit
ef0d6a3a80
12 changed files with 492 additions and 4 deletions
  1. +1
    -1
      daemon/Makefile
  2. +5
    -0
      daemon/call.c
  3. +376
    -0
      daemon/jitter_buffer.c
  4. +13
    -0
      daemon/main.c
  5. +20
    -1
      daemon/media_socket.c
  6. +10
    -0
      daemon/rtpengine.pod
  7. +2
    -0
      include/call.h
  8. +57
    -0
      include/jitter_buffer.h
  9. +2
    -0
      include/main.h
  10. +3
    -0
      include/media_socket.h
  11. +1
    -0
      t/.gitignore
  12. +2
    -2
      t/Makefile

+ 1
- 1
daemon/Makefile View File

@ -127,7 +127,7 @@ SRCS= main.c kernel.c poller.c aux.c control_tcp.c call.c control_udp.c redis.c
bencode.c cookie_cache.c udp_listener.c control_ng.strhash.c sdp.strhash.c stun.c rtcp.c \
crypto.c rtp.c call_interfaces.strhash.c dtls.c log.c cli.c graphite.c ice.c \
media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c \
codec.c load.c dtmf.c timerthread.c media_player.c
codec.c load.c dtmf.c timerthread.c media_player.c jitter_buffer.c
LIBSRCS= loglib.c auxlib.c rtplib.c str.c socket.c streambuf.c ssllib.c dtmflib.c
ifeq ($(with_transcoding),yes)
LIBSRCS+= codeclib.c resample.c


+ 5
- 0
daemon/call.c View File

@ -45,6 +45,7 @@
#include "graphite.h"
#include "codec.h"
#include "media_player.h"
#include "jitter_buffer.h"
/* also serves as array index for callstream->peers[] */
@ -905,6 +906,9 @@ struct packet_stream *__packet_stream_new(struct call *call) {
recording_init_stream(stream);
stream->send_timer = send_timer_new(stream);
if (rtpe_config.jb_length)
stream->jb = jitter_buffer_new(call);
return stream;
}
@ -2257,6 +2261,7 @@ no_stats_output:
ps = l->data;
send_timer_put(&ps->send_timer);
jitter_buffer_free(&ps->jb);
__unkernelize(ps);
dtls_shutdown(ps);
ps->selected_sfd = NULL;


+ 376
- 0
daemon/jitter_buffer.c View File

@ -0,0 +1,376 @@
#include "jitter_buffer.h"
#include "timerthread.h"
#include "media_socket.h"
#include "call.h"
#include "codec.h"
#include "main.h"
#define INITIAL_PACKETS 0x1E
#define CONT_SEQ_COUNT 0x64
#define CONT_MISS_COUNT 0x0A
#define CONT_INCORRECT_BUFFERING 0x14
static struct timerthread jitter_buffer_thread;
void jitter_buffer_init(void) {
timerthread_init(&jitter_buffer_thread, timerthread_queue_run);
}
// jb is locked
static void reset_jitter_buffer(struct jitter_buffer *jb) {
ilog(LOG_DEBUG, "reset_jitter_buffer");
jb->first_send_ts = 0;
jb->first_send.tv_sec = 0;
jb->first_send.tv_usec = 0;
jb->first_seq = 0;
jb->rtptime_delta = 0;
jb->buffer_len = 0;
jb->cont_frames = 0;
jb->cont_miss = 0;
jb->next_exp_seq = 0;
jb->clock_rate = 0;
jb->payload_type = 0;
jb->cont_buff_err = 0;
jb->buf_decremented = 0;
jb->clock_drift_val = 0;
jb->clock_drift_enable = 0;
jb_packet_free(&jb->p);
jb->num_resets++;
//disable jitter buffer in case of more than 2 resets
if(jb->num_resets > 2 && jb->call)
jb->disabled = 1;
}
static int get_clock_rate(struct media_packet *mp, int payload_type) {
const struct rtp_payload_type *rtp_pt = NULL;
struct jitter_buffer *jb = mp->stream->jb;
int clock_rate = 0;
if(jb->clock_rate && jb->payload_type == payload_type)
return jb->clock_rate;
struct codec_handler *transcoder = codec_handler_get(mp->media, payload_type);
if(transcoder) {
if(transcoder->source_pt.payload_type == payload_type)
rtp_pt = &transcoder->source_pt;
if(transcoder->dest_pt.payload_type == payload_type)
rtp_pt = &transcoder->dest_pt;
}
if(rtp_pt) {
clock_rate = jb->clock_rate = rtp_pt->clock_rate;
jb->payload_type = payload_type;
}
else
ilog(LOG_DEBUG, "clock_rate not present payload_type = %d", payload_type);
return clock_rate;
}
static struct jb_packet* get_jb_packet(struct media_packet *mp, const str *s) {
char *buf = malloc(s->len + RTP_BUFFER_HEAD_ROOM + RTP_BUFFER_TAIL_ROOM);
if (!buf) {
ilog(LOG_ERROR, "Failed to allocate memory: %s", strerror(errno));
return NULL;
}
struct jb_packet *p = g_slice_alloc0(sizeof(*p));
p->buf = buf;
p->mp = *mp;
obj_hold(p->mp.sfd);
str_init_len(&p->mp.raw, buf + RTP_BUFFER_HEAD_ROOM, s->len);
memcpy(p->mp.raw.s, s->s, s->len);
if(rtp_payload(&p->mp.rtp, &p->mp.payload, s)) {
jb_packet_free(&p);
return NULL;
}
return p;
}
// jb is locked
static void check_buffered_packets(struct jitter_buffer *jb) {
if (g_tree_nnodes(jb->ttq.entries) >= (2* rtpe_config.jb_length)) {
ilog(LOG_DEBUG, "Jitter reset due to buffer overflow");
reset_jitter_buffer(jb);
}
}
// jb is locked
static int queue_packet(struct media_packet *mp, struct jb_packet *p) {
struct jitter_buffer *jb = mp->stream->jb;
unsigned long ts = ntohl(mp->rtp->timestamp);
int payload_type = (mp->rtp->m_pt & 0x7f);
int clockrate = get_clock_rate(mp, payload_type);
if(!clockrate || !jb->first_send.tv_sec) {
ilog(LOG_DEBUG, "Jitter reset due to clockrate");
reset_jitter_buffer(jb);
return 1;
}
long ts_diff = (uint32_t) ts - (uint32_t) jb->first_send_ts;
int seq_diff = ntohs(mp->rtp->seq_num) - jb->first_seq;
if(!jb->rtptime_delta && seq_diff) {
jb->rtptime_delta = ts_diff/seq_diff;
}
p->ttq_entry.when = jb->first_send;
long long ts_diff_us =
(long long) (ts_diff + (jb->rtptime_delta * jb->buffer_len))* 1000000 / clockrate;
ts_diff_us += (jb->clock_drift_val * seq_diff);
if(jb->buf_decremented) {
ts_diff_us += 5000; //add 5ms delta when 2 packets are scheduled around same time
jb->buf_decremented = 0;
}
timeval_add_usec(&p->ttq_entry.when, ts_diff_us);
ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now);
if (ts_diff_us > 3000000) { // more than three second, can't be right
jb->first_send.tv_sec = 0;
jb->rtptime_delta = 0;
}
timerthread_queue_push(&jb->ttq, &p->ttq_entry);
return 0;
}
static void handle_clock_drift(struct media_packet *mp) {
ilog(LOG_DEBUG, "handle_clock_drift");
unsigned long ts = ntohl(mp->rtp->timestamp);
struct jitter_buffer *jb = mp->stream->jb;
int payload_type = (mp->rtp->m_pt & 0x7f);
int clockrate = get_clock_rate(mp, payload_type);
if(!clockrate) {
return;
}
long ts_diff = (uint32_t) ts - (uint32_t) jb->first_send_ts;
int seq_diff = ntohs(mp->rtp->seq_num) - jb->first_seq;
long long ts_diff_us =
(long long) (ts_diff)* 1000000 / clockrate;
struct timeval to_send = jb->first_send;
timeval_add_usec(&to_send, ts_diff_us);
long long time_diff = timeval_diff(&rtpe_now, &to_send);
jb->clock_drift_val = time_diff/seq_diff;
jb->clock_drift_enable = 0;
jb->cont_buff_err = 0;
}
int buffer_packet(struct media_packet *mp, const str *s) {
struct jb_packet *p = NULL;
int ret = 1; // must call stream_packet
mp->stream = mp->sfd->stream;
mp->media = mp->stream->media;
mp->call = mp->sfd->call;
struct call *call = mp->call;
rwlock_lock_r(&call->master_lock);
struct jitter_buffer *jb = mp->stream->jb;
if (!jb || jb->disabled)
goto end;
ilog(LOG_DEBUG, "Handling JB packet on: %s:%d", sockaddr_print_buf(&mp->stream->endpoint.address),
mp->stream->endpoint.port);
p = get_jb_packet(mp, s);
if (!p)
goto end;
mp = &p->mp;
int payload_type = (mp->rtp->m_pt & 0x7f);
mutex_lock(&jb->lock);
if(jb->clock_rate && jb->payload_type != payload_type) { //reset in case of payload change
jb->first_send.tv_sec = 0;
jb->rtptime_delta = 0;
}
if(jb->clock_drift_enable)
handle_clock_drift(mp);
if (jb->first_send.tv_sec) {
ret = queue_packet(mp,p);
if(!ret && jb->p) {
// push first packet into jitter buffer
queue_packet(&jb->p->mp,jb->p);
jb->p = NULL;
}
}
else {
// store data from first packet and use for successive packets and queue the first packet
unsigned long ts = ntohl(mp->rtp->timestamp);
int payload_type = (mp->rtp->m_pt & 0x7f);
int clockrate = get_clock_rate(mp, payload_type);
if(!clockrate){
jb->initial_pkts++;
if(jb->initial_pkts > INITIAL_PACKETS) { //Ignore initial Payload Type 126 if any
reset_jitter_buffer(jb);
}
goto end_unlock;
}
p->ttq_entry.when = jb->first_send = rtpe_now;
jb->first_send_ts = ts;
jb->first_seq = ntohs(mp->rtp->seq_num);
jb->p = p;
ret = 0;
}
// packet consumed?
if (ret == 0)
p = NULL;
check_buffered_packets(jb);
end_unlock:
mutex_unlock(&jb->lock);
end:
rwlock_unlock_r(&call->master_lock);
if (p)
jb_packet_free(&p);
return ret;
}
static void increment_buffer(struct jitter_buffer *jb) {
if(jb->buffer_len < rtpe_config.jb_length)
jb->buffer_len++;
}
static void decrement_buffer(struct jitter_buffer *jb) {
if(jb->buffer_len > 0) {
jb->buffer_len--;
jb->buf_decremented = 1;
}
}
static int set_jitter_values(struct media_packet *mp) {
int ret=0;
int curr_seq = ntohs(mp->rtp->seq_num);
struct jitter_buffer *jb = mp->stream->jb;
if(jb->next_exp_seq) {
mutex_lock(&jb->lock);
if(curr_seq > jb->next_exp_seq) {
ilog(LOG_DEBUG, "missing seq exp seq =%d, received seq= %d", jb->next_exp_seq, curr_seq);
increment_buffer(jb);
jb->cont_frames = 0;
jb->cont_miss++;
}
else if(curr_seq < jb->next_exp_seq) { //Might be duplicate or sequence already crossed
jb->cont_frames = 0;
jb->cont_miss++;
ret=1;
}
else {
jb->cont_frames++;
jb->cont_miss = 0;
if(jb->cont_frames >= CONT_SEQ_COUNT) {
decrement_buffer(jb);
jb->cont_frames = 0;
ilog(LOG_DEBUG, "Received continous frames Buffer len=%d", jb->buffer_len);
}
}
if(jb->cont_miss >= CONT_MISS_COUNT)
reset_jitter_buffer(jb);
mutex_unlock(&jb->lock);
}
if(curr_seq >= jb->next_exp_seq)
jb->next_exp_seq = curr_seq + 1;
int len = g_tree_nnodes(jb->ttq.entries);
if(len > jb->buffer_len || len < jb->buffer_len) {
jb->cont_buff_err++;
if((jb->cont_buff_err > CONT_INCORRECT_BUFFERING) && rtpe_config.jb_clock_drift)
jb->clock_drift_enable=1;
}
else
jb->cont_buff_err = 0;
return ret;
}
static void __jb_send_later(struct timerthread_queue *ttq, void *p) {
struct jb_packet *cp = p;
set_jitter_values(&cp->mp);
play_buffered(p);
};
// jb and call are locked
static void __jb_send_now(struct timerthread_queue *ttq, void *p) {
struct jitter_buffer *jb = (void *) ttq;
mutex_unlock(&jb->lock);
rwlock_unlock_r(&jb->call->master_lock);
__jb_send_later(ttq, p);
rwlock_lock_r(&jb->call->master_lock);
mutex_lock(&jb->lock);
};
static void __jb_free(void *p) {
struct jitter_buffer *jb = p;
jitter_buffer_free(&jb);
}
void __jb_packet_free(void *p) {
struct jb_packet *jbp = p;
jb_packet_free(&jbp);
}
void jitter_buffer_loop(void *p) {
ilog(LOG_DEBUG, "jitter_buffer_loop");
timerthread_run(&jitter_buffer_thread);
}
struct jitter_buffer *jitter_buffer_new(struct call *c) {
ilog(LOG_DEBUG, "creating jitter_buffer");
struct jitter_buffer *jb = timerthread_queue_new("jitter_buffer", sizeof(*jb),
&jitter_buffer_thread,
__jb_send_now,
__jb_send_later,
__jb_free, __jb_packet_free);
mutex_init(&jb->lock);
jb->call = obj_get(c);
return jb;
}
void jitter_buffer_free(struct jitter_buffer **jbp) {
if (!jbp || !*jbp)
return;
ilog(LOG_DEBUG, "freeing jitter_buffer");
mutex_destroy(&(*jbp)->lock);
if ((*jbp)->call)
obj_put((*jbp)->call);
g_slice_free1(sizeof(**jbp), *jbp);
*jbp = NULL;
}
void jb_packet_free(struct jb_packet **jbp) {
if (!jbp || !*jbp)
return;
free((*jbp)->buf);
if ((*jbp)->mp.sfd)
obj_put((*jbp)->mp.sfd);
g_slice_free1(sizeof(**jbp), *jbp);
*jbp = NULL;
}

+ 13
- 0
daemon/main.c View File

@ -44,6 +44,7 @@
#include "ssllib.h"
#include "media_player.h"
#include "dtmf.h"
#include "jitter_buffer.h"
@ -376,6 +377,9 @@ static void options(int *argc, char ***argv) {
{ "mysql-pass", 0, 0, G_OPTION_ARG_STRING, &rtpe_config.mysql_pass,"MySQL connection credentials", "PASSWORD" },
{ "mysql-query",0, 0, G_OPTION_ARG_STRING, &rtpe_config.mysql_query,"MySQL select query", "STRING" },
{ "endpoint-learning",0,0,G_OPTION_ARG_STRING, &endpoint_learning, "RTP endpoint learning algorithm", "delayed|immediate|off|heuristic" },
{ "jitter-buffer",0, 0, G_OPTION_ARG_INT, &rtpe_config.jb_length, "Size of jitter buffer", "INT" },
{ "jb-clock-drift",0,0, G_OPTION_ARG_NONE, &rtpe_config.jb_clock_drift,"Compensate for source clock drift",NULL },
{ NULL, }
};
@ -564,6 +568,9 @@ static void options(int *argc, char ***argv) {
die("Invalid --endpoint-learning option ('%s')", endpoint_learning);
}
rtpe_config.endpoint_learning = el_config;
if (rtpe_config.jb_length < 0)
die("Invalid negative jitter buffer size");
}
void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) {
@ -635,6 +642,8 @@ void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) {
ini_rtpe_cfg->rec_method = g_strdup(rtpe_config.rec_method);
ini_rtpe_cfg->rec_format = g_strdup(rtpe_config.rec_format);
ini_rtpe_cfg->jb_length = rtpe_config.jb_length;
ini_rtpe_cfg->jb_clock_drift = rtpe_config.jb_clock_drift;
}
static void early_init(void) {
@ -670,6 +679,7 @@ static void init_everything(void) {
codeclib_init(0);
media_player_init();
dtmf_init();
jitter_buffer_init();
}
@ -847,6 +857,9 @@ int main(int argc, char **argv) {
thread_create_detach_prio(media_player_loop, NULL, rtpe_config.scheduling, rtpe_config.priority);
#endif
thread_create_detach_prio(send_timer_loop, NULL, rtpe_config.scheduling, rtpe_config.priority);
if (rtpe_config.jb_length > 0)
thread_create_detach_prio(jitter_buffer_loop, NULL, rtpe_config.scheduling,
rtpe_config.priority);
}


+ 20
- 1
daemon/media_socket.c View File

@ -26,6 +26,7 @@
#include "main.h"
#include "codec.h"
#include "media_player.h"
#include "jitter_buffer.h"
#ifndef PORT_RANDOM_MIN
@ -1947,7 +1948,15 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) {
ilog(LOG_WARNING, "UDP packet possibly truncated");
str_init_len(&phc.s, buf + RTP_BUFFER_HEAD_ROOM, ret);
ret = stream_packet(&phc);
if (sfd->stream->jb) {
ret = buffer_packet(&phc.mp, &phc.s);
if (ret == 1)
ret = stream_packet(&phc);
}
else
ret = stream_packet(&phc);
if (G_UNLIKELY(ret < 0))
ilog(LOG_WARNING, "Write error on media socket: %s", strerror(-ret));
else if (phc.update)
@ -2020,3 +2029,13 @@ const struct transport_protocol *transport_protocol(const str *s) {
out:
return NULL;
}
void play_buffered(struct jb_packet *cp) {
struct packet_handler_ctx phc;
ZERO(phc);
phc.mp = cp->mp;
phc.s = cp->mp.raw;
//phc.buffered_packet = buffered;
stream_packet(&phc);
jb_packet_free(&cp);
}

+ 10
- 0
daemon/rtpengine.pod View File

@ -671,6 +671,16 @@ seen, that address is used. Otherwise, if a packet with a matching source port
(but different address) is seen, that address is used. Otherwise, the source
address of any incoming packet seen is used.
=item B<--jitter-buffer=>I<INT>
Size of (incoming) jitter buffer in packets. A value of zero (the default)
disables the jitter buffer. The jitter buffer is currently only implemented for
userspace operation.
=item B<--jb-clock-drift>
Enable clock drift compensation for the jitter buffer.
=back
=head1 INTERFACES


+ 2
- 0
include/call.h View File

@ -193,6 +193,7 @@ struct rtp_payload_type;
struct media_player;
struct send_timer;
struct transport_protocol;
struct jitter_buffer;
typedef bencode_buffer_t call_buffer_t;
@ -266,6 +267,7 @@ struct packet_stream {
struct ssrc_ctx *ssrc_in, /* LOCK: in_lock */ // XXX eliminate these
*ssrc_out; /* LOCK: out_lock */
struct send_timer *send_timer; /* RO */
struct jitter_buffer *jb; /* RO */
struct stats stats;
struct stats kernel_stats;


+ 57
- 0
include/jitter_buffer.h View File

@ -0,0 +1,57 @@
#ifndef _JITTER_BUFFER_H_
#define _JITTER_BUFFER_H_
#include "auxlib.h"
#include "socket.h"
#include "timerthread.h"
#include "media_socket.h"
//#include "codec.h"
//
//struct packet_handler_ctx;
struct jb_packet;
struct media_packet;
//
struct jb_packet {
struct timerthread_queue_entry ttq_entry;
char *buf;
struct media_packet mp;
//int buffered;
};
struct jitter_buffer {
struct timerthread_queue ttq;
mutex_t lock;
unsigned long first_send_ts;
struct timeval first_send;
unsigned int first_seq;
unsigned int rtptime_delta;
unsigned int next_exp_seq;
unsigned int cont_frames;
unsigned int cont_miss;
unsigned int clock_rate;
unsigned int payload_type;
unsigned int num_resets;
unsigned int initial_pkts;
unsigned int cont_buff_err;
int buffer_len;
int clock_drift_val;
int clock_drift_enable; //flag for buffer overflow underflow
int buf_decremented;
struct jb_packet *p;
struct call *call;
int disabled;
};
void jitter_buffer_init(void);
struct jitter_buffer *jitter_buffer_new(struct call *);
void jitter_buffer_free(struct jitter_buffer **);
int buffer_packet(struct media_packet *mp, const str *s);
void jb_packet_free(struct jb_packet **jbp);
//int set_jitter_values(struct media_packet *mp);
void jitter_buffer_loop(void *p);
#endif

+ 2
- 0
include/main.h View File

@ -93,6 +93,8 @@ struct rtpengine_config {
char *mysql_query;
endpoint_t dtmf_udp_ep;
enum endpoint_learning endpoint_learning;
int jb_length;
int jb_clock_drift;
};


+ 3
- 0
include/media_socket.h View File

@ -19,6 +19,7 @@ struct media_packet;
struct transport_protocol;
struct ssrc_ctx;
struct rtpengine_srtp;
struct jb_packet;
typedef int rtcp_filter_func(struct media_packet *, GQueue *);
typedef int (*rewrite_func)(str *, struct packet_stream *, struct stream_fd *, const endpoint_t *,
@ -174,6 +175,8 @@ const struct streamhandler *determine_handler(const struct transport_protocol *i
struct call_media *out_media, int must_recrypt);
int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, struct media_packet *mp);
const struct transport_protocol *transport_protocol(const str *s);
//void play_buffered(struct packet_stream *sink, struct codec_packet *cp, int buffered);
void play_buffered(struct jb_packet *cp);
/* XXX shouldn't be necessary */
/*


+ 1
- 0
t/.gitignore View File

@ -53,3 +53,4 @@ test-dtmf-detect
*-test
dtmf_rx_fillin.h
*-test.c
jitter_buffer.c

+ 2
- 2
t/Makefile View File

@ -70,7 +70,7 @@ LIBSRCS+= codeclib.c resample.c socket.c streambuf.c dtmflib.c
DAEMONSRCS+= codec.c call.c ice.c kernel.c media_socket.c stun.c bencode.c poller.c \
dtls.c recording.c statistics.c rtcp.c redis.c iptables.c graphite.c \
cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c \
media_player.c
media_player.c jitter_buffer.c
HASHSRCS+= call_interfaces.c control_ng.c sdp.c
endif
@ -128,7 +128,7 @@ transcode-test: transcode-test.o $(COMMONOBJS) codeclib.o resample.o codec.o ssr
rtcp.o redis.o iptables.o graphite.o call_interfaces.strhash.o sdp.strhash.o rtp.o crypto.o \
control_ng.strhash.o \
streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \
media_player.o dtmflib.o
media_player.o jitter_buffer.o dtmflib.o
payload-tracker-test: payload-tracker-test.o $(COMMONOBJS) ssrc.o aux.o auxlib.o rtp.o crypto.o codeclib.o \
resample.o dtmflib.o


Loading…
Cancel
Save