diff --git a/dsp/dsp.go b/dsp/dsp.go index d82bc8c..91fd20b 100755 --- a/dsp/dsp.go +++ b/dsp/dsp.go @@ -9,6 +9,8 @@ func L16MixSat160(dst, src *int16) // Compresses a PCM audio sample into a G.711 μ-Law sample. The BSR instruction // is what makes this code fast. +// +// TODO(jart): How do I make assembly use proper types? func LinearToUlaw(linear int64) (ulaw int64) // Turns a μ-Law byte back into an audio sample. diff --git a/example/echo2/echo2_test.go b/example/echo2/echo2_test.go index 65411bb..f2f2113 100755 --- a/example/echo2/echo2_test.go +++ b/example/echo2/echo2_test.go @@ -3,11 +3,12 @@ package echo2_test import ( + "github.com/jart/gosip/dsp" "github.com/jart/gosip/rtp" "github.com/jart/gosip/sdp" "github.com/jart/gosip/sip" + "github.com/jart/gosip/util" "log" - "math/rand" "net" "testing" "time" @@ -24,48 +25,71 @@ func TestCallToEchoApp(t *testing.T) { } defer tp.Sock.Close() + // Used to notify main thread when subthreads die. + rtpDeath := make(chan bool, 2) + // Create an RTP session. - rtpsock, err := net.ListenPacket("udp", "108.61.60.146:0") + session, err := rtp.NewSession(from.Uri.Host) if err != nil { t.Fatal("rtp listen:", err) } - defer rtpsock.Close() - rtpaddr := rtpsock.LocalAddr().(*net.UDPAddr) - rrtpaddrChan := make(chan *net.UDPAddr) + defer session.Sock.Close() + rtpaddr := session.Sock.LocalAddr().(*net.UDPAddr) + rtppeerChan := make(chan *net.UDPAddr, 1) go func() { - var rrtpaddr *net.UDPAddr - frameout := make([]byte, rtp.HeaderSize+160) - rtpHeader := rtp.Header{ - PT: sdp.ULAWCodec.PT, - Seq: 666, - TS: 0, - Ssrc: rand.Uint32(), - } - for n := 0; n < 160; n++ { - frameout[rtp.HeaderSize+n] = byte(n) - } + var frame rtp.Frame + awgn := dsp.NewAWGN(-25.0) ticker := time.NewTicker(20 * time.Millisecond) defer ticker.Stop() for { select { case <-ticker.C: - rtpHeader.Write(frameout) - rtpHeader.TS += 160 - rtpHeader.Seq++ - if rrtpaddr != nil { - _, err := rtpsock.WriteTo(frameout, rrtpaddr) - if err != nil { - t.Fatal("rtp write", err) + for n := 0; n < 160; n++ { + frame[n] = awgn.Get() + } + err := session.Send(frame) + if err != nil { + if !util.IsUseOfClosed(err) { + t.Error("rtp write", err) } + rtpDeath <- true + return } - case rrtpaddr = <-rrtpaddrChan: - if rrtpaddr == nil { + case session.Peer = <-rtppeerChan: + if session.Peer == nil { return } } } }() - defer func() { rrtpaddrChan <- nil }() + defer func() { rtppeerChan <- nil }() + + // Create an RTP message consumer. + go func() { + var frame rtp.Frame + for { + err := session.Recv(frame) + if err != nil { + if !util.IsUseOfClosed(err) { + t.Errorf("rtp read: %s %#v", err, err) + } + rtpDeath <- true + return + } + } + }() + + // Create a SIP message consumer. + sipChan := make(chan *sip.Msg, 32) + go func() { + for { + msg := tp.Recv() + if msg.Error != nil && util.IsUseOfClosed(msg.Error) { + return + } + sipChan <- msg + } + }() // Send an INVITE message with an SDP. invite := sip.NewRequest(tp, "INVITE", to, from) @@ -75,75 +99,77 @@ func TestCallToEchoApp(t *testing.T) { t.Fatal(err) } - // Consume provisional messages until we receive answer. + // Create a SIP dialog handler. + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + var answered bool var msg *sip.Msg for { - tp.Sock.SetDeadline(time.Now().Add(time.Second)) - msg = tp.Recv() - if msg.Error != nil { - t.Fatal(msg.Error) - } - if msg.Status < sip.StatusOK { - log.Printf("Provisional %d %s", msg.Status, msg.Phrase) - } else if msg.Status == sip.StatusOK { - log.Printf("Answered!") - } - if msg.Status >= sip.StatusOK { - err = tp.Send(sip.NewAck(invite, msg)) + select { + case <-ticker.C: + if answered { + err = tp.Send(sip.NewBye(invite, msg)) + } else { + err = tp.Send(sip.NewCancel(invite)) + } if err != nil { - t.Fatal(err) + t.Error(err) + } + case <-rtpDeath: + if answered { + err = tp.Send(sip.NewBye(invite, msg)) + } else { + err = tp.Send(sip.NewCancel(invite)) } - } - if msg.Headers["Content-Type"] == "application/sdp" { - log.Printf("Establishing media session") - rsdp, err := sdp.Parse(msg.Payload) if err != nil { - t.Fatal("failed to parse sdp", err) + t.Error(err) + } + case msg = <-sipChan: + if msg.Error != nil { + t.Fatal(msg.Error) + } + if msg.IsResponse { + if msg.Status >= sip.StatusOK && msg.CSeq == invite.CSeq { + err = tp.Send(sip.NewAck(invite, msg)) + if err != nil { + t.Fatal(err) + } + } + if msg.Status < sip.StatusOK { + log.Printf("Provisional %d %s", msg.Status, msg.Phrase) + } else if msg.Status == sip.StatusOK { + if msg.CSeqMethod == "INVITE" { + log.Printf("Answered!") + answered = true + } else if msg.CSeqMethod == "BYE" { + log.Printf("Hungup!") + return + } else if msg.CSeqMethod == "CANCEL" { + log.Printf("Cancelled!") + return + } + } else if msg.Status > sip.StatusOK { + t.Errorf("Got %d %s", msg.Status, msg.Phrase) + return + } + if msg.Headers["Content-Type"] == "application/sdp" { + log.Printf("Establishing media session") + rsdp, err := sdp.Parse(msg.Payload) + if err != nil { + t.Fatal("failed to parse sdp", err) + } + rtppeerChan <- &net.UDPAddr{IP: net.ParseIP(rsdp.Addr), Port: int(rsdp.Audio.Port)} + } + } else { + if msg.Method == "BYE" { + log.Printf("Remote Hangup!") + err = tp.Send(sip.NewResponse(invite, sip.StatusOK)) + if err != nil { + t.Fatal(err) + } + return + } } - rrtpaddrChan <- &net.UDPAddr{IP: net.ParseIP(rsdp.Addr), Port: int(rsdp.Audio.Port)} - } - if msg.Status == sip.StatusOK { - break - } else if msg.Status > sip.StatusOK { - t.Fatalf("Got %d %s", msg.Status, msg.Phrase) - } - } - - // We're talking to an echo application so they should send us back exactly - // the same audio. - memory := make([]byte, 2048) - rtpsock.SetDeadline(time.Now().Add(5 * time.Second)) - amt, _, err := rtpsock.ReadFrom(memory) - if err != nil { - t.Fatal("rtp read", err) - } - if amt != rtp.HeaderSize+160 { - t.Fatal("rtp recv amt != 12+160") - } - var rtpHeader rtp.Header - err = rtpHeader.Read(memory) - if err != nil { - t.Fatal(err) - } - for n := 0; n < 160; n++ { - if memory[rtp.HeaderSize+n] != byte(n) { - t.Fatal("rtp response audio didnt match") } } - - // Hangup (we'll be lazy and just change up the ack Msg) - err = tp.Send(sip.NewBye(invite, msg)) - if err != nil { - t.Fatal(err) - } - - // Wait for acknowledgment of hangup. - tp.Sock.SetDeadline(time.Now().Add(time.Second)) - msg = tp.Recv() - if msg.Error != nil { - t.Fatal(msg.Error) - } - if msg.Status != 200 || msg.CSeqMethod != "BYE" { - t.Fatal("Wanted BYE 200:", msg) - } } diff --git a/rtp/rtp.go b/rtp/rtp.go index 803d144..0dffeb8 100755 --- a/rtp/rtp.go +++ b/rtp/rtp.go @@ -35,12 +35,12 @@ var ( // Header is encoded at the beginning of a UDP audio packet. type Header struct { - Pad bool // the padding flag is used for secure rtp - Mark bool // the marker flag is used for rfc2833 - PT uint8 // payload type you got from sdp - Seq uint16 // sequence id useful for reordering packets - TS uint32 // timestamp measured in samples - Ssrc uint32 // random id used to identify an rtp session + Pad bool // Padding flag is used for secure RTP. + Mark bool // Marker flag is used for RFC2833. + PT uint8 // Payload type you got from SDP. + Seq uint16 // Sequence id useful for reordering packets. + TS uint32 // Timestamp measured in samples. + Ssrc uint32 // Random ID used to identify an RTP session. } // EventHeader stores things like DTMF and is encoded after Header. diff --git a/rtp/session.go b/rtp/session.go new file mode 100644 index 0000000..a746e05 --- /dev/null +++ b/rtp/session.go @@ -0,0 +1,121 @@ +// RTP Session Transport Layer. + +package rtp + +import ( + "errors" + "fmt" + "github.com/jart/gosip/dsp" + "github.com/jart/gosip/sdp" + "github.com/jart/gosip/util" + "math/rand" + "net" + "strings" +) + +const ( + rtpBindMaxAttempts = 10 + rtpBindPortMin = 16384 + rtpBindPortMax = 32768 +) + +type Frame [160]int16 + +// Session allows sending and receiving slinear frames for a single SIP media +// session. These frames are encoded as µLaw and transmitted over UDP. No +// support for RTCP is provided. +type Session struct { + + // Underlying UDP socket. + Sock *net.UDPConn + + // Address of remote endpoint. This might change mid-session. If it's nil, + // then egress packets are dropped. + Peer *net.UDPAddr + + // Header is the current header that gets mutated with each transmit. + Header Header + + ibuf []byte + obuf []byte + phdr Header +} + +// Creates a new RTP µLaw 20ptime session listening on host with a random port +// selected from the range [16384,32768]. +func NewSession(host string) (s *Session, err error) { + sock, err := listenRTP(host) + if err != nil { + return nil, err + } + return &Session{ + Sock: sock.(*net.UDPConn), + Header: Header{ + PT: sdp.ULAWCodec.PT, + Seq: 666, + TS: 0, + Ssrc: rand.Uint32(), + }, + }, err +} + +func (s *Session) Send(frame Frame) (err error) { + if s.Peer == nil { + return nil + } + if s.obuf == nil { + s.obuf = make([]byte, HeaderSize+160) + } + s.Header.Write(s.obuf) + s.Header.TS += 160 + s.Header.Seq++ + for n := 0; n < 160; n++ { + s.obuf[HeaderSize+n] = byte(dsp.LinearToUlaw(int64(frame[n]))) + } + _, err = s.Sock.WriteTo(s.obuf, s.Peer) + return +} + +func (s *Session) Recv(frame Frame) (err error) { + if s.ibuf == nil { + s.ibuf = make([]byte, 2048) + } + for { + amt, _, err := s.Sock.ReadFrom(s.ibuf) + if err != nil { + return err + } + // TODO(jart): Verify source address? + // TODO(jart): Packet reordering? Drop duplicate packets? + // TODO(jart): DTMF? + var phdr Header + err = phdr.Read(s.ibuf) + if err != nil { + return err + } + if phdr.PT != sdp.ULAWCodec.PT { + continue + } + if amt != HeaderSize+160 { + return errors.New(fmt.Sprintf("Unexpected RTP packet size: %d", amt)) + } + for n := 0; n < 160; n++ { + frame[n] = int16(dsp.UlawToLinear(int64(s.ibuf[HeaderSize+n]))) + } + return nil + } +} + +func listenRTP(host string) (sock net.PacketConn, err error) { + for i := 0; i < rtpBindMaxAttempts; i++ { + port := rtpBindPortMin + rand.Int()%(rtpBindPortMax-rtpBindPortMin+1) + saddr := util.HostPortToString(host, uint16(port)) + sock, err = net.ListenPacket("udp", saddr) + if err != nil { + if !strings.Contains(err.Error(), "address already in use") { + break + } + } + } + return +} diff --git a/rtp/session_test.go b/rtp/session_test.go new file mode 100644 index 0000000..9e592c7 --- /dev/null +++ b/rtp/session_test.go @@ -0,0 +1,8 @@ +package rtp_test + +import ( + "testing" +) + +func TestBind(t *testing.T) { +} diff --git a/sip/transport.go b/sip/transport.go index 48b9724..8cd1317 100755 --- a/sip/transport.go +++ b/sip/transport.go @@ -23,7 +23,7 @@ var ( // Transport sends and receives SIP messages over UDP with stateless routing. type Transport struct { - // Thing returned by ListenPacket + // Underlying UDP socket. Sock *net.UDPConn // When you send an outbound request (not a response) you have to set the via diff --git a/util/util.go b/util/util.go index 13d4e6d..787b569 100755 --- a/util/util.go +++ b/util/util.go @@ -16,6 +16,11 @@ func IsTimeout(err error) bool { return false } +// Returns true if error was caused by reading from a closed socket. +func IsUseOfClosed(err error) bool { + return strings.Contains(err.Error(), "use of closed network connection") +} + // Returns true if IP contains a colon. func IsIPv6(ip string) bool { n := strings.Index(ip, ":")