1) 什么是 pipeline
pipeline 是一种常见的算法模式, 针对不断循环的耗时任务, 如果要等一个循环结束后再轮到处理下一个任务的话, 时间上有点浪费
所以, 把耗时任务拆分为几个环节, 只要一个环节完成了, 就可以轮到下一个任务的那个环节就马上开始处理不用等到这个耗时任务全部结束了才开始
我认为应用在处理爬虫程序获取的数据上, 非常合适
2) pipeline 在框架中的角色
从框架提供的原理图上可以了解到, pipeline 对象扮演的角色是什么:
downloader 对象访问 url
访问成功后的 page 对象交给 parser 对象, 进行页面的解析工作
把解析成果交给 pipleline 对象去按顺序逐一处理, 例如: 去重封装存储发消息等等
3) 目标任务
任务步骤:
访问拉勾网站
找出网页里的图片链接
下载链接上的图片到本地
保存图片信息到 mysql 数据库
4) 创建 pipeline 对象
pipeline 类: DownloadImage
- package com.sinkinka.pipeline;
- import com.cv4j.netdiscovery.core.domain.ResultItems;
- import com.cv4j.netdiscovery.core.pipeline.Pipeline;
- import java.io.FileOutputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.net.URL;
- import java.net.URLConnection;
- import java.util.Map;
- public class DownloadImage implements Pipeline {@Override public void process(ResultItems resultItems) {
- Map < String,
- Object > map = resultItems.getAll();
- for (String key: map.keySet()) {
- String filePath = "./temp/" + key + ".png";
- saveRemoteImage(map.get(key).toString(), filePath);
- }
- }
- private boolean saveRemoteImage(String imgUrl, String filePath) {
- InputStream in =null;
- OutputStream out = null;
- try {
- URL url = new URL(imgUrl);
- URLConnection connection = url.openConnection();
- connection.setConnectTimeout(5000); in =connection.getInputStream();
- byte[] bs = new byte[1024];
- int len;
- out = new FileOutputStream(filePath);
- while ((len = in.read(bs)) != -1) {
- out.write(bs, 0, len);
- }
- } catch(Exception e) {
- return false;
- } finally {
- try {
- out.flush();
- out.close(); in .close();
- } catch(IOException e) {
- return false;
- }
- }
- return true;
- }
- }
pipeline 类: SaveImage
- package com.sinkinka.pipeline;
- import com.cv4j.netdiscovery.core.domain.ResultItems;
- import com.cv4j.netdiscovery.core.pipeline.Pipeline;
- import com.safframework.tony.common.utils.Preconditions;
- import java.sql. * ;
- import java.util.Map;
- public class SaveImage implements Pipeline {@Override public void process(ResultItems resultItems) {
- Map < String,
- Object > map = resultItems.getAll();
- for (String key: map.keySet()) {
- System.out.println("2" + key);
- saveCompanyInfo(key, map.get(key).toString());
- }
- }
- private boolean saveCompanyInfo(String shortName, String logoUrl) {
- int insertCount = 0;
- Connection conn = getMySqlConnection();
- Statement statement = null;
- if (Preconditions.isNotBlank(conn)) {
- try {
- statement = conn.createStatement();
- String insertSQL = "INSERT INTO company(shortname, logourl) VALUES('" + shortName + "','" + logoUrl + "')";
- insertCount = statement.executeUpdate(insertSQL);
- statement.close();
- conn.close();
- } catch(SQLException e) {
- return false;
- } finally {
- try {
- if (statement != null) statement.close();
- } catch(SQLException e) {}
- try {
- if (conn != null) conn.close();
- } catch(SQLException e) {}
- }
- }
- return insertCount > 0;
- }
- // 演示代码, 不建议用于生产环境
- private Connection getMySqlConnection() {
- // 使用的是 mysql connector 5
- // 数据库: test 账号 / 密码: root/123456
- final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
- final String DB_URL = "jdbc:mysql://localhost:3306/test";
- final String USER = "root";
- final String PASS = "123456";
- Connection conn = null;
- try {
- Class.forName(JDBC_DRIVER);
- conn = DriverManager.getConnection(DB_URL, USER, PASS);
- } catch(SQLException e) {
- return null;
- } catch(Exception e) {
- return null;
- }
- return conn;
- }
- }
5) 运行程序
Main 类
- package com.sinkinka;
- import com.cv4j.netdiscovery.core.Spider;
- import com.sinkinka.parser.LagouParser;
- import com.sinkinka.pipeline.DownloadImage;
- import com.sinkinka.pipeline.SaveImage;
- public class PipelineSpider {
- public static void main(String[] args) {
- String url = "https://xiaoyuan.lagou.com/";
- Spider.create()
- .name("lagou")
- .url(url)
- .parser(new LagouParser())
- .pipeline(new DownloadImage()) //1. 首先, 下载图片到本地目录
- .pipeline(new SaveImage()) //2. 然后, 把图片信息存储到数据库
- .run();
- }
- }
Parser 类
- package com.sinkinka.parser;
- import com.cv4j.netdiscovery.core.domain.Page;
- import com.cv4j.netdiscovery.core.domain.ResultItems;
- import com.cv4j.netdiscovery.core.parser.Parser;
- import com.cv4j.netdiscovery.core.parser.selector.Selectable;
- import java.util.List;
- public class LagouParser implements Parser {
- @Override
- public void process(Page page) {
- ResultItems resultItems = page.getResultItems();
- List<Selectable> liList = page.gethtml().xpath("//li[@class='nav-logo']").nodes();
- for(Selectable li : liList) {
- String logoUrl = li.xpath("//img/@src").get();
- String companyShortName = li.xpath("//div[@class='company-short-name']/text()").get();
- resultItems.put(companyShortName, logoUrl);
- }
- }
- }
通过 DownloadImage, 保存到本地的图片数据:
通过 SaveImage, 存储到数据库里的数据:
6) 总结
以上代码简单演示了 pipeline 模式的用法, 记住一点, pipeline 是有执行顺序的在大量数据高频次的生产环境中再去体会 pipeline 模式的优点吧
来源: https://juejin.im/post/5a866de26fb9a0635c047e83