package app import ( "fmt" "log" "os" "strconv" redis "github.com/go-redis/redis/v8" amqp "github.com/rabbitmq/amqp091-go" "git.pbiernat.io/egommerce/go-api-pkg/consul" "git.pbiernat.io/egommerce/go-api-pkg/fluentd" db "git.pbiernat.io/egommerce/catalog-service/pkg/database" ) type ( Plugin struct { name string fn PluginFn } PluginFn func() any ) func CachePlugin(cArr map[string]string) Plugin { return Plugin{ name: "cache", fn: func() any { return redis.NewClient(&redis.Options{ Addr: cArr["cacheAddr"], Password: cArr["cachePassword"], DB: 0, }) }, } } func DatabasePlugin(cArr map[string]string) Plugin { return Plugin{ name: "database", fn: func() any { dbConn, err := db.Connect(cArr["dbURL"]) if err != nil { log.Fatalf("Failed to connect to the Database: %s. Err: %v\n", cArr["dbURL"], err) os.Exit(1) // TODO: retry in background... } return dbConn }, } } func EventbusPlugin(cArr map[string]string) Plugin { return Plugin{ name: "eventbus", fn: func() any { conn, err := amqp.Dial(cArr["eventBusURL"]) if err != nil { log.Fatalf("Failed to connect to the EventBus: %s. Err: %v\n", cArr["eventBusURL"], err) os.Exit(1) // TODO: retry in background... } chn, err := conn.Channel() if err != nil { log.Fatalf("Failed to open new EventBus channel. Err: %v\n", err) os.Exit(1) // TODO: retry in background... } return chn }, } } func LoggerPlugin(cArr map[string]string) Plugin { return Plugin{ name: "logger", fn: func() any { logHost, logPort, err := fluentd.ParseAddr(cArr["loggerAddr"]) if err != nil { log.Fatalf("Failed to parse FluentD address: %s. Err: %v", cArr["loggerAddr"], err) os.Exit(1) // TODO: retry in background... } logger, err := fluentd.NewLogger(cArr["appFullname"], logHost, logPort) if err != nil { log.Fatalf("Failed to connect to the FluentD on %s:%d. Err: %v", logHost, logPort, err) os.Exit(1) // TODO: retry in background... } return logger }, } } func RegistryPlugin(cArr map[string]string) Plugin { return Plugin{ name: "registry", fn: func() any { port, _ := strconv.Atoi(cArr["netAddr"][1:]) // FIXME: can be IP:PORT or :PORT // log.Printf("Consul retrieved port: %v", port) registry, err := consul.NewService(cArr["registryAddr"], cArr["id"], cArr["name"], cArr["registryDomainOverIP"], cArr["ip"], cArr["domain"], cArr["pathPrefix"], port) if err != nil { log.Fatalf("Failed to connect to the Consul on: %s. Err: %v", cArr["registryAddr"], err) os.Exit(1) // TODO: retry in background... } svc, _ := registry.Connect() tlsCnf := svc.ServerTLSConfig() // s.Base.App.Server().TLSConfig = tlsCnf fmt.Println("Podmiana configa TLS", tlsCnf) // defer svc.Close() err = registry.Register() if err != nil { log.Fatalf("Failed to register in the Consul service. Err: %v", err) os.Exit(1) // TODO: retry in background... } registry.RegisterHealthChecks() // a.registerKVUpdater() // FIXME run as goroutine return registry // go func() { // Consul KV updater // ticker := time.NewTicker(time.Second * 15) // for range ticker.C { // fetchKVConfig(s) // FIXME: duplicated in worker // } // }() // go func() { // Server metadata cache updater // ticker := time.NewTicker(time.Second * 5) // for range ticker.C { // s.cacheMetadata() // } // }() }, } }