EasySpider Crawler Architecture Design (Qi-Refining Edition)

  Published:   2018-06-14
  Last updated:   2020-10-23

I. Preface

  • When it comes to collecting data, people immediately think of crawlers, and when it comes to crawlers, there are already plenty of powerful open-source options on the market, from single-machine to distributed, each competing to outdo the others.
  • For learning purposes, though, I still think it is worth trying to design a crawler architecture yourself. If you have no idea where to start, you can study excellent projects such as Scrapy (Python) or WebMagic (Java); WebMagic's design is itself partly inspired by Scrapy.
  • The architecture I present here may be rough, but it is something I thought through, designed, and put into practice myself, borrowing the modular design ideas of WebMagic along the way.

II. Modules

Module structure

  • The design is split into five modules:
  • 1. spider: the module the user interacts with; it configures the start URLs, proxies, data parsing, and so on.
  • 2. scheduler: manages and schedules the request links. For now it only implements basic storage and retrieval; later it will be extended into a module supporting various scheduling strategies.
  • 3. download: the download module, responsible for fetching resources and caching data.
  • 4. pipeline: the data pipeline; its implementation is very simple and is used to store or analyze the extracted data.
  • 5. engine: the engine, which drives the other four kinds of modules.

III. Code Walkthrough

  • We create four Base classes that define the method and field conventions for spider, scheduler, download, and pipeline respectively.

  • BaseSpider

      from lxml.html import etree;
      from com.anjie.e_request import ERequest;
    
      class BaseSpider:
          def __init__(self):
              super(BaseSpider, self).__init__();
              self.seed_url = None;
              self.pipeline_item = None;
              self.requests = [];
    
          def addRequest_urls(self, urls):
              if urls:
                  for r in map(lambda url:ERequest(url), urls):
                      self.requests.append(r);
    
          def pagerProcess(self, page):
              pass;
    
          def pagerBack(self, html):
              page = root = etree.HTML(html);
              self.pagerProcess(page);
              item = self.pipeline_item;
              requests = None;
              if len(self.requests) > 0:
                  requests = self.requests.copy();
    
              return (self.pipeline_item, requests)
    
          # Return the seed requests
          def getSeeds(self):
              result = [];
              for url in self.seed_url:
                  r = ERequest(url=url);
                  result.append(r);
              return result;
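  • The ERequest class imported from com.anjie.e_request is not listed anywhere in this post. Judging from how it is used (ERequest(url), and the rq.url / rq.headers accesses in the downloader later on), it is presumably just a small request holder; a minimal sketch under that assumption:

      # Assumed sketch of com/anjie/e_request.py; the real class may carry more fields.
      class ERequest:
          def __init__(self, url=None, headers=None):
              self.url = url
              # default headers; the original project may set a different User-Agent
              self.headers = headers or {'User-Agent': 'EasySpider'}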
  • BaseScheduler

      class BaseScheduler:
          def __init__(self):
              super(BaseScheduler, self).__init__();
    
          def __iter__(self):
              return self
    
          def __next__(self):
              pass;
    
          def nextRequest(self):
              pass
    
          def addRequest(self, rq):
              pass
    
          def addRequests(self, rqs):
              pass;
    
          def addLoserRequest(self, rq):
              pass;
  • BaseDownload

      # Created by zaizai at 2017/9/22
    
      class BaseDownload:
          def __init__(self):
              super(BaseDownload, self).__init__();
    
          def excuteRequest(self, rq):
              pass;
  • BasePipeline

      class BasePipeline:
          def __init__(self):
              super(BasePipeline,self).__init__();
    
          def piplineData(self, data):
              pass
  • OK. If we later need different spiders, downloaders, schedulers, or data pipelines, we implement them by inheriting from the base classes above, which act as the contract.

  • OK, let's take a look at the current directory structure:

(image: project directory structure)

  • base: holds the base classes

  • mode: holds the type/structure classes defined along the way

  • module: holds the default implementations, such as the default downloader, scheduler, and pipeline

  • spider: holds the spider code

  • utils: holds helper/utility classes

  • The outermost file is our engine class.

  • Let's look at the engine code first; it is still fairly simple at this point.

      # Created by zaizai at 2017/9/21
    
      from com.anjie.module.default_download import DefaultDownload;
      from com.anjie.module.default_scheduler import DefaultScheduler;
      from com.anjie.utils.elog import Elog;
      from com.anjie.spider.myspider import Spider
    
      class Engine:
    
          def __init__(self, spider=None, scheduler=DefaultScheduler(), download=DefaultDownload(), pipline=None):
              super(Engine, self).__init__();
              self.spider = spider;
              self.scheduler = scheduler;
              # download is already an instance, so store it directly instead of calling it
              self.download = download;
              self.pipline = pipline;
    
          def addSpider(self, spider):
              self.spider = spider;
    
          def start(self):
              self.scheduler.addRequests(self.spider.getSeeds())
              while True:
                  rq = self.scheduler.nextRequest();
                  if rq is None:
                      Elog.warning('Engine will stop because the scheduler has no more requests to schedule');
                      break;
                  resultPage = self.download.excuteRequest(rq);
                  if resultPage is not None:
                      (pipelineItems, nextRequests) = self.spider.pagerBack(resultPage);
                      if pipelineItems and self.pipline:
                          self.pipline.piplineData(pipelineItems);
                      if nextRequests:
                          self.scheduler.addRequests(nextRequests);
                  else:
                      # decide whether the failed request should be pushed back for a retry
                      pass
    
          def sleep(self, time):
              pass;
    
          def stop(self):
              pass
    
      if __name__ == '__main__':
          e = Engine();
          e.addSpider(Spider());
          e.start()
  • The engine first takes the seed requests from the spider and hands them to the scheduler, then enters a while loop that keeps pulling requests from the scheduler. The code is straightforward.

  • Next, let's look at the default module implementations.

  • The default downloader

      import random;
      from urllib import request, error
    
      from com.anjie.base.download import BaseDownload;
      from com.anjie.utils.elog import Elog
      from com.anjie.utils.throttle import Throttle;
    
      # Default delay (seconds)
      DEFAULT_DELAY = 5
      # Default number of retries
      DEFAULT_RETRIES = 1
      # Default timeout (seconds)
      DEFAULT_TIMEOUT = 60
    
      class DefaultDownload(BaseDownload):
          def __init__(self, num_retries=DEFAULT_RETRIES, cache=None, proxies=None,
                       delay=DEFAULT_DELAY):
              super(DefaultDownload, self).__init__();
              # number of retries
              self.num_retries = num_retries;
              # cache
              self.cache = cache;
              # proxies
              self.proxies = proxies;
              self.throttle = Throttle(delay)
    
          def excuteRequest(self, rq):
              return self.download(rq)
    
          def download(self, rq):
              print('download url is %s' % rq.url);
              result = None;
              if self.cache is not None:
                  try:
                      result = self.cache[rq.url];
                  except KeyError as e:
                      Elog.warning('url %s is not available in cache' % rq.url)
                      pass;
                  else:
                      # runs only when no exception was raised
                      if result is not None and self.num_retries > 0 and 500 <= result['code'] < 600:
                          # the previous request hit a server error, so re-download
                          Elog.info("server error so ignore result from cache of url %s and re-download" % rq.url);
                          result = None;
    
              if result is not None:
                  # the cache had a usable result, so return it directly
                  return result['html'];

              # no cache configured, or the cache had no entry for this url, so download it now
              if result is None:
                  Elog.info("url %s hasn't been cached yet, so it still needs to be downloaded" % rq.url);
                  # proxy = random.choice(self.proxies) if self.proxies else None
                  proxy = None;
                  if self.proxies is not None:
                      proxy = random.choice(self.proxies);
                  result = self.realDownload(rq, proxy=proxy,
                                             num_retries=self.num_retries);
              if self.cache is not None:
                  # save data to cache
                  self.cache[rq.url] = result;
              return result['html'];
    
          def realDownload(self, rq, proxy, num_retries, data=None):
    
              # throttle: wait before issuing the request
              self.throttle.wait(rq.url)
    
              html = None;
              code = None;
              try:
                  req = request.Request(url=rq.url, headers=rq.headers);
                  rp = request.urlopen(req);
                  Elog.info('download over, url is: %s' % rp.geturl())
                  html = rp.read();
                  code = rp.code;
              except error.URLError as e:
                  Elog.error('download error: %s' % e.reason);
                  html = '';
                  if num_retries > 0:
                      if hasattr(e, 'code') and 500 <= e.code < 600:
                          return self.realDownload(rq, proxy, num_retries - 1);
                      pass;
                      pass;
              return {'html': html, 'code': code};
    
      if __name__ == '__main__':
          from com.anjie.e_request import ERequest;
          d = DefaultDownload();
          print(d.download(ERequest(url='http://www.baidu.com')))
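  • The Elog and Throttle helpers imported above (com.anjie.utils.elog and com.anjie.utils.throttle) are not listed in the post either. Elog is presumably a thin wrapper around the standard logging module, and Throttle, judging from Throttle(delay) and throttle.wait(url), presumably enforces a minimum delay between requests to the same domain. A minimal sketch under those assumptions:

      # Assumed sketches of the two utility classes; the real implementations may differ.
      import logging
      import time
      from urllib.parse import urlparse

      logging.basicConfig(level=logging.INFO,
                          format='%(asctime)s [%(levelname)s] %(message)s')
      _logger = logging.getLogger('easyspider')

      class Elog:
          # static shortcuts so callers can write Elog.info(...), Elog.warning(...), Elog.error(...)
          @staticmethod
          def info(msg):
              _logger.info(msg)

          @staticmethod
          def warning(msg):
              _logger.warning(msg)

          @staticmethod
          def error(msg):
              _logger.error(msg)

      class Throttle:
          def __init__(self, delay):
              self.delay = delay    # minimum seconds between two hits on the same domain
              self.domains = {}     # domain -> timestamp of the last access

          def wait(self, url):
              domain = urlparse(url).netloc
              last_accessed = self.domains.get(domain)
              if self.delay > 0 and last_accessed is not None:
                  sleep_secs = self.delay - (time.time() - last_accessed)
                  if sleep_secs > 0:
                      time.sleep(sleep_secs)
              self.domains[domain] = time.time()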
  • The default scheduler:

      from urllib import parse;
      from urllib.robotparser import RobotFileParser;
    
      from com.anjie.base.scheduler import BaseScheduler
      from com.anjie.mode.spider_exception import SpiderException;
      from com.anjie.utils.elog import Elog
    
      class DefaultScheduler(BaseScheduler):
          def __init__(self):
              super(DefaultScheduler, self).__init__();
    
              # queue of requests waiting to be crawled
              self.belle_queue = list();

              # queue of requests that failed to download
              self.loser_queue = [];

              # cache queue for robots.txt data
              self.rp_cache_queue = [];

              # urls disallowed by robots.txt
              self.robots_loser_queue = []

              # urls that have already been crawled
              self.crawl_over_queue = [];
    
              self.rp_cache = dict();
    
          def __iter__(self):
              return self
    
          def __next__(self):
              if not self.belle_queue:
                  raise StopIteration
              return self.belle_queue.pop();
    
          def nextRequest(self):
              if not self.belle_queue:
                  return None;
              return self.belle_queue.pop();
    
          def addRequest(self, rq):
              pass;
    
          def addRequests(self, rqs):
              self.belle_queue.extend(rqs);
    
          def addLoserRequest(self, rq):
              self.loser_queue.append(rq);
    
          def start_craw(self):
              if not self.spider:
                  raise SpiderException("spider object is None")

              if not hasattr(self.spider, 'start_url'):
                  raise SpiderException("spider must have a start_url attribute")

              if not hasattr(self.spider, 'pager_back'):
                  raise SpiderException("spider must have a pager_back method")
    
              self.crawl_queue.extend(self.spider.start_url);
              # initialize each url's request count to 0

              while self.crawl_queue:
                  url = self.crawl_queue.pop();
    
                  html = None;
                  # let's be a polite crawler and respect robots.txt
                  rp = self.get_robots(url);
    
                  if rp is not None:
                      if rp.can_fetch(useragent=self.download.user_agent, url=url):
                          html = self.download.download(url=url);
                      else:
                          Elog.warning(
                              'current url: %s with user_agent: %s is disallowed by robots.txt' % (url, self.user_agent))
                          html = None;
    
                  if html:
                      self.crawl_over_queue.append(url);
    
                  pipeline_item = self.spider.pager_back(url, html);
                  # check whether new request urls were extracted
                  print(pipeline_item)
                  if self.pipline and pipeline_item and pipeline_item['item']:
                      self.pipline.piplineData(pipeline_item['item']);
    
                  if pipeline_item['url']:
                      for r in pipeline_item['url']:
                          if not r in self.crawl_over_queue and r not in self.crawl_queue:
                              self.crawl_queue.append(r);
    
          # parse the robots.txt file for this url's domain
          def get_robots(self, url):
              """Initialize robots parser for this domain"""
              (proto, rest) = parse.splittype(url)
              # get the domain into res
              res, rest = parse.splithost(rest)
              rp = None;
              try:
                  rp = self.rp_cache[res];
              except KeyError as e:
                  Elog.error('key error');
                  pass;
              else:
                  return rp;
    
              rp = RobotFileParser()
              rp.set_url(parse.urljoin(url, '/robots.txt'))
              rp.read()
    
              if self.rp_cache is not None:
                  if rest:
                      self.rp_cache[res] = rp;
                  else:
                      Elog.info('failed to parse the domain of url: %s' % url);
    
              return rp
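  • One caveat: parse.splittype and parse.splithost used in get_robots are deprecated internals of urllib.parse. If you want to avoid the deprecation warnings, a roughly equivalent way to get the domain and the robots.txt location (my own adaptation, not part of the original code) is:

      from urllib.parse import urlsplit, urljoin
      from urllib.robotparser import RobotFileParser

      def get_robots_url(url):
          # the netloc plays the role of `res` in the code above
          domain = urlsplit(url).netloc
          robots_url = urljoin(url, '/robots.txt')
          return domain, robots_url

      if __name__ == '__main__':
          domain, robots_url = get_robots_url('https://gz.zu.anjuke.com/?from=navigation')
          rp = RobotFileParser()
          rp.set_url(robots_url)
          # rp.read() would fetch and parse the file; call it only when you need the rules
          print(domain, robots_url)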
  • The default pipeline

      from com.anjie.base.pipeline import BasePipeline;
    
      class DefaultPipeline(BasePipeline):
          def __init__(self):
              super(DefaultPipeline, self).__init__();
              pass;
    
          def piplineData(self, data):
              for v in data:
                  print("<------------------------->")
                  print('\n'.join(['%15s : %s' % item for item in v.__dict__.items()]))
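  • The default pipeline just prints every attribute of each item. As an illustration of the contract defined by BasePipeline, a hypothetical pipeline that appends the items to a CSV file instead (CsvPipeline is my own example, not part of the project) could look like this:

      import csv

      from com.anjie.base.pipeline import BasePipeline;

      class CsvPipeline(BasePipeline):
          def __init__(self, path='items.csv'):
              super(CsvPipeline, self).__init__();
              self.path = path;

          def piplineData(self, data):
              # append one row per item; a real pipeline would also write a header row once
              with open(self.path, 'a', newline='', encoding='utf-8') as f:
                  writer = csv.writer(f);
                  for v in data:
                      writer.writerow([str(value) for value in v.__dict__.values()]);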
  • Finally, let's look at the spider we plug into the engine, using housing rental listings as the example target.

      from com.anjie.base.spider import BaseSpider;
      from com.anjie.spider.house import House;
    
      # Created by zaizai at 2017/9/22
    
      class Spider(BaseSpider):
          def __init__(self):
              super(Spider, self).__init__();
              self.seed_url = ['https://gz.zu.anjuke.com/?from=navigation'];
    
          def pagerProcess(self, page):
              next_link = [];
              list_result = page.xpath('//div[@class="maincontent"]//div[@class="zu-itemmod  "]')
              house_list = [];
              house = None;
              result_list = [];
              for node in list_result:
                  h = House();
                  # print(etree.tostring(node,encoding="utf-8",pretty_print=True,method="html").decode())
                  # extract the title
                  title = node.xpath('.//div[@class="zu-info"]/h3/a/text()');
                  h.title = title;
                  # extract the link
                  url = node.xpath('.//div[@class="zu-info"]/h3/a/@href')
                  h.url = url;
                  temp = node.xpath('.//div[@class="zu-info"]/p[1]/text()')
                  (house_type, sale_type, level, floor_number) = temp;
                  h.house_type = house_type;
                  h.sale_type = sale_type;
                  h.level = level;
                  h.floor_number = floor_number;
                  # extract the address
                  area_name = node.xpath('.//div[@class="zu-info"]/address/a/text()')
                  h.area_name = area_name;
                  area = node.xpath('.//div[@class="zu-info"]/address/text()')
                  for ele in area:
                      if len(ele.strip()) > 0:
                          area = ele.strip();
                          break;
                  h.addr = area;
                  # extract the contact person
                  user = node.xpath('.//div[@class="zu-info"]/p[2]/span/text()')
                  h.user = user;
                  supplement = node.xpath('.//div[@class="zu-info"]/p[2]/em/text()')
                  h.supplement = supplement;
    
                  # extract the price
                  price = node.xpath('.//div[@class="zu-side"]//strong/text()')
    
                  h.price = price;
                  unit = node.xpath('.//div[@class="zu-side"]/p/text()')
                  h.unit = unit;
                  result_list.append(h);
    
              links = page.xpath('//*[@class="multi-page"]/a/@href');
              # use a set to filter out duplicate links
              s = set(links)
              next_link = list(s);
              print('extracted links: %s' % next_link)
              self.pipeline_item = result_list;
              self.addRequest_urls(next_link);
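  • The House model (com.anjie.spider.house) is not listed in the post; based on the fields assigned above, it is presumably a plain data holder along these lines:

      # Assumed sketch of com/anjie/spider/house.py: a plain container for one listing.
      class House:
          def __init__(self):
              self.title = None          # listing title
              self.url = None            # link to the detail page
              self.house_type = None     # layout, e.g. number of rooms
              self.sale_type = None      # rental type
              self.level = None          # floor level description
              self.floor_number = None   # total number of floors
              self.area_name = None      # district / neighbourhood name
              self.addr = None           # street address
              self.user = None           # contact person
              self.supplement = None     # extra notes
              self.price = None          # rent
              self.unit = None           # price unit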
  • With that, a single-machine, single-threaded version of the crawler architecture is complete.

  • Although it is only single-machine and single-threaded, this design makes it straightforward to extend later into a multi-process, multi-threaded, or even distributed crawler.

  • For example, we could run several spiders, each mapped by name to its own pipeline, while the downloader and scheduler are shared. To support that, the engine is essentially the only part that needs to change; see the sketch below.

  • I am still feeling my way through this, so please point out any errors or omissions.
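  • A minimal sketch of that multi-spider idea (the MultiSpiderEngine name and the rq.spider_name attribute are my own assumptions, not part of the current code):

      # Hypothetical extension: one shared scheduler and downloader, one pipeline per spider name.
      class MultiSpiderEngine:
          def __init__(self, scheduler, download):
              self.scheduler = scheduler;
              self.download = download;
              self.spiders = {};    # spider name -> spider instance
              self.piplines = {};   # spider name -> pipeline instance

          def addSpider(self, name, spider, pipline):
              self.spiders[name] = spider;
              self.piplines[name] = pipline;

          def start(self):
              # seed the shared scheduler; each request remembers which spider it belongs to
              for name, spider in self.spiders.items():
                  seeds = spider.getSeeds();
                  for rq in seeds:
                      rq.spider_name = name;   # assumed extra attribute on ERequest
                  self.scheduler.addRequests(seeds);
              while True:
                  rq = self.scheduler.nextRequest();
                  if rq is None:
                      break;
                  html = self.download.excuteRequest(rq);
                  if html is None:
                      continue;
                  (items, next_requests) = self.spiders[rq.spider_name].pagerBack(html);
                  if items and self.piplines.get(rq.spider_name):
                      self.piplines[rq.spider_name].piplineData(items);
                  if next_requests:
                      for nr in next_requests:
                          nr.spider_name = rq.spider_name;
                      self.scheduler.addRequests(next_requests);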

