diff --git a/daemon/codec.c b/daemon/codec.c index 864408a4e..c40ec609e 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -23,6 +23,7 @@ #ifdef WITH_TRANSCODING #include "fix_frame_channel_layout.h" #endif +#include "bufferpool.h" struct codec_timer { struct timerthread_obj tt_obj; @@ -1719,12 +1720,10 @@ static void codec_add_raw_packet_dup(struct media_packet *mp, unsigned int clock struct codec_packet *p = g_slice_alloc0(sizeof(*p)); // don't just duplicate the string. need to ensure enough room // if encryption is enabled on this stream - char *buf = g_malloc(mp->raw.len + 1 + RTP_BUFFER_TAIL_ROOM); - if (mp->raw.s && mp->raw.len) - memcpy(buf, mp->raw.s, mp->raw.len); + p->s.s = bufferpool_alloc(media_bufferpool, mp->raw.len + RTP_BUFFER_TAIL_ROOM); + memcpy(p->s.s, mp->raw.s, mp->raw.len); p->s.len = mp->raw.len; - p->s.s = buf; - p->free_func = free; + p->free_func = bufferpool_unref; p->rtp = (struct rtp_header *) p->s.s; codec_add_raw_packet_common(mp, clockrate, p); } @@ -2007,7 +2006,7 @@ out_ch: void codec_output_rtp(struct media_packet *mp, struct codec_scheduler *csch, struct codec_handler *handler, - char *buf, // malloc'd, room for rtp_header + filled-in payload + char *buf, // bufferpool_alloc'd, room for rtp_header + filled-in payload unsigned int payload_len, unsigned long payload_ts, int marker, int seq, int seq_inc, int payload_type, @@ -2035,7 +2034,7 @@ void codec_output_rtp(struct media_packet *mp, struct codec_scheduler *csch, p->s.s = buf; p->s.len = payload_len + sizeof(struct rtp_header); payload_tracker_add(&ssrc_out->tracker, handler->dest_pt.payload_type); - p->free_func = free; + p->free_func = bufferpool_unref; p->ttq_entry.source = handler; p->rtp = rh; p->ts = ts; @@ -2192,7 +2191,8 @@ static int codec_add_dtmf_packet(struct codec_ssrc_handler *ch, struct codec_ssr skip: obj_put(&output_ch->h); - char *buf = malloc(packet->payload->len + sizeof(struct rtp_header) + RTP_BUFFER_TAIL_ROOM); + char *buf = bufferpool_alloc(media_bufferpool, + packet->payload->len + sizeof(struct rtp_header) + RTP_BUFFER_TAIL_ROOM); memcpy(buf + sizeof(struct rtp_header), packet->payload->s, packet->payload->len); if (packet->bypass_seq) // inject original seq codec_output_rtp(mp, &ch->csch, packet->handler ? : h, buf, packet->payload->len, packet->ts, @@ -2439,12 +2439,10 @@ void codec_packet_free(void *pp) { g_slice_free1(sizeof(*p), p); } bool codec_packet_copy(struct codec_packet *p) { - char *buf = malloc(p->s.len + RTP_BUFFER_TAIL_ROOM); - if (!buf) - return false; + char *buf = bufferpool_alloc(media_bufferpool, p->s.len + RTP_BUFFER_TAIL_ROOM); memcpy(buf, p->s.s, p->s.len); p->s.s = buf; - p->free_func = free; + p->free_func = bufferpool_unref; return true; } struct codec_packet *codec_packet_dup(struct codec_packet *p) { @@ -3996,7 +3994,7 @@ void packet_encoded_packetize(AVPacket *pkt, struct codec_ssrc_handler *ch, stru sizeof(struct telephone_event_payload)); unsigned int pkt_len = sizeof(struct rtp_header) + payload_len + RTP_BUFFER_TAIL_ROOM; // prepare our buffers - char *buf = malloc(pkt_len); + char *buf = bufferpool_alloc(media_bufferpool, pkt_len); char *payload = buf + sizeof(struct rtp_header); // tell our packetizer how much we want str inout = STR_INIT_LEN(payload, payload_len); @@ -4041,7 +4039,7 @@ static int packet_encoded_rtp(encoder_t *enc, void *u1, void *u2) { static void __codec_output_rtp_seq_passthrough(struct media_packet *mp, struct codec_scheduler *csch, struct codec_handler *handler, - char *buf, // malloc'd, room for rtp_header + filled-in payload + char *buf, // bufferpool_alloc'd, room for rtp_header + filled-in payload unsigned int payload_len, unsigned long payload_ts, int marker, int payload_type, @@ -4052,7 +4050,7 @@ static void __codec_output_rtp_seq_passthrough(struct media_packet *mp, struct c static void __codec_output_rtp_seq_own(struct media_packet *mp, struct codec_scheduler *csch, struct codec_handler *handler, - char *buf, // malloc'd, room for rtp_header + filled-in payload + char *buf, // bufferpool_alloc'd, room for rtp_header + filled-in payload unsigned int payload_len, unsigned long payload_ts, int marker, int payload_type, @@ -4104,7 +4102,7 @@ static void __packet_encoded_tx(AVPacket *pkt, struct codec_ssrc_handler *ch, st char *send_buf = buf; if (repeats > 0) { // need to duplicate the payload as codec_output_rtp consumes it - send_buf = malloc(pkt_len); + send_buf = bufferpool_alloc(media_bufferpool, pkt_len); memcpy(send_buf, buf, pkt_len); } func(mp, &ch->csch, ch->handler, send_buf, inout->len, ch->csch.first_ts diff --git a/daemon/helpers.c b/daemon/helpers.c index bd4bffca7..61bae052e 100644 --- a/daemon/helpers.c +++ b/daemon/helpers.c @@ -12,6 +12,8 @@ #include "log.h" #include "main.h" +#include "bufferpool.h" +#include "media_socket.h" #if 0 #define BSDB(x...) fprintf(stderr, x) @@ -193,6 +195,7 @@ void thread_waker_del(struct thread_waker *wk) { static void thread_detach_cleanup(void *dtp) { struct detach_thread *dt = dtp; g_slice_free1(sizeof(*dt), dt); + bufferpool_destroy(media_bufferpool); thread_join_me(); } @@ -245,6 +248,8 @@ static void *thread_detach_func(void *d) { dt->priority, strerror(errno)); } + media_bufferpool = bufferpool_new(g_malloc, g_free, 64 * 65536); + thread_cleanup_push(thread_detach_cleanup, dt); dt->func(dt->data); thread_cleanup_pop(true); diff --git a/daemon/jitter_buffer.c b/daemon/jitter_buffer.c index 61483eccc..a1ad6a3bc 100644 --- a/daemon/jitter_buffer.c +++ b/daemon/jitter_buffer.c @@ -9,6 +9,7 @@ #include "codec.h" #include "main.h" #include "rtcplib.h" +#include "bufferpool.h" #define INITIAL_PACKETS 0x1E #define CONT_SEQ_COUNT 0x1F4 @@ -105,7 +106,7 @@ static struct jb_packet* get_jb_packet(struct media_packet *mp, const str *s) { if (rtp_payload(&mp->rtp, &mp->payload, s)) return NULL; - char *buf = malloc(s->len + RTP_BUFFER_HEAD_ROOM + RTP_BUFFER_TAIL_ROOM); + char *buf = bufferpool_alloc(media_bufferpool, s->len + RTP_BUFFER_HEAD_ROOM + RTP_BUFFER_TAIL_ROOM); if (!buf) { ilog(LOG_ERROR, "Failed to allocate memory: %s", strerror(errno)); return NULL; @@ -445,7 +446,7 @@ void jb_packet_free(struct jb_packet **jbp) { if (!jbp || !*jbp) return; - free((*jbp)->buf); + bufferpool_unref((*jbp)->buf); media_packet_release(&(*jbp)->mp); g_slice_free1(sizeof(**jbp), *jbp); *jbp = NULL; diff --git a/daemon/main.c b/daemon/main.c index 65eb2bd6e..b56b1a87d 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -1222,6 +1222,15 @@ static void early_init(void) { socket_init(); // needed for socktype_udp } +#ifdef WITH_TRANSCODING +static void clib_init(void) { + media_bufferpool = bufferpool_new(g_malloc, g_free, 64 * 65536); +} +static void clib_cleanup(void) { + bufferpool_destroy(media_bufferpool); +} +#endif + static void init_everything(void) { bufferpool_init(); gettimeofday(&rtpe_now, NULL); @@ -1247,6 +1256,10 @@ static void init_everything(void) { if (call_interfaces_init()) abort(); statistics_init(); +#ifdef WITH_TRANSCODING + codeclib_thread_init = clib_init; + codeclib_thread_cleanup = clib_cleanup; +#endif codeclib_init(0); media_player_init(); if (!dtmf_init()) diff --git a/daemon/media_player.c b/daemon/media_player.c index ddc678517..5bb31804b 100644 --- a/daemon/media_player.c +++ b/daemon/media_player.c @@ -22,6 +22,7 @@ #include "fix_frame_channel_layout.h" #endif #include "kernel.h" +#include "bufferpool.h" #define DEFAULT_AVIO_BUFSIZE 4096 @@ -445,7 +446,7 @@ retry:; // make a copy to send out size_t len = pkt->s.len + sizeof(struct rtp_header) + RTP_BUFFER_TAIL_ROOM; - char *buf = g_malloc(len); + char *buf = bufferpool_alloc(media_bufferpool, len); memcpy(buf, pkt->buf, len); struct media_packet packet = { @@ -504,7 +505,7 @@ static void media_player_cached_reader_start(struct media_player *mp, const rtp_ static void cache_packet_free(struct media_player_cache_packet *p) { - g_free(p->buf); + bufferpool_unref(p->buf); g_slice_free1(sizeof(*p), p); } diff --git a/daemon/media_socket.c b/daemon/media_socket.c index 5a4730286..6878d9510 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -31,6 +31,7 @@ #include "dtmf.h" #include "mqtt.h" #include "janus.h" +#include "bufferpool.h" #include "xt_RTPENGINE.h" @@ -441,6 +442,7 @@ TYPED_GHASHTABLE(local_sockets_ht, endpoint_t, stream_fd, endpoint_t_hash, endpo static rwlock_t local_media_socket_endpoints_lock; static local_sockets_ht local_media_socket_endpoints; +__thread struct bufferpool *media_bufferpool; /* checks for free no_ports on a local interface */ @@ -2375,8 +2377,10 @@ int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, s for (__auto_type l = mp->packets_out.head; l; l = l->next) { struct codec_packet *p = l->data; if (mp->call->recording && rtpe_config.rec_egress) { - str_init_dup_str(&p->plain, &p->s); - p->plain_free_func = free; + p->plain.len = p->s.len; + p->plain.s = bufferpool_alloc(media_bufferpool, p->s.len); + memcpy(p->plain.s, p->s.s, p->s.len); + p->plain_free_func = bufferpool_unref; } int encret = encrypt_func(&p->s, out, mp->ssrc_out); if (encret == 1) @@ -2925,7 +2929,7 @@ static int stream_packet(struct packet_handler_ctx *phc) { if (sh_link->next) { if (!orig_raw.s) orig_raw = phc->mp.raw; - char *buf = g_malloc(orig_raw.len + RTP_BUFFER_TAIL_ROOM); + char *buf = bufferpool_alloc(media_bufferpool, orig_raw.len + RTP_BUFFER_TAIL_ROOM); memcpy(buf, orig_raw.s, orig_raw.len); phc->mp.raw.s = buf; g_queue_push_tail(&free_list, buf); @@ -3129,7 +3133,7 @@ out: ssrc_ctx_put(&phc->mp.ssrc_in); rtcp_list_free(&phc->rtcp_list); - g_queue_clear_full(&free_list, g_free); + g_queue_clear_full(&free_list, bufferpool_unref); return ret; } @@ -3137,7 +3141,6 @@ out: static void stream_fd_readable(int fd, void *p) { stream_fd *sfd = p; - char buf[RTP_BUFFER_SIZE]; int ret, iters; bool update = false; call_t *ca; @@ -3187,6 +3190,9 @@ restart: goto done; } } + + g_autoptr(bp_char) buf = bufferpool_alloc(media_bufferpool, RTP_BUFFER_SIZE); + ret = socket_recvfrom_ts(&sfd->socket, buf + RTP_BUFFER_HEAD_ROOM, MAX_RTP_PACKET_SIZE, &phc.mp.fsin, &phc.mp.tv); if (ca) diff --git a/debian/control b/debian/control index ed9723913..1bf98a478 100644 --- a/debian/control +++ b/debian/control @@ -48,7 +48,7 @@ Build-Depends: libxmlrpc-core-c3-dev (>= 1.16.07), libxtables-dev (>= 1.4) | iptables-dev (>= 1.4), markdown, - ngcp-libcodec-chain-dev , + ngcp-libcodec-chain-dev (>= 12.4) , pandoc, pkgconf, python3, diff --git a/include/media_socket.h b/include/media_socket.h index 001d413f2..f343996cc 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -287,6 +287,7 @@ struct media_packet { extern GQueue all_local_interfaces; // read-only during runtime +extern __thread struct bufferpool *media_bufferpool; void interfaces_init(GQueue *interfaces); diff --git a/lib/codeclib.c b/lib/codeclib.c index 052832fbc..1fcda717d 100644 --- a/lib/codeclib.c +++ b/lib/codeclib.c @@ -148,6 +148,7 @@ static void *cc_lib_handle; #ifdef HAVE_CODEC_CHAIN static __typeof__(codec_chain_client_connect) *cc_client_connect; +static __typeof__(codec_chain_set_thread_funcs) *cc_set_thread_funcs; static __typeof__(codec_chain_client_pcma2opus_runner_new) *cc_client_pcma2opus_runner_new; static __typeof__(codec_chain_client_pcmu2opus_runner_new) *cc_client_pcmu2opus_runner_new; @@ -847,6 +848,8 @@ static GQueue __supplemental_codecs = G_QUEUE_INIT; const GQueue * const codec_supplemental_codecs = &__supplemental_codecs; static codec_def_t *codec_def_cn; +void (*codeclib_thread_init)(void); +void (*codeclib_thread_cleanup)(void); static GHashTable *codecs_ht; @@ -1374,6 +1377,7 @@ static void *dlsym_assert(void *handle, const char *sym, const char *fn) { #ifdef HAVE_CODEC_CHAIN static void cc_dlsym_resolve(const char *fn) { cc_client_connect = dlsym_assert(cc_lib_handle, "codec_chain_client_connect", fn); + cc_set_thread_funcs = dlsym_assert(cc_lib_handle, "codec_chain_set_thread_funcs", fn); cc_client_pcma2opus_runner_new = dlsym_assert(cc_lib_handle, "codec_chain_client_pcma2opus_runner_new", fn); @@ -1527,6 +1531,8 @@ static void cc_init(void) { cc_dlsym_resolve(rtpe_common_config_ptr->codec_chain_lib_path); + cc_set_thread_funcs(codeclib_thread_init, codeclib_thread_cleanup, NULL); + cc_client = cc_client_connect(4); if (!cc_client) die("Failed to connect to cudecsd"); diff --git a/lib/codeclib.h b/lib/codeclib.h index f0abe4916..1e21f6356 100644 --- a/lib/codeclib.h +++ b/lib/codeclib.h @@ -365,6 +365,10 @@ struct packet_sequencer_s { extern const GQueue * const codec_supplemental_codecs; +// must be set before calling codeclib_init +extern void (*codeclib_thread_init)(void); +extern void (*codeclib_thread_cleanup)(void); + void codeclib_init(int); void codeclib_free(void); diff --git a/t/Makefile b/t/Makefile index 78a9a7db2..8720e301c 100644 --- a/t/Makefile +++ b/t/Makefile @@ -195,7 +195,7 @@ test-bitstr: test-bitstr.o test-mix-buffer: test-mix-buffer.o $(COMMONOBJS) mix_buffer.o ssrc.o rtp.o crypto.o helpers.o \ mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o codeclib.strhash.o dtmflib.o \ - mvr2s_x64_avx2.o mvr2s_x64_avx512.o resample.o + mvr2s_x64_avx2.o mvr2s_x64_avx512.o resample.o bufferpool.o spandsp_send_fax_pcm: spandsp_send_fax_pcm.o @@ -226,7 +226,7 @@ test-stats: test-stats.o $(COMMONOBJS) codeclib.strhash.o resample.o codec.o ssr streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \ media_player.o jitter_buffer.o dtmflib.o t38.o tcp_listener.o mqtt.o janus.strhash.o \ websocket.o cli.o mvr2s_x64_avx2.o mvr2s_x64_avx512.o audio_player.o mix_buffer.o \ - mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o + mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o bufferpool.o test-transcode: test-transcode.o $(COMMONOBJS) codeclib.strhash.o resample.o codec.o ssrc.o call.o ice.o helpers.o \ kernel.o media_socket.o stun.o bencode.o socket.o poller.o dtls.o recording.o statistics.o \ @@ -235,13 +235,13 @@ test-transcode: test-transcode.o $(COMMONOBJS) codeclib.strhash.o resample.o cod streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \ media_player.o jitter_buffer.o dtmflib.o t38.o tcp_listener.o mqtt.o janus.strhash.o websocket.o \ cli.o mvr2s_x64_avx2.o mvr2s_x64_avx512.o audio_player.o mix_buffer.o \ - mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o + mix_in_x64_avx2.o mix_in_x64_sse2.o mix_in_x64_avx512bw.o bufferpool.o test-resample: test-resample.o $(COMMONOBJS) codeclib.strhash.o resample.o dtmflib.o mvr2s_x64_avx2.o \ mvr2s_x64_avx512.o test-payload-tracker: test-payload-tracker.o $(COMMONOBJS) ssrc.o helpers.o auxlib.o rtp.o crypto.o codeclib.strhash.o \ - resample.o dtmflib.o mvr2s_x64_avx2.o mvr2s_x64_avx512.o + resample.o dtmflib.o mvr2s_x64_avx2.o mvr2s_x64_avx512.o bufferpool.o test-kernel-module: test-kernel-module.o $(COMMONOBJS) kernel.o diff --git a/t/test-mix-buffer.c b/t/test-mix-buffer.c index 68671ea9b..c85763c9e 100644 --- a/t/test-mix-buffer.c +++ b/t/test-mix-buffer.c @@ -15,6 +15,7 @@ struct global_stats_sampled rtpe_stats_sampled; struct global_sampled_min_max rtpe_sampled_min_max; struct global_sampled_min_max rtpe_sampled_graphite_min_max; struct global_sampled_min_max rtpe_sampled_graphite_min_max_sampled; +__thread struct bufferpool *media_bufferpool; int get_local_log_level(unsigned int u) { return -1; diff --git a/t/test-payload-tracker.c b/t/test-payload-tracker.c index 6767a1c54..853e1d16e 100644 --- a/t/test-payload-tracker.c +++ b/t/test-payload-tracker.c @@ -15,6 +15,7 @@ struct global_stats_sampled rtpe_stats_sampled; struct global_sampled_min_max rtpe_sampled_min_max; struct global_sampled_min_max rtpe_sampled_graphite_min_max; struct global_sampled_min_max rtpe_sampled_graphite_min_max_sampled; +__thread struct bufferpool *media_bufferpool; static void most_cmp(struct payload_tracker *t, const char *cmp, const char *file, int line) { char buf[1024] = ""; diff --git a/t/test-transcode.c b/t/test-transcode.c index 50dfc157e..5483367b3 100644 --- a/t/test-transcode.c +++ b/t/test-transcode.c @@ -5,6 +5,7 @@ #include "main.h" #include "ssrc.h" #include "helpers.h" +#include "bufferpool.h" int _log_facility_rtcp; int _log_facility_cdr; @@ -416,6 +417,8 @@ static void dtmf(const char *s) { int main(void) { rtpe_common_config_ptr = &rtpe_config.common; + bufferpool_init(); + media_bufferpool = bufferpool_new(g_malloc, g_free, 4096); unsigned long random_seed = 0; @@ -1721,6 +1724,9 @@ int main(void) { expect(B, "8/PCMA/8000"); end(); + bufferpool_destroy(media_bufferpool); + bufferpool_cleanup(); + return 0; }