catalog-service/src/internal/app/server/server.go
Piotr Biernat ab362d3b56
All checks were successful
continuous-integration/drone/push Build is passing
Added migrations
2022-12-17 06:36:55 +01:00

149 lines
3.0 KiB
Go

package server
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/gofiber/fiber/v2"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/streadway/amqp"
discovery "git.pbiernat.dev/egommerce/go-api-pkg/consul"
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
)
type Server struct {
*fiber.App
conf *Config
log *fluentd.Logger
db *pgxpool.Pool
ebCh *amqp.Channel
discovery *discovery.Service
name string
addr string
kvNmspc string
}
type Headers struct {
RequestID string `reqHeader:"x-request-id"`
}
func NewServer(conf *Config, logger *fluentd.Logger, db *pgxpool.Pool, ebCh *amqp.Channel) *Server {
consul, err := discovery.NewService(conf.RegistryAddr, conf.AppID, conf.AppName, conf.AppID, conf.AppDomain, conf.Port)
if err != nil {
logger.Log("Error connecting to %s: %v", conf.RegistryAddr, err)
}
logger.Log("Registering service with name: %s, address: %s", consul.Name, consul.Address)
err = consul.Register()
if err != nil {
logger.Log("register error: %v", err)
}
cnf := fiber.Config{
AppName: conf.AppName,
ServerHeader: conf.AppName,
ReadTimeout: time.Millisecond * 50,
WriteTimeout: time.Millisecond * 50,
IdleTimeout: time.Millisecond * 50,
}
s := &Server{
fiber.New(cnf),
conf,
logger,
db,
ebCh,
consul,
conf.AppName,
conf.NetAddr,
conf.KVNamespace,
}
go func(s *Server) { // Consul KV config updater
interval := time.Second * 30
ticker := time.NewTicker(interval)
for range ticker.C {
s.updateKVConfig()
}
}(s)
SetupMiddlewares(s)
SetupRoutes(s)
return s
}
func (s *Server) Start() {
err := s.Listen(s.addr)
s.log.Log("Starting error: %v", err)
}
func (s *Server) StartWithGracefulShutdown(forever chan struct{}) {
go func() {
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
<-sigint
if err := s.gracefulShutdown(); err != nil {
s.log.Log("Server is not shutting down! Reason: %v", err)
}
close(forever)
}()
if err := s.Listen(s.addr); err != nil {
s.log.Log("Server is not running! Reason: %v", err)
}
<-forever
}
// GetRequestID Return current requets ID - works only when fiber context are running
func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) {
var hdr = new(Headers)
if err := c.ReqHeaderParser(hdr); err != nil {
return "", err
}
return hdr.RequestID, nil
}
func (s *Server) updateKVConfig() error { // FIXME: duplicated in cmd/worker/main.go
config, _, err := s.discovery.KV().Get(s.kvNmspc, nil)
if err != nil {
fmt.Println(err)
return err
}
if config == nil {
return errors.New("empty KV config data")
}
kvCnf := bytes.NewBuffer(config.Value)
decoder := json.NewDecoder(kvCnf)
if err := decoder.Decode(&s.conf); err != nil {
return err
}
return nil
}
func (s *Server) gracefulShutdown() error {
s.log.Log("Server is going down...")
s.log.Log("Unregistering service: %s", s.discovery.GetID())
s.discovery.Unregister()
s.ebCh.Close()
s.db.Close()
s.log.Close()
return s.Shutdown()
}