
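Web Scraping Basics 2520 - 顾安

Tools

Python Libraries

  • Sending requests

    • curl_cffi: browser fingerprint library
    • retrying: retry library
  • Parsing data

    • lxml: extracts HTML tags

    • jsonpath

    • beautifulsoup4

    • chardet

  • Databases

    • pymysql

    • redis

    • pymongo

    • DBUtils

  • Concurrency

  • Scraping frameworks

    • Automation
      • playwright + playwright install to download and set up the browsers
      • selenium + manually downloaded driver
  • Encryption libraries

    • pyDes
    • Crypto (AES) or pycryptodome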

03 The requests module

  • Browser fingerprint library: curl_cffi

    • python
      from curl_cffi import requests
  • Inspect the request (redirect) history

    • for resp in response.history:
          print(resp.url, resp.status_code, resp.request.headers)
  • Skip SSL certificate verification (for example, the site's certificate has expired and you want to proceed anyway)

    • python
      resp = requests.get(url, verify=False)
  • Request timeout

    • python
      from requests.exceptions import Timeout
      
      try:
          requests.get(url, timeout=3)  # give up after 3 seconds
      except Timeout:  # covers both connect and read timeouts
          print("timeout...")
  • Retrying requests

    • python
      from retrying import retry
      
      try_cnt = 1
      
      @retry(stop_max_attempt_number=3)
      def work():
          global try_cnt
          print(f"try times: {try_cnt}")
          try_cnt += 1
          ...
    • If the function body uses try/except, re-raise the exception with raise; otherwise retry never sees the exception and cannot retry

  • Send form data with data; send a JSON payload with json

  • session

    • Carries cookies automatically across requests

    • python
      session = requests.Session()
      resp = session.get(url, headers=headers)
  • Network proxies

    • http://httpbin.org/ip returns the caller's IP address as JSON

    • Using Kuaidaili (快代理)

    • Run the following after setting up the Clash proxy

    • python
      import requests
      
      url = "http://httpbin.org/ip"
      headers = {
          "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Safari/537.36"
      }
      
      proxies = {
          "http": "http://127.0.0.1:7890",
          "https": "http://127.0.0.1:7890"
      }
      
      # proxies must be passed explicitly, otherwise the request bypasses the proxy
      response = requests.get(url, headers=headers, proxies=proxies, timeout=10)
      print(response.json())
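
The retry note above says that an exception caught inside the function has to be re-raised. A minimal standard-library sketch of why follows. The decorator here is a hypothetical stand-in that only mimics the general behavior of a retry decorator; it is not the retrying library's implementation, and the names retry, swallow_error and reraise_error are invented for illustration.

```python
import functools


def retry(max_attempts=3):
    """Hypothetical stand-in for a retry decorator: re-run the function on any exception."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: let the caller see the error
        return wrapper
    return decorator


calls = {"swallow": 0, "reraise": 0}


@retry(max_attempts=3)
def swallow_error():
    calls["swallow"] += 1
    try:
        raise ConnectionError("simulated network failure")
    except ConnectionError:
        return None  # swallowed: the decorator only sees a normal return


@retry(max_attempts=3)
def reraise_error():
    calls["reraise"] += 1
    try:
        raise ConnectionError("simulated network failure")
    except ConnectionError:
        raise  # re-raised: the decorator sees the failure and retries


swallow_error()
try:
    reraise_error()
except ConnectionError:
    pass  # still failing after the last attempt

print(calls)  # {'swallow': 1, 'reraise': 3}
```

The swallowed version runs once because nothing ever reaches the decorator, while the re-raising version is attempted three times.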

04 Data Extraction

miniconda

Configuration

shell
# Check the version
conda --version

# Upgrade conda
conda update conda

# Add mirror channels
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/bioconda/
conda config --add channels https://mirrors.bfsu.edu.cn/anaconda/cloud/bioconda/
conda config --add channels https://mirrors.bfsu.edu.cn/anaconda/cloud/conda-forge/
conda config --add channels https://mirrors.bfsu.edu.cn/anaconda/pkgs/free/
conda config --add channels https://mirrors.bfsu.edu.cn/anaconda/pkgs/main/

# Show the channels that have been added
conda config --get channels

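Commands

shell
# Create / delete an environment (-n: name)
conda create -n env_name python=3.8  # specify python=<version> so the environment gets its own Python interpreter
conda remove -n env_name --all  # without --all, only the named packages are removed

# List existing environments
conda env list
conda info --envs

# Rename an environment: clone env1 as env2, then delete env1
conda create -n env2 --clone env1
conda remove -n env1 --all

# Enter an environment
conda activate env_name

# Leave the environment
conda deactivate

# Install a pinned version with conda or pip (pip offers more packages; pip pins with "==", conda with "=")
conda activate env_name
conda install numpy=1.19.3
pip install numpy==1.19.3
conda deactivate

# Install packages
conda install gatk
conda install gatk=3.7
conda install -n env_name gatk

# After installation, "which <program>" shows where it was installed:
which gatk

# List installed packages
conda list
conda list -n env_name

# Update a specific package, or all packages
conda update gatk
conda update --all

# Remove a package from an environment
conda remove --name env_name gatk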

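Uninstalling

  • Remove the installation directory: rm -rf /opt/anaconda3
  • Delete the Anaconda environment variables from ~/.bash_profile
  • Remove Anaconda's hidden files and directories, if present:
  • rm -rf ~/.condarc ~/.conda ~/.continuum
  • After these steps, Anaconda is completely removed.

Migration

  • Pack the environment
shell
conda pack -n env_name -o environment.tar.gz
# If this fails with No command 'conda pack', install conda-pack first:
conda install -c conda-forge conda-pack
  • Copy the archive to the new machine
  • Go to conda's environment directory: /miniconda/envs/
shell
mkdir environment
# Unpack the conda environment
tar -xzvf environment.tar.gz -C environment
  • Run conda env list to see the environments, activate the migrated one, and use pip list to compare the installed packages before and after migration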

xpath

  • Example
python
from lxml import etree

text = """
    <div>
        <ul>
             <li class="item-0">first item</li>
             <li class="item-1"><a href="link2.html">second item</a></li>
             <li class="item-0"><a href="link3.html"><span class="bold">third item</span></a></li>
             <li class="item-1"><a href="link4.html">fourth item</a></li>
             <li class="item-0"><a href="link5.html">fifth item</a></li>
        </ul>
    </div>
    """

tree = etree.HTML(text)
result = tree.xpath('//li[@class="item-0"]/a/text()')
print(result)
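  • Syntax

    • @ selects an attribute
    • // matches descendants at any depth (relative path)
    • [index] selects by position; indexes start at 1
    • [condition] supports conditional expressions with and, or, =
    • Wildcards: * matches any element node, node() matches any node
    • Built-in functions: last(), position()
  • Practice

    • Douban Top 250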

      • python
        import requests
        from lxml import etree
        from pprint import pprint
        
        url = "https://movie.douban.com/top250?start=0"
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36"
        }
        
        resp = requests.get(url=url, headers=headers)
        html_text = resp.text
        
        tree = etree.HTML(html_text)
        film_title = tree.xpath('//div[@class="hd"]//span[@class="title"][1]/text()')
        film_pic = tree.xpath('//div[@class="pic"]//img/@src')
        data = [
            {
                "title": title,
                "pic": pic
            }
            for title, pic in zip(film_title, film_pic)
        ]
        
        pprint(data)
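
The syntax rules listed above can be tried without any network access. The sketch below uses the standard library's xml.etree.ElementTree, which supports only a subset of XPath (positional predicates, last(), attribute conditions, the * wildcard) and does not support text() or @attr selection. With lxml, the same ideas apply through tree.xpath(). The sample markup is a shortened copy of the example text above.

```python
import xml.etree.ElementTree as ET

doc = """
<div>
    <ul>
        <li class="item-0">first item</li>
        <li class="item-1"><a href="link2.html">second item</a></li>
        <li class="item-0"><a href="link3.html">third item</a></li>
    </ul>
</div>
"""
root = ET.fromstring(doc)

first = root.find(".//li[1]")                       # [index]: positions start at 1
last = root.find(".//li[last()]")                   # built-in function last()
item0 = root.findall(".//li[@class='item-0']")      # [condition] on an attribute
hrefs = [a.get("href") for a in root.findall(".//a")]  # attribute values via .get()
tags = [el.tag for el in root.findall("./ul/*")]    # * matches any element node

print(first.text)            # first item
print(last.find("a").text)   # third item
print(len(item0))            # 2
print(hrefs)                 # ['link2.html', 'link3.html']
print(tags)                  # ['li', 'li', 'li']
```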

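05 Data Extraction

jsonpath

  • Syntax reference (slicing is supported)

| Syntax | Meaning |
| --- | --- |
| $ | Root node |
| @ | Current node |
| . or [] | Child operator |
| .. | Recursive descent: selects every matching node regardless of position |
| * | Wildcard: matches all element nodes |
| [] | Subscript operator (simple operations such as array indexes or selecting by content) |
| [,] | Union: selects multiple items within a subscript |
| ?() | Filter expression |
| () | Script expression |

  • Expression: [(@.length-1)]
  • Filter: [?(@.price<10)]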
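
To make the recursive-descent, filter and expression rules above concrete, here is a plain-Python sketch of what `$..name`, `[?(@.price<10)]` and `[(@.length-1)]` select. It does not use the jsonpath library; find_all is a hypothetical helper written for this illustration, and the sample data is invented.

```python
def find_all(node, key):
    """Collect every value stored under `key` at any depth (the idea behind `$..key`)."""
    found = []
    if isinstance(node, dict):
        for k, v in node.items():
            if k == key:
                found.append(v)
            found.extend(find_all(v, key))
    elif isinstance(node, list):
        for item in node:
            found.extend(find_all(item, key))
    return found


# Invented sample data, shaped like a typical paged API response
data = {"data": {"list": [
    {"name": "Backend Engineer", "price": 8},
    {"name": "Data Analyst", "price": 12},
]}}

items = data["data"]["list"]
names = find_all(data, "name")                        # like $..name
cheap = [item for item in items if item["price"] < 10]  # like [?(@.price<10)]
last = items[len(items) - 1]                          # like [(@.length-1)]

print(names)          # ['Backend Engineer', 'Data Analyst']
print(cheap)          # [{'name': 'Backend Engineer', 'price': 8}]
print(last["name"])   # Data Analyst
```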

Practice: NetEase Careers

python
import requests
from jsonpath import jsonpath

url = 'https://hr.163.com/api/hr163/position/queryPage'

json_data = {"currentPage": 1, "pageSize": 10}
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36"
}
response = requests.post(url=url, json=json_data, headers=headers)
data = response.json()

name_list = jsonpath(data, '$..name')
requirement_list = jsonpath(data, '$..requirement')

for name, requirement in zip(name_list, requirement_list):
    print(f"Job: {name}\nRequirements: \n{requirement}\n{'='*50}")

bs4

Practice: Sogou WeChat search titles

python
import requests
from bs4 import BeautifulSoup

url = "https://weixin.sogou.com/weixin?_sug_type_=1&type=2&query=jk"

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.90 Safari/537.36"
}
response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.text, "lxml")

alist = soup.select(".txt-box a")

for i in alist:
    print(i.get_text())

06 Data Extraction

Regular expressions (re)

  • re.split()
  • re.sub() (replace)
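
A short sketch of the two functions named above, using only the standard library. The sample strings are invented for illustration.

```python
import re

# re.split: split on any run of commas, semicolons or whitespace
raw = "python, java;go  rust"
parts = re.split(r"[,;\s]+", raw)

# re.sub: remove HTML tags by replacing them with an empty string
html = "<p>Hello <b>world</b></p>"
text = re.sub(r"<[^>]+>", "", html)

# re.sub with groups: mask the middle four digits of a phone number
masked = re.sub(r"(\d{3})\d{4}(\d{4})", r"\1****\2", "13146060001")

print(parts)   # ['python', 'java', 'go', 'rust']
print(text)    # Hello world
print(masked)  # 131****0001
```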

07 Data Storage

CSV storage

  • Writing lists
python
import csv

headers = ['Class', 'Name', 'Gender', 'Phone', 'QQ']

rows = [
    ["18级Python", '小王', '男', '13146060xx1', '123456xx1'],
    ["18级Python", '小李', '男', '13146060xx2', '123456xx2'],
    ["19级Python", '小赵', '女', '13146060xx3', '123456xx3'],
    ["19级Python", '小红', '女', '13146060xx4', '123456xx4'],
]

with open("temp.csv", "w+", encoding="utf-8", newline="") as w:
    writer = csv.writer(w)  # create a csv writer object
    writer.writerow(headers)
    writer.writerows(rows)
  • Writing dictionaries
python
import csv

rows = [
    {
        "class_name": "18级Python",
        "name": '小王',
        "gender": '男',
        "phone": '13146060xx1',
        "qq": '123456xx1'
    },
    {
        "class_name": "18级Python",
        "name": '小李',
        "gender": '男',
        "phone": '13146060xx2',
        "qq": '123456xx2'
    },
    {
        "class_name": "19级Python",
        "name": '小赵',
        "gender": '女',
        "phone": '13146060xx3',
        "qq": '123456xx3'
    }]

headers = rows[0].keys()

with open("temp.csv", "w+", encoding="utf-8", newline="") as w:
    dictWriter = csv.DictWriter(w, headers)  # create a DictWriter object
    dictWriter.writeheader()
    dictWriter.writerows(rows)
  • Reading a csv file
python
import csv

with open('temp.csv', 'r', encoding='utf-8') as r:
    reader = csv.reader(r)
    for i in reader:
        print(i)

with open('temp.csv', 'r', encoding='utf-8') as r:
    dictReader = csv.DictReader(r)
    for i in dictReader:
        print(i)

Practice

  • Tencent Careers
python
import requests
import pymysql
import time
from pprint import pprint
from dbutils.pooled_db import PooledDB


class TencentHire():
    def __init__(self):
        self.api_url = ('https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1754363338136&countryId'
                        '=&cityId=&bgIds=&productId=&categoryId=40001001,40001002,40001003,40001004,40001005,'
                        '40001006&parentCategoryId=&attrId=1&keyword=&pageIndex={}&pageSize=10&language=zh-cn&area=cn')
        self.pool = PooledDB(creator=pymysql, user='root', password='44448888', host='localhost', db='tencent_hire', charset='utf8mb4')
        self.connect = self.pool.connection()
        self.cursor = self.connect.cursor()
        self.max_page = 81
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/91.0.4472.124 Safari/537.36'
        }

    def get_info(self):
        for page in range(1, self.max_page + 1):
            resp = requests.get(url=self.api_url.format(page), headers=self.headers)
            data_list = resp.json()['Data']['Posts']
            time.sleep(2)
            for item in data_list:
                yield item

    def create_table(self):
        sql = """
            CREATE TABLE IF NOT EXISTS infos (
                id int primary key auto_increment,
                work_name varchar(50),
                country varchar(20),
                location varchar(20),
                require_years varchar(20)
            )
        """
        try:
            self.cursor.execute(sql)
            print("Table created successfully")
        except Exception as e:
            # pprint's second positional argument is a stream, so use print here
            print("Failed to create table:", e)

    def save_info(self, *args):
        pprint(args)
        sql = 'INSERT INTO infos VALUES (%s, %s, %s, %s, %s)'
        try:
            self.cursor.execute(sql, args)
            self.connect.commit()
        except Exception as e:
            self.connect.rollback()
            print("Failed to save data:", e)

    def close(self):
        self.cursor.close()
        self.connect.close()
        print("Database connection closed")

    def main(self):
        print("Spider started")
        self.create_table()
        generator = self.get_info()
        for item in generator:
            _id = None  # NULL lets MySQL assign the auto-increment id
            work_name = item['RecruitPostName']
            country = item['CountryName']
            location = item['LocationName']
            require_years = item['RequireWorkYearsName']
            self.save_info(_id, work_name, country, location, require_years)
        self.close()
        print("Spider finished")


TencentHire().main()

MongoDB

  • MongoClient maintains a built-in connection pool

Example:

python
import pymongo
from pprint import pprint

mongo_client = pymongo.MongoClient()
db = mongo_client['database-test']['collection-test']

test_dict = {
    "name": "Anna",
    'Age': 18
}

db.insert_one(test_dict)
pprint(list(db.find()))

test_list = [
    {"name": "Anna",'Age': 18},
    {"name": "Bob",'Age': 19},
    {"name": "Candy",'Age': 20}
]

db.insert_many(test_list)
pprint(list(db.find()))

08 Data Storage

Static page rendering

  • The first few pages of data [1, 2, 3...] are served as static data

Scraping video titles from iQIYI

python
import time
import requests
import pymongo
from pprint import pprint


class AiQiYi:

    def __init__(self):
        self.api_url = 'https://pcw-api.iqiyi.com/search/recommend/list'
        self.headers = {
            'accept': '*/*',
            'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
            'cache-control': 'no-cache',
            'content-type': 'application/x-www-form-urlencoded',
            'origin': 'https://list.iqiyi.com',
            'pragma': 'no-cache',
            'priority': 'u=1, i',
            'referer': 'https://list.iqiyi.com/www/2/15-------------11-1-1-iqiyi--.html?s_source=PCW_SC',
            'sec-ch-ua': '"Not)A;Brand";v="8", "Chromium";v="138", "Microsoft Edge";v="138"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-site',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36 Edg/138.0.0.0',
        }
        self.cookies = {
            'QC005': 'd673290c5880ff54251b889596f5c945',
            'curDeviceState': 'width%3D1652%3BconduitId%3D%3Bscale%3D150%3Bbrightness%3Ddark%3BisLowPerformPC%3D0%3Bos%3Dbrowser%3Bosv%3D10.0.19044',
            'T00404': '3f31213172eca49e09491611b4373dfe',
            'QP0037': '0',
            'QC234': '60186596789f4782adf8ef2d293f9cff',
            'QP0042': '{"v":3,"cpu":16,"avc":{"de":2,"wv":1,"pr":1},"hvc":{"de":2,"wv":0,"pr":1},"av1":{"de":2,"wv":0,"pr":0},"av110":{"de":2,"wv":0,"pr":0}}',
            'QC006': '91abf3b528b8e87042d7025bc2b18e69',
            'QC235': '7dd5c7c135dc4807a5b993df957b59d1',
            'QC173': '0',
            'QC175': '%7B%22upd%22%3Atrue%2C%22ct%22%3A%22%22%7D',
            'Hm_lvt_53b7374a63c37483e5dd97d78d9bb36e': '1754399334',
            'Hm_lpvt_53b7374a63c37483e5dd97d78d9bb36e': '1754399334',
            'HMACCOUNT': '37620D01B9C40ED9',
            'QC198': '70a683895aa01e3bf23492d49ffa1693',
            'QC189': '8883_A%2C10385_B%2C10274_A%2C8739_B%2C9419_B%2C9922_C%2C9379_B%2C10276_B%2C11389_C%2C8004_B%2C5257_B%2C10566_D%2C9776_B%2C8873_E%2C10123_B%2C7423_C%2C9082_B%2C8401_A%2C6249_C%2C10793_B%2C7996_B%2C11391_A%2C9576_B%2C10358_B%2C9365_B%2C5465_B%2C6843_B%2C6578_B%2C6312_B%2C6091_B%2C8690_A%2C10992_B%2C8737_D%2C8742_A%2C10193_B%2C10803_C%2C10596_B%2C9484_B%2C6752_C%2C10188_A%2C8971_A%2C7332_B%2C9683_B%2C10383_B%2C11402_A%2C8665_D%2C11642_D%2C6237_A%2C9569_B%2C11004_B%2C11238_A%2C8983_C%2C7024_C%2C5592_B%2C9117_A%2C6031_B%2C10509_B%2C7581_B%2C9506_D%2C11393_A%2C9517_D%2C10216_A%2C9394_B%2C11350_B%2C8542_B%2C6050_B%2C9167_B%2C10637_B%2C11556_A%2C11413_B%2C11819_A%2C9469_B%2C10633_B%2C10598_B%2C8812_B%2C6832_C%2C7074_C%2C7682_C%2C8867_B%2C5924_D%2C6151_C%2C5468_B%2C10447_B%2C11580_A%2C11299_C%2C6704_C%2C11672_A%2C8808_B%2C10765_B%2C8497_B%2C8342_B%2C8871_C%2C9790_B%2C11754_A%2C9355_B%2C10389_B%2C8760_B%2C11441_A%2C10624_B%2C10627_B%2C11661_C%2C9292_B%2C6629_B%2C5670_B%2C9158_A%2C9805_B%2C9959_B%2C10999_A%2C11578_A%2C6082_B%2C5335_B%2C11471_A',
            'QC191': '',
            'QC007': 'DIRECT',
            'nu': '0',
            'QC008': '1754399336.1754399336.1754399336.1',
            'QC010': '0.29432456281016706',
            'QC186': 'false',
            'TQC030': '1',
            'QC199': '618c4fdf686dcfd24f83980a7898934a',
            'IMS': 'IggQABj_1MjEBiomCiAwNmE5ZWRjM2M4YTE0MGVjYjU3NTY0ZGM4MjIxYjlmMBAAIgByJAogMDZhOWVkYzNjOGExNDBlY2I1NzU2NGRjODIyMWI5ZjAQAIIBBCICEA-KASQKIgogMDZhOWVkYzNjOGExNDBlY2I1NzU2NGRjODIyMWI5ZjA',
            '__dfp': 'a16ab55d0c63fc45cabb61cfd8eff5a88e7af58926a33c3ac67b9d6d9724f14d01@1755695256612@1754399257612',
        }
        self.mongo_client = pymongo.MongoClient()
        self.db = self.mongo_client['database-test']['aiqiyi_info']

    def get_info(self, page):
        params = (
            ('channel_id', '2'),
            ('data_type', '1'),
            ('mode', '11'),
            ('page_id', str(page)),
            ('ret_num', '48'),
            ('session', '1f6d98c5093bd36fa2c708511ce9e2f8'),
            ('three_category_id', '15;must'),
        )
        resp = requests.get(url=self.api_url, headers=self.headers, params=params, cookies=self.cookies).json()['data']['list']
        time.sleep(2)
        for item in resp:
            yield item

    def save_info(self, pages):
        for page in range(1, pages + 1):
            generator = self.get_info(page)  # pass the page number, not a nested generator
            for item in generator:
                self.db.insert_one(item)
                pprint(item)
        print('Saved successfully')

    def main(self):
        self.save_info(10)
        print('Spider finished')


AiQiYi().main()

Redis

  • Start the Redis server
cmd
redis-server.exe redis.windows.conf
  • Connect to the server
cmd
redis-cli
redis-cli -h host -p port -a password
  • Basic operations
cmd
set myKey abs
get myKey
del myKey

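md5

  • The hex digest always has a fixed length of 32 characters
  • Suitable for datasets of fewer than roughly 100 million records
  • The input must be bytes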
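
The three points above can be checked with the standard library's hashlib. This is a minimal sketch; the input string "hello" is just an example.

```python
import hashlib

# The input must be bytes, so encode the string first
digest = hashlib.md5("hello".encode("utf-8")).hexdigest()
print(digest)       # 5d41402abc4b2a76b9719d911017c592
print(len(digest))  # 32

# Passing a str directly is rejected
try:
    hashlib.md5("hello")
    rejected = False
except TypeError:
    rejected = True
print(rejected)     # True
```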

Deduplicating Mango TV library data

python
import time
import requests
import pymongo
import redis
import hashlib
from pprint import pprint

url = "https://www.mgtv.com/lib/2"  # listing page, for reference only; the spider calls the API below


class MonguoTV:

    def __init__(self):
        self.api_url = "https://pianku.api.mgtv.com/rider/list/pcweb/v3"
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/80.0.3987.149 Safari/537.36"
        }
        self.params = {
            "allowedRC": "1",
            "platform": "pcweb",
            "channelId": "2",
            "pn": "2",  # page number
            "pc": "80",
            "hudong": "1",
            "_support": "10000000",
            "kind": "a1",
            "area": "a1",
            "year": "all",
            "feature": "all",
            "chargeInfo": "a1",
            "sort": "c2"
        }
        self.redis_client = redis.Redis()
        self.mongo_client = pymongo.MongoClient(host="localhost", port=27017)
        self.db = self.mongo_client['database-test']['monguo-tv']

    @staticmethod
    def md5(content):
        md5_hashed = hashlib.md5(str(content).encode('utf-8'))
        return md5_hashed.hexdigest()

    def get_info(self, page):
        self.params['pn'] = str(page)
        resp = requests.get(url=self.api_url, headers=self.headers, params=self.params).json()['data']['hitDocs']
        for item in resp:
            yield item

    def save_info(self, pages):
        for page in range(1, pages + 1):
            time.sleep(2)
            generator = self.get_info(page)
            for item in generator:
                md5_hashed = self.md5(item)
                flag = self.redis_client.sadd("mgtv", md5_hashed)
                if flag:
                    try:
                        self.db.insert_one(item)
                        pprint(item)
                    except Exception as e:
                        print("Failed to insert data --> {}".format(e))
                else:
                    print("Data already exists --> skip")

    def close(self):
        self.mongo_client.close()
        self.redis_client.close()
        print("Database connections closed")

    def main(self):
        self.save_info(10)
        print("Data saved")
        self.close()
        print("Program finished")


# Requires a running Redis server (redis-server.exe)
MonguoTV().main()

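Coroutine programming with asyncio

  • Python 3.10 and earlier: asyncio.wait() accepts bare coroutine objects (deprecated since 3.8, removed in 3.11)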
python
import asyncio

async def work1():
    for _ in range(5):
        print('work1')
        await asyncio.sleep(1)

async def work2():
    for _ in range(5):
        print('work2')
        await asyncio.sleep(1)

loop = asyncio.get_event_loop()
coroutine = [work1(), work2()]
loop.run_until_complete(asyncio.wait(coroutine))
  • Python 3.7+ (required from 3.11 on): wrap the coroutines in tasks and start them with asyncio.run()
python
import asyncio

async def work1():
    for _ in range(5):
        print('work1')
        await asyncio.sleep(1)

async def work2():
    for _ in range(5):
        print('work2')
        await asyncio.sleep(1)

async def main():
    tasks = [asyncio.create_task(work1()), asyncio.create_task(work2())]
    await asyncio.wait(tasks)

asyncio.run(main())
  • from functools import partial
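
The notes do not say what partial is used for here. One common use with asyncio, assumed in this sketch, is binding extra arguments to a task's done-callback, since add_done_callback passes only the task itself. The names fetch and on_done are hypothetical, and the sleep stands in for a real network request.

```python
import asyncio
from functools import partial

results = []


async def fetch(page):
    await asyncio.sleep(0.01)  # stands in for a network request
    return f"page-{page}"


def on_done(label, task):
    # `label` is bound by partial; `task` is supplied by asyncio
    results.append((label, task.result()))


async def main():
    tasks = []
    for page in (1, 2):
        task = asyncio.create_task(fetch(page))
        task.add_done_callback(partial(on_done, f"job{page}"))
        tasks.append(task)
    await asyncio.gather(*tasks)


asyncio.run(main())
print(sorted(results))  # [('job1', 'page-1'), ('job2', 'page-2')]
```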

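09 Concurrent Scraping

  • chardet: detects the character encoding

  • aiomysql: supports creating a connection pool

  • In async code, SQL statements with IF EXISTS cannot be used; check whether the table exists separately with show tables like 'table'

10 Concurrent Scraping

Using motor

threading

  • Daemon thread: once the main thread finishes, the program exits without waiting for unfinished daemon threads
  • Blocking on a thread (join): the main thread waits until the child thread finishes before continuing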
python
import threading
import time

def work1():
    for _ in range(5):
        print("Work 1")
        time.sleep(1)


def work2():
    while True:
        print("Work 2")
        time.sleep(1)


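if __name__ == '__main__':
    t1 = threading.Thread(target=work1)
    t2 = threading.Thread(target=work2)
    t2.daemon = True  # mark as a daemon thread before starting it
    t1.start()
    t2.start()
    t1.join()  # block until t1 finishes; join() must be called after start()
  • Use a thread pool to run tasks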
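
A minimal thread-pool sketch using the standard library's concurrent.futures. The work function is a hypothetical stand-in for a blocking request, and max_workers=4 is an arbitrary choice.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def work(n):
    time.sleep(0.05)  # stands in for a blocking network request
    return n * n


# The with-block waits for all submitted tasks before exiting
with ThreadPoolExecutor(max_workers=4) as pool:
    # map() returns results in input order, regardless of completion order
    results = list(pool.map(work, range(5)))

print(results)  # [0, 1, 4, 9, 16]
```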

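Addendum:

  • In a function definition, * means every parameter after it must be passed as a keyword argument (by name)
  • In a function definition, / means every parameter before it must be passed positionally (not by name)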
python
def test1(a, b, *, c):
    print("test1")


def test2(a, b, /, c):
    print("test2")


test1(1, 2, c=3)
test2(1, 2, 3)

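MongoDB topics

Database operations

  • Show the current database name
javascript
db
  • Show statistics for the current database
javascript
db.stats()
  • List all databases that physically exist
javascript
show dbs
  • Switch databases (the database is created once data is written to it)
javascript
use db
  • Drop the current database
javascript
db.dropDatabase()

Collection operations

  • Create a collection
javascript
db.createCollection("mongo")
  • List collections
javascript
show collections
  • Drop a collection
javascript
db.toCollection.drop()

Data operations

  • Insert
javascript
db.toCollection.insertOne(document)
db.toCollection.insertOne({name:'test',age:18})
db.toCollection.insertOne({_id:'1',name:'test',gender:'male'})
  • Query
javascript
db.toCollection.find()
db.toCollection.find({_id: '100'})
  • Update
javascript
db.toCollection.updateOne({name: 'ZhangSan'}, {$set: {age:99}})
db.toCollection.updateMany({name: 'LiSi'}, {$set:{age: 99}})
  • Delete
javascript
db.toCollection.deleteOne({age:99})
db.toCollection.deleteMany({})  // delete everything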

Python operations

  • Connect to the database
python
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db = client['web_scraping']
print("connect successfully!")
  • Create / select a collection
python
collection = db["web_data"]
  • Insert data
python
collection.insert_one(data)
collection.insert_many(data_list)
  • Query data
python
results = collection.find()
for result in results:
    print(result)

print(list(collection.find()))

collection.find_one({"name": "test"})
  • Update data
python
collection.update_one(
    {"name": "test"},
    {"$set": {"age": 99}}
)
  • Delete data
python
collection.delete_one({"name": "test"})
collection.delete_many({"age": 19})
  • Close the connection
python
client.close()

Redis

  • set operations
    • Prefix key names with a namespace, for example scrapy:douban for the douban data of the scraper

<img src="https://gitee.com/kualk/pic-go/raw/master/imgs/image-20250817222153446.png" alt="image-20250817222153446" style="zoom:80%;" />

File deduplication

  • MD5 hashing
python
import hashlib

def md5_encrypt(obj, encoding):
    return hashlib.md5(str(obj).encode(encoding)).hexdigest()

Deduplication in scrapers (using Redis sets)

  • URL deduplication

  • Data deduplication
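
The URL deduplication idea above relies on SADD reporting whether a member was newly added. The sketch below shows that logic with an in-memory stand-in so it runs without a Redis server. InMemorySet and fingerprint are hypothetical names, and the key scrapy:demo:urls is an assumed example of the namespaced naming described earlier. With redis-py, redis.Redis().sadd(key, value) plays the same role, as in the Mango TV example.

```python
import hashlib


class InMemorySet:
    """Hypothetical stand-in that mimics the return value of Redis SADD."""

    def __init__(self):
        self._sets = {}

    def sadd(self, key, value):
        members = self._sets.setdefault(key, set())
        if value in members:
            return 0  # already present: nothing added
        members.add(value)
        return 1      # newly added


def fingerprint(text):
    # Fixed-length MD5 hex digest used as the set member
    return hashlib.md5(text.encode("utf-8")).hexdigest()


client = InMemorySet()
urls = [
    "https://example.com/page/1",
    "https://example.com/page/2",
    "https://example.com/page/1",  # duplicate
]

# Keep a URL only the first time its fingerprint is added to the set
fresh = [u for u in urls if client.sadd("scrapy:demo:urls", fingerprint(u))]
print(fresh)  # ['https://example.com/page/1', 'https://example.com/page/2']
```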

Self-deduplication in MongoDB

  • In general, just make sure the data is unique before inserting it

Structuring the code

  • Four steps
    • Build the URLs --> for_loop
    • Send the request --> request
    • Parse the data --> parse
    • Save the data --> save
python
class Scapy:

    def __init__(self):
        pass

    def request(self):
        pass

    def parse(self):
        pass

    def save(self):
        pass

    def main(self):
        pages = range(1, 21)
        for page in pages:
            self.request()
            self.parse()
            self.save()


if __name__ == '__main__':
    Douban250 = Scapy()
    Douban250.main()

IP proxy pool

  • Scraping free proxies (mostly useless in practice)
python
from pprint import pprint

import requests
import re
import json
import time
import random

class KuaiDaiLi:
    def __init__(self):
        self.url = 'https://www.kuaidaili.com/free/inha/{}'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/80.0.3987.87 Safari/537.36'
        }
        self.usable_ips = []
        self.ip_test_url = 'https://httpbin.org/ip'

    def request(self, page):
        try:
            return requests.get(url=self.url.format(page), headers=self.headers, timeout=10)
        except requests.RequestException:
            return None

    def parse(self, resp):
        usable_ips = []
        matches = re.findall("const fpsList = (.*);", resp.text)
        if not matches:  # page layout changed or the request was blocked
            return usable_ips
        for item in json.loads(matches[0]):
            if self.check_proxy(item['ip'], item['port']):
                usable_ips.append({'ip': item['ip'], 'port': item['port']})
        return usable_ips

    def save(self, usable_ips):
        self.usable_ips.extend(usable_ips)  # extend keeps the list flat
        with open('usable_ips.txt', 'a+', encoding='utf-8') as a:
            for usable_ip in usable_ips:
                a.write(json.dumps(usable_ip, ensure_ascii=False)+'\n')

    def check_proxy(self, ip, port) -> bool:
        # requests expects proxy URLs with a scheme
        proxy = {
            'http': 'http://{}:{}'.format(ip, port),
            'https': 'http://{}:{}'.format(ip, port)
        }
        try:
            resp = requests.get(url=self.ip_test_url, proxies=proxy, timeout=2)
            return resp.status_code == 200
        except requests.RequestException:
            return False

    def main(self, page_start, page_cnt):
        if page_start < 1:
            page_start = 1
        # scrape exactly page_cnt pages, starting at page_start
        for page in range(page_start, page_start + page_cnt):
            print('Scraping page {} of high-anonymity free proxies...'.format(page))
            resp = self.request(page)
            if resp:
                usable_ips = self.parse(resp)
                self.save(usable_ips)
                print('Usable IPs on page {}:'.format(page))
                pprint(usable_ips)
                time.sleep(random.randint(1, 3)/10.0+1)


if __name__ == '__main__':
    # runs until interrupted, re-scanning the same pages each round
    while True:
        KuaiDaiLi().main(1, 3)

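Playwright framework

Page operations

python
page.goto(url, referer=referer, timeout=timeout, wait_until="load")  # open a page
page.title()  # get the page title
page.content()  # get the page source
page.close()  # close the page
page.screenshot(path=path, full_page=True)  # full-page screenshot
page.screenshot(path=path, clip={"x": x, "y": y, "width": w, "height": h})  # screenshot of a region
page.url  # current URL

Waiting

  • Change the default timeout
python
page.set_default_timeout(60000)  # set the default timeout to 60 seconds
  • Wait for the page to reach a load state
    • load: waits for the load event, meaning all resources (including images and scripts) have finished loading.
    • domcontentloaded: waits for the DOMContentLoaded event; the HTML document has been parsed and the DOM tree is built, but external resources such as images may still be loading.
    • networkidle: fires once the network has been idle for a while, which usually means no requests are in flight. The official documentation recommends avoiding this state because it depends on network conditions and is not reliable.
python
page.wait_for_load_state('load')
  • Wait for navigation to a given URL
python
page.wait_for_url("**/target.html")
  • Wait for a fixed time (in milliseconds)
python
page.wait_for_timeout(10000)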

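Locating elements

  • Locate elements with the locator method
    • Supports XPath selectors
    • Supports CSS selectors

Basic actions

  • Locate the element first, then act on it
python
page.locator(selector).click()  # click
page.locator(selector).fill("text")  # fill an input with text
page.locator(selector).type("text")  # simulate keyboard typing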

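Filtering elements

python
page.locator(selector).all()  # all matching elements
page.locator(selector).nth(0)  # select by index (starting at 0)
page.locator(selector).first
page.locator(selector).last

Data extraction

python
page.locator(selector).inner_text()  # text content of the element
page.locator(selector).all_inner_texts()  # text content of all matching elements
page.locator(selector).get_attribute('src')  # attribute value
page.locator(selector).input_value()  # value of an <input>
page.locator(selector).bounding_box()  # element position: x, y, width, height
page.locator(selector).count()  # number of matches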

Mouse actions

  • button: left | right | middle
  • click_count: number of clicks, default 1
  • delay: ms
python
page.mouse.click(x, y, button="left", click_count=1, delay=0)
page.mouse.dblclick(x, y, button="left", delay=0)
page.mouse.down(button="left", click_count=1)
page.mouse.move(x, y)
page.mouse.up(button="left", click_count=1)

page.drag_and_drop(source_selector, target_selector)  # drag one element onto another

page.mouse.wheel(delta_x, delta_y)  # scroll the wheel

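Keyboard actions

python
page.keyboard.down('Enter')
page.keyboard.up('ArrowLeft')
page.keyboard.press('key')  # down followed by up
page.keyboard.type('text')
page.keyboard.press('Control+a')  # key combination

Advanced actions

  • Execute JavaScript
python
ret = page.evaluate("js codes")
  • Scroll until an element is visible (for example, scroll to the last matched element)
python
page.locator(selector).scroll_into_view_if_needed()
  • Work inside an iframe (an iframe is a separate document, so page-level selectors cannot match inside it)
python
page.frame_locator('css').get_by_text('Submit')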
