// catalog-service/src/internal/server/server.go
//
// 210 lines
// 5.0 KiB
// Go

package server
import (
	"bytes"
	"encoding/json"
	"log"
	"net"
	"os"
	"strconv"
	"time"

	"git.pbiernat.io/egommerce/go-api-pkg/consul"
	"git.pbiernat.io/egommerce/go-api-pkg/fluentd"
	redis "github.com/go-redis/redis/v8"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/streadway/amqp"

	db "git.pbiernat.io/egommerce/catalog-service/pkg/database"
	p "git.pbiernat.io/egommerce/catalog-service/pkg/server"

	"git.pbiernat.io/egommerce/catalog-service/internal"
)
// Server bundles the shared base server with the service configuration and
// the backing clients that are attached through OptionFn values. Every client
// field stays nil unless the matching With* option was passed to New.
type Server struct {
	Base     *p.Server        // shared server implementation from pkg/server
	Config   *internal.Config // service configuration handed to New
	Cache    *redis.Client    // Redis client, set by WithCache
	Database *pgxpool.Pool    // pgx connection pool, set by WithDatabase
	Eventbus *amqp.Channel    // AMQP channel, set by WithEventbus
	Logger   *fluentd.Logger  // Fluentd logger, set by WithLogger
	Registry *consul.Service  // Consul service handle, set by WithRegistry
}

// OptionFn customizes a Server while New is building it. New treats a non-nil
// error as fatal.
type OptionFn func(*Server) error
// New builds a Server for the given configuration, applies every option in
// the order supplied, and then wires the shutdown hook, middleware and router.
//
// An option that returns an error terminates the process through log.Fatalf.
func New(c *internal.Config, opts ...OptionFn) *Server {
	srv := new(Server)
	srv.Base = p.New(c.Base)
	srv.Config = c

	for i := range opts {
		err := opts[i](srv)
		if err == nil {
			continue
		}
		log.Fatalf("Failed to attach extension to the server. Err: %v\n", err)
	}

	// The hook is installed after the options ran; it reads the server's
	// fields only when it is eventually invoked.
	srv.Base.ShutdownFn = srv.Shutdown()

	SetupMiddleware(srv)
	SetupRouter(srv)

	return srv
}
// Shutdown returns the hook that tears the server down: it unregisters the
// service from the registry, closes the event bus channel, the database pool,
// the cache client and the logger, and finally delegates to the base server's
// Shutdown, whose error is the hook's result.
//
// Every dependency is optional (it is only set when the matching With* option
// was passed to New), so each one is skipped when nil instead of panicking.
// Close errors from the event bus and the cache are reported through the
// logger when one is attached; they never abort the remaining teardown steps.
func (s *Server) Shutdown() p.ShutdownFn {
	return func() error {
		if s.Logger != nil {
			s.Logger.Log("Server %s is going down...", s.Base.AppID)
		}

		if s.Registry != nil {
			s.Registry.Unregister()
		}
		// s.clearMetadataCache()

		if s.Eventbus != nil {
			if err := s.Eventbus.Close(); err != nil && s.Logger != nil {
				s.Logger.Log("Failed to close the Eventbus channel. Err: %v", err)
			}
		}
		if s.Database != nil {
			s.Database.Close()
		}
		if s.Cache != nil {
			// The Redis client owns a connection pool that was never released.
			if err := s.Cache.Close(); err != nil && s.Logger != nil {
				s.Logger.Log("Failed to close the Cache client. Err: %v", err)
			}
		}

		// The logger goes last so the messages above can still be delivered.
		if s.Logger != nil {
			s.Logger.Log("Gone.")
			s.Logger.Close()
		}

		return s.Base.Shutdown()
	}
}
// @CHECK: merge s.Config and s.Base.Config to display all config as one array/map

