Phoenix Bulk Insert


1. Bulk-load data with Phoenix's CsvBulkLoadTool (existing index tables are populated automatically)

From the Phoenix directory, run:

hadoop jar /home/phoenix-4.6.0-HBase-1.0-bin/phoenix-4.6.0-HBase-1.0-client.jar org.apache.phoenix.mapreduce.CsvBulkLoadTool -t POPULATION -i /datas/us_population.csv

Here -t is the target table name and -i is the input CSV file, which must already be on HDFS.
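CsvBulkLoadTool loads into a table that already exists, so create it (and any indexes) first. A minimal sketch of a matching schema, assuming the three-column layout of the standard us_population example from the Phoenix docs (adjust to your CSV):

CREATE TABLE IF NOT EXISTS POPULATION (
    STATE CHAR(2) NOT NULL,
    CITY VARCHAR NOT NULL,
    POPULATION BIGINT
    CONSTRAINT PK PRIMARY KEY (STATE, CITY));

-- a matching line of us_population.csv would look like:
-- NY,New York,8143197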

2. Phoenix's bundled SQL import tool (psql.py)

./psql.py centos1,centos2,centos3 /home/ico11.sql
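psql.py takes the ZooKeeper quorum followed by one or more .sql files and executes their statements in order. The contents of ico11.sql are not shown in the original; a hypothetical file could look like:

CREATE TABLE IF NOT EXISTS ICO (ID BIGINT NOT NULL PRIMARY KEY, NAME VARCHAR);
UPSERT INTO ICO VALUES (1, 'alpha');
UPSERT INTO ICO VALUES (2, 'beta');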

3. Batch insert from Java code

import java.io.FileInputStream;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Scanner;

import org.apache.phoenix.jdbc.PhoenixConnection;

public class PhoenixSQL {

    public static PhoenixConnection getConnection() {
        try {
            Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
            return (PhoenixConnection) DriverManager.getConnection(
                    "jdbc:phoenix:ip", "root", "444444");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void read(String filename) throws Exception {
        String sb = "";
        int i = 0;
        FileInputStream inputStream = null;
        Scanner sc = null;
        PhoenixConnection conn = null;
        Statement ps = null;
        try {
            conn = getConnection();
            conn.setAutoCommit(false);           // buffer mutations client-side
            ps = conn.createStatement();
            inputStream = new FileInputStream(filename);
            sc = new Scanner(inputStream, "UTF-8");
            int upsertBatchSize = conn.getMutateBatchSize();
            while (sc.hasNextLine()) {
                sb = sc.nextLine() + "  ";
                i++;
                if (i % 2 != 0) {                // a statement sits on every other line of the file
                    if (sb.trim().length() > 0) {
                        ps.executeUpdate(sb.split(";")[0]);
                    }
                    sb = "";
                }
                if (i % upsertBatchSize == 0) {
                    conn.commit();               // flush one batch of mutations to HBase
                }
            }
            conn.commit();                       // flush the final partial batch
            if (sc.ioException() != null) {
                throw sc.ioException();
            }
        } catch (Exception e1) {
            e1.printStackTrace();
        } finally {
            if (inputStream != null) inputStream.close();
            if (sc != null) sc.close();
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    }

    public static void main(String[] args) {
        try {
            read("F:/xxx/ico.sql");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
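Because autoCommit is off, Phoenix buffers the upserts client-side and ships them to HBase in batches on each commit(), which is what makes this approach fast. When every row has the same shape, a parameterized UPSERT avoids re-parsing SQL for each line; a minimal sketch (the ICO table, connection string, and row count here are hypothetical):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class PhoenixPreparedUpsert {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:ip")) {
            conn.setAutoCommit(false);
            try (PreparedStatement ps =
                     conn.prepareStatement("UPSERT INTO ICO (ID, NAME) VALUES (?, ?)")) {
                for (long id = 1; id <= 1_000_000; id++) {
                    ps.setLong(1, id);
                    ps.setString(2, "row-" + id);
                    ps.executeUpdate();          // buffered client-side until commit
                    if (id % 1000 == 0) {
                        conn.commit();           // flush a batch of mutations
                    }
                }
                conn.commit();
            }
        }
    }
}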

Inserting twenty million rows this way took about one hour.

4. Sqoop. Sqoop can import data into HBase, and the HBase table can then be mapped in Phoenix. However, numeric types from the relational database are written to HBase as bytes whose encoding does not match Phoenix's numeric serialization, so the numeric values read back through Phoenix are wrong.
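One possible workaround, a sketch assuming Sqoop wrote the values in its default string encoding, is to map the affected columns as VARCHAR in a Phoenix view over the HBase table and convert at read time; the table, row key, and column names below are placeholders:

-- hypothetical HBase table "orders" with column family "cf"
CREATE VIEW "orders" (
    pk VARCHAR PRIMARY KEY,
    "cf"."amount" VARCHAR);

SELECT TO_NUMBER("cf"."amount") FROM "orders";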
