爬虫笔记:提高数据采集效率!代理池和线程池的使用


前言

爬虫和反爬虫是一对矛和盾,反爬虫很常见的一个方法就是封IP,一个IP短时间内频繁访问,可以做限流或者是加入黑名单,我之前的后台开发相关博客也有涉及这一块。

不过今天说的是爬虫,所以应对的方法就是用代理池,每次请求都用不同的IP就行,再加上UA模拟,完全是正常用户的行为,可以避开限流和黑名单反爬。

然后爬虫是一种IO密集型程序,如果全程单线程执行那会很慢,因此可以用多线程来提高数据采集效率,不过自己管理多线程太麻烦,所以我选择了线程池~

代理池

一个完善的代理池,应该可以实现以下功能

  • 批量采集代理(或者通过接口导入我们购买的代理,不过偶尔用一用还是免费的就好)
  • 采集到之后自动验证代理有效性
  • 将有效代理存储起来
  • 提供获取随机代理的接口
  • 提供管理(删除、增加)代理的接口

自己造轮子太麻烦了,用Python的初衷不就是”人生苦短,我用Python“吗,并且社区也没让我们失望,开源好用的Python代理池项目有很多,这里我选了一个在GitHub上有14k+ Stars的项目来用,名字叫ProxyPool

经过试用还不错!

当然还有其他很多线程池项目,我没测试,有兴趣的同学可以看看参考资料的第一个链接。

部署运行

项目地址:https://github.com/jhao104/proxy_pool

官方文档提供了两种部署方式,包括下载代码运行和docker,既然有docker那肯定选最方便的docker啦!

不过官方的docker命令还不够方便,因为这个代理池还需要依赖Redis服务,这里我写了一个docker-compose配置来用:

version: "3"
services:
  redis:
    image: redis
    expose:
      - 6379

  web:
    restart: always
    image: jhao104/proxy_pool
    environment:
      - DB_CONN=redis://redis:6379/0
    ports:
      - "5010:5010"
    depends_on:
      - redis

找个文件夹保存一下,然后执行命令启动docker容器

docker-compose up

这里我配置的端口是5010跟官网一样,有需要的同学可以自己修改~

项目启动起来之后,浏览器访问http://127.0.0.1:5010,可以得到所有接口,各个接口顾名思义很容易理解。

{
  "url": [
    {
      "desc": "get a proxy",
      "params": "type: ''https'|''",
      "url": "/get"
    },
    {
      "desc": "get and delete a proxy",
      "params": "",
      "url": "/pop"
    },
    {
      "desc": "delete an unable proxy",
      "params": "proxy: 'e.g. 127.0.0.1:8080'",
      "url": "/delete"
    },
    {
      "desc": "get all proxy from proxy pool",
      "params": "type: ''https'|''",
      "url": "/all"
    },
    {
      "desc": "return proxy count",
      "params": "",
      "url": "/count"
    }
  ]
}

代码中使用

由于这个代理池提供了HTTP接口,理论上可以支持任何语言使用

这里我用Python来写

获取随机代理

这里我写了两个方法,封装了获取随机代理和删除代理的操作

import requests

PROXY_POOL_URL = 'http://127.0.0.1:5010'

def get_proxy():
    proxy = requests.get(f"{PROXY_POOL_URL}/get/").json().get("proxy")
    return {'http': proxy, 'https': proxy}

def delete_proxy(proxy):
    requests.get(f"{PROXY_POOL_URL}/delete/?proxy={proxy}")

获取随机Header

使用fake_useragent这个库来生成随机的UserAgent,模拟不同的用户浏览器请求

from fake_useragent import UserAgent

def get_header():
    return {
        "Accept": "application/json, text/plain, */*",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,th;q=0.6",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "Pragma": "no-cache",
        "User-Agent": ua.random
    }

网络请求封装

因为我们没有买收费代理,所以使用的是代理池自动采集的免费代理,众所周知免费代理的质量不好保证,所以我写了重试功能,失败次数超过最大重试次数之后就删除这个代理,换个代理重新来~

最大重试次数可以配置MAX_RETRY_COUNT变量

MAX_RETRY_COUNT = 5

