dynacache.go - hugo - [fork] hugo port for 9front
(HTM) git clone https://git.drkhsh.at/hugo.git
---
dynacache.go (15686B)
---
1 // Copyright 2024 The Hugo Authors. All rights reserved.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 package dynacache
15
16 import (
17 "context"
18 "fmt"
19 "math"
20 "path"
21 "regexp"
22 "runtime"
23 "sync"
24 "time"
25
26 "github.com/bep/lazycache"
27 "github.com/bep/logg"
28 "github.com/gohugoio/hugo/common/collections"
29 "github.com/gohugoio/hugo/common/herrors"
30 "github.com/gohugoio/hugo/common/loggers"
31 "github.com/gohugoio/hugo/common/paths"
32 "github.com/gohugoio/hugo/common/rungroup"
33 "github.com/gohugoio/hugo/config"
34 "github.com/gohugoio/hugo/helpers"
35 "github.com/gohugoio/hugo/identity"
36 "github.com/gohugoio/hugo/resources/resource"
37 )
38
39 const minMaxSize = 10
40
41 type KeyIdentity struct {
42 Key any
43 Identity identity.Identity
44 }
45
46 // New creates a new cache.
47 func New(opts Options) *Cache {
48 if opts.CheckInterval == 0 {
49 opts.CheckInterval = time.Second * 2
50 }
51
52 if opts.MaxSize == 0 {
53 opts.MaxSize = 100000
54 }
55 if opts.Log == nil {
56 panic("nil Log")
57 }
58
59 if opts.MinMaxSize == 0 {
60 opts.MinMaxSize = 30
61 }
62
63 stats := &stats{
64 opts: opts,
65 adjustmentFactor: 1.0,
66 currentMaxSize: opts.MaxSize,
67 availableMemory: config.GetMemoryLimit(),
68 }
69
70 infol := opts.Log.InfoCommand("dynacache")
71
72 evictedIdentities := collections.NewStack[KeyIdentity]()
73
74 onEvict := func(k, v any) {
75 if !opts.Watching {
76 return
77 }
78 identity.WalkIdentitiesShallow(v, func(level int, id identity.Identity) bool {
79 evictedIdentities.Push(KeyIdentity{Key: k, Identity: id})
80 return false
81 })
82 resource.MarkStale(v)
83 }
84
85 c := &Cache{
86 partitions: make(map[string]PartitionManager),
87 onEvict: onEvict,
88 evictedIdentities: evictedIdentities,
89 opts: opts,
90 stats: stats,
91 infol: infol,
92 }
93
94 c.stop = c.start()
95
96 return c
97 }
98
99 // Options for the cache.
100 type Options struct {
101 Log loggers.Logger
102 CheckInterval time.Duration
103 MaxSize int
104 MinMaxSize int
105 Watching bool
106 }
107
108 // Options for a partition.
109 type OptionsPartition struct {
110 // When to clear this partition.
111 ClearWhen ClearWhen
112
113 // Weight is a number between 1 and 100 that indicates, in general, how big this partition may get.
114 Weight int
115 }
116
117 func (o OptionsPartition) WeightFraction() float64 {
118 return float64(o.Weight) / 100
119 }
120
121 func (o OptionsPartition) CalculateMaxSize(maxSizePerPartition int) int {
122 return int(math.Floor(float64(maxSizePerPartition) * o.WeightFraction()))
123 }
124
125 // A dynamic partitioned cache.
126 type Cache struct {
127 mu sync.RWMutex
128
129 partitions map[string]PartitionManager
130
131 onEvict func(k, v any)
132 evictedIdentities *collections.Stack[KeyIdentity]
133
134 opts Options
135 infol logg.LevelLogger
136
137 stats *stats
138 stopOnce sync.Once
139 stop func()
140 }
141
142 // DrainEvictedIdentities drains the evicted identities from the cache.
143 func (c *Cache) DrainEvictedIdentities() []KeyIdentity {
144 return c.evictedIdentities.Drain()
145 }
146
147 // DrainEvictedIdentitiesMatching drains the evicted identities from the cache that match the given predicate.
148 func (c *Cache) DrainEvictedIdentitiesMatching(predicate func(KeyIdentity) bool) []KeyIdentity {
149 return c.evictedIdentities.DrainMatching(predicate)
150 }
151
152 // ClearMatching clears, in every partition for which predicatePartition returns true, all entries for which predicateValue returns true.
153 func (c *Cache) ClearMatching(predicatePartition func(k string, p PartitionManager) bool, predicateValue func(k, v any) bool) {
154 if predicatePartition == nil {
155 predicatePartition = func(k string, p PartitionManager) bool { return true }
156 }
157 if predicateValue == nil {
158 panic("nil predicateValue")
159 }
160 g := rungroup.Run[PartitionManager](context.Background(), rungroup.Config[PartitionManager]{
161 NumWorkers: len(c.partitions),
162 Handle: func(ctx context.Context, partition PartitionManager) error {
163 partition.clearMatching(predicateValue)
164 return nil
165 },
166 })
167
168 for k, p := range c.partitions {
169 if !predicatePartition(k, p) {
170 continue
171 }
172 g.Enqueue(p)
173 }
174
175 g.Wait()
176 }
177
178 // ClearOnRebuild prepares the cache for a new rebuild taking the given changeset into account.
179 // predicate is optional and will clear any entry for which it returns true.
180 func (c *Cache) ClearOnRebuild(predicate func(k, v any) bool, changeset ...identity.Identity) {
181 g := rungroup.Run[PartitionManager](context.Background(), rungroup.Config[PartitionManager]{
182 NumWorkers: len(c.partitions),
183 Handle: func(ctx context.Context, partition PartitionManager) error {
184 partition.clearOnRebuild(predicate, changeset...)
185 return nil
186 },
187 })
188
189 for _, p := range c.partitions {
190 g.Enqueue(p)
191 }
192
193 g.Wait()
194
195 // Clear any entries marked as stale above.
196 g = rungroup.Run[PartitionManager](context.Background(), rungroup.Config[PartitionManager]{
197 NumWorkers: len(c.partitions),
198 Handle: func(ctx context.Context, partition PartitionManager) error {
199 partition.clearStale()
200 return nil
201 },
202 })
203
204 for _, p := range c.partitions {
205 g.Enqueue(p)
206 }
207
208 g.Wait()
209 }
210
211 type keysProvider interface {
212 Keys() []string
213 }
214
215 // Keys returns a list of keys in all partitions.
216 func (c *Cache) Keys(predicate func(s string) bool) []string {
217 if predicate == nil {
218 predicate = func(s string) bool { return true }
219 }
220 var keys []string
221 for pn, g := range c.partitions {
222 pkeys := g.(keysProvider).Keys()
223 for _, k := range pkeys {
224 p := path.Join(pn, k)
225 if predicate(p) {
226 keys = append(keys, p)
227 }
228 }
229
230 }
231 return keys
232 }
233
234 func calculateMaxSizePerPartition(maxItemsTotal, totalWeightQuantity, numPartitions int) int {
235 if numPartitions == 0 {
236 panic("numPartitions must be > 0")
237 }
238 if totalWeightQuantity == 0 {
239 panic("totalWeightQuantity must be > 0")
240 }
241
242 avgWeight := float64(totalWeightQuantity) / float64(numPartitions)
243 return int(math.Floor(float64(maxItemsTotal) / float64(numPartitions) * (100.0 / avgWeight)))
244 }
245
246 // Stop stops the cache.
247 func (c *Cache) Stop() {
248 c.stopOnce.Do(func() {
249 c.stop()
250 })
251 }
252
253 func (c *Cache) adjustCurrentMaxSize() {
254 c.mu.RLock()
255 defer c.mu.RUnlock()
256
257 if len(c.partitions) == 0 {
258 return
259 }
260 var m runtime.MemStats
261 runtime.ReadMemStats(&m)
262 s := c.stats
263 s.memstatsCurrent = m
264 // fmt.Printf("\n\nAvailable = %v\nAlloc = %v\nTotalAlloc = %v\nSys = %v\nNumGC = %v\nMaxSize = %d\nAdjustmentFactor=%f\n\n", helpers.FormatByteCount(s.availableMemory), helpers.FormatByteCount(m.Alloc), helpers.FormatByteCount(m.TotalAlloc), helpers.FormatByteCount(m.Sys), m.NumGC, c.stats.currentMaxSize, s.adjustmentFactor)
265
266 if s.availableMemory >= s.memstatsCurrent.Alloc {
267 if s.adjustmentFactor <= 1.0 {
268 s.adjustmentFactor += 0.2
269 }
270 } else {
271 // We're low on memory.
272 s.adjustmentFactor -= 0.4
273 }
274
275 if s.adjustmentFactor <= 0 {
276 s.adjustmentFactor = 0.05
277 }
278
279 if !s.adjustCurrentMaxSize() {
280 return
281 }
282
283 totalWeight := 0
284 for _, pm := range c.partitions {
285 totalWeight += pm.getOptions().Weight
286 }
287
288 maxSizePerPartition := calculateMaxSizePerPartition(c.stats.currentMaxSize, totalWeight, len(c.partitions))
289
290 evicted := 0
291 for _, p := range c.partitions {
292 evicted += p.adjustMaxSize(p.getOptions().CalculateMaxSize(maxSizePerPartition))
293 }
294
295 if evicted > 0 {
296 c.infol.
297 WithFields(
298 logg.Fields{
299 {Name: "evicted", Value: evicted},
300 {Name: "numGC", Value: m.NumGC},
301 {Name: "limit", Value: helpers.FormatByteCount(c.stats.availableMemory)},
302 {Name: "alloc", Value: helpers.FormatByteCount(m.Alloc)},
303 {Name: "totalAlloc", Value: helpers.FormatByteCount(m.TotalAlloc)},
304 },
305 ).Logf("adjusted partitions' max size")
306 }
307 }
308
309 func (c *Cache) start() func() {
310 ticker := time.NewTicker(c.opts.CheckInterval)
311 quit := make(chan struct{})
312
313 go func() {
314 for {
315 select {
316 case <-ticker.C:
317 c.adjustCurrentMaxSize()
318 // Reset the ticker to avoid drift.
319 ticker.Reset(c.opts.CheckInterval)
320 case <-quit:
321 ticker.Stop()
322 return
323 }
324 }
325 }()
326
327 return func() {
328 close(quit)
329 }
330 }
331
332 var partitionNameRe = regexp.MustCompile(`^\/[a-zA-Z0-9]{4}(\/[a-zA-Z0-9]+)?(\/[a-zA-Z0-9]+)?`)
333
334 // GetOrCreatePartition gets or creates a partition with the given name.
335 func GetOrCreatePartition[K comparable, V any](c *Cache, name string, opts OptionsPartition) *Partition[K, V] {
336 if c == nil {
337 panic("nil Cache")
338 }
339 if opts.Weight < 1 || opts.Weight > 100 {
340 panic("invalid Weight, must be between 1 and 100")
341 }
342
343 if partitionNameRe.FindString(name) != name {
344 panic(fmt.Sprintf("invalid partition name %q", name))
345 }
346
347 c.mu.RLock()
348 p, found := c.partitions[name]
349 c.mu.RUnlock()
350 if found {
351 return p.(*Partition[K, V])
352 }
353
354 c.mu.Lock()
355 defer c.mu.Unlock()
356
357 // Double check.
358 p, found = c.partitions[name]
359 if found {
360 return p.(*Partition[K, V])
361 }
362
363 // At this point, we don't know the number of partitions or their configuration, but
364 // this will be re-adjusted later.
365 const numberOfPartitionsEstimate = 10
366 maxSize := opts.CalculateMaxSize(c.opts.MaxSize / numberOfPartitionsEstimate)
367
368 onEvict := func(k K, v V) {
369 c.onEvict(k, v)
370 }
371
372 // Create a new partition and cache it.
373 partition := &Partition[K, V]{
374 c: lazycache.New(lazycache.Options[K, V]{MaxEntries: maxSize, OnEvict: onEvict}),
375 maxSize: maxSize,
376 trace: c.opts.Log.Logger().WithLevel(logg.LevelTrace).WithField("partition", name),
377 opts: opts,
378 }
379
380 c.partitions[name] = partition
381
382 return partition
383 }
384
385 // Partition is a partition in the cache.
386 type Partition[K comparable, V any] struct {
387 c *lazycache.Cache[K, V]
388
389 zero V
390
391 trace logg.LevelLogger
392 opts OptionsPartition
393
394 maxSize int
395 }
396
397 // GetOrCreate gets or creates a value for the given key.
398 func (p *Partition[K, V]) GetOrCreate(key K, create func(key K) (V, error)) (V, error) {
399 v, err := p.doGetOrCreate(key, create)
400 if err != nil {
401 return p.zero, err
402 }
403 if resource.StaleVersion(v) > 0 {
404 p.c.Delete(key)
405 return p.doGetOrCreate(key, create)
406 }
407 return v, err
408 }
409
410 func (p *Partition[K, V]) doGetOrCreate(key K, create func(key K) (V, error)) (V, error) {
411 v, _, err := p.c.GetOrCreate(key, create)
412 return v, err
413 }
414
415 func (p *Partition[K, V]) GetOrCreateWitTimeout(key K, duration time.Duration, create func(key K) (V, error)) (V, error) {
416 v, err := p.doGetOrCreateWitTimeout(key, duration, create)
417 if err != nil {
418 return p.zero, err
419 }
420 if resource.StaleVersion(v) > 0 {
421 p.c.Delete(key)
422 return p.doGetOrCreateWitTimeout(key, duration, create)
423 }
424 return v, err
425 }
426
427 // doGetOrCreateWitTimeout gets or creates a value for the given key and times out if the create function
428 // takes too long.
429 func (p *Partition[K, V]) doGetOrCreateWitTimeout(key K, duration time.Duration, create func(key K) (V, error)) (V, error) {
430 resultch := make(chan V, 1)
431 errch := make(chan error, 1)
432
433 go func() {
434 var (
435 v V
436 err error
437 )
438 defer func() {
439 if r := recover(); r != nil {
440 if rerr, ok := r.(error); ok {
441 err = rerr
442 } else {
443 err = fmt.Errorf("panic: %v", r)
444 }
445 }
446 if err != nil {
447 errch <- err
448 } else {
449 resultch <- v
450 }
451 }()
452 v, _, err = p.c.GetOrCreate(key, create)
453 }()
454
455 select {
456 case v := <-resultch:
457 return v, nil
458 case err := <-errch:
459 return p.zero, err
460 case <-time.After(duration):
461 return p.zero, &herrors.TimeoutError{
462 Duration: duration,
463 }
464 }
465 }
466
467 func (p *Partition[K, V]) clearMatching(predicate func(k, v any) bool) {
468 p.c.DeleteFunc(func(key K, v V) bool {
469 if predicate(key, v) {
470 p.trace.Log(
471 logg.StringFunc(
472 func() string {
473 return fmt.Sprintf("clearing cache key %v", key)
474 },
475 ),
476 )
477 return true
478 }
479 return false
480 })
481 }
482
483 func (p *Partition[K, V]) clearOnRebuild(predicate func(k, v any) bool, changeset ...identity.Identity) {
484 if predicate == nil {
485 predicate = func(k, v any) bool {
486 return false
487 }
488 }
489 opts := p.getOptions()
490 if opts.ClearWhen == ClearNever {
491 return
492 }
493
494 if opts.ClearWhen == ClearOnRebuild {
495 // Clear all.
496 p.Clear()
497 return
498 }
499
500 depsFinder := identity.NewFinder(identity.FinderConfig{})
501
502 shouldDelete := func(key K, v V) bool {
503 // We always clear elements marked as stale.
504 if resource.StaleVersion(v) > 0 {
505 return true
506 }
507
508 // Now check whether this entry has changed, based on the changeset
509 // derived from filesystem events.
510 if len(changeset) == 0 {
511 // Nothing changed.
512 return false
513 }
514
515 var probablyDependent bool
516 identity.WalkIdentitiesShallow(v, func(level int, id2 identity.Identity) bool {
517 for _, id := range changeset {
518 if r := depsFinder.Contains(id, id2, -1); r > 0 {
519 // It's probably dependent, evict from cache.
520 probablyDependent = true
521 return true
522 }
523 }
524 return false
525 })
526
527 return probablyDependent
528 }
529
530 // First pass.
531 // Second pass needs to be done in a separate loop to catch any
532 // elements marked as stale in the other partitions.
533 p.c.DeleteFunc(func(key K, v V) bool {
534 if predicate(key, v) || shouldDelete(key, v) {
535 p.trace.Log(
536 logg.StringFunc(
537 func() string {
538 return fmt.Sprintf("first pass: clearing cache key %v", key)
539 },
540 ),
541 )
542 return true
543 }
544 return false
545 })
546 }
547
548 func (p *Partition[K, V]) Keys() []K {
549 var keys []K
550 p.c.DeleteFunc(func(key K, v V) bool {
551 keys = append(keys, key)
552 return false
553 })
554 return keys
555 }
556
557 func (p *Partition[K, V]) clearStale() {
558 p.c.DeleteFunc(func(key K, v V) bool {
559 staleVersion := resource.StaleVersion(v)
560 if staleVersion > 0 {
561 p.trace.Log(
562 logg.StringFunc(
563 func() string {
564 return fmt.Sprintf("second pass: clearing cache key %v", key)
565 },
566 ),
567 )
568 }
569
570 return staleVersion > 0
571 })
572 }
573
574 // adjustMaxSize adjusts the max size of the partition and returns the number of items evicted.
575 func (p *Partition[K, V]) adjustMaxSize(newMaxSize int) int {
576 if newMaxSize < minMaxSize {
577 newMaxSize = minMaxSize
578 }
579 oldMaxSize := p.maxSize
580 if newMaxSize == oldMaxSize {
581 return 0
582 }
583 p.maxSize = newMaxSize
584 // fmt.Println("Adjusting max size of partition from", oldMaxSize, "to", newMaxSize)
585 return p.c.Resize(newMaxSize)
586 }
587
588 func (p *Partition[K, V]) getMaxSize() int {
589 return p.maxSize
590 }
591
592 func (p *Partition[K, V]) getOptions() OptionsPartition {
593 return p.opts
594 }
595
596 func (p *Partition[K, V]) Clear() {
597 p.c.DeleteFunc(func(key K, v V) bool {
598 return true
599 })
600 }
601
602 func (p *Partition[K, V]) Get(ctx context.Context, key K) (V, bool) {
603 return p.c.Get(key)
604 }
605
606 type PartitionManager interface {
607 adjustMaxSize(addend int) int
608 getMaxSize() int
609 getOptions() OptionsPartition
610 clearOnRebuild(predicate func(k, v any) bool, changeset ...identity.Identity)
611 clearMatching(predicate func(k, v any) bool)
612 clearStale()
613 }
614
615 const (
616 ClearOnRebuild ClearWhen = iota + 1
617 ClearOnChange
618 ClearNever
619 )
620
621 type ClearWhen int
622
623 type stats struct {
624 opts Options
625 memstatsCurrent runtime.MemStats
626 currentMaxSize int
627 availableMemory uint64
628
629 adjustmentFactor float64
630 }
631
632 func (s *stats) adjustCurrentMaxSize() bool {
633 newCurrentMaxSize := int(math.Floor(float64(s.opts.MaxSize) * s.adjustmentFactor))
634
635 if newCurrentMaxSize < s.opts.MinMaxSize {
636 newCurrentMaxSize = int(s.opts.MinMaxSize)
637 }
638 changed := newCurrentMaxSize != s.currentMaxSize
639 s.currentMaxSize = newCurrentMaxSize
640 return changed
641 }
642
643 // CleanKey turns s into a format suitable for a cache key for this package.
644 // The key will be a Unix-styled path with a leading slash but no trailing slash.
645 func CleanKey(s string) string {
646 return path.Clean(paths.ToSlashPreserveLeading(s))
647 }