Python脚本来实现简单数据迁移 2022-02-08 python Comments Python脚本来实现简单数据迁移 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748import cx_Oracleimport pymysqlimport osimport time#说明:本脚本用于Oracle与MySQL之间的数据迁移#注意:源表与目标表字段数量必须一致#使用:脚本默认是从MySQL迁移到Oracle,如果想从Oracle迁移到MySQL,修改source_db,target_db,source_db_type,target_db_type就行os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8' #设置字符集为UTF8,防止中文乱码source_db = pymysql.connect(host="127.0.0.1",user="root",password="root",database="mydb") #源库target_db = cx_Oracle.connect('scott/oracle@127.0.0.1/orcl')#目标库source_db_type='MySQL' #大小写无关,后面会被转为大写target_db_type='Oracle'#大小写无关,后面会被转为大写cur_select = source_db.cursor() #源库查询对象cur_insert = target_db.cursor() #目标库插入对象cur_select.arraysize = 500cur_insert.arraysize=500source_table=input("请输入源表名称:") #从键盘获取源表名称target_table=input("请输入目标表名称:") #从键盘获取目标表名称if source_db_type.upper()=='ORACLE': get_column_length='select * from '+source_table+' where rownum<=1' #拼接获取源表有多少个列的SQLelif source_db_type.upper()=='MYSQL': get_column_length='select * from ' + source_table + ' limit 1' # 拼接获取源表有多少个列的SQLcur_select.execute(get_column_length) #执行col_len=len(cur_select.fetchone()) #获取源表有多少个列val_str = ''if target_db_type.upper()=='MYSQL': for i in range (1,col_len): val_str=val_str+'%s'+',' val_str=val_str+'%s' #MySQL批量插入语法是 insert into tb_name values(%s,%s,%s,%s)elif target_db_type.upper()=='ORACLE': for i in range (1,col_len): val_str=val_str+':'+str(i)+',' val_str=val_str+':'+str(col_len) #Oracle批量插入语法是 insert into tb_name values(:1,:2,:3)insert_sql='insert into '+target_table+' values('+val_str+')' #拼接insert into 目标表 values #目标表插入语句select_sql='select * from '+source_table #源查询SQL,如果有where过滤条件,在这里拼接cur_select.execute(select_sql) #执行print('开始执行:',time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))while True: rows=list(cur_select.fetchmany(500)) #每次获取500行,由cur_select.arraysize值决定,MySQL fetchmany 返回的是 tuple 数据类型 所以用list做类型转换 cur_insert.executemany(insert_sql, rows) #批量插入每次500行,需要注意的是 rows 必须是 list [] 数据类型 target_db.commit() #提交 if not rows: break #中断循环cur_select.close()cur_insert.close()source_db.close()target_db.close()print('执行成功:',time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))