天天看点

Scrapy 使用 txpostgres 将数据异步存储到 PostgreSQL 的示例代码

# -*- coding: utf-8 -*-

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html

from twisted.internet import reactor
from twisted.python import log

from txpostgres import txpostgres
import time


class Jianshu2Pipeline(object):
    """Scrapy item pipeline that inserts items into PostgreSQL via txpostgres.

    All database work is queued on a single Deferred chain (``self.d``) that
    starts with the connection attempt, so inserts run one after another and
    never before ``connect()`` has finished.  Every queued step is followed
    by an errback that logs the failure, which puts the chain back into the
    success state so one bad item does not cancel the inserts behind it.
    """

    def __init__(self, database="test", user="postgres", password="postgres",
                 host="localhost", port="5432"):
        """Open the connection and start the Deferred chain.

        The defaults reproduce the previously hard-coded connection settings,
        so constructing the pipeline without arguments behaves as before.

        :param database: name of the database to connect to.
        :param user: database user name.
        :param password: password for ``user``.
        :param host: database server host.
        :param port: database server port.
        """
        self.conn = txpostgres.Connection()
        # connect() returns a Deferred; everything else is chained onto it.
        self.d = self.conn.connect(
            database=database,
            user=user,
            password=password,
            host=host,
            port=port
        )
        # Log a failed connection attempt instead of leaving an unhandled
        # failure sitting in the chain.
        self.d.addErrback(log.err)

    @property
    def _sql(self):
        """Parameterized INSERT statement used for every item."""
        return "insert into  jianshu(title, url) values (%s,%s);"

    def process_item(self, item, spider):
        """Queue an INSERT for ``item`` and hand the item on unchanged.

        The insert is only scheduled here; it runs later on the reactor, so
        the item is returned before the row is written.

        :param item: scraped item providing ``title`` and ``url``.
        :param spider: the spider that produced the item (unused).
        :returns: ``item``, unchanged.
        :raises TypeError: if the item has no ``title`` (``None`` is indexed).
        """
        # NOTE(review): only the first element of ``title`` is stored --
        # presumably the spider yields a list of extracted strings; confirm.
        params = (item.get('title')[0], item.get('url'))
        self.d.addCallback(
            lambda _: self.conn.runInteraction(
                self.interaction, self._sql, params
            )
        )
        # A failed insert is logged; log.err returns None, which switches the
        # chain back to the callback path for the next queued item.
        self.d.addErrback(log.err)
        return item

    def interaction(self, cur, sql, tuple_parm):
        """Execute ``sql`` on the cursor supplied by ``runInteraction``.

        This must not block (no ``time.sleep``) because it runs in the
        reactor thread, and it must not rebind ``self.d``: that attribute is
        the pipeline-wide chain other methods keep appending to.

        :param cur: txpostgres cursor for the current transaction.
        :param sql: statement with ``%s`` placeholders.
        :param tuple_parm: values bound to the placeholders.
        :returns: whatever ``cur.execute`` returns (a Deferred in txpostgres).
        """
        return cur.execute(sql, tuple_parm)

    def close_spider(self, spider):
        """Close the connection once every queued insert has been processed.

        :param spider: the spider being closed (unused).
        :returns: the Deferred chain, so that a caller which waits on
            returned Deferreds does not shut down while inserts are pending.
        """
        # NOTE(review): the reactor is deliberately not stopped here; it is
        # assumed to be owned by Scrapy -- confirm for standalone use.
        self.d.addCallback(lambda _: self.conn.close())
        self.d.addErrback(log.err)
        return self.d