Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出

2024-06-27 15:59:20 浏览数 (1)

kis-flow源代码:https://github.com/aceld/kis-flow

6.1 配置的导入

现在每次建立Flow和Function等,都需要一系列繁琐的添加,不是很方便,接下来,我们可以通过批量读写配置文件,构建KisFlow中的结构关系,并且也可以将KisFlow的结构导出到本地文件中。目前我们先用文件的形式做配置的持久化,开发者也可以今后做数据库或者远程配置的持久化均可。

6.1.1 创建配置文件

首先我们在kis-flow/test/load_conf/下创建需要加载的kisflow业务配置文件。

kis-flow/test/load_conf/下分别创建conn/flow/func/三个文件夹分别存放Connector、Flow、Funciton的配置信息。

代码语言:bash复制
├── conn

│   └── conn-ConnName1.yml

├── flow

│   └── flow-FlowName1.yml

└── func

    ├── func-FuncName1.yml

    ├── func-FuncName2.yml

    └── func-FuncName3.yml

分别创建一些yml文件。具体内容如下:

A.Function

kis-flow/test/load_conf/func/func-FuncNam1.yml

代码语言:yaml复制
kistype: func

fname: funcName1

fmode: Verify

source:

  name: 公众号抖音商城户订单数据

  must:

    - order_id

    - user_id

kis-flow/test/load_conf/func/func-FuncNam2.yml

代码语言:yaml复制
kistype: func

fname: funcName2

fmode: Save

source:

  name: 用户订单错误率

  must:

    - order_id

    - user_id

option:

  cname: ConnName1

kis-flow/test/load_conf/func/func-FuncNam2.yml

代码语言:yaml复制
kistype: func

fname: funcName2

fmode: Save

source:

  name: 用户订单错误率

  must:

    - order_id

    - user_id

option:

  cname: ConnName1

kis-flow/test/load_conf/func/func-FuncNam3.yml

代码语言:yaml复制
kistype: func

fname: funcName3

fmode: Calculate

source:

  name: 用户订单错误率

  must:

    - order_id

    - user_id
B.Connector

kis-flow/test/load_conf/func/func-ConnName1.yml

代码语言:yaml复制
kistype: conn

cname: ConnName1

addrs: '0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990'

type: redis

key: redis-key

params:

  args1: value1

  args2: value2

load: null

save:

  - funcName2
C.Flow

kis-flow/test/load_conf/func/func-FlowName1.yml

代码语言:yaml复制
kistype: flow

status: 1

flow_name: flowName1

flows:

  - fname: funcName1

  - fname: funcName2

  - fname: funcName3

6.1.2 配置解析

创建kis-flow/file/目录,且创建kis-flow/file/config_import.go文件。

首先定义一个可以存放全部配置的接口:

kis-flow/file/config_import.go

代码语言:go复制
type allConfig struct {

    Flows map[string]*config.KisFlowConfig

    Funcs map[string]*config.KisFuncConfig

    Conns map[string]*config.KisConnConfig

}

key作为各个模块的Name名称字段。

然后分别定义解析Flow、Function、Connector配置的方法。yaml的第三方库,我们用"gopkg.in/yaml.v3"这个库。

kis-flow/go.mod

代码语言:go复制
module kis-flow



go 1.18



require github.com/google/uuid v1.5.0

require gopkg.in/yaml.v3 v3.0.1 // indirect
A. Flow 配置解析

kis-flow/file/config_import.go

代码语言:go复制
// kisTypeFlowConfigure 解析Flow配置文件,yaml格式

func kisTypeFlowConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {

    flow := new(config.KisFlowConfig)

    if ok := yaml.Unmarshal(confData, flow); ok != nil {

        return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType))

    }



    // 如果FLow状态为关闭,则不做配置加载

    if common.KisOnOff(flow.Status) == common.FlowDisable {

        return nil

    }



    if _, ok := all.Flows[flow.FlowName]; ok {

        return errors.New(fmt.Sprintf("%s set repeat flow_id:%s", fileName, flow.FlowName))

    }



    // 加入配置集合中

    all.Flows[flow.FlowName] = flow



    return nil

}
  • confData:是文件二进制数据
  • fileName:是文件路径
  • kistype: 为配置文件类别

