1. Phoenix: bulk-load data with CsvBulkLoadTool (indexes are populated automatically)
From the Phoenix directory, run:
hadoop jar /home/phoenix-4.6.0-HBase-1.0-bin/phoenix-4.6.0-HBase-1.0-client.jar org.apache.phoenix.mapreduce.CsvBulkLoadTool -t POPULATION -i /datas/us_population.csv
Here -t specifies the target table name and -i the input file; the input file must already be on HDFS.
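If the job needs to be launched from code rather than the shell, the same tool can be driven through Hadoop's ToolRunner. The following is a minimal sketch, assuming the Phoenix 4.6 client jar and the cluster's hbase-site.xml are on the classpath; the BulkLoadRunner class name and the quorum string are only illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.mapreduce.CsvBulkLoadTool;

public class BulkLoadRunner {
    public static void main(String[] args) throws Exception {
        // picks up hbase-site.xml / core-site.xml from the classpath
        Configuration conf = HBaseConfiguration.create();
        // illustrative ZooKeeper quorum; normally this comes from hbase-site.xml
        conf.set("hbase.zookeeper.quorum", "centos1,centos2,centos3");
        // same arguments as the command line: -t table name, -i input CSV on HDFS
        int exitCode = ToolRunner.run(conf, new CsvBulkLoadTool(),
                new String[] { "-t", "POPULATION", "-i", "/datas/us_population.csv" });
        System.exit(exitCode);
    }
}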
2. Phoenix's bundled SQL import tool (psql.py)
./psql.py centos1,centos2,centos3 /home/ico11.sql
3. Bulk insert from Java code (JDBC)
import java.io.FileInputStream;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Scanner;

import org.apache.phoenix.jdbc.PhoenixConnection;

public class PhoenixSQL {

    public static PhoenixConnection getConnection() {
        try {
            Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
            return (PhoenixConnection) DriverManager.getConnection(
                    "jdbc:phoenix:ip", "root", "444444");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void read(String filename) throws Exception {
        FileInputStream inputStream = null;
        Scanner sc = null;
        PhoenixConnection conn = null;
        Statement ps = null;
        try {
            conn = getConnection();
            conn.setAutoCommit(false);
            ps = conn.createStatement();
            inputStream = new FileInputStream(filename);
            sc = new Scanner(inputStream, "UTF-8");
            // commit in batches of the connection's configured mutation batch size
            int upsertBatchSize = conn.getMutateBatchSize();
            int count = 0;
            while (sc.hasNextLine()) {
                String line = sc.nextLine().trim();
                if (line.isEmpty()) {
                    continue;
                }
                // each line of the .sql file holds one UPSERT statement, optionally ending with ';'
                ps.executeUpdate(line.split(";")[0]);
                count++;
                if (count % upsertBatchSize == 0) {
                    conn.commit();
                }
            }
            conn.commit();
            if (sc.ioException() != null) {
                throw sc.ioException();
            }
        } catch (Exception e1) {
            e1.printStackTrace();
        } finally {
            if (inputStream != null) inputStream.close();
            if (sc != null) sc.close();
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    }

    public static void main(String[] args) {
        try {
            read("F:/xxx/ico.sql");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Inserting 20 million rows this way took about one hour.
4. Sqoop: import data into HBase and map the table in Phoenix. Sqoop writes numeric columns from the relational database into HBase as plain byte arrays, while Phoenix uses its own serialization for numeric types, so when the table is mapped into Phoenix the numeric values come out changed.
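The root of the problem is the two serialization formats. The sketch below, assuming an HBase and Phoenix 4.x client on the classpath, compares how a plain HBase write (what Sqoop produces) and Phoenix's INTEGER codec serialize the same number, and what Phoenix decodes from the Sqoop-style bytes; the EncodingMismatch class name is only for illustration.

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.schema.types.PInteger;

public class EncodingMismatch {
    public static void main(String[] args) {
        int value = 1000;
        // what Sqoop/HBase writes: the raw big-endian bytes of the int
        byte[] hbaseBytes = Bytes.toBytes(value);
        // what Phoenix expects for an INTEGER column (sign bit flipped so rows sort correctly)
        byte[] phoenixBytes = PInteger.INSTANCE.toBytes(value);
        System.out.println("HBase/Sqoop bytes : " + Bytes.toStringBinary(hbaseBytes));
        System.out.println("Phoenix bytes     : " + Bytes.toStringBinary(phoenixBytes));
        // reading the Sqoop-written bytes through Phoenix yields a different (wrong) number
        System.out.println("Phoenix decodes Sqoop bytes as: " + PInteger.INSTANCE.toObject(hbaseBytes));
    }
}

One common way around this, if the values are non-negative, is to declare the mapped columns with Phoenix's UNSIGNED_* types, which are documented to match HBase's native Bytes serialization; otherwise land the data in files first and load it with CsvBulkLoadTool or UPSERT statements instead of mapping the Sqoop-written table directly.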