Browse Source

Got a dialog thing working.

pull/2/head
Justine Alexandra Roberts Tunney 11 years ago
parent
commit
aa82bc06f9
8 changed files with 429 additions and 24 deletions
  1. +81
    -0
      example/echo3/echo3_test.go
  2. +4
    -4
      rtp/session.go
  3. +266
    -0
      sip/dialog.go
  4. +3
    -0
      sip/messages.go
  5. +26
    -16
      sip/msg.go
  6. +47
    -3
      sip/route.go
  7. +1
    -1
      sip/trace.go
  8. +1
    -0
      sip/transport.go

+ 81
- 0
example/echo3/echo3_test.go View File

@ -0,0 +1,81 @@
// Echo test that uses slightly higher level APIs.
package echo2_test
import (
"github.com/jart/gosip/dsp"
"github.com/jart/gosip/rtp"
"github.com/jart/gosip/sdp"
"github.com/jart/gosip/sip"
"net"
"testing"
"time"
)
func TestCallToEchoApp(t *testing.T) {
invite := &sip.Msg{
Request: &sip.URI{User: "echo", Host: "127.0.0.1", Port: 5060},
}
// Create RTP audio session.
rs, err := rtp.NewSession("")
if err != nil {
t.Fatal(err)
}
defer rs.Sock.Close()
rtpaddr := rs.Sock.LocalAddr().(*net.UDPAddr)
sip.AttachSDP(invite, sdp.New(rtpaddr, sdp.ULAWCodec, sdp.DTMFCodec))
// Create a SIP phone call.
dl, err := sip.NewDialog(invite)
if err != nil {
t.Fatal(err)
}
// We're going to send white noise every 20ms.
var frame rtp.Frame
awgn := dsp.NewAWGN(-45.0)
ticker := time.NewTicker(20 * time.Millisecond)
defer ticker.Stop()
// Hangup after 200ms.
death := time.After(200 * time.Millisecond)
// Let's GO!
var answered bool
for {
select {
case err := <-rs.E:
t.Error("RTP recv failed:", err)
dl.Hangup <- true
case <-rs.C:
// Do nothing with received audio.
case <-ticker.C:
for n := 0; n < 160; n++ {
frame[n] = awgn.Get()
}
if err := rs.Send(&frame); err != nil {
t.Fatal("RTP send failed:", err)
}
case <-dl.OnErr:
t.Error(err)
return
case state := <-dl.OnState:
switch state {
case sip.DialogAnswered:
answered = true
case sip.DialogHangup:
return
}
case ms := <-dl.OnSDP:
rs.Peer = &net.UDPAddr{IP: net.ParseIP(ms.Addr), Port: int(ms.Audio.Port)}
case <-death:
dl.Hangup <- true
}
}
// The dialog has shut down cleanly. Was it answered?
if !answered {
t.Error("Call didn't get answered!")
}
}

+ 4
- 4
rtp/session.go View File

