FunDA是一种函数式的编程工具,它所产生的程序是由许多功能单一的细小函数组合而成,这些函数就是用户自定义操作函数了。我们在前面曾经提过FunDA的运作原理模拟了数据流管道。流元素在管道流动的过程中被使用或者更新。在管道中流动的元素都必须继承FDAROW类型,可以细分成几个大类:
1、数据行(data-row):因为FunDA的数据行必须是强类型的,所以各种case class类型继承了FDAROW(extends FDAROW)之后最为适合
2、动作行(action-row):case class包嵌slick.DBIOAction的数据类型,如:FDAActionRow(slickQueryAction)
3、异常行(exception-row):case class包嵌Exception类型,是下面这样申明的:
代码语言:javascript复制case class FDAErrorRow(e: Exception) extends FDAROW
4、终止行(end-of-stream):数据流终止信号,用于通知下游节点已经没有流动元素了
FunDA自定义操作函数的主要目的是在某个流节点对流元素进行使用和处理。乍看好像直接用函数式编程中的map,flatMap函数都能达到同样的目标,如:
代码语言:javascript复制fdaStream.map(row => transformData(row)).map(action => runQueryAction(action))
但经过实验后发现标准流操作函数map,flatMap缺乏功能强大又可以灵活应用的流动操作,而这又是流式数据处理至关重要的一项功能。这就是为什么我们需要一套新的用户自定义函数了。
FunDA规范了一套标准的自定义函数操作流程,由一下几个步骤组成:
1、确定当前流元素类型
2、在该类型的框架内使用和变动流元素字段值
3、流动控制:控制元素向下游的流动
我们将在这篇讨论里示范各种形式和功能的自定义函数。承上篇的示范所产生的数据表AIRQM。这是一个直接导入cvs文件产生的数据表,所有字段都是String类型的。我们的示范就是把这个表里的字段属性转换成匹配的类型后生成一个新表AQMRPT,并把AIRQM里数据的字段值经过转换后并入新表。下面是这个表的结构:
代码语言:javascript复制 case class AQMRPTModel(rid: Long
, mid: Int
, state: String
, county: String
, year: Int
, value: Int
, total: Int
, valid: Boolean) extends FDAROW
class AQMRPTTable(tag: Tag) extends Table[AQMRPTModel](tag, "AQMRPT") {
def rid = column[Long]("ROWID",O.AutoInc,O.PrimaryKey)
def mid = column[Int]("MEASUREID")
def state = column[String]("STATENAME",O.Length(32))
def county = column[String]("COUNTYNAME",O.Length(32))
def year = column[Int]("REPORTYEAR")
def value = column[Int]("VALUE")
def total = column[Int]("TOTAL")
def valid = column[Boolean]("VALID")
def * = (rid,mid,state,county,year,value,total,valid) <> (AQMRPTModel.tupled, AQMRPTModel.unapply)
}
val AQMRPTQuery = TableQuery[AQMRPTTable]
注意我们用extends FDAROW把AQMRPTModel变成了强类型数据行类型,这是必须的。现在AQMRPTQuery就是这个新的数据表了。下面是这个表的创建和使用铺垫代码:
代码语言:javascript复制//drop original table schema
val futVectorTables = db.run(MTable.getTables)
val futDropTable = futVectorTables.flatMap{ tables => {
val tableNames = tables.map(t => t.name.name)
if (tableNames.contains(AQMRPTQuery.baseTableRow.tableName))
db.run(AQMRPTQuery.schema.drop)
else Future()
}
}.andThen {
case Success(_) => println(s"Table ${AQMRPTQuery.baseTableRow.tableName} dropped successfully! ")
case Failure(e) => println(s"Failed to drop Table ${AQMRPTQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
}
Await.ready(futDropTable,Duration.Inf)
//create new table to refine AQMRawTable
val actionCreateTable = Models.AQMRPTQuery.schema.create
val futCreateTable = db.run(actionCreateTable).andThen {
case Success(_) => println("Table created successfully!")
case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}")
}
//would carry on even fail to create table
Await.ready(futCreateTable,Duration.Inf)
//truncate data, only available in slick 3.2.1
val futTruncateTable = futVectorTables.flatMap{ tables => {
val tableNames = tables.map(t => t.name.name)
if (tableNames.contains(AQMRPTQuery.baseTableRow.tableName))
db.run(AQMRPTQuery.schema.truncate)
else Future()
}
}.andThen {
case Success(_) => println(s"Table ${AQMRPTQuery.baseTableRow.tableName} truncated successfully!")
case Failure(e) => println(s"Failed to truncate Table ${AQMRPTQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
}
Await.ready(futDropTable,Duration.Inf)
通过上面这段铺垫代码(boiler-code)使我们保证获得一个空的AQMRPTQuery表。下一步我们把AQMRaw载入内存作为FunDA程序的一个数据源(source)来使用:
代码语言:javascript复制//load original table content
//original table strong-typed-row
case class AQMRaw(mid: String, state: String,
county: String, year: String, value: String) extends FDAROW
implicit def toAQMRaw(row: (String,String,String,String,String)) =
AQMRaw(row._1,row._2,row._3,row._4,row._5)
val streamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRaw _)
// val queryAQMRaw = for { r <- AQMRawQuery } yield (r.mid,r.state,r.county,r.year,r.value)
val queryAQMRaw = sql"""
SELECT MEASUREID,STATENAME,COUNTYNAME,REPORTYEAR,VALUE FROM AIRQM
""".as[(String,String,String,String,String)]
val streamAQMRaw: FDAPipeLine[FDAROW] = streamLoader.fda_typedStream(queryAQMRaw)(db)(512,512)()
注意我们使用了slick的plain sql来示范创建这个数据源。AQMRaw类型是这个源的强类型数据行类型,所以又必须extends FDAROW。再就是类型转换函数toAQMRaw是implicit def的,这是一种确保转换函数存在的措施,提供给compiler在编译时使用(试试如果不用implicit def会怎样,仔细阅读compiler的错误提示)。我们把数据导入的流程分成下面几个步骤:
1、载入数据源 >>> 数据行类型转换:从AQMRowModel转成AQMRPTModel >>> 把新类型的数据行传给下游
2、把上游传来的数据行转换成动作行FDAActionRow(queryAction),然后把这个动作行传给下游
3、对上游传来的动作行进行运算
上面这三个大步骤代表三个功能单一,细化的用户自定义函数。我们先看看第一步:这是一个典型格式的自定义函数:
代码语言:javascript复制//filter out rows with inconvertible value strings and out of ranged year
def filterRows: FDAUserTask[FDAROW] = row => {
row match {
case r: AQMRaw => {
try {
val yr = r.year.toInt
val v = r.value.toInt
val vlu = if ( v > 10 ) 10 else v
val data = AQMRPTModel(0,r.mid.toInt,r.state,r.county,yr,vlu,0,true)
if ((yr > 1960 && yr < 2018))
fda_next(data) //this row ok. pass downstream
else
fda_skip //filter out this row
} catch {
case e: Exception =>
fda_next(AQMRPTModel(0,r.mid.toInt,r.state,r.county,2000,0,0,false))
//pass a invalid row
}
}
case _ => fda_skip //wrong type, skip
}
}
下面我们把用户自定义函数filterRows与自定义函数的标准操作流程对应一下:
1、确定数据行类型:row match { case r: AQMRow => ??? 通过这段明确了数据行是AQMRow类型的
2、下面是数据行的内容的具体应用:
代码语言:javascript复制data = AQMRPTModel(0,r.mid.toInt,r.state,r.county,yr,vlu,0,true)
我们使用了AQMRaw的行字段r.??来构建AQMRPTModel数据行。
3、fda_next(???)把新构建的AQMRPTModel行传到下游
以上几步证明filterRows是按照自定义函数操作标准来运作的。
第二步是把新类型的数据行转换成一条动作行,然后传给下游。由下面这个用户自定义函数来实现:
代码语言:javascript复制//transform data to action for later execution
def toAction: FDAUserTask[FDAROW] = row => {
row match {
case r: AQMRPTModel =>
val queryAction = AQMRPTQuery = r
fda_next(FDAActionRow(queryAction))
case _ => fda_skip
}
}
toAction同样遵循自定义函数的操作标准。我们需要需要一个运算器来运算动作行:
代码语言:javascript复制//get a query runner and an action task
val actionRunner = FDAActionRunner(slick.jdbc.H2Profile)
def runActionRow: FDAUserTask[FDAROW] = action => {
action match {
case FDAActionRow(q) => actionRunner.fda_execAction(q)(db)
fda_skip
case _ => fda_skip
}
}
runActionRow在程序的最后一个节点,是个终点函数,不传送任何数据行到下游。把这三个函数组合成一个FunDA程序然后startRun:
代码语言:javascript复制/start the program
val streamAllTasks = streamAQMRaw.appendTask(filterRows)
.appendTask(toAction)
.appendTask(runActionRow)
val streamToRun = streamAllTasks.onError { case e: Exception => println("Error:" e.getMessage); fda_appendRow(FDAErrorRow(new Exception(e))) }
streamToRun.startRun
注意在startRun之前我们可以对FunDA stream进行任何组合。运行startRun后检验数据库表清单里是否增加了AQMRPT表。
除了每行数据的独立应用外,很多时候我们都会对一组串联的数据行进行某种汇总操作(aggregation),比如清点行数、对行内某字段进行汇总计算等。FunDA提供了自定义汇总函数(user-defined-aggregation)来实现这个目的。下面是一个自定义汇总函数例子:
代码语言:javascript复制//user defined aggregation task
def aggregateValue: FDAAggrTask[Accu,FDAROW] = (accu,row) => {
row match {
case aqmr: AQMRPTModel =>
if (accu.state == "" || (aqmr.state == accu.state && aqmr.year == accu.year))
//same condition: inc count and add sum, pass no row downstream
(Accu(aqmr.state,aqmr.county,aqmr.year,accu.count 1, accu.sumOfValue aqmr.value),fda_skip)
else
//reset accumulator, create a new aggregated row and pass downstream
(Accu(aqmr.state,aqmr.county,aqmr.year,1, aqmr.value)
,fda_next(AQMRPTModel(0,9999,accu.state,accu.county,accu.year
,accu.count,accu.sumOfValue/accu.count,true)))
case FDANullRow =>
//last row encountered. create and pass new aggregated row
(Accu(accu.state,accu.county,accu.year,1, 0)
,fda_next(AQMRPTModel(0,9999,accu.state,accu.county,accu.year
,accu.count,accu.sumOfValue/accu.count,true)))
//incorrect row type, do nothing
case _ => (accu,fda_skip)
}
}
自定义汇总函数的款式是FDAAggrTask,如下定义:
代码语言:javascript复制 type FDAAggrTask[AGGR,ROW] = (AGGR,ROW) => (AGGR,Option[List[ROW]])
AGGR是个用户自定义类型,用来记录汇总当前状态。ROW类型代表数据行类型。自定义汇总函数aggregateValue的功能如下:
1、对AQMRPT表里的数据按statename,year进行汇总
2、产生一条新的汇总数据行并把它插入AQMRPT表里。
汇总函数就是一种状态函数,它的典型函数表现形式就是输入原状态,输出新状态。自定义汇总函数必须用aggregateTask来组合:
代码语言:javascript复制 aqmrStream.aggregateTask(Accu("","",0,0,0),aggregateValue)
.appendTask(toAction)
.appendTask(runActionRow)
.startRun
Accu是个自定义case class。在调用startRun之前我们把初始状态Accused("","",0,0,0)传入aggregateTask。aqmrStream是个数据源,它的铺垫代码如下:
代码语言:javascript复制//aggregate-task demo: get count and sum of value for each state and year
val orderedAQMRPT = AQMRPTQuery.sortBy(r => (r.state,r.year))
//TableElementType conversion. must declare implicit
implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
AQMRPTModel(row.rid,row.mid,row.state,row.county,row.year,row.value,row.total,row.valid)
val aqmrStreamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)
val aqmrStream: FDAPipeLine[FDAROW] = aqmrStreamLoader.fda_typedStream(orderedAQMRPT.result)(db)(512,512)()
注意我们这次使用了slick TableQuery原始行类型AQMRPTTable#TableElementType来进行强类型转换。
本次示范的源代码如下:
代码语言:javascript复制import slick.jdbc.meta._
import com.bayakala.funda._
import api._
import scala.language.implicitConversions
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import slick.jdbc.H2Profile.api._
import Models._
object UserDefinedTasks extends App {
val db = Database.forConfig("h2db")
//drop original table schema
val futVectorTables = db.run(MTable.getTables)
val futDropTable = futVectorTables.flatMap{ tables => {
val tableNames = tables.map(t => t.name.name)
if (tableNames.contains(AQMRPTQuery.baseTableRow.tableName))
db.run(AQMRPTQuery.schema.drop)
else Future()
}
}.andThen {
case Success(_) => println(s"Table ${AQMRPTQuery.baseTableRow.tableName} dropped successfully! ")
case Failure(e) => println(s"Failed to drop Table ${AQMRPTQuery.baseTableRow.tableName}, it may not exist! Error: ${e.getMessage}")
}
Await.ready(futDropTable,Duration.Inf)
//create new table to refine AQMRawTable
val actionCreateTable = Models.AQMRPTQuery.schema.create
val futCreateTable = db.run(actionCreateTable).andThen {
case Success(_) => println("Table created successfully!")
case Failure(e) => println(s"Table may exist already! Error: ${e.getMessage}")
}
//would carry on even fail to create table
Await.ready(futCreateTable,Duration.Inf)
//truncate data, only available in slick 3.2.1
val futTruncateTable = futVectorTables.flatMap{ tables => {
val tableNames = tables.map(t => t.name.name)
if (tableNames.contains(AQMRPTQuery.baseTableRow.tableName))
db.run(AQMRPTQuery.schema.truncate)
else Future()
}
}.andThen {
case Success(_) => println(s"Table ${AQMRPTQuery.baseTableRow.tableName} truncated successfully!")
case Failure(e) => println(s"Failed to truncate Table ${AQMRPTQuery.baseTableRow.tableName}! Error: ${e.getMessage}")
}
Await.ready(futDropTable,Duration.Inf)
//load original table content
//original table strong-typed-row
case class AQMRaw(mid: String, state: String,
county: String, year: String, value: String) extends FDAROW
implicit def toAQMRaw(row: (String,String,String,String,String)) =
AQMRaw(row._1,row._2,row._3,row._4,row._5)
val streamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRaw _)
// val queryAQMRaw = for { r <- AQMRawQuery } yield (r.mid,r.state,r.county,r.year,r.value)
val queryAQMRaw = sql"""
SELECT MEASUREID,STATENAME,COUNTYNAME,REPORTYEAR,VALUE FROM AIRQM
""".as[(String,String,String,String,String)]
val streamAQMRaw: FDAPipeLine[FDAROW] = streamLoader.fda_typedStream(queryAQMRaw)(db)(512,512)()
//filter out rows with inconvertible value strings and out of ranged value and year
def filterRows: FDAUserTask[FDAROW] = row => {
row match {
case r: AQMRaw => {
try {
val yr = r.year.toInt
val v = r.value.toInt
val vlu = if ( v > 10 ) 10 else v
val data = AQMRPTModel(0,r.mid.toInt,r.state,r.county,yr,vlu,0,true)
if ((yr > 1960 && yr < 2018))
fda_next(data) //this row ok. pass downstream
else
fda_skip //filter out this row
} catch {
case e: Exception =>
fda_next(AQMRPTModel(0,r.mid.toInt,r.state,r.county,2000,0,0,false))
//pass a invalid row
}
}
case _ => fda_skip //wrong type, skip
}
}
//transform data to action for later execution
def toAction: FDAUserTask[FDAROW] = row => {
row match {
case r: AQMRPTModel =>
val queryAction = AQMRPTQuery = r
fda_next(FDAActionRow(queryAction))
case _ => fda_skip
}
}
//get a query runner and an action task
val actionRunner = FDAActionRunner(slick.jdbc.H2Profile)
def runActionRow: FDAUserTask[FDAROW] = action => {
action match {
case FDAActionRow(q) => actionRunner.fda_execAction(q)(db)
fda_skip
case _ => fda_skip
}
}
//start the program
val streamAllTasks = streamAQMRaw.appendTask(filterRows)
.appendTask(toAction)
.appendTask(runActionRow)
val streamToRun = streamAllTasks.onError { case e: Exception => println("Error:" e.getMessage); fda_appendRow(FDAErrorRow(new Exception(e))) }
streamToRun.startRun
//aggregate-task demo: get count and sum of value for each state and year
val orderedAQMRPT = AQMRPTQuery.sortBy(r => (r.state,r.year))
//TableElementType conversion. must declare implicit
implicit def toAQMRPT(row: AQMRPTTable#TableElementType) =
AQMRPTModel(row.rid,row.mid,row.state,row.county,row.year,row.value,row.total,row.valid)
val aqmrStreamLoader = FDAStreamLoader(slick.jdbc.H2Profile)(toAQMRPT _)
val aqmrStream: FDAPipeLine[FDAROW] = aqmrStreamLoader.fda_typedStream(orderedAQMRPT.result)(db)(512,512)()
//user defined aggregator type.
case class Accu(state: String, county: String, year: Int, count: Int, sumOfValue: Int)
//user defined aggregation task
def aggregateValue: FDAAggrTask[Accu,FDAROW] = (accu,row) => {
row match {
case aqmr: AQMRPTModel =>
if (accu.state == "" || (aqmr.state == accu.state && aqmr.year == accu.year))
//same condition: inc count and add sum, pass no row downstream
(Accu(aqmr.state,aqmr.county,aqmr.year,accu.count 1, accu.sumOfValue aqmr.value),fda_skip)
else
//reset accumulator, create a new aggregated row and pass downstream
(Accu(aqmr.state,aqmr.county,aqmr.year,1, aqmr.value)
,fda_next(AQMRPTModel(0,9999,accu.state,accu.county,accu.year
,accu.count,accu.sumOfValue/accu.count,true)))
case FDANullRow =>
//last row encountered. create and pass new aggregated row
(Accu(accu.state,accu.county,accu.year,1, 0)
,fda_next(AQMRPTModel(0,9999,accu.state,accu.county,accu.year
,accu.count,accu.sumOfValue/accu.count,true)))
//incorrect row type, do nothing
case _ => (accu,fda_skip)
}
}
aqmrStream.aggregateTask(Accu("","",0,0,0),aggregateValue)
.appendTask(toAction)
.appendTask(runActionRow)
.startRun
}