|
- // Copyright 2019 The Gitea Authors. All rights reserved.
- // Use of this source code is governed by a MIT-style
- // license that can be found in the LICENSE file.
-
- package queue
-
- import (
- "context"
- "sync"
- "sync/atomic"
- "time"
-
- "code.gitea.io/gitea/modules/log"
- "code.gitea.io/gitea/modules/util"
- )
-
- // WorkerPool represent a dynamically growable worker pool for a
- // provided handler function. They have an internal channel which
- // they use to detect if there is a block and will grow and shrink in
- // response to demand as per configuration.
// WorkerPool represent a dynamically growable worker pool for a
// provided handler function. They have an internal channel which
// they use to detect if there is a block and will grow and shrink in
// response to demand as per configuration.
type WorkerPool struct {
	lock               sync.Mutex         // guards all mutable fields below
	baseCtx            context.Context    // parent context for all workers; closing it stops the pool
	cancel             context.CancelFunc // cancels baseCtx
	cond               *sync.Cond         // lazily created on p.lock; broadcast when numberOfWorkers hits 0
	qid                int64              // id used to look up the managed queue in the manager
	maxNumberOfWorkers int                // limit for automatic growth; negative means unlimited
	numberOfWorkers    int                // current number of live workers
	batchLength        int                // max number of Data items handed to handle in one call
	handle             HandlerFunc        // user-provided handler invoked with batches of Data
	dataChan           chan Data          // buffered internal queue channel
	blockTimeout       time.Duration      // how long a blocked push waits before boosting (doubles per boost)
	boostTimeout       time.Duration      // lifetime of temporary boost workers
	boostWorkers       int                // number of temporary workers added per boost
	numInQueue         int64              // atomic count of items pushed but not yet handled
}
-
- // WorkerPoolConfiguration is the basic configuration for a WorkerPool
// WorkerPoolConfiguration is the basic configuration for a WorkerPool
type WorkerPoolConfiguration struct {
	QueueLength  int           // buffer size of the internal data channel
	BatchLength  int           // maximum batch size passed to the handler
	BlockTimeout time.Duration // wait before boosting when a push blocks; <= 0 disables boosting
	BoostTimeout time.Duration // how long temporary boost workers live
	BoostWorkers int           // how many temporary workers to add per boost
	MaxWorkers   int           // cap on automatic worker growth; negative means unlimited
}
-
- // NewWorkerPool creates a new worker pool
- func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPool {
- ctx, cancel := context.WithCancel(context.Background())
-
- dataChan := make(chan Data, config.QueueLength)
- pool := &WorkerPool{
- baseCtx: ctx,
- cancel: cancel,
- batchLength: config.BatchLength,
- dataChan: dataChan,
- handle: handle,
- blockTimeout: config.BlockTimeout,
- boostTimeout: config.BoostTimeout,
- boostWorkers: config.BoostWorkers,
- maxNumberOfWorkers: config.MaxWorkers,
- }
-
- return pool
- }
-
- // Push pushes the data to the internal channel
- func (p *WorkerPool) Push(data Data) {
- atomic.AddInt64(&p.numInQueue, 1)
- p.lock.Lock()
- if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) {
- p.lock.Unlock()
- p.pushBoost(data)
- } else {
- p.lock.Unlock()
- p.dataChan <- data
- }
- }
-
- func (p *WorkerPool) pushBoost(data Data) {
- select {
- case p.dataChan <- data:
- default:
- p.lock.Lock()
- if p.blockTimeout <= 0 {
- p.lock.Unlock()
- p.dataChan <- data
- return
- }
- ourTimeout := p.blockTimeout
- timer := time.NewTimer(p.blockTimeout)
- p.lock.Unlock()
- select {
- case p.dataChan <- data:
- util.StopTimer(timer)
- case <-timer.C:
- p.lock.Lock()
- if p.blockTimeout > ourTimeout || (p.numberOfWorkers > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0) {
- p.lock.Unlock()
- p.dataChan <- data
- return
- }
- p.blockTimeout *= 2
- ctx, cancel := context.WithCancel(p.baseCtx)
- mq := GetManager().GetManagedQueue(p.qid)
- boost := p.boostWorkers
- if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
- boost = p.maxNumberOfWorkers - p.numberOfWorkers
- }
- if mq != nil {
- log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)
-
- start := time.Now()
- pid := mq.RegisterWorkers(boost, start, false, start, cancel, false)
- go func() {
- <-ctx.Done()
- mq.RemoveWorkers(pid)
- cancel()
- }()
- } else {
- log.Warn("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout)
- }
- go func() {
- <-time.After(p.boostTimeout)
- cancel()
- p.lock.Lock()
- p.blockTimeout /= 2
- p.lock.Unlock()
- }()
- p.lock.Unlock()
- p.addWorkers(ctx, boost)
- p.dataChan <- data
- }
- }
- }
-
- // NumberOfWorkers returns the number of current workers in the pool
- func (p *WorkerPool) NumberOfWorkers() int {
- p.lock.Lock()
- defer p.lock.Unlock()
- return p.numberOfWorkers
- }
-
- // MaxNumberOfWorkers returns the maximum number of workers automatically added to the pool
- func (p *WorkerPool) MaxNumberOfWorkers() int {
- p.lock.Lock()
- defer p.lock.Unlock()
- return p.maxNumberOfWorkers
- }
-
- // BoostWorkers returns the number of workers for a boost
- func (p *WorkerPool) BoostWorkers() int {
- p.lock.Lock()
- defer p.lock.Unlock()
- return p.boostWorkers
- }
-
- // BoostTimeout returns the timeout of the next boost
- func (p *WorkerPool) BoostTimeout() time.Duration {
- p.lock.Lock()
- defer p.lock.Unlock()
- return p.boostTimeout
- }
-
- // BlockTimeout returns the timeout til the next boost
- func (p *WorkerPool) BlockTimeout() time.Duration {
- p.lock.Lock()
- defer p.lock.Unlock()
- return p.blockTimeout
- }
-
- // SetPoolSettings sets the setable boost values
- func (p *WorkerPool) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
- p.lock.Lock()
- defer p.lock.Unlock()
- p.maxNumberOfWorkers = maxNumberOfWorkers
- p.boostWorkers = boostWorkers
- p.boostTimeout = timeout
- }
-
- // SetMaxNumberOfWorkers sets the maximum number of workers automatically added to the pool
- // Changing this number will not change the number of current workers but will change the limit
- // for future additions
- func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int) {
- p.lock.Lock()
- defer p.lock.Unlock()
- p.maxNumberOfWorkers = newMax
- }
-
- func (p *WorkerPool) commonRegisterWorkers(number int, timeout time.Duration, isFlusher bool) (context.Context, context.CancelFunc) {
- var ctx context.Context
- var cancel context.CancelFunc
- start := time.Now()
- end := start
- hasTimeout := false
- if timeout > 0 {
- ctx, cancel = context.WithTimeout(p.baseCtx, timeout)
- end = start.Add(timeout)
- hasTimeout = true
- } else {
- ctx, cancel = context.WithCancel(p.baseCtx)
- }
-
- mq := GetManager().GetManagedQueue(p.qid)
- if mq != nil {
- pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel, isFlusher)
- go func() {
- <-ctx.Done()
- mq.RemoveWorkers(pid)
- cancel()
- }()
- log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid)
- } else {
- log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
-
- }
- return ctx, cancel
- }
-
- // AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
- func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
- ctx, cancel := p.commonRegisterWorkers(number, timeout, false)
- p.addWorkers(ctx, number)
- return cancel
- }
-
- // addWorkers adds workers to the pool
- func (p *WorkerPool) addWorkers(ctx context.Context, number int) {
- for i := 0; i < number; i++ {
- p.lock.Lock()
- if p.cond == nil {
- p.cond = sync.NewCond(&p.lock)
- }
- p.numberOfWorkers++
- p.lock.Unlock()
- go func() {
- p.doWork(ctx)
-
- p.lock.Lock()
- p.numberOfWorkers--
- if p.numberOfWorkers == 0 {
- p.cond.Broadcast()
- } else if p.numberOfWorkers < 0 {
- // numberOfWorkers can't go negative but...
- log.Warn("Number of Workers < 0 for QID %d - this shouldn't happen", p.qid)
- p.numberOfWorkers = 0
- p.cond.Broadcast()
- }
- p.lock.Unlock()
- }()
- }
- }
-
- // Wait for WorkerPool to finish
- func (p *WorkerPool) Wait() {
- p.lock.Lock()
- defer p.lock.Unlock()
- if p.cond == nil {
- p.cond = sync.NewCond(&p.lock)
- }
- if p.numberOfWorkers <= 0 {
- return
- }
- p.cond.Wait()
- }
-
- // CleanUp will drain the remaining contents of the channel
- // This should be called after AddWorkers context is closed
- func (p *WorkerPool) CleanUp(ctx context.Context) {
- log.Trace("WorkerPool: %d CleanUp", p.qid)
- close(p.dataChan)
- for data := range p.dataChan {
- p.handle(data)
- atomic.AddInt64(&p.numInQueue, -1)
- select {
- case <-ctx.Done():
- log.Warn("WorkerPool: %d Cleanup context closed before finishing clean-up", p.qid)
- return
- default:
- }
- }
- log.Trace("WorkerPool: %d CleanUp Done", p.qid)
- }
-
- // Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
- func (p *WorkerPool) Flush(timeout time.Duration) error {
- ctx, cancel := p.commonRegisterWorkers(1, timeout, true)
- defer cancel()
- return p.FlushWithContext(ctx)
- }
-
- // IsEmpty returns if true if the worker queue is empty
- func (p *WorkerPool) IsEmpty() bool {
- return atomic.LoadInt64(&p.numInQueue) == 0
- }
-
- // FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
- // NB: The worker will not be registered with the manager.
- func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
- log.Trace("WorkerPool: %d Flush", p.qid)
- for {
- select {
- case data := <-p.dataChan:
- p.handle(data)
- atomic.AddInt64(&p.numInQueue, -1)
- case <-p.baseCtx.Done():
- return p.baseCtx.Err()
- case <-ctx.Done():
- return ctx.Err()
- default:
- return nil
- }
- }
- }
-
- func (p *WorkerPool) doWork(ctx context.Context) {
- delay := time.Millisecond * 300
- var data = make([]Data, 0, p.batchLength)
- for {
- select {
- case <-ctx.Done():
- if len(data) > 0 {
- log.Trace("Handling: %d data, %v", len(data), data)
- p.handle(data...)
- atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
- }
- log.Trace("Worker shutting down")
- return
- case datum, ok := <-p.dataChan:
- if !ok {
- // the dataChan has been closed - we should finish up:
- if len(data) > 0 {
- log.Trace("Handling: %d data, %v", len(data), data)
- p.handle(data...)
- atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
- }
- log.Trace("Worker shutting down")
- return
- }
- data = append(data, datum)
- if len(data) >= p.batchLength {
- log.Trace("Handling: %d data, %v", len(data), data)
- p.handle(data...)
- atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
- data = make([]Data, 0, p.batchLength)
- }
- default:
- timer := time.NewTimer(delay)
- select {
- case <-ctx.Done():
- util.StopTimer(timer)
- if len(data) > 0 {
- log.Trace("Handling: %d data, %v", len(data), data)
- p.handle(data...)
- atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
- }
- log.Trace("Worker shutting down")
- return
- case datum, ok := <-p.dataChan:
- util.StopTimer(timer)
- if !ok {
- // the dataChan has been closed - we should finish up:
- if len(data) > 0 {
- log.Trace("Handling: %d data, %v", len(data), data)
- p.handle(data...)
- atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
- }
- log.Trace("Worker shutting down")
- return
- }
- data = append(data, datum)
- if len(data) >= p.batchLength {
- log.Trace("Handling: %d data, %v", len(data), data)
- p.handle(data...)
- atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
- data = make([]Data, 0, p.batchLength)
- }
- case <-timer.C:
- delay = time.Millisecond * 100
- if len(data) > 0 {
- log.Trace("Handling: %d data, %v", len(data), data)
- p.handle(data...)
- atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
- data = make([]Data, 0, p.batchLength)
- }
-
- }
- }
- }
- }
|