From 29f2bda5834f42b6d19b534f16c1bdd01eceecdc Mon Sep 17 00:00:00 2001 From: Justine Alexandra Roberts Tunney Date: Sat, 27 Dec 2014 23:21:58 -0500 Subject: [PATCH] Use a different pattern for receiving messages. --- sip/receiver.go | 73 ++++++++++++++++++ sip/route.go | 50 ++++++++++++ sip/trace.go | 27 +++++++ sip/transport.go | 192 ++++------------------------------------------- sip/uri.go | 2 +- 5 files changed, 166 insertions(+), 178 deletions(-) create mode 100644 sip/receiver.go create mode 100644 sip/route.go create mode 100644 sip/trace.go diff --git a/sip/receiver.go b/sip/receiver.go new file mode 100644 index 0000000..6644ae9 --- /dev/null +++ b/sip/receiver.go @@ -0,0 +1,73 @@ +package sip + +import ( + "github.com/jart/gosip/util" + "log" + "net" + "strconv" + "time" +) + +func ReceiveMessages(contact *Addr, sock *net.UDPConn, c chan *Msg, e chan error) { + buf := make([]byte, 2048) + for { + amt, addr, err := sock.ReadFromUDP(buf) + if err != nil { + if !util.IsUseOfClosed(err) { + e <- err + } + return + } + ts := time.Now() + packet := string(buf[0:amt]) + if *tracing { + trace("recv", packet, addr, ts) + } + msg, err := ParseMsg(packet) + if err != nil { + log.Println("Dropping SIP message:", err) + continue + } + addReceived(msg, addr) + addTimestamp(msg, ts) + if contact.CompareHostPort(msg.Route) { + msg.Route = msg.Route.Next + } + fixMessagesFromStrictRouters(contact, msg) + c <- msg + } +} + +func addReceived(msg *Msg, addr *net.UDPAddr) { + if msg.Via.Host != addr.IP.String() || int(msg.Via.Port) != addr.Port { + msg.Via.Params["received"] = addr.String() + } +} + +func addTimestamp(msg *Msg, ts time.Time) { + if *timestampTagging { + msg.Via.Params["µsi"] = strconv.FormatInt(ts.UnixNano()/int64(time.Microsecond), 10) + } +} + +// RFC3261 16.4 Route Information Preprocessing +// RFC3261 16.12.1.2: Traversing a Strict-Routing Proxy +func fixMessagesFromStrictRouters(contacts *Addr, msg *Msg) { + if msg.Request != nil && msg.Request.Params.Has("lr") && msg.Route != nil && contacts.Uri.CompareHostPort(msg.Request) { + var oldReq, newReq *URI + if msg.Route.Next == nil { + oldReq, newReq = msg.Request, msg.Route.Uri + msg.Request = msg.Route.Uri + msg.Route = nil + } else { + seclast := msg.Route + for ; seclast.Next.Next != nil; seclast = seclast.Next { + } + oldReq, newReq = msg.Request, seclast.Next.Uri + msg.Request = seclast.Next.Uri + seclast.Next = nil + msg.Route.Last() + } + log.Printf("Fixing request URI after strict router traversal: %s -> %s", oldReq, newReq) + } +} diff --git a/sip/route.go b/sip/route.go new file mode 100644 index 0000000..bd8ed60 --- /dev/null +++ b/sip/route.go @@ -0,0 +1,50 @@ +package sip + +import ( + "errors" + "net" +) + +func RouteMessage(via *Via, contact *Addr, old *Msg) (msg *Msg, dest string, err error) { + var host string + var port uint16 + msg = new(Msg) + *msg = *old // Start off with a shallow copy. + if msg.Contact == nil { + msg.Contact = contact + } + if msg.IsResponse { + if via.CompareHostPort(msg.Via) { + msg.Via = msg.Via.Next + } + if received, ok := msg.Via.Params["received"]; ok { + return msg, received, nil + } else { + host, port = msg.Via.Host, msg.Via.Port + } + } else { + if contact.CompareHostPort(msg.Route) { + msg.Route = msg.Route.Next + } + if msg.Route != nil { + if msg.Method == "REGISTER" { + return nil, "", errors.New("Don't route REGISTER requests") + } + if msg.Route.Uri.Params.Has("lr") { + // RFC3261 16.12.1.1 Basic SIP Trapezoid + host, port = msg.Route.Uri.Host, msg.Route.Uri.GetPort() + } else { + // RFC3261 16.12.1.2: Traversing a Strict-Routing Proxy + msg.Route = old.Route.Copy() + msg.Route.Last().Next = &Addr{Uri: msg.Request} + msg.Request = msg.Route.Uri + msg.Route = msg.Route.Next + host, port = msg.Request.Host, msg.Request.GetPort() + } + } else { + host, port = msg.Request.Host, msg.Request.GetPort() + } + } + dest = net.JoinHostPort(host, portstr(port)) + return +} diff --git a/sip/trace.go b/sip/trace.go new file mode 100644 index 0000000..69b9828 --- /dev/null +++ b/sip/trace.go @@ -0,0 +1,27 @@ +package sip + +import ( + "log" + "net" + "strings" + "time" +) + +func trace(dir, pkt string, addr net.Addr, t time.Time) { + size := len(pkt) + bar := strings.Repeat("-", 72) + suffix := "\n" + if pkt[len(pkt)-1] == '\n' { + suffix = "" + } + log.Printf( + "%s %d bytes to %s/%s at %s\n"+ + "%s\n"+ + "%s%s"+ + "%s\n", + dir, size, addr.Network(), addr.String(), + t.Format(time.RFC3339Nano), + bar, + pkt, suffix, + bar) +} diff --git a/sip/transport.go b/sip/transport.go index 571b9f6..ac5760c 100755 --- a/sip/transport.go +++ b/sip/transport.go @@ -7,11 +7,7 @@ import ( "bytes" "errors" "flag" - "github.com/jart/gosip/util" - "log" "net" - "strconv" - "strings" "time" ) @@ -35,12 +31,8 @@ type Transport struct { // the branch parameter... are tricky. Via *Via - // Returns a linked list of all canonical names and/or IP addresses that may - // be used to contact *this specific* transport. + // Contact that gets put in outbound SIP messages. Contact *Addr - - // Reusable memory for serialization - buf []byte } // Creates a new stateless network mechanism for transmitting and receiving SIP @@ -53,36 +45,37 @@ type Transport struct { // canonical address. func NewTransport(contact *Addr) (tp *Transport, err error) { saddr := net.JoinHostPort(contact.Uri.Host, portstr(contact.Uri.Port)) - sock, err := net.ListenPacket("udp", saddr) + c, err := net.ListenPacket("udp", saddr) if err != nil { return nil, err } - addr := sock.LocalAddr().(*net.UDPAddr) - contact.Uri.Port = uint16(addr.Port) + sock := c.(*net.UDPConn) + addr := c.LocalAddr().(*net.UDPAddr) contact = contact.Copy() + contact.Next = nil contact.Uri.Port = uint16(addr.Port) - contact.Uri.Params["transport"] = addr.Network() + contact.Uri.Params["transport"] = "udp" tp = &Transport{ C: make(chan *Msg, 32), E: make(chan error, 1), - Sock: sock.(*net.UDPConn), + Sock: sock, Contact: contact, Via: &Via{ - Host: contact.Uri.Host, - Port: contact.Uri.Port, + Host: addr.IP.String(), + Port: uint16(addr.Port), }, } - tp.launchConsumer() + go ReceiveMessages(contact, sock, tp.C, tp.E) return } // Sends a SIP message. func (tp *Transport) Send(msg *Msg) error { - msg, saddr, err := tp.route(msg) + msg, hostport, err := RouteMessage(tp.Via, tp.Contact, msg) if err != nil { return err } - addr, err := net.ResolveUDPAddr("udp", saddr) + addr, err := net.ResolveUDPAddr("udp", hostport) if err != nil { return err } @@ -103,177 +96,22 @@ func (tp *Transport) Send(msg *Msg) error { return nil } -func (tp *Transport) launchConsumer() { - go func() { - for { - msg, err := tp.recv() - if err != nil { - if !util.IsUseOfClosed(err) { - tp.E <- err - } - return - } - tp.C <- msg - } - }() -} - -// Receives a SIP message. The received address is injected into the first Via -// header as the "received" param. The Error field of msg should be checked. If -// msg.Status is Status8xxNetworkError, it means the underlying socket died. If -// you set a deadline, you should check: util.IsTimeout(msg.Error). -// -// Warning: Must only be called by one goroutine. -func (tp *Transport) recv() (msg *Msg, err error) { - if tp.buf == nil { - tp.buf = make([]byte, 2048) - } - amt, addr, err := tp.Sock.ReadFromUDP(tp.buf) - if err != nil { - return nil, err - } - ts := time.Now() - packet := string(tp.buf[0:amt]) - if *tracing { - trace("recv", packet, addr, ts) - } - // Validation: http://tools.ietf.org/html/rfc3261#section-16.3 - msg, err = ParseMsg(packet) - if err != nil { - return nil, err - } - addReceived(msg, addr) - addTimestamp(msg, ts) - err = tp.sanityCheck(msg) - if err != nil { - return nil, err - } - tp.removeOurRoute(msg) - tp.fixMessagesFromStrictRouters(msg) - return -} - // Checks if message is acceptable, otherwise sets msg.Error and returns false. func (tp *Transport) sanityCheck(msg *Msg) error { if msg.MaxForwards <= 0 { - go tp.Send(NewResponse(msg, 483)) + tp.Send(NewResponse(msg, StatusTooManyHops)) return errors.New("Froot loop detected") } if msg.IsResponse { if msg.Status >= 700 { - go tp.Send(NewResponse(msg, 400)) + tp.Send(NewResponse(msg, StatusBadRequest)) return errors.New("Crazy status number") } } else { if msg.CSeqMethod == "" || msg.CSeqMethod != msg.Method { - go tp.Send(NewResponse(msg, 400)) + tp.Send(NewResponse(msg, StatusBadRequest)) return errors.New("Bad CSeq") } } return nil } - -func (tp *Transport) route(old *Msg) (msg *Msg, dest string, err error) { - var host string - var port uint16 - msg = new(Msg) - *msg = *old // Start off with a shallow copy. - if msg.Contact == nil { - msg.Contact = tp.Contact - } - if msg.IsResponse { - tp.removeOurVia(msg) - if received, ok := msg.Via.Params["received"]; ok { - return msg, received, nil - } else { - host, port = msg.Via.Host, msg.Via.Port - } - } else { - tp.removeOurRoute(msg) - if msg.Route != nil { - if msg.Method == "REGISTER" { - return nil, "", errors.New("Don't route REGISTER requests") - } - if msg.Route.Uri.Params.Has("lr") { - // RFC3261 16.12.1.1 Basic SIP Trapezoid - host, port = msg.Route.Uri.Host, msg.Route.Uri.GetPort() - } else { - // RFC3261 16.12.1.2: Traversing a Strict-Routing Proxy - msg.Route = old.Route.Copy() - msg.Route.Last().Next = &Addr{Uri: msg.Request} - msg.Request = msg.Route.Uri - msg.Route = msg.Route.Next - host, port = msg.Request.Host, msg.Request.GetPort() - } - } else { - host, port = msg.Request.Host, msg.Request.GetPort() - } - } - dest = net.JoinHostPort(host, portstr(port)) - return -} - -// RFC3261 16.4 Route Information Preprocessing -// RFC3261 16.12.1.2: Traversing a Strict-Routing Proxy -func (tp *Transport) fixMessagesFromStrictRouters(msg *Msg) { - if msg.Request != nil && msg.Request.Params.Has("lr") && msg.Route != nil && tp.Contact.Uri.CompareHostPort(msg.Request) { - var oldReq, newReq *URI - if msg.Route.Next == nil { - oldReq, newReq = msg.Request, msg.Route.Uri - msg.Request = msg.Route.Uri - msg.Route = nil - } else { - seclast := msg.Route - for ; seclast.Next.Next != nil; seclast = seclast.Next { - } - oldReq, newReq = msg.Request, seclast.Next.Uri - msg.Request = seclast.Next.Uri - seclast.Next = nil - msg.Route.Last() - } - log.Printf("Fixing request URI after strict router traversal: %s -> %s", oldReq, newReq) - } -} - -func (tp *Transport) removeOurRoute(msg *Msg) { - if tp.Contact.CompareHostPort(msg.Route) { - msg.Route = msg.Route.Next - } -} - -func (tp *Transport) removeOurVia(msg *Msg) { - if tp.Via.CompareHostPort(msg.Via) { - msg.Via = msg.Via.Next - } -} - -func addReceived(msg *Msg, addr *net.UDPAddr) { - if msg.Via.Host != addr.IP.String() || int(msg.Via.Port) != addr.Port { - msg.Via.Params["received"] = addr.String() - } -} - -func addTimestamp(msg *Msg, ts time.Time) { - if *timestampTagging { - msg.Via.Params["µsi"] = strconv.FormatInt(ts.UnixNano()/int64(time.Microsecond), 10) - } -} - -func trace(dir, pkt string, addr *net.UDPAddr, t time.Time) { - size := len(pkt) - bar := strings.Repeat("-", 72) - suffix := "\n" - if pkt[len(pkt)-1] == '\n' { - suffix = "" - } - log.Printf( - "%s %d bytes to %s/%s at %s\n"+ - "%s\n"+ - "%s%s"+ - "%s\n", - dir, size, addr.Network(), addr.String(), - t.Format(time.RFC3339Nano), - bar, - pkt, suffix, - bar) -} diff --git a/sip/uri.go b/sip/uri.go index 216950b..0bcd740 100755 --- a/sip/uri.go +++ b/sip/uri.go @@ -41,7 +41,7 @@ var ( type Params map[string]string type URI struct { - Scheme string // sip, tel, etc. (never blank) + Scheme string // sip, tel, etc. User string // sip:USER@host Pass string // sip:user:PASS@host Host string // example.com, 1.2.3.4, etc.