Browse Source

Merge branch 'jb_new' of https://github.com/balajeesv/rtpengine

Change-Id: I73fc3529938bf65e40f86a4e526c8bf77e199492
pull/940/head
Richard Fuchs 6 years ago
parent
commit
ab53fdc6f6
21 changed files with 1062 additions and 324 deletions
  1. +1
    -1
      daemon/Makefile
  2. +5
    -0
      daemon/call.c
  3. +7
    -7
      daemon/codec.c
  4. +358
    -0
      daemon/jitter_buffer.c
  5. +13
    -0
      daemon/main.c
  6. +42
    -87
      daemon/media_player.c
  7. +20
    -1
      daemon/media_socket.c
  8. +10
    -0
      daemon/rtpengine.pod
  9. +141
    -0
      daemon/timerthread.c
  10. +2
    -0
      include/call.h
  11. +2
    -2
      include/codec.h
  12. +59
    -0
      include/jitter_buffer.h
  13. +2
    -0
      include/main.h
  14. +2
    -4
      include/media_player.h
  15. +3
    -0
      include/media_socket.h
  16. +28
    -0
      include/timerthread.h
  17. +242
    -0
      perl/NGCP/Rtpengine/AutoTest.pm
  18. +1
    -0
      t/.gitignore
  19. +4
    -2
      t/Makefile
  20. +94
    -0
      t/auto-daemon-tests-jb.pl
  21. +26
    -220
      t/auto-daemon-tests.pl

+ 1
- 1
daemon/Makefile View File

@ -127,7 +127,7 @@ SRCS= main.c kernel.c poller.c aux.c control_tcp.c call.c control_udp.c redis.c
bencode.c cookie_cache.c udp_listener.c control_ng.strhash.c sdp.strhash.c stun.c rtcp.c \
crypto.c rtp.c call_interfaces.strhash.c dtls.c log.c cli.c graphite.c ice.c \
media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c \
codec.c load.c dtmf.c timerthread.c media_player.c
codec.c load.c dtmf.c timerthread.c media_player.c jitter_buffer.c
LIBSRCS= loglib.c auxlib.c rtplib.c str.c socket.c streambuf.c ssllib.c dtmflib.c
ifeq ($(with_transcoding),yes)
LIBSRCS+= codeclib.c resample.c


+ 5
- 0
daemon/call.c View File

@ -45,6 +45,7 @@
#include "graphite.h"
#include "codec.h"
#include "media_player.h"
#include "jitter_buffer.h"
/* also serves as array index for callstream->peers[] */
@ -905,6 +906,9 @@ struct packet_stream *__packet_stream_new(struct call *call) {
recording_init_stream(stream);
stream->send_timer = send_timer_new(stream);
if (rtpe_config.jb_length)
stream->jb = jitter_buffer_new(call);
return stream;
}
@ -2257,6 +2261,7 @@ no_stats_output:
ps = l->data;
send_timer_put(&ps->send_timer);
jb_put(&ps->jb);
__unkernelize(ps);
dtls_shutdown(ps);
ps->selected_sfd = NULL;


+ 7
- 7
daemon/codec.c View File

@ -965,35 +965,35 @@ static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch,
p->s.len = payload_len + sizeof(struct rtp_header);
payload_tracker_add(&ssrc_out->tracker, handler->dest_pt.payload_type);
p->free_func = free;
p->source = handler;
p->ttq_entry.source = handler;
p->rtp = rh;
// this packet is dynamically allocated, so we're able to schedule it.
// determine scheduled time to send
if (ch->first_send.tv_sec && ch->encoder_format.clockrate) {
// scale first_send from first_send_ts to ts
p->to_send = ch->first_send;
p->ttq_entry.when = ch->first_send;
uint32_t ts_diff = (uint32_t) ts - (uint32_t) ch->first_send_ts; // allow for wrap-around
unsigned long long ts_diff_us =
(unsigned long long) ts_diff * 1000000 / ch->encoder_format.clockrate
* ch->handler->dest_pt.codec_def->clockrate_mult;
timeval_add_usec(&p->to_send, ts_diff_us);
timeval_add_usec(&p->ttq_entry.when, ts_diff_us);
// how far in the future is this?
ts_diff_us = timeval_diff(&p->to_send, &rtpe_now); // negative wrap-around to positive OK
ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now); // negative wrap-around to positive OK
if (ts_diff_us > 1000000) // more than one second, can't be right
ch->first_send.tv_sec = 0; // fix it up below
}
if (!ch->first_send.tv_sec) {
p->to_send = ch->first_send = rtpe_now;
p->ttq_entry.when = ch->first_send = rtpe_now;
ch->first_send_ts = ts;
}
ilog(LOG_DEBUG, "Scheduling to send RTP packet (seq %u TS %lu) at %lu.%06lu",
ntohs(rh->seq_num),
ts,
(long unsigned) p->to_send.tv_sec,
(long unsigned) p->to_send.tv_usec);
(long unsigned) p->ttq_entry.when.tv_sec,
(long unsigned) p->ttq_entry.when.tv_usec);
g_queue_push_tail(&mp->packets_out, p);


+ 358
- 0
daemon/jitter_buffer.c View File

