不想看的可直接到 实践 部分拿源码。

本文将以 Python 3.9 环境下进行爬虫实践。

此文顺便作为 Python 爬虫入门教程,主要介绍 Python 爬虫的原理、使用方法、实践。

爬虫简介

爬虫(Web Crawler)是一种按照一定的规则,自动地抓取互联网信息的程序或者脚本。它是一种高效的网络数据采集工具,可以用来获取大量的有用信息。

前言

Python 是一种高级的、跨平台的、解释型的编程语言,它有着丰富的网络爬虫库,可以用来进行网络数据采集,而且便于书写和阅读,十分方便。

本文将以 Python 3.9 环境下进行爬虫实践。

前置

Python 3.9 安装

此处不再赘述,请自行安装 Python 3.9 环境,可以在网络上搜索相关教程。

安装第三方库

本文使用到的第三方库有:

  • requests:用于发送 HTTP 请求
  • BeautifulSoup4:用于解析 HTML 文档

pip 安装:

1
2
pip install requests
pip install beautifulsoup4

爬虫原理

爬虫的原理是模拟浏览器行为,向服务器发送 HTTP 请求,获取网页内容,然后分析网页内容,提取有效信息。

HTTP 请求有多种,常用的有两种 GET 和 POST。

  • GET 请求:浏览器地址栏输入网址,按下回车后,浏览器会向服务器发送 GET 请求,获取网页内容。简单来说,就是获取信息
  • POST 请求:表单提交,浏览器会向服务器发送 POST 请求,提交表单数据。简单来说,就是提交信息

本文仅介绍 GET 请求的爬虫实践。

入门

requests 库

requests 库是 Python 中用于发送 HTTP 请求的库,可以发送 GET 请求。

1
2
3
4
5
6
7
8
9
import requests

url = 'https://example.com'

response = requests.get(url = url) # 发送 GET 请求

print(response.status_code) # 打印响应状态码

print(response.text) # 打印响应内容

requests.get() 方法

requests.get() 方法返回一个 Response 对象,包含了服务器响应的状态码、内容等信息,下面是部分常用参数说明。

参数接受类型说明
urlstr请求的 URL 地址
headersdict请求头信息
timeoutint超时时间(秒,超时后停止)

headers 参数可以设置请求头信息,就是浏览器的唯一身份标识,比如 User-Agent、Cookie 等,可以通过 https://tool.ip138.com/useragent/ 获取 User-Agent 值,一般也只需要 User-Agent 即可。

设置方式:

1
2
3
4
5
headers = {
'User-Agent': 'sample user agent'
}
url = 'https://example.com'
response = requests.get(url = url, headers = headers)

响应状态码

响应状态码(status code)是 HTTP 协议中用于表示请求状态的三位数字代码,常见的有:

  • 2xx: 成功
  • 3xx: 重定向
  • 4xx: 客户端错误
  • 5xx: 服务器错误

响应内容

响应内容(response content)是服务器返回给客户端的内容,可以是 HTML 文档、JSON 数据、图片、视频等。

response.text 属性可以获取响应内容的文本形式,response.content 属性可以获取响应内容的字节形式。

BeautifulSoup4 库

BeautifulSoup4 库是 Python 中用于解析 HTML 文档的库,可以提取 HTML 文档中的有效信息。

简单示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from bs4 import BeautifulSoup

html = '''
<html>
<head>
<title>Example</title>
</head>
<body>
<h1>Hello World</h1>
<p>This is a paragraph.</p>
</body>
</html>
'''

soup = BeautifulSoup(html, 'html.parser') # 解析 HTML 文档

print(soup.title) # 打印 title 标签内容
print(soup.body) # 打印 body 标签内容
print(soup.body.h1.text) # 打印 h1 标签内容

html 文档结构

先介绍一下 html 文档的结构:

1
2
3
4
5
6
7
8
9
<html>
<head>
<title>Example</title>
</head>
<body>
<h1>Hello World</h1>
<p>This is a paragraph.</p>
</body>
</html>
  • <html> 标签是 HTML 文档的根标签,包含了整个文档的结构。
  • <head> 标签包含了文档的元信息,比如标题、关键字、描述、作者、脚本等。
  • <body> 标签包含了文档的主要内容,比如文本、图片、视频、表格等。
  • <title> 标签包含了文档的标题。

对于一个 html 元素,比如 <h1>Hello World</h1>,它的父标签是 <body>,它的兄弟标签是 <p>This is a paragraph.</p>,他的内容是 "Hello World"。

一个 html 元素能有一些属性:

1
2
3
4
5
<a href="https://example.com">Example</a> <!-- 超链接 -->
<img src="https://example.com/image.jpg" alt="Example Image"> <!-- 图片 -->
<input type="text" name="username" value="example"> <!-- 输入框 -->
<p class="example">This is a paragraph.</p> <!-- 类名为 "example" 的段落 -->
<p id="example">This is a paragraph.</p> <!-- id 为 "example" 的段落 -->

