Files

116 lines
3.4 KiB
Go
Raw Permalink Normal View History

// Copyright 2019 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package queue
import (
"context"
2024-11-14 13:28:46 -06:00
"errors"
2025-06-18 03:48:09 +02:00
"maps"
"sync"
"time"
"code.gitea.io/gitea/modules/log"
2023-05-08 19:49:59 +08:00
"code.gitea.io/gitea/modules/setting"
)
2023-05-08 19:49:59 +08:00
// Manager is a manager for the queues created by "CreateXxxQueue" functions, these queues are called "managed queues".
type Manager struct {
2023-05-08 19:49:59 +08:00
mu sync.Mutex
2023-05-08 19:49:59 +08:00
qidCounter int64
Queues map[int64]ManagedWorkerPoolQueue
}
2023-05-08 19:49:59 +08:00
type ManagedWorkerPoolQueue interface {
GetName() string
GetType() string
GetItemTypeName() string
GetWorkerNumber() int
GetWorkerActiveNumber() int
GetWorkerMaxNumber() int
SetWorkerMaxNumber(num int)
GetQueueItemNumber() int
2023-05-08 19:49:59 +08:00
// FlushWithContext tries to make the handler process all items in the queue synchronously.
// It is for testing purpose only. It's not designed to be used in a cluster.
2024-11-14 13:28:46 -06:00
// Negative timeout means discarding all items in the queue.
2023-05-08 19:49:59 +08:00
FlushWithContext(ctx context.Context, timeout time.Duration) error
// RemoveAllItems removes all items in the base queue (on-the-fly items are not affected)
RemoveAllItems(ctx context.Context) error
2022-01-22 21:22:14 +00:00
}
2023-05-08 19:49:59 +08:00
var manager *Manager
func init() {
2023-05-08 19:49:59 +08:00
manager = &Manager{
Queues: make(map[int64]ManagedWorkerPoolQueue),
}
}
func GetManager() *Manager {
return manager
}
2023-05-08 19:49:59 +08:00
func (m *Manager) AddManagedQueue(managed ManagedWorkerPoolQueue) {
m.mu.Lock()
defer m.mu.Unlock()
m.qidCounter++
m.Queues[m.qidCounter] = managed
}
2023-05-08 19:49:59 +08:00
func (m *Manager) GetManagedQueue(qid int64) ManagedWorkerPoolQueue {
m.mu.Lock()
defer m.mu.Unlock()
return m.Queues[qid]
}
2023-05-08 19:49:59 +08:00
func (m *Manager) ManagedQueues() map[int64]ManagedWorkerPoolQueue {
m.mu.Lock()
defer m.mu.Unlock()
2023-05-08 19:49:59 +08:00
queues := make(map[int64]ManagedWorkerPoolQueue, len(m.Queues))
2025-06-18 03:48:09 +02:00
maps.Copy(queues, m.Queues)
2023-05-08 19:49:59 +08:00
return queues
}
2023-05-08 19:49:59 +08:00
// FlushAll tries to make all managed queues process all items synchronously, until timeout or the queue is empty.
// It is for testing purpose only. It's not designed to be used in a cluster.
2024-11-14 13:28:46 -06:00
// Negative timeout means discarding all items in the queue.
2023-05-08 19:49:59 +08:00
func (m *Manager) FlushAll(ctx context.Context, timeout time.Duration) error {
2024-11-14 13:28:46 -06:00
var finalErrors []error
2023-05-08 19:49:59 +08:00
qs := m.ManagedQueues()
for _, q := range qs {
if err := q.FlushWithContext(ctx, timeout); err != nil {
2024-11-14 13:28:46 -06:00
finalErrors = append(finalErrors, err)
2023-05-08 19:49:59 +08:00
}
}
2024-11-14 13:28:46 -06:00
return errors.Join(finalErrors...)
}
2023-05-08 19:49:59 +08:00
// CreateSimpleQueue creates a simple queue from global setting config provider by name
2023-05-26 15:31:55 +08:00
func CreateSimpleQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, false)
}
2023-05-08 19:49:59 +08:00
// CreateUniqueQueue creates a unique queue from global setting config provider by name
2023-05-26 15:31:55 +08:00
func CreateUniqueQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, true)
}
2023-05-26 15:31:55 +08:00
func createWorkerPoolQueue[T any](ctx context.Context, name string, cfgProvider setting.ConfigProvider, handler HandlerFuncT[T], unique bool) *WorkerPoolQueue[T] {
2023-05-08 19:49:59 +08:00
queueSetting, err := setting.GetQueueSettings(cfgProvider, name)
if err != nil {
log.Error("Failed to get queue settings for %q: %v", name, err)
return nil
}
2023-05-26 15:31:55 +08:00
w, err := NewWorkerPoolQueueWithContext(ctx, name, queueSetting, handler, unique)
2023-05-08 19:49:59 +08:00
if err != nil {
log.Error("Failed to create queue %q: %v", name, err)
return nil
}
2023-05-08 19:49:59 +08:00
GetManager().AddManagedQueue(w)
return w
}