// registerKVUpdater starts a background goroutine that, every 10 seconds,
// fetches the key s.Config.KVNamespace from the registry's KV store and
// decodes its JSON value over s.Config.
//
// A failed lookup or an undecodable payload is logged and retried on the next
// tick, and a key that does not exist yet is skipped quietly, so one transient
// registry outage no longer disables config refreshing for the rest of the
// process's life. The goroutine has no stop signal and runs until the process
// exits.
func (s *Server) registerKVUpdater() { // @FIXME: merge duplication in server.go and worker.go
	go func() {
		ticker := time.NewTicker(time.Second * 10)
		defer ticker.Stop() // release the ticker if the loop is ever left

		for range ticker.C {
			config, _, err := s.Registry.KV().Get(s.Config.KVNamespace, nil)
			if err != nil {
				log.Printf("Failed to fetch KV config: %s. Err: %v\n", s.Config.KVNamespace, err)
				continue
			}
			if config == nil {
				// Key not present (yet); try again on the next tick.
				continue
			}

			decoder := json.NewDecoder(bytes.NewBuffer(config.Value))
			if err := decoder.Decode(&s.Config); err != nil {
				log.Printf("Failed to decode KV config: %s. Err: %v\n", s.Config.KVNamespace, err)
			}
		}
	}()
}
// func (s *Server) clearMetadataCache() {
// ctx := context.Background()
// key, address := s.getMetadataIPsKey(), s.Config.Base.AppID
// s.Cache.LRem(ctx, key, 0, address)
// }
// func (s *Server) getMetadataIPsKey() string {
// return "internal__" + s.Base.Config.AppName + "__ips"
// }
// WithCache returns an option that attaches a Redis client to the server,
// using the address and password from the server's own Config and database
// index 0.
//
// The c argument is not read; the values come from s.Config.
func WithCache(c *internal.Config) OptionFn {
	return func(s *Server) error {
		redisOpts := redis.Options{
			Addr:     s.Config.CacheAddr,
			Password: s.Config.CachePassword,
			DB:       0,
		}
		s.Cache = redis.NewClient(&redisOpts)

		return nil
	}
}
// WithDatabase returns an option that connects to the database at
// s.Config.DbURL and stores the resulting pool on the server.
//
// A connection failure is logged and the process exits with status 1, so the
// returned error is always nil. The c argument is not read; the URL comes
// from s.Config.
func WithDatabase(c *internal.Config) OptionFn {
	return func(s *Server) error {
		pool, err := db.Connect(s.Config.DbURL)
		if err != nil {
			// Printf followed by os.Exit(1) is what log.Fatalf does; the
			// original called both, leaving the explicit exit unreachable.
			// NOTE(review): DbURL may embed credentials — confirm it is safe to log.
			log.Printf("Failed to connect to the Database: %s. Err: %v\n", s.Config.DbURL, err)
			os.Exit(1)
		}
		s.Database = pool

		return nil
	}
}
// WithEventbus returns an option that dials the AMQP broker at
// s.Config.EventBusURL, opens a channel on that connection and stores the
// channel on the server.
//
// Either failure terminates the process through log.Fatalf, so the returned
// error is always nil. The c argument is not read; the URL comes from
// s.Config.
func WithEventbus(c *internal.Config) OptionFn {
	return func(s *Server) error {
		cfg := s.Config

		connection, dialErr := amqp.Dial(cfg.EventBusURL)
		if dialErr != nil {
			log.Fatalf("Failed to connect to the Eventbus: %s. Err: %v\n", cfg.EventBusURL, dialErr)
		}

		// NOTE(review): only the channel is kept; the connection handle is
		// dropped here, so nothing can close it explicitly later.
		channel, chanErr := connection.Channel()
		if chanErr != nil {
			log.Fatalf("Failed to open new Eventbus channel. Err: %v\n", chanErr)
		}
		s.Eventbus = channel

		return nil
	}
}
// WithLogger returns an option that parses s.Config.LoggerAddr, creates a
// Fluentd logger named after the application's full name and stores it on
// the server.
//
// A parse or connection failure terminates the process through log.Fatalf,
// so the returned error is always nil. The c argument is not read; the
// settings come from s.Config.
func WithLogger(c *internal.Config) OptionFn {
	return func(s *Server) error {
		cfg := s.Config

		host, port, parseErr := fluentd.ParseAddr(cfg.LoggerAddr)
		if parseErr != nil {
			log.Fatalf("Failed to parse Fluentd address: %s. Err: %v", cfg.LoggerAddr, parseErr)
		}

		fluentLogger, connErr := fluentd.NewLogger(cfg.Base.GetAppFullName(), host, port)
		if connErr != nil {
			log.Fatalf("Failed to connect to the Fluentd on %s:%d. Err: %v", host, port, connErr)
		}
		s.Logger = fluentLogger

		return nil
	}
}
// WithRegistry returns an option that registers the service in Consul, sets
// up its health checks, stores the registry handle on the server and starts
// the periodic KV config updater.
//
// The port advertised to Consul is taken from s.Config.Base.NetAddr, which
// may be ":PORT" or "HOST:PORT". Any failure (unparsable address, Consul
// connection, registration) terminates the process through log.Fatalf, so the
// returned error is always nil. The c argument is not read; the settings come
// from s.Config.
func WithRegistry(c *internal.Config) OptionFn {
	return func(s *Server) error {
		cfg := s.Config

		// Split on the host/port separator instead of assuming a leading ':';
		// slicing off the first byte broke "IP:PORT" and panicked on "".
		_, portStr, err := net.SplitHostPort(cfg.Base.NetAddr)
		if err != nil {
			log.Fatalf("Failed to parse listen address: %s. Err: %v", cfg.Base.NetAddr, err)
		}
		port, err := strconv.Atoi(portStr)
		if err != nil {
			log.Fatalf("Failed to parse listen port: %s. Err: %v", portStr, err)
		}

		registry, err := consul.NewService(cfg.RegistryAddr, cfg.Base.AppID, cfg.Base.AppName, cfg.Base.GetIP(), cfg.Base.AppDomain, cfg.Base.PathPrefix, port)
		if err != nil {
			log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", cfg.RegistryAddr, err)
		}

		if err = registry.Register(); err != nil {
			log.Fatalf("Failed to register in the Consul service. Err: %v", err)
		}
		registry.RegisterHealthChecks()

		// Store the handle before starting the updater: its goroutine reads
		// s.Registry on every tick.
		s.Registry = registry
		s.registerKVUpdater()

		// svc, err := registry.Connect()
		// if err != nil {
		// 	log.Fatalf("Failed to initialize Consul Connect service. Err: %v", err)
		// }
		// confUpdated := make(chan bool, 1)
		// go func(s *Server) { // startup register
		// 	ticker := time.NewTicker(time.Second * 5)
		// 	for range ticker.C {
		// 		tlsCnf := svc.ServerTLSConfig()
		// 		if len(tlsCnf.Certificates) > 0 { // FIXME more complex checking(validity date)
		// 			fmt.Println("certs: ", tlsCnf.Certificates)
		// 			confUpdated <- true
		// 		}
		// 		s.Base.App.Server().TLSConfig = tlsCnf
		// 		fmt.Printf("CERTY TLS [routine]: %v\n", tlsCnf.Certificates)
		// 	}
		// }(s)
		// fmt.Println("Waiting until receive certs...")
		// <-confUpdated
		// fmt.Println("CONTINUE STARTING...")
		// defer svc.Close() // tmp - ensure svc.Close is called on exit

		return nil
	}
}