From 27222a479e1338477d97c4beace13f247b9bd98e Mon Sep 17 00:00:00 2001 From: Piotr Biernat Date: Tue, 29 Sep 2020 22:35:59 +0200 Subject: [PATCH] Added base RabbitMQ queue support --- database/mongodb.go | 4 +- docker-compose.yml | 7 ++ go.mod | 1 + go.sum | 2 + handler/articles.go | 89 +++++++++++++---------- handler/base.go | 31 +++++++- handler/products.go | 93 ++++++++++++++++++++++++ main.go | 4 +- model/article.go | 2 +- model/product.go | 25 +++++++ queue/consumer/test_worker.go | 23 ++++++ queue/rabbitmq.go | 131 ++++++++++++++++++++++++++++++++++ 12 files changed, 368 insertions(+), 44 deletions(-) create mode 100644 handler/products.go create mode 100644 model/product.go create mode 100644 queue/consumer/test_worker.go create mode 100644 queue/rabbitmq.go diff --git a/database/mongodb.go b/database/mongodb.go index 4d77990..2e030fd 100644 --- a/database/mongodb.go +++ b/database/mongodb.go @@ -11,8 +11,8 @@ type MongoDb struct { database string } -// NewMongoDb func -func NewMongoDb( /* dbName string */ ) *MongoDb { +// New Connect to MongoDB Server +func New( /* dbName string */ ) *MongoDb { dbName := "go-rest-api" // FIXME mgm.SetDefaultConfig(nil, dbName, options.Client().ApplyURI("mongodb://user:passwd@localhost:27017")) diff --git a/docker-compose.yml b/docker-compose.yml index 15e88ac..d946eae 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,6 +9,13 @@ services: networks: - app-network + rabbitmq: + image: rabbitmq:latest + env_file: .env + ports: + - 5672:5672 + networks: + - app-network networks: app-network: driver: bridge \ No newline at end of file diff --git a/go.mod b/go.mod index 2726015..b8cc52d 100644 --- a/go.mod +++ b/go.mod @@ -9,5 +9,6 @@ require ( github.com/gorilla/mux v1.8.0 github.com/labstack/echo v3.3.10+incompatible github.com/labstack/echo/v4 v4.1.17 + github.com/streadway/amqp v1.0.0 go.mongodb.org/mongo-driver v1.4.1 ) diff --git a/go.sum b/go.sum index 5eeee75..17f097f 100644 --- a/go.sum +++ b/go.sum @@ -118,6 +118,8 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= diff --git a/handler/articles.go b/handler/articles.go index 11bb112..775430c 100644 --- a/handler/articles.go +++ b/handler/articles.go @@ -1,105 +1,118 @@ package handler import ( - "go-rest-api/database" "go-rest-api/model" + "log" "net/http" - "github.com/go-playground/validator/v10" "github.com/labstack/echo/v4" "go.mongodb.org/mongo-driver/bson" ) -var ( - internalErr = "Unable to operate on Article." - articleNotFoundErr = "Article not found." - validationErr = "Validation Error." -) - // AttachArticleHandlersToRouter func func AttachArticleHandlersToRouter(e *echo.Echo) { - db := database.NewMongoDb() - ah := NewArticlesHandler(db) + h := NewArticlesHandler() + + e.GET("/rabbit", h.testRabbit) g := e.Group("/articles") - g.GET("", ah.getAllArticles) - g.GET("/:id", ah.getOneArticle) - g.POST("", ah.createArticle) - g.PUT("/:id", ah.updateArticle) - g.DELETE("/:id", ah.removeArticle) + g.GET("", h.getAllArticles) + g.GET("/:id", h.getOneArticle) + g.POST("", h.createArticle) + g.PUT("/:id", h.updateArticle) + g.DELETE("/:id", h.removeArticle) } -// NewArticlesHandler return ArticlesHandler -func NewArticlesHandler(db *database.MongoDb) ArticlesHandler { - return ArticlesHandler{ - parent: BaseHandler{ - db: db, - validator: validator.New(), - }, - } +// NewArticlesHandler return new Articles Handler +func NewArticlesHandler() ArticlesHandler { + return ArticlesHandler{BaseHandler: *NewHandler()} } // ArticlesHandler type type ArticlesHandler struct { - parent BaseHandler + BaseHandler } -func (ah *ArticlesHandler) getAllArticles(c echo.Context) error { +// ArticleGenerateHeavyPDFMessage Struct that holds all data necessary for demo action +type ArticleGenerateHeavyPDFMessage struct { + Title string `json:"title"` + Content string `json:"content"` + Pages int `json:"pages"` +} + +func (h *ArticlesHandler) testRabbit(c echo.Context) error { + log.Println("test rabbit publish method calling...") + + q := h.amqp.DeclareQueue("article_queue_test") + m := ArticleGenerateHeavyPDFMessage{ + Title: "Test title", + Content: "Lorem ipsum dolor sit amet", + Pages: 10, + } + + body, _ := h.JSONEncode(m) + h.amqp.Publish(q.Name, body, "application/json") + + response := "Please wait until generating pdf or some other long task (in rabbitMQ queue) done" + return c.JSON(http.StatusOK, response) +} + +func (h *ArticlesHandler) getAllArticles(c echo.Context) error { coll := &model.Articles{} - articles := ah.parent.GetAllObjects(&model.Article{}, coll, bson.D{}) + articles := h.BaseHandler.GetAllObjects(&model.Article{}, coll, bson.D{}) return c.JSON(http.StatusOK, articles) } -func (ah *ArticlesHandler) getOneArticle(c echo.Context) error { +func (h *ArticlesHandler) getOneArticle(c echo.Context) error { id := c.Param("id") art := &model.Article{} - if _, err := ah.parent.GetSingleObject(art, id); err != nil { + if _, err := h.BaseHandler.GetSingleObject(art, id); err != nil { return err } return c.JSON(http.StatusOK, art) } -func (ah *ArticlesHandler) createArticle(c echo.Context) error { +func (h *ArticlesHandler) createArticle(c echo.Context) error { art := &model.Article{} c.Bind(art) - if _, err := ah.parent.CreateObject(art); err != nil { + if _, err := h.BaseHandler.CreateObject(art); err != nil { return err } - return c.JSON(http.StatusOK, art) + return c.JSON(http.StatusCreated, art) } -func (ah *ArticlesHandler) updateArticle(c echo.Context) error { +func (h *ArticlesHandler) updateArticle(c echo.Context) error { id := c.Param("id") model := &model.Article{} - art, err := ah.parent.GetSingleObject(model, id) + art, err := h.BaseHandler.GetSingleObject(model, id) if err != nil { return err } DeserializeFromRequest(c.Request(), art) - if _, err := ah.parent.UpdateObject(art); err != nil { + if _, err := h.BaseHandler.UpdateObject(art); err != nil { return err } return c.JSON(http.StatusOK, art) } -func (ah *ArticlesHandler) removeArticle(c echo.Context) error { +func (h *ArticlesHandler) removeArticle(c echo.Context) error { id := c.Param("id") model := &model.Article{} - - art, err := ah.parent.GetSingleObject(model, id) + + art, err := h.BaseHandler.GetSingleObject(model, id) if err != nil { return err } - if err := ah.parent.RemoveObject(art); err != nil { + if err := h.BaseHandler.RemoveObject(art); err != nil { return err } diff --git a/handler/base.go b/handler/base.go index 9f48558..dde1e04 100644 --- a/handler/base.go +++ b/handler/base.go @@ -3,6 +3,7 @@ package handler import ( "encoding/json" "go-rest-api/database" + "go-rest-api/queue" "io/ioutil" "net/http" @@ -12,9 +13,15 @@ import ( "go.mongodb.org/mongo-driver/bson" ) +var ( + internalErr = "Unable to operate on Object." + objectNotFoundErr = "Object not found." + validationErr = "Validation Error." +) + // DeserializeFromRequest func func DeserializeFromRequest(request *http.Request, output interface{}) { - defer request.Body.Close() + defer request.Body.Close() // FIXME bad place for defer...? body, _ := ioutil.ReadAll(request.Body) _ = json.Unmarshal(body, output) @@ -23,9 +30,19 @@ func DeserializeFromRequest(request *http.Request, output interface{}) { // BaseHandler type type BaseHandler struct { db *database.MongoDb + amqp *queue.AMQP validator *validator.Validate } +// NewHandler Create BaseHandler instance +func NewHandler() *BaseHandler { + return &BaseHandler{ + db: database.New(), + amqp: queue.New(), + validator: validator.New(), + } +} + // GetAllObjects Retrieve all objects func (h *BaseHandler) GetAllObjects(model mgm.Model, coll interface{}, filter bson.D) interface{} { h.db.FindAll(model, coll, filter) @@ -36,7 +53,7 @@ func (h *BaseHandler) GetAllObjects(model mgm.Model, coll interface{}, filter bs // GetSingleObject Retrieve single object func (h *BaseHandler) GetSingleObject(model mgm.Model, id string) (mgm.Model, *echo.HTTPError) { if err := h.db.FindByID(model, id); err != nil { - return nil, echo.NewHTTPError(http.StatusNotFound, articleNotFoundErr) + return nil, echo.NewHTTPError(http.StatusNotFound, objectNotFoundErr) } return model, nil @@ -81,3 +98,13 @@ func (h *BaseHandler) RemoveObject(model mgm.Model) *echo.HTTPError { func (h *BaseHandler) Validate(i interface{}) error { return h.validator.Struct(i) } + +// JsonEncode Encode *object* to string +func (h *BaseHandler) JSONEncode(data interface{}) (string, error) { + body, err := json.Marshal(data) + if err != nil { + return "", err + } + + return string(body), nil +} diff --git a/handler/products.go b/handler/products.go new file mode 100644 index 0000000..2afe05b --- /dev/null +++ b/handler/products.go @@ -0,0 +1,93 @@ +package handler + +import ( + "go-rest-api/model" + "net/http" + + "github.com/labstack/echo/v4" + "go.mongodb.org/mongo-driver/bson" +) + +// AttachProductHandlersToRouter func +func AttachProductHandlersToRouter(e *echo.Echo) { + h := NewProductsHandler() + + g := e.Group("/products") + g.GET("", h.getAllProducts) + g.GET("/:id", h.getOneProduct) + g.POST("", h.createProduct) + g.PUT("/:id", h.updateProduct) + g.DELETE("/:id", h.removeProduct) +} + +// NewProductsHandler return new Products handler +func NewProductsHandler() ProductsHandler { + return ProductsHandler{BaseHandler: *NewHandler()} +} + +// ProductsHandler type +type ProductsHandler struct { + BaseHandler +} + +func (h *ProductsHandler) getAllProducts(c echo.Context) error { + coll := &model.Products{} + products := h.BaseHandler.GetAllObjects(&model.Product{}, coll, bson.D{}) + + return c.JSON(http.StatusOK, products) +} + +func (h *ProductsHandler) getOneProduct(c echo.Context) error { + id := c.Param("id") + prod := &model.Product{} + + if _, err := h.BaseHandler.GetSingleObject(prod, id); err != nil { + return err + } + + return c.JSON(http.StatusOK, prod) +} + +func (h *ProductsHandler) createProduct(c echo.Context) error { + prod := &model.Product{} + c.Bind(prod) + + if _, err := h.BaseHandler.CreateObject(prod); err != nil { + return err + } + + return c.JSON(http.StatusCreated, prod) +} + +func (h *ProductsHandler) updateProduct(c echo.Context) error { + id := c.Param("id") + model := &model.Product{} + + art, err := h.BaseHandler.GetSingleObject(model, id) + if err != nil { + return err + } + DeserializeFromRequest(c.Request(), art) + + if _, err := h.BaseHandler.UpdateObject(art); err != nil { + return err + } + + return c.JSON(http.StatusOK, art) +} + +func (h *ProductsHandler) removeProduct(c echo.Context) error { + id := c.Param("id") + model := &model.Product{} + + art, err := h.BaseHandler.GetSingleObject(model, id) + if err != nil { + return err + } + + if err := h.BaseHandler.RemoveObject(art); err != nil { + return err + } + + return c.NoContent(http.StatusOK) +} diff --git a/main.go b/main.go index cb39d7c..a6b5ef2 100644 --- a/main.go +++ b/main.go @@ -16,15 +16,17 @@ import ( var port = ":8000" func main() { + // Echo fw setup e := echo.New() e.Use(middleware.Logger()) e.Use(middleware.Recover()) e.Use(middleware.CORSWithConfig(middleware.CORSConfig{ - AllowOrigins: []string{"http://localhost:8000"}, + AllowOrigins: []string{"http://localhost:8000"}, // FIXME use env var or sth like that AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept}, })) handler.AttachArticleHandlersToRouter(e) + handler.AttachProductHandlersToRouter(e) e.Static("/", "./public") e.GET("/", defaultHandler) diff --git a/model/article.go b/model/article.go index d22c4a9..c5d452a 100644 --- a/model/article.go +++ b/model/article.go @@ -15,7 +15,7 @@ type Article struct { // Articles Collection type Articles []Article -// NewArticle func +// NewArticle create new Article func NewArticle(title string, desc string, content string) *Article { return &Article{ Title: title, diff --git a/model/product.go b/model/product.go new file mode 100644 index 0000000..8b4875c --- /dev/null +++ b/model/product.go @@ -0,0 +1,25 @@ +package model + +import ( + "github.com/Kamva/mgm/v3" +) + +// Product type +type Product struct { + mgm.DefaultModel `bson:",inline"` + Name string `json:"name" bson:"name" validate:"required"` + Price float64 `json:"price" bson:"price" validate:"required"` + VAT int `json:"vat" bson:"vat" validate:"required"` +} + +// Products Collection +type Products []Product + +// NewProduct create new Product +func NewProduct(name string, price float64, vat int) *Product { + return &Product{ + Name: name, + Price: price, + VAT: vat, + } +} diff --git a/queue/consumer/test_worker.go b/queue/consumer/test_worker.go new file mode 100644 index 0000000..989caad --- /dev/null +++ b/queue/consumer/test_worker.go @@ -0,0 +1,23 @@ +package main + +import ( + "log" + "time" + + "github.com/streadway/amqp" + + "go-rest-api/queue" +) + +// TestWorkerFunc +func main() { + amqp := queue.New() + q := amqp.DeclareQueue("article_queue_test") //failsafe + amqp.Consume(q.Name, callback) +} + +func callback(d amqp.Delivery) { + log.Println("Received message: ", string(d.Body)) + time.Sleep(5 * time.Second) + log.Println("Finished!") +} diff --git a/queue/rabbitmq.go b/queue/rabbitmq.go new file mode 100644 index 0000000..1450cec --- /dev/null +++ b/queue/rabbitmq.go @@ -0,0 +1,131 @@ +package queue + +import ( + "log" + + "github.com/streadway/amqp" +) + +var ( + instance *AMQP +) + +// AMQP struct +type AMQP struct { + conn *amqp.Connection + chn *amqp.Channel + queue amqp.Queue + err error +} + +// New Create new AMQP instance +func New() *AMQP { + if instance != nil { + return instance + } + + amqp := &AMQP{} + amqp.init() + instance = amqp + + return instance +} + +// Connect Connect to RabbitMQ server +func (a *AMQP) Connect(connStr string) { + a.conn, a.err = amqp.Dial(connStr) + failOnError(a.err, "Failed to connect to RabbitMQ", true) + + // defer a.conn.Close() +} + +//OpenChannel Open or create new channel +func (a *AMQP) OpenChannel() *amqp.Channel { + a.chn, a.err = a.conn.Channel() + failOnError(a.err, "Failed to open a channel", true) + + // defer a.chn.Close() + return a.chn +} + +// DeclareQueue Declares new queue +func (a *AMQP) DeclareQueue(name string /* , opts [string]interface{} */) amqp.Queue { + a.queue, a.err = a.chn.QueueDeclare( + name, // FIXME: ADD opts arg or sth like that + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(a.err, "Failed to declare a queue", true) + + return a.queue +} + +// Publish Send message to "queueName" queue +func (a *AMQP) Publish(queueName string, body string, ct string) { + a.err = a.chn.Publish( + "", // exchange + queueName, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: ct, + Body: []byte(body), + }, + ) + failOnError(a.err, "Failed to publish a message", false) +} + +// Consume Consume message from queue +func (a *AMQP) Consume(queueName string, callback func(d amqp.Delivery)) { + msgs := make(<-chan amqp.Delivery) + msgs, a.err = a.chn.Consume( + queueName, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + failOnError(a.err, "Failed to register a consumer", false) + + forever := make(chan bool) + + go func() { + for d := range msgs { + go callback(d) + } + }() + + log.Printf(" [*] Waiting for messages. To exit press CTRL+C") + <-forever +} + +// Close close currently opened channel +func (a *AMQP) Close() { + a.chn.Close() + a.conn.Close() +} + +// GetLastError Return last error +func (a *AMQP) GetLastError() error { + return a.err +} + +func (a *AMQP) init() { + a.Connect("amqp://guest:guest@localhost:5672/") // FIXME Use env var + a.OpenChannel() +} + +func failOnError(err error, msg string, fatal bool) { + if err != nil { + if fatal { + log.Fatalf("%s: %s", msg, err) + } else { + log.Printf("%s: %s", msg, err) + } + } +}