From 4267e1ca09562922791931fd42cb136ae08b99a6 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 29 Jan 2020 09:22:24 -0500 Subject: [PATCH 1/7] TT#74301 split auto-daemon-test framework into perl module Change-Id: Id12cd5a6db1f23da9ba0d50ebf17912ba94297e3 --- perl/NGCP/Rtpengine/AutoTest.pm | 242 +++++++++++++++++++++++++++++++ t/auto-daemon-tests.pl | 246 ++++---------------------------- 2 files changed, 268 insertions(+), 220 deletions(-) create mode 100644 perl/NGCP/Rtpengine/AutoTest.pm diff --git a/perl/NGCP/Rtpengine/AutoTest.pm b/perl/NGCP/Rtpengine/AutoTest.pm new file mode 100644 index 000000000..c05478a23 --- /dev/null +++ b/perl/NGCP/Rtpengine/AutoTest.pm @@ -0,0 +1,242 @@ +package NGCP::Rtpengine::AutoTest; + +use strict; +use warnings; +use NGCP::Rtpengine::Test; +use NGCP::Rtpclient::SRTP; +use Test::More; +use File::Temp; +use IPC::Open3; +use Time::HiRes; +use POSIX ":sys_wait_h"; +use IO::Socket; +use Exporter; + + +our @ISA; +our @EXPORT; + +BEGIN { + require Exporter; + @ISA = qw(Exporter); + our @EXPORT = qw(autotest_start new_call offer answer ft tt snd srtp_snd rtp rcv srtp_rcv + srtp_dec escape rtpm reverse_tags new_tt crlf sdp_split rtpe_req offer_answer); +}; + + +my $rtpe_stdout; +my $rtpe_stderr; +my $rtpe_pid; +my $c; +my ($cid, $ft, $tt, @sockets, $tag_iter); + + +sub autotest_start { + my (@cmdline) = @_; + + like $ENV{LD_PRELOAD}, qr/tests-preload/, 'LD_PRELOAD present'; + is $ENV{RTPE_PRELOAD_TEST_ACTIVE}, '1', 'preload library is active'; + SKIP: { + skip 'daemon is running externally', 1 if $ENV{RTPE_TEST_NO_LAUNCH}; + ok -x $ENV{RTPE_BIN}, 'RTPE_BIN points to executable'; + } + + $rtpe_stdout = File::Temp::tempfile() or die; + $rtpe_stderr = File::Temp::tempfile() or die; + SKIP: { + skip 'daemon is running externally', 1 if $ENV{RTPE_TEST_NO_LAUNCH}; + $rtpe_pid = open3(undef, '>&'.fileno($rtpe_stdout), '>&'.fileno($rtpe_stderr), + $ENV{RTPE_BIN}, @cmdline); + ok $rtpe_pid, 'daemon launched in background'; + } + + # keep trying to connect to the control socket while daemon is starting up + for (1 .. 300) { + $c = NGCP::Rtpengine->new($ENV{RTPENGINE_HOST} // '127.0.0.1', $ENV{RTPENGINE_PORT} // 2223); + last if $c->{socket}; + Time::HiRes::usleep(100000); # 100 ms x 300 = 30 sec + } + + 1; + $c->{socket} or die; + + $tag_iter = 0; + + my $r = $c->req({command => 'ping'}); + ok $r->{result} eq 'pong', 'ping works, daemon operational'; + + return 1; +} + +sub new_call { + my @ports = @_; + for my $s (@sockets) { + $s->close(); + } + @sockets = (); + $cid = $tag_iter++ . "-test-callID"; + $ft = $tag_iter++ . "-test-fromtag"; + $tt = $tag_iter++ . "-test-totag"; + print("new call $cid\n"); + for my $p (@ports) { + my ($addr, $port) = @{$p}; + my $s = IO::Socket::IP->new(Type => &SOCK_DGRAM, Proto => 'udp', + LocalHost => $addr, LocalPort => $port) + or die; + push(@sockets, $s); + } + return @sockets; +} +sub crlf { + my ($s) = @_; + $s =~ s/\r\n/\n/gs; + return $s; +} +sub sdp_split { + my ($s) = @_; + return split(/--------*\n/, $s); +} +sub rtpe_req { + my ($cmd, $name, $req) = @_; + $req->{command} = $cmd; + $req->{'call-id'} = $cid; + my $resp = $c->req($req); + is $resp->{result}, 'ok', "$name - '$cmd' status"; + return $resp; +} +sub offer_answer { + my ($cmd, $name, $req, $sdps) = @_; + my ($sdp_in, $exp_sdp_out) = sdp_split($sdps); + $req->{'from-tag'} = $ft; + $req->{sdp} = $sdp_in; + my $resp = rtpe_req($cmd, $name, $req); + my $regexp = "^\Q$exp_sdp_out\E\$"; + $regexp =~ s/\\\?/./gs; + $regexp =~ s/PORT/(\\d{1,5})/gs; + $regexp =~ s/ICEBASE/([0-9a-zA-Z]{16})/gs; + $regexp =~ s/ICEUFRAG/([0-9a-zA-Z]{8})/gs; + $regexp =~ s/ICEPWD/([0-9a-zA-Z]{26})/gs; + $regexp =~ s/CRYPTO128/([0-9a-zA-Z\/+]{40})/gs; + $regexp =~ s/CRYPTO192/([0-9a-zA-Z\/+]{51})/gs; + $regexp =~ s/CRYPTO256/([0-9a-zA-Z\/+]{62})/gs; + $regexp =~ s/LOOPER/([0-9a-f]{12})/gs; + my $crlf = crlf($resp->{sdp}); + like $crlf, qr/$regexp/s, "$name - output '$cmd' SDP"; + my @matches = $crlf =~ qr/$regexp/s; + return @matches; +} +sub offer { + return offer_answer('offer', @_); +} +sub answer { + my ($name, $req, $sdps) = @_; + $req->{'to-tag'} = $tt; + return offer_answer('answer', $name, $req, $sdps); +} +sub snd { + my ($sock, $dest, $packet) = @_; + $sock->send($packet, 0, pack_sockaddr_in($dest, inet_aton('203.0.113.1'))) or die; +} +sub srtp_snd { + my ($sock, $dest, $packet, $srtp_ctx) = @_; + if (!$srtp_ctx->{skey}) { + my ($key, $salt) = NGCP::Rtpclient::SRTP::decode_inline_base64($srtp_ctx->{key}, $srtp_ctx->{cs}); + @$srtp_ctx{qw(skey sauth ssalt)} = NGCP::Rtpclient::SRTP::gen_rtp_session_keys($key, $salt); + } + my ($enc, $out_roc) = NGCP::Rtpclient::SRTP::encrypt_rtp(@$srtp_ctx{qw(cs skey ssalt sauth roc)}, + '', 0, 0, 0, $packet); + $srtp_ctx->{roc} = $out_roc; + $sock->send($enc, 0, pack_sockaddr_in($dest, inet_aton('203.0.113.1'))) or die; +} +sub rtp { + my ($pt, $seq, $ts, $ssrc, $payload) = @_; + print("rtp in $pt $seq $ts $ssrc\n"); + return pack('CCnNN a*', 0x80, $pt, $seq, $ts, $ssrc, $payload); +} +sub rcv { + my ($sock, $port, $match, $cb, $cb_arg) = @_; + my $p = ''; + alarm(1); + my $addr = $sock->recv($p, 65535, 0) or die; + alarm(0); + my ($hdr_mark, $pt, $seq, $ts, $ssrc, $payload) = unpack('CCnNN a*', $p); + if ($payload) { + print("rtp recv $pt $seq $ts $ssrc " . unpack('H*', $payload) . "\n"); + } + if ($cb) { + $p = $cb->($hdr_mark, $pt, $seq, $ts, $ssrc, $payload, $p, $cb_arg); + } + like $p, $match, 'received packet matches'; + my @matches = $p =~ $match; + for my $m (@matches) { + if (length($m) == 2) { + ($m) = unpack('n', $m); + } + elsif (length($m) == 4) { + ($m) = unpack('N', $m); + } + } + return @matches; +} +sub srtp_rcv { + my ($sock, $port, $match, $srtp_ctx) = @_; + return rcv($sock, $port, $match, \&srtp_dec, $srtp_ctx); +} +sub srtp_dec { + my ($hdr_mark, $pt, $seq, $ts, $ssrc, $payload, $pack, $srtp_ctx) = @_; + if (!$srtp_ctx->{skey}) { + my ($key, $salt) = NGCP::Rtpclient::SRTP::decode_inline_base64($srtp_ctx->{key}, $srtp_ctx->{cs}); + @$srtp_ctx{qw(skey sauth ssalt)} = NGCP::Rtpclient::SRTP::gen_rtp_session_keys($key, $salt); + } + my ($dec, $out_roc, $tag, $hmac) = NGCP::Rtpclient::SRTP::decrypt_rtp(@$srtp_ctx{qw(cs skey ssalt sauth roc)}, $pack); + $srtp_ctx->{roc} = $out_roc; + is $tag, substr($hmac, 0, length($tag)), 'SRTP auth tag matches'; + return $dec; +} +sub escape { + return "\Q$_[0]\E"; +} +sub rtpm { + my ($pt, $seq, $ts, $ssrc, $payload) = @_; + print("rtp matcher $pt $seq $ts $ssrc " . unpack('H*', $payload) . "\n"); + my $re = ''; + $re .= escape(pack('C', 0x80)); + $re .= escape(pack('C', $pt)); + $re .= $seq >= 0 ? escape(pack('n', $seq)) : '(..)'; + $re .= $ts >= 0 ? escape(pack('N', $ts)) : '(....)'; + $re .= $ssrc >= 0 ? escape(pack('N', $ssrc)) : '(....)'; + $re .= escape($payload); + return qr/^$re$/s; +} + +sub ft { return $ft; } +sub tt { return $tt; } + +sub reverse_tags { + ($tt, $ft) = ($ft, $tt); +} +sub new_tt { + $tt = $tag_iter++ . "-test-totag"; +} + + + +END { + if ($rtpe_pid) { + kill('INT', $rtpe_pid) or die; + # wait for daemon to terminate + my $status = -1; + for (1 .. 50) { + $status = waitpid($rtpe_pid, WNOHANG); + last if $status != 0; + Time::HiRes::usleep(100000); # 100 ms x 50 = 5 sec + } + kill('KILL', $rtpe_pid) if $status == 0; + $status == $rtpe_pid or die; + $? == 0 or die; + } +} + + + +1; diff --git a/t/auto-daemon-tests.pl b/t/auto-daemon-tests.pl index 431f4c16a..1ee877168 100755 --- a/t/auto-daemon-tests.pl +++ b/t/auto-daemon-tests.pl @@ -4,191 +4,13 @@ use strict; use warnings; use NGCP::Rtpengine::Test; use NGCP::Rtpclient::SRTP; +use NGCP::Rtpengine::AutoTest; use Test::More; -use File::Temp; -use IPC::Open3; -use Time::HiRes; -use POSIX ":sys_wait_h"; -use IO::Socket; - -like $ENV{LD_PRELOAD}, qr/tests-preload/, 'LD_PRELOAD present'; -is $ENV{RTPE_PRELOAD_TEST_ACTIVE}, '1', 'preload library is active'; -SKIP: { - skip 'daemon is running externally', 1 if $ENV{RTPE_TEST_NO_LAUNCH}; - ok -x $ENV{RTPE_BIN}, 'RTPE_BIN points to executable'; -} - -my $rtpe_stdout = File::Temp::tempfile() or die; -my $rtpe_stderr = File::Temp::tempfile() or die; -my $rtpe_pid; -SKIP: { - skip 'daemon is running externally', 1 if $ENV{RTPE_TEST_NO_LAUNCH}; - $rtpe_pid = open3(undef, '>&'.fileno($rtpe_stdout), '>&'.fileno($rtpe_stderr), - $ENV{RTPE_BIN}, qw(--config-file=none -t -1 -i 203.0.113.1 -i 2001:db8:4321::1 - -n 2223 -c 12345 -f -L 7 -E -u 2222)); - ok $rtpe_pid, 'daemon launched in background'; -} - -# keep trying to connect to the control socket while daemon is starting up -my $c; -for (1 .. 300) { - $c = NGCP::Rtpengine->new($ENV{RTPENGINE_HOST} // '127.0.0.1', $ENV{RTPENGINE_PORT} // 2223); - last if $c->{socket}; - Time::HiRes::usleep(100000); # 100 ms x 300 = 30 sec -} - -1; -$c->{socket} or die; - -my ($cid, $ft, $tt, @sockets); -my ($tag_iter) = (0); - -sub new_call { - my @ports = @_; - for my $s (@sockets) { - $s->close(); - } - @sockets = (); - $cid = $tag_iter++ . "-test-callID"; - $ft = $tag_iter++ . "-test-fromtag"; - $tt = $tag_iter++ . "-test-totag"; - print("new call $cid\n"); - for my $p (@ports) { - my ($addr, $port) = @{$p}; - my $s = IO::Socket::IP->new(Type => &SOCK_DGRAM, Proto => 'udp', - LocalHost => $addr, LocalPort => $port) - or die; - push(@sockets, $s); - } - return @sockets; -} -sub crlf { - my ($s) = @_; - $s =~ s/\r\n/\n/gs; - return $s; -} -sub sdp_split { - my ($s) = @_; - return split(/--------*\n/, $s); -} -sub rtpe_req { - my ($cmd, $name, $req) = @_; - $req->{command} = $cmd; - $req->{'call-id'} = $cid; - my $resp = $c->req($req); - is $resp->{result}, 'ok', "$name - '$cmd' status"; - return $resp; -} -sub offer_answer { - my ($cmd, $name, $req, $sdps) = @_; - my ($sdp_in, $exp_sdp_out) = sdp_split($sdps); - $req->{'from-tag'} = $ft; - $req->{sdp} = $sdp_in; - my $resp = rtpe_req($cmd, $name, $req); - my $regexp = "^\Q$exp_sdp_out\E\$"; - $regexp =~ s/\\\?/./gs; - $regexp =~ s/PORT/(\\d{1,5})/gs; - $regexp =~ s/ICEBASE/([0-9a-zA-Z]{16})/gs; - $regexp =~ s/ICEUFRAG/([0-9a-zA-Z]{8})/gs; - $regexp =~ s/ICEPWD/([0-9a-zA-Z]{26})/gs; - $regexp =~ s/CRYPTO128/([0-9a-zA-Z\/+]{40})/gs; - $regexp =~ s/CRYPTO192/([0-9a-zA-Z\/+]{51})/gs; - $regexp =~ s/CRYPTO256/([0-9a-zA-Z\/+]{62})/gs; - $regexp =~ s/LOOPER/([0-9a-f]{12})/gs; - my $crlf = crlf($resp->{sdp}); - like $crlf, qr/$regexp/s, "$name - output '$cmd' SDP"; - my @matches = $crlf =~ qr/$regexp/s; - return @matches; -} -sub offer { - return offer_answer('offer', @_); -} -sub answer { - my ($name, $req, $sdps) = @_; - $req->{'to-tag'} = $tt; - return offer_answer('answer', $name, $req, $sdps); -} -sub snd { - my ($sock, $dest, $packet) = @_; - $sock->send($packet, 0, pack_sockaddr_in($dest, inet_aton('203.0.113.1'))) or die; -} -sub srtp_snd { - my ($sock, $dest, $packet, $srtp_ctx) = @_; - if (!$srtp_ctx->{skey}) { - my ($key, $salt) = NGCP::Rtpclient::SRTP::decode_inline_base64($srtp_ctx->{key}, $srtp_ctx->{cs}); - @$srtp_ctx{qw(skey sauth ssalt)} = NGCP::Rtpclient::SRTP::gen_rtp_session_keys($key, $salt); - } - my ($enc, $out_roc) = NGCP::Rtpclient::SRTP::encrypt_rtp(@$srtp_ctx{qw(cs skey ssalt sauth roc)}, - '', 0, 0, 0, $packet); - $srtp_ctx->{roc} = $out_roc; - $sock->send($enc, 0, pack_sockaddr_in($dest, inet_aton('203.0.113.1'))) or die; -} -sub rtp { - my ($pt, $seq, $ts, $ssrc, $payload) = @_; - print("rtp in $pt $seq $ts $ssrc\n"); - return pack('CCnNN a*', 0x80, $pt, $seq, $ts, $ssrc, $payload); -} -sub rcv { - my ($sock, $port, $match, $cb, $cb_arg) = @_; - my $p = ''; - alarm(1); - my $addr = $sock->recv($p, 65535, 0) or die; - alarm(0); - my ($hdr_mark, $pt, $seq, $ts, $ssrc, $payload) = unpack('CCnNN a*', $p); - if ($payload) { - print("rtp recv $pt $seq $ts $ssrc " . unpack('H*', $payload) . "\n"); - } - if ($cb) { - $p = $cb->($hdr_mark, $pt, $seq, $ts, $ssrc, $payload, $p, $cb_arg); - } - like $p, $match, 'received packet matches'; - my @matches = $p =~ $match; - for my $m (@matches) { - if (length($m) == 2) { - ($m) = unpack('n', $m); - } - elsif (length($m) == 4) { - ($m) = unpack('N', $m); - } - } - return @matches; -} -sub srtp_rcv { - my ($sock, $port, $match, $srtp_ctx) = @_; - return rcv($sock, $port, $match, \&srtp_dec, $srtp_ctx); -} -sub srtp_dec { - my ($hdr_mark, $pt, $seq, $ts, $ssrc, $payload, $pack, $srtp_ctx) = @_; - if (!$srtp_ctx->{skey}) { - my ($key, $salt) = NGCP::Rtpclient::SRTP::decode_inline_base64($srtp_ctx->{key}, $srtp_ctx->{cs}); - @$srtp_ctx{qw(skey sauth ssalt)} = NGCP::Rtpclient::SRTP::gen_rtp_session_keys($key, $salt); - } - my ($dec, $out_roc, $tag, $hmac) = NGCP::Rtpclient::SRTP::decrypt_rtp(@$srtp_ctx{qw(cs skey ssalt sauth roc)}, $pack); - $srtp_ctx->{roc} = $out_roc; - is $tag, substr($hmac, 0, length($tag)), 'SRTP auth tag matches'; - return $dec; -} -sub escape { - return "\Q$_[0]\E"; -} -sub rtpm { - my ($pt, $seq, $ts, $ssrc, $payload) = @_; - print("rtp matcher $pt $seq $ts $ssrc " . unpack('H*', $payload) . "\n"); - my $re = ''; - $re .= escape(pack('C', 0x80)); - $re .= escape(pack('C', $pt)); - $re .= $seq >= 0 ? escape(pack('n', $seq)) : '(..)'; - $re .= $ts >= 0 ? escape(pack('N', $ts)) : '(....)'; - $re .= $ssrc >= 0 ? escape(pack('N', $ssrc)) : '(....)'; - $re .= escape($payload); - return qr/^$re$/s; -} -{ - my $r = $c->req({command => 'ping'}); - ok $r->{result} eq 'pong', 'ping works, daemon operational'; -} +autotest_start(qw(--config-file=none -t -1 -i 203.0.113.1 -i 2001:db8:4321::1 + -n 2223 -c 12345 -f -L 7 -E -u 2222)) + or die; my ($sock_a, $sock_b, $port_a, $port_b, $ssrc, $resp, $srtp_ctx_a, $srtp_ctx_b, @ret1, @ret2); @@ -771,7 +593,7 @@ snd($sock_a, $port_b, rtp(0, 1001, 3160, 0x1234, "\x00" x 160)); rcv($sock_b, $port_a, rtpm(0, 1001, 3160, $ssrc, "\x00" x 160)); $resp = rtpe_req('play DTMF', 'inject DTMF towards B', - { 'from-tag' => $ft, code => '0', volume => 10, duration => 100 }); + { 'from-tag' => ft(), code => '0', volume => 10, duration => 100 }); snd($sock_a, $port_b, rtp(0, 1002, 3320, 0x1234, "\x00" x 160)); rcv($sock_b, $port_a, rtpm(96 | 0x80, 1002, 3320, $ssrc, "\x00\x0a\x00\xa0")); @@ -798,7 +620,7 @@ snd($sock_b, $port_a, rtp(0, 4001, 8160, 0x6543, "\x00" x 160)); rcv($sock_a, $port_b, rtpm(0, 4001, 8160, $ssrc, "\x00" x 160)); $resp = rtpe_req('play DTMF', 'inject DTMF towards A', - { 'from-tag' => $tt, code => '*', volume => 10, duration => 100 }); + { 'from-tag' => tt(), code => '*', volume => 10, duration => 100 }); snd($sock_b, $port_a, rtp(0, 4002, 8320, 0x6543, "\x00" x 160)); rcv($sock_a, $port_b, rtpm(96 | 0x80, 4002, 8320, $ssrc, "\x0a\x0a\x00\xa0")); @@ -879,7 +701,7 @@ snd($sock_a, $port_b, rtp(0, 1001, 3160, 0x1234, "\x00" x 160)); rcv($sock_b, $port_a, rtpm(8, 1001, 3160, $ssrc, "\x2a" x 160)); $resp = rtpe_req('play DTMF', 'inject DTMF towards B', - { 'from-tag' => $ft, code => '0', volume => 10, duration => 100 }); + { 'from-tag' => ft(), code => '0', volume => 10, duration => 100 }); snd($sock_a, $port_b, rtp(0, 1002, 3320, 0x1234, "\x00" x 160)); rcv($sock_b, $port_a, rtpm(96 | 0x80, 1002, 3320, $ssrc, "\x00\x0a\x00\xa0")); @@ -906,7 +728,7 @@ snd($sock_b, $port_a, rtp(8, 4001, 8160, 0x6543, "\x2a" x 160)); rcv($sock_a, $port_b, rtpm(0, 4001, 8160, $ssrc, "\x00" x 160)); $resp = rtpe_req('play DTMF', 'inject DTMF towards A', - { 'from-tag' => $tt, code => '#', volume => -10, duration => 100 }); + { 'from-tag' => tt(), code => '#', volume => -10, duration => 100 }); snd($sock_b, $port_a, rtp(8, 4002, 8320, 0x6543, "\x2a" x 160)); rcv($sock_a, $port_b, rtpm(96 | 0x80, 4002, 8320, $ssrc, "\x0b\x0a\x00\xa0")); @@ -981,7 +803,7 @@ snd($sock_a, $port_b, rtp(0, 1001, 3160, 0x1234, "\x00" x 160)); rcv($sock_b, $port_a, rtpm(0, 1001, 3160, $ssrc, "\x00" x 160)); $resp = rtpe_req('play DTMF', 'inject DTMF towards B', - { 'from-tag' => $ft, code => 'C', volume => 5, duration => 120, pause => 110 }); + { 'from-tag' => ft(), code => 'C', volume => 5, duration => 120, pause => 110 }); snd($sock_a, $port_b, rtp(0, 1002, 3320, 0x1234, "\x00" x 160)); rcv($sock_b, $port_a, rtpm(0, 1002, 3320, $ssrc, "\xff\x93\x94\xbc\x2e\x56\xbf\x2b\x13\x1b\xa7\x8e\x98\x47\x25\x41\xe2\x24\x16\x2b\x99\x8e\x9f\x28\x1e\x3d\x5b\x23\x1c\xdf\x92\x8f\xb6\x1c\x1c\x40\x5d\x26\x25\xaa\x8f\x95\x3b\x15\x1d\x5e\xde\x2c\x38\x9d\x8f\x9e\x1f\x11\x20\xc0\xc1\x37\xdd\x99\x92\xb7\x15\x10\x2c\xac\xb5\x49\xb8\x97\x99\x37\x0f\x13\x58\xa0\xae\x67\xae\x99\xa4\x1f\x0d\x1a\xae\x9b\xad\x7b\xad\x9d\xbf\x16\x0e\x27\x9d\x98\xb0\x55\xb1\xa6\x3a\x11\x11\x63\x95\x98\xbf\x3e\xbb\xb4\x26\x10\x1a\xa9\x90\x9a\x4e\x30\xce\xd4\x1e\x12\x29\x99\x8e\xa1\x2d\x29\x6d\x4b\x1c\x18\xef\x91\x8f\xb6\x1f\x24\x57\x3e\x1d\x20\xa9\x8e\x95\x3e\x19\x23\x67\x3e\x21\x31\x9c\x8e\x9e\x22\x14\x26\xcd\x4a")); @@ -1017,7 +839,7 @@ snd($sock_b, $port_a, rtp(0, 4001, 8160, 0x6543, "\x00" x 160)); rcv($sock_a, $port_b, rtpm(0, 4001, 8160, $ssrc, "\x00" x 160)); $resp = rtpe_req('play DTMF', 'inject DTMF towards A', - { 'from-tag' => $tt, code => '4', volume => 3, duration => 150, pause => 100 }); + { 'from-tag' => tt(), code => '4', volume => 3, duration => 150, pause => 100 }); snd($sock_b, $port_a, rtp(0, 4002, 8320, 0x6543, "\x00" x 160)); rcv($sock_a, $port_b, rtpm(0, 4002, 8320, $ssrc, "\xff\x90\x8a\x93\xd9\x1b\x18\x27\x65\xe5\x33\x29\x4c\x9e\x8f\x91\xb8\x15\x09\x0d\x32\x98\x8e\x96\xbb\x2c\x2b\x4c\xd8\x34\x1c\x18\x2e\x9d\x8c\x8c\xa5\x1a\x0b\x0d\x27\xa3\x97\x9e\xbd\x4f\xc4\xaa\xb2\x2c\x12\x0e\x1e\xa1\x8b\x8a\x9c\x25\x0e\x10\x25\xb7\xa7\xb7\x5e\xcb\xa2\x98\x9f\x30\x0f\x0a\x16\xae\x8d\x8a\x98\x3a\x18\x19\x2c\xdd\xfd\x30\x2b\xce\x99\x8e\x95\x4c\x0f\x09\x10\xdf\x93\x8e\x9a\xec\x28\x2c\x56\xee\x2d\x1a\x1a\x48\x97\x8b\x8e\xba\x14\x0a\x0f\x39\x9d\x96\xa1\xcd\x4e\xbe\xab\xbe\x23\x10\x10\x2b\x99\x8a\x8c\xa7\x1b\x0d\x12\x2f\xad\xa7\xbc\x5e\xbd\x9f\x99\xa8\x23\x0d\x0b\x1d\x9f\x8b\x8c\x9f\x29\x16\x1b\x34\xcd\x60\x2f\x2f\xb6\x96")); @@ -1104,7 +926,7 @@ snd($sock_a, $port_b, rtp(0, 1001, 3160, 0x1234, "\x00" x 160)); rcv($sock_b, $port_a, rtpm(8, 1001, 3160, $ssrc, "\x2a" x 160)); $resp = rtpe_req('play DTMF', 'inject DTMF towards B', - { 'from-tag' => $ft, code => 'C', volume => 5, duration => 120 }); + { 'from-tag' => ft(), code => 'C', volume => 5, duration => 120 }); snd($sock_a, $port_b, rtp(0, 1002, 3320, 0x1234, "\x00" x 160)); rcv($sock_b, $port_a, rtpm(8, 1002, 3320, $ssrc, "\xd5\xb9\xbe\x97\x05\x70\xea\x01\x3e\x31\x82\xa5\xb2\x63\x0f\x69\xc1\x0f\x3d\x06\xb3\xa4\x8a\x03\x35\x14\x75\x0e\x36\xcc\xb8\xa5\x9d\x36\x36\x68\x49\x0d\x0c\x81\xa5\xbf\x16\x3f\x37\x4f\xcf\x07\x13\xb4\xa5\xb4\x0a\x3b\x0b\xeb\xe9\x12\xc9\xb3\xb8\x92\x3c\x3a\x07\x87\x9c\x61\x93\xb2\xb3\x12\x25\x39\x76\x8b\x85\x5a\x85\xb3\x8e\x35\x24\x30\x85\xb1\x87\x57\x84\xb7\xeb\x3c\x24\x0d\xb4\xb2\x9b\x70\x98\x8c\x11\x3b\x38\x41\xbf\xb2\xeb\x15\x96\x9f\x0d\x3a\x30\x83\xba\xb1\x7b\x1b\xfa\xf2\x34\x39\x03\xb0\xa5\x88\x04\x03\x5f\x67\x37\x32\xdd\xb8\xba\x9d\x35\x0e\x71\x15\x37\x0a\x80\xa4\xbf\x15\x33\x09\x45\x15\x0b\x18\xb6\xa4\xb4\x08\x3f\x0d\xe5\x66")); @@ -1141,7 +963,7 @@ snd($sock_b, $port_a, rtp(8, 4001, 8160, 0x6543, "\x2a" x 160)); rcv($sock_a, $port_b, rtpm(0, 4001, 8160, $ssrc, "\x00" x 160)); $resp = rtpe_req('play DTMF', 'inject DTMF towards A', - { 'from-tag' => $tt, code => '4', volume => 3, duration => 150 }); + { 'from-tag' => tt(), code => '4', volume => 3, duration => 150 }); snd($sock_b, $port_a, rtp(8, 4002, 8320, 0x6543, "\x2a" x 160)); rcv($sock_a, $port_b, rtpm(0, 4002, 8320, $ssrc, "\xff\x90\x8a\x93\xd9\x1b\x18\x27\x65\xe5\x33\x29\x4c\x9e\x8f\x91\xb8\x15\x09\x0d\x32\x98\x8e\x96\xbb\x2c\x2b\x4c\xd8\x34\x1c\x18\x2e\x9d\x8c\x8c\xa5\x1a\x0b\x0d\x27\xa3\x97\x9e\xbd\x4f\xc4\xaa\xb2\x2c\x12\x0e\x1e\xa1\x8b\x8a\x9c\x25\x0e\x10\x25\xb7\xa7\xb7\x5e\xcb\xa2\x98\x9f\x30\x0f\x0a\x16\xae\x8d\x8a\x98\x3a\x18\x19\x2c\xdd\xfd\x30\x2b\xce\x99\x8e\x95\x4c\x0f\x09\x10\xdf\x93\x8e\x9a\xec\x28\x2c\x56\xee\x2d\x1a\x1a\x48\x97\x8b\x8e\xba\x14\x0a\x0f\x39\x9d\x96\xa1\xcd\x4e\xbe\xab\xbe\x23\x10\x10\x2b\x99\x8a\x8c\xa7\x1b\x0d\x12\x2f\xad\xa7\xbc\x5e\xbd\x9f\x99\xa8\x23\x0d\x0b\x1d\x9f\x8b\x8c\x9f\x29\x16\x1b\x34\xcd\x60\x2f\x2f\xb6\x96")); @@ -1228,9 +1050,9 @@ snd($sock_a, $port_b, rtp(0, 1001, 3160, 0x1234, "\x00" x 160)); rcv($sock_b, $port_a, rtpm(0, 1001, 3160, $ssrc, "\x00" x 160)); $resp = rtpe_req('play DTMF', 'inject DTMF towards B', - { 'from-tag' => $ft, code => 'C', volume => 5, duration => 100 }); + { 'from-tag' => ft(), code => 'C', volume => 5, duration => 100 }); $resp = rtpe_req('play DTMF', 'inject DTMF towards B', - { 'from-tag' => $ft, code => '4', volume => 5, duration => 100 }); + { 'from-tag' => ft(), code => '4', volume => 5, duration => 100 }); snd($sock_a, $port_b, rtp(0, 1002, 3320, 0x1234, "\x00" x 160)); rcv($sock_b, $port_a, rtpm(0, 1002, 3320, $ssrc, "\xff\x93\x94\xbc\x2e\x56\xbf\x2b\x13\x1b\xa7\x8e\x98\x47\x25\x41\xe2\x24\x16\x2b\x99\x8e\x9f\x28\x1e\x3d\x5b\x23\x1c\xdf\x92\x8f\xb6\x1c\x1c\x40\x5d\x26\x25\xaa\x8f\x95\x3b\x15\x1d\x5e\xde\x2c\x38\x9d\x8f\x9e\x1f\x11\x20\xc0\xc1\x37\xdd\x99\x92\xb7\x15\x10\x2c\xac\xb5\x49\xb8\x97\x99\x37\x0f\x13\x58\xa0\xae\x67\xae\x99\xa4\x1f\x0d\x1a\xae\x9b\xad\x7b\xad\x9d\xbf\x16\x0e\x27\x9d\x98\xb0\x55\xb1\xa6\x3a\x11\x11\x63\x95\x98\xbf\x3e\xbb\xb4\x26\x10\x1a\xa9\x90\x9a\x4e\x30\xce\xd4\x1e\x12\x29\x99\x8e\xa1\x2d\x29\x6d\x4b\x1c\x18\xef\x91\x8f\xb6\x1f\x24\x57\x3e\x1d\x20\xa9\x8e\x95\x3e\x19\x23\x67\x3e\x21\x31\x9c\x8e\x9e\x22\x14\x26\xcd\x4a")); @@ -1340,9 +1162,9 @@ snd($sock_a, $port_b, rtp(0, 1001, 3160, 0x1234, "\x00" x 160)); rcv($sock_b, $port_a, rtpm(0, 1001, 3160, $ssrc, "\x00" x 160)); $resp = rtpe_req('play DTMF', 'inject DTMF towards B', - { 'from-tag' => $ft, code => '0', volume => 10, duration => 100 }); + { 'from-tag' => ft(), code => '0', volume => 10, duration => 100 }); $resp = rtpe_req('play DTMF', 'inject DTMF towards B', - { 'from-tag' => $ft, code => '1', volume => 6, duration => 100 }); + { 'from-tag' => ft(), code => '1', volume => 6, duration => 100 }); snd($sock_a, $port_b, rtp(0, 1002, 3320, 0x1234, "\x00" x 160)); rcv($sock_b, $port_a, rtpm(96 | 0x80, 1002, 3320, $ssrc, "\x00\x0a\x00\xa0")); @@ -2792,7 +2614,7 @@ a=sendrecv a=rtcp:PORT SDP -$resp = rtpe_req('play media', 'media playback, offer only', { 'from-tag' => $ft, blob => $wav_file }); +$resp = rtpe_req('play media', 'media playback, offer only', { 'from-tag' => ft(), blob => $wav_file }); is $resp->{duration}, 100, 'media duration'; my ($ts, $seq); @@ -2848,7 +2670,7 @@ a=rtcp:PORT SDP -$resp = rtpe_req('play media', 'media playback, side A', { 'from-tag' => $ft, blob => $wav_file }); +$resp = rtpe_req('play media', 'media playback, side A', { 'from-tag' => ft(), blob => $wav_file }); is $resp->{duration}, 100, 'media duration'; ($seq, $ts, $ssrc) = rcv($sock_a, -1, rtpm(8 | 0x80, -1, -1, -1, $pcma_1)); @@ -2903,7 +2725,7 @@ a=rtcp:PORT SDP -$resp = rtpe_req('play media', 'media playback, side B', { 'from-tag' => $tt, blob => $wav_file }); +$resp = rtpe_req('play media', 'media playback, side B', { 'from-tag' => tt(), blob => $wav_file }); is $resp->{duration}, 100, 'media duration'; ($seq, $ts, $ssrc) = rcv($sock_b, -1, rtpm(8 | 0x80, -1, -1, -1, $pcma_1)); @@ -2912,7 +2734,7 @@ rcv($sock_b, -1, rtpm(8, $seq + 2, $ts + 160 * 2, $ssrc, $pcma_3)); rcv($sock_b, -1, rtpm(8, $seq + 3, $ts + 160 * 3, $ssrc, $pcma_4)); rcv($sock_b, -1, rtpm(8, $seq + 4, $ts + 160 * 4, $ssrc, $pcma_5)); -$resp = rtpe_req('play media', 'restart media playback', { 'from-tag' => $tt, blob => $wav_file }); +$resp = rtpe_req('play media', 'restart media playback', { 'from-tag' => tt(), blob => $wav_file }); is $resp->{duration}, 100, 'media duration'; $ts += 160 * 5; @@ -3102,7 +2924,7 @@ a=crypto:1 AES_CM_128_HMAC_SHA1_80 inline:DVM+BTeYX2UI1LaA9bgXrcBEDBxoItA9/39fSo SDP -$resp = rtpe_req('play media', 'media playback, SRTP', { 'from-tag' => $ft, blob => $wav_file }); +$resp = rtpe_req('play media', 'media playback, SRTP', { 'from-tag' => ft(), blob => $wav_file }); is $resp->{duration}, 100, 'media duration'; my $srtp_ctx = { @@ -4046,10 +3868,10 @@ snd($sock_b, $port_a, rtp(0, 4000, 5000, 0x4567, "\x88" x 160)); ($ssrc) = rcv($sock_a, $port_b, rtpm(0, 4000, 5000, -1, "\x88" x 160)); # reverse re-invite -($tt, $ft) = ($ft, $tt); +reverse_tags(); (undef, $port_b) = offer('gh 766 reinvite', - { 'to-tag' => $tt, + { 'to-tag' => tt(), ICE => 'remove', replace => ['origin', 'session-connection'], flags => [ "loop-protect", "asymmetric" ] }, < $ft }); +rtpe_req('delete', 'media playback after delete', { 'from-tag' => ft() }); # new to-tag -$tt = $tag_iter++ . "-test-totag"; +new_tt(); offer('media playback after delete', { ICE => 'remove', replace => ['origin'], 'transport-protocol' => 'transparent', flags => ['strict-source', 'record-call'], @@ -5053,7 +4875,7 @@ SDP #rtpe_req('block media', 'media playback after delete', { }); -$resp = rtpe_req('play media', 'media playback after delete', { 'from-tag' => $tt, 'to-tag' => $tt, +$resp = rtpe_req('play media', 'media playback after delete', { 'from-tag' => tt(), 'to-tag' => tt(), blob => $wav_file }); is $resp->{duration}, 100, 'media duration'; @@ -5066,20 +4888,4 @@ rcv($sock_b, -1, rtpm(8, $seq + 4, $ts + 160 * 4, $ssrc, $pcma_5)); -END { - if ($rtpe_pid) { - kill('INT', $rtpe_pid) or die; - # wait for daemon to terminate - my $status = -1; - for (1 .. 50) { - $status = waitpid($rtpe_pid, WNOHANG); - last if $status != 0; - Time::HiRes::usleep(100000); # 100 ms x 50 = 5 sec - } - kill('KILL', $rtpe_pid) if $status == 0; - $status == $rtpe_pid or die; - $? == 0 or die; - } -} - done_testing(); From 18634c42029b6cd944192b3ea8f0844a1680b1e5 Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Fri, 17 Jan 2020 13:20:51 -0500 Subject: [PATCH 2/7] TT#74301 refactor send_timer into generic timer Change-Id: I81dae7ae8bb1bfe0324f9a8ce256cf9d1c377840 --- daemon/codec.c | 14 ++--- daemon/media_player.c | 129 ++++++++++++++--------------------------- daemon/timerthread.c | 126 ++++++++++++++++++++++++++++++++++++++++ include/codec.h | 4 +- include/media_player.h | 6 +- include/timerthread.h | 28 +++++++++ 6 files changed, 207 insertions(+), 100 deletions(-) diff --git a/daemon/codec.c b/daemon/codec.c index 2e1b00803..aeff30a29 100644 --- a/daemon/codec.c +++ b/daemon/codec.c @@ -969,35 +969,35 @@ static void __output_rtp(struct media_packet *mp, struct codec_ssrc_handler *ch, p->s.len = payload_len + sizeof(struct rtp_header); payload_tracker_add(&ssrc_out->tracker, handler->dest_pt.payload_type); p->free_func = free; - p->source = handler; + p->ttq_entry.source = handler; p->rtp = rh; // this packet is dynamically allocated, so we're able to schedule it. // determine scheduled time to send if (ch->first_send.tv_sec && ch->encoder_format.clockrate) { // scale first_send from first_send_ts to ts - p->to_send = ch->first_send; + p->ttq_entry.when = ch->first_send; uint32_t ts_diff = (uint32_t) ts - (uint32_t) ch->first_send_ts; // allow for wrap-around unsigned long long ts_diff_us = (unsigned long long) ts_diff * 1000000 / ch->encoder_format.clockrate * ch->handler->dest_pt.codec_def->clockrate_mult; - timeval_add_usec(&p->to_send, ts_diff_us); + timeval_add_usec(&p->ttq_entry.when, ts_diff_us); // how far in the future is this? - ts_diff_us = timeval_diff(&p->to_send, &rtpe_now); // negative wrap-around to positive OK + ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now); // negative wrap-around to positive OK if (ts_diff_us > 1000000) // more than one second, can't be right ch->first_send.tv_sec = 0; // fix it up below } if (!ch->first_send.tv_sec) { - p->to_send = ch->first_send = rtpe_now; + p->ttq_entry.when = ch->first_send = rtpe_now; ch->first_send_ts = ts; } ilog(LOG_DEBUG, "Scheduling to send RTP packet (seq %u TS %lu) at %lu.%06lu", ntohs(rh->seq_num), ts, - (long unsigned) p->to_send.tv_sec, - (long unsigned) p->to_send.tv_usec); + (long unsigned) p->ttq_entry.when.tv_sec, + (long unsigned) p->ttq_entry.when.tv_usec); g_queue_push_tail(&mp->packets_out, p); diff --git a/daemon/media_player.c b/daemon/media_player.c index 22d0cb000..e21810e7f 100644 --- a/daemon/media_player.c +++ b/daemon/media_player.c @@ -30,27 +30,15 @@ static struct timerthread send_timer_thread; +static void send_timer_send_nolock(struct send_timer *st, struct codec_packet *cp); +static void send_timer_send_lock(struct send_timer *st, struct codec_packet *cp); + + + #ifdef WITH_TRANSCODING // called with call->master lock in W static unsigned int send_timer_flush(struct send_timer *st, void *ptr) { - if (!st) - return 0; - - unsigned int num = 0; - GList *l = st->packets.head; - while (l) { - GList *next = l->next; - struct codec_packet *p = l->data; - if (p->source != ptr) - goto next; - g_queue_delete_link(&st->packets, l); - codec_packet_free(p); - num++; - -next: - l = next; - } - return num; + return timerthread_queue_flush(&st->ttq, ptr); } @@ -145,32 +133,34 @@ static void __send_timer_free(void *p) { ilog(LOG_DEBUG, "freeing send_timer"); - g_queue_clear_full(&st->packets, codec_packet_free); - mutex_destroy(&st->lock); obj_put(st->call); } +static void __send_timer_send_now(struct timerthread_queue *ttq, void *p) { + send_timer_send_nolock((void *) ttq, p); +}; +static void __send_timer_send_later(struct timerthread_queue *ttq, void *p) { + send_timer_send_lock((void *) ttq, p); +}; + // call->master_lock held in W struct send_timer *send_timer_new(struct packet_stream *ps) { ilog(LOG_DEBUG, "creating send_timer"); - struct send_timer *st = obj_alloc0("send_timer", sizeof(*st), __send_timer_free); - st->tt_obj.tt = &send_timer_thread; - mutex_init(&st->lock); + struct send_timer *st = timerthread_queue_new("send_timer", sizeof(*st), + &send_timer_thread, + __send_timer_send_now, + __send_timer_send_later, + __send_timer_free, codec_packet_free); st->call = obj_get(ps->call); st->sink = ps; - g_queue_init(&st->packets); return st; } -// st->stream->out_lock (or call->master_lock/W) must be held already -static int send_timer_send(struct send_timer *st, struct codec_packet *cp) { - if (cp->to_send.tv_sec && timeval_cmp(&cp->to_send, &rtpe_now) > 0) - return -1; // not yet - +static void __send_timer_send_common(struct send_timer *st, struct codec_packet *cp) { if (!st->sink->selected_sfd) goto out; @@ -186,36 +176,36 @@ static int send_timer_send(struct send_timer *st, struct codec_packet *cp) { out: codec_packet_free(cp); - - return 0; } +static void send_timer_send_lock(struct send_timer *st, struct codec_packet *cp) { + struct call *call = st->call; + if (!call) + return; + + log_info_call(call); + rwlock_lock_r(&call->master_lock); + + __send_timer_send_common(st, cp); + + rwlock_unlock_r(&call->master_lock); +} // st->stream->out_lock (or call->master_lock/W) must be held already -void send_timer_push(struct send_timer *st, struct codec_packet *cp) { - // can we send immediately? - if (!send_timer_send(st, cp)) +static void send_timer_send_nolock(struct send_timer *st, struct codec_packet *cp) { + struct call *call = st->call; + if (!call) return; - // queue for sending + log_info_call(call); - struct rtp_header *rh = (void *) cp->s.s; - ilog(LOG_DEBUG, "queuing up packet for delivery at %lu.%06u (RTP seq %u TS %u)", - (unsigned long) cp->to_send.tv_sec, - (unsigned int) cp->to_send.tv_usec, - ntohs(rh->seq_num), - ntohl(rh->timestamp)); + __send_timer_send_common(st, cp); +} - mutex_lock(&st->lock); - unsigned int qlen = st->packets.length; - // this hands over ownership of cp, so we must copy the timeval out - struct timeval tv_send = cp->to_send; - g_queue_push_tail(&st->packets, cp); - mutex_unlock(&st->lock); - // first packet in? we're probably not scheduled yet - if (!qlen) - timerthread_obj_schedule_abs(&st->tt_obj, &tv_send); +// st->stream->out_lock (or call->master_lock/W) must be held already +void send_timer_push(struct send_timer *st, struct codec_packet *cp) { + timerthread_queue_push(&st->ttq, &cp->ttq_entry); } @@ -352,7 +342,7 @@ static void media_player_read_packet(struct media_player *mp) { struct codec_packet *p = packet.packets_out.head->data; if (p->rtp) { mp->sync_ts = ntohl(p->rtp->timestamp); - mp->sync_ts_tv = p->to_send; + mp->sync_ts_tv = p->ttq_entry.when; } } @@ -646,46 +636,11 @@ static void media_player_run(void *ptr) { #endif -static void send_timer_run(void *ptr) { - struct send_timer *st = ptr; - struct call *call = st->call; - - log_info_call(call); - - ilog(LOG_DEBUG, "running scheduled send_timer"); - - struct timeval next_send = {0,}; - - rwlock_lock_r(&call->master_lock); - mutex_lock(&st->lock); - - while (st->packets.length) { - struct codec_packet *cp = st->packets.head->data; - // XXX this could be made lock-free - if (!send_timer_send(st, cp)) { - g_queue_pop_head(&st->packets); - continue; - } - // couldn't send the last one. remember time to schedule - next_send = cp->to_send; - break; - } - - mutex_unlock(&st->lock); - rwlock_unlock_r(&call->master_lock); - - if (next_send.tv_sec) - timerthread_obj_schedule_abs(&st->tt_obj, &next_send); - - log_info_clear(); -} - - void media_player_init(void) { #ifdef WITH_TRANSCODING timerthread_init(&media_player_thread, media_player_run); #endif - timerthread_init(&send_timer_thread, send_timer_run); + timerthread_init(&send_timer_thread, timerthread_queue_run); } diff --git a/daemon/timerthread.c b/daemon/timerthread.c index 0b02ceb90..0de77c7ef 100644 --- a/daemon/timerthread.c +++ b/daemon/timerthread.c @@ -88,3 +88,129 @@ void timerthread_obj_deschedule(struct timerthread_obj *tt_obj) { nope: mutex_unlock(&tt->lock); } + +static int timerthread_queue_run_one(struct timerthread_queue *ttq, + struct timerthread_queue_entry *ttqe, + void (*run_func)(struct timerthread_queue *, void *)) { + if (ttqe->when.tv_sec && timeval_cmp(&ttqe->when, &rtpe_now) > 0) + return -1; // not yet + run_func(ttq, ttqe); + return 0; +} + + +void timerthread_queue_run(void *ptr) { + struct timerthread_queue *ttq = ptr; + + ilog(LOG_DEBUG, "running timerthread_queue"); + + struct timeval next_send = {0,}; + + mutex_lock(&ttq->lock); + + while (ttq->entries.length) { + struct timerthread_queue_entry *ttqe = g_queue_pop_head(&ttq->entries); + + mutex_unlock(&ttq->lock); + + int ret = timerthread_queue_run_one(ttq, ttqe, ttq->run_later_func); + + mutex_lock(&ttq->lock); + + if (!ret) + continue; + // couldn't send the last one. remember time to schedule + g_queue_push_head(&ttq->entries, ttqe); + // XXX sort queue? + next_send = ttqe->when; + break; + } + + mutex_unlock(&ttq->lock); + + if (next_send.tv_sec) + timerthread_obj_schedule_abs(&ttq->tt_obj, &next_send); // XXX does this work if already scheduled earlier? +} + +static void __timerthread_queue_free(void *p) { + struct timerthread_queue *ttq = p; + g_queue_clear_full(&ttq->entries, ttq->entry_free_func); + mutex_destroy(&ttq->lock); + if (ttq->free_func) + ttq->free_func(p); +} + +void *timerthread_queue_new(const char *type, size_t size, + struct timerthread *tt, + void (*run_now_func)(struct timerthread_queue *, void *), + void (*run_later_func)(struct timerthread_queue *, void *), + void (*free_func)(void *), + void (*entry_free_func)(void *)) +{ + struct timerthread_queue *ttq = obj_alloc0(type, size, __timerthread_queue_free); + ttq->type = type; + ttq->tt_obj.tt = tt; + assert(tt->func == timerthread_queue_run); + ttq->run_now_func = run_now_func; + ttq->run_later_func = run_later_func; + if (!ttq->run_later_func) + ttq->run_later_func = run_now_func; + ttq->free_func = free_func; + ttq->entry_free_func = entry_free_func; + mutex_init(&ttq->lock); + g_queue_init(&ttq->entries); + return ttq; +} + +void timerthread_queue_push(struct timerthread_queue *ttq, struct timerthread_queue_entry *ttqe) { + // can we send immediately? + if (!timerthread_queue_run_one(ttq, ttqe, ttq->run_now_func)) + return; + + // queue for sending + + ilog(LOG_DEBUG, "queuing up %s object for processing at %lu.%06u", + ttq->type, + (unsigned long) ttqe->when.tv_sec, + (unsigned int) ttqe->when.tv_usec); + + // XXX recover log line fields +// struct rtp_header *rh = (void *) cp->s.s; +// ilog(LOG_DEBUG, "queuing up packet for delivery at %lu.%06u (RTP seq %u TS %u)", +// (unsigned long) cp->to_send.tv_sec, +// (unsigned int) cp->to_send.tv_usec, +// ntohs(rh->seq_num), +// ntohl(rh->timestamp)); + + mutex_lock(&ttq->lock); + unsigned int qlen = ttq->entries.length; + // this hands over ownership of cp, so we must copy the timeval out + struct timeval tv_send = ttqe->when; + g_queue_push_tail(&ttq->entries, ttqe); + mutex_unlock(&ttq->lock); + + // first packet in? we're probably not scheduled yet + if (!qlen) + timerthread_obj_schedule_abs(&ttq->tt_obj, &tv_send); +} + +unsigned int timerthread_queue_flush(struct timerthread_queue *ttq, void *ptr) { + if (!ttq) + return 0; + + unsigned int num = 0; + GList *l = ttq->entries.head; + while (l) { + GList *next = l->next; + struct timerthread_queue_entry *ttqe = l->data; + if (ttqe->source != ptr) + goto next; + g_queue_delete_link(&ttq->entries, l); + ttq->entry_free_func(ttqe); + num++; + +next: + l = next; + } + return num; +} diff --git a/include/codec.h b/include/codec.h index 17e0bc019..d11aa9599 100644 --- a/include/codec.h +++ b/include/codec.h @@ -8,6 +8,7 @@ #include "codeclib.h" #include "aux.h" #include "rtplib.h" +#include "timerthread.h" struct call_media; @@ -40,11 +41,10 @@ struct codec_handler { }; struct codec_packet { + struct timerthread_queue_entry ttq_entry; str s; struct rtp_header *rtp; - void *source; // opaque void (*free_func)(void *); - struct timeval to_send; }; diff --git a/include/media_player.h b/include/media_player.h index 6ec4e7673..813ab9245 100644 --- a/include/media_player.h +++ b/include/media_player.h @@ -62,11 +62,9 @@ INLINE void media_player_put(struct media_player **mp) { #endif struct send_timer { - struct timerthread_obj tt_obj; - mutex_t lock; + struct timerthread_queue ttq; struct call *call; // main reference that keeps this alive struct packet_stream *sink; - GQueue packets; }; @@ -89,7 +87,7 @@ void send_timer_loop(void *p); INLINE void send_timer_put(struct send_timer **st) { if (!*st) return; - obj_put(&(*st)->tt_obj); + obj_put(&(*st)->ttq.tt_obj); *st = NULL; } diff --git a/include/timerthread.h b/include/timerthread.h index 3f19d9bb6..8fd9a0162 100644 --- a/include/timerthread.h +++ b/include/timerthread.h @@ -22,6 +22,23 @@ struct timerthread_obj { struct timeval last_run; /* ditto */ }; +struct timerthread_queue { + struct timerthread_obj tt_obj; + const char *type; + mutex_t lock; + GQueue entries; + void (*run_now_func)(struct timerthread_queue *, void *); + void (*run_later_func)(struct timerthread_queue *, void *); + void (*free_func)(void *); + void (*entry_free_func)(void *); +}; + +struct timerthread_queue_entry { + struct timeval when; + void *source; // opaque + char __rest[0]; +}; + void timerthread_init(struct timerthread *, void (*)(void *)); void timerthread_run(void *); @@ -29,6 +46,17 @@ void timerthread_run(void *); void timerthread_obj_schedule_abs_nl(struct timerthread_obj *, const struct timeval *); void timerthread_obj_deschedule(struct timerthread_obj *); +// run_now_func = called if newly inserted object can be processed immediately by timerthread_queue_push within its calling context +// run_later_func = called from the separate timer thread +void *timerthread_queue_new(const char *type, size_t size, + struct timerthread *tt, + void (*run_now_func)(struct timerthread_queue *, void *), + void (*run_later_func)(struct timerthread_queue *, void *), // optional + void (*free_func)(void *), + void (*entry_free_func)(void *)); +void timerthread_queue_run(void *ptr); +void timerthread_queue_push(struct timerthread_queue *, struct timerthread_queue_entry *); +unsigned int timerthread_queue_flush(struct timerthread_queue *, void *); INLINE void timerthread_obj_schedule_abs(struct timerthread_obj *tt_obj, const struct timeval *tv) { if (!tt_obj) From d6ad6a6744c39ab7b0e0d6eecb4ee414e0cf976c Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Tue, 28 Jan 2020 10:10:18 -0500 Subject: [PATCH 3/7] TT#74301 convert timerthread_queue entries list to GTree Change-Id: I7a8d54f7f7ffe2b27617c109b6d04a2cc20861e9 --- daemon/timerthread.c | 53 +++++++++++++++++++++++++++---------------- include/timerthread.h | 2 +- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/daemon/timerthread.c b/daemon/timerthread.c index 0de77c7ef..dc39cf0d4 100644 --- a/daemon/timerthread.c +++ b/daemon/timerthread.c @@ -108,8 +108,10 @@ void timerthread_queue_run(void *ptr) { mutex_lock(&ttq->lock); - while (ttq->entries.length) { - struct timerthread_queue_entry *ttqe = g_queue_pop_head(&ttq->entries); + while (g_tree_nnodes(ttq->entries)) { + struct timerthread_queue_entry *ttqe = g_tree_find_first(ttq->entries, NULL, NULL); + assert(ttqe != NULL); + g_tree_remove(ttq->entries, ttqe); mutex_unlock(&ttq->lock); @@ -120,8 +122,7 @@ void timerthread_queue_run(void *ptr) { if (!ret) continue; // couldn't send the last one. remember time to schedule - g_queue_push_head(&ttq->entries, ttqe); - // XXX sort queue? + g_tree_insert(ttq->entries, ttqe, ttqe); next_send = ttqe->when; break; } @@ -129,17 +130,30 @@ void timerthread_queue_run(void *ptr) { mutex_unlock(&ttq->lock); if (next_send.tv_sec) - timerthread_obj_schedule_abs(&ttq->tt_obj, &next_send); // XXX does this work if already scheduled earlier? + timerthread_obj_schedule_abs(&ttq->tt_obj, &next_send); +} + +static int ttqe_free_all(void *k, void *v, void *d) { + struct timerthread_queue *ttq = d; + ttq->entry_free_func(k); + return FALSE; } static void __timerthread_queue_free(void *p) { struct timerthread_queue *ttq = p; - g_queue_clear_full(&ttq->entries, ttq->entry_free_func); + g_tree_foreach(ttq->entries, ttqe_free_all, ttq); + g_tree_destroy(ttq->entries); mutex_destroy(&ttq->lock); if (ttq->free_func) ttq->free_func(p); } +static int ttqe_compare(const void *a, const void *b) { + const struct timerthread_queue_entry *t1 = a; + const struct timerthread_queue_entry *t2 = b; + return timeval_cmp_ptr(&t1->when, &t2->when); +} + void *timerthread_queue_new(const char *type, size_t size, struct timerthread *tt, void (*run_now_func)(struct timerthread_queue *, void *), @@ -158,7 +172,7 @@ void *timerthread_queue_new(const char *type, size_t size, ttq->free_func = free_func; ttq->entry_free_func = entry_free_func; mutex_init(&ttq->lock); - g_queue_init(&ttq->entries); + ttq->entries = g_tree_new(ttqe_compare); return ttq; } @@ -183,34 +197,33 @@ void timerthread_queue_push(struct timerthread_queue *ttq, struct timerthread_qu // ntohl(rh->timestamp)); mutex_lock(&ttq->lock); - unsigned int qlen = ttq->entries.length; // this hands over ownership of cp, so we must copy the timeval out struct timeval tv_send = ttqe->when; - g_queue_push_tail(&ttq->entries, ttqe); + g_tree_insert(ttq->entries, ttqe, ttqe); + struct timerthread_queue_entry *first_ttqe = g_tree_find_first(ttq->entries, NULL, NULL); mutex_unlock(&ttq->lock); // first packet in? we're probably not scheduled yet - if (!qlen) + if (first_ttqe == ttqe) timerthread_obj_schedule_abs(&ttq->tt_obj, &tv_send); } +static int ttqe_ptr_match(const void *ent, const void *ptr) { + const struct timerthread_queue_entry *ttqe = ent; + return ttqe->source == ptr; +} unsigned int timerthread_queue_flush(struct timerthread_queue *ttq, void *ptr) { if (!ttq) return 0; unsigned int num = 0; - GList *l = ttq->entries.head; - while (l) { - GList *next = l->next; - struct timerthread_queue_entry *ttqe = l->data; - if (ttqe->source != ptr) - goto next; - g_queue_delete_link(&ttq->entries, l); + GQueue matches = G_QUEUE_INIT; + g_tree_find_all(&matches, ttq->entries, ttqe_ptr_match, ptr); + + while (matches.length) { + struct timerthread_queue_entry *ttqe = g_queue_pop_head(&matches); ttq->entry_free_func(ttqe); num++; - -next: - l = next; } return num; } diff --git a/include/timerthread.h b/include/timerthread.h index 8fd9a0162..708ccc54c 100644 --- a/include/timerthread.h +++ b/include/timerthread.h @@ -26,7 +26,7 @@ struct timerthread_queue { struct timerthread_obj tt_obj; const char *type; mutex_t lock; - GQueue entries; + GTree *entries; void (*run_now_func)(struct timerthread_queue *, void *); void (*run_later_func)(struct timerthread_queue *, void *); void (*free_func)(void *); From ef0d6a3a807fcb7fa2b4789ae2651dee542944df Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 15 Jan 2020 14:35:29 -0500 Subject: [PATCH 4/7] TT#74301 merge jitter buffer PR #834 refactored closes #834 Change-Id: I174cc6e365af54fb66d2dd78be02c601c5d5d645 --- daemon/Makefile | 2 +- daemon/call.c | 5 + daemon/jitter_buffer.c | 376 ++++++++++++++++++++++++++++++++++++++++ daemon/main.c | 13 ++ daemon/media_socket.c | 21 ++- daemon/rtpengine.pod | 10 ++ include/call.h | 2 + include/jitter_buffer.h | 57 ++++++ include/main.h | 2 + include/media_socket.h | 3 + t/.gitignore | 1 + t/Makefile | 4 +- 12 files changed, 492 insertions(+), 4 deletions(-) create mode 100644 daemon/jitter_buffer.c create mode 100644 include/jitter_buffer.h diff --git a/daemon/Makefile b/daemon/Makefile index 91b7edc7a..7d0b11254 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -127,7 +127,7 @@ SRCS= main.c kernel.c poller.c aux.c control_tcp.c call.c control_udp.c redis.c bencode.c cookie_cache.c udp_listener.c control_ng.strhash.c sdp.strhash.c stun.c rtcp.c \ crypto.c rtp.c call_interfaces.strhash.c dtls.c log.c cli.c graphite.c ice.c \ media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c \ - codec.c load.c dtmf.c timerthread.c media_player.c + codec.c load.c dtmf.c timerthread.c media_player.c jitter_buffer.c LIBSRCS= loglib.c auxlib.c rtplib.c str.c socket.c streambuf.c ssllib.c dtmflib.c ifeq ($(with_transcoding),yes) LIBSRCS+= codeclib.c resample.c diff --git a/daemon/call.c b/daemon/call.c index 62c4d5fa6..7b180343a 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -45,6 +45,7 @@ #include "graphite.h" #include "codec.h" #include "media_player.h" +#include "jitter_buffer.h" /* also serves as array index for callstream->peers[] */ @@ -905,6 +906,9 @@ struct packet_stream *__packet_stream_new(struct call *call) { recording_init_stream(stream); stream->send_timer = send_timer_new(stream); + if (rtpe_config.jb_length) + stream->jb = jitter_buffer_new(call); + return stream; } @@ -2257,6 +2261,7 @@ no_stats_output: ps = l->data; send_timer_put(&ps->send_timer); + jitter_buffer_free(&ps->jb); __unkernelize(ps); dtls_shutdown(ps); ps->selected_sfd = NULL; diff --git a/daemon/jitter_buffer.c b/daemon/jitter_buffer.c new file mode 100644 index 000000000..9482f607e --- /dev/null +++ b/daemon/jitter_buffer.c @@ -0,0 +1,376 @@ +#include "jitter_buffer.h" +#include "timerthread.h" +#include "media_socket.h" +#include "call.h" +#include "codec.h" +#include "main.h" + +#define INITIAL_PACKETS 0x1E +#define CONT_SEQ_COUNT 0x64 +#define CONT_MISS_COUNT 0x0A +#define CONT_INCORRECT_BUFFERING 0x14 + + +static struct timerthread jitter_buffer_thread; + + +void jitter_buffer_init(void) { + timerthread_init(&jitter_buffer_thread, timerthread_queue_run); +} + +// jb is locked +static void reset_jitter_buffer(struct jitter_buffer *jb) { + ilog(LOG_DEBUG, "reset_jitter_buffer"); + + jb->first_send_ts = 0; + jb->first_send.tv_sec = 0; + jb->first_send.tv_usec = 0; + jb->first_seq = 0; + jb->rtptime_delta = 0; + jb->buffer_len = 0; + jb->cont_frames = 0; + jb->cont_miss = 0; + jb->next_exp_seq = 0; + jb->clock_rate = 0; + jb->payload_type = 0; + jb->cont_buff_err = 0; + jb->buf_decremented = 0; + jb->clock_drift_val = 0; + jb->clock_drift_enable = 0; + + jb_packet_free(&jb->p); + + jb->num_resets++; + + //disable jitter buffer in case of more than 2 resets + if(jb->num_resets > 2 && jb->call) + jb->disabled = 1; +} + +static int get_clock_rate(struct media_packet *mp, int payload_type) { + const struct rtp_payload_type *rtp_pt = NULL; + struct jitter_buffer *jb = mp->stream->jb; + int clock_rate = 0; + + if(jb->clock_rate && jb->payload_type == payload_type) + return jb->clock_rate; + + struct codec_handler *transcoder = codec_handler_get(mp->media, payload_type); + if(transcoder) { + if(transcoder->source_pt.payload_type == payload_type) + rtp_pt = &transcoder->source_pt; + if(transcoder->dest_pt.payload_type == payload_type) + rtp_pt = &transcoder->dest_pt; + } + + if(rtp_pt) { + clock_rate = jb->clock_rate = rtp_pt->clock_rate; + jb->payload_type = payload_type; + } + else + ilog(LOG_DEBUG, "clock_rate not present payload_type = %d", payload_type); + + return clock_rate; +} + +static struct jb_packet* get_jb_packet(struct media_packet *mp, const str *s) { + char *buf = malloc(s->len + RTP_BUFFER_HEAD_ROOM + RTP_BUFFER_TAIL_ROOM); + if (!buf) { + ilog(LOG_ERROR, "Failed to allocate memory: %s", strerror(errno)); + return NULL; + } + + struct jb_packet *p = g_slice_alloc0(sizeof(*p)); + + p->buf = buf; + p->mp = *mp; + obj_hold(p->mp.sfd); + + str_init_len(&p->mp.raw, buf + RTP_BUFFER_HEAD_ROOM, s->len); + memcpy(p->mp.raw.s, s->s, s->len); + + if(rtp_payload(&p->mp.rtp, &p->mp.payload, s)) { + jb_packet_free(&p); + return NULL; + } + + return p; +} + +// jb is locked +static void check_buffered_packets(struct jitter_buffer *jb) { + if (g_tree_nnodes(jb->ttq.entries) >= (2* rtpe_config.jb_length)) { + ilog(LOG_DEBUG, "Jitter reset due to buffer overflow"); + reset_jitter_buffer(jb); + } +} + +// jb is locked +static int queue_packet(struct media_packet *mp, struct jb_packet *p) { + struct jitter_buffer *jb = mp->stream->jb; + unsigned long ts = ntohl(mp->rtp->timestamp); + int payload_type = (mp->rtp->m_pt & 0x7f); + int clockrate = get_clock_rate(mp, payload_type); + + if(!clockrate || !jb->first_send.tv_sec) { + ilog(LOG_DEBUG, "Jitter reset due to clockrate"); + reset_jitter_buffer(jb); + return 1; + } + long ts_diff = (uint32_t) ts - (uint32_t) jb->first_send_ts; + int seq_diff = ntohs(mp->rtp->seq_num) - jb->first_seq; + if(!jb->rtptime_delta && seq_diff) { + jb->rtptime_delta = ts_diff/seq_diff; + } + p->ttq_entry.when = jb->first_send; + long long ts_diff_us = + (long long) (ts_diff + (jb->rtptime_delta * jb->buffer_len))* 1000000 / clockrate; + + ts_diff_us += (jb->clock_drift_val * seq_diff); + + if(jb->buf_decremented) { + ts_diff_us += 5000; //add 5ms delta when 2 packets are scheduled around same time + jb->buf_decremented = 0; + } + timeval_add_usec(&p->ttq_entry.when, ts_diff_us); + + ts_diff_us = timeval_diff(&p->ttq_entry.when, &rtpe_now); + + if (ts_diff_us > 3000000) { // more than three second, can't be right + jb->first_send.tv_sec = 0; + jb->rtptime_delta = 0; + } + + timerthread_queue_push(&jb->ttq, &p->ttq_entry); + + return 0; +} + +static void handle_clock_drift(struct media_packet *mp) { + ilog(LOG_DEBUG, "handle_clock_drift"); + unsigned long ts = ntohl(mp->rtp->timestamp); + struct jitter_buffer *jb = mp->stream->jb; + int payload_type = (mp->rtp->m_pt & 0x7f); + int clockrate = get_clock_rate(mp, payload_type); + if(!clockrate) { + return; + } + long ts_diff = (uint32_t) ts - (uint32_t) jb->first_send_ts; + int seq_diff = ntohs(mp->rtp->seq_num) - jb->first_seq; + long long ts_diff_us = + (long long) (ts_diff)* 1000000 / clockrate; + struct timeval to_send = jb->first_send; + timeval_add_usec(&to_send, ts_diff_us); + long long time_diff = timeval_diff(&rtpe_now, &to_send); + + jb->clock_drift_val = time_diff/seq_diff; + jb->clock_drift_enable = 0; + jb->cont_buff_err = 0; +} + +int buffer_packet(struct media_packet *mp, const str *s) { + struct jb_packet *p = NULL; + int ret = 1; // must call stream_packet + + mp->stream = mp->sfd->stream; + mp->media = mp->stream->media; + mp->call = mp->sfd->call; + struct call *call = mp->call; + + rwlock_lock_r(&call->master_lock); + + struct jitter_buffer *jb = mp->stream->jb; + if (!jb || jb->disabled) + goto end; + + ilog(LOG_DEBUG, "Handling JB packet on: %s:%d", sockaddr_print_buf(&mp->stream->endpoint.address), + mp->stream->endpoint.port); + + p = get_jb_packet(mp, s); + if (!p) + goto end; + + mp = &p->mp; + + int payload_type = (mp->rtp->m_pt & 0x7f); + + mutex_lock(&jb->lock); + + if(jb->clock_rate && jb->payload_type != payload_type) { //reset in case of payload change + jb->first_send.tv_sec = 0; + jb->rtptime_delta = 0; + } + + if(jb->clock_drift_enable) + handle_clock_drift(mp); + + if (jb->first_send.tv_sec) { + ret = queue_packet(mp,p); + if(!ret && jb->p) { + // push first packet into jitter buffer + queue_packet(&jb->p->mp,jb->p); + jb->p = NULL; + } + } + else { + // store data from first packet and use for successive packets and queue the first packet + unsigned long ts = ntohl(mp->rtp->timestamp); + int payload_type = (mp->rtp->m_pt & 0x7f); + int clockrate = get_clock_rate(mp, payload_type); + if(!clockrate){ + jb->initial_pkts++; + if(jb->initial_pkts > INITIAL_PACKETS) { //Ignore initial Payload Type 126 if any + reset_jitter_buffer(jb); + } + goto end_unlock; + } + + p->ttq_entry.when = jb->first_send = rtpe_now; + jb->first_send_ts = ts; + jb->first_seq = ntohs(mp->rtp->seq_num); + jb->p = p; + ret = 0; + } + + // packet consumed? + if (ret == 0) + p = NULL; + + check_buffered_packets(jb); + +end_unlock: + mutex_unlock(&jb->lock); + +end: + rwlock_unlock_r(&call->master_lock); + if (p) + jb_packet_free(&p); + return ret; +} + +static void increment_buffer(struct jitter_buffer *jb) { + if(jb->buffer_len < rtpe_config.jb_length) + jb->buffer_len++; +} + +static void decrement_buffer(struct jitter_buffer *jb) { + if(jb->buffer_len > 0) { + jb->buffer_len--; + jb->buf_decremented = 1; + } +} + +static int set_jitter_values(struct media_packet *mp) { + int ret=0; + int curr_seq = ntohs(mp->rtp->seq_num); + struct jitter_buffer *jb = mp->stream->jb; + if(jb->next_exp_seq) { + mutex_lock(&jb->lock); + if(curr_seq > jb->next_exp_seq) { + ilog(LOG_DEBUG, "missing seq exp seq =%d, received seq= %d", jb->next_exp_seq, curr_seq); + increment_buffer(jb); + jb->cont_frames = 0; + jb->cont_miss++; + } + else if(curr_seq < jb->next_exp_seq) { //Might be duplicate or sequence already crossed + jb->cont_frames = 0; + jb->cont_miss++; + ret=1; + } + else { + jb->cont_frames++; + jb->cont_miss = 0; + if(jb->cont_frames >= CONT_SEQ_COUNT) { + decrement_buffer(jb); + jb->cont_frames = 0; + ilog(LOG_DEBUG, "Received continous frames Buffer len=%d", jb->buffer_len); + } + } + + if(jb->cont_miss >= CONT_MISS_COUNT) + reset_jitter_buffer(jb); + mutex_unlock(&jb->lock); + } + if(curr_seq >= jb->next_exp_seq) + jb->next_exp_seq = curr_seq + 1; + + int len = g_tree_nnodes(jb->ttq.entries); + + if(len > jb->buffer_len || len < jb->buffer_len) { + jb->cont_buff_err++; + if((jb->cont_buff_err > CONT_INCORRECT_BUFFERING) && rtpe_config.jb_clock_drift) + jb->clock_drift_enable=1; + } + else + jb->cont_buff_err = 0; + + return ret; +} + +static void __jb_send_later(struct timerthread_queue *ttq, void *p) { + struct jb_packet *cp = p; + set_jitter_values(&cp->mp); + play_buffered(p); +}; +// jb and call are locked +static void __jb_send_now(struct timerthread_queue *ttq, void *p) { + struct jitter_buffer *jb = (void *) ttq; + + mutex_unlock(&jb->lock); + rwlock_unlock_r(&jb->call->master_lock); + + __jb_send_later(ttq, p); + + rwlock_lock_r(&jb->call->master_lock); + mutex_lock(&jb->lock); +}; +static void __jb_free(void *p) { + struct jitter_buffer *jb = p; + jitter_buffer_free(&jb); +} +void __jb_packet_free(void *p) { + struct jb_packet *jbp = p; + jb_packet_free(&jbp); +} + +void jitter_buffer_loop(void *p) { + ilog(LOG_DEBUG, "jitter_buffer_loop"); + timerthread_run(&jitter_buffer_thread); +} + +struct jitter_buffer *jitter_buffer_new(struct call *c) { + ilog(LOG_DEBUG, "creating jitter_buffer"); + + struct jitter_buffer *jb = timerthread_queue_new("jitter_buffer", sizeof(*jb), + &jitter_buffer_thread, + __jb_send_now, + __jb_send_later, + __jb_free, __jb_packet_free); + mutex_init(&jb->lock); + jb->call = obj_get(c); + return jb; +} + +void jitter_buffer_free(struct jitter_buffer **jbp) { + if (!jbp || !*jbp) + return; + + ilog(LOG_DEBUG, "freeing jitter_buffer"); + + mutex_destroy(&(*jbp)->lock); + if ((*jbp)->call) + obj_put((*jbp)->call); + g_slice_free1(sizeof(**jbp), *jbp); + *jbp = NULL; +} + +void jb_packet_free(struct jb_packet **jbp) { + if (!jbp || !*jbp) + return; + + free((*jbp)->buf); + if ((*jbp)->mp.sfd) + obj_put((*jbp)->mp.sfd); + g_slice_free1(sizeof(**jbp), *jbp); + *jbp = NULL; +} diff --git a/daemon/main.c b/daemon/main.c index 8e4083756..ca18d0fe0 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -44,6 +44,7 @@ #include "ssllib.h" #include "media_player.h" #include "dtmf.h" +#include "jitter_buffer.h" @@ -376,6 +377,9 @@ static void options(int *argc, char ***argv) { { "mysql-pass", 0, 0, G_OPTION_ARG_STRING, &rtpe_config.mysql_pass,"MySQL connection credentials", "PASSWORD" }, { "mysql-query",0, 0, G_OPTION_ARG_STRING, &rtpe_config.mysql_query,"MySQL select query", "STRING" }, { "endpoint-learning",0,0,G_OPTION_ARG_STRING, &endpoint_learning, "RTP endpoint learning algorithm", "delayed|immediate|off|heuristic" }, + { "jitter-buffer",0, 0, G_OPTION_ARG_INT, &rtpe_config.jb_length, "Size of jitter buffer", "INT" }, + { "jb-clock-drift",0,0, G_OPTION_ARG_NONE, &rtpe_config.jb_clock_drift,"Compensate for source clock drift",NULL }, + { NULL, } }; @@ -564,6 +568,9 @@ static void options(int *argc, char ***argv) { die("Invalid --endpoint-learning option ('%s')", endpoint_learning); } rtpe_config.endpoint_learning = el_config; + + if (rtpe_config.jb_length < 0) + die("Invalid negative jitter buffer size"); } void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) { @@ -635,6 +642,8 @@ void fill_initial_rtpe_cfg(struct rtpengine_config* ini_rtpe_cfg) { ini_rtpe_cfg->rec_method = g_strdup(rtpe_config.rec_method); ini_rtpe_cfg->rec_format = g_strdup(rtpe_config.rec_format); + ini_rtpe_cfg->jb_length = rtpe_config.jb_length; + ini_rtpe_cfg->jb_clock_drift = rtpe_config.jb_clock_drift; } static void early_init(void) { @@ -670,6 +679,7 @@ static void init_everything(void) { codeclib_init(0); media_player_init(); dtmf_init(); + jitter_buffer_init(); } @@ -847,6 +857,9 @@ int main(int argc, char **argv) { thread_create_detach_prio(media_player_loop, NULL, rtpe_config.scheduling, rtpe_config.priority); #endif thread_create_detach_prio(send_timer_loop, NULL, rtpe_config.scheduling, rtpe_config.priority); + if (rtpe_config.jb_length > 0) + thread_create_detach_prio(jitter_buffer_loop, NULL, rtpe_config.scheduling, + rtpe_config.priority); } diff --git a/daemon/media_socket.c b/daemon/media_socket.c index a29c1993a..b172017eb 100644 --- a/daemon/media_socket.c +++ b/daemon/media_socket.c @@ -26,6 +26,7 @@ #include "main.h" #include "codec.h" #include "media_player.h" +#include "jitter_buffer.h" #ifndef PORT_RANDOM_MIN @@ -1947,7 +1948,15 @@ static void stream_fd_readable(int fd, void *p, uintptr_t u) { ilog(LOG_WARNING, "UDP packet possibly truncated"); str_init_len(&phc.s, buf + RTP_BUFFER_HEAD_ROOM, ret); - ret = stream_packet(&phc); + + if (sfd->stream->jb) { + ret = buffer_packet(&phc.mp, &phc.s); + if (ret == 1) + ret = stream_packet(&phc); + } + else + ret = stream_packet(&phc); + if (G_UNLIKELY(ret < 0)) ilog(LOG_WARNING, "Write error on media socket: %s", strerror(-ret)); else if (phc.update) @@ -2020,3 +2029,13 @@ const struct transport_protocol *transport_protocol(const str *s) { out: return NULL; } + +void play_buffered(struct jb_packet *cp) { + struct packet_handler_ctx phc; + ZERO(phc); + phc.mp = cp->mp; + phc.s = cp->mp.raw; + //phc.buffered_packet = buffered; + stream_packet(&phc); + jb_packet_free(&cp); +} diff --git a/daemon/rtpengine.pod b/daemon/rtpengine.pod index 619dd631f..9831c041e 100644 --- a/daemon/rtpengine.pod +++ b/daemon/rtpengine.pod @@ -671,6 +671,16 @@ seen, that address is used. Otherwise, if a packet with a matching source port (but different address) is seen, that address is used. Otherwise, the source address of any incoming packet seen is used. +=item B<--jitter-buffer=>I + +Size of (incoming) jitter buffer in packets. A value of zero (the default) +disables the jitter buffer. The jitter buffer is currently only implemented for +userspace operation. + +=item B<--jb-clock-drift> + +Enable clock drift compensation for the jitter buffer. + =back =head1 INTERFACES diff --git a/include/call.h b/include/call.h index 022ed8391..3de4534f9 100644 --- a/include/call.h +++ b/include/call.h @@ -193,6 +193,7 @@ struct rtp_payload_type; struct media_player; struct send_timer; struct transport_protocol; +struct jitter_buffer; typedef bencode_buffer_t call_buffer_t; @@ -266,6 +267,7 @@ struct packet_stream { struct ssrc_ctx *ssrc_in, /* LOCK: in_lock */ // XXX eliminate these *ssrc_out; /* LOCK: out_lock */ struct send_timer *send_timer; /* RO */ + struct jitter_buffer *jb; /* RO */ struct stats stats; struct stats kernel_stats; diff --git a/include/jitter_buffer.h b/include/jitter_buffer.h new file mode 100644 index 000000000..83ba9ba68 --- /dev/null +++ b/include/jitter_buffer.h @@ -0,0 +1,57 @@ +#ifndef _JITTER_BUFFER_H_ +#define _JITTER_BUFFER_H_ + +#include "auxlib.h" +#include "socket.h" +#include "timerthread.h" +#include "media_socket.h" +//#include "codec.h" +// +//struct packet_handler_ctx; +struct jb_packet; +struct media_packet; +// +struct jb_packet { + struct timerthread_queue_entry ttq_entry; + char *buf; + struct media_packet mp; + //int buffered; +}; + +struct jitter_buffer { + struct timerthread_queue ttq; + mutex_t lock; + unsigned long first_send_ts; + struct timeval first_send; + unsigned int first_seq; + unsigned int rtptime_delta; + unsigned int next_exp_seq; + unsigned int cont_frames; + unsigned int cont_miss; + unsigned int clock_rate; + unsigned int payload_type; + unsigned int num_resets; + unsigned int initial_pkts; + unsigned int cont_buff_err; + int buffer_len; + int clock_drift_val; + int clock_drift_enable; //flag for buffer overflow underflow + int buf_decremented; + struct jb_packet *p; + struct call *call; + int disabled; +}; + +void jitter_buffer_init(void); + +struct jitter_buffer *jitter_buffer_new(struct call *); +void jitter_buffer_free(struct jitter_buffer **); + +int buffer_packet(struct media_packet *mp, const str *s); +void jb_packet_free(struct jb_packet **jbp); + +//int set_jitter_values(struct media_packet *mp); + +void jitter_buffer_loop(void *p); + +#endif diff --git a/include/main.h b/include/main.h index 1ac8feb41..5ca67d486 100644 --- a/include/main.h +++ b/include/main.h @@ -93,6 +93,8 @@ struct rtpengine_config { char *mysql_query; endpoint_t dtmf_udp_ep; enum endpoint_learning endpoint_learning; + int jb_length; + int jb_clock_drift; }; diff --git a/include/media_socket.h b/include/media_socket.h index 967a96cf9..6e42aee14 100644 --- a/include/media_socket.h +++ b/include/media_socket.h @@ -19,6 +19,7 @@ struct media_packet; struct transport_protocol; struct ssrc_ctx; struct rtpengine_srtp; +struct jb_packet; typedef int rtcp_filter_func(struct media_packet *, GQueue *); typedef int (*rewrite_func)(str *, struct packet_stream *, struct stream_fd *, const endpoint_t *, @@ -174,6 +175,8 @@ const struct streamhandler *determine_handler(const struct transport_protocol *i struct call_media *out_media, int must_recrypt); int media_packet_encrypt(rewrite_func encrypt_func, struct packet_stream *out, struct media_packet *mp); const struct transport_protocol *transport_protocol(const str *s); +//void play_buffered(struct packet_stream *sink, struct codec_packet *cp, int buffered); +void play_buffered(struct jb_packet *cp); /* XXX shouldn't be necessary */ /* diff --git a/t/.gitignore b/t/.gitignore index f686cdefb..d0321297f 100644 --- a/t/.gitignore +++ b/t/.gitignore @@ -53,3 +53,4 @@ test-dtmf-detect *-test dtmf_rx_fillin.h *-test.c +jitter_buffer.c diff --git a/t/Makefile b/t/Makefile index 1885435fe..93d4e2e73 100644 --- a/t/Makefile +++ b/t/Makefile @@ -70,7 +70,7 @@ LIBSRCS+= codeclib.c resample.c socket.c streambuf.c dtmflib.c DAEMONSRCS+= codec.c call.c ice.c kernel.c media_socket.c stun.c bencode.c poller.c \ dtls.c recording.c statistics.c rtcp.c redis.c iptables.c graphite.c \ cookie_cache.c udp_listener.c homer.c load.c cdr.c dtmf.c timerthread.c \ - media_player.c + media_player.c jitter_buffer.c HASHSRCS+= call_interfaces.c control_ng.c sdp.c endif @@ -128,7 +128,7 @@ transcode-test: transcode-test.o $(COMMONOBJS) codeclib.o resample.o codec.o ssr rtcp.o redis.o iptables.o graphite.o call_interfaces.strhash.o sdp.strhash.o rtp.o crypto.o \ control_ng.strhash.o \ streambuf.o cookie_cache.o udp_listener.o homer.o load.o cdr.o dtmf.o timerthread.o \ - media_player.o dtmflib.o + media_player.o jitter_buffer.o dtmflib.o payload-tracker-test: payload-tracker-test.o $(COMMONOBJS) ssrc.o aux.o auxlib.o rtp.o crypto.o codeclib.o \ resample.o dtmflib.o From 06b87041ed0b9af838f9a1b16e96e9b20618d27a Mon Sep 17 00:00:00 2001 From: Richard Fuchs Date: Wed, 29 Jan 2020 10:20:38 -0500 Subject: [PATCH 5/7] TT#74301 add tests for jitter buffer Change-Id: I167cb10e5d390b89aa6f77122567238d642a5736 --- t/Makefile | 2 + t/auto-daemon-tests-jb.pl | 94 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100755 t/auto-daemon-tests-jb.pl diff --git a/t/Makefile b/t/Makefile index 93d4e2e73..8e7737120 100644 --- a/t/Makefile +++ b/t/Makefile @@ -110,6 +110,8 @@ daemon-tests: tests-preload.so mkdir fake-sockets LD_PRELOAD=../t/tests-preload.so RTPE_BIN=../daemon/rtpengine TEST_SOCKET_PATH=./fake-sockets \ perl -I../perl auto-daemon-tests.pl + LD_PRELOAD=../t/tests-preload.so RTPE_BIN=../daemon/rtpengine TEST_SOCKET_PATH=./fake-sockets \ + perl -I../perl auto-daemon-tests-jb.pl test "$$(ls fake-sockets)" = "" rmdir fake-sockets diff --git a/t/auto-daemon-tests-jb.pl b/t/auto-daemon-tests-jb.pl new file mode 100755 index 000000000..dcb385070 --- /dev/null +++ b/t/auto-daemon-tests-jb.pl @@ -0,0 +1,94 @@ +#!/usr/bin/perl + +use strict; +use warnings; +use NGCP::Rtpengine::Test; +use NGCP::Rtpclient::SRTP; +use NGCP::Rtpengine::AutoTest; +use Test::More; + + +autotest_start(qw(--config-file=none -t -1 -i 203.0.113.1 -i 2001:db8:4321::1 + -n 2223 -c 12345 -f -L 7 -E -u 2222 --jitter-buffer=10)) + or die; + + +my ($sock_a, $sock_b, $port_a, $port_b, $ssrc, $resp, $srtp_ctx_a, $srtp_ctx_b, @ret1, @ret2); + + + + +# RTP sequencing tests + +($sock_a, $sock_b) = new_call([qw(198.51.100.1 2010)], [qw(198.51.100.3 2012)]); + +($port_a) = offer('two codecs, no transcoding', { ICE => 'remove', replace => ['origin'] }, < 'remove', replace => ['origin'] }, < Date: Sun, 9 Feb 2020 22:10:34 +0530 Subject: [PATCH 6/7] jb_new --- daemon/call.c | 2 +- daemon/jitter_buffer.c | 59 +++++++++++++++-------------------------- include/jitter_buffer.h | 7 +---- 3 files changed, 23 insertions(+), 45 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 7b180343a..29e58ecd3 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -2261,7 +2261,7 @@ no_stats_output: ps = l->data; send_timer_put(&ps->send_timer); - jitter_buffer_free(&ps->jb); + obj_put(&ps->jb->ttq.tt_obj); __unkernelize(ps); dtls_shutdown(ps); ps->selected_sfd = NULL; diff --git a/daemon/jitter_buffer.c b/daemon/jitter_buffer.c index 9482f607e..3a6a4ed14 100644 --- a/daemon/jitter_buffer.c +++ b/daemon/jitter_buffer.c @@ -4,23 +4,25 @@ #include "call.h" #include "codec.h" #include "main.h" +#include #define INITIAL_PACKETS 0x1E #define CONT_SEQ_COUNT 0x64 #define CONT_MISS_COUNT 0x0A -#define CONT_INCORRECT_BUFFERING 0x14 +#define CLOCK_DRIFT_MULT 0x14 static struct timerthread jitter_buffer_thread; void jitter_buffer_init(void) { + ilog(LOG_ERROR, "jitter_buffer_init"); timerthread_init(&jitter_buffer_thread, timerthread_queue_run); } // jb is locked static void reset_jitter_buffer(struct jitter_buffer *jb) { - ilog(LOG_DEBUG, "reset_jitter_buffer"); + ilog(LOG_INFO, "reset_jitter_buffer"); jb->first_send_ts = 0; jb->first_send.tv_sec = 0; @@ -33,12 +35,9 @@ static void reset_jitter_buffer(struct jitter_buffer *jb) { jb->next_exp_seq = 0; jb->clock_rate = 0; jb->payload_type = 0; - jb->cont_buff_err = 0; + jb->drift_mult_factor = 0; jb->buf_decremented = 0; jb->clock_drift_val = 0; - jb->clock_drift_enable = 0; - - jb_packet_free(&jb->p); jb->num_resets++; @@ -89,7 +88,7 @@ static struct jb_packet* get_jb_packet(struct media_packet *mp, const str *s) { str_init_len(&p->mp.raw, buf + RTP_BUFFER_HEAD_ROOM, s->len); memcpy(p->mp.raw.s, s->s, s->len); - if(rtp_payload(&p->mp.rtp, &p->mp.payload, s)) { + if(rtp_payload(&p->mp.rtp, &p->mp.payload, &p->mp.raw)) { jb_packet_free(&p); return NULL; } @@ -148,15 +147,21 @@ static int queue_packet(struct media_packet *mp, struct jb_packet *p) { static void handle_clock_drift(struct media_packet *mp) { ilog(LOG_DEBUG, "handle_clock_drift"); - unsigned long ts = ntohl(mp->rtp->timestamp); struct jitter_buffer *jb = mp->stream->jb; + int seq_diff = ntohs(mp->rtp->seq_num) - jb->first_seq; + + int mult_factor = pow(2, jb->drift_mult_factor); + + if(seq_diff < (mult_factor * CLOCK_DRIFT_MULT)) + return; + + unsigned long ts = ntohl(mp->rtp->timestamp); int payload_type = (mp->rtp->m_pt & 0x7f); int clockrate = get_clock_rate(mp, payload_type); if(!clockrate) { return; } long ts_diff = (uint32_t) ts - (uint32_t) jb->first_send_ts; - int seq_diff = ntohs(mp->rtp->seq_num) - jb->first_seq; long long ts_diff_us = (long long) (ts_diff)* 1000000 / clockrate; struct timeval to_send = jb->first_send; @@ -164,8 +169,7 @@ static void handle_clock_drift(struct media_packet *mp) { long long time_diff = timeval_diff(&rtpe_now, &to_send); jb->clock_drift_val = time_diff/seq_diff; - jb->clock_drift_enable = 0; - jb->cont_buff_err = 0; + jb->drift_mult_factor++; } int buffer_packet(struct media_packet *mp, const str *s) { @@ -201,16 +205,10 @@ int buffer_packet(struct media_packet *mp, const str *s) { jb->rtptime_delta = 0; } - if(jb->clock_drift_enable) - handle_clock_drift(mp); - if (jb->first_send.tv_sec) { + if(rtpe_config.jb_clock_drift) + handle_clock_drift(mp); ret = queue_packet(mp,p); - if(!ret && jb->p) { - // push first packet into jitter buffer - queue_packet(&jb->p->mp,jb->p); - jb->p = NULL; - } } else { // store data from first packet and use for successive packets and queue the first packet @@ -228,8 +226,6 @@ int buffer_packet(struct media_packet *mp, const str *s) { p->ttq_entry.when = jb->first_send = rtpe_now; jb->first_send_ts = ts; jb->first_seq = ntohs(mp->rtp->seq_num); - jb->p = p; - ret = 0; } // packet consumed? @@ -260,10 +256,12 @@ static void decrement_buffer(struct jitter_buffer *jb) { } } -static int set_jitter_values(struct media_packet *mp) { +static void set_jitter_values(struct media_packet *mp) { int ret=0; - int curr_seq = ntohs(mp->rtp->seq_num); struct jitter_buffer *jb = mp->stream->jb; + if(!jb || !mp->rtp) + return; + int curr_seq = ntohs(mp->rtp->seq_num); if(jb->next_exp_seq) { mutex_lock(&jb->lock); if(curr_seq > jb->next_exp_seq) { @@ -275,7 +273,6 @@ static int set_jitter_values(struct media_packet *mp) { else if(curr_seq < jb->next_exp_seq) { //Might be duplicate or sequence already crossed jb->cont_frames = 0; jb->cont_miss++; - ret=1; } else { jb->cont_frames++; @@ -293,18 +290,6 @@ static int set_jitter_values(struct media_packet *mp) { } if(curr_seq >= jb->next_exp_seq) jb->next_exp_seq = curr_seq + 1; - - int len = g_tree_nnodes(jb->ttq.entries); - - if(len > jb->buffer_len || len < jb->buffer_len) { - jb->cont_buff_err++; - if((jb->cont_buff_err > CONT_INCORRECT_BUFFERING) && rtpe_config.jb_clock_drift) - jb->clock_drift_enable=1; - } - else - jb->cont_buff_err = 0; - - return ret; } static void __jb_send_later(struct timerthread_queue *ttq, void *p) { @@ -360,8 +345,6 @@ void jitter_buffer_free(struct jitter_buffer **jbp) { mutex_destroy(&(*jbp)->lock); if ((*jbp)->call) obj_put((*jbp)->call); - g_slice_free1(sizeof(**jbp), *jbp); - *jbp = NULL; } void jb_packet_free(struct jb_packet **jbp) { diff --git a/include/jitter_buffer.h b/include/jitter_buffer.h index 83ba9ba68..6dc6b8c45 100644 --- a/include/jitter_buffer.h +++ b/include/jitter_buffer.h @@ -15,7 +15,6 @@ struct jb_packet { struct timerthread_queue_entry ttq_entry; char *buf; struct media_packet mp; - //int buffered; }; struct jitter_buffer { @@ -32,12 +31,10 @@ struct jitter_buffer { unsigned int payload_type; unsigned int num_resets; unsigned int initial_pkts; - unsigned int cont_buff_err; + unsigned int drift_mult_factor; int buffer_len; int clock_drift_val; - int clock_drift_enable; //flag for buffer overflow underflow int buf_decremented; - struct jb_packet *p; struct call *call; int disabled; }; @@ -50,8 +47,6 @@ void jitter_buffer_free(struct jitter_buffer **); int buffer_packet(struct media_packet *mp, const str *s); void jb_packet_free(struct jb_packet **jbp); -//int set_jitter_values(struct media_packet *mp); - void jitter_buffer_loop(void *p); #endif From e3a5d45433e44bdf5e9aa59e18a42d1f0569fd05 Mon Sep 17 00:00:00 2001 From: Balajee SV Date: Sun, 9 Feb 2020 22:10:34 +0530 Subject: [PATCH 7/7] jb_new --- daemon/call.c | 2 +- daemon/jitter_buffer.c | 59 +++++++++++++++-------------------------- include/jitter_buffer.h | 7 +---- 3 files changed, 23 insertions(+), 45 deletions(-) diff --git a/daemon/call.c b/daemon/call.c index 7b180343a..29e58ecd3 100644 --- a/daemon/call.c +++ b/daemon/call.c @@ -2261,7 +2261,7 @@ no_stats_output: ps = l->data; send_timer_put(&ps->send_timer); - jitter_buffer_free(&ps->jb); + obj_put(&ps->jb->ttq.tt_obj); __unkernelize(ps); dtls_shutdown(ps); ps->selected_sfd = NULL; diff --git a/daemon/jitter_buffer.c b/daemon/jitter_buffer.c index 9482f607e..12823b94b 100644 --- a/daemon/jitter_buffer.c +++ b/daemon/jitter_buffer.c @@ -4,23 +4,25 @@ #include "call.h" #include "codec.h" #include "main.h" +#include #define INITIAL_PACKETS 0x1E #define CONT_SEQ_COUNT 0x64 #define CONT_MISS_COUNT 0x0A -#define CONT_INCORRECT_BUFFERING 0x14 +#define CLOCK_DRIFT_MULT 0x14 static struct timerthread jitter_buffer_thread; void jitter_buffer_init(void) { + ilog(LOG_INFO, "jitter_buffer_init"); timerthread_init(&jitter_buffer_thread, timerthread_queue_run); } // jb is locked static void reset_jitter_buffer(struct jitter_buffer *jb) { - ilog(LOG_DEBUG, "reset_jitter_buffer"); + ilog(LOG_INFO, "reset_jitter_buffer"); jb->first_send_ts = 0; jb->first_send.tv_sec = 0; @@ -33,12 +35,9 @@ static void reset_jitter_buffer(struct jitter_buffer *jb) { jb->next_exp_seq = 0; jb->clock_rate = 0; jb->payload_type = 0; - jb->cont_buff_err = 0; + jb->drift_mult_factor = 0; jb->buf_decremented = 0; jb->clock_drift_val = 0; - jb->clock_drift_enable = 0; - - jb_packet_free(&jb->p); jb->num_resets++; @@ -89,7 +88,7 @@ static struct jb_packet* get_jb_packet(struct media_packet *mp, const str *s) { str_init_len(&p->mp.raw, buf + RTP_BUFFER_HEAD_ROOM, s->len); memcpy(p->mp.raw.s, s->s, s->len); - if(rtp_payload(&p->mp.rtp, &p->mp.payload, s)) { + if(rtp_payload(&p->mp.rtp, &p->mp.payload, &p->mp.raw)) { jb_packet_free(&p); return NULL; } @@ -148,15 +147,21 @@ static int queue_packet(struct media_packet *mp, struct jb_packet *p) { static void handle_clock_drift(struct media_packet *mp) { ilog(LOG_DEBUG, "handle_clock_drift"); - unsigned long ts = ntohl(mp->rtp->timestamp); struct jitter_buffer *jb = mp->stream->jb; + int seq_diff = ntohs(mp->rtp->seq_num) - jb->first_seq; + + int mult_factor = pow(2, jb->drift_mult_factor); + + if(seq_diff < (mult_factor * CLOCK_DRIFT_MULT)) + return; + + unsigned long ts = ntohl(mp->rtp->timestamp); int payload_type = (mp->rtp->m_pt & 0x7f); int clockrate = get_clock_rate(mp, payload_type); if(!clockrate) { return; } long ts_diff = (uint32_t) ts - (uint32_t) jb->first_send_ts; - int seq_diff = ntohs(mp->rtp->seq_num) - jb->first_seq; long long ts_diff_us = (long long) (ts_diff)* 1000000 / clockrate; struct timeval to_send = jb->first_send; @@ -164,8 +169,7 @@ static void handle_clock_drift(struct media_packet *mp) { long long time_diff = timeval_diff(&rtpe_now, &to_send); jb->clock_drift_val = time_diff/seq_diff; - jb->clock_drift_enable = 0; - jb->cont_buff_err = 0; + jb->drift_mult_factor++; } int buffer_packet(struct media_packet *mp, const str *s) { @@ -201,16 +205,10 @@ int buffer_packet(struct media_packet *mp, const str *s) { jb->rtptime_delta = 0; } - if(jb->clock_drift_enable) - handle_clock_drift(mp); - if (jb->first_send.tv_sec) { + if(rtpe_config.jb_clock_drift) + handle_clock_drift(mp); ret = queue_packet(mp,p); - if(!ret && jb->p) { - // push first packet into jitter buffer - queue_packet(&jb->p->mp,jb->p); - jb->p = NULL; - } } else { // store data from first packet and use for successive packets and queue the first packet @@ -228,8 +226,6 @@ int buffer_packet(struct media_packet *mp, const str *s) { p->ttq_entry.when = jb->first_send = rtpe_now; jb->first_send_ts = ts; jb->first_seq = ntohs(mp->rtp->seq_num); - jb->p = p; - ret = 0; } // packet consumed? @@ -260,10 +256,12 @@ static void decrement_buffer(struct jitter_buffer *jb) { } } -static int set_jitter_values(struct media_packet *mp) { +static void set_jitter_values(struct media_packet *mp) { int ret=0; - int curr_seq = ntohs(mp->rtp->seq_num); struct jitter_buffer *jb = mp->stream->jb; + if(!jb || !mp->rtp) + return; + int curr_seq = ntohs(mp->rtp->seq_num); if(jb->next_exp_seq) { mutex_lock(&jb->lock); if(curr_seq > jb->next_exp_seq) { @@ -275,7 +273,6 @@ static int set_jitter_values(struct media_packet *mp) { else if(curr_seq < jb->next_exp_seq) { //Might be duplicate or sequence already crossed jb->cont_frames = 0; jb->cont_miss++; - ret=1; } else { jb->cont_frames++; @@ -293,18 +290,6 @@ static int set_jitter_values(struct media_packet *mp) { } if(curr_seq >= jb->next_exp_seq) jb->next_exp_seq = curr_seq + 1; - - int len = g_tree_nnodes(jb->ttq.entries); - - if(len > jb->buffer_len || len < jb->buffer_len) { - jb->cont_buff_err++; - if((jb->cont_buff_err > CONT_INCORRECT_BUFFERING) && rtpe_config.jb_clock_drift) - jb->clock_drift_enable=1; - } - else - jb->cont_buff_err = 0; - - return ret; } static void __jb_send_later(struct timerthread_queue *ttq, void *p) { @@ -360,8 +345,6 @@ void jitter_buffer_free(struct jitter_buffer **jbp) { mutex_destroy(&(*jbp)->lock); if ((*jbp)->call) obj_put((*jbp)->call); - g_slice_free1(sizeof(**jbp), *jbp); - *jbp = NULL; } void jb_packet_free(struct jb_packet **jbp) { diff --git a/include/jitter_buffer.h b/include/jitter_buffer.h index 83ba9ba68..6dc6b8c45 100644 --- a/include/jitter_buffer.h +++ b/include/jitter_buffer.h @@ -15,7 +15,6 @@ struct jb_packet { struct timerthread_queue_entry ttq_entry; char *buf; struct media_packet mp; - //int buffered; }; struct jitter_buffer { @@ -32,12 +31,10 @@ struct jitter_buffer { unsigned int payload_type; unsigned int num_resets; unsigned int initial_pkts; - unsigned int cont_buff_err; + unsigned int drift_mult_factor; int buffer_len; int clock_drift_val; - int clock_drift_enable; //flag for buffer overflow underflow int buf_decremented; - struct jb_packet *p; struct call *call; int disabled; }; @@ -50,8 +47,6 @@ void jitter_buffer_free(struct jitter_buffer **); int buffer_packet(struct media_packet *mp, const str *s); void jb_packet_free(struct jb_packet **jbp); -//int set_jitter_values(struct media_packet *mp); - void jitter_buffer_loop(void *p); #endif