From e3a5c463b5c479f60f83376c7d3b4bce74914b13 Mon Sep 17 00:00:00 2001 From: Justine Alexandra Roberts Tunney Date: Sun, 28 Dec 2014 06:46:08 -0500 Subject: [PATCH] Cleaner RTP channels. --- rtp/session.go | 61 ++++++++++++++++++++----------------------------- sip/receiver.go | 2 +- 2 files changed, 26 insertions(+), 37 deletions(-) diff --git a/rtp/session.go b/rtp/session.go index fce53e4..c35e97c 100644 --- a/rtp/session.go +++ b/rtp/session.go @@ -3,8 +3,6 @@ package rtp import ( - "errors" - "fmt" "github.com/jart/gosip/dsp" "github.com/jart/gosip/sdp" "log" @@ -28,8 +26,8 @@ type Frame [160]int16 type Session struct { // Channel to which received RTP frames and errors are published. - C chan *Frame - E chan error + C <-chan *Frame + E <-chan error // Underlying UDP socket. Sock *net.UDPConn @@ -41,21 +39,24 @@ type Session struct { // Header is the current header that gets mutated with each transmit. Header Header - ibuf []byte obuf []byte } // Creates a new RTP µLaw 20ptime session listening on host with a random port // selected from the range [16384,32768]. func NewSession(host string) (rs *Session, err error) { - sock, err := listenRTP(host) + conn, err := listenRTP(host) if err != nil { return nil, err } - rs = &Session{ - C: make(chan *Frame, 4), - E: make(chan error, 1), - Sock: sock.(*net.UDPConn), + sock := conn.(*net.UDPConn) + c := make(chan *Frame, 4) + e := make(chan error, 1) + go receiver(sock, c, e) + return &Session{ + C: c, + E: e, + Sock: sock, Header: Header{ PT: sdp.ULAWCodec.PT, Seq: 666, @@ -63,10 +64,7 @@ func NewSession(host string) (rs *Session, err error) { Ssrc: rand.Uint32(), }, obuf: make([]byte, HeaderSize+160), - ibuf: make([]byte, 2048), - } - rs.launchConsumer() - return + }, nil } func (rs *Session) Send(frame *Frame) (err error) { @@ -83,45 +81,36 @@ func (rs *Session) Send(frame *Frame) (err error) { return } -func (rs *Session) launchConsumer() { - go func() { - for { - frame, err := rs.recv() - if err != nil { - rs.E <- err - return - } - rs.C <- frame - } - }() -} - -func (rs *Session) recv() (frame *Frame, err error) { - frame = new(Frame) +func receiver(sock *net.UDPConn, c chan<- *Frame, e chan<- error) { + buf := make([]byte, 2048) + frame := new(Frame) for { - amt, _, err := rs.Sock.ReadFrom(rs.ibuf) + amt, _, err := sock.ReadFrom(buf) if err != nil { - return nil, err + e <- err + break } // TODO(jart): Verify source address? // TODO(jart): Packet reordering? Drop duplicate packets? // TODO(jart): DTMF? var phdr Header - err = phdr.Read(rs.ibuf) + err = phdr.Read(buf) if err != nil { - return nil, err + // TODO(jart): Best logging strategy? + continue } if phdr.PT != sdp.ULAWCodec.PT { continue } if amt != HeaderSize+160 { - return nil, errors.New(fmt.Sprintf("Unexpected RTP packet size: %d", amt)) + continue } for n := 0; n < 160; n++ { - frame[n] = int16(dsp.UlawToLinear(int64(rs.ibuf[HeaderSize+n]))) + frame[n] = int16(dsp.UlawToLinear(int64(buf[HeaderSize+n]))) } - return frame, nil + c <- frame } + close(c) } func listenRTP(host string) (sock net.PacketConn, err error) { diff --git a/sip/receiver.go b/sip/receiver.go index 5d766e7..a6a17d1 100644 --- a/sip/receiver.go +++ b/sip/receiver.go @@ -34,7 +34,7 @@ func ReceiveMessages(contact *Addr, sock *net.UDPConn, c chan<- *Msg, e chan<- e c <- msg } close(c) - close(e) + close(e) // Must be unbuffered! } func addReceived(msg *Msg, addr *net.UDPAddr) {