def request_get(url) -> Tuple[Response, str]:
    retry_count = 1
    proxy = get_proxy()
    while retry_count <= MAX_RETRY_COUNT:
        logger.debug(f'第{retry_count}次请求 - 网址 {url} - 代理 {proxy.get("http")}')
        try:
            resp = requests.get(url, proxies=proxy, headers=get_header(), timeout=15)
            return resp, proxy.get('http')
        except Exception:
            logger.error(f'请求失败 - 网址 {url}')
            retry_count += 1
    # 删除代理池中代理
    logger.warning(f'全部{MAX_RETRY_COUNT}次请求都失败 - 删除代理 {proxy.get("http")}')
    delete_proxy(proxy.get('http'))
    return request_get(url)

这个函数返回的是一个(Response, str)类型的元组,考虑到不同请求拿到的数据格式可能不一样,所以没有用resp.json()或者resp.text形式,可以调用这个函数拿到数据后自行处理。

同时还会返回一个str类型的代理服务器地址,是ip:port形式。

调用方法就是这种形式:resp, proxy = request_get(url)

因为我封装的这个request_get函数只是最基础的获取数据,但拿到的数据不一定是正确可用的,比如触发了限流或者黑名单,拿到的数据就是空的,这时候在调用这个函数拿到数据后可以加一次判断,假如这个代理IP已经被封禁了,可以调用delete_proxy方法删除该代理。

线程池

爬虫是一种IO密集型程序,如果全程单线程执行那会很慢,因此可以用多线程来提高数据采集效率,不过自己管理多线程太麻烦,所以我选择了线程池~

线程池是一组预先实例化的空闲线程,准备好接受工作。为每个要异步执行的任务创建一个新的线程对象是很昂贵的。使用线程池,你可以将任务添加到任务队列,线程池为任务分配一个可用线程。线程池有助于避免创建或销毁不必要的线程。

之前我用过threadpool这个pip包实现线程池,感觉还不错,但是拿来爬虫有几率出现不明原因的假死,不知道哪里出问题了,后面看网上资料说这个threadpool更适合CPU密集形的操作…

PS:我看了threadpool的源码实现,牛哇421行代码就实现了线程池的功能~

然后他是基于threading模块实现的,可以的

这次我改用Python标准库自带的线程池实现,事实上,Python里有两种“池”

  • multiprocessing.Pool
  • multiprocessing.pool.Threadpool

这两种的异同:

multiprocessing.pool.ThreadPool 的行为方式与 multiprocessing.Pool 相同。不同之处在于 multiprocessing.pool.Threadpool 使用线程来运行 worker 的逻辑,而 multiprocessing.Pool 使用工作进程。

但这俩我暂时也不用,因为有更好的选择。

Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutorProcessPoolExecutor两个类,实现了对threadingmultiprocessing的进一步抽象(这里主要关注线程池),不仅可以帮我们自动调度线程,还可以做到:

  1. 主线程可以获取某一个线程(或者任务的)的状态,以及返回值。
  2. 当一个线程完成的时候,主线程能够立即知道。
  3. 让多线程和多进程的编码接口一致。

所以来看看代码吧

代码

简单用法

def crawl_data(page):
    ...

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
pool = ThreadPoolExecutor(8)
logger.info('线程池启动')
tasks = [pool.submit(crawl_data, page) for page in range(1, 100)]
wait(tasks, return_when=ALL_COMPLETED)
logger.info('线程池结束')

上面代码解析:

  • crawl_data函数是爬虫函数,具体代码省略
  • ThreadPoolExecutor(8)表示创建线程池,同时8个线程并行
  • 然后用列表生成器,pool.submit方法用来把任务添加到线程池
  • wait函数用来等待线程池执行结束。

除了pool.submit方法之外,还支持map方法批量添加任务

使用方法如下:

pool = ThreadPoolExecutor(8)
pool.map(crawl_data, range(1,100))

map方法的第二个参数是要传给任务的参数列表,所以就是列表里有多少个参数,就创建多少个任务~

经过测试非常稳,哈哈哈,还是标准库的东西好用~

参考资料

  • https://suyin-blog.club/2021/2G4HXBY/#proxy-pool-推荐
  • python threadpool 的前世今生:https://zhangchenchen.github.io/2017/05/18/python-thread-pool/
  • https://www.delftstack.com/zh/howto/python/python-threadpool-differences/
  • [python] ThreadPoolExecutor线程池:https://www.jianshu.com/p/b9b3d66aa0be