解析方法

下面是一些简单的解析方法:

方法说明
find('tag')查找第一个匹配的标签
find(name = "example")查找第一个 name 属性值为 "example" 的标签
find_all('tag')查找所有匹配的标签
find_all(name = "example")查找所有 name 属性值为 "example" 的标签
find_parent('tag')查找 tag 标签的父标签
find_next_sibling()查找下一个兄弟标签
find_previous_sibling()查找上一个兄弟标签

name = "example" 中,因为 class 是 Python 的关键字,所以用 _class 替代。

示例

下面是一个简单的爬虫示例,爬取 https://www.python.org 首页的标题、链接、描述、图片等信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import requests
from bs4 import BeautifulSoup

url = 'https://www.python.org'

response = requests.get(url = url)

soup = BeautifulSoup(response.text, 'html.parser')

title = soup.find('h1', class_ = 'page-title').text.strip() # 标题
description = soup.find('meta', attrs = {'name': 'description'})['content'] # 描述
image = soup.find('img', class_ = 'logo')['src'] # 图片
links = [a['href'] for a in soup.find_all('a', class_ = 'headerlink')] # 链接

print('标题:', title)
print('描述:', description)
print('图片:', image)
print('链接:', links)

输出结果:

1
2
3
4
标题: Welcome to Python.org
描述: The official home of the Python Programming Language
图片: https://www.python.org/static/img/python-logo@2x.png
链接: ['https://www.python.org/downloads/', 'https://www.python.org/downloads/source/', 'https://www.python.org/downloads/windows/', 'https://www.python.org/downloads/mac-osx/', 'https://www.python.org/downloads/linux/', 'https://www.python.org/doc/', 'https://www.python.org/doc/essays/', 'https://www.python.org/doc/essays/blurb/', 'https://www.python.org/doc/essays/ppt/', 'https://www.python.org/community/', 'https://www.python.org/community/sigs/', 'https://www.python.org/community/lists/', 'https://www.python.org/community/awards/', 'https://www.python.org/psf/', 'https://www.python.org/psf/records/', 'https://www.python.org/jobs/', 'https://www.python.org/events/', 'https://www.python.org/events/python-events/', 'https://www.python.org/events/python-user-group/', 'https://www.python.org/events/python-events/past/', 'https://www.python.org/events/python-events/upcoming/', 'https://www.python.org/events/python-events/calendar/', 'https://www.python.org/events/python-events/python-events-calendar/', 'https://www.python.org/events/python-events/python-events-calendar/']

进阶

有时候我们会抱怨,爬虫速度太慢,有没有办法提高爬虫速度,就怪在了 Python 的运行速度上?

实际上,网络爬虫的速度主要取决于网络的连接速率,对于小项目 Python 的速度是足够的。

但是网络的连接速率并不是无限的,所以我们需要考虑如何提高爬虫的速度。

多线程

多线程是提高爬虫速度的一种方法,它可以让多个线程同时运行,同时发送多个网络请求,从而提高爬虫的速度。

Python 的 threading 库提供了多线程的支持,我们可以用它来实现多线程爬虫。

这里建议使用线程池,线程池可以管理线程,可以自动分配线程,减少线程创建和销毁的开销。

需要库:concurrent.futures

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import requests
import concurrent.futures

def get_page(url):
# 获取信息

url = 'https://example.com/'

url_list = [url + str(i) for i in range(1, 101)] # 构造 URL 列表

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor: # max_workers 线程数
futures = [executor.submit(get_page, url) for url in url_list] # 构造线程池
for future in concurrent.futures.as_completed(futures):
if future.done():
try:
result = future.result() # 函数返回值
except Exception as e:
print("Error: {}".format(e))

浏览器指纹

对于一些网站会增加一些反爬虫机制,浏览器指纹是一种通过分析浏览器特征来判断其是否为正常浏览器的一种方法。

通过浏览器指纹我们可以绕开一些反爬虫机制,比如通过浏览器指纹来判断是否为正常浏览器。

这里使用第三方库 curl_cffi 来获取浏览器指纹。

pip 安装:

1
pip install curl_cffi
1
2
3
4
5
from curl_cffi import requests

url = 'https://example.com'

r = requests.get(url = url, impersonate="chrome101") # impersonate 指定浏览器

实践

这里使用随机图片 API 来获取随机二次元图片。

API 网址大全:https://blog.jixiaob.cn/?post=93

这里以 https://api.lolicon.app/#/setu 为例。

API 介绍

API 是 Application Programming Interface 的缩写,它是一些预先定义的函数,通过这些函数,我们可以访问到一些服务或资源。

一般请求 API 可以添加一些参数。

例如对于 API : example.com/api?key=value,我们可以添加参数 key=value 来指定请求的条件。

对于多个参数,我们可以用 & 连接,例如 example.com/api?key1=value1&key2=value2

爬虫实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import concurrent.futures
import requests
import json
import os

