package main import ( "encoding/json" "errors" "fmt" "log" "os" "os/signal" "strconv" "strings" "syscall" "time" "github.com/google/uuid" rabbitmq "github.com/wagslane/go-rabbitmq" ) var publisher *rabbitmq.Publisher var msgCatFilters []string var msgNameFilters []string var msgAppNameFilters []string type KzMessage struct { MsgId string `json:"Msg-ID"` AppName string `json:"App-Name"` //call_shield AppVersion string `json:"App-Version"` //4.0.0 EventName string `json:"Event-Name"` //request EventCategory string `json:"Event-Category"` //firewall_add firewall_remove ServerId string `json:"Server-ID"` Server string `json:"Server,omitempty"` IP string `json:"IP-Address,omitempty"` //e.g. 1.2.3.4 Ports interface{} `json:"Ports,omitempty"` //e.g. sip http https 8443 [25,8443/tcp,53/udp,8080] Node string `json:"Node,omitempty"` //Node string `json:"Freeswitch-Node"` //freeswitch@server.com } func handleAmqpMsg(d rabbitmq.Delivery) rabbitmq.Action { logit(6, "RoutingKey: "+string(d.RoutingKey)) logit(6, "AMQP message received: "+string(d.Body)) var msg KzMessage err := json.Unmarshal(d.Body, &msg) if err != nil { log.Println("handleAmqpMsg(): Error unmarshalling AMQP message into map[string]interface{}...discarding. Message body: " + string(d.Body) + "\nUnmarshalling error: " + err.Error()) return rabbitmq.NackDiscard } for _, appname := range msgAppNameFilters { if appconf.FilterEvtAppName == "*" || appname == msg.AppName { //only act if message matches a filter, OR if the filter is "*" for _, cat := range msgCatFilters { if appconf.FilterEvtCat == "*" || cat == msg.EventCategory { for _, name := range msgNameFilters { if appconf.FilterEvtName == "*" || name == msg.EventName { firewall(msg.EventCategory, msg.IP, msg.Ports) break } } break } } break } } /* err := kazooPublish(ta.Domain, input.Data.ChallengeText, "request") if err != nil { log.Println(err.Error()) return c.JSON(errorOut(http.StatusInternalServerError, "Error publishing message."+err.Error())) } */ return rabbitmq.Ack } func kazooPublish(pub *rabbitmq.Publisher, challenge, event string) error { msg := KzMessage{ MsgId: uuid.New().String(), AppName: appname, AppVersion: version, EventCategory: "firewall_" + event, EventName: "request", //"request" or "delete" ServerId: appname + "@" + myHostname, Server: myHostname, Node: "freeswitch@" + myHostname, } msg_json, err := json.Marshal(msg) if err != nil { return err } err = pub.Publish(msg_json, []string{appconf.AmqpPubRoutingKey}, rabbitmq.WithPublishOptionsContentType("application/json"), rabbitmq.WithPublishOptionsExchange(appconf.AmqpPubExch), ) return errors.New("Error publishing to exchange '" + appconf.AmqpPubExch + "' with routing key '" + appconf.AmqpPubRoutingKey + "': \n" + string(msg_json) + " ERROR: " + err.Error()) } func amqp() { if appconf.AmqpDirectRoutingKey == "" { appconf.AmqpDirectRoutingKey = "call_shield." + myHostname + ".*.*" } if appconf.AmqpSubRoutingKey == "" { appconf.AmqpSubRoutingKey = "call_shield." + appconf.ServerType + ".firewall.*" } if appconf.AmqpSubExch == "" { appconf.AmqpSubExch = appconf.AmqpExch } if appconf.AmqpPubExch == "" { appconf.AmqpPubExch = appconf.AmqpExch } //parse config AMQP uri[s] var amqpURIs []string amqpURIs = strings.Split(appconf.AmqpURI, ",") for n, uri := range amqpURIs { go amqpConnect(uri, n) } log.Println(appnameFull + " system started.\n-----> [READY]") fmt.Println(appnameFull + " system started.\n-----> [READY]") // block main thread - wait for shutdown signal sigs := make(chan os.Signal, 1) done := make(chan bool, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { sig := <-sigs fmt.Printf("Received signal: ") fmt.Println(sig) done <- true }() for { select { case <-done: fmt.Println("Shutting myself down...") return case <-time.After(100 * time.Millisecond): //case <-time.After(time.Second): //fmt.Println("tick") } } } func amqpConnect(amqpURI string, connNumber int) { ///////////////////////////////////////////// // RabbitMQ Setup Connection log.Println("Connecting to RabbitMQ: " + amqpURI) amqpConn, err := rabbitmq.NewConn( amqpURI, rabbitmq.WithConnectionOptionsLogging, ) if err != nil { fmt.Println("Unable to initialize RabbitMQ connection: " + err.Error()) log.Fatal("Unable to initialize RabbitMQ connection: " + err.Error()) } defer amqpConn.Close() ///////////////////////////////////////////// // Message Filters msgCatFilters = strings.Split(appconf.FilterEvtCat, ",") msgNameFilters = strings.Split(appconf.FilterEvtName, ",") msgAppNameFilters = strings.Split(appconf.FilterEvtAppName, ",") if (len(msgCatFilters) > 0 && len(msgNameFilters) > 0 && len(msgAppNameFilters) > 0) && (msgCatFilters[0] != "*" || msgNameFilters[0] != "*" || msgAppNameFilters[0] != "*") { fmt.Println("Filtering event app names matching:", msgAppNameFilters) fmt.Println("Filtering event categories matching:", msgCatFilters) fmt.Println("Filtering event names matching:", msgNameFilters) } ///////////////////////////////////////////// // RabbitMQ Setup Publisher // NOTE: IF THERE ARE MULTILPLE AMQP CONNECTIONS, THE PUBLISHER WILL ONLY BE ON THE LAST ONE! publisher, err = rabbitmq.NewPublisher( amqpConn, rabbitmq.WithPublisherOptionsLogging, rabbitmq.WithPublisherOptionsExchangeKind(appconf.AmqpExchType), rabbitmq.WithPublisherOptionsExchangeName(appconf.AmqpPubExch), rabbitmq.WithPublisherOptionsExchangeDeclare, ) if err != nil { fmt.Println("Unable to initialize RabbitMQ publisher: " + err.Error()) log.Fatal("Unable to initialize RabbitMQ publisher: " + err.Error()) } defer publisher.Close() logit(7, "Publisher configured on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpPubExch+"' with routing key: '"+appconf.AmqpPubRoutingKey+"'.") publisher.NotifyReturn(func(r rabbitmq.Return) { logit(4, fmt.Sprintf("RabbitMQ published message returned from server: %s\n", string(r.Body))) }) publisher.NotifyPublish(func(c rabbitmq.Confirmation) { logit(7, fmt.Sprintf("Message confirmed from RabbitMQ server. tag: %v, ack: %v\n", c.DeliveryTag, c.Ack)) }) go publisher_loop(publisher) queue := "q_w" + strconv.Itoa(connNumber) + "_" + appname + "_" + myHostname ///////////////////////////////////////////// // RabbitMQ Setup Consumer var consumer *rabbitmq.Consumer consumer, err = rabbitmq.NewConsumer( amqpConn, queue, rabbitmq.WithConsumerOptionsExchangeName(appconf.AmqpSubExch), rabbitmq.WithConsumerOptionsExchangeKind(appconf.AmqpExchType), rabbitmq.WithConsumerOptionsRoutingKey(appconf.AmqpSubRoutingKey), rabbitmq.WithConsumerOptionsRoutingKey(appconf.AmqpDirectRoutingKey), rabbitmq.WithConsumerOptionsConsumerName("consumer_w"+strconv.Itoa(connNumber)+"_"+appname+"_"+myHostname), rabbitmq.WithConsumerOptionsQueueAutoDelete, rabbitmq.WithConsumerOptionsConcurrency(appconf.AmqpWorkers), rabbitmq.WithConsumerOptionsExchangeDeclare, //rabbitmq.WithConsumerOptionsQuorum, //rabbitmq.WithConsumerOptionsQueueDurable, //rabbitmq.WithConsumerOptionsBindingExchangeDurable, ) if err != nil { fmt.Println("Unable to initialize RabbitMQ consumer: " + err.Error()) log.Fatal("Unable to initialize RabbitMQ consumer: " + err.Error()) } defer consumer.Close() logit(6, "Consuming on '"+appconf.AmqpExchType+"' exchange: '"+appconf.AmqpSubExch+"' with routing key: '"+appconf.AmqpSubRoutingKey+"' using queue: "+queue+"'.") //logit(6, "Using "+fmt.Sprintf("%d", appconf.AmqpWorkers)+" concurrent workers to process inbound AMQP messages.") // block here err = consumer.Run(handleAmqpMsg) //this is the function that we want to call to consume presence messages if err != nil { fmt.Println("Unable to start/run RabbitMQ consumer: " + err.Error()) log.Fatal("Unable to start/run RabbitMQ consumer: " + err.Error()) } } func publisher_loop(pub *rabbitmq.Publisher) { ///////////////////////////////////////////// // RabbitMQ Publish Test sigch := make(chan os.Signal, 1) signal.Notify(sigch, syscall.SIGUSR1, syscall.SIGUSR2) //create test message to send pubtest := KzMessage{ MsgId: uuid.New().String(), AppName: appname, AppVersion: version, EventCategory: "firewall_remove", EventName: "request", ServerId: appname + "@" + myHostname, Server: myHostname, Node: "firewallagent_" + appconf.ServerType + "@" + myHostname, } msg_pubtest, err := json.Marshal(pubtest) if err != nil { log.Println("Unable to marshal pubtest into JSON! Published test message will be blank.") } // block thread for { select { case sig := <-sigch: log.Println("Received signal " + fmt.Sprintf("%s", sig) + " on pubch channel. Publishing test message... ") if sig == syscall.SIGUSR2 { msgFromFile, err := readJsonFile(appconf.PubMessageFile) if err == nil { msg_pubtest = msgFromFile } log.Println("Publishing test message to exchange '" + appconf.AmqpPubExch + "' with routing key '" + appconf.AmqpPubRoutingKey + "': \n" + string(msg_pubtest)) err = pub.Publish(msg_pubtest, []string{appconf.AmqpPubRoutingKey}, rabbitmq.WithPublishOptionsContentType("application/json"), rabbitmq.WithPublishOptionsExchange(appconf.AmqpPubExch), ) if err != nil { log.Println("Error publishing message: " + err.Error()) } else { log.Println("Message PUBLISHED.") } } case <-time.After(50 * time.Millisecond): //case <-time.After(time.Second): //fmt.Println("tick") } } }