kisTypeFlowConfigure 会将配置信息解析到allConfig的Flows成员中。

同理,Function和Connector的解析办法如下。

B. Functioin配置解析

kis-flow/file/config_import.go

代码语言:go复制
// kisTypeFuncConfigure 解析Function配置文件,yaml格式

func kisTypeFuncConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {

    function := new(config.KisFuncConfig)

    if ok := yaml.Unmarshal(confData, function); ok != nil {

        return errors.New(fmt.Sprintf("%s has wrong format kisType = %s", fileName, kisType))

    }

    if _, ok := all.Funcs[function.FName]; ok {

        return errors.New(fmt.Sprintf("%s set repeat function_id:%s", fileName, function.FName))

    }



    // 加入配置集合中

    all.Funcs[function.FName] = function



    return nil

}
C. Connector配置解析

kis-flow/file/config_import.go

代码语言:go复制
// kisTypeConnConfigure 解析Connector配置文件,yaml格式

func kisTypeConnConfigure(all *allConfig, confData []byte, fileName string, kisType interface{}) error {

    conn := new(config.KisConnConfig)

    if ok := yaml.Unmarshal(confData, conn); ok != nil {

        return errors.New(fmt.Sprintf("%s is wrong format nsType = %s", fileName, kisType))

    }



    if _, ok := all.Conns[conn.CName]; ok {

        return errors.New(fmt.Sprintf("%s set repeat conn_id:%s", fileName, conn.CName))

    }



    // 加入配置集合中

    all.Conns[conn.CName] = conn



    return nil

}

6.1.3 遍历文件

下面实现一个遍历一个路径loadPath下面所有的yml和yaml类型文件,按照kistype类别解析配置信息到allConfig中。

kis-flow/file/config_import.go

代码语言:go复制
// parseConfigWalkYaml 全盘解析配置文件,yaml格式, 讲配置信息解析到allConfig中

func parseConfigWalkYaml(loadPath string) (*allConfig, error) {



    all := new(allConfig)



    all.Flows = make(map[string]*config.KisFlowConfig)

    all.Funcs = make(map[string]*config.KisFuncConfig)

    all.Conns = make(map[string]*config.KisConnConfig)



    err := filepath.Walk(loadPath, func(filePath string, info os.FileInfo, err error) error {

        // 校验文件后缀是否合法

        if suffix := path.Ext(filePath); suffix != ".yml" && suffix != ".yaml" {

            return nil

        }



        // 读取文件内容

        confData, err := ioutil.ReadFile(filePath)

        if err != nil {

            return err

        }



        confMap := make(map[string]interface{})



        // 校验yaml合法性

        if err := yaml.Unmarshal(confData, confMap); err != nil {

            return err

        }



        // 判断kisType是否存在

        if kisType, ok := confMap["kistype"]; !ok {

            return errors.New(fmt.Sprintf("yaml file %s has no file [kistype]!", filePath))

        } else {

            switch kisType {

            case common.KisIdTypeFlow:

                return kisTypeFlowConfigure(all, confData, filePath, kisType)



            case common.KisIdTypeFunction:

                return kisTypeFuncConfigure(all, confData, filePath, kisType)



            case common.KisIdTypeConnnector:

                return kisTypeConnConfigure(all, confData, filePath, kisType)



            default:

                return errors.New(fmt.Sprintf("%s set wrong kistype %s", filePath, kisType))

            }

        }

    })



    if err != nil {

        return nil, err

    }



    return all, nil

}

6.1.4 导入方法

下面提供一个对外的公开方法ConfigImportYaml,需要提供一个导入的文件根路径。

