这篇文章主要给大家介绍了关于 Go 语言同步与异步执行多个任务封装 (Runner 和 RunnerAsync) 的相关资料, 文中通过示例代码介绍的非常详细, 对大家的学习或者工作具有一定的参考学习价值, 需要的朋友们下面随着小编来一起学习学习吧
前言
同步适合多个连续执行的, 每一步的执行依赖于上一步操作, 异步执行则和任务执行顺序无关(如从 10 个站点抓取数据)
同步执行类 RunnerAsync
支持返回超时检测, 系统中断检测
错误常量定义
- // 超时错误
- var ErrTimeout = errors.New("received timeout")
- // 操作系统系统中断错误
- var ErrInterrupt = errors.New("received interrupt")
实现代码如下
- package task
- import (
- "os"
- "time"
- "os/signal"
- "sync"
- )
- // 异步执行任务
- type Runner struct {
- // 操作系统的信号检测
- interrupt chan os.Signal
- // 记录执行完成的状态
- complete chan error
- // 超时检测
- timeout <-chan time.Time
- // 保存所有要执行的任务, 顺序执行
- tasks []func(id int) error
- waitGroup sync.WaitGroup
- lock sync.Mutex
- errs []error
- }
- //new 一个 Runner 对象
- func NewRunner(d time.Duration) *Runner {
- return &Runner{
- interrupt: make(chan os.Signal, 1),
- complete: make(chan error),
- timeout: time.After(d),
- waitGroup: sync.WaitGroup{},
- lock: sync.Mutex{},
- }
- }
- // 添加一个任务
- func (this *Runner) Add(tasks ...func(id int) error) {
- this.tasks = append(this.tasks, tasks...)
- }
- // 启动 Runner, 监听错误信息
- func (this *Runner) Start() error {
- // 接收操作系统信号
- signal.Notify(this.interrupt, os.Interrupt)
- // 并发执行任务
- go func() {
- this.complete <- this.Run()
- }()
- select {
- // 返回执行结果
- case err := <-this.complete:
- return err
- // 超时返回
- case <-this.timeout:
- return ErrTimeout
- }
- }
- // 异步执行所有的任务
- func (this *Runner) Run() error {
- for id, task := range this.tasks {
- if this.gotInterrupt() {
- return ErrInterrupt
- }
- this.waitGroup.Add(1)
- go func(id int) {
- this.lock.Lock()
- // 执行任务
- err := task(id)
- // 加锁保存到结果集中
- this.errs = append(this.errs, err)
- this.lock.Unlock()
- this.waitGroup.Done()
- }(id)
- }
- this.waitGroup.Wait()
- return nil
- }
- // 判断是否接收到操作系统中断信号
- func (this *Runner) gotInterrupt() bool {
- select {
- case <-this.interrupt:
- // 停止接收别的信号
- signal.Stop(this.interrupt)
- return true
- // 正常执行
- default:
- return false
- }
- }
- // 获取执行完的 error
- func (this *Runner) GetErrs() []error {
- return this.errs
- }
使用方法
Add 添加一个任务, 任务为接收 int 类型的一个闭包
Start 开始执行伤, 返回一个 error 类型, nil 为执行完毕, ErrTimeout 代表执行超时, ErrInterrupt 代表执行被中断(类似 Ctrl + C 操作)
测试示例代码
- package task
- import (
- "testing"
- "time"
- "fmt"
- "os"
- "runtime"
- )
- func TestRunnerAsync_Start(t *testing.T) {
- // 开启多核
- runtime.GOMAXPROCS(runtime.NumCPU())
- // 创建 runner 对象, 设置超时时间
- runner := NewRunnerAsync(8 * time.Second)
- // 添加运行的任务
- runner.Add(
- createTaskAsync(),
- createTaskAsync(),
- createTaskAsync(),
- createTaskAsync(),
- createTaskAsync(),
- createTaskAsync(),
- createTaskAsync(),
- createTaskAsync(),
- createTaskAsync(),
- createTaskAsync(),
- createTaskAsync(),
- createTaskAsync(),
- createTaskAsync(),
- )
- fmt.Println("同步执行任务")
- // 开始执行任务
- if err := runner.Start(); err != nil {
- switch err {
- case ErrTimeout:
- fmt.Println("执行超时")
- os.Exit(1)
- case ErrInterrupt:
- fmt.Println("任务被中断")
- os.Exit(2)
- }
- }
- t.Log("执行结束")
- }
- // 创建要执行的任务
- func createTaskAsync() func(id int) {
- return func(id int) {
- fmt.Printf("正在执行 %v 个任务 \ n", id)
- // 模拟任务执行, sleep 两秒
- //time.Sleep(1 * time.Second)
- }
- }
执行结果
同步执行任务
正在执行 0 个任务
正在执行 1 个任务
正在执行 2 个任务
正在执行 3 个任务
正在执行 4 个任务
正在执行 5 个任务
正在执行 6 个任务
正在执行 7 个任务
正在执行 8 个任务
正在执行 9 个任务
正在执行 10 个任务
正在执行 11 个任务
正在执行 12 个任务
runnerAsync_test.go:49: 执行结束
异步执行类 Runner
支持返回超时检测, 系统中断检测
实现代码如下
- package task
- import (
- "os"
- "time"
- "os/signal"
- "sync"
- )
- // 异步执行任务
- type Runner struct {
- // 操作系统的信号检测
- interrupt chan os.Signal
- // 记录执行完成的状态
- complete chan error
- // 超时检测
- timeout <-chan time.Time
- // 保存所有要执行的任务, 顺序执行
- tasks []func(id int) error
- waitGroup sync.WaitGroup
- lock sync.Mutex
- errs []error
- }
- //new 一个 Runner 对象
- func NewRunner(d time.Duration) *Runner {
- return &Runner{
- interrupt: make(chan os.Signal, 1),
- complete: make(chan error),
- timeout: time.After(d),
- waitGroup: sync.WaitGroup{},
- lock: sync.Mutex{},
- }
- }
- // 添加一个任务
- func (this *Runner) Add(tasks ...func(id int) error) {
- this.tasks = append(this.tasks, tasks...)
- }
- // 启动 Runner, 监听错误信息
- func (this *Runner) Start() error {
- // 接收操作系统信号
- signal.Notify(this.interrupt, os.Interrupt)
- // 并发执行任务
- go func() {
- this.complete <- this.Run()
- }()
- select {
- // 返回执行结果
- case err := <-this.complete:
- return err
- // 超时返回
- case <-this.timeout:
- return ErrTimeout
- }
- }
- // 异步执行所有的任务
- func (this *Runner) Run() error {
- for id, task := range this.tasks {
- if this.gotInterrupt() {
- return ErrInterrupt
- }
- this.waitGroup.Add(1)
- go func(id int) {
- this.lock.Lock()
- // 执行任务
- err := task(id)
- // 加锁保存到结果集中
- this.errs = append(this.errs, err)
- this.lock.Unlock()
- this.waitGroup.Done()
- }(id)
- }
- this.waitGroup.Wait()
- return nil
- }
- // 判断是否接收到操作系统中断信号
- func (this *Runner) gotInterrupt() bool {
- select {
- case <-this.interrupt:
- // 停止接收别的信号
- signal.Stop(this.interrupt)
- return true
- // 正常执行
- default:
- return false
- }
- }
- // 获取执行完的 error
- func (this *Runner) GetErrs() []error {
- return this.errs
- }
使用方法
Add 添加一个任务, 任务为接收 int 类型, 返回类型 error 的一个闭包
Start 开始执行伤, 返回一个 error 类型, nil 为执行完毕, ErrTimeout 代表执行超时, ErrInterrupt 代表执行被中断(类似 Ctrl + C 操作)
getErrs 获取所有的任务执行结果
测试示例代码
- package task
- import (
- "testing"
- "time"
- "fmt"
- "os"
- "runtime"
- )
- func TestRunner_Start(t *testing.T) {
- // 开启多核心
- runtime.GOMAXPROCS(runtime.NumCPU())
- // 创建 runner 对象, 设置超时时间
- runner := NewRunner(18 * time.Second)
- // 添加运行的任务
- runner.Add(
- createTask(),
- createTask(),
- createTask(),
- createTask(),
- createTask(),
- createTask(),
- createTask(),
- createTask(),
- createTask(),
- createTask(),
- createTask(),
- createTask(),
- createTask(),
- createTask(),
- )
- fmt.Println("异步执行任务")
- // 开始执行任务
- if err := runner.Start(); err != nil {
- switch err {
- case ErrTimeout:
- fmt.Println("执行超时")
- os.Exit(1)
- case ErrInterrupt:
- fmt.Println("任务被中断")
- os.Exit(2)
- }
- }
- t.Log("执行结束")
- t.Log(runner.GetErrs())
- }
- // 创建要执行的任务
- func createTask() func(id int) error {
- return func(id int) error {
- fmt.Printf("正在执行 %v 个任务 \ n", id)
- // 模拟任务执行, sleep
- //time.Sleep(1 * time.Second)
- return nil
- }
- }
执行结果
异步执行任务
正在执行 2 个任务
正在执行 1 个任务
正在执行 4 个任务
正在执行 3 个任务
正在执行 6 个任务
正在执行 5 个任务
正在执行 9 个任务
正在执行 7 个任务
正在执行 10 个任务
正在执行 13 个任务
正在执行 8 个任务
正在执行 11 个任务
正在执行 12 个任务
正在执行 0 个任务
runner_test.go:49: 执行结束
runner_test.go:51: [<nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil>]
来源: http://www.phperz.com/article/18/0221/362565.html