Browse Source

RTP stuff.

pull/2/head
Justine Alexandra Roberts Tunney 11 years ago
parent
commit
b3445bc2da
3 changed files with 89 additions and 47 deletions
  1. +11
    -13
      example/echo3/echo3_test.go
  2. +31
    -4
      rtp/session.go
  3. +47
    -30
      sip/dialog.go

+ 11
- 13
example/echo3/echo3_test.go View File

@ -5,7 +5,6 @@ package echo2_test
import (
"github.com/jart/gosip/dsp"
"github.com/jart/gosip/rtp"
"github.com/jart/gosip/sdp"
"github.com/jart/gosip/sip"
"net"
"testing"
@ -19,16 +18,15 @@ func TestCallToEchoApp(t *testing.T) {
}
// Create RTP audio session.
rs, err := rtp.NewSession("127.0.0.1")
rs, err := rtp.NewSession("")
if err != nil {
t.Fatal(err)
}
defer rs.Sock.Close()
rtpaddr := rs.Sock.LocalAddr().(*net.UDPAddr)
sip.AttachSDP(invite, sdp.New(rtpaddr, sdp.ULAWCodec, sdp.DTMFCodec))
defer rs.Close()
rtpPort := uint16(rs.Sock.LocalAddr().(*net.UDPAddr).Port)
// Create a SIP phone call.
dl, err := sip.NewDialog(invite)
dl, err := sip.NewDialog(invite, rtpPort)
if err != nil {
t.Fatal(err)
}
@ -46,11 +44,6 @@ func TestCallToEchoApp(t *testing.T) {
var answered bool
for {
select {
case err := <-rs.E:
t.Error("RTP recv failed:", err)
dl.Hangup <- true
case <-rs.C:
// Do nothing with received audio.
case <-ticker.C:
for n := 0; n < 160; n++ {
frame[n] = awgn.Get()
@ -71,8 +64,13 @@ func TestCallToEchoApp(t *testing.T) {
}
return
}
case ms := <-dl.OnSDP:
rs.Peer = &net.UDPAddr{IP: net.ParseIP(ms.Addr), Port: int(ms.Audio.Port)}
case rs.Peer = <-dl.OnPeer:
case frame := <-rs.C:
rs.R <- frame
case err := <-rs.E:
t.Error("RTP recv failed:", err)
rs.CloseAfterError()
dl.Hangup <- true
case <-death:
dl.Hangup <- true
}


+ 31
- 4
rtp/session.go View File

@ -27,6 +27,7 @@ type Session struct {
// Channel to which received RTP frames and errors are published.
C <-chan *Frame
R chan<- *Frame
E <-chan error
// Underlying UDP socket.
@ -50,12 +51,14 @@ func NewSession(host string) (rs *Session, err error) {
return nil, err
}
sock := conn.(*net.UDPConn)
c := make(chan *Frame, 4)
c := make(chan *Frame)
r := make(chan *Frame)
e := make(chan error, 1)
go receiver(sock, c, e)
go receiver(sock, c, e, r)
return &Session{
C: c,
E: e,
R: r,
Sock: sock,
Header: Header{
PT: sdp.ULAWCodec.PT,
@ -68,7 +71,7 @@ func NewSession(host string) (rs *Session, err error) {
}
func (rs *Session) Send(frame *Frame) (err error) {
if rs.Peer == nil {
if rs == nil || rs.Sock == nil || rs.Peer == nil {
return nil
}
rs.Header.Write(rs.obuf)
@ -81,7 +84,29 @@ func (rs *Session) Send(frame *Frame) (err error) {
return
}
func receiver(sock *net.UDPConn, c chan<- *Frame, e chan<- error) {
func (rs *Session) Close() {
if rs == nil || rs.Sock == nil {
return
}
rs.Sock.Close()
if frame, ok := <-rs.C; ok {
rs.R <- frame
}
<-rs.E
rs.Sock = nil
close(rs.R)
}
func (rs *Session) CloseAfterError() {
if rs == nil || rs.Sock == nil {
return
}
rs.Sock.Close()
rs.Sock = nil
close(rs.R)
}
func receiver(sock *net.UDPConn, c chan<- *Frame, e chan<- error, r <-chan *Frame) {
buf := make([]byte, 2048)
frame := new(Frame)
for {
@ -109,8 +134,10 @@ func receiver(sock *net.UDPConn, c chan<- *Frame, e chan<- error) {
frame[n] = int16(dsp.UlawToLinear(int64(buf[HeaderSize+n])))
}
c <- frame
frame = <-r
}
close(c)
close(e)
}
func listenRTP(host string) (sock net.PacketConn, err error) {


+ 47
- 30
sip/dialog.go View File

@ -30,7 +30,7 @@ var (
type Dialog struct {
OnErr <-chan error
OnState <-chan int
OnSDP <-chan *sdp.SDP
OnPeer <-chan *net.UDPAddr
Hangup chan<- bool
}
@ -40,14 +40,15 @@ type dialogState struct {
csockMsgs <-chan *Msg
csockErrs <-chan error
errChan chan<- error
sdpChan chan<- *sdp.SDP
stateChan chan<- int
doHangupChan <-chan bool
peerChan chan<- *net.UDPAddr
sendHangupChan <-chan bool
state int // Current state of the dialog.
dest string // Destination hostname (or IP).
addr string // Destination ip:port.
sock *net.UDPConn // Outbound message socket (connected for ICMP)
csock *net.UDPConn // Inbound socket for Contact field.
sdp *sdp.SDP // Media session description.
routes *AddressRoute // List of SRV addresses to attempt contacting.
invite *Msg // Our INVITE that established the dialog.
remote *Msg // Message from remote UA that established dialog.
@ -59,32 +60,42 @@ type dialogState struct {
responseTimer <-chan time.Time // Resend timer for message.
lseq int // Local CSeq value.
rseq int // Remote CSeq value.
b bytes.Buffer // Outbound message buffer.
}
// NewDialog creates a phone call.
func NewDialog(invite *Msg) (dl *Dialog, err error) {
func NewDialog(invite *Msg, rtpPort uint16) (dl *Dialog, err error) {
errChan := make(chan error)
sdpChan := make(chan *sdp.SDP)
stateChan := make(chan int)
doHangupChan := make(chan bool, 4)
peerChan := make(chan *net.UDPAddr)
sendHangupChan := make(chan bool, 4)
ms := &sdp.SDP{
Origin: sdp.Origin{
ID: util.GenerateOriginID(),
},
Audio: &sdp.Media{
Port: rtpPort,
Codecs: []sdp.Codec{sdp.ULAWCodec, sdp.DTMFCodec},
},
}
dls := &dialogState{
errChan: errChan,
sdpChan: sdpChan,
stateChan: stateChan,
doHangupChan: doHangupChan,
invite: invite,
errChan: errChan,
stateChan: stateChan,
peerChan: peerChan,
sendHangupChan: sendHangupChan,
invite: invite,
sdp: ms,
}
go dls.run()
return &Dialog{
OnErr: errChan,
OnState: stateChan,
OnSDP: sdpChan,
Hangup: doHangupChan,
OnPeer: peerChan,
Hangup: sendHangupChan,
}, nil
}
func (dls *dialogState) run() {
defer dls.sabotage()
defer dls.cleanup()
if !dls.sendRequest(dls.invite) {
return
@ -92,6 +103,8 @@ func (dls *dialogState) run() {
for {
select {
case err := <-dls.sockErrs:
dls.sock.Close()
dls.sock = nil
if util.IsRefused(err) {
log.Printf("ICMP refusal: %s (%s)", dls.sock.RemoteAddr(), dls.dest)
if !dls.popRoute() {
@ -102,6 +115,8 @@ func (dls *dialogState) run() {
return
}
case err := <-dls.csockErrs:
dls.csock.Close()
dls.csock = nil
dls.errChan <- err
return
case <-dls.requestTimer:
@ -112,10 +127,6 @@ func (dls *dialogState) run() {
if !dls.resendResponse() {
return
}
case <-dls.doHangupChan:
if !dls.sendHangup() {
return
}
case msg := <-dls.sockMsgs:
if !dls.handleMessage(msg) {
return
@ -124,6 +135,10 @@ func (dls *dialogState) run() {
if !dls.handleMessage(msg) {
return
}
case <-dls.sendHangupChan:
if !dls.sendHangup() {
return
}
}
}
}
@ -168,7 +183,7 @@ func (dls *dialogState) popRoute() bool {
}
func (dls *dialogState) connect() bool {
if dls.sock == nil || dls.sock.RemoteAddr().String() != dls.addr {
if dls.sock == nil || dls.addr != dls.sock.RemoteAddr().String() {
// Create socket through which we send messages. This socket is connected
// to the remote address so we can receive ICMP unavailable errors. It also
// allows us to discover the appropriate IP address for the local machine.
@ -227,6 +242,11 @@ func (dls *dialogState) populate(msg *Msg) {
},
}
}
if msg.Method == MethodInvite {
dls.sdp.Addr = lhost
dls.sdp.Origin.Addr = lhost
AttachSDP(msg, dls.sdp)
}
PopulateMessage(nil, nil, msg)
}
@ -350,7 +370,7 @@ func (dls *dialogState) checkSDP(msg *Msg) {
if err != nil {
log.Println("Bad SDP payload:", err)
} else {
dls.sdpChan <- ms
dls.peerChan <- &net.UDPAddr{IP: net.ParseIP(ms.Addr), Port: int(ms.Audio.Port)}
}
}
}
@ -365,12 +385,12 @@ func (dls *dialogState) send(msg *Msg) bool {
}
ts := time.Now()
addTimestamp(msg, ts)
var b bytes.Buffer
msg.Append(&b)
dls.b.Reset()
msg.Append(&dls.b)
if *tracing {
trace("send", b.String(), dls.sock.RemoteAddr(), ts)
trace("send", dls.b.String(), dls.sock.RemoteAddr(), ts)
}
_, err := dls.sock.Write(b.Bytes())
_, err := dls.sock.Write(dls.b.Bytes())
if err != nil {
dls.errChan <- err
return false
@ -450,6 +470,9 @@ func (dls *dialogState) transition(state int) {
func (dls *dialogState) cleanup() {
dls.cleanupSock()
dls.cleanupCSock()
close(dls.errChan)
close(dls.stateChan)
close(dls.peerChan)
}
func (dls *dialogState) cleanupSock() {
@ -469,9 +492,3 @@ func (dls *dialogState) cleanupCSock() {
<-dls.csockErrs
}
}
func (dls *dialogState) sabotage() {
close(dls.errChan)
close(dls.sdpChan)
close(dls.stateChan)
}

Loading…
Cancel
Save