Browse Source

Multiple AMQP connections (Kazoo multi-zone for Kamailio).

master
Ruel Tmeizeh - RuhNet 1 week ago
parent
commit
ab77d1b224
3 changed files with 60 additions and 33 deletions
  1. +7
    -0
      README.md
  2. +52
    -16
      amqp.go
  3. +1
    -17
      main.go

+ 7
- 0
README.md View File

@ -26,3 +26,10 @@ systemctl daemon-reload
systemctl enable kazoo_firewall_agent
systemctl start kazoo_firewall_agent
```
### Config
Specifying a `server_type` of `freeswitch` or `ephemeral` will use firewall rules that are temporary and are deleted when their cache timeout expires. Any other server type, such as `kamailio` or anything else, will NOT auto-delete records when the cache expires, and will use permanent firewall rules.
The `amqp_uri` config parameter can either be a single AMQP URI, or a comma separated list of multiple AMQP URIs (needed for `kamailio` server type with multiple Kazoo zones). Firewall agents running on Freeswitch do not need to connect to multiple Kazoo zones, only the local zone.
See the sample config file for other potentially useful configuration parameters.

+ 52
- 16
amqp.go View File

@ -7,6 +7,7 @@ import (
"log"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
@ -16,7 +17,6 @@ import (
)
var publisher *rabbitmq.Publisher
var consumer *rabbitmq.Consumer
var msgCatFilters []string
var msgNameFilters []string
@ -75,7 +75,7 @@ func handleAmqpMsg(d rabbitmq.Delivery) rabbitmq.Action {
return rabbitmq.Ack
}
func kazooPublish(challenge, event string) error {
func kazooPublish(pub *rabbitmq.Publisher, challenge, event string) error {
msg := KzMessage{
MsgId: uuid.New().String(),
AppName: appname,
@ -92,7 +92,7 @@ func kazooPublish(challenge, event string) error {
return err
}
err = publisher.Publish(msg_json,
err = pub.Publish(msg_json,
[]string{appconf.AmqpPubRoutingKey},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsExchange(appconf.AmqpPubExch),
@ -114,11 +114,47 @@ func amqp() {
appconf.AmqpPubExch = appconf.AmqpExch
}
//parse config AMQP uri[s]
var amqpURIs []string
amqpURIs = strings.Split(appconf.AmqpURI, ",")
for n, uri := range amqpURIs {
go amqpConnect(uri, n)
}
log.Println(appnameFull + " system started.\n-----> [READY]")
fmt.Println(appnameFull + " system started.\n-----> [READY]")
// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
fmt.Printf("Received signal: ")
fmt.Println(sig)
done <- true
}()
for {
select {
case <-done:
fmt.Println("Shutting myself down...")
return
case <-time.After(100 * time.Millisecond):
//case <-time.After(time.Second):
//fmt.Println("tick")
}
}
}
func amqpConnect(amqpURI string, connNumber int) {
/////////////////////////////////////////////
// RabbitMQ Setup Connection
log.Println("Connecting to RabbitMQ: " + appconf.AmqpURI)
log.Println("Connecting to RabbitMQ: " + amqpURI)
amqpConn, err := rabbitmq.NewConn(
appconf.AmqpURI,
amqpURI,
rabbitmq.WithConnectionOptionsLogging,
)
if err != nil {
@ -140,6 +176,7 @@ func amqp() {
/////////////////////////////////////////////
// RabbitMQ Setup Publisher
// NOTE: IF THERE ARE MULTILPLE AMQP CONNECTIONS, THE PUBLISHER WILL ONLY BE ON THE LAST ONE!
publisher, err = rabbitmq.NewPublisher(
amqpConn,
rabbitmq.WithPublisherOptionsLogging,
@ -162,18 +199,20 @@ func amqp() {
logit(7, fmt.Sprintf("Message confirmed from RabbitMQ server. tag: %v, ack: %v\n", c.DeliveryTag, c.Ack))
})
go publisher_loop()
go publisher_loop(publisher)
queue := "q_w" + strconv.Itoa(connNumber) + "_" + appname + "_" + myHostname
/////////////////////////////////////////////
// RabbitMQ Setup Consumer
var consumer *rabbitmq.Consumer
consumer, err = rabbitmq.NewConsumer(
amqpConn,
"q_"+appname+"_"+myHostname,
queue,
rabbitmq.WithConsumerOptionsExchangeName(appconf.AmqpSubExch),
rabbitmq.WithConsumerOptionsExchangeKind(appconf.AmqpExchType),
rabbitmq.WithConsumerOptionsRoutingKey(appconf.AmqpSubRoutingKey),
rabbitmq.WithConsumerOptionsRoutingKey(appconf.AmqpDirectRoutingKey),
rabbitmq.WithConsumerOptionsConsumerName("consumer_"+appname+"_"+myHostname),
rabbitmq.WithConsumerOptionsConsumerName("consumer_w"+strconv.Itoa(connNumber)+"_"+appname+"_"+myHostname),
rabbitmq.WithConsumerOptionsQueueAutoDelete,
rabbitmq.WithConsumerOptionsConcurrency(appconf.AmqpWorkers),
rabbitmq.WithConsumerOptionsExchangeDeclare,
@ -187,14 +226,11 @@ func amqp() {
}
defer consumer.Close()
logit(6, "Consuming on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpSubExch+"' with routing key: '"+appconf.AmqpSubRoutingKey+"' using queue: 'q_"+appname+"_"+myHostname+"'.")
logit(6, "Consuming on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpSubExch+"' with routing key: '"+appconf.AmqpSubRoutingKey+"' using queue: "+queue+"'.")
//logit(6, "Using "+fmt.Sprintf("%d", appconf.AmqpWorkers)+" concurrent workers to process inbound AMQP messages.")
log.Println(appnameFull + " system started.\n-----> [READY]")
fmt.Println(appnameFull + " system started.\n-----> [READY]")
// block main thread
// block here
err = consumer.Run(handleAmqpMsg) //this is the function that we want to call to consume presence messages
if err != nil {
fmt.Println("Unable to start/run RabbitMQ consumer: " + err.Error())
@ -203,7 +239,7 @@ func amqp() {
}
func publisher_loop() {
func publisher_loop(pub *rabbitmq.Publisher) {
/////////////////////////////////////////////
// RabbitMQ Publish Test
sigch := make(chan os.Signal, 1)
@ -218,7 +254,7 @@ func publisher_loop() {
EventName: "request",
ServerId: appname + "@" + myHostname,
Server: myHostname,
Node: "freeswitch@" + myHostname,
Node: "firewallagent_" + appconf.ServerType + "@" + myHostname,
}
msg_pubtest, err := json.Marshal(pubtest)
@ -237,7 +273,7 @@ func publisher_loop() {
msg_pubtest = msgFromFile
}
log.Println("Publishing test message to exchange '" + appconf.AmqpPubExch + "' with routing key '" + appconf.AmqpPubRoutingKey + "': \n" + string(msg_pubtest))
err = publisher.Publish(msg_pubtest,
err = pub.Publish(msg_pubtest,
[]string{appconf.AmqpPubRoutingKey},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsExchange(appconf.AmqpPubExch),


+ 1
- 17
main.go View File

@ -19,7 +19,7 @@ import (
"time"
)
const appnameFull string = "RuhNet Kazoo Firewall Agent"
const appnameFull string = "Kazoo Firewall Agent"
const appname string = "kazoo_firewall_agent"
const version string = "1.0"
const serverVersion string = "RuhNet " + appname + " v" + version
@ -134,21 +134,5 @@ func main() {
}
}
/*
log.Println(appnameFull + " system started.\n-----> [READY]")
fmt.Println(appnameFull + " system started.\n-----> [READY]")
// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
fmt.Printf("Received signal: ")
fmt.Println(sig)
done <- true
}()
*/
amqp() //this func will block the main thread, until receiving on the 'done' channel.
}

Loading…
Cancel
Save