Scrapy distributed scheduling: the scrapy-redis scheduler source and how it works

scrapy_redis.scheduler replaces Scrapy's built-in scheduler. It implements the request queue, URL deduplication, and Request management, and is responsible for scheduling each spider's requests. When the scheduler is initialized, it reads the queue and dupefilter classes from the settings file (the defaults are usually fine) and configures the redis keys they use (typically the spider name plus a requests/dupefilter suffix, so different instances of the same spider share the same data in redis).
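For context, a minimal settings sketch for enabling this scheduler could look like the following (the key templates shown are the scrapy-redis defaults; the REDIS_URL value is just an assumed local instance):

# settings.py -- minimal sketch, assuming a local redis instance
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

# scrapy-redis default key templates; '%(spider)s' is replaced by the
# spider name, so all instances of the same spider share the same keys
SCHEDULER_QUEUE_KEY = '%(spider)s:requests'
SCHEDULER_DUPEFILTER_KEY = '%(spider)s:dupefilter'

# keep the queue and dupefilter between runs (cleared by default)
SCHEDULER_PERSIST = True

REDIS_URL = 'redis://localhost:6379'  # assumed connection URL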

The scrapy_redis.scheduler source is as follows:

import importlib
import six

from scrapy.utils.misc import load_object

from . import connection, defaults


# TODO: add SCRAPY_JOB support.
class Scheduler(object):
    """Redis-based scheduler

    Settings
    --------
    SCHEDULER_PERSIST : bool (default: False)
        Whether to persist or clear redis queue.
    SCHEDULER_FLUSH_ON_START : bool (default: False)
        Whether to flush redis queue on start.
    SCHEDULER_IDLE_BEFORE_CLOSE : int (default: 0)
        How many seconds to wait before closing if no message is received.
    SCHEDULER_QUEUE_KEY : str
        Scheduler redis key.
    SCHEDULER_QUEUE_CLASS : str
        Scheduler queue class.
    SCHEDULER_DUPEFILTER_KEY : str
        Scheduler dupefilter redis key.
    SCHEDULER_DUPEFILTER_CLASS : str
        Scheduler dupefilter class.
    SCHEDULER_SERIALIZER : str
        Scheduler serializer.

    """

    def __init__(self, server,
                 persist=False,
                 flush_on_start=False,
                 queue_key=defaults.SCHEDULER_QUEUE_KEY,
                 queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
                 dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
                 dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
                 idle_before_close=0,
                 serializer=None):
        """Initialize scheduler.

        Parameters
        ----------
        server : Redis
            The redis server instance.
        persist : bool
            Whether to flush requests when closing. Default is False.
        flush_on_start : bool
            Whether to flush requests on start. Default is False.
        queue_key : str
            Requests queue key.
        queue_cls : str
            Importable path to the queue class.
        dupefilter_key : str
            Duplicates filter key.
        dupefilter_cls : str
            Importable path to the dupefilter class.
        idle_before_close : int
            Timeout before giving up.

        """
        if idle_before_close < 0:
            raise TypeError("idle_before_close cannot be negative")

        self.server = server
        self.persist = persist
        self.flush_on_start = flush_on_start
        self.queue_key = queue_key
        self.queue_cls = queue_cls
        self.dupefilter_cls = dupefilter_cls
        self.dupefilter_key = dupefilter_key
        self.idle_before_close = idle_before_close
        self.serializer = serializer
        self.stats = None

    def __len__(self):
        return len(self.queue)

    @classmethod
    def from_settings(cls, settings):
        kwargs = {
            'persist': settings.getbool('SCHEDULER_PERSIST'),
            'flush_on_start': settings.getbool('SCHEDULER_FLUSH_ON_START'),
            'idle_before_close': settings.getint('SCHEDULER_IDLE_BEFORE_CLOSE'),
        }

        # If these values are missing, it means we want to use the defaults.
        optional = {
            # TODO: Use custom prefixes for this settings to note that are
            # specific to scrapy-redis.
            'queue_key': 'SCHEDULER_QUEUE_KEY',
            'queue_cls': 'SCHEDULER_QUEUE_CLASS',
            'dupefilter_key': 'SCHEDULER_DUPEFILTER_KEY',
            # We use the default setting name to keep compatibility.
            'dupefilter_cls': 'DUPEFILTER_CLASS',
            'serializer': 'SCHEDULER_SERIALIZER',
        }
        for name, setting_name in optional.items():
            val = settings.get(setting_name)
            if val:
                kwargs[name] = val

        # Support serializer as a path to a module.
        if isinstance(kwargs.get('serializer'), six.string_types):
            kwargs['serializer'] = importlib.import_module(kwargs['serializer'])

        server = connection.from_settings(settings)
        # Ensure the connection is working.
        server.ping()

        return cls(server=server, **kwargs)

    @classmethod
    def from_crawler(cls, crawler):
        instance = cls.from_settings(crawler.settings)
        # FIXME: for now, stats are only supported from this constructor
        instance.stats = crawler.stats
        return instance

    def open(self, spider):
        self.spider = spider

        try:
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                key=self.queue_key % {'spider': spider.name},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s",
                             self.queue_cls, e)

        try:
            self.df = load_object(self.dupefilter_cls)(
                server=self.server,
                key=self.dupefilter_key % {'spider': spider.name},
                debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                             self.dupefilter_cls, e)

        if self.flush_on_start:
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))

    def close(self, reason):
        if not self.persist:
            self.flush()

    def flush(self):
        self.df.clear()
        self.queue.clear()

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True

    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request

    def has_pending_requests(self):
        return len(self) > 0
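One detail worth noting in from_settings: SCHEDULER_SERIALIZER may be given as a module path, in which case the module is imported and is expected to expose loads/dumps (scrapy-redis ships its own picklecompat module as the default). A quick illustration of that mechanism, using the stdlib json module purely as an example:

import importlib

# from_settings turns the string setting into a module object; any module
# exposing loads/dumps will do (json here is only an illustration, the
# scrapy-redis default serializer is picklecompat)
serializer = importlib.import_module('json')
payload = serializer.dumps({'url': 'http://example.com'})
print(serializer.loads(payload))  # {'url': 'http://example.com'}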
           

When the spider starts running, the scheduler reads its configuration from the settings file and creates the Request queue object and the Request URL dedup (dupefilter) object:

    def open(self, spider):
        self.spider = spider

        try:
            self.queue = load_object(self.queue_cls)(
                server=self.server,
                spider=spider,
                key=self.queue_key % {'spider': spider.name},
                serializer=self.serializer,
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate queue class '%s': %s",
                             self.queue_cls, e)

        try:
            self.df = load_object(self.dupefilter_cls)(
                server=self.server,
                key=self.dupefilter_key % {'spider': spider.name},
                debug=spider.settings.getbool('DUPEFILTER_DEBUG'),
            )
        except TypeError as e:
            raise ValueError("Failed to instantiate dupefilter class '%s': %s",
                             self.dupefilter_cls, e)

        if self.flush_on_start:
            self.flush()
        # notice if there are requests already in the queue to resume the crawl
        if len(self.queue):
            spider.log("Resuming crawl (%d requests scheduled)" % len(self.queue))
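Note the key interpolation: self.queue_key % {'spider': spider.name}. With the default templates, every spider name maps to its own pair of redis keys, for example (the spider name 'myspider' is hypothetical):

# default templates from scrapy_redis.defaults, filled in with a
# hypothetical spider name
'%(spider)s:requests' % {'spider': 'myspider'}    # -> 'myspider:requests'
'%(spider)s:dupefilter' % {'spider': 'myspider'}  # -> 'myspider:dupefilter'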
           

The scheduler implements two important methods, one to enqueue requests and one to dequeue them. When a Request is submitted to the scheduler, it is first checked for duplicates (unless the request disables filtering via dont_filter) and then pushed onto the queue:

    def enqueue_request(self, request):
        if not request.dont_filter and self.df.request_seen(request):
            self.df.log(request, self.spider)
            return False
        if self.stats:
            self.stats.inc_value('scheduler/enqueued/redis', spider=self.spider)
        self.queue.push(request)
        return True
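The deduplication itself lives in the dupefilter. In scrapy-redis's default RFPDupeFilter, request_seen() boils down to a redis SADD of the request fingerprint. A simplified sketch of that idea (not the library's exact code; the key name and connection URL are assumptions):

import redis
from scrapy.utils.request import request_fingerprint

server = redis.from_url('redis://localhost:6379')  # assumed local instance

def request_seen(request, key='myspider:dupefilter'):
    # SADD returns 0 if the member already existed, 1 if it was added;
    # the call is atomic, so multiple spider processes can safely share
    # the same fingerprint set.
    fp = request_fingerprint(request)
    return server.sadd(key, fp) == 0

Because the fingerprint set lives in redis rather than in process memory, every spider instance pointed at the same key sees the same fingerprints, which is what makes deduplication work across machines.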
           

The other important method is the dequeue side, next_request. Whenever the engine asks the scheduler for the next request, next_request is called; through the queue container's interface the scheduler pops a request and hands it to the corresponding spider so it can carry on crawling.

    def next_request(self):
        block_pop_timeout = self.idle_before_close
        request = self.queue.pop(block_pop_timeout)
        if request and self.stats:
            self.stats.inc_value('scheduler/dequeued/redis', spider=self.spider)
        return request
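What pop(timeout) does depends on the configured queue class. In the FIFO queue variant it maps onto redis LPOP, or a blocking BLPOP when a timeout is given, which is how idle_before_close turns into a blocking wait. A simplified sketch of that behaviour (not the library's exact code; the default queue class is actually a priority queue backed by a sorted set):

import redis

server = redis.from_url('redis://localhost:6379')  # assumed local instance

def pop(key='myspider:requests', timeout=0):
    if timeout > 0:
        # block up to `timeout` seconds waiting for a request to appear
        data = server.blpop(key, timeout)
        if isinstance(data, tuple):
            data = data[1]  # BLPOP returns a (key, value) tuple
    else:
        data = server.lpop(key)  # non-blocking pop
    return data  # the real queue deserializes this back into a Request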
           

When the spider closes, the persist parameter decides whether the queue is cleared. Its docstring reads: "persist : bool    Whether to flush requests when closing. Default is False." Although the default is False, the settings file usually sets SCHEDULER_PERSIST = True so that the queue and dupefilter are not cleared:

    def close(self, reason):
        if not self.persist:
            self.flush()

    def flush(self):
        self.df.clear()
        self.queue.clear()
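Both clear() calls ultimately just delete the corresponding redis keys. A rough equivalent in plain redis-py (the key names are hypothetical):

import redis

server = redis.from_url('redis://localhost:6379')  # assumed local instance

# what flush() boils down to: drop the dupefilter set and the request queue
server.delete('myspider:dupefilter')  # self.df.clear()
server.delete('myspider:requests')    # self.queue.clear()

With SCHEDULER_PERSIST = True these keys survive the spider's shutdown, so the next run resumes from the pending requests and keeps the already-seen fingerprints.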