Browse Source

MT#55283 use bufferpool for media packets

Switch all memory buffers used for RTP I/O from generic stack or heap
allocated memory to the bufferpool implementation. Use a per-thread
bufferpool to minimise lock contention.

This commit is just a one-for-one swap and doesn't use the bufferpool's
reference counting semantics yet.

Change-Id: I9cba4ec97bd0afcd374bf6c0be2b608a46e73e57
pull/1826/head
Richard Fuchs 2 years ago
parent
commit
ad00134c61
14 changed files with 73 additions and 30 deletions
  1. +14
    -16
      daemon/codec.c
  2. +5
    -0
      daemon/helpers.c
  3. +3
    -2
      daemon/jitter_buffer.c
  4. +13
    -0
      daemon/main.c
  5. +3
    -2
      daemon/media_player.c
  6. +11
    -5
      daemon/media_socket.c
  7. +1
    -1
      debian/control
  8. +1
    -0
      include/media_socket.h
  9. +6
    -0
      lib/codeclib.c
  10. +4
    -0
      lib/codeclib.h
  11. +4
    -4
      t/Makefile
  12. +1
    -0
      t/test-mix-buffer.c
  13. +1
    -0
      t/test-payload-tracker.c
  14. +6
    -0
      t/test-transcode.c

+ 14
- 16
daemon/codec.c View File

