Daemon that listens for AMQP messages to add IP addresses and ports to FirewallD. IP addresses expire and are removed automatically after a configurable timeout.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

292 lines
9.6 KiB

package main
import (
"encoding/json"
"errors"
"fmt"
"log"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
"github.com/google/uuid"
rabbitmq "github.com/wagslane/go-rabbitmq"
)
var publisher *rabbitmq.Publisher
var msgCatFilters []string
var msgNameFilters []string
var msgAppNameFilters []string
type KzMessage struct {
MsgId string `json:"Msg-ID"`
AppName string `json:"App-Name"` //call_shield
AppVersion string `json:"App-Version"` //4.0.0
EventName string `json:"Event-Name"` //request
EventCategory string `json:"Event-Category"` //firewall_add firewall_remove
ServerId string `json:"Server-ID"`
Server string `json:"Server,omitempty"`
IP string `json:"IP-Address,omitempty"` //e.g. 1.2.3.4
Ports interface{} `json:"Ports,omitempty"` //e.g. sip http https 8443 [25,8443/tcp,53/udp,8080]
Node string `json:"Node,omitempty"`
//Node string `json:"Freeswitch-Node"` //freeswitch@server.com
}
func handleAmqpMsg(d rabbitmq.Delivery) rabbitmq.Action {
logit(6, "RoutingKey: "+string(d.RoutingKey))
logit(6, "AMQP message received: "+string(d.Body))
var msg KzMessage
err := json.Unmarshal(d.Body, &msg)
if err != nil {
log.Println("handleAmqpMsg(): Error unmarshalling AMQP message into map[string]interface{}...discarding. Message body: " + string(d.Body) + "\nUnmarshalling error: " + err.Error())
return rabbitmq.NackDiscard
}
for _, appname := range msgAppNameFilters {
if appconf.FilterEvtAppName == "*" || appname == msg.AppName { //only act if message matches a filter, OR if the filter is "*"
for _, cat := range msgCatFilters {
if appconf.FilterEvtCat == "*" || cat == msg.EventCategory {
for _, name := range msgNameFilters {
if appconf.FilterEvtName == "*" || name == msg.EventName {
firewall(msg.EventCategory, msg.IP, msg.Ports)
break
}
}
break
}
}
break
}
}
/*
err := kazooPublish(ta.Domain, input.Data.ChallengeText, "request")
if err != nil {
log.Println(err.Error())
return c.JSON(errorOut(http.StatusInternalServerError, "Error publishing message."+err.Error()))
}
*/
return rabbitmq.Ack
}
func kazooPublish(pub *rabbitmq.Publisher, challenge, event string) error {
msg := KzMessage{
MsgId: uuid.New().String(),
AppName: appname,
AppVersion: version,
EventCategory: "firewall_" + event,
EventName: "request", //"request" or "delete"
ServerId: appname + "@" + myHostname,
Server: myHostname,
Node: "freeswitch@" + myHostname,
}
msg_json, err := json.Marshal(msg)
if err != nil {
return err
}
err = pub.Publish(msg_json,
[]string{appconf.AmqpPubRoutingKey},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsExchange(appconf.AmqpPubExch),
)
return errors.New("Error publishing to exchange '" + appconf.AmqpPubExch + "' with routing key '" + appconf.AmqpPubRoutingKey + "': \n" + string(msg_json) + " ERROR: " + err.Error())
}
func amqp() {
if appconf.AmqpDirectRoutingKey == "" {
appconf.AmqpDirectRoutingKey = "call_shield." + myHostname + ".*.*"
}
if appconf.AmqpSubRoutingKey == "" {
appconf.AmqpSubRoutingKey = "call_shield." + appconf.ServerType + ".firewall.*"
}
if appconf.AmqpSubExch == "" {
appconf.AmqpSubExch = appconf.AmqpExch
}
if appconf.AmqpPubExch == "" {
appconf.AmqpPubExch = appconf.AmqpExch
}
//parse config AMQP uri[s]
var amqpURIs []string
amqpURIs = strings.Split(appconf.AmqpURI, ",")
for n, uri := range amqpURIs {
go amqpConnect(uri, n)
}
log.Println(appnameFull + " system started.\n-----> [READY]")
fmt.Println(appnameFull + " system started.\n-----> [READY]")
// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
fmt.Printf("Received signal: ")
fmt.Println(sig)
done <- true
}()
for {
select {
case <-done:
fmt.Println("Shutting myself down...")
return
case <-time.After(100 * time.Millisecond):
//case <-time.After(time.Second):
//fmt.Println("tick")
}
}
}
func amqpConnect(amqpURI string, connNumber int) {
/////////////////////////////////////////////
// RabbitMQ Setup Connection
log.Println("Connecting to RabbitMQ: " + amqpURI)
amqpConn, err := rabbitmq.NewConn(
amqpURI,
rabbitmq.WithConnectionOptionsLogging,
)
if err != nil {
fmt.Println("Unable to initialize RabbitMQ connection: " + err.Error())
log.Fatal("Unable to initialize RabbitMQ connection: " + err.Error())
}
defer amqpConn.Close()
/////////////////////////////////////////////
// Message Filters
msgCatFilters = strings.Split(appconf.FilterEvtCat, ",")
msgNameFilters = strings.Split(appconf.FilterEvtName, ",")
msgAppNameFilters = strings.Split(appconf.FilterEvtAppName, ",")
if (len(msgCatFilters) > 0 && len(msgNameFilters) > 0 && len(msgAppNameFilters) > 0) && (msgCatFilters[0] != "*" || msgNameFilters[0] != "*" || msgAppNameFilters[0] != "*") {
fmt.Println("Filtering event app names matching:", msgAppNameFilters)
fmt.Println("Filtering event categories matching:", msgCatFilters)
fmt.Println("Filtering event names matching:", msgNameFilters)
}
/////////////////////////////////////////////
// RabbitMQ Setup Publisher
// NOTE: IF THERE ARE MULTILPLE AMQP CONNECTIONS, THE PUBLISHER WILL ONLY BE ON THE LAST ONE!
publisher, err = rabbitmq.NewPublisher(
amqpConn,
rabbitmq.WithPublisherOptionsLogging,
rabbitmq.WithPublisherOptionsExchangeKind(appconf.AmqpExchType),
rabbitmq.WithPublisherOptionsExchangeName(appconf.AmqpPubExch),
rabbitmq.WithPublisherOptionsExchangeDeclare,
)
if err != nil {
fmt.Println("Unable to initialize RabbitMQ publisher: " + err.Error())
log.Fatal("Unable to initialize RabbitMQ publisher: " + err.Error())
}
defer publisher.Close()
logit(7, "Publisher configured on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpPubExch+"' with routing key: '"+appconf.AmqpPubRoutingKey+"'.")
publisher.NotifyReturn(func(r rabbitmq.Return) {
logit(4, fmt.Sprintf("RabbitMQ published message returned from server: %s\n", string(r.Body)))
})
publisher.NotifyPublish(func(c rabbitmq.Confirmation) {
logit(7, fmt.Sprintf("Message confirmed from RabbitMQ server. tag: %v, ack: %v\n", c.DeliveryTag, c.Ack))
})
go publisher_loop(publisher)
queue := "q_w" + strconv.Itoa(connNumber) + "_" + appname + "_" + myHostname
/////////////////////////////////////////////
// RabbitMQ Setup Consumer
var consumer *rabbitmq.Consumer
consumer, err = rabbitmq.NewConsumer(
amqpConn,
queue,
rabbitmq.WithConsumerOptionsExchangeName(appconf.AmqpSubExch),
rabbitmq.WithConsumerOptionsExchangeKind(appconf.AmqpExchType),
rabbitmq.WithConsumerOptionsRoutingKey(appconf.AmqpSubRoutingKey),
rabbitmq.WithConsumerOptionsRoutingKey(appconf.AmqpDirectRoutingKey),
rabbitmq.WithConsumerOptionsConsumerName("consumer_w"+strconv.Itoa(connNumber)+"_"+appname+"_"+myHostname),
rabbitmq.WithConsumerOptionsQueueAutoDelete,
rabbitmq.WithConsumerOptionsConcurrency(appconf.AmqpWorkers),
rabbitmq.WithConsumerOptionsExchangeDeclare,
//rabbitmq.WithConsumerOptionsQuorum,
//rabbitmq.WithConsumerOptionsQueueDurable,
//rabbitmq.WithConsumerOptionsBindingExchangeDurable,
)
if err != nil {
fmt.Println("Unable to initialize RabbitMQ consumer: " + err.Error())
log.Fatal("Unable to initialize RabbitMQ consumer: " + err.Error())
}
defer consumer.Close()
logit(6, "Consuming on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpSubExch+"' with routing key: '"+appconf.AmqpSubRoutingKey+"' using queue: "+queue+"'.")
//logit(6, "Using "+fmt.Sprintf("%d", appconf.AmqpWorkers)+" concurrent workers to process inbound AMQP messages.")
// block here
err = consumer.Run(handleAmqpMsg) //this is the function that we want to call to consume presence messages
if err != nil {
fmt.Println("Unable to start/run RabbitMQ consumer: " + err.Error())
log.Fatal("Unable to start/run RabbitMQ consumer: " + err.Error())
}
}
func publisher_loop(pub *rabbitmq.Publisher) {
/////////////////////////////////////////////
// RabbitMQ Publish Test
sigch := make(chan os.Signal, 1)
signal.Notify(sigch, syscall.SIGUSR1, syscall.SIGUSR2)
//create test message to send
pubtest := KzMessage{
MsgId: uuid.New().String(),
AppName: appname,
AppVersion: version,
EventCategory: "firewall_remove",
EventName: "request",
ServerId: appname + "@" + myHostname,
Server: myHostname,
Node: "firewallagent_" + appconf.ServerType + "@" + myHostname,
}
msg_pubtest, err := json.Marshal(pubtest)
if err != nil {
log.Println("Unable to marshal pubtest into JSON! Published test message will be blank.")
}
// block thread
for {
select {
case sig := <-sigch:
log.Println("Received signal " + fmt.Sprintf("%s", sig) + " on pubch channel. Publishing test message... ")
if sig == syscall.SIGUSR2 {
msgFromFile, err := readJsonFile(appconf.PubMessageFile)
if err == nil {
msg_pubtest = msgFromFile
}
log.Println("Publishing test message to exchange '" + appconf.AmqpPubExch + "' with routing key '" + appconf.AmqpPubRoutingKey + "': \n" + string(msg_pubtest))
err = pub.Publish(msg_pubtest,
[]string{appconf.AmqpPubRoutingKey},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsExchange(appconf.AmqpPubExch),
)
if err != nil {
log.Println("Error publishing message: " + err.Error())
} else {
log.Println("Message PUBLISHED.")
}
}
case <-time.After(50 * time.Millisecond):
//case <-time.After(time.Second):
//fmt.Println("tick")
}
}
}