Browse Source

Use a different pattern for receiving messages.

pull/2/head
Justine Alexandra Roberts Tunney 11 years ago
parent
commit
29f2bda583
5 changed files with 166 additions and 178 deletions
  1. +73
    -0
      sip/receiver.go
  2. +50
    -0
      sip/route.go
  3. +27
    -0
      sip/trace.go
  4. +15
    -177
      sip/transport.go
  5. +1
    -1
      sip/uri.go

+ 73
- 0
sip/receiver.go View File

@ -0,0 +1,73 @@
package sip
import (
"github.com/jart/gosip/util"
"log"
"net"
"strconv"
"time"
)
func ReceiveMessages(contact *Addr, sock *net.UDPConn, c chan *Msg, e chan error) {
buf := make([]byte, 2048)
for {
amt, addr, err := sock.ReadFromUDP(buf)
if err != nil {
if !util.IsUseOfClosed(err) {
e <- err
}
return
}
ts := time.Now()
packet := string(buf[0:amt])
if *tracing {
trace("recv", packet, addr, ts)
}
msg, err := ParseMsg(packet)
if err != nil {
log.Println("Dropping SIP message:", err)
continue
}
addReceived(msg, addr)
addTimestamp(msg, ts)
if contact.CompareHostPort(msg.Route) {
msg.Route = msg.Route.Next
}
fixMessagesFromStrictRouters(contact, msg)
c <- msg
}
}
func addReceived(msg *Msg, addr *net.UDPAddr) {
if msg.Via.Host != addr.IP.String() || int(msg.Via.Port) != addr.Port {
msg.Via.Params["received"] = addr.String()
}
}
func addTimestamp(msg *Msg, ts time.Time) {
if *timestampTagging {
msg.Via.Params["µsi"] = strconv.FormatInt(ts.UnixNano()/int64(time.Microsecond), 10)
}
}
// RFC3261 16.4 Route Information Preprocessing
// RFC3261 16.12.1.2: Traversing a Strict-Routing Proxy
func fixMessagesFromStrictRouters(contacts *Addr, msg *Msg) {
if msg.Request != nil && msg.Request.Params.Has("lr") && msg.Route != nil && contacts.Uri.CompareHostPort(msg.Request) {
var oldReq, newReq *URI
if msg.Route.Next == nil {
oldReq, newReq = msg.Request, msg.Route.Uri
msg.Request = msg.Route.Uri
msg.Route = nil
} else {
seclast := msg.Route
for ; seclast.Next.Next != nil; seclast = seclast.Next {
}
oldReq, newReq = msg.Request, seclast.Next.Uri
msg.Request = seclast.Next.Uri
seclast.Next = nil
msg.Route.Last()
}
log.Printf("Fixing request URI after strict router traversal: %s -> %s", oldReq, newReq)
}
}

+ 50
- 0
sip/route.go View File

@ -0,0 +1,50 @@
package sip
import (
"errors"
"net"
)
func RouteMessage(via *Via, contact *Addr, old *Msg) (msg *Msg, dest string, err error) {
var host string
var port uint16
msg = new(Msg)
*msg = *old // Start off with a shallow copy.
if msg.Contact == nil {
msg.Contact = contact
}
if msg.IsResponse {
if via.CompareHostPort(msg.Via) {
msg.Via = msg.Via.Next
}
if received, ok := msg.Via.Params["received"]; ok {
return msg, received, nil
} else {
host, port = msg.Via.Host, msg.Via.Port
}
} else {
if contact.CompareHostPort(msg.Route) {
msg.Route = msg.Route.Next
}
if msg.Route != nil {
if msg.Method == "REGISTER" {
return nil, "", errors.New("Don't route REGISTER requests")
}
if msg.Route.Uri.Params.Has("lr") {
// RFC3261 16.12.1.1 Basic SIP Trapezoid
host, port = msg.Route.Uri.Host, msg.Route.Uri.GetPort()
} else {
// RFC3261 16.12.1.2: Traversing a Strict-Routing Proxy
msg.Route = old.Route.Copy()
msg.Route.Last().Next = &Addr{Uri: msg.Request}
msg.Request = msg.Route.Uri
msg.Route = msg.Route.Next
host, port = msg.Request.Host, msg.Request.GetPort()
}
} else {
host, port = msg.Request.Host, msg.Request.GetPort()
}
}
dest = net.JoinHostPort(host, portstr(port))
return
}

+ 27
- 0
sip/trace.go View File

