站长网 大数据 让你在10分钟内掌握如何用Python将数据批量的插入到数据库

让你在10分钟内掌握如何用Python将数据批量的插入到数据库

副标题#e# 本文基于python, 使用pandas, pymysql等三方库实现了向数据库中高效批量插入数据,一方面提供被网上很多瞎转载的答案给坑蒙了的人(因为我也是),一方面自己也做个笔记,以后方便查阅 需求原因 最近在处理一个需求,有关批量往数据库插入数据的,

副标题#e#

本文基于python, 使用pandas, pymysql等三方库实现了向数据库中高效批量插入数据,一方面提供被网上很多瞎转载的答案给坑蒙了的人(因为我也是),一方面自己也做个笔记,以后方便查阅

需求原因

最近在处理一个需求,有关批量往数据库插入数据的,描述如下

原来的程序是基于sql的存储过程进行数据的更新修改操作,由于数据量较大,导致对数据库压力太大,于是需要将程序重构为用python读取文件的方式将数据做计算处理,减少这部分的压力,最后仅仅将计算的结果调用aws的lambda服务重新更新到数据库中就可以了,减少了极大的压力,也降低了成本。涉及数据库主要是插入及更新操作

版本库信息

基于linux系统写的

三方库 >>> pandas 1.0.5, pymysql 0.9.3

python版本 >>> 3.7

标准库 >> os

逻辑梳理

实际上,最后一步,要写入数据库的文件数据是存储在内存中的。因为读取文件后进行的计算都是在内存中进行的,那么计算的结果也没必要再写到本地,再去读取,再写入数据库,这是会影响程序的效率的。逻辑如下

读取文件

文件的拼接及计算,生成新的df

初始化数据库的连接

将df所需数据转换为元组数据(取决于数据库的三方库的接口是如何支持批量操作的)

将数据写入数据库

检查数据库内容即可

分步实现及分析

读取文件

给文件路径,然后去读文件就行了,强调一下需要注意的点

绝对路径: 这种最简单,直接给路径字符串就行了,但是一旦文件夹目录结构变化,就需要频繁的改

相对路径: 我一般喜欢先在脚本中定位当前脚本的位置,然后通过相对路径去找,这样只要你整个包内部的目录结构不变化,都不用改,就算部署上线也是直接根据包的位置来,很方便

pandas默认会将所有数字读取为float类型,所以对于那种看起来是数字,但实际上是需要当作字符串使用的字段进行类型的转换

import pandas as pd  

import numpy as np 

 

# 当前脚本的位置 

current_folder_path = os.path.dirname(__file__) 

 

# 你的文件的位置 

your_file_path1 = os.path.join(current_folder_path, "文件的名字1") 

your_file_path2 = os.path.join(current_folder_path, "文件的名字2") 

 

# 我这里是以读取csv文件为例, delimiter为我们内部约定的列之间的分割符 

df1 = pd.read_csv(your_file_path1, dtype={"column1": str, "column2": str}, delimiter="/t") 

df2 = pd.read_csv(your_file_path2, dtype={"column1": str, "column2": str}, delimiter="/t") 

文件的拼接及计算

文件的拼接主要就是merge和concat两个语法的使用,强调一下小知识点

merge语法主要是对应于sql语言的内连接,外连接,左连接和右连接等

concat主要是用来将相同结构的df单纯的拼接起来(也就是列表的总行数增加)

# 这里以左连接举例, 假设只有两个文件拼接 

ret_df = pd.merge(df1, df2, left_on=["column_name"], right_on=["column_name"], how="left") 

初始化连接

导入三方库pymysql,初始化连接

# pymysql的接口获取链接 

def mysql_conn(host, user, password, db, port=3306, charset="utf8"): 

  # 传参版本 

  conn = pymysql.connect(host=host, user=user, password=password, database=db, port=port, charset=charset) 

  return conn 

对应接口转换数据

数据插入要考虑写入一个事务,因为失败的话,要保证对数据库没有影响

构造符合对应接口的数据格式,通过查询,pymysql有两种可以执行语句的接口

execute(单条插入语句)

执行单条语句的接口

类似这种: Insert into table_name (column) values (value);

executemany(批量插入语句)

执行多条语句的接口

类似这种: Insert into table_name (column1, column2, column3) values (value1, value2, value3);

具体实现如下

# 先创建cursor负责操作conn接口 

#p#副标题#e#

conn = mysql_conn("your db host", "your username", "your password", "db name") 

cursor = conn.cursor() 

