- package main
- import (
- "fmt"
- "sync"
- "time"
- )
- // 生产数据
- func producer(num ...int)(ret<-chan int) {
- out:=make(chan int)
- go func() {
- defer close(out)
- for k,v:=range num {
- fmt.Printf("我在生产第 %d 个数据 \ n",k)
- out<-v
- fmt.Printf("生产第 %d 个数据完成 \ n",k)
- }
- }()
- ret=out
- return
- }
- // 从通道获取数据, 计算数据
- func square(inCh <-chan int) (ret <-chan int) {
- out:=make(chan int)
- go func() {
- defer close(out)
- for n:=range inCh{
- out<-n*n
- time.Sleep(1*time.Second)
- }
- }()
- ret=out
- return
- }
- // 给通道启动协程
- func merge(cs ...<-chan int)(ret <-chan int) {
- out:=make(chan int,100)
- var wg sync.WaitGroup
- collect:= func(in<-chan int) {
- defer wg.Done()
- for n:=range in{
- out<-n
- }
- }
- wg.Add(len(cs))
- for k,n:=range cs{
- fmt.Printf("我在给执行第 %d 个协程 \ n",k)
- go collect(n)
- fmt.Printf("执行第 %d 个协程完成 \ n",k)
- }
- go func() {
- wg.Wait()
- close(out)
- }()
- ret=out
- return
- }
- func main() {
- in:=producer(0,1,2,3,4,5,6,7,8,9,10)
- ch1:=square(in)
- ch2:=square(in)
- ch3:=square(in)
- ch4:=square(in)
- ch5:=square(in)
- // 拿到数据结果
- for ret:=range merge(ch1,ch2,ch3,ch4,ch5){
- time.Sleep(1*time.Second)
- fmt.Printf("=\n",ret)
- }
- }
来源: http://www.bubuko.com/infodetail-2931773.html