Skip to main content

ProtoBuf Format

1、Introduction

The protobuf format allows reading and writing protobuf data based on a protobuf schema. It is supported only in SQL.

2、Version Support

Protocol Buffers 2.x, Protocol Buffers 3.x

3、Format identifier

protobuf-x

4、How to write a DDL with the protobuf format

-- Kafka source table whose records are decoded by the protobuf-x format.
-- 'protobuf-x.message-class-name' names the generated Java class of the
-- protobuf message, written as <OuterClass>$<MessageName>.
CREATE TABLE proto (
    user_id     BIGINT,
    item_id     BIGINT,
    category_id BIGINT,
    behavior    STRING,
    ts          BIGINT
) WITH (
    'connector'                     = 'kafka',
    'topic'                         = 'user_behavior',
    'properties.bootstrap.servers'  = 'localhost:9092',
    'properties.group.id'           = 'testGroup',
    'format'                        = 'protobuf-x',
    'protobuf-x.message-class-name' = 'com.dtstack.chunjun.behavior.behaviorOuterClass$Message'
);

5、Format options

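| Option | Required | Default | Data type | Description |
| --- | --- | --- | --- | --- |
| protobuf-x.message-class-name | | | String | Fully qualified name of the protobuf message class. This is the nested message class inside the outer class generated from the .proto file, written as `<OuterClass>$<MessageName>` (for example `ZPMC.Message.MessageGroupOuterClass$MessageGroup`). |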

6、Data type mapping

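| Flink SQL type | Protobuf type |
| --- | --- |
| STRING | string, enum |
| BOOLEAN | bool |
| BINARY / VARBINARY | — |
| BYTES | bytes |
| DECIMAL | — |
| TINYINT | — |
| SMALLINT | — |
| INT | int32, uint32, sint32, fixed32, sfixed32 |
| BIGINT | int64, uint64, sint64, fixed64, sfixed64 |
| FLOAT | float |
| DOUBLE | double |
| DATE | — |
| TIME | — |
| TIMESTAMP | — |
| ARRAY | `repeated` modifier |
| MAP | `map` (key: any scalar type except floating-point types and `bytes`; value: any type except another `map`), see https://developers.google.com/protocol-buffers/docs/proto3#maps |
| MULTISET | — |
| ROW | nested message; `oneof` (the first field of the ROW holds the oneof case) |

A dash (—) means no protobuf type is listed for that Flink SQL type.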

Example

MessageGroup.proto

// Example proto3 schema read through the protobuf-x format.
// The generated Java class referenced from DDL
// (ZPMC.Message.MessageGroupOuterClass$MessageGroup) is derived from this
// package and the file name.
syntax = "proto3";

package ZPMC.Message;

// Wrapper messages holding a repeated scalar. Each one can be placed inside a
// oneof (a repeated field cannot be a oneof member directly).
message RepeatedBool   { repeated bool   Values = 1; }

// Same shape as RepeatedBool; not referenced by any other message in this file.
message RepeatedMyBool { repeated bool   Values = 1; }

message RepeatedInt32  { repeated int32  Values = 1; }

message RepeatedUint32 { repeated uint32 Values = 1; }

message RepeatedInt64  { repeated int64  Values = 1; }

message RepeatedUint64 { repeated uint64 Values = 1; }

message RepeatedFloat  { repeated float  Values = 1; }

message RepeatedDouble { repeated double Values = 1; }

message RepeatedString { repeated string Values = 1; }

// A tag value: exactly one member of the `Value` oneof, a plain bool, a
// second oneof `Value2`, and another plain bool.
message Variant {
  oneof Value {
    bool           ValueBool      = 1;
    RepeatedBool   ArrayBool      = 2;
    int32          ValueInt32     = 3;
    RepeatedInt32  ArrayInt32     = 4;
    uint32         ValueUint32    = 5;
    RepeatedUint32 ArrayUint32    = 6;
    int64          ValueInt64     = 7;
    RepeatedInt64  ArrayInt64     = 8;
    uint64         ValueUint64    = 9;
    RepeatedUint64 ArrayUint64    = 10;
    float          ValueFloat     = 11;
    RepeatedFloat  ArrayFloat     = 12;
    double         ValueDouble    = 13;
    RepeatedDouble ArrayDouble    = 14;
    string         ValueString    = 15;
    RepeatedString ArrayString    = 16;
    bytes          ValueBytes     = 17;
    // Timestamp carried as an integer; the unit is not defined in this
    // schema (NOTE(review): presumably epoch-based; confirm with producers).
    int64          ValueTimestamp = 18;
  }
  bool boolx = 19;
  oneof Value2 {
    bool         ValueBool2 = 20;
    RepeatedBool ArrayBool2 = 21;
  }
  bool booly = 22;
}

// One tagged data point.
message MessageItem {
  string              TagName    = 1;
  Variant             TagValue   = 2;
  int32               UaDataType = 3;
  bool                Quality    = 4;
  int64               Timestamp  = 5;
  map<string, string> TagInfos   = 6;
  map<string, string> ExValues   = 7;
}

// Top-level message: group-wide key/value info plus a list of items.
message MessageGroup {
  map<string, string>  GroupInfo = 1;
  repeated MessageItem Messages  = 2;
}

DDL

-- Kafka source table for the ZPMC.Message.MessageGroup protobuf message.
-- The column tree mirrors the proto schema: repeated fields become ARRAY,
-- nested messages become ROW, map fields become MAP, and each oneof becomes
-- a ROW whose first column (ValueCase) holds the oneof case, followed by one
-- column per oneof member in declaration order.
-- `Value`, `Values` and `Timestamp` are backquoted because VALUE, VALUES and
-- TIMESTAMP are reserved words in Flink SQL.
-- Top-level columns and WITH options use leading commas so the optional
-- metadata columns below can be enabled by uncommenting a single line.
CREATE TABLE reader (
    GroupInfo MAP<STRING, STRING>
    , Messages ARRAY<
        ROW<
            TagName STRING,
            TagValue ROW<
                `Value` ROW<
                    ValueCase INTEGER,
                    ValueBool BOOLEAN,
                    ArrayBool ROW<`Values` ARRAY<BOOLEAN>>,
                    ValueInt32 INTEGER,
                    ArrayInt32 ROW<`Values` ARRAY<INTEGER>>,
                    -- NOTE(review): protobuf-java exposes uint32 as a signed int, so
                    -- values above 2^31-1 likely arrive negative here; confirm.
                    ValueUint32 INTEGER,
                    ArrayUint32 ROW<`Values` ARRAY<INTEGER>>,
                    ValueInt64 BIGINT,
                    ArrayInt64 ROW<`Values` ARRAY<BIGINT>>,
                    -- NOTE(review): same signedness caveat for uint64 above 2^63-1.
                    ValueUint64 BIGINT,
                    ArrayUint64 ROW<`Values` ARRAY<BIGINT>>,
                    ValueFloat FLOAT,
                    ArrayFloat ROW<`Values` ARRAY<FLOAT>>,
                    ValueDouble DOUBLE,
                    ArrayDouble ROW<`Values` ARRAY<DOUBLE>>,
                    ValueString STRING,
                    ArrayString ROW<`Values` ARRAY<STRING>>,
                    -- BYTES is variable-length; a bare BINARY would be fixed-length BINARY(1).
                    ValueBytes BYTES,
                    ValueTimestamp BIGINT
                >,
                boolx BOOLEAN,
                Value2 ROW<
                    ValueCase INTEGER,
                    ValueBool2 BOOLEAN,
                    ArrayBool2 ROW<`Values` ARRAY<BOOLEAN>>
                >,
                booly BOOLEAN
            >,
            UaDataType INTEGER,
            Quality BOOLEAN,
            `Timestamp` BIGINT,
            TagInfos MAP<STRING, STRING>,
            ExValues MAP<STRING, STRING>
        >
    >
    -- Optional metadata columns of the Kafka connector; uncomment to expose them.
    -- NOTE(review): confirm that 'kafka-x' provides the same metadata keys.
    -- , `topic` STRING METADATA VIRTUAL
    -- , `leader-epoch` INT METADATA VIRTUAL
    -- , `offset` BIGINT METADATA VIRTUAL
    -- , ts TIMESTAMP(3) METADATA FROM 'timestamp'
    -- , `timestamp-type` STRING METADATA VIRTUAL
    -- , partition_id BIGINT METADATA FROM 'partition' VIRTUAL
) WITH (
    'connector' = 'kafka-x'
    , 'topic' = 'liuliu_proto_source'
    , 'properties.bootstrap.servers' = 'flink01:9092'
    , 'properties.group.id' = 'luna_g'
    , 'scan.startup.mode' = 'earliest-offset'
    , 'format' = 'protobuf-x'
    , 'protobuf-x.message-class-name' = 'ZPMC.Message.MessageGroupOuterClass$MessageGroup'
    , 'scan.parallelism' = '1'
);