在开始说正事之前我先给大家介绍一下这份代码的背景,以免大家有一种雾里看花的感觉。在本系列的前几篇博客中有一篇是用多线程进行百度图片的抓取,但是当时使用的多线程是非常粗略的,只是开了几个线程让抓取的速度提升了一些(其实提升了很多),初步的使用了一下线程,这篇博客将线程的使用进行了一些深入。
博主最近准备学习分布式网络爬虫,鉴于手头资料太少,所以如果有这方面的爱好者欢迎大家共同交流与学习。
博主这次的需求是抓取一些淘宝的数据,在还没有开始学习分布式之前,我们需要掌握基本的并行爬虫的相关知识。在这里我要先吐槽一下《自己动手写网络爬虫》这本书,不得不说,这本书让我认识到了什么叫做:有一本好书,真的会提升很多学习的效率。反正这本书不适合入门,而且非常的老,代码都不能用!!但其中还是有一些值得学习的思想,但。。。不说了,都是泪。。。
本次代码仍有很多不完善的地方,比如并没有用到线程池,然后只是给大家提供一种思路,代码并不完善,博主也在不断学习当中,不得不说 Java 网络爬虫的学习曲线还是很陡峭的,但我认为需要开发一个好爬虫是需要语言功底很扎实的,所以学习 Java 爬虫还是很值得的。
对于并行爬虫而言,处理空队列要比处理序列爬虫更加复杂,空的队列并不意味着爬虫已经完成了工作,因为此刻其他的进程或线程可能依然在解析网页,并且马上会产生新的 URL。进程或者线程管理者需要给报告队列为空的线程发送临时的休眠信号,线程管理员需要不断追踪休眠线程的数目,只有当所有的线程都休眠的时候,爬虫才可以终止。
接下来就看一下具体的代码:
我们假设从 Redis 数据库中的爬虫队列里取待解析的 url。
- packagemultithreading;importjava.util.ArrayList;importjava.util.List;/**
- * Created by hg_yi on 17-6-13.
- *
- * 线程计数器应该是一个共享变量
- */
- public class MultithreadCrawler{
- public static void main(String[] args)throwsInterruptedException {//创建一个收集线程的列表List threadList = newArrayList();
- //创建线程的个数
- intthreadNum =5;
- RunThread run =newRunThread();
- run.setThreads(threadNum);//创建5个线程,并对其进行收集
- for(inti =0; i < threadNum; i++) {
- Thread thread =newThread(run);
- thread.start();
- threadList.add(thread);
- }//main线程需要等待所有子线程退出
- while(threadList.size() >0) {
- Thread child = threadList.remove(0);
- child.join();
- }
- }
- }
可以看到,我创建了 5 个线程,博主也正在深入学习 Java 线程的相关知识之中,目前水平还是有限的,此博客主要是记录一些多线程爬虫中需要注意的东西。可以看到我使用了一个容器将所有创建的线程进行了收集,然后为了防止主线程提前退出而让所有子线程结束,我告知主线程需要等待每一个子线程执行完毕之后,你主线程才可以结束,也就是最后的 while 循环做的事情。
然后在 for 循环中我让每个子线程都执行 RunThread 类中的 run 方法,这样操作的目的主要是考虑到了线程之间数据共享的问题。
- packagemultithreading;importredisqueue.RedisQueue;importjava.util.ArrayList;importjava.util.List;/**
- * Created by hg_yi on 17-6-13.
- *
- * 我们只保证在给数据库中写入url,还有就是改变线程线程计数器的值的时候,是需要同步的。
- *
- * 很明显线程计数器threads是所有线程共享的。
- */
- public class RunThread extends Thread{
- //线程计数器需要对所有线程可见,是共享变量
- intthreads =0;//redis队列的对象,也是所有对象共享的变量RedisQueue redisQueue =newRedisQueue();//创建线程锁
- private staticObject lock =newObject();public void setThreads(intthreads) {this.threads = threads;
- }public void parseToVisitUrltoRedis()throwsException {//用来保存新提取出来的url列表(此变量不应是共享变量,我们把它变为每个线程的私有变量)
- //我们应该知道的是在Java中哪些变量在线程之间是不共享的,参考资料:
- //http://www.cnblogs.com/xudong-bupt/archive/2013/05/22/3087864.htmlList urlList = newArrayList();
- while(true) {//从爬虫队列中取出待抓取的url
- if(!redisQueue.toVisitIsEmpty()) {
- String url = redisQueue.getToVisit();/**
- * 对此url进行解析,提取出新的url列表
- * 解析出来的url顺便就写进urlList中了
- *
- * 在这个过程中不要求保证同步,每个线程都负责解析自己所属的url,解析完成
- * 之后将url写入自己的urlList之中,当在解析过程中发生阻塞,则切换到其他
- * 线程,保证程序的高并发性。
- */
- /**
- * 在此同步块中主要进行提取出来的url的写操作,必须是同步操作,保证一个同
- * 一时间只有一个线程在对Redis数据库进行写操作。
- */}else{//在改变线程计数器的值的时候必须保证线程的同步性
- synchronized(lock) {//等待线程数的计数器的计数器减1threads--;//如果仍然有其他线程在活动,则通知此线程进行等待
- if(threads >0) {/*调用线程的wait方法会将此线程挂起,直到有其他线程调用notify\
- notifyAll将此线程进行唤醒*/wait();
- threads++;
- }else{//如果其他的线程都在等待,说明待抓取队列已空,则通知所有线程进行退出notifyAll();return;
- }
- }
- }
- }
- }public void run() {//虽然run方法不能抛出异常,但是可以在run方法中进行try,catch
- try{
- parseToVisitUrltoRedis();
- }catch(Exception e) {
- e.printStackTrace();
- }
- }
- }
请大家务必详细看代码的注解!!! 请大家务必详细看代码的注解!!! 请大家务必详细看代码的注解!!!
在 run 方法中我们主要实现的就是在从 Redis 数据库中的 url 队列中提取到当前需要抓取的 url 并对其进行解析,将新 url 再添加到队列中。可以看到,通过引入一个线程计数器,我们解决了上述问题,如果大家还有什么问题的话欢迎在评论区进行交流。
关于线程池版本的爬虫代码,我就不再贴出,有兴趣的同学可以对代码重新进行优化。
来源: http://blog.csdn.net/championhengyi/article/details/73204966