kis-flow/file/config_import.go

代码语言:go复制
// ConfigImportYaml 全盘解析配置文件,yaml格式

func ConfigImportYaml(loadPath string) error {



    all, err := parseConfigWalkYaml(loadPath)

    if err != nil {

        return err

    }



    for flowName, flowConfig := range all.Flows {



        // 构建一个Flow

        newFlow := flow.NewKisFlow(flowConfig)



        for _, fp := range flowConfig.Flows {

            if err := buildFlow(all, fp, newFlow, flowName); err != nil {

                return err

            }

        }



        //将flow添加到FlowPool中

        kis.Pool().AddFlow(flowName, newFlow)

    }



    return nil

}

首先会调用parseConfigWalkYaml()将全部的配置信息加载到内存中。

其次,遍历所有的Flow,依次去构建Flow,最终将flow添加到Pool当中,具体的构建流程如下:

kis-flow/file/config_import.go

代码语言:go复制
func buildFlow(all *allConfig, fp config.KisFlowFunctionParam, newFlow kis.Flow, flowName string) error {

    //加载当前Flow依赖的Function

    if funcConfig, ok := all.Funcs[fp.FuncName]; !ok {

        return errors.New(fmt.Sprintf("FlowName [%s] need FuncName [%s], But has No This FuncName Config", flowName, fp.FuncName))

    } else {

        //flow add connector

        if funcConfig.Option.CName != "" {

            // 加载当前Function依赖的Connector

            if connConf, ok := all.Conns[funcConfig.Option.CName]; !ok {

                return errors.New(fmt.Sprintf("FuncName [%s] need ConnName [%s], But has No This ConnName Config", fp.FuncName, funcConfig.Option.CName))

            } else {

                // Function Config 关联 Connector Config

                _ = funcConfig.AddConnConfig(connConf)

            }

        }



        //flow add function

        if err := newFlow.Link(funcConfig, fp.Params); err != nil {

            return err

        }

    }



    return nil

}

6.2 配置导入单元测试

创建单元测试文件 kis-flow/test/kis_config_import_test.go

kis-flow/test/kis_config_import_test.go

代码语言:go复制
package test



import (

    "context"

    "kis-flow/common"

    "kis-flow/file"

    "kis-flow/kis"

    "kis-flow/test/caas"

    "kis-flow/test/faas"

    "testing"

)



func TestConfigImportYmal(t *testing.T) {

    ctx := context.Background()



    // 0. 注册Function 回调业务

    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)

    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)

    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)



    // 0. 注册ConnectorInit 和 Connector 回调业务

    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)

    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)



    // 1. 加载配置文件并构建Flow

    if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {

        panic(err)

    }



    // 2. 获取Flow

    flow1 := kis.Pool().GetFlow("flowName1")



    // 3. 提交原始数据

    _ = flow1.CommitRow("This is Data1 from Test")

    _ = flow1.CommitRow("This is Data2 from Test")

    _ = flow1.CommitRow("This is Data3 from Test")



    // 4. 执行flow1

    if err := flow1.Run(ctx); err != nil {

        panic(err)

    }

}

先注册业务方法。然后通过ConfigImportYaml加载配置,之后从Pool中得到flow实例,提交数据,运行。

注意,这里的配置文件路径,写的是绝对路径。

cd 到kis-flow/test/目录下,执行指令:

代码语言:bash复制
go test -test.v -test.paniconexit0 -test.run TestConfigImportYmal    

结果如下:

代码语言:bash复制
=== RUN   TestConfigImportYmal

Add KisPool FuncName=funcName1

Add KisPool FuncName=funcName2

Add KisPool FuncName=funcName3

Add KisPool CaaSInit CName=ConnName1

Add KisPool CaaS CName=ConnName1, FName=funcName2, Mode =Save

===> Call Connector InitDemo1

