This commit is contained in:
parent
dfc621e920
commit
58d32e702f
@ -40,7 +40,7 @@ func main() {
|
||||
log.Panicln("Error loading .env file", baseCnf.ErrLoadingEnvs)
|
||||
}
|
||||
|
||||
c := cnf.NewConfig()
|
||||
c := cnf.NewConfig("basket-migrator")
|
||||
|
||||
// dbURL := baseCnf.GetEnv("DATABASE_URL", defDbURL)
|
||||
|
||||
|
@ -1,7 +1,6 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
|
||||
@ -16,7 +15,7 @@ func main() {
|
||||
log.Panicln("Error loading .env file", baseCnf.ErrLoadingEnvs)
|
||||
}
|
||||
|
||||
c := cnf.NewConfig()
|
||||
c := cnf.NewConfig("basket-server")
|
||||
srv := svr.New(
|
||||
c,
|
||||
svr.WithCache(c),
|
||||
@ -35,6 +34,5 @@ func main() {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
fmt.Println("Done.")
|
||||
os.Exit(0)
|
||||
}
|
||||
|
@ -11,10 +11,10 @@ import (
|
||||
|
||||
func main() {
|
||||
if config.ErrLoadingEnvs != nil {
|
||||
log.Panicln("Error loading .env file.")
|
||||
log.Fatalln("Error loading .env file.")
|
||||
}
|
||||
|
||||
c := cnf.NewConfig()
|
||||
c := cnf.NewConfig("basket-worker")
|
||||
wrk := worker.New(
|
||||
c,
|
||||
worker.WithCache(c),
|
||||
@ -24,7 +24,7 @@ func main() {
|
||||
worker.WithRegistry(c),
|
||||
)
|
||||
|
||||
forever := make(chan struct{})
|
||||
wrk.Start()
|
||||
<-forever
|
||||
while := make(chan struct{})
|
||||
wrk.Start(while)
|
||||
<-while
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ const (
|
||||
defCachePassword = "12345678"
|
||||
defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce"
|
||||
defEventBusURL = "amqp://guest:guest@api-eventbus:56721"
|
||||
defKVNmspc = "dev.egommerce/service/basket-svc"
|
||||
defKVNmspc = "dev.egommerce/service/basket"
|
||||
defLoggerAddr = "api-logger:24224"
|
||||
defNetAddr = ":80"
|
||||
defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017"
|
||||
@ -34,17 +34,18 @@ type Config struct {
|
||||
EventBusQueue string `json:"eventbus_queue"`
|
||||
EventBusURL string `json:"eventbus_url"`
|
||||
LoggerAddr string `json:"logger_addr"`
|
||||
KVNamespace string
|
||||
RegistryAddr string
|
||||
|
||||
// Fields with JSON mappings are available through Consul KV storage
|
||||
}
|
||||
|
||||
func NewConfig() *Config {
|
||||
func NewConfig(name string) *Config {
|
||||
c := new(Config)
|
||||
c.Base = new(srv.Config)
|
||||
|
||||
c.Base.AppID, _ = os.Hostname()
|
||||
c.Base.AppName = cnf.GetEnv("APP_NAME", defAppName)
|
||||
c.Base.AppName = name
|
||||
c.Base.NetAddr = cnf.GetEnv("SERVER_ADDR", defNetAddr)
|
||||
c.Base.PathPrefix = cnf.GetEnv("APP_PATH_PREFIX", defPathPrefix)
|
||||
|
||||
@ -53,6 +54,7 @@ func NewConfig() *Config {
|
||||
c.DbURL = cnf.GetEnv("DATABASE_URL", defDbURL)
|
||||
c.EventBusExchange = defEbEventsExchange
|
||||
c.EventBusURL = cnf.GetEnv("EVENTBUS_URL", defEventBusURL)
|
||||
c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc)
|
||||
c.LoggerAddr = cnf.GetEnv("LOGGER_ADDR", defLoggerAddr)
|
||||
c.RegistryAddr = cnf.GetEnv("REGISTRY_ADDR", defRegistryAddr)
|
||||
|
||||
|
@ -1,17 +1,17 @@
|
||||
package event
|
||||
|
||||
const (
|
||||
EVENT_PRODUCT_ADDED_TO_BASKET = "event.ProductAddedToBasketEvent"
|
||||
EVENT_PRODUCT_REMOVED_FROM_BASKET = "event.ProductRemovedFromBasketEvent"
|
||||
EVENT_PRODUCT_ADDED_TO_BASKET = "event.ProductAddedToBasket"
|
||||
EVENT_PRODUCT_REMOVED_FROM_BASKET = "event.ProductRemovedFromBasket"
|
||||
)
|
||||
|
||||
type ProductAddedToBasketEvent struct {
|
||||
type ProductAddedToBasket struct {
|
||||
*Event
|
||||
ProductID string `json:"product_id"`
|
||||
BasketID string `json:"basket_id"`
|
||||
}
|
||||
|
||||
type ProductRemovedFromBasketEvent struct {
|
||||
type ProductRemovedFromBasket struct {
|
||||
*Event
|
||||
ProductID string `json:"product_id"`
|
||||
BasketID string `json:"basket_id"`
|
||||
|
@ -32,8 +32,6 @@ type (
|
||||
Eventbus *amqp.Channel
|
||||
Logger *fluentd.Logger
|
||||
Registry *consul.Service
|
||||
|
||||
kvNmspc string
|
||||
}
|
||||
|
||||
OptionFn func(*Server) error // FIXME: similar in worker
|
||||
@ -139,7 +137,7 @@ func WithRegistry(c *cnf.Config) OptionFn {
|
||||
go func() { // Consul KV updater
|
||||
ticker := time.NewTicker(time.Second * 15)
|
||||
for range ticker.C {
|
||||
updateKVConfig(s)
|
||||
fetchKVConfig(s) // FIXME: duplicated in worker
|
||||
}
|
||||
}()
|
||||
|
||||
@ -156,13 +154,13 @@ func WithRegistry(c *cnf.Config) OptionFn {
|
||||
|
||||
func (s *Server) Shutdown() srv.PurgeFn {
|
||||
return func(srv *srv.Server) error {
|
||||
// s.Logger.Log("Server is going down... Unregistering service: %s", s.Base.AppID)
|
||||
s.Logger.Log("Server is going down... Unregistering service...")
|
||||
s.Logger.Log("Server %s is going down...", s.Base.AppID)
|
||||
|
||||
s.Registry.Unregister()
|
||||
s.clearMetadataCache()
|
||||
s.Eventbus.Close()
|
||||
s.Database.Close()
|
||||
s.Logger.Log("Gone.")
|
||||
s.Logger.Close()
|
||||
|
||||
return s.Base.Shutdown()
|
||||
@ -170,8 +168,8 @@ func (s *Server) Shutdown() srv.PurgeFn {
|
||||
}
|
||||
|
||||
// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map
|
||||
func updateKVConfig(s *Server) { // @FIXME: merge duplication in server.go and worker.go
|
||||
config, _, err := s.Registry.KV().Get(s.kvNmspc, nil)
|
||||
func fetchKVConfig(s *Server) { // @FIXME: merge duplication in server.go and worker.go
|
||||
config, _, err := s.Registry.KV().Get(s.Config.KVNamespace, nil)
|
||||
if err != nil || config == nil {
|
||||
return
|
||||
}
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
)
|
||||
|
||||
func AddProductToBasket(srv *service.BasketService, productID, qty int, basketID, reqID string) (*model.BasketModel, error) {
|
||||
// FIXME: error occurs when 0 is passed
|
||||
ctx := context.Background()
|
||||
basket, err := srv.FetchFromDB(ctx, basketID)
|
||||
if err != nil {
|
||||
|
55
src/internal/worker/command.go
Normal file
55
src/internal/worker/command.go
Normal file
@ -0,0 +1,55 @@
|
||||
package worker
|
||||
|
||||
import (
|
||||
"git.pbiernat.dev/egommerce/basket-service/internal/service"
|
||||
"git.pbiernat.dev/egommerce/basket-service/internal/ui"
|
||||
)
|
||||
|
||||
var (
|
||||
AddProductToBasket = "event.ProductAddedToBasket"
|
||||
RemoveProductFromBasket = "event.ProductRemovedFromBasket"
|
||||
)
|
||||
|
||||
type Command interface {
|
||||
run(CommandData) (bool, any)
|
||||
}
|
||||
|
||||
type CommandData map[string]interface{}
|
||||
|
||||
type CommandRunner struct {
|
||||
cmd Command
|
||||
}
|
||||
|
||||
func (r *CommandRunner) run(data CommandData) (bool, any) {
|
||||
return r.cmd.run(data)
|
||||
}
|
||||
|
||||
type AddProductToBasketCommand struct {
|
||||
srvc *service.BasketService
|
||||
}
|
||||
|
||||
func (c *AddProductToBasketCommand) run(data CommandData) (bool, any) {
|
||||
reqID := data["request_id"].(string) // FIXME Check input params!
|
||||
productID := int(data["product_id"].(float64)) // FIXME Check input params!
|
||||
basketID := data["basket_id"].(string) // FIXME Check input params!
|
||||
qty := int(data["quantity"].(float64)) // FIXME Check input params!
|
||||
|
||||
basket, err := ui.AddProductToBasket(c.srvc, productID, qty, basketID, reqID)
|
||||
|
||||
return err == nil, basket
|
||||
}
|
||||
|
||||
type RemoveProductFromBasketCommand struct {
|
||||
srvc *service.BasketService
|
||||
}
|
||||
|
||||
func (c *RemoveProductFromBasketCommand) run(data CommandData) (bool, any) {
|
||||
reqID := data["request_id"].(string) // FIXME Check input params!
|
||||
productID := int(data["product_id"].(float64)) // FIXME Check input params!
|
||||
basketID := data["basket_id"].(string) // FIXME Check input params!
|
||||
qty := int(data["quantity"].(float64)) // FIXME Check input params!
|
||||
|
||||
basket, err := ui.RemoveProductFromBasket(c.srvc, productID, qty, basketID, reqID)
|
||||
|
||||
return err == nil, basket
|
||||
}
|
@ -24,20 +24,16 @@ import (
|
||||
cnf "git.pbiernat.dev/egommerce/basket-service/internal/config"
|
||||
"git.pbiernat.dev/egommerce/basket-service/internal/event"
|
||||
"git.pbiernat.dev/egommerce/basket-service/internal/service"
|
||||
"git.pbiernat.dev/egommerce/basket-service/internal/ui"
|
||||
)
|
||||
|
||||
type (
|
||||
Worker struct {
|
||||
Config *cnf.Config
|
||||
|
||||
Config *cnf.Config
|
||||
Cache *redis.Client
|
||||
Database *pgxpool.Pool
|
||||
Eventbus *amqp.Channel
|
||||
Logger *fluentd.Logger
|
||||
Registry *consul.Service
|
||||
|
||||
kvNmspc string
|
||||
}
|
||||
OptionFn func(*Worker) error // FIXME: similar in server/server.go
|
||||
)
|
||||
@ -56,101 +52,6 @@ func New(c *cnf.Config, opts ...OptionFn) *Worker {
|
||||
return wrk
|
||||
}
|
||||
|
||||
func (w *Worker) Start() error {
|
||||
// event consume
|
||||
msgs, err := w.Eventbus.Consume(
|
||||
w.Config.EventBusQueue, // queue
|
||||
"", // consumer
|
||||
false, // auto-ack
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil, // args
|
||||
)
|
||||
if err != nil {
|
||||
w.Logger.Log("Failed to register a consumer: %s", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
forever := make(chan struct{})
|
||||
go func() {
|
||||
sigint := make(chan os.Signal, 1)
|
||||
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sigint
|
||||
|
||||
w.Logger.Log("Shutting down %s worker...\n", w.Config.Base.GetAppFullName())
|
||||
w.Shutdown()
|
||||
|
||||
close(forever)
|
||||
}()
|
||||
|
||||
go func() {
|
||||
basketSrv := service.NewBasketService(w.Database, w.Cache, w.Eventbus, w.Logger)
|
||||
// basketSrv := service.NewBasketService(w)
|
||||
|
||||
for d := range msgs {
|
||||
go func(d amqp.Delivery) {
|
||||
msg, err := rabbitmq.Deserialize(d.Body)
|
||||
if err != nil {
|
||||
w.Logger.Log("deserialize error: %v\n", err)
|
||||
d.Reject(false) // FIXME: or Nack? how to handle erros in queue...
|
||||
return
|
||||
}
|
||||
|
||||
eName := fmt.Sprintf("%s", msg["event"])
|
||||
data := (msg["data"]).(map[string]interface{})
|
||||
w.Logger.Log("Message<%s>: %v\n", eName, data)
|
||||
|
||||
reqID := data["request_id"].(string) // FIXME Check input params!
|
||||
basketID := data["basket_id"].(string) // FIXME Check input params!
|
||||
|
||||
switch true {
|
||||
case strings.Contains(eName, event.EVENT_PRODUCT_ADDED_TO_BASKET):
|
||||
productID := int(data["product_id"].(float64))
|
||||
qty := int(data["quantity"].(float64))
|
||||
|
||||
basket, err := ui.AddProductToBasket(basketSrv, productID, qty, basketID, reqID)
|
||||
if err == nil {
|
||||
w.Logger.Log("Product #%d added to basket #%s. ReqID: #%s", productID, basket.ID, reqID)
|
||||
}
|
||||
case strings.Contains(eName, event.EVENT_PRODUCT_REMOVED_FROM_BASKET):
|
||||
productID := int(data["product_id"].(float64))
|
||||
qty := int(data["quantity"].(float64))
|
||||
|
||||
basket, err := ui.RemoveProductFromBasket(basketSrv, productID, qty, basketID, reqID)
|
||||
if err == nil {
|
||||
w.Logger.Log("Product #%d removed from basket #%s. ReqID: #%s", productID, basket.ID, reqID)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
w.Logger.Log("%s error: %s", eName, err.Error())
|
||||
d.Reject(false) // FIXME: or Nack? how to handle erros in queue...
|
||||
return
|
||||
}
|
||||
|
||||
w.Logger.Log("ACK: %s", eName)
|
||||
d.Ack(false)
|
||||
}(d)
|
||||
}
|
||||
}()
|
||||
|
||||
w.Logger.Log("Waiting for messages...2")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Worker) Shutdown() error {
|
||||
w.Logger.Log("Worker is going down...")
|
||||
|
||||
w.Registry.Unregister()
|
||||
w.Eventbus.Close()
|
||||
w.Database.Close()
|
||||
w.Logger.Close()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func WithCache(c *cnf.Config) OptionFn {
|
||||
return func(w *Worker) error {
|
||||
conn := redis.NewClient(&redis.Options{
|
||||
@ -206,7 +107,7 @@ func WithEventbus(c *cnf.Config) OptionFn {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// w.BindQueues()
|
||||
// w.bindQueues()
|
||||
rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productAddedToBasket")
|
||||
rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productRemovedFromBasket")
|
||||
rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.updateQuantity")
|
||||
@ -244,22 +145,22 @@ func WithRegistry(c *cnf.Config) OptionFn {
|
||||
w.Logger.Log("Error connecting to %s: %v", c.RegistryAddr, err)
|
||||
}
|
||||
|
||||
go func() { // Consul KV updater
|
||||
w.Registry = registry
|
||||
|
||||
go func(w *Worker) { // Fetch Consul KV config and store it in app config
|
||||
ticker := time.NewTicker(time.Second * 15)
|
||||
for range ticker.C {
|
||||
updateKVConfig(w)
|
||||
fetchKVConfig(w) // FIXME: duplicated in server
|
||||
}
|
||||
}()
|
||||
|
||||
w.Registry = registry
|
||||
}(w)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map
|
||||
func updateKVConfig(w *Worker) { // @FIXME: merge duplication in server.go and worker.go
|
||||
config, _, err := w.Registry.KV().Get(w.kvNmspc, nil)
|
||||
func fetchKVConfig(w *Worker) { // @FIXME: merge duplication in server.go and worker.go
|
||||
config, _, err := w.Registry.KV().Get(w.Config.KVNamespace, nil)
|
||||
if err != nil || config == nil {
|
||||
return
|
||||
}
|
||||
@ -270,3 +171,114 @@ func updateKVConfig(w *Worker) { // @FIXME: merge duplication in server.go and w
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) Start(while chan struct{}) error {
|
||||
go func() {
|
||||
sigint := make(chan os.Signal, 1)
|
||||
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sigint
|
||||
|
||||
w.Shutdown()
|
||||
|
||||
close(while)
|
||||
}()
|
||||
|
||||
err := w.doWork()
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.Config.Base.AppID, err)
|
||||
close(while)
|
||||
}
|
||||
|
||||
w.Logger.Log("Waiting for messages...")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Worker) Shutdown() error {
|
||||
w.Logger.Log("Worker %s is going down...", w.Config.Base.AppID)
|
||||
|
||||
w.Registry.Unregister()
|
||||
w.Eventbus.Close()
|
||||
w.Database.Close()
|
||||
w.Logger.Log("Gone.")
|
||||
w.Logger.Close()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Worker) doWork() error {
|
||||
msgs, err := w.Eventbus.Consume(
|
||||
w.Config.EventBusQueue, // queue
|
||||
"", // consumer
|
||||
false, // auto-ack
|
||||
false, // exclusive
|
||||
false, // no-local
|
||||
false, // no-wait
|
||||
nil, // args
|
||||
)
|
||||
if err != nil {
|
||||
w.Logger.Log("Failed to register a consumer: %s", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
go func() {
|
||||
basketSrvc := service.NewBasketService(w.Database, w.Cache, w.Eventbus, w.Logger)
|
||||
|
||||
for d := range msgs {
|
||||
go func(d amqp.Delivery) {
|
||||
w.processMsg(basketSrvc, d)
|
||||
}(d)
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Worker) processMsg(srvc *service.BasketService, d amqp.Delivery) {
|
||||
msg, err := rabbitmq.Deserialize(d.Body)
|
||||
if err != nil {
|
||||
w.Logger.Log("Deserialization error: %v\n", err)
|
||||
d.Reject(false)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
rnr := &CommandRunner{}
|
||||
name := fmt.Sprintf("%s", msg["event"])
|
||||
data := (msg["data"]).(map[string]interface{})
|
||||
reqID := (data["request_id"]).(string) // FIXME Check input params!
|
||||
|
||||
w.Logger.Log("Processing message \"%s\" with data: %v\n", name, data)
|
||||
|
||||
var ok = false
|
||||
switch true { // Refactor -> use case for polymorphism
|
||||
case strings.Contains(name, event.EVENT_PRODUCT_ADDED_TO_BASKET):
|
||||
basketID := data["basket_id"].(string) // FIXME Check input params!
|
||||
productID := data["product_id"] // FIXME Check input params!
|
||||
|
||||
rnr.cmd = &AddProductToBasketCommand{srvc}
|
||||
w.Logger.Log("Adding product #%d to basket #%s. ReqID: #%s", productID, basketID, reqID)
|
||||
case strings.Contains(name, event.EVENT_PRODUCT_REMOVED_FROM_BASKET):
|
||||
basketID := data["basket_id"].(string)
|
||||
productID := data["product_id"].(float64)
|
||||
|
||||
rnr.cmd = &RemoveProductFromBasketCommand{srvc}
|
||||
w.Logger.Log("Removing product #%d from basket #%s. ReqID: #%s", productID, basketID, reqID)
|
||||
}
|
||||
|
||||
ok, _ = rnr.run(data)
|
||||
if ok {
|
||||
w.Logger.Log("Successful executed message \"%s\"\n", name)
|
||||
return
|
||||
}
|
||||
|
||||
if !ok {
|
||||
w.Logger.Log("Error processing \"%s\": %s (%v)", name, err.Error(), err)
|
||||
d.Reject(false) // FIXME: or Nack(repeat until success - maybe message shout know...?
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
w.Logger.Log("Finalized processing: %s", name)
|
||||
d.Ack(false)
|
||||
}
|
||||
|
@ -1,7 +1,6 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
@ -49,7 +48,6 @@ func (s *Server) Start(while chan struct{}, prgFn PurgeFn) error {
|
||||
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sigint
|
||||
|
||||
fmt.Print("Shutting down... ")
|
||||
if err := prgFn(s); err != nil {
|
||||
log.Fatalf("Failed to shutdown server. Reason: %v\n", err)
|
||||
}
|
||||
@ -59,7 +57,7 @@ func (s *Server) Start(while chan struct{}, prgFn PurgeFn) error {
|
||||
|
||||
err := s.Listen(s.addr)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to start server. Reason: %v\n", err)
|
||||
log.Fatalf("Failed to start server: %s. Reason: %v\n", s.Config.AppID, err)
|
||||
close(while)
|
||||
}
|
||||
<-while
|
||||
|
Loading…
Reference in New Issue
Block a user