@ -0,0 +1,358 @@
#include "jitter_buffer.h"
#include "timerthread.h"
#include "media_socket.h"
#include "call.h"
#include "codec.h"
#include "main.h"
#include <math.h>
#define INITIAL_PACKETS 0x1E
#define CONT_SEQ_COUNT 0x64
#define CONT_MISS_COUNT 0x0A
#define CLOCK_DRIFT_MULT 0x14
static struct timerthread jitter_buffer_thread;
void jitter_buffer_init(void) {
ilog(LOG_INFO, "jitter_buffer_init");
timerthread_init(&jitter_buffer_thread, timerthread_queue_run);
}
// jb is locked
static void reset_jitter_buffer(struct jitter_buffer *jb) {
ilog(LOG_INFO, "reset_jitter_buffer");
jb->first_send_ts = 0;
jb->first_send.tv_sec = 0;
jb->first_send.tv_usec = 0;
jb->first_seq = 0;
jb->rtptime_delta = 0;
jb->buffer_len = 0;
jb->cont_frames = 0;
jb->cont_miss = 0;
jb->next_exp_seq = 0;
jb->clock_rate = 0;
jb->payload_type = 0;
jb->drift_mult_factor = 0;
jb->buf_decremented = 0;
jb->clock_drift_val = 0;
jb->num_resets++;
//disable jitter buffer in case of more than 2 resets
if(jb->num_resets > 2 && jb->call)
jb->disabled = 1;
}
static int get_clock_rate(struct media_packet *mp, int payload_type) {
const struct rtp_payload_type *rtp_pt = NULL;
struct jitter_buffer *jb = mp->stream->jb;
int clock_rate = 0;
if(jb->clock_rate && jb->payload_type == payload_type)
return jb->clock_rate;
struct codec_handler *transcoder = codec_handler_get(mp->media, payload_type);
if(transcoder) {
if(transcoder->source_pt.payload_type == payload_type)
rtp_pt = &transcoder->source_pt;
if(transcoder->dest_pt.payload_type == payload_type)
rtp_pt = &transcoder->dest_pt;
}
if(rtp_pt) {
clock_rate = jb->clock_rate = rtp_pt->clock_rate;
jb->payload_type = payload_type;
}
else
ilog(LOG_DEBUG, "clock_rate not present payload_type = %d", payload_type);
return clock_rate;
}
static struct jb_packet* get_jb_packet(struct media_packet *mp, const str *s) {
char *buf = malloc(s->len + RTP_BUFFER_HEAD_ROOM + RTP_BUFFER_TAIL_ROOM);
if (!buf) {
ilog(LOG_ERROR, "Failed to allocate memory: %s", strerror(errno));
return NULL;
}
struct jb_packet *p = g_slice_alloc0(sizeof(*p));
p->buf = buf;
p->mp = *mp;
obj_hold(p->mp.sfd);
str_init_len(&p->mp.raw, buf + RTP_BUFFER_HEAD_ROOM, s->len);
memcpy(p->mp.raw.s, s->s, s->len);
if(rtp_payload(&p->mp.rtp, &p->mp.payload, &p->mp.raw)) {
jb_packet_free(&p);
return NULL;
}
return p;
}
// jb is locked
static void check_buffered_packets(struct jitter_buffer *jb) {
if (g_tree_nnodes(jb->ttq.entries) >= (2* rtpe_config.jb_length)) {
ilog(LOG_DEBUG, "Jitter reset due to buffer overflow");
reset_jitter_buffer(jb);
}
}
// jb is locked
static int queue_packet(struct media_packet *mp, struct jb_packet *p) {
struct jitter_buffer *jb = mp->stream->jb;
unsigned long ts = ntohl(mp->rtp->timestamp);
int payload_type = (mp->rtp->m_pt & 0x7f);
int clockrate = get_clock_rate(mp, payload_type);
if(!clockrate || !jb->first_send.tv_sec) {
ilog(LOG_DEBUG, "Jitter reset due to clockrate");
reset_jitter_buffer(jb);
return 1;
}
long ts_diff = (uint32_t) ts - (uint32_t) jb->first_send_ts;
int seq_diff = ntohs(mp->rtp->seq_num) - jb->first_seq;
if(!jb->rtptime_delta && seq_diff) {
jb->rtptime_delta = ts_diff/seq_diff;
}
p->ttq_entry.when = jb->first_send;
long long ts_diff_us =
(long long) (ts_diff + (jb->rtptime_delta * jb->buffer_len))* 1000000 / clockrate;
ts_diff_us += (jb->clock_drift_val * seq_diff);
if(jb->buf_decremented) {
ts_diff_us += 5000; //add 5ms delta when 2 packets are scheduled around same time
jb->buf_decremented = 0;
}
timeval_add_usec(&p->ttq_entry.when, ts_diff_us);
ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now);
if (ts_diff_us > 3000000) { // more than three second, can't be right
jb->first_send.tv_sec = 0;
jb->rtptime_delta = 0;
}
timerthread_queue_push(&jb->ttq, &p->ttq_entry);
return 0;
}
static void handle_clock_drift(struct media_packet *mp) {
ilog(LOG_DEBUG, "handle_clock_drift");
struct jitter_buffer *jb = mp->stream->jb;
int seq_diff = ntohs(mp->rtp->seq_num) - jb->first_seq;
int mult_factor = pow(2, jb->drift_mult_factor);
if(seq_diff < (mult_factor * CLOCK_DRIFT_MULT))
return;
unsigned long ts = ntohl(mp->rtp->timestamp);
int payload_type = (mp->rtp->m_pt & 0x7f);
int clockrate = get_clock_rate(mp, payload_type);
if(!clockrate) {
return;
}
long ts_diff = (uint32_t) ts - (uint32_t) jb->first_send_ts;
long long ts_diff_us =
(long long) (ts_diff)* 1000000 / clockrate;
struct timeval to_send = jb->first_send;
timeval_add_usec(&to_send, ts_diff_us);
long long time_diff = timeval_diff(&rtpe_now, &to_send);
jb->clock_drift_val = time_diff/seq_diff;
jb->drift_mult_factor++;
}
int buffer_packet(struct media_packet *mp, const str *s) {
struct jb_packet *p = NULL;
int ret = 1; // must call stream_packet
mp->stream = mp->sfd->stream;
mp->media = mp->stream->media;
mp->call = mp->sfd->call;
struct call *call = mp->call;
rwlock_lock_r(&call->master_lock);
struct jitter_buffer *jb = mp->stream->jb;
if (!jb || jb->disabled)
goto end;
ilog(LOG_DEBUG, "Handling JB packet on: %s:%d", sockaddr_print_buf(&mp->stream->endpoint.address),
mp->stream->endpoint.port);
p = get_jb_packet(mp, s);
if (!p)
goto end;
mp = &p->mp;
int payload_type = (mp->rtp->m_pt & 0x7f);
mutex_lock(&jb->lock);
if(jb->clock_rate && jb->payload_type != payload_type) { //reset in case of payload change
jb->first_send.tv_sec = 0;
jb->rtptime_delta = 0;
}
if (jb->first_send.tv_sec) {
if(rtpe_config.jb_clock_drift)
handle_clock_drift(mp);
ret = queue_packet(mp,p);
}
else {
// store data from first packet and use for successive packets and queue the first packet
unsigned long ts = ntohl(mp->rtp->timestamp);
int payload_type = (mp->rtp->m_pt & 0x7f);
int clockrate = get_clock_rate(mp, payload_type);
if(!clockrate){
jb->initial_pkts++;
if(jb->initial_pkts > INITIAL_PACKETS) { //Ignore initial Payload Type 126 if any
reset_jitter_buffer(jb);
}
goto end_unlock;
}
p->ttq_entry.when = jb->first_send = rtpe_now;
jb->first_send_ts = ts;
jb->first_seq = ntohs(mp->rtp->seq_num);
}
// packet consumed?
if (ret == 0)
p = NULL;
check_buffered_packets(jb);
end_unlock:
mutex_unlock(&jb->lock);
end:
rwlock_unlock_r(&call->master_lock);
if (p)
jb_packet_free(&p);
return ret;
}
static void increment_buffer(struct jitter_buffer *jb) {
if(jb->buffer_len < rtpe_config.jb_length)
jb->buffer_len++;
}
static void decrement_buffer(struct jitter_buffer *jb) {
if(jb->buffer_len > 0) {
jb->buffer_len--;
jb->buf_decremented = 1;
}
}
static void set_jitter_values(struct media_packet *mp) {
struct jitter_buffer *jb = mp->stream->jb;
if(!jb || !mp->rtp)
return;
int curr_seq = ntohs(mp->rtp->seq_num);
if(jb->next_exp_seq) {
mutex_lock(&jb->lock);
if(curr_seq > jb->next_exp_seq) {
ilog(LOG_DEBUG, "missing seq exp seq =%d, received seq= %d", jb->next_exp_seq, curr_seq);
increment_buffer(jb);
jb->cont_frames = 0;
jb->cont_miss++;
}
else if(curr_seq < jb->next_exp_seq) { //Might be duplicate or sequence already crossed
jb->cont_frames = 0;
jb->cont_miss++;
}
else {
jb->cont_frames++;
jb->cont_miss = 0;
if(jb->cont_frames >= CONT_SEQ_COUNT) {
decrement_buffer(jb);
jb->cont_frames = 0;
ilog(LOG_DEBUG, "Received continous frames Buffer len=%d", jb->buffer_len);
}
}
if(jb->cont_miss >= CONT_MISS_COUNT)
reset_jitter_buffer(jb);
mutex_unlock(&jb->lock);
}
if(curr_seq >= jb->next_exp_seq)
jb->next_exp_seq = curr_seq + 1;
}
static void __jb_send_later(struct timerthread_queue *ttq, void *p) {
struct jb_packet *cp = p;
set_jitter_values(&cp->mp);
play_buffered(p);
};
// jb and call are locked
static void __jb_send_now(struct timerthread_queue *ttq, void *p) {
struct jitter_buffer *jb = (void *) ttq;
mutex_unlock(&jb->lock);
rwlock_unlock_r(&jb->call->master_lock);
__jb_send_later(ttq, p);
rwlock_lock_r(&jb->call->master_lock);
mutex_lock(&jb->lock);
};
static void __jb_free(void *p) {
struct jitter_buffer *jb = p;
jitter_buffer_free(&jb);
}
void __jb_packet_free(void *p) {
struct jb_packet *jbp = p;
jb_packet_free(&jbp);
}
void jitter_buffer_loop(void *p) {
ilog(LOG_DEBUG, "jitter_buffer_loop");
timerthread_run(&jitter_buffer_thread);
}
struct jitter_buffer *jitter_buffer_new(struct call *c) {
ilog(LOG_DEBUG, "creating jitter_buffer");
struct jitter_buffer *jb = timerthread_queue_new("jitter_buffer", sizeof(*jb),
&jitter_buffer_thread,
__jb_send_now,
__jb_send_later,
__jb_free, __jb_packet_free);
mutex_init(&jb->lock);
jb->call = obj_get(c);
return jb;
}
void jitter_buffer_free(struct jitter_buffer **jbp) {
if (!jbp || !*jbp)
return;
ilog(LOG_DEBUG, "freeing jitter_buffer");
mutex_destroy(&(*jbp)->lock);
if ((*jbp)->call)
obj_put((*jbp)->call);
}
void jb_packet_free(struct jb_packet **jbp) {
if (!jbp || !*jbp)
return;
free((*jbp)->buf);
if ((*jbp)->mp.sfd)
obj_put((*jbp)->mp.sfd);
g_slice_free1(sizeof(**jbp), *jbp);
*jbp = NULL;
}

+ 13
- 0
daemon/main.c View File

