From a68d97a22828b57cadf1bffc5a1894e58447a965 Mon Sep 17 00:00:00 2001 From: Piotr Biernat Date: Sat, 20 Mar 2021 10:45:01 +0100 Subject: [PATCH] WIP: Rebuild project structure --- modd.conf | 2 +- src/database/mongodb.go | 58 +++++++ src/handler/articles.go | 136 ++++++++++++++++ src/handler/base.go | 110 +++++++++++++ src/handler/products.go | 93 +++++++++++ src/main.go | 57 +++++++ src/model/article.go | 25 +++ src/model/product.go | 25 +++ src/queue/consumer/test_worker.go | 24 +++ src/queue/consumer/test_worker_pubsub.go | 31 ++++ src/queue/rabbitmq.go | 191 +++++++++++++++++++++++ 11 files changed, 751 insertions(+), 1 deletion(-) create mode 100644 src/database/mongodb.go create mode 100644 src/handler/articles.go create mode 100644 src/handler/base.go create mode 100644 src/handler/products.go create mode 100644 src/main.go create mode 100644 src/model/article.go create mode 100644 src/model/product.go create mode 100644 src/queue/consumer/test_worker.go create mode 100644 src/queue/consumer/test_worker_pubsub.go create mode 100644 src/queue/rabbitmq.go diff --git a/modd.conf b/modd.conf index d76bc78..593e015 100644 --- a/modd.conf +++ b/modd.conf @@ -1,4 +1,4 @@ **/*.go !**/*_test.go { - prep: go build -o build main.go + prep: go build -o build src/main.go daemon +sigterm: ./build } \ No newline at end of file diff --git a/src/database/mongodb.go b/src/database/mongodb.go new file mode 100644 index 0000000..2e030fd --- /dev/null +++ b/src/database/mongodb.go @@ -0,0 +1,58 @@ +package database + +import ( + "github.com/Kamva/mgm/v3" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo/options" +) + +// MongoDb type +type MongoDb struct { + database string +} + +// New Connect to MongoDB Server +func New( /* dbName string */ ) *MongoDb { + dbName := "go-rest-api" // FIXME + + mgm.SetDefaultConfig(nil, dbName, options.Client().ApplyURI("mongodb://user:passwd@localhost:27017")) + + return &MongoDb{ + database: dbName, + } +} + +// FindByID func +func (m *MongoDb) FindByID(model mgm.Model, id string) error { + err := mgm.Coll(model).FindByID(id, model) + if err != nil { + return err + } + + return nil +} + +// FindAll func +func (m *MongoDb) FindAll(model mgm.Model, collection interface{}, filter bson.D) error { + err := mgm.Coll(model).SimpleFind(collection, filter) + if err != nil { + return err + } + + return nil +} + +// Create func +func (m *MongoDb) Create(model mgm.Model) error { + return mgm.Coll(model).Create(model) +} + +// Update func +func (m *MongoDb) Update(model mgm.Model) error { + return mgm.Coll(model).Update(model) +} + +// Remove func +func (m *MongoDb) Remove(model mgm.Model) error { + return mgm.Coll(model).Delete(model) +} diff --git a/src/handler/articles.go b/src/handler/articles.go new file mode 100644 index 0000000..c10a7e5 --- /dev/null +++ b/src/handler/articles.go @@ -0,0 +1,136 @@ +package handler + +import ( + "log" + "net/http" + + "github.com/labstack/echo/v4" + "go.mongodb.org/mongo-driver/bson" + + "go-rest-api/src/model" +) + +// AttachArticleHandlersToRouter func +func AttachArticleHandlersToRouter(e *echo.Echo) { + h := NewArticlesHandler() + + e.GET("/rabbit", h.testRabbit) + + g := e.Group("/articles") + g.GET("", h.getAllArticles) + g.GET("/:id", h.getOneArticle) + g.POST("", h.createArticle) + g.PUT("/:id", h.updateArticle) + g.DELETE("/:id", h.removeArticle) +} + +// NewArticlesHandler return new Articles Handler +func NewArticlesHandler() ArticlesHandler { + return ArticlesHandler{BaseHandler: *NewHandler()} +} + +// ArticlesHandler type +type ArticlesHandler struct { + BaseHandler +} + +// ArticleGenerateHeavyPDFMessage Struct that holds all data necessary for demo action +type ArticleGenerateHeavyPDFMessage struct { + Title string `json:"title"` + Content string `json:"content"` + Pages int `json:"pages"` +} + +// Generate Modify base struct with some appendix +func (m *ArticleGenerateHeavyPDFMessage) Generate(appendix string) { + m.Content += "\n" + appendix +} + +func (h *ArticlesHandler) testRabbit(c echo.Context) error { + log.Println("test rabbit publish method calling...") + + q := h.Queue.DeclareQueue("article_queue_test") + m := ArticleGenerateHeavyPDFMessage{ + Title: "Test title", + Content: "Lorem ipsum dolor sit amet", + Pages: 1, + } + m.Generate("My fancy, new appendix") + + body, _ := h.JSONEncode(m) + h.Queue.Publish(q.Name, body, "application/json") + log.Println(body) + // regular rabbit queue ^^ + + // exchangeName := "messages" + // if err := h.Queue.DeclareExchange(exchangeName, "fanout"); err != nil { + // log.Printf("Error creating `%s` exchange\n", exchangeName) + // } + // h.Queue.PublishToExchange(exchangeName, "Some Test Message..."+time.Now().String(), "text/plain") + // pub/sub pattern ^^ + + response := "Please wait until generating pdf or some other long task (in rabbitMQ queue) done" + return c.JSON(http.StatusOK, response) +} + +func (h *ArticlesHandler) getAllArticles(c echo.Context) error { + coll := &model.Articles{} + articles := h.BaseHandler.GetAllObjects(&model.Article{}, coll, bson.D{}) + + return c.JSON(http.StatusOK, articles) +} + +func (h *ArticlesHandler) getOneArticle(c echo.Context) error { + id := c.Param("id") + art := &model.Article{} + + if _, err := h.BaseHandler.GetSingleObject(art, id); err != nil { + return err + } + + return c.JSON(http.StatusOK, art) +} + +func (h *ArticlesHandler) createArticle(c echo.Context) error { + art := &model.Article{} + c.Bind(art) + + if _, err := h.BaseHandler.CreateObject(art); err != nil { + return err + } + + return c.JSON(http.StatusCreated, art) +} + +func (h *ArticlesHandler) updateArticle(c echo.Context) error { + id := c.Param("id") + model := &model.Article{} + + art, err := h.BaseHandler.GetSingleObject(model, id) + if err != nil { + return err + } + DeserializeFromRequest(c.Request(), art) + + if _, err := h.BaseHandler.UpdateObject(art); err != nil { + return err + } + + return c.JSON(http.StatusOK, art) +} + +func (h *ArticlesHandler) removeArticle(c echo.Context) error { + id := c.Param("id") + model := &model.Article{} + + art, err := h.BaseHandler.GetSingleObject(model, id) + if err != nil { + return err + } + + if err := h.BaseHandler.RemoveObject(art); err != nil { + return err + } + + return c.NoContent(http.StatusOK) +} diff --git a/src/handler/base.go b/src/handler/base.go new file mode 100644 index 0000000..35ec18f --- /dev/null +++ b/src/handler/base.go @@ -0,0 +1,110 @@ +package handler + +import ( + "encoding/json" + "go-rest-api/src/database" + "go-rest-api/src/queue" + "io/ioutil" + "net/http" + + "github.com/Kamva/mgm/v3" + "github.com/go-playground/validator/v10" + "github.com/labstack/echo/v4" + "go.mongodb.org/mongo-driver/bson" +) + +var ( + internalErr = "Unable to operate on Object." + objectNotFoundErr = "Object not found." + validationErr = "Validation Error." +) + +// DeserializeFromRequest func +func DeserializeFromRequest(request *http.Request, output interface{}) { + defer request.Body.Close() // FIXME bad place for defer...? + + body, _ := ioutil.ReadAll(request.Body) + _ = json.Unmarshal(body, output) +} + +// BaseHandler type +type BaseHandler struct { + Queue *queue.AMQP + db *database.MongoDb + validator *validator.Validate +} + +// NewHandler Create BaseHandler instance +func NewHandler() *BaseHandler { + return &BaseHandler{ + Queue: queue.New(), + db: database.New(), + validator: validator.New(), + } +} + +// GetAllObjects Retrieve all objects +func (h *BaseHandler) GetAllObjects(model mgm.Model, coll interface{}, filter bson.D) interface{} { + h.db.FindAll(model, coll, filter) + + return coll +} + +// GetSingleObject Retrieve single object +func (h *BaseHandler) GetSingleObject(model mgm.Model, id string) (mgm.Model, *echo.HTTPError) { + if err := h.db.FindByID(model, id); err != nil { + return nil, echo.NewHTTPError(http.StatusNotFound, objectNotFoundErr) + } + + return model, nil +} + +// CreateObject Try to create a new object +func (h *BaseHandler) CreateObject(model mgm.Model) (mgm.Model, *echo.HTTPError) { + if err := h.Validate(model); err != nil { + return nil, echo.NewHTTPError(http.StatusBadRequest, validationErr+" "+err.Error()) + } + + if err := h.db.Create(model); err != nil { + return nil, echo.NewHTTPError(http.StatusInternalServerError, internalErr) + } + + return model, nil +} + +// UpdateObject Try to update a new object +func (h *BaseHandler) UpdateObject(model mgm.Model) (mgm.Model, *echo.HTTPError) { + if err := h.Validate(model); err != nil { + return nil, echo.NewHTTPError(http.StatusBadRequest, validationErr+" "+err.Error()) + } + + if err := h.db.Update(model); err != nil { + return nil, echo.NewHTTPError(http.StatusInternalServerError, internalErr) + } + + return model, nil +} + +//RemoveObject Try to remove object +func (h *BaseHandler) RemoveObject(model mgm.Model) *echo.HTTPError { + if err := h.db.Remove(model); err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, internalErr) + } + + return nil +} + +// Validate Validate model object +func (h *BaseHandler) Validate(i interface{}) error { + return h.validator.Struct(i) +} + +// JsonEncode Encode *object* to string +func (h *BaseHandler) JSONEncode(data interface{}) (string, error) { + body, err := json.Marshal(data) + if err != nil { + return "", err + } + + return string(body), nil +} diff --git a/src/handler/products.go b/src/handler/products.go new file mode 100644 index 0000000..b0a53f2 --- /dev/null +++ b/src/handler/products.go @@ -0,0 +1,93 @@ +package handler + +import ( + "go-rest-api/src/model" + "net/http" + + "github.com/labstack/echo/v4" + "go.mongodb.org/mongo-driver/bson" +) + +// AttachProductHandlersToRouter func +func AttachProductHandlersToRouter(e *echo.Echo) { + h := NewProductsHandler() + + g := e.Group("/products") + g.GET("", h.getAllProducts) + g.GET("/:id", h.getOneProduct) + g.POST("", h.createProduct) + g.PUT("/:id", h.updateProduct) + g.DELETE("/:id", h.removeProduct) +} + +// NewProductsHandler return new Products handler +func NewProductsHandler() ProductsHandler { + return ProductsHandler{BaseHandler: *NewHandler()} +} + +// ProductsHandler type +type ProductsHandler struct { + BaseHandler +} + +func (h *ProductsHandler) getAllProducts(c echo.Context) error { + coll := &model.Products{} + products := h.BaseHandler.GetAllObjects(&model.Product{}, coll, bson.D{}) + + return c.JSON(http.StatusOK, products) +} + +func (h *ProductsHandler) getOneProduct(c echo.Context) error { + id := c.Param("id") + prod := &model.Product{} + + if _, err := h.BaseHandler.GetSingleObject(prod, id); err != nil { + return err + } + + return c.JSON(http.StatusOK, prod) +} + +func (h *ProductsHandler) createProduct(c echo.Context) error { + prod := &model.Product{} + c.Bind(prod) + + if _, err := h.BaseHandler.CreateObject(prod); err != nil { + return err + } + + return c.JSON(http.StatusCreated, prod) +} + +func (h *ProductsHandler) updateProduct(c echo.Context) error { + id := c.Param("id") + model := &model.Product{} + + art, err := h.BaseHandler.GetSingleObject(model, id) + if err != nil { + return err + } + DeserializeFromRequest(c.Request(), art) + + if _, err := h.BaseHandler.UpdateObject(art); err != nil { + return err + } + + return c.JSON(http.StatusOK, art) +} + +func (h *ProductsHandler) removeProduct(c echo.Context) error { + id := c.Param("id") + model := &model.Product{} + + art, err := h.BaseHandler.GetSingleObject(model, id) + if err != nil { + return err + } + + if err := h.BaseHandler.RemoveObject(art); err != nil { + return err + } + + return c.NoContent(http.StatusOK) +} diff --git a/src/main.go b/src/main.go new file mode 100644 index 0000000..0f28bfc --- /dev/null +++ b/src/main.go @@ -0,0 +1,57 @@ +package main + +import ( + "context" + "log" + "net/http" + "os" + "os/signal" + "time" + + "github.com/labstack/echo/v4" + "github.com/labstack/echo/v4/middleware" + + "go-rest-api/src/handler" +) + +var port = ":8000" + +func main() { + // framework setup + e := echo.New() + e.Use(middleware.Logger()) + e.Use(middleware.Recover()) + e.Use(middleware.CORSWithConfig(middleware.CORSConfig{ + AllowOrigins: []string{"http://localhost:8000"}, // FIXME use env var or sth like that + AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept}, + })) + + handler.AttachArticleHandlersToRouter(e) + handler.AttachProductHandlersToRouter(e) + + e.Static("/", "./public") + e.GET("/", defaultHandler) + + go func() { + if err := e.Start(port); err != nil { + e.Logger.Info("Shutting down the server...") + } + }() + + signCh := make(chan os.Signal) + signal.Notify(signCh, os.Interrupt) + signal.Notify(signCh, os.Kill) + + sig := <-signCh + log.Println("Received terminal, graceful shutdown.", sig) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := e.Shutdown(ctx); err != nil { + e.Logger.Fatal(err) + } +} + +func defaultHandler(c echo.Context) error { + return echo.NewHTTPError(http.StatusMethodNotAllowed, "Method not allowed.") +} diff --git a/src/model/article.go b/src/model/article.go new file mode 100644 index 0000000..c5d452a --- /dev/null +++ b/src/model/article.go @@ -0,0 +1,25 @@ +package model + +import ( + "github.com/Kamva/mgm/v3" +) + +// Article type +type Article struct { + mgm.DefaultModel `bson:",inline"` + Title string `json:"title" bson:"title" validate:"required"` + Description string `json:"description" bson:"description" validate:"required"` + Content string `json:"content" bson:"content" validate:"required"` +} + +// Articles Collection +type Articles []Article + +// NewArticle create new Article +func NewArticle(title string, desc string, content string) *Article { + return &Article{ + Title: title, + Description: desc, + Content: content, + } +} diff --git a/src/model/product.go b/src/model/product.go new file mode 100644 index 0000000..8b4875c --- /dev/null +++ b/src/model/product.go @@ -0,0 +1,25 @@ +package model + +import ( + "github.com/Kamva/mgm/v3" +) + +// Product type +type Product struct { + mgm.DefaultModel `bson:",inline"` + Name string `json:"name" bson:"name" validate:"required"` + Price float64 `json:"price" bson:"price" validate:"required"` + VAT int `json:"vat" bson:"vat" validate:"required"` +} + +// Products Collection +type Products []Product + +// NewProduct create new Product +func NewProduct(name string, price float64, vat int) *Product { + return &Product{ + Name: name, + Price: price, + VAT: vat, + } +} diff --git a/src/queue/consumer/test_worker.go b/src/queue/consumer/test_worker.go new file mode 100644 index 0000000..8b23982 --- /dev/null +++ b/src/queue/consumer/test_worker.go @@ -0,0 +1,24 @@ +package main + +import ( + "log" + "time" + + "github.com/streadway/amqp" + + "go-rest-api/src/queue" +) + +// TestWorkerFunc +func main() { + amqp := queue.New() + q := amqp.DeclareQueue("article_queue_test") + amqp.Consume(q.Name, callback) +} + +func callback(d amqp.Delivery) { + log.Println("Received message: ", string(d.Body)) + time.Sleep(5 * time.Second) + log.Println("Finished!") + d.Ack(false) // !! important: finish job +} diff --git a/src/queue/consumer/test_worker_pubsub.go b/src/queue/consumer/test_worker_pubsub.go new file mode 100644 index 0000000..3e1f28a --- /dev/null +++ b/src/queue/consumer/test_worker_pubsub.go @@ -0,0 +1,31 @@ +package main + +import ( + "log" + "time" + + "github.com/streadway/amqp" + + "go-rest-api/src/queue" +) + +// TestWorkerFunc +func main() { + amqp := queue.New() + exchangeName := "messages" + if err := amqp.DeclareExchange(exchangeName, "fanout"); err != nil { + log.Printf("Error creating `%s` exchange\n", exchangeName) + } + q := amqp.DeclareTMPQueue() + if err := amqp.BindQueue(q.Name, exchangeName); err != nil { + log.Printf("Error binding `%s` with `%s`\n", q.Name, exchangeName) + } + amqp.Consume(q.Name, callback) +} + +func callback(d amqp.Delivery) { + time.Sleep(5 * time.Second) + log.Println("Received message: ", string(d.Body)) + // log.Println("Finished!") + d.Ack(false) // IMPORTANT ! +} diff --git a/src/queue/rabbitmq.go b/src/queue/rabbitmq.go new file mode 100644 index 0000000..a71a7e7 --- /dev/null +++ b/src/queue/rabbitmq.go @@ -0,0 +1,191 @@ +package queue + +import ( + "log" + + "github.com/streadway/amqp" +) + +var ( + instance *AMQP +) + +// AMQP struct +type AMQP struct { + conn *amqp.Connection + chn *amqp.Channel + queue amqp.Queue + err error +} + +// New reates new AMQP instance +func New() *AMQP { + if instance != nil { + return instance + } + + amqp := &AMQP{} + amqp.init() + instance = amqp + + return instance +} + +// Connect Connect to RabbitMQ server +func (a *AMQP) Connect(connStr string) { + a.conn, a.err = amqp.Dial(connStr) + failOnError(a.err, "Failed to connect to RabbitMQ", true) + + // defer a.conn.Close() +} + +// OpenChannel Open or create new channel +func (a *AMQP) OpenChannel() *amqp.Channel { + a.chn, a.err = a.conn.Channel() + failOnError(a.err, "Failed to open a channel", true) + + a.chn.Qos(1, 0, false) // Fair dispatch + + // defer a.chn.Close() + return a.chn +} + +// DeclareQueue Declares new queue +func (a *AMQP) DeclareQueue(name string /* , opts [string]interface{} */) amqp.Queue { + a.queue, a.err = a.chn.QueueDeclare( + name, // FIXME: ADD opts arg or sth like that + true, // durable - setted to true tells rabbit to keep unfinished task even if server crash + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(a.err, "Failed to declare a queue", true) + + return a.queue +} + +// DeclareTMPQueue Declares new temporary queue +func (a *AMQP) DeclareTMPQueue( /* , opts [string]interface{} */ ) amqp.Queue { + var queue amqp.Queue + queue, a.err = a.chn.QueueDeclare( + "", // FIXME: ADD opts arg or sth like that + false, // durable - setted to true tells rabbit to keep unfinished task even if server crash + false, // delete when unused + true, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(a.err, "Failed to declare a tmp queue", true) + + return queue +} + +// DeclareExchange Declares exchange with passed name and type +func (a *AMQP) DeclareExchange(name, t string) error { + return a.chn.ExchangeDeclare( + name, // name + t, // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, + ) +} + +// BindQueue Binds queue with exchange +func (a *AMQP) BindQueue(qName, eName string) error { + return a.chn.QueueBind( + qName, // queue name + "", // routing key + eName, // exchange + false, + nil, + ) +} + +// Publish Send message to queue "queueName" +func (a *AMQP) Publish(name string, body string, ct string) { + a.err = a.chn.Publish( + "", // exchange + name, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + DeliveryMode: amqp.Persistent, // save tasks even if server/consumer crash (required queue durable setted to true) + ContentType: ct, + Body: []byte(body), + }, + ) + failOnError(a.err, "Failed to publish a message", false) +} + +// PublishToExchange Send message to exchange "name" +func (a *AMQP) PublishToExchange(name string, body string, ct string) { + a.err = a.chn.Publish( + name, // exchange + "", // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + DeliveryMode: amqp.Persistent, // save tasks even if server/consumer crash (required queue durable setted to true) + ContentType: ct, + Body: []byte(body), + }, + ) + failOnError(a.err, "Failed to publish a message", false) + log.Printf(" [x] Sent %s", body) +} + +// Consume Consume message from queue +func (a *AMQP) Consume(queueName string, callback func(d amqp.Delivery)) { + msgs := make(<-chan amqp.Delivery) + msgs, a.err = a.chn.Consume( + queueName, // queue + "", // consumer + false, // auto-ack manually ack for "non losing task" mode + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + failOnError(a.err, "Failed to register a consumer", false) + + forever := make(chan bool) + + go func() { + for d := range msgs { + go callback(d) + } + }() + + log.Printf(" [*] Waiting for messages. To exit press CTRL+C") + <-forever +} + +// Close close currently opened channel +func (a *AMQP) Close() { + a.chn.Close() + a.conn.Close() +} + +// GetLastError Return last error +func (a *AMQP) GetLastError() error { + return a.err +} + +func (a *AMQP) init() { + a.Connect("amqp://guest:guest@localhost:5672/") // FIXME Use env var + a.OpenChannel() +} + +func failOnError(err error, msg string, fatal bool) { + if err != nil { + if fatal { + log.Fatalf("%s: %s", msg, err) + } else { + log.Printf("%s: %s", msg, err) + } + } +}