抽取oracle数据到mysql数据库的实现过程
在将oracle数据库迁移至mysql数据库时,除了要把oracle的数据库模型移到mysql外,还有一个重要环节,就是要将oracle数据库中的数据移到mysql数据库。本人尝试过多款数据迁移程序,性能都不是很好,于是自己动手写了一个将oracle数据库数据迁移到mysql数据库的程序,其具体过程如下:
1、准备配置文件ETL_TABS.CNF,每行一条“select 字段列表 from 表名 where 过滤条件”形式的语句,写明要抽取到mysql的表、字段及过滤条件
2、建立一个oracle目录对象ETL_DIR,并将配置文件ETL_TABS.CNF放到该目录下
3、运行oracle存储过程P_ETL_ORA_DATA,生成各表的csv数据文件,同时生成一个导入mysql的脚本文件load_data.sql
4、在mysql中执行该脚本导入数据,脚本文件内容如下
-- One LOAD DATA statement per exported CSV file: fields are comma-separated,
-- quoted values are enclosed in "^", and every row ends with CRLF.
load data infile "alarm_hist_inc.csv" into table alarm_hist_inc fields terminated by "," enclosed by "^" lines terminated by "\r\n";
load data infile "button_authority.csv" into table button_authority fields terminated by "," enclosed by "^" lines terminated by "\r\n";
load data infile "c3_sms_hist_inc.csv" into table c3_sms_hist_inc fields terminated by "," enclosed by "^" lines terminated by "\r\n";
load data infile "datapermisson.csv" into table datapermisson fields terminated by "," enclosed by "^" lines terminated by "\r\n";
附:数据库脚本P_ETL_ORA_DATA
CREATE OR REPLACE PROCEDURE P_ETL_ORA_DATA
(
  P_ORA_DIR   VARCHAR2,
  P_DATA_PATH VARCHAR2
) IS
  /*
   * Exports Oracle tables to CSV files and generates a MySQL LOAD DATA script.
   *
   * Parameters
   *   P_ORA_DIR   - name of the Oracle directory object that holds the
   *                 configuration file ETL_TABS.CNF and receives the generated
   *                 CSV files and the script load_data.sql.
   *   P_DATA_PATH - path prefix written in front of every CSV file name in the
   *                 generated LOAD DATA statements. A missing trailing
   *                 separator is added and backslashes are doubled for MySQL.
   *                 NULL means "no prefix" (bare file names).
   *
   * Configuration file
   *   One statement per line: select <columns> from <table> [where <filter>].
   *   Columns may be separated by whitespace and/or commas. A listed column
   *   that does not exist in the Oracle table is exported as NULL.
   *   Lines without a recognisable "from <table>" are ignored.
   *
   * Output
   *   <table>_<n>.csv  - at most C_ROWS_PER_FILE rows each.
   *   load_data.sql    - one LOAD DATA statement per CSV file.
   *
   * Errors
   *   Problems reading the configuration file are reported through
   *   DBMS_OUTPUT and end the procedure. A failure while exporting one table
   *   is reported through DBMS_OUTPUT and the next table is processed.
   */
  C_CNF_FILE_NAME  CONSTANT VARCHAR2(30) := 'ETL_TABS.CNF';
  C_LOAD_FILE_NAME CONSTANT VARCHAR2(30) := 'load_data.sql';
  C_ROWS_PER_FILE  CONSTANT PLS_INTEGER := 200000; -- rows per CSV file
  C_ROWS_PER_FLUSH CONSTANT PLS_INTEGER := 2000;   -- rows between explicit flushes
  C_MAX_LINE       CONSTANT PLS_INTEGER := 32767;  -- UTL_FILE maximum line size

  TYPE T_REC IS RECORD(
    TBN VARCHAR2(40),    -- upper-case table name
    WHR VARCHAR2(4000)); -- where clause including its leading whitespace, or NULL
  TYPE T_TABS IS TABLE OF T_REC;

  V_TABS      T_TABS := T_TABS();
  V_ETL_DIR   VARCHAR2(128) := P_ORA_DIR;
  V_LOAD_FILE UTL_FILE.FILE_TYPE;
  V_SQL_SMT   VARCHAR2(32767);

  -- Runs P_SQL_STMT and writes its rows to <P_TB_NAME>_<n>.csv, appending one
  -- LOAD DATA statement per finished file to the already opened V_LOAD_FILE.
  PROCEDURE ETL_DATA
  (
    P_SQL_STMT  VARCHAR2,
    P_DATA_PATH VARCHAR2,
    P_TB_NAME   VARCHAR2
  ) IS
    V_VAR_COL          VARCHAR2(32767);
    V_NUM_COL          NUMBER;
    V_DATE_COL         DATE;
    V_TMZ              TIMESTAMP;
    V_COLS             NUMBER;
    V_COLS_DESC        DBMS_SQL.DESC_TAB;
    V_ROW_STR          VARCHAR2(32767);
    V_COL_STR          VARCHAR2(32767);
    V_SQL_ID           NUMBER;
    V_SQL_REF          SYS_REFCURSOR;
    V_EXP_FILE         UTL_FILE.FILE_TYPE;
    V_DATA_PATH        VARCHAR2(1000);
    V_FILE_NAME        VARCHAR2(200);
    V_FILE_NO          PLS_INTEGER := 1;
    V_ROWS_IN_FILE     PLS_INTEGER := 0;
    V_ROWS_SINCE_FLUSH PLS_INTEGER := 0;

    -- Closes the current CSV file and records its LOAD DATA statement.
    PROCEDURE FINISH_EXPORT_FILE IS
      V_LINE VARCHAR2(4000);
    BEGIN
      UTL_FILE.FCLOSE(V_EXP_FILE);
      V_LINE := 'load data infile "' || V_DATA_PATH || V_FILE_NAME ||
                '" into table ' || P_TB_NAME ||
                ' fields terminated by "," enclosed by "^" lines terminated by "\r\n";';
      UTL_FILE.PUT_LINE(V_LOAD_FILE, V_LINE);
      UTL_FILE.FFLUSH(V_LOAD_FILE);
    END FINISH_EXPORT_FILE;
  BEGIN
    -- Normalise the path prefix used inside the LOAD DATA statements.
    IF P_DATA_PATH IS NOT NULL THEN
      V_DATA_PATH := P_DATA_PATH;
      IF SUBSTR(V_DATA_PATH, -1) NOT IN ('\', '/') THEN
        V_DATA_PATH := V_DATA_PATH || '\';
      END IF;
      -- MySQL treats "\" as an escape character inside string literals.
      V_DATA_PATH := REPLACE(V_DATA_PATH, '\', '\\');
    END IF;

    OPEN V_SQL_REF FOR P_SQL_STMT;
    V_SQL_ID := DBMS_SQL.TO_CURSOR_NUMBER(V_SQL_REF);
    DBMS_SQL.DESCRIBE_COLUMNS(V_SQL_ID, V_COLS, V_COLS_DESC);

    -- Bind an output buffer to every column. The type codes handled here must
    -- stay identical to the ones handled in the fetch loop below.
    FOR I IN 1 .. V_COLS LOOP
      CASE
        WHEN V_COLS_DESC(I).COL_TYPE IN (1, 9, 96) THEN -- VARCHAR2 / VARCHAR / CHAR
          DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_VAR_COL, 32767);
        WHEN V_COLS_DESC(I).COL_TYPE = 2 THEN -- NUMBER
          DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_NUM_COL);
        WHEN V_COLS_DESC(I).COL_TYPE = 12 THEN -- DATE
          DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_DATE_COL);
        WHEN V_COLS_DESC(I).COL_TYPE IN (180, 181, 231) THEN -- TIMESTAMP variants
          DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_TMZ);
        ELSE
          -- Fail with a readable message instead of an anonymous CASE_NOT_FOUND.
          RAISE_APPLICATION_ERROR(-20001,
                                  'Unsupported column type ' ||
                                  V_COLS_DESC(I).COL_TYPE || ' for column ' ||
                                  V_COLS_DESC(I).COL_NAME);
      END CASE;
    END LOOP;

    WHILE DBMS_SQL.FETCH_ROWS(V_SQL_ID) > 0 LOOP
      -- Open the next CSV file lazily so an empty result creates no file.
      IF NOT UTL_FILE.IS_OPEN(V_EXP_FILE) THEN
        V_FILE_NAME        := P_TB_NAME || '_' || V_FILE_NO || '.csv';
        V_EXP_FILE         := UTL_FILE.FOPEN(V_ETL_DIR,
                                             V_FILE_NAME,
                                             OPEN_MODE    => 'w',
                                             MAX_LINESIZE => C_MAX_LINE);
        V_ROWS_IN_FILE     := 0;
        V_ROWS_SINCE_FLUSH := 0;
      END IF;

      V_ROW_STR := '';
      FOR I IN 1 .. V_COLS LOOP
        V_COL_STR := '\N'; -- MySQL's NULL marker, kept when the value is NULL
        CASE
          WHEN V_COLS_DESC(I).COL_TYPE IN (1, 9, 96) THEN
            DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_VAR_COL);
            IF V_VAR_COL IS NOT NULL THEN
              -- Escape the escape character first, then the enclosing character,
              -- so the value survives LOAD DATA unchanged.
              V_COL_STR := '^' ||
                           REPLACE(REPLACE(V_VAR_COL, '\', '\\'), '^', '\^') || '^';
            END IF;
          WHEN V_COLS_DESC(I).COL_TYPE = 2 THEN
            DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_NUM_COL);
            IF V_NUM_COL IS NOT NULL THEN
              -- Force "." as decimal separator regardless of the session NLS
              -- settings; a "," would be read as a field separator.
              V_COL_STR := TO_CHAR(V_NUM_COL, 'TM9', 'NLS_NUMERIC_CHARACTERS=''.,''');
            END IF;
          WHEN V_COLS_DESC(I).COL_TYPE = 12 THEN
            DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_DATE_COL);
            IF V_DATE_COL IS NOT NULL THEN
              V_COL_STR := '^' || TO_CHAR(V_DATE_COL, 'yyyy-mm-dd hh24:mi:ss') || '^';
            END IF;
          WHEN V_COLS_DESC(I).COL_TYPE IN (180, 181, 231) THEN
            DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_TMZ);
            IF V_TMZ IS NOT NULL THEN
              V_COL_STR := '^' || TO_CHAR(V_TMZ, 'yyyy-mm-dd hh24:mi:ss.ff6') || '^';
            END IF;
          ELSE
            NULL; -- unreachable: unsupported types are rejected while defining columns
        END CASE;

        IF I = 1 THEN
          V_ROW_STR := V_COL_STR;
        ELSE
          V_ROW_STR := V_ROW_STR || ',' || V_COL_STR;
        END IF;
      END LOOP;

      UTL_FILE.PUT_LINE(V_EXP_FILE, CONVERT(V_ROW_STR, 'UTF8'));
      V_ROWS_IN_FILE     := V_ROWS_IN_FILE + 1;
      V_ROWS_SINCE_FLUSH := V_ROWS_SINCE_FLUSH + 1;

      IF V_ROWS_IN_FILE >= C_ROWS_PER_FILE THEN
        -- File is full: close it; the next fetched row opens a new one.
        FINISH_EXPORT_FILE;
        V_FILE_NO := V_FILE_NO + 1;
      ELSIF V_ROWS_SINCE_FLUSH >= C_ROWS_PER_FLUSH THEN
        UTL_FILE.FFLUSH(V_EXP_FILE);
        V_ROWS_SINCE_FLUSH := 0;
      END IF;
    END LOOP;

    DBMS_SQL.CLOSE_CURSOR(V_SQL_ID);
    -- Finish the last, partially filled file.
    IF UTL_FILE.IS_OPEN(V_EXP_FILE) THEN
      FINISH_EXPORT_FILE;
    END IF;
  EXCEPTION
    WHEN OTHERS THEN
      -- Report first so the message is not lost if cleanup itself fails, then
      -- release the resources and let the caller continue with the next table.
      DBMS_OUTPUT.PUT_LINE(SQLERRM);
      DBMS_OUTPUT.PUT_LINE(P_SQL_STMT);
      IF V_SQL_ID IS NOT NULL AND DBMS_SQL.IS_OPEN(V_SQL_ID) THEN
        DBMS_SQL.CLOSE_CURSOR(V_SQL_ID);
      END IF;
      IF UTL_FILE.IS_OPEN(V_EXP_FILE) THEN
        UTL_FILE.FCLOSE(V_EXP_FILE);
      END IF;
  END ETL_DATA;

  -- Reads ETL_TABS.CNF, fills V_TABS and stores every requested column in
  -- mysql_etl_tbs. Returns FALSE (after reporting the reason) when the file
  -- cannot be processed.
  FUNCTION READ_CONFIG RETURN BOOLEAN IS
    V_CI       PLS_INTEGER;
    V_CN       VARCHAR2(40);
    V_ETL_COLS VARCHAR2(32767);
    V_TBN      VARCHAR2(40);
    V_ETL_CFG  VARCHAR2(32767);
    V_CNF_FILE UTL_FILE.FILE_TYPE;
    V_FROM_POS PLS_INTEGER;
    V_EOF      BOOLEAN := FALSE;

    PROCEDURE CLOSE_CNF_FILE IS
    BEGIN
      IF UTL_FILE.IS_OPEN(V_CNF_FILE) THEN
        UTL_FILE.FCLOSE(V_CNF_FILE);
      END IF;
    END CLOSE_CNF_FILE;
  BEGIN
    V_CNF_FILE := UTL_FILE.FOPEN(V_ETL_DIR, C_CNF_FILE_NAME, 'r', C_MAX_LINE);
    LOOP
      -- UTL_FILE signals end of file with NO_DATA_FOUND.
      BEGIN
        UTL_FILE.GET_LINE(V_CNF_FILE, V_ETL_CFG, C_MAX_LINE);
      EXCEPTION
        WHEN NO_DATA_FOUND THEN
          V_EOF := TRUE;
      END;
      EXIT WHEN V_EOF;

      V_TBN := UPPER(REGEXP_SUBSTR(V_ETL_CFG, '(\s+from\s+)(\w+)(\s*)', 1, 1, 'i', 2));
      -- Skip blank lines and lines that do not name a table.
      IF V_TBN IS NOT NULL THEN
        -- Locate the FROM keyword itself, not "from" inside an identifier.
        V_FROM_POS := REGEXP_INSTR(V_ETL_CFG, '\s+from\s+', 1, 1, 0, 'i');
        V_ETL_COLS := SUBSTR(V_ETL_CFG, 1, V_FROM_POS - 1);
        V_ETL_COLS := REGEXP_SUBSTR(V_ETL_COLS, '(select)(.+)', 1, 1, 'i', 2);

        V_TABS.EXTEND();
        V_TABS(V_TABS.LAST).TBN := V_TBN;
        V_TABS(V_TABS.LAST).WHR := REGEXP_SUBSTR(V_ETL_CFG, '\s+where.+', 1, 1, 'i');

        V_CI := 1;
        LOOP
          -- Column names may be separated by whitespace and/or commas.
          V_CN := UPPER(REGEXP_SUBSTR(V_ETL_COLS, '[^,[:space:]]+', 1, V_CI));
          EXIT WHEN V_CN IS NULL;
          EXECUTE IMMEDIATE 'insert into mysql_etl_tbs(tn,cn,ci) values(:1,:2,:3)'
            USING V_TBN, V_CN, V_CI;
          V_CI := V_CI + 1;
        END LOOP;
        COMMIT;
      END IF;
    END LOOP;

    CLOSE_CNF_FILE;
    RETURN TRUE;
  EXCEPTION
    WHEN UTL_FILE.INVALID_PATH THEN
      CLOSE_CNF_FILE;
      DBMS_OUTPUT.PUT_LINE('指定的目录:"' || V_ETL_DIR || '"无效!');
      RETURN FALSE;
    WHEN UTL_FILE.INVALID_FILENAME THEN
      CLOSE_CNF_FILE;
      DBMS_OUTPUT.PUT_LINE('指定的文件:"' || C_CNF_FILE_NAME || '"无效!');
      RETURN FALSE;
    WHEN OTHERS THEN
      DBMS_OUTPUT.PUT_LINE(SQLERRM);
      CLOSE_CNF_FILE;
      RETURN FALSE;
  END READ_CONFIG;

  -- Builds the SELECT statement used to export one configured table. Columns
  -- that are configured but missing in the Oracle table become typed NULLs so
  -- the CSV still has the column count requested in the configuration.
  FUNCTION BUILD_SELECT(P_TAB T_REC) RETURN VARCHAR2 IS
    V_CUR_MATCH   SYS_REFCURSOR;
    V_TN          VARCHAR2(40);
    V_CN          VARCHAR2(40);
    V_CI          PLS_INTEGER;
    V_COLUMN_NAME VARCHAR2(40);
    V_ETL_COLS    VARCHAR2(32767);
  BEGIN
    -- Dynamic SQL because mysql_etl_tbs may not exist when this procedure is
    -- compiled; the table name is passed as a bind variable.
    OPEN V_CUR_MATCH FOR
      'select l.tn, l.cn, r.column_name, l.ci' ||
      ' from (select * from mysql_etl_tbs where tn = :tbn) l' ||
      ' left join user_tab_columns r' ||
      ' on l.tn = r.table_name and l.cn = r.column_name' ||
      ' order by l.ci'
      USING P_TAB.TBN;
    LOOP
      FETCH V_CUR_MATCH
        INTO V_TN, V_CN, V_COLUMN_NAME, V_CI;
      EXIT WHEN V_CUR_MATCH%NOTFOUND;
      IF V_CI > 1 THEN
        V_ETL_COLS := V_ETL_COLS || ',';
      END IF;
      IF V_COLUMN_NAME IS NULL THEN
        V_ETL_COLS := V_ETL_COLS || 'cast(null as number) ' || V_CN;
      ELSE
        V_ETL_COLS := V_ETL_COLS || V_CN;
      END IF;
    END LOOP;
    CLOSE V_CUR_MATCH;

    -- WHR carries its own leading whitespace (or is NULL when there is no filter).
    RETURN 'select ' || V_ETL_COLS || ' from ' || LOWER(P_TAB.TBN) || P_TAB.WHR;
  EXCEPTION
    WHEN OTHERS THEN
      IF V_CUR_MATCH%ISOPEN THEN
        CLOSE V_CUR_MATCH;
      END IF;
      RAISE;
  END BUILD_SELECT;
