开发者

浅谈Go语言中高效并发模式

目录
  • 一、并发编程基础回顾
    • 1.1 Goroutine
    • 1.2 Channel
    • 1.3 Select
  • 二、常用并发模式详解
    • 2.1 Worker Pool(工作池模式)
    • 2.2 Fan-in / Fan-out(扇入/扇出模式)
    • 2.3 Pipeline(流水线模式)
    • 2.4 Timeout / Cancel(超时与取消模式)
    • 2.5 Future / Promise(异步结果模式)
  • 三、进阶并发模式
    • 3.1 Or-Done 模式
    • 3.2 RGezCEOUPxate Limiting(限流模式)
  • 四、注意点
    • 4.1 使用说明
    • 4.2 模式选择对比
    • 4.3 选择建议

一、并发编程基础回顾

Go语言以其轻量级的Goroutine和简洁的并发控制机制(如Channel和Select)闻名,非常适合构建高并发、高吞吐量的系统。

1.1 Goroutine

Go语言中的Goroutine是轻量级线程,由Go运行时调度。启动一个Goroutine非常简单:

go func() {
    fmt.Println("Hello from goroutine")
}()

1.2 Channel

Channel是Go语言中用于Goroutine之间通信的机制,支持同步与数据传递:

ch := make(chan int)
go func() { ch <- 42 }()
value := <-ch
fmt.Println(value)

1.3 Select

Select用于在多个Channel操作中进行选择,常用于超时控制、多路复用等场景:

select {
case msg := <-ch1:
    fmt.Println("Got from ch1:", msg)
case msg := <-ch2:
    fmt.Println("Got from ch2:", msg)
case <-time.After(1 * time.Second):
    fmt.Println("Timeout")
}

二、常用并发模式详解

2.1 Worker Pool(工作池模式)

适用场景:任务分发与并行处理,如爬虫、批量任务处理。

案例:使用固定数量的Worker处理任务队列。

package main
import (
	"fmt"
	"sync"
	"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for j := range jobs {
		fmt.Printf("Worker %d started job %d\n", id, j)
		time.Sleep(time.Second) // 模拟耗时任务
		fmt.Printf("Worker %d finished job %d\n", id, j)
		results <- j * 2
	}
}
func main() {
	const numJobs = 5
	const numWorkers = 3
	jobs := make(chan int, numJobs)
	results编程客栈 := make(chan int, numJobs)
	var wg sync.WaitGroup
	// 启动3个worker
	for w := 1; w <= numWorkers; w++ {
		wg.Add(1)
		go worker(w, jobs, results, &wg)
	}
	// 发送5个任务
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs)
	// 等待所有worker完成
	wg.Wait()
	close(results)
	// 收集结果
	for result := range results {
		fmt.Println("Result:", result)
	}
}

2.2 Fan-in / Fan-out(扇入/扇出模式)

适用场景:将多个输入合并到一个输出(Fan-in),或将一个输入分发到多个处理单元(Fan-out)。

案例

package main
import (
	"fmt"
	"sync"
	"time"
)
func producer(id int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < 3; i++ {
		fmt.Printf("Producer %d produced %d\n", id, i)
		out <- id*10 + i
		time.Sleep(200 * time.Millisecond)
	}
}
func consumer(in <-chan int, done chan<- bool) {
	for value := range in {
		fmt.Printf("Consumed: %d\n", value)
	}
	done <- tr编程ue
}
func main() {
	ch := make(chan int)
	done := make(chan bool)
	var wg sync.WaitGroup
	// 启动3个producer
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go producer(i, ch, &wg)
	}
	// 启动1个consumer
	go consumer(ch, done)
	// 等待所有producer完成
	go func() {
		wg.Wait()
		close(ch)
	}()
	// 等待consumer完成
	<-done
	fmt.Println("All done!")
}

2.3 Pipeline(流水线模式)

适用场景:数据流处理,分阶段处理任务,如ETL流程。

案例:模拟数据经过多个阶段的处理。

package main
import (
	"fmt"
	"sync"
)
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}
func main() {
	// 构建pipeline: generator -> square -> print
	input := generator(1, 2, 3, 4, 5)
	squares := square(input)
	// 打印结果
	for result := range squares {
		fmt.Println(result)
	}
}

2.4 Timeout / Cancel(超时与取消模式)

适用场景:控制任务执行时间,防止阻塞或资源浪费。

案例:使用Context实现超时控制。