&{conn ConnName1 0.0.0.0:9988,0.0.0.0:9999,0.0.0.0:9990 redis redis-key map[args1:value1 args2:value2] [] [funcName2 funcName2]}

Add FlowRouter FlowName=flowName1



context.Background

====> After CommitSrcData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b

All Level Data =

 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]]



KisFunctionV, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000114960 ThisFunctionId:func-37c7070f45144529891d433ae9c4ebfc PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]] inPut:[This is Data1 from Test This is Data2 from Test This is Data3 from Test]}



---> Call funcName1Handler ----

In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data1 from Test

In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data2 from Test

In FuncName = funcName1, FuncId = func-37c7070f45144529891d433ae9c4ebfc, row = This is Data3 from Test

context.Background

 ====> After commitCurData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b

All Level Data =

 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]]



KisFunctionS, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc0001149c0 ThisFunctionId:func-5315301ffbbb4ae4be021729ddff1569 PrevFunctionId:func-37c7070f45144529891d433ae9c4ebfc funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]] inPut:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2]}



---> Call funcName2Handler ----

In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 0

===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save

===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 0

In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 1

===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save

===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 1

In FuncName = funcName2, FuncId = func-5315301ffbbb4ae4be021729ddff1569, row = data from funcName[funcName1], index = 2

===> In CaasDemoHanler1: flowName: flowName1, cName:ConnName1, fnName:funcName2, mode:Save

===> Call Connector CaasDemoHanler1, args from funciton: data from funcName[funcName1], index = 2

context.Background

 ====> After commitCurData, flow_name = flowName1, flow_id = flow-bcaaa02a8d4b4a80b2f2895d9cecf20b

All Level Data =

 map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-5315301ffbbb4ae4be021729ddff1569:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]]



KisFunctionC, flow = &{Id:flow-bcaaa02a8d4b4a80b2f2895d9cecf20b Name:flowName1 Conf:0xc00014eb00 Funcs:map[funcName1:0xc000114960 funcName2:0xc0001149c0 funcName3:0xc000114a20] FlowHead:0xc000114960 FlowTail:0xc000114a20 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc000114a20 ThisFunctionId:func-89a6a662729b4a0895e849c40bf29892 PrevFunctionId:func-5315301ffbbb4ae4be021729ddff1569 funcParams:map[func-37c7070f45144529891d433ae9c4ebfc:map[] func-5315301ffbbb4ae4be021729ddff1569:map[] func-89a6a662729b4a0895e849c40bf29892:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[This is Data1 from Test This is Data2 from Test This is Data3 from Test] func-37c7070f45144529891d433ae9c4ebfc:[data from funcName[funcName1], index = 0 data from funcName[funcName1], index = 1 data from funcName[funcName1], index = 2] func-5315301ffbbb4ae4be021729ddff1569:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]] inPut:[data from funcName[funcName2], index = 0 data from funcName[funcName2], index = 1 data from funcName[funcName2], index = 2]}



---> Call funcName3Handler ----

In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 0

In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 1

In FuncName = funcName3, FuncId = func-89a6a662729b4a0895e849c40bf29892, row = data from funcName[funcName2], index = 2

--- PASS: TestConfigImportYmal (0.01s)

PASS

ok      kis-flow/test   0.517s

预期的结果和我们一致,现在我们可以通过配置文件进行加载且构建KisFlow了。

6.3 配置的导出

6.3.1 导出实现

kis-flow/file/config_export.go

代码语言:go复制
package file



import (

    "errors"

    "fmt"

    "gopkg.in/yaml.v3"

    "io/ioutil"

    "kis-flow/common"

    "kis-flow/kis"

)



// ConfigExportYaml 将flow配置输出,且存储本地