@ -44,6 +44,7 @@
#include "ssllib.h"
#include "media_player.h"
#include "dtmf.h"
#include "jitter_buffer.h"
@ -376,6 +377,9 @@ static void options(int *argc, char ***argv) {
{ "mysql-pass", 0, 0, G_OPTION_ARG_STRING, &rtpe_config.mysql_pass,"MySQL connection credentials", "PASSWORD" },
{ "mysql-query",0, 0, G_OPTION_ARG_STRING, &rtpe_config.mysql_query,"MySQL select query", "STRING" },
{ "endpoint-learning",0,0,G_OPTION_ARG_STRING, &endpoint_learning, "RTP endpoint learning algorithm", "delayed|immediate|off|heuristic" },
{ "jitter-buffer",0, 0, G_OPTION_ARG_INT, &rtpe_config.jb_length, "Size of jitter buffer", "INT" },
{ "jb-clock-drift",0,0, G_OPTION_ARG_NONE, &rtpe_config.jb_clock_drift,"Compensate for source clock drift",NULL },
{ NULL, }
};
@ -564,6 +568,9 @@ static void options(int *argc, char ***argv) {
die("Invalid --endpoint-learning option ('%s')", endpoint_learning);
}
rtpe_config.endpoint_learning = el_config;
if (rtpe_config.jb_length < 0)
die("Invalid negative jitter buffer size");
}
void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) {
@ -635,6 +642,8 @@ void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) {
ini_rtpe_cfg->rec_method = g_strdup(rtpe_config.rec_method);
ini_rtpe_cfg->rec_format = g_strdup(rtpe_config.rec_format);
ini_rtpe_cfg->jb_length = rtpe_config.jb_length;
ini_rtpe_cfg->jb_clock_drift = rtpe_config.jb_clock_drift;
}
static void early_init(void) {
@ -670,6 +679,7 @@ static void init_everything(void) {
codeclib_init(0);
media_player_init();
dtmf_init();
jitter_buffer_init();
}
@ -847,6 +857,9 @@ int main(int argc, char **argv) {
thread_create_detach_prio(media_player_loop, NULL, rtpe_config.scheduling, rtpe_config.priority);
#endif
thread_create_detach_prio(send_timer_loop, NULL, rtpe_config.scheduling, rtpe_config.priority);
if (rtpe_config.jb_length > 0)
thread_create_detach_prio(jitter_buffer_loop, NULL, rtpe_config.scheduling,
rtpe_config.priority);
}


+ 42
- 87
daemon/media_player.c View File

@ -30,27 +30,15 @@ static struct timerthread send_timer_thread;
static void send_timer_send_nolock(struct send_timer *st, struct codec_packet *cp);
static void send_timer_send_lock(struct send_timer *st, struct codec_packet *cp);
#ifdef WITH_TRANSCODING
// called with call->master lock in W
static unsigned int send_timer_flush(struct send_timer *st, void *ptr) {
if (!st)
return 0;
unsigned int num = 0;
GList *l = st->packets.head;
while (l) {
GList *next = l->next;
struct codec_packet *p = l->data;
if (p->source != ptr)
goto next;
g_queue_delete_link(&st->packets, l);
codec_packet_free(p);
num++;
next:
l = next;
}
return num;
return timerthread_queue_flush(&st->ttq, ptr);
}
@ -145,32 +133,34 @@ static void __send_timer_free(void *p) {
ilog(LOG_DEBUG, "freeing send_timer");
g_queue_clear_full(&st->packets, codec_packet_free);
mutex_destroy(&st->lock);
obj_put(st->call);
}
static void __send_timer_send_now(struct timerthread_queue *ttq, void *p) {
send_timer_send_nolock((void *) ttq, p);
};
static void __send_timer_send_later(struct timerthread_queue *ttq, void *p) {
send_timer_send_lock((void *) ttq, p);
};
// call->master_lock held in W
struct send_timer *send_timer_new(struct packet_stream *ps) {
ilog(LOG_DEBUG, "creating send_timer");
struct send_timer *st = obj_alloc0("send_timer", sizeof(*st), __send_timer_free);
st->tt_obj.tt = &send_timer_thread;
mutex_init(&st->lock);
struct send_timer *st = timerthread_queue_new("send_timer", sizeof(*st),
&send_timer_thread,
__send_timer_send_now,
__send_timer_send_later,
__send_timer_free, codec_packet_free);
st->call = obj_get(ps->call);
st->sink = ps;
g_queue_init(&st->packets);
return st;
}
// st->stream->out_lock (or call->master_lock/W) must be held already
static int send_timer_send(struct send_timer *st, struct codec_packet *cp) {
if (cp->to_send.tv_sec && timeval_cmp(&cp->to_send, &rtpe_now) > 0)
return -1; // not yet
static void __send_timer_send_common(struct send_timer *st, struct codec_packet *cp) {
if (!st->sink->selected_sfd)
goto out;
@ -186,36 +176,36 @@ static int send_timer_send(struct send_timer *st, struct codec_packet *cp) {
out:
codec_packet_free(cp);
return 0;
}
static void send_timer_send_lock(struct send_timer *st, struct codec_packet *cp) {
struct call *call = st->call;
if (!call)
return;
log_info_call(call);
rwlock_lock_r(&call->master_lock);
__send_timer_send_common(st, cp);
rwlock_unlock_r(&call->master_lock);
}
// st->stream->out_lock (or call->master_lock/W) must be held already
void send_timer_push(struct send_timer *st, struct codec_packet *cp) {
// can we send immediately?
if (!send_timer_send(st, cp))
static void send_timer_send_nolock(struct send_timer *st, struct codec_packet *cp) {
struct call *call = st->call;
if (!call)
return;
// queue for sending
log_info_call(call);
struct rtp_header *rh = (void *) cp->s.s;
ilog(LOG_DEBUG, "queuing up packet for delivery at %lu.%06u (RTP seq %u TS %u)",
(unsigned long) cp->to_send.tv_sec,
(unsigned int) cp->to_send.tv_usec,
ntohs(rh->seq_num),
ntohl(rh->timestamp));
__send_timer_send_common(st, cp);
}
mutex_lock(&st->lock);
unsigned int qlen = st->packets.length;
// this hands over ownership of cp, so we must copy the timeval out
struct timeval tv_send = cp->to_send;
g_queue_push_tail(&st->packets, cp);
mutex_unlock(&st->lock);
// first packet in? we're probably not scheduled yet
if (!qlen)
timerthread_obj_schedule_abs(&st->tt_obj, &tv_send);
// st->stream->out_lock (or call->master_lock/W) must be held already
void send_timer_push(struct send_timer *st, struct codec_packet *cp) {
timerthread_queue_push(&st->ttq, &cp->ttq_entry);
}
@ -352,7 +342,7 @@ static void media_player_read_packet(struct media_player *mp) {
struct codec_packet *p = packet.packets_out.head->data;
if (p->rtp) {
mp->sync_ts = ntohl(p->rtp->timestamp);
mp->sync_ts_tv = p->to_send;
mp->sync_ts_tv = p->ttq_entry.when;
}
}
@ -646,46 +636,11 @@ static void media_player_run(void *ptr) {
#endif
static void send_timer_run(void *ptr) {
struct send_timer *st = ptr;
struct call *call = st->call;
log_info_call(call);
ilog(LOG_DEBUG, "running scheduled send_timer");
struct timeval next_send = {0,};
rwlock_lock_r(&call->master_lock);
mutex_lock(&st->lock);
while (st->packets.length) {
struct codec_packet *cp = st->packets.head->data;
// XXX this could be made lock-free
if (!send_timer_send(st, cp)) {
g_queue_pop_head(&st->packets);
continue;
}
// couldn't send the last one. remember time to schedule
next_send = cp->to_send;
break;
}
mutex_unlock(&st->lock);
rwlock_unlock_r(&call->master_lock);
if (next_send.tv_sec)
timerthread_obj_schedule_abs(&st->tt_obj, &next_send);
log_info_clear();
}
void media_player_init(void) {
#ifdef WITH_TRANSCODING
timerthread_init(&media_player_thread, media_player_run);
#endif
timerthread_init(&send_timer_thread, send_timer_run);
timerthread_init(&send_timer_thread, timerthread_queue_run);
}


+ 20
- 1
daemon/media_socket.c View File

@ -26,6 +26,7 @@
#include "main.h"
#include "codec.h"
#include "media_player.h"
#include "jitter_buffer.h"
#ifndef PORT_RANDOM_MIN
@ -1947,7 +1948,15 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) {
ilog(LOG_WARNING, "UDP packet possibly truncated");
str_init_len(&phc.s, buf + RTP_BUFFER_HEAD_ROOM, ret);
ret = stream_packet(&phc);
if (sfd->stream->jb) {
ret = buffer_packet(&phc.mp, &phc.s);
if (ret == 1)
ret = stream_packet(&phc);
}
else
ret = stream_packet(&phc);
if (G_UNLIKELY(ret < 0))
ilog(LOG_WARNING, "Write error on media socket: %s", strerror(-ret));
else if (phc.update)
@ -2020,3 +2029,13 @@ const struct transport_protocol *transport_protocol(const str *s) {
out:
return NULL;
}
void play_buffered(struct jb_packet *cp) {
struct packet_handler_ctx phc;
ZERO(phc);
phc.mp = cp->mp;
phc.s = cp->mp.raw;
//phc.buffered_packet = buffered;
stream_packet(&phc);
jb_packet_free(&cp);
}

+ 10
- 0
daemon/rtpengine.pod View File

@ -671,6 +671,16 @@ seen, that address is used. Otherwise, if a packet with a matching source port
(but different address) is seen, that address is used. Otherwise, the source
address of any incoming packet seen is used.
=item B<--jitter-buffer=>I<INT>
Size of (incoming) jitter buffer in packets. A value of zero (the default)
disables the jitter buffer. The jitter buffer is currently only implemented for
userspace operation.
=item B<--jb-clock-drift>
Enable clock drift compensation for the jitter buffer.
=back
=head1 INTERFACES


+ 141
- 0
daemon/timerthread.c View File

@ -88,3 +88,144 @@ void timerthread_obj_deschedule(struct timerthread_obj *tt_obj) {
nope:
mutex_unlock(&tt->lock);
}
static int timerthread_queue_run_one(struct timerthread_queue *ttq,
struct timerthread_queue_entry *ttqe,
void (*run_func)(struct timerthread_queue *, void *)) {
if (ttqe->when.tv_sec && timeval_cmp(&ttqe->when, &rtpe_now) > 0) {
if(timeval_diff(&ttqe->when, &rtpe_now) > 1000) // not to queue packet less than 1ms
return -1; // not yet
}
run_func(ttq, ttqe);
return 0;
}
void timerthread_queue_run(void *ptr) {
struct timerthread_queue *ttq = ptr;
ilog(LOG_DEBUG, "running timerthread_queue");
struct timeval next_send = {0,};
mutex_lock(&ttq->lock);
while (g_tree_nnodes(ttq->entries)) {
struct timerthread_queue_entry *ttqe = g_tree_find_first(ttq->entries, NULL, NULL);
assert(ttqe != NULL);
g_tree_remove(ttq->entries, ttqe);
mutex_unlock(&ttq->lock);
int ret = timerthread_queue_run_one(ttq, ttqe, ttq->run_later_func);
mutex_lock(&ttq->lock);
if (!ret)
continue;
// couldn't send the last one. remember time to schedule
g_tree_insert(ttq->entries, ttqe, ttqe);
next_send = ttqe->when;
break;
}
mutex_unlock(&ttq->lock);
if (next_send.tv_sec)
timerthread_obj_schedule_abs(&ttq->tt_obj, &next_send);
}
static int ttqe_free_all(void *k, void *v, void *d) {
struct timerthread_queue *ttq = d;
ttq->entry_free_func(k);
return FALSE;
}
static void __timerthread_queue_free(void *p) {
struct timerthread_queue *ttq = p;
g_tree_foreach(ttq->entries, ttqe_free_all, ttq);
g_tree_destroy(ttq->entries);
mutex_destroy(&ttq->lock);
if (ttq->free_func)
ttq->free_func(p);
}
static int ttqe_compare(const void *a, const void *b) {
const struct timerthread_queue_entry *t1 = a;
const struct timerthread_queue_entry *t2 = b;
return timeval_cmp_ptr(&t1->when, &t2->when);
}
void *timerthread_queue_new(const char *type, size_t size,
struct timerthread *tt,
void (*run_now_func)(struct timerthread_queue *, void *),
void (*run_later_func)(struct timerthread_queue *, void *),
void (*free_func)(void *),
void (*entry_free_func)(void *))
{
struct timerthread_queue *ttq = obj_alloc0(type, size, __timerthread_queue_free);
ttq->type = type;
ttq->tt_obj.tt = tt;
assert(tt->func == timerthread_queue_run);
ttq->run_now_func = run_now_func;
ttq->run_later_func = run_later_func;
if (!ttq->run_later_func)
ttq->run_later_func = run_now_func;
ttq->free_func = free_func;
ttq->entry_free_func = entry_free_func;
mutex_init(&ttq->lock);
ttq->entries = g_tree_new(ttqe_compare);
return ttq;
}
void timerthread_queue_push(struct timerthread_queue *ttq, struct timerthread_queue_entry *ttqe) {
// can we send immediately?
if (!timerthread_queue_run_one(ttq, ttqe, ttq->run_now_func))
return;
// queue for sending
ilog(LOG_DEBUG, "queuing up %s object for processing at %lu.%06u",
ttq->type,
(unsigned long) ttqe->when.tv_sec,
(unsigned int) ttqe->when.tv_usec);
// XXX recover log line fields
// struct rtp_header *rh = (void *) cp->s.s;
// ilog(LOG_DEBUG, "queuing up packet for delivery at %lu.%06u (RTP seq %u TS %u)",
// (unsigned long) cp->to_send.tv_sec,
// (unsigned int) cp->to_send.tv_usec,
// ntohs(rh->seq_num),
// ntohl(rh->timestamp));
mutex_lock(&ttq->lock);
// this hands over ownership of cp, so we must copy the timeval out
struct timeval tv_send = ttqe->when;
g_tree_insert(ttq->entries, ttqe, ttqe);
struct timerthread_queue_entry *first_ttqe = g_tree_find_first(ttq->entries, NULL, NULL);
mutex_unlock(&ttq->lock);
// first packet in? we're probably not scheduled yet
if (first_ttqe == ttqe)
timerthread_obj_schedule_abs(&ttq->tt_obj, &tv_send);
}
static int ttqe_ptr_match(const void *ent, const void *ptr) {
const struct timerthread_queue_entry *ttqe = ent;
return ttqe->source == ptr;
}
unsigned int timerthread_queue_flush(struct timerthread_queue *ttq, void *ptr) {
if (!ttq)
return 0;
unsigned int num = 0;
GQueue matches = G_QUEUE_INIT;
g_tree_find_all(&matches, ttq->entries, ttqe_ptr_match, ptr);
while (matches.length) {
struct timerthread_queue_entry *ttqe = g_queue_pop_head(&matches);
ttq->entry_free_func(ttqe);
num++;
}
return num;
}

+ 2
- 0
include/call.h View File

@ -193,6 +193,7 @@ struct rtp_payload_type;
struct media_player;
struct send_timer;
struct transport_protocol;
struct jitter_buffer;
typedef bencode_buffer_t call_buffer_t;
@ -266,6 +267,7 @@ struct packet_stream {
struct ssrc_ctx *ssrc_in, /* LOCK: in_lock */ // XXX eliminate these
*ssrc_out; /* LOCK: out_lock */
struct send_timer *send_timer; /* RO */
struct jitter_buffer *jb; /* RO */
struct stats stats;
struct stats kernel_stats;


+ 2
- 2
include/codec.h View File

@ -8,6 +8,7 @@
#include "codeclib.h"
#include "aux.h"
#include "rtplib.h"
#include "timerthread.h"
struct call_media;
@ -40,11 +41,10 @@ struct codec_handler {
};
struct codec_packet {
struct timerthread_queue_entry ttq_entry;
str s;
struct rtp_header *rtp;
void *source; // opaque
void (*free_func)(void *);
struct timeval to_send;
};


+ 59
- 0
include/jitter_buffer.h View File

@ -0,0 +1,59 @@
#ifndef _JITTER_BUFFER_H_
#define _JITTER_BUFFER_H_
#include "auxlib.h"
#include "socket.h"
#include "timerthread.h"
#include "media_socket.h"
//#include "codec.h"
//
//struct packet_handler_ctx;
struct jb_packet;
struct media_packet;
//
struct jb_packet {
struct timerthread_queue_entry ttq_entry;
char *buf;
struct media_packet mp;
};
struct jitter_buffer {
struct timerthread_queue ttq;
mutex_t lock;
unsigned long first_send_ts;
struct timeval first_send;
unsigned int first_seq;
unsigned int rtptime_delta;
unsigned int next_exp_seq;
unsigned int cont_frames;
unsigned int cont_miss;
unsigned int clock_rate;
unsigned int payload_type;
unsigned int num_resets;
unsigned int initial_pkts;
unsigned int drift_mult_factor;
int buffer_len;
int clock_drift_val;
int buf_decremented;
struct call *call;
int disabled;
};
void jitter_buffer_init(void);
struct jitter_buffer *jitter_buffer_new(struct call *);
void jitter_buffer_free(struct jitter_buffer **);
int buffer_packet(struct media_packet *mp, const str *s);
void jb_packet_free(struct jb_packet **jbp);
void jitter_buffer_loop(void *p);
INLINE void jb_put(struct jitter_buffer **jb) {
if (!*jb)
return;
obj_put(&(*jb)->ttq.tt_obj);
*jb = NULL;
}
#endif

+ 2
- 0
include/main.h View File

@ -93,6 +93,8 @@ struct rtpengine_config {
char *mysql_query;
endpoint_t dtmf_udp_ep;
enum endpoint_learning endpoint_learning;
int jb_length;
int jb_clock_drift;
};


+ 2
- 4
include/media_player.h View File

@ -62,11 +62,9 @@ INLINE void media_player_put(struct media_player **mp) {
#endif
struct send_timer {
struct timerthread_obj tt_obj;
mutex_t lock;
struct timerthread_queue ttq;
struct call *call; // main reference that keeps this alive
struct packet_stream *sink;
GQueue packets;
};
@ -89,7 +87,7 @@ void send_timer_loop(void *p);
INLINE void send_timer_put(struct send_timer **st) {
if (!*st)
return;
obj_put(&(*st)->tt_obj);
obj_put(&(*st)->ttq.tt_obj);
*st = NULL;
}


+ 3
- 0
include/media_socket.h View File

@ -19,6 +19,7 @@ struct media_packet;
struct transport_protocol;
struct ssrc_ctx;
struct rtpengine_srtp;
struct jb_packet;
typedef int rtcp_filter_func(struct media_packet *, GQueue *);
typedef int (*rewrite_func)(str *, struct packet_stream *, struct stream_fd *, const endpoint_t *,
@ -174,6 +175,8 @@ const struct streamhandler *determine_handler(const struct transport_protocol *i
struct call_media *out_media, int must_recrypt);
int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, struct media_packet *mp);
const struct transport_protocol *transport_protocol(const str *s);
//void play_buffered(struct packet_stream *sink, struct codec_packet *cp, int buffered);
void play_buffered(struct jb_packet *cp);
/* XXX shouldn't be necessary */
/*


+ 28
- 0
include/timerthread.h View File

@ -22,6 +22,23 @@ struct timerthread_obj {
struct timeval last_run; /* ditto */
};
struct timerthread_queue {
struct timerthread_obj tt_obj;
const char *type;
mutex_t lock;
GTree *entries;
void (*run_now_func)(struct timerthread_queue *, void *);
void (*run_later_func)(struct timerthread_queue *, void *);
void (*free_func)(void *);
void (*entry_free_func)(void *);
};
struct timerthread_queue_entry {
struct timeval when;
void *source; // opaque
char __rest[0];
};
void timerthread_init(struct timerthread *, void (*)(void *));
void timerthread_run(void *);
@ -29,6 +46,17 @@ void timerthread_run(void *);
void timerthread_obj_schedule_abs_nl(struct timerthread_obj *, const struct timeval *);
void timerthread_obj_deschedule(struct timerthread_obj *);
// run_now_func = called if newly inserted object can be processed immediately by timerthread_queue_push within its calling context
// run_later_func = called from the separate timer thread
void *timerthread_queue_new(const char *type, size_t size,
struct timerthread *tt,
void (*run_now_func)(struct timerthread_queue *, void *),
void (*run_later_func)(struct timerthread_queue *, void *), // optional
void (*free_func)(void *),
void (*entry_free_func)(void *));
void timerthread_queue_run(void *ptr);
void timerthread_queue_push(struct timerthread_queue *, struct timerthread_queue_entry *);
unsigned int timerthread_queue_flush(struct timerthread_queue *, void *);
INLINE void timerthread_obj_schedule_abs(struct timerthread_obj *tt_obj, const struct timeval *tv) {
if (!tt_obj)


+ 242
- 0
perl/NGCP/Rtpengine/AutoTest.pm View File

@ -0,0 +1,242 @@
package NGCP::Rtpengine::AutoTest;
use strict;
use warnings;
use NGCP::Rtpengine::Test;
use NGCP::Rtpclient::SRTP;
use Test::More;
use File::Temp;
use IPC::Open3;
use Time::HiRes;
use POSIX ":sys_wait_h";
use IO::Socket;
use Exporter;
our @ISA;
our @EXPORT;
BEGIN {
require Exporter;
@ISA = qw(Exporter);
our @EXPORT = qw(autotest_start new_call offer answer ft tt snd srtp_snd rtp rcv srtp_rcv
srtp_dec escape rtpm reverse_tags new_tt crlf sdp_split rtpe_req offer_answer);
};
my $rtpe_stdout;
my $rtpe_stderr;
my $rtpe_pid;
my $c;
my ($cid, $ft, $tt, @sockets, $tag_iter);
sub autotest_start {
my (@cmdline) = @_;
like $ENV{LD_PRELOAD}, qr/tests-preload/, 'LD_PRELOAD present';
is $ENV{RTPE_PRELOAD_TEST_ACTIVE}, '1', 'preload library is active';
SKIP: {
skip 'daemon is running externally', 1 if $ENV{RTPE_TEST_NO_LAUNCH};
ok -x $ENV{RTPE_BIN}, 'RTPE_BIN points to executable';
}
$rtpe_stdout = File::Temp::tempfile() or die;
$rtpe_stderr = File::Temp::tempfile() or die;
SKIP: {
skip 'daemon is running externally', 1 if $ENV{RTPE_TEST_NO_LAUNCH};
$rtpe_pid = open3(undef, '>&'.fileno($rtpe_stdout), '>&'.fileno($rtpe_stderr),
$ENV{RTPE_BIN}, @cmdline);
ok $rtpe_pid, 'daemon launched in background';
}
# keep trying to connect to the control socket while daemon is starting up
for (1 .. 300) {
$c = NGCP::Rtpengine->new($ENV{RTPENGINE_HOST} // '127.0.0.1', $ENV{RTPENGINE_PORT} // 2223);
last if $c->{socket};
Time::HiRes::usleep(100000); # 100 ms x 300 = 30 sec
}
1;
$c->{socket} or die;
$tag_iter = 0;
my $r = $c->req({command => 'ping'});
ok $r->{result} eq 'pong', 'ping works, daemon operational';
return 1;
}
sub new_call {
my @ports = @_;
for my $s (@sockets) {
$s->close();
}
@sockets = ();
$cid = $tag_iter++ . "-test-callID";
$ft = $tag_iter++ . "-test-fromtag";
$tt = $tag_iter++ . "-test-totag";
print("new call $cid\n");
for my $p (@ports) {
my ($addr, $port) = @{$p};
my $s = IO::Socket::IP->new(Type => &SOCK_DGRAM, Proto => 'udp',
LocalHost => $addr, LocalPort => $port)
or die;
push(@sockets, $s);
}
return @sockets;
}
sub crlf {
my ($s) = @_;
$s =~ s/\r\n/\n/gs;
return $s;
}
sub sdp_split {
my ($s) = @_;
return split(/--------*\n/, $s);
}
sub rtpe_req {
my ($cmd, $name, $req) = @_;
$req->{command} = $cmd;
$req->{'call-id'} = $cid;
my $resp = $c->req($req);
is $resp->{result}, 'ok', "$name - '$cmd' status";
return $resp;
}
sub offer_answer {
my ($cmd, $name, $req, $sdps) = @_;
my ($sdp_in, $exp_sdp_out) = sdp_split($sdps);
$req->{'from-tag'} = $ft;
$req->{sdp} = $sdp_in;
my $resp = rtpe_req($cmd, $name, $req);
my $regexp = "^\Q$exp_sdp_out\E\$";
$regexp =~ s/\\\?/./gs;
$regexp =~ s/PORT/(\\d{1,5})/gs;
$regexp =~ s/ICEBASE/([0-9a-zA-Z]{16})/gs;
$regexp =~ s/ICEUFRAG/([0-9a-zA-Z]{8})/gs;
$regexp =~ s/ICEPWD/([0-9a-zA-Z]{26})/gs;
$regexp =~ s/CRYPTO128/([0-9a-zA-Z\/+]{40})/gs;
$regexp =~ s/CRYPTO192/([0-9a-zA-Z\/+]{51})/gs;
$regexp =~ s/CRYPTO256/([0-9a-zA-Z\/+]{62})/gs;
$regexp =~ s/LOOPER/([0-9a-f]{12})/gs;
my $crlf = crlf($resp->{sdp});
like $crlf, qr/$regexp/s, "$name - output '$cmd' SDP";
my @matches = $crlf =~ qr/$regexp/s;
return @matches;
}
sub offer {
return offer_answer('offer', @_);
}
sub answer {
my ($name, $req, $sdps) = @_;
$req->{'to-tag'} = $tt;
return offer_answer('answer', $name, $req, $sdps);
}
sub snd {
my ($sock, $dest, $packet) = @_;
$sock->send($packet, 0, pack_sockaddr_in($dest, inet_aton('203.0.113.1'))) or die;
}
sub srtp_snd {
my ($sock, $dest, $packet, $srtp_ctx) = @_;
if (!$srtp_ctx->{skey}) {
my ($key, $salt) = NGCP::Rtpclient::SRTP::decode_inline_base64($srtp_ctx->{key}, $srtp_ctx->{cs});
@$srtp_ctx{qw(skey sauth ssalt)} = NGCP::Rtpclient::SRTP::gen_rtp_session_keys($key, $salt);
}
my ($enc, $out_roc) = NGCP::Rtpclient::SRTP::encrypt_rtp(@$srtp_ctx{qw(cs skey ssalt sauth roc)},
'', 0, 0, 0, $packet);
$srtp_ctx->{roc} = $out_roc;
$sock->send($enc, 0, pack_sockaddr_in($dest, inet_aton('203.0.113.1'))) or die;
}
sub rtp {
my ($pt, $seq, $ts, $ssrc, $payload) = @_;
print("rtp in $pt $seq $ts $ssrc\n");
return pack('CCnNN a*', 0x80, $pt, $seq, $ts, $ssrc, $payload);
}
sub rcv {
my ($sock, $port, $match, $cb, $cb_arg) = @_;
my $p = '';
alarm(1);
my $addr = $sock->recv($p, 65535, 0) or die;
alarm(0);
my ($hdr_mark, $pt, $seq, $ts, $ssrc, $payload) = unpack('CCnNN a*', $p);
if ($payload) {
print("rtp recv $pt $seq $ts $ssrc " . unpack('H*', $payload) . "\n");
}
if ($cb) {
$p = $cb->($hdr_mark, $pt, $seq, $ts, $ssrc, $payload, $p, $cb_arg);
}
like $p, $match, 'received packet matches';
my @matches = $p =~ $match;
for my $m (@matches) {
if (length($m) == 2) {
($m) = unpack('n', $m);
}
elsif (length($m) == 4) {
($m) = unpack('N', $m);
}
}
return @matches;
}
sub srtp_rcv {
my ($sock, $port, $match, $srtp_ctx) = @_;
return rcv($sock, $port, $match, \&srtp_dec, $srtp_ctx);
}
sub srtp_dec {
my ($hdr_mark, $pt, $seq, $ts, $ssrc, $payload, $pack, $srtp_ctx) = @_;
if (!$srtp_ctx->{skey}) {
my ($key, $salt) = NGCP::Rtpclient::SRTP::decode_inline_base64($srtp_ctx->{key}, $srtp_ctx->{cs});
@$srtp_ctx{qw(skey sauth ssalt)} = NGCP::Rtpclient::SRTP::gen_rtp_session_keys($key, $salt);
}
my ($dec, $out_roc, $tag, $hmac) = NGCP::Rtpclient::SRTP::decrypt_rtp(@$srtp_ctx{qw(cs skey ssalt sauth roc)}, $pack);
$srtp_ctx->{roc} = $out_roc;
is $tag, substr($hmac, 0, length($tag)), 'SRTP auth tag matches';
return $dec;
}
sub escape {
return "\Q$_[0]\E";
}
sub rtpm {
my ($pt, $seq, $ts, $ssrc, $payload) = @_;
print("rtp matcher $pt $seq $ts $ssrc " . unpack('H*', $payload) . "\n");
my $re = '';
$re .= escape(pack('C', 0x80));
$re .= escape(pack('C', $pt));
$re .= $seq >= 0 ? escape(pack('n', $seq)) : '(..)';
$re .= $ts >= 0 ? escape(pack('N', $ts)) : '(....)';
$re .= $ssrc >= 0 ? escape(pack('N', $ssrc)) : '(....)';
$re .= escape($payload);
return qr/^$re$/s;
}
sub ft { return $ft; }
sub tt { return $tt; }
sub reverse_tags {
($tt, $ft) = ($ft, $tt);
}
sub new_tt {
$tt = $tag_iter++ . "-test-totag";
}
END {
if ($rtpe_pid) {
kill('INT', $rtpe_pid) or die;
# wait for daemon to terminate
my $status = -1;
for (1 .. 50) {
$status = waitpid($rtpe_pid, WNOHANG);
last if $status != 0;
Time::HiRes::usleep(100000); # 100 ms x 50 = 5 sec
}
kill('KILL', $rtpe_pid) if $status == 0;
$status == $rtpe_pid or die;
$? == 0 or die;
}
}
1;

+ 1
- 0
t/.gitignore View File

@ -53,3 +53,4 @@ test-dtmf-detect
*-test
dtmf_rx_fillin.h
*-test.c
jitter_buffer.c

+ 4
- 2
t/Makefile View File

@ -70,7 +70,7 @@ LIBSRCS+= codeclib.c resample.c socket.c streambuf.c dtmflib.c
DAEMONSRCS+= codec.c call.c ice.c kernel.c media_socket.c stun.c bencode.c poller.c \
dtls.c recording.c statistics.c rtcp.c redis.c iptables.c graphite.c \
cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c \
media_player.c
media_player.c jitter_buffer.c
HASHSRCS+= call_interfaces.c control_ng.c sdp.c
endif
@ -110,6 +110,8 @@ daemon-tests: tests-preload.so
mkdir fake-sockets
LD_PRELOAD=../t/tests-preload.so RTPE_BIN=../daemon/rtpengine TEST_SOCKET_PATH=./fake-sockets \
perl -I../perl auto-daemon-tests.pl
LD_PRELOAD=../t/tests-preload.so RTPE_BIN=../daemon/rtpengine TEST_SOCKET_PATH=./fake-sockets \
perl -I../perl auto-daemon-tests-jb.pl
test "$$(ls fake-sockets)" = ""
rmdir fake-sockets
@ -128,7 +130,7 @@ transcode-test: transcode-test.o $(COMMONOBJS) codeclib.o resample.o codec.o ssr
rtcp.o redis.o iptables.o graphite.o call_interfaces.strhash.o sdp.strhash.o rtp.o crypto.o \
control_ng.strhash.o \
streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \
media_player.o dtmflib.o
media_player.o jitter_buffer.o dtmflib.o
payload-tracker-test: payload-tracker-test.o $(COMMONOBJS) ssrc.o aux.o auxlib.o rtp.o crypto.o codeclib.o \
resample.o dtmflib.o


+ 94
- 0
t/auto-daemon-tests-jb.pl View File

@ -0,0 +1,94 @@
#!/usr/bin/perl
use strict;
use warnings;
use NGCP::Rtpengine::Test;
use NGCP::Rtpclient::SRTP;
use NGCP::Rtpengine::AutoTest;
use Test::More;
autotest_start(qw(--config-file=none -t -1 -i 203.0.113.1 -i 2001:db8:4321::1
-n 2223 -c 12345 -f -L 7 -E -u 2222 --jitter-buffer=10))
or die;
my ($sock_a, $sock_b, $port_a, $port_b, $ssrc, $resp, $srtp_ctx_a, $srtp_ctx_b, @ret1, @ret2);
# RTP sequencing tests
($sock_a, $sock_b) = new_call([qw(198.51.100.1 2010)], [qw(198.51.100.3 2012)]);
($port_a) = offer('two codecs, no transcoding', { ICE => 'remove', replace => ['origin'] }, <<SDP);
v=0
o=- 1545997027 1 IN IP4 198.51.100.1
s=tester
t=0 0
m=audio 2010 RTP/AVP 0 8
c=IN IP4 198.51.100.1
a=sendrecv
----------------------------------
v=0
o=- 1545997027 1 IN IP4 203.0.113.1
s=tester
t=0 0
m=audio PORT RTP/AVP 0 8
c=IN IP4 203.0.113.1
a=rtpmap:0 PCMU/8000
a=rtpmap:8 PCMA/8000
a=sendrecv
a=rtcp:PORT
SDP
($port_b) = answer('two codecs, no transcoding', { ICE => 'remove', replace => ['origin'] }, <<SDP);
v=0
o=- 1545997027 1 IN IP4 198.51.100.3
s=tester
t=0 0
m=audio 2012 RTP/AVP 0 8
c=IN IP4 198.51.100.3
a=sendrecv
--------------------------------------
v=0
o=- 1545997027 1 IN IP4 203.0.113.1
s=tester
t=0 0
m=audio PORT RTP/AVP 0 8
c=IN IP4 203.0.113.1
a=rtpmap:0 PCMU/8000
a=rtpmap:8 PCMA/8000
a=sendrecv
a=rtcp:PORT
SDP
snd($sock_a, $port_b, rtp(0, 1000, 3000, 0x1234, "\x00" x 160));
snd($sock_a, $port_b, rtp(0, 1000, 3000, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(0, 1000, 3000, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(0, 1000, 3000, 0x1234, "\x00" x 160));
snd($sock_a, $port_b, rtp(0, 1001, 3000, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(0, 1001, 3000, 0x1234, "\x00" x 160));
snd($sock_a, $port_b, rtp(0, 1010, 3000, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(0, 1010, 3000, 0x1234, "\x00" x 160));
snd($sock_a, $port_b, rtp(8, 1000, 3000, 0x1234, "\x00" x 160));
snd($sock_a, $port_b, rtp(8, 1000, 3000, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(8, 1000, 3000, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(8, 1000, 3000, 0x1234, "\x00" x 160));
snd($sock_a, $port_b, rtp(8, 1001, 3000, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(8, 1001, 3000, 0x1234, "\x00" x 160));
snd($sock_a, $port_b, rtp(8, 1010, 3000, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(8, 1010, 3000, 0x1234, "\x00" x 160));
snd($sock_a, $port_b, rtp(8, 1011, 3160, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(8, 1011, 3160, 0x1234, "\x00" x 160));
snd($sock_a, $port_b, rtp(8, 1012, 3320, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(8, 1012, 3320, 0x1234, "\x00" x 160));
snd($sock_a, $port_b, rtp(8, 1013, 3480, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(8, 1013, 3480, 0x1234, "\x00" x 160));
done_testing();

+ 26
- 220
t/auto-daemon-tests.pl View File

@ -4,191 +4,13 @@ use strict;
use warnings;
use NGCP::Rtpengine::Test;
use NGCP::Rtpclient::SRTP;
use NGCP::Rtpengine::AutoTest;
use Test::More;
use File::Temp;
use IPC::Open3;
use Time::HiRes;
use POSIX ":sys_wait_h";
use IO::Socket;
like $ENV{LD_PRELOAD}, qr/tests-preload/, 'LD_PRELOAD present';
is $ENV{RTPE_PRELOAD_TEST_ACTIVE}, '1', 'preload library is active';
SKIP: {
skip 'daemon is running externally', 1 if $ENV{RTPE_TEST_NO_LAUNCH};
ok -x $ENV{RTPE_BIN}, 'RTPE_BIN points to executable';
}
my $rtpe_stdout = File::Temp::tempfile() or die;
my $rtpe_stderr = File::Temp::tempfile() or die;
my $rtpe_pid;
SKIP: {
skip 'daemon is running externally', 1 if $ENV{RTPE_TEST_NO_LAUNCH};
$rtpe_pid = open3(undef, '>&'.fileno($rtpe_stdout), '>&'.fileno($rtpe_stderr),
$ENV{RTPE_BIN}, qw(--config-file=none -t -1 -i 203.0.113.1 -i 2001:db8:4321::1
-n 2223 -c 12345 -f -L 7 -E -u 2222));
ok $rtpe_pid, 'daemon launched in background';
}
# keep trying to connect to the control socket while daemon is starting up
my $c;
for (1 .. 300) {
$c = NGCP::Rtpengine->new($ENV{RTPENGINE_HOST} // '127.0.0.1', $ENV{RTPENGINE_PORT} // 2223);
last if $c->{socket};
Time::HiRes::usleep(100000); # 100 ms x 300 = 30 sec
}
1;
$c->{socket} or die;
my ($cid, $ft, $tt, @sockets);
my ($tag_iter) = (0);
sub new_call {
my @ports = @_;
for my $s (@sockets) {
$s->close();
}
@sockets = ();
$cid = $tag_iter++ . "-test-callID";
$ft = $tag_iter++ . "-test-fromtag";
$tt = $tag_iter++ . "-test-totag";
print("new call $cid\n");
for my $p (@ports) {
my ($addr, $port) = @{$p};
my $s = IO::Socket::IP->new(Type => &SOCK_DGRAM, Proto => 'udp',
LocalHost => $addr, LocalPort => $port)
or die;
push(@sockets, $s);
}
return @sockets;
}
sub crlf {
my ($s) = @_;
$s =~ s/\r\n/\n/gs;
return $s;
}
sub sdp_split {
my ($s) = @_;
return split(/--------*\n/, $s);
}
sub rtpe_req {
my ($cmd, $name, $req) = @_;
$req->{command} = $cmd;
$req->{'call-id'} = $cid;
my $resp = $c->req($req);
is $resp->{result}, 'ok', "$name - '$cmd' status";
return $resp;
}
sub offer_answer {
my ($cmd, $name, $req, $sdps) = @_;
my ($sdp_in, $exp_sdp_out) = sdp_split($sdps);
$req->{'from-tag'} = $ft;
$req->{sdp} = $sdp_in;
my $resp = rtpe_req($cmd, $name, $req);
my $regexp = "^\Q$exp_sdp_out\E\$";
$regexp =~ s/\\\?/./gs;
$regexp =~ s/PORT/(\\d{1,5})/gs;
$regexp =~ s/ICEBASE/([0-9a-zA-Z]{16})/gs;
$regexp =~ s/ICEUFRAG/([0-9a-zA-Z]{8})/gs;
$regexp =~ s/ICEPWD/([0-9a-zA-Z]{26})/gs;
$regexp =~ s/CRYPTO128/([0-9a-zA-Z\/+]{40})/gs;
$regexp =~ s/CRYPTO192/([0-9a-zA-Z\/+]{51})/gs;
$regexp =~ s/CRYPTO256/([0-9a-zA-Z\/+]{62})/gs;
$regexp =~ s/LOOPER/([0-9a-f]{12})/gs;
my $crlf = crlf($resp->{sdp});
like $crlf, qr/$regexp/s, "$name - output '$cmd' SDP";
my @matches = $crlf =~ qr/$regexp/s;
return @matches;
}
sub offer {
return offer_answer('offer', @_);
}
sub answer {
my ($name, $req, $sdps) = @_;
$req->{'to-tag'} = $tt;
return offer_answer('answer', $name, $req, $sdps);
}
sub snd {
my ($sock, $dest, $packet) = @_;
$sock->send($packet, 0, pack_sockaddr_in($dest, inet_aton('203.0.113.1'))) or die;
}
sub srtp_snd {
my ($sock, $dest, $packet, $srtp_ctx) = @_;
if (!$srtp_ctx->{skey}) {
my ($key, $salt) = NGCP::Rtpclient::SRTP::decode_inline_base64($srtp_ctx->{key}, $srtp_ctx->{cs});
@$srtp_ctx{qw(skey sauth ssalt)} = NGCP::Rtpclient::SRTP::gen_rtp_session_keys($key, $salt);
}
my ($enc, $out_roc) = NGCP::Rtpclient::SRTP::encrypt_rtp(@$srtp_ctx{qw(cs skey ssalt sauth roc)},
'', 0, 0, 0, $packet);
$srtp_ctx->{roc} = $out_roc;
$sock->send($enc, 0, pack_sockaddr_in($dest, inet_aton('203.0.113.1'))) or die;
}
sub rtp {
my ($pt, $seq, $ts, $ssrc, $payload) = @_;
print("rtp in $pt $seq $ts $ssrc\n");
return pack('CCnNN a*', 0x80, $pt, $seq, $ts, $ssrc, $payload);
}
sub rcv {
my ($sock, $port, $match, $cb, $cb_arg) = @_;
my $p = '';
alarm(1);
my $addr = $sock->recv($p, 65535, 0) or die;
alarm(0);
my ($hdr_mark, $pt, $seq, $ts, $ssrc, $payload) = unpack('CCnNN a*', $p);
if ($payload) {
print("rtp recv $pt $seq $ts $ssrc " . unpack('H*', $payload) . "\n");
}
if ($cb) {
$p = $cb->($hdr_mark, $pt, $seq, $ts, $ssrc, $payload, $p, $cb_arg);
}
like $p, $match, 'received packet matches';
my @matches = $p =~ $match;
for my $m (@matches) {
if (length($m) == 2) {
($m) = unpack('n', $m);
}
elsif (length($m) == 4) {
($m) = unpack('N', $m);
}
}
return @matches;
}
sub srtp_rcv {
my ($sock, $port, $match, $srtp_ctx) = @_;
return rcv($sock, $port, $match, \&srtp_dec, $srtp_ctx);
}
sub srtp_dec {
my ($hdr_mark, $pt, $seq, $ts, $ssrc, $payload, $pack, $srtp_ctx) = @_;
if (!$srtp_ctx->{skey}) {
my ($key, $salt) = NGCP::Rtpclient::SRTP::decode_inline_base64($srtp_ctx->{key}, $srtp_ctx->{cs});
@$srtp_ctx{qw(skey sauth ssalt)} = NGCP::Rtpclient::SRTP::gen_rtp_session_keys($key, $salt);
}
my ($dec, $out_roc, $tag, $hmac) = NGCP::Rtpclient::SRTP::decrypt_rtp(@$srtp_ctx{qw(cs skey ssalt sauth roc)}, $pack);
$srtp_ctx->{roc} = $out_roc;
is $tag, substr($hmac, 0, length($tag)), 'SRTP auth tag matches';
return $dec;
}
sub escape {
return "\Q$_[0]\E";
}
sub rtpm {
my ($pt, $seq, $ts, $ssrc, $payload) = @_;
print("rtp matcher $pt $seq $ts $ssrc " . unpack('H*', $payload) . "\n");
my $re = '';
$re .= escape(pack('C', 0x80));
$re .= escape(pack('C', $pt));
$re .= $seq >= 0 ? escape(pack('n', $seq)) : '(..)';
$re .= $ts >= 0 ? escape(pack('N', $ts)) : '(....)';
$re .= $ssrc >= 0 ? escape(pack('N', $ssrc)) : '(....)';
$re .= escape($payload);
return qr/^$re$/s;
}
{
my $r = $c->req({command => 'ping'});
ok $r->{result} eq 'pong', 'ping works, daemon operational';
}
autotest_start(qw(--config-file=none -t -1 -i 203.0.113.1 -i 2001:db8:4321::1
-n 2223 -c 12345 -f -L 7 -E -u 2222))
or die;
my ($sock_a, $sock_b, $port_a, $port_b, $ssrc, $resp, $srtp_ctx_a, $srtp_ctx_b, @ret1, @ret2);
@ -771,7 +593,7 @@ snd($sock_a, $port_b, rtp(0, 1001, 3160, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(0, 1001, 3160, $ssrc, "\x00" x 160));
$resp = rtpe_req('play DTMF', 'inject DTMF towards B',
{ 'from-tag' => $ft, code => '0', volume => 10, duration => 100 });
{ 'from-tag' => ft(), code => '0', volume => 10, duration => 100 });
snd($sock_a, $port_b, rtp(0, 1002, 3320, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(96 | 0x80, 1002, 3320, $ssrc, "\x00\x0a\x00\xa0"));
@ -798,7 +620,7 @@ snd($sock_b, $port_a, rtp(0, 4001, 8160, 0x6543, "\x00" x 160));
rcv($sock_a, $port_b, rtpm(0, 4001, 8160, $ssrc, "\x00" x 160));
$resp = rtpe_req('play DTMF', 'inject DTMF towards A',
{ 'from-tag' => $tt, code => '*', volume => 10, duration => 100 });
{ 'from-tag' => tt(), code => '*', volume => 10, duration => 100 });
snd($sock_b, $port_a, rtp(0, 4002, 8320, 0x6543, "\x00" x 160));
rcv($sock_a, $port_b, rtpm(96 | 0x80, 4002, 8320, $ssrc, "\x0a\x0a\x00\xa0"));
@ -879,7 +701,7 @@ snd($sock_a, $port_b, rtp(0, 1001, 3160, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(8, 1001, 3160, $ssrc, "\x2a" x 160));
$resp = rtpe_req('play DTMF', 'inject DTMF towards B',
{ 'from-tag' => $ft, code => '0', volume => 10, duration => 100 });
{ 'from-tag' => ft(), code => '0', volume => 10, duration => 100 });
snd($sock_a, $port_b, rtp(0, 1002, 3320, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(96 | 0x80, 1002, 3320, $ssrc, "\x00\x0a\x00\xa0"));
@ -906,7 +728,7 @@ snd($sock_b, $port_a, rtp(8, 4001, 8160, 0x6543, "\x2a" x 160));
rcv($sock_a, $port_b, rtpm(0, 4001, 8160, $ssrc, "\x00" x 160));
$resp = rtpe_req('play DTMF', 'inject DTMF towards A',
{ 'from-tag' => $tt, code => '#', volume => -10, duration => 100 });
{ 'from-tag' => tt(), code => '#', volume => -10, duration => 100 });
snd($sock_b, $port_a, rtp(8, 4002, 8320, 0x6543, "\x2a" x 160));
rcv($sock_a, $port_b, rtpm(96 | 0x80, 4002, 8320, $ssrc, "\x0b\x0a\x00\xa0"));
@ -981,7 +803,7 @@ snd($sock_a, $port_b, rtp(0, 1001, 3160, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(0, 1001, 3160, $ssrc, "\x00" x 160));
$resp = rtpe_req('play DTMF', 'inject DTMF towards B',
{ 'from-tag' => $ft, code => 'C', volume => 5, duration => 120, pause => 110 });
{ 'from-tag' => ft(), code => 'C', volume => 5, duration => 120, pause => 110 });
snd($sock_a, $port_b, rtp(0, 1002, 3320, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(0, 1002, 3320, $ssrc, "\xff\x93\x94\xbc\x2e\x56\xbf\x2b\x13\x1b\xa7\x8e\x98\x47\x25\x41\xe2\x24\x16\x2b\x99\x8e\x9f\x28\x1e\x3d\x5b\x23\x1c\xdf\x92\x8f\xb6\x1c\x1c\x40\x5d\x26\x25\xaa\x8f\x95\x3b\x15\x1d\x5e\xde\x2c\x38\x9d\x8f\x9e\x1f\x11\x20\xc0\xc1\x37\xdd\x99\x92\xb7\x15\x10\x2c\xac\xb5\x49\xb8\x97\x99\x37\x0f\x13\x58\xa0\xae\x67\xae\x99\xa4\x1f\x0d\x1a\xae\x9b\xad\x7b\xad\x9d\xbf\x16\x0e\x27\x9d\x98\xb0\x55\xb1\xa6\x3a\x11\x11\x63\x95\x98\xbf\x3e\xbb\xb4\x26\x10\x1a\xa9\x90\x9a\x4e\x30\xce\xd4\x1e\x12\x29\x99\x8e\xa1\x2d\x29\x6d\x4b\x1c\x18\xef\x91\x8f\xb6\x1f\x24\x57\x3e\x1d\x20\xa9\x8e\x95\x3e\x19\x23\x67\x3e\x21\x31\x9c\x8e\x9e\x22\x14\x26\xcd\x4a"));
@ -1017,7 +839,7 @@ snd($sock_b, $port_a, rtp(0, 4001, 8160, 0x6543, "\x00" x 160));
rcv($sock_a, $port_b, rtpm(0, 4001, 8160, $ssrc, "\x00" x 160));
$resp = rtpe_req('play DTMF', 'inject DTMF towards A',
{ 'from-tag' => $tt, code => '4', volume => 3, duration => 150, pause => 100 });
{ 'from-tag' => tt(), code => '4', volume => 3, duration => 150, pause => 100 });
snd($sock_b, $port_a, rtp(0, 4002, 8320, 0x6543, "\x00" x 160));
rcv($sock_a, $port_b, rtpm(0, 4002, 8320, $ssrc, "\xff\x90\x8a\x93\xd9\x1b\x18\x27\x65\xe5\x33\x29\x4c\x9e\x8f\x91\xb8\x15\x09\x0d\x32\x98\x8e\x96\xbb\x2c\x2b\x4c\xd8\x34\x1c\x18\x2e\x9d\x8c\x8c\xa5\x1a\x0b\x0d\x27\xa3\x97\x9e\xbd\x4f\xc4\xaa\xb2\x2c\x12\x0e\x1e\xa1\x8b\x8a\x9c\x25\x0e\x10\x25\xb7\xa7\xb7\x5e\xcb\xa2\x98\x9f\x30\x0f\x0a\x16\xae\x8d\x8a\x98\x3a\x18\x19\x2c\xdd\xfd\x30\x2b\xce\x99\x8e\x95\x4c\x0f\x09\x10\xdf\x93\x8e\x9a\xec\x28\x2c\x56\xee\x2d\x1a\x1a\x48\x97\x8b\x8e\xba\x14\x0a\x0f\x39\x9d\x96\xa1\xcd\x4e\xbe\xab\xbe\x23\x10\x10\x2b\x99\x8a\x8c\xa7\x1b\x0d\x12\x2f\xad\xa7\xbc\x5e\xbd\x9f\x99\xa8\x23\x0d\x0b\x1d\x9f\x8b\x8c\x9f\x29\x16\x1b\x34\xcd\x60\x2f\x2f\xb6\x96"));
@ -1104,7 +926,7 @@ snd($sock_a, $port_b, rtp(0, 1001, 3160, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(8, 1001, 3160, $ssrc, "\x2a" x 160));
$resp = rtpe_req('play DTMF', 'inject DTMF towards B',
{ 'from-tag' => $ft, code => 'C', volume => 5, duration => 120 });
{ 'from-tag' => ft(), code => 'C', volume => 5, duration => 120 });
snd($sock_a, $port_b, rtp(0, 1002, 3320, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(8, 1002, 3320, $ssrc, "\xd5\xb9\xbe\x97\x05\x70\xea\x01\x3e\x31\x82\xa5\xb2\x63\x0f\x69\xc1\x0f\x3d\x06\xb3\xa4\x8a\x03\x35\x14\x75\x0e\x36\xcc\xb8\xa5\x9d\x36\x36\x68\x49\x0d\x0c\x81\xa5\xbf\x16\x3f\x37\x4f\xcf\x07\x13\xb4\xa5\xb4\x0a\x3b\x0b\xeb\xe9\x12\xc9\xb3\xb8\x92\x3c\x3a\x07\x87\x9c\x61\x93\xb2\xb3\x12\x25\x39\x76\x8b\x85\x5a\x85\xb3\x8e\x35\x24\x30\x85\xb1\x87\x57\x84\xb7\xeb\x3c\x24\x0d\xb4\xb2\x9b\x70\x98\x8c\x11\x3b\x38\x41\xbf\xb2\xeb\x15\x96\x9f\x0d\x3a\x30\x83\xba\xb1\x7b\x1b\xfa\xf2\x34\x39\x03\xb0\xa5\x88\x04\x03\x5f\x67\x37\x32\xdd\xb8\xba\x9d\x35\x0e\x71\x15\x37\x0a\x80\xa4\xbf\x15\x33\x09\x45\x15\x0b\x18\xb6\xa4\xb4\x08\x3f\x0d\xe5\x66"));
@ -1141,7 +963,7 @@ snd($sock_b, $port_a, rtp(8, 4001, 8160, 0x6543, "\x2a" x 160));
rcv($sock_a, $port_b, rtpm(0, 4001, 8160, $ssrc, "\x00" x 160));
$resp = rtpe_req('play DTMF', 'inject DTMF towards A',
{ 'from-tag' => $tt, code => '4', volume => 3, duration => 150 });
{ 'from-tag' => tt(), code => '4', volume => 3, duration => 150 });
snd($sock_b, $port_a, rtp(8, 4002, 8320, 0x6543, "\x2a" x 160));
rcv($sock_a, $port_b, rtpm(0, 4002, 8320, $ssrc, "\xff\x90\x8a\x93\xd9\x1b\x18\x27\x65\xe5\x33\x29\x4c\x9e\x8f\x91\xb8\x15\x09\x0d\x32\x98\x8e\x96\xbb\x2c\x2b\x4c\xd8\x34\x1c\x18\x2e\x9d\x8c\x8c\xa5\x1a\x0b\x0d\x27\xa3\x97\x9e\xbd\x4f\xc4\xaa\xb2\x2c\x12\x0e\x1e\xa1\x8b\x8a\x9c\x25\x0e\x10\x25\xb7\xa7\xb7\x5e\xcb\xa2\x98\x9f\x30\x0f\x0a\x16\xae\x8d\x8a\x98\x3a\x18\x19\x2c\xdd\xfd\x30\x2b\xce\x99\x8e\x95\x4c\x0f\x09\x10\xdf\x93\x8e\x9a\xec\x28\x2c\x56\xee\x2d\x1a\x1a\x48\x97\x8b\x8e\xba\x14\x0a\x0f\x39\x9d\x96\xa1\xcd\x4e\xbe\xab\xbe\x23\x10\x10\x2b\x99\x8a\x8c\xa7\x1b\x0d\x12\x2f\xad\xa7\xbc\x5e\xbd\x9f\x99\xa8\x23\x0d\x0b\x1d\x9f\x8b\x8c\x9f\x29\x16\x1b\x34\xcd\x60\x2f\x2f\xb6\x96"));
@ -1228,9 +1050,9 @@ snd($sock_a, $port_b, rtp(0, 1001, 3160, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(0, 1001, 3160, $ssrc, "\x00" x 160));
$resp = rtpe_req('play DTMF', 'inject DTMF towards B',
{ 'from-tag' => $ft, code => 'C', volume => 5, duration => 100 });
{ 'from-tag' => ft(), code => 'C', volume => 5, duration => 100 });
$resp = rtpe_req('play DTMF', 'inject DTMF towards B',
{ 'from-tag' => $ft, code => '4', volume => 5, duration => 100 });
{ 'from-tag' => ft(), code => '4', volume => 5, duration => 100 });
snd($sock_a, $port_b, rtp(0, 1002, 3320, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(0, 1002, 3320, $ssrc, "\xff\x93\x94\xbc\x2e\x56\xbf\x2b\x13\x1b\xa7\x8e\x98\x47\x25\x41\xe2\x24\x16\x2b\x99\x8e\x9f\x28\x1e\x3d\x5b\x23\x1c\xdf\x92\x8f\xb6\x1c\x1c\x40\x5d\x26\x25\xaa\x8f\x95\x3b\x15\x1d\x5e\xde\x2c\x38\x9d\x8f\x9e\x1f\x11\x20\xc0\xc1\x37\xdd\x99\x92\xb7\x15\x10\x2c\xac\xb5\x49\xb8\x97\x99\x37\x0f\x13\x58\xa0\xae\x67\xae\x99\xa4\x1f\x0d\x1a\xae\x9b\xad\x7b\xad\x9d\xbf\x16\x0e\x27\x9d\x98\xb0\x55\xb1\xa6\x3a\x11\x11\x63\x95\x98\xbf\x3e\xbb\xb4\x26\x10\x1a\xa9\x90\x9a\x4e\x30\xce\xd4\x1e\x12\x29\x99\x8e\xa1\x2d\x29\x6d\x4b\x1c\x18\xef\x91\x8f\xb6\x1f\x24\x57\x3e\x1d\x20\xa9\x8e\x95\x3e\x19\x23\x67\x3e\x21\x31\x9c\x8e\x9e\x22\x14\x26\xcd\x4a"));
@ -1340,9 +1162,9 @@ snd($sock_a, $port_b, rtp(0, 1001, 3160, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(0, 1001, 3160, $ssrc, "\x00" x 160));
$resp = rtpe_req('play DTMF', 'inject DTMF towards B',
{ 'from-tag' => $ft, code => '0', volume => 10, duration => 100 });
{ 'from-tag' => ft(), code => '0', volume => 10, duration => 100 });
$resp = rtpe_req('play DTMF', 'inject DTMF towards B',
{ 'from-tag' => $ft, code => '1', volume => 6, duration => 100 });
{ 'from-tag' => ft(), code => '1', volume => 6, duration => 100 });
snd($sock_a, $port_b, rtp(0, 1002, 3320, 0x1234, "\x00" x 160));
rcv($sock_b, $port_a, rtpm(96 | 0x80, 1002, 3320, $ssrc, "\x00\x0a\x00\xa0"));
@ -2792,7 +2614,7 @@ a=sendrecv
a=rtcp:PORT
SDP
$resp = rtpe_req('play media', 'media playback, offer only', { 'from-tag' => $ft, blob => $wav_file });
$resp = rtpe_req('play media', 'media playback, offer only', { 'from-tag' => ft(), blob => $wav_file });
is $resp->{duration}, 100, 'media duration';
my ($ts, $seq);
@ -2848,7 +2670,7 @@ a=rtcp:PORT
SDP
$resp = rtpe_req('play media', 'media playback, side A', { 'from-tag' => $ft, blob => $wav_file });
$resp = rtpe_req('play media', 'media playback, side A', { 'from-tag' => ft(), blob => $wav_file });
is $resp->{duration}, 100, 'media duration';
($seq, $ts, $ssrc) = rcv($sock_a, -1, rtpm(8 | 0x80, -1, -1, -1, $pcma_1));
@ -2903,7 +2725,7 @@ a=rtcp:PORT
SDP
$resp = rtpe_req('play media', 'media playback, side B', { 'from-tag' => $tt, blob => $wav_file });
$resp = rtpe_req('play media', 'media playback, side B', { 'from-tag' => tt(), blob => $wav_file });
is $resp->{duration}, 100, 'media duration';
($seq, $ts, $ssrc) = rcv($sock_b, -1, rtpm(8 | 0x80, -1, -1, -1, $pcma_1));
@ -2912,7 +2734,7 @@ rcv($sock_b, -1, rtpm(8, $seq + 2, $ts + 160 * 2, $ssrc, $pcma_3));
rcv($sock_b, -1, rtpm(8, $seq + 3, $ts + 160 * 3, $ssrc, $pcma_4));
rcv($sock_b, -1, rtpm(8, $seq + 4, $ts + 160 * 4, $ssrc, $pcma_5));
$resp = rtpe_req('play media', 'restart media playback', { 'from-tag' => $tt, blob => $wav_file });
$resp = rtpe_req('play media', 'restart media playback', { 'from-tag' => tt(), blob => $wav_file });
is $resp->{duration}, 100, 'media duration';
$ts += 160 * 5;
@ -3102,7 +2924,7 @@ a=crypto:1 AES_CM_128_HMAC_SHA1_80 inline:DVM+BTeYX2UI1LaA9bgXrcBEDBxoItA9/39fSo
SDP
$resp = rtpe_req('play media', 'media playback, SRTP', { 'from-tag' => $ft, blob => $wav_file });
$resp = rtpe_req('play media', 'media playback, SRTP', { 'from-tag' => ft(), blob => $wav_file });
is $resp->{duration}, 100, 'media duration';
my $srtp_ctx = {
@ -4046,10 +3868,10 @@ snd($sock_b, $port_a, rtp(0, 4000, 5000, 0x4567, "\x88" x 160));
($ssrc) = rcv($sock_a, $port_b, rtpm(0, 4000, 5000, -1, "\x88" x 160));
# reverse re-invite
($tt, $ft) = ($ft, $tt);
reverse_tags();
(undef, $port_b) = offer('gh 766 reinvite',
{ 'to-tag' => $tt,
{ 'to-tag' => tt(),
ICE => 'remove', replace => ['origin', 'session-connection'],
flags => [ "loop-protect", "asymmetric" ] }, <<SDP);
v=0
@ -4978,10 +4800,10 @@ a=rtcp:PORT
a=ptime:20
SDP
rtpe_req('delete', 'media playback after delete', { 'from-tag' => $ft });
rtpe_req('delete', 'media playback after delete', { 'from-tag' => ft() });
# new to-tag
$tt = $tag_iter++ . "-test-totag";
new_tt();
offer('media playback after delete', { ICE => 'remove', replace => ['origin'],
'transport-protocol' => 'transparent', flags => ['strict-source', 'record-call'],
@ -5053,7 +4875,7 @@ SDP
#rtpe_req('block media', 'media playback after delete', { });
$resp = rtpe_req('play media', 'media playback after delete', { 'from-tag' => $tt, 'to-tag' => $tt,
$resp = rtpe_req('play media', 'media playback after delete', { 'from-tag' => tt(), 'to-tag' => tt(),
blob => $wav_file });
is $resp->{duration}, 100, 'media duration';
@ -5066,20 +4888,4 @@ rcv($sock_b, -1, rtpm(8, $seq + 4, $ts + 160 * 4, $ssrc, $pcma_5));
END {
if ($rtpe_pid) {
kill('INT', $rtpe_pid) or die;
# wait for daemon to terminate
my $status = -1;
for (1 .. 50) {
$status = waitpid($rtpe_pid, WNOHANG);
last if $status != 0;
Time::HiRes::usleep(100000); # 100 ms x 50 = 5 sec
}
kill('KILL', $rtpe_pid) if $status == 0;
$status == $rtpe_pid or die;
$? == 0 or die;
}
}
done_testing();

Loading…
Cancel
Save