diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4c1faf5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.env +.env.* +!.env.dist + +.vscode/ +__debug_bin \ No newline at end of file diff --git a/src/cmd/migrate/main.go b/src/cmd/migrate/main.go index 8894ff7..30388ee 100644 --- a/src/cmd/migrate/main.go +++ b/src/cmd/migrate/main.go @@ -11,8 +11,9 @@ import ( "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" - "git.pbiernat.dev/egommerce/basket-service/internal/common" - cnf "git.pbiernat.dev/egommerce/basket-service/pkg/config" + baseCnf "git.pbiernat.dev/egommerce/basket-service/pkg/config" + + cnf "git.pbiernat.dev/egommerce/basket-service/internal/config" ) const ( @@ -35,25 +36,24 @@ Usage: ` func main() { - if cnf.ErrLoadingEnvs != nil { - log.Panicln("Error loading .env file", cnf.ErrLoadingEnvs) + if baseCnf.ErrLoadingEnvs != nil { + log.Panicln("Error loading .env file", baseCnf.ErrLoadingEnvs) } - c := common.NewConfig() + c := cnf.NewConfig() - // dbURL := cnf.GetEnv("DATABASE_URL", defDbURL) - mTblName := cnf.GetEnv("MIGRATIONS_TABLE_NAME", defMigrationsTableName) + // dbURL := baseCnf.GetEnv("DATABASE_URL", defDbURL) logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr) if err != nil { log.Fatalf("Error parsing logger addr: %s. Err: %v", c.LoggerAddr, err) } - logger, err := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) // @Refactor NewLogger return (logger, error) + logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort) // @Refactor NewLogger return (logger, error) if err != nil { log.Fatalf("Error connecting to %s:%d. Err: %v", logHost, logPort, err) } - defer logger.Close() + // defer logger.Close() flag.Usage = usage flag.Parse() @@ -65,9 +65,9 @@ func main() { Database: "egommerce", }) + mTbl := baseCnf.GetEnv("MIGRATIONS_TABLE_NAME", defMigrationsTableName) mig := migrations.NewCollection() - mig.SetTableName(mTblName) - + mig.SetTableName(mTbl) if err := mig.DiscoverSQLMigrations("./migrations"); err != nil { logger.Log("migration dicovery error: %#v", err) } diff --git a/src/cmd/server/main.go b/src/cmd/server/main.go index 9567505..e1cfe44 100644 --- a/src/cmd/server/main.go +++ b/src/cmd/server/main.go @@ -5,31 +5,30 @@ import ( "log" "os" - cnf "git.pbiernat.dev/egommerce/basket-service/pkg/config" + baseCnf "git.pbiernat.dev/egommerce/basket-service/pkg/config" - "git.pbiernat.dev/egommerce/basket-service/internal/app/server" - "git.pbiernat.dev/egommerce/basket-service/internal/common" + cnf "git.pbiernat.dev/egommerce/basket-service/internal/config" + svr "git.pbiernat.dev/egommerce/basket-service/internal/server" ) func main() { - if cnf.ErrLoadingEnvs != nil { - log.Panicln("Error loading .env file", cnf.ErrLoadingEnvs) + if baseCnf.ErrLoadingEnvs != nil { + log.Panicln("Error loading .env file", baseCnf.ErrLoadingEnvs) } - c := common.NewConfig() - fmt.Println("Creating new server...") - srv := server.New( + c := cnf.NewConfig() + srv := svr.New( c, - server.WithCache(c), - server.WithDatabase(c), - server.WithEventbus(c), - server.WithLogger(c), - server.WithRegistry(c), + svr.WithCache(c), + svr.WithDatabase(c), + svr.WithEventbus(c), + svr.WithLogger(c), + svr.WithRegistry(c), ) - forever := make(chan struct{}) - err := srv.Base.Start(forever, srv.Shutdown()) - <-forever + while := make(chan struct{}) + err := srv.Base.Start(while, srv.Shutdown()) + <-while if err != nil { log.Fatalf("Failed to start server. Reason: %v\n", err) @@ -38,43 +37,4 @@ func main() { fmt.Println("Done.") os.Exit(0) - - // c.AppDomain = cnf.GetEnv("APP_DOMAIN", defAppDomain) - // c.Port, _ = strconv.Atoi(c.NetAddr[1:]) - // c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc) - - // logHost, logPort := fluentd.ParseAddr(c.LoggerAddr) - // logger := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) - // defer logger.Close() - - // db conn - // dbConn, err := database.Connect(c.DbURL) - // if err != nil { // fixme: add wait-for-db... - // logger.Log("Failed to connect to Database server: %v\n", err) - // os.Exit(1) - // } - // defer dbConn.Close() - - // redis conn - // redis := redis.NewClient(&redis.Options{ - // Addr: c.CacheAddr, - // Password: c.CachePassword, - // DB: 0, - // }) - // defer redis.Close() - - // eventbus conn - // ebConn, ebCh, err := amqp.Open(c.EventBusURL) - // if err != nil { - // logger.Log("Failed to connect to EventBus server: %v\n", err) - // os.Exit(1) - // } - // defer ebCh.Close() - // defer amqp.Close(ebConn) - - // err = amqp.NewExchange(ebCh, c.EventBusExchange) - // if err != nil { - // logger.Log("Failed to declare EventBus exchange: %v\n", err) - // os.Exit(1) - // } } diff --git a/src/cmd/worker/main.go b/src/cmd/worker/main.go index 043b9d4..74281d3 100644 --- a/src/cmd/worker/main.go +++ b/src/cmd/worker/main.go @@ -1,217 +1,30 @@ package main import ( - "fmt" "log" - "os" - "os/signal" - "strings" - "syscall" - "time" - "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" - "git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq" - "github.com/go-redis/redis/v8" - "github.com/streadway/amqp" - - "git.pbiernat.dev/egommerce/go-api-pkg/consul" - - "git.pbiernat.dev/egommerce/basket-service/internal/app/event" - "git.pbiernat.dev/egommerce/basket-service/internal/app/service" - "git.pbiernat.dev/egommerce/basket-service/internal/app/ui" - "git.pbiernat.dev/egommerce/basket-service/internal/common" "git.pbiernat.dev/egommerce/basket-service/pkg/config" - "git.pbiernat.dev/egommerce/basket-service/pkg/database" -) -const ( -// defAppName = "basket-worker" -// defLoggerAddr = "api-logger:24224" -// defRegistryAddr = "api-registry:8500" -// defDbURL = "postgres://postgres:12345678@postgres-db:5432/egommerce" -// defCacheAddr = "api-cache:6379" -// defCachePassword = "12345678" -// defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017" -// defEventBusURL = "amqp://guest:guest@api-eventbus:5672" -// ebEventsExchange = "api-events" -// ebEventsQueue = "basket-worker" -// defKVNmspc = "dev.egommerce/service/basket-worker" + cnf "git.pbiernat.dev/egommerce/basket-service/internal/config" + "git.pbiernat.dev/egommerce/basket-service/internal/worker" ) func main() { if config.ErrLoadingEnvs != nil { - log.Panicln("Error loading .env file", config.ErrLoadingEnvs) + log.Panicln("Error loading .env file.") } - c := common.NewConfig() - logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr) - if err != nil { - log.Fatalf("Error parsing logger addr: %s. Err: %v", c.LoggerAddr, err) - } - - logger, err := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) // @Refactor NewLogger return (logger, error) - if err != nil { - log.Fatalf("Error connecting to %s:%d. Err: %v", logHost, logPort, err) - } - defer logger.Close() - - // consul, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, "", "", 0) - registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, "", "", 0) - if err != nil { - logger.Log("Error connecting to %s: %v", c.RegistryAddr, err) - } - - go func(consul *consul.Service) { - ticker := time.NewTicker(time.Second * 15) - for range ticker.C { - updateKVConfig(consul, c) // FIXME: duplicated in internal/app/server/server.go - } - }(registry) - - // db conn - dbConn, err := database.Connect(c.DbURL) - if err != nil { - logger.Log("Failed to connect to Database server: %v\n", err) - os.Exit(1) - } - defer dbConn.Close() - - // redis conn - redis := redis.NewClient(&redis.Options{ - Addr: c.CacheAddr, - Password: c.CachePassword, - DB: 0, - }) - defer redis.Close() - - // eventbus conn - ebConn, ebCh, err := rabbitmq.Open(c.EventBusURL) - if err != nil { - logger.Log("Failed to connect to EventBus server: %v\n", err) - os.Exit(1) - } - defer ebCh.Close() - defer rabbitmq.Close(ebConn) - - err = rabbitmq.NewExchange(ebCh, c.EventBusExchange) - if err != nil { - logger.Log("Failed to declare EventBus exchange: %v\n", err) - os.Exit(1) - } - - // create and bind queues - _, err = ebCh.QueueDeclare( - c.EventBusQueue, // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments + c := cnf.NewConfig() + wrk := worker.New( + c, + worker.WithCache(c), + worker.WithDatabase(c), + worker.WithEventbus(c), + worker.WithLogger(c), + worker.WithRegistry(c), ) - if err != nil { - logger.Log("Failed to declare EventBus queue: %v\n", err) - os.Exit(1) - } - - rabbitmq.BindQueueToExchange(ebCh, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productAddedToBasket") - rabbitmq.BindQueueToExchange(ebCh, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productRemovedFromBasket") - rabbitmq.BindQueueToExchange(ebCh, c.EventBusQueue, c.EventBusExchange, "catalog.basket.updateQuantity") - - // event consume - msgs, err := ebCh.Consume( - c.EventBusQueue, // queue - "", // consumer - false, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - if err != nil { - logger.Log("Failed to register a consumer: %s", err) - os.Exit(1) - } forever := make(chan struct{}) - go func() { - sigint := make(chan os.Signal, 1) - signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - <-sigint - - logger.Log("Worker %s stopped working...\n", c.GetAppFullName()) - - close(forever) - }() - - go func() { - basketSrv := service.NewBasketService(dbConn, redis, ebCh, logger) - - for d := range msgs { - go func(d amqp.Delivery) { - msg, err := rabbitmq.Deserialize(d.Body) - if err != nil { - logger.Log("deserialize error: %v\n", err) - d.Reject(false) // FIXME: or Nack? how to handle erros in queue... - return - } - - eName := fmt.Sprintf("%s", msg["event"]) - data := (msg["data"]).(map[string]interface{}) - logger.Log("Message<%s>: %v\n", eName, data) - - reqID := data["request_id"].(string) // FIXME Check input params! - basketID := data["basket_id"].(string) // FIXME Check input params! - - switch true { - case strings.Contains(eName, event.EVENT_PRODUCT_ADDED_TO_BASKET): - productID := int(data["product_id"].(float64)) - qty := int(data["quantity"].(float64)) - - basket, err := ui.AddProductToBasket(basketSrv, productID, qty, basketID, reqID) - if err == nil { - logger.Log("Product #%d added to basket #%s. ReqID: #%s", productID, basket.ID, reqID) - } - case strings.Contains(eName, event.EVENT_PRODUCT_REMOVED_FROM_BASKET): - productID := int(data["product_id"].(float64)) - qty := int(data["quantity"].(float64)) - - basket, err := ui.RemoveProductFromBasket(basketSrv, productID, qty, basketID, reqID) - if err == nil { - logger.Log("Product #%d removed from basket #%s. ReqID: #%s", productID, basket.ID, reqID) - } - } - - if err != nil { - logger.Log("%s error: %s", eName, err.Error()) - d.Reject(false) // FIXME: or Nack? how to handle erros in queue... - return - } - - logger.Log("ACK: %s", eName) - d.Ack(false) - }(d) - } - }() - - logger.Log("Waiting for messages...") + wrk.Start() <-forever } - -func updateKVConfig(s *consul.Service, oldCnf *common.Config) error { // FIXME: duplicated in internal/app/server/server.go - // data, _, err := s.KV().Get(oldCnf.KVNamespace, nil) - // if err != nil { - // return err - // } - - // if data == nil { - // return errors.New("empty KV config data") - // } - - // buf := bytes.NewBuffer(data.Value) - // decoder := json.NewDecoder(buf) - // if err := decoder.Decode(oldCnf); err != nil { - // return err - // } - - return nil -} diff --git a/src/go.mod b/src/go.mod index ee0ac61..35f246b 100644 --- a/src/go.mod +++ b/src/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( git.pbiernat.dev/egommerce/api-entities v0.0.26 - git.pbiernat.dev/egommerce/go-api-pkg v0.0.150 + git.pbiernat.dev/egommerce/go-api-pkg v0.0.151 github.com/georgysavva/scany/v2 v2.0.0 github.com/go-pg/migrations/v8 v8.1.0 github.com/go-pg/pg/v10 v10.10.7 diff --git a/src/go.sum b/src/go.sum index a0201ad..7acd140 100644 --- a/src/go.sum +++ b/src/go.sum @@ -7,6 +7,8 @@ git.pbiernat.dev/egommerce/go-api-pkg v0.0.149 h1:7K4z/XUMAPrnvOPcFfkeeNCpuc5IHo git.pbiernat.dev/egommerce/go-api-pkg v0.0.149/go.mod h1:w2N79aoumjrrcrGPJLkCwxAHtrLd7G4Uj8VOxvPooa0= git.pbiernat.dev/egommerce/go-api-pkg v0.0.150 h1:DMM3Kxb6HNw4BExA7Ss7P9ivs+TIeO9DxjfHPKeWrSg= git.pbiernat.dev/egommerce/go-api-pkg v0.0.150/go.mod h1:w2N79aoumjrrcrGPJLkCwxAHtrLd7G4Uj8VOxvPooa0= +git.pbiernat.dev/egommerce/go-api-pkg v0.0.151 h1:MKf+tka3Bhh4Zbn5cLqO6H39gsf7el/GUT8ittaIujM= +git.pbiernat.dev/egommerce/go-api-pkg v0.0.151/go.mod h1:w2N79aoumjrrcrGPJLkCwxAHtrLd7G4Uj8VOxvPooa0= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= diff --git a/src/internal/common/config.go b/src/internal/config/config.go similarity index 70% rename from src/internal/common/config.go rename to src/internal/config/config.go index ca27f70..ca2a8e4 100644 --- a/src/internal/common/config.go +++ b/src/internal/config/config.go @@ -1,7 +1,6 @@ package common import ( - "fmt" "os" cnf "git.pbiernat.dev/egommerce/basket-service/pkg/config" @@ -27,14 +26,6 @@ const ( type Config struct { Base *srv.Config - // AppID string - // AppName string - // NetAddr string - // PathPrefix string - - // IdleTimeout time.Duration `json:"idle_timeout"` // miliseconds - // ReadTimeout time.Duration `json:"read_timeout"` // miliseconds - // WriteTimeout time.Duration `json:"write_timeout"` // miliseconds DbURL string `json:"db_url"` CacheAddr string `json:"cache_addr"` @@ -46,13 +37,6 @@ type Config struct { RegistryAddr string // Fields with JSON mappings are available through Consul KV storage - - // Port int - // KVNamespace string - // MongoDbUrl string `json:"mongodb_url"` - // HttpReadTimeout int `json:"http_read_timeout"` - // HttpWriteTimeout int `json:"http_write_timeout"` - // HttpIdleTimeout int `json:"http_idle_timeout"` } func NewConfig() *Config { @@ -74,11 +58,3 @@ func NewConfig() *Config { return c } - -func (c *Config) GetAppFullName() string { - return fmt.Sprintf("%s_%s", c.Base.AppName, c.Base.AppID) // @TODO check if Base prop can be private -} - -func (c *Config) GetListenAddr() string { - return "" // @TODO: Implement me! -} diff --git a/src/internal/app/event/basket.go b/src/internal/event/basket.go similarity index 100% rename from src/internal/app/event/basket.go rename to src/internal/event/basket.go diff --git a/src/internal/app/event/event.go b/src/internal/event/event.go similarity index 100% rename from src/internal/app/event/event.go rename to src/internal/event/event.go diff --git a/src/internal/app/event/order.go b/src/internal/event/order.go similarity index 100% rename from src/internal/app/event/order.go rename to src/internal/event/order.go diff --git a/src/internal/app/server/basket_handler.go b/src/internal/server/basket_handler.go similarity index 94% rename from src/internal/app/server/basket_handler.go rename to src/internal/server/basket_handler.go index f13d877..04d984f 100644 --- a/src/internal/app/server/basket_handler.go +++ b/src/internal/server/basket_handler.go @@ -5,8 +5,8 @@ import ( "time" "git.pbiernat.dev/egommerce/api-entities/http" - "git.pbiernat.dev/egommerce/basket-service/internal/app/service" - "git.pbiernat.dev/egommerce/basket-service/internal/app/ui" + "git.pbiernat.dev/egommerce/basket-service/internal/service" + "git.pbiernat.dev/egommerce/basket-service/internal/ui" "github.com/gofiber/fiber/v2" ) diff --git a/src/internal/app/server/health_handler.go b/src/internal/server/health_handler.go similarity index 100% rename from src/internal/app/server/health_handler.go rename to src/internal/server/health_handler.go diff --git a/src/internal/app/server/middleware.go b/src/internal/server/middleware.go similarity index 100% rename from src/internal/app/server/middleware.go rename to src/internal/server/middleware.go diff --git a/src/internal/app/server/router.go b/src/internal/server/router.go similarity index 100% rename from src/internal/app/server/router.go rename to src/internal/server/router.go diff --git a/src/internal/app/server/server.go b/src/internal/server/server.go similarity index 66% rename from src/internal/app/server/server.go rename to src/internal/server/server.go index 5f2319f..25b2fec 100644 --- a/src/internal/app/server/server.go +++ b/src/internal/server/server.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "encoding/json" - "fmt" "log" "os" "strconv" @@ -20,13 +19,13 @@ import ( db "git.pbiernat.dev/egommerce/basket-service/pkg/database" srv "git.pbiernat.dev/egommerce/basket-service/pkg/server" - "git.pbiernat.dev/egommerce/basket-service/internal/common" + cnf "git.pbiernat.dev/egommerce/basket-service/internal/config" ) type ( Server struct { Base *srv.Server - Config *common.Config + Config *cnf.Config Cache *redis.Client Database *pgxpool.Pool @@ -37,10 +36,10 @@ type ( kvNmspc string } - OptionFn func(*Server) error + OptionFn func(*Server) error // FIXME: similar in worker ) -func New(c *common.Config, opts ...OptionFn) *Server { +func New(c *cnf.Config, opts ...OptionFn) *Server { svr := &Server{ Base: srv.New(c.Base), Config: c, @@ -48,7 +47,7 @@ func New(c *common.Config, opts ...OptionFn) *Server { for _, opt := range opts { if err := opt(svr); err != nil { - log.Fatalf("Failed to start HTTP Server. Err: %v\n", err) + log.Fatalf("Failed to attach extension to the server. Err: %v\n", err) } } @@ -58,7 +57,7 @@ func New(c *common.Config, opts ...OptionFn) *Server { return svr } -func WithCache(c *common.Config) OptionFn { +func WithCache(c *cnf.Config) OptionFn { redis := redis.NewClient(&redis.Options{ Addr: c.CacheAddr, Password: c.CachePassword, @@ -70,10 +69,9 @@ func WithCache(c *common.Config) OptionFn { return nil } - // defer redis.Close() } -func WithDatabase(c *common.Config) OptionFn { +func WithDatabase(c *cnf.Config) OptionFn { dbConn, err := db.Connect(c.DbURL) if err != nil { log.Fatalf("Failed to connect to the Database: %s. Err: %v\n", c.DbURL, err) @@ -85,10 +83,9 @@ func WithDatabase(c *common.Config) OptionFn { return nil } - // defer dbConn.Close() } -func WithEventbus(c *common.Config) OptionFn { +func WithEventbus(c *cnf.Config) OptionFn { conn, err := amqp.Dial(c.EventBusURL) if err != nil { log.Fatalf("Failed to connect to the Eventbus: %s. Err: %v\n", c.EventBusURL, err) @@ -104,53 +101,49 @@ func WithEventbus(c *common.Config) OptionFn { return nil } - - // defer ebCh.Close() - // defer amqp.Close(ebConn) } -func WithLogger(c *common.Config) OptionFn { - logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr) - if err != nil { - log.Fatalf("Failed to parse Fluentd address: %s. Err: %v", c.LoggerAddr, err) - } - - logger, err := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) - if err != nil { - log.Fatalf("Failed to connect to the Fluentd on %s:%d. Err: %v", logHost, logPort, err) - } - +func WithLogger(c *cnf.Config) OptionFn { return func(s *Server) error { + logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr) + if err != nil { + log.Fatalf("Failed to parse Fluentd address: %s. Err: %v", c.LoggerAddr, err) + } + + logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort) + if err != nil { + log.Fatalf("Failed to connect to the Fluentd on %s:%d. Err: %v", logHost, logPort, err) + } + s.Logger = logger return nil } - // defer logger.Close() } -func WithRegistry(c *common.Config) OptionFn { - port, _ := strconv.Atoi(c.Base.NetAddr[1:]) // FIXME: can be IP:PORT which now will cause error - registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, c.Base.AppName, c.Base.PathPrefix, port) - if err != nil { - log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", c.RegistryAddr, err) - } - - err = registry.Register() - if err != nil { - log.Fatalf("Failed to register in the Consul service. Err: %v", err) - } - +func WithRegistry(c *cnf.Config) OptionFn { return func(s *Server) error { + port, _ := strconv.Atoi(c.Base.NetAddr[1:]) // FIXME: can be IP:PORT which will cause error + registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, c.Base.AppName, c.Base.PathPrefix, port) + if err != nil { + log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", c.RegistryAddr, err) + } + + err = registry.Register() + if err != nil { + log.Fatalf("Failed to register in the Consul service. Err: %v", err) + } + s.Registry = registry - go func( /*s *Server*/ ) { // Consul KV updater + go func() { // Consul KV updater ticker := time.NewTicker(time.Second * 15) for range ticker.C { - s.updateKVConfig() + updateKVConfig(s) } }() - go func( /*s *Server*/ ) { // Server metadata cache updater + go func() { // Server metadata cache updater ticker := time.NewTicker(time.Second * 5) for range ticker.C { s.cacheMetadata() @@ -177,7 +170,7 @@ func (s *Server) Shutdown() srv.PurgeFn { } // @CHECK: merge s.Config and s.Base.Config to display all config as one array/map -func (s *Server) updateKVConfig() { +func updateKVConfig(s *Server) { // @FIXME: merge duplication in server.go and worker.go config, _, err := s.Registry.KV().Get(s.kvNmspc, nil) if err != nil || config == nil { return @@ -204,7 +197,6 @@ func (s *Server) cacheMetadata() { func (s *Server) clearMetadataCache() { ctx := context.Background() - fmt.Printf("metadata: %v", s.Config.Base) key, address := s.getMetadataIPsKey(), s.Config.Base.AppID s.Cache.LRem(ctx, key, 0, address) diff --git a/src/internal/app/service/basket.go b/src/internal/service/basket.go similarity index 98% rename from src/internal/app/service/basket.go rename to src/internal/service/basket.go index 12a88fb..6f52dd1 100644 --- a/src/internal/app/service/basket.go +++ b/src/internal/service/basket.go @@ -3,15 +3,17 @@ package service import ( "context" - "git.pbiernat.dev/egommerce/api-entities/model" - "git.pbiernat.dev/egommerce/basket-service/internal/app/event" - "git.pbiernat.dev/egommerce/go-api-pkg/api" - "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" - "git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq" "github.com/georgysavva/scany/v2/pgxscan" "github.com/go-redis/redis/v8" "github.com/jackc/pgx/v5/pgxpool" "github.com/streadway/amqp" + + "git.pbiernat.dev/egommerce/api-entities/model" + "git.pbiernat.dev/egommerce/go-api-pkg/api" + "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" + "git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq" + + "git.pbiernat.dev/egommerce/basket-service/internal/event" ) const ( diff --git a/src/internal/app/ui/basket.go b/src/internal/ui/basket.go similarity index 87% rename from src/internal/app/ui/basket.go rename to src/internal/ui/basket.go index bf76ae3..22c00e8 100644 --- a/src/internal/app/ui/basket.go +++ b/src/internal/ui/basket.go @@ -3,9 +3,10 @@ package ui import ( "context" - def "git.pbiernat.dev/egommerce/api-entities/http" + entity "git.pbiernat.dev/egommerce/api-entities/http" "git.pbiernat.dev/egommerce/api-entities/model" - "git.pbiernat.dev/egommerce/basket-service/internal/app/service" + + "git.pbiernat.dev/egommerce/basket-service/internal/service" ) func AddProductToBasket(srv *service.BasketService, productID, qty int, basketID, reqID string) (*model.BasketModel, error) { @@ -74,15 +75,14 @@ func RemoveProductFromBasket(srv *service.BasketService, productID, qty int, bas return basket, nil } -func CheckoutBasket(srv *service.BasketService, basketID, reqID string) (*def.BasketCheckoutResponse, error) { - // ctx := context.Background() - res := &def.BasketCheckoutResponse{} +func CheckoutBasket(srv *service.BasketService, basketID, reqID string) (*entity.BasketCheckoutResponse, error) { + res := &entity.BasketCheckoutResponse{} basketID, err := srv.Checkout(reqID, basketID) if err != nil { return res, err } + res.ID = basketID - // ctx.Done() // FIXME return res, nil } diff --git a/src/internal/worker/worker.go b/src/internal/worker/worker.go new file mode 100644 index 0000000..8d9c588 --- /dev/null +++ b/src/internal/worker/worker.go @@ -0,0 +1,272 @@ +package worker + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "github.com/go-redis/redis/v8" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/streadway/amqp" + + "git.pbiernat.dev/egommerce/go-api-pkg/consul" + "git.pbiernat.dev/egommerce/go-api-pkg/fluentd" + "git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq" + + "git.pbiernat.dev/egommerce/basket-service/pkg/database" + + cnf "git.pbiernat.dev/egommerce/basket-service/internal/config" + "git.pbiernat.dev/egommerce/basket-service/internal/event" + "git.pbiernat.dev/egommerce/basket-service/internal/service" + "git.pbiernat.dev/egommerce/basket-service/internal/ui" +) + +type ( + Worker struct { + Config *cnf.Config + + Cache *redis.Client + Database *pgxpool.Pool + Eventbus *amqp.Channel + Logger *fluentd.Logger + Registry *consul.Service + + kvNmspc string + } + OptionFn func(*Worker) error // FIXME: similar in server/server.go +) + +func New(c *cnf.Config, opts ...OptionFn) *Worker { + wrk := &Worker{ + Config: c, + } + + for _, opt := range opts { + if err := opt(wrk); err != nil { + log.Fatalf("Failed to attach extension to the worker. Err: %v\n", err) + } + } + + return wrk +} + +func (w *Worker) Start() error { + // event consume + msgs, err := w.Eventbus.Consume( + w.Config.EventBusQueue, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + w.Logger.Log("Failed to register a consumer: %s", err) + os.Exit(1) + } + + forever := make(chan struct{}) + go func() { + sigint := make(chan os.Signal, 1) + signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + <-sigint + + w.Logger.Log("Shutting down %s worker...\n", w.Config.Base.GetAppFullName()) + w.Shutdown() + + close(forever) + }() + + go func() { + basketSrv := service.NewBasketService(w.Database, w.Cache, w.Eventbus, w.Logger) + // basketSrv := service.NewBasketService(w) + + for d := range msgs { + go func(d amqp.Delivery) { + msg, err := rabbitmq.Deserialize(d.Body) + if err != nil { + w.Logger.Log("deserialize error: %v\n", err) + d.Reject(false) // FIXME: or Nack? how to handle erros in queue... + return + } + + eName := fmt.Sprintf("%s", msg["event"]) + data := (msg["data"]).(map[string]interface{}) + w.Logger.Log("Message<%s>: %v\n", eName, data) + + reqID := data["request_id"].(string) // FIXME Check input params! + basketID := data["basket_id"].(string) // FIXME Check input params! + + switch true { + case strings.Contains(eName, event.EVENT_PRODUCT_ADDED_TO_BASKET): + productID := int(data["product_id"].(float64)) + qty := int(data["quantity"].(float64)) + + basket, err := ui.AddProductToBasket(basketSrv, productID, qty, basketID, reqID) + if err == nil { + w.Logger.Log("Product #%d added to basket #%s. ReqID: #%s", productID, basket.ID, reqID) + } + case strings.Contains(eName, event.EVENT_PRODUCT_REMOVED_FROM_BASKET): + productID := int(data["product_id"].(float64)) + qty := int(data["quantity"].(float64)) + + basket, err := ui.RemoveProductFromBasket(basketSrv, productID, qty, basketID, reqID) + if err == nil { + w.Logger.Log("Product #%d removed from basket #%s. ReqID: #%s", productID, basket.ID, reqID) + } + } + + if err != nil { + w.Logger.Log("%s error: %s", eName, err.Error()) + d.Reject(false) // FIXME: or Nack? how to handle erros in queue... + return + } + + w.Logger.Log("ACK: %s", eName) + d.Ack(false) + }(d) + } + }() + + w.Logger.Log("Waiting for messages...2") + + return nil +} + +func (w *Worker) Shutdown() error { + w.Logger.Log("Worker is going down...") + + w.Registry.Unregister() + w.Eventbus.Close() + w.Database.Close() + w.Logger.Close() + + return nil +} + +func WithCache(c *cnf.Config) OptionFn { + return func(w *Worker) error { + conn := redis.NewClient(&redis.Options{ + Addr: c.CacheAddr, + Password: c.CachePassword, + DB: 0, + }) + + w.Cache = conn + + return nil + } +} + +func WithDatabase(c *cnf.Config) OptionFn { + return func(w *Worker) error { + conn, err := database.Connect(c.DbURL) + if err != nil { + w.Logger.Log("Failed to connect to Database server: %v\n", err) + os.Exit(1) + } + + w.Database = conn + + return nil + } +} + +func WithEventbus(c *cnf.Config) OptionFn { + return func(w *Worker) error { + _, chn, err := rabbitmq.Open(c.EventBusURL) + if err != nil { + w.Logger.Log("Failed to connect to EventBus server: %v\n", err) + os.Exit(1) + } + + err = rabbitmq.NewExchange(chn, c.EventBusExchange) + if err != nil { + w.Logger.Log("Failed to declare EventBus exchange: %v\n", err) + os.Exit(1) + } + + _, err = chn.QueueDeclare( + c.EventBusQueue, // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + if err != nil { + w.Logger.Log("Failed to declare EventBus queue: %v\n", err) + os.Exit(1) + } + + // w.BindQueues() + rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productAddedToBasket") + rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productRemovedFromBasket") + rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.updateQuantity") + + w.Eventbus = chn + + return nil + } +} + +func WithLogger(c *cnf.Config) OptionFn { + return func(w *Worker) error { + logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr) + if err != nil { + w.Logger.Log("Failed to parse Fluentd address: %s. Err: %v", c.LoggerAddr, err) + os.Exit(1) + } + + logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort) + if err != nil { + w.Logger.Log("Failed to connect to the Fluentd on %s:%d. Err: %v", logHost, logPort, err) + os.Exit(1) + } + + w.Logger = logger + + return nil + } +} + +func WithRegistry(c *cnf.Config) OptionFn { + return func(w *Worker) error { + registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, "", "", 0) + if err != nil { + w.Logger.Log("Error connecting to %s: %v", c.RegistryAddr, err) + } + + go func() { // Consul KV updater + ticker := time.NewTicker(time.Second * 15) + for range ticker.C { + updateKVConfig(w) + } + }() + + w.Registry = registry + + return nil + } +} + +// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map +func updateKVConfig(w *Worker) { // @FIXME: merge duplication in server.go and worker.go + config, _, err := w.Registry.KV().Get(w.kvNmspc, nil) + if err != nil || config == nil { + return + } + + kvCnf := bytes.NewBuffer(config.Value) + decoder := json.NewDecoder(kvCnf) + if err := decoder.Decode(&w.Config); err != nil { + return + } +} diff --git a/src/pkg/server/server.go b/src/pkg/server/server.go index 7893a74..eec40c2 100644 --- a/src/pkg/server/server.go +++ b/src/pkg/server/server.go @@ -14,24 +14,19 @@ import ( ) type ( - HeaderRequestID struct { - RequestID string `reqHeader:"x-request-id"` - } Server struct { *fiber.App *Config addr string // e.g. "127.0.0.1:8080" - // name string // e.g. "awesome-rest-api" // kvNmspc string - // cache *redis.Client - // db *pgxpool.Pool - // discovery *discovery.Service - // ebCh *amqp.Channel - // log *fluentd.Logger } - PurgeFn func(*Server) error + HeaderRequestID struct { + RequestID string `reqHeader:"x-request-id"` + } + OptionFn func(*Server) error + PurgeFn func(*Server) error ) func New(conf *Config) *Server { @@ -48,6 +43,30 @@ func New(conf *Config) *Server { } } +func (s *Server) Start(while chan struct{}, prgFn PurgeFn) error { + go func() { + sigint := make(chan os.Signal, 1) + signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + <-sigint + + fmt.Print("Shutting down... ") + if err := prgFn(s); err != nil { + log.Fatalf("Failed to shutdown server. Reason: %v\n", err) + } + + close(while) + }() + + err := s.Listen(s.addr) + if err != nil { + log.Fatalf("Failed to start server. Reason: %v\n", err) + close(while) + } + <-while + + return err +} + func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) { var hdr = new(HeaderRequestID) if err := c.ReqHeaderParser(hdr); err != nil { @@ -60,26 +79,3 @@ func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) { func (s *Server) Error(c *fiber.Ctx, code int, msg string) error { return c.Status(code).JSON(http.ErrorResponse{Error: msg}) } - -func (s *Server) Start(forever chan struct{}, prgFn PurgeFn) error { - go func() { - sigint := make(chan os.Signal, 1) - signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - <-sigint - - fmt.Println("shutting down: after term signal.") - if err := prgFn(s); err != nil { - log.Fatalf("Failed to shutdown server. Reason: %v\n", err) - } - - close(forever) - }() - - err := s.Listen(s.addr) - <-forever - // if err := s.Listen(s.addr); err != nil { - // s.logger.Log("Failed to start server! Reason: %v\n", err) - // } - - return err -}