//LEAPI - ACME Certificate Renewal Control API - Copyright 2022-2025 Ruel Tmeizeh All Rights Reserved package main import ( "encoding/json" "fmt" "log" "os" "os/signal" "syscall" "time" "github.com/google/uuid" rabbitmq "github.com/wagslane/go-rabbitmq" ) var publisher *rabbitmq.Publisher var consumer *rabbitmq.Consumer var amqpExchange string = "nexus" var routingKey string = "nexus.dns_challenge_request" var routingKeyConsume string = "nexus.leapi" type KzMessage struct { MsgId string `json:"Msg-ID"` AppName string `json:"App-Name"` AppVersion string `json:"App-Version"` EventName string `json:"Event-Name"` EventCategory string `json:"Event-Category"` ServerId string `json:"Server-ID"` Node string `json:"Node,omitempty"` Server string `json:"Server,omitempty"` Domain string `json:"Domain,omitempty"` //e.g. pbx.company.com ChallengeDomain string `json:"Challenge-Domain,omitempty"` //usually a subdomain like _acme-challenge.pbx.company.com ChallengePhrase string `json:"Challenge-Phrase,omitempty"` Servers string `json:"servers,omitempty"` Domains string `json:"domains,omitempty"` } func handleAmqpMsg(d rabbitmq.Delivery) rabbitmq.Action { if appconf.Debug { log.Println("AMQP message received: " + string(d.Body)) } /* var msg KzMessage err := json.Unmarshal(d.Body, &msg) if err != nil { log.Println("handleAmqpMsg(): Error unmarshalling AMQP message into map[string]interface{}...discarding. Message body: " + string(d.Body) + "\nUnmarshalling error: " + err.Error()) return rabbitmq.NackDiscard } err := kazooDnsPublish(ta.Domain, input.Data.ChallengeText, "request") if err != nil { log.Println(err.Error()) return c.JSON(errorOut(http.StatusInternalServerError, "Error publishing DNS ADD message."+err.Error())) } */ return rabbitmq.Ack } func kazooDnsPublish(domain, challenge, eventName string) error { dnsmsg := KzMessage{ MsgId: uuid.New().String(), AppName: appname, AppVersion: version, EventCategory: "dns_challenge", EventName: eventName, //"request" or "delete" ServerId: appname + "@" + appconf.Hostname, Domain: domain, ChallengeDomain: "_acme-challenge." + domain, ChallengePhrase: challenge, //Node: myHostname, } dnsmsg_json, err := json.Marshal(dnsmsg) if err != nil { return err } err = publisher.Publish(dnsmsg_json, []string{routingKey}, rabbitmq.WithPublishOptionsContentType("application/json"), rabbitmq.WithPublishOptionsExchange(amqpExchange), ) //return errors.New("Error publishing to exchange '" + amqpExchange + "' with routing key '" + routingKey + "': \n" + string(dnsmsg_json) + " ERROR: " + err.Error()) return err } func amqp() { ///////////////////////////////////////////// // RabbitMQ Setup Connection log.Println("Connecting to RabbitMQ: " + appconf.AmqpURI) fmt.Println("Connecting to RabbitMQ: " + appconf.AmqpURI) amqpConn, err := rabbitmq.NewConn( appconf.AmqpURI, rabbitmq.WithConnectionOptionsLogging, ) if err != nil { fmt.Println("Unable to initialize RabbitMQ connection: " + err.Error()) log.Fatal("Unable to initialize RabbitMQ connection: " + err.Error()) } defer amqpConn.Close() ///////////////////////////////////////////// // RabbitMQ Setup Consumer log.Println("Starting AMQP consumer...") consumer, err = rabbitmq.NewConsumer( amqpConn, "q_"+appname+"_"+appconf.Hostname, rabbitmq.WithConsumerOptionsExchangeName(amqpExchange), rabbitmq.WithConsumerOptionsExchangeKind("topic"), rabbitmq.WithConsumerOptionsRoutingKey(routingKeyConsume), rabbitmq.WithConsumerOptionsConsumerName("consumer_"+appname+"_"+appconf.Hostname), rabbitmq.WithConsumerOptionsQueueAutoDelete, rabbitmq.WithConsumerOptionsConcurrency(2), //rabbitmq.WithConsumerOptionsQuorum, //rabbitmq.WithConsumerOptionsQueueDurable, //rabbitmq.WithConsumerOptionsExchangeDeclare, //rabbitmq.WithConsumerOptionsBindingExchangeDurable, ) if err != nil { log.Fatal("Unable to initialize RabbitMQ consumer: " + err.Error()) } defer consumer.Close() go func() { err = consumer.Run(handleAmqpMsg) //this is the function that we want to call to consume presence messages if err != nil { log.Fatal("Unable to start/run RabbitMQ consumer: " + err.Error()) } }() log.Println("Consuming on topic exchange: '" + amqpExchange + "' with routing key: '" + routingKeyConsume + "' using queue: 'consumer_" + appname + "_" + appconf.Hostname + "'.") ///////////////////////////////////////////// // RabbitMQ Setup Publisher log.Println("Starting AMQP publisher...") publisher, err = rabbitmq.NewPublisher( amqpConn, rabbitmq.WithPublisherOptionsLogging, rabbitmq.WithPublisherOptionsExchangeKind("topic"), rabbitmq.WithPublisherOptionsExchangeName(amqpExchange), rabbitmq.WithPublisherOptionsExchangeDeclare, ) if err != nil { log.Fatal("Unable to initialize RabbitMQ publisher: " + err.Error()) } defer publisher.Close() log.Println("AMQP publisher configured on topic exchange: '" + amqpExchange + "' with routing key: '" + routingKey + "'.") publisher.NotifyReturn(func(r rabbitmq.Return) { log.Println(fmt.Sprintf("RabbitMQ published message returned from server: %s\n", string(r.Body))) }) publisher.NotifyPublish(func(c rabbitmq.Confirmation) { log.Println(fmt.Sprintf("Message confirmed from RabbitMQ server. tag: %v, ack: %v\n", c.DeliveryTag, c.Ack)) }) ///////////////////////////////////////////// // RabbitMQ Publish Test sigch := make(chan os.Signal, 1) signal.Notify(sigch, syscall.SIGUSR2) //create test message to send pubtest := KzMessage{ MsgId: uuid.New().String(), AppName: appname, AppVersion: version, EventCategory: "dns_challenge", EventName: "request", ServerId: appname + "@" + appconf.Hostname, Domain: "example.com", ChallengeDomain: "_acme-challenge.pbx.example.com", ChallengePhrase: "01234abcdef98765fedcba", //Node: myHostname, } msg_pubtest, err := json.Marshal(pubtest) if err != nil { log.Println("Unable to marshal pubtest into JSON! Published test message will be blank.") } for { select { case sig := <-sigch: log.Println("Received signal " + fmt.Sprintf("%s", sig) + " on pubch channel. Publishing test message... ") if sig == syscall.SIGUSR2 { msgFromFile, err := readJsonFile(appconf.PubMessageFile) if err == nil { msg_pubtest = msgFromFile } log.Println("Publishing test message to exchange '" + amqpExchange + "' with routing key '" + routingKey + "': \n" + string(msg_pubtest)) err = publisher.Publish(msg_pubtest, []string{routingKey}, rabbitmq.WithPublishOptionsContentType("application/json"), rabbitmq.WithPublishOptionsExchange(amqpExchange), ) if err != nil { log.Println("Error publishing message: " + err.Error()) } else { log.Println("Message PUBLISHED.") } } case <-time.After(100 * time.Millisecond): //case <-time.After(time.Second): //fmt.Println("tick") } } }