Refactor, TLS support, v0.5
This commit is contained in:
parent
ed7801a221
commit
faefaceaf1
14
.app.config
Normal file
14
.app.config
Normal file
@ -0,0 +1,14 @@
|
|||||||
|
{
|
||||||
|
"ID": "basket",
|
||||||
|
"Name": "basket",
|
||||||
|
"Address": "__IP__",
|
||||||
|
"Tags": ["basket-svc", "basket", "https", "service"],
|
||||||
|
"Port": 443,
|
||||||
|
"Connect": {
|
||||||
|
"Native": true
|
||||||
|
},
|
||||||
|
"Check": {
|
||||||
|
"TCP": "__IP__:443",
|
||||||
|
"DeregisterCriticalServiceAfter": "10s"
|
||||||
|
}
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
SERVER_ADDR=:80
|
SERVER_ADDR=:443
|
||||||
|
|
||||||
APP_NAME=basket-svc
|
APP_NAME=basket-svc
|
||||||
APP_DOMAIN=basket.service.ego.io
|
APP_DOMAIN=basket.service.ego.io
|
||||||
|
@ -22,12 +22,13 @@ COPY --from=builder $BIN_OUTPUT /app
|
|||||||
COPY --from=builder /go/bin/migrate /bin/migrate
|
COPY --from=builder /go/bin/migrate /bin/migrate
|
||||||
COPY --from=builder /go/bin/health /bin/health
|
COPY --from=builder /go/bin/health /bin/health
|
||||||
COPY .env.docker /.env
|
COPY .env.docker /.env
|
||||||
|
COPY ./.app.config /
|
||||||
COPY ./bin /bin
|
COPY ./bin /bin
|
||||||
RUN chmod 755 /bin/entrypoint.sh /bin/migrate.sh
|
RUN chmod 755 /bin/entrypoint.sh /bin/migrate.sh
|
||||||
|
|
||||||
RUN apk add curl
|
RUN apk add curl
|
||||||
|
|
||||||
EXPOSE 80
|
EXPOSE 443
|
||||||
|
|
||||||
ENTRYPOINT ["entrypoint.sh"]
|
ENTRYPOINT ["entrypoint.sh"]
|
||||||
CMD ["sh", "-c", "/app"]
|
CMD ["sh", "-c", "/app"]
|
||||||
|
@ -24,8 +24,8 @@ waitForService "api-eventbus:5672"
|
|||||||
# waitForService "esb.service.ego.io:5672"
|
# waitForService "esb.service.ego.io:5672"
|
||||||
waitForService "api-logger:24224"
|
waitForService "api-logger:24224"
|
||||||
# waitForService "logger.service.ego.io:24224"
|
# waitForService "logger.service.ego.io:24224"
|
||||||
waitForService "pricing-svc:80"
|
waitForService "pricing-svc:443"
|
||||||
# waitForService "pricing.service.ego.io:80"
|
# waitForService "pricing.service.ego.io:443"
|
||||||
|
|
||||||
# run migrations
|
# run migrations
|
||||||
migrate.sh
|
migrate.sh
|
||||||
|
@ -21,5 +21,6 @@ fi
|
|||||||
|
|
||||||
# run migrations
|
# run migrations
|
||||||
migrate up
|
migrate up
|
||||||
|
echo "Done."
|
||||||
|
|
||||||
exit $version
|
exit $version
|
||||||
|
@ -55,7 +55,7 @@ func main() {
|
|||||||
log.Fatalf("Error parsing logger addr: %s. Err: %v", c.LoggerAddr, err)
|
log.Fatalf("Error parsing logger addr: %s. Err: %v", c.LoggerAddr, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort) // @Refactor NewLogger return (logger, error)
|
logger, err := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) // @Refactor NewLogger return (logger, error)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Error connecting to %s:%d. Err: %v", logHost, logPort, err)
|
log.Fatalf("Error connecting to %s:%d. Err: %v", logHost, logPort, err)
|
||||||
}
|
}
|
||||||
|
@ -4,7 +4,7 @@ go 1.18
|
|||||||
|
|
||||||
require (
|
require (
|
||||||
git.pbiernat.io/egommerce/api-entities v0.2.3
|
git.pbiernat.io/egommerce/api-entities v0.2.3
|
||||||
git.pbiernat.io/egommerce/go-api-pkg v0.2.88
|
git.pbiernat.io/egommerce/go-api-pkg v0.3.18
|
||||||
github.com/georgysavva/scany/v2 v2.0.0
|
github.com/georgysavva/scany/v2 v2.0.0
|
||||||
github.com/go-pg/migrations/v8 v8.1.0
|
github.com/go-pg/migrations/v8 v8.1.0
|
||||||
github.com/go-pg/pg/v10 v10.10.7
|
github.com/go-pg/pg/v10 v10.10.7
|
||||||
|
@ -37,8 +37,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
|
|||||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||||
git.pbiernat.io/egommerce/api-entities v0.2.3 h1:mR6EYfZkAzh4teydb7KXDBWoxwVW3qasnmmH5J3mnas=
|
git.pbiernat.io/egommerce/api-entities v0.2.3 h1:mR6EYfZkAzh4teydb7KXDBWoxwVW3qasnmmH5J3mnas=
|
||||||
git.pbiernat.io/egommerce/api-entities v0.2.3/go.mod h1:INXAG5x4+i+vNwg1NpfPHiDW8nY1kn1K7pgLOtX+/I0=
|
git.pbiernat.io/egommerce/api-entities v0.2.3/go.mod h1:INXAG5x4+i+vNwg1NpfPHiDW8nY1kn1K7pgLOtX+/I0=
|
||||||
git.pbiernat.io/egommerce/go-api-pkg v0.2.88 h1:xya/39BnFeha3Oc76ad/ppoQd6AstTGQd87Qszamr1A=
|
git.pbiernat.io/egommerce/go-api-pkg v0.3.18 h1:0+C9BMsllrNvRbh4kb7dJ5lrzP1Lc7J4pb+KV76YrXk=
|
||||||
git.pbiernat.io/egommerce/go-api-pkg v0.2.88/go.mod h1:XIy2mmvRNIzQmYIUAcDZafhRPxTQFS2HDmsK7ZQ6980=
|
git.pbiernat.io/egommerce/go-api-pkg v0.3.18/go.mod h1:XIy2mmvRNIzQmYIUAcDZafhRPxTQFS2HDmsK7ZQ6980=
|
||||||
github.com/Azure/azure-sdk-for-go v44.0.0+incompatible h1:e82Yv2HNpS0kuyeCrV29OPKvEiqfs2/uJHic3/3iKdg=
|
github.com/Azure/azure-sdk-for-go v44.0.0+incompatible h1:e82Yv2HNpS0kuyeCrV29OPKvEiqfs2/uJHic3/3iKdg=
|
||||||
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
|
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
|
||||||
github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM=
|
github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM=
|
||||||
|
@ -18,7 +18,7 @@ const (
|
|||||||
defEventBusURL = "amqp://guest:guest@api-eventbus:5672"
|
defEventBusURL = "amqp://guest:guest@api-eventbus:5672"
|
||||||
defKVNmspc = "dev.egommerce/service/basket"
|
defKVNmspc = "dev.egommerce/service/basket"
|
||||||
defLoggerAddr = "api-logger:24224"
|
defLoggerAddr = "api-logger:24224"
|
||||||
defNetAddr = ":80"
|
defNetAddr = ":443"
|
||||||
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
|
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
|
||||||
defPathPrefix = "/basket"
|
defPathPrefix = "/basket"
|
||||||
defRegistryAddr = "api-registry:8500"
|
defRegistryAddr = "api-registry:8500"
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"log"
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -19,7 +21,7 @@ type (
|
|||||||
*fiber.App
|
*fiber.App
|
||||||
|
|
||||||
ID string
|
ID string
|
||||||
addr string // e.g. "127.0.0.1:80"
|
addr string // e.g. "127.0.0.1:443"
|
||||||
handlers map[string]any
|
handlers map[string]any
|
||||||
}
|
}
|
||||||
HeaderRequestID struct {
|
HeaderRequestID struct {
|
||||||
@ -47,8 +49,14 @@ func (s *Server) Start() error {
|
|||||||
SetupRouter(s)
|
SetupRouter(s)
|
||||||
|
|
||||||
// fmt.Printf("Starting server at: %s...\n", s.addr)
|
// fmt.Printf("Starting server at: %s...\n", s.addr)
|
||||||
|
cer, err := tls.LoadX509KeyPair("certs/client.crt", "certs/client.key")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
tlsCnf := &tls.Config{Certificates: []tls.Certificate{cer}}
|
||||||
|
|
||||||
ln, _ := net.Listen("tcp", s.addr)
|
ln, _ := net.Listen("tcp", s.addr)
|
||||||
// ln = tls.NewListener(ln, s.App.Server().TLSConfig)
|
ln = tls.NewListener(ln, tlsCnf)
|
||||||
|
|
||||||
return s.Listener(ln)
|
return s.Listener(ln)
|
||||||
}
|
}
|
||||||
|
@ -107,7 +107,6 @@ func (s *BasketService) RemoveItem(ctx context.Context, itemID int, basketID str
|
|||||||
if _, err := s.dbConn.Exec(ctx, sql, basketID, itemID); err != nil {
|
if _, err := s.dbConn.Exec(ctx, sql, basketID, itemID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// update basket updated_at field...
|
// update basket updated_at field...
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -121,10 +120,9 @@ func (s *BasketService) UpdateItem(ctx context.Context, item *model.BasketItemMo
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
price = productPrice.Price
|
price = productPrice.Price
|
||||||
}
|
}
|
||||||
now := time.Now()
|
|
||||||
|
|
||||||
sql := `UPDATE basket.basket_item SET quantity=$1, price=$2, updated_at=$3 WHERE basket_id=$4 AND product_id=$5`
|
sql := `UPDATE basket.basket_item SET quantity=$1, price=$2, updated_at=$3 WHERE basket_id=$4 AND product_id=$5`
|
||||||
if _, err := s.dbConn.Exec(ctx, sql, qty, price, now, item.BasketID, item.ProductID); err != nil {
|
if _, err := s.dbConn.Exec(ctx, sql, qty, price, time.Now(), item.BasketID, item.ProductID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,6 +2,7 @@ package ui
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
entity "git.pbiernat.io/egommerce/api-entities/http"
|
entity "git.pbiernat.io/egommerce/api-entities/http"
|
||||||
"git.pbiernat.io/egommerce/api-entities/model"
|
"git.pbiernat.io/egommerce/api-entities/model"
|
||||||
@ -54,15 +55,16 @@ func RemoveProductFromBasket(srv *service.BasketService, productID, qty int, bas
|
|||||||
|
|
||||||
item, err := srv.FetchItem(ctx, basket.ID, productID)
|
item, err := srv.FetchItem(ctx, basket.ID, productID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
fmt.Printf("FetchItem not found item in db: %v\n", err)
|
||||||
ctx.Done() // FIXME
|
ctx.Done() // FIXME
|
||||||
return nil, err
|
return basket, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if item.Quantity <= qty {
|
if item.Quantity <= qty {
|
||||||
err = srv.RemoveItem(ctx, item.ProductID, item.BasketID)
|
err = srv.RemoveItem(ctx, item.ProductID, item.BasketID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ctx.Done() // FIXME
|
ctx.Done() // FIXME
|
||||||
return nil, err
|
return basket, err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
qty = item.Quantity - qty
|
qty = item.Quantity - qty
|
||||||
|
@ -1,133 +0,0 @@
|
|||||||
package worker
|
|
||||||
|
|
||||||
// import (
|
|
||||||
// "bytes"
|
|
||||||
// "encoding/json"
|
|
||||||
// "os"
|
|
||||||
|
|
||||||
// cnf "git.pbiernat.io/egommerce/basket-service/internal/config"
|
|
||||||
// "git.pbiernat.io/egommerce/basket-service/pkg/database"
|
|
||||||
// "git.pbiernat.io/egommerce/go-api-pkg/fluentd"
|
|
||||||
// "git.pbiernat.io/egommerce/go-api-pkg/rabbitmq"
|
|
||||||
// "github.com/go-redis/redis/v8"
|
|
||||||
// )
|
|
||||||
|
|
||||||
// func WithCache(c *cnf.Config) OptionFn {
|
|
||||||
// return func(w *Worker) error {
|
|
||||||
// conn := redis.NewClient(&redis.Options{
|
|
||||||
// Addr: c.CacheAddr,
|
|
||||||
// Password: c.CachePassword,
|
|
||||||
// DB: 0,
|
|
||||||
// })
|
|
||||||
|
|
||||||
// w.Cache = conn
|
|
||||||
|
|
||||||
// return nil
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// func WithDatabase(c *cnf.Config) OptionFn {
|
|
||||||
// return func(w *Worker) error {
|
|
||||||
// conn, err := database.Connect(c.DbURL)
|
|
||||||
// if err != nil {
|
|
||||||
// w.Logger.Log("Failed to connect to Database server: %v\n", err)
|
|
||||||
// os.Exit(1)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// w.Database = conn
|
|
||||||
|
|
||||||
// return nil
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// func WithEventbus(c *cnf.Config) OptionFn {
|
|
||||||
// return func(w *Worker) error {
|
|
||||||
// _, chn, err := rabbitmq.Open(c.EventBusURL)
|
|
||||||
// if err != nil {
|
|
||||||
// w.Logger.Log("Failed to connect to EventBus server: %v\n", err)
|
|
||||||
// os.Exit(1)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// err = rabbitmq.NewExchange(chn, c.EventBusExchange)
|
|
||||||
// if err != nil {
|
|
||||||
// w.Logger.Log("Failed to declare EventBus exchange: %v\n", err)
|
|
||||||
// os.Exit(1)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// _, err = chn.QueueDeclare(
|
|
||||||
// c.EventBusQueue, // name
|
|
||||||
// false, // durable
|
|
||||||
// false, // delete when unused
|
|
||||||
// false, // exclusive
|
|
||||||
// false, // no-wait
|
|
||||||
// nil, // arguments
|
|
||||||
// )
|
|
||||||
// if err != nil {
|
|
||||||
// w.Logger.Log("Failed to declare EventBus queue: %v\n", err)
|
|
||||||
// os.Exit(1)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // w.bindQueues()
|
|
||||||
// rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productAddedToBasket")
|
|
||||||
// rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productRemovedFromBasket")
|
|
||||||
// rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.updateQuantity")
|
|
||||||
|
|
||||||
// w.Eventbus = chn
|
|
||||||
|
|
||||||
// return nil
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// func WithLogger(c *cnf.Config) OptionFn {
|
|
||||||
// return func(w *Worker) error {
|
|
||||||
// logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr)
|
|
||||||
// if err != nil {
|
|
||||||
// w.Logger.Log("Failed to parse Fluentd address: %s. Err: %v", c.LoggerAddr, err)
|
|
||||||
// os.Exit(1)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort)
|
|
||||||
// if err != nil {
|
|
||||||
// w.Logger.Log("Failed to connect to the Fluentd on %s:%d. Err: %v", logHost, logPort, err)
|
|
||||||
// os.Exit(1)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// w.Logger = logger
|
|
||||||
|
|
||||||
// return nil
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // func WithRegistry(c *cnf.Config) OptionFn {
|
|
||||||
// // return func(w *Worker) error {
|
|
||||||
// // registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, "", "", 0)
|
|
||||||
// // if err != nil {
|
|
||||||
// // w.Logger.Log("Error connecting to %s: %v", c.RegistryAddr, err)
|
|
||||||
// // }
|
|
||||||
|
|
||||||
// // w.Registry = registry
|
|
||||||
|
|
||||||
// // go func(w *Worker) { // Fetch Consul KV config and store it in app config
|
|
||||||
// // ticker := time.NewTicker(time.Second * 15)
|
|
||||||
// // for range ticker.C {
|
|
||||||
// // fetchKVConfig(w) // FIXME: duplicated in server
|
|
||||||
// // }
|
|
||||||
// // }(w)
|
|
||||||
|
|
||||||
// // return nil
|
|
||||||
// // }
|
|
||||||
|
|
||||||
// // }
|
|
||||||
|
|
||||||
// func fetchKVConfig(w *Worker) { // @FIXME: merge duplication in server.go and worker.go
|
|
||||||
// config, _, err := w.Registry.KV().Get(w.Config.KVNamespace, nil)
|
|
||||||
// if err != nil || config == nil {
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
|
|
||||||
// kvCnf := bytes.NewBuffer(config.Value)
|
|
||||||
// decoder := json.NewDecoder(kvCnf)
|
|
||||||
// if err := decoder.Decode(&w.Config); err != nil {
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
// }
|
|
@ -38,36 +38,9 @@ func New(c *Config) *Worker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *Worker) Start() error {
|
func (w *Worker) Start() error {
|
||||||
// Init
|
setupQueues(w)
|
||||||
err := rabbitmq.NewExchange(w.GetEventBus(), w.cnf.EventBusExchange)
|
|
||||||
if err != nil {
|
|
||||||
w.GetLogger().Log("Failed to declare EventBus exchange: %v\n", err)
|
|
||||||
fmt.Printf("Failed to declare EventBus exchange: %v\n", err)
|
|
||||||
|
|
||||||
os.Exit(1)
|
err := w.doWork(w.doWrkUntil)
|
||||||
}
|
|
||||||
|
|
||||||
_, err = w.GetEventBus().QueueDeclare(
|
|
||||||
w.cnf.EventBusQueue, // name
|
|
||||||
false, // durable
|
|
||||||
false, // delete when unused
|
|
||||||
false, // exclusive
|
|
||||||
false, // no-wait
|
|
||||||
nil, // arguments
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
w.GetLogger().Log("Failed to declare EventBus queue: %v\n", err)
|
|
||||||
fmt.Printf("Failed to declare EventBus queue: %v\n", err)
|
|
||||||
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// // w.bindQueues()
|
|
||||||
rabbitmq.BindQueueToExchange(w.GetEventBus(), w.cnf.EventBusQueue, w.cnf.EventBusExchange, "catalog.basket.productAddedToBasket")
|
|
||||||
rabbitmq.BindQueueToExchange(w.GetEventBus(), w.cnf.EventBusQueue, w.cnf.EventBusExchange, "catalog.basket.productRemovedFromBasket")
|
|
||||||
rabbitmq.BindQueueToExchange(w.GetEventBus(), w.cnf.EventBusQueue, w.cnf.EventBusExchange, "catalog.basket.updateQuantity")
|
|
||||||
|
|
||||||
err = w.doWork(w.doWrkUntil)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.ID, err)
|
log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.ID, err)
|
||||||
close(w.doWrkUntil)
|
close(w.doWrkUntil)
|
||||||
@ -101,6 +74,8 @@ func (w *Worker) RegisterHandler(name string, fn func() any) {
|
|||||||
func (w *Worker) OnShutdown() {
|
func (w *Worker) OnShutdown() {
|
||||||
w.GetLogger().Log("Worker %s is going down...", w.ID)
|
w.GetLogger().Log("Worker %s is going down...", w.ID)
|
||||||
// fmt.Printf("Worker %s is going down...\n", w.ID)
|
// fmt.Printf("Worker %s is going down...\n", w.ID)
|
||||||
|
|
||||||
|
unbindQueues(w)
|
||||||
w.GetEventBus().Close()
|
w.GetEventBus().Close()
|
||||||
w.GetDatabase().Close()
|
w.GetDatabase().Close()
|
||||||
w.GetLogger().Log("Gone.")
|
w.GetLogger().Log("Gone.")
|
||||||
@ -162,6 +137,7 @@ func (w *Worker) processMsg(srvc *service.BasketService, m amqp.Delivery) {
|
|||||||
msg, err := rabbitmq.Deserialize(m.Body)
|
msg, err := rabbitmq.Deserialize(m.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.GetLogger().Log("Deserialization error: %v\n", err)
|
w.GetLogger().Log("Deserialization error: %v\n", err)
|
||||||
|
fmt.Printf("Deserialization error: %v\n", err)
|
||||||
m.Reject(false)
|
m.Reject(false)
|
||||||
|
|
||||||
return
|
return
|
||||||
@ -172,6 +148,7 @@ func (w *Worker) processMsg(srvc *service.BasketService, m amqp.Delivery) {
|
|||||||
// reqID := (data["request_id"]).(string) // FIXME Check input params!
|
// reqID := (data["request_id"]).(string) // FIXME Check input params!
|
||||||
|
|
||||||
w.GetLogger().Log("Processing message \"%s\" with data: %v\n", name, data)
|
w.GetLogger().Log("Processing message \"%s\" with data: %v\n", name, data)
|
||||||
|
fmt.Printf("Processing message \"%s\" with data: %v\n", name, data)
|
||||||
|
|
||||||
var ok = false
|
var ok = false
|
||||||
switch true { // Refactor -> use case for polymorphism
|
switch true { // Refactor -> use case for polymorphism
|
||||||
@ -179,8 +156,8 @@ func (w *Worker) processMsg(srvc *service.BasketService, m amqp.Delivery) {
|
|||||||
w.GetLogger().Log("Event: %s", event.EVENT_PRODUCT_ADDED_TO_BASKET)
|
w.GetLogger().Log("Event: %s", event.EVENT_PRODUCT_ADDED_TO_BASKET)
|
||||||
case strings.Contains(name, event.EVENT_PRODUCT_REMOVED_FROM_BASKET):
|
case strings.Contains(name, event.EVENT_PRODUCT_REMOVED_FROM_BASKET):
|
||||||
w.GetLogger().Log("Event: %s", event.EVENT_PRODUCT_REMOVED_FROM_BASKET)
|
w.GetLogger().Log("Event: %s", event.EVENT_PRODUCT_REMOVED_FROM_BASKET)
|
||||||
// case strings.Contains(name, event.EVENT_BASKET_CHECKOUT):
|
case strings.Contains(name, event.EVENT_BASKET_CHECKOUT):
|
||||||
// w.Logger.Log("Event: %s", event.EVENT_BASKET_CHECKOUT)
|
w.GetLogger().Log("Event: %s", event.EVENT_BASKET_CHECKOUT)
|
||||||
}
|
}
|
||||||
|
|
||||||
// case strings.Contains(name, event.EVENT_PRODUCT_ADDED_TO_BASKET):
|
// case strings.Contains(name, event.EVENT_PRODUCT_ADDED_TO_BASKET):
|
||||||
@ -201,10 +178,49 @@ func (w *Worker) processMsg(srvc *service.BasketService, m amqp.Delivery) {
|
|||||||
ok, _ = rnr.run(data)
|
ok, _ = rnr.run(data)
|
||||||
if ok {
|
if ok {
|
||||||
w.GetLogger().Log("Successful executed message \"%s\"\n", name)
|
w.GetLogger().Log("Successful executed message \"%s\"\n", name)
|
||||||
|
fmt.Printf("Successful executed message \"%s\"\n", name)
|
||||||
m.Ack(false)
|
m.Ack(false)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
w.GetLogger().Log("Error processing \"%s\": %s (%v)", name, err.Error(), err)
|
w.GetLogger().Log("Error processing \"%s\": %v", name, err)
|
||||||
m.Reject(false) // FIXME: or Nack(repeat until success - maybe message shout know...?
|
fmt.Printf("Error processing \"%s\": %v\n", name, err)
|
||||||
|
m.Reject(true) // FIXME: or Nack(repeat until success - maybe message shout know...?
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupQueues(w *Worker) {
|
||||||
|
err := rabbitmq.NewExchange(w.GetEventBus(), w.cnf.EventBusExchange)
|
||||||
|
if err != nil {
|
||||||
|
w.GetLogger().Log("Failed to declare EventBus exchange: %v\n", err)
|
||||||
|
fmt.Printf("Failed to declare EventBus exchange: %v\n", err)
|
||||||
|
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
args := amqp.Table{}
|
||||||
|
args["x-message-ttl"] = 5
|
||||||
|
_, err = w.GetEventBus().QueueDeclare(
|
||||||
|
w.cnf.EventBusQueue, // name
|
||||||
|
true, // durable
|
||||||
|
false, // delete when unused
|
||||||
|
false, // exclusive
|
||||||
|
false, // no-wait
|
||||||
|
args, // arguments
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
w.GetLogger().Log("Failed to declare EventBus queue: %v\n", err)
|
||||||
|
fmt.Printf("Failed to declare EventBus queue: %v\n", err)
|
||||||
|
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
rabbitmq.BindQueueToExchange(w.GetEventBus(), w.cnf.EventBusQueue, w.cnf.EventBusExchange, "catalog.basket.productAddedToBasket")
|
||||||
|
rabbitmq.BindQueueToExchange(w.GetEventBus(), w.cnf.EventBusQueue, w.cnf.EventBusExchange, "catalog.basket.productRemovedFromBasket")
|
||||||
|
rabbitmq.BindQueueToExchange(w.GetEventBus(), w.cnf.EventBusQueue, w.cnf.EventBusExchange, "catalog.basket.updateQuantity")
|
||||||
|
}
|
||||||
|
|
||||||
|
func unbindQueues(w *Worker) {
|
||||||
|
w.GetEventBus().QueueUnbind(w.cnf.EventBusQueue, "catalog.basket.productAddedToBasket", w.cnf.EventBusExchange, nil)
|
||||||
|
w.GetEventBus().QueueUnbind(w.cnf.EventBusQueue, "catalog.basket.productRemovedFromBasket", w.cnf.EventBusExchange, nil)
|
||||||
|
w.GetEventBus().QueueUnbind(w.cnf.EventBusQueue, "catalog.basket.updateQuantity", w.cnf.EventBusExchange, nil)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user