Browse Source

Playing around.

pull/2/head
Justine Alexandra Roberts Tunney 11 years ago
parent
commit
641d98e953
10 changed files with 251 additions and 206 deletions
  1. +9
    -9
      example/echo/echo_test.go
  2. +80
    -83
      example/echo2/echo2_test.go
  3. +3
    -3
      example/options/options_test.go
  4. +42
    -25
      rtp/session.go
  5. +2
    -1
      sdp/sdp.go
  6. +15
    -15
      sip/messages.go
  7. +20
    -0
      sip/method.go
  8. +20
    -24
      sip/msg.go
  9. +1
    -7
      sip/status.go
  10. +59
    -39
      sip/transport.go

+ 9
- 9
example/echo/echo_test.go View File

@ -227,9 +227,9 @@ func TestCallToEchoApp(t *testing.T) {
t.Fatal("read 100 trying:", err)
}
log.Printf("<<< %s\n%s\n", raddr, string(memory[0:amt]))
msg := sip.ParseMsg(string(memory[0:amt]))
if msg.Error != nil {
t.Fatal("parse 100 trying", msg.Error)
msg, err := sip.ParseMsg(string(memory[0:amt]))
if err != nil {
t.Fatal("parse 100 trying", err)
}
if !msg.IsResponse || msg.Status != 100 || msg.Phrase != "Trying" {
t.Fatal("didn't get 100 trying :[")
@ -242,9 +242,9 @@ func TestCallToEchoApp(t *testing.T) {
t.Fatal("read 200 ok:", err)
}
log.Printf("<<< %s\n%s\n", raddr, string(memory[0:amt]))
msg = sip.ParseMsg(string(memory[0:amt]))
if msg.Error != nil {
t.Fatal("parse 200 ok:", msg.Error)
msg, err = sip.ParseMsg(string(memory[0:amt]))
if err != nil {
t.Fatal("parse 200 ok:", err)
}
if !msg.IsResponse || msg.Status != 200 || msg.Phrase != "OK" {
t.Fatal("wanted 200 ok but got:", msg.Status, msg.Phrase)
@ -346,9 +346,9 @@ func TestCallToEchoApp(t *testing.T) {
if err != nil {
t.Fatal(err)
}
msg = sip.ParseMsg(string(memory[0:amt]))
if msg.Error != nil {
t.Fatal(msg.Error)
msg, err = sip.ParseMsg(string(memory[0:amt]))
if err != nil {
t.Fatal(err)
}
if !msg.IsResponse || msg.Status != 200 || msg.Phrase != "OK" {
t.Fatal("wanted bye response 200 ok but got:", msg.Status, msg.Phrase)


+ 80
- 83
example/echo2/echo2_test.go View File

@ -15,27 +15,20 @@ import (
)
func TestCallToEchoApp(t *testing.T) {
log.SetFlags(log.LstdFlags | log.Lmicroseconds | log.Lshortfile)
to := &sip.Addr{Uri: &sip.URI{User: "echo", Host: "127.0.0.1", Port: 5060}}
from := &sip.Addr{Uri: &sip.URI{Host: "127.0.0.1"}}
// Create the SIP UDP transport layer.
tp, err := sip.NewTransport(from)
if err != nil {
t.Fatal(err)
}
defer tp.Sock.Close()
// Used to notify main thread when subthreads die.
rtpDeath := make(chan bool, 2)
// Create an RTP session.
session, err := rtp.NewSession(from.Uri.Host)
// Create an RTP media session.
rs, err := rtp.NewSession(from.Uri.Host)
if err != nil {
t.Fatal("rtp listen:", err)
}
defer session.Sock.Close()
rtpaddr := session.Sock.LocalAddr().(*net.UDPAddr)
rtppeerChan := make(chan *net.UDPAddr, 1)
defer rs.Sock.Close()
rtpaddr := rs.Sock.LocalAddr().(*net.UDPAddr)
// Create an RTP audio sender.
rtpPeer := make(chan *net.UDPAddr, 1)
go func() {
var frame rtp.Frame
awgn := dsp.NewAWGN(-25.0)
@ -43,91 +36,60 @@ func TestCallToEchoApp(t *testing.T) {
defer ticker.Stop()
for {
select {
case rs.Peer = <-rtpPeer:
case <-ticker.C:
// Send an audio frame containing comfort noise.
for n := 0; n < 160; n++ {
frame[n] = awgn.Get()
}
err := session.Send(frame)
err := rs.Send(&frame)
if err != nil {
if !util.IsUseOfClosed(err) {
t.Error("rtp write", err)
}
rtpDeath <- true
return
}
case session.Peer = <-rtppeerChan:
if session.Peer == nil {
return
}
}
}
}()
defer func() { rtppeerChan <- nil }()
// Create an RTP message consumer.
go func() {
var frame rtp.Frame
for {
err := session.Recv(frame)
if err != nil {
if !util.IsUseOfClosed(err) {
t.Errorf("rtp read: %s %#v", err, err)
}
rtpDeath <- true
return
}
}
}()
// Create a SIP message consumer.
sipChan := make(chan *sip.Msg, 32)
go func() {
for {
msg := tp.Recv()
if msg.Error != nil && util.IsUseOfClosed(msg.Error) {
return
}
sipChan <- msg
}
}()
// Create the SIP UDP transport layer.
tp, err := sip.NewTransport(from)
if err != nil {
t.Fatal(err)
}
defer tp.Sock.Close()
// Send an INVITE message with an SDP.
invite := sip.NewRequest(tp, "INVITE", to, from)
invite := sip.NewRequest(tp, sip.MethodInvite, to, from)
sip.AttachSDP(invite, sdp.New(rtpaddr, sdp.ULAWCodec, sdp.DTMFCodec))
err = tp.Send(invite)
if err != nil {
t.Fatal(err)
}
// Set up some reliability in case a UDP packet drops.
resend := invite
resends := 0
resendInterval := 50 * time.Millisecond
resendTimer := time.After(resendInterval)
// Hangup after two seconds.
deathTimer := time.After(200 * time.Millisecond)
// Create a SIP dialog handler.
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
var answered bool
var msg *sip.Msg
loop:
for {
select {
case <-ticker.C:
if answered {
err = tp.Send(sip.NewBye(invite, msg))
} else {
err = tp.Send(sip.NewCancel(invite))
}
if err != nil {
t.Error(err)
}
case <-rtpDeath:
if answered {
err = tp.Send(sip.NewBye(invite, msg))
} else {
err = tp.Send(sip.NewCancel(invite))
}
if err != nil {
t.Error(err)
}
case msg = <-sipChan:
if msg.Error != nil {
t.Fatal(msg.Error)
}
case err = <-rs.E:
t.Fatal("RTP network error", err)
case err = <-tp.E:
t.Fatal("SIP network error", err)
case <-rs.C:
// Do nothing with inbound audio.
case msg = <-tp.C:
if msg.IsResponse {
if msg.Status >= sip.StatusOK && msg.CSeq == invite.CSeq {
err = tp.Send(sip.NewAck(invite, msg))
@ -136,29 +98,38 @@ func TestCallToEchoApp(t *testing.T) {
}
}
if msg.Status < sip.StatusOK {
log.Printf("Provisional %d %s", msg.Status, msg.Phrase)
if msg.Status == sip.StatusTrying {
log.Printf("Remote SIP endpoint exists!")
resendTimer = nil
} else if msg.Status == sip.StatusRinging {
log.Printf("Ringing!")
} else if msg.Status == sip.StatusSessionProgress {
log.Printf("Probably Ringing!")
} else {
log.Printf("Provisional %d %s", msg.Status, msg.Phrase)
}
} else if msg.Status == sip.StatusOK {
if msg.CSeqMethod == "INVITE" {
if msg.CSeqMethod == sip.MethodInvite {
log.Printf("Answered!")
answered = true
} else if msg.CSeqMethod == "BYE" {
} else if msg.CSeqMethod == sip.MethodBye {
log.Printf("Hungup!")
return
} else if msg.CSeqMethod == "CANCEL" {
break loop
} else if msg.CSeqMethod == sip.MethodCancel {
log.Printf("Cancelled!")
return
break loop
}
} else if msg.Status > sip.StatusOK {
t.Errorf("Got %d %s", msg.Status, msg.Phrase)
return
}
if msg.Headers["Content-Type"] == "application/sdp" {
if msg.Headers["Content-Type"] == sdp.ContentType {
log.Printf("Establishing media session")
rsdp, err := sdp.Parse(msg.Payload)
ms, err := sdp.Parse(msg.Payload)
if err != nil {
t.Fatal("failed to parse sdp", err)
}
rtppeerChan <- &net.UDPAddr{IP: net.ParseIP(rsdp.Addr), Port: int(rsdp.Audio.Port)}
rtpPeer <- &net.UDPAddr{IP: net.ParseIP(ms.Addr), Port: int(ms.Audio.Port)}
}
} else {
if msg.Method == "BYE" {
@ -167,9 +138,35 @@ func TestCallToEchoApp(t *testing.T) {
if err != nil {
t.Fatal(err)
}
return
break loop
}
}
case <-resendTimer:
if resends == 2 {
t.Fatal("Failed to send", resend.Method)
}
resends++
err = tp.Send(resend)
if err != nil {
t.Fatal(err)
}
case <-deathTimer:
resends = 0
resendTimer = time.After(resendInterval)
if answered {
resend = sip.NewBye(invite, msg)
} else {
resend = sip.NewCancel(invite)
}
err = tp.Send(resend)
if err != nil {
t.Error(err)
}
}
}
// The dialog has shut down cleanly.
if !answered {
t.Error("Call didn't get answered!")
}
}

+ 3
- 3
example/options/options_test.go View File

@ -81,9 +81,9 @@ func TestOptions(t *testing.T) {
t.Fatal(err)
}
msg := sip.ParseMsg(string(memory[0:amt]))
if msg.Error != nil {
t.Fatal(msg.Error)
msg, err := sip.ParseMsg(string(memory[0:amt]))
if err != nil {
t.Fatal(err)
}
if !msg.IsResponse || msg.Status != 200 || msg.Phrase != "OK" {


+ 42
- 25
rtp/session.go View File

@ -26,6 +26,10 @@ type Frame [160]int16
// support for RTCP is provided.
type Session struct {
// Channel to which received RTP frames and errors are published.
C chan *Frame
E chan error
// Underlying UDP socket.
Sock *net.UDPConn
@ -38,17 +42,18 @@ type Session struct {
ibuf []byte
obuf []byte
phdr Header
}
// Creates a new RTP µLaw 20ptime session listening on host with a random port
// selected from the range [16384,32768].
func NewSession(host string) (s *Session, err error) {
func NewSession(host string) (rs *Session, err error) {
sock, err := listenRTP(host)
if err != nil {
return nil, err
}
return &Session{
rs = &Session{
C: make(chan *Frame, 32),
E: make(chan error, 1),
Sock: sock.(*net.UDPConn),
Header: Header{
PT: sdp.ULAWCodec.PT,
@ -56,53 +61,65 @@ func NewSession(host string) (s *Session, err error) {
TS: 0,
Ssrc: rand.Uint32(),
},
}, err
obuf: make([]byte, HeaderSize+160),
ibuf: make([]byte, 2048),
}
rs.launchConsumer()
return
}
func (s *Session) Send(frame Frame) (err error) {
if s.Peer == nil {
func (rs *Session) Send(frame *Frame) (err error) {
if rs.Peer == nil {
return nil
}
if s.obuf == nil {
s.obuf = make([]byte, HeaderSize+160)
}
s.Header.Write(s.obuf)
s.Header.TS += 160
s.Header.Seq++
rs.Header.Write(rs.obuf)
rs.Header.TS += 160
rs.Header.Seq++
for n := 0; n < 160; n++ {
s.obuf[HeaderSize+n] = byte(dsp.LinearToUlaw(int64(frame[n])))
rs.obuf[HeaderSize+n] = byte(dsp.LinearToUlaw(int64(frame[n])))
}
_, err = s.Sock.WriteTo(s.obuf, s.Peer)
_, err = rs.Sock.WriteTo(rs.obuf, rs.Peer)
return
}
func (s *Session) Recv(frame Frame) (err error) {
if s.ibuf == nil {
s.ibuf = make([]byte, 2048)
}
func (rs *Session) launchConsumer() {
go func() {
for {
frame, err := rs.recv()
if err != nil {
rs.E <- err
return
}
rs.C <- frame
}
}()
}
func (rs *Session) recv() (frame *Frame, err error) {
frame = new(Frame)
for {
amt, _, err := s.Sock.ReadFrom(s.ibuf)
amt, _, err := rs.Sock.ReadFrom(rs.ibuf)
if err != nil {
return err
return nil, err
}
// TODO(jart): Verify source address?
// TODO(jart): Packet reordering? Drop duplicate packets?
// TODO(jart): DTMF?
var phdr Header
err = phdr.Read(s.ibuf)
err = phdr.Read(rs.ibuf)
if err != nil {
return err
return nil, err
}
if phdr.PT != sdp.ULAWCodec.PT {
continue
}
if amt != HeaderSize+160 {
return errors.New(fmt.Sprintf("Unexpected RTP packet size: %d", amt))
return nil, errors.New(fmt.Sprintf("Unexpected RTP packet size: %d", amt))
}
for n := 0; n < 160; n++ {
frame[n] = int16(dsp.UlawToLinear(int64(s.ibuf[HeaderSize+n])))
frame[n] = int16(dsp.UlawToLinear(int64(rs.ibuf[HeaderSize+n])))
}
return nil
return frame, nil
}
}


+ 2
- 1
sdp/sdp.go View File

@ -64,7 +64,8 @@ import (
)
const (
MaxLength = 1450
ContentType = "application/sdp"
MaxLength = 1450
)
// SDP represents a Session Description Protocol SIP payload.


+ 15
- 15
sip/messages.go View File

@ -8,16 +8,16 @@ import (
const (
GosipUserAgent = "gosip/1.o"
GosipAllow = "INVITE, ACK, CANCEL, BYE, OPTIONS"
GosipAllow = MethodInvite + ", " + MethodAck + ", " + MethodCancel + ", " + MethodBye + ", " + MethodOptions
)
func NewRequest(tp *Transport, method string, to, from *Addr) *Msg {
return &Msg{
Method: method,
Request: to.Uri.Copy(),
Request: to.Uri,
Via: tp.Via.Copy().Branch(),
From: from.Or(tp.Contact).Tag(),
To: to.Copy(),
To: to,
Contact: tp.Contact,
CallID: util.GenerateCallID(),
CSeq: util.GenerateCSeq(),
@ -45,7 +45,7 @@ func NewResponse(msg *Msg, status int) *Msg {
// http://tools.ietf.org/html/rfc3261#section-17.1.1.3
func NewAck(original, msg *Msg) *Msg {
return &Msg{
Method: "ACK",
Method: MethodAck,
Request: original.Request,
Via: original.Via.Copy().SetNext(nil),
From: original.From,
@ -59,18 +59,18 @@ func NewAck(original, msg *Msg) *Msg {
}
func NewCancel(invite *Msg) *Msg {
if invite.IsResponse || invite.Method != "INVITE" {
if invite.IsResponse || invite.Method != MethodInvite {
log.Printf("Can't CANCEL anything non-INVITE:\n%s", invite)
}
return &Msg{
Method: "CANCEL",
Method: MethodCancel,
Request: invite.Request,
Via: invite.Via,
From: invite.From,
To: invite.To,
CallID: invite.CallID,
CSeq: invite.CSeq,
CSeqMethod: "CANCEL",
CSeqMethod: MethodCancel,
Route: invite.Route,
Headers: DefaultHeaders(),
}
@ -78,14 +78,14 @@ func NewCancel(invite *Msg) *Msg {
func NewBye(invite, last *Msg) *Msg {
return &Msg{
Method: "BYE",
Method: MethodBye,
Request: last.Contact.Uri,
Via: invite.Via,
From: invite.From,
To: last.To,
CallID: invite.CallID,
CSeq: invite.CSeq + 1,
CSeqMethod: "BYE",
CSeqMethod: MethodBye,
Route: last.RecordRoute.Reversed(),
Headers: DefaultHeaders(),
}
@ -101,19 +101,19 @@ func ResponseMatch(msg, resp *Msg) bool {
}
// Returns true if `ack` can be considered an appropriate response to `msg`.
// we don't enforce a matching Via because some VoIP software will generate a
// We don't enforce a matching Via because some VoIP software will generate a
// new branch for ACKs.
func AckMatch(msg, ack *Msg) bool {
return (!ack.IsResponse &&
ack.Method == "ACK" &&
ack.Method == MethodAck &&
ack.CSeq == msg.CSeq &&
ack.CSeqMethod == "ACK" &&
ack.CSeqMethod == MethodAck &&
ack.Via.Last().CompareAddr(msg.Via))
}
func AttachSDP(msg *Msg, sdp *sdp.SDP) {
msg.Headers["Content-Type"] = "application/sdp"
msg.Payload = sdp.String()
func AttachSDP(msg *Msg, ms *sdp.SDP) {
msg.Headers["Content-Type"] = sdp.ContentType
msg.Payload = ms.String()
}
func DefaultHeaders() Headers {


+ 20
- 0
sip/method.go View File

@ -0,0 +1,20 @@
// SIP Protocol Method Definitions
package sip
const (
MethodInvite = "INVITE" // Indicates a client is being invited to participate in a call session.
MethodAck = "ACK" // Confirms that the client has received a final response to an INVITE request.
MethodBye = "BYE" // Terminates a call and can be sent by either the caller or the callee.
MethodCancel = "CANCEL" // Cancels any pending request.
MethodOptions = "OPTIONS" // Queries the capabilities of servers.
MethodRegister = "REGISTER" // Registers the address listed in the To header field with a SIP server.
MethodPrack = "PRACK" // Provisional acknowledgement.
MethodSubscribe = "SUBSCRIBE" // Subscribes for an Event of Notification from the Notifier.
MethodNotify = "NOTIFY" // Notify the subscriber of a new Event.
MethodPublish = "PUBLISH" // Publishes an event to the Server.
MethodInfo = "INFO" // Sends mid-session information that does not modify the session state.
MethodRefer = "REFER" // Asks recipient to issue SIP request (call transfer.)
MethodMessage = "MESSAGE" // Transports instant messages using SIP.
MethodUpdate = "UPDATE" // Modifies the state of a session without changing the state of the dialog.
)

+ 20
- 24
sip/msg.go View File

@ -18,7 +18,6 @@ type Headers map[string]string
// These fields are never nil unless otherwise specified.
type Msg struct {
// Special non-SIP fields.
Error error // Set to indicate an error with this message
SourceAddr *net.UDPAddr // Set by transport layer as received address
// Fields that aren't headers.
@ -64,26 +63,23 @@ func (msg *Msg) String() string {
}
// Parses a SIP message into a data structure. This takes ~70 µs on average.
func ParseMsg(packet string) (msg *Msg) {
func ParseMsg(packet string) (msg *Msg, err error) {
msg = new(Msg)
if packet == "" {
msg.Error = errors.New("Empty msg")
return msg
return nil, errors.New("Empty msg")
}
if n := strings.Index(packet, "\r\n\r\n"); n > 0 {
packet, msg.Payload = packet[0:n], packet[n+4:]
}
lines := strings.Split(packet, "\r\n")
if lines == nil || len(lines) < 2 {
msg.Error = errors.New("Too few lines")
return msg
return nil, errors.New("Too few lines")
}
var k, v string
var okVia, okTo, okFrom, okCallID, okComputer bool
err := msg.parseFirstLine(lines[0])
err = msg.parseFirstLine(lines[0])
if err != nil {
msg.Error = err
return msg
return nil, err
}
hdrs := lines[1:]
msg.Headers = make(map[string]string, len(hdrs))
@ -118,7 +114,7 @@ func ParseMsg(packet string) (msg *Msg) {
okVia = true
*viap, err = ParseVia(v)
if err != nil {
msg.Error = errors.New("Bad Via header: " + err.Error())
return nil, errors.New("Bad Via header: " + err.Error())
} else {
viap = &(*viap).Next
}
@ -126,18 +122,18 @@ func ParseMsg(packet string) (msg *Msg) {
okTo = true
msg.To, err = ParseAddr(v)
if err != nil {
msg.Error = errors.New("Bad To header: " + err.Error())
return nil, errors.New("Bad To header: " + err.Error())
}
case "from":
okFrom = true
msg.From, err = ParseAddr(v)
if err != nil {
msg.Error = errors.New("Bad From header: " + err.Error())
return nil, errors.New("Bad From header: " + err.Error())
}
case "contact":
*contactp, err = ParseAddr(v)
if err != nil {
msg.Error = errors.New("Bad Contact header: " + err.Error())
return nil, errors.New("Bad Contact header: " + err.Error())
} else {
contactp = &(*contactp).Last().Next
}
@ -151,66 +147,66 @@ func ParseMsg(packet string) (msg *Msg) {
}
}
if !okComputer {
msg.Error = errors.New("Bad CSeq Header")
return nil, errors.New("Bad CSeq Header")
}
case "content-length":
if cl, err := strconv.Atoi(v); err == nil {
if cl != len(msg.Payload) {
msg.Error = errors.New(fmt.Sprintf(
return nil, errors.New(fmt.Sprintf(
"Content-Length (%d) differs from payload length (%d)",
cl, len(msg.Payload)))
}
} else {
msg.Error = errors.New("Bad Content-Length header")
return nil, errors.New("Bad Content-Length header")
}
case "expires":
if cl, err := strconv.Atoi(v); err == nil && cl >= 0 {
msg.Expires = cl
} else {
msg.Error = errors.New("Bad Expires header")
return nil, errors.New("Bad Expires header")
}
case "min-expires":
if cl, err := strconv.Atoi(v); err == nil && cl > 0 {
msg.MinExpires = cl
} else {
msg.Error = errors.New("Bad Min-Expires header")
return nil, errors.New("Bad Min-Expires header")
}
case "max-forwards":
if cl, err := strconv.Atoi(v); err == nil && cl > 0 {
msg.MaxForwards = cl
} else {
msg.Error = errors.New("Bad Max-Forwards header")
return nil, errors.New("Bad Max-Forwards header")
}
case "route":
*routep, err = ParseAddr(v)
if err != nil {
msg.Error = errors.New("Bad Route header: " + err.Error())
return nil, errors.New("Bad Route header: " + err.Error())
} else {
routep = &(*routep).Last().Next
}
case "record-route":
*rroutep, err = ParseAddr(v)
if err != nil {
msg.Error = errors.New("Bad Record-Route header: " + err.Error())
return nil, errors.New("Bad Record-Route header: " + err.Error())
} else {
rroutep = &(*rroutep).Last().Next
}
case "p-asserted-identity":
msg.Paid, err = ParseAddr(v)
if err != nil {
msg.Error = errors.New("Bad P-Asserted-Identity header: " + err.Error())
return nil, errors.New("Bad P-Asserted-Identity header: " + err.Error())
}
case "remote-party-id":
msg.Rpid, err = ParseAddr(v)
if err != nil {
msg.Error = errors.New("Bad Remote-Party-ID header: " + err.Error())
return nil, errors.New("Bad Remote-Party-ID header: " + err.Error())
}
default:
msg.Headers[k] = v
}
}
if !okVia || !okTo || !okFrom || !okCallID || !okComputer {
msg.Error = errors.New("Missing mandatory headers")
return nil, errors.New("Missing mandatory headers")
}
return
}


+ 1
- 7
sip/status.go View File

@ -1,4 +1,4 @@
// SIP Protocol Statuses
// SIP Protocol Status Definitions
//
// http://www.iana.org/assignments/sip-parameters
@ -92,10 +92,6 @@ const (
StatusDoesNotExistAnywhere = 604
StatusNotAcceptable606 = 606
StatusDialogTerminated = 687
// 8xx: Special gosip errors
Status8xxProgrammerError = 800
Status8xxNetworkError = 801
)
func Phrase(status int) string {
@ -177,6 +173,4 @@ var phrases = map[int]string{
StatusDoesNotExistAnywhere: "Does Not Exist Anywhere",
StatusNotAcceptable606: "Not Acceptable",
StatusDialogTerminated: "Dialog Terminated",
Status8xxProgrammerError: "Programmer Error",
Status8xxNetworkError: "Network Error",
}

+ 59
- 39
sip/transport.go View File

@ -23,6 +23,10 @@ var (
// Transport sends and receives SIP messages over UDP with stateless routing.
type Transport struct {
// Channel to which received SIP messages and errors are published.
C chan *Msg
E chan error
// Underlying UDP socket.
Sock *net.UDPConn
@ -58,14 +62,17 @@ func NewTransport(contact *Addr) (tp *Transport, err error) {
contact = contact.Copy()
contact.Uri.Port = uint16(addr.Port)
contact.Uri.Params["transport"] = addr.Network()
return &Transport{
tp = &Transport{
C: make(chan *Msg, 32),
Sock: sock.(*net.UDPConn),
Contact: contact,
Via: &Via{
Host: contact.Uri.Host,
Port: contact.Uri.Port,
},
}, nil
}
tp.launchConsumer()
return
}
// Sends a SIP message.
@ -86,7 +93,7 @@ func (tp *Transport) Send(msg *Msg) error {
var b bytes.Buffer
msg.Append(&b)
if *tracing {
tp.trace("send", b.String(), addr, ts)
trace("send", b.String(), addr, ts)
}
_, err = tp.Sock.WriteTo(b.Bytes(), addr)
if err != nil {
@ -95,76 +102,71 @@ func (tp *Transport) Send(msg *Msg) error {
return nil
}
func (tp *Transport) launchConsumer() {
go func() {
for {
msg, err := tp.recv()
if err != nil {
tp.E <- err
return
}
tp.C <- msg
}
}()
}
// Receives a SIP message. The received address is injected into the first Via
// header as the "received" param. The Error field of msg should be checked. If
// msg.Status is Status8xxNetworkError, it means the underlying socket died. If
// you set a deadline, you should check: util.IsTimeout(msg.Error).
//
// Warning: Must only be called by one goroutine.
func (tp *Transport) Recv() *Msg {
func (tp *Transport) recv() (msg *Msg, err error) {
if tp.buf == nil {
tp.buf = make([]byte, 2048)
}
amt, addr, err := tp.Sock.ReadFromUDP(tp.buf)
if err != nil {
return &Msg{Status: Status8xxNetworkError, Error: err}
return nil, err
}
ts := time.Now()
packet := string(tp.buf[0:amt])
if *tracing {
tp.trace("recv", packet, addr, ts)
trace("recv", packet, addr, ts)
}
// Validation: http://tools.ietf.org/html/rfc3261#section-16.3
msg := ParseMsg(packet)
if msg.Error != nil {
return msg
msg, err = ParseMsg(packet)
if err != nil {
return nil, err
}
addReceived(msg, addr)
addTimestamp(msg, ts)
if !tp.sanityCheck(msg) {
return msg
err = tp.sanityCheck(msg)
if err != nil {
return nil, err
}
tp.preprocess(msg)
return msg
}
func (tp *Transport) trace(dir, pkt string, addr *net.UDPAddr, t time.Time) {
size := len(pkt)
bar := strings.Repeat("-", 72)
suffix := "\n"
if pkt[len(pkt)-1] == '\n' {
suffix = ""
}
log.Printf(
"%s %d bytes to %s/%s at %s\n"+
"%s\n"+
"%s%s"+
"%s\n",
dir, size, addr.Network(), addr.String(),
t.Format(time.RFC3339Nano),
bar,
pkt, suffix,
bar)
return
}
// Checks if message is acceptable, otherwise sets msg.Error and returns false.
func (tp *Transport) sanityCheck(msg *Msg) bool {
func (tp *Transport) sanityCheck(msg *Msg) error {
if msg.MaxForwards <= 0 {
go tp.Send(NewResponse(msg, 483))
msg.Error = errors.New("Froot loop detected")
return errors.New("Froot loop detected")
}
if msg.IsResponse {
if msg.Status >= 700 {
go tp.Send(NewResponse(msg, 400))
msg.Error = errors.New("Crazy status number")
return errors.New("Crazy status number")
}
} else {
if msg.CSeqMethod == "" || msg.CSeqMethod != msg.Method {
go tp.Send(NewResponse(msg, 400))
msg.Error = errors.New("Bad CSeq")
return errors.New("Bad CSeq")
}
}
return msg.Error == nil
return nil
}
// Perform some ingress message mangling.
@ -195,13 +197,12 @@ func (tp *Transport) preprocess(msg *Msg) {
}
}
func (tp *Transport) route(old *Msg) (msg *Msg, saddr string, err error) {
func (tp *Transport) route(old *Msg) (msg *Msg, dest string, err error) {
var host string
var port uint16
msg = new(Msg)
*msg = *old // Start off with a shallow copy.
if msg.IsResponse {
msg.Via = old.Via.Copy()
if msg.Via.CompareAddr(tp.Via) {
// In proxy scenarios we have to remove our own Via.
msg.Via = msg.Via.Next
@ -242,7 +243,7 @@ func (tp *Transport) route(old *Msg) (msg *Msg, saddr string, err error) {
host, port = msg.Request.Host, msg.Request.Port
}
}
saddr = util.HostPortToString(host, port)
dest = util.HostPortToString(host, port)
return
}
@ -257,3 +258,22 @@ func addTimestamp(msg *Msg, ts time.Time) {
msg.Via.Params["µsi"] = strconv.FormatInt(ts.UnixNano()/int64(time.Microsecond), 10)
}
}
func trace(dir, pkt string, addr *net.UDPAddr, t time.Time) {
size := len(pkt)
bar := strings.Repeat("-", 72)
suffix := "\n"
if pkt[len(pkt)-1] == '\n' {
suffix = ""
}
log.Printf(
"%s %d bytes to %s/%s at %s\n"+
"%s\n"+
"%s%s"+
"%s\n",
dir, size, addr.Network(), addr.String(),
t.Format(time.RFC3339Nano),
bar,
pkt, suffix,
bar)
}

Loading…
Cancel
Save