dynacache.go - hugo - [fork] hugo port for 9front
 (HTM) git clone https://git.drkhsh.at/hugo.git
       ---
       dynacache.go (15686B)
       ---
            1 // Copyright 2024 The Hugo Authors. All rights reserved.
            2 //
            3 // Licensed under the Apache License, Version 2.0 (the "License");
            4 // you may not use this file except in compliance with the License.
            5 // You may obtain a copy of the License at
            6 // http://www.apache.org/licenses/LICENSE-2.0
            7 //
            8 // Unless required by applicable law or agreed to in writing, software
            9 // distributed under the License is distributed on an "AS IS" BASIS,
           10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
           11 // See the License for the specific language governing permissions and
           12 // limitations under the License.
           13 
           14 package dynacache
           15 
           16 import (
           17         "context"
           18         "fmt"
           19         "math"
           20         "path"
           21         "regexp"
           22         "runtime"
           23         "sync"
           24         "time"
           25 
           26         "github.com/bep/lazycache"
           27         "github.com/bep/logg"
           28         "github.com/gohugoio/hugo/common/collections"
           29         "github.com/gohugoio/hugo/common/herrors"
           30         "github.com/gohugoio/hugo/common/loggers"
           31         "github.com/gohugoio/hugo/common/paths"
           32         "github.com/gohugoio/hugo/common/rungroup"
           33         "github.com/gohugoio/hugo/config"
           34         "github.com/gohugoio/hugo/helpers"
           35         "github.com/gohugoio/hugo/identity"
           36         "github.com/gohugoio/hugo/resources/resource"
           37 )
           38 
           39 const minMaxSize = 10
           40 
           41 type KeyIdentity struct {
           42         Key      any
           43         Identity identity.Identity
           44 }
           45 
           46 // New creates a new cache.
           47 func New(opts Options) *Cache {
           48         if opts.CheckInterval == 0 {
           49                 opts.CheckInterval = time.Second * 2
           50         }
           51 
           52         if opts.MaxSize == 0 {
           53                 opts.MaxSize = 100000
           54         }
           55         if opts.Log == nil {
           56                 panic("nil Log")
           57         }
           58 
           59         if opts.MinMaxSize == 0 {
           60                 opts.MinMaxSize = 30
           61         }
           62 
           63         stats := &stats{
           64                 opts:             opts,
           65                 adjustmentFactor: 1.0,
           66                 currentMaxSize:   opts.MaxSize,
           67                 availableMemory:  config.GetMemoryLimit(),
           68         }
           69 
           70         infol := opts.Log.InfoCommand("dynacache")
           71 
           72         evictedIdentities := collections.NewStack[KeyIdentity]()
           73 
           74         onEvict := func(k, v any) {
           75                 if !opts.Watching {
           76                         return
           77                 }
           78                 identity.WalkIdentitiesShallow(v, func(level int, id identity.Identity) bool {
           79                         evictedIdentities.Push(KeyIdentity{Key: k, Identity: id})
           80                         return false
           81                 })
           82                 resource.MarkStale(v)
           83         }
           84 
           85         c := &Cache{
           86                 partitions:        make(map[string]PartitionManager),
           87                 onEvict:           onEvict,
           88                 evictedIdentities: evictedIdentities,
           89                 opts:              opts,
           90                 stats:             stats,
           91                 infol:             infol,
           92         }
           93 
           94         c.stop = c.start()
           95 
           96         return c
           97 }
           98 
           99 // Options for the cache.
          100 type Options struct {
          101         Log           loggers.Logger
          102         CheckInterval time.Duration
          103         MaxSize       int
          104         MinMaxSize    int
          105         Watching      bool
          106 }
          107 
          108 // Options for a partition.
          109 type OptionsPartition struct {
          110         // When to clear this partition.
          111         ClearWhen ClearWhen
          112 
          113         // Weight is a number between 1 and 100 that indicates, in general, how big this partition may get.
          114         Weight int
          115 }
          116 
          117 func (o OptionsPartition) WeightFraction() float64 {
          118         return float64(o.Weight) / 100
          119 }
          120 
          121 func (o OptionsPartition) CalculateMaxSize(maxSizePerPartition int) int {
          122         return int(math.Floor(float64(maxSizePerPartition) * o.WeightFraction()))
          123 }
          124 
          125 // Cache is a dynamically partitioned cache.
          126 type Cache struct {
          127         mu sync.RWMutex
          128 
          129         partitions map[string]PartitionManager
          130 
          131         onEvict           func(k, v any)
          132         evictedIdentities *collections.Stack[KeyIdentity]
          133 
          134         opts  Options
          135         infol logg.LevelLogger
          136 
          137         stats    *stats
          138         stopOnce sync.Once
          139         stop     func()
          140 }
          141 
          142 // DrainEvictedIdentities drains the evicted identities from the cache.
          143 func (c *Cache) DrainEvictedIdentities() []KeyIdentity {
          144         return c.evictedIdentities.Drain()
          145 }
          146 
          147 // DrainEvictedIdentitiesMatching drains the evicted identities from the cache that match the given predicate.
          148 func (c *Cache) DrainEvictedIdentitiesMatching(predicate func(KeyIdentity) bool) []KeyIdentity {
          149         return c.evictedIdentities.DrainMatching(predicate)
          150 }
          151 
          152 // ClearMatching clears, in every partition for which predicatePartition returns true, all entries matching predicateValue.
          153 func (c *Cache) ClearMatching(predicatePartition func(k string, p PartitionManager) bool, predicateValue func(k, v any) bool) {
          154         if predicatePartition == nil {
          155                 predicatePartition = func(k string, p PartitionManager) bool { return true }
          156         }
          157         if predicateValue == nil {
          158                 panic("nil predicateValue")
          159         }
          160         g := rungroup.Run[PartitionManager](context.Background(), rungroup.Config[PartitionManager]{
          161                 NumWorkers: len(c.partitions),
          162                 Handle: func(ctx context.Context, partition PartitionManager) error {
          163                         partition.clearMatching(predicateValue)
          164                         return nil
          165                 },
          166         })
          167 
          168         for k, p := range c.partitions {
          169                 if !predicatePartition(k, p) {
          170                         continue
          171                 }
          172                 g.Enqueue(p)
          173         }
          174 
          175         g.Wait()
          176 }
          177 
          178 // ClearOnRebuild prepares the cache for a new rebuild taking the given changeset into account.
          179 // predicate is optional and will clear any entry for which it returns true.
          180 func (c *Cache) ClearOnRebuild(predicate func(k, v any) bool, changeset ...identity.Identity) {
          181         g := rungroup.Run[PartitionManager](context.Background(), rungroup.Config[PartitionManager]{
          182                 NumWorkers: len(c.partitions),
          183                 Handle: func(ctx context.Context, partition PartitionManager) error {
          184                         partition.clearOnRebuild(predicate, changeset...)
          185                         return nil
          186                 },
          187         })
          188 
          189         for _, p := range c.partitions {
          190                 g.Enqueue(p)
          191         }
          192 
          193         g.Wait()
          194 
          195         // Clear any entries marked as stale above.
          196         g = rungroup.Run[PartitionManager](context.Background(), rungroup.Config[PartitionManager]{
          197                 NumWorkers: len(c.partitions),
          198                 Handle: func(ctx context.Context, partition PartitionManager) error {
          199                         partition.clearStale()
          200                         return nil
          201                 },
          202         })
          203 
          204         for _, p := range c.partitions {
          205                 g.Enqueue(p)
          206         }
          207 
          208         g.Wait()
          209 }
          210 
          211 type keysProvider interface {
          212         Keys() []string
          213 }
          214 
          215 // Keys returns a list of keys in all partitions.
          216 func (c *Cache) Keys(predicate func(s string) bool) []string {
          217         if predicate == nil {
          218                 predicate = func(s string) bool { return true }
          219         }
          220         var keys []string
          221         for pn, g := range c.partitions {
          222                 pkeys := g.(keysProvider).Keys()
          223                 for _, k := range pkeys {
          224                         p := path.Join(pn, k)
          225                         if predicate(p) {
          226                                 keys = append(keys, p)
          227                         }
          228                 }
          229 
          230         }
          231         return keys
          232 }
          233 
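              // calculateMaxSizePerPartition derives a per-partition budget from the total
              // cache size and the average partition weight. Illustrative example (numbers
              // assumed, not taken from the original source): with maxItemsTotal=100000 and
              // three partitions weighted 10, 20 and 30 (totalWeightQuantity=60, avgWeight=20),
              // this returns floor(100000/3 * 100/20) = 166666; CalculateMaxSize then scales
              // that by Weight/100, giving 16666, 33333 and 49999, which adds back up to
              // roughly maxItemsTotal.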
          234 func calculateMaxSizePerPartition(maxItemsTotal, totalWeightQuantity, numPartitions int) int {
          235         if numPartitions == 0 {
          236                 panic("numPartitions must be > 0")
          237         }
          238         if totalWeightQuantity == 0 {
          239                 panic("totalWeightQuantity must be > 0")
          240         }
          241 
          242         avgWeight := float64(totalWeightQuantity) / float64(numPartitions)
          243         return int(math.Floor(float64(maxItemsTotal) / float64(numPartitions) * (100.0 / avgWeight)))
          244 }
          245 
          246 // Stop stops the cache.
          247 func (c *Cache) Stop() {
          248         c.stopOnce.Do(func() {
          249                 c.stop()
          250         })
          251 }
          252 
          253 func (c *Cache) adjustCurrentMaxSize() {
          254         c.mu.RLock()
          255         defer c.mu.RUnlock()
          256 
          257         if len(c.partitions) == 0 {
          258                 return
          259         }
          260         var m runtime.MemStats
          261         runtime.ReadMemStats(&m)
          262         s := c.stats
          263         s.memstatsCurrent = m
          264         // fmt.Printf("\n\nAvailable = %v\nAlloc = %v\nTotalAlloc = %v\nSys = %v\nNumGC = %v\nMaxSize = %d\nAdjustmentFactor=%f\n\n", helpers.FormatByteCount(s.availableMemory), helpers.FormatByteCount(m.Alloc), helpers.FormatByteCount(m.TotalAlloc), helpers.FormatByteCount(m.Sys), m.NumGC, c.stats.currentMaxSize, s.adjustmentFactor)
          265 
          266         if s.availableMemory >= s.memstatsCurrent.Alloc {
          267                 if s.adjustmentFactor <= 1.0 {
          268                         s.adjustmentFactor += 0.2
          269                 }
          270         } else {
          271                 // We're low on memory.
          272                 s.adjustmentFactor -= 0.4
          273         }
          274 
          275         if s.adjustmentFactor <= 0 {
          276                 s.adjustmentFactor = 0.05
          277         }
          278 
          279         if !s.adjustCurrentMaxSize() {
          280                 return
          281         }
          282 
          283         totalWeight := 0
          284         for _, pm := range c.partitions {
          285                 totalWeight += pm.getOptions().Weight
          286         }
          287 
          288         maxSizePerPartition := calculateMaxSizePerPartition(c.stats.currentMaxSize, totalWeight, len(c.partitions))
          289 
          290         evicted := 0
          291         for _, p := range c.partitions {
          292                 evicted += p.adjustMaxSize(p.getOptions().CalculateMaxSize(maxSizePerPartition))
          293         }
          294 
          295         if evicted > 0 {
          296                 c.infol.
          297                         WithFields(
          298                                 logg.Fields{
          299                                         {Name: "evicted", Value: evicted},
          300                                         {Name: "numGC", Value: m.NumGC},
          301                                         {Name: "limit", Value: helpers.FormatByteCount(c.stats.availableMemory)},
          302                                         {Name: "alloc", Value: helpers.FormatByteCount(m.Alloc)},
          303                                         {Name: "totalAlloc", Value: helpers.FormatByteCount(m.TotalAlloc)},
          304                                 },
          305                         ).Logf("adjusted partitions' max size")
          306         }
          307 }
          308 
          309 func (c *Cache) start() func() {
          310         ticker := time.NewTicker(c.opts.CheckInterval)
          311         quit := make(chan struct{})
          312 
          313         go func() {
          314                 for {
          315                         select {
          316                         case <-ticker.C:
          317                                 c.adjustCurrentMaxSize()
          318                                 // Reset the ticker to avoid drift.
          319                                 ticker.Reset(c.opts.CheckInterval)
          320                         case <-quit:
          321                                 ticker.Stop()
          322                                 return
          323                         }
          324                 }
          325         }()
          326 
          327         return func() {
          328                 close(quit)
          329         }
          330 }
          331 
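              // partitionNameRe matches the accepted partition name format: a leading slash,
              // a first segment of exactly four alphanumeric characters, and up to two more
              // optional alphanumeric segments, e.g. "/page" or "/imgs/sources".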
          332 var partitionNameRe = regexp.MustCompile(`^\/[a-zA-Z0-9]{4}(\/[a-zA-Z0-9]+)?(\/[a-zA-Z0-9]+)?`)
          333 
          334 // GetOrCreatePartition gets or creates a partition with the given name.
          335 func GetOrCreatePartition[K comparable, V any](c *Cache, name string, opts OptionsPartition) *Partition[K, V] {
          336         if c == nil {
          337                 panic("nil Cache")
          338         }
          339         if opts.Weight < 1 || opts.Weight > 100 {
          340                 panic("invalid Weight, must be between 1 and 100")
          341         }
          342 
          343         if partitionNameRe.FindString(name) != name {
          344                 panic(fmt.Sprintf("invalid partition name %q", name))
          345         }
          346 
          347         c.mu.RLock()
          348         p, found := c.partitions[name]
          349         c.mu.RUnlock()
          350         if found {
          351                 return p.(*Partition[K, V])
          352         }
          353 
          354         c.mu.Lock()
          355         defer c.mu.Unlock()
          356 
          357         // Double check.
          358         p, found = c.partitions[name]
          359         if found {
          360                 return p.(*Partition[K, V])
          361         }
          362 
          363         // At this point, we don't know the number of partitions or their configuration, but
          364         // this will be re-adjusted later.
          365         const numberOfPartitionsEstimate = 10
          366         maxSize := opts.CalculateMaxSize(c.opts.MaxSize / numberOfPartitionsEstimate)
          367 
          368         onEvict := func(k K, v V) {
          369                 c.onEvict(k, v)
          370         }
          371 
          372         // Create a new partition and cache it.
          373         partition := &Partition[K, V]{
          374                 c:       lazycache.New(lazycache.Options[K, V]{MaxEntries: maxSize, OnEvict: onEvict}),
          375                 maxSize: maxSize,
          376                 trace:   c.opts.Log.Logger().WithLevel(logg.LevelTrace).WithField("partition", name),
          377                 opts:    opts,
          378         }
          379 
          380         c.partitions[name] = partition
          381 
          382         return partition
          383 }
          384 
          385 // Partition is a partition in the cache.
          386 type Partition[K comparable, V any] struct {
          387         c *lazycache.Cache[K, V]
          388 
          389         zero V
          390 
          391         trace logg.LevelLogger
          392         opts  OptionsPartition
          393 
          394         maxSize int
          395 }
          396 
          397 // GetOrCreate gets or creates a value for the given key.
          398 func (p *Partition[K, V]) GetOrCreate(key K, create func(key K) (V, error)) (V, error) {
          399         v, err := p.doGetOrCreate(key, create)
          400         if err != nil {
          401                 return p.zero, err
          402         }
          403         if resource.StaleVersion(v) > 0 {
          404                 p.c.Delete(key)
          405                 return p.doGetOrCreate(key, create)
          406         }
          407         return v, err
          408 }
          409 
          410 func (p *Partition[K, V]) doGetOrCreate(key K, create func(key K) (V, error)) (V, error) {
          411         v, _, err := p.c.GetOrCreate(key, create)
          412         return v, err
          413 }
          414 
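              // GetOrCreateWitTimeout gets or creates a value for the given key, returning a
              // timeout error if the create function takes longer than the given duration.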
          415 func (p *Partition[K, V]) GetOrCreateWitTimeout(key K, duration time.Duration, create func(key K) (V, error)) (V, error) {
          416         v, err := p.doGetOrCreateWitTimeout(key, duration, create)
          417         if err != nil {
          418                 return p.zero, err
          419         }
          420         if resource.StaleVersion(v) > 0 {
          421                 p.c.Delete(key)
          422                 return p.doGetOrCreateWitTimeout(key, duration, create)
          423         }
          424         return v, err
          425 }
          426 
          427 // doGetOrCreateWitTimeout runs GetOrCreate in a goroutine and returns a
          428 // herrors.TimeoutError if the result does not arrive within duration.
          429 func (p *Partition[K, V]) doGetOrCreateWitTimeout(key K, duration time.Duration, create func(key K) (V, error)) (V, error) {
          430         resultch := make(chan V, 1)
          431         errch := make(chan error, 1)
          432 
          433         go func() {
          434                 var (
          435                         v   V
          436                         err error
          437                 )
          438                 defer func() {
          439                         if r := recover(); r != nil {
          440                                 if rerr, ok := r.(error); ok {
          441                                         err = rerr
          442                                 } else {
          443                                         err = fmt.Errorf("panic: %v", r)
          444                                 }
          445                         }
          446                         if err != nil {
          447                                 errch <- err
          448                         } else {
          449                                 resultch <- v
          450                         }
          451                 }()
          452                 v, _, err = p.c.GetOrCreate(key, create)
          453         }()
          454 
          455         select {
          456         case v := <-resultch:
          457                 return v, nil
          458         case err := <-errch:
          459                 return p.zero, err
          460         case <-time.After(duration):
          461                 return p.zero, &herrors.TimeoutError{
          462                         Duration: duration,
          463                 }
          464         }
          465 }
          466 
          467 func (p *Partition[K, V]) clearMatching(predicate func(k, v any) bool) {
          468         p.c.DeleteFunc(func(key K, v V) bool {
          469                 if predicate(key, v) {
          470                         p.trace.Log(
          471                                 logg.StringFunc(
          472                                         func() string {
          473                                                 return fmt.Sprintf("clearing cache key %v", key)
          474                                         },
          475                                 ),
          476                         )
          477                         return true
          478                 }
          479                 return false
          480         })
          481 }
          482 
          483 func (p *Partition[K, V]) clearOnRebuild(predicate func(k, v any) bool, changeset ...identity.Identity) {
          484         if predicate == nil {
          485                 predicate = func(k, v any) bool {
          486                         return false
          487                 }
          488         }
          489         opts := p.getOptions()
          490         if opts.ClearWhen == ClearNever {
          491                 return
          492         }
          493 
          494         if opts.ClearWhen == ClearOnRebuild {
          495                 // Clear all.
          496                 p.Clear()
          497                 return
          498         }
          499 
          500         depsFinder := identity.NewFinder(identity.FinderConfig{})
          501 
          502         shouldDelete := func(key K, v V) bool {
          503                 // We always clear elements marked as stale.
          504                 if resource.StaleVersion(v) > 0 {
          505                         return true
          506                 }
          507 
          508                 // Now check whether this entry has changed, based on the changeset
          509                 // derived from filesystem events.
          510                 if len(changeset) == 0 {
          511                         // Nothing changed.
          512                         return false
          513                 }
          514 
          515                 var probablyDependent bool
          516                 identity.WalkIdentitiesShallow(v, func(level int, id2 identity.Identity) bool {
          517                         for _, id := range changeset {
          518                                 if r := depsFinder.Contains(id, id2, -1); r > 0 {
          519                                         // It's probably dependent, evict from cache.
          520                                         probablyDependent = true
          521                                         return true
          522                                 }
          523                         }
          524                         return false
          525                 })
          526 
          527                 return probablyDependent
          528         }
          529 
          530         // First pass.
          531         // Second pass needs to be done in a separate loop to catch any
          532         // elements marked as stale in the other partitions.
          533         p.c.DeleteFunc(func(key K, v V) bool {
          534                 if predicate(key, v) || shouldDelete(key, v) {
          535                         p.trace.Log(
          536                                 logg.StringFunc(
          537                                         func() string {
          538                                                 return fmt.Sprintf("first pass: clearing cache key %v", key)
          539                                         },
          540                                 ),
          541                         )
          542                         return true
          543                 }
          544                 return false
          545         })
          546 }
          547 
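              // Keys returns the keys currently in this partition. It iterates via DeleteFunc
              // with a callback that always returns false, so nothing is actually deleted.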
          548 func (p *Partition[K, V]) Keys() []K {
          549         var keys []K
          550         p.c.DeleteFunc(func(key K, v V) bool {
          551                 keys = append(keys, key)
          552                 return false
          553         })
          554         return keys
          555 }
          556 
          557 func (p *Partition[K, V]) clearStale() {
          558         p.c.DeleteFunc(func(key K, v V) bool {
          559                 staleVersion := resource.StaleVersion(v)
          560                 if staleVersion > 0 {
          561                         p.trace.Log(
          562                                 logg.StringFunc(
          563                                         func() string {
          564                                                 return fmt.Sprintf("second pass: clearing cache key %v", key)
          565                                         },
          566                                 ),
          567                         )
          568                 }
          569 
          570                 return staleVersion > 0
          571         })
          572 }
          573 
          574 // adjustMaxSize adjusts the max size of the partition and returns the number of items evicted.
          575 func (p *Partition[K, V]) adjustMaxSize(newMaxSize int) int {
          576         if newMaxSize < minMaxSize {
          577                 newMaxSize = minMaxSize
          578         }
          579         oldMaxSize := p.maxSize
          580         if newMaxSize == oldMaxSize {
          581                 return 0
          582         }
          583         p.maxSize = newMaxSize
          584         // fmt.Println("Adjusting max size of partition from", oldMaxSize, "to", newMaxSize)
          585         return p.c.Resize(newMaxSize)
          586 }
          587 
          588 func (p *Partition[K, V]) getMaxSize() int {
          589         return p.maxSize
          590 }
          591 
          592 func (p *Partition[K, V]) getOptions() OptionsPartition {
          593         return p.opts
          594 }
          595 
          596 func (p *Partition[K, V]) Clear() {
          597         p.c.DeleteFunc(func(key K, v V) bool {
          598                 return true
          599         })
          600 }
          601 
          602 func (p *Partition[K, V]) Get(ctx context.Context, key K) (V, bool) {
          603         return p.c.Get(key)
          604 }
          605 
          606 type PartitionManager interface {
          607         adjustMaxSize(addend int) int
          608         getMaxSize() int
          609         getOptions() OptionsPartition
          610         clearOnRebuild(predicate func(k, v any) bool, changeset ...identity.Identity)
          611         clearMatching(predicate func(k, v any) bool)
          612         clearStale()
          613 }
          614 
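              // ClearWhen controls how a partition is treated on rebuilds: ClearOnRebuild
              // drops the whole partition, ClearOnChange drops only entries that are stale
              // or depend on an identity in the changeset, and ClearNever leaves it untouched.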
          615 const (
          616         ClearOnRebuild ClearWhen = iota + 1
          617         ClearOnChange
          618         ClearNever
          619 )
          620 
          621 type ClearWhen int
          622 
          623 type stats struct {
          624         opts            Options
          625         memstatsCurrent runtime.MemStats
          626         currentMaxSize  int
          627         availableMemory uint64
          628 
          629         adjustmentFactor float64
          630 }
          631 
          632 func (s *stats) adjustCurrentMaxSize() bool {
          633         newCurrentMaxSize := int(math.Floor(float64(s.opts.MaxSize) * s.adjustmentFactor))
          634 
          635         if newCurrentMaxSize < s.opts.MinMaxSize {
          636                 newCurrentMaxSize = int(s.opts.MinMaxSize)
          637         }
          638         changed := newCurrentMaxSize != s.currentMaxSize
          639         s.currentMaxSize = newCurrentMaxSize
          640         return changed
          641 }
          642 
          643 // CleanKey turns s into a format suitable for a cache key for this package.
          644 // The key will be a Unix-styled path with a leading slash but no trailing slash.
          645 func CleanKey(s string) string {
          646         return path.Clean(paths.ToSlashPreserveLeading(s))
          647 }
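
        ---
        A minimal usage sketch of the API above (illustrative only; the import path,
        loggers.NewDefault and the partition name "/misc" are assumptions, not taken
        from this file):

        package main

        import (
                "fmt"

                "github.com/gohugoio/hugo/cache/dynacache" // import path assumed from the package name
                "github.com/gohugoio/hugo/common/loggers"
        )

        func main() {
                // Log must be non-nil or New panics; unset fields fall back to defaults.
                c := dynacache.New(dynacache.Options{
                        Log:     loggers.NewDefault(),
                        MaxSize: 10000,
                })
                defer c.Stop()

                // Partition names must satisfy partitionNameRe: a leading slash and a
                // four-character first segment, e.g. "/misc".
                p := dynacache.GetOrCreatePartition[string, string](
                        c, "/misc",
                        dynacache.OptionsPartition{Weight: 30, ClearWhen: dynacache.ClearOnRebuild},
                )

                v, err := p.GetOrCreate("greeting", func(key string) (string, error) {
                        // The create function runs only when the key is missing; its result is cached.
                        return "hello " + key, nil
                })
                fmt.Println(v, err)
        }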