@ -7,6 +7,7 @@ import (
"fmt"
"github.com/jart/gosip/dsp"
"github.com/jart/gosip/sdp"
"log"
"math/rand"
"net"
"strconv"
@ -128,11 +129,10 @@ func listenRTP(host string) (sock net.PacketConn, err error) {
port := rtpBindPortMin + rand.Int63()%(rtpBindPortMax-rtpBindPortMin+1)
saddr := net.JoinHostPort(host, strconv.FormatInt(port, 10))
sock, err = net.ListenPacket("udp", saddr)
if err != nil {
if !strings.Contains(err.Error(), "address already in use") {
break
}
if err == nil || !strings.Contains(err.Error(), "address already in use") {
break
}
log.Println("RTP listen congestion:", saddr)
}
return
}

+ 266
- 0
sip/dialog.go View File

@ -0,0 +1,266 @@
// SIP Transport Layer. Responsible for serializing messages to/from
// your network.
package sip
import (
"bytes"
"errors"
"github.com/jart/gosip/sdp"
"github.com/jart/gosip/util"
"log"
"net"
"time"
)
const (
DialogConnected = 1
DialogRinging = 2
DialogAnswered = 3
DialogHangup = 4
resendInterval = 200 * time.Millisecond
maxResends = 2
)
// Dialog represents an outbound phone call.
type Dialog struct {
OnErr <-chan error
OnState <-chan int
OnSDP <-chan *sdp.SDP
Hangup chan<- bool
}
type dialogState struct {
sock *net.UDPConn
sockMsgs <-chan *Msg
sockErrs <-chan error
errChan chan<- error
sdpChan chan<- *sdp.SDP
stateChan chan<- int
doHangupChan <-chan bool
routes *AddressRoute
invite *Msg
response *Msg
resend *Msg
resends int
timer <-chan time.Time
}
// NewDialog creates a phone call.
func NewDialog(invite *Msg) (dl *Dialog, err error) {
invite, host, port, err := RouteMessage(nil, nil, invite)
if err != nil {
return nil, err
}
routes, err := RouteAddress(host, port)
if err != nil {
return nil, err
}
errChan := make(chan error)
sdpChan := make(chan *sdp.SDP)
stateChan := make(chan int)
doHangupChan := make(chan bool, 4)
dls := &dialogState{
errChan: errChan,
sdpChan: sdpChan,
stateChan: stateChan,
doHangupChan: doHangupChan,
invite: invite,
routes: routes,
}
go dls.run()
return &Dialog{
OnErr: errChan,
OnState: stateChan,
OnSDP: sdpChan,
Hangup: doHangupChan,
}, nil
}
func (dls *dialogState) popRoute() bool {
if dls.routes == nil {
dls.errChan <- errors.New("failed to contact host")
return false
}
dls.cleanup()
conn, err := net.Dial("udp", dls.routes.Address)
dls.routes = dls.routes.Next
if err != nil {
log.Println("net.Dial() failed:", err)
return dls.popRoute()
}
dls.sock = conn.(*net.UDPConn)
laddr := conn.LocalAddr().(*net.UDPAddr)
lhost := laddr.IP.String()
lport := uint16(laddr.Port)
dls.invite.Via = &Via{
Host: lhost,
Port: lport,
Params: Params{"branch": util.GenerateBranch()},
}
dls.invite.Contact = &Addr{
Uri: &URI{
Scheme: "sip",
Host: lhost,
Port: lport,
Params: Params{"transport": "udp"},
},
}
PopulateMessage(nil, nil, dls.invite)
dls.resend = dls.invite
dls.timer = time.After(resendInterval)
dls.resends = 0
sockMsgs := make(chan *Msg)
sockErrs := make(chan error)
dls.sockMsgs = sockMsgs
dls.sockErrs = sockErrs
go ReceiveMessages(dls.invite.Contact, dls.sock, sockMsgs, sockErrs)
return dls.send(dls.resend)
}
func (dls *dialogState) run() {
defer dls.sabotage()
defer dls.cleanup()
if !dls.popRoute() {
return
}
for {
select {
case err := <-dls.sockErrs:
if util.IsRefused(err) {
if !dls.popRoute() {
return
}
} else {
dls.errChan <- err
return
}
case <-dls.timer:
if dls.resends < maxResends {
if !dls.send(dls.resend) {
return
}
dls.resends++
dls.timer = time.After(resendInterval)
} else {
if !dls.popRoute() {
return
}
}
case <-dls.doHangupChan:
if !dls.hangup() {
return
}
case msg := <-dls.sockMsgs:
if msg.CallID != dls.invite.CallID {
continue
}
if msg.IsResponse {
if msg.Status >= StatusOK && msg.CSeq == dls.invite.CSeq {
if msg.Contact != nil {
if !dls.send(NewAck(dls.invite, msg)) {
return
}
}
if msg.Status > StatusOK {
dls.errChan <- errors.New(msg.Phrase)
return
}
}
switch msg.Status {
case StatusTrying:
dls.routes = nil
dls.timer = nil
dls.stateChan <- DialogConnected
case StatusRinging, StatusSessionProgress:
dls.stateChan <- DialogRinging
case StatusOK:
switch msg.CSeqMethod {
case dls.invite.Method:
if dls.response == nil {
dls.stateChan <- DialogAnswered
}
dls.response = msg
case MethodBye, MethodCancel:
dls.stateChan <- DialogHangup
return
default:
dls.errChan <- errors.New("Bad CSeq Method")
return
}
}
if msg.Headers["Content-Type"] == sdp.ContentType {
ms, err := sdp.Parse(msg.Payload)
if err != nil {
log.Println("Bad SDP payload:", err)
} else {
dls.sdpChan <- ms
}
}
} else {
if msg.MaxForwards <= 0 {
if !dls.send(NewResponse(msg, StatusTooManyHops)) {
return
}
dls.errChan <- errors.New("Froot loop detected")
return
}
switch msg.Method {
case MethodBye:
if !dls.send(NewResponse(msg, StatusOK)) {
return
}
dls.stateChan <- DialogHangup
return
}
}
}
}
}
func (dls *dialogState) send(msg *Msg) bool {
// TODO(jart): Double-check route matches socket binding.
if msg.MaxForwards > 0 {
msg.MaxForwards--
}
ts := time.Now()
addTimestamp(msg, ts)
var b bytes.Buffer
msg.Append(&b)
if *tracing {
trace("send", b.String(), dls.sock.RemoteAddr(), ts)
}
_, err := dls.sock.Write(b.Bytes())
if err != nil {
dls.errChan <- err
return false
}
return true
}
func (dls *dialogState) hangup() bool {
if dls.response != nil {
dls.resend = NewBye(dls.invite, dls.response)
} else {
dls.resend = NewCancel(dls.invite)
}
if !dls.send(dls.resend) {
return false
}
dls.resends = 0
dls.timer = time.After(resendInterval)
return true
}
func (dls *dialogState) cleanup() {
if dls.sock != nil {
dls.sock.Close()
dls.sock = nil
}
}
func (dls *dialogState) sabotage() {
close(dls.errChan)
close(dls.sdpChan)
close(dls.stateChan)
}

+ 3
- 0
sip/messages.go View File

@ -111,6 +111,9 @@ func AckMatch(msg, ack *Msg) bool {
}
func AttachSDP(msg *Msg, ms *sdp.SDP) {
if msg.Headers == nil {
msg.Headers = Headers{}
}
msg.Headers["Content-Type"] = sdp.ContentType
msg.Payload = ms.String()
}


+ 26
- 16
sip/msg.go View File

@ -378,25 +378,35 @@ func (msg *Msg) Append(b *bytes.Buffer) error {
return nil
}
func (msg *Msg) parseFirstLine(s string) (err error) {
toks := strings.Split(s, " ")
if toks != nil && len(toks) == 3 && toks[2] == "SIP/2.0" {
msg.Phrase = ""
msg.Status = 0
msg.Method = toks[0]
msg.Request = new(URI)
msg.Request, err = ParseURI(toks[1])
} else if toks != nil && len(toks) == 3 && toks[0] == "SIP/2.0" {
func (msg *Msg) parseFirstLine(s string) error {
i := strings.Index(s, "SIP/2.0")
if i == -1 {
return errors.New("Not a SIP message")
} else if i == 0 {
msg.IsResponse = true
msg.Method = ""
msg.Request = nil
msg.Phrase = toks[2]
msg.Status, err = strconv.Atoi(toks[1])
toks := strings.SplitN(s, " ", 3)
if len(toks) < 2 {
return errors.New("Bad response status line")
}
s, err := strconv.Atoi(toks[1])
if err != nil {
return errors.New("Invalid status")
return errors.New("Bad response status code")
}
msg.Status = s
if len(toks) == 3 {
msg.Phrase = toks[2]
} else {
msg.Phrase = Phrase(msg.Status)
}
} else {
err = errors.New("Bad protocol or request line")
j := strings.Index(s, " ")
msg.Method = s[:j]
msg.Request = new(URI)
r, err := ParseURI(s[j+1 : i-1])
msg.Request = r
if err != nil {
return err
}
}
return err
return nil
}

+ 47
- 3
sip/route.go View File

@ -2,6 +2,7 @@ package sip
import (
"errors"
"github.com/jart/gosip/util"
"log"
"net"
)
@ -11,12 +12,52 @@ type AddressRoute struct {
Next *AddressRoute
}
func PopulateMessage(via *Via, contact *Addr, msg *Msg) {
if !msg.IsResponse {
if msg.Method == "" {
msg.Method = "INVITE"
}
if msg.Via == nil {
msg.Via = via
}
if msg.Contact == nil {
msg.Contact = contact
}
if msg.To == nil {
msg.To = &Addr{Uri: msg.Request}
}
if msg.From == nil {
msg.From = msg.Contact
}
if msg.CallID == "" {
msg.CallID = util.GenerateCallID()
}
if msg.CSeq == 0 {
msg.CSeq = util.GenerateCSeq()
}
if msg.CSeqMethod == "" {
msg.CSeqMethod = msg.Method
}
if msg.MaxForwards == 0 {
msg.MaxForwards = 70
}
if _, ok := msg.Via.Params["branch"]; !ok {
msg.Via = msg.Via.Copy()
msg.Via.Params["branch"] = util.GenerateBranch()
}
if _, ok := msg.From.Params["tag"]; !ok {
msg.From = msg.From.Copy()
msg.From.Params["tag"] = util.GenerateTag()
}
if _, ok := msg.Headers["User-Agent"]; !ok {
msg.Headers["User-Agent"] = GosipUserAgent
}
}
}
func RouteMessage(via *Via, contact *Addr, old *Msg) (msg *Msg, host string, port uint16, err error) {
msg = new(Msg)
*msg = *old // Start off with a shallow copy.
if msg.Contact == nil {
msg.Contact = contact
}
if msg.IsResponse {
if via.CompareHostPort(msg.Via) {
msg.Via = msg.Via.Next
@ -61,12 +102,15 @@ func RouteAddress(host string, port uint16) (routes *AddressRoute, err error) {
if port == 0 {
_, srvs, err := net.LookupSRV("sip", "udp", host)
if err == nil && len(srvs) > 0 {
s := ""
for i := len(srvs) - 1; i >= 0; i-- {
routes = &AddressRoute{
Address: net.JoinHostPort(srvs[i].Target, portstr(srvs[i].Port)),
Next: routes,
}
s = " " + routes.Address + s
}
log.Printf("%s routes to: %s", host, s)
return routes, nil
}
log.Println("net.LookupSRV(sip, udp, %s) failed: %s", err)


+ 1
- 1
sip/trace.go View File

@ -17,7 +17,7 @@ func trace(dir, pkt string, addr net.Addr, t time.Time) {
size := len(pkt)
bar := strings.Repeat("-", 72)
suffix := "\n"
if pkt[len(pkt)-1] == '\n' {
if pkt != "" && pkt[len(pkt)-1] == '\n' {
suffix = ""
}
log.Printf(


+ 1
- 0
sip/transport.go View File

@ -66,6 +66,7 @@ func NewTransport(contact *Addr) (tp *Transport, err error) {
// Sends a SIP message.
func (tp *Transport) Send(msg *Msg) error {
PopulateMessage(tp.Via, tp.Contact, msg)
msg, host, port, err := RouteMessage(tp.Via, tp.Contact, msg)
if err != nil {
return err


Loading…
Cancel
Save