diff --git a/example/echo3/echo3_test.go b/example/echo3/echo3_test.go new file mode 100755 index 0000000..0935029 --- /dev/null +++ b/example/echo3/echo3_test.go @@ -0,0 +1,81 @@ +// Echo test that uses slightly higher level APIs. + +package echo2_test + +import ( + "github.com/jart/gosip/dsp" + "github.com/jart/gosip/rtp" + "github.com/jart/gosip/sdp" + "github.com/jart/gosip/sip" + "net" + "testing" + "time" +) + +func TestCallToEchoApp(t *testing.T) { + invite := &sip.Msg{ + Request: &sip.URI{User: "echo", Host: "127.0.0.1", Port: 5060}, + } + + // Create RTP audio session. + rs, err := rtp.NewSession("") + if err != nil { + t.Fatal(err) + } + defer rs.Sock.Close() + rtpaddr := rs.Sock.LocalAddr().(*net.UDPAddr) + sip.AttachSDP(invite, sdp.New(rtpaddr, sdp.ULAWCodec, sdp.DTMFCodec)) + + // Create a SIP phone call. + dl, err := sip.NewDialog(invite) + if err != nil { + t.Fatal(err) + } + + // We're going to send white noise every 20ms. + var frame rtp.Frame + awgn := dsp.NewAWGN(-45.0) + ticker := time.NewTicker(20 * time.Millisecond) + defer ticker.Stop() + + // Hangup after 200ms. + death := time.After(200 * time.Millisecond) + + // Let's GO! + var answered bool + for { + select { + case err := <-rs.E: + t.Error("RTP recv failed:", err) + dl.Hangup <- true + case <-rs.C: + // Do nothing with received audio. + case <-ticker.C: + for n := 0; n < 160; n++ { + frame[n] = awgn.Get() + } + if err := rs.Send(&frame); err != nil { + t.Fatal("RTP send failed:", err) + } + case <-dl.OnErr: + t.Error(err) + return + case state := <-dl.OnState: + switch state { + case sip.DialogAnswered: + answered = true + case sip.DialogHangup: + return + } + case ms := <-dl.OnSDP: + rs.Peer = &net.UDPAddr{IP: net.ParseIP(ms.Addr), Port: int(ms.Audio.Port)} + case <-death: + dl.Hangup <- true + } + } + + // The dialog has shut down cleanly. Was it answered? + if !answered { + t.Error("Call didn't get answered!") + } +} diff --git a/rtp/session.go b/rtp/session.go index 7a8f808..175ff69 100644 --- a/rtp/session.go +++ b/rtp/session.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/jart/gosip/dsp" "github.com/jart/gosip/sdp" + "log" "math/rand" "net" "strconv" @@ -128,11 +129,10 @@ func listenRTP(host string) (sock net.PacketConn, err error) { port := rtpBindPortMin + rand.Int63()%(rtpBindPortMax-rtpBindPortMin+1) saddr := net.JoinHostPort(host, strconv.FormatInt(port, 10)) sock, err = net.ListenPacket("udp", saddr) - if err != nil { - if !strings.Contains(err.Error(), "address already in use") { - break - } + if err == nil || !strings.Contains(err.Error(), "address already in use") { + break } + log.Println("RTP listen congestion:", saddr) } return } diff --git a/sip/dialog.go b/sip/dialog.go new file mode 100755 index 0000000..0460f13 --- /dev/null +++ b/sip/dialog.go @@ -0,0 +1,266 @@ +// SIP Transport Layer. Responsible for serializing messages to/from +// your network. + +package sip + +import ( + "bytes" + "errors" + "github.com/jart/gosip/sdp" + "github.com/jart/gosip/util" + "log" + "net" + "time" +) + +const ( + DialogConnected = 1 + DialogRinging = 2 + DialogAnswered = 3 + DialogHangup = 4 + resendInterval = 200 * time.Millisecond + maxResends = 2 +) + +// Dialog represents an outbound phone call. +type Dialog struct { + OnErr <-chan error + OnState <-chan int + OnSDP <-chan *sdp.SDP + Hangup chan<- bool +} + +type dialogState struct { + sock *net.UDPConn + sockMsgs <-chan *Msg + sockErrs <-chan error + errChan chan<- error + sdpChan chan<- *sdp.SDP + stateChan chan<- int + doHangupChan <-chan bool + routes *AddressRoute + invite *Msg + response *Msg + resend *Msg + resends int + timer <-chan time.Time +} + +// NewDialog creates a phone call. +func NewDialog(invite *Msg) (dl *Dialog, err error) { + invite, host, port, err := RouteMessage(nil, nil, invite) + if err != nil { + return nil, err + } + routes, err := RouteAddress(host, port) + if err != nil { + return nil, err + } + errChan := make(chan error) + sdpChan := make(chan *sdp.SDP) + stateChan := make(chan int) + doHangupChan := make(chan bool, 4) + dls := &dialogState{ + errChan: errChan, + sdpChan: sdpChan, + stateChan: stateChan, + doHangupChan: doHangupChan, + invite: invite, + routes: routes, + } + go dls.run() + return &Dialog{ + OnErr: errChan, + OnState: stateChan, + OnSDP: sdpChan, + Hangup: doHangupChan, + }, nil +} + +func (dls *dialogState) popRoute() bool { + if dls.routes == nil { + dls.errChan <- errors.New("failed to contact host") + return false + } + dls.cleanup() + conn, err := net.Dial("udp", dls.routes.Address) + dls.routes = dls.routes.Next + if err != nil { + log.Println("net.Dial() failed:", err) + return dls.popRoute() + } + dls.sock = conn.(*net.UDPConn) + laddr := conn.LocalAddr().(*net.UDPAddr) + lhost := laddr.IP.String() + lport := uint16(laddr.Port) + dls.invite.Via = &Via{ + Host: lhost, + Port: lport, + Params: Params{"branch": util.GenerateBranch()}, + } + dls.invite.Contact = &Addr{ + Uri: &URI{ + Scheme: "sip", + Host: lhost, + Port: lport, + Params: Params{"transport": "udp"}, + }, + } + PopulateMessage(nil, nil, dls.invite) + dls.resend = dls.invite + dls.timer = time.After(resendInterval) + dls.resends = 0 + sockMsgs := make(chan *Msg) + sockErrs := make(chan error) + dls.sockMsgs = sockMsgs + dls.sockErrs = sockErrs + go ReceiveMessages(dls.invite.Contact, dls.sock, sockMsgs, sockErrs) + return dls.send(dls.resend) +} + +func (dls *dialogState) run() { + defer dls.sabotage() + defer dls.cleanup() + if !dls.popRoute() { + return + } + for { + select { + case err := <-dls.sockErrs: + if util.IsRefused(err) { + if !dls.popRoute() { + return + } + } else { + dls.errChan <- err + return + } + case <-dls.timer: + if dls.resends < maxResends { + if !dls.send(dls.resend) { + return + } + dls.resends++ + dls.timer = time.After(resendInterval) + } else { + if !dls.popRoute() { + return + } + } + case <-dls.doHangupChan: + if !dls.hangup() { + return + } + case msg := <-dls.sockMsgs: + if msg.CallID != dls.invite.CallID { + continue + } + if msg.IsResponse { + if msg.Status >= StatusOK && msg.CSeq == dls.invite.CSeq { + if msg.Contact != nil { + if !dls.send(NewAck(dls.invite, msg)) { + return + } + } + if msg.Status > StatusOK { + dls.errChan <- errors.New(msg.Phrase) + return + } + } + switch msg.Status { + case StatusTrying: + dls.routes = nil + dls.timer = nil + dls.stateChan <- DialogConnected + case StatusRinging, StatusSessionProgress: + dls.stateChan <- DialogRinging + case StatusOK: + switch msg.CSeqMethod { + case dls.invite.Method: + if dls.response == nil { + dls.stateChan <- DialogAnswered + } + dls.response = msg + case MethodBye, MethodCancel: + dls.stateChan <- DialogHangup + return + default: + dls.errChan <- errors.New("Bad CSeq Method") + return + } + } + if msg.Headers["Content-Type"] == sdp.ContentType { + ms, err := sdp.Parse(msg.Payload) + if err != nil { + log.Println("Bad SDP payload:", err) + } else { + dls.sdpChan <- ms + } + } + } else { + if msg.MaxForwards <= 0 { + if !dls.send(NewResponse(msg, StatusTooManyHops)) { + return + } + dls.errChan <- errors.New("Froot loop detected") + return + } + switch msg.Method { + case MethodBye: + if !dls.send(NewResponse(msg, StatusOK)) { + return + } + dls.stateChan <- DialogHangup + return + } + } + } + } +} + +func (dls *dialogState) send(msg *Msg) bool { + // TODO(jart): Double-check route matches socket binding. + if msg.MaxForwards > 0 { + msg.MaxForwards-- + } + ts := time.Now() + addTimestamp(msg, ts) + var b bytes.Buffer + msg.Append(&b) + if *tracing { + trace("send", b.String(), dls.sock.RemoteAddr(), ts) + } + _, err := dls.sock.Write(b.Bytes()) + if err != nil { + dls.errChan <- err + return false + } + return true +} + +func (dls *dialogState) hangup() bool { + if dls.response != nil { + dls.resend = NewBye(dls.invite, dls.response) + } else { + dls.resend = NewCancel(dls.invite) + } + if !dls.send(dls.resend) { + return false + } + dls.resends = 0 + dls.timer = time.After(resendInterval) + return true +} + +func (dls *dialogState) cleanup() { + if dls.sock != nil { + dls.sock.Close() + dls.sock = nil + } +} + +func (dls *dialogState) sabotage() { + close(dls.errChan) + close(dls.sdpChan) + close(dls.stateChan) +} diff --git a/sip/messages.go b/sip/messages.go index 2c488cb..aaf3368 100644 --- a/sip/messages.go +++ b/sip/messages.go @@ -111,6 +111,9 @@ func AckMatch(msg, ack *Msg) bool { } func AttachSDP(msg *Msg, ms *sdp.SDP) { + if msg.Headers == nil { + msg.Headers = Headers{} + } msg.Headers["Content-Type"] = sdp.ContentType msg.Payload = ms.String() } diff --git a/sip/msg.go b/sip/msg.go index d7992bf..17d9ddb 100755 --- a/sip/msg.go +++ b/sip/msg.go @@ -378,25 +378,35 @@ func (msg *Msg) Append(b *bytes.Buffer) error { return nil } -func (msg *Msg) parseFirstLine(s string) (err error) { - toks := strings.Split(s, " ") - if toks != nil && len(toks) == 3 && toks[2] == "SIP/2.0" { - msg.Phrase = "" - msg.Status = 0 - msg.Method = toks[0] - msg.Request = new(URI) - msg.Request, err = ParseURI(toks[1]) - } else if toks != nil && len(toks) == 3 && toks[0] == "SIP/2.0" { +func (msg *Msg) parseFirstLine(s string) error { + i := strings.Index(s, "SIP/2.0") + if i == -1 { + return errors.New("Not a SIP message") + } else if i == 0 { msg.IsResponse = true - msg.Method = "" - msg.Request = nil - msg.Phrase = toks[2] - msg.Status, err = strconv.Atoi(toks[1]) + toks := strings.SplitN(s, " ", 3) + if len(toks) < 2 { + return errors.New("Bad response status line") + } + s, err := strconv.Atoi(toks[1]) if err != nil { - return errors.New("Invalid status") + return errors.New("Bad response status code") + } + msg.Status = s + if len(toks) == 3 { + msg.Phrase = toks[2] + } else { + msg.Phrase = Phrase(msg.Status) } } else { - err = errors.New("Bad protocol or request line") + j := strings.Index(s, " ") + msg.Method = s[:j] + msg.Request = new(URI) + r, err := ParseURI(s[j+1 : i-1]) + msg.Request = r + if err != nil { + return err + } } - return err + return nil } diff --git a/sip/route.go b/sip/route.go index e75d7c2..b0966a3 100644 --- a/sip/route.go +++ b/sip/route.go @@ -2,6 +2,7 @@ package sip import ( "errors" + "github.com/jart/gosip/util" "log" "net" ) @@ -11,12 +12,52 @@ type AddressRoute struct { Next *AddressRoute } +func PopulateMessage(via *Via, contact *Addr, msg *Msg) { + if !msg.IsResponse { + if msg.Method == "" { + msg.Method = "INVITE" + } + if msg.Via == nil { + msg.Via = via + } + if msg.Contact == nil { + msg.Contact = contact + } + if msg.To == nil { + msg.To = &Addr{Uri: msg.Request} + } + if msg.From == nil { + msg.From = msg.Contact + } + if msg.CallID == "" { + msg.CallID = util.GenerateCallID() + } + if msg.CSeq == 0 { + msg.CSeq = util.GenerateCSeq() + } + if msg.CSeqMethod == "" { + msg.CSeqMethod = msg.Method + } + if msg.MaxForwards == 0 { + msg.MaxForwards = 70 + } + if _, ok := msg.Via.Params["branch"]; !ok { + msg.Via = msg.Via.Copy() + msg.Via.Params["branch"] = util.GenerateBranch() + } + if _, ok := msg.From.Params["tag"]; !ok { + msg.From = msg.From.Copy() + msg.From.Params["tag"] = util.GenerateTag() + } + if _, ok := msg.Headers["User-Agent"]; !ok { + msg.Headers["User-Agent"] = GosipUserAgent + } + } +} + func RouteMessage(via *Via, contact *Addr, old *Msg) (msg *Msg, host string, port uint16, err error) { msg = new(Msg) *msg = *old // Start off with a shallow copy. - if msg.Contact == nil { - msg.Contact = contact - } if msg.IsResponse { if via.CompareHostPort(msg.Via) { msg.Via = msg.Via.Next @@ -61,12 +102,15 @@ func RouteAddress(host string, port uint16) (routes *AddressRoute, err error) { if port == 0 { _, srvs, err := net.LookupSRV("sip", "udp", host) if err == nil && len(srvs) > 0 { + s := "" for i := len(srvs) - 1; i >= 0; i-- { routes = &AddressRoute{ Address: net.JoinHostPort(srvs[i].Target, portstr(srvs[i].Port)), Next: routes, } + s = " " + routes.Address + s } + log.Printf("%s routes to: %s", host, s) return routes, nil } log.Println("net.LookupSRV(sip, udp, %s) failed: %s", err) diff --git a/sip/trace.go b/sip/trace.go index 94349e8..57fce96 100644 --- a/sip/trace.go +++ b/sip/trace.go @@ -17,7 +17,7 @@ func trace(dir, pkt string, addr net.Addr, t time.Time) { size := len(pkt) bar := strings.Repeat("-", 72) suffix := "\n" - if pkt[len(pkt)-1] == '\n' { + if pkt != "" && pkt[len(pkt)-1] == '\n' { suffix = "" } log.Printf( diff --git a/sip/transport.go b/sip/transport.go index bcf3f75..40cf50c 100755 --- a/sip/transport.go +++ b/sip/transport.go @@ -66,6 +66,7 @@ func NewTransport(contact *Addr) (tp *Transport, err error) { // Sends a SIP message. func (tp *Transport) Send(msg *Msg) error { + PopulateMessage(tp.Via, tp.Contact, msg) msg, host, port, err := RouteMessage(tp.Via, tp.Contact, msg) if err != nil { return err