package main
import (
	"context"
	"fmt"
	"time"
)
func longRunningTask(ctx context.Context) {
	select {
	case <-time.After(3 * time.Second):
		fmt.Println("Task completed")
	case <-ctx.Done():
		fmt.Println("Task canceled:", ctx.Err())
	}
}
func main() {
	// 带超时的context
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	go longRunningTask(ctx)
	// 等待足够长时间观察结果
	time.Sleep(4 * time.Second)
}

2.5 Future / Promise(异步结果模式)

适用场景:异步执行任务并获取结果,常用于HTTP请求、数据库查询等。

案例:模拟异步任务并获取结果。

package main
import (
	"fmt"
	"time"
)
type Future struct {
	result chan int
}
func NewFuture() (*Future, func(int)) {
	f := &Future{result: make(chan int)}
	return f, func(r int) { f.result <- r }
}
func (f *Future) Get() int {
	return <-f.result
}
func asyncCompute(complete func(int)) {
	go func() {
		time.Sleep(2 * time.Second)
		complete(42)
	}()
}
func main() {
	future, complete := NewFuture()
	asyncCompute(complete)
	fmt.Println("Waiting for result...")
	result := future.Get()
	fmt.Println("Result:", result)
}

三、进阶并发模式

3.1 Or-Done 模式

适用场景:监听多个Channel,任意一个完成即退出。

package main
import (
	"fmt"
	"time"
)
func orDone(channels ...<-chan interface{}) <-chan interface{} {
	switch len(channels) {
	case 0:
		return nil
	case 1:
		return channels[0]
	}
	orDone := make(chan interface{})
	go func() {
		defer close(orDone)
		switch len(channels) {
		case 2:
			select {
			case <-channels[0]:
			case <-channels[1]:
			}
		default:
			select {
			case <-channels[0]:
			case <-channels[1]:
			case <-channels[2]:
			case <-orDone(channels[3:]...):
			}
		}
	}()
	return orDone
}
func main() {
	sig := make(chan interface{})
	time.AfterFunc(2*time.Second, func() { close(sig) })
	start := time.Now()
	<-orDone(sig, time.After(5*time.Second))
	fmt.Printf("Done after %v\n", time.Since(start))
}

3.2 Rate Limiting(限流模式)

适用场景:控制请求频率,防止系统过载。

package main
import (
	"fmt"
	"time"
)
func main() {
	requests := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		requests <- i
	}
	close(requests)
	// 限制每200ms处理一个请求
	limiter := time.Tick(200 * time.Millisecond)
	for req := range requests {
		<-limiter
		fmt.Println("Request", req, time.Now())
	}
	// 突发限制模式
	burstyLimiter := make(chan time.Time, 3)
	for i := 0; i < 3; i++ {
		burstyLimiter <- time.Now()
	}
	go func() {
		for t := range time.Tick(200 * time.Millisecond) {
			burstyLimiter <- t
		}
	}()
	burstyRequests := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		burstyRequests <- i
	}
	close(burstyRequests)
	for req := range burstyRequests {
		<-burstyLimiter
		fmt.Println("Bursty request", req, time.Now())
	}
}

四、注意点

4.1 使用说明

  1. 每个示例都是独立的Go程序,可以直接保存为.go文件运行
  2. 所有示例都包含了必要的包导入和错误处理
  3. 每个示例都有详细的注释说明关键步骤
  4. 可以通过调整参数(如worker数量、超时时间等)观察不同行为

4.2 模式选择对比

模式适用场景优点缺点
Worker Pool任务并行处理控制并发数,防止资源耗尽需要任务队列管理
Fan-in/Fan-out数据合并/分发提高吞吐量需注意Channel关闭时机
Pipeline流式处理模块化、易扩展阶段多时延迟高
Timeout/Cancel任务超时控制避免阻塞需正确使用Context
Future/Promise异步结果获取非阻塞编程结果获取需同步
Or-Done多事件监听灵活组合实现较复杂
Rate Limiting流量控制防止过载需合理设置阈值

4.3 选择建议

  • 避免共享内存,优先使用Channel通信
  • 使用sync.WaitGroup等待Goroutine完成
  • 合理使用context管理生命周期
  • 注意Channel的关闭,避免panic
  • 使用select+default实现非阻塞操作
  • 控制Goroutine数量,防止资php源泄漏

到此这篇关于浅谈Go语言中高效并发模式 的文章就介绍到这了,更多相关Go语言高效并发模式内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.cppcns.www.devze.comcom)!

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新开发

开发排行榜