rest-api/queue/rabbitmq.go

132 lines
2.6 KiB
Go
Raw Normal View History

2020-09-29 22:35:59 +02:00
// Package queue wraps a RabbitMQ connection (github.com/streadway/amqp) behind
// a small AMQP helper type.
package queue
import (
	"log"
	"os"

	"github.com/streadway/amqp"
)
// instance is the package-wide AMQP value handed out by New; it stays nil
// until New has been called for the first time.
var instance *AMQP
// AMQP bundles the state of one RabbitMQ session: the connection, the channel
// opened on it, the most recently declared queue, and the error stored by the
// last operation that recorded one.
type AMQP struct {
	// conn is the broker connection established by Connect.
	conn *amqp.Connection
	// chn is the channel opened on conn by OpenChannel.
	chn *amqp.Channel
	// queue is the result of the latest DeclareQueue call.
	queue amqp.Queue
	// err is the error value most recently stored by a method (nil on success).
	err error
}
// New returns the shared AMQP instance. The first call builds it and runs its
// init method (connect, then open a channel); every later call returns the
// same pointer.
func New() *AMQP {
	if instance == nil {
		// Named "client" rather than "amqp" so the imported package is not shadowed.
		client := &AMQP{}
		client.init()
		instance = client
	}
	return instance
}
// Connect dials the RabbitMQ server at connStr and stores the resulting
// connection and error on the receiver. A dial failure is reported through
// failOnError with fatal set, so it is logged via log.Fatalf.
func (a *AMQP) Connect(connStr string) {
	conn, err := amqp.Dial(connStr)
	a.conn, a.err = conn, err
	failOnError(err, "Failed to connect to RabbitMQ", true)
	// The connection is not closed here; Close releases it.
}
// OpenChannel opens a channel on the current connection, stores it (and the
// resulting error) on the receiver and returns it. A failure is reported
// through failOnError with fatal set, so it is logged via log.Fatalf.
func (a *AMQP) OpenChannel() *amqp.Channel {
	chn, err := a.conn.Channel()
	a.chn, a.err = chn, err
	failOnError(err, "Failed to open a channel", true)
	// The channel is not closed here; Close releases it.
	return chn
}
// DeclareQueue declares a queue called name on the current channel, stores it
// on the receiver and returns it. Every queue flag is currently fixed to
// false and no extra arguments are passed. A failure is reported through
// failOnError with fatal set, so it is logged via log.Fatalf.
func (a *AMQP) DeclareQueue(name string /* , opts [string]interface{} */) amqp.Queue {
	// FIXME: the queue options are hard-coded; add an opts argument or similar.
	const (
		durable    = false
		autoDelete = false // delete when unused
		exclusive  = false
		noWait     = false
	)

	declared, err := a.chn.QueueDeclare(name, durable, autoDelete, exclusive, noWait, nil)
	a.queue, a.err = declared, err
	failOnError(err, "Failed to declare a queue", true)
	return declared
}
// Publish sends body, labelled with content type ct, using queueName as the
// routing key. The outcome is stored on the receiver; a failure is only
// logged, never fatal.
func (a *AMQP) Publish(queueName string, body string, ct string) {
	msg := amqp.Publishing{
		ContentType: ct,
		Body:        []byte(body),
	}

	// "" is the exchange and queueName the routing key; the mandatory and
	// immediate flags are both off.
	a.err = a.chn.Publish("", queueName, false, false, msg)
	failOnError(a.err, "Failed to publish a message", false)
}
// Consume registers an auto-acknowledging consumer on queueName and hands
// every delivery to callback, each in its own goroutine.
//
// If the consumer cannot be registered, the error is logged, stored on the
// receiver (see GetLastError) and Consume returns immediately. Otherwise the
// call never returns: it keeps blocking even after the delivery channel has
// been closed.
func (a *AMQP) Consume(queueName string, callback func(d amqp.Delivery)) {
	var msgs <-chan amqp.Delivery
	msgs, a.err = a.chn.Consume(
		queueName, // queue
		"",        // consumer
		true,      // auto-ack
		false,     // exclusive
		false,     // no-local
		false,     // no-wait
		nil,       // args
	)
	failOnError(a.err, "Failed to register a consumer", false)
	if a.err != nil {
		// Without a registered consumer there is no delivery channel to read
		// from; waiting on it would block forever without ever receiving.
		return
	}

	go func() {
		for d := range msgs {
			go callback(d)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	select {} // block the calling goroutine forever
}
// Close closes the channel first and then the connection. Each close error is
// stored on the receiver and logged without being fatal. A channel or
// connection that was never opened (nil) is skipped instead of dereferenced.
func (a *AMQP) Close() {
	if a.chn != nil {
		a.err = a.chn.Close()
		failOnError(a.err, "Failed to close the channel", false)
	}
	if a.conn != nil {
		a.err = a.conn.Close()
		failOnError(a.err, "Failed to close the connection", false)
	}
}
// GetLastError returns the error value most recently stored on the receiver;
// it is nil when the last recorded operation succeeded.
func (a *AMQP) GetLastError() error {
	lastErr := a.err
	return lastErr
}
// init prepares a freshly created AMQP value: it connects to the broker and
// then opens a channel. The connection URL is taken from the AMQP_URL
// environment variable; when that is unset or empty, the local default
// "amqp://guest:guest@localhost:5672/" is used. Connect and OpenChannel both
// treat a failure as fatal.
func (a *AMQP) init() {
	connStr := os.Getenv("AMQP_URL")
	if connStr == "" {
		connStr = "amqp://guest:guest@localhost:5672/"
	}
	a.Connect(connStr)
	a.OpenChannel()
}
// failOnError reports err prefixed with msg and does nothing when err is nil.
// With fatal set the report goes through log.Fatalf, which terminates the
// process after logging; otherwise it is a plain log.Printf.
func failOnError(err error, msg string, fatal bool) {
	if err == nil {
		return
	}
	if fatal {
		log.Fatalf("%s: %s", msg, err)
	}
	log.Printf("%s: %s", msg, err)
}