爬虫笔记:提高数据采集效率!代理池和线程池的使用
前言
爬虫和反爬虫是一对矛和盾,反爬虫很常见的一个方法就是封IP,一个IP短时间内频繁访问,可以做限流或者是加入黑名单,我之前的后台开发相关博客也有涉及这一块。
不过今天说的是爬虫,所以应对的方法就是用代理池,每次请求都用不同的IP就行,再加上UA模拟,完全是正常用户的行为,可以避开限流和黑名单反爬。
然后爬虫是一种IO密集型程序,如果全程单线程执行那会很慢,因此可以用多线程来提高数据采集效率,不过自己管理多线程太麻烦,所以我选择了线程池~
代理池
一个完善的代理池,应该可以实现以下功能
- 批量采集代理(或者通过接口导入我们购买的代理,不过偶尔用一用还是免费的就好)
- 采集到之后自动验证代理有效性
- 将有效代理存储起来
- 提供获取随机代理的接口
- 提供管理(删除、增加)代理的接口
自己造轮子太麻烦了,用Python的初衷不就是”人生苦短,我用Python“吗,并且社区也没让我们失望,开源好用的Python代理池项目有很多,这里我选了一个在GitHub上有14k+ Stars的项目来用,名字叫ProxyPool
。
经过试用还不错!
当然还有其他很多线程池项目,我没测试,有兴趣的同学可以看看参考资料的第一个链接。
部署运行
项目地址:https://github.com/jhao104/proxy_pool
官方文档提供了两种部署方式,包括下载代码运行和docker,既然有docker那肯定选最方便的docker啦!
不过官方的docker命令还不够方便,因为这个代理池还需要依赖Redis服务,这里我写了一个docker-compose配置来用:
version: "3"
services:
redis:
image: redis
expose:
- 6379
web:
restart: always
image: jhao104/proxy_pool
environment:
- DB_CONN=redis://redis:6379/0
ports:
- "5010:5010"
depends_on:
- redis
找个文件夹保存一下,然后执行命令启动docker容器
docker-compose up
这里我配置的端口是5010跟官网一样,有需要的同学可以自己修改~
项目启动起来之后,浏览器访问http://127.0.0.1:5010
,可以得到所有接口,各个接口顾名思义很容易理解。
{
"url": [
{
"desc": "get a proxy",
"params": "type: ''https'|''",
"url": "/get"
},
{
"desc": "get and delete a proxy",
"params": "",
"url": "/pop"
},
{
"desc": "delete an unable proxy",
"params": "proxy: 'e.g. 127.0.0.1:8080'",
"url": "/delete"
},
{
"desc": "get all proxy from proxy pool",
"params": "type: ''https'|''",
"url": "/all"
},
{
"desc": "return proxy count",
"params": "",
"url": "/count"
}
]
}
代码中使用
由于这个代理池提供了HTTP接口,理论上可以支持任何语言使用
这里我用Python来写
获取随机代理
这里我写了两个方法,封装了获取随机代理和删除代理的操作
import requests
PROXY_POOL_URL = 'http://127.0.0.1:5010'
def get_proxy():
proxy = requests.get(f"{PROXY_POOL_URL}/get/").json().get("proxy")
return {'http': proxy, 'https': proxy}
def delete_proxy(proxy):
requests.get(f"{PROXY_POOL_URL}/delete/?proxy={proxy}")
获取随机Header
使用fake_useragent
这个库来生成随机的UserAgent
,模拟不同的用户浏览器请求
from fake_useragent import UserAgent
def get_header():
return {
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,th;q=0.6",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Pragma": "no-cache",
"User-Agent": ua.random
}
网络请求封装
因为我们没有买收费代理,所以使用的是代理池自动采集的免费代理,众所周知免费代理的质量不好保证,所以我写了重试功能,失败次数超过最大重试次数之后就删除这个代理,换个代理重新来~
最大重试次数可以配置MAX_RETRY_COUNT
变量
MAX_RETRY_COUNT = 5
def request_get(url) -> Tuple[Response, str]:
retry_count = 1
proxy = get_proxy()
while retry_count <= MAX_RETRY_COUNT:
logger.debug(f'第{retry_count}次请求 - 网址 {url} - 代理 {proxy.get("http")}')
try:
resp = requests.get(url, proxies=proxy, headers=get_header(), timeout=15)
return resp, proxy.get('http')
except Exception:
logger.error(f'请求失败 - 网址 {url}')
retry_count += 1
# 删除代理池中代理
logger.warning(f'全部{MAX_RETRY_COUNT}次请求都失败 - 删除代理 {proxy.get("http")}')
delete_proxy(proxy.get('http'))
return request_get(url)
这个函数返回的是一个(Response, str)
类型的元组,考虑到不同请求拿到的数据格式可能不一样,所以没有用resp.json()
或者resp.text
形式,可以调用这个函数拿到数据后自行处理。
同时还会返回一个str
类型的代理服务器地址,是ip:port
形式。
调用方法就是这种形式:resp, proxy = request_get(url)
因为我封装的这个request_get
函数只是最基础的获取数据,但拿到的数据不一定是正确可用的,比如触发了限流或者黑名单,拿到的数据就是空的,这时候在调用这个函数拿到数据后可以加一次判断,假如这个代理IP已经被封禁了,可以调用delete_proxy
方法删除该代理。
线程池
爬虫是一种IO密集型程序,如果全程单线程执行那会很慢,因此可以用多线程来提高数据采集效率,不过自己管理多线程太麻烦,所以我选择了线程池~
线程池是一组预先实例化的空闲线程,准备好接受工作。为每个要异步执行的任务创建一个新的线程对象是很昂贵的。使用线程池,你可以将任务添加到任务队列,线程池为任务分配一个可用线程。线程池有助于避免创建或销毁不必要的线程。
之前我用过threadpool
这个pip包实现线程池,感觉还不错,但是拿来爬虫有几率出现不明原因的假死,不知道哪里出问题了,后面看网上资料说这个threadpool
更适合CPU密集形的操作…
PS:我看了
threadpool
的源码实现,牛哇421行代码就实现了线程池的功能~然后他是基于
threading
模块实现的,可以的
这次我改用Python标准库自带的线程池实现,事实上,Python里有两种“池”
multiprocessing.Pool
multiprocessing.pool.Threadpool
这两种的异同:
multiprocessing.pool.ThreadPool
的行为方式与 multiprocessing.Pool
相同。不同之处在于 multiprocessing.pool.Threadpool
使用线程来运行 worker 的逻辑,而 multiprocessing.Pool
使用工作进程。
但这俩我暂时也不用,因为有更好的选择。
从Python3.2
开始,标准库为我们提供了concurrent.futures
模块,它提供了ThreadPoolExecutor
和ProcessPoolExecutor
两个类,实现了对threading
和multiprocessing
的进一步抽象(这里主要关注线程池),不仅可以帮我们自动调度线程,还可以做到:
- 主线程可以获取某一个线程(或者任务的)的状态,以及返回值。
- 当一个线程完成的时候,主线程能够立即知道。
- 让多线程和多进程的编码接口一致。
所以来看看代码吧
代码
简单用法
def crawl_data(page):
...
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED
pool = ThreadPoolExecutor(8)
logger.info('线程池启动')
tasks = [pool.submit(crawl_data, page) for page in range(1, 100)]
wait(tasks, return_when=ALL_COMPLETED)
logger.info('线程池结束')
上面代码解析:
crawl_data
函数是爬虫函数,具体代码省略ThreadPoolExecutor(8)
表示创建线程池,同时8个线程并行- 然后用列表生成器,
pool.submit
方法用来把任务添加到线程池 wait
函数用来等待线程池执行结束。
除了pool.submit
方法之外,还支持map
方法批量添加任务
使用方法如下:
pool = ThreadPoolExecutor(8)
pool.map(crawl_data, range(1,100))
map
方法的第二个参数是要传给任务的参数列表,所以就是列表里有多少个参数,就创建多少个任务~
经过测试非常稳,哈哈哈,还是标准库的东西好用~
参考资料
- https://suyin-blog.club/2021/2G4HXBY/#proxy-pool-推荐
- python threadpool 的前世今生:https://zhangchenchen.github.io/2017/05/18/python-thread-pool/
- https://www.delftstack.com/zh/howto/python/python-threadpool-differences/
- [python] ThreadPoolExecutor线程池:https://www.jianshu.com/p/b9b3d66aa0be