@ -23,6 +23,7 @@
#ifdef WITH_TRANSCODING
#include "fix_frame_channel_layout.h"
#endif
#include "bufferpool.h"
struct codec_timer {
struct timerthread_obj tt_obj;
@ -1719,12 +1720,10 @@ static void codec_add_raw_packet_dup(struct media_packet *mp, unsigned int clock
struct codec_packet *p = g_slice_alloc0(sizeof(*p));
// don't just duplicate the string. need to ensure enough room
// if encryption is enabled on this stream
char *buf = g_malloc(mp->raw.len + 1 + RTP_BUFFER_TAIL_ROOM);
if (mp->raw.s && mp->raw.len)
memcpy(buf, mp->raw.s, mp->raw.len);
p->s.s = bufferpool_alloc(media_bufferpool, mp->raw.len + RTP_BUFFER_TAIL_ROOM);
memcpy(p->s.s, mp->raw.s, mp->raw.len);
p->s.len = mp->raw.len;
p->s.s = buf;
p->free_func = free;
p->free_func = bufferpool_unref;
p->rtp = (struct rtp_header *) p->s.s;
codec_add_raw_packet_common(mp, clockrate, p);
}
@ -2007,7 +2006,7 @@ out_ch:
void codec_output_rtp(struct media_packet *mp, struct codec_scheduler *csch,
struct codec_handler *handler,
char *buf, // malloc'd, room for rtp_header + filled-in payload
char *buf, // bufferpool_alloc'd, room for rtp_header + filled-in payload
unsigned int payload_len,
unsigned long payload_ts,
int marker, int seq, int seq_inc, int payload_type,
@ -2035,7 +2034,7 @@ void codec_output_rtp(struct media_packet *mp, struct codec_scheduler *csch,
p->s.s = buf;
p->s.len = payload_len + sizeof(struct rtp_header);
payload_tracker_add(&ssrc_out->tracker, handler->dest_pt.payload_type);
p->free_func = free;
p->free_func = bufferpool_unref;
p->ttq_entry.source = handler;
p->rtp = rh;
p->ts = ts;
@ -2192,7 +2191,8 @@ static int codec_add_dtmf_packet(struct codec_ssrc_handler *ch, struct codec_ssr
skip:
obj_put(&output_ch->h);
char *buf = malloc(packet->payload->len + sizeof(struct rtp_header) + RTP_BUFFER_TAIL_ROOM);
char *buf = bufferpool_alloc(media_bufferpool,
packet->payload->len + sizeof(struct rtp_header) + RTP_BUFFER_TAIL_ROOM);
memcpy(buf + sizeof(struct rtp_header), packet->payload->s, packet->payload->len);
if (packet->bypass_seq) // inject original seq
codec_output_rtp(mp, &ch->csch, packet->handler ? : h, buf, packet->payload->len, packet->ts,
@ -2439,12 +2439,10 @@ void codec_packet_free(void *pp) {
g_slice_free1(sizeof(*p), p);
}
bool codec_packet_copy(struct codec_packet *p) {
char *buf = malloc(p->s.len + RTP_BUFFER_TAIL_ROOM);
if (!buf)
return false;
char *buf = bufferpool_alloc(media_bufferpool, p->s.len + RTP_BUFFER_TAIL_ROOM);
memcpy(buf, p->s.s, p->s.len);
p->s.s = buf;
p->free_func = free;
p->free_func = bufferpool_unref;
return true;
}
struct codec_packet *codec_packet_dup(struct codec_packet *p) {
@ -3996,7 +3994,7 @@ void packet_encoded_packetize(AVPacket *pkt, struct codec_ssrc_handler *ch, stru
sizeof(struct telephone_event_payload));
unsigned int pkt_len = sizeof(struct rtp_header) + payload_len + RTP_BUFFER_TAIL_ROOM;
// prepare our buffers
char *buf = malloc(pkt_len);
char *buf = bufferpool_alloc(media_bufferpool, pkt_len);
char *payload = buf + sizeof(struct rtp_header);
// tell our packetizer how much we want
str inout = STR_INIT_LEN(payload, payload_len);
@ -4041,7 +4039,7 @@ static int packet_encoded_rtp(encoder_t *enc, void *u1, void *u2) {
static void __codec_output_rtp_seq_passthrough(struct media_packet *mp, struct codec_scheduler *csch,
struct codec_handler *handler,
char *buf, // malloc'd, room for rtp_header + filled-in payload
char *buf, // bufferpool_alloc'd, room for rtp_header + filled-in payload
unsigned int payload_len,
unsigned long payload_ts,
int marker, int payload_type,
@ -4052,7 +4050,7 @@ static void __codec_output_rtp_seq_passthrough(struct media_packet *mp, struct c
static void __codec_output_rtp_seq_own(struct media_packet *mp, struct codec_scheduler *csch,
struct codec_handler *handler,
char *buf, // malloc'd, room for rtp_header + filled-in payload
char *buf, // bufferpool_alloc'd, room for rtp_header + filled-in payload
unsigned int payload_len,
unsigned long payload_ts,
int marker, int payload_type,
@ -4104,7 +4102,7 @@ static void __packet_encoded_tx(AVPacket *pkt, struct codec_ssrc_handler *ch, st
char *send_buf = buf;
if (repeats > 0) {
// need to duplicate the payload as codec_output_rtp consumes it
send_buf = malloc(pkt_len);
send_buf = bufferpool_alloc(media_bufferpool, pkt_len);
memcpy(send_buf, buf, pkt_len);
}
func(mp, &ch->csch, ch->handler, send_buf, inout->len, ch->csch.first_ts


+ 5
- 0
daemon/helpers.c View File

@ -12,6 +12,8 @@
#include "log.h"
#include "main.h"
#include "bufferpool.h"
#include "media_socket.h"
#if 0
#define BSDB(x...) fprintf(stderr, x)
@ -193,6 +195,7 @@ void thread_waker_del(struct thread_waker *wk) {
static void thread_detach_cleanup(void *dtp) {
struct detach_thread *dt = dtp;
g_slice_free1(sizeof(*dt), dt);
bufferpool_destroy(media_bufferpool);
thread_join_me();
}
@ -245,6 +248,8 @@ static void *thread_detach_func(void *d) {
dt->priority, strerror(errno));
}
media_bufferpool = bufferpool_new(g_malloc, g_free, 64 * 65536);
thread_cleanup_push(thread_detach_cleanup, dt);
dt->func(dt->data);
thread_cleanup_pop(true);


+ 3
- 2
daemon/jitter_buffer.c View File

@ -9,6 +9,7 @@
#include "codec.h"
#include "main.h"
#include "rtcplib.h"
#include "bufferpool.h"
#define INITIAL_PACKETS 0x1E
#define CONT_SEQ_COUNT 0x1F4
@ -105,7 +106,7 @@ static struct jb_packet* get_jb_packet(struct media_packet *mp, const str *s) {
if (rtp_payload(&mp->rtp, &mp->payload, s))
return NULL;
char *buf = malloc(s->len + RTP_BUFFER_HEAD_ROOM + RTP_BUFFER_TAIL_ROOM);
char *buf = bufferpool_alloc(media_bufferpool, s->len + RTP_BUFFER_HEAD_ROOM + RTP_BUFFER_TAIL_ROOM);
if (!buf) {
ilog(LOG_ERROR, "Failed to allocate memory: %s", strerror(errno));
return NULL;
@ -445,7 +446,7 @@ void jb_packet_free(struct jb_packet **jbp) {
if (!jbp || !*jbp)
return;
free((*jbp)->buf);
bufferpool_unref((*jbp)->buf);
media_packet_release(&(*jbp)->mp);
g_slice_free1(sizeof(**jbp), *jbp);
*jbp = NULL;


+ 13
- 0
daemon/main.c View File

@ -1222,6 +1222,15 @@ static void early_init(void) {
socket_init(); // needed for socktype_udp
}
#ifdef WITH_TRANSCODING
static void clib_init(void) {
media_bufferpool = bufferpool_new(g_malloc, g_free, 64 * 65536);
}
static void clib_cleanup(void) {
bufferpool_destroy(media_bufferpool);
}
#endif
static void init_everything(void) {
bufferpool_init();
gettimeofday(&rtpe_now, NULL);
@ -1247,6 +1256,10 @@ static void init_everything(void) {
if (call_interfaces_init())
abort();
statistics_init();
#ifdef WITH_TRANSCODING
codeclib_thread_init = clib_init;
codeclib_thread_cleanup = clib_cleanup;
#endif
codeclib_init(0);
media_player_init();
if (!dtmf_init())


+ 3
- 2
daemon/media_player.c View File

@ -22,6 +22,7 @@
#include "fix_frame_channel_layout.h"
#endif
#include "kernel.h"
#include "bufferpool.h"
#define DEFAULT_AVIO_BUFSIZE 4096
@ -445,7 +446,7 @@ retry:;
// make a copy to send out
size_t len = pkt->s.len + sizeof(struct rtp_header) + RTP_BUFFER_TAIL_ROOM;
char *buf = g_malloc(len);
char *buf = bufferpool_alloc(media_bufferpool, len);
memcpy(buf, pkt->buf, len);
struct media_packet packet = {
@ -504,7 +505,7 @@ static void media_player_cached_reader_start(struct media_player *mp, const rtp_
static void cache_packet_free(struct media_player_cache_packet *p) {
g_free(p->buf);
bufferpool_unref(p->buf);
g_slice_free1(sizeof(*p), p);
}


+ 11
- 5
daemon/media_socket.c View File

@ -31,6 +31,7 @@
#include "dtmf.h"
#include "mqtt.h"
#include "janus.h"
#include "bufferpool.h"
#include "xt_RTPENGINE.h"
@ -441,6 +442,7 @@ TYPED_GHASHTABLE(local_sockets_ht, endpoint_t, stream_fd, endpoint_t_hash, endpo
static rwlock_t local_media_socket_endpoints_lock;
static local_sockets_ht local_media_socket_endpoints;
__thread struct bufferpool *media_bufferpool;
/* checks for free no_ports on a local interface */
@ -2375,8 +2377,10 @@ int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, s
for (__auto_type l = mp->packets_out.head; l; l = l->next) {
struct codec_packet *p = l->data;
if (mp->call->recording && rtpe_config.rec_egress) {
str_init_dup_str(&p->plain, &p->s);
p->plain_free_func = free;
p->plain.len = p->s.len;
p->plain.s = bufferpool_alloc(media_bufferpool, p->s.len);
memcpy(p->plain.s, p->s.s, p->s.len);
p->plain_free_func = bufferpool_unref;
}
int encret = encrypt_func(&p->s, out, mp->ssrc_out);
if (encret == 1)
@ -2925,7 +2929,7 @@ static int stream_packet(struct packet_handler_ctx *phc) {
if (sh_link->next) {
if (!orig_raw.s)
orig_raw = phc->mp.raw;
char *buf = g_malloc(orig_raw.len + RTP_BUFFER_TAIL_ROOM);
char *buf = bufferpool_alloc(media_bufferpool, orig_raw.len + RTP_BUFFER_TAIL_ROOM);
memcpy(buf, orig_raw.s, orig_raw.len);
phc->mp.raw.s = buf;
g_queue_push_tail(&free_list, buf);
@ -3129,7 +3133,7 @@ out:
ssrc_ctx_put(&phc->mp.ssrc_in);
rtcp_list_free(&phc->rtcp_list);
g_queue_clear_full(&free_list, g_free);
g_queue_clear_full(&free_list, bufferpool_unref);
return ret;
}
@ -3137,7 +3141,6 @@ out:
static void stream_fd_readable(int fd, void *p) {
stream_fd *sfd = p;
char buf[RTP_BUFFER_SIZE];
int ret, iters;
bool update = false;
call_t *ca;
@ -3187,6 +3190,9 @@ restart:
goto done;
}
}
g_autoptr(bp_char) buf = bufferpool_alloc(media_bufferpool, RTP_BUFFER_SIZE);
ret = socket_recvfrom_ts(&sfd->socket, buf + RTP_BUFFER_HEAD_ROOM, MAX_RTP_PACKET_SIZE,
&phc.mp.fsin, &phc.mp.tv);
if (ca)


+ 1
- 1
debian/control View File

@ -48,7 +48,7 @@ Build-Depends:
libxmlrpc-core-c3-dev (>= 1.16.07),
libxtables-dev (>= 1.4) | iptables-dev (>= 1.4),
markdown,
ngcp-libcodec-chain-dev <pkg.ngcp-rtpengine.codec-chain>,
ngcp-libcodec-chain-dev (>= 12.4) <pkg.ngcp-rtpengine.codec-chain>,
pandoc,
pkgconf,
python3,


+ 1
- 0
include/media_socket.h View File

@ -287,6 +287,7 @@ struct media_packet {
extern GQueue all_local_interfaces; // read-only during runtime
extern __thread struct bufferpool *media_bufferpool;
void interfaces_init(GQueue *interfaces);


+ 6
- 0
lib/codeclib.c View File

@ -148,6 +148,7 @@ static void *cc_lib_handle;
#ifdef HAVE_CODEC_CHAIN
static __typeof__(codec_chain_client_connect) *cc_client_connect;
static __typeof__(codec_chain_set_thread_funcs) *cc_set_thread_funcs;
static __typeof__(codec_chain_client_pcma2opus_runner_new) *cc_client_pcma2opus_runner_new;
static __typeof__(codec_chain_client_pcmu2opus_runner_new) *cc_client_pcmu2opus_runner_new;
@ -847,6 +848,8 @@ static GQueue __supplemental_codecs = G_QUEUE_INIT;
const GQueue * const codec_supplemental_codecs = &__supplemental_codecs;
static codec_def_t *codec_def_cn;
void (*codeclib_thread_init)(void);
void (*codeclib_thread_cleanup)(void);
static GHashTable *codecs_ht;
@ -1374,6 +1377,7 @@ static void *dlsym_assert(void *handle, const char *sym, const char *fn) {
#ifdef HAVE_CODEC_CHAIN
static void cc_dlsym_resolve(const char *fn) {
cc_client_connect = dlsym_assert(cc_lib_handle, "codec_chain_client_connect", fn);
cc_set_thread_funcs = dlsym_assert(cc_lib_handle, "codec_chain_set_thread_funcs", fn);
cc_client_pcma2opus_runner_new = dlsym_assert(cc_lib_handle,
"codec_chain_client_pcma2opus_runner_new", fn);
@ -1527,6 +1531,8 @@ static void cc_init(void) {
cc_dlsym_resolve(rtpe_common_config_ptr->codec_chain_lib_path);
cc_set_thread_funcs(codeclib_thread_init, codeclib_thread_cleanup, NULL);
cc_client = cc_client_connect(4);
if (!cc_client)
die("Failed to connect to cudecsd");


+ 4
- 0
lib/codeclib.h View File

@ -365,6 +365,10 @@ struct packet_sequencer_s {
extern const GQueue * const codec_supplemental_codecs;
// must be set before calling codeclib_init
extern void (*codeclib_thread_init)(void);
extern void (*codeclib_thread_cleanup)(void);
void codeclib_init(int);
void codeclib_free(void);


+ 4
- 4
t/Makefile View File

@ -195,7 +195,7 @@ test-bitstr: test-bitstr.o
test-mix-buffer: test-mix-buffer.o $(COMMONOBJS) mix_buffer.o ssrc.o rtp.o crypto.o helpers.o \
mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o codeclib.strhash.o dtmflib.o \
mvr2s_x64_avx2.o mvr2s_x64_avx512.o resample.o
mvr2s_x64_avx2.o mvr2s_x64_avx512.o resample.o bufferpool.o
spandsp_send_fax_pcm: spandsp_send_fax_pcm.o
@ -226,7 +226,7 @@ test-stats: test-stats.o $(COMMONOBJS) codeclib.strhash.o resample.o codec.o ssr
streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \
media_player.o jitter_buffer.o dtmflib.o t38.o tcp_listener.o mqtt.o janus.strhash.o \
websocket.o cli.o mvr2s_x64_avx2.o mvr2s_x64_avx512.o audio_player.o mix_buffer.o \
mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o
mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o bufferpool.o
test-transcode: test-transcode.o $(COMMONOBJS) codeclib.strhash.o resample.o codec.o ssrc.o call.o ice.o helpers.o \
kernel.o media_socket.o stun.o bencode.o socket.o poller.o dtls.o recording.o statistics.o \
@ -235,13 +235,13 @@ test-transcode: test-transcode.o $(COMMONOBJS) codeclib.strhash.o resample.o cod
streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \
media_player.o jitter_buffer.o dtmflib.o t38.o tcp_listener.o mqtt.o janus.strhash.o websocket.o \
cli.o mvr2s_x64_avx2.o mvr2s_x64_avx512.o audio_player.o mix_buffer.o \
mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o
mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o bufferpool.o
test-resample: test-resample.o $(COMMONOBJS) codeclib.strhash.o resample.o dtmflib.o mvr2s_x64_avx2.o \
mvr2s_x64_avx512.o
test-payload-tracker: test-payload-tracker.o $(COMMONOBJS) ssrc.o helpers.o auxlib.o rtp.o crypto.o codeclib.strhash.o \
resample.o dtmflib.o mvr2s_x64_avx2.o mvr2s_x64_avx512.o
resample.o dtmflib.o mvr2s_x64_avx2.o mvr2s_x64_avx512.o bufferpool.o
test-kernel-module: test-kernel-module.o $(COMMONOBJS) kernel.o


+ 1
- 0
t/test-mix-buffer.c View File

@ -15,6 +15,7 @@ struct global_stats_sampled rtpe_stats_sampled;
struct global_sampled_min_max rtpe_sampled_min_max;
struct global_sampled_min_max rtpe_sampled_graphite_min_max;
struct global_sampled_min_max rtpe_sampled_graphite_min_max_sampled;
__thread struct bufferpool *media_bufferpool;
int get_local_log_level(unsigned int u) {
return -1;


+ 1
- 0
t/test-payload-tracker.c View File

@ -15,6 +15,7 @@ struct global_stats_sampled rtpe_stats_sampled;
struct global_sampled_min_max rtpe_sampled_min_max;
struct global_sampled_min_max rtpe_sampled_graphite_min_max;
struct global_sampled_min_max rtpe_sampled_graphite_min_max_sampled;
__thread struct bufferpool *media_bufferpool;
static void most_cmp(struct payload_tracker *t, const char *cmp, const char *file, int line) {
char buf[1024] = "";


+ 6
- 0
t/test-transcode.c View File

@ -5,6 +5,7 @@
#include "main.h"
#include "ssrc.h"
#include "helpers.h"
#include "bufferpool.h"
int _log_facility_rtcp;
int _log_facility_cdr;
@ -416,6 +417,8 @@ static void dtmf(const char *s) {
int main(void) {
rtpe_common_config_ptr = &rtpe_config.common;
bufferpool_init();
media_bufferpool = bufferpool_new(g_malloc, g_free, 4096);
unsigned long random_seed = 0;
@ -1721,6 +1724,9 @@ int main(void) {
expect(B, "8/PCMA/8000");
end();
bufferpool_destroy(media_bufferpool);
bufferpool_cleanup();
return 0;
}


Loading…
Cancel
Save