rungroup.go - hugo - [fork] hugo port for 9front
(HTM) git clone https://git.drkhsh.at/hugo.git
(DIR) Log
(DIR) Files
(DIR) Refs
(DIR) Submodules
(DIR) README
(DIR) LICENSE
---
rungroup.go (2049B)
---
1 // Copyright 2024 The Hugo Authors. All rights reserved.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 package rungroup
15
16 import (
17 "context"
18
19 "golang.org/x/sync/errgroup"
20 )
21
// Group hands work items of type T to a set of workers. Items are submitted
// with Enqueue; Wait blocks until the workers are done.
type Group[T any] interface {
	// Enqueue submits one item to be handled by the workers.
	Enqueue(item T) error

	// Wait blocks until the workers have finished and returns their error,
	// if any.
	Wait() error
}
28
29 type runGroup[T any] struct {
30 ctx context.Context
31 g *errgroup.Group
32 ch chan T
33 }
34
// Config holds the settings Run uses to create a Group.
type Config[T any] struct {
	// NumWorkers is the number of worker goroutines to start.
	// Run treats values <= 0 as 1.
	NumWorkers int

	// Handle is called by a worker for each enqueued item.
	// Run panics if it is nil.
	Handle func(ctx context.Context, item T) error
}
40
41 // Run creates a new Group with the given configuration.
42 func Run[T any](ctx context.Context, cfg Config[T]) Group[T] {
43 if cfg.NumWorkers <= 0 {
44 cfg.NumWorkers = 1
45 }
46 if cfg.Handle == nil {
47 panic("Handle must be set")
48 }
49
50 g, ctx := errgroup.WithContext(ctx)
51 // Buffered for performance.
52 ch := make(chan T, cfg.NumWorkers)
53
54 for range cfg.NumWorkers {
55 g.Go(func() error {
56 for {
57 select {
58 case <-ctx.Done():
59 return nil
60 case v, ok := <-ch:
61 if !ok {
62 return nil
63 }
64 if err := cfg.Handle(ctx, v); err != nil {
65 return err
66 }
67 }
68 }
69 })
70 }
71
72 return &runGroup[T]{
73 ctx: ctx,
74 g: g,
75 ch: ch,
76 }
77 }
78
79 // Enqueue enqueues a new item to be handled by the workers.
80 func (r *runGroup[T]) Enqueue(t T) error {
81 select {
82 case <-r.ctx.Done():
83 return nil
84 case r.ch <- t:
85 }
86 return nil
87 }
88
89 // Wait waits for all workers to finish and returns the first error.
90 func (r *runGroup[T]) Wait() error {
91 close(r.ch)
92 return r.g.Wait()
93 }