Piotr Biernat
bb61e95c17
All checks were successful
continuous-integration/drone/push Build is passing
132 lines
2.6 KiB
Go
132 lines
2.6 KiB
Go
package queue
|
|
|
|
import (
	"log"
	"os"

	"github.com/streadway/amqp"
)
|
|
|
|
var (
|
|
instance *AMQP
|
|
)
|
|
|
|
// AMQP struct
|
|
type AMQP struct {
|
|
conn *amqp.Connection
|
|
chn *amqp.Channel
|
|
queue amqp.Queue
|
|
err error
|
|
}
|
|
|
|
// New Create new AMQP instance
|
|
func New() *AMQP {
|
|
if instance != nil {
|
|
return instance
|
|
}
|
|
|
|
amqp := &AMQP{}
|
|
amqp.init()
|
|
instance = amqp
|
|
|
|
return instance
|
|
}
|
|
|
|
// Connect Connect to RabbitMQ server
|
|
func (a *AMQP) Connect(connStr string) {
|
|
a.conn, a.err = amqp.Dial(connStr)
|
|
failOnError(a.err, "Failed to connect to RabbitMQ", true)
|
|
|
|
// defer a.conn.Close()
|
|
}
|
|
|
|
//OpenChannel Open or create new channel
|
|
func (a *AMQP) OpenChannel() *amqp.Channel {
|
|
a.chn, a.err = a.conn.Channel()
|
|
failOnError(a.err, "Failed to open a channel", true)
|
|
|
|
// defer a.chn.Close()
|
|
return a.chn
|
|
}
|
|
|
|
// DeclareQueue Declares new queue
|
|
func (a *AMQP) DeclareQueue(name string /* , opts [string]interface{} */) amqp.Queue {
|
|
a.queue, a.err = a.chn.QueueDeclare(
|
|
name, // FIXME: ADD opts arg or sth like that
|
|
false, // durable
|
|
false, // delete when unused
|
|
false, // exclusive
|
|
false, // no-wait
|
|
nil, // arguments
|
|
)
|
|
failOnError(a.err, "Failed to declare a queue", true)
|
|
|
|
return a.queue
|
|
}
|
|
|
|
// Publish Send message to "queueName" queue
|
|
func (a *AMQP) Publish(queueName string, body string, ct string) {
|
|
a.err = a.chn.Publish(
|
|
"", // exchange
|
|
queueName, // routing key
|
|
false, // mandatory
|
|
false, // immediate
|
|
amqp.Publishing{
|
|
ContentType: ct,
|
|
Body: []byte(body),
|
|
},
|
|
)
|
|
failOnError(a.err, "Failed to publish a message", false)
|
|
}
|
|
|
|
// Consume Consume message from queue
|
|
func (a *AMQP) Consume(queueName string, callback func(d amqp.Delivery)) {
|
|
msgs := make(<-chan amqp.Delivery)
|
|
msgs, a.err = a.chn.Consume(
|
|
queueName, // queue
|
|
"", // consumer
|
|
false, // auto-ack
|
|
false, // exclusive
|
|
false, // no-local
|
|
false, // no-wait
|
|
nil, // args
|
|
)
|
|
failOnError(a.err, "Failed to register a consumer", false)
|
|
|
|
forever := make(chan bool)
|
|
|
|
go func() {
|
|
for d := range msgs {
|
|
go callback(d)
|
|
}
|
|
}()
|
|
|
|
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
|
|
<-forever
|
|
}
|
|
|
|
// Close close currently opened channel
|
|
func (a *AMQP) Close() {
|
|
a.chn.Close()
|
|
a.conn.Close()
|
|
}
|
|
|
|
// GetLastError Return last error
|
|
func (a *AMQP) GetLastError() error {
|
|
return a.err
|
|
}
|
|
|
|
func (a *AMQP) init() {
|
|
a.Connect("amqp://guest:guest@localhost:5672/") // FIXME Use env var
|
|
a.OpenChannel()
|
|
}
|
|
|
|
// failOnError logs "msg: err" when err is non-nil and does nothing otherwise.
//
// With fatal set the message is written with log.Fatalf, which terminates the
// process; without it the message is written with log.Printf and the caller
// continues.
func failOnError(err error, msg string, fatal bool) {
	if err == nil {
		return
	}

	if fatal {
		// log.Fatalf exits the process, so the Printf below is not reached.
		log.Fatalf("%s: %s", msg, err)
	}

	log.Printf("%s: %s", msg, err)
}
|