All checks were successful
continuous-integration/drone/push Build is passing
152 lines
3.1 KiB
Go
152 lines
3.1 KiB
Go
package server
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/gofiber/fiber/v2"
|
|
"github.com/jackc/pgx/v4/pgxpool"
|
|
"github.com/streadway/amqp"
|
|
|
|
discovery "git.pbiernat.dev/egommerce/go-api-pkg/consul"
|
|
"git.pbiernat.dev/egommerce/go-api-pkg/fluentd"
|
|
)
|
|
|
|
type Server struct {
|
|
*fiber.App
|
|
conf *Config
|
|
log *fluentd.Logger
|
|
db *pgxpool.Pool
|
|
ebCh *amqp.Channel
|
|
discovery *discovery.Service
|
|
name string
|
|
addr string
|
|
kvNmspc string
|
|
}
|
|
|
|
type Headers struct {
|
|
RequestID string `reqHeader:"x-request-id"`
|
|
}
|
|
|
|
func NewServer(conf *Config, logger *fluentd.Logger, db *pgxpool.Pool, ebCh *amqp.Channel) *Server {
|
|
consul, err := discovery.NewService(conf.RegistryAddr, conf.AppID, conf.AppName, conf.AppID, conf.AppDomain, conf.Port)
|
|
if err != nil {
|
|
logger.Log("Error connecting to %s: %v", conf.RegistryAddr, err)
|
|
}
|
|
|
|
logger.Log("Registering service with name: %s, address: %s", consul.Name, consul.Address)
|
|
err = consul.Register()
|
|
if err != nil {
|
|
logger.Log("register error: %v", err)
|
|
}
|
|
|
|
cnf := fiber.Config{
|
|
AppName: conf.AppName,
|
|
ServerHeader: conf.AppName,
|
|
ReadTimeout: time.Millisecond * 50,
|
|
WriteTimeout: time.Millisecond * 50,
|
|
IdleTimeout: time.Millisecond * 50,
|
|
}
|
|
s := &Server{
|
|
fiber.New(cnf),
|
|
conf,
|
|
logger,
|
|
db,
|
|
ebCh,
|
|
consul,
|
|
conf.AppName,
|
|
conf.NetAddr,
|
|
conf.KVNamespace,
|
|
}
|
|
|
|
go func(s *Server) { // Consul KV config updater
|
|
interval := time.Second * 30
|
|
ticker := time.NewTicker(interval)
|
|
for range ticker.C {
|
|
err := s.updateKVConfig()
|
|
if err != nil {
|
|
logger.Log("KV config update error (skipping): %v\n", err)
|
|
}
|
|
}
|
|
}(s)
|
|
|
|
SetupMiddlewares(s)
|
|
SetupRoutes(s)
|
|
|
|
return s
|
|
}
|
|
|
|
func (s *Server) Start() {
|
|
err := s.Listen(s.addr)
|
|
s.log.Log("Starting error: %v", err)
|
|
}
|
|
|
|
func (s *Server) StartWithGracefulShutdown(forever chan struct{}) {
|
|
go func() {
|
|
sigint := make(chan os.Signal, 1)
|
|
signal.Notify(sigint, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
|
|
<-sigint
|
|
|
|
if err := s.gracefulShutdown(); err != nil {
|
|
s.log.Log("Server is not shutting down! Reason: %v", err)
|
|
}
|
|
|
|
close(forever)
|
|
}()
|
|
|
|
if err := s.Listen(s.addr); err != nil {
|
|
s.log.Log("Server is not running! Reason: %v", err)
|
|
}
|
|
|
|
<-forever
|
|
}
|
|
|
|
// GetRequestID Return current requets ID - works only when fiber context are running
|
|
func (s *Server) GetRequestID(c *fiber.Ctx) (string, error) {
|
|
var hdr = new(Headers)
|
|
if err := c.ReqHeaderParser(hdr); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return hdr.RequestID, nil
|
|
}
|
|
|
|
func (s *Server) updateKVConfig() error { // FIXME: duplicated in cmd/worker/main.go
|
|
config, _, err := s.discovery.KV().Get(s.kvNmspc, nil)
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
|
|
return err
|
|
}
|
|
|
|
if config == nil {
|
|
return errors.New("empty KV config data")
|
|
}
|
|
|
|
kvCnf := bytes.NewBuffer(config.Value)
|
|
decoder := json.NewDecoder(kvCnf)
|
|
if err := decoder.Decode(&s.conf); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) gracefulShutdown() error {
|
|
s.log.Log("Server is going down...")
|
|
s.log.Log("Unregistering service: %s", s.discovery.GetID())
|
|
s.discovery.Unregister()
|
|
|
|
s.ebCh.Close()
|
|
s.db.Close()
|
|
s.log.Close()
|
|
|
|
return s.Shutdown()
|
|
}
|