@wddpct
2020-11-27T14:16:00.000000Z
字数 12248
阅读 989
本文档主要介绍任务编写步骤与数据治理逻辑功能集,以下内容将从一个实际可运行任务的配置文件出发,介绍一个任务的具体组成(公用模板,功能节点等),并在介绍中穿插功能集描述。
附件部分介绍表达式语法,不在任务编写与功能集部分展开。
expr_builder是一个由javascript编写的程序包,可在自定义的nodejs工程下运行npm install expr_builder
安装导入。expr_builder包提供了SummaryType,Context等关键数据结构用于编写测试任务。
任务配置文件最终输出为一个javascript文件(以.js为后缀)。
const {SummaryType, Context} = require("expr_builder");
function dict(dataSource, table, lookupColumn, targetColumn, cacheSize = 10000) {
return function (node) {
return node.mapQuery({
"cacheSize": cacheSize,
"dataSource": dataSource,
"targetTable": table,
"targetColumns": [targetColumn],
"query": [
{
"column": lookupColumn,
"operator": "=",
"value": "$1"
}
]
})
}
}
const ctx = new Context();
ctx
.dataSource("his")
.sourceTable("medrec", "pat_master_index")
.fetchCount(500)
.primaryKeys('patient_id')
.outPrimaryKeys("source_patient_no")
.parallel(10)
.dbSink("demo", "etl_test", "patient_base_info_simple_test", false, true)
.name("patient_base_info_simple_test");
const patient_id = ctx.column("medrec.pat_master_index.patient_id");
patient_id
.output("source_patient_no");
const name = ctx.column("medrec.pat_master_index.name");
name
.filter("$1")
.output("patient_name");
name
.map("$1|toPYCode|toUpper")
.output("spell_code");
name
.map("$1|toWBCode|toUpper")
.output("wb_code");
const sex = ctx.column("medrec.pat_master_index.sex");
sex
.map(`multiIf($1|contains("男"),"男性",$1|contains("女"),"女性","未知的性别")`)
.output("sex_name");
sex
.map(`multiIf($1|contains("男"),1,$1|contains("女"),2,0):Text`)
.output("sex_code");
const nation_name = ctx.column("medrec.pat_master_index.nation")
.map('multiIf($1|contains("汉"),"汉族",$1)')
.map('regexpMatch($1,"[\u4e00-\u9fa5]+族")')
.output("nation_name");
const birth_date = ctx.column("medrec.pat_master_index.date_of_birth");
birth_date
.map('$1:Date')
.output("birth_date");
const profession = ctx.column("medrec.pat_master_index.occupation");
profession.output("profession_code")
profession
.pipe(dict("his", "comm.occupation_dict", "occupation_code", "serial_no"))
.output("profession_id");
profession
.pipe(dict("his", "comm.occupation_dict", "occupation_code", "occupation_name"))
.output("profession");
ctx.const('true').output("is_valid");
ctx.const('now()').output("oper_time");
const insurance_no = ctx.column("medrec.pat_master_index.insurance_no");
const vip_id = ctx.column("medrec.pat_master_index.vip_id");
const cardNoAlt = ctx.concat(insurance_no, vip_id);
cardNoAlt
.map(`multiIf($1,"YBKH", $2,"JZKH")`)
.output("card_type");
cardNoAlt
.map("coalesce($1,$2)")
.output("card_no");
module.exports = ctx;
以下以行号作为索引进行编写与功能介绍
const {SummaryType, Context} = require("expr_builder");
固定模板代码。用于导入expr_builder这个npm包的SummaryType以及Context结构,提供给下文代码进行统计节点(SummaryType)与编写上下文(Context)的注册。
function dict(dataSource, table, lookupColumn, targetColumn, cacheSize = 10000) {
return function (node) {
return node.mapQuery({
"cacheSize": cacheSize,
"dataSource": dataSource,
"targetTable": table,
"targetColumns": [targetColumn],
"query": [
{
"column": lookupColumn,
"operator": "=",
"value": "$1"
}
]
})
}
}
固定模板代码。自定义字典表查询函数,提供给下文的pipe功能节点用于外部数据源字典表的数据匹配。
const ctx = new Context();
固定模板代码。用于注册编写上下文对象ctx,下文中所有的原始列都是通过ctx对象提供的column方法取得。
ctx
.dataSource("his")
.sourceTable("medrec", "pat_master_index")
.fetchCount(500)
.primaryKeys('patient_id')
.outPrimaryKeys("source_patient_no")
.parallel(10)
.dbSink("demo", "etl_test", "patient_base_info_simple_test", false, true)
.name("patient_base_info_simple_test");
js 代码对于换行写法比较宽容,但注意最好用 ; 符号作为完整语句的结束符号。
dataSource(),sourceTable(),fetchCount()等一系列方法都是ctx对象自带的方法(但并不是所有在配置文件中出现的方法都是ctx的方法,比如下文中的output方法),但不需要分开写作以下代码,因为每个ctx的大部分自带方法都返回了ctx本身,所以这些代码引用方法的顺序可任意互换。
ctx.dataSource("his");
ctx.sourceTable("medrec", "pat_master_index");
ctx.primaryKeys('patient_id');
ctx.outPrimaryKeys("source_patient_no");
ctx.parallel(10);
ctx.dbSink("demo", "etl_test", "patient_base_info_simple_test", false, true);
ctx.name("patient_base_info_simple_test");
const patient_id = ctx.column("medrec.pat_master_index.patient_id");
column方法为ctx对象的自带方法,表示生成一个名为patient_id数据节点(Node) ,用于进一步的数据治理操作和输出(因为patient_id在下文中仍然有使用,所以定义成一个变量)。输入参数的格式为 schema_name.table_name.column_name。
值得注意的是,在ctx的所有定义方法集中,column方法不返回节点本身,所以column方法仅能作为数据节点最后一个使用的方法,而不能与其他ctx的方法更改位置,同时一个语句中只能出现一次。
patient_id
.output("source_patient_no");
output方法为数据节点(Node)自带的方法,输入参数为单一的列名(必须存在于dbsink指定的数据源表中)。
上述三行代码可以直接合并写成以下代码
ctx.column("medrec.pat_master_index.patient_id").output("source_patient_no");值得注意的是,节点的所有定义方法集唯独output方法不返回节点本身,所以output方法仅能作为数据节点最后一个使用的方法,而不能与其他方法更改位置,同时一个语句中只能出现一次。
const name = ctx.column("medrec.pat_master_index.name");
name
.filter("$1")
.output("patient_name");
name
.map("$1|toPYCode|toUpper")
.output("spell_code");
name
.map("$1|toWBCode|toUpper")
.output("wb_code");
由于
medrec.pat_master_index.name
代表的数据节点后文仍需使用,所以定义成了一个名为name的数据节点。之后遇到此类情况不额外说明。filter为数据节点自带的方法,之所以能和另一个自带方法output写成链式语法的原因同ctx。之后遇到此类情况不额外说明。
值得注意的是,一个数据节点可以有多个治理逻辑并输出到多个节点中,比如上文中的name节点就分别应用了不同的数据治理逻辑输出到了patient_name, spell_code, wb_code这些输出节点中。
我们也可以书写改写以上语句为下面的代码,这类代码说明我们可以在调用任意方法后(如filter)赋值成一个数据节点,再进行进一步的数据治理工作。
const name = ctx.column("medrec.pat_master_index.name");
const filteredName = name.filter("$1");
filteredName
.output("patient_name");
filteredName
.map("$1|toPYCode|toUpper")
.output("spell_code");
filteredName
.map("$1|toWBCode|toUpper")
.output("wb_code");
const sex = ctx.column("medrec.pat_master_index.sex");
sex
.map(`multiIf($1|contains("男"),"男性",$1|contains("女"),"女性","未知的性别")`)
.output("sex_name");
sex
.map(`multiIf($1|contains("男"),1,$1|contains("女"),2,0):Text`)
.output("sex_code");
const nation_name = ctx.column("medrec.pat_master_index.nation")
.map('multiIf($1|contains("汉"),"汉族",$1)')
.map('regexpMatch($1,"[\u4e00-\u9fa5]+族")')
.output("nation_name");
const birth_date = ctx.column("medrec.pat_master_index.date_of_birth");
birth_date
.map('$1:Date')
.output("birth_date");
略。
const profession = ctx.column("medrec.pat_master_index.occupation");
profession.output("profession_code")
profession
.pipe(dict("his", "comm.occupation_dict", "occupation_code", "serial_no"))
.output("profession_id");
profession
.pipe(dict("his", "comm.occupation_dict", "occupation_code", "occupation_name"))
.output("profession");
pipe 方法暂不过多介绍,dict的使用方法在上文中已经介绍,这两个方法需要结合使用。
ctx.column("medrec.pat_master_index.occupation").pipe(dict("his", "comm.occupation_dict", "occupation_code", "serial_no"))
表示以occupation列与his数据源下的comm.occupation_dict.occupation_code做相等匹配,并取得该表中的serial_no字段作为数据节点。
ctx.column("medrec.pat_master_index.occupation").pipe(dict("his", "comm.occupation_dict", "occupation_code", "occupation_name"))
表示以occupation列与his数据源下的comm.occupation_dict.occupation_code做相等匹配,并取得该表中的occupation_name字段作为数据节点。
ctx.const('true').output("is_valid");
ctx.const('now()').output("oper_time");
const 方法为ctx自带方法,区别于column方法从数据源表中取得数据,const方法指定一个常量表达式直接输出,通常用于,is_valid,oper_time和etl_time这类标准填充字段。
const insurance_no = ctx.column("medrec.pat_master_index.insurance_no");
const vip_id = ctx.column("medrec.pat_master_index.vip_id");
const cardNoAlt = ctx.concat(insurance_no, vip_id);
cardNoAlt
.map(`multiIf($1,"YBKH", $2,"JZKH")`)
.output("card_type");
cardNoAlt
.map("coalesce($1,$2)")
.output("card_no");
module.exports = ctx;
固定模板代码。
expr-taskcli是一个由javascript编写的命令行程序包,可在全局运行npm install -g expr-taskcli
安装导入。expr-taskcli命令行工具通过提供对应的操作语法和文件读取功能帮助任务编写者在编写任务后可以方便快捷的提交到指定环境中进行保存和测试运行。
使用npm install -g expr-taskcli
安装后,可以在任意路径的命令行界面使用etl help
指令查看etl
的使用提示信息。以下是截止11.27号最新的使用提示输出信息。
taskcli =======
VERSION
expr-taskcli/0.6.6 linux-x64 node-v14.15.0
USAGE
$ taskcli [COMMAND]
TOPICS
env
task
COMMANDS
help display help for taskcli
linux && macos:
export TASK_API=http://graphng-2023-develop.sy
windows:
(图形界面更改系统环境变量)
expr-taskcli命令行工具大部分指令需要注册一个名为TASK_API
的环境变量,该环境变量指示了数据治理引擎服务地址,该服务提供了任务管理与运行调度等HTTP API,并由expr-taskcli命令行工具集成。
在编写完一个符合语法规范的任务配置文件后,可以使用etl task:submit
命令提交保存。
用法:
etl task:submit filepath
示例:
etl task:submit ./simple_test.js
etl task:submit /home/panchengtao/task_builder_files/tests/simple_test.js
如上文所示,etl task:submit
命令后接filepath
,filepath
可以是一个相对路径,也可以是一个配置文件绝对路径。提交成功后,将在命令行界面返回新生成的taskId与taskVersion。如果未按语法规范书写或远程服务器内部异常,都会在命令行界面打印出具体错误。
正确返回:
{"taskId": 1, "taskVersion": 1}
异常返回:
{"errors": "xxx"}
用法:
etl task:update taskId filepath
示例
etl update 1 ./simple_test.js
如果针对同一个配置文件进行了修改操作,可以选择使用task update
原地更新而不是通过task submit
重复提交生成多个任务。
如上文所示,第一个参数表示要更新的目标任务Id。而目标任务Id由一开始执行了etl task:submit
命令后得来。更新成功后,将会在命令行界面打印最新的taskVersion值。更新失败后,也会打印相关的异常信息。
用法:
etl task:run taskId
示例
etl task:run 1
略
所谓表达式引擎——顾名思义,是一个解析并执行表达式的程序。表达式引擎被集成在了数据治理引擎中,数据节点扩展的 map, filter, const 等方法可以接收一个表达式字符串,最后在运行任务时调用表达式引擎解析执行,比如patient_name.map("$1|toPYCode|toUpper").output("spell_code");
,该语句表示应用一个接收表达式字符串的map方法,用于将patient_name转换成大写的拼音码。
SELECT to_char(now(), 'Day, DD HH12:MI:SS') FROM tbl WHERE tbl_date_col > '2020-01-01' AND tbl_int_col <= 10 AND tbl_bool_col == true;
表达式的基本书写语法形如sql语句中的关键部分,上文中加粗的部分即为常见格式。
更精确地来说,表达式是一组代码的集合,它返回一个值。每一个合法的表达式都能计算成某个值,但从概念上讲,有两种类型的表达式:有副作用的(比如赋值)和单纯计算求值的。
表达式x=7是第一类型的一个例子。该表达式使用=运算符将值7赋予变量x。这个表达式自己的值等于7。
代码3 + 4是第二个表达式类型的一个例子。该表达式使用+运算符把3和4加到一起但并没有把结果(7)赋值给一个变量。
目前表达式引擎仅支持第二类,不支持赋值表达式。
数据治理引擎集成的表达式引擎支持以下数据类型,并提供了相应的数据类型显式转换函数。
类型转换函数 | 支持传入类型 | 额外说明 |
---|---|---|
toInt | toInt(Int), toInt(Float), toInt(Text), toInt(Timestamp), toInt(Time), toInt(Date), toInt(Numeric) | 支持常量,下同 |
toNumeric | toNumeric(Int), toNumeric(Numeric),toNumeric(Numeric,Int),toNumeric(Int,Int) | 第二个Int参数表示需要保留的小数位数 |
toFloat | toFloat(Int), toFloat(Numeric),toFloat(Float),toFloat(Text) | |
toBlob | 暂缺 | |
toInterval | 暂缺 | |
toTime | toTime(Time), toTime(Timestamp), toTime(Text,Text) | toTime(Text,Text) 第二个参数接受一个时间格式字符串,表示第一个Text的格式,比如 "mmHHss" |
toDate | toDate(Date), toDate(Timestamp), toDate(Text,Text) | toDate(Text,Text) 第二个参数接受一个时间格式字符串,表示第一个Text的格式,比如 "yyyyMMdd" |
toTimestamp | toTimestamp(Timestamp), toTimestamp(Text), toTimestamp(Text,Text) | toTimestamp(Text,Text) 第二个参数接受一个时间格式字符串,表示第一个Text的格式,比如 "yyyyMMdd",假如不传入,默认为RFC 3339格式 —— 2006-01-02T15:04:05Z07:00 |
toText | toText(Text), toText(Int), toText(Numeric), toText(Float), toText(Date), toText(Date,Text), toText(Timestamp), toText(Timestamp,Text) | toText(Date,Text)和 toText(Timestamp,Text)第二个参数接受一个要输出的时间格式化字符串 |
$1 > 10
toText($1)
multiIf($1 > 0, $1, $1 < 0, $2, $3)
...
示例
const insurance_no = ctx.column("medrec.pat_master_index.insurance_no");
const vip_id = ctx.column("medrec.pat_master_index.vip_id");
const cardNoAlt = ctx.concat(insurance_no, vip_id);
cardNoAlt
.map(`multiIf($1,"YBKH", $2,"JZKH")`)
.output("card_type");
cardNoAlt
.map("coalesce($1,$2)")
.output("card_no");
表达式引擎除了支持常量表达式的简单计算外,也支持识别map, filter等方法传入的数据节点类型并进行数据治理操作。为了方便起见,表达式引擎采取了取位置索引来代表具体操作数据节点的方式。其中$1表示传入的第一个变量,当传入联合变量时(concat等节点),根据传入顺序确定位置索引。
算术表达式和运算符
比较表达式和运算符
布尔表达式和运算符
函数调用
常规
toText($1)
toText($1,"yyyyMMdd")
toDate(toText($1,"yyyyMMdd"),"yyyyMMdd")
multiIf($1,"YBKH", $2,"JZKH")
colease($1, $2)
管道 |
$1|toText
$1|toText("yyyyMMdd")
$1|toText("yyyyMMdd")|toDate("yyyyMMdd")
$1|multiIf("YBKH", $2,"JZKH")
$1|colease($2)
表达式引擎支持以上两种函数调用方式,应用效果完全一致(改写示例一一对应)。
在管道模式下,函数调用通过“|”符号替代括号嵌套调用,并且可以省略第一个参数的书写(默认由程序自动传入)。