@ -0,0 +1,27 @@
package sip
import (
"log"
"net"
"strings"
"time"
)
func trace(dir, pkt string, addr net.Addr, t time.Time) {
size := len(pkt)
bar := strings.Repeat("-", 72)
suffix := "\n"
if pkt[len(pkt)-1] == '\n' {
suffix = ""
}
log.Printf(
"%s %d bytes to %s/%s at %s\n"+
"%s\n"+
"%s%s"+
"%s\n",
dir, size, addr.Network(), addr.String(),
t.Format(time.RFC3339Nano),
bar,
pkt, suffix,
bar)
}

+ 15
- 177
sip/transport.go View File

@ -7,11 +7,7 @@ import (
"bytes"
"errors"
"flag"
"github.com/jart/gosip/util"
"log"
"net"
"strconv"
"strings"
"time"
)
@ -35,12 +31,8 @@ type Transport struct {
// the branch parameter... are tricky.
Via *Via
// Returns a linked list of all canonical names and/or IP addresses that may
// be used to contact *this specific* transport.
// Contact that gets put in outbound SIP messages.
Contact *Addr
// Reusable memory for serialization
buf []byte
}
// Creates a new stateless network mechanism for transmitting and receiving SIP
@ -53,36 +45,37 @@ type Transport struct {
// canonical address.
func NewTransport(contact *Addr) (tp *Transport, err error) {
saddr := net.JoinHostPort(contact.Uri.Host, portstr(contact.Uri.Port))
sock, err := net.ListenPacket("udp", saddr)
c, err := net.ListenPacket("udp", saddr)
if err != nil {
return nil, err
}
addr := sock.LocalAddr().(*net.UDPAddr)
contact.Uri.Port = uint16(addr.Port)
sock := c.(*net.UDPConn)
addr := c.LocalAddr().(*net.UDPAddr)
contact = contact.Copy()
contact.Next = nil
contact.Uri.Port = uint16(addr.Port)
contact.Uri.Params["transport"] = addr.Network()
contact.Uri.Params["transport"] = "udp"
tp = &Transport{
C: make(chan *Msg, 32),
E: make(chan error, 1),
Sock: sock.(*net.UDPConn),
Sock: sock,
Contact: contact,
Via: &Via{
Host: contact.Uri.Host,
Port: contact.Uri.Port,
Host: addr.IP.String(),
Port: uint16(addr.Port),
},
}
tp.launchConsumer()
go ReceiveMessages(contact, sock, tp.C, tp.E)
return
}
// Sends a SIP message.
func (tp *Transport) Send(msg *Msg) error {
msg, saddr, err := tp.route(msg)
msg, hostport, err := RouteMessage(tp.Via, tp.Contact, msg)
if err != nil {
return err
}
addr, err := net.ResolveUDPAddr("udp", saddr)
addr, err := net.ResolveUDPAddr("udp", hostport)
if err != nil {
return err
}
@ -103,177 +96,22 @@ func (tp *Transport) Send(msg *Msg) error {
return nil
}
func (tp *Transport) launchConsumer() {
go func() {
for {
msg, err := tp.recv()
if err != nil {
if !util.IsUseOfClosed(err) {
tp.E <- err
}
return
}
tp.C <- msg
}
}()
}
// Receives a SIP message. The received address is injected into the first Via
// header as the "received" param. The Error field of msg should be checked. If
// msg.Status is Status8xxNetworkError, it means the underlying socket died. If
// you set a deadline, you should check: util.IsTimeout(msg.Error).
//
// Warning: Must only be called by one goroutine.
func (tp *Transport) recv() (msg *Msg, err error) {
if tp.buf == nil {
tp.buf = make([]byte, 2048)
}
amt, addr, err := tp.Sock.ReadFromUDP(tp.buf)
if err != nil {
return nil, err
}
ts := time.Now()
packet := string(tp.buf[0:amt])
if *tracing {
trace("recv", packet, addr, ts)
}
// Validation: http://tools.ietf.org/html/rfc3261#section-16.3
msg, err = ParseMsg(packet)
if err != nil {
return nil, err
}
addReceived(msg, addr)
addTimestamp(msg, ts)
err = tp.sanityCheck(msg)
if err != nil {
return nil, err
}
tp.removeOurRoute(msg)
tp.fixMessagesFromStrictRouters(msg)
return
}
// Checks if message is acceptable, otherwise sets msg.Error and returns false.
func (tp *Transport) sanityCheck(msg *Msg) error {
if msg.MaxForwards <= 0 {
go tp.Send(NewResponse(msg, 483))
tp.Send(NewResponse(msg, StatusTooManyHops))
return errors.New("Froot loop detected")
}
if msg.IsResponse {
if msg.Status >= 700 {
go tp.Send(NewResponse(msg, 400))
tp.Send(NewResponse(msg, StatusBadRequest))
return errors.New("Crazy status number")
}
} else {
if msg.CSeqMethod == "" || msg.CSeqMethod != msg.Method {
go tp.Send(NewResponse(msg, 400))
tp.Send(NewResponse(msg, StatusBadRequest))
return errors.New("Bad CSeq")
}
}
return nil
}
func (tp *Transport) route(old *Msg) (msg *Msg, dest string, err error) {
var host string
var port uint16
msg = new(Msg)
*msg = *old // Start off with a shallow copy.
if msg.Contact == nil {
msg.Contact = tp.Contact
}
if msg.IsResponse {
tp.removeOurVia(msg)
if received, ok := msg.Via.Params["received"]; ok {
return msg, received, nil
} else {
host, port = msg.Via.Host, msg.Via.Port
}
} else {
tp.removeOurRoute(msg)
if msg.Route != nil {
if msg.Method == "REGISTER" {
return nil, "", errors.New("Don't route REGISTER requests")
}
if msg.Route.Uri.Params.Has("lr") {
// RFC3261 16.12.1.1 Basic SIP Trapezoid
host, port = msg.Route.Uri.Host, msg.Route.Uri.GetPort()
} else {
// RFC3261 16.12.1.2: Traversing a Strict-Routing Proxy
msg.Route = old.Route.Copy()
msg.Route.Last().Next = &Addr{Uri: msg.Request}
msg.Request = msg.Route.Uri
msg.Route = msg.Route.Next
host, port = msg.Request.Host, msg.Request.GetPort()
}
} else {
host, port = msg.Request.Host, msg.Request.GetPort()
}
}
dest = net.JoinHostPort(host, portstr(port))
return
}
// RFC3261 16.4 Route Information Preprocessing
// RFC3261 16.12.1.2: Traversing a Strict-Routing Proxy
func (tp *Transport) fixMessagesFromStrictRouters(msg *Msg) {
if msg.Request != nil && msg.Request.Params.Has("lr") && msg.Route != nil && tp.Contact.Uri.CompareHostPort(msg.Request) {
var oldReq, newReq *URI
if msg.Route.Next == nil {
oldReq, newReq = msg.Request, msg.Route.Uri
msg.Request = msg.Route.Uri
msg.Route = nil
} else {
seclast := msg.Route
for ; seclast.Next.Next != nil; seclast = seclast.Next {
}
oldReq, newReq = msg.Request, seclast.Next.Uri
msg.Request = seclast.Next.Uri
seclast.Next = nil
msg.Route.Last()
}
log.Printf("Fixing request URI after strict router traversal: %s -> %s", oldReq, newReq)
}
}
func (tp *Transport) removeOurRoute(msg *Msg) {
if tp.Contact.CompareHostPort(msg.Route) {
msg.Route = msg.Route.Next
}
}
func (tp *Transport) removeOurVia(msg *Msg) {
if tp.Via.CompareHostPort(msg.Via) {
msg.Via = msg.Via.Next
}
}
func addReceived(msg *Msg, addr *net.UDPAddr) {
if msg.Via.Host != addr.IP.String() || int(msg.Via.Port) != addr.Port {
msg.Via.Params["received"] = addr.String()
}
}
func addTimestamp(msg *Msg, ts time.Time) {
if *timestampTagging {
msg.Via.Params["µsi"] = strconv.FormatInt(ts.UnixNano()/int64(time.Microsecond), 10)
}
}
func trace(dir, pkt string, addr *net.UDPAddr, t time.Time) {
size := len(pkt)
bar := strings.Repeat("-", 72)
suffix := "\n"
if pkt[len(pkt)-1] == '\n' {
suffix = ""
}
log.Printf(
"%s %d bytes to %s/%s at %s\n"+
"%s\n"+
"%s%s"+
"%s\n",
dir, size, addr.Network(), addr.String(),
t.Format(time.RFC3339Nano),
bar,
pkt, suffix,
bar)
}

+ 1
- 1
sip/uri.go View File

@ -41,7 +41,7 @@ var (
type Params map[string]string
type URI struct {
Scheme string // sip, tel, etc. (never blank)
Scheme string // sip, tel, etc.
User string // sip:USER@host
Pass string // sip:user:PASS@host
Host string // example.com, 1.2.3.4, etc.


Loading…
Cancel
Save