⚡ Concurrency Components
The Bubble concurrency components provide efficient parallel execution of multiple tasks, with support for worker pools, pipeline processing, and real-time progress monitoring.
🎯 Component Overview
Core Features
- Worker pool - caps the number of concurrent workers to avoid overloading resources
- Pipeline processing - multi-stage data processing pipelines
- Real-time monitoring - task execution status and performance metrics
- Error handling - retry on failure and error recovery (see the retry sketch after this list)
- Dynamic scaling - adjusts the number of workers automatically based on load
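Retry is not built into the pool itself; the sketch below shows one way to layer it on top of the `Task` interface defined in the next section. `RetryTask`, `MaxRetries`, and `Delay` are hypothetical names, and the wrapper is assumed to live in the same `concurrent` package:

```go
// RetryTask is a hypothetical decorator that re-runs a failed task.
// It makes at most MaxRetries+1 attempts, waiting Delay between them.
type RetryTask struct {
	Inner      Task
	MaxRetries int
	Delay      time.Duration
}

func (t *RetryTask) ID() string             { return t.Inner.ID() }
func (t *RetryTask) Priority() int          { return t.Inner.Priority() }
func (t *RetryTask) Timeout() time.Duration { return t.Inner.Timeout() }

func (t *RetryTask) Execute(ctx context.Context) error {
	var err error
	for attempt := 0; ; attempt++ {
		if err = t.Inner.Execute(ctx); err == nil {
			return nil
		}
		if attempt >= t.MaxRetries {
			return err
		}
		// Wait before the next attempt, but give up if the context is cancelled.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(t.Delay):
		}
	}
}
```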
📝 Worker Pool Implementation
Basic Worker Pool
```go
// pkg/bubble/concurrent/worker_pool.go
package concurrent
import (
"context"
"fmt"
"sync"
"time"
)
// 任务接口
type Task interface {
ID() string
Execute(ctx context.Context) error
Priority() int
Timeout() time.Duration
}
// 任务结果
type TaskResult struct {
TaskID string
Success bool
Error error
Duration time.Duration
StartTime time.Time
EndTime time.Time
}
// 工作池配置
type WorkerPoolConfig struct {
MinWorkers int // 最小工作者数量
MaxWorkers int // 最大工作者数量
QueueSize int // 任务队列大小
IdleTimeout time.Duration // 空闲超时时间
ScaleInterval time.Duration // 扩缩容检查间隔
}
// 默认配置
func DefaultWorkerPoolConfig() WorkerPoolConfig {
return WorkerPoolConfig{
MinWorkers: 2,
MaxWorkers: 10,
QueueSize: 1000,
IdleTimeout: 30 * time.Second,
ScaleInterval: 10 * time.Second,
}
}
// 工作池
type WorkerPool struct {
config WorkerPoolConfig
taskQueue chan Task
resultQueue chan TaskResult // 内部结果通道,由 monitor 消费
outQueue chan TaskResult // 对外结果通道,由 Results() 返回
workers []*Worker
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
mu sync.RWMutex
// 监控指标
metrics *PoolMetrics
}
// 监控指标
type PoolMetrics struct {
mu sync.RWMutex
TasksSubmitted int64
TasksCompleted int64
TasksFailed int64
ActiveWorkers int
QueueLength int
AverageWaitTime time.Duration
AverageExecTime time.Duration
}
// 创建工作池
func NewWorkerPool(config WorkerPoolConfig) *WorkerPool {
ctx, cancel := context.WithCancel(context.Background())
pool := &WorkerPool{
config: config,
taskQueue: make(chan Task, config.QueueSize),
resultQueue: make(chan TaskResult, config.QueueSize),
outQueue: make(chan TaskResult, config.QueueSize),
workers: make([]*Worker, 0, config.MaxWorkers),
ctx: ctx,
cancel: cancel,
metrics: &PoolMetrics{},
}
return pool
}
// 启动工作池
func (p *WorkerPool) Start() error {
// 创建最小数量的工作者
for i := 0; i < p.config.MinWorkers; i++ {
if err := p.addWorker(); err != nil {
return fmt.Errorf("failed to create initial worker: %w", err)
}
}
// 启动监控和扩缩容
p.wg.Add(2)
go p.monitor()
go p.autoScale()
return nil
}
// 提交任务
func (p *WorkerPool) Submit(task Task) error {
select {
case p.taskQueue <- task:
p.metrics.mu.Lock()
p.metrics.TasksSubmitted++
p.metrics.QueueLength = len(p.taskQueue)
p.metrics.mu.Unlock()
return nil
case <-p.ctx.Done():
return p.ctx.Err()
default:
return fmt.Errorf("task queue is full")
}
}
// 获取结果通道(monitor 会将结果转发到该通道,外部消费者只需读取这里)
func (p *WorkerPool) Results() <-chan TaskResult {
	return p.outQueue
}
// 获取指标快照(按字段拷贝,避免连同内部锁一起返回)
func (p *WorkerPool) Metrics() PoolMetrics {
	p.metrics.mu.RLock()
	defer p.metrics.mu.RUnlock()
	return PoolMetrics{
		TasksSubmitted:  p.metrics.TasksSubmitted,
		TasksCompleted:  p.metrics.TasksCompleted,
		TasksFailed:     p.metrics.TasksFailed,
		ActiveWorkers:   p.metrics.ActiveWorkers,
		QueueLength:     p.metrics.QueueLength,
		AverageWaitTime: p.metrics.AverageWaitTime,
		AverageExecTime: p.metrics.AverageExecTime,
	}
}
// 停止工作池
func (p *WorkerPool) Stop() {
p.cancel()
close(p.taskQueue)
p.wg.Wait()
close(p.resultQueue)
	close(p.outQueue)
}
// 添加工作者
func (p *WorkerPool) addWorker() error {
p.mu.Lock()
defer p.mu.Unlock()
if len(p.workers) >= p.config.MaxWorkers {
return fmt.Errorf("max workers limit reached")
}
worker := NewWorker(len(p.workers), p.taskQueue, p.resultQueue, p.ctx)
p.workers = append(p.workers, worker)
p.wg.Add(1)
go func() {
defer p.wg.Done()
worker.Run()
}()
p.metrics.mu.Lock()
p.metrics.ActiveWorkers = len(p.workers)
p.metrics.mu.Unlock()
return nil
}
// 监控任务执行:更新指标并把结果转发到对外通道
func (p *WorkerPool) monitor() {
	defer p.wg.Done()
	for {
		select {
		case result := <-p.resultQueue:
			p.updateMetrics(result)
			// 转发给 Results() 的消费者,避免内部监控与外部消费者争抢同一通道
			select {
			case p.outQueue <- result:
			case <-p.ctx.Done():
				return
			}
		case <-p.ctx.Done():
			return
		}
	}
}
// 更新指标
func (p *WorkerPool) updateMetrics(result TaskResult) {
p.metrics.mu.Lock()
defer p.metrics.mu.Unlock()
if result.Success {
p.metrics.TasksCompleted++
} else {
p.metrics.TasksFailed++
}
	// 更新平均执行时间(增量平均:旧平均基于前 totalTasks-1 个任务)
	totalTasks := p.metrics.TasksCompleted + p.metrics.TasksFailed
	if totalTasks > 0 {
		p.metrics.AverageExecTime = time.Duration(
			(int64(p.metrics.AverageExecTime)*(totalTasks-1) + int64(result.Duration)) / totalTasks,
		)
	}
p.metrics.QueueLength = len(p.taskQueue)
}
// 自动扩缩容
func (p *WorkerPool) autoScale() {
defer p.wg.Done()
ticker := time.NewTicker(p.config.ScaleInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
p.scaleWorkers()
case <-p.ctx.Done():
return
}
}
}
// 扩缩容逻辑
func (p *WorkerPool) scaleWorkers() {
p.mu.RLock()
currentWorkers := len(p.workers)
queueLen := len(p.taskQueue)
p.mu.RUnlock()
// 扩容条件:队列长度大于工作者数量的2倍
if queueLen > currentWorkers*2 && currentWorkers < p.config.MaxWorkers {
p.addWorker()
return
}
// 缩容条件:队列为空且工作者数量大于最小值
if queueLen == 0 && currentWorkers > p.config.MinWorkers {
// 发送停止信号给一个工作者
// 这里简化实现,实际可能需要更复杂的逻辑
}
}
// 工作者
type Worker struct {
id int
taskQueue <-chan Task
resultQueue chan<- TaskResult
ctx context.Context
}
func NewWorker(id int, taskQueue <-chan Task, resultQueue chan<- TaskResult, ctx context.Context) *Worker {
return &Worker{
id: id,
taskQueue: taskQueue,
resultQueue: resultQueue,
ctx: ctx,
}
}
func (w *Worker) Run() {
for {
select {
case task, ok := <-w.taskQueue:
if !ok {
return // 任务队列已关闭
}
result := w.executeTask(task)
select {
case w.resultQueue <- result:
case <-w.ctx.Done():
return
}
case <-w.ctx.Done():
return
}
}
}
func (w *Worker) executeTask(task Task) TaskResult {
startTime := time.Now()
// 创建带超时的上下文
timeout := task.Timeout()
if timeout == 0 {
timeout = 30 * time.Second // 默认超时
}
ctx, cancel := context.WithTimeout(w.ctx, timeout)
defer cancel()
// 执行任务
err := task.Execute(ctx)
endTime := time.Now()
duration := endTime.Sub(startTime)
return TaskResult{
TaskID: task.ID(),
Success: err == nil,
Error: err,
Duration: duration,
StartTime: startTime,
EndTime: endTime,
}
}
```
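A minimal lifecycle sketch for the pool above; `someTask` stands in for any value that implements the `Task` interface, and error handling is trimmed for brevity:

```go
// Create, start, and eventually stop a pool; consume results in the background.
pool := NewWorkerPool(DefaultWorkerPoolConfig())
if err := pool.Start(); err != nil {
	panic(err)
}

go func() {
	for r := range pool.Results() {
		fmt.Printf("task %s: success=%v duration=%v\n", r.TaskID, r.Success, r.Duration)
	}
}()

_ = pool.Submit(someTask)

// Inspect a snapshot of the pool metrics at any time.
m := pool.Metrics()
fmt.Printf("submitted=%d completed=%d failed=%d queued=%d\n",
	m.TasksSubmitted, m.TasksCompleted, m.TasksFailed, m.QueueLength)

// Stop cancels the workers, waits for them to exit, then closes the result channels.
pool.Stop()
```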
Priority Task Queue
```go
// pkg/bubble/concurrent/priority_queue.go
package concurrent
import (
	"container/heap"
	"fmt"
	"sync"
	"time"
)
// 优先级任务包装
type PriorityTask struct {
Task Task
Priority int
SubmitTime time.Time
Index int // 在堆中的索引
}
// 优先级队列(并发访问由 PriorityWorkerPool 的互斥锁保护)
type PriorityQueue struct {
	items []*PriorityTask
}
func (pq *PriorityQueue) Len() int {
return len(pq.items)
}
func (pq *PriorityQueue) Less(i, j int) bool {
// 优先级高的排在前面,如果优先级相同则按提交时间排序
if pq.items[i].Priority == pq.items[j].Priority {
return pq.items[i].SubmitTime.Before(pq.items[j].SubmitTime)
}
return pq.items[i].Priority > pq.items[j].Priority
}
func (pq *PriorityQueue) Swap(i, j int) {
pq.items[i], pq.items[j] = pq.items[j], pq.items[i]
pq.items[i].Index = i
pq.items[j].Index = j
}
func (pq *PriorityQueue) Push(x interface{}) {
item := x.(*PriorityTask)
item.Index = len(pq.items)
pq.items = append(pq.items, item)
}
func (pq *PriorityQueue) Pop() interface{} {
old := pq.items
n := len(old)
item := old[n-1]
old[n-1] = nil
item.Index = -1
pq.items = old[0 : n-1]
return item
}
// 优先级工作池
type PriorityWorkerPool struct {
config WorkerPoolConfig
priorityQueue *PriorityQueue
resultQueue chan TaskResult
workers []*PriorityWorker
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
mu sync.RWMutex
// 任务分发信号
taskSignal chan struct{}
}
func NewPriorityWorkerPool(config WorkerPoolConfig) *PriorityWorkerPool {
ctx, cancel := context.WithCancel(context.Background())
return &PriorityWorkerPool{
config: config,
priorityQueue: &PriorityQueue{},
resultQueue: make(chan TaskResult, config.QueueSize),
workers: make([]*PriorityWorker, 0, config.MaxWorkers),
ctx: ctx,
cancel: cancel,
taskSignal: make(chan struct{}, config.QueueSize),
}
}
func (p *PriorityWorkerPool) Start() error {
heap.Init(p.priorityQueue)
// 创建工作者
for i := 0; i < p.config.MinWorkers; i++ {
if err := p.addPriorityWorker(); err != nil {
return err
}
}
return nil
}
func (p *PriorityWorkerPool) Submit(task Task) error {
p.mu.Lock()
defer p.mu.Unlock()
if len(p.priorityQueue.items) >= p.config.QueueSize {
return fmt.Errorf("task queue is full")
}
priorityTask := &PriorityTask{
Task: task,
Priority: task.Priority(),
SubmitTime: time.Now(),
}
heap.Push(p.priorityQueue, priorityTask)
// 通知工作者有新任务
select {
case p.taskSignal <- struct{}{}:
default:
}
return nil
}
func (p *PriorityWorkerPool) getNextTask() Task {
p.mu.Lock()
defer p.mu.Unlock()
if p.priorityQueue.Len() == 0 {
return nil
}
priorityTask := heap.Pop(p.priorityQueue).(*PriorityTask)
return priorityTask.Task
}
func (p *PriorityWorkerPool) addPriorityWorker() error {
worker := &PriorityWorker{
id: len(p.workers),
pool: p,
resultQueue: p.resultQueue,
ctx: p.ctx,
}
p.workers = append(p.workers, worker)
p.wg.Add(1)
go func() {
defer p.wg.Done()
worker.Run()
}()
return nil
}
// 优先级工作者
type PriorityWorker struct {
id int
pool *PriorityWorkerPool
resultQueue chan<- TaskResult
ctx context.Context
}
func (w *PriorityWorker) Run() {
for {
select {
case <-w.pool.taskSignal:
task := w.pool.getNextTask()
if task != nil {
result := w.executeTask(task)
select {
case w.resultQueue <- result:
case <-w.ctx.Done():
return
}
}
case <-w.ctx.Done():
return
}
}
}
func (w *PriorityWorker) executeTask(task Task) TaskResult {
startTime := time.Now()
timeout := task.Timeout()
if timeout == 0 {
timeout = 30 * time.Second
}
ctx, cancel := context.WithTimeout(w.ctx, timeout)
defer cancel()
err := task.Execute(ctx)
endTime := time.Now()
duration := endTime.Sub(startTime)
return TaskResult{
TaskID: task.ID(),
Success: err == nil,
Error: err,
Duration: duration,
StartTime: startTime,
EndTime: endTime,
}
}
```
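A short sketch of how priority scheduling behaves; `makeTask(id, priority)` is a hypothetical helper returning a `Task` whose `Priority()` reports the given value:

```go
// Higher Priority() values are popped first; ties are broken by submit time (FIFO),
// as implemented in Less above.
pool := NewPriorityWorkerPool(DefaultWorkerPoolConfig())
if err := pool.Start(); err != nil {
	panic(err)
}

_ = pool.Submit(makeTask("cleanup", 1))
_ = pool.Submit(makeTask("user-request", 10)) // executed before "cleanup" once a worker is free
```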
🔄 Pipeline Processing
Data Processing Pipeline
```go
// pkg/bubble/concurrent/pipeline.go
package concurrent
import (
"context"
"sync"
)
// 流水线阶段接口
type Stage interface {
Process(ctx context.Context, input <-chan interface{}) <-chan interface{}
Name() string
}
// 数据处理管道
type Pipeline struct {
stages []Stage
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
func NewPipeline(stages ...Stage) *Pipeline {
ctx, cancel := context.WithCancel(context.Background())
return &Pipeline{
stages: stages,
ctx: ctx,
cancel: cancel,
}
}
func (p *Pipeline) Execute(input <-chan interface{}) <-chan interface{} {
current := input
for _, stage := range p.stages {
current = stage.Process(p.ctx, current)
}
return current
}
func (p *Pipeline) Stop() {
p.cancel()
p.wg.Wait()
}
// 基础阶段实现
type FunctionStage struct {
name string
fn func(interface{}) interface{}
}
func NewFunctionStage(name string, fn func(interface{}) interface{}) *FunctionStage {
return &FunctionStage{
name: name,
fn: fn,
}
}
func (s *FunctionStage) Name() string {
return s.name
}
func (s *FunctionStage) Process(ctx context.Context, input <-chan interface{}) <-chan interface{} {
output := make(chan interface{})
go func() {
defer close(output)
for {
select {
case data, ok := <-input:
if !ok {
return
}
result := s.fn(data)
select {
case output <- result:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return output
}
// 并发处理阶段
type ConcurrentStage struct {
name string
fn func(interface{}) interface{}
workers int
}
func NewConcurrentStage(name string, workers int, fn func(interface{}) interface{}) *ConcurrentStage {
return &ConcurrentStage{
name: name,
fn: fn,
workers: workers,
}
}
func (s *ConcurrentStage) Name() string {
return s.name
}
func (s *ConcurrentStage) Process(ctx context.Context, input <-chan interface{}) <-chan interface{} {
output := make(chan interface{})
var wg sync.WaitGroup
// 启动多个工作者
for i := 0; i < s.workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case data, ok := <-input:
if !ok {
return
}
result := s.fn(data)
select {
case output <- result:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
}
// 等待所有工作者完成后关闭输出通道
go func() {
wg.Wait()
close(output)
}()
return output
}
// 过滤阶段
type FilterStage struct {
name string
predicate func(interface{}) bool
}
func NewFilterStage(name string, predicate func(interface{}) bool) *FilterStage {
return &FilterStage{
name: name,
predicate: predicate,
}
}
func (s *FilterStage) Name() string {
return s.name
}
func (s *FilterStage) Process(ctx context.Context, input <-chan interface{}) <-chan interface{} {
output := make(chan interface{})
go func() {
defer close(output)
for {
select {
case data, ok := <-input:
if !ok {
return
}
if s.predicate(data) {
select {
case output <- data:
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return
}
}
}()
return output
}
```
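A small end-to-end sketch of composing the stages above into a pipeline; the numbers and stage functions are illustrative only:

```go
// Double each number across 4 workers, then keep only values greater than 5.
// The producer must close the input channel so the stages can drain and shut down.
input := make(chan interface{})
go func() {
	defer close(input)
	for i := 0; i < 10; i++ {
		input <- i
	}
}()

pipeline := NewPipeline(
	NewConcurrentStage("double", 4, func(v interface{}) interface{} { return v.(int) * 2 }),
	NewFilterStage("keep-large", func(v interface{}) bool { return v.(int) > 5 }),
)
defer pipeline.Stop()

for out := range pipeline.Execute(input) {
	fmt.Println(out) // output order is not guaranteed because of the concurrent stage
}
```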
📊 Real-Time Monitoring
Performance Monitor
```go
// pkg/bubble/concurrent/monitor.go
package concurrent
import (
"sync"
"time"
)
// 监控指标
type Metrics struct {
mu sync.RWMutex
TasksProcessed int64
TasksInProgress int64
AverageProcessTime time.Duration
ThroughputPerSecond float64
ErrorRate float64
LastUpdateTime time.Time
}
// 性能监控器
type PerformanceMonitor struct {
metrics *Metrics
sampleWindow time.Duration
samples []MetricSample
mu sync.RWMutex
}
type MetricSample struct {
Timestamp time.Time
TasksCompleted int64
TasksFailed int64
ProcessTime time.Duration
}
func NewPerformanceMonitor() *PerformanceMonitor {
return &PerformanceMonitor{
metrics: &Metrics{},
sampleWindow: time.Minute,
samples: make([]MetricSample, 0, 60), // 60个样本
}
}
func (m *PerformanceMonitor) RecordTaskStart() {
m.metrics.mu.Lock()
defer m.metrics.mu.Unlock()
m.metrics.TasksInProgress++
}
func (m *PerformanceMonitor) RecordTaskComplete(duration time.Duration, success bool) {
	// 任务完成,先把进行中的计数减一
	m.metrics.mu.Lock()
	m.metrics.TasksInProgress--
	m.metrics.mu.Unlock()
	m.mu.Lock()
	defer m.mu.Unlock()
sample := MetricSample{
Timestamp: time.Now(),
ProcessTime: duration,
}
if success {
sample.TasksCompleted = 1
} else {
sample.TasksFailed = 1
}
m.samples = append(m.samples, sample)
// 清理过期样本
cutoff := time.Now().Add(-m.sampleWindow)
for len(m.samples) > 0 && m.samples[0].Timestamp.Before(cutoff) {
m.samples = m.samples[1:]
}
m.updateMetrics()
}
func (m *PerformanceMonitor) updateMetrics() {
if len(m.samples) == 0 {
return
}
m.metrics.mu.Lock()
defer m.metrics.mu.Unlock()
var totalCompleted, totalFailed int64
var totalTime time.Duration
var taskCount int64
for _, sample := range m.samples {
totalCompleted += sample.TasksCompleted
totalFailed += sample.TasksFailed
if sample.TasksCompleted > 0 || sample.TasksFailed > 0 {
totalTime += sample.ProcessTime
taskCount++
}
}
m.metrics.TasksProcessed = totalCompleted + totalFailed
if taskCount > 0 {
m.metrics.AverageProcessTime = totalTime / time.Duration(taskCount)
}
// 计算吞吐量(每秒处理的任务数)
if len(m.samples) > 1 {
timeSpan := m.samples[len(m.samples)-1].Timestamp.Sub(m.samples[0].Timestamp)
if timeSpan > 0 {
m.metrics.ThroughputPerSecond = float64(totalCompleted+totalFailed) / timeSpan.Seconds()
}
}
// 计算错误率
if totalCompleted+totalFailed > 0 {
m.metrics.ErrorRate = float64(totalFailed) / float64(totalCompleted+totalFailed)
}
m.metrics.LastUpdateTime = time.Now()
}
// 获取指标快照(按字段拷贝,避免连同内部锁一起返回)
func (m *PerformanceMonitor) GetMetrics() Metrics {
	m.metrics.mu.RLock()
	defer m.metrics.mu.RUnlock()
	return Metrics{
		TasksProcessed:      m.metrics.TasksProcessed,
		TasksInProgress:     m.metrics.TasksInProgress,
		AverageProcessTime:  m.metrics.AverageProcessTime,
		ThroughputPerSecond: m.metrics.ThroughputPerSecond,
		ErrorRate:           m.metrics.ErrorRate,
		LastUpdateTime:      m.metrics.LastUpdateTime,
	}
}
```
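A sketch of wrapping a single task execution with the monitor; `task` and `ctx` are assumed to exist in the surrounding code:

```go
// Record start/completion around one execution, then read the aggregated metrics.
monitor := NewPerformanceMonitor()

monitor.RecordTaskStart()
start := time.Now()
err := task.Execute(ctx)
monitor.RecordTaskComplete(time.Since(start), err == nil)

m := monitor.GetMetrics()
fmt.Printf("throughput=%.2f/s error-rate=%.1f%% avg=%v\n",
	m.ThroughputPerSecond, m.ErrorRate*100, m.AverageProcessTime)
```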
🧪 Usage Examples
File Processing Example
```go
// examples/file_processing.go
package main
import (
"context"
"fmt"
"os"
"path/filepath"
"time"
"lzt/pkg/bubble/concurrent"
)
// 文件处理任务
type FileProcessTask struct {
id string
filePath string
priority int
}
func (t *FileProcessTask) ID() string {
return t.id
}
func (t *FileProcessTask) Execute(ctx context.Context) error {
// 模拟文件处理
select {
case <-time.After(100 * time.Millisecond):
fmt.Printf("处理文件: %s\n", t.filePath)
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (t *FileProcessTask) Priority() int {
return t.priority
}
func (t *FileProcessTask) Timeout() time.Duration {
return 5 * time.Second
}
func main() {
// 创建工作池
config := concurrent.DefaultWorkerPoolConfig()
config.MaxWorkers = 5
pool := concurrent.NewWorkerPool(config)
if err := pool.Start(); err != nil {
panic(err)
}
defer pool.Stop()
// 扫描文件并提交任务
err := filepath.Walk("./testdata", func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
task := &FileProcessTask{
id: fmt.Sprintf("file-%s", info.Name()),
filePath: path,
priority: 1,
}
if err := pool.Submit(task); err != nil {
fmt.Printf("提交任务失败: %v\n", err)
}
}
return nil
})
if err != nil {
panic(err)
}
// 监控执行结果
go func() {
for result := range pool.Results() {
if result.Success {
fmt.Printf("任务 %s 完成,耗时: %v\n", result.TaskID, result.Duration)
} else {
fmt.Printf("任务 %s 失败: %v\n", result.TaskID, result.Error)
}
}
}()
// 等待一段时间
time.Sleep(5 * time.Second)
}
```
📚 Related Resources
Project Documentation
External References
💡 Concurrency tip: Keep the level of concurrency under control instead of spawning a goroutine per task. The worker pool pattern bounds resource usage and keeps the system stable under load.
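As a rough starting point (an assumption, not a project convention), the pool limits can be derived from the CPU count; the `runtime` package is assumed to be imported:

```go
// Size the pool relative to the available cores.
config := concurrent.DefaultWorkerPoolConfig()
config.MinWorkers = runtime.NumCPU()     // keep one worker per core
config.MaxWorkers = runtime.NumCPU() * 2 // headroom for tasks that block on I/O
pool := concurrent.NewWorkerPool(config)
```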