项目地址在这: byzer-client-sdk
Hmmm, 首先这是啥呢,byzer-client-sdk 目标是给你提供了使用各种语言拼接和运行Byzer代码的能力(现阶段仅支持 Java/Scala)。
比如我做个一个产品,用户在界面上需要看到一个csv格式的数据的schema, 他把这些信息传递给后端(以Scala语言为例),后端需要根据信息拼接下面的代码,然后发送给 Byzer 引擎,从而得到结果。
Byzer 的代码很简单:
代码语言:javascript复制load csv.`/tmp/upload/a.csv` where header="true" as a;
!desc a;
但是如果你要拼接这个代码,就需要高一些模板啥的,可能需要费点心思,然后呢,还需要想办法如何给 Byzer 引擎进行交互。这个时候, byzer-client-sdk 就可以帮上忙了。我们以 scala-client 为例:
代码语言:javascript复制val client = byzer.cluster.engine.url("http://127.0.0.1:9003/run/script").owner("jack").end.
backendStrategy(new JobNumAwareStrategy("")).
end
val script = client.load.format("csv").path("/tmp/upload/a.csv").
options().add("header", "true").end.end
val lastTable = script.lastTableName
script.raw.code(s"!desc ${lastTable};").end
script.run()
第一行代码配置 Byzer 引擎地址,第二行代码其实就算拼接上了构建csv加载的语句,第三行,第四行生成第二条语句,最后一行直接运行。
是不是很简单!
各种有意思的特性
多引擎负载均衡
我们知道,一般服务我们都要做负载均衡,Byzer 引擎其实也不例外。这个时候,我们可以配置多个引擎
代码语言:javascript复制val client = Byzer().cluster().
engine.url("http://127.0.0.1:9004/run/script").owner("admin").end.
engine.url("http://127.0.0.1:9003/run/script").owner("admin").end.
backendStrategy(new ResourceAwareStrategy("")).
end
然后呢,某次执行的时候,该怎么进行选择呢?此时可以通过 backendStrategy 来配置策略。
目前支持三种策略:
- ResourceAwareStrategy 选择空余资源最多的引擎
- JobNumAwareStrategy 选择任务数最少的引擎执行
- AllBackendsStrategy 每个引擎都会执行一把
标签化
我们可以给每条 Byzer 语句提供标签,从而通过标签来获得对应的语句。我们来看如下语句:
代码语言:javascript复制val byzer = Byzer()
val table1 = byzer.load.format("csv").path("/tmp/jack").options().add("header", "true").end.tag("t1")
val table2 = byzer.load.format("csv").path("/tmp/william").options().add("header", "true").end.tag("t2")
table1.end
table2.end
val t1 = byzer.getByTag("t1")
println(t1.head.tableName)
我们可以给某条 Byzer 语句打个标签,然后通过标签可以随时获取某条语句,并且获得他的信息。
此外,我们还能提供标签运行能力:
代码语言:javascript复制// run all script
val res = script.run()
println(res.head.returnContent().asString())
// Generate code unitl the statement with tag `load_json` and run it
val res1 = script.runUntilTag("load_json")
println(res1.head.returnContent().asString())
// Only run statement with tag `xxxx`.
val res2 = script.runWithTag("xxxx")
println(res2.head.returnContent().asString())
第一部分,我们会运行生成的所有代码,第二个,我们会把打上了 load_json 标签的语句以及之前的语句放在一起运行。后面的语句不会运行。
第三个,只跑某个标签(多条语句可以打相同的标签)。
支持原生 Byzer 代码
总有 SDK 没有涵盖的能力,此时我们可以使用 Raw code :
代码语言:javascript复制val byzer = Byzer().
load.format("csv").path("/tmp/jack").options().add("header", "true").end.tag("load_csv").end.
filter.or.add(Expr(Some("a=b"))).add(And(Expr(Some("c>2")), Expr(Some("d>10")))).end.tag("filter").end.
columns.addColumn(Expr(Some("a,c,d"))).tag("select_col").end
val lastTable = byzer.lastTableName
val genCode = byzer.raw.code(s"select * from ${lastTable} as newTable;").end.
columns.from("newTable").addColumn(Expr(Some("count(*) as c"))).end.
toScript
println(genCode)
产生的语句如下:
代码语言:javascript复制load csv.`/tmp/jack` where `header`='''true''' as 58c451473da540e9bf097d8b59fe62d8;
select * from 58c451473da540e9bf097d8b59fe62d8 where (a=b or (c>2 and d>10)) as ca0d5f002a5f4d1397658fd20bfa5f1a;
select a,c,d from ca0d5f002a5f4d1397658fd20bfa5f1a as 7bd3418bdc4b4a2b9ea64ba6cbe9b03b;
select * from 7bd3418bdc4b4a2b9ea64ba6cbe9b03b as newTable;
select count(*) as c from newTable as 56d4a29f710a4e3cb6454ed568e29707;
节点互换
我们构建好 多条语句后,有的时候要互换语句,具体代码如下;
代码语言:javascript复制val byzer = Byzer().
load.format("csv").path("/tmp/jack").options().add("header", "true").end.tag("load_csv").end.
filter.or.add(Expr(Some("a=b"))).add(And(Expr(Some("c>2")), Expr(Some("d>10")))).end.tag("filter").end.
columns.addColumn(Expr(Some("a,c,d"))).tag("select_col").end
val load_csv = byzer.getByTag("load_csv").head
val filter = byzer.getByTag("filter").head
val select_col = byzer.getByTag("select_col").head
// replace the from table
select_col.asInstanceOf[Columns].from(load_csv.tableName)
filter.asInstanceOf[Filter].from(select_col.tableName)
val genCode = byzer.swapBlock(filter, select_col).toScript
println(genCode)
我们构建了一条加载表的语句,一条过滤语句,一条选择列的语句,此时我希望互换后面两条语句,然后唯一要变化的是需要重新更改两条语句的输入表,此时结合 Tag 以及 通过 swapBlock 就可以实现互换。生成的结果语句如下:
代码语言:javascript复制load csv.`/tmp/jack` where `header`='''true''' as 1e5a6d4fc8f54174854950fa387598f8;
select a,c,d from 1e5a6d4fc8f54174854950fa387598f8 as 6d6834c09492413c91665c00088f4616;
select * from 6d6834c09492413c91665c00088f4616 where (a=b or (c>2 and d>10)) as d4044376b7834f368ea74b7c00e9c479;
序列化反序列化
通常用户为了构建一个脚本,可能需要多次交互。每次交互后,我们可以将代码保存成json,然后下次交互再还原为对象,继续对对象进行操作。
参考如下代码:
代码语言:javascript复制val byzer = Byzer()
val jsonString = byzer.cluster.engine.url("http://127.0.0.1:9003/run/script").owner("jack").end.
backendStrategy(new JobNumAwareStrategy("")).
end.
load.format("csv").path("/tmp/jack").
options().add("header", "true").end.tag("table1").end.filter.
and.add(Or(Expr(Some("a>1")), Expr(Some("b>1")))).add(Expr(Some("c==1"))).end.end.
toJson(true)
var byzer2 = Byzer()
byzer2 = byzer2.fromJson(jsonString)
assert(byzer2.toJson(true) == jsonString)
-- 继续操作
byzer2.filter....
更多的语法示例
具体语法示例,大家参考对应的 sdk README 文档啦。