vegvisir/pkg/cache/manager.go

208 lines
5.8 KiB
Go
Raw Normal View History

// ___ ____ ___ ___
// \ \ / / | _ | __| \ \ / / || | __ || || _ |
// \ \/ / |___ | |__ \ \/ / || |___ || ||___|
// \ / | _ | _ | \ / || __ | || ||\\
// \/ |___ |___ | \/ || ____| || || \\
//
// Copyright (c) 2021 Piotr Biernat. https://pbiernat.dev. MIT License
// Repo: https://git.pbiernat.dev/golang/vegvisir
// Based on:
// Package memo provides a concurrency-safe non-blocking memoization
// of a function. Requests for different keys proceed in parallel.
// Concurrent requests for the same url block until the first completes.
// This implementation uses a monitor goroutine.
// See page 278.
// Copyright © 2016 Alan A. A. Donovan & Brian W. Kernighan.
// License: https://creativecommons.org/licenses/by-nc-sa/4.0/
package cache
import (
"github.com/pquerna/ffjson/ffjson"
"github.com/valyala/fasthttp"
"log"
"sync"
"time"
)
2021-11-15 21:41:38 +01:00
// Manager is the cache manager. It couples a Datastore with a queue of
// pending Fetch requests that is drained by the server goroutine started in
// NewCachedManager.
//
// The embedded RWMutex is write-locked by server around load and read-locked
// by call around the handler invocation and save.
type Manager struct {
sync.RWMutex
datastore Datastore // backing store, accessed through GetKey/SetKey
prefix string // prepended to every key read from or written to datastore
ttl int // handed to Datastore.SetKey on save; unit is not visible here — TODO confirm
queue *queue // Fetch requests waiting to be picked up by server
}
// result is the outcome of a single cache lookup or handler call, as handed
// back to the Fetch caller.
// NOTE: load builds this struct with a positional literal, so the field order
// must not change.
type result struct {
name string // cache item key
body *ResponseCache // cache item body FIXME: use interface to cover all types of cache items
err error // error returned by the handler; nil on a cache hit
}
// entry pairs a result with a channel that signals when the result may be
// read. load closes ready immediately on a cache hit; call closes it once the
// handler has returned.
// NOTE: load builds this struct with a positional literal, so the field order
// must not change.
type entry struct {
res result
ready chan struct{} // closed when res is ready
}
2021-11-15 21:41:38 +01:00
// HandlerFunc is the signature of the function the Manager invokes on a cache
// miss to produce the response for the given url, method and route.
// A non-nil error is forwarded to the Fetch caller.
type HandlerFunc func(url, method string, route *RouteCache) (*ResponseCache, error)
// queue carries Fetch requests from callers to the server goroutine.
type queue struct {
items chan queueItem // created unbuffered in NewCachedManager; closed by Manager.Close
}
// queueItem is one Fetch request: the handler arguments plus the channel on
// which the caller waits for its result.
// NOTE: Fetch builds this struct with a positional literal, so the field order
// must not change.
type queueItem struct {
url string // request URL; part of the cache key
method string // HTTP method; part of the cache key
route *RouteCache // route passed through to the handler
// FIXME: Refactor ^^
//key string
response chan<- result // the client wants a single result
}
//var reqsSend = 0
2021-11-15 21:41:38 +01:00
// NewCachedManager Creates instance of non-blocking cache manager
func NewCachedManager(datastore Datastore, prefix string, ttl int, handler HandlerFunc) *Manager {
//log.Printf("Create DS: %s %d", prefix, ttl)
2021-11-15 21:41:38 +01:00
m := &Manager{
datastore: datastore,
prefix: prefix,
ttl: ttl,
queue: &queue{items: make(chan queueItem)},
}
go m.server(handler)
return m
}
2021-11-15 21:41:38 +01:00
// HTTPRequestHandler HTTP Handler for non-blocking cache reads
func HTTPRequestHandler(url, method string, route *RouteCache) (*ResponseCache, error) { // FIXME: Refactor|Move to handler/
//start := time.Now()
bckReq := fasthttp.AcquireRequest()
bckResp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseRequest(bckReq)
defer fasthttp.ReleaseResponse(bckResp)
// copy headers from backend response and prepare request for backend - separate
bckReq.SetRequestURI(route.TargetURL)
bckReq.Header.SetMethod(method)
err := fasthttp.Do(bckReq, bckResp)
if err != nil {
2021-11-15 21:41:38 +01:00
return nil, err
}
body, code := string(bckResp.Body()), bckResp.StatusCode()
//log.Printf("%s, %s, %d bytes\n", url, time.Since(start), len(body))
// save response to cache
respCache := NewResponseCache(url, method, body, code, &bckResp.Header)
// ^^ FIXME: rename NewResponseCache(with all depend struct) and move to handler/ (without dependencies)
//reqsSend++
2021-11-15 21:41:38 +01:00
log.Println(time.Now().Nanosecond(), "HTTPRequestHandler() done:", route.TargetURL, url)
2021-11-15 21:41:38 +01:00
return respCache, nil
}
2021-11-15 21:41:38 +01:00
// Fetch returns the response for url and method, either from the cache or, on
// a miss, from the Manager's handler.
//
// The request is queued for the server goroutine and the call blocks until its
// result has been delivered. Fetch must not be called after Close: sending on
// the closed queue channel panics.
//
// FIXME: #68 - refactor
func (m *Manager) Fetch(url, method string, route *RouteCache) (*ResponseCache, error) {
	reply := make(chan result)

	m.queue.items <- queueItem{
		url:      url,
		method:   method,
		route:    route,
		response: reply,
	}

	outcome := <-reply
	return outcome.body, outcome.err
}
2021-11-15 21:41:38 +01:00
// Close shuts the request queue; it is meant to run while the server is
// shutting down.
//
// Closing the channel ends the range loop in server. Calling Close a second
// time panics because the channel is already closed.
func (m *Manager) Close() {
	requests := m.queue.items
	close(requests)
}
2021-11-15 21:41:38 +01:00
// load looks up the cached response for method and url in the datastore.
//
// The datastore key is m.prefix + method + "_" + url.
//
// On a hit it returns true and an entry whose ready channel is already closed
// and whose result holds the decoded ResponseCache. On a miss it returns false
// and a fresh entry with an open ready channel, which the caller is expected
// to fill and close (see call). A miss is reported when the key lookup fails,
// when the stored value is not a string, or when it cannot be decoded.
//
// The route argument is currently unused.
func (m *Manager) load(url, method string, route *RouteCache /*, output interface{}*/) (bool, *entry) { // FIXME second return type( interface{} )
	key := m.prefix + method + "_" + url

	data, err := m.datastore.GetKey(key)
	if err != nil {
		return false, &entry{ready: make(chan struct{})}
	}

	// Checked assertion: a nil or non-string value is treated as a miss
	// instead of panicking inside the server goroutine.
	raw, ok := data.(string)
	if !ok {
		log.Printf("Converting error: unexpected cache value type %T for key %s", data, key)
		return false, &entry{ready: make(chan struct{})}
	}

	response := &ResponseCache{} // FIXME #67: Simplify
	if jsonErr := ffjson.Unmarshal([]byte(raw), response); jsonErr != nil {
		// Log the decode error itself; the lookup error is always nil here.
		log.Println("Converting error: ", jsonErr) // FIXME
		return false, &entry{ready: make(chan struct{})}
	}

	e := &entry{
		res:   result{name: key, body: response},
		ready: make(chan struct{}),
	}
	close(e.ready) // result is available immediately on a cache hit
	return true, e
}
// save serializes r to JSON and stores it in the datastore under
// m.prefix + name with the Manager's ttl.
//
// It returns true when the item was written. Serialization and datastore
// failures are logged together with their cause and reported as false.
func (m *Manager) save(name string, r interface{}) bool { // FIXME second argument type( interface{} )
	// NOTE(review): &r is a *interface{}; presumably passing r directly would
	// let ffjson use generated marshalers — confirm before changing.
	data, err := ffjson.Marshal(&r)
	if err != nil {
		log.Println("JSON:", err) // FIXME
		return false
	}

	name = m.prefix + name
	if err = m.datastore.SetKey(name, string(data), m.ttl); err != nil {
		// Include the error so a failed write can be diagnosed from the log.
		log.Println("Error saving cache item:", name, err) // FIXME
		return false
	}

	return true
}
// FIXME: #68 - refactor
// FIXME:
// crashing on https://vps.pbiernat.dev/test-api/news (missing keep-alive header or some http/2 issue...)
// no-errors when Connection: close header is applied on remote
func (m *Manager) server(handler HandlerFunc) {
for item := range m.queue.items {
m.Lock()
2021-11-15 21:41:38 +01:00
ok, e := m.load(item.url, item.method, item.route /*, &ResponseCache{}*/) // FIXME: &responseCache{} tmp fix
m.Unlock()
if !ok {
//e = &entry{ready: make(chan struct{})/*, called: make(chan struct{})*/}
//log.Println("e.call:", item.url, item.method, item.route)
go m.call(e, handler, item.url, item.method, item.route)
}
//log.Println("e.deliver:", item.response)
go m.deliver(e, item.response)
}
}
func (m *Manager) call(e *entry, f HandlerFunc, url, method string, route *RouteCache) {
e.res.name = method + "_" + url // FIXME: hardcoded key pattern
m.RLock()
2021-11-15 21:41:38 +01:00
e.res.body, e.res.err = f(url, method, route)
m.save(e.res.name, e.res.body)
m.RUnlock()
close(e.ready)
}
// deliver waits until e is ready and then sends its result on response.
//
// The send blocks until the receiving side (Fetch) has taken the value.
func (m *Manager) deliver(e *entry, response chan<- result) {
	// ready is closed by load (cache hit) or by call (after the handler ran).
	<-e.ready

	outcome := e.res
	response <- outcome
}