BEGIN
  -- Best effort: the work table normally already exists from an earlier run.
  BEGIN
    EXECUTE IMMEDIATE 'create table mysql_etl_tbs(tn varchar2(40),cn varchar2(40),ci number)';
  EXCEPTION
    WHEN OTHERS THEN
      NULL;
  END;
  EXECUTE IMMEDIATE 'truncate table mysql_etl_tbs';

  IF NOT READ_CONFIG THEN
    RETURN;
  END IF;

  V_LOAD_FILE := UTL_FILE.FOPEN(V_ETL_DIR,
                                C_LOAD_FILE_NAME,
                                OPEN_MODE    => 'w',
                                MAX_LINESIZE => C_MAX_LINE);
  BEGIN
    -- 1 .. COUNT is an empty range for an empty collection, whereas
    -- FIRST .. LAST would be NULL .. NULL and raise ORA-06502.
    FOR T_IX IN 1 .. V_TABS.COUNT LOOP
      V_SQL_SMT := BUILD_SELECT(V_TABS(T_IX));
      ETL_DATA(V_SQL_SMT, P_DATA_PATH, LOWER(V_TABS(T_IX).TBN));
    END LOOP;
  EXCEPTION
    WHEN OTHERS THEN
      -- Do not leak the script file handle when building a statement fails.
      IF UTL_FILE.IS_OPEN(V_LOAD_FILE) THEN
        UTL_FILE.FCLOSE(V_LOAD_FILE);
      END IF;
      RAISE;
  END;

  IF UTL_FILE.IS_OPEN(V_LOAD_FILE) THEN
    UTL_FILE.FCLOSE(V_LOAD_FILE);
  END IF;
END P_ETL_ORA_DATA;
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对毛票票的支持。如果你想了解更多相关内容请查看下面相关链接