Apache Flink 集成
ProtonBase 是一款兼容 PostgreSQL 协议的高性能分布式数据库,支持标准SQL语法和PG生态工具链。本文将介绍如何通过Apache Flink实现高效的数据写入ProtonBase,重点包括DataStream API和Flink SQL两种写入方式,也介绍如何通过 Apache Flink 读取 CDC 增量变更。
环境准备
版本兼容性
- Flink版本:1.14+ (推荐1.16+)
- JDBC驱动:PostgreSQL JDBC驱动(推荐 42.5+)
依赖配置
在Flink项目中添加JDBC驱动依赖:
<!-- Maven配置 -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.5.0</version>
</dependency>
Flink SQL与ProtonBase类型映射
Flink SQL类型 | ProtonBase类型 |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | SMALLINT |
SMALLINT | SMALLINT |
INT | INTEGER |
BIGINT | BIGINT |
FLOAT | REAL |
DOUBLE | DOUBLE PRECISION |
DECIMAL(p,s) | NUMERIC(p,s) |
VARCHAR(n) | VARCHAR(n) |
CHAR(n) | CHAR(n) |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
ARRAY | ARRAY |
MAP | JSONB |
ROW | JSONB |
使用 Data Stream API 写入
使用JDBC Sink
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 定义数据流
DataStream<User> userStream = env.addSource(...);
// 配置JDBC连接参数
JdbcConnectionOptions jdbcOpts = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:postgresql://protonbase-host:5432/dbname")
.withDriverName("org.postgresql.Driver")
.withUsername("username")
.withPassword("password")
.build();
// 创建JDBC Sink
userStream.addSink(JdbcSink.sink(
"INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
(ps, user) -> {
ps.setInt(1, user.getId());
ps.setString(2, user.getName());
ps.setInt(3, user.getAge());
},
jdbcOpts));
env.execute("ProtonBase Sink Job");
批量写入优化
JdbcExecutionOptions execOpts = new JdbcExecutionOptions.Builder()
.withBatchSize(1000) // 每批记录数
.withBatchIntervalMs(200) // 批处理间隔(毫秒)
.withMaxRetries(3) // 失败重试次数
.build();
userStream.addSink(JdbcSink.sink(
"INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
(ps, user) -> {
ps.setInt(1, user.getId());
ps.setString(2, user.getName());
ps.setInt(3, user.getAge());
},
execOpts,
jdbcOpts));
使用 Flink SQL 写入
创建ProtonBase Catalog
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 注册ProtonBase Catalog
tableEnv.executeSql(
"CREATE CATALOG protonbase WITH (" +
" 'type'='jdbc'," +
" 'default-database'='dbname'," +
" 'username'='username'," +
" 'password'='password'," +
" 'base-url'='jdbc:postgresql://protonbase-host:5432'" +
")");
// 设置当前Catalog
tableEnv.useCatalog("protonbase");
通过Flink SQL写入数据
// 注册Flink表(源表)
tableEnv.executeSql(
"CREATE TABLE source_table (" +
" id INT," +
" name STRING," +
" age INT" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'user_topic'," +
" 'properties.bootstrap.servers' = 'kafka:9092'," +
" 'format' = 'json'" +
")");
// 执行写入
tableEnv.executeSql(
"INSERT INTO users " + // ProtonBase中已存在的表
"SELECT id, name, age FROM source_table");
或者通过 Flink SQL 开发控制台提交作业:
CREATE TEMPORARY TABLE users_sink (
id INTEGER,
name STRING,
age INTEGER
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://protonbase-host:5432/dbname',
'table-name' = 'users',
'username' = 'username',
'password' = 'password',
'driver' = 'org.postgresql.Driver',
'sink.buffer-flush.max-rows' = '1000',
'sink.buffer-flush.interval' = '1s',
'sink.max-retries' = '3'
);
INSERT INTO users_sink
SELECT * FROM source_table;
常用配置参数
参数名 | 说明 | 推荐值 |
---|---|---|
sink.buffer-flush.max-rows | 批量写入最大行数 | 1000-5000 |
sink.buffer-flush.interval | 批量写入间隔 | 1s |
sink.max-retries | 失败重试次数 | 3 |
sink.parallelism | Sink并行度 | 与ProtonBase节点数相同 |
connection.max-retry-timeout | 连接超时时间 | 30s |
更多配置参数,参考 Apache Flink JDBC SQL Connector (opens in a new tab)。
性能调优
SQL作业并行度设置
在SQL中设置:
SET 'parallelism.default' = '8';
批量写入参数
CREATE TABLE protonbase_sink (
...
) WITH (
...
'sink.buffer-flush.interval' = '1s',
'sink.buffer-flush.max-rows' = '1000',
'sink.parallelism' = '8'
);
使用Flink消费CDC
Flink可以通过postgres-cdc connector获取ProtonBase的CDC数据。
前置条件
- 数据库已开通逻辑复制(WAL_LEVEL=logical)
- Flink 运行时环境与ProtonBase 数据库网络可达
- 连接数据库的用户需要具备Replication权限,或能够自动创建Slot(复制槽)和 Publication。
更多详情参考 CDC 订阅。
Publication & Slot 资源管理
建议提前创建好 publication,publication 是 PG 协议中服务器端的事件发布及过滤机制,可以选择发布部分表、部分事件、也可以合并分区事件到分区父表。如果不在数据库中显性创建 publication,则 flink CDC connector 会创建一个默认的 dbz_publication,该 publication 会订阅所有表的所有变更事件,然后在 flink 客户端执行时在本地过滤,效率更低,网络开销更大。在 Flink Connector 中用'debezium.publication.name' = 'pub-name' 指定使用的 publication,多个 flink 作业可以共用相同的 publication。
Slot 是用来记录CDC 事件消费端进度(即 LSN)的机制,不同的消费端应该选择不同的 Slot,跟进不同的消费进度。建议提前创建好 Slot,使用 noexport_snapshot 参数,存储空间更节省。不同的 flink 程序应该使用不同的 slot。方式如下:
SELECT * FROM pg_create_logical_replication_slot('flink_cdc_slot', 'pgoutput', false, true);
不再使用的 Flink 消费作业,需要显性删除 slot,释放被 PIN 住的存储空间,避免存储空间浪费。
SELECT pg_drop_replication_slot('flink_cdc_slot');
Slot 的更新依赖 Flink 的 checkpoint 执行,checkpoint 会更新 Slot 的LSN,长期不更新 LSN,会使得历史状态被 PIN 住,造成存储浪费。CDC 消费只支持从最近一次完成的 checkpoint 启动,不支持选择历史版本的 checkpoint,因为每个 checkpoint 里记录了历史消费的 LSN 进度,一旦 checkpoint 后,LSN 消费进度会收到 confirm 请求,被 confirm 后,slot 状态向前移动,因此历史 LSN 不再可以被消费。所以,无论flink作业启动checkpoint往后倒退多少,实际都只会从最后一次成功的checkpoint位点读取数据。
使用Flink SQL例子
以下例子中,首先通过CDC方式读取Source表,然后写入Sink表。
CREATE TEMPORARY TABLE source_cdc (
ins_time timestamp(6) NULL,
store_id string NULL,
store_code string NOT NULL,
store_name string NULL,
time_section_no bigint NULL,
stat_date string NOT NULL,
PRIMARY KEY (store_code,stat_date) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'host-url',
'port' = '5432',
'username' = 'xxxx',
'password' = 'xxxx',
'database-name' = 'dbname',
'schema-name' = 'schemaname',
'table-name' = 'tablename',
-- 建议在数据库中提前创建好 Slot,并管理 Slot 生命周期;如果未创建,Flink CDC 会在启动时创建 Slot,自动创建的 Slot 无NOEXPORT_SNAPSHOT属性,会占用额外存储空间。
'slot.name' = 'flink_cdc_slot',
--解码插件,仅支持 pgoutput 解码
'decoding.plugin.name' = 'pgoutput',
--当前仅支持 never 表示从 Slot 创建时的点位开始同步,只同步实时变更,不同步全量,当前引擎还不支持读取全量。
'debezium.snapshot.mode' = 'never',
-- 当使用 pgoutput 解码时,订阅可以指定 publication,optional,如果不写,会自动创建dbz_publication,订阅所有表的更新。
'debezium.pulication.name' = 'pub-name'
);
CREATE TEMPORARY TABLE sink_kafka(
store_id string NULL,
cnt bigint NOT NULL,
PRIMARY KEY (store_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'topic-name',
'properties.bootstrap.servers' = 'servers',
'key.format' = 'json',
'key.fields' = 'store_id',
'value.format' = 'debezium-json',
'value.debezium-json.schema-include' = 'false'
);
INSERT INTO sink_kafka
SELECT store_id, count(*) as cnt
FROM source_cdc GROUP BY store_id;
监控与运维
SQL作业监控
通过Flink Web UI可以监控:
- 写入速率(records/s)
- 各算子的背压情况
- Checkpoint状态
日志配置
# log4j.properties
log4j.logger.org.apache.flink.table.runtime=INFO
log4j.logger.org.postgresql=INFO
常见问题处理
类型映射问题
症状: 字段类型不匹配导致写入失败。
解决方案:
- 在DDL中明确指定类型映射:
CREATE TABLE protonbase_sink (
id INT,
name VARCHAR(100), -- 明确长度
create_time TIMESTAMP(3)
) WITH (...);
- 使用CAST转换类型:
INSERT INTO protonbase_sink
SELECT id, name, CAST(create_time AS TIMESTAMP(3)) FROM source_table;
性能瓶颈
症状: SQL作业吞吐量低。
解决方案:
- 增加并行度
- 调整批量参数
- 检查ProtonBase集群负载
- 优化SQL查询逻辑