# 开启事务 

conn.begin() 

 

#############      构造批量数据的过程            ############# 

 

# 先构造需要的或是和数据库相匹配的列 

columns = list(df.columns) 

# 可以删除不要的列或者数据库没有的列名 

columns.remove("列名") 

# 重新构造df,用上面的columns,到这里你要保证你所有列都要准备往数据库写入了 

new_df = df[columns].copy() 

 

# 构造符合sql语句的列,因为sql语句是带有逗号分隔的,(这个对应上面的sql语句的(column1, column2, column3)) 

columns = ','.join(list(new_df.columns)) 

 

# 构造每个列对应的数据,对应于上面的((value1, value2, value3)) 

data_list = [tuple(i) for i in gdsord_df.values] # 每个元组都是一条数据,根据df行数生成多少元组数据 

 

# 计算一行有多少value值需要用字符串占位 

s_count = len(data_list[0]) * "%s," 

 

# 构造sql语句 

#p#副标题#e##p#分页标题#e#

insert_sql = "insert into " + "数据库表名" + " (" + columns + ") values (" + s_count[:-1] + ")" 

将数据写入数据库

这个简单,直接上代码

cursor.executemany(insert_sql, data_list) 

conn.commit() 

cursor.close() 

conn.close() 

检查数据库是否插入成功

如果没问题的话,就可以同时进行多个文件读写,计算,最后启用多线程同时向数据库中写入数据了,非常高效!

完整代码

import pandas as pd  

import numpy as np 

 

 

# pymysql接口 

def mysql_conn(host, user, password, db, port=3306, charset="utf8"): 

  conn = pymysql.connect(host=host, user=user, password=password, database=db, port=port, charset=charset) 

  return conn 

 

 

# 当前脚本的位置 

current_folder_path = os.path.dirname(__file__) 

 

# 你的文件的位置 

your_file_path1 = os.path.join(current_folder_path, "文件的名字1") 

your_file_path2 = os.path.join(current_folder_path, "文件的名字2") 

 

# 我这里是以读取csv文件为例, delimiter为我们内部约定的列之间的分割符 

df1 = pd.read_csv(your_file_path1, dtype={"column1": str, "column2": str}, delimiter="/t") 

#p#副标题#e#

df2 = pd.read_csv(your_file_path2, dtype={"column1": str, "column2": str}, delimiter="/t") 

# 合并 

ret_df = pd.merge(df1, df2, left_on=["column_name"], right_on=["column_name"], how="left") 

 

# 先创建cursor负责操作conn接口 

conn = mysql_conn("your db host", "your username", "your password", "db name") 

cursor = conn.cursor() 

# 开启事务 

conn.begin() 

 

# 先构造需要的或是和数据库相匹配的列 

columns = list(df.columns) 

# 可以删除不要的列或者数据库没有的列名 

columns.remove("列名") 

# 重新构造df,用上面的columns,到这里你要保证你所有列都要准备往数据库写入了 

new_df = df[columns].copy() 

 

# 构造符合sql语句的列,因为sql语句是带有逗号分隔的,(这个对应上面的sql语句的(column1, column2, column3)) 

columns = ','.join(list(new_df.columns)) 

 

# 构造每个列对应的数据,对应于上面的((value1, value2, value3)) 

data_list = [tuple(i) for i in gdsord_df.values] # 每个元组都是一条数据,根据df行数生成多少元组数据 

 

# 计算一行有多少value值需要用字符串占位 

s_count = len(data_list[0]) * "%s," 

 

# 构造sql语句 

insert_sql = "insert into " + "数据库表名" + " (" + columns + ") values (" + s_count[:-1] + ")" 

try: 

  cursor.executemany(insert_sql, data_list) 

  conn.commit() 

  cursor.close() 

  conn.close() 

except Exception as e: 

  # 万一失败了,要进行回滚操作 

  conn.rollback() 

  cursor.close() 

  conn.close() 

本文来自网络,不代表站长网立场,转载请注明出处:https://www.tzzz.com.cn/html/shuju/2021/0527/7039.html

作者: dawei

【声明】:站长网内容转载自互联网,其相关言论仅代表作者个人观点绝非权威,不代表本站立场。如您发现内容存在版权问题,请提交相关链接至邮箱:bqsm@foxmail.com,我们将及时予以处理。
联系我们

联系我们

0577-28828765

在线咨询: QQ交谈

邮箱: xwei067@foxmail.com

工作时间:周一至周五,9:00-17:30,节假日休息

返回顶部