diff --git a/src/cmd/server/main.go b/src/cmd/server/main.go index 7722ff6..9c87bf8 100644 --- a/src/cmd/server/main.go +++ b/src/cmd/server/main.go @@ -1,13 +1,13 @@ package main import ( + "fmt" "log" "os" cnf "git.pbiernat.io/egommerce/go-api-pkg/config" "git.pbiernat.io/egommerce/catalog-service/internal/app" - "git.pbiernat.io/egommerce/catalog-service/internal/config" "git.pbiernat.io/egommerce/catalog-service/internal/server" ) @@ -16,24 +16,26 @@ func main() { log.Panicln("Error loading .env file", cnf.ErrLoadingEnvs) } - c := config.NewServerConfig("catalog") - srv := server.New(c) - app := app.NewApp(c, srv) + c := server.NewConfig("catalog") + cArr := c.GetArray() - app.RegisterHelper("cache", app.WithCache()) - app.RegisterHelper("database", app.WithDatabase()) - app.RegisterHelper("eventbus", app.WithEventbus()) - app.RegisterHelper("logger", app.WithLogger()) - app.RegisterHelper("registry", app.WithRegistry()) + doer := server.New(c) + a := app.NewApp(doer) + a.RegisterPlugin(app.LoggerPlugin(cArr)) + a.RegisterPlugin(app.CachePlugin(cArr)) + a.RegisterPlugin(app.DatabasePlugin(cArr)) + a.RegisterPlugin(app.EventbusPlugin(cArr)) + a.RegisterPlugin(app.RegistryPlugin(cArr)) while := make(chan struct{}) - err := app.Start(while) + err := a.Start(while) <-while if err != nil { - log.Fatalf("Failed to start application. Reason: %v\n", err) + log.Fatalf("Failed to start server. Reason: %v\n", err) os.Exit(1) } + fmt.Println("Gone") os.Exit(0) } diff --git a/src/cmd/worker/main.go b/src/cmd/worker/main.go index 24a5d34..e4d42d6 100644 --- a/src/cmd/worker/main.go +++ b/src/cmd/worker/main.go @@ -1,29 +1,33 @@ package main import ( + "fmt" "log" "os" - // "git.pbiernat.io/egommerce/go-api-pkg/config" + cnf "git.pbiernat.io/egommerce/go-api-pkg/config" + + "git.pbiernat.io/egommerce/catalog-service/internal/app" "git.pbiernat.io/egommerce/catalog-service/internal/worker" ) func main() { - // if config.ErrLoadingEnvs != nil { - // log.Fatalln("Error loading .env file.") - // } + if cnf.ErrLoadingEnvs != nil { + log.Fatalln("Error loading .env file.") + } - c := cnf.NewConfig("catalog-worker") - wrk := worker.New( - c, - worker.WithCache(c), - worker.WithDatabase(c), - worker.WithEventbus(c), - worker.WithLogger(c), - ) + c := worker.NewConfig("catalog-worker") + cArr := c.GetArray() + + doer := worker.New(c) + a := app.NewApp(doer) + a.RegisterPlugin(app.LoggerPlugin(cArr)) + a.RegisterPlugin(app.CachePlugin(cArr)) + a.RegisterPlugin(app.DatabasePlugin(cArr)) + a.RegisterPlugin(app.EventbusPlugin(cArr)) while := make(chan struct{}) - err := wrk.Start(while) + err := a.Start(while) <-while if err != nil { @@ -31,5 +35,6 @@ func main() { os.Exit(1) } + fmt.Println("Gone") os.Exit(0) } diff --git a/src/go.mod b/src/go.mod index 7430804..5f62dd6 100644 --- a/src/go.mod +++ b/src/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( git.pbiernat.io/egommerce/api-entities v0.2.3 - git.pbiernat.io/egommerce/go-api-pkg v0.2.72 + git.pbiernat.io/egommerce/go-api-pkg v0.2.88 github.com/georgysavva/scany/v2 v2.0.0 github.com/go-pg/migrations/v8 v8.1.0 github.com/go-pg/pg/v10 v10.11.1 diff --git a/src/go.sum b/src/go.sum index 14ef396..83bf0eb 100644 --- a/src/go.sum +++ b/src/go.sum @@ -37,8 +37,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= git.pbiernat.io/egommerce/api-entities v0.2.3 h1:mR6EYfZkAzh4teydb7KXDBWoxwVW3qasnmmH5J3mnas= git.pbiernat.io/egommerce/api-entities v0.2.3/go.mod h1:INXAG5x4+i+vNwg1NpfPHiDW8nY1kn1K7pgLOtX+/I0= -git.pbiernat.io/egommerce/go-api-pkg v0.2.72 h1:289HdYxa5ZzKEirxZfat1i6B7jThvZwtvO3SZykFTcg= -git.pbiernat.io/egommerce/go-api-pkg v0.2.72/go.mod h1:XIy2mmvRNIzQmYIUAcDZafhRPxTQFS2HDmsK7ZQ6980= +git.pbiernat.io/egommerce/go-api-pkg v0.2.88 h1:xya/39BnFeha3Oc76ad/ppoQd6AstTGQd87Qszamr1A= +git.pbiernat.io/egommerce/go-api-pkg v0.2.88/go.mod h1:XIy2mmvRNIzQmYIUAcDZafhRPxTQFS2HDmsK7ZQ6980= github.com/Azure/azure-sdk-for-go v44.0.0+incompatible h1:e82Yv2HNpS0kuyeCrV29OPKvEiqfs2/uJHic3/3iKdg= github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM= diff --git a/src/internal/app/app.go b/src/internal/app/app.go index da47d15..26225c9 100644 --- a/src/internal/app/app.go +++ b/src/internal/app/app.go @@ -6,64 +6,29 @@ import ( "os/signal" "strconv" "syscall" - - redis "github.com/go-redis/redis/v8" - "github.com/jackc/pgx/v5/pgxpool" - amqp "github.com/rabbitmq/amqp091-go" - - "git.pbiernat.io/egommerce/go-api-pkg/consul" - "git.pbiernat.io/egommerce/go-api-pkg/fluentd" - - cnf "git.pbiernat.io/egommerce/catalog-service/internal/config" - srv "git.pbiernat.io/egommerce/catalog-service/internal/server" - db "git.pbiernat.io/egommerce/catalog-service/pkg/database" ) type ( Doer interface { - Start(while chan struct{}) error - OnShutdown(*App) + Start() error + RegisterHandler(string, func() any) + OnShutdown() } Application interface { Start(while chan struct{}) - RegisterHelper(OptionFn) error + RegisterPlugin(PluginFn) error Shutdown() } - OptionFn func(*App) error App struct { - cnf *srv.ServerConfig - doer Doer - - // helpers - Cache *redis.Client - Database *pgxpool.Pool - Eventbus *amqp.Channel - Logger *fluentd.Logger - Registry *consul.Service } ) -func NewApp(c *cnf.ServerConfig, d Doer) *App { - app := &App{ - cnf: c, +func NewApp(d Doer) *App { + return &App{ doer: d, } - - // while := make(chan struct{}) - // // err := srv.Base.Start(while) - // err := app.Doer.Start(while) - // <-while - - // if err != nil { - // log.Fatalf("Failed to start application. Reason: %v\n", err) - // os.Exit(1) - // } - - // os.Exit(0) - - return app } func (a *App) Start(while chan struct{}) error { @@ -72,7 +37,6 @@ func (a *App) Start(while chan struct{}) error { signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) <-sigint - a.doer.OnShutdown(a) a.Shutdown() close(while) @@ -81,17 +45,24 @@ func (a *App) Start(while chan struct{}) error { run := a.createRunFile("./app.run") // FIXME path... defer a.removeRunFile(run) - a.doer.Start(while) + err := a.doer.Start() + if err != nil { + log.Fatalf("Failed to start app. Reason: %v\n", err) + close(while) + } + <-while + + return err } -func (a *App) RegisterHelper(name string, opt OptionFn) error { - if err := opt(a); err != nil { - log.Fatalf("Failed to attach extension to the server. Err: %v\n", err) - } +func (a *App) RegisterPlugin(p Plugin) error { + a.doer.RegisterHandler(p.name, p.fn) + + return nil } func (a *App) Shutdown() { - // ... + a.doer.OnShutdown() } func (a *App) createRunFile(path string) *os.File { @@ -108,108 +79,3 @@ func (a *App) createRunFile(path string) *os.File { func (a *App) removeRunFile(f *os.File) error { return f.Close() } - -func WithCache() OptionFn { - return func(a *App) error { - a.Cache = redis.NewClient(&redis.Options{ - Addr: a.cnf.CacheAddr, - Password: a.cnf.CachePassword, - DB: 0, - }) - - return nil - } -} - -func WithDatabase() OptionFn { - return func(a *App) error { - dbConn, err := db.Connect(a.cnf.DbURL) - if err != nil { - log.Fatalf("Failed to connect to the Database: %s. Err: %v\n", a.cnf.DbURL, err) - os.Exit(1) // TODO: retry in background... for now treat as integraded service... - } - - a.Database = dbConn - - return nil - } -} - -func WithEventbus() OptionFn { - return func(a *App) error { - conn, err := amqp.Dial(a.cnf.EventBusURL) - if err != nil { - log.Fatalf("Failed to connect to the Eventbus: %s. Err: %v\n", a.cnf.EventBusURL, err) - } - - chn, err := conn.Channel() - if err != nil { - log.Fatalf("Failed to open new Eventbus channel. Err: %v\n", err) - } - - a.Eventbus = chn - - return nil - } -} - -func WithLogger() OptionFn { - return func(a *App) error { - logHost, logPort, err := fluentd.ParseAddr(a.cnf.LoggerAddr) - if err != nil { - log.Fatalf("Failed to parse Fluentd address: %s. Err: %v", a.cnf.LoggerAddr, err) - } - - logger, err := fluentd.NewLogger(a.cnf.Base.GetAppFullName(), logHost, logPort) - if err != nil { - log.Fatalf("Failed to connect to the Fluentd on %s:%d. Err: %v", logHost, logPort, err) - } - - a.Logger = logger - - return nil - } -} - -func WithRegistry() OptionFn { - return func(a *App) error { - port, _ := strconv.Atoi(a.cnf.Base.NetAddr[1:]) // FIXME: can be IP:PORT or :PORT - // log.Printf("Consul retrieved port: %v", port) - registry, err := consul.NewService(a.cnf.RegistryAddr, a.cnf.Base.AppID, a.cnf.Base.AppName, a.cnf.Base.RegistryDomainOverIP, a.cnf.Base.GetIP(), a.cnf.Base.AppDomain, a.cnf.Base.PathPrefix, port) - if err != nil { - log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", c.RegistryAddr, err) - } - - err = registry.Register() - if err != nil { - log.Fatalf("Failed to register in the Consul service. Err: %v", err) - } - - registry.RegisterHealthChecks() - // a.registerKVUpdater() // FIXME run as goroutine - - a.Registry = registry - - // svc, _ := registry.Connect() - // tlsCnf := svc.ServerTLSConfig() - // s.Base.App.Server().TLSConfig = tlsCnf - // fmt.Println("Podmiana configa TLS") - // defer svc.Close() - - // go func() { // Consul KV updater - // ticker := time.NewTicker(time.Second * 15) - // for range ticker.C { - // fetchKVConfig(s) // FIXME: duplicated in worker - // } - // }() - - // go func() { // Server metadata cache updater - // ticker := time.NewTicker(time.Second * 5) - // for range ticker.C { - // s.cacheMetadata() - // } - // }() - - return nil - } -} diff --git a/src/internal/app/plugins.go b/src/internal/app/plugins.go new file mode 100644 index 0000000..d3c3e2c --- /dev/null +++ b/src/internal/app/plugins.go @@ -0,0 +1,139 @@ +package app + +import ( + "log" + "os" + "strconv" + + redis "github.com/go-redis/redis/v8" + amqp "github.com/rabbitmq/amqp091-go" + + "git.pbiernat.io/egommerce/go-api-pkg/consul" + "git.pbiernat.io/egommerce/go-api-pkg/fluentd" + + db "git.pbiernat.io/egommerce/catalog-service/pkg/database" +) + +type ( + Plugin struct { + name string + fn PluginFn + } + PluginFn func() any +) + +func CachePlugin(cArr map[string]string) Plugin { + return Plugin{ + name: "cache", + fn: func() any { + return redis.NewClient(&redis.Options{ + Addr: cArr["cacheAddr"], + Password: cArr["cachePassword"], + DB: 0, + }) + }, + } +} + +func DatabasePlugin(cArr map[string]string) Plugin { + return Plugin{ + name: "database", + fn: func() any { + dbConn, err := db.Connect(cArr["dbURL"]) + if err != nil { + log.Fatalf("Failed to connect to the Database: %s. Err: %v\n", cArr["dbURL"], err) + os.Exit(1) // TODO: retry in background... + } + + return dbConn + }, + } +} + +func EventbusPlugin(cArr map[string]string) Plugin { + return Plugin{ + name: "eventbus", + fn: func() any { + conn, err := amqp.Dial(cArr["eventBusURL"]) + if err != nil { + log.Fatalf("Failed to connect to the EventBus: %s. Err: %v\n", cArr["eventBusURL"], err) + os.Exit(1) // TODO: retry in background... + } + + chn, err := conn.Channel() + if err != nil { + log.Fatalf("Failed to open new EventBus channel. Err: %v\n", err) + os.Exit(1) // TODO: retry in background... + } + + return chn + }, + } +} + +func LoggerPlugin(cArr map[string]string) Plugin { + return Plugin{ + name: "logger", + fn: func() any { + logHost, logPort, err := fluentd.ParseAddr(cArr["loggerAddr"]) + if err != nil { + log.Fatalf("Failed to parse FluentD address: %s. Err: %v", cArr["loggerAddr"], err) + os.Exit(1) // TODO: retry in background... + } + + logger, err := fluentd.NewLogger(cArr["appFullname"], logHost, logPort) + if err != nil { + log.Fatalf("Failed to connect to the FluentD on %s:%d. Err: %v", logHost, logPort, err) + os.Exit(1) // TODO: retry in background... + } + + return logger + }, + } +} + +func RegistryPlugin(cArr map[string]string) Plugin { + return Plugin{ + name: "registry", + fn: func() any { + port, _ := strconv.Atoi(cArr["netAddr"][1:]) // FIXME: can be IP:PORT or :PORT + // log.Printf("Consul retrieved port: %v", port) + registry, err := consul.NewService(cArr["registryAddr"], cArr["id"], cArr["name"], cArr["registryDomainOverIP"], cArr["ip"], cArr["domain"], cArr["pathPrefix"], port) + if err != nil { + log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", cArr["registryAddr"], err) + os.Exit(1) // TODO: retry in background... + } + + err = registry.Register() + if err != nil { + log.Fatalf("Failed to register in the Consul service. Err: %v", err) + os.Exit(1) // TODO: retry in background... + } + + registry.RegisterHealthChecks() + // a.registerKVUpdater() // FIXME run as goroutine + + return registry + + // svc, _ := registry.Connect() + // tlsCnf := svc.ServerTLSConfig() + // s.Base.App.Server().TLSConfig = tlsCnf + // fmt.Println("Podmiana configa TLS") + // defer svc.Close() + + // go func() { // Consul KV updater + // ticker := time.NewTicker(time.Second * 15) + // for range ticker.C { + // fetchKVConfig(s) // FIXME: duplicated in worker + // } + // }() + + // go func() { // Server metadata cache updater + // ticker := time.NewTicker(time.Second * 5) + // for range ticker.C { + // s.cacheMetadata() + // } + // }() + }, + } +} diff --git a/src/internal/server/catalog_handler.go b/src/internal/server/catalog_handler.go index 17b03b3..9d1add5 100644 --- a/src/internal/server/catalog_handler.go +++ b/src/internal/server/catalog_handler.go @@ -13,9 +13,9 @@ import ( ) func (s *Server) GetProductListHandler(c *fiber.Ctx) error { - reqID, _ := s.Base.GetRequestID(c) + reqID, _ := s.GetRequestID(c) req := new(def.GetProductListRequest) - catalogSrv := service.NewCatalogService(s.Database, s.Eventbus, s.Logger) + catalogSrv := service.NewCatalogService(s.GetDatabase(), s.GetEventBus(), s.GetLogger()) res, err := ui.GetProductList(catalogSrv, req.CategoryID, reqID) if err != nil { return err @@ -25,10 +25,10 @@ func (s *Server) GetProductListHandler(c *fiber.Ctx) error { } func (s *Server) AddProductToBasketHandler(c *fiber.Ctx) error { - reqID, _ := s.Base.GetRequestID(c) + reqID, _ := s.GetRequestID(c) req := new(def.AddProductToBasketRequest) if err := c.BodyParser(req); err != nil { - return s.Base.Error(c, 400, err.Error()) + return s.Error(c, 400, err.Error()) } basketID := prepareBasket(c) @@ -37,24 +37,24 @@ func (s *Server) AddProductToBasketHandler(c *fiber.Ctx) error { qty = req.Quantity } - catalogSrv := service.NewCatalogService(s.Database, s.Eventbus, s.Logger) + catalogSrv := service.NewCatalogService(s.GetDatabase(), s.GetEventBus(), s.GetLogger()) res, err := ui.AddProductToBasket(catalogSrv, req.ProductID, qty, basketID, reqID) if err != nil { - s.Logger.Log("AddProductToBasketHandler error: ", err) + s.GetLogger().Log("AddProductToBasketHandler error: ", err) if res.ProductID == 0 { - return s.Base.Error(c, 404, fmt.Sprintf("Product #%d not exists", req.ProductID)) + return s.Error(c, 404, fmt.Sprintf("Product #%d not exists", req.ProductID)) } - return s.Base.Error(c, 400, "Failed to add product to basket") + return s.Error(c, 400, "Failed to add product to basket") } return c.JSON(res) } func (s *Server) RemoveProductFromBasketHandler(c *fiber.Ctx) error { - reqID, _ := s.Base.GetRequestID(c) + reqID, _ := s.GetRequestID(c) req := new(def.RemoveProductFromBasketRequest) if err := c.BodyParser(req); err != nil { - return s.Base.Error(c, 400, err.Error()) + return s.Error(c, 400, err.Error()) } basketID := prepareBasket(c) @@ -63,11 +63,11 @@ func (s *Server) RemoveProductFromBasketHandler(c *fiber.Ctx) error { qty = req.Quantity } - catalogSrv := service.NewCatalogService(s.Database, s.Eventbus, s.Logger) + catalogSrv := service.NewCatalogService(s.GetDatabase(), s.GetEventBus(), s.GetLogger()) res, err := ui.RemoveProductFromBasket(catalogSrv, req.ProductID, qty, basketID, reqID) if err != nil { - s.Logger.Log("RemoveProductFromBasketHandler error: ", err) - return s.Base.Error(c, 400, "Failed to remove product from basket") + s.GetLogger().Log("RemoveProductFromBasketHandler error: ", err) + return s.Error(c, 400, "Failed to remove product from basket") } return c.JSON(res) diff --git a/src/internal/config/server.go b/src/internal/server/config.go similarity index 75% rename from src/internal/config/server.go rename to src/internal/server/config.go index 65ed96a..b32d91d 100644 --- a/src/internal/config/server.go +++ b/src/internal/server/config.go @@ -1,4 +1,4 @@ -package config +package server import ( "fmt" @@ -26,7 +26,7 @@ const ( defEbEventsQueue = "catalog-svc" ) -type ServerConfig struct { +type Config struct { ID string Name string Domain string @@ -52,8 +52,8 @@ type ServerConfig struct { // Fields with JSON mappings are available through Consul KV storage } -func NewServerConfig(name string) *ServerConfig { - c := new(ServerConfig) +func NewConfig(name string) *Config { + c := new(Config) c.ID, _ = os.Hostname() c.Name = name @@ -74,15 +74,11 @@ func NewServerConfig(name string) *ServerConfig { return c } -func (c *ServerConfig) GetAppFullName() string { +func (c *Config) GetAppFullName() string { return fmt.Sprintf("%s_%s", c.Name, c.ID) } -// func (c *ServerConfig) GetAddressForRegistry() string { - -// } - -func (c *ServerConfig) GetIP() string { +func (c *Config) GetIP() string { host, _ := os.Hostname() ips, _ := net.LookupIP(host) // for _, ip := range ips { @@ -91,3 +87,25 @@ func (c *ServerConfig) GetIP() string { return ips[0].String() } + +func (c *Config) GetArray() map[string]string { // FIXME fix types etc + arr := make(map[string]string) + arr["id"] = c.ID + arr["name"] = c.Name + arr["appFullname"] = c.GetAppFullName() + arr["domain"] = c.Domain + arr["ip"] = c.GetIP() + arr["netAddr"] = c.NetAddr + arr["registryDomainOverIP"] = c.RegistryDomainOverIP + arr["pathPrefix"] = c.PathPrefix + arr["cacheAddr"] = c.CacheAddr + arr["cachePassword"] = c.CachePassword + arr["dbURL"] = c.DbURL + arr["eventBusExchange"] = c.EventBusExchange + arr["eventBusURL"] = c.EventBusURL + arr["kvNamespace"] = c.KVNamespace + arr["loggerAddr"] = c.LoggerAddr + arr["registryAddr"] = c.RegistryAddr + + return arr +} diff --git a/src/internal/server/middleware.go b/src/internal/server/middleware.go index 165472c..32328d9 100644 --- a/src/internal/server/middleware.go +++ b/src/internal/server/middleware.go @@ -12,7 +12,7 @@ import ( // "github.com/gofiber/fiber/v2/middleware/cors" func SetupMiddleware(s *Server) { - s.Base.Use(LoggingMiddleware(s.Logger)) + s.Use(LoggingMiddleware(s.GetLogger())) } func LoggingMiddleware(log *fluentd.Logger) func(c *fiber.Ctx) error { diff --git a/src/internal/server/server.go b/src/internal/server/server.go index 73393c4..5d1d426 100644 --- a/src/internal/server/server.go +++ b/src/internal/server/server.go @@ -1,78 +1,74 @@ package server import ( - "fmt" - "log" "net" "time" + "github.com/go-redis/redis/v8" "github.com/gofiber/fiber/v2" + "github.com/jackc/pgx/v5/pgxpool" + amqp "github.com/rabbitmq/amqp091-go" "git.pbiernat.io/egommerce/api-entities/http" - "git.pbiernat.io/egommerce/catalog-service/app" - cnf "git.pbiernat.io/egommerce/catalog-service/config" + "git.pbiernat.io/egommerce/go-api-pkg/consul" + "git.pbiernat.io/egommerce/go-api-pkg/fluentd" ) type ( Server struct { *fiber.App - cnf *cnf.ServerConfig - addr string // e.g. "127.0.0.1:80" - - ShutdownFn + ID string + addr string // e.g. "127.0.0.1:80" + handlers map[string]any } HeaderRequestID struct { RequestID string `reqHeader:"x-request-id"` } - ShutdownFn func() error ) -func New(conf *cnf.Config) *Server { +func New(c *Config) *Server { return &Server{ + ID: c.ID, App: fiber.New(fiber.Config{ - AppName: conf.ID, - ServerHeader: conf.Name + ":" + conf.ID, - ReadTimeout: conf.ReadTimeout * time.Millisecond, - WriteTimeout: conf.WriteTimeout * time.Millisecond, - IdleTimeout: conf.IdleTimeout * time.Millisecond, + AppName: c.ID, + ServerHeader: c.Name + ":" + c.ID, + ReadTimeout: c.ReadTimeout * time.Millisecond, + WriteTimeout: c.WriteTimeout * time.Millisecond, + IdleTimeout: c.IdleTimeout * time.Millisecond, }), - cnf: conf, - addr: conf.NetAddr, + addr: c.NetAddr, + handlers: make(map[string]any), } } -func (s *Server) Init() { - // svr.Base.ShutdownFn = svr.Shutdown() - +func (s *Server) Start() error { SetupMiddleware(s) SetupRouter(s) -} -func (s *Server) Start(while chan struct{}) { - // fmt.Println("[DOER] Started server at: " + s.addr) + // fmt.Printf("Starting server at: %s...\n", s.addr) ln, _ := net.Listen("tcp", s.addr) // ln = tls.NewListener(ln, s.App.Server().TLSConfig) - err := s.Listener(ln) - if err != nil { - log.Fatalf("Failed to start server: %s. Reason: %v\n", s.cnf.ID, err) - close(while) - } - <-while - // return err + return s.Listener(ln) } -func (s *Server) OnShutdown(a *app.App) { - a.Logger.Log("Server %s is going down...", a.cnf.ID) +func (s *Server) RegisterHandler(name string, fn func() any) { + // fmt.Printf("Registering plugin( with handler): %s... OK\n", name) + s.handlers[name] = fn() +} - fmt.Println("Unregistering...") - a.Registry.Unregister() +func (s *Server) OnShutdown() { + // s.GetLogger().Log("Server %s is going down...", s.ID) + + s.GetRegistry().Unregister() // a.clearMetadataCache() - a.Eventbus.Close() - a.Database.Close() - a.Logger.Log("Gone.") - a.Logger.Close() + s.GetEventBus().Close() + s.GetDatabase().Close() + s.GetLogger().Log("Gone.") + s.GetLogger().Close() + + s.Shutdown() } func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) { @@ -88,6 +84,27 @@ func (s *Server) Error(c *fiber.Ctx, code int, msg string) error { return c.Status(code).JSON(http.ErrorResponse{Error: msg}) } +// Plugin helper funcitons +func (s *Server) GetCache() *redis.Client { + return (s.handlers["cache"]).(*redis.Client) +} + +func (s *Server) GetDatabase() *pgxpool.Pool { // FIXME hardcoded index issue + return (s.handlers["database"]).(*pgxpool.Pool) +} + +func (s *Server) GetEventBus() *amqp.Channel { + return (s.handlers["eventbus"]).(*amqp.Channel) +} + +func (s *Server) GetLogger() *fluentd.Logger { + return (s.handlers["logger"]).(*fluentd.Logger) +} + +func (s *Server) GetRegistry() *consul.Service { + return (s.handlers["registry"]).(*consul.Service) +} + // @CHECK: merge s.Config and s.Base.Config to display all config as one array/map // func (s *Server) registerKVUpdater() { // @FIXME: merge duplication in server.go and worker.go // go func() { diff --git a/src/internal/service/catalog.go b/src/internal/service/catalog.go index dacf538..8c3d28c 100644 --- a/src/internal/service/catalog.go +++ b/src/internal/service/catalog.go @@ -5,8 +5,10 @@ import ( "git.pbiernat.io/egommerce/api-entities/model" "git.pbiernat.io/egommerce/catalog-service/internal/event" + "git.pbiernat.io/egommerce/go-api-pkg/fluentd" "git.pbiernat.io/egommerce/go-api-pkg/rabbitmq" + "github.com/georgysavva/scany/v2/pgxscan" "github.com/jackc/pgx/v5/pgxpool" amqp "github.com/rabbitmq/amqp091-go" diff --git a/src/internal/worker/config.go b/src/internal/worker/config.go new file mode 100644 index 0000000..96552b6 --- /dev/null +++ b/src/internal/worker/config.go @@ -0,0 +1,85 @@ +package worker + +import ( + "fmt" + "net" + "os" + + cnf "git.pbiernat.io/egommerce/go-api-pkg/config" +) + +const ( + defName = "catalog-worker" + defDomain = "catalog-worker" + defCacheAddr = "egommerce.local:6379" + defCachePassword = "12345678" + defDbURL = "postgres://postgres:12345678@db-postgres:5432/egommerce" + defEventBusURL = "amqp://guest:guest@api-eventbus:5672" + defKVNmspc = "dev.egommerce/service/catalog-worker" + defLoggerAddr = "api-logger:24224" + defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017" + defEbEventsExchange = "api-events" + defEbEventsQueue = "catalog-svc" +) + +type Config struct { + ID string + Name string + + LoggerAddr string `json:"logger_addr"` + DbURL string `json:"db_url"` + CacheAddr string `json:"cache_addr"` + CachePassword string `json:"cache_password"` + MongoDbUrl string `json:"mongodb_url"` + EventBusURL string `json:"eventbus_url"` + EventBusExchange string `json:"eventbus_exchange"` + EventBusQueue string `json:"eventbus_queue"` + KVNamespace string +} + +func NewConfig(name string) *Config { + c := new(Config) + + c.ID, _ = os.Hostname() + c.Name = name + + c.CacheAddr = cnf.GetEnv("CACHE_ADDR", defCacheAddr) + c.CachePassword = cnf.GetEnv("CACHE_PASSWORD", defCachePassword) + c.DbURL = cnf.GetEnv("DATABASE_URL", defDbURL) + c.EventBusExchange = defEbEventsExchange + c.EventBusURL = cnf.GetEnv("EVENTBUS_URL", defEventBusURL) + c.KVNamespace = cnf.GetEnv("APP_KV_NAMESPACE", defKVNmspc) + c.LoggerAddr = cnf.GetEnv("LOGGER_ADDR", defLoggerAddr) + + return c +} + +func (c *Config) GetAppFullName() string { + return fmt.Sprintf("%s_%s", c.Name, c.ID) +} + +func (c *Config) GetIP() string { + host, _ := os.Hostname() + ips, _ := net.LookupIP(host) + // for _, ip := range ips { + // return ip.String() + // } + + return ips[0].String() +} + +func (c *Config) GetArray() map[string]string { // FIXME fix types etc + arr := make(map[string]string) + arr["id"] = c.ID + arr["name"] = c.Name + arr["appFullname"] = c.GetAppFullName() + arr["cacheAddr"] = c.CacheAddr + arr["cachePassword"] = c.CachePassword + arr["dbURL"] = c.DbURL + arr["eventBusExchange"] = c.EventBusExchange + arr["eventBusURL"] = c.EventBusURL + arr["kvNamespace"] = c.KVNamespace + arr["loggerAddr"] = c.LoggerAddr + + return arr +} diff --git a/src/internal/worker/worker.go b/src/internal/worker/worker.go index 1c7146c..704ee81 100644 --- a/src/internal/worker/worker.go +++ b/src/internal/worker/worker.go @@ -4,133 +4,161 @@ import ( "fmt" "log" "os" - "os/signal" - "strconv" "strings" - "syscall" "github.com/go-redis/redis/v8" "github.com/jackc/pgx/v5/pgxpool" amqp "github.com/rabbitmq/amqp091-go" - "git.pbiernat.io/egommerce/go-api-pkg/consul" "git.pbiernat.io/egommerce/go-api-pkg/fluentd" "git.pbiernat.io/egommerce/go-api-pkg/rabbitmq" "git.pbiernat.io/egommerce/catalog-service/internal/event" - cnf "git.pbiernat.io/egommerce/catalog-service/internal/server" "git.pbiernat.io/egommerce/catalog-service/internal/service" ) type ( Worker struct { - Config *cnf.Config - Cache *redis.Client - Database *pgxpool.Pool - Eventbus *amqp.Channel - Logger *fluentd.Logger - Registry *consul.Service + ID string + cnf *Config + handlers map[string]any + services map[string]any + doWrkUntil chan struct{} } - OptionFn func(*Worker) error // FIXME: similar in server/server.go ) -func New(c *cnf.Config, opts ...OptionFn) *Worker { - w := &Worker{ - Config: c, +func New(c *Config) *Worker { + return &Worker{ + ID: c.ID, + cnf: c, + handlers: make(map[string]any), + services: make(map[string]any), + doWrkUntil: make(chan struct{}), } - - for _, opt := range opts { - if err := opt(w); err != nil { - log.Fatalf("Failed to attach extension to the worker. Err: %v\n", err) - } - } - - return w } -func (w *Worker) Start(while chan struct{}) error { - go func() { - sigint := make(chan os.Signal, 1) - signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - <-sigint - - w.Shutdown() - close(while) - }() - - run := w.createRunFile("./app.run") // TODO move to common library (shared between server and worker) - defer w.removeRunFile(run) - - err := w.doWork() +func (w *Worker) Start() error { + // Init + err := rabbitmq.NewExchange(w.GetEventBus(), w.cnf.EventBusExchange) if err != nil { - log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.Config.Base.AppID, err) - close(while) - } + w.GetLogger().Log("Failed to declare EventBus exchange: %v\n", err) + fmt.Printf("Failed to declare EventBus exchange: %v\n", err) - w.Logger.Log("Waiting for messages...") - - return nil -} - -func (w *Worker) Shutdown() error { - w.Logger.Log("Worker %s is going down...", w.Config.Base.AppID) - - w.Registry.Unregister() - w.Eventbus.Close() - w.Database.Close() - w.Logger.Log("Gone.") - w.Logger.Close() - - return nil -} - -func (w *Worker) createRunFile(path string) *os.File { - run, err := os.Create(path) - if err != nil { - log.Fatalf("Failed to create run file. Reason: %v\n", err) os.Exit(1) } - run.WriteString(strconv.Itoa(os.Getpid())) - return run -} - -func (w *Worker) removeRunFile(f *os.File) error { - return f.Close() -} - -func (w *Worker) doWork() error { - msgs, err := w.Eventbus.Consume( - w.Config.EventBusQueue, // queue - "", // consumer - false, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args + _, err = w.GetEventBus().QueueDeclare( + w.cnf.EventBusQueue, // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments ) if err != nil { - w.Logger.Log("Failed to register a consumer: %s", err) + w.GetLogger().Log("Failed to declare EventBus queue: %v\n", err) + fmt.Printf("Failed to declare EventBus queue: %v\n", err) + os.Exit(1) } - go func() { - catalogSrvc := service.NewCatalogService(w.Database, w.Eventbus, w.Logger) + err = w.doWork(w.doWrkUntil) + if err != nil { + log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.ID, err) + close(w.doWrkUntil) + } + <-w.doWrkUntil + return err + + // go func() { + // sigint := make(chan os.Signal, 1) + // signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + // <-sigint + + // w.Shutdown() + // close(while) + // }() + + // run := w.createRunFile("./app.run") // TODO move to common library (shared between server and worker) + // defer w.removeRunFile(run) + + // w.Logger.Log("Waiting for messages...") + + // return nil +} + +func (w *Worker) RegisterHandler(name string, fn func() any) { + // fmt.Printf("Registering plugin( with handler): %s... OK\n", name) + w.handlers[name] = fn() +} + +func (w *Worker) OnShutdown() { + w.GetLogger().Log("Worker %s is going down...", w.ID) + // fmt.Printf("Worker %s is going down...\n", w.ID) + w.GetEventBus().Close() + w.GetDatabase().Close() + w.GetLogger().Log("Gone.") + w.GetLogger().Close() + + close(w.doWrkUntil) +} + +// Plugin helper funcitons +func (w *Worker) GetCache() *redis.Client { + return (w.handlers["cache"]).(*redis.Client) +} + +func (w *Worker) GetDatabase() *pgxpool.Pool { // FIXME hardcoded index issue + return (w.handlers["database"]).(*pgxpool.Pool) +} + +func (w *Worker) GetEventBus() *amqp.Channel { + return (w.handlers["eventbus"]).(*amqp.Channel) +} + +func (w *Worker) GetLogger() *fluentd.Logger { + return (w.handlers["logger"]).(*fluentd.Logger) +} + +func (w *Worker) doWork(while chan struct{}) error { + w.services["catalog"] = + service.NewCatalogService(w.GetDatabase(), w.GetEventBus(), w.GetLogger()) + + cSrv := (w.services["catalog"]).(*service.CatalogService) // FIXME simplify maybe...? + + msgs, err := w.GetEventBus().Consume( + w.cnf.EventBusQueue, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + w.GetLogger().Log("Failed to register a consumer: %s", err) + fmt.Printf("Failed to register a consumer: %s", err) + os.Exit(1) + // close(while) + } + + go func() { for d := range msgs { - go func(d amqp.Delivery) { - w.processMsg(catalogSrvc, d) - }(d) + // go func(d amqp.Delivery) { + w.processMsg(cSrv, d) + // }(d) } }() + <-while return nil } -func (w *Worker) processMsg(srvc *service.CatalogService, d amqp.Delivery) { +func (w *Worker) processMsg(cSrv *service.CatalogService, d amqp.Delivery) { msg, err := rabbitmq.Deserialize(d.Body) if err != nil { - w.Logger.Log("Deserialization error: %v\n", err) + w.GetLogger().Log("Deserialization error: %v\n", err) d.Reject(false) return @@ -140,22 +168,22 @@ func (w *Worker) processMsg(srvc *service.CatalogService, d amqp.Delivery) { data := (msg["data"]).(map[string]interface{}) // reqID := (data["request_id"]).(string) // FIXME Check input params! - w.Logger.Log("Processing message \"%s\" with data: %v\n", name, data) + w.GetLogger().Log("Processing message \"%s\" with data: %v\n", name, data) var ok = false switch true { // Refactor -> use case for polymorphism case strings.Contains(name, event.EVENT_WAREHOUSE_STOCK_UPDATED): - w.Logger.Log("Event: %s", event.EVENT_WAREHOUSE_STOCK_UPDATED) + w.GetLogger().Log("Event: %s", event.EVENT_WAREHOUSE_STOCK_UPDATED) } - rnr := NewCommandRunner(data, srvc) + rnr := NewCommandRunner(data, cSrv) ok, _ = rnr.run(data) if ok { - w.Logger.Log("Successful executed message \"%s\"\n", name) + w.GetLogger().Log("Successful executed message \"%s\"\n", name) d.Ack(false) return } - w.Logger.Log("Error processing \"%s\": %s (%v)", name, err.Error(), err) + w.GetLogger().Log("Error processing \"%s\": %s (%v)", name, err.Error(), err) d.Reject(false) // FIXME: or Nack(repeat until success - maybe message shoud know...? }