Browse Source

RTP Sessions.

pull/2/head
Justine Alexandra Roberts Tunney 11 years ago
parent
commit
ec87bbe1a9
7 changed files with 258 additions and 96 deletions
  1. +2
    -0
      dsp/dsp.go
  2. +115
    -89
      example/echo2/echo2_test.go
  3. +6
    -6
      rtp/rtp.go
  4. +121
    -0
      rtp/session.go
  5. +8
    -0
      rtp/session_test.go
  6. +1
    -1
      sip/transport.go
  7. +5
    -0
      util/util.go

+ 2
- 0
dsp/dsp.go View File

@ -9,6 +9,8 @@ func L16MixSat160(dst, src *int16)
// Compresses a PCM audio sample into a G.711 μ-Law sample. The BSR instruction
// is what makes this code fast.
//
// TODO(jart): How do I make assembly use proper types?
func LinearToUlaw(linear int64) (ulaw int64)
// Turns a μ-Law byte back into an audio sample.


+ 115
- 89
example/echo2/echo2_test.go View File

@ -3,11 +3,12 @@
package echo2_test
import (
"github.com/jart/gosip/dsp"
"github.com/jart/gosip/rtp"
"github.com/jart/gosip/sdp"
"github.com/jart/gosip/sip"
"github.com/jart/gosip/util"
"log"
"math/rand"
"net"
"testing"
"time"
@ -24,48 +25,71 @@ func TestCallToEchoApp(t *testing.T) {
}
defer tp.Sock.Close()
// Used to notify main thread when subthreads die.
rtpDeath := make(chan bool, 2)
// Create an RTP session.
rtpsock, err := net.ListenPacket("udp", "108.61.60.146:0")
session, err := rtp.NewSession(from.Uri.Host)
if err != nil {
t.Fatal("rtp listen:", err)
}
defer rtpsock.Close()
rtpaddr := rtpsock.LocalAddr().(*net.UDPAddr)
rrtpaddrChan := make(chan *net.UDPAddr)
defer session.Sock.Close()
rtpaddr := session.Sock.LocalAddr().(*net.UDPAddr)
rtppeerChan := make(chan *net.UDPAddr, 1)
go func() {
var rrtpaddr *net.UDPAddr
frameout := make([]byte, rtp.HeaderSize+160)
rtpHeader := rtp.Header{
PT: sdp.ULAWCodec.PT,
Seq: 666,
TS: 0,
Ssrc: rand.Uint32(),
}
for n := 0; n < 160; n++ {
frameout[rtp.HeaderSize+n] = byte(n)
}
var frame rtp.Frame
awgn := dsp.NewAWGN(-25.0)
ticker := time.NewTicker(20 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
rtpHeader.Write(frameout)
rtpHeader.TS += 160
rtpHeader.Seq++
if rrtpaddr != nil {
_, err := rtpsock.WriteTo(frameout, rrtpaddr)
if err != nil {
t.Fatal("rtp write", err)
for n := 0; n < 160; n++ {
frame[n] = awgn.Get()
}
err := session.Send(frame)
if err != nil {
if !util.IsUseOfClosed(err) {
t.Error("rtp write", err)
}
rtpDeath <- true
return
}
case rrtpaddr = <-rrtpaddrChan:
if rrtpaddr == nil {
case session.Peer = <-rtppeerChan:
if session.Peer == nil {
return
}
}
}
}()
defer func() { rrtpaddrChan <- nil }()
defer func() { rtppeerChan <- nil }()
// Create an RTP message consumer.
go func() {
var frame rtp.Frame
for {
err := session.Recv(frame)
if err != nil {
if !util.IsUseOfClosed(err) {
t.Errorf("rtp read: %s %#v", err, err)
}
rtpDeath <- true
return
}
}
}()
// Create a SIP message consumer.
sipChan := make(chan *sip.Msg, 32)
go func() {
for {
msg := tp.Recv()
if msg.Error != nil && util.IsUseOfClosed(msg.Error) {
return
}
sipChan <- msg
}
}()
// Send an INVITE message with an SDP.
invite := sip.NewRequest(tp, "INVITE", to, from)
@ -75,75 +99,77 @@ func TestCallToEchoApp(t *testing.T) {
t.Fatal(err)
}
// Consume provisional messages until we receive answer.
// Create a SIP dialog handler.
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
var answered bool
var msg *sip.Msg
for {
tp.Sock.SetDeadline(time.Now().Add(time.Second))
msg = tp.Recv()
if msg.Error != nil {
t.Fatal(msg.Error)
}
if msg.Status < sip.StatusOK {
log.Printf("Provisional %d %s", msg.Status, msg.Phrase)
} else if msg.Status == sip.StatusOK {
log.Printf("Answered!")
}
if msg.Status >= sip.StatusOK {
err = tp.Send(sip.NewAck(invite, msg))
select {
case <-ticker.C:
if answered {
err = tp.Send(sip.NewBye(invite, msg))
} else {
err = tp.Send(sip.NewCancel(invite))
}
if err != nil {
t.Fatal(err)
t.Error(err)
}
case <-rtpDeath:
if answered {
err = tp.Send(sip.NewBye(invite, msg))
} else {
err = tp.Send(sip.NewCancel(invite))
}
}
if msg.Headers["Content-Type"] == "application/sdp" {
log.Printf("Establishing media session")
rsdp, err := sdp.Parse(msg.Payload)
if err != nil {
t.Fatal("failed to parse sdp", err)
t.Error(err)
}
case msg = <-sipChan:
if msg.Error != nil {
t.Fatal(msg.Error)
}
if msg.IsResponse {
if msg.Status >= sip.StatusOK && msg.CSeq == invite.CSeq {
err = tp.Send(sip.NewAck(invite, msg))
if err != nil {
t.Fatal(err)
}
}
if msg.Status < sip.StatusOK {
log.Printf("Provisional %d %s", msg.Status, msg.Phrase)
} else if msg.Status == sip.StatusOK {
if msg.CSeqMethod == "INVITE" {
log.Printf("Answered!")
answered = true
} else if msg.CSeqMethod == "BYE" {
log.Printf("Hungup!")
return
} else if msg.CSeqMethod == "CANCEL" {
log.Printf("Cancelled!")
return
}
} else if msg.Status > sip.StatusOK {
t.Errorf("Got %d %s", msg.Status, msg.Phrase)
return
}
if msg.Headers["Content-Type"] == "application/sdp" {
log.Printf("Establishing media session")
rsdp, err := sdp.Parse(msg.Payload)
if err != nil {
t.Fatal("failed to parse sdp", err)
}
rtppeerChan <- &net.UDPAddr{IP: net.ParseIP(rsdp.Addr), Port: int(rsdp.Audio.Port)}
}
} else {
if msg.Method == "BYE" {
log.Printf("Remote Hangup!")
err = tp.Send(sip.NewResponse(invite, sip.StatusOK))
if err != nil {
t.Fatal(err)
}
return
}
}
rrtpaddrChan <- &net.UDPAddr{IP: net.ParseIP(rsdp.Addr), Port: int(rsdp.Audio.Port)}
}
if msg.Status == sip.StatusOK {
break
} else if msg.Status > sip.StatusOK {
t.Fatalf("Got %d %s", msg.Status, msg.Phrase)
}
}
// We're talking to an echo application so they should send us back exactly
// the same audio.
memory := make([]byte, 2048)
rtpsock.SetDeadline(time.Now().Add(5 * time.Second))
amt, _, err := rtpsock.ReadFrom(memory)
if err != nil {
t.Fatal("rtp read", err)
}
if amt != rtp.HeaderSize+160 {
t.Fatal("rtp recv amt != 12+160")
}
var rtpHeader rtp.Header
err = rtpHeader.Read(memory)
if err != nil {
t.Fatal(err)
}
for n := 0; n < 160; n++ {
if memory[rtp.HeaderSize+n] != byte(n) {
t.Fatal("rtp response audio didnt match")
}
}
// Hangup (we'll be lazy and just change up the ack Msg)
err = tp.Send(sip.NewBye(invite, msg))
if err != nil {
t.Fatal(err)
}
// Wait for acknowledgment of hangup.
tp.Sock.SetDeadline(time.Now().Add(time.Second))
msg = tp.Recv()
if msg.Error != nil {
t.Fatal(msg.Error)
}
if msg.Status != 200 || msg.CSeqMethod != "BYE" {
t.Fatal("Wanted BYE 200:", msg)
}
}

+ 6
- 6
rtp/rtp.go View File

@ -35,12 +35,12 @@ var (
// Header is encoded at the beginning of a UDP audio packet.
type Header struct {
Pad bool // the padding flag is used for secure rtp
Mark bool // the marker flag is used for rfc2833
PT uint8 // payload type you got from sdp
Seq uint16 // sequence id useful for reordering packets
TS uint32 // timestamp measured in samples
Ssrc uint32 // random id used to identify an rtp session
Pad bool // Padding flag is used for secure RTP.
Mark bool // Marker flag is used for RFC2833.
PT uint8 // Payload type you got from SDP.
Seq uint16 // Sequence id useful for reordering packets.
TS uint32 // Timestamp measured in samples.
Ssrc uint32 // Random ID used to identify an RTP session.
}
// EventHeader stores things like DTMF and is encoded after Header.


+ 121
- 0
rtp/session.go View File

@ -0,0 +1,121 @@
// RTP Session Transport Layer.
package rtp
import (
"errors"
"fmt"
"github.com/jart/gosip/dsp"
"github.com/jart/gosip/sdp"
"github.com/jart/gosip/util"
"math/rand"
"net"
"strings"
)
const (
rtpBindMaxAttempts = 10
rtpBindPortMin = 16384
rtpBindPortMax = 32768
)
type Frame [160]int16
// Session allows sending and receiving slinear frames for a single SIP media
// session. These frames are encoded as µLaw and transmitted over UDP. No
// support for RTCP is provided.
type Session struct {
// Underlying UDP socket.
Sock *net.UDPConn
// Address of remote endpoint. This might change mid-session. If it's nil,
// then egress packets are dropped.
Peer *net.UDPAddr
// Header is the current header that gets mutated with each transmit.
Header Header
ibuf []byte
obuf []byte
phdr Header
}
// Creates a new RTP µLaw 20ptime session listening on host with a random port
// selected from the range [16384,32768].
func NewSession(host string) (s *Session, err error) {
sock, err := listenRTP(host)
if err != nil {
return nil, err
}
return &Session{
Sock: sock.(*net.UDPConn),
Header: Header{
PT: sdp.ULAWCodec.PT,
Seq: 666,
TS: 0,
Ssrc: rand.Uint32(),
},
}, err
}
func (s *Session) Send(frame Frame) (err error) {
if s.Peer == nil {
return nil
}
if s.obuf == nil {
s.obuf = make([]byte, HeaderSize+160)
}
s.Header.Write(s.obuf)
s.Header.TS += 160
s.Header.Seq++
for n := 0; n < 160; n++ {
s.obuf[HeaderSize+n] = byte(dsp.LinearToUlaw(int64(frame[n])))
}
_, err = s.Sock.WriteTo(s.obuf, s.Peer)
return
}
func (s *Session) Recv(frame Frame) (err error) {
if s.ibuf == nil {
s.ibuf = make([]byte, 2048)
}
for {
amt, _, err := s.Sock.ReadFrom(s.ibuf)
if err != nil {
return err
}
// TODO(jart): Verify source address?
// TODO(jart): Packet reordering? Drop duplicate packets?
// TODO(jart): DTMF?
var phdr Header
err = phdr.Read(s.ibuf)
if err != nil {
return err
}
if phdr.PT != sdp.ULAWCodec.PT {
continue
}
if amt != HeaderSize+160 {
return errors.New(fmt.Sprintf("Unexpected RTP packet size: %d", amt))
}
for n := 0; n < 160; n++ {
frame[n] = int16(dsp.UlawToLinear(int64(s.ibuf[HeaderSize+n])))
}
return nil
}
}
func listenRTP(host string) (sock net.PacketConn, err error) {
for i := 0; i < rtpBindMaxAttempts; i++ {
port := rtpBindPortMin + rand.Int()%(rtpBindPortMax-rtpBindPortMin+1)
saddr := util.HostPortToString(host, uint16(port))
sock, err = net.ListenPacket("udp", saddr)
if err != nil {
if !strings.Contains(err.Error(), "address already in use") {
break
}
}
}
return
}

+ 8
- 0
rtp/session_test.go View File

@ -0,0 +1,8 @@
package rtp_test
import (
"testing"
)
func TestBind(t *testing.T) {
}

+ 1
- 1
sip/transport.go View File

@ -23,7 +23,7 @@ var (
// Transport sends and receives SIP messages over UDP with stateless routing.
type Transport struct {
// Thing returned by ListenPacket
// Underlying UDP socket.
Sock *net.UDPConn
// When you send an outbound request (not a response) you have to set the via


+ 5
- 0
util/util.go View File

@ -16,6 +16,11 @@ func IsTimeout(err error) bool {
return false
}
// Returns true if error was caused by reading from a closed socket.
func IsUseOfClosed(err error) bool {
return strings.Contains(err.Error(), "use of closed network connection")
}
// Returns true if IP contains a colon.
func IsIPv6(ip string) bool {
n := strings.Index(ip, ":")


Loading…
Cancel
Save