欲看此文, 必先可先看:
golang 实现并发爬虫一 (单任务版本爬虫功能)
gollang 实现并发爬虫二 (简单调度器)
上文中的用简单的调度器实现了并发爬虫.
并且, 也提到了这种并发爬虫的实现可以提高爬取效率.
当 workerCount 为 1 和 workerCount 为 10 时其爬取效率是有明显不同的.
然而, 文末其实也提到了这个简单调度器实现的爬虫有个不可控或者说是控制力太小了的问题.
究其原因就是因为我们实现的方法是来一个 request 就给创建一个 groutine.
为了让这个程序变得更为可控. 得想想怎么可以优化下了.
现在, 非常明显, 优化点就是我不想要来一个 request 就创建一个这个实现过程.
那么, 我们可以想到队列.
把 request 放到 request 队列里.
那么, request 队列里一定是会有一个 request 的头的, 我们就可以把这个 request 的头元素给到 worker 去做实现.
也就是这样:
but, 这样是没有对 worker 进行一个控制的.
我们希望 request 可以选择我们想要的一个 worker.
那么, 我们也可以让 scheduler 维护一个 worker 的队列.
这里用了三个并行的模块:
1.engine 引擎模块.
2.scheduler 调度器模块.
3.worker 工作模块.
这三者通信都是通过 channel 来通信的.
上图中可知道调度器模块实际上是维护了 2 个 channel, 一个是 request 的 channel, 一个是 worker 的 channel.
- // 队列调度器
- // 这个 scheduler 与 engine 和 worker 之间的通信都是通过 channel 来连接的.
- // 故尔它的肚子里应该有 request 相关的 channel 和 worker 相关的 channel.
- // 另外注意这里 worker 的 channel 的类型是 chan Request.
- type QueuedScheduler struct {
- requestChan chan con_engine.Request
- workerChan chan chan con_engine.Request
- }
那么, 我们就只需要在这个 scheduler 调度器的两个 channel 里, 各取一个元素, 即取 request 和 worker(chan con_engine.Request), 把 request 发给 worker 就可以了.
一直不断的去取和发送, 这就是这个队列调度器要做的事情了.
那个弯曲的箭头也就是指的这个事情了. 在 request 的队列里找到合适的 request 发给 worker 队列里合适的 worker 就好.
这就是一个整体的思想了.
稍微说下关于维护如何两个队列的代码.
重点在于怎么才能做到各读取一个元素.
channel 的读取是会阻塞的.
如果我先读取 request, 如果读取不到, 那么在等待的时候就没有办法取到 worker 了.
解决方案就是用 select, 因为 select 会保证一点, select 里的每一个 case 都会被执行到且会很快速的执行.
- func (s *QueuedScheduler) Run() {
- s.requestChan = make(chan con_engine.Request) // 指针接收者才能改变里面的内容.
- s.workerChan = make(chan chan con_engine.Request)
- go func() {
- var requestQ []con_engine.Request
- var workerQ []chan con_engine.Request
- for {
- var activeRequest con_engine.Request
- var activeWorker chan con_engine.Request
- if len(requestQ)> 0 && len(workerQ)> 0 {
- activeRequest = requestQ[0]
- activeWorker = workerQ[0]
- }
- // 收到一个 request 就让 request 排队, 收到一个 worker 就让 worker 排队. 所有的 channel 操作都放到 select 里.
- select {
- case r := <-s.requestChan:
- requestQ = append(requestQ, r)
- case w := <-s.workerChan:
- workerQ = append(workerQ, w)
- case activeWorker <- activeRequest:
- requestQ = requestQ[1:]
- workerQ = workerQ[1:]
- }
- }
- }()
- }
select 就是在做三件事情:
1. 从 requestChan 里收一个 request, 将这个 request 存在变量 requestQ 里.
2. 从 workerChan 里收一个 worker, 将这个 worker 存在变量 workerQ 里.
3. 把第一个 requestQ 里的第一个元素发给第一个 workerQ 里的第一个元素.
其他代码就感兴趣的同学自己看吧.
作者就先说到这里.
总体调度的思想上面的图中.
具体的实现在源码里.
欢迎大家留言指教.
源码:
posted on 2020-04-24 09:00 公子若不胖天下谁胖 阅读 (...) 评论 (...) 编辑 收藏
来源: https://www.cnblogs.com/anmutu/p/12765207.html