前言
Hbase中的数据读取起来不太方便,所以这里使用Phoenix来保存数据。
准备Hive
启动Hive服务
nohup $HIVE_HOME/bin/hiveserver2 &
连接Hive服务
beeline -n hive -u jdbc:hive2://hadoop01:10000/default
插入数据
INSERT INTO t_user01(id,name) VALUES (1,'李四');
查询数据
select * from t_user01;
select * from t_user01 limit 10;
准备Phoenix
注意
在Phoenix中无论表还是字段只要没有双引号引起来的字段都会变成大写。 这里不建议用双引号,在后期拼接SQL的时候比较麻烦。
启动query server
queryserver.py start
lsof -i:8765
连接
sqlline-thin.py http://hadoop01:8765
创建schema
create schema mdb;
使用这个新建的 schema:
use mdb;
创建表
CREATE TABLE IF NOT EXISTS tuser(
id VARCHAR primary key,
name VARCHAR
);
插入数据
upsert into tuser values('1001','zhangsan');
upsert into tuser values('1002','lisi');
upsert into tuser(id,name) values('1003','liwu');
查询记录
select * from tuser;
select * from tuser where id='1001';
select * from tuser where name like 'li%';
分页查询
select * from tuser order by id desc limit 1 offset 0;
select * from tuser order by id desc limit 1 offset 1;
其中 limit 指定取多少条，offset 指定从第多少条开始。
Hive=>Phoenix
依赖
<!--JSON工具-->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.22</version>
</dependency>
<!--Hive JDBC-->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>2.1.0</version>
</dependency>
<!--操作Phoenix-->
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-queryserver-client</artifactId>
<version>6.0.0</version>
</dependency>
主类
import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Entry point: streams rows out of Hive and writes them into Phoenix via Flink.
 */
public class Hive2Phoenix {
    public static void main(String[] args) throws Exception {
        // Single-parallelism streaming environment so records are read and
        // written in order by one source/sink instance each.
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        // Hive rows arrive as JSON objects from the custom source.
        DataStreamSource<JSONObject> hiveStream = environment.addSource(new HiveReader());
        hiveStream.addSink(new PhoenixWriter());
        hiveStream.print();

        environment.execute("Hive2Phoenix");
    }
}
读取Hive
import com.alibaba.fastjson2.JSONObject;
import com.xhkjedu.pojo.DBModel;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
/**
 * Flink source that reads all rows of Hive table {@code t_user} over JDBC
 * and emits each row as a {@code JSONObject} with keys "id" and "name".
 */
public class HiveReader extends RichSourceFunction<JSONObject> {
    // Kept as fields so close() can release them. In the original the
    // Connection was a local variable in open() and was never closed (a leak).
    private transient Connection con = null;
    private transient Statement st = null;
    // Cooperative cancellation flag checked by run(); set by cancel().
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        con = DriverManager.getConnection("jdbc:hive2://192.168.7.101:10000/default", "hive", "hive");
        st = con.createStatement();
    }

    @Override
    public void run(SourceContext<JSONObject> ctx) throws Exception {
        // ResultSet.next() streams rows from the server one at a time, so the
        // connection must stay open for the whole iteration; the ResultSet
        // itself is released by try-with-resources.
        try (ResultSet rs = st.executeQuery("select * from t_user")) {
            while (running && rs.next()) {
                JSONObject json = new JSONObject();
                json.put("id", rs.getInt("id"));
                json.put("name", rs.getString("name"));
                ctx.collect(json);
            }
        }
    }

    @Override
    public void cancel() {
        // Stops the read loop in run(); Flink may call this from another thread.
        running = false;
    }

    @Override
    public void close() throws Exception {
        // Release JDBC resources that the original version leaked
        // (its close calls were commented out).
        if (st != null) {
            st.close();
        }
        if (con != null) {
            con.close();
        }
        super.close();
    }
}
ResultSet.next其实是取一条就跟数据库通讯拿一条数据,并不是全部取出放在内存,因为ResultSet.next之前,是获取了数据库连接的,数据库连接断开,你就获取不到数据了,说明是有通讯的。
写入Phoenix
import com.alibaba.fastjson2.JSONObject;
import com.xhkjedu.pojo.DBModel;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
public class PhoenixWriter extends RichSinkFunction<JSONObject> {
private transient Statement st = null;
static Connection conn;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
System.out.println("open:" Thread.currentThread().getId());
Class.forName("org.apache.phoenix.queryserver.client.Driver");
if (conn == null) {
conn = DriverManager.getConnection("jdbc:phoenix:thin:url=http://192.168.7.101:8765;serialization=PROTOBUF");
}
st = conn.createStatement();
}
@Override
public void close() throws Exception {
System.out.println("close");
conn.commit();
st.close();
conn.close();
super.close();
}
@Override
public void invoke(JSONObject json, Context context) throws Exception {
String id = json.getString("id");
String name = json.getString("name");
String sql = String.format("upsert into mdb.tuser(id,name) VALUES ('%s','%s')", id, name);
System.out.println("sql: " sql);
st.execute(sql);
}
}