// ___ ____ ___ ___
// \ \ / / | _ | __| \ \ / / || | __ || || _ |
// \ \/ / |___ | |__ \ \/ / || |___ || ||___|
// \ / | _ | _ | \ / || __ | || ||\\
// \/ |___ |___ | \/ || ____| || || \\
//
// Copyright (c) 2021 Piotr Biernat. https://pbiernat.dev. MIT License
// Repo: https://git.pbiernat.dev/golang/vegvisir
// Based on:
// Package memo provides a concurrency-safe non-blocking memoization
// of a function. Requests for different keys proceed in parallel.
// Concurrent requests for the same url block until the first completes.
// This implementation uses a monitor goroutine.
// See page 278.
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/

package cache

import (
	"log"
	"sync"
	"time"

	"github.com/pquerna/ffjson/ffjson"
	"github.com/valyala/fasthttp"
)

// Manager is a non-blocking cache manager. A single monitor goroutine
// (see server) receives requests from the queue and answers them from
// the datastore or from the handler.
type Manager struct {
	sync.RWMutex

	datastore Datastore
	prefix    string // prepended to every cache key
	ttl       int    // passed to Datastore.SetKey on every save
	queue     *queue
}

type result struct {
	name string         // cache item key
	body *ResponseCache // cache item body FIXME: use interface to cover all types of cache items
	err  error
}

type entry struct {
	res   result
	ready chan struct{} // closed when res is ready
}

// HandlerFunc is the signature of the function that produces a response
// when the requested item is not in the cache.
type HandlerFunc func(url, method string, route *RouteCache) (*ResponseCache, error)

type queue struct {
	items chan queueItem
}

type queueItem struct {
	url    string
	method string
	route  *RouteCache // FIXME: Refactor ^^
	//key string
	response chan<- result // the client wants a single result
}

//var reqsSend = 0

// NewCachedManager creates a non-blocking cache manager and starts its
// monitor goroutine. Call Close to stop the goroutine.
func NewCachedManager(datastore Datastore, prefix string, ttl int, handler HandlerFunc) *Manager {
	//log.Printf("Create DS: %s %d", prefix, ttl)
	m := &Manager{
		datastore: datastore,
		prefix:    prefix,
		ttl:       ttl,
		queue:     &queue{items: make(chan queueItem)},
	}

	go m.server(handler)

	return m
}

// HTTPRequestHandler is a HandlerFunc that forwards the request to
// route.TargetURL and wraps the backend response in a ResponseCache.
func HTTPRequestHandler(url, method string, route *RouteCache) (*ResponseCache, error) { // FIXME: Refactor|Move to handler/
	//start := time.Now()
	bckReq := fasthttp.AcquireRequest()
	bckResp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseRequest(bckReq)
	defer fasthttp.ReleaseResponse(bckResp)

	// copy headers from backend response and prepare request for backend - separate
	bckReq.SetRequestURI(route.TargetURL)
	bckReq.Header.SetMethod(method)

	err := fasthttp.Do(bckReq, bckResp)
	if err != nil {
		return nil, err
	}

	body, code := string(bckResp.Body()), bckResp.StatusCode()
	//log.Printf("%s, %s, %d bytes\n", url, time.Since(start), len(body))

	// save response to cache
	respCache := NewResponseCache(url, method, body, code, &bckResp.Header)
	// ^^ FIXME: rename NewResponseCache(with all depend struct) and move to handler/ (without dependencies)

	//reqsSend++
	log.Println(time.Now().Nanosecond(), "HTTPRequestHandler() done:", route.TargetURL, url)

	return respCache, nil
}

// Fetch sends the request to the monitor goroutine and waits for its
// result. It must not be called after Close: sending on the closed
// queue channel panics.
// FIXME: #68 - refactor
func (m *Manager) Fetch(url, method string, route *RouteCache) (*ResponseCache, error) {
	//log.Println("Response CM: Fetch()")
	response := make(chan result)
	m.queue.items <- queueItem{url: url, method: method, route: route, response: response}
	res := <-response

	return res.body, res.err
}

