diff --git a/README.md b/README.md index 2542197..ed8b31e 100644 --- a/README.md +++ b/README.md @@ -26,3 +26,10 @@ systemctl daemon-reload systemctl enable kazoo_firewall_agent systemctl start kazoo_firewall_agent ``` + +### Config +Specifying a `server_type` of `freeswitch` or `ephemeral` will use firewall rules that are temporary and are deleted when their cache timeout expires. Any other server type, such as `kamailio` or anything else, will NOT auto-delete records when the cache expires, and will use permanent firewall rules. + +The `amqp_uri` config parameter can either be a single AMQP URI, or a comma separated list of multiple AMQP URIs (needed for `kamailio` server type with multiple Kazoo zones). Firewall agents running on Freeswitch do not need to connect to multiple Kazoo zones, only the local zone. + +See the sample config file for other potentially useful configuration parameters. diff --git a/amqp.go b/amqp.go index 58cd7d8..ce33561 100644 --- a/amqp.go +++ b/amqp.go @@ -7,6 +7,7 @@ import ( "log" "os" "os/signal" + "strconv" "strings" "syscall" "time" @@ -16,7 +17,6 @@ import ( ) var publisher *rabbitmq.Publisher -var consumer *rabbitmq.Consumer var msgCatFilters []string var msgNameFilters []string @@ -75,7 +75,7 @@ func handleAmqpMsg(d rabbitmq.Delivery) rabbitmq.Action { return rabbitmq.Ack } -func kazooPublish(challenge, event string) error { +func kazooPublish(pub *rabbitmq.Publisher, challenge, event string) error { msg := KzMessage{ MsgId: uuid.New().String(), AppName: appname, @@ -92,7 +92,7 @@ func kazooPublish(challenge, event string) error { return err } - err = publisher.Publish(msg_json, + err = pub.Publish(msg_json, []string{appconf.AmqpPubRoutingKey}, rabbitmq.WithPublishOptionsContentType("application/json"), rabbitmq.WithPublishOptionsExchange(appconf.AmqpPubExch), @@ -114,11 +114,47 @@ func amqp() { appconf.AmqpPubExch = appconf.AmqpExch } + //parse config AMQP uri[s] + var amqpURIs []string + amqpURIs = strings.Split(appconf.AmqpURI, ",") + + for n, uri := range amqpURIs { + go amqpConnect(uri, n) + } + + log.Println(appnameFull + " system started.\n-----> [READY]") + fmt.Println(appnameFull + " system started.\n-----> [READY]") + + // block main thread - wait for shutdown signal + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) + + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + go func() { + sig := <-sigs + fmt.Printf("Received signal: ") + fmt.Println(sig) + done <- true + }() + for { + select { + case <-done: + fmt.Println("Shutting myself down...") + return + case <-time.After(100 * time.Millisecond): + //case <-time.After(time.Second): + //fmt.Println("tick") + } + } + +} + +func amqpConnect(amqpURI string, connNumber int) { ///////////////////////////////////////////// // RabbitMQ Setup Connection - log.Println("Connecting to RabbitMQ: " + appconf.AmqpURI) + log.Println("Connecting to RabbitMQ: " + amqpURI) amqpConn, err := rabbitmq.NewConn( - appconf.AmqpURI, + amqpURI, rabbitmq.WithConnectionOptionsLogging, ) if err != nil { @@ -140,6 +176,7 @@ func amqp() { ///////////////////////////////////////////// // RabbitMQ Setup Publisher + // NOTE: IF THERE ARE MULTILPLE AMQP CONNECTIONS, THE PUBLISHER WILL ONLY BE ON THE LAST ONE! publisher, err = rabbitmq.NewPublisher( amqpConn, rabbitmq.WithPublisherOptionsLogging, @@ -162,18 +199,20 @@ func amqp() { logit(7, fmt.Sprintf("Message confirmed from RabbitMQ server. tag: %v, ack: %v\n", c.DeliveryTag, c.Ack)) }) - go publisher_loop() + go publisher_loop(publisher) + queue := "q_w" + strconv.Itoa(connNumber) + "_" + appname + "_" + myHostname ///////////////////////////////////////////// // RabbitMQ Setup Consumer + var consumer *rabbitmq.Consumer consumer, err = rabbitmq.NewConsumer( amqpConn, - "q_"+appname+"_"+myHostname, + queue, rabbitmq.WithConsumerOptionsExchangeName(appconf.AmqpSubExch), rabbitmq.WithConsumerOptionsExchangeKind(appconf.AmqpExchType), rabbitmq.WithConsumerOptionsRoutingKey(appconf.AmqpSubRoutingKey), rabbitmq.WithConsumerOptionsRoutingKey(appconf.AmqpDirectRoutingKey), - rabbitmq.WithConsumerOptionsConsumerName("consumer_"+appname+"_"+myHostname), + rabbitmq.WithConsumerOptionsConsumerName("consumer_w"+strconv.Itoa(connNumber)+"_"+appname+"_"+myHostname), rabbitmq.WithConsumerOptionsQueueAutoDelete, rabbitmq.WithConsumerOptionsConcurrency(appconf.AmqpWorkers), rabbitmq.WithConsumerOptionsExchangeDeclare, @@ -187,14 +226,11 @@ func amqp() { } defer consumer.Close() - logit(6, "Consuming on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpSubExch+"' with routing key: '"+appconf.AmqpSubRoutingKey+"' using queue: 'q_"+appname+"_"+myHostname+"'.") + logit(6, "Consuming on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpSubExch+"' with routing key: '"+appconf.AmqpSubRoutingKey+"' using queue: "+queue+"'.") //logit(6, "Using "+fmt.Sprintf("%d", appconf.AmqpWorkers)+" concurrent workers to process inbound AMQP messages.") - log.Println(appnameFull + " system started.\n-----> [READY]") - fmt.Println(appnameFull + " system started.\n-----> [READY]") - - // block main thread + // block here err = consumer.Run(handleAmqpMsg) //this is the function that we want to call to consume presence messages if err != nil { fmt.Println("Unable to start/run RabbitMQ consumer: " + err.Error()) @@ -203,7 +239,7 @@ func amqp() { } -func publisher_loop() { +func publisher_loop(pub *rabbitmq.Publisher) { ///////////////////////////////////////////// // RabbitMQ Publish Test sigch := make(chan os.Signal, 1) @@ -218,7 +254,7 @@ func publisher_loop() { EventName: "request", ServerId: appname + "@" + myHostname, Server: myHostname, - Node: "freeswitch@" + myHostname, + Node: "firewallagent_" + appconf.ServerType + "@" + myHostname, } msg_pubtest, err := json.Marshal(pubtest) @@ -237,7 +273,7 @@ func publisher_loop() { msg_pubtest = msgFromFile } log.Println("Publishing test message to exchange '" + appconf.AmqpPubExch + "' with routing key '" + appconf.AmqpPubRoutingKey + "': \n" + string(msg_pubtest)) - err = publisher.Publish(msg_pubtest, + err = pub.Publish(msg_pubtest, []string{appconf.AmqpPubRoutingKey}, rabbitmq.WithPublishOptionsContentType("application/json"), rabbitmq.WithPublishOptionsExchange(appconf.AmqpPubExch), diff --git a/main.go b/main.go index 799c9b9..4b0bdfc 100644 --- a/main.go +++ b/main.go @@ -19,7 +19,7 @@ import ( "time" ) -const appnameFull string = "RuhNet Kazoo Firewall Agent" +const appnameFull string = "Kazoo Firewall Agent" const appname string = "kazoo_firewall_agent" const version string = "1.0" const serverVersion string = "RuhNet " + appname + " v" + version @@ -134,21 +134,5 @@ func main() { } } - /* - log.Println(appnameFull + " system started.\n-----> [READY]") - fmt.Println(appnameFull + " system started.\n-----> [READY]") - - // block main thread - wait for shutdown signal - sigs := make(chan os.Signal, 1) - - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - go func() { - sig := <-sigs - fmt.Printf("Received signal: ") - fmt.Println(sig) - done <- true - }() - */ - amqp() //this func will block the main thread, until receiving on the 'done' channel. }