headers = { # 请求头
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
}

json_path = "config.json" # 配置文件路径
out_path = "graph" # 输出文件夹路径
max_flow = 4 # 最大线程数

def load_json():
if os.path.exists(json_path):
with open(json_path, "r", encoding="utf-8") as f:
return json.load(f)
else:
js = {"number": 0, "graph": []}
with open(json_path, "w", encoding="utf-8") as f:
json.dump(js, f, indent=4, ensure_ascii=False)
return js

def get_image(id,url):
print("正在下载图片:id-{} url-{}".format(id,url))
try:
response = requests.get(url, headers=headers)
except:
print("Error: 无法访问图片:id-{} url-{}".format(id,url))
return -1
if response.status_code == 200:
suffix = ""
if url.endswith(".png"):
suffix = ".png"
elif url.endswith(".jpg"):
suffix = ".jpg"
elif url.endswith(".jpeg"):
suffix = ".jpeg"
else:
print("Error: 图片格式不支持:id-{} url-{}".format(id,url))
return -1
print("成功下载图片:id-{} url-{}".format(id,url))
with open(os.path.join(out_path, str(id) + suffix), "wb") as f:
f.write(response.content)
return id
print("Error: 无法下载图片:id-{} url-{}".format(id,url))
return -1

def get_url(cnt):
global js
id = cnt
# https://api.lolicon.app/#/setu
url = "https://api.lolicon.app/setu/v2?num=20"
r = requests.get(url, headers=headers) # 发送请求
data = json.loads(r.text) # 解析json数据
data = data["data"]
url_list = []
image_list = []
for i in data:
image_url = i["urls"]["original"]
f = 1
for j in js["graph"]:
if j["url"] == image_url:
f = 0
break
if f == 0:
continue
id+=1
url_list.append((id,image_url))
image_list.append(i)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_flow) as executor:
futures = [executor.submit(get_image, id, image_url) for id,image_url in url_list]
for future in concurrent.futures.as_completed(futures):
if future.done():
try:
result = future.result()
if result == -1:
continue
js["number"] = max(js["number"], result)
js["graph"].append(image_list[result-cnt-1])
except Exception as e:
print("Error: {}".format(e))
with open(json_path, "w", encoding="utf-8") as f:
json.dump(js, f, indent=4, ensure_ascii=False)

if not os.path.exists(out_path):
os.mkdir(out_path)

js = load_json()

cnt = js["number"]

get_url(cnt)

print("下载完成")

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import concurrent.futures
import requests
import json
import os

headers = { # 请求头
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0",
}

json_path = "config.json" # 配置文件路径
out_path = "graph" # 输出文件夹路径
max_flow = 4 # 最大线程数

def load_json():
if os.path.exists(json_path):
with open(json_path, "r", encoding="utf-8") as f:
return json.load(f)
else:
js = {"number": 0, "graph": []}
with open(json_path, "w", encoding="utf-8") as f:
json.dump(js, f, indent=4, ensure_ascii=False)
return js

def get_image(id,url):
print("正在下载图片:id-{} url-{}".format(id,url))
try:
response = requests.get(url, headers=headers)
except:
print("Error: 无法访问图片:id-{} url-{}".format(id,url))
return -1
if response.status_code == 200:
suffix = ""
if url.endswith(".png"):
suffix = ".png"
elif url.endswith(".jpg"):
suffix = ".jpg"
elif url.endswith(".jpeg"):
suffix = ".jpeg"
else:
print("Error: 图片格式不支持:id-{} url-{}".format(id,url))
return -1
print("成功下载图片:id-{} url-{}".format(id,url))
with open(os.path.join(out_path, str(id) + suffix), "wb") as f:
f.write(response.content)
return id
print("Error: 无法下载图片:id-{} url-{}".format(id,url))
return -1

def get_url(cnt):
global js
id = cnt
# https://docs.anosu.top/
url = "https://image.anosu.top/pixiv/json?num=10&r18=0"
r = requests.get(url, headers=headers) # 发送请求
data = json.loads(r.text) # 解析json数据
url_list = []
image_list = []
for i in data:
image_url = i["url"]
f = 1
for j in js["graph"]:
if j["url"] == image_url:
f = 0
break
if f == 0:
continue
id+=1
url_list.append((id,image_url))
image_list.append(i)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_flow) as executor:
futures = [executor.submit(get_image, id, image_url) for id,image_url in url_list]
for future in concurrent.futures.as_completed(futures):
if future.done():
try:
result = future.result()
if result == -1:
continue
js["number"] = max(js["number"], result)
js["graph"].append(image_list[result-cnt-1])
except Exception as e:
print("Error: {}".format(e))
with open(json_path, "w", encoding="utf-8") as f:
json.dump(js, f, indent=4, ensure_ascii=False)

if not os.path.exists(out_path):
os.mkdir(out_path)

js = load_json()

cnt = js["number"]

get_url(cnt)

print("下载完成")