应该怎么把数据存到数据库
在实际的爬虫项目中,是否需要执行数据库的事务操作取决于数据的复杂性、数据的一致性需求以及爬虫的业务逻辑。 事务操作通常涉及一系列的数据库操作,这些操作要么全部成功,要么全部失败,确保数据的完整性和一致性。
数据完整性/错误恢复
如果爬取的数据跨多个表或记录,并且这些数据之间存在依赖关系,那么使用事务可以确保所有相关数据都被正确地一起更新。 例如,如果你的爬虫从一个电商网站收集产品信息,并且需要更新产品表、价格表和库存表,这些更新应该在一个事务中完成,以避免数据不一致的情况。
事务可以提供一种机制,允许在发生错误时回滚到事务开始之前的状态。 这对于爬虫尤其重要,因为网络抓取经常可能遇到不可预见的错误,如网络请求失败、数据格式变化等。 通过使用事务,爬虫可以在遇到此类错误时撤销其对数据库的所有更改,从而避免留下不完整或错误的数据。
import pymysql
def update_database():
connection = pymysql.connect(host='localhost', user='user', password='passwd', db='db')
try:
with connection.cursor() as cursor:
# 开始事务
cursor.execute('START TRANSACTION;')
# 执行多个数据库操作
cursor.execute('INSERT INTO products (name, price) VALUES (%s, %s)', ('product1', 10))
cursor.execute('UPDATE stock SET quantity = quantity - 1 WHERE product_id = %s', (1,))
# 提交事务
connection.commit()
except Exception as e:
connection.rollback() # 发生错误时回滚
finally:
connection.close()
这段代码演示了如何使用 Python 的 PyMySQL 库执行数据库事务。 在事务中,所有数据库操作要么全部成功,要么全部失败。 如果任何一个操作失败,整个事务将被回滚,数据库将恢复到事务开始之前的状态。
值得注意的是:在这个数据库实例中你应该关闭自动 commit 以便手动控制事务的提交和回滚。
这在爬虫中是比较常见的场景,比如在url表拿到每一个的url抓取完成之后,需要在详情页插入数据的同时,更新url表的状态(更改为已抓取状态),这时候就需要事务来保证数据的一致性。
并发控制/资源竞争
在多线程或分布式爬虫中,多个进程或线程可能同时尝试写入数据库。 事务可以帮助管理这些并发访问,确保数据的一致性和完整性。 数据库的事务管理系统会处理锁定和并发问题,使开发者可以专注于业务逻辑的实现。
检查去重/业务逻辑
对于涉及复杂业务逻辑的爬虫,如需要多步骤处理和决策的爬虫,事务可以确保在整个处理流程中数据的一致性。 一个爬虫可能需要先检查一个记录是否存在,然后基于这个检查的结果决定是插入新记录还是更新现有记录。
例如有一个业务流程是这样的:每天抓取某个网站的商品列表,然后将这些列表页的数据插入到url表(待抓取详情页的url队列)。 但是该网站的某一个商品可能在第1页和第10页都会出现,这时候就需要在插入url表之前,先检查url表是否已经存在该url,如果存在则不插入,如果不存在则插入。
做法可能是:去数据表中查询是否已经存在该url,如果存在则不插入,如果不存在则插入。
但在实际的爬虫业务中,为了提高存储效率,我们一般都会选择批量插入的做法,如果每次插入之前都去查询一次,会增加数据库的压力, 这时候可能要考虑从数据表本身的约束来解决这个问题。
例如:
- 在url表中设置url字段为
唯一值约束
,这样在插入数据时如果url已经存在,则会抛出异常,这时候就可以捕获这个异常,然后根据异常类型来判断是否插入数据。 - 也可以设置组合的唯一约束,比如url和date字段的
组合约束
,这样可以保证url在某一天只能存在一条记录。
比如对 SQL Server 修改数据库表 TbListData,为 UrlID 和 StartDate 添加唯一约束:
这样你可以直接尝试插入数据,并依赖数据库来拒绝重复的 UrlID 和 StartDate 组合。 当尝试插入重复数据时,数据库会抛出一个异常,你可以捕获这个异常并相应地处理(例如记录日志或忽略这些数据)。
import pandas as pd
import time
from loguru import logger
def save_data(product_list):
if not product_list:
return 0
df = pd.DataFrame(product_list)
current_date = time.strftime("%Y-%m-%d", time.localtime(time.time()))
df['StartDate'] = current_date
# 数据类型转换
df['Price'] = df['Price'].astype(float)
df['Sales'] = df['Sales'].astype(int)
# 执行批量插入
with SQLServerMss() as conn:
try:
conn.df_insert_many('TbListData', df.columns.tolist(), df)
logger.debug(f"{df['ShopName'].iloc[0]} 写入数据量: {len(df)}")
except Exception as e:
# 处理插入时的异常(例如由于唯一约束违反)
logger.error(f"插入数据时出错: {e}")
return len(df)
让数据表的约束来保证数据的一致性,
让数据表主动去拒绝这样的插入
,这样就不需要在插入之前去查询了。