basket-service/src/internal/worker/worker.go
Piotr Biernat dfc621e920
All checks were successful
continuous-integration/drone/push Build is passing
Redactor
2023-04-08 04:12:57 +02:00

273 lines
6.7 KiB
Go

package worker
import (
"bytes"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/go-redis/redis/v8"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/streadway/amqp"
"git.pbiernat.dev/egommerce/go-api-pkg/consul"
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
"git.pbiernat.dev/egommerce/go-api-pkg/rabbitmq"
"git.pbiernat.dev/egommerce/basket-service/pkg/database"
cnf "git.pbiernat.dev/egommerce/basket-service/internal/config"
"git.pbiernat.dev/egommerce/basket-service/internal/event"
"git.pbiernat.dev/egommerce/basket-service/internal/service"
"git.pbiernat.dev/egommerce/basket-service/internal/ui"
)
// Worker bundles the service configuration with the external clients used by
// the basket-service background worker. Every field except Config is filled
// in by the matching With* option passed to New.
type Worker struct {
	// Config is the configuration handed to New; updateKVConfig decodes
	// registry KV data into it.
	Config *cnf.Config

	// Cache is the Redis client installed by WithCache.
	Cache *redis.Client

	// Database is the PostgreSQL pool installed by WithDatabase.
	Database *pgxpool.Pool

	// Eventbus is the AMQP channel installed by WithEventbus and consumed
	// from in Start.
	Eventbus *amqp.Channel

	// Logger is the Fluentd logger installed by WithLogger.
	Logger *fluentd.Logger

	// Registry is the Consul service handle installed by WithRegistry.
	Registry *consul.Service

	// kvNmspc is the key passed to the registry KV lookup in updateKVConfig.
	kvNmspc string
}

// OptionFn mutates a Worker while New is building it. A non-nil error makes
// New abort the process.
// FIXME: similar in server/server.go
type OptionFn func(*Worker) error
// New builds a Worker around configuration c and then runs every option in
// opts against it, in the order given.
//
// If an option reports an error the process is terminated through
// log.Fatalf, so New never returns a partially configured Worker.
func New(c *cnf.Config, opts ...OptionFn) *Worker {
	wrk := new(Worker)
	wrk.Config = c

	for i := range opts {
		err := opts[i](wrk)
		if err == nil {
			continue
		}
		log.Fatalf("Failed to attach extension to the worker. Err: %v\n", err)
	}

	return wrk
}
// Start registers a consumer on the configured event-bus queue and launches
// two background goroutines:
//
//   - a signal watcher that calls Shutdown on SIGINT/SIGTERM, and
//   - a consumer loop that handles every delivery in its own goroutine.
//
// Start does not block: it returns nil right after the goroutines have been
// launched. If the consumer cannot be registered the failure is logged and
// the process exits with status 1.
//
// Each delivery is acknowledged manually (auto-ack is off). A message is
// rejected without requeue when it cannot be deserialized, when its payload
// is malformed, or when the basket operation fails; otherwise it is ACKed.
// Events that match no known handler are ACKed without further processing.
func (w *Worker) Start() error {
	msgs, err := w.Eventbus.Consume(
		w.Config.EventBusQueue, // queue
		"",                     // consumer
		false,                  // auto-ack
		false,                  // exclusive
		false,                  // no-local
		false,                  // no-wait
		nil,                    // args
	)
	if err != nil {
		w.Logger.Log("Failed to register a consumer: %s", err)
		os.Exit(1)
	}

	// NOTE(review): forever is closed once shutdown has finished, but nothing
	// in this function waits on it, so Start itself never blocks.
	forever := make(chan struct{})

	go func() {
		sigint := make(chan os.Signal, 1)
		// os.Interrupt is SIGINT, so it does not need to be listed twice.
		signal.Notify(sigint, os.Interrupt, syscall.SIGTERM)
		<-sigint

		w.Logger.Log("Shutting down %s worker...\n", w.Config.Base.GetAppFullName())
		if err := w.Shutdown(); err != nil {
			// Shutdown closes w.Logger, so fall back to the standard logger.
			log.Printf("worker shutdown: %v", err)
		}
		close(forever)
	}()

	go func() {
		basketSrv := service.NewBasketService(w.Database, w.Cache, w.Eventbus, w.Logger)

		// dispatch validates the payload and runs the basket operation that
		// matches eName. It returns an error for malformed payloads and for
		// failed operations; unknown events yield nil.
		dispatch := func(eName string, data map[string]interface{}) error {
			reqID, err := payloadString(data, "request_id")
			if err != nil {
				return err
			}
			basketID, err := payloadString(data, "basket_id")
			if err != nil {
				return err
			}

			switch {
			case strings.Contains(eName, event.EVENT_PRODUCT_ADDED_TO_BASKET):
				productID, err := payloadInt(data, "product_id")
				if err != nil {
					return err
				}
				qty, err := payloadInt(data, "quantity")
				if err != nil {
					return err
				}
				basket, err := ui.AddProductToBasket(basketSrv, productID, qty, basketID, reqID)
				if err != nil {
					return err
				}
				w.Logger.Log("Product #%d added to basket #%s. ReqID: #%s", productID, basket.ID, reqID)

			case strings.Contains(eName, event.EVENT_PRODUCT_REMOVED_FROM_BASKET):
				productID, err := payloadInt(data, "product_id")
				if err != nil {
					return err
				}
				qty, err := payloadInt(data, "quantity")
				if err != nil {
					return err
				}
				basket, err := ui.RemoveProductFromBasket(basketSrv, productID, qty, basketID, reqID)
				if err != nil {
					return err
				}
				w.Logger.Log("Product #%d removed from basket #%s. ReqID: #%s", productID, basket.ID, reqID)
			}

			return nil
		}

		// reject drops the delivery without requeueing and reports a failed
		// reject instead of discarding it silently.
		// FIXME: or Nack? how to handle errors in queue...
		reject := func(d amqp.Delivery) {
			if err := d.Reject(false); err != nil {
				w.Logger.Log("reject error: %v\n", err)
			}
		}

		for d := range msgs {
			go func(d amqp.Delivery) {
				msg, err := rabbitmq.Deserialize(d.Body)
				if err != nil {
					w.Logger.Log("deserialize error: %v\n", err)
					reject(d)
					return
				}

				eName := fmt.Sprintf("%s", msg["event"])
				data, ok := msg["data"].(map[string]interface{})
				if !ok {
					w.Logger.Log("%s error: %s", eName, fmt.Sprintf("field \"data\": expected object, got %T", msg["data"]))
					reject(d)
					return
				}
				w.Logger.Log("Message<%s>: %v\n", eName, data)

				// The error is returned from dispatch rather than assigned
				// inside the switch, so a failed handler can no longer be
				// hidden by a shadowed err and ACKed by mistake.
				if err := dispatch(eName, data); err != nil {
					w.Logger.Log("%s error: %s", eName, err.Error())
					reject(d)
					return
				}

				w.Logger.Log("ACK: %s", eName)
				if err := d.Ack(false); err != nil {
					w.Logger.Log("%s error: %s", eName, err.Error())
				}
			}(d)
		}
	}()

	w.Logger.Log("Waiting for messages...2")

	return nil
}

