diff --git a/example/echo3/echo3_test.go b/example/echo3/echo3_test.go index ab7c1e2..54d3f7b 100755 --- a/example/echo3/echo3_test.go +++ b/example/echo3/echo3_test.go @@ -5,7 +5,6 @@ package echo2_test import ( "github.com/jart/gosip/dsp" "github.com/jart/gosip/rtp" - "github.com/jart/gosip/sdp" "github.com/jart/gosip/sip" "net" "testing" @@ -19,16 +18,15 @@ func TestCallToEchoApp(t *testing.T) { } // Create RTP audio session. - rs, err := rtp.NewSession("127.0.0.1") + rs, err := rtp.NewSession("") if err != nil { t.Fatal(err) } - defer rs.Sock.Close() - rtpaddr := rs.Sock.LocalAddr().(*net.UDPAddr) - sip.AttachSDP(invite, sdp.New(rtpaddr, sdp.ULAWCodec, sdp.DTMFCodec)) + defer rs.Close() + rtpPort := uint16(rs.Sock.LocalAddr().(*net.UDPAddr).Port) // Create a SIP phone call. - dl, err := sip.NewDialog(invite) + dl, err := sip.NewDialog(invite, rtpPort) if err != nil { t.Fatal(err) } @@ -46,11 +44,6 @@ func TestCallToEchoApp(t *testing.T) { var answered bool for { select { - case err := <-rs.E: - t.Error("RTP recv failed:", err) - dl.Hangup <- true - case <-rs.C: - // Do nothing with received audio. case <-ticker.C: for n := 0; n < 160; n++ { frame[n] = awgn.Get() @@ -71,8 +64,13 @@ func TestCallToEchoApp(t *testing.T) { } return } - case ms := <-dl.OnSDP: - rs.Peer = &net.UDPAddr{IP: net.ParseIP(ms.Addr), Port: int(ms.Audio.Port)} + case rs.Peer = <-dl.OnPeer: + case frame := <-rs.C: + rs.R <- frame + case err := <-rs.E: + t.Error("RTP recv failed:", err) + rs.CloseAfterError() + dl.Hangup <- true case <-death: dl.Hangup <- true } diff --git a/rtp/session.go b/rtp/session.go index c35e97c..00afaae 100644 --- a/rtp/session.go +++ b/rtp/session.go @@ -27,6 +27,7 @@ type Session struct { // Channel to which received RTP frames and errors are published. C <-chan *Frame + R chan<- *Frame E <-chan error // Underlying UDP socket. @@ -50,12 +51,14 @@ func NewSession(host string) (rs *Session, err error) { return nil, err } sock := conn.(*net.UDPConn) - c := make(chan *Frame, 4) + c := make(chan *Frame) + r := make(chan *Frame) e := make(chan error, 1) - go receiver(sock, c, e) + go receiver(sock, c, e, r) return &Session{ C: c, E: e, + R: r, Sock: sock, Header: Header{ PT: sdp.ULAWCodec.PT, @@ -68,7 +71,7 @@ func NewSession(host string) (rs *Session, err error) { } func (rs *Session) Send(frame *Frame) (err error) { - if rs.Peer == nil { + if rs == nil || rs.Sock == nil || rs.Peer == nil { return nil } rs.Header.Write(rs.obuf) @@ -81,7 +84,29 @@ func (rs *Session) Send(frame *Frame) (err error) { return } -func receiver(sock *net.UDPConn, c chan<- *Frame, e chan<- error) { +func (rs *Session) Close() { + if rs == nil || rs.Sock == nil { + return + } + rs.Sock.Close() + if frame, ok := <-rs.C; ok { + rs.R <- frame + } + <-rs.E + rs.Sock = nil + close(rs.R) +} + +func (rs *Session) CloseAfterError() { + if rs == nil || rs.Sock == nil { + return + } + rs.Sock.Close() + rs.Sock = nil + close(rs.R) +} + +func receiver(sock *net.UDPConn, c chan<- *Frame, e chan<- error, r <-chan *Frame) { buf := make([]byte, 2048) frame := new(Frame) for { @@ -109,8 +134,10 @@ func receiver(sock *net.UDPConn, c chan<- *Frame, e chan<- error) { frame[n] = int16(dsp.UlawToLinear(int64(buf[HeaderSize+n]))) } c <- frame + frame = <-r } close(c) + close(e) } func listenRTP(host string) (sock net.PacketConn, err error) { diff --git a/sip/dialog.go b/sip/dialog.go index 78f28c6..a05614c 100755 --- a/sip/dialog.go +++ b/sip/dialog.go @@ -30,7 +30,7 @@ var ( type Dialog struct { OnErr <-chan error OnState <-chan int - OnSDP <-chan *sdp.SDP + OnPeer <-chan *net.UDPAddr Hangup chan<- bool } @@ -40,14 +40,15 @@ type dialogState struct { csockMsgs <-chan *Msg csockErrs <-chan error errChan chan<- error - sdpChan chan<- *sdp.SDP stateChan chan<- int - doHangupChan <-chan bool + peerChan chan<- *net.UDPAddr + sendHangupChan <-chan bool state int // Current state of the dialog. dest string // Destination hostname (or IP). addr string // Destination ip:port. sock *net.UDPConn // Outbound message socket (connected for ICMP) csock *net.UDPConn // Inbound socket for Contact field. + sdp *sdp.SDP // Media session description. routes *AddressRoute // List of SRV addresses to attempt contacting. invite *Msg // Our INVITE that established the dialog. remote *Msg // Message from remote UA that established dialog. @@ -59,32 +60,42 @@ type dialogState struct { responseTimer <-chan time.Time // Resend timer for message. lseq int // Local CSeq value. rseq int // Remote CSeq value. + b bytes.Buffer // Outbound message buffer. } // NewDialog creates a phone call. -func NewDialog(invite *Msg) (dl *Dialog, err error) { +func NewDialog(invite *Msg, rtpPort uint16) (dl *Dialog, err error) { errChan := make(chan error) - sdpChan := make(chan *sdp.SDP) stateChan := make(chan int) - doHangupChan := make(chan bool, 4) + peerChan := make(chan *net.UDPAddr) + sendHangupChan := make(chan bool, 4) + ms := &sdp.SDP{ + Origin: sdp.Origin{ + ID: util.GenerateOriginID(), + }, + Audio: &sdp.Media{ + Port: rtpPort, + Codecs: []sdp.Codec{sdp.ULAWCodec, sdp.DTMFCodec}, + }, + } dls := &dialogState{ - errChan: errChan, - sdpChan: sdpChan, - stateChan: stateChan, - doHangupChan: doHangupChan, - invite: invite, + errChan: errChan, + stateChan: stateChan, + peerChan: peerChan, + sendHangupChan: sendHangupChan, + invite: invite, + sdp: ms, } go dls.run() return &Dialog{ OnErr: errChan, OnState: stateChan, - OnSDP: sdpChan, - Hangup: doHangupChan, + OnPeer: peerChan, + Hangup: sendHangupChan, }, nil } func (dls *dialogState) run() { - defer dls.sabotage() defer dls.cleanup() if !dls.sendRequest(dls.invite) { return @@ -92,6 +103,8 @@ func (dls *dialogState) run() { for { select { case err := <-dls.sockErrs: + dls.sock.Close() + dls.sock = nil if util.IsRefused(err) { log.Printf("ICMP refusal: %s (%s)", dls.sock.RemoteAddr(), dls.dest) if !dls.popRoute() { @@ -102,6 +115,8 @@ func (dls *dialogState) run() { return } case err := <-dls.csockErrs: + dls.csock.Close() + dls.csock = nil dls.errChan <- err return case <-dls.requestTimer: @@ -112,10 +127,6 @@ func (dls *dialogState) run() { if !dls.resendResponse() { return } - case <-dls.doHangupChan: - if !dls.sendHangup() { - return - } case msg := <-dls.sockMsgs: if !dls.handleMessage(msg) { return @@ -124,6 +135,10 @@ func (dls *dialogState) run() { if !dls.handleMessage(msg) { return } + case <-dls.sendHangupChan: + if !dls.sendHangup() { + return + } } } } @@ -168,7 +183,7 @@ func (dls *dialogState) popRoute() bool { } func (dls *dialogState) connect() bool { - if dls.sock == nil || dls.sock.RemoteAddr().String() != dls.addr { + if dls.sock == nil || dls.addr != dls.sock.RemoteAddr().String() { // Create socket through which we send messages. This socket is connected // to the remote address so we can receive ICMP unavailable errors. It also // allows us to discover the appropriate IP address for the local machine. @@ -227,6 +242,11 @@ func (dls *dialogState) populate(msg *Msg) { }, } } + if msg.Method == MethodInvite { + dls.sdp.Addr = lhost + dls.sdp.Origin.Addr = lhost + AttachSDP(msg, dls.sdp) + } PopulateMessage(nil, nil, msg) } @@ -350,7 +370,7 @@ func (dls *dialogState) checkSDP(msg *Msg) { if err != nil { log.Println("Bad SDP payload:", err) } else { - dls.sdpChan <- ms + dls.peerChan <- &net.UDPAddr{IP: net.ParseIP(ms.Addr), Port: int(ms.Audio.Port)} } } } @@ -365,12 +385,12 @@ func (dls *dialogState) send(msg *Msg) bool { } ts := time.Now() addTimestamp(msg, ts) - var b bytes.Buffer - msg.Append(&b) + dls.b.Reset() + msg.Append(&dls.b) if *tracing { - trace("send", b.String(), dls.sock.RemoteAddr(), ts) + trace("send", dls.b.String(), dls.sock.RemoteAddr(), ts) } - _, err := dls.sock.Write(b.Bytes()) + _, err := dls.sock.Write(dls.b.Bytes()) if err != nil { dls.errChan <- err return false @@ -450,6 +470,9 @@ func (dls *dialogState) transition(state int) { func (dls *dialogState) cleanup() { dls.cleanupSock() dls.cleanupCSock() + close(dls.errChan) + close(dls.stateChan) + close(dls.peerChan) } func (dls *dialogState) cleanupSock() { @@ -469,9 +492,3 @@ func (dls *dialogState) cleanupCSock() { <-dls.csockErrs } } - -func (dls *dialogState) sabotage() { - close(dls.errChan) - close(dls.sdpChan) - close(dls.stateChan) -}