Real-Time Writes with Apache Flink

ProtonBase is a high-performance distributed database compatible with the PostgreSQL protocol, supporting standard SQL syntax and the PostgreSQL ecosystem toolchain. This article describes how to write data into ProtonBase efficiently with Apache Flink, covering two approaches: the DataStream API and Flink SQL.

Environment Setup

Version Compatibility

  • Flink version: 1.14+ (1.16+ recommended)
  • JDBC driver: PostgreSQL JDBC driver (42.5+ recommended)

Dependency Configuration

Add the JDBC driver dependency to your Flink project:

<!-- Maven dependency -->
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.5.0</version>
</dependency>

Flink SQL to ProtonBase Type Mapping

| Flink SQL type | ProtonBase type  |
|----------------|------------------|
| BOOLEAN        | BOOLEAN          |
| TINYINT        | SMALLINT         |
| SMALLINT       | SMALLINT         |
| INT            | INTEGER          |
| BIGINT         | BIGINT           |
| FLOAT          | REAL             |
| DOUBLE         | DOUBLE PRECISION |
| DECIMAL(p,s)   | NUMERIC(p,s)     |
| VARCHAR(n)     | VARCHAR(n)       |
| CHAR(n)        | CHAR(n)          |
| DATE           | DATE             |
| TIME           | TIME             |
| TIMESTAMP      | TIMESTAMP        |
| ARRAY          | ARRAY            |
| MAP            | JSONB            |
| ROW            | JSONB            |
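As an illustration of the mapping above, a target table on the ProtonBase side could be declared with the PostgreSQL-side types from the right column. The table and column names here are hypothetical, not part of any shipped schema:

```sql
-- Hypothetical target table; each column type is the ProtonBase
-- counterpart of a Flink SQL type from the mapping table.
CREATE TABLE users (
    id         INTEGER PRIMARY KEY,  -- Flink INT
    name       VARCHAR(100),         -- Flink VARCHAR(100)
    score      NUMERIC(10, 2),       -- Flink DECIMAL(10, 2)
    created_at TIMESTAMP             -- Flink TIMESTAMP
);
```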

Writing with the DataStream API

Using the JDBC Sink

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
// Define the data stream
DataStream<User> userStream = env.addSource(...);
 
// Configure the JDBC connection
JdbcConnectionOptions jdbcOpts = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withUrl("jdbc:postgresql://protonbase-host:5432/dbname")
    .withDriverName("org.postgresql.Driver")
    .withUsername("username")
    .withPassword("password")
    .build();
 
// Create the JDBC sink
userStream.addSink(JdbcSink.sink(
    "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
    (ps, user) -> {
        ps.setInt(1, user.getId());
        ps.setString(2, user.getName());
        ps.setInt(3, user.getAge());
    },
    jdbcOpts));
 
env.execute("ProtonBase Sink Job");
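The statement builder above assumes a `User` POJO with `getId`, `getName`, and `getAge` accessors; a minimal sketch (field names are illustrative):

```java
// Minimal POJO matching the getters used by the JDBC sink's statement builder.
public class User {
    private final int id;
    private final String name;
    private final int age;

    public User(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    public int getId() { return id; }
    public String getName() { return name; }
    public int getAge() { return age; }
}
```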

Batch Write Optimization

JdbcExecutionOptions execOpts = new JdbcExecutionOptions.Builder()
    .withBatchSize(1000)  // records per batch
    .withBatchIntervalMs(200)  // flush interval (ms)
    .withMaxRetries(3)  // retries on failure
    .build();
 
userStream.addSink(JdbcSink.sink(
    "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
    (ps, user) -> {
        ps.setInt(1, user.getId());
        ps.setString(2, user.getName());
        ps.setInt(3, user.getAge());
    },
    execOpts,
    jdbcOpts));
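Plain INSERTs can fail on duplicate keys when a restarted job replays records. Since ProtonBase speaks the PostgreSQL dialect, one way to make the sink idempotent is to use an upsert as the sink SQL (this assumes `id` is the primary key of `users`):

```sql
INSERT INTO users (id, name, age) VALUES (?, ?, ?)
ON CONFLICT (id) DO UPDATE
SET name = EXCLUDED.name,
    age  = EXCLUDED.age;
```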

Writing with Flink SQL

Creating a ProtonBase Catalog

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 
// Register the ProtonBase catalog
tableEnv.executeSql(
    "CREATE CATALOG protonbase WITH (" +
    "  'type'='jdbc'," +
    "  'default-database'='dbname'," +
    "  'username'='username'," +
    "  'password'='password'," +
    "  'base-url'='jdbc:postgresql://protonbase-host:5432'" +
    ")");
 
// Use it as the current catalog
tableEnv.useCatalog("protonbase");

Writing Data via Flink SQL

// Register the Flink source table
tableEnv.executeSql(
    "CREATE TABLE source_table (" +
    "  id INT," +
    "  name STRING," +
    "  age INT" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'user_topic'," +
    "  'properties.bootstrap.servers' = 'kafka:9092'," +
    "  'format' = 'json'" +
    ")");
 
// Run the insert
tableEnv.executeSql(
    "INSERT INTO users " +  // table that already exists in ProtonBase
    "SELECT id, name, age FROM source_table");

Alternatively, submit the job from a Flink SQL development console:

CREATE TEMPORARY TABLE users_sink (
 id INTEGER,
 name STRING,
 age INTEGER
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:postgresql://protonbase-host:5432/dbname',
 'table-name' = 'users',
 'username' = 'username',
 'password' = 'password',
 'driver' = 'org.postgresql.Driver',
 'sink.buffer-flush.max-rows' = '1000',
 'sink.buffer-flush.interval' = '1s',
 'sink.max-retries' = '3'
);
 
 
 
INSERT INTO users_sink
SELECT * FROM source_table;
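If the sink DDL additionally declares a primary key, the Flink JDBC connector runs in upsert mode and generates PostgreSQL-style ON CONFLICT statements itself, so replays after a failure remain idempotent:

```sql
CREATE TEMPORARY TABLE users_sink (
 id INTEGER,
 name STRING,
 age INTEGER,
 PRIMARY KEY (id) NOT ENFORCED  -- enables upsert mode
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:postgresql://protonbase-host:5432/dbname',
 'table-name' = 'users',
 'username' = 'username',
 'password' = 'password'
);
```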

Common Configuration Parameters

| Parameter                    | Description        | Recommended value                    |
|------------------------------|--------------------|--------------------------------------|
| sink.buffer-flush.max-rows   | Max rows per flush | 1000-5000                            |
| sink.buffer-flush.interval   | Flush interval     | 1s                                   |
| sink.max-retries             | Retries on failure | 3                                    |
| sink.parallelism             | Sink parallelism   | Match the number of ProtonBase nodes |
| connection.max-retry-timeout | Connection timeout | 30s                                  |

For more configuration options, see the Apache Flink JDBC SQL Connector documentation.

Performance Tuning

Setting SQL Job Parallelism

Set it in SQL:

SET 'parallelism.default' = '8';

Batch Write Parameters

CREATE TABLE protonbase_sink (
  ...
) WITH (
  ...
  'sink.buffer-flush.interval' = '1s',
  'sink.buffer-flush.max-rows' = '1000',
  'sink.parallelism' = '8'
);

Monitoring and Operations

SQL Job Monitoring

The Flink Web UI lets you monitor:

  • Write rate (records/s)
  • Backpressure on each operator
  • Checkpoint status

Logging Configuration

# log4j.properties
log4j.logger.org.apache.flink.table.runtime=INFO
log4j.logger.org.postgresql=INFO

Troubleshooting

Type Mapping Issues

Symptom: writes fail because field types do not match.

Solution:

  1. Specify the type mapping explicitly in the DDL:

CREATE TABLE protonbase_sink (
  id INT,
  name VARCHAR(100),  -- explicit length
  create_time TIMESTAMP(3)
) WITH (...);

  2. Use CAST to convert the type:

INSERT INTO protonbase_sink
SELECT id, name, CAST(create_time AS TIMESTAMP(3)) FROM source_table;

Performance Bottlenecks

Symptom: low SQL job throughput.

Solution:

  1. Increase parallelism
  2. Tune the batch parameters
  3. Check the ProtonBase cluster load
  4. Optimize the SQL query logic