最近在公司享受福报,所以更新进度严重脱节了,本期依旧是一篇Calcite相关的文章,上一篇《基于Calcite自定义SQL解析器》有兴趣的童鞋可以移步去看看。本文我们将介绍一下如何自定义JDBC Driver。
不知道正在读文章的你在刚开始使用JDBC编程的时候,是否很好奇jdbc规范是如何实现的?为什么通过URL,就能打开一个链接,这里面是如何运作的?我们自己是否可以定义一套自己的jdbc url规范?是否想知道ResultSet是如何实现的?反正这些问题,是一直伴随我的编程生涯,直到遇到了Calcite。
由于篇幅限制,我们本次不会实现那么多内容,今天主要来构建一套自定义JDBC URL 及驱动程序,实现对json的jdbc封装 。 其中url包含如下部分,协议规范使用jdbc:json固定格式,后面跟着一段加载路径,驱动程序将遍历该路径,将json文件加载进来,以json的文件名为表名,加载路径的最后一部分为schema名。如下图所示。
下面是user.json的demo数据
代码语言:javascript复制[{
"uid": 1,
"name": "dafei1288",
"age": 33,
"aka": " 7"
},
{
"uid": 2,
"name": "libailu",
"age": 1,
"aka": "maimai"
},
{
"uid": 3,
"name": "libaitian",
"age": 1,
"aka": "doudou"
}
]
下面是order.json的demo数据
代码语言:javascript复制[
{
"oid": 1,
"uid": 1,
"value": 11
},
{
"oid": 2,
"uid": 2,
"value": 15
}
]
这里需要我们之前文章里介绍的一些内容,来定义json的schema和table,主要是为了遍历获取元数据,以及迭代数据的时候,使用的方法。
代码语言:javascript复制package wang.datahub.jdbc;
import com.google.common.collect.Maps;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.AbstractEnumerable;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.linq4j.tree.Types;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScannableTable;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.schema.Statistics;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.util.BuiltInMethod;
import org.apache.calcite.util.Pair;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class JsonSchema extends AbstractSchema {
private String target;
private String topic;
static Map<String, Table> table = Maps.newHashMap();
public JsonSchema(){
super();
}
public void put(String topic, String target) {
this.topic = topic;
if (!target.startsWith("[")) {
this.target = '[' target ']';
} else {
this.target = target;
}
final Table table = fieldRelation();
if (table != null) {
this.table.put(topic,table);
}
}
public JsonSchema(String topic, String target) {
super();
this.put(topic,target);
}
@Override
public String toString() {
return "wang.datahub.jdbc.JsonSchema(topic=" topic ":target=" target ")" this.table;
}
public String getTarget() {
return target;
}
@Override
protected Map<String, Table> getTableMap() {
return table;
}
Expression getTargetExpression(SchemaPlus parentSchema, String name) {
return Types.castIfNecessary(target.getClass(),
Expressions.call(Schemas.unwrap(getExpression(parentSchema, name), JsonSchema.class),
BuiltInMethod.REFLECTIVE_SCHEMA_GET_TARGET.method));
}
private <T> Table fieldRelation() {
JSONArray jsonarr = JSON.parseArray(target);
// final Enumerator<Object> enumerator = Linq4j.enumerator(list);
return new JsonTable(jsonarr);
}
private static class JsonTable extends AbstractTable implements ScannableTable {
private final JSONArray jsonarr;
// private final Enumerable<Object> enumerable;
public JsonTable(JSONArray obj) {
this.jsonarr = obj;
}
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
final List<RelDataType> types = new ArrayList<RelDataType>();
final List<String> names = new ArrayList<String>();
JSONObject jsonobj = jsonarr.getJSONObject(0);
for (String string : jsonobj.keySet()) {
final RelDataType type;
type = typeFactory.createJavaType(jsonobj.get(string).getClass());
names.add(string);
types.add(type);
}
if (names.isEmpty()) {
names.add("line");
types.add(typeFactory.createJavaType(String.class));
}
return typeFactory.createStructType(Pair.zip(names, types));
}
public Statistic getStatistic() {
return Statistics.UNKNOWN;
}
public Enumerable<Object[]> scan(DataContext root) {
return new AbstractEnumerable<Object[]>() {
public Enumerator<Object[]> enumerator() {
return new JsonEnumerator(jsonarr);
}
};
}
}
public static class JsonEnumerator implements Enumerator<Object[]> {
private Enumerator<Object[]> enumerator;
public JsonEnumerator(JSONArray jsonarr) {
List<Object[]> objs = new ArrayList<Object[]>();
for (Object obj : jsonarr) {
objs.add(((JSONObject) obj).values().toArray());
}
enumerator = Linq4j.enumerator(objs);
}
public Object[] current() {
return (Object[]) enumerator.current();
}
public boolean moveNext() {
return enumerator.moveNext();
}
public void reset() {
enumerator.reset();
}
public void close() {
enumerator.close();
}
}
}
下面是我们的驱动程序,在这里,我们定义jdbc url字符串,并在创建连接的时候,对url进行分析,并将json的名字,注册到root schema 。 当然这里是最小化实现,我们继承了
代码语言:javascript复制org.apache.calcite.jdbc.Driver
如果完全自定义的话,则需要实现的更多一些。基本原则是不变的。
代码语言:javascript复制import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;
import java.util.stream.Collectors;
public class Driver extends org.apache.calcite.jdbc.Driver {
public static final String CONNECT_STRING_PREFIX = "jdbc:json:";
static {
new Driver().register();
}
@Override protected String getConnectStringPrefix() {
return CONNECT_STRING_PREFIX;
}
@Override
public Connection connect(String url, Properties info) throws SQLException {
Connection c = super.connect(url, info);
CalciteConnection optiqConnection = (CalciteConnection) c.unwrap(CalciteConnection.class);
SchemaPlus rootSchema = optiqConnection.getRootSchema();
String[] pars = url.split(":");
Path f = Paths.get(pars[2]);
try {
JsonSchema js = new JsonSchema();
Files.list(f).forEach(it->{
File file = it.getName(it.getNameCount()-1).toFile();
String filename = file.getName();
filename = filename.substring(0,filename.lastIndexOf("."));
String json = "";
try {
json = Files.readAllLines(it.toAbsolutePath()).stream().collect(Collectors.joining());//.forEach(line->{ sb.append(line);
} catch (Exception e) {
e.printStackTrace();
}
js.put(filename,json);
});
//
rootSchema.add(f.getFileName().toString(), js);
} catch (Exception e) {
e.printStackTrace();
}
return c;
}
}
下面是测试代码,通过标准JDBC的方式获取连接,使用自定义的url,
代码语言:javascript复制jdbc:json:./src/main/resources/
然后就是几个测试的sql了,这里分别查了两个表,以及做了一个join。
代码语言:javascript复制import com.alibaba.fastjson.JSONObject;
import java.sql.*;
public class CalciteTest1 {
public static void main(String[] args) throws Exception {
Class.forName("wang.datahub.jdbc.Driver");
Connection connection = DriverManager.getConnection("jdbc:json:./src/main/resources/");
Statement statement = connection.createStatement();
ResultSet resultSet = resultSet = statement.executeQuery(
"select "user"."uid" from "resources"."user" ");
printResultSet(resultSet);
resultSet = statement.executeQuery(
"select * from "resources"."order" ");
printResultSet(resultSet);
resultSet = statement.executeQuery(
"select * from "resources"."user" inner join "resources"."order" on "user"."uid" = "order"."uid"");
printResultSet(resultSet);
}
public static void printResultSet(ResultSet resultSet) throws SQLException {
while(resultSet.next()){
JSONObject jo = new JSONObject();
int n = resultSet.getMetaData().getColumnCount();
for (int i = 1; i <= n; i ) {
jo.put(resultSet.getMetaData().getColumnName(i), resultSet.getObject(i));
}
System.out.println(jo.toJSONString());
}
}
}
控制台,输出结果如下:
{"uid":1}
{"uid":2}
{"uid":3}
{"uid":1,"oid":1,"value":11}
{"uid":2,"oid":2,"value":15}
{"uid":1,"aka":" 7","name":"dafei1288","oid":1,"value":11,"age":33}
{"uid":2,"aka":"maimai","name":"libailu","oid":2,"value":15,"age":1}
好了,自定义jdbc driver部分,先说到这里,其实要想真正实现好一个自己的驱动,还需要处理很多东西,可能很琐碎,也有很多乐趣,希望在逐步分解中,为大家带来一点不一样的东西,也期待您的意见与建议。