前言
感谢 Handling 1 Million Requests per Minute with Go 这篇文章给予的巨大启发.
基础
我们使用 Go 语言, 基本上是因为他原生支持的高并发: Goroutine 和 Channel;
Go 的并发属于 CSP 并发模型的一种实现;
CSP 并发模型的核心概念是:"不要通过共享内存来通信, 而应该通过通信来共享内存".
简单用法
我一开始学习 Go 语言的时候, 遇到大访问量的时候, 会先创建一个带缓冲的 channel, 然后起一个 Go 协程来逐个读取 channel 中的数据并处理.
说他是并发是因为他没有占用主线程, 而是另起了一个协程独自运行. 但是这没有实现请求之间的并发.
特别注意: Go 语言中的 map 不是并发安全的, 要想实现并发安全, 需要自己实现 (如加锁), 或者使用 sync.Map.
- package main
- import (
- "fmt"
- "runtime"
- "time"
- )
- func main(){
- // 这里我们假设数据是 int 类型, 缓存格式设为 100
- dataChan:=make(chan int,100)
- go func(){
- for{
- select{
- case data:=<-dataChan:
- fmt.Println("data:",data)
- time.Sleep(1 * time.Second)// 这里延迟是模拟处理数据的耗时
- }
- }
- }()
- // 填充数据
- for i:=0;i<100;i++{
- dataChan<-i
- }
- // 这里循环打印查看协程个数
- for {
- fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine())
- time.Sleep(2 * time.Second)
- }
- }
这里打印出来的协程个数时 2, 为什么? 因为 main 方法独占一个主协程, 我们又起了一个协程, 所以是两个.
实现百万级的并发
首先我们要抽象出几个概念:
- Job:
- type Job interface {
- Do()
- }
- // 一个数据接口, 所有的数据都要实现该接口, 才能被传递进来
- // 实现 Job 接口的一个数据实例, 需要实现一个 Do() 方法, 对数据的处理就在这个 Do() 方法中.
Job 通道:
这里有两个 Job 通道:
1,WorkerPool 的 Job channel, 用于调用者把具体的数据写入到这里, WorkerPool 读取.
2,Worker 的 Job channel, 当 WorkerPool 读取到 Job, 并拿到可用的 Worker 的时候, 会将 Job 实例写入该 Worker 的 Job channel, 用来直接执行 Do() 方法.
- Worker:
- type Worker struct {
- JobQueue chan Job //Worker 的 Job 通道
- }
- // 每一个被初始化的 worker 都会在后期单独占用一个协程
- // 初始化的时候会先把自己的 JobQueue 传递到 Worker 通道中,
- // 然后阻塞读取自己的 JobQueue, 读到一个 Job 就执行 Job 对象的 Do() 方法.
工作池 (WorkerPool):
- type WorkerPool struct {
- workerlen int //WorkerPool 中同时 存在 Worker 的个数
- JobQueue chan Job // WorkerPool 的 Job 通道
- WorkerQueue chan chan Job
- }
- // 初始化时会按照传入的 num, 启动 num 个后台协程, 然后循环读取 Job 通道里面的数据,
- // 读到一个数据时, 再获取一个可用的 Worker, 并将 Job 对象传递到该 Worker 的 chan 通道
整个过程中 每个 Worker 都会被运行在一个协程中, 在整个 WorkerPool 中就会有 num 可空闲的 Worker, 当来一条数据的时候, 就会在工作池中去一个空闲的 Worker 去执行该 Job, 当工作池中没有可用的 worker 时, 就会阻塞等待一个空闲的 worker.
这是一个粗糙最简单的版本, 只是为了演示效果, 具体使用需要根据实际情况加一些特殊的处理.
当数据无限多的时候 func (wp *WorkerPool) Run() 会无限创建协程, 这里需要做一些处理, 这里是为了让所有的请求不等待, 并且体现一下最大峰值时的协程数. 具体因项目而异.
代码地址:
- main.go
- package main
- import (
- "fmt"
- "runtime"
- "time"
- )
- type Score struct {
- Num int
- }
- func (s *Score) Do() {
- fmt.Println("num:", s.Num)
- time.Sleep(1 * 1 * time.Second)
- }
- func main() {
- num := 100 * 100 * 20
- // debug.SetMaxThreads(num + 1000) // 设置最大线程数
- // 注册工作池, 传入任务
- // 参数 1 worker 并发个数
- p := NewWorkerPool(num)
- p.Run()
- datanum := 100 * 100 * 100 * 100
- go func() {
- for i := 1; i <= datanum; i++ {
- sc := &Score{Num: i}
- p.JobQueue <- sc
- }
- }()
- for {
- fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine())
- time.Sleep(2 * time.Second)
- }
- }
- job.go
- package main
- type Job interface {
- Do()
- }
- worker.go
- package main
- type Worker struct {
- JobQueue chan Job
- }
- func NewWorker() Worker {
- return Worker{JobQueue: make(chan Job)}
- }
- func (w Worker) Run(wq chan chan Job) {
- go func() {
- for {
- wq <- w.JobQueue
- select {
- case job := <-w.JobQueue:
- job.Do()
- }
- }
- }()
- }
- workerpool.go
- package main
- import "fmt"
- type WorkerPool struct {
- workerlen int
- JobQueue chan Job
- WorkerQueue chan chan Job
- }
- func NewWorkerPool(workerlen int) *WorkerPool {
- return &WorkerPool{
- workerlen: workerlen,
- JobQueue: make(chan Job),
- WorkerQueue: make(chan chan Job, workerlen),
- }
- }
- func (wp *WorkerPool) Run() {
- fmt.Println("初始化 worker")
- // 初始化 worker
- for i := 0; i < wp.workerlen; i++ {
- worker := NewWorker()
- worker.Run(wp.WorkerQueue)
- }
- // 循环获取可用的 worker, 往 worker 中写 job
- go func() {
- for {
- select {
- case job := <-wp.JobQueue:
- worker := <-wp.WorkerQueue
- worker <- job
- }
- }
- }()
- }
来源: http://www.bubuko.com/infodetail-2922315.html