From 641d98e953606adf6967e4bace3548344d4fef3c Mon Sep 17 00:00:00 2001 From: Justine Alexandra Roberts Tunney Date: Fri, 26 Dec 2014 15:43:17 -0500 Subject: [PATCH] Playing around. --- example/echo/echo_test.go | 18 ++-- example/echo2/echo2_test.go | 163 ++++++++++++++++---------------- example/options/options_test.go | 6 +- rtp/session.go | 67 ++++++++----- sdp/sdp.go | 3 +- sip/messages.go | 30 +++--- sip/method.go | 20 ++++ sip/msg.go | 44 ++++----- sip/status.go | 8 +- sip/transport.go | 98 +++++++++++-------- 10 files changed, 251 insertions(+), 206 deletions(-) create mode 100644 sip/method.go diff --git a/example/echo/echo_test.go b/example/echo/echo_test.go index b3177bf..06dcb02 100755 --- a/example/echo/echo_test.go +++ b/example/echo/echo_test.go @@ -227,9 +227,9 @@ func TestCallToEchoApp(t *testing.T) { t.Fatal("read 100 trying:", err) } log.Printf("<<< %s\n%s\n", raddr, string(memory[0:amt])) - msg := sip.ParseMsg(string(memory[0:amt])) - if msg.Error != nil { - t.Fatal("parse 100 trying", msg.Error) + msg, err := sip.ParseMsg(string(memory[0:amt])) + if err != nil { + t.Fatal("parse 100 trying", err) } if !msg.IsResponse || msg.Status != 100 || msg.Phrase != "Trying" { t.Fatal("didn't get 100 trying :[") @@ -242,9 +242,9 @@ func TestCallToEchoApp(t *testing.T) { t.Fatal("read 200 ok:", err) } log.Printf("<<< %s\n%s\n", raddr, string(memory[0:amt])) - msg = sip.ParseMsg(string(memory[0:amt])) - if msg.Error != nil { - t.Fatal("parse 200 ok:", msg.Error) + msg, err = sip.ParseMsg(string(memory[0:amt])) + if err != nil { + t.Fatal("parse 200 ok:", err) } if !msg.IsResponse || msg.Status != 200 || msg.Phrase != "OK" { t.Fatal("wanted 200 ok but got:", msg.Status, msg.Phrase) @@ -346,9 +346,9 @@ func TestCallToEchoApp(t *testing.T) { if err != nil { t.Fatal(err) } - msg = sip.ParseMsg(string(memory[0:amt])) - if msg.Error != nil { - t.Fatal(msg.Error) + msg, err = sip.ParseMsg(string(memory[0:amt])) + if err != nil { + t.Fatal(err) } if !msg.IsResponse || msg.Status != 200 || msg.Phrase != "OK" { t.Fatal("wanted bye response 200 ok but got:", msg.Status, msg.Phrase) diff --git a/example/echo2/echo2_test.go b/example/echo2/echo2_test.go index f2f2113..f1d6c93 100755 --- a/example/echo2/echo2_test.go +++ b/example/echo2/echo2_test.go @@ -15,27 +15,20 @@ import ( ) func TestCallToEchoApp(t *testing.T) { + log.SetFlags(log.LstdFlags | log.Lmicroseconds | log.Lshortfile) to := &sip.Addr{Uri: &sip.URI{User: "echo", Host: "127.0.0.1", Port: 5060}} from := &sip.Addr{Uri: &sip.URI{Host: "127.0.0.1"}} - // Create the SIP UDP transport layer. - tp, err := sip.NewTransport(from) - if err != nil { - t.Fatal(err) - } - defer tp.Sock.Close() - - // Used to notify main thread when subthreads die. - rtpDeath := make(chan bool, 2) - - // Create an RTP session. - session, err := rtp.NewSession(from.Uri.Host) + // Create an RTP media session. + rs, err := rtp.NewSession(from.Uri.Host) if err != nil { t.Fatal("rtp listen:", err) } - defer session.Sock.Close() - rtpaddr := session.Sock.LocalAddr().(*net.UDPAddr) - rtppeerChan := make(chan *net.UDPAddr, 1) + defer rs.Sock.Close() + rtpaddr := rs.Sock.LocalAddr().(*net.UDPAddr) + + // Create an RTP audio sender. + rtpPeer := make(chan *net.UDPAddr, 1) go func() { var frame rtp.Frame awgn := dsp.NewAWGN(-25.0) @@ -43,91 +36,60 @@ func TestCallToEchoApp(t *testing.T) { defer ticker.Stop() for { select { + case rs.Peer = <-rtpPeer: case <-ticker.C: + // Send an audio frame containing comfort noise. for n := 0; n < 160; n++ { frame[n] = awgn.Get() } - err := session.Send(frame) + err := rs.Send(&frame) if err != nil { if !util.IsUseOfClosed(err) { t.Error("rtp write", err) } - rtpDeath <- true - return - } - case session.Peer = <-rtppeerChan: - if session.Peer == nil { return } } } }() - defer func() { rtppeerChan <- nil }() - - // Create an RTP message consumer. - go func() { - var frame rtp.Frame - for { - err := session.Recv(frame) - if err != nil { - if !util.IsUseOfClosed(err) { - t.Errorf("rtp read: %s %#v", err, err) - } - rtpDeath <- true - return - } - } - }() - // Create a SIP message consumer. - sipChan := make(chan *sip.Msg, 32) - go func() { - for { - msg := tp.Recv() - if msg.Error != nil && util.IsUseOfClosed(msg.Error) { - return - } - sipChan <- msg - } - }() + // Create the SIP UDP transport layer. + tp, err := sip.NewTransport(from) + if err != nil { + t.Fatal(err) + } + defer tp.Sock.Close() // Send an INVITE message with an SDP. - invite := sip.NewRequest(tp, "INVITE", to, from) + invite := sip.NewRequest(tp, sip.MethodInvite, to, from) sip.AttachSDP(invite, sdp.New(rtpaddr, sdp.ULAWCodec, sdp.DTMFCodec)) err = tp.Send(invite) if err != nil { t.Fatal(err) } + // Set up some reliability in case a UDP packet drops. + resend := invite + resends := 0 + resendInterval := 50 * time.Millisecond + resendTimer := time.After(resendInterval) + + // Hangup after two seconds. + deathTimer := time.After(200 * time.Millisecond) + // Create a SIP dialog handler. - ticker := time.NewTicker(2 * time.Second) - defer ticker.Stop() var answered bool var msg *sip.Msg +loop: for { select { - case <-ticker.C: - if answered { - err = tp.Send(sip.NewBye(invite, msg)) - } else { - err = tp.Send(sip.NewCancel(invite)) - } - if err != nil { - t.Error(err) - } - case <-rtpDeath: - if answered { - err = tp.Send(sip.NewBye(invite, msg)) - } else { - err = tp.Send(sip.NewCancel(invite)) - } - if err != nil { - t.Error(err) - } - case msg = <-sipChan: - if msg.Error != nil { - t.Fatal(msg.Error) - } + case err = <-rs.E: + t.Fatal("RTP network error", err) + case err = <-tp.E: + t.Fatal("SIP network error", err) + case <-rs.C: + // Do nothing with inbound audio. + case msg = <-tp.C: if msg.IsResponse { if msg.Status >= sip.StatusOK && msg.CSeq == invite.CSeq { err = tp.Send(sip.NewAck(invite, msg)) @@ -136,29 +98,38 @@ func TestCallToEchoApp(t *testing.T) { } } if msg.Status < sip.StatusOK { - log.Printf("Provisional %d %s", msg.Status, msg.Phrase) + if msg.Status == sip.StatusTrying { + log.Printf("Remote SIP endpoint exists!") + resendTimer = nil + } else if msg.Status == sip.StatusRinging { + log.Printf("Ringing!") + } else if msg.Status == sip.StatusSessionProgress { + log.Printf("Probably Ringing!") + } else { + log.Printf("Provisional %d %s", msg.Status, msg.Phrase) + } } else if msg.Status == sip.StatusOK { - if msg.CSeqMethod == "INVITE" { + if msg.CSeqMethod == sip.MethodInvite { log.Printf("Answered!") answered = true - } else if msg.CSeqMethod == "BYE" { + } else if msg.CSeqMethod == sip.MethodBye { log.Printf("Hungup!") - return - } else if msg.CSeqMethod == "CANCEL" { + break loop + } else if msg.CSeqMethod == sip.MethodCancel { log.Printf("Cancelled!") - return + break loop } } else if msg.Status > sip.StatusOK { t.Errorf("Got %d %s", msg.Status, msg.Phrase) return } - if msg.Headers["Content-Type"] == "application/sdp" { + if msg.Headers["Content-Type"] == sdp.ContentType { log.Printf("Establishing media session") - rsdp, err := sdp.Parse(msg.Payload) + ms, err := sdp.Parse(msg.Payload) if err != nil { t.Fatal("failed to parse sdp", err) } - rtppeerChan <- &net.UDPAddr{IP: net.ParseIP(rsdp.Addr), Port: int(rsdp.Audio.Port)} + rtpPeer <- &net.UDPAddr{IP: net.ParseIP(ms.Addr), Port: int(ms.Audio.Port)} } } else { if msg.Method == "BYE" { @@ -167,9 +138,35 @@ func TestCallToEchoApp(t *testing.T) { if err != nil { t.Fatal(err) } - return + break loop } } + case <-resendTimer: + if resends == 2 { + t.Fatal("Failed to send", resend.Method) + } + resends++ + err = tp.Send(resend) + if err != nil { + t.Fatal(err) + } + case <-deathTimer: + resends = 0 + resendTimer = time.After(resendInterval) + if answered { + resend = sip.NewBye(invite, msg) + } else { + resend = sip.NewCancel(invite) + } + err = tp.Send(resend) + if err != nil { + t.Error(err) + } } } + + // The dialog has shut down cleanly. + if !answered { + t.Error("Call didn't get answered!") + } } diff --git a/example/options/options_test.go b/example/options/options_test.go index 990b021..77217c0 100755 --- a/example/options/options_test.go +++ b/example/options/options_test.go @@ -81,9 +81,9 @@ func TestOptions(t *testing.T) { t.Fatal(err) } - msg := sip.ParseMsg(string(memory[0:amt])) - if msg.Error != nil { - t.Fatal(msg.Error) + msg, err := sip.ParseMsg(string(memory[0:amt])) + if err != nil { + t.Fatal(err) } if !msg.IsResponse || msg.Status != 200 || msg.Phrase != "OK" { diff --git a/rtp/session.go b/rtp/session.go index a746e05..c8eff8a 100644 --- a/rtp/session.go +++ b/rtp/session.go @@ -26,6 +26,10 @@ type Frame [160]int16 // support for RTCP is provided. type Session struct { + // Channel to which received RTP frames and errors are published. + C chan *Frame + E chan error + // Underlying UDP socket. Sock *net.UDPConn @@ -38,17 +42,18 @@ type Session struct { ibuf []byte obuf []byte - phdr Header } // Creates a new RTP µLaw 20ptime session listening on host with a random port // selected from the range [16384,32768]. -func NewSession(host string) (s *Session, err error) { +func NewSession(host string) (rs *Session, err error) { sock, err := listenRTP(host) if err != nil { return nil, err } - return &Session{ + rs = &Session{ + C: make(chan *Frame, 32), + E: make(chan error, 1), Sock: sock.(*net.UDPConn), Header: Header{ PT: sdp.ULAWCodec.PT, @@ -56,53 +61,65 @@ func NewSession(host string) (s *Session, err error) { TS: 0, Ssrc: rand.Uint32(), }, - }, err + obuf: make([]byte, HeaderSize+160), + ibuf: make([]byte, 2048), + } + rs.launchConsumer() + return } -func (s *Session) Send(frame Frame) (err error) { - if s.Peer == nil { +func (rs *Session) Send(frame *Frame) (err error) { + if rs.Peer == nil { return nil } - if s.obuf == nil { - s.obuf = make([]byte, HeaderSize+160) - } - s.Header.Write(s.obuf) - s.Header.TS += 160 - s.Header.Seq++ + rs.Header.Write(rs.obuf) + rs.Header.TS += 160 + rs.Header.Seq++ for n := 0; n < 160; n++ { - s.obuf[HeaderSize+n] = byte(dsp.LinearToUlaw(int64(frame[n]))) + rs.obuf[HeaderSize+n] = byte(dsp.LinearToUlaw(int64(frame[n]))) } - _, err = s.Sock.WriteTo(s.obuf, s.Peer) + _, err = rs.Sock.WriteTo(rs.obuf, rs.Peer) return } -func (s *Session) Recv(frame Frame) (err error) { - if s.ibuf == nil { - s.ibuf = make([]byte, 2048) - } +func (rs *Session) launchConsumer() { + go func() { + for { + frame, err := rs.recv() + if err != nil { + rs.E <- err + return + } + rs.C <- frame + } + }() +} + +func (rs *Session) recv() (frame *Frame, err error) { + frame = new(Frame) for { - amt, _, err := s.Sock.ReadFrom(s.ibuf) + amt, _, err := rs.Sock.ReadFrom(rs.ibuf) if err != nil { - return err + return nil, err } // TODO(jart): Verify source address? // TODO(jart): Packet reordering? Drop duplicate packets? // TODO(jart): DTMF? var phdr Header - err = phdr.Read(s.ibuf) + err = phdr.Read(rs.ibuf) if err != nil { - return err + return nil, err } if phdr.PT != sdp.ULAWCodec.PT { continue } if amt != HeaderSize+160 { - return errors.New(fmt.Sprintf("Unexpected RTP packet size: %d", amt)) + return nil, errors.New(fmt.Sprintf("Unexpected RTP packet size: %d", amt)) } for n := 0; n < 160; n++ { - frame[n] = int16(dsp.UlawToLinear(int64(s.ibuf[HeaderSize+n]))) + frame[n] = int16(dsp.UlawToLinear(int64(rs.ibuf[HeaderSize+n]))) } - return nil + return frame, nil } } diff --git a/sdp/sdp.go b/sdp/sdp.go index fd92536..831de89 100755 --- a/sdp/sdp.go +++ b/sdp/sdp.go @@ -64,7 +64,8 @@ import ( ) const ( - MaxLength = 1450 + ContentType = "application/sdp" + MaxLength = 1450 ) // SDP represents a Session Description Protocol SIP payload. diff --git a/sip/messages.go b/sip/messages.go index 3abc9eb..3e0da7a 100644 --- a/sip/messages.go +++ b/sip/messages.go @@ -8,16 +8,16 @@ import ( const ( GosipUserAgent = "gosip/1.o" - GosipAllow = "INVITE, ACK, CANCEL, BYE, OPTIONS" + GosipAllow = MethodInvite + ", " + MethodAck + ", " + MethodCancel + ", " + MethodBye + ", " + MethodOptions ) func NewRequest(tp *Transport, method string, to, from *Addr) *Msg { return &Msg{ Method: method, - Request: to.Uri.Copy(), + Request: to.Uri, Via: tp.Via.Copy().Branch(), From: from.Or(tp.Contact).Tag(), - To: to.Copy(), + To: to, Contact: tp.Contact, CallID: util.GenerateCallID(), CSeq: util.GenerateCSeq(), @@ -45,7 +45,7 @@ func NewResponse(msg *Msg, status int) *Msg { // http://tools.ietf.org/html/rfc3261#section-17.1.1.3 func NewAck(original, msg *Msg) *Msg { return &Msg{ - Method: "ACK", + Method: MethodAck, Request: original.Request, Via: original.Via.Copy().SetNext(nil), From: original.From, @@ -59,18 +59,18 @@ func NewAck(original, msg *Msg) *Msg { } func NewCancel(invite *Msg) *Msg { - if invite.IsResponse || invite.Method != "INVITE" { + if invite.IsResponse || invite.Method != MethodInvite { log.Printf("Can't CANCEL anything non-INVITE:\n%s", invite) } return &Msg{ - Method: "CANCEL", + Method: MethodCancel, Request: invite.Request, Via: invite.Via, From: invite.From, To: invite.To, CallID: invite.CallID, CSeq: invite.CSeq, - CSeqMethod: "CANCEL", + CSeqMethod: MethodCancel, Route: invite.Route, Headers: DefaultHeaders(), } @@ -78,14 +78,14 @@ func NewCancel(invite *Msg) *Msg { func NewBye(invite, last *Msg) *Msg { return &Msg{ - Method: "BYE", + Method: MethodBye, Request: last.Contact.Uri, Via: invite.Via, From: invite.From, To: last.To, CallID: invite.CallID, CSeq: invite.CSeq + 1, - CSeqMethod: "BYE", + CSeqMethod: MethodBye, Route: last.RecordRoute.Reversed(), Headers: DefaultHeaders(), } @@ -101,19 +101,19 @@ func ResponseMatch(msg, resp *Msg) bool { } // Returns true if `ack` can be considered an appropriate response to `msg`. -// we don't enforce a matching Via because some VoIP software will generate a +// We don't enforce a matching Via because some VoIP software will generate a // new branch for ACKs. func AckMatch(msg, ack *Msg) bool { return (!ack.IsResponse && - ack.Method == "ACK" && + ack.Method == MethodAck && ack.CSeq == msg.CSeq && - ack.CSeqMethod == "ACK" && + ack.CSeqMethod == MethodAck && ack.Via.Last().CompareAddr(msg.Via)) } -func AttachSDP(msg *Msg, sdp *sdp.SDP) { - msg.Headers["Content-Type"] = "application/sdp" - msg.Payload = sdp.String() +func AttachSDP(msg *Msg, ms *sdp.SDP) { + msg.Headers["Content-Type"] = sdp.ContentType + msg.Payload = ms.String() } func DefaultHeaders() Headers { diff --git a/sip/method.go b/sip/method.go new file mode 100644 index 0000000..59dbb2d --- /dev/null +++ b/sip/method.go @@ -0,0 +1,20 @@ +// SIP Protocol Method Definitions + +package sip + +const ( + MethodInvite = "INVITE" // Indicates a client is being invited to participate in a call session. + MethodAck = "ACK" // Confirms that the client has received a final response to an INVITE request. + MethodBye = "BYE" // Terminates a call and can be sent by either the caller or the callee. + MethodCancel = "CANCEL" // Cancels any pending request. + MethodOptions = "OPTIONS" // Queries the capabilities of servers. + MethodRegister = "REGISTER" // Registers the address listed in the To header field with a SIP server. + MethodPrack = "PRACK" // Provisional acknowledgement. + MethodSubscribe = "SUBSCRIBE" // Subscribes for an Event of Notification from the Notifier. + MethodNotify = "NOTIFY" // Notify the subscriber of a new Event. + MethodPublish = "PUBLISH" // Publishes an event to the Server. + MethodInfo = "INFO" // Sends mid-session information that does not modify the session state. + MethodRefer = "REFER" // Asks recipient to issue SIP request (call transfer.) + MethodMessage = "MESSAGE" // Transports instant messages using SIP. + MethodUpdate = "UPDATE" // Modifies the state of a session without changing the state of the dialog. +) diff --git a/sip/msg.go b/sip/msg.go index 0259e08..b37fd85 100755 --- a/sip/msg.go +++ b/sip/msg.go @@ -18,7 +18,6 @@ type Headers map[string]string // These fields are never nil unless otherwise specified. type Msg struct { // Special non-SIP fields. - Error error // Set to indicate an error with this message SourceAddr *net.UDPAddr // Set by transport layer as received address // Fields that aren't headers. @@ -64,26 +63,23 @@ func (msg *Msg) String() string { } // Parses a SIP message into a data structure. This takes ~70 µs on average. -func ParseMsg(packet string) (msg *Msg) { +func ParseMsg(packet string) (msg *Msg, err error) { msg = new(Msg) if packet == "" { - msg.Error = errors.New("Empty msg") - return msg + return nil, errors.New("Empty msg") } if n := strings.Index(packet, "\r\n\r\n"); n > 0 { packet, msg.Payload = packet[0:n], packet[n+4:] } lines := strings.Split(packet, "\r\n") if lines == nil || len(lines) < 2 { - msg.Error = errors.New("Too few lines") - return msg + return nil, errors.New("Too few lines") } var k, v string var okVia, okTo, okFrom, okCallID, okComputer bool - err := msg.parseFirstLine(lines[0]) + err = msg.parseFirstLine(lines[0]) if err != nil { - msg.Error = err - return msg + return nil, err } hdrs := lines[1:] msg.Headers = make(map[string]string, len(hdrs)) @@ -118,7 +114,7 @@ func ParseMsg(packet string) (msg *Msg) { okVia = true *viap, err = ParseVia(v) if err != nil { - msg.Error = errors.New("Bad Via header: " + err.Error()) + return nil, errors.New("Bad Via header: " + err.Error()) } else { viap = &(*viap).Next } @@ -126,18 +122,18 @@ func ParseMsg(packet string) (msg *Msg) { okTo = true msg.To, err = ParseAddr(v) if err != nil { - msg.Error = errors.New("Bad To header: " + err.Error()) + return nil, errors.New("Bad To header: " + err.Error()) } case "from": okFrom = true msg.From, err = ParseAddr(v) if err != nil { - msg.Error = errors.New("Bad From header: " + err.Error()) + return nil, errors.New("Bad From header: " + err.Error()) } case "contact": *contactp, err = ParseAddr(v) if err != nil { - msg.Error = errors.New("Bad Contact header: " + err.Error()) + return nil, errors.New("Bad Contact header: " + err.Error()) } else { contactp = &(*contactp).Last().Next } @@ -151,66 +147,66 @@ func ParseMsg(packet string) (msg *Msg) { } } if !okComputer { - msg.Error = errors.New("Bad CSeq Header") + return nil, errors.New("Bad CSeq Header") } case "content-length": if cl, err := strconv.Atoi(v); err == nil { if cl != len(msg.Payload) { - msg.Error = errors.New(fmt.Sprintf( + return nil, errors.New(fmt.Sprintf( "Content-Length (%d) differs from payload length (%d)", cl, len(msg.Payload))) } } else { - msg.Error = errors.New("Bad Content-Length header") + return nil, errors.New("Bad Content-Length header") } case "expires": if cl, err := strconv.Atoi(v); err == nil && cl >= 0 { msg.Expires = cl } else { - msg.Error = errors.New("Bad Expires header") + return nil, errors.New("Bad Expires header") } case "min-expires": if cl, err := strconv.Atoi(v); err == nil && cl > 0 { msg.MinExpires = cl } else { - msg.Error = errors.New("Bad Min-Expires header") + return nil, errors.New("Bad Min-Expires header") } case "max-forwards": if cl, err := strconv.Atoi(v); err == nil && cl > 0 { msg.MaxForwards = cl } else { - msg.Error = errors.New("Bad Max-Forwards header") + return nil, errors.New("Bad Max-Forwards header") } case "route": *routep, err = ParseAddr(v) if err != nil { - msg.Error = errors.New("Bad Route header: " + err.Error()) + return nil, errors.New("Bad Route header: " + err.Error()) } else { routep = &(*routep).Last().Next } case "record-route": *rroutep, err = ParseAddr(v) if err != nil { - msg.Error = errors.New("Bad Record-Route header: " + err.Error()) + return nil, errors.New("Bad Record-Route header: " + err.Error()) } else { rroutep = &(*rroutep).Last().Next } case "p-asserted-identity": msg.Paid, err = ParseAddr(v) if err != nil { - msg.Error = errors.New("Bad P-Asserted-Identity header: " + err.Error()) + return nil, errors.New("Bad P-Asserted-Identity header: " + err.Error()) } case "remote-party-id": msg.Rpid, err = ParseAddr(v) if err != nil { - msg.Error = errors.New("Bad Remote-Party-ID header: " + err.Error()) + return nil, errors.New("Bad Remote-Party-ID header: " + err.Error()) } default: msg.Headers[k] = v } } if !okVia || !okTo || !okFrom || !okCallID || !okComputer { - msg.Error = errors.New("Missing mandatory headers") + return nil, errors.New("Missing mandatory headers") } return } diff --git a/sip/status.go b/sip/status.go index 5959295..61e75ee 100755 --- a/sip/status.go +++ b/sip/status.go @@ -1,4 +1,4 @@ -// SIP Protocol Statuses +// SIP Protocol Status Definitions // // http://www.iana.org/assignments/sip-parameters @@ -92,10 +92,6 @@ const ( StatusDoesNotExistAnywhere = 604 StatusNotAcceptable606 = 606 StatusDialogTerminated = 687 - - // 8xx: Special gosip errors - Status8xxProgrammerError = 800 - Status8xxNetworkError = 801 ) func Phrase(status int) string { @@ -177,6 +173,4 @@ var phrases = map[int]string{ StatusDoesNotExistAnywhere: "Does Not Exist Anywhere", StatusNotAcceptable606: "Not Acceptable", StatusDialogTerminated: "Dialog Terminated", - Status8xxProgrammerError: "Programmer Error", - Status8xxNetworkError: "Network Error", } diff --git a/sip/transport.go b/sip/transport.go index 8cd1317..f2afc50 100755 --- a/sip/transport.go +++ b/sip/transport.go @@ -23,6 +23,10 @@ var ( // Transport sends and receives SIP messages over UDP with stateless routing. type Transport struct { + // Channel to which received SIP messages and errors are published. + C chan *Msg + E chan error + // Underlying UDP socket. Sock *net.UDPConn @@ -58,14 +62,17 @@ func NewTransport(contact *Addr) (tp *Transport, err error) { contact = contact.Copy() contact.Uri.Port = uint16(addr.Port) contact.Uri.Params["transport"] = addr.Network() - return &Transport{ + tp = &Transport{ + C: make(chan *Msg, 32), Sock: sock.(*net.UDPConn), Contact: contact, Via: &Via{ Host: contact.Uri.Host, Port: contact.Uri.Port, }, - }, nil + } + tp.launchConsumer() + return } // Sends a SIP message. @@ -86,7 +93,7 @@ func (tp *Transport) Send(msg *Msg) error { var b bytes.Buffer msg.Append(&b) if *tracing { - tp.trace("send", b.String(), addr, ts) + trace("send", b.String(), addr, ts) } _, err = tp.Sock.WriteTo(b.Bytes(), addr) if err != nil { @@ -95,76 +102,71 @@ func (tp *Transport) Send(msg *Msg) error { return nil } +func (tp *Transport) launchConsumer() { + go func() { + for { + msg, err := tp.recv() + if err != nil { + tp.E <- err + return + } + tp.C <- msg + } + }() +} + // Receives a SIP message. The received address is injected into the first Via // header as the "received" param. The Error field of msg should be checked. If // msg.Status is Status8xxNetworkError, it means the underlying socket died. If // you set a deadline, you should check: util.IsTimeout(msg.Error). // // Warning: Must only be called by one goroutine. -func (tp *Transport) Recv() *Msg { +func (tp *Transport) recv() (msg *Msg, err error) { if tp.buf == nil { tp.buf = make([]byte, 2048) } amt, addr, err := tp.Sock.ReadFromUDP(tp.buf) if err != nil { - return &Msg{Status: Status8xxNetworkError, Error: err} + return nil, err } ts := time.Now() packet := string(tp.buf[0:amt]) if *tracing { - tp.trace("recv", packet, addr, ts) + trace("recv", packet, addr, ts) } // Validation: http://tools.ietf.org/html/rfc3261#section-16.3 - msg := ParseMsg(packet) - if msg.Error != nil { - return msg + msg, err = ParseMsg(packet) + if err != nil { + return nil, err } addReceived(msg, addr) addTimestamp(msg, ts) - if !tp.sanityCheck(msg) { - return msg + err = tp.sanityCheck(msg) + if err != nil { + return nil, err } tp.preprocess(msg) - return msg -} - -func (tp *Transport) trace(dir, pkt string, addr *net.UDPAddr, t time.Time) { - size := len(pkt) - bar := strings.Repeat("-", 72) - suffix := "\n" - if pkt[len(pkt)-1] == '\n' { - suffix = "" - } - log.Printf( - "%s %d bytes to %s/%s at %s\n"+ - "%s\n"+ - "%s%s"+ - "%s\n", - dir, size, addr.Network(), addr.String(), - t.Format(time.RFC3339Nano), - bar, - pkt, suffix, - bar) + return } // Checks if message is acceptable, otherwise sets msg.Error and returns false. -func (tp *Transport) sanityCheck(msg *Msg) bool { +func (tp *Transport) sanityCheck(msg *Msg) error { if msg.MaxForwards <= 0 { go tp.Send(NewResponse(msg, 483)) - msg.Error = errors.New("Froot loop detected") + return errors.New("Froot loop detected") } if msg.IsResponse { if msg.Status >= 700 { go tp.Send(NewResponse(msg, 400)) - msg.Error = errors.New("Crazy status number") + return errors.New("Crazy status number") } } else { if msg.CSeqMethod == "" || msg.CSeqMethod != msg.Method { go tp.Send(NewResponse(msg, 400)) - msg.Error = errors.New("Bad CSeq") + return errors.New("Bad CSeq") } } - return msg.Error == nil + return nil } // Perform some ingress message mangling. @@ -195,13 +197,12 @@ func (tp *Transport) preprocess(msg *Msg) { } } -func (tp *Transport) route(old *Msg) (msg *Msg, saddr string, err error) { +func (tp *Transport) route(old *Msg) (msg *Msg, dest string, err error) { var host string var port uint16 msg = new(Msg) *msg = *old // Start off with a shallow copy. if msg.IsResponse { - msg.Via = old.Via.Copy() if msg.Via.CompareAddr(tp.Via) { // In proxy scenarios we have to remove our own Via. msg.Via = msg.Via.Next @@ -242,7 +243,7 @@ func (tp *Transport) route(old *Msg) (msg *Msg, saddr string, err error) { host, port = msg.Request.Host, msg.Request.Port } } - saddr = util.HostPortToString(host, port) + dest = util.HostPortToString(host, port) return } @@ -257,3 +258,22 @@ func addTimestamp(msg *Msg, ts time.Time) { msg.Via.Params["µsi"] = strconv.FormatInt(ts.UnixNano()/int64(time.Microsecond), 10) } } + +func trace(dir, pkt string, addr *net.UDPAddr, t time.Time) { + size := len(pkt) + bar := strings.Repeat("-", 72) + suffix := "\n" + if pkt[len(pkt)-1] == '\n' { + suffix = "" + } + log.Printf( + "%s %d bytes to %s/%s at %s\n"+ + "%s\n"+ + "%s%s"+ + "%s\n", + dir, size, addr.Network(), addr.String(), + t.Format(time.RFC3339Nano), + bar, + pkt, suffix, + bar) +}