// payloadString returns data[key] as a string. It reports an error when the
// key is missing or holds a value of another type.
func payloadString(data map[string]interface{}, key string) (string, error) {
	v, ok := data[key].(string)
	if !ok {
		return "", fmt.Errorf("field %q: expected string, got %T", key, data[key])
	}
	return v, nil
}

// payloadInt returns data[key] truncated to int. The value must be a float64;
// a missing key or any other type is reported as an error.
func payloadInt(data map[string]interface{}, key string) (int, error) {
	v, ok := data[key].(float64)
	if !ok {
		return 0, fmt.Errorf("field %q: expected number, got %T", key, data[key])
	}
	return int(v), nil
}
// Shutdown releases every dependency attached to the worker: it unregisters
// from the registry, closes the event-bus channel, the database pool and the
// cache client, and finally closes the logger.
//
// Dependencies that were never attached (nil fields) are skipped, so Shutdown
// is safe on a partially configured Worker. All resources are released even
// if one of them fails to close; the first close error is returned.
func (w *Worker) Shutdown() error {
	if w.Logger != nil {
		w.Logger.Log("Worker is going down...")
	}

	var firstErr error

	if w.Registry != nil {
		w.Registry.Unregister()
	}

	if w.Eventbus != nil {
		if err := w.Eventbus.Close(); err != nil {
			firstErr = fmt.Errorf("closing event bus channel: %w", err)
		}
	}

	if w.Database != nil {
		w.Database.Close()
	}

	// The Redis client created by WithCache owns a connection pool as well.
	if w.Cache != nil {
		if err := w.Cache.Close(); err != nil && firstErr == nil {
			firstErr = fmt.Errorf("closing cache client: %w", err)
		}
	}

	// The logger goes last so the steps above can still use it.
	if w.Logger != nil {
		w.Logger.Close()
	}

	return firstErr
}
// WithCache returns an option that attaches a Redis client to the worker.
// The client is built from c.CacheAddr and c.CachePassword and always uses
// database 0. The option itself never fails.
func WithCache(c *cnf.Config) OptionFn {
	return func(w *Worker) error {
		opts := redis.Options{
			Addr:     c.CacheAddr,
			Password: c.CachePassword,
			DB:       0,
		}
		w.Cache = redis.NewClient(&opts)

		return nil
	}
}
// WithDatabase returns an option that connects to the database at c.DbURL and
// stores the resulting pool on the worker.
//
// A connection failure is returned as an error (New aborts the process on it)
// instead of being logged through w.Logger, which is nil unless WithLogger
// was applied earlier.
func WithDatabase(c *cnf.Config) OptionFn {
	return func(w *Worker) error {
		conn, err := database.Connect(c.DbURL)
		if err != nil {
			return fmt.Errorf("connecting to database server: %w", err)
		}

		w.Database = conn

		return nil
	}
}
// WithEventbus returns an option that opens the event bus at c.EventBusURL,
// declares the exchange and the queue named in the configuration, binds the
// queue to the basket routing keys and stores the channel on the worker.
//
// Failures to connect or to declare the exchange/queue are returned as errors
// (New aborts the process on them) instead of being logged through w.Logger,
// which is nil unless WithLogger was applied earlier.
func WithEventbus(c *cnf.Config) OptionFn {
	return func(w *Worker) error {
		// NOTE(review): the first value returned by rabbitmq.Open is
		// discarded, so only the channel can be closed on Shutdown.
		_, chn, err := rabbitmq.Open(c.EventBusURL)
		if err != nil {
			return fmt.Errorf("connecting to event bus server: %w", err)
		}

		if err := rabbitmq.NewExchange(chn, c.EventBusExchange); err != nil {
			return fmt.Errorf("declaring event bus exchange: %w", err)
		}

		_, err = chn.QueueDeclare(
			c.EventBusQueue, // name
			false,           // durable
			false,           // delete when unused
			false,           // exclusive
			false,           // no-wait
			nil,             // arguments
		)
		if err != nil {
			return fmt.Errorf("declaring event bus queue: %w", err)
		}

		// w.BindQueues()
		// NOTE(review): whatever BindQueueToExchange returns is not checked
		// here — confirm whether it reports errors.
		rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productAddedToBasket")
		rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.productRemovedFromBasket")
		rabbitmq.BindQueueToExchange(chn, c.EventBusQueue, c.EventBusExchange, "catalog.basket.updateQuantity")

		w.Eventbus = chn

		return nil
	}
}
// WithLogger returns an option that parses c.LoggerAddr, connects to Fluentd
// and stores the resulting logger on the worker.
//
// Failures are returned as errors (New aborts the process on them). They
// cannot be reported through w.Logger because that is the very field this
// option has not set yet.
func WithLogger(c *cnf.Config) OptionFn {
	return func(w *Worker) error {
		logHost, logPort, err := fluentd.ParseAddr(c.LoggerAddr)
		if err != nil {
			return fmt.Errorf("parsing Fluentd address %q: %w", c.LoggerAddr, err)
		}

		logger, err := fluentd.NewLogger(c.Base.GetAppFullName(), logHost, logPort)
		if err != nil {
			return fmt.Errorf("connecting to Fluentd on %v:%v: %w", logHost, logPort, err)
		}

		w.Logger = logger

		return nil
	}
}
// WithRegistry returns an option that creates the Consul service handle for
// this application and, when one is available, starts a background goroutine
// that refreshes the worker configuration from the registry KV store every
// 15 seconds.
//
// A failure to create the handle is logged but is not fatal: the option still
// returns nil. In that case no updater is started if the returned handle is
// nil, so updateKVConfig never runs against a missing registry.
func WithRegistry(c *cnf.Config) OptionFn {
	return func(w *Worker) error {
		registry, err := consul.NewService(c.RegistryAddr, c.Base.AppID, c.Base.AppName, c.Base.AppID, "", "", 0)
		if err != nil {
			// w.Logger is only set when WithLogger ran before this option.
			if w.Logger != nil {
				w.Logger.Log("Error connecting to %s: %v", c.RegistryAddr, err)
			} else {
				log.Printf("Error connecting to %s: %v", c.RegistryAddr, err)
			}
		}

		// Publish the handle before the updater starts, so the goroutine
		// never observes an unset w.Registry.
		w.Registry = registry
		if registry == nil {
			return nil
		}

		// Consul KV updater. There is no stop signal: the goroutine keeps
		// ticking until the process exits.
		go func() {
			ticker := time.NewTicker(time.Second * 15)
			for range ticker.C {
				updateKVConfig(w)
			}
		}()

		return nil
	}
}
// updateKVConfig fetches the value stored under w.kvNmspc in the registry KV
// store and JSON-decodes it into w.Config.
//
// The refresh is best-effort: a missing worker or registry, a failed lookup,
// an absent key or an undecodable value all leave the function returning
// without reporting anything.
//
// NOTE(review): w.Config is written here from the updater goroutine without
// synchronization — confirm that concurrent readers tolerate this.
// NOTE(review): nothing in this file assigns w.kvNmspc — verify where the key
// is meant to come from.
//
// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map
// @FIXME: merge duplication in server.go and worker.go
func updateKVConfig(w *Worker) {
	// WithRegistry may have stored a nil handle when Consul was unreachable.
	if w == nil || w.Registry == nil {
		return
	}

	config, _, err := w.Registry.KV().Get(w.kvNmspc, nil)
	if err != nil || config == nil {
		return
	}

	kvCnf := bytes.NewBuffer(config.Value)
	decoder := json.NewDecoder(kvCnf)
	if err := decoder.Decode(&w.Config); err != nil {
		return
	}
}