// Package queue wraps a single RabbitMQ connection and channel from
// github.com/streadway/amqp behind a small helper type.
package queue

import (
	"log"
	"os"

	"github.com/streadway/amqp"
)

var (
	// instance is the package-wide AMQP value handed out by New.
	instance *AMQP
)

// AMQP bundles a broker connection, one channel opened on it, the most
// recently declared queue and the error produced by the last operation.
type AMQP struct {
	conn  *amqp.Connection
	chn   *amqp.Channel
	queue amqp.Queue
	err   error
}

// New returns the package-wide AMQP instance, creating and connecting it on
// the first call. Later calls return the same pointer.
//
// The lazy initialisation uses no synchronisation, so the first call must not
// race with other calls to New. Connection or channel failures during that
// first call terminate the process (see Connect and OpenChannel).
func New() *AMQP {
	if instance != nil {
		return instance
	}

	// Named q rather than amqp so the imported package is not shadowed.
	q := &AMQP{}
	q.init()
	instance = q

	return instance
}

// Connect dials the RabbitMQ server at connStr and stores the connection.
// A dial failure is logged with log.Fatalf, which terminates the process.
// The connection stays open until Close is called.
func (a *AMQP) Connect(connStr string) {
	a.conn, a.err = amqp.Dial(connStr)
	failOnError(a.err, "Failed to connect to RabbitMQ", true)
}

// OpenChannel opens a channel on the current connection, stores it and
// returns it. A failure is logged with log.Fatalf, which terminates the
// process. The channel stays open until Close is called.
func (a *AMQP) OpenChannel() *amqp.Channel {
	a.chn, a.err = a.conn.Channel()
	failOnError(a.err, "Failed to open a channel", true)

	return a.chn
}

// DeclareQueue declares a queue called name on the current channel with every
// option flag set to false and no extra arguments, stores the result and
// returns it. A failure is logged with log.Fatalf, which terminates the
// process.
//
// FIXME: accept queue options instead of hard-coding them.
func (a *AMQP) DeclareQueue(name string /* , opts [string]interface{} */) amqp.Queue {
	a.queue, a.err = a.chn.QueueDeclare(
		name,
		false, // durable
		false, // delete when unused
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	failOnError(a.err, "Failed to declare a queue", true)

	return a.queue
}

// Publish sends body with content type ct through the default exchange using
// queueName as the routing key. A failure is logged (non-fatally) and can be
// retrieved afterwards with GetLastError.
func (a *AMQP) Publish(queueName string, body string, ct string) {
	a.err = a.chn.Publish(
		"",        // exchange
		queueName, // routing key
		false,     // mandatory
		false,     // immediate
		amqp.Publishing{
			ContentType: ct,
			Body:        []byte(body),
		},
	)
	failOnError(a.err, "Failed to publish a message", false)
}

// Consume registers an auto-acknowledging consumer on queueName and runs
// callback in a new goroutine for every delivery.
//
// If the consumer cannot be registered, the error is logged, stored for
// GetLastError, and Consume returns immediately. Once registration succeeds
// Consume never returns: it keeps blocking even after the delivery channel
// has been closed.
func (a *AMQP) Consume(queueName string, callback func(d amqp.Delivery)) {
	msgs, err := a.chn.Consume(
		queueName, // queue
		"",        // consumer
		true,      // auto-ack
		false,     // exclusive
		false,     // no-local
		false,     // no-wait
		nil,       // args
	)
	a.err = err
	if err != nil {
		// Without a delivery channel nothing can ever arrive, so waiting
		// would only hang the caller.
		failOnError(err, "Failed to register a consumer", false)
		return
	}

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

	for d := range msgs {
		go callback(d)
	}

	// Keep the historical contract of blocking for the life of the process.
	select {}
}

// Close closes the channel and then the connection, skipping whichever of the
// two was never opened. Close failures are logged (non-fatally) and stored
// for GetLastError.
func (a *AMQP) Close() {
	if a.chn != nil {
		if err := a.chn.Close(); err != nil {
			a.err = err
			failOnError(err, "Failed to close the channel", false)
		}
	}

	if a.conn != nil {
		if err := a.conn.Close(); err != nil {
			a.err = err
			failOnError(err, "Failed to close the connection", false)
		}
	}
}

// GetLastError returns the error stored by the most recent operation, or nil
// if that operation succeeded.
func (a *AMQP) GetLastError() error {
	return a.err
}

// init connects to the broker and opens a channel. The broker URL is read
// from the AMQP_URL environment variable; when it is unset or empty the local
// default guest account is used.
func (a *AMQP) init() {
	url := os.Getenv("AMQP_URL")
	if url == "" {
		url = "amqp://guest:guest@localhost:5672/"
	}

	a.Connect(url)
	a.OpenChannel()
}

// failOnError does nothing when err is nil. Otherwise it logs "msg: err",
// terminating the process through log.Fatalf when fatal is true.
func failOnError(err error, msg string, fatal bool) {
	if err == nil {
		return
	}

	if fatal {
		log.Fatalf("%s: %s", msg, err)
	}

	log.Printf("%s: %s", msg, err)
}