func ConfigExportYaml(flow kis.Flow, savaPath string) error {



    if data, err := yaml.Marshal(flow.GetConfig()); err != nil {

        return err

    } else {

        //flow

        err := ioutil.WriteFile(savaPath common.KisIdTypeFlow "-" flow.GetName() ".yaml", data, 0644)

        if err != nil {

            return err

        }



        //function

        for _, fp := range flow.GetConfig().Flows {

            fConf := flow.GetFuncConfigByName(fp.FuncName)

            if fConf == nil {

                return errors.New(fmt.Sprintf("function name = %s config is nil ", fp.FuncName))

            }



            if fdata, err := yaml.Marshal(fConf); err != nil {

                return err

            } else {

                if err := ioutil.WriteFile(savaPath common.KisIdTypeFunction "-" fp.FuncName ".yaml", fdata, 0644); err != nil {

                    return err

                }

            }



            // Connector

            if fConf.Option.CName != "" {

                cConf, err := fConf.GetConnConfig()

                if err != nil {

                    return err

                }

                if cdata, err := yaml.Marshal(cConf); err != nil {

                    return err

                } else {

                    if err := ioutil.WriteFile(savaPath common.KisIdTypeConnnector "-" cConf.CName ".yaml", cdata, 0644); err != nil {

                        return err

                    }

                }

            }

        }

    }



    return nil

}

这里面需要补充一些接口,如下:

6.3.2 Flow新增接口

kis-flow/kis/flow.go

代码语言:go复制
package kis



import (

    "context"

    "kis-flow/common"

    "kis-flow/config"

)



type Flow interface {

    // Run 调度Flow,依次调度Flow中的Function并且执行

    Run(ctx context.Context) error

    // Link 将Flow中的Function按照配置文件中的配置进行连接

    Link(fConf *config.KisFuncConfig, fParams config.FParam) error

    // CommitRow 提交Flow数据到即将执行的Function层

    CommitRow(row interface{}) error

    // Input 得到flow当前执行Function的输入源数据

    Input() common.KisRowArr

    // GetName 得到Flow的名称

    GetName() string

    // GetThisFunction 得到当前正在执行的Function

    GetThisFunction() Function

    // GetThisFuncConf 得到当前正在执行的Function的配置

    GetThisFuncConf() *config.KisFuncConfig

    // GetConnector 得到当前正在执行的Function的Connector

    GetConnector() (Connector, error)

    // GetConnConf 得到当前正在执行的Function的Connector的配置

    GetConnConf() (*config.KisConnConfig, error)

    

    //                                

    // GetConfig 得到当前Flow的配置

    GetConfig() *config.KisFlowConfig

    // GetFuncConfigByName 得到当前Flow的配置

    GetFuncConfigByName(funcName string) *config.KisFuncConfig

    //                                

}

flow新增的接口实现如下:

kis-flow/flow/kis_flow.go

代码语言:go复制
func (flow *KisFlow) GetConfig() *config.KisFlowConfig {

    return flow.Conf

}



// GetFuncConfigByName 得到当前Flow的配置

func (flow *KisFlow) GetFuncConfigByName(funcName string) *config.KisFuncConfig {

    if f, ok := flow.Funcs[funcName]; ok {

        return f.GetConfig()

    } else {

        log.Logger().ErrorF("GetFuncConfigByName(): Function %s not found", funcName)

        return nil

    }

}

6.3.3 KisFlow中Funcs修复

这里面之前有个笔误。

kis-flow/flow/kis_flow.go

代码语言:go复制
// KisFlow 用于贯穿整条流式计算的上下文环境

