|
|
package main
|
|
|
|
|
|
import (
|
|
|
"encoding/json"
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
"log"
|
|
|
"os"
|
|
|
"os/signal"
|
|
|
"strconv"
|
|
|
"strings"
|
|
|
"syscall"
|
|
|
"time"
|
|
|
|
|
|
"github.com/google/uuid"
|
|
|
rabbitmq "github.com/wagslane/go-rabbitmq"
|
|
|
)
|
|
|
|
|
|
var publisher *rabbitmq.Publisher
|
|
|
|
|
|
var msgCatFilters []string
|
|
|
var msgNameFilters []string
|
|
|
var msgAppNameFilters []string
|
|
|
|
|
|
type KzMessage struct {
|
|
|
MsgId string `json:"Msg-ID"`
|
|
|
AppName string `json:"App-Name"` //call_shield
|
|
|
AppVersion string `json:"App-Version"` //4.0.0
|
|
|
EventName string `json:"Event-Name"` //request
|
|
|
EventCategory string `json:"Event-Category"` //firewall_add firewall_remove
|
|
|
ServerId string `json:"Server-ID"`
|
|
|
Server string `json:"Server,omitempty"`
|
|
|
IP string `json:"IP-Address,omitempty"` //e.g. 1.2.3.4
|
|
|
Ports interface{} `json:"Ports,omitempty"` //e.g. sip http https 8443 [25,8443/tcp,53/udp,8080]
|
|
|
Node string `json:"Node,omitempty"`
|
|
|
//Node string `json:"Freeswitch-Node"` //freeswitch@server.com
|
|
|
}
|
|
|
|
|
|
func handleAmqpMsg(d rabbitmq.Delivery) rabbitmq.Action {
|
|
|
logit(6, "RoutingKey: "+string(d.RoutingKey))
|
|
|
logit(6, "AMQP message received: "+string(d.Body))
|
|
|
|
|
|
var msg KzMessage
|
|
|
err := json.Unmarshal(d.Body, &msg)
|
|
|
if err != nil {
|
|
|
log.Println("handleAmqpMsg(): Error unmarshalling AMQP message into map[string]interface{}...discarding. Message body: " + string(d.Body) + "\nUnmarshalling error: " + err.Error())
|
|
|
return rabbitmq.NackDiscard
|
|
|
}
|
|
|
|
|
|
for _, appname := range msgAppNameFilters {
|
|
|
if appconf.FilterEvtAppName == "*" || appname == msg.AppName { //only act if message matches a filter, OR if the filter is "*"
|
|
|
for _, cat := range msgCatFilters {
|
|
|
if appconf.FilterEvtCat == "*" || cat == msg.EventCategory {
|
|
|
for _, name := range msgNameFilters {
|
|
|
if appconf.FilterEvtName == "*" || name == msg.EventName {
|
|
|
firewall(msg.EventCategory, msg.IP, msg.Ports)
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/*
|
|
|
err := kazooPublish(ta.Domain, input.Data.ChallengeText, "request")
|
|
|
if err != nil {
|
|
|
log.Println(err.Error())
|
|
|
return c.JSON(errorOut(http.StatusInternalServerError, "Error publishing message."+err.Error()))
|
|
|
}
|
|
|
*/
|
|
|
|
|
|
return rabbitmq.Ack
|
|
|
}
|
|
|
|
|
|
func kazooPublish(pub *rabbitmq.Publisher, challenge, event string) error {
|
|
|
msg := KzMessage{
|
|
|
MsgId: uuid.New().String(),
|
|
|
AppName: appname,
|
|
|
AppVersion: version,
|
|
|
EventCategory: "firewall_" + event,
|
|
|
EventName: "request", //"request" or "delete"
|
|
|
ServerId: appname + "@" + myHostname,
|
|
|
Server: myHostname,
|
|
|
Node: "freeswitch@" + myHostname,
|
|
|
}
|
|
|
|
|
|
msg_json, err := json.Marshal(msg)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
err = pub.Publish(msg_json,
|
|
|
[]string{appconf.AmqpPubRoutingKey},
|
|
|
rabbitmq.WithPublishOptionsContentType("application/json"),
|
|
|
rabbitmq.WithPublishOptionsExchange(appconf.AmqpPubExch),
|
|
|
)
|
|
|
return errors.New("Error publishing to exchange '" + appconf.AmqpPubExch + "' with routing key '" + appconf.AmqpPubRoutingKey + "': \n" + string(msg_json) + " ERROR: " + err.Error())
|
|
|
}
|
|
|
|
|
|
func amqp() {
|
|
|
if appconf.AmqpDirectRoutingKey == "" {
|
|
|
appconf.AmqpDirectRoutingKey = "call_shield." + myHostname + ".*.*"
|
|
|
}
|
|
|
if appconf.AmqpSubRoutingKey == "" {
|
|
|
appconf.AmqpSubRoutingKey = "call_shield." + appconf.ServerType + ".firewall.*"
|
|
|
}
|
|
|
if appconf.AmqpSubExch == "" {
|
|
|
appconf.AmqpSubExch = appconf.AmqpExch
|
|
|
}
|
|
|
if appconf.AmqpPubExch == "" {
|
|
|
appconf.AmqpPubExch = appconf.AmqpExch
|
|
|
}
|
|
|
|
|
|
//parse config AMQP uri[s]
|
|
|
var amqpURIs []string
|
|
|
amqpURIs = strings.Split(appconf.AmqpURI, ",")
|
|
|
|
|
|
for n, uri := range amqpURIs {
|
|
|
go amqpConnect(uri, n)
|
|
|
}
|
|
|
|
|
|
log.Println(appnameFull + " system started.\n-----> [READY]")
|
|
|
fmt.Println(appnameFull + " system started.\n-----> [READY]")
|
|
|
|
|
|
// block main thread - wait for shutdown signal
|
|
|
sigs := make(chan os.Signal, 1)
|
|
|
done := make(chan bool, 1)
|
|
|
|
|
|
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
|
|
go func() {
|
|
|
sig := <-sigs
|
|
|
fmt.Printf("Received signal: ")
|
|
|
fmt.Println(sig)
|
|
|
done <- true
|
|
|
}()
|
|
|
for {
|
|
|
select {
|
|
|
case <-done:
|
|
|
fmt.Println("Shutting myself down...")
|
|
|
return
|
|
|
case <-time.After(100 * time.Millisecond):
|
|
|
//case <-time.After(time.Second):
|
|
|
//fmt.Println("tick")
|
|
|
}
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
func amqpConnect(amqpURI string, connNumber int) {
|
|
|
/////////////////////////////////////////////
|
|
|
// RabbitMQ Setup Connection
|
|
|
log.Println("Connecting to RabbitMQ: " + amqpURI)
|
|
|
amqpConn, err := rabbitmq.NewConn(
|
|
|
amqpURI,
|
|
|
rabbitmq.WithConnectionOptionsLogging,
|
|
|
)
|
|
|
if err != nil {
|
|
|
fmt.Println("Unable to initialize RabbitMQ connection: " + err.Error())
|
|
|
log.Fatal("Unable to initialize RabbitMQ connection: " + err.Error())
|
|
|
}
|
|
|
defer amqpConn.Close()
|
|
|
|
|
|
/////////////////////////////////////////////
|
|
|
// Message Filters
|
|
|
msgCatFilters = strings.Split(appconf.FilterEvtCat, ",")
|
|
|
msgNameFilters = strings.Split(appconf.FilterEvtName, ",")
|
|
|
msgAppNameFilters = strings.Split(appconf.FilterEvtAppName, ",")
|
|
|
if (len(msgCatFilters) > 0 && len(msgNameFilters) > 0 && len(msgAppNameFilters) > 0) && (msgCatFilters[0] != "*" || msgNameFilters[0] != "*" || msgAppNameFilters[0] != "*") {
|
|
|
fmt.Println("Filtering event app names matching:", msgAppNameFilters)
|
|
|
fmt.Println("Filtering event categories matching:", msgCatFilters)
|
|
|
fmt.Println("Filtering event names matching:", msgNameFilters)
|
|
|
}
|
|
|
|
|
|
/////////////////////////////////////////////
|
|
|
// RabbitMQ Setup Publisher
|
|
|
// NOTE: IF THERE ARE MULTILPLE AMQP CONNECTIONS, THE PUBLISHER WILL ONLY BE ON THE LAST ONE!
|
|
|
publisher, err = rabbitmq.NewPublisher(
|
|
|
amqpConn,
|
|
|
rabbitmq.WithPublisherOptionsLogging,
|
|
|
rabbitmq.WithPublisherOptionsExchangeKind(appconf.AmqpExchType),
|
|
|
rabbitmq.WithPublisherOptionsExchangeName(appconf.AmqpPubExch),
|
|
|
rabbitmq.WithPublisherOptionsExchangeDeclare,
|
|
|
)
|
|
|
if err != nil {
|
|
|
fmt.Println("Unable to initialize RabbitMQ publisher: " + err.Error())
|
|
|
log.Fatal("Unable to initialize RabbitMQ publisher: " + err.Error())
|
|
|
}
|
|
|
defer publisher.Close()
|
|
|
|
|
|
logit(7, "Publisher configured on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpPubExch+"' with routing key: '"+appconf.AmqpPubRoutingKey+"'.")
|
|
|
|
|
|
publisher.NotifyReturn(func(r rabbitmq.Return) {
|
|
|
logit(4, fmt.Sprintf("RabbitMQ published message returned from server: %s\n", string(r.Body)))
|
|
|
})
|
|
|
publisher.NotifyPublish(func(c rabbitmq.Confirmation) {
|
|
|
logit(7, fmt.Sprintf("Message confirmed from RabbitMQ server. tag: %v, ack: %v\n", c.DeliveryTag, c.Ack))
|
|
|
})
|
|
|
|
|
|
go publisher_loop(publisher)
|
|
|
|
|
|
queue := "q_w" + strconv.Itoa(connNumber) + "_" + appname + "_" + myHostname
|
|
|
/////////////////////////////////////////////
|
|
|
// RabbitMQ Setup Consumer
|
|
|
var consumer *rabbitmq.Consumer
|
|
|
consumer, err = rabbitmq.NewConsumer(
|
|
|
amqpConn,
|
|
|
queue,
|
|
|
rabbitmq.WithConsumerOptionsExchangeName(appconf.AmqpSubExch),
|
|
|
rabbitmq.WithConsumerOptionsExchangeKind(appconf.AmqpExchType),
|
|
|
rabbitmq.WithConsumerOptionsRoutingKey(appconf.AmqpSubRoutingKey),
|
|
|
rabbitmq.WithConsumerOptionsRoutingKey(appconf.AmqpDirectRoutingKey),
|
|
|
rabbitmq.WithConsumerOptionsConsumerName("consumer_w"+strconv.Itoa(connNumber)+"_"+appname+"_"+myHostname),
|
|
|
rabbitmq.WithConsumerOptionsQueueAutoDelete,
|
|
|
rabbitmq.WithConsumerOptionsConcurrency(appconf.AmqpWorkers),
|
|
|
rabbitmq.WithConsumerOptionsExchangeDeclare,
|
|
|
//rabbitmq.WithConsumerOptionsQuorum,
|
|
|
//rabbitmq.WithConsumerOptionsQueueDurable,
|
|
|
//rabbitmq.WithConsumerOptionsBindingExchangeDurable,
|
|
|
)
|
|
|
if err != nil {
|
|
|
fmt.Println("Unable to initialize RabbitMQ consumer: " + err.Error())
|
|
|
log.Fatal("Unable to initialize RabbitMQ consumer: " + err.Error())
|
|
|
}
|
|
|
defer consumer.Close()
|
|
|
|
|
|
logit(6, "Consuming on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpSubExch+"' with routing key: '"+appconf.AmqpSubRoutingKey+"' using queue: "+queue+"'.")
|
|
|
|
|
|
//logit(6, "Using "+fmt.Sprintf("%d", appconf.AmqpWorkers)+" concurrent workers to process inbound AMQP messages.")
|
|
|
|
|
|
// block here
|
|
|
err = consumer.Run(handleAmqpMsg) //this is the function that we want to call to consume presence messages
|
|
|
if err != nil {
|
|
|
fmt.Println("Unable to start/run RabbitMQ consumer: " + err.Error())
|
|
|
log.Fatal("Unable to start/run RabbitMQ consumer: " + err.Error())
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
func publisher_loop(pub *rabbitmq.Publisher) {
|
|
|
/////////////////////////////////////////////
|
|
|
// RabbitMQ Publish Test
|
|
|
sigch := make(chan os.Signal, 1)
|
|
|
signal.Notify(sigch, syscall.SIGUSR1, syscall.SIGUSR2)
|
|
|
|
|
|
//create test message to send
|
|
|
pubtest := KzMessage{
|
|
|
MsgId: uuid.New().String(),
|
|
|
AppName: appname,
|
|
|
AppVersion: version,
|
|
|
EventCategory: "firewall_remove",
|
|
|
EventName: "request",
|
|
|
ServerId: appname + "@" + myHostname,
|
|
|
Server: myHostname,
|
|
|
Node: "firewallagent_" + appconf.ServerType + "@" + myHostname,
|
|
|
}
|
|
|
|
|
|
msg_pubtest, err := json.Marshal(pubtest)
|
|
|
if err != nil {
|
|
|
log.Println("Unable to marshal pubtest into JSON! Published test message will be blank.")
|
|
|
}
|
|
|
|
|
|
// block thread
|
|
|
for {
|
|
|
select {
|
|
|
case sig := <-sigch:
|
|
|
log.Println("Received signal " + fmt.Sprintf("%s", sig) + " on pubch channel. Publishing test message... ")
|
|
|
if sig == syscall.SIGUSR2 {
|
|
|
msgFromFile, err := readJsonFile(appconf.PubMessageFile)
|
|
|
if err == nil {
|
|
|
msg_pubtest = msgFromFile
|
|
|
}
|
|
|
log.Println("Publishing test message to exchange '" + appconf.AmqpPubExch + "' with routing key '" + appconf.AmqpPubRoutingKey + "': \n" + string(msg_pubtest))
|
|
|
err = pub.Publish(msg_pubtest,
|
|
|
[]string{appconf.AmqpPubRoutingKey},
|
|
|
rabbitmq.WithPublishOptionsContentType("application/json"),
|
|
|
rabbitmq.WithPublishOptionsExchange(appconf.AmqpPubExch),
|
|
|
)
|
|
|
if err != nil {
|
|
|
log.Println("Error publishing message: " + err.Error())
|
|
|
} else {
|
|
|
log.Println("Message PUBLISHED.")
|
|
|
}
|
|
|
}
|
|
|
case <-time.After(50 * time.Millisecond):
|
|
|
//case <-time.After(time.Second):
|
|
|
//fmt.Println("tick")
|
|
|
}
|
|
|
}
|
|
|
}
|