一直觉得自己之前写的使用定时抓取构建 IP 代理池实在过于简陋, 并且有一部分的代码写的并不合理, 刚好最近又在学习多线程, 就将之前的代码进行了重构, 也方便对抓取代理 ip 有需求的人. 之前自己写的那篇文章就不删除了, 里面用到了 MySQL 以及循环调用 ip 的方法 (一些东西也是值得了解的. 取其精华, 弃其糟粕吧), 大家有兴趣的可以看一下 (最主要的还是不舍得访问量, 哈哈).
注: 由于 xici 代理网的 ip 代理并不是很稳定, 所以自己实现的 ip 代理池并不可用 (4000 个 IP 最后通过一系列逻辑处理和过滤之后大概只剩 30 多个, 并且这 30 个 ip 也极不稳定), 但感觉实现 ip 代理池的原理就是这样, 在往后估计也就是性能上的优化. 虽然 ip 代理池并不可用, 但是如果你想要学习多线程以及抓取 ip 做爬虫的话, 这篇文章我觉得是可以给你一些思路上的启发.
怎么设计一个 IP 代理池
其实设计一个 IP 代理池是非常容易的一件事情, 我们来看一下其中的步骤:
1. 首先肯定是从提供代理 ip 的网站上对 ip 进行抓取
2. 对抓取下来的 ip 进行初步的过滤, 比如我一开始就将 IP 类型不是 HTTPS 并且 IP 链接速度大于 2 秒的给过滤掉了
3. 对于符合我们要求的 IP, 我们要对其进行质量检测, 判断其是否可用, 这一步就是检测 IP 的质量, 也就是这一步刷掉了大量的 IP
4. 将符合要求的 ip 写进 Redis 数据库中, 我是以 List 形式存储在 Redis 中
5. 设定一个进行抓取的周期, 来更新你的 IP 代理池 (在将新的 IP 抓取下来并进行处理之后, 我们将原数据库清空, 并将新的 IP 写入其中)
第五个我是直接每次先清空数据库, 然后在进行新 IP 的抓取以及过滤然后才写入数据库中, 这样明显是不合理的, 因为它会造成你的 IP 代理池有很长一段时间是空缺的, 这也是我在写这篇博客的时候才想到的一个不合理的地方 (太懒, 代码也就不改了, 大家知道问题就行).
整体架构
实现细节
HttpResponse 的阻塞
在我们用代理 IP 进行网页抓取的时候, 经常会发生长时间的阻塞然后程序报错 (具体原因不祥, 如果你真的有兴趣弄明白, 建议看源码), 这个时候我们只要设定好链接时间并进行恰当的 try,catch 就可以解决这个问题.
如下面代码:
- /**
- * setConnectTimeout: 设置连接超时时间, 单位毫秒.
- * setConnectionRequestTimeout: 设置从 connect Manager 获取 Connection 超时时间, 单位毫秒.
- * 这个属性是新加的属性, 因为目前版本是可以共享连接池的.
- * setSocketTimeout: 请求获取数据的超时时间, 单位毫秒. 如果访问一个接口, 多少时间内无法返回数据,
- * 就直接放弃此次调用.
- */
- HttpHost proxy = new HttpHost(ip, Integer.parseInt(port));
- RequestConfig config = RequestConfig.custom().setProxy(proxy).setConnectTimeout(3000).setSocketTimeout(3000).build();
- HttpGet httpGet = new HttpGet(url);
- httpGet.setConfig(config);
- try {
- // 客户端执行 httpGet 方法, 返回响应
- CloseableHttpResponse httpResponse = httpClient.execute(httpGet);
- // 得到服务响应状态码
- if (httpResponse.getStatusLine().getStatusCode() == 200) {
- entity = EntityUtils.toString(httpResponse.getEntity(), "utf-8");
- }
- httpResponse.close();
- httpClient.close();
- } catch (ClientProtocolException e) {
- entity = null;
- } catch (IOException e) {
- entity = null;
- }
- return entity;
- }
线程类的实现
多线程需要注意的就是一点: 线程安全, 还有就是保证线程同步而不发生脏读, 这些东西需要结合整体代码逻辑去把握, 我就在这里不细说了. 考虑大家有可能产生疑惑, 所以我在代码中进行了详细的注释, 大家有兴趣的可以在我 github 上查看源码, 有不懂的或有疑惑的同学欢迎在评论区提问与讨论.
我贴一下提供多线程抓取代理 ip 的服务类:
- public class IPPool {
- // 成员变量 (非线程安全)
- private List<IPMessage> ipMessages;
- public IPPool(List<IPMessage> ipMessages) {
- this.ipMessages = ipMessages;
- }
- public void getIP(List<String> urls) {
- String ipAddress;
- String ipPort;
- for (int i = 0; i <urls.size(); i++) {
- // 随机挑选代理 IP(仔细想了想, 本步骤由于其他线程有可能在位置确定之后对 ipMessages 数量进行增加, 虽说不会改变已经选择的 ip 代理的位置, 但合情合理还是在对共享变量进行读写的时候要保证其原子性, 否则极易发生脏读)
- // 每个线程先将自己抓取下来的 ip 保存下来并进行过滤与检测
- List<IPMessage> ipMessages1 = new ArrayList<>();
- String url = urls.get(i);
- synchronized (ipMessages) {
- int rand = (int) (Math.random()*ipMessages.size());
- out.println("当前线程" + Thread.currentThread().getName() + "rand 值:" + rand + "ipMessages 大小:" + ipMessages.size());
- ipAddress = ipMessages.get(rand).getIPAddress();
- ipPort = ipMessages.get(rand).getIPPort();
- }
- // 这里要注意 Java 中非基本类型的参数传递方式, 实际上都是同一个对象
- boolean status = URLFecter.urlParse(url, ipAddress, ipPort, ipMessages1);
- // 如果 ip 代理池里面的 ip 不能用, 则切换下一个 IP 对本页进行重新抓取
- if (status == false) {
- i--;
- continue;
- } else {
- out.println("线程:" + Thread.currentThread().getName() + "已成功抓取" +
- url + "ipMessage1:" + ipMessages1.size());
- }
- // 对 ip 重新进行过滤, 只要速度在两秒以内的并且类型为 HTTPS 的
- ipMessages1 = IPFilter.Filter(ipMessages1);
- // 对 ip 进行质量检测, 将质量不合格的 ip 在 List 里进行删除
- IPUtils.IPIsable(ipMessages1);
- // 将质量合格的 ip 合并到共享变量 ipMessages 中, 进行合并的时候保证原子性
- synchronized (ipMessages) {
- out.println("线程" + Thread.currentThread().getName() + "已进入合并区" + "待合并大小 ipMessages1:" + ipMessages1.size());
- ipMessages.addAll(ipMessages1);
- }
- }
- }
- }
将 IPMessages(保存 IP 信息的对象) 写入 Redis 中
由于 Redis 只支持字符串与字节流数据, 所以我们要想将一个对象存储到 Redis 的 List 中, 则必须将对象序列化 (转换成字节流), 反之, 如果我们想要将 Redis 中的数据拿出来, 就要反序列化. 关于序列化与反序列化的知识不懂的大家百度, 我就不细说了. 来看一下这部分的代码:
序列化以及反序列化 (Java 中好像有现成的方法, 我没有具体了解):
- /**
- * Created by hg_yi on 17-8-9.
- *
- * java.io.ObjectOutputStream 代表对象输出流, 它的 writeObject(Object obj) 方法
- * 可对参数指定的 obj 对象进行序列化, 把得到的字节序列写到一个目标输出流中.
- *
- * java.io.ObjectInputStream 代表对象输入流, 它的 readObject() 方法一个源输入流中读
- * 取字节序列, 再把它们反序列化为一个对象, 并将其返回.
- *
- * 对象序列化包括如下步骤:
- * 1) 创建一个对象输出流, 它可以包装一个其他类型的目标输出流, 如文件输出流 (我这里是字节流);
- * 2) 通过对象输出流的 writeObject() 方法写对象.
- *
- * 对象反序列化的步骤如下:
- * 1) 创建一个对象输入流, 它可以包装一个其他类型的源输入流, 如文件输入流 (我这里是字节流);
- * 2) 通过对象输入流的 readObject() 方法读取对象.
- */
- public class SerializeUtil {
- public static byte[] serialize(Object object) {
- ObjectOutputStream oos;
- ByteArrayOutputStream baos;
- try {
- // 序列化
- baos = new ByteArrayOutputStream();
- oos = new ObjectOutputStream(baos);
- oos.writeObject(object);
- byte[] bytes = baos.toByteArray();
- return bytes;
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
- // 反序列化
- public static Object unserialize(byte[] bytes) {
- ByteArrayInputStream bais;
- ObjectInputStream ois;
- try {
- // 反序列化
- bais = new ByteArrayInputStream(bytes);
- ois = new ObjectInputStream(bais);
- return ois.readObject();
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
- }
最后, 定时任务的实现我没有再用第一篇博客里面讲的 quartz, 代码太多并且不好理解. 当时太年轻, 还不知道 Timer 这个 Java 自带的产生定时任务的类, 它的实现相当于是开了一个单独的线程去帮助执行我们所设定的任务, 它的用法我就不细说了, 源码里有, 非常方便, 并且代码量也很少.
好了, 废话说了这么多, 贴上大家想要的东西:
戳我获得源码哦~ https://github.com/championheng/Crawl-and-IP-proxy-pools-regularly
来源: https://juejin.im/entry/5b851016f265da437247393c