Skip to main content

· One min read
TieZhu

目前任务对脏数据的处理仅仅是日志打印,这样显然是无法应对客户多变的使用场景。

方案

整体架构采用生产者-消费者模式,任务启动过程中,同时将Manager初始化并启动Consumer异步线程池,仅需在BaseRichInputFormat 和 BaseRichOutputFormat 调用 Manager的collect() 方法收集脏数据即可。

流程图

image.png image.png

详细描述

任务配置参数

对应Java 实体类 - DirtyConf

 "dirty": {
"type": "log",
"printRate": 500,
"errorLimit": 1000,
"totalLimit": 1000,
"properties": {
"store": true,
"fileName": "dirty.csv",
"filePath": "~/dirty"
}
}
  • type

插件类型,必填项,根据type动态加载对应的插件;

  • printRate

脏数据在日志中的打印频率,默认值是Long.MAX_VALUE,即:不打印脏数据信息在日志文件中(因为客户的数据不宜暴露); 改(2022-01-13 by tiezhu):默认值修改为1,表示默认脏数据信息都会打印到日志文件中,同时,如果printRate <= 0,表示不打印任何脏数据信息;

  • errorLimit

脏数据在插件中,处理失败的条数限制,当处理失败的脏数据条数超过这个限制时,任务抛出NoRestartException,即任务失败且不重试;默认值是Long.MAX_VALUE改(2022-01-13 by tiezhu):默认值修改为1,如果errorLimit < 0,表示任务容忍所有的异常,不失败;

  • totalLimit

脏数据总条数限制,即收集到的脏数据超过这个限制时,任务抛出NoRestartException,即任务失败且不重试;默认值是Long.MAX_VALUE改(2022-01-13 by tiezhu):默认值修改为1,如果totalLimit < 0,表示任务容忍所有的异常,不失败;

  • properties

各自插件的参数配置

脏数据插件管理者

对应Java 实体类 - DirtyManager Manager主要维护着脏数据消费者Consumer 和 一个异步线程池;

  • 主要作用是收集脏数据,并下发到Consumer队列中

image.png

  • 调用 collect() 方法

BaseRichInputFormat image.png

脏数据插件消费者

对应Java 实体类 - AbstractDirtyConsumer Consumer主要维护着一个消息队列,中间缓存着脏数据信息;

  • run() 方法

主要逻辑是消费队列中的脏数据,consume() 方法交给子类去实现;如果在consume过程中出现了异常,那么errorCounter 计数加一。 image.png

  • consume() 方法

处理脏数据的具体逻辑,交由子类实现,根据插件的不同,对脏数据处理逻辑也会有所不同。

接口规范

启动参数

 "dirty": {
"type": "log",
"printRate": 500,
"errorLimit": 1000,
"totalLimit": 1000,
"properties": {
"store": true,
"fileName": "dirty.csv",
"filePath": "~/dirty"
}
}

# 以下参数在FlinkX 启动参数-confProp中
flinkx.dirty-data.output-type = log/jdbc
flinkx.dirty-data.max-rows = 1000 // total limit
flinkx.dirty-data.max-collect-failed-rows = 1000 // error limit

flinkx.dirty-data.jdbc.url=
flinkx.dirty-data.jdbc.username=
flinkx.dirty-data.jdbc.password=
flinkx.dirty-data.jdbc.database= // database 可以写在 url
flinkx.dirty-data.jdbc.table=

flinkx.dirty-data.log.print-interval= 500

JDBC 建表语句(MySQL)

job_id
job_name
表名 operator_name
数据 data
错误原因 error_msg
字段名 field_name

CREATE TABLE IF NOT EXISTS flinkx_dirty_data
(
job_id VARCHAR(32) NOT NULL COMMENT 'Flink Job Id',
job_name VARCHAR(255) NOT NULL COMMENT 'Flink Job Name',
operator_name VARCHAR(255) NOT NULL COMMENT '出现异常数据的算子名,包含表名',
dirty_data TEXT NOT NULL COMMENT '脏数据的异常数据',
error_message TEXT COMMENT '脏数据中异常原因',
field_name VARCHAR(255) COMMENT '脏数据中异常字段名',
create_time TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6) NOT NULL ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '脏数据出现的时间点'
)
COMMENT '存储脏数据';

CREATE INDEX idx_job_id ON flinkx_dirty_data (job_id);
CREATE INDEX idx_operator_name ON flinkx_dirty_data(operator_name);
CREATE INDEX idx_create_time ON flinkx_dirty_data(create_time);

Metircs

flinkx_DirtyData_count
flinkx_DirtyData_collectFailedCount

风险

数据同步JSON脏数据配置

项目目录结构

父模块 flinkx-dirtydata-collectors

子模块 flinkx-dirtydata-collector-jdbc flinkx-dirtydata-collector-log

打包目录

flinkx-dist dirtydata-collector jdbc log

· One min read
Ada Wang

背景

SQL语句的DDL和DML根据分号划分。 之前的对一组SQL语句的切分有问题。出现 '\\' 会出现切割错误。 例子如下:

CREATE TABLE student (id INT, name STRING, age INT) USING CSV;
SELECT *, split(q,'\\\\') as qq FROM foo_table;
SELECT * FROM bar_table;

上面代码无法被splitIgnoreQuota()方法正确切割成三句SQL。

指导思想

想从编译原理的词法分析角度解决这个问题。实现一个DFA(确定有限状态自动机)。只要状态转换图是对的。那么解析的字符串一定是对的。

技术方案

步骤

  1. 使用状态机将字符串Token化。
    1. 将字符串转换成Token数组。
  2. 根据Token重组成SQL语句。
    1. 根据Token数组中的Token类型,拼接成一个一个SQL语句,组成SQL数组。

主干代码就如下两步:

String string = "一坨SQL语句";
List<Token> tokens = tokenize(s);
List<Token> tokens = tokenize(string);
return processTokens(tokens);

一、状态机的实现

状态机转换图

下图中,每一个节点都代表一种状态,每一条边都代表一个函数。 每当路过需要生成Token的边,就把缓冲区中所有字符保存到Token中。 image.png

状态机五种状态:

private enum State {
/** current char in single quote */
SINGLE_QUOTED,
/** current char in single quote,and after '\' */
AFTER_BACKSLASH_SINGLE_QUOTED,
/** current char in double quote */
DOUBLE_QUOTED,
/** current char in double quote,and after '\' */
AFTER_BACKSLASH_DOUBLE_QUOTED,
/** out of quote */
UNQUOTED
}

切换状态的三种关键字

分隔符不在关键字中,动态传入。

public class Keyword {
public static final char SINGLE_QUOTE = '\'';
public static final char DOUBLE_QUOTE = '\"';
public static final char BACKSLASH = '\\';
}

二、Token的实现

public class Token {

private TokenKind kind;
private String val;

Token(TokenKind kind, String val) {
this.kind = kind;
this.val = val;
}

public TokenKind getKind() {
return kind;
}

public String getVal() {
return val;
}
}

Token只有两种类型

/** There are only two kinds */
private enum TokenKind {
/** SQL string */
STRING,
/** SQL delimiter */
DELIMITER
}

Token使用举例

在如下SQL例子中,转换出的一个Token数组是这样[STRING, STRING, STRING, DELIMITER, STRING, DELIMITER]。那么会根据SEMICOLON类型,最终还原出两句SQL。

SELECT *, split(q,'\\\\') as qq FROM foo_table;
SELECT * FROM bar_table;

参考资料

《现代编译原理:C语言描述》——第2章 词法分析

参考代码

Flink StructuredOptionsSplitter类 splitEscaped()方法。