From faefaceaf17a96343d265001fba5cc39b03f7fbe Mon Sep 17 00:00:00 2001 From: Piotr Biernat Date: Sat, 20 Jul 2024 19:20:44 +0200 Subject: [PATCH] Refactor, TLS support, v0.5 --- .app.config | 14 ++++ .env.dist | 2 +- Dockerfile.target | 3 +- bin/entrypoint.sh | 4 +- bin/migrate.sh | 1 + src/cmd/migrate/main.go | 2 +- src/go.mod | 2 +- src/go.sum | 4 +- src/internal/server/config.go | 2 +- src/internal/server/server.go | 12 ++- src/internal/service/basket.go | 4 +- src/internal/ui/basket.go | 6 +- src/internal/worker/ext.go | 133 --------------------------------- src/internal/worker/worker.go | 82 ++++++++++++-------- 14 files changed, 89 insertions(+), 182 deletions(-) create mode 100644 .app.config delete mode 100644 src/internal/worker/ext.go diff --git a/.app.config b/.app.config new file mode 100644 index 0000000..57efa4e --- /dev/null +++ b/.app.config @@ -0,0 +1,14 @@ +{ + "ID": "basket", + "Name": "basket", + "Address": "__IP__", + "Tags": ["basket-svc", "basket", "https", "service"], + "Port": 443, + "Connect": { + "Native": true + }, + "Check": { + "TCP": "__IP__:443", + "DeregisterCriticalServiceAfter": "10s" + } +} \ No newline at end of file diff --git a/.env.dist b/.env.dist index 17976d2..757a003 100644 --- a/.env.dist +++ b/.env.dist @@ -1,4 +1,4 @@ -SERVER_ADDR=:80 +SERVER_ADDR=:443 APP_NAME=basket-svc APP_DOMAIN=basket.service.ego.io diff --git a/Dockerfile.target b/Dockerfile.target index 6ae777c..1980273 100644 --- a/Dockerfile.target +++ b/Dockerfile.target @@ -22,12 +22,13 @@ COPY --from=builder $BIN_OUTPUT /app COPY --from=builder /go/bin/migrate /bin/migrate COPY --from=builder /go/bin/health /bin/health COPY .env.docker /.env +COPY ./.app.config / COPY ./bin /bin RUN chmod 755 /bin/entrypoint.sh /bin/migrate.sh RUN apk add curl -EXPOSE 80 +EXPOSE 443 ENTRYPOINT ["entrypoint.sh"] CMD ["sh", "-c", "/app"] diff --git a/bin/entrypoint.sh b/bin/entrypoint.sh index a7b5e8c..a002a1b 100755 --- a/bin/entrypoint.sh +++ b/bin/entrypoint.sh @@ -24,8 +24,8 @@ waitForService "api-eventbus:5672" # waitForService "esb.service.ego.io:5672" waitForService "api-logger:24224" # waitForService "logger.service.ego.io:24224" -waitForService "pricing-svc:80" -# waitForService "pricing.service.ego.io:80" +waitForService "pricing-svc:443" +# waitForService "pricing.service.ego.io:443" # run migrations migrate.sh diff --git a/bin/migrate.sh b/bin/migrate.sh index 9766c7e..34fc7d5 100644 --- a/bin/migrate.sh +++ b/bin/migrate.sh @@ -21,5 +21,6 @@ fi # run migrations migrate up +echo "Done." exit $version diff --git a/src/cmd/migrate/main.go b/src/cmd/migrate/main.go index 94aba4d..6e4fa55 100644 --- a/src/cmd/migrate/main.go +++ b/src/cmd/migrate/main.go @@ -55,7 +55,7 @@ func main() { log.Fatalf("Error parsing logger addr: %s. Err: %v", c.LoggerAddr, err) } - logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort) // @Refactor NewLogger return (logger, error) + logger, err := fluentd.NewLogger(c.GetAppFullName(), logHost, logPort) // @Refactor NewLogger return (logger, error) if err != nil { log.Fatalf("Error connecting to %s:%d. Err: %v", logHost, logPort, err) } diff --git a/src/go.mod b/src/go.mod index 6ad9164..e29e0fd 100644 --- a/src/go.mod +++ b/src/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( git.pbiernat.io/egommerce/api-entities v0.2.3 - git.pbiernat.io/egommerce/go-api-pkg v0.2.88 + git.pbiernat.io/egommerce/go-api-pkg v0.3.18 github.com/georgysavva/scany/v2 v2.0.0 github.com/go-pg/migrations/v8 v8.1.0 github.com/go-pg/pg/v10 v10.10.7 diff --git a/src/go.sum b/src/go.sum index 99ea1fb..9ebe000 100644 --- a/src/go.sum +++ b/src/go.sum @@ -37,8 +37,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= git.pbiernat.io/egommerce/api-entities v0.2.3 h1:mR6EYfZkAzh4teydb7KXDBWoxwVW3qasnmmH5J3mnas= git.pbiernat.io/egommerce/api-entities v0.2.3/go.mod h1:INXAG5x4+i+vNwg1NpfPHiDW8nY1kn1K7pgLOtX+/I0= -git.pbiernat.io/egommerce/go-api-pkg v0.2.88 h1:xya/39BnFeha3Oc76ad/ppoQd6AstTGQd87Qszamr1A= -git.pbiernat.io/egommerce/go-api-pkg v0.2.88/go.mod h1:XIy2mmvRNIzQmYIUAcDZafhRPxTQFS2HDmsK7ZQ6980= +git.pbiernat.io/egommerce/go-api-pkg v0.3.18 h1:0+C9BMsllrNvRbh4kb7dJ5lrzP1Lc7J4pb+KV76YrXk= +git.pbiernat.io/egommerce/go-api-pkg v0.3.18/go.mod h1:XIy2mmvRNIzQmYIUAcDZafhRPxTQFS2HDmsK7ZQ6980= github.com/Azure/azure-sdk-for-go v44.0.0+incompatible h1:e82Yv2HNpS0kuyeCrV29OPKvEiqfs2/uJHic3/3iKdg= github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM= diff --git a/src/internal/server/config.go b/src/internal/server/config.go index c5b43ba..64b9b63 100644 --- a/src/internal/server/config.go +++ b/src/internal/server/config.go @@ -18,7 +18,7 @@ const ( defEventBusURL = "amqp://guest:guest@api-eventbus:5672" defKVNmspc = "dev.egommerce/service/basket" defLoggerAddr = "api-logger:24224" - defNetAddr = ":80" + defNetAddr = ":443" defMongoDbURL = "mongodb://mongodb:12345678@mongo-db:27017" defPathPrefix = "/basket" defRegistryAddr = "api-registry:8500" diff --git a/src/internal/server/server.go b/src/internal/server/server.go index 6799c72..1bfa7b6 100644 --- a/src/internal/server/server.go +++ b/src/internal/server/server.go @@ -1,6 +1,8 @@ package server import ( + "crypto/tls" + "log" "net" "time" @@ -19,7 +21,7 @@ type ( *fiber.App ID string - addr string // e.g. "127.0.0.1:80" + addr string // e.g. "127.0.0.1:443" handlers map[string]any } HeaderRequestID struct { @@ -47,8 +49,14 @@ func (s *Server) Start() error { SetupRouter(s) // fmt.Printf("Starting server at: %s...\n", s.addr) + cer, err := tls.LoadX509KeyPair("certs/client.crt", "certs/client.key") + if err != nil { + log.Fatal(err) + } + tlsCnf := &tls.Config{Certificates: []tls.Certificate{cer}} + ln, _ := net.Listen("tcp", s.addr) - // ln = tls.NewListener(ln, s.App.Server().TLSConfig) + ln = tls.NewListener(ln, tlsCnf) return s.Listener(ln) } diff --git a/src/internal/service/basket.go b/src/internal/service/basket.go index e294672..e89b1e4 100644 --- a/src/internal/service/basket.go +++ b/src/internal/service/basket.go @@ -107,7 +107,6 @@ func (s *BasketService) RemoveItem(ctx context.Context, itemID int, basketID str if _, err := s.dbConn.Exec(ctx, sql, basketID, itemID); err != nil { return err } - // update basket updated_at field... return nil @@ -121,10 +120,9 @@ func (s *BasketService) UpdateItem(ctx context.Context, item *model.BasketItemMo if err == nil { price = productPrice.Price } - now := time.Now() sql := `UPDATE basket.basket_item SET quantity=$1, price=$2, updated_at=$3 WHERE basket_id=$4 AND product_id=$5` - if _, err := s.dbConn.Exec(ctx, sql, qty, price, now, item.BasketID, item.ProductID); err != nil { + if _, err := s.dbConn.Exec(ctx, sql, qty, price, time.Now(), item.BasketID, item.ProductID); err != nil { return err } diff --git a/src/internal/ui/basket.go b/src/internal/ui/basket.go index 81a1635..4f5b8d0 100644 --- a/src/internal/ui/basket.go +++ b/src/internal/ui/basket.go @@ -2,6 +2,7 @@ package ui import ( "context" + "fmt" entity "git.pbiernat.io/egommerce/api-entities/http" "git.pbiernat.io/egommerce/api-entities/model" @@ -54,15 +55,16 @@ func RemoveProductFromBasket(srv *service.BasketService, productID, qty int, bas item, err := srv.FetchItem(ctx, basket.ID, productID) if err != nil { + fmt.Printf("FetchItem not found item in db: %v\n", err) ctx.Done() // FIXME - return nil, err + return basket, err } if item.Quantity <= qty { err = srv.RemoveItem(ctx, item.ProductID, item.BasketID) if err != nil { ctx.Done() // FIXME - return nil, err + return basket, err } } else { qty = item.Quantity - qty diff --git a/src/internal/worker/ext.go b/src/internal/worker/ext.go deleted file mode 100644 index 9ba2f79..0000000 --- a/src/internal/worker/ext.go +++ /dev/null @@ -1,133 +0,0 @@ -package worker - -// import ( -// "bytes" -// "encoding/json" -// "os" - -// cnf "git.pbiernat.io/egommerce/basket-service/internal/config" -// "git.pbiernat.io/egommerce/basket-service/pkg/database" -// "git.pbiernat.io/egommerce/go-api-pkg/fluentd" -// "git.pbiernat.io/egommerce/go-api-pkg/rabbitmq" -// "github.com/go-redis/redis/v8" -// ) - -// func WithCache(c *cnf.Config) OptionFn { -// return func(w *Worker) error { -// conn := redis.NewClient(&redis.Options{ -// Addr: c.CacheAddr, -// Password: c.CachePassword, -// DB: 0, -// }) - -// w.Cache = conn - -// return nil -// } -// } - -// func WithDatabase(c *cnf.Config) OptionFn { -// return func(w *Worker) error { -// conn, err := database.Connect(c.DbURL) -// if err != nil { -// w.Logger.Log("Failed to connect to Database server: %v\n", err) -// os.Exit(1) -// } - -// w.Database = conn - -// return nil -// } -// } - -// func WithEventbus(c *cnf.Config) OptionFn { -// return func(w *Worker) error { -// _, chn, err := rabbitmq.Open(c.EventBusURL) -// if err != nil { -// w.Logger.Log("Failed to connect to EventBus server: %v\n", err) -// os.Exit(1) -// } - -// err = rabbitmq.NewExchange(chn, c.EventBusExchange) -// if err != nil { -// w.Logger.Log("Failed to declare EventBus exchange: %v\n", err) -// os.Exit(1) -// } - -// _, err = chn.QueueDeclare( -// c.EventBusQueue, // name -// false, // durable -// false, // delete when unused -// false, // exclusive -// false, // no-wait -// nil, // arguments -// ) -// if err != nil { -// w.Logger.Log("Failed to declare EventBus queue: %v\n", err) -// os.Exit(1) -// } - -// // w.bindQueues() -// rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productAddedToBasket") -// rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productRemovedFromBasket") -// rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.updateQuantity") - -// w.Eventbus = chn - -// return nil -// } -// } - -// func WithLogger(c *cnf.Config) OptionFn { -// return func(w *Worker) error { -// logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr) -// if err != nil { -// w.Logger.Log("Failed to parse Fluentd address: %s. Err: %v", c.LoggerAddr, err) -// os.Exit(1) -// } - -// logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort) -// if err != nil { -// w.Logger.Log("Failed to connect to the Fluentd on %s:%d. Err: %v", logHost, logPort, err) -// os.Exit(1) -// } - -// w.Logger = logger - -// return nil -// } -// } - -// // func WithRegistry(c *cnf.Config) OptionFn { -// // return func(w *Worker) error { -// // registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, "", "", 0) -// // if err != nil { -// // w.Logger.Log("Error connecting to %s: %v", c.RegistryAddr, err) -// // } - -// // w.Registry = registry - -// // go func(w *Worker) { // Fetch Consul KV config and store it in app config -// // ticker := time.NewTicker(time.Second * 15) -// // for range ticker.C { -// // fetchKVConfig(w) // FIXME: duplicated in server -// // } -// // }(w) - -// // return nil -// // } - -// // } - -// func fetchKVConfig(w *Worker) { // @FIXME: merge duplication in server.go and worker.go -// config, _, err := w.Registry.KV().Get(w.Config.KVNamespace, nil) -// if err != nil || config == nil { -// return -// } - -// kvCnf := bytes.NewBuffer(config.Value) -// decoder := json.NewDecoder(kvCnf) -// if err := decoder.Decode(&w.Config); err != nil { -// return -// } -// } diff --git a/src/internal/worker/worker.go b/src/internal/worker/worker.go index 6667f4b..10bc926 100644 --- a/src/internal/worker/worker.go +++ b/src/internal/worker/worker.go @@ -38,36 +38,9 @@ func New(c *Config) *Worker { } func (w *Worker) Start() error { - // Init - err := rabbitmq.NewExchange(w.GetEventBus(), w.cnf.EventBusExchange) - if err != nil { - w.GetLogger().Log("Failed to declare EventBus exchange: %v\n", err) - fmt.Printf("Failed to declare EventBus exchange: %v\n", err) + setupQueues(w) - os.Exit(1) - } - - _, err = w.GetEventBus().QueueDeclare( - w.cnf.EventBusQueue, // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - if err != nil { - w.GetLogger().Log("Failed to declare EventBus queue: %v\n", err) - fmt.Printf("Failed to declare EventBus queue: %v\n", err) - - os.Exit(1) - } - - // // w.bindQueues() - rabbitmq.BindQueueToExchange(w.GetEventBus(), w.cnf.EventBusQueue, w.cnf.EventBusExchange, "catalog.basket.productAddedToBasket") - rabbitmq.BindQueueToExchange(w.GetEventBus(), w.cnf.EventBusQueue, w.cnf.EventBusExchange, "catalog.basket.productRemovedFromBasket") - rabbitmq.BindQueueToExchange(w.GetEventBus(), w.cnf.EventBusQueue, w.cnf.EventBusExchange, "catalog.basket.updateQuantity") - - err = w.doWork(w.doWrkUntil) + err := w.doWork(w.doWrkUntil) if err != nil { log.Fatalf("Failed to start worker: %s. Reason: %v\n", w.ID, err) close(w.doWrkUntil) @@ -101,6 +74,8 @@ func (w *Worker) RegisterHandler(name string, fn func() any) { func (w *Worker) OnShutdown() { w.GetLogger().Log("Worker %s is going down...", w.ID) // fmt.Printf("Worker %s is going down...\n", w.ID) + + unbindQueues(w) w.GetEventBus().Close() w.GetDatabase().Close() w.GetLogger().Log("Gone.") @@ -162,6 +137,7 @@ func (w *Worker) processMsg(srvc *service.BasketService, m amqp.Delivery) { msg, err := rabbitmq.Deserialize(m.Body) if err != nil { w.GetLogger().Log("Deserialization error: %v\n", err) + fmt.Printf("Deserialization error: %v\n", err) m.Reject(false) return @@ -172,6 +148,7 @@ func (w *Worker) processMsg(srvc *service.BasketService, m amqp.Delivery) { // reqID := (data["request_id"]).(string) // FIXME Check input params! w.GetLogger().Log("Processing message \"%s\" with data: %v\n", name, data) + fmt.Printf("Processing message \"%s\" with data: %v\n", name, data) var ok = false switch true { // Refactor -> use case for polymorphism @@ -179,8 +156,8 @@ func (w *Worker) processMsg(srvc *service.BasketService, m amqp.Delivery) { w.GetLogger().Log("Event: %s", event.EVENT_PRODUCT_ADDED_TO_BASKET) case strings.Contains(name, event.EVENT_PRODUCT_REMOVED_FROM_BASKET): w.GetLogger().Log("Event: %s", event.EVENT_PRODUCT_REMOVED_FROM_BASKET) - // case strings.Contains(name, event.EVENT_BASKET_CHECKOUT): - // w.Logger.Log("Event: %s", event.EVENT_BASKET_CHECKOUT) + case strings.Contains(name, event.EVENT_BASKET_CHECKOUT): + w.GetLogger().Log("Event: %s", event.EVENT_BASKET_CHECKOUT) } // case strings.Contains(name, event.EVENT_PRODUCT_ADDED_TO_BASKET): @@ -201,10 +178,49 @@ func (w *Worker) processMsg(srvc *service.BasketService, m amqp.Delivery) { ok, _ = rnr.run(data) if ok { w.GetLogger().Log("Successful executed message \"%s\"\n", name) + fmt.Printf("Successful executed message \"%s\"\n", name) m.Ack(false) return } - w.GetLogger().Log("Error processing \"%s\": %s (%v)", name, err.Error(), err) - m.Reject(false) // FIXME: or Nack(repeat until success - maybe message shout know...? + w.GetLogger().Log("Error processing \"%s\": %v", name, err) + fmt.Printf("Error processing \"%s\": %v\n", name, err) + m.Reject(true) // FIXME: or Nack(repeat until success - maybe message shout know...? +} + +func setupQueues(w *Worker) { + err := rabbitmq.NewExchange(w.GetEventBus(), w.cnf.EventBusExchange) + if err != nil { + w.GetLogger().Log("Failed to declare EventBus exchange: %v\n", err) + fmt.Printf("Failed to declare EventBus exchange: %v\n", err) + + os.Exit(1) + } + + args := amqp.Table{} + args["x-message-ttl"] = 5 + _, err = w.GetEventBus().QueueDeclare( + w.cnf.EventBusQueue, // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + args, // arguments + ) + if err != nil { + w.GetLogger().Log("Failed to declare EventBus queue: %v\n", err) + fmt.Printf("Failed to declare EventBus queue: %v\n", err) + + os.Exit(1) + } + + rabbitmq.BindQueueToExchange(w.GetEventBus(), w.cnf.EventBusQueue, w.cnf.EventBusExchange, "catalog.basket.productAddedToBasket") + rabbitmq.BindQueueToExchange(w.GetEventBus(), w.cnf.EventBusQueue, w.cnf.EventBusExchange, "catalog.basket.productRemovedFromBasket") + rabbitmq.BindQueueToExchange(w.GetEventBus(), w.cnf.EventBusQueue, w.cnf.EventBusExchange, "catalog.basket.updateQuantity") +} + +func unbindQueues(w *Worker) { + w.GetEventBus().QueueUnbind(w.cnf.EventBusQueue, "catalog.basket.productAddedToBasket", w.cnf.EventBusExchange, nil) + w.GetEventBus().QueueUnbind(w.cnf.EventBusQueue, "catalog.basket.productRemovedFromBasket", w.cnf.EventBusExchange, nil) + w.GetEventBus().QueueUnbind(w.cnf.EventBusQueue, "catalog.basket.updateQuantity", w.cnf.EventBusExchange, nil) }