rungroup.go - hugo - [fork] hugo port for 9front
 (HTM) git clone https://git.drkhsh.at/hugo.git
 (DIR) Log
 (DIR) Files
 (DIR) Refs
 (DIR) Submodules
 (DIR) README
 (DIR) LICENSE
       ---
       rungroup.go (2049B)
       ---
            1 // Copyright 2024 The Hugo Authors. All rights reserved.
            2 //
            3 // Licensed under the Apache License, Version 2.0 (the "License");
            4 // you may not use this file except in compliance with the License.
            5 // You may obtain a copy of the License at
            6 // http://www.apache.org/licenses/LICENSE-2.0
            7 //
            8 // Unless required by applicable law or agreed to in writing, software
            9 // distributed under the License is distributed on an "AS IS" BASIS,
           10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
           11 // See the License for the specific language governing permissions and
           12 // limitations under the License.
           13 
           14 package rungroup
           15 
           16 import (
           17         "context"
           18 
           19         "golang.org/x/sync/errgroup"
           20 )
           21 
           22 // Group is a group of workers that can be used to enqueue work and wait for
           23 // them to finish.
           24 type Group[T any] interface {
           25         Enqueue(T) error
           26         Wait() error
           27 }
           28 
           29 type runGroup[T any] struct {
           30         ctx context.Context
           31         g   *errgroup.Group
           32         ch  chan T
           33 }
           34 
           35 // Config is the configuration for a new Group.
           36 type Config[T any] struct {
           37         NumWorkers int
           38         Handle     func(context.Context, T) error
           39 }
           40 
           41 // Run creates a new Group with the given configuration.
           42 func Run[T any](ctx context.Context, cfg Config[T]) Group[T] {
           43         if cfg.NumWorkers <= 0 {
           44                 cfg.NumWorkers = 1
           45         }
           46         if cfg.Handle == nil {
           47                 panic("Handle must be set")
           48         }
           49 
           50         g, ctx := errgroup.WithContext(ctx)
           51         // Buffered for performance.
           52         ch := make(chan T, cfg.NumWorkers)
           53 
           54         for range cfg.NumWorkers {
           55                 g.Go(func() error {
           56                         for {
           57                                 select {
           58                                 case <-ctx.Done():
           59                                         return nil
           60                                 case v, ok := <-ch:
           61                                         if !ok {
           62                                                 return nil
           63                                         }
           64                                         if err := cfg.Handle(ctx, v); err != nil {
           65                                                 return err
           66                                         }
           67                                 }
           68                         }
           69                 })
           70         }
           71 
           72         return &runGroup[T]{
           73                 ctx: ctx,
           74                 g:   g,
           75                 ch:  ch,
           76         }
           77 }
           78 
           79 // Enqueue enqueues a new item to be handled by the workers.
           80 func (r *runGroup[T]) Enqueue(t T) error {
           81         select {
           82         case <-r.ctx.Done():
           83                 return nil
           84         case r.ch <- t:
           85         }
           86         return nil
           87 }
           88 
           89 // Wait waits for all workers to finish and returns the first error.
           90 func (r *runGroup[T]) Wait() error {
           91         close(r.ch)
           92         return r.g.Wait()
           93 }