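Web Scraping Basics 2520 - 顾安
Tools
Python libraries
Sending requests
- curl_cffi: browser fingerprint impersonation library
- retrying: retry library
Parsing data
lxml: extracts HTML tags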
jsonpath
beautifulsoup4
chardet
Databases
pymysql
redis
pymongo
DBUtils
Concurrency
- aiohttp
- aiomysql
- motor --> MongoDB
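Crawler frameworks
- Automation
- playwright + playwright install to download and configure the browsers
- selenium + manual driver download
Encryption libraries
- pyDes
- Crypto (AES), or pycryptodome
03 The requests module
Browser fingerprint library: curl_cffi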
- python
from curl_cffi import requests
Inspect the request history (the redirect chain)
for resp in response.history: print(resp.url, resp.status_code, resp.request.headers)
Skip SSL certificate verification (for example, when the site's certificate has expired and you want to proceed anyway)
- python
resp = requests.get(url, verify=False)
Request timeout
- python
import requests
from requests.exceptions import Timeout

try:
    requests.get(url, timeout=3)  # give up after 3 seconds
except Timeout:  # covers both ConnectTimeout and ReadTimeout
    print("timeout...")
Request retry
- python
from retrying import retry

try_cnt = 1

@retry(stop_max_attempt_number=3)
def work():
    global try_cnt
    print(f"try times: {try_cnt}")
    try_cnt += 1
    ...

If you wrap the body in try/except, re-raise the exception with raise. Otherwise retry never sees the failure and will not retry.
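The re-raise rule can be seen with a small stand-in decorator. This is a minimal sketch, not the retrying library's implementation: simple_retry, flaky_swallow, and flaky_reraise are hypothetical names, and the decorator only mimics the idea behind stop_max_attempt_number.

```python
import functools

def simple_retry(max_attempts):
    """Hypothetical stand-in for a retry decorator: call again while the function raises."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: let the caller see the error
        return wrapper
    return decorator

calls = {"swallow": 0, "reraise": 0}

@simple_retry(max_attempts=3)
def flaky_swallow():
    calls["swallow"] += 1
    try:
        raise ConnectionError("boom")
    except ConnectionError:
        pass  # exception swallowed: the decorator sees a normal return

@simple_retry(max_attempts=3)
def flaky_reraise():
    calls["reraise"] += 1
    try:
        raise ConnectionError("boom")
    except ConnectionError:
        raise  # re-raised: the decorator sees the failure and retries

flaky_swallow()
try:
    flaky_reraise()
except ConnectionError:
    pass

print(calls)  # {'swallow': 1, 'reraise': 3}
```

The swallowing version runs once because the decorator only ever sees a normal return; the re-raising version uses all three attempts.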
Send form data with the data parameter and a JSON payload with the json parameter
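A rough picture of the difference, sketched with the standard library only rather than requests itself: a dict passed as data is form-encoded, while a dict passed as json is serialized to JSON text. The payload dict is made up for illustration.

```python
import json
from urllib.parse import urlencode

payload = {"currentPage": 1, "pageSize": 10}  # made-up example payload

# Roughly what a dict passed as data= becomes
# (Content-Type: application/x-www-form-urlencoded)
form_body = urlencode(payload)

# Roughly what a dict passed as json= becomes
# (Content-Type: application/json)
json_body = json.dumps(payload)

print(form_body)  # currentPage=1&pageSize=10
print(json_body)  # {"currentPage": 1, "pageSize": 10}
```

Which one a site expects can usually be read from the request's Content-Type header in the browser's network panel.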
session
Carries cookies automatically across requests
- python
session = requests.Session()
resp = session.get(url, headers=headers)
Network proxies
http://httpbin.org/ip returns the caller's IP address as JSON.
Using Kuaidaili (快代理)
Run the following after setting up a Clash proxy
- python
import requests

url = "http://httpbin.org/ip"
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36"
}
proxies = {
    "http": "http://127.0.0.1:7890",
    "https": "http://127.0.0.1:7890"
}
# pass proxies= explicitly, otherwise the dict above is never used
response = requests.get(url, headers=headers, proxies=proxies, timeout=10)
print(response.json())
04 Data extraction
miniconda
Configuration
shell
# Check the version
conda --version
# Upgrade conda
conda update conda
# Add mirror channels
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/bioconda/
conda config --add channels https://mirrors.bfsu.edu.cn/anaconda/cloud/bioconda/
conda config --add channels https://mirrors.bfsu.edu.cn/anaconda/cloud/conda-forge/
conda config --add channels https://mirrors.bfsu.edu.cn/anaconda/pkgs/free/
conda config --add channels https://mirrors.bfsu.edu.cn/anaconda/pkgs/main/
# List the channels already added
conda config --get channels
I have not used the channel below; add it and see whether it helps.
- shell
conda config --add channels https://mirrors.bfsu.edu.cn/anaconda/pkgs/r/
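Universities in China that currently host conda mirrors
- Tsinghua University: https://mirrors.tuna.tsinghua.edu.cn/help/anaconda/
- Beijing Foreign Studies University: https://mirrors.bfsu.edu.cn/help/anaconda/
- Nanjing University of Posts and Telecommunications: https://mirrors.njupt.edu.cn/
- Nanjing University: http://mirrors.nju.edu.cn/
- Chongqing University of Posts and Telecommunications: http://mirror.cqupt.edu.cn/
- Shanghai Jiao Tong University: https://mirror.sjtu.edu.cn/
- Harbin Institute of Technology: http://mirrors.hit.edu.cn/#/home
- (The Harbin Institute of Technology mirror appears to sync most often and be the most up to date)
Where to find the channels you have added:
# On Windows the file lives under C:\Users\~\. Windows users cannot create a .condarc file directly; run conda config --set show_channel_urls yes to generate it, then edit it with vim or Notepad++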
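Commands
shell
# Create / remove an environment (-n: name)
conda create -n env_name python=3.8 # specify python=<version> so the environment gets its own Python interpreter
conda remove -n env_name --all # without --all, only the named packages are removed, not the environment
# List existing environments
conda env list
conda info --envs
# Rename an environment: clone it under the new name, then remove the original
conda create -n env2 --clone env1 # copy env1 to env2
conda remove -n env1 --all # remove the original env1
# Enter an environment
conda activate env_name
# Leave the environment
conda deactivate
# Install packages with pip (pip offers more packages than conda; pip pins versions with "==", conda with "=")
conda activate env_name
conda install numpy=1.93
pip install numpy==1.93
conda deactivate
# Install / remove commands
conda install gatk
conda install gatk=3.7
conda install -n env_name gatk
# After installing, use "which <program>" to see where it was installed:
which gatk
# List installed packages
conda list
conda list -n env_name
# Update a specific package
conda update gatk
conda update --all
# Remove a package from an environment
conda remove --name env_name gatk
Uninstalling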
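- Clean up: rm -rf /opt/anaconda3
- Remove the Anaconda environment variables from ~/.bash_profile
- Remove any hidden Anaconda files that may remain
- rm -rf ~/.condarc ~/.conda ~/.continuum
- After these steps, Anaconda is completely removed.
Migration
- Pack the environment
shell
conda pack -n env_name -o environment.tar.gz
# If you get the error No command 'conda pack', try
conda install -c conda-forge conda-pack
- Copy the archive to the new machine
- Go to the environments directory of the conda installation: /miniconda/envs/
mkdir environment
# Unpack the conda environment
tar -xzvf environment.tar.gz -C environment
- Usage
Run conda env list to see the environments, activate the migrated environment, and use pip list to compare the installed packages before and after migration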
xpath
- Example
python
from lxml import etree
text = """
<div>
<ul>
<li class="item-0">first item</li>
<li class="item-1"><a href="link2.html">second item</a></li>
<li class="item-0"><a href="link3.html"><span class="bold">third item</span></a></li>
<li class="item-1"><a href="link4.html">fourth item</a></li>
<li class="item-0"><a href="link5.html">fifth item</a></li>
</ul>
</div>
"""
tree = etree.HTML(text)
result = tree.xpath('//li[@class="item-0"]/a/text()')
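print(result)
Syntax
- @: selects an attribute
- //: matches along a relative path, at any depth
- [index]: positional index, starting from 1
- [condition]: supports conditional expressions using and, or, =
- Wildcards: *, node()
- Built-in functions: last(), position()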
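A few of these rules can be tried without installing anything. The sketch below uses the standard library's xml.etree.ElementTree instead of lxml; ElementTree supports only a limited subset of XPath (for example, no text() step), so treat it as an illustration of [index], last(), and attribute conditions rather than a replacement for lxml. The sample document is a shortened version of the one above.

```python
import xml.etree.ElementTree as ET

doc = """
<div>
  <ul>
    <li class="item-0">first item</li>
    <li class="item-1">second item</li>
    <li class="item-0">third item</li>
  </ul>
</div>
""".strip()
root = ET.fromstring(doc)

first = root.find(".//li[1]").text      # [index] counts from 1, not 0
last = root.find(".//li[last()]").text  # last() picks the final sibling
item0 = [li.text for li in root.findall(".//li[@class='item-0']")]  # attribute condition

print(first)  # first item
print(last)   # third item
print(item0)  # ['first item', 'third item']
```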
Practice
Douban Top 250
- python
import requests
from lxml import etree
from pprint import pprint

url = "https://movie.douban.com/top250?start=0"
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36"
}
resp = requests.get(url=url, headers=headers)
html_text = resp.text
tree = etree.HTML(html_text)
# first title span of each film, and the poster image URL
film_title = tree.xpath('//div[@class="hd"]//span[@class="title"][1]/text()')
film_pic = tree.xpath('//div[@class="pic"]//img/@src')
data = [
    {"title": title, "pic": pic}
    for title, pic in zip(film_title, film_pic)
]
pprint(data)
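05 Data extraction
jsonpath
- Syntax reference (slicing is supported)
| Syntax | Meaning |
|---|---|
| $ | Root node |
| @ | Current node |
| . or [] | Child node |
| .. | Recursive descent: selects all matching nodes regardless of position |
| * | Matches all element nodes |
| [] | Subscript operator (supports simple operations such as array indexes or selecting by content) |
| [,] | Multiple selections inside a subscript |
| ?() | Filter expression |
| () | Script expression (computed value) |
- Computation: [(@.length-1)]
- Filter: [?(@.price<10)]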
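To make the recursive-descent, filter, and computation rules concrete, here is a plain-Python sketch of what they select. It does not use the jsonpath library; find_all is a hypothetical helper and the data dict is made up.

```python
def find_all(node, key):
    """Collect every value stored under `key`, at any depth (what `$..key` selects)."""
    found = []
    if isinstance(node, dict):
        for k, v in node.items():
            if k == key:
                found.append(v)
            found.extend(find_all(v, key))
    elif isinstance(node, list):
        for item in node:
            found.extend(find_all(item, key))
    return found

data = {
    "store": {
        "book": [
            {"name": "A", "price": 8},
            {"name": "B", "price": 12},
        ],
        "owner": {"name": "C"},
    }
}

books = data["store"]["book"]
names = find_all(data, "name")                        # like $..name
cheap = [b["name"] for b in books if b["price"] < 10]  # like $.store.book[?(@.price<10)].name
last_book = books[len(books) - 1]                     # like $.store.book[(@.length-1)]

print(names)      # ['A', 'B', 'C']
print(cheap)      # ['A']
print(last_book)  # {'name': 'B', 'price': 12}
```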
Practice: NetEase recruitment
python
import requests
from jsonpath import jsonpath
url = 'https://hr.163.com/api/hr163/position/queryPage'
json_data = {"currentPage": 1, "pageSize": 10}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36"
}
response = requests.post(url=url, json=json_data, headers=headers)
data = response.json()
name_list = jsonpath(data, '$..name')
requirement_list = jsonpath(data, '$..requirement')
for name, requirement in zip(name_list, requirement_list):
    print(f"Job: {name}\nRequirements: \n{requirement}\n{'='*50}")
bs4
Practice: Sogou WeChat search titles
python
import requests
from bs4 import BeautifulSoup
url = "https://weixin.sogou.com/weixin?_sug_type_=1&type=2&query=jk"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36"
}
response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.text, "lxml")
alist = soup.select(".txt-box a")
for i in alist:
    print(i.get_text())
06 Data extraction
RE (regular expressions)
- re.split()
- re.sub() (replace)
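A short sketch of both functions on made-up strings: re.split cuts a string wherever the pattern matches, and re.sub replaces every match.

```python
import re

raw = "python, java;  go\trust"
langs = re.split(r"[,;\s]+", raw)   # split on commas, semicolons, or whitespace

messy = "first   item\n second"
clean = re.sub(r"\s+", " ", messy)  # collapse each whitespace run into one space

# backreferences keep the first 3 and last 4 digits (the number is made up)
masked = re.sub(r"(\d{3})\d{4}(\d{4})", r"\1****\2", "13146060001")

print(langs)   # ['python', 'java', 'go', 'rust']
print(clean)   # first item second
print(masked)  # 131****0001
```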
07 Data storage
CSV storage
- Storing lists
python
import csv
headers = ['班级', '姓名', '性别', '手机号', 'QQ']
rows = [
["18级Python", '小王', '男', '13146060xx1', '123456xx1'],
["18级Python", '小李', '男', '13146060xx2', '123456xx2'],
["19级Python", '小赵', '女', '13146060xx3', '123456xx3'],
["19级Python", '小红', '女', '13146060xx4', '123456xx4'],
]
with open("temp.csv", "w+", encoding="utf-8", newline="") as w:
    writer = csv.writer(w)  # create a csv writer object
    writer.writerow(headers)
    writer.writerows(rows)
- Storing dicts
python
import csv
rows = [
{
"class_name": "18级Python",
"name": '小王',
"gender": '男',
"phone": '13146060xx1',
"qq": '123456xx1'
},
{
"class_name": "18级Python",
"name": '小李',
"gender": '男',
"phone": '13146060xx2',
"qq": '123456xx2'
},
{
"class_name": "19级Python",
"name": '小赵',
"gender": '女',
"phone": '13146060xx3',
"qq": '123456xx3'
}]
headers = rows[0].keys()
with open("temp.csv", "w+", encoding="utf-8", newline="") as w:
    dictWriter = csv.DictWriter(w, headers)  # create a DictWriter object
    dictWriter.writeheader()
    dictWriter.writerows(rows)
- Reading csv
python
import csv
with open('temp.csv', 'r', encoding='utf-8') as r:
    reader = csv.reader(r)  # each row is a list
    for i in reader:
        print(i)
with open('temp.csv', 'r', encoding='utf-8') as r:
    dictReader = csv.DictReader(r)  # each row is a dict keyed by the header
    for i in dictReader:
        print(i)
Practice
- Tencent recruitment
python
import requests
import pymysql
import time
from pprint import pprint
from dbutils.pooled_db import PooledDB
class TencentHire():
    def __init__(self):
        self.api_url = ('https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1754363338136&countryId'
                        '=&cityId=&bgIds=&productId=&categoryId=40001001,40001002,40001003,40001004,40001005,'
                        '40001006&parentCategoryId=&attrId=1&keyword=&pageIndex={}&pageSize=10&language=zh-cn&area=cn')
        self.pool = PooledDB(creator=pymysql, user='root', password='44448888', host='localhost', db='tencent_hire', charset='utf8mb4')
        self.connect = self.pool.connection()
        self.cursor = self.connect.cursor()
        self.max_page = 81
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/91.0.4472.124 Safari/537.36'
        }

    def get_info(self):
        # yield one job posting at a time, page by page
        for page in range(1, self.max_page + 1):
            resp = requests.get(url=self.api_url.format(page), headers=self.headers)
            data_list = resp.json()['Data']['Posts']
            time.sleep(2)
            for item in data_list:
                yield item

    def create_table(self):
        sql = """
            CREATE TABLE IF NOT EXISTS infos (
                id int primary key auto_increment,
                work_name varchar(50),
                country varchar(20),
                location varchar(20),
                require_years varchar(20)
            )
        """
        try:
            self.cursor.execute(sql)
            print("Table created")
        except Exception as e:
            # print, not pprint: pprint's second argument is an output stream
            print("Failed to create table", e)

    def save_info(self, *args):
        pprint(args)
        sql = 'INSERT INTO infos VALUES (%s, %s, %s, %s, %s)'
        try:
            self.cursor.execute(sql, args)
            self.connect.commit()
        except Exception as e:
            self.connect.rollback()
            print("Failed to save data", e)

    def close(self):
        self.cursor.close()
        self.connect.close()
        print("Database connection closed")

    def main(self):
        print("Crawler started")
        self.create_table()
        generator = self.get_info()
        for item in generator:
            _id = None  # let auto_increment assign the id
            work_name = item['RecruitPostName']
            country = item['CountryName']
            location = item['LocationName']
            require_years = item['RequireWorkYearsName']
            self.save_info(_id, work_name, country, location, require_years)
        self.close()
        print("Crawler finished")

TencentHire().main()
MongoDB
- The client comes with a built-in connection pool
Example:
python
import pymongo
from pprint import pprint
mongo_client = pymongo.MongoClient()
db = mongo_client['database-test']['collection-test']
test_dict = {
"name": "Anna",
'Age': 18
}
db.insert_one(test_dict)
pprint(list(db.find()))
test_list = [
{"name": "Anna",'Age': 18},
{"name": "Bob",'Age': 19},
{"name": "Candy",'Age': 20}
]
db.insert_many(test_list)
pprint(list(db.find()))
08 Data storage
Static page rendering
- The data for the first few pages [1, 2, 3...] is static data
Scraping video titles from iQIYI
python
import time
import requests
import pymongo
from pprint import pprint
class AiQiYi:
    def __init__(self):
        self.api_url = 'https://pcw-api.iqiyi.com/search/recommend/list'
        self.headers = {
'accept': '*/*',
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'cache-control': 'no-cache',
'content-type': 'application/x-www-form-urlencoded',
'origin': 'https://list.iqiyi.com',
'pragma': 'no-cache',
'priority': 'u=1, i',
'referer': 'https://list.iqiyi.com/www/2/15-------------11-1-1-iqiyi--.html?s_source=PCW_SC',
'sec-ch-ua': '"Not)A;Brand";v="8", "Chromium";v="138", "Microsoft Edge";v="138"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-site',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0',
}
        self.cookies = {
'QC005': 'd673290c5880ff54251b889596f5c945',
'curDeviceState': 'width%3D1652%3BconduitId%3D%3Bscale%3D150%3Bbrightness%3Ddark%3BisLowPerformPC%3D0%3Bos%3Dbrowser%3Bosv%3D10.0.19044',
'T00404': '3f31213172eca49e09491611b4373dfe',
'QP0037': '0',
'QC234': '60186596789f4782adf8ef2d293f9cff',
'QP0042': '{"v":3,"cpu":16,"avc":{"de":2,"wv":1,"pr":1},"hvc":{"de":2,"wv":0,"pr":1},"av1":{"de":2,"wv":0,"pr":0},"av110":{"de":2,"wv":0,"pr":0}}',
'QC006': '91abf3b528b8e87042d7025bc2b18e69',
'QC235': '7dd5c7c135dc4807a5b993df957b59d1',
'QC173': '0',
'QC175': '%7B%22upd%22%3Atrue%2C%22ct%22%3A%22%22%7D',
'Hm_lvt_53b7374a63c37483e5dd97d78d9bb36e': '1754399334',
'Hm_lpvt_53b7374a63c37483e5dd97d78d9bb36e': '1754399334',
'HMACCOUNT': '37620D01B9C40ED9',
'QC198': '70a683895aa01e3bf23492d49ffa1693',
'QC189': '8883_A%2C10385_B%2C10274_A%2C8739_B%2C9419_B%2C9922_C%2C9379_B%2C10276_B%2C11389_C%2C8004_B%2C5257_B%2C10566_D%2C9776_B%2C8873_E%2C10123_B%2C7423_C%2C9082_B%2C8401_A%2C6249_C%2C10793_B%2C7996_B%2C11391_A%2C9576_B%2C10358_B%2C9365_B%2C5465_B%2C6843_B%2C6578_B%2C6312_B%2C6091_B%2C8690_A%2C10992_B%2C8737_D%2C8742_A%2C10193_B%2C10803_C%2C10596_B%2C9484_B%2C6752_C%2C10188_A%2C8971_A%2C7332_B%2C9683_B%2C10383_B%2C11402_A%2C8665_D%2C11642_D%2C6237_A%2C9569_B%2C11004_B%2C11238_A%2C8983_C%2C7024_C%2C5592_B%2C9117_A%2C6031_B%2C10509_B%2C7581_B%2C9506_D%2C11393_A%2C9517_D%2C10216_A%2C9394_B%2C11350_B%2C8542_B%2C6050_B%2C9167_B%2C10637_B%2C11556_A%2C11413_B%2C11819_A%2C9469_B%2C10633_B%2C10598_B%2C8812_B%2C6832_C%2C7074_C%2C7682_C%2C8867_B%2C5924_D%2C6151_C%2C5468_B%2C10447_B%2C11580_A%2C11299_C%2C6704_C%2C11672_A%2C8808_B%2C10765_B%2C8497_B%2C8342_B%2C8871_C%2C9790_B%2C11754_A%2C9355_B%2C10389_B%2C8760_B%2C11441_A%2C10624_B%2C10627_B%2C11661_C%2C9292_B%2C6629_B%2C5670_B%2C9158_A%2C9805_B%2C9959_B%2C10999_A%2C11578_A%2C6082_B%2C5335_B%2C11471_A',
'QC191': '',
'QC007': 'DIRECT',
'nu': '0',
'QC008': '1754399336.1754399336.1754399336.1',
'QC010': '0.29432456281016706',
'QC186': 'false',
'TQC030': '1',
'QC199': '618c4fdf686dcfd24f83980a7898934a',
'IMS': 'IggQABj_1MjEBiomCiAwNmE5ZWRjM2M4YTE0MGVjYjU3NTY0ZGM4MjIxYjlmMBAAIgByJAogMDZhOWVkYzNjOGExNDBlY2I1NzU2NGRjODIyMWI5ZjAQAIIBBCICEA-KASQKIgogMDZhOWVkYzNjOGExNDBlY2I1NzU2NGRjODIyMWI5ZjA',
'__dfp': 'a16ab55d0c63fc45cabb61cfd8eff5a88e7af58926a33c3ac67b9d6d9724f14d01@1755695256612@1754399257612',
}
        self.mongo_client = pymongo.MongoClient()
        self.db = self.mongo_client['database-test']['aiqiyi_info']

    def get_info(self, page):
        params = (
            ('channel_id', '2'),
            ('data_type', '1'),
            ('mode', '11'),
            ('page_id', str(page)),
            ('ret_num', '48'),
            ('session', '1f6d98c5093bd36fa2c708511ce9e2f8'),
            ('three_category_id', '15;must'),
        )
        resp = requests.get(url=self.api_url, headers=self.headers, params=params, cookies=self.cookies).json()['data']['list']
        time.sleep(2)
        for item in resp:
            yield item

    def save_info(self, pages):
        for page in range(1, pages + 1):
            # pass the page number itself, not a nested get_info() call
            generator = self.get_info(page)
            for item in generator:
                self.db.insert_one(item)
                pprint(item)
        print('Saved successfully')

    def main(self):
        self.save_info(10)
        print('Crawler finished')

AiQiYi().main()
Redis
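- Start the redis service
cmd
redis-server.exe redis.windows.conf
- Connect to the server
cmd
redis-cli
redis-cli -h host -p port -a password
- Operations
cmd
set myKey abs
get myKey
del myKey
md5
- The hex digest always has a fixed length of 32 characters
- Suitable for datasets below about 100 million records
- The input must be bytes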
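The fixed length and the bytes requirement can be checked directly with hashlib. A minimal sketch with made-up inputs:

```python
import hashlib

short = hashlib.md5("a".encode("utf-8")).hexdigest()
long_ = hashlib.md5(("x" * 10_000).encode("utf-8")).hexdigest()

# the hex digest is 32 characters no matter how long the input is
print(len(short), len(long_))  # 32 32

error = None
try:
    hashlib.md5("a")  # a str instead of bytes
except TypeError as exc:
    error = str(exc)
    print("TypeError:", error)
```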
Deduplicating Mango TV film library data
python
import time
import requests
import pymongo
import redis
import hashlib
from pprint import pprint
url = "https://www.mgtv.com/lib/2"
class MonguoTV:
    def __init__(self):
        self.api_url = "https://pianku.api.mgtv.com/rider/list/pcweb/v3"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/80.0.3987.149 Safari/537.36"
        }
        self.params = {
            "allowedRC": "1",
            "platform": "pcweb",
            "channelId": "2",
            "pn": "2",  # page number
            "pc": "80",
            "hudong": "1",
            "_support": "10000000",
            "kind": "a1",
            "area": "a1",
            "year": "all",
            "feature": "all",
            "chargeInfo": "a1",
            "sort": "c2"
        }
        self.redis_client = redis.Redis()
        self.mongo_client = pymongo.MongoClient(host="localhost", port=27017)
        self.db = self.mongo_client['database-test']['monguo-tv']

    @staticmethod
    def md5(content):
        md5_hashed = hashlib.md5(str(content).encode('utf-8'))
        return md5_hashed.hexdigest()

    def get_info(self, page):
        self.params['pn'] = str(page)
        resp = requests.get(url=self.api_url, headers=self.headers, params=self.params).json()['data']['hitDocs']
        for item in resp:
            yield item

    def save_info(self, pages):
        for page in range(1, pages + 1):
            time.sleep(2)
            generator = self.get_info(page)
            for item in generator:
                md5_hashed = self.md5(item)
                # sadd returns 1 when the fingerprint is new, 0 when it already exists
                flag = self.redis_client.sadd("mgtv", md5_hashed)
                if flag:
                    try:
                        self.db.insert_one(item)
                        pprint(item)
                    except Exception as e:
                        print(f"Failed to insert data --> {e}")
                else:
                    print("Data already exists --> skip")

    def close(self):
        self.mongo_client.close()
        self.redis_client.close()
        print("Database connections closed")

    def main(self):
        self.save_info(10)
        print("Data saved")
        self.close()
        print("Program finished")

# start redis-server.exe before running
MonguoTV().main()
Coroutine programming with asyncio
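- Passing bare coroutine objects to asyncio.wait() works only on older Python versions: it was deprecated in 3.8 and removed in 3.11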
python
import asyncio
async def work1():
    for _ in range(5):
        print('work1')
        await asyncio.sleep(1)

async def work2():
    for _ in range(5):
        print('work2')
        await asyncio.sleep(1)

loop = asyncio.get_event_loop()
coroutine = [work1(), work2()]
loop.run_until_complete(asyncio.wait(coroutine))
- On 3.11 and later, wrap the coroutines in tasks and start them with asyncio.run()
python
import asyncio
async def work1():
    for _ in range(5):
        print('work1')
        await asyncio.sleep(1)

async def work2():
    for _ in range(5):
        print('work2')
        await asyncio.sleep(1)

async def main():
    tasks = [asyncio.create_task(work1()), asyncio.create_task(work2())]
    await asyncio.wait(tasks)

asyncio.run(main())
- from functools import partial
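One common use of functools.partial with asyncio is binding extra arguments to a task's done-callback, because add_done_callback passes only the finished task. Whether this is the use the note above had in mind is an assumption; fetch, on_done, and the URLs below are hypothetical.

```python
import asyncio
from functools import partial

results = {}

async def fetch(url):
    """Hypothetical stand-in for a network call."""
    await asyncio.sleep(0.01)
    return f"body of {url}"

def on_done(url, task):
    # add_done_callback supplies only `task`; partial pre-binds `url`
    results[url] = task.result()

async def main():
    urls = ["https://example.com/1", "https://example.com/2"]
    tasks = []
    for url in urls:
        task = asyncio.create_task(fetch(url))
        task.add_done_callback(partial(on_done, url))
        tasks.append(task)
    await asyncio.gather(*tasks)

asyncio.run(main())
print(results)
```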
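09 Concurrent crawlers
chardet: detects the character encoding
aiomysql: supports creating a connection pool
In async code the SQL statements could not use an if exists clause, so check whether the table exists separately with show tables like 'table'
10 Concurrent crawlers
Using motor
threading (threads)
- Daemon thread: when the main thread finishes, the program exits without waiting for unfinished daemon threads
- Blocking with join(): the main thread must wait for the child thread to finish before it continues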
python
import threading
import time
def work1():
    for _ in range(5):
        print("Work 1")
        time.sleep(1)

def work2():
    while True:
        print("Work 2")
        time.sleep(1)

if __name__ == '__main__':
    t1 = threading.Thread(target=work1)
    t2 = threading.Thread(target=work2)
    t2.daemon = True  # mark t2 as a daemon thread
    t1.start()
    t2.start()
    t1.join()  # block until t1 finishes; join() must come after start()
- Use a thread pool to run tasks
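A minimal thread pool sketch using the standard library's concurrent.futures. The work function is a hypothetical task that sleeps to imitate I/O; in a crawler it would be the request for one page.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def work(n):
    """Hypothetical task: pretend to do I/O, then return a result."""
    time.sleep(0.1)
    return n * n

# the with-block waits for all submitted tasks before exiting
with ThreadPoolExecutor(max_workers=4) as pool:
    squares = list(pool.map(work, range(5)))  # results keep the input order

print(squares)  # [0, 1, 4, 9, 16]
```

pool.map is convenient when every task takes one argument; pool.submit returns a Future per task when results need to be handled individually.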
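Supplement:
- In a function definition, * means every parameter after it is keyword-only: it must be passed by name
- In a function definition, / means every parameter before it is positional-only: it cannot be passed by name
python
def test1(a, b, *, c):
    print("test1")

def test2(a, b, /, c):
    print("test2")

test1(1, 2, c=3)  # c must be passed by keyword
test2(1, 2, 3)    # a and b must be passed by position
MongoDB topics
Database operations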
- Show the current database name
sql
db
- Show statistics for the current database
sql
db.stats()
- List all databases that physically exist
sql
show dbs
- Switch to a database (this is also how a database is created)
sql
use db
- Drop the database currently in use
sql
db.dropDatabase()
Collection operations
- Create a collection
sql
db.createCollection("mongo")
- List collections
sql
show collections
- Drop a collection
sql
db.toCollection.drop()
Data operations
- Insert
sql
db.toCollection.insertOne(document)
db.toCollection.insertOne({name:'test',age:18})
db.toCollection.insertOne({_id:'1',name:'test',gender:'male'})
- Query
sql
db.toCollection.find()
db.toCollection.find({_id: '100'})
- Update
sql
db.toCollection.updateOne({name: 'ZhangSan'}, {$set: {age:99}})
db.toCollection.updateMany({name: 'LiSi'}, {$set:{age: 99}})
- Delete
sql
db.toCollection.deleteOne({age:99})
db.toCollection.deleteMany({}) // delete all documents
Python operations
- Connect to the database
python
from pymongo import MongoClient
client = MongoClient("mongodb://localhost:27017/")
db = client['web_scraping']
print("connect successfully!")
- Create / select a collection
python
collection = db["web_data"]
- Insert data
python
collection.insert_one(data)
collection.insert_many(data_list)
- Query data
python
results = collection.find()
for result in results:
    print(result)
print(list(collection.find()))
collection.find_one({"name": "test"})
- Update data
python
collection.update_one(
    {"name": "test"},
    {"$set": {"age": 99}}
)
- Delete data
python
collection.delete_one({"name": "test"})
collection.delete_many({"age": 19})
- Close the connection
python
client.close()
Redis
- Set operations
- Prefix key names with a namespace, for example:
scrapy:douban denotes the douban data of the scrapy crawler program
<img src="https://gitee.com/kualk/pic-go/raw/master/imgs/image-20250817222153446.png" alt="image-20250817222153446" style="zoom:80%;" />
File deduplication
- MD5 hashing
python
import hashlib
def md5_encrypt(obj, encoding):
    return hashlib.md5(str(obj).encode(encoding)).hexdigest()
Crawler deduplication (using a Redis set)
URL deduplication
Data deduplication
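The idea behind set-based deduplication can be sketched without a running Redis server. Below, a plain Python set stands in for the Redis set and is_new mimics the return value of SADD; fingerprint, is_new, and the sample items are hypothetical. The fingerprint is taken over a JSON dump with sorted keys, an assumption added here so that two dicts with the same content but different key order hash the same, which str(dict) does not guarantee.

```python
import hashlib
import json

def fingerprint(item):
    """MD5 of a canonical JSON dump, so key order does not change the hash."""
    canonical = json.dumps(item, sort_keys=True, ensure_ascii=False)
    return hashlib.md5(canonical.encode("utf-8")).hexdigest()

seen = set()  # in-memory stand-in for a Redis set

def is_new(item):
    """Mimic SADD: True when the fingerprint was not yet in the set."""
    fp = fingerprint(item)
    if fp in seen:
        return False
    seen.add(fp)
    return True

items = [
    {"title": "A", "url": "https://example.com/a"},
    {"url": "https://example.com/a", "title": "A"},  # same data, different key order
    {"title": "B", "url": "https://example.com/b"},
]
kept = [item for item in items if is_new(item)]
print(len(kept))  # 2
```

For URL deduplication the same pattern applies with the URL string as the fingerprint input.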
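MongoDB self-deduplication
- Usually it is enough to make sure a record is unique before inserting it
Wrapping the code
- Four steps
- Build the URL --> for_loop
- Send the request --> request
- Parse the data --> parse
- Save the data --> save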
python
class Scapy:
    def __init__(self):
        pass

    def request(self):
        pass

    def parse(self):
        pass

    def save(self):
        pass

    def main(self):
        pages = range(1, 21)
        for page in pages:
            self.request()
            self.parse()
            self.save()

if __name__ == '__main__':
    Douban250 = Scapy()
    Douban250.main()
IP proxy pool
- Scraping free proxy IPs (mostly useless in practice)
python
from pprint import pprint
import requests
import re
import json
import time
import random
class KuaiDaiLi:
    def __init__(self):
        self.url = 'https://www.kuaidaili.com/free/inha/{}'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/80.0.3987.87 Safari/537.36'
        }
        self.usable_ips = []
        self.ip_test_url = 'https://httpbin.org/ip'

    def request(self, page):
        try:
            return requests.get(url=self.url.format(page), headers=self.headers)
        except requests.RequestException:
            return None

    def parse(self, resp):
        usable_ips = []
        # the proxy list is embedded in the page as a JavaScript constant
        ips = json.loads(re.findall("const fpsList = (.*);", resp.text)[0])
        for item in ips:
            if self.check_proxy(item['ip'], item['port']):
                usable_ips.append({'ip': item['ip'], 'port': item['port']})
        return usable_ips

    def save(self, usable_ips):
        self.usable_ips.extend(usable_ips)  # extend, so the list stays flat
        with open('usable_ips.txt', 'a+', encoding='utf-8') as a:
            for usable_ip in usable_ips:
                a.write(json.dumps(usable_ip, ensure_ascii=False) + '\n')

    def check_proxy(self, ip, port) -> bool:
        # proxy URLs need a scheme
        proxy = {
            'http': 'http://{}:{}'.format(ip, port),
            'https': 'http://{}:{}'.format(ip, port)
        }
        try:
            resp = requests.get(url=self.ip_test_url, proxies=proxy, timeout=2)
            return resp.status_code == 200  # always returns a bool
        except requests.RequestException:
            return False

    def main(self, page_start, page_cnt):
        if page_start < 1:
            page_start = 1
        # crawl exactly page_cnt pages starting at page_start
        for page in range(page_start, page_start + page_cnt):
            print('Crawling page {} of the high-anonymity free proxies...'.format(page))
            resp = self.request(page)
            if resp:
                usable_ips = self.parse(resp)
                self.save(usable_ips)
                print('Usable IPs on page {}:'.format(page))
                pprint(usable_ips)
            time.sleep(random.randint(1, 3) / 10.0 + 1)

if __name__ == '__main__':
    while True:
        KuaiDaiLi().main(1, 3)
The Playwright framework
Page operations
python
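page.goto(url, referer=referer, timeout=timeout, wait_until="load") # open a page
page.title() # get the page title
page.content() # get the page source
page.close() # close the page
page.screenshot(path=path, full_page=True) # full-page screenshot
page.screenshot(path=path, clip={"x": x, "y": y, "width": w, "height": h}) # screenshot of a region
page.url # current URL
Waiting mechanisms
- Change the default timeout
python
page.set_default_timeout(60000) # set the default timeout to 60 seconds
- Wait for the page to reach a given load state
- load: waits for the load event, meaning all resources (including images, scripts, and so on) have finished loading.
- domcontentloaded: waits for the DOMContentLoaded event, at which point the HTML document is parsed and the DOM tree is built, but external resources such as images may still be loading.
- networkidle: fires once the network has been idle for a while, which usually means no requests are in flight. The official docs recommend avoiding this state where possible, because it depends on network conditions and is not stable.
python
page.wait_for_load_state('load')
- Wait for the page to navigate to a given URL
python
page.wait_for_url("**/target.html")
- Wait for a fixed amount of time
page.wait_for_timeout(10000)
Element location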
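- Use the locator method to locate elements
- XPath selectors are supported
- CSS selectors are supported
Basic operations
- Locate the element first, then act on it
python
page.locator(selector).click() # click
page.locator(selector).fill("text") # fill an input with text
page.locator(selector).type("text") # simulate keyboard input
Filtering elements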
python
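page.locator(selector).all() # get all matching elements
page.locator(selector).nth(0) # pick an element by index (starting from 0)
page.locator(selector).first
page.locator(selector).last
Data extraction
python
page.locator(selector).inner_text() # get the element's text content
page.locator(selector).all_inner_texts() # get the text content of all matching elements
page.locator(selector).get_attribute('src') # get an attribute value
page.locator(selector).input_value() # get the value of an <input>
page.locator(selector).bounding_box() # get the element's position and size: x, y, width, height
page.locator(selector).count() # count the matching elements
Mouse operations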
- button: left | right | middle
- click_count: num=1
- delay: ms
python
page.mouse.click(x, y, button="left", click_count=1, delay=0)
page.mouse.dblclick(x, y, button="left", delay=0)
page.mouse.down(button="left", click_count=1) # presses at the current pointer position
page.mouse.move(x, y)
page.mouse.up(button="left", click_count=1)
page.drag_and_drop(source, target) # drag one element onto another; both arguments are selectors
page.mouse.wheel(delta_x, delta_y) # scroll the mouse wheel
Keyboard operations
python
page.keyboard.down('Enter')
page.keyboard.up('ArrowLeft')
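page.keyboard.press('key') # simulates down followed by up
page.keyboard.type('text')
page.keyboard.press('Control+a') # key combination
Advanced operations
- Run JavaScript code
python
ret = page.evaluate("js codes")
- Scroll until an element is visible (for example, scroll to the position of the last element)
python
page.locator(selector).scroll_into_view_if_needed()
- Switch into an iframe (an iframe's document is separate, so page-level selectors cannot match inside it directly)
python
page.frame_locator('css').get_by_text('Submit')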