#include "jitter_buffer.h" #include #include #include "timerthread.h" #include "media_socket.h" #include "call.h" #include "codec.h" #include "main.h" #include "rtcplib.h" #include "bufferpool.h" #define INITIAL_PACKETS 0x1E #define CONT_SEQ_COUNT 0x1F4 #define CONT_MISS_COUNT 0x0A #define CLOCK_DRIFT_MULT 0x28 #define DELAY_FACTOR 0x64 #define COMFORT_NOISE 0x0D #define JB_ADAPTIVE_MIN_SAMPLES 0x0A // Minimum samples before calculating buffer size (10) #define JB_ADAPTIVE_RECALC_INTERVAL 0x0A // Recalculate buffer size every N packets (10) #define JB_MAX_BURST_SIZE 0x03E8 // Maximum burst size to prevent overflow (1000 packets = 20 sec) #define JB_MAX_JITTER_US 0x1E8480 // Maximum jitter value (2000000 µs = 2 seconds) static struct timerthread jitter_buffer_thread; void jitter_buffer_init(void) { //ilog(LOG_DEBUG, "jitter_buffer_init"); unsigned int num_threads = rtpe_config.jb_length > 0 ? rtpe_config.media_num_threads : 0; timerthread_init(&jitter_buffer_thread, num_threads, timerthread_queue_run); } void jitter_buffer_init_free(void) { //ilog(LOG_DEBUG, "jitter_buffer_free"); timerthread_free(&jitter_buffer_thread); } static void jitter_buffer_flush(struct jitter_buffer *jb) { mutex_unlock(&jb->lock); timerthread_queue_flush_data(&jb->ttq); mutex_lock(&jb->lock); } // jb is locked static void reset_jitter_buffer(struct jitter_buffer *jb) { //ilog(LOG_INFO, "reset_jitter_buffer"); jb->first_send_ts = 0; jb->first_seq = 0; jb->rtptime_delta = 0; jb->buffer_len = 0; jb->cont_frames = 0; jb->cont_miss = 0; jb->next_exp_seq = 0; jb->clock_rate = 0; jb->payload_type = 0; jb->clock_drift_val = 0; jb->prev_seq_ts = rtpe_now; jb->prev_seq = 0; jb->jitter_mean = 0.0; jb->jitter_variance = 0.0; jb->jitter_m2 = 0.0; jb->jitter_samples = 0; jb->dynamic_capacity = 0; jb->num_resets++; if(g_tree_nnodes(jb->ttq.entries) > 0) jitter_buffer_flush(jb); //disable jitter buffer in case of more than 2 resets if(jb->num_resets >= 2) jb->disabled = 1; } static inline rtp_payload_type *codec_rtp_pt(struct media_packet *mp, int payload_type) { return t_hash_table_lookup(mp->media->codecs.codecs, GINT_TO_POINTER(payload_type)); } static int get_clock_rate(struct media_packet *mp, int payload_type) { struct jitter_buffer *jb = mp->stream->jb; int clock_rate = 0; if(jb->clock_rate && jb->payload_type == payload_type) return jb->clock_rate; const rtp_payload_type *rtp_pt = codec_rtp_pt(mp, payload_type); if(rtp_pt) { if(rtp_pt->codec_def && !rtp_pt->codec_def->dtmf) { clock_rate = jb->clock_rate = rtp_pt->clock_rate; jb->payload_type = payload_type; } else clock_rate = jb->clock_rate; //dtmf packet continue with same clockrate } else ilog(LOG_DEBUG, "clock_rate not present payload_type = %d", payload_type); return clock_rate; } // jb is locked static int get_target_capacity(struct jitter_buffer *jb) { if (rtpe_config.jb_adaptive && jb->dynamic_capacity > 0) return jb->dynamic_capacity; return rtpe_config.jb_length; } // jb is locked static void update_jitter_statistics(struct jitter_buffer *jb, int64_t jitter_sample_us) { if (!rtpe_config.jb_adaptive) return; if (jitter_sample_us < 0 || jitter_sample_us > JB_MAX_JITTER_US) { ilog(LOG_WARN, "Extreme jitter value detected: %lld µs, ignoring", (long long)jitter_sample_us); return; } jb->jitter_samples++; double delta = (double)jitter_sample_us - jb->jitter_mean; jb->jitter_mean += delta / (double)jb->jitter_samples; double delta2 = (double)jitter_sample_us - jb->jitter_mean; jb->jitter_m2 += delta * delta2; if (jb->jitter_samples > 1) jb->jitter_variance = jb->jitter_m2 / (double)(jb->jitter_samples - 1); } // jb is locked static void calculate_adaptive_buffer_size(struct jitter_buffer *jb) { if (!rtpe_config.jb_adaptive || jb->jitter_samples < JB_ADAPTIVE_MIN_SAMPLES) return; // Protect against negative variance due to floating-point errors double std_dev_us = sqrt(jb->jitter_variance < 0.0 ? 0.0 : jb->jitter_variance); double optimal_buffer_us = jb->jitter_mean + (4.0 * std_dev_us); int optimal_buffer_ms = (int)(optimal_buffer_us / 1000.0); int min_capacity = rtpe_config.jb_adaptive_min; int max_capacity = rtpe_config.jb_adaptive_max; if (max_capacity <= 0) max_capacity = 300; if (min_capacity < 0) min_capacity = 0; if (min_capacity > max_capacity) min_capacity = max_capacity; if (optimal_buffer_ms < min_capacity) optimal_buffer_ms = min_capacity; if (optimal_buffer_ms > max_capacity) optimal_buffer_ms = max_capacity; jb->dynamic_capacity = optimal_buffer_ms; ilog(LOG_DEBUG, "Adaptive JB: mean=%.2fms, stddev=%.2fms, capacity=%dms (samples=%u)", jb->jitter_mean / 1000.0, std_dev_us / 1000.0, jb->dynamic_capacity, jb->jitter_samples); } static struct jb_packet* get_jb_packet(struct media_packet *mp, const str *s) { if (!(mp->rtp = rtp_payload(&mp->payload, s, NULL))) return NULL; char *buf = bufferpool_alloc(media_bufferpool, s->len + RTP_BUFFER_HEAD_ROOM + RTP_BUFFER_TAIL_ROOM); if (!buf) { ilog(LOG_ERROR, "Failed to allocate memory: %s", strerror(errno)); return NULL; } struct jb_packet *p = g_new0(__typeof(*p), 1); p->buf = buf; media_packet_copy(&p->mp, mp); p->mp.raw = STR_LEN(buf + RTP_BUFFER_HEAD_ROOM, s->len); memcpy(p->mp.raw.s, s->s, s->len); return p; } // jb is locked (temporarily unlocked during operation, then relocked) static int remove_oldest_packets(struct jitter_buffer *jb, int num_to_remove) { if (num_to_remove <= 0) return 0; int removed = 0; mutex_unlock(&jb->lock); for (int i = 0; i < num_to_remove; i++) { struct timerthread_queue_entry *ttqe = rtpe_g_tree_first(jb->ttq.entries); if (!ttqe) break; g_tree_remove(jb->ttq.entries, ttqe); if (jb->ttq.entry_free_func) jb->ttq.entry_free_func(ttqe); removed++; } mutex_lock(&jb->lock); return removed; } // jb is locked static int try_burst_aware_discard(struct jitter_buffer *jb, int current_buffer_size) { if (!jb->rtptime_delta || !jb->clock_rate || !jb->prev_seq_ts) { ilog(LOG_DEBUG, "Burst-aware discard: insufficient data for calculation"); return 0; } int64_t packetization_interval_us = ((int64_t)jb->rtptime_delta * 1000000) / jb->clock_rate; if (packetization_interval_us <= 0) { ilog(LOG_DEBUG, "Burst-aware discard: invalid packetization interval"); return 0; } int64_t delta_t = rtpe_now - jb->prev_seq_ts; if (delta_t < 0) { ilog(LOG_DEBUG, "Burst-aware discard: negative time delta"); return 0; } int64_t burst_calc = delta_t / packetization_interval_us; if (burst_calc > JB_MAX_BURST_SIZE) { ilog(LOG_DEBUG, "Burst size exceeds maximum (%d packets), capping", JB_MAX_BURST_SIZE); burst_calc = JB_MAX_BURST_SIZE; } int estimated_burst_size = (int)burst_calc; int target_capacity = get_target_capacity(jb); int packets_to_remove = estimated_burst_size - target_capacity; if (packets_to_remove <= 0) { ilog(LOG_DEBUG, "Burst-aware discard: no removal needed (burst: %d, capacity: %d)", estimated_burst_size, target_capacity); return 1; } if (packets_to_remove >= current_buffer_size) { ilog(LOG_DEBUG, "Burst-aware discard: would remove all packets (burst: %d, capacity: %d, buffer: %d)", estimated_burst_size, target_capacity, current_buffer_size); return 0; } int packets_saved = current_buffer_size - packets_to_remove; ilog(LOG_DEBUG, "Burst-aware discard: burst of %d packets detected, removing %d (saving %d packets)", estimated_burst_size, packets_to_remove, packets_saved); int removed = remove_oldest_packets(jb, packets_to_remove); if (removed > 0) { ilog(LOG_DEBUG, "Burst-aware discard: successfully removed %d packets", removed); return 1; } return 0; } // jb is locked static void check_buffered_packets(struct jitter_buffer *jb) { int current_buffer_size = g_tree_nnodes(jb->ttq.entries); int target_capacity = get_target_capacity(jb); if (current_buffer_size > target_capacity) { if (jb->rtptime_delta && jb->clock_rate && jb->prev_seq_ts) { if (try_burst_aware_discard(jb, current_buffer_size)) { return; } } } if (current_buffer_size >= (3 * rtpe_config.jb_length)) { ilog(LOG_DEBUG, "Emergency buffer overflow at 3x capacity - forcing reset"); reset_jitter_buffer(jb); } } // jb is locked static int queue_packet(struct media_packet *mp, struct jb_packet *p) { struct jitter_buffer *jb = mp->stream->jb; unsigned long ts = ntohl(mp->rtp->timestamp); int payload_type = (mp->rtp->m_pt & 0x7f); int clockrate = get_clock_rate(mp, payload_type); int curr_seq = ntohs(mp->rtp->seq_num); if(!clockrate || !jb->first_send) { ilog(LOG_DEBUG, "Jitter reset due to clockrate"); reset_jitter_buffer(jb); return 1; } long ts_diff = (uint32_t) ts - (uint32_t) jb->first_send_ts; int seq_diff = curr_seq - jb->first_seq; if(seq_diff < 0) { jb->first_send = 0; return 1; } if(!jb->rtptime_delta && seq_diff) { jb->rtptime_delta = ts_diff/seq_diff; } p->ttq_entry.when = jb->first_send; int64_t ts_diff_us = (ts_diff + (jb->rtptime_delta * jb->buffer_len))* 1000000 / clockrate; ts_diff_us += jb->clock_drift_val * seq_diff; ts_diff_us += jb->dtmf_mult_factor * DELAY_FACTOR; p->ttq_entry.when += ts_diff_us; ts_diff_us = p->ttq_entry.when - rtpe_now; if (ts_diff_us > 1000000) { // more than one second, can't be right ilog(LOG_DEBUG, "Partial reset due to timestamp"); jb->first_send = 0; p->ttq_entry.when = rtpe_now; } if(jb->prev_seq_ts == 0) jb->prev_seq_ts = rtpe_now; if((p->ttq_entry.when - jb->prev_seq_ts < 0) && (curr_seq > jb->prev_seq)) { p->ttq_entry.when = jb->prev_seq_ts; p->ttq_entry.when += DELAY_FACTOR; } if(p->ttq_entry.when - jb->prev_seq_ts > 0) { jb->prev_seq_ts = p->ttq_entry.when; jb->prev_seq = curr_seq; } if(seq_diff > 3000) //readjust after 3k packets jb->first_send = 0; timerthread_queue_push(&jb->ttq, &p->ttq_entry); return 0; } static int handle_clock_drift(struct media_packet *mp) { ilog(LOG_DEBUG, "handle_clock_drift"); struct jitter_buffer *jb = mp->stream->jb; int seq_diff = ntohs(mp->rtp->seq_num) - jb->first_seq; if(((seq_diff % CLOCK_DRIFT_MULT) != 0) || !seq_diff) return 0; uint32_t ts = ntohl(mp->rtp->timestamp); int payload_type = (mp->rtp->m_pt & 0x7f); int clockrate = get_clock_rate(mp, payload_type); if(!clockrate) { return 0; } int64_t ts_diff = (uint32_t) ts - (uint32_t) jb->first_send_ts; int64_t ts_diff_us = ts_diff* 1000000 / clockrate; int64_t to_send = jb->first_send; to_send += ts_diff_us; int64_t time_diff = rtpe_now - to_send; jb->clock_drift_val = time_diff/seq_diff; if(jb->clock_drift_val < -10000 || jb->clock_drift_val > 10000) { //disable jb if clock drift greater than 10 ms jb->disabled = 1; jitter_buffer_flush(jb); ilog(LOG_DEBUG, "JB disabled due to clock drift"); return 1; } return 0; } int buffer_packet(struct media_packet *mp, const str *s) { struct jb_packet *p = NULL; int ret = 1; // must call stream_packet mp->call = mp->sfd->call; call_t *call = mp->call; rwlock_lock_r(&call->master_lock); mp->stream = mp->sfd->stream; mp->media = mp->stream->media; struct jitter_buffer *jb = mp->stream->jb; if (!jb || jb->disabled || !PS_ISSET(mp->sfd->stream, RTP)) goto end; if(jb->initial_pkts < INITIAL_PACKETS) { //Ignore initial Payload Type 126 if any jb->initial_pkts++; goto end; } p = get_jb_packet(mp, s); if (!p) goto end; if (PS_ISSET(mp->sfd->stream, RTCP) && rtcp_demux_is_rtcp((void *) &p->mp.raw)){ ilog(LOG_DEBUG, "Discarding from JB. This is RTCP packet. SSRC %u Payload %d", ntohl(p->mp.rtp->ssrc), (p->mp.rtp->m_pt & 0x7f)); goto end; } ilog(LOG_DEBUG, "Handling JB packet on: %s:%d (RTP SSRC %u Payload: %d)", sockaddr_print_buf(&mp->stream->endpoint.address), mp->stream->endpoint.port, ntohl(p->mp.rtp->ssrc), (p->mp.rtp->m_pt & 0x7f)); mp = &p->mp; mutex_lock(&jb->lock); int payload_type = (mp->rtp->m_pt & 0x7f); int seq = ntohs(mp->rtp->seq_num); int marker = (mp->rtp->m_pt & 0x80) ? 1 : 0; int dtmf = 0; const rtp_payload_type *rtp_pt = codec_rtp_pt(mp, payload_type); if(rtp_pt) { if(rtp_pt->codec_def && rtp_pt->codec_def->dtmf) dtmf = 1; } if(marker || (jb->ssrc != ntohl(mp->rtp->ssrc)) || seq == 0 ) { //marker or ssrc change or sequence wrap jb->first_send = 0; } if(jb->clock_rate && jb->payload_type != payload_type) { //reset in case of payload change if(!dtmf) jb->first_send = 0; else jb->dtmf_mult_factor++; } if(!dtmf && jb->dtmf_mult_factor) { //reset after DTMF ends jb->first_send = 0; jb->dtmf_mult_factor=0; } if (jb->first_send) { if(rtpe_config.jb_clock_drift) { if(handle_clock_drift(mp)) goto end_unlock; } ret = queue_packet(mp,p); } else { // store data from first packet and use for successive packets and queue the first packet unsigned long ts = ntohl(mp->rtp->timestamp); payload_type = (mp->rtp->m_pt & 0x7f); int clockrate = get_clock_rate(mp, payload_type); if(!clockrate){ if(jb->rtptime_delta && payload_type != COMFORT_NOISE) { //ignore CN ilog(LOG_DEBUG, "Jitter reset due to unknown payload = %d", payload_type); reset_jitter_buffer(jb); } goto end_unlock; } jb->first_send = rtpe_now; p->ttq_entry.when = rtpe_now; jb->first_send_ts = ts; jb->first_seq = ntohs(mp->rtp->seq_num); jb->ssrc = ntohl(mp->rtp->ssrc); if(jb->rtptime_delta) ret = queue_packet(mp,p); if(!dtmf) jb->rtptime_delta = 0; } // packet consumed? if (ret == 0) p = NULL; // Update adaptive jitter buffer statistics if (rtpe_config.jb_adaptive && jb->first_send && jb->rtptime_delta && jb->clock_rate) { unsigned long ts = ntohl(mp->rtp->timestamp); long ts_diff = (uint32_t)ts - (uint32_t)jb->first_send_ts; int64_t expected_arrival = jb->first_send + (ts_diff * 1000000LL / jb->clock_rate); int64_t jitter_us = llabs(rtpe_now - expected_arrival); update_jitter_statistics(jb, jitter_us); if (jb->jitter_samples % JB_ADAPTIVE_RECALC_INTERVAL == 0) calculate_adaptive_buffer_size(jb); } check_buffered_packets(jb); end_unlock: mutex_unlock(&jb->lock); end: rwlock_unlock_r(&call->master_lock); if (p) jb_packet_free(&p); return ret; } static void increment_buffer(struct jitter_buffer *jb) { if(jb->buffer_len < rtpe_config.jb_length) jb->buffer_len++; } static void decrement_buffer(struct jitter_buffer *jb) { if(jb->buffer_len > 0) jb->buffer_len--; } static void set_jitter_values(struct media_packet *mp) { struct jitter_buffer *jb = mp->stream->jb; if(!jb || !mp->rtp) return; int curr_seq = ntohs(mp->rtp->seq_num); int payload_type = (mp->rtp->m_pt & 0x7f); int dtmf = 0; const rtp_payload_type *rtp_pt = codec_rtp_pt(mp, payload_type); if(rtp_pt) { if(rtp_pt->codec_def && rtp_pt->codec_def->dtmf) dtmf = 1; } mutex_lock(&jb->lock); if(jb->next_exp_seq && !dtmf) { if(curr_seq > jb->next_exp_seq) { int marker = (mp->rtp->m_pt & 0x80) ? 1 : 0; if(!marker) { ilog(LOG_DEBUG, "missing seq exp seq =%d, received seq= %d", jb->next_exp_seq, curr_seq); increment_buffer(jb); jb->cont_frames = 0; jb->cont_miss++; } } else if(curr_seq < jb->next_exp_seq) { //Might be duplicate or sequence already crossed jb->cont_frames = 0; if((curr_seq == 0) || (jb->next_exp_seq - curr_seq) > 65500) //sequence wrap jb->next_exp_seq = 0; } else { jb->cont_frames++; jb->cont_miss = 0; if(jb->cont_frames >= CONT_SEQ_COUNT) { decrement_buffer(jb); jb->cont_frames = 0; } } if(jb->cont_miss >= CONT_MISS_COUNT) reset_jitter_buffer(jb); } if(curr_seq >= jb->next_exp_seq) jb->next_exp_seq = curr_seq + 1; mutex_unlock(&jb->lock); } static void __jb_send_later(struct timerthread_queue *ttq, void *p) { struct jb_packet *cp = p; set_jitter_values(&cp->mp); play_buffered(p); }; // jb and call are locked static void __jb_send_now(struct timerthread_queue *ttq, void *p) { struct jitter_buffer *jb = (void *) ttq; mutex_unlock(&jb->lock); rwlock_unlock_r(&jb->call->master_lock); __jb_send_later(ttq, p); rwlock_lock_r(&jb->call->master_lock); mutex_lock(&jb->lock); }; static void __jb_free(void *p) { struct jitter_buffer *jb = p; jitter_buffer_free(&jb); } void __jb_packet_free(void *p) { struct jb_packet *jbp = p; jb_packet_free(&jbp); } void jitter_buffer_launch(void) { timerthread_launch(&jitter_buffer_thread, rtpe_config.scheduling, rtpe_config.priority, "jitter buffer"); } struct jitter_buffer *jitter_buffer_new(call_t *c) { ilog(LOG_DEBUG, "creating jitter_buffer"); struct jitter_buffer *jb = timerthread_queue_new("jitter_buffer", sizeof(*jb), &jitter_buffer_thread, __jb_send_now, __jb_send_later, __jb_free, __jb_packet_free); mutex_init(&jb->lock); jb->call = obj_get(c); return jb; } void jitter_buffer_free(struct jitter_buffer **jbp) { if (!jbp || !*jbp) return; ilog(LOG_DEBUG, "freeing jitter_buffer"); mutex_destroy(&(*jbp)->lock); if ((*jbp)->call) obj_put((*jbp)->call); } void jb_packet_free(struct jb_packet **jbp) { if (!jbp || !*jbp) return; bufferpool_unref((*jbp)->buf); media_packet_release(&(*jbp)->mp); g_free(*jbp); *jbp = NULL; }