Browse Source

TT#14008 convert RTCP timer and DTX to regular timer

Change-Id: I7f9e1e586e237d6b40ee250ed149cb21eae87f95
pull/1287/head
Richard Fuchs 5 years ago
parent
commit
5c50ec2bbe
4 changed files with 77 additions and 65 deletions
  1. +1
    -1
      daemon/call.c
  2. +72
    -63
      daemon/codec.c
  3. +2
    -1
      include/call.h
  4. +2
    -0
      include/codec.h

+ 1
- 1
daemon/call.c View File

@ -3195,7 +3195,7 @@ struct call_monologue *call_get_mono_dialogue(struct call *call, const str *from
static void media_stop(struct call_media *m) { static void media_stop(struct call_media *m) {
t38_gateway_stop(m->t38_gateway); t38_gateway_stop(m->t38_gateway);
codec_handlers_stop(&m->codec_handlers_store); codec_handlers_stop(&m->codec_handlers_store);
m->rtcp_timer.tv_sec = 0;
rtcp_timer_stop(&m->rtcp_timer);
} }
static void __monologue_stop(struct call_monologue *ml) { static void __monologue_stop(struct call_monologue *ml) {
media_player_stop(ml->player); media_player_stop(ml->player);


+ 72
- 63
daemon/codec.c View File

@ -20,7 +20,15 @@
struct codec_timer {
struct timerthread_obj tt_obj;
struct timeval next;
void (*func)(struct codec_timer *);
};
static codec_handler_func handler_func_passthrough; static codec_handler_func handler_func_passthrough;
static struct timerthread codec_timers_thread;
static struct rtp_payload_type *__rtp_payload_type_copy(const struct rtp_payload_type *pt); static struct rtp_payload_type *__rtp_payload_type_copy(const struct rtp_payload_type *pt);
static void __rtp_payload_type_dup(struct call *call, struct rtp_payload_type *pt); static void __rtp_payload_type_dup(struct call *call, struct rtp_payload_type *pt);
@ -69,7 +77,7 @@ struct codec_ssrc_handler;
struct transcode_packet; struct transcode_packet;
struct dtx_buffer { struct dtx_buffer {
struct timerthread_queue ttq;
struct codec_timer ct;
mutex_t lock; mutex_t lock;
struct codec_ssrc_handler *csh; struct codec_ssrc_handler *csh;
int ptime; // ms per packet int ptime; // ms per packet
@ -80,7 +88,6 @@ struct dtx_buffer {
struct media_packet last_mp; struct media_packet last_mp;
unsigned long head_ts; unsigned long head_ts;
uint32_t ssrc; uint32_t ssrc;
struct timerthread_queue_entry ttq_entry;
time_t start; time_t start;
}; };
struct dtx_packet { struct dtx_packet {
@ -151,21 +158,15 @@ struct codec_tracker {
GHashTable *supp_codecs; // telephone-event etc => hash table of clock rates GHashTable *supp_codecs; // telephone-event etc => hash table of clock rates
}; };
struct rtcp_timer_queue {
struct timerthread_queue ttq;
};
struct rtcp_timer { struct rtcp_timer {
struct timerthread_queue_entry ttq_entry;
struct codec_timer ct;
struct call *call; struct call *call;
struct call_media *media; struct call_media *media;
}; };
static struct timerthread codec_timers_thread;
static struct rtcp_timer_queue *rtcp_timer_queue;
static codec_handler_func handler_func_passthrough_ssrc; static codec_handler_func handler_func_passthrough_ssrc;
static codec_handler_func handler_func_transcode; static codec_handler_func handler_func_transcode;
static codec_handler_func handler_func_playback; static codec_handler_func handler_func_playback;
@ -1176,38 +1177,40 @@ static void __rtcp_timer_free(void *p) {
struct rtcp_timer *rt = p; struct rtcp_timer *rt = p;
if (rt->call) if (rt->call)
obj_put(rt->call); obj_put(rt->call);
g_slice_free1(sizeof(*rt), rt);
} }
static void __rtcp_timer_run(struct codec_timer *);
// master lock held in W // master lock held in W
static void __codec_rtcp_timer_schedule(struct call_media *media) { static void __codec_rtcp_timer_schedule(struct call_media *media) {
struct rtcp_timer *rt = g_slice_alloc0(sizeof(*rt));
rt->ttq_entry.when = media->rtcp_timer;
rt->call = obj_get(media->call);
rt->media = media;
struct rtcp_timer *rt = media->rtcp_timer;
if (!rt) {
media->rtcp_timer = rt = obj_alloc0("rtcp_timer", sizeof(*rt), __rtcp_timer_free);
rt->ct.tt_obj.tt = &codec_timers_thread;
rt->call = obj_get(media->call);
rt->media = media;
rt->ct.next = rtpe_now;
rt->ct.func = __rtcp_timer_run;
}
timerthread_queue_push(&rtcp_timer_queue->ttq, &rt->ttq_entry);
timeval_add_usec(&rt->ct.next, 5000000 + (ssl_random() % 2000000));
timerthread_obj_schedule_abs(&rt->ct.tt_obj, &rt->ct.next);
} }
// no lock held // no lock held
static void __rtcp_timer_run(struct timerthread_queue *q, void *p) {
struct rtcp_timer *rt = p;
static void __rtcp_timer_run(struct codec_timer *ct) {
struct rtcp_timer *rt = (void *) ct;
// check scheduling // check scheduling
rwlock_lock_w(&rt->call->master_lock); rwlock_lock_w(&rt->call->master_lock);
struct call_media *media = rt->media; struct call_media *media = rt->media;
struct timeval rtcp_timer = media->rtcp_timer;
log_info_call(rt->call); log_info_call(rt->call);
if (!rtcp_timer.tv_sec || timeval_diff(&rtpe_now, &rtcp_timer) < 0 || !proto_is_rtp(media->protocol)
|| !MEDIA_ISSET(media, RTCP_GEN))
{
media->rtcp_timer.tv_sec = 0;
if (media->rtcp_timer != rt || !proto_is_rtp(media->protocol) || !MEDIA_ISSET(media, RTCP_GEN)) {
if (media->rtcp_timer == rt)
rtcp_timer_stop(&media->rtcp_timer);
rwlock_unlock_w(&rt->call->master_lock); rwlock_unlock_w(&rt->call->master_lock);
__rtcp_timer_free(rt);
goto out; goto out;
} }
timeval_add_usec(&rtcp_timer, 5000000 + (ssl_random() % 2000000));
media->rtcp_timer = rtcp_timer;
timeval_add_usec(&ct->next, 5000000 + (ssl_random() % 2000000));
__codec_rtcp_timer_schedule(media); __codec_rtcp_timer_schedule(media);
// switch locks to be more graceful // switch locks to be more graceful
@ -1233,18 +1236,13 @@ static void __rtcp_timer_run(struct timerthread_queue *q, void *p) {
if (ssrc_out) if (ssrc_out)
obj_put(&ssrc_out->parent->h); obj_put(&ssrc_out->parent->h);
__rtcp_timer_free(rt);
out: out:
log_info_clear(); log_info_clear();
} }
// master lock held in W // master lock held in W
static void __codec_rtcp_timer(struct call_media *receiver) { static void __codec_rtcp_timer(struct call_media *receiver) {
if (receiver->rtcp_timer.tv_sec) // already scheduled
if (receiver->rtcp_timer) // already scheduled
return; return;
receiver->rtcp_timer = rtpe_now;
timeval_add_usec(&receiver->rtcp_timer, 5000000 + (ssl_random() % 2000000));
__codec_rtcp_timer_schedule(receiver); __codec_rtcp_timer_schedule(receiver);
// XXX unify with media player into a generic RTCP player // XXX unify with media player into a generic RTCP player
} }
@ -1650,6 +1648,19 @@ static struct codec_handler *codec_handler_get_udptl(struct call_media *m) {
#endif #endif
// master lock held in W
static void codec_timer_stop(struct codec_timer **ctp) {
if (!ctp || !*ctp)
return;
obj_put(&(*ctp)->tt_obj);
*ctp = NULL;
}
// master lock held in W
void rtcp_timer_stop(struct rtcp_timer **rtp) {
codec_timer_stop((struct codec_timer **) rtp);
}
// call must be locked in R // call must be locked in R
struct codec_handler *codec_handler_get(struct call_media *m, int payload_type) { struct codec_handler *codec_handler_get(struct call_media *m, int payload_type) {
#ifdef WITH_TRANSCODING #ifdef WITH_TRANSCODING
@ -2411,12 +2422,12 @@ static int __buffer_dtx(struct dtx_buffer *dtxb, struct codec_ssrc_handler *deco
ts, dtxb->packets.length); ts, dtxb->packets.length);
// schedule timer if not running yet // schedule timer if not running yet
if (!dtxb->ttq_entry.when.tv_sec) {
if (!dtxb->ct.next.tv_sec) {
if (!dtxb->ssrc) if (!dtxb->ssrc)
dtxb->ssrc = mp->ssrc_in->parent->h.ssrc; dtxb->ssrc = mp->ssrc_in->parent->h.ssrc;
dtxb->ttq_entry.when = mp->tv;
timeval_add_usec(&dtxb->ttq_entry.when, rtpe_config.dtx_delay * 1000);
timerthread_queue_push(&dtxb->ttq, &dtxb->ttq_entry);
dtxb->ct.next = mp->tv;
timeval_add_usec(&dtxb->ct.next, rtpe_config.dtx_delay * 1000);
timerthread_obj_schedule_abs(&dtxb->ct.tt_obj, &dtxb->ct.next);
} }
mutex_unlock(&dtxb->lock); mutex_unlock(&dtxb->lock);
@ -2432,8 +2443,11 @@ static void dtx_packet_free(struct dtx_packet *dtxp) {
obj_put(&dtxp->decoder_handler->h); obj_put(&dtxp->decoder_handler->h);
g_slice_free1(sizeof(*dtxp), dtxp); g_slice_free1(sizeof(*dtxp), dtxp);
} }
static void __dtx_send_later(struct timerthread_queue *ttq, void *p) {
struct dtx_buffer *dtxb = (void *) ttq;
static void dtx_buffer_stop(struct dtx_buffer **dtxbp) {
codec_timer_stop((struct codec_timer **) dtxbp);
}
static void __dtx_send_later(struct codec_timer *ct) {
struct dtx_buffer *dtxb = (void *) ct;
struct media_packet mp_copy = {0,}; struct media_packet mp_copy = {0,};
int ret = 0, discard = 0; int ret = 0, discard = 0;
unsigned long ts; unsigned long ts;
@ -2502,10 +2516,12 @@ static void __dtx_send_later(struct timerthread_queue *ttq, void *p) {
if (!call || !ch || !ps || !ps->ssrc_in if (!call || !ch || !ps || !ps->ssrc_in
|| dtxb->ssrc != ps->ssrc_in->parent->h.ssrc || dtxb->ssrc != ps->ssrc_in->parent->h.ssrc
|| dtxb->ttq_entry.when.tv_sec == 0) {
|| dtxb->ct.next.tv_sec == 0) {
// shut down or SSRC change // shut down or SSRC change
ilogs(dtx, LOG_DEBUG, "DTX buffer for %lx has been shut down", (unsigned long) dtxb->ssrc); ilogs(dtx, LOG_DEBUG, "DTX buffer for %lx has been shut down", (unsigned long) dtxb->ssrc);
dtxb->ttq_entry.when.tv_sec = 0;
if (ch)
dtx_buffer_stop(&ch->dtx_buffer);
dtxb->ct.next.tv_sec = 0;
dtxb->head_ts = 0; dtxb->head_ts = 0;
mutex_unlock(&dtxb->lock); mutex_unlock(&dtxb->lock);
goto out; // shut down goto out; // shut down
@ -2518,7 +2534,7 @@ static void __dtx_send_later(struct timerthread_queue *ttq, void *p) {
"(%li ms < %i ms), " "(%li ms < %i ms), "
"pushing DTX timer forward my %i ms", "pushing DTX timer forward my %i ms",
tv_diff / 1000, rtpe_config.dtx_delay, rtpe_config.dtx_shift); tv_diff / 1000, rtpe_config.dtx_delay, rtpe_config.dtx_shift);
timeval_add_usec(&dtxb->ttq_entry.when, rtpe_config.dtx_shift * 1000);
timeval_add_usec(&dtxb->ct.next, rtpe_config.dtx_shift * 1000);
} }
else if (dtxp && ts_diff < dtxb->tspp) { else if (dtxp && ts_diff < dtxb->tspp) {
// TS underflow // TS underflow
@ -2531,7 +2547,7 @@ static void __dtx_send_later(struct timerthread_queue *ttq, void *p) {
"(TS %lu, diff %li), " "(TS %lu, diff %li), "
"pushing DTX timer forward by %i ms and discarding packet", "pushing DTX timer forward by %i ms and discarding packet",
ts, ts_diff, rtpe_config.dtx_shift); ts, ts_diff, rtpe_config.dtx_shift);
timeval_add_usec(&dtxb->ttq_entry.when, rtpe_config.dtx_shift * 1000);
timeval_add_usec(&dtxb->ct.next, rtpe_config.dtx_shift * 1000);
discard = 1; discard = 1;
} }
} }
@ -2545,7 +2561,7 @@ static void __dtx_send_later(struct timerthread_queue *ttq, void *p) {
ilogs(dtx, LOG_DEBUG, "DTX timer queue overflowing (%i packets in queue, " ilogs(dtx, LOG_DEBUG, "DTX timer queue overflowing (%i packets in queue, "
"%lli ms delay), speeding up DTX timer by %i ms", "%lli ms delay), speeding up DTX timer by %i ms",
dtxb->packets.length, ts_diff_us / 1000, rtpe_config.dtx_shift); dtxb->packets.length, ts_diff_us / 1000, rtpe_config.dtx_shift);
timeval_add_usec(&dtxb->ttq_entry.when, rtpe_config.dtx_shift * -1000);
timeval_add_usec(&dtxb->ct.next, rtpe_config.dtx_shift * -1000);
} }
} }
@ -2604,8 +2620,8 @@ static void __dtx_send_later(struct timerthread_queue *ttq, void *p) {
} }
// schedule next run // schedule next run
timeval_add_usec(&dtxb->ttq_entry.when, dtxb->ptime * 1000);
timerthread_queue_push(&dtxb->ttq, &dtxb->ttq_entry);
timeval_add_usec(&dtxb->ct.next, dtxb->ptime * 1000);
timerthread_obj_schedule_abs(&dtxb->ct.tt_obj, &dtxb->ct.next);
mutex_unlock(&dtxb->lock); mutex_unlock(&dtxb->lock);
@ -2663,8 +2679,9 @@ static void __dtx_setup(struct codec_ssrc_handler *ch) {
return; return;
struct dtx_buffer *dtx = struct dtx_buffer *dtx =
ch->dtx_buffer = timerthread_queue_new("dtx_buffer", sizeof(*ch->dtx_buffer),
&codec_timers_thread, NULL, __dtx_send_later, __dtx_free, NULL);
ch->dtx_buffer = obj_alloc0("dtx_buffer", sizeof(*dtx), __dtx_free);
dtx->ct.tt_obj.tt = &codec_timers_thread;
dtx->ct.func = __dtx_send_later;
dtx->csh = obj_get(&ch->h); dtx->csh = obj_get(&ch->h);
dtx->call = obj_get(ch->handler->media->call); dtx->call = obj_get(ch->handler->media->call);
mutex_init(&dtx->lock); mutex_init(&dtx->lock);
@ -2685,8 +2702,7 @@ static void __ssrc_handler_stop(void *p) {
__dtx_shutdown(ch->dtx_buffer); __dtx_shutdown(ch->dtx_buffer);
mutex_unlock(&ch->dtx_buffer->lock); mutex_unlock(&ch->dtx_buffer->lock);
obj_put(&ch->dtx_buffer->ttq.tt_obj);
ch->dtx_buffer = NULL;
dtx_buffer_stop(&ch->dtx_buffer);
} }
} }
void codec_handlers_stop(GQueue *q) { void codec_handlers_stop(GQueue *q) {
@ -2903,8 +2919,7 @@ static void __free_ssrc_handler(void *chp) {
resample_shutdown(&ch->dtmf_resampler); resample_shutdown(&ch->dtmf_resampler);
g_queue_clear_full(&ch->dtmf_events, dtmf_event_free); g_queue_clear_full(&ch->dtmf_events, dtmf_event_free);
g_queue_clear_full(&ch->silence_events, silence_event_free); g_queue_clear_full(&ch->silence_events, silence_event_free);
if (ch->dtx_buffer)
obj_put(&ch->dtx_buffer->ttq.tt_obj);
dtx_buffer_stop(&ch->dtx_buffer);
} }
static int packet_encoded_rtp(encoder_t *enc, void *u1, void *u2) { static int packet_encoded_rtp(encoder_t *enc, void *u1, void *u2) {
@ -3988,23 +4003,17 @@ void codec_rtp_payload_types(struct call_media *media, struct call_media *other_
g_hash_table_destroy(masked); g_hash_table_destroy(masked);
} }
static void codec_timers_run(void *p) {
struct codec_timer *ct = p;
ct->func(ct);
}
void codecs_init(void) { void codecs_init(void) {
#ifdef WITH_TRANSCODING
// XXX not real queue timer - unify to simple timerthread
timerthread_init(&codec_timers_thread, timerthread_queue_run);
rtcp_timer_queue = timerthread_queue_new("rtcp_timer_queue", sizeof(*rtcp_timer_queue),
&codec_timers_thread, NULL, __rtcp_timer_run, NULL, __rtcp_timer_free);
#endif
timerthread_init(&codec_timers_thread, codec_timers_run);
} }
void codecs_cleanup(void) { void codecs_cleanup(void) {
#ifdef WITH_TRANSCODING
obj_put(&rtcp_timer_queue->ttq.tt_obj);
timerthread_free(&codec_timers_thread); timerthread_free(&codec_timers_thread);
#endif
} }
void codec_timers_loop(void *p) { void codec_timers_loop(void *p) {
#ifdef WITH_TRANSCODING
//ilog(LOG_DEBUG, "codec_timers_loop");
timerthread_run(&codec_timers_thread); timerthread_run(&codec_timers_thread);
#endif
} }

+ 2
- 1
include/call.h View File

@ -205,6 +205,7 @@ struct send_timer;
struct transport_protocol; struct transport_protocol;
struct jitter_buffer; struct jitter_buffer;
struct codec_tracker; struct codec_tracker;
struct rtcp_timer;
typedef bencode_buffer_t call_buffer_t; typedef bencode_buffer_t call_buffer_t;
@ -344,7 +345,7 @@ struct call_media {
GQueue codec_handlers_store; // storage for struct codec_handler GQueue codec_handlers_store; // storage for struct codec_handler
struct codec_handler *codec_handler_cache; struct codec_handler *codec_handler_cache;
struct rtcp_handler *rtcp_handler; struct rtcp_handler *rtcp_handler;
struct timeval rtcp_timer; // master lock for scheduling purposes
struct rtcp_timer *rtcp_timer; // master lock for scheduling purposes
struct codec_handler *dtmf_injector; struct codec_handler *dtmf_injector;
struct t38_gateway *t38_gateway; struct t38_gateway *t38_gateway;
struct codec_handler *t38_handler; struct codec_handler *t38_handler;


+ 2
- 0
include/codec.h View File

@ -20,6 +20,7 @@ struct codec_ssrc_handler;
struct rtp_header; struct rtp_header;
struct stream_params; struct stream_params;
struct supp_codec_tracker; struct supp_codec_tracker;
struct rtcp_timer;
typedef int codec_handler_func(struct codec_handler *, struct media_packet *); typedef int codec_handler_func(struct codec_handler *, struct media_packet *);
@ -65,6 +66,7 @@ struct codec_packet {
void codecs_init(void); void codecs_init(void);
void codecs_cleanup(void); void codecs_cleanup(void);
void codec_timers_loop(void *); void codec_timers_loop(void *);
void rtcp_timer_stop(struct rtcp_timer **);
struct codec_handler *codec_handler_get(struct call_media *, int payload_type); struct codec_handler *codec_handler_get(struct call_media *, int payload_type);
void codec_handlers_free(struct call_media *); void codec_handlers_free(struct call_media *);


Loading…
Cancel
Save