// Close stops the monitor goroutine. Call it while the server is
// shutting down.
func (m *Manager) Close() {
	close(m.queue.items)
}

// load looks the item up in the datastore. On a hit it returns true and
// an entry that is already ready; on a miss it returns false and an
// empty entry whose ready channel is still open.
func (m *Manager) load(url, method string, route *RouteCache /*, output interface{}*/) (bool, *entry) { // FIXME second return type( interface{} )
	//log.Println("Response CM: load()")
	key := m.prefix + method + "_" + url
	data, err := m.datastore.GetKey(key)
	if err != nil {
		//log.Println(time.Now().Nanosecond(), "Cache item:", key, "not found")
		return false, &entry{ready: make(chan struct{})}
	}

	// Treat a value of an unexpected type as a cache miss instead of
	// panicking on the type assertion.
	raw, isString := data.(string)
	if !isString {
		log.Printf("Cache item %s has unexpected type %T", key, data) // FIXME
		return false, &entry{ready: make(chan struct{})}
	}

	response := &ResponseCache{} // FIXME #67: Simplify
	jsonErr := ffjson.Unmarshal([]byte(raw), response)
	if jsonErr != nil {
		log.Println("Converting error: ", jsonErr) // FIXME
		return false, &entry{ready: make(chan struct{})}
	}

	e := &entry{
		res:   result{name: key, body: response, err: nil},
		ready: make(chan struct{}),
	}
	close(e.ready)

	return true, e
}

// save serializes r to JSON and stores it under prefix+name with the
// manager's TTL. It reports whether the item was stored.
func (m *Manager) save(name string, r interface{}) bool { // FIXME second argument type( interface{} )
	//log.Println(time.Now().Nanosecond(), "Saving Response")
	data, err := ffjson.Marshal(r)
	if err != nil {
		log.Println("JSON:", err) // FIXME
		return false
	}

	name = m.prefix + name
	err = m.datastore.SetKey(name, string(data), m.ttl)
	//log.Printf("%d Saved cache in to ds: %T %s", time.Now().Nanosecond(), m.datastore, name)
	if err != nil {
		log.Println("Error saving cache item:", name, err) // FIXME
		return false
	}

	return true
}

// server is the monitor goroutine. It runs until the queue channel is
// closed by Close.
// FIXME: #68 - refactor
// FIXME:
// crashing on https://vps.pbiernat.dev/test-api/news (missing keep-alive header or some http/2 issue...)
// no-errors when Connection: close header is applied on remote
func (m *Manager) server(handler HandlerFunc) {
	for item := range m.queue.items {
		m.Lock()
		ok, e := m.load(item.url, item.method, item.route /*, &ResponseCache{}*/) // FIXME: &responseCache{} tmp fix
		m.Unlock()

		if !ok {
			//e = &entry{ready: make(chan struct{})/*, called: make(chan struct{})*/}
			//log.Println("e.call:", item.url, item.method, item.route)
			go m.call(e, handler, item.url, item.method, item.route)
		}

		//log.Println("e.deliver:", item.response)
		go m.deliver(e, item.response)
	}
}

// call runs the handler, stores a successful response in the cache and
// then marks the entry as ready.
func (m *Manager) call(e *entry, f HandlerFunc, url, method string, route *RouteCache) {
	e.res.name = method + "_" + url // FIXME: hardcoded key pattern

	m.RLock()
	e.res.body, e.res.err = f(url, method, route)
	if e.res.err == nil {
		// Cache only successful responses; a failed call has a nil body,
		// which would otherwise be stored and later read back as a hit.
		m.save(e.res.name, e.res.body)
	}
	m.RUnlock()

	close(e.ready)
}

// deliver waits until the entry is ready and sends its result to the client.
func (m *Manager) deliver(e *entry, response chan<- result) {
	<-e.ready
	response <- e.res
}