WIP: Rebuild project structure
This commit is contained in:
parent
f30344c658
commit
a68d97a228
@ -1,4 +1,4 @@
|
|||||||
**/*.go !**/*_test.go {
|
**/*.go !**/*_test.go {
|
||||||
prep: go build -o build main.go
|
prep: go build -o build src/main.go
|
||||||
daemon +sigterm: ./build
|
daemon +sigterm: ./build
|
||||||
}
|
}
|
58
src/database/mongodb.go
Normal file
58
src/database/mongodb.go
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
package database
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/Kamva/mgm/v3"
|
||||||
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MongoDb type
|
||||||
|
type MongoDb struct {
|
||||||
|
database string
|
||||||
|
}
|
||||||
|
|
||||||
|
// New Connect to MongoDB Server
|
||||||
|
func New( /* dbName string */ ) *MongoDb {
|
||||||
|
dbName := "go-rest-api" // FIXME
|
||||||
|
|
||||||
|
mgm.SetDefaultConfig(nil, dbName, options.Client().ApplyURI("mongodb://user:passwd@localhost:27017"))
|
||||||
|
|
||||||
|
return &MongoDb{
|
||||||
|
database: dbName,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindByID func
|
||||||
|
func (m *MongoDb) FindByID(model mgm.Model, id string) error {
|
||||||
|
err := mgm.Coll(model).FindByID(id, model)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindAll func
|
||||||
|
func (m *MongoDb) FindAll(model mgm.Model, collection interface{}, filter bson.D) error {
|
||||||
|
err := mgm.Coll(model).SimpleFind(collection, filter)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create func
|
||||||
|
func (m *MongoDb) Create(model mgm.Model) error {
|
||||||
|
return mgm.Coll(model).Create(model)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update func
|
||||||
|
func (m *MongoDb) Update(model mgm.Model) error {
|
||||||
|
return mgm.Coll(model).Update(model)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove func
|
||||||
|
func (m *MongoDb) Remove(model mgm.Model) error {
|
||||||
|
return mgm.Coll(model).Delete(model)
|
||||||
|
}
|
136
src/handler/articles.go
Normal file
136
src/handler/articles.go
Normal file
@ -0,0 +1,136 @@
|
|||||||
|
package handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/labstack/echo/v4"
|
||||||
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
|
|
||||||
|
"go-rest-api/src/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
// AttachArticleHandlersToRouter func
|
||||||
|
func AttachArticleHandlersToRouter(e *echo.Echo) {
|
||||||
|
h := NewArticlesHandler()
|
||||||
|
|
||||||
|
e.GET("/rabbit", h.testRabbit)
|
||||||
|
|
||||||
|
g := e.Group("/articles")
|
||||||
|
g.GET("", h.getAllArticles)
|
||||||
|
g.GET("/:id", h.getOneArticle)
|
||||||
|
g.POST("", h.createArticle)
|
||||||
|
g.PUT("/:id", h.updateArticle)
|
||||||
|
g.DELETE("/:id", h.removeArticle)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewArticlesHandler return new Articles Handler
|
||||||
|
func NewArticlesHandler() ArticlesHandler {
|
||||||
|
return ArticlesHandler{BaseHandler: *NewHandler()}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ArticlesHandler type
|
||||||
|
type ArticlesHandler struct {
|
||||||
|
BaseHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
// ArticleGenerateHeavyPDFMessage Struct that holds all data necessary for demo action
|
||||||
|
type ArticleGenerateHeavyPDFMessage struct {
|
||||||
|
Title string `json:"title"`
|
||||||
|
Content string `json:"content"`
|
||||||
|
Pages int `json:"pages"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate Modify base struct with some appendix
|
||||||
|
func (m *ArticleGenerateHeavyPDFMessage) Generate(appendix string) {
|
||||||
|
m.Content += "\n" + appendix
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ArticlesHandler) testRabbit(c echo.Context) error {
|
||||||
|
log.Println("test rabbit publish method calling...")
|
||||||
|
|
||||||
|
q := h.Queue.DeclareQueue("article_queue_test")
|
||||||
|
m := ArticleGenerateHeavyPDFMessage{
|
||||||
|
Title: "Test title",
|
||||||
|
Content: "Lorem ipsum dolor sit amet",
|
||||||
|
Pages: 1,
|
||||||
|
}
|
||||||
|
m.Generate("My fancy, new appendix")
|
||||||
|
|
||||||
|
body, _ := h.JSONEncode(m)
|
||||||
|
h.Queue.Publish(q.Name, body, "application/json")
|
||||||
|
log.Println(body)
|
||||||
|
// regular rabbit queue ^^
|
||||||
|
|
||||||
|
// exchangeName := "messages"
|
||||||
|
// if err := h.Queue.DeclareExchange(exchangeName, "fanout"); err != nil {
|
||||||
|
// log.Printf("Error creating `%s` exchange\n", exchangeName)
|
||||||
|
// }
|
||||||
|
// h.Queue.PublishToExchange(exchangeName, "Some Test Message..."+time.Now().String(), "text/plain")
|
||||||
|
// pub/sub pattern ^^
|
||||||
|
|
||||||
|
response := "Please wait until generating pdf or some other long task (in rabbitMQ queue) done"
|
||||||
|
return c.JSON(http.StatusOK, response)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ArticlesHandler) getAllArticles(c echo.Context) error {
|
||||||
|
coll := &model.Articles{}
|
||||||
|
articles := h.BaseHandler.GetAllObjects(&model.Article{}, coll, bson.D{})
|
||||||
|
|
||||||
|
return c.JSON(http.StatusOK, articles)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ArticlesHandler) getOneArticle(c echo.Context) error {
|
||||||
|
id := c.Param("id")
|
||||||
|
art := &model.Article{}
|
||||||
|
|
||||||
|
if _, err := h.BaseHandler.GetSingleObject(art, id); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.JSON(http.StatusOK, art)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ArticlesHandler) createArticle(c echo.Context) error {
|
||||||
|
art := &model.Article{}
|
||||||
|
c.Bind(art)
|
||||||
|
|
||||||
|
if _, err := h.BaseHandler.CreateObject(art); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.JSON(http.StatusCreated, art)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ArticlesHandler) updateArticle(c echo.Context) error {
|
||||||
|
id := c.Param("id")
|
||||||
|
model := &model.Article{}
|
||||||
|
|
||||||
|
art, err := h.BaseHandler.GetSingleObject(model, id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
DeserializeFromRequest(c.Request(), art)
|
||||||
|
|
||||||
|
if _, err := h.BaseHandler.UpdateObject(art); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.JSON(http.StatusOK, art)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ArticlesHandler) removeArticle(c echo.Context) error {
|
||||||
|
id := c.Param("id")
|
||||||
|
model := &model.Article{}
|
||||||
|
|
||||||
|
art, err := h.BaseHandler.GetSingleObject(model, id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := h.BaseHandler.RemoveObject(art); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.NoContent(http.StatusOK)
|
||||||
|
}
|
110
src/handler/base.go
Normal file
110
src/handler/base.go
Normal file
@ -0,0 +1,110 @@
|
|||||||
|
package handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"go-rest-api/src/database"
|
||||||
|
"go-rest-api/src/queue"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/Kamva/mgm/v3"
|
||||||
|
"github.com/go-playground/validator/v10"
|
||||||
|
"github.com/labstack/echo/v4"
|
||||||
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
internalErr = "Unable to operate on Object."
|
||||||
|
objectNotFoundErr = "Object not found."
|
||||||
|
validationErr = "Validation Error."
|
||||||
|
)
|
||||||
|
|
||||||
|
// DeserializeFromRequest func
|
||||||
|
func DeserializeFromRequest(request *http.Request, output interface{}) {
|
||||||
|
defer request.Body.Close() // FIXME bad place for defer...?
|
||||||
|
|
||||||
|
body, _ := ioutil.ReadAll(request.Body)
|
||||||
|
_ = json.Unmarshal(body, output)
|
||||||
|
}
|
||||||
|
|
||||||
|
// BaseHandler type
|
||||||
|
type BaseHandler struct {
|
||||||
|
Queue *queue.AMQP
|
||||||
|
db *database.MongoDb
|
||||||
|
validator *validator.Validate
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewHandler Create BaseHandler instance
|
||||||
|
func NewHandler() *BaseHandler {
|
||||||
|
return &BaseHandler{
|
||||||
|
Queue: queue.New(),
|
||||||
|
db: database.New(),
|
||||||
|
validator: validator.New(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetAllObjects Retrieve all objects
|
||||||
|
func (h *BaseHandler) GetAllObjects(model mgm.Model, coll interface{}, filter bson.D) interface{} {
|
||||||
|
h.db.FindAll(model, coll, filter)
|
||||||
|
|
||||||
|
return coll
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetSingleObject Retrieve single object
|
||||||
|
func (h *BaseHandler) GetSingleObject(model mgm.Model, id string) (mgm.Model, *echo.HTTPError) {
|
||||||
|
if err := h.db.FindByID(model, id); err != nil {
|
||||||
|
return nil, echo.NewHTTPError(http.StatusNotFound, objectNotFoundErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
return model, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateObject Try to create a new object
|
||||||
|
func (h *BaseHandler) CreateObject(model mgm.Model) (mgm.Model, *echo.HTTPError) {
|
||||||
|
if err := h.Validate(model); err != nil {
|
||||||
|
return nil, echo.NewHTTPError(http.StatusBadRequest, validationErr+" "+err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := h.db.Create(model); err != nil {
|
||||||
|
return nil, echo.NewHTTPError(http.StatusInternalServerError, internalErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
return model, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateObject Try to update a new object
|
||||||
|
func (h *BaseHandler) UpdateObject(model mgm.Model) (mgm.Model, *echo.HTTPError) {
|
||||||
|
if err := h.Validate(model); err != nil {
|
||||||
|
return nil, echo.NewHTTPError(http.StatusBadRequest, validationErr+" "+err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := h.db.Update(model); err != nil {
|
||||||
|
return nil, echo.NewHTTPError(http.StatusInternalServerError, internalErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
return model, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
//RemoveObject Try to remove object
|
||||||
|
func (h *BaseHandler) RemoveObject(model mgm.Model) *echo.HTTPError {
|
||||||
|
if err := h.db.Remove(model); err != nil {
|
||||||
|
return echo.NewHTTPError(http.StatusInternalServerError, internalErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate Validate model object
|
||||||
|
func (h *BaseHandler) Validate(i interface{}) error {
|
||||||
|
return h.validator.Struct(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// JsonEncode Encode *object* to string
|
||||||
|
func (h *BaseHandler) JSONEncode(data interface{}) (string, error) {
|
||||||
|
body, err := json.Marshal(data)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return string(body), nil
|
||||||
|
}
|
93
src/handler/products.go
Normal file
93
src/handler/products.go
Normal file
@ -0,0 +1,93 @@
|
|||||||
|
package handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"go-rest-api/src/model"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/labstack/echo/v4"
|
||||||
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
|
)
|
||||||
|
|
||||||
|
// AttachProductHandlersToRouter func
|
||||||
|
func AttachProductHandlersToRouter(e *echo.Echo) {
|
||||||
|
h := NewProductsHandler()
|
||||||
|
|
||||||
|
g := e.Group("/products")
|
||||||
|
g.GET("", h.getAllProducts)
|
||||||
|
g.GET("/:id", h.getOneProduct)
|
||||||
|
g.POST("", h.createProduct)
|
||||||
|
g.PUT("/:id", h.updateProduct)
|
||||||
|
g.DELETE("/:id", h.removeProduct)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewProductsHandler return new Products handler
|
||||||
|
func NewProductsHandler() ProductsHandler {
|
||||||
|
return ProductsHandler{BaseHandler: *NewHandler()}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProductsHandler type
|
||||||
|
type ProductsHandler struct {
|
||||||
|
BaseHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ProductsHandler) getAllProducts(c echo.Context) error {
|
||||||
|
coll := &model.Products{}
|
||||||
|
products := h.BaseHandler.GetAllObjects(&model.Product{}, coll, bson.D{})
|
||||||
|
|
||||||
|
return c.JSON(http.StatusOK, products)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ProductsHandler) getOneProduct(c echo.Context) error {
|
||||||
|
id := c.Param("id")
|
||||||
|
prod := &model.Product{}
|
||||||
|
|
||||||
|
if _, err := h.BaseHandler.GetSingleObject(prod, id); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.JSON(http.StatusOK, prod)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ProductsHandler) createProduct(c echo.Context) error {
|
||||||
|
prod := &model.Product{}
|
||||||
|
c.Bind(prod)
|
||||||
|
|
||||||
|
if _, err := h.BaseHandler.CreateObject(prod); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.JSON(http.StatusCreated, prod)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ProductsHandler) updateProduct(c echo.Context) error {
|
||||||
|
id := c.Param("id")
|
||||||
|
model := &model.Product{}
|
||||||
|
|
||||||
|
art, err := h.BaseHandler.GetSingleObject(model, id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
DeserializeFromRequest(c.Request(), art)
|
||||||
|
|
||||||
|
if _, err := h.BaseHandler.UpdateObject(art); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.JSON(http.StatusOK, art)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ProductsHandler) removeProduct(c echo.Context) error {
|
||||||
|
id := c.Param("id")
|
||||||
|
model := &model.Product{}
|
||||||
|
|
||||||
|
art, err := h.BaseHandler.GetSingleObject(model, id)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := h.BaseHandler.RemoveObject(art); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.NoContent(http.StatusOK)
|
||||||
|
}
|
57
src/main.go
Normal file
57
src/main.go
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/labstack/echo/v4"
|
||||||
|
"github.com/labstack/echo/v4/middleware"
|
||||||
|
|
||||||
|
"go-rest-api/src/handler"
|
||||||
|
)
|
||||||
|
|
||||||
|
var port = ":8000"
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// framework setup
|
||||||
|
e := echo.New()
|
||||||
|
e.Use(middleware.Logger())
|
||||||
|
e.Use(middleware.Recover())
|
||||||
|
e.Use(middleware.CORSWithConfig(middleware.CORSConfig{
|
||||||
|
AllowOrigins: []string{"http://localhost:8000"}, // FIXME use env var or sth like that
|
||||||
|
AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept},
|
||||||
|
}))
|
||||||
|
|
||||||
|
handler.AttachArticleHandlersToRouter(e)
|
||||||
|
handler.AttachProductHandlersToRouter(e)
|
||||||
|
|
||||||
|
e.Static("/", "./public")
|
||||||
|
e.GET("/", defaultHandler)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if err := e.Start(port); err != nil {
|
||||||
|
e.Logger.Info("Shutting down the server...")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
signCh := make(chan os.Signal)
|
||||||
|
signal.Notify(signCh, os.Interrupt)
|
||||||
|
signal.Notify(signCh, os.Kill)
|
||||||
|
|
||||||
|
sig := <-signCh
|
||||||
|
log.Println("Received terminal, graceful shutdown.", sig)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := e.Shutdown(ctx); err != nil {
|
||||||
|
e.Logger.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func defaultHandler(c echo.Context) error {
|
||||||
|
return echo.NewHTTPError(http.StatusMethodNotAllowed, "Method not allowed.")
|
||||||
|
}
|
25
src/model/article.go
Normal file
25
src/model/article.go
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
package model
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/Kamva/mgm/v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Article type
|
||||||
|
type Article struct {
|
||||||
|
mgm.DefaultModel `bson:",inline"`
|
||||||
|
Title string `json:"title" bson:"title" validate:"required"`
|
||||||
|
Description string `json:"description" bson:"description" validate:"required"`
|
||||||
|
Content string `json:"content" bson:"content" validate:"required"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Articles Collection
|
||||||
|
type Articles []Article
|
||||||
|
|
||||||
|
// NewArticle create new Article
|
||||||
|
func NewArticle(title string, desc string, content string) *Article {
|
||||||
|
return &Article{
|
||||||
|
Title: title,
|
||||||
|
Description: desc,
|
||||||
|
Content: content,
|
||||||
|
}
|
||||||
|
}
|
25
src/model/product.go
Normal file
25
src/model/product.go
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
package model
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/Kamva/mgm/v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Product type
|
||||||
|
type Product struct {
|
||||||
|
mgm.DefaultModel `bson:",inline"`
|
||||||
|
Name string `json:"name" bson:"name" validate:"required"`
|
||||||
|
Price float64 `json:"price" bson:"price" validate:"required"`
|
||||||
|
VAT int `json:"vat" bson:"vat" validate:"required"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Products Collection
|
||||||
|
type Products []Product
|
||||||
|
|
||||||
|
// NewProduct create new Product
|
||||||
|
func NewProduct(name string, price float64, vat int) *Product {
|
||||||
|
return &Product{
|
||||||
|
Name: name,
|
||||||
|
Price: price,
|
||||||
|
VAT: vat,
|
||||||
|
}
|
||||||
|
}
|
24
src/queue/consumer/test_worker.go
Normal file
24
src/queue/consumer/test_worker.go
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/streadway/amqp"
|
||||||
|
|
||||||
|
"go-rest-api/src/queue"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestWorkerFunc
|
||||||
|
func main() {
|
||||||
|
amqp := queue.New()
|
||||||
|
q := amqp.DeclareQueue("article_queue_test")
|
||||||
|
amqp.Consume(q.Name, callback)
|
||||||
|
}
|
||||||
|
|
||||||
|
func callback(d amqp.Delivery) {
|
||||||
|
log.Println("Received message: ", string(d.Body))
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
log.Println("Finished!")
|
||||||
|
d.Ack(false) // !! important: finish job
|
||||||
|
}
|
31
src/queue/consumer/test_worker_pubsub.go
Normal file
31
src/queue/consumer/test_worker_pubsub.go
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/streadway/amqp"
|
||||||
|
|
||||||
|
"go-rest-api/src/queue"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestWorkerFunc
|
||||||
|
func main() {
|
||||||
|
amqp := queue.New()
|
||||||
|
exchangeName := "messages"
|
||||||
|
if err := amqp.DeclareExchange(exchangeName, "fanout"); err != nil {
|
||||||
|
log.Printf("Error creating `%s` exchange\n", exchangeName)
|
||||||
|
}
|
||||||
|
q := amqp.DeclareTMPQueue()
|
||||||
|
if err := amqp.BindQueue(q.Name, exchangeName); err != nil {
|
||||||
|
log.Printf("Error binding `%s` with `%s`\n", q.Name, exchangeName)
|
||||||
|
}
|
||||||
|
amqp.Consume(q.Name, callback)
|
||||||
|
}
|
||||||
|
|
||||||
|
func callback(d amqp.Delivery) {
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
log.Println("Received message: ", string(d.Body))
|
||||||
|
// log.Println("Finished!")
|
||||||
|
d.Ack(false) // IMPORTANT !
|
||||||
|
}
|
191
src/queue/rabbitmq.go
Normal file
191
src/queue/rabbitmq.go
Normal file
@ -0,0 +1,191 @@
|
|||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"github.com/streadway/amqp"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
instance *AMQP
|
||||||
|
)
|
||||||
|
|
||||||
|
// AMQP struct
|
||||||
|
type AMQP struct {
|
||||||
|
conn *amqp.Connection
|
||||||
|
chn *amqp.Channel
|
||||||
|
queue amqp.Queue
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// New reates new AMQP instance
|
||||||
|
func New() *AMQP {
|
||||||
|
if instance != nil {
|
||||||
|
return instance
|
||||||
|
}
|
||||||
|
|
||||||
|
amqp := &AMQP{}
|
||||||
|
amqp.init()
|
||||||
|
instance = amqp
|
||||||
|
|
||||||
|
return instance
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect Connect to RabbitMQ server
|
||||||
|
func (a *AMQP) Connect(connStr string) {
|
||||||
|
a.conn, a.err = amqp.Dial(connStr)
|
||||||
|
failOnError(a.err, "Failed to connect to RabbitMQ", true)
|
||||||
|
|
||||||
|
// defer a.conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// OpenChannel Open or create new channel
|
||||||
|
func (a *AMQP) OpenChannel() *amqp.Channel {
|
||||||
|
a.chn, a.err = a.conn.Channel()
|
||||||
|
failOnError(a.err, "Failed to open a channel", true)
|
||||||
|
|
||||||
|
a.chn.Qos(1, 0, false) // Fair dispatch
|
||||||
|
|
||||||
|
// defer a.chn.Close()
|
||||||
|
return a.chn
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeclareQueue Declares new queue
|
||||||
|
func (a *AMQP) DeclareQueue(name string /* , opts [string]interface{} */) amqp.Queue {
|
||||||
|
a.queue, a.err = a.chn.QueueDeclare(
|
||||||
|
name, // FIXME: ADD opts arg or sth like that
|
||||||
|
true, // durable - setted to true tells rabbit to keep unfinished task even if server crash
|
||||||
|
false, // delete when unused
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-wait
|
||||||
|
nil, // arguments
|
||||||
|
)
|
||||||
|
failOnError(a.err, "Failed to declare a queue", true)
|
||||||
|
|
||||||
|
return a.queue
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeclareTMPQueue Declares new temporary queue
|
||||||
|
func (a *AMQP) DeclareTMPQueue( /* , opts [string]interface{} */ ) amqp.Queue {
|
||||||
|
var queue amqp.Queue
|
||||||
|
queue, a.err = a.chn.QueueDeclare(
|
||||||
|
"", // FIXME: ADD opts arg or sth like that
|
||||||
|
false, // durable - setted to true tells rabbit to keep unfinished task even if server crash
|
||||||
|
false, // delete when unused
|
||||||
|
true, // exclusive
|
||||||
|
false, // no-wait
|
||||||
|
nil, // arguments
|
||||||
|
)
|
||||||
|
failOnError(a.err, "Failed to declare a tmp queue", true)
|
||||||
|
|
||||||
|
return queue
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeclareExchange Declares exchange with passed name and type
|
||||||
|
func (a *AMQP) DeclareExchange(name, t string) error {
|
||||||
|
return a.chn.ExchangeDeclare(
|
||||||
|
name, // name
|
||||||
|
t, // type
|
||||||
|
true, // durable
|
||||||
|
false, // auto-deleted
|
||||||
|
false, // internal
|
||||||
|
false, // no-wait
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// BindQueue Binds queue with exchange
|
||||||
|
func (a *AMQP) BindQueue(qName, eName string) error {
|
||||||
|
return a.chn.QueueBind(
|
||||||
|
qName, // queue name
|
||||||
|
"", // routing key
|
||||||
|
eName, // exchange
|
||||||
|
false,
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish Send message to queue "queueName"
|
||||||
|
func (a *AMQP) Publish(name string, body string, ct string) {
|
||||||
|
a.err = a.chn.Publish(
|
||||||
|
"", // exchange
|
||||||
|
name, // routing key
|
||||||
|
false, // mandatory
|
||||||
|
false, // immediate
|
||||||
|
amqp.Publishing{
|
||||||
|
DeliveryMode: amqp.Persistent, // save tasks even if server/consumer crash (required queue durable setted to true)
|
||||||
|
ContentType: ct,
|
||||||
|
Body: []byte(body),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
failOnError(a.err, "Failed to publish a message", false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PublishToExchange Send message to exchange "name"
|
||||||
|
func (a *AMQP) PublishToExchange(name string, body string, ct string) {
|
||||||
|
a.err = a.chn.Publish(
|
||||||
|
name, // exchange
|
||||||
|
"", // routing key
|
||||||
|
false, // mandatory
|
||||||
|
false, // immediate
|
||||||
|
amqp.Publishing{
|
||||||
|
DeliveryMode: amqp.Persistent, // save tasks even if server/consumer crash (required queue durable setted to true)
|
||||||
|
ContentType: ct,
|
||||||
|
Body: []byte(body),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
failOnError(a.err, "Failed to publish a message", false)
|
||||||
|
log.Printf(" [x] Sent %s", body)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Consume Consume message from queue
|
||||||
|
func (a *AMQP) Consume(queueName string, callback func(d amqp.Delivery)) {
|
||||||
|
msgs := make(<-chan amqp.Delivery)
|
||||||
|
msgs, a.err = a.chn.Consume(
|
||||||
|
queueName, // queue
|
||||||
|
"", // consumer
|
||||||
|
false, // auto-ack manually ack for "non losing task" mode
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-local
|
||||||
|
false, // no-wait
|
||||||
|
nil, // args
|
||||||
|
)
|
||||||
|
failOnError(a.err, "Failed to register a consumer", false)
|
||||||
|
|
||||||
|
forever := make(chan bool)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for d := range msgs {
|
||||||
|
go callback(d)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
|
||||||
|
<-forever
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close close currently opened channel
|
||||||
|
func (a *AMQP) Close() {
|
||||||
|
a.chn.Close()
|
||||||
|
a.conn.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetLastError Return last error
|
||||||
|
func (a *AMQP) GetLastError() error {
|
||||||
|
return a.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *AMQP) init() {
|
||||||
|
a.Connect("amqp://guest:guest@localhost:5672/") // FIXME Use env var
|
||||||
|
a.OpenChannel()
|
||||||
|
}
|
||||||
|
|
||||||
|
func failOnError(err error, msg string, fatal bool) {
|
||||||
|
if err != nil {
|
||||||
|
if fatal {
|
||||||
|
log.Fatalf("%s: %s", msg, err)
|
||||||
|
} else {
|
||||||
|
log.Printf("%s: %s", msg, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user