type KisFlow struct {

    // 基础信息

    Id   string                // Flow的分布式实例ID(用于KisFlow内部区分不同实例)

    Name string                // Flow的可读名称

    Conf *config.KisFlowConfig // Flow配置策略



    // Function列表

    Funcs          map[string]kis.Function // 当前flow拥有的全部管理的全部Function对象, key: FunctionName

    FlowHead       kis.Function            // 当前Flow所拥有的Function列表表头

    FlowTail       kis.Function            // 当前Flow所拥有的Function列表表尾

    flock          sync.RWMutex            // 管理链表插入读写的锁

    ThisFunction   kis.Function            // Flow当前正在执行的KisFunction对象

    ThisFunctionId string                  // 当前执行到的Function ID

    PrevFunctionId string                  // 当前执行到的Function 上一层FunctionID



    // Function列表参数

    funcParams map[string]config.FParam // flow在当前Function的自定义固定配置参数,Key:function的实例KisID, value:FParam

    fplock     sync.RWMutex             // 管理funcParams的读写锁



    // 数据

    buffer common.KisRowArr  // 用来临时存放输入字节数据的内部Buf, 一条数据为interface{}, 多条数据为[]interface{} 也就是KisBatch

    data   common.KisDataMap // 流式计算各个层级的数据源

    inPut  common.KisRowArr  // 当前Function的计算输入数据

}

这里的Funcs成员,其key的含义,之前我们定义的是KisID,现在要修正为key的含义是FunctionName。

下面想Funcs成员赋值的代码做一个简单的修正

代码语言:go复制
// appendFunc 将Function添加到Flow中, 链表操作

func (flow *KisFlow) appendFunc(function kis.Function, fParam config.FParam) error {

       // ... ... 





    //将Function Name 详细Hash对应关系添加到flow对象中

    flow.Funcs[function.GetConfig().FName] = function



    // ... ... 

}

6.3.4 KisPool新增方法

kis-flow/kis/pool.go

代码语言:go复制
// GetFlows 得到全部的Flow

func (pool *kisPool) GetFlows() []Flow {

    pool.flowLock.RLock() // 读锁

    defer pool.flowLock.RUnlock()



    var flows []Flow



    for _, flow := range pool.flowRouter {

        flows = append(flows, flow)

    }



    return flows

}

KisPool新增 获取全部Flow的方法,以支持导出模块使用。

6.4 配置导出单元测试

kis-flow/test/创建kis_config_export_test.go文件。

代码语言:go复制
package test



import (

    "kis-flow/common"

    "kis-flow/file"

    "kis-flow/kis"

    "kis-flow/test/caas"

    "kis-flow/test/faas"

    "testing"

)



func TestConfigExportYmal(t *testing.T) {

    // 0. 注册Function 回调业务

    kis.Pool().FaaS("funcName1", faas.FuncDemo1Handler)

    kis.Pool().FaaS("funcName2", faas.FuncDemo2Handler)

    kis.Pool().FaaS("funcName3", faas.FuncDemo3Handler)



    // 0. 注册ConnectorInit 和 Connector 回调业务

    kis.Pool().CaaSInit("ConnName1", caas.InitConnDemo1)

    kis.Pool().CaaS("ConnName1", "funcName2", common.S, caas.CaasDemoHanler1)



    // 1. 加载配置文件并构建Flow

    if err := file.ConfigImportYaml("/Users/gopath/src/kis-flow/test/load_conf/"); err != nil {

        panic(err)

    }



    // 2. 讲构建的内存KisFlow结构配置导出的文件当中

    flows := kis.Pool().GetFlows()

    for _, flow := range flows {

        if err := file.ConfigExportYaml(flow, "/Users/gopath/src/kis-flow/test/export_conf/"); err != nil {

            panic(err)

        }

    }

}

cd到kis-flow/test/下 执行:

代码语言:bash复制
go test -test.v -test.paniconexit0 -test.run  TestConfigExportYmal 

会在kis-flow/test/export_conf/下得到导出的配置。

代码语言:bash复制
├── export_conf

│   ├── conn-ConnName1.yaml

│   ├── flow-flowName1.yaml

│   ├── func-funcName1.yaml

│   ├── func-funcName2.yaml

│   └── func-funcName3.yaml

6.5 【V0.5】源代码

https://github.com/aceld/kis-flow/releases/tag/v0.5


作者:刘丹冰Aceld github: https://github.com/aceld

KisFlow开源项目地址:https://github.com/aceld/kis-flow

0 人点赞