
⚡ Concurrent Processing Components

The Bubble concurrent processing components provide efficient parallel execution of many tasks, with support for worker pools, pipeline processing, and real-time progress monitoring.

🎯 Component Overview

Core Features

  • Worker pool pattern - bounds the number of concurrent tasks to avoid resource overload
  • Pipeline processing - multi-stage data processing pipelines
  • Real-time monitoring - task execution status and performance metrics
  • Error handling - retry-on-failure and error recovery mechanisms
  • Dynamic scaling - automatically adjusts the worker count based on load

📝 Worker Pool Implementation

Basic Worker Pool

go
// pkg/bubble/concurrent/worker_pool.go
package concurrent

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// Task is the unit of work executed by the pool.
type Task interface {
    ID() string
    Execute(ctx context.Context) error
    Priority() int
    Timeout() time.Duration
}

// TaskResult describes the outcome of a single task.
type TaskResult struct {
    TaskID    string
    Success   bool
    Error     error
    Duration  time.Duration
    StartTime time.Time
    EndTime   time.Time
}

// WorkerPoolConfig controls pool sizing and scaling behaviour.
type WorkerPoolConfig struct {
    MinWorkers    int           // minimum number of workers
    MaxWorkers    int           // maximum number of workers
    QueueSize     int           // task queue capacity
    IdleTimeout   time.Duration // how long an idle worker may live before scale-down
    ScaleInterval time.Duration // how often the auto-scaler checks the load
}

// DefaultWorkerPoolConfig returns a sensible default configuration.
func DefaultWorkerPoolConfig() WorkerPoolConfig {
    return WorkerPoolConfig{
        MinWorkers:    2,
        MaxWorkers:    10,
        QueueSize:     1000,
        IdleTimeout:   30 * time.Second,
        ScaleInterval: 10 * time.Second,
    }
}

// WorkerPool runs submitted tasks on a bounded, dynamically sized set of workers.
type WorkerPool struct {
    config      WorkerPoolConfig
    taskQueue   chan Task
    innerQueue  chan TaskResult // workers write here; the monitor forwards to resultQueue
    resultQueue chan TaskResult
    workers     []*Worker
    
    ctx     context.Context
    cancel  context.CancelFunc
    wg      sync.WaitGroup
    mu      sync.RWMutex
    
    // monitoring metrics
    metrics *PoolMetrics
}

// PoolMetrics holds the pool's monitoring counters.
type PoolMetrics struct {
    mu                sync.RWMutex
    TasksSubmitted    int64
    TasksCompleted    int64
    TasksFailed       int64
    ActiveWorkers     int
    QueueLength       int
    AverageWaitTime   time.Duration
    AverageExecTime   time.Duration
}

// NewWorkerPool creates a pool from the given configuration.
func NewWorkerPool(config WorkerPoolConfig) *WorkerPool {
    ctx, cancel := context.WithCancel(context.Background())
    
    pool := &WorkerPool{
        config:      config,
        taskQueue:   make(chan Task, config.QueueSize),
        innerQueue:  make(chan TaskResult, config.QueueSize),
        resultQueue: make(chan TaskResult, config.QueueSize),
        workers:     make([]*Worker, 0, config.MaxWorkers),
        ctx:         ctx,
        cancel:      cancel,
        metrics:     &PoolMetrics{},
    }
    
    return pool
}

// Start launches the initial workers plus the monitor and auto-scaler goroutines.
func (p *WorkerPool) Start() error {
    // create the minimum number of workers
    for i := 0; i < p.config.MinWorkers; i++ {
        if err := p.addWorker(); err != nil {
            return fmt.Errorf("failed to create initial worker: %w", err)
        }
    }
    
    // start the monitoring and auto-scaling goroutines
    p.wg.Add(2)
    go p.monitor()
    go p.autoScale()
    
    return nil
}

// Submit enqueues a task without blocking; it returns an error if the queue
// is full or the pool's context has been cancelled.
func (p *WorkerPool) Submit(task Task) error {
    select {
    case p.taskQueue <- task:
        p.metrics.mu.Lock()
        p.metrics.TasksSubmitted++
        p.metrics.QueueLength = len(p.taskQueue)
        p.metrics.mu.Unlock()
        return nil
    case <-p.ctx.Done():
        return p.ctx.Err()
    default:
        return fmt.Errorf("task queue is full")
    }
}

// Results returns the channel on which task results are delivered. Drain it
// promptly: once its buffer fills, the pool's monitor goroutine blocks.
func (p *WorkerPool) Results() <-chan TaskResult {
    return p.resultQueue
}

// Metrics returns a snapshot copy of the current pool metrics.
func (p *WorkerPool) Metrics() PoolMetrics {
    p.metrics.mu.RLock()
    defer p.metrics.mu.RUnlock()
    
    return *p.metrics
}

// Stop shuts the pool down. Cancelling the context first means tasks still
// sitting in the queue are dropped rather than drained. Do not call Submit
// after Stop: sending on the closed task queue would panic.
func (p *WorkerPool) Stop() {
    p.cancel()
    close(p.taskQueue)
    p.wg.Wait()
    close(p.resultQueue)
}

// addWorker starts one more worker, up to the MaxWorkers limit.
func (p *WorkerPool) addWorker() error {
    p.mu.Lock()
    defer p.mu.Unlock()
    
    if len(p.workers) >= p.config.MaxWorkers {
        return fmt.Errorf("max workers limit reached")
    }
    
    worker := NewWorker(p.ctx, len(p.workers), p.taskQueue, p.innerQueue)
    p.workers = append(p.workers, worker)
    
    p.wg.Add(1)
    go func() {
        defer p.wg.Done()
        worker.Run()
    }()
    
    p.metrics.mu.Lock()
    p.metrics.ActiveWorkers = len(p.workers)
    p.metrics.mu.Unlock()
    
    return nil
}

// monitor consumes raw results from the workers, folds them into the metrics,
// then forwards each result to the public result channel. Routing everything
// through this single goroutine keeps the accounting in one place without
// competing with external consumers of Results().
func (p *WorkerPool) monitor() {
    defer p.wg.Done()
    
    for {
        select {
        case result := <-p.innerQueue:
            p.updateMetrics(result)
            select {
            case p.resultQueue <- result:
            case <-p.ctx.Done():
                return
            }
            
        case <-p.ctx.Done():
            return
        }
    }
}

// updateMetrics folds one task result into the pool counters.
func (p *WorkerPool) updateMetrics(result TaskResult) {
    p.metrics.mu.Lock()
    defer p.metrics.mu.Unlock()
    
    if result.Success {
        p.metrics.TasksCompleted++
    } else {
        p.metrics.TasksFailed++
    }
    
    // update the running average execution time; this task is already
    // counted in totalTasks, so weight the previous average by n-1
    totalTasks := p.metrics.TasksCompleted + p.metrics.TasksFailed
    if totalTasks > 0 {
        p.metrics.AverageExecTime = time.Duration(
            (int64(p.metrics.AverageExecTime)*(totalTasks-1) + int64(result.Duration)) / totalTasks,
        )
    }
    
    p.metrics.QueueLength = len(p.taskQueue)
}

// autoScale periodically re-evaluates the pool size.
func (p *WorkerPool) autoScale() {
    defer p.wg.Done()
    
    ticker := time.NewTicker(p.config.ScaleInterval)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            p.scaleWorkers()
            
        case <-p.ctx.Done():
            return
        }
    }
}

// scaleWorkers grows or shrinks the pool based on queue pressure.
func (p *WorkerPool) scaleWorkers() {
    p.mu.RLock()
    currentWorkers := len(p.workers)
    queueLen := len(p.taskQueue)
    p.mu.RUnlock()
    
    // scale up: queue length exceeds twice the current worker count
    if queueLen > currentWorkers*2 && currentWorkers < p.config.MaxWorkers {
        p.addWorker()
        return
    }
    
    // scale down: the queue is empty and we are above the minimum
    if queueLen == 0 && currentWorkers > p.config.MinWorkers {
        // send a stop signal to one worker. Simplified away here; a real
        // implementation needs per-worker stop channels and idle tracking.
    }
}

// Worker pulls tasks off the shared queue and executes them.
type Worker struct {
    id          int
    taskQueue   <-chan Task
    resultQueue chan<- TaskResult
    ctx         context.Context
}

// NewWorker creates a worker bound to the given context and queues.
func NewWorker(ctx context.Context, id int, taskQueue <-chan Task, resultQueue chan<- TaskResult) *Worker {
    return &Worker{
        id:          id,
        taskQueue:   taskQueue,
        resultQueue: resultQueue,
        ctx:         ctx,
    }
}

func (w *Worker) Run() {
    for {
        select {
        case task, ok := <-w.taskQueue:
            if !ok {
                return // task queue has been closed
            }
            
            result := w.executeTask(task)
            
            select {
            case w.resultQueue <- result:
            case <-w.ctx.Done():
                return
            }
            
        case <-w.ctx.Done():
            return
        }
    }
}

func (w *Worker) executeTask(task Task) TaskResult {
    startTime := time.Now()
    
    // derive a context bounded by the task's timeout
    timeout := task.Timeout()
    if timeout == 0 {
        timeout = 30 * time.Second // default timeout
    }
    
    ctx, cancel := context.WithTimeout(w.ctx, timeout)
    defer cancel()
    
    // run the task
    err := task.Execute(ctx)
    
    endTime := time.Now()
    duration := endTime.Sub(startTime)
    
    return TaskResult{
        TaskID:    task.ID(),
        Success:   err == nil,
        Error:     err,
        Duration:  duration,
        StartTime: startTime,
        EndTime:   endTime,
    }
}
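
Any type with the four methods satisfies Task. For quick call sites, a small adapter can wrap a plain function; FuncTask below is an illustrative sketch, not part of the package above:

go
// FuncTask is a hypothetical convenience adapter (not part of the package
// above) that lets a plain function satisfy the Task interface.
type FuncTask struct {
    TaskID       string
    TaskPriority int
    TaskTimeout  time.Duration
    Fn           func(ctx context.Context) error
}

func (t *FuncTask) ID() string             { return t.TaskID }
func (t *FuncTask) Priority() int          { return t.TaskPriority }
func (t *FuncTask) Timeout() time.Duration { return t.TaskTimeout }

func (t *FuncTask) Execute(ctx context.Context) error { return t.Fn(ctx) }

// Usage:
//   pool.Submit(&FuncTask{TaskID: "hello", Fn: func(ctx context.Context) error {
//       fmt.Println("hello from the pool")
//       return nil
//   }})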

Priority Task Queue

go
// pkg/bubble/concurrent/priority_queue.go
package concurrent

import (
    "container/heap"
    "context"
    "fmt"
    "sync"
    "time"
)

// PriorityTask wraps a Task with the metadata used for scheduling.
type PriorityTask struct {
    Task       Task
    Priority   int
    SubmitTime time.Time
    Index      int // index of this item within the heap
}

// PriorityQueue is a max-heap of tasks. All access is guarded by the owning
// pool's mutex, so the queue itself carries no lock.
type PriorityQueue struct {
    items []*PriorityTask
}

func (pq *PriorityQueue) Len() int {
    return len(pq.items)
}

func (pq *PriorityQueue) Less(i, j int) bool {
    // higher priority first; ties are broken by earlier submit time
    if pq.items[i].Priority == pq.items[j].Priority {
        return pq.items[i].SubmitTime.Before(pq.items[j].SubmitTime)
    }
    return pq.items[i].Priority > pq.items[j].Priority
}

func (pq *PriorityQueue) Swap(i, j int) {
    pq.items[i], pq.items[j] = pq.items[j], pq.items[i]
    pq.items[i].Index = i
    pq.items[j].Index = j
}

// Push and Pop implement heap.Interface; use heap.Push and heap.Pop rather
// than calling these directly.
func (pq *PriorityQueue) Push(x interface{}) {
    item := x.(*PriorityTask)
    item.Index = len(pq.items)
    pq.items = append(pq.items, item)
}

func (pq *PriorityQueue) Pop() interface{} {
    old := pq.items
    n := len(old)
    item := old[n-1]
    old[n-1] = nil // drop the reference so the item can be collected
    item.Index = -1
    pq.items = old[0 : n-1]
    return item
}

// PriorityWorkerPool schedules tasks by priority instead of FIFO order.
type PriorityWorkerPool struct {
    config        WorkerPoolConfig
    priorityQueue *PriorityQueue
    resultQueue   chan TaskResult
    workers       []*PriorityWorker
    
    ctx    context.Context
    cancel context.CancelFunc
    wg     sync.WaitGroup
    mu     sync.RWMutex
    
    // signals workers that a task has been queued
    taskSignal chan struct{}
}

func NewPriorityWorkerPool(config WorkerPoolConfig) *PriorityWorkerPool {
    ctx, cancel := context.WithCancel(context.Background())
    
    return &PriorityWorkerPool{
        config:        config,
        priorityQueue: &PriorityQueue{},
        resultQueue:   make(chan TaskResult, config.QueueSize),
        workers:       make([]*PriorityWorker, 0, config.MaxWorkers),
        ctx:           ctx,
        cancel:        cancel,
        taskSignal:    make(chan struct{}, config.QueueSize),
    }
}

func (p *PriorityWorkerPool) Start() error {
    heap.Init(p.priorityQueue)
    
    // create the initial workers
    for i := 0; i < p.config.MinWorkers; i++ {
        if err := p.addPriorityWorker(); err != nil {
            return err
        }
    }
    
    return nil
}

func (p *PriorityWorkerPool) Submit(task Task) error {
    p.mu.Lock()
    defer p.mu.Unlock()
    
    if len(p.priorityQueue.items) >= p.config.QueueSize {
        return fmt.Errorf("task queue is full")
    }
    
    priorityTask := &PriorityTask{
        Task:       task,
        Priority:   task.Priority(),
        SubmitTime: time.Now(),
    }
    
    heap.Push(p.priorityQueue, priorityTask)
    
    // wake a worker to pick up the new task
    select {
    case p.taskSignal <- struct{}{}:
    default:
    }
    
    return nil
}

// getNextTask pops the highest-priority task, or returns nil when empty.
func (p *PriorityWorkerPool) getNextTask() Task {
    p.mu.Lock()
    defer p.mu.Unlock()
    
    if p.priorityQueue.Len() == 0 {
        return nil
    }
    
    priorityTask := heap.Pop(p.priorityQueue).(*PriorityTask)
    return priorityTask.Task
}

func (p *PriorityWorkerPool) addPriorityWorker() error {
    worker := &PriorityWorker{
        id:          len(p.workers),
        pool:        p,
        resultQueue: p.resultQueue,
        ctx:         p.ctx,
    }
    
    p.workers = append(p.workers, worker)
    
    p.wg.Add(1)
    go func() {
        defer p.wg.Done()
        worker.Run()
    }()
    
    return nil
}

// PriorityWorker executes tasks handed out by its owning pool.
type PriorityWorker struct {
    id          int
    pool        *PriorityWorkerPool
    resultQueue chan<- TaskResult
    ctx         context.Context
}

func (w *PriorityWorker) Run() {
    for {
        select {
        case <-w.pool.taskSignal:
            task := w.pool.getNextTask()
            if task != nil {
                result := w.executeTask(task)
                
                select {
                case w.resultQueue <- result:
                case <-w.ctx.Done():
                    return
                }
            }
            
        case <-w.ctx.Done():
            return
        }
    }
}

func (w *PriorityWorker) executeTask(task Task) TaskResult {
    startTime := time.Now()
    
    timeout := task.Timeout()
    if timeout == 0 {
        timeout = 30 * time.Second
    }
    
    ctx, cancel := context.WithTimeout(w.ctx, timeout)
    defer cancel()
    
    err := task.Execute(ctx)
    
    endTime := time.Now()
    duration := endTime.Sub(startTime)
    
    return TaskResult{
        TaskID:    task.ID(),
        Success:   err == nil,
        Error:     err,
        Duration:  duration,
        StartTime: startTime,
        EndTime:   endTime,
    }
}
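
A quick fragment showing the resulting ordering semantics (it reuses this file's container/heap and time imports; the tasks are illustrative):

go
// Illustrative sketch: higher Priority pops first; equal priorities
// pop in submission (FIFO) order.
pq := &PriorityQueue{}
heap.Init(pq)

now := time.Now()
heap.Push(pq, &PriorityTask{Priority: 1, SubmitTime: now})
heap.Push(pq, &PriorityTask{Priority: 5, SubmitTime: now})
heap.Push(pq, &PriorityTask{Priority: 5, SubmitTime: now.Add(time.Millisecond)})

first := heap.Pop(pq).(*PriorityTask)  // Priority 5, earlier SubmitTime
second := heap.Pop(pq).(*PriorityTask) // Priority 5, later SubmitTime
third := heap.Pop(pq).(*PriorityTask)  // Priority 1
_, _, _ = first, second, third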

🔄 Pipeline Processing

Data Processing Pipeline

go
// pkg/bubble/concurrent/pipeline.go
package concurrent

import (
    "context"
    "sync"
)

// Stage is one step in a processing pipeline.
type Stage interface {
    Process(ctx context.Context, input <-chan interface{}) <-chan interface{}
    Name() string
}

// Pipeline chains stages so each stage's output feeds the next stage's input.
type Pipeline struct {
    stages []Stage
    ctx    context.Context
    cancel context.CancelFunc
}

func NewPipeline(stages ...Stage) *Pipeline {
    ctx, cancel := context.WithCancel(context.Background())
    
    return &Pipeline{
        stages: stages,
        ctx:    ctx,
        cancel: cancel,
    }
}

// Execute wires the stages together and returns the final output channel.
func (p *Pipeline) Execute(input <-chan interface{}) <-chan interface{} {
    current := input
    
    for _, stage := range p.stages {
        current = stage.Process(p.ctx, current)
    }
    
    return current
}

// Stop cancels the pipeline's context; stage goroutines exit on their own.
func (p *Pipeline) Stop() {
    p.cancel()
}

// FunctionStage applies a function to every item, one at a time.
type FunctionStage struct {
    name string
    fn   func(interface{}) interface{}
}

func NewFunctionStage(name string, fn func(interface{}) interface{}) *FunctionStage {
    return &FunctionStage{
        name: name,
        fn:   fn,
    }
}

func (s *FunctionStage) Name() string {
    return s.name
}

func (s *FunctionStage) Process(ctx context.Context, input <-chan interface{}) <-chan interface{} {
    output := make(chan interface{})
    
    go func() {
        defer close(output)
        
        for {
            select {
            case data, ok := <-input:
                if !ok {
                    return
                }
                
                result := s.fn(data)
                
                select {
                case output <- result:
                case <-ctx.Done():
                    return
                }
                
            case <-ctx.Done():
                return
            }
        }
    }()
    
    return output
}

// ConcurrentStage applies a function using several workers in parallel.
type ConcurrentStage struct {
    name    string
    fn      func(interface{}) interface{}
    workers int
}

func NewConcurrentStage(name string, workers int, fn func(interface{}) interface{}) *ConcurrentStage {
    return &ConcurrentStage{
        name:    name,
        fn:      fn,
        workers: workers,
    }
}

func (s *ConcurrentStage) Name() string {
    return s.name
}

func (s *ConcurrentStage) Process(ctx context.Context, input <-chan interface{}) <-chan interface{} {
    output := make(chan interface{})
    
    var wg sync.WaitGroup
    
    // start the workers
    for i := 0; i < s.workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            
            for {
                select {
                case data, ok := <-input:
                    if !ok {
                        return
                    }
                    
                    result := s.fn(data)
                    
                    select {
                    case output <- result:
                    case <-ctx.Done():
                        return
                    }
                    
                case <-ctx.Done():
                    return
                }
            }
        }()
    }
    
    // close the output channel once every worker has finished
    go func() {
        wg.Wait()
        close(output)
    }()
    
    return output
}

// FilterStage forwards only the items that satisfy the predicate.
type FilterStage struct {
    name      string
    predicate func(interface{}) bool
}

func NewFilterStage(name string, predicate func(interface{}) bool) *FilterStage {
    return &FilterStage{
        name:      name,
        predicate: predicate,
    }
}

func (s *FilterStage) Name() string {
    return s.name
}

func (s *FilterStage) Process(ctx context.Context, input <-chan interface{}) <-chan interface{} {
    output := make(chan interface{})
    
    go func() {
        defer close(output)
        
        for {
            select {
            case data, ok := <-input:
                if !ok {
                    return
                }
                
                if s.predicate(data) {
                    select {
                    case output <- data:
                    case <-ctx.Done():
                        return
                    }
                }
                
            case <-ctx.Done():
                return
            }
        }
    }()
    
    return output
}
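
Putting the three stage types together: the runnable sketch below is illustrative (the file path, stage names, and functions are made up; the import path follows the usage example at the end of this page).

go
// examples/pipeline_demo.go (illustrative)
package main

import (
    "fmt"

    "lzt/pkg/bubble/concurrent"
)

func main() {
    // double -> keep positives -> square, with 4 workers in the last stage
    pipeline := concurrent.NewPipeline(
        concurrent.NewFunctionStage("double", func(v interface{}) interface{} {
            return v.(int) * 2
        }),
        concurrent.NewFilterStage("positive-only", func(v interface{}) bool {
            return v.(int) > 0
        }),
        concurrent.NewConcurrentStage("square", 4, func(v interface{}) interface{} {
            n := v.(int)
            return n * n
        }),
    )
    defer pipeline.Stop()

    input := make(chan interface{})
    output := pipeline.Execute(input)

    go func() {
        defer close(input)
        for _, n := range []int{-2, -1, 1, 2, 3} {
            input <- n
        }
    }()

    for v := range output {
        fmt.Println(v) // 4, 16, 36 in some order (the last stage is concurrent)
    }
}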

📊 Real-Time Monitoring

Performance Monitoring Component

go
// pkg/bubble/concurrent/monitor.go
package concurrent

import (
    "sync"
    "time"
)

// Metrics holds aggregate performance statistics.
type Metrics struct {
    mu                  sync.RWMutex
    TasksProcessed      int64
    TasksInProgress     int64
    AverageProcessTime  time.Duration
    ThroughputPerSecond float64
    ErrorRate           float64
    LastUpdateTime      time.Time
}

// PerformanceMonitor aggregates task samples over a sliding time window.
type PerformanceMonitor struct {
    metrics      *Metrics
    sampleWindow time.Duration
    samples      []MetricSample
    mu           sync.RWMutex
}

type MetricSample struct {
    Timestamp      time.Time
    TasksCompleted int64
    TasksFailed    int64
    ProcessTime    time.Duration
}

func NewPerformanceMonitor() *PerformanceMonitor {
    return &PerformanceMonitor{
        metrics:      &Metrics{},
        sampleWindow: time.Minute,
        samples:      make([]MetricSample, 0, 60), // capacity for 60 samples
    }
}

// RecordTaskStart marks one task as in flight.
func (m *PerformanceMonitor) RecordTaskStart() {
    m.metrics.mu.Lock()
    defer m.metrics.mu.Unlock()
    
    m.metrics.TasksInProgress++
}

// RecordTaskComplete records one finished task and refreshes the aggregates.
func (m *PerformanceMonitor) RecordTaskComplete(duration time.Duration, success bool) {
    m.mu.Lock()
    defer m.mu.Unlock()
    
    // the task is no longer in flight
    m.metrics.mu.Lock()
    m.metrics.TasksInProgress--
    m.metrics.mu.Unlock()
    
    sample := MetricSample{
        Timestamp:   time.Now(),
        ProcessTime: duration,
    }
    
    if success {
        sample.TasksCompleted = 1
    } else {
        sample.TasksFailed = 1
    }
    
    m.samples = append(m.samples, sample)
    
    // drop samples that have aged out of the sampling window
    cutoff := time.Now().Add(-m.sampleWindow)
    for len(m.samples) > 0 && m.samples[0].Timestamp.Before(cutoff) {
        m.samples = m.samples[1:]
    }
    
    m.updateMetrics()
}

func (m *PerformanceMonitor) updateMetrics() {
    if len(m.samples) == 0 {
        return
    }
    
    m.metrics.mu.Lock()
    defer m.metrics.mu.Unlock()
    
    var totalCompleted, totalFailed int64
    var totalTime time.Duration
    var taskCount int64
    
    for _, sample := range m.samples {
        totalCompleted += sample.TasksCompleted
        totalFailed += sample.TasksFailed
        if sample.TasksCompleted > 0 || sample.TasksFailed > 0 {
            totalTime += sample.ProcessTime
            taskCount++
        }
    }
    
    m.metrics.TasksProcessed = totalCompleted + totalFailed
    
    if taskCount > 0 {
        m.metrics.AverageProcessTime = totalTime / time.Duration(taskCount)
    }
    
    // throughput: tasks handled per second across the sample span
    if len(m.samples) > 1 {
        timeSpan := m.samples[len(m.samples)-1].Timestamp.Sub(m.samples[0].Timestamp)
        if timeSpan > 0 {
            m.metrics.ThroughputPerSecond = float64(totalCompleted+totalFailed) / timeSpan.Seconds()
        }
    }
    
    // error rate
    if totalCompleted+totalFailed > 0 {
        m.metrics.ErrorRate = float64(totalFailed) / float64(totalCompleted+totalFailed)
    }
    
    m.metrics.LastUpdateTime = time.Now()
}

func (m *PerformanceMonitor) GetMetrics() Metrics {
    m.metrics.mu.RLock()
    defer m.metrics.mu.RUnlock()
    
    return *m.metrics
}
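
A short sketch of feeding the monitor from task-execution code (a fragment; imports elided, and doSomeWork is a hypothetical stand-in for the real task body):

go
monitor := NewPerformanceMonitor()

start := time.Now()
monitor.RecordTaskStart()
err := doSomeWork() // hypothetical task body returning an error
monitor.RecordTaskComplete(time.Since(start), err == nil)

m := monitor.GetMetrics()
fmt.Printf("processed=%d avg=%v error_rate=%.1f%%\n",
    m.TasksProcessed, m.AverageProcessTime, m.ErrorRate*100)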

🧪 Usage Example

File Processing Example

go
// examples/file_processing.go
package main

import (
    "context"
    "fmt"
    "os"
    "path/filepath"
    "time"
    
    "lzt/pkg/bubble/concurrent"
)

// FileProcessTask processes a single file.
type FileProcessTask struct {
    id       string
    filePath string
    priority int
}

func (t *FileProcessTask) ID() string {
    return t.id
}

func (t *FileProcessTask) Execute(ctx context.Context) error {
    // simulate file processing
    select {
    case <-time.After(100 * time.Millisecond):
        fmt.Printf("处理文件: %s\n", t.filePath)
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func (t *FileProcessTask) Priority() int {
    return t.priority
}

func (t *FileProcessTask) Timeout() time.Duration {
    return 5 * time.Second
}

func main() {
    // create the worker pool
    config := concurrent.DefaultWorkerPoolConfig()
    config.MaxWorkers = 5
    
    pool := concurrent.NewWorkerPool(config)
    
    if err := pool.Start(); err != nil {
        panic(err)
    }
    defer pool.Stop()
    
    // walk the directory tree and submit one task per file
    err := filepath.Walk("./testdata", func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        
        if !info.IsDir() {
            task := &FileProcessTask{
                id:       fmt.Sprintf("file-%s", info.Name()),
                filePath: path,
                priority: 1,
            }
            
            if err := pool.Submit(task); err != nil {
                fmt.Printf("提交任务失败: %v\n", err)
            }
        }
        
        return nil
    })
    
    if err != nil {
        panic(err)
    }
    
    // consume and report results
    go func() {
        for result := range pool.Results() {
            if result.Success {
                fmt.Printf("任务 %s 完成,耗时: %v\n", result.TaskID, result.Duration)
            } else {
                fmt.Printf("任务 %s 失败: %v\n", result.TaskID, result.Error)
            }
        }
    }()
    
    // crude wait for demo purposes; see the polling sketch below
    time.Sleep(5 * time.Second)
}
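
The fixed time.Sleep at the end is only for demonstration. A more robust wait, appended to the same main, can poll the pool's metrics until every submitted task has been accounted for; a minimal sketch:

go
// wait until every submitted task has completed or failed
for {
    m := pool.Metrics()
    if m.TasksCompleted+m.TasksFailed >= m.TasksSubmitted {
        break
    }
    time.Sleep(100 * time.Millisecond)
}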


💡 Concurrency tip: Keep concurrency bounded and avoid spawning goroutines without limit. The worker pool pattern keeps resource usage under control and improves system stability.
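
For one-off cases where a full pool is overkill, the same bounding idea can be expressed with a buffered channel used as a semaphore. A minimal standalone sketch (Job, jobs, and process are hypothetical placeholders):

go
sem := make(chan struct{}, 5) // at most 5 goroutines run at once
var wg sync.WaitGroup

for _, job := range jobs { // jobs: any slice of work items (placeholder)
    wg.Add(1)
    sem <- struct{}{} // acquire a slot before starting the goroutine
    go func(j Job) {
        defer wg.Done()
        defer func() { <-sem }() // release the slot when done
        process(j) // hypothetical work function
    }(job)
}
wg.Wait()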

Released under the MIT License