Apache Flink Integration

ProtonBase is a high-performance distributed database compatible with the PostgreSQL protocol, supporting standard SQL syntax and the PostgreSQL ecosystem toolchain. This article describes how to write data into ProtonBase efficiently with Apache Flink, covering both the DataStream API and Flink SQL, and also shows how to read CDC incremental changes with Apache Flink.

Environment Setup

Version Compatibility

  • Flink version: 1.14+ (1.16+ recommended)
  • JDBC driver: PostgreSQL JDBC driver (42.5+ recommended)

Dependency Configuration

Add the JDBC driver dependency to your Flink project:

<!-- Maven configuration -->
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.5.0</version>
</dependency>
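
The JdbcSink and the JDBC table connector used below also require the Flink JDBC connector on the classpath. A minimal sketch of the extra dependency, assuming Flink 1.17 and the externalized connector artifact (adjust the version to match your Flink release):

<!-- Maven configuration (illustrative version) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.2-1.17</version>
</dependency>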

Type Mapping Between Flink SQL and ProtonBase

Flink SQL type      ProtonBase type
BOOLEAN             BOOLEAN
TINYINT             SMALLINT
SMALLINT            SMALLINT
INT                 INTEGER
BIGINT              BIGINT
FLOAT               REAL
DOUBLE              DOUBLE PRECISION
DECIMAL(p,s)        NUMERIC(p,s)
VARCHAR(n)          VARCHAR(n)
CHAR(n)             CHAR(n)
DATE                DATE
TIME                TIME
TIMESTAMP           TIMESTAMP
ARRAY               ARRAY
MAP                 JSONB
ROW                 JSONB
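
To illustrate the mapping, here is a minimal sketch pairing a Flink sink DDL with the matching ProtonBase table. The table name events and its columns are hypothetical, and only a few of the types above are shown.

-- Flink SQL side
CREATE TABLE events_sink (
  id BIGINT,
  amount DECIMAL(10, 2),
  name VARCHAR(100),
  created_at TIMESTAMP(3)
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://protonbase-host:5432/dbname',
  'table-name' = 'events'
);
 
-- Matching ProtonBase table (PostgreSQL syntax)
CREATE TABLE events (
  id BIGINT,
  amount NUMERIC(10, 2),
  name VARCHAR(100),
  created_at TIMESTAMP
);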

Writing Data with the DataStream API

Using the JDBC Sink

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
// Define the data stream
DataStream<User> userStream = env.addSource(...);
 
// Configure the JDBC connection options
JdbcConnectionOptions jdbcOpts = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withUrl("jdbc:postgresql://protonbase-host:5432/dbname")
    .withDriverName("org.postgresql.Driver")
    .withUsername("username")
    .withPassword("password")
    .build();
 
// Create the JDBC sink
userStream.addSink(JdbcSink.sink(
    "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
    (ps, user) -> {
        ps.setInt(1, user.getId());
        ps.setString(2, user.getName());
        ps.setInt(3, user.getAge());
    },
    jdbcOpts));
 
env.execute("ProtonBase Sink Job");

Batch Write Optimization

JdbcExecutionOptions execOpts = new JdbcExecutionOptions.Builder()
    .withBatchSize(1000)  // records per batch
    .withBatchIntervalMs(200)  // flush interval (ms)
    .withMaxRetries(3)  // retries on failure
    .build();
 
userStream.addSink(JdbcSink.sink(
    "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
    (ps, user) -> {
        ps.setInt(1, user.getId());
        ps.setString(2, user.getName());
        ps.setInt(3, user.getAge());
    },
    execOpts,
    jdbcOpts));
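
If the job can retry or replay records, a plain INSERT may produce duplicate rows. Below is a minimal sketch of an idempotent variant, assuming the users table has a primary key on id and that ProtonBase supports PostgreSQL's ON CONFLICT upsert syntax:

// Upsert statement: retried or replayed records overwrite the existing row instead of duplicating it
String upsertSql =
    "INSERT INTO users (id, name, age) VALUES (?, ?, ?) " +
    "ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, age = EXCLUDED.age";
 
userStream.addSink(JdbcSink.sink(
    upsertSql,
    (ps, user) -> {
        ps.setInt(1, user.getId());
        ps.setString(2, user.getName());
        ps.setInt(3, user.getAge());
    },
    execOpts,
    jdbcOpts));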

Writing with Flink SQL

Creating a ProtonBase Catalog

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
 
// Register the ProtonBase catalog
tableEnv.executeSql(
    "CREATE CATALOG protonbase WITH (" +
    "  'type'='jdbc'," +
    "  'default-database'='dbname'," +
    "  'username'='username'," +
    "  'password'='password'," +
    "  'base-url'='jdbc:postgresql://protonbase-host:5432'" +
    ")");
 
// Set the current catalog
tableEnv.useCatalog("protonbase");
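
To verify that the catalog is reachable, you can list what it exposes. A small sketch (the JDBC catalog typically lists table names in schema.table form):

// List the databases and tables visible through the JDBC catalog
tableEnv.executeSql("SHOW DATABASES").print();
tableEnv.executeSql("SHOW TABLES").print();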

Writing Data with Flink SQL

// Register the Flink source table
tableEnv.executeSql(
    "CREATE TABLE source_table (" +
    "  id INT," +
    "  name STRING," +
    "  age INT" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'user_topic'," +
    "  'properties.bootstrap.servers' = 'kafka:9092'," +
    "  'format' = 'json'" +
    ")");
 
// Perform the insert
tableEnv.executeSql(
    "INSERT INTO users " +  // ProtonBase中已存在的表
    "SELECT id, name, age FROM source_table");

Alternatively, submit the job through the Flink SQL development console:

CREATE TEMPORARY TABLE users_sink (
 id INTEGER,
 name STRING,
 age INTEGER
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:postgresql://protonbase-host:5432/dbname',
 'table-name' = 'users',
 'username' = 'username',
 'password' = 'password',
 'driver' = 'org.postgresql.Driver',
 'sink.buffer-flush.max-rows' = '1000',
 'sink.buffer-flush.interval' = '1s',
 'sink.max-retries' = '3'
);
 
 
 
INSERT INTO users_sink
SELECT * FROM source_table;

Common Configuration Parameters

Parameter                       Description                    Recommended value
sink.buffer-flush.max-rows      Maximum rows per batch write   1000-5000
sink.buffer-flush.interval      Batch flush interval           1s
sink.max-retries                Retries on failure             3
sink.parallelism                Sink parallelism               Same as the number of ProtonBase nodes
connection.max-retry-timeout    Connection timeout             30s

For more configuration parameters, see the Apache Flink JDBC SQL Connector documentation.

Performance Tuning

Setting SQL Job Parallelism

Set it in SQL:

SET 'parallelism.default' = '8';

Batch Write Parameters

CREATE TABLE protonbase_sink (
  ...
) WITH (
  ...
  'sink.buffer-flush.interval' = '1s',
  'sink.buffer-flush.max-rows' = '1000',
  'sink.parallelism' = '8'
);

Consuming CDC with Flink

Flink can consume ProtonBase CDC data through the postgres-cdc connector.

Prerequisites

  • Logical replication is enabled on the database (wal_level = logical)
  • The Flink runtime environment has network connectivity to the ProtonBase database
  • The database user used for the connection has the REPLICATION privilege, or is able to create the replication slot and publication automatically.

For more details, see CDC Subscription.
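
A quick sketch of how these prerequisites can be checked and granted on a PostgreSQL-compatible database (flink_user is a hypothetical role name):

-- Check that logical replication is enabled (changing it requires a restart)
SHOW wal_level;   -- expected: logical
 
-- Grant the replication privilege to the connecting user
ALTER ROLE flink_user WITH REPLICATION;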

Publication & Slot Resource Management

It is recommended to create the publication in advance. A publication is the server-side event publishing and filtering mechanism in the PostgreSQL protocol: it can publish only selected tables and selected event types, and it can merge partition events onto the partitioned parent table. If you do not explicitly create a publication in the database, the Flink CDC connector creates a default one named dbz_publication, which subscribes to all change events of all tables and filters them locally on the Flink client, which is less efficient and incurs more network overhead. In the Flink connector, use 'debezium.publication.name' = 'pub-name' to specify the publication to use; multiple Flink jobs can share the same publication.
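
A minimal sketch of creating a publication in advance (the publication and table names are hypothetical; publish_via_partition_root is standard PostgreSQL 13+ syntax for merging partition events onto the parent table):

-- Publish only the tables the Flink jobs need
CREATE PUBLICATION pub_name FOR TABLE schemaname.tablename;
 
-- Or publish a partitioned table and merge partition events onto the parent
CREATE PUBLICATION pub_part FOR TABLE schemaname.parent_table
  WITH (publish_via_partition_root = true);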

A slot is the mechanism that records a CDC consumer's progress (i.e. its LSN). Different consumers should use different slots so that each tracks its own progress. It is recommended to create the slot in advance with the noexport_snapshot option, which saves storage space. Different Flink jobs should use different slots. For example:

SELECT * FROM pg_create_logical_replication_slot('flink_cdc_slot', 'pgoutput', false, true);

When a Flink consumer job is no longer in use, drop its slot explicitly to release the storage it pins and avoid wasting space.

SELECT pg_drop_replication_slot('flink_cdc_slot');
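
To see which slots exist and whether they are still pinning WAL, query the standard PostgreSQL catalog view:

-- Inspect existing replication slots and their progress
SELECT slot_name, plugin, active, restart_lsn, confirmed_flush_lsn
FROM pg_replication_slots;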

Slot advancement depends on Flink checkpoints: each completed checkpoint advances the slot's LSN. If the LSN is not advanced for a long time, historical state stays pinned and wastes storage. CDC consumption can only resume from the most recently completed checkpoint; you cannot pick an older checkpoint, because each checkpoint records the consumed LSN and, once the checkpoint completes, that LSN is confirmed to the server and the slot moves forward, so earlier LSNs can no longer be consumed. Therefore, no matter how far back you try to restart a Flink job from, it will in practice read from the position of the last successful checkpoint.
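
Because slot advancement is driven by completed checkpoints, make sure checkpointing is enabled in the CDC job. A minimal sketch (the intervals are illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
// Each completed checkpoint confirms the consumed LSN and lets the slot move forward
env.enableCheckpointing(60_000);                                  // checkpoint every 60 seconds
env.getCheckpointConfig().setCheckpointTimeout(120_000);          // fail a checkpoint after 2 minutes
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10_000);  // pause between consecutive checkpoints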

Flink SQL Example

In the following example, the source table is first read via CDC, and the result is then written to the sink table.

CREATE TEMPORARY TABLE source_cdc (
        ins_time timestamp(6) NULL,
        store_id string NULL,
        store_code string NOT NULL,
        store_name string NULL,
        time_section_no bigint NULL,
        stat_date string NOT NULL,
        PRIMARY KEY (store_code,stat_date) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'host-url',
  'port' = '5432',
  'username' = 'xxxx',
  'password' = 'xxxx',
  'database-name' = 'dbname',
  'schema-name' = 'schemaname',
  'table-name' = 'tablename',
  -- It is recommended to create the slot in the database ahead of time and manage its lifecycle; if it does not exist, Flink CDC creates it at startup, and an automatically created slot lacks the NOEXPORT_SNAPSHOT property and uses extra storage.
  'slot.name' = 'flink_cdc_slot',
  -- Decoding plugin; only pgoutput is supported
  'decoding.plugin.name' = 'pgoutput',
  -- Only never is currently supported: it starts from the point where the slot was created and syncs only real-time changes; the engine does not yet support reading a full snapshot.
  'debezium.snapshot.mode' = 'never',
  -- When using pgoutput decoding, the subscription can optionally specify a publication; if omitted, a default dbz_publication is created that subscribes to changes of all tables.
  'debezium.publication.name' = 'pub-name'
);
 
CREATE TEMPORARY TABLE sink_kafka(
        store_id string NULL,
        cnt bigint NOT NULL,
        PRIMARY KEY (store_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-name',
  'properties.bootstrap.servers' = 'servers',
  'key.format' = 'json',
  'key.fields' = 'store_id',
  'value.format' =  'debezium-json',
  'value.debezium-json.schema-include' = 'false'
);
 
INSERT INTO sink_kafka
SELECT store_id, count(*) as cnt
FROM source_cdc GROUP BY store_id;

Practical Example: A Real-Time E-Commerce Data Pipeline

The following is a complete example of a real-time e-commerce data pipeline, showing how to build an end-to-end real-time processing system with Flink and ProtonBase.

1. System Architecture

Kafka (order events) → Flink (real-time processing) → ProtonBase (storage and analytics)
                                   ↓
                       Kafka (real-time metrics) → real-time monitoring dashboard

2. Preparing the Data Sources

First, create Kafka topics to simulate the order event streams:

# Create the order events topic
kafka-topics.sh --create --topic order-events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
 
# Create the order status updates topic
kafka-topics.sh --create --topic order-status-updates --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

3. ProtonBase Table Schema

Create the related tables in ProtonBase:

-- Order fact table
CREATE TABLE orders (
    order_id BIGINT PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    product_id BIGINT NOT NULL,
    quantity INT NOT NULL,
    price DECIMAL(10, 2) NOT NULL,
    total_amount DECIMAL(10, 2) NOT NULL,
    order_status VARCHAR(20) DEFAULT 'pending',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
 
-- Order aggregation table (hourly statistics)
CREATE TABLE order_hourly_stats (
    stat_hour TIMESTAMP PRIMARY KEY,
    order_count BIGINT NOT NULL,
    total_revenue DECIMAL(12, 2) NOT NULL,
    avg_order_value DECIMAL(10, 2) NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) USING columnar;
 
-- Hot product ranking table
CREATE TABLE hot_products (
    product_id BIGINT PRIMARY KEY,
    product_name VARCHAR(200) NOT NULL,
    total_quantity BIGINT NOT NULL,
    total_revenue DECIMAL(12, 2) NOT NULL,
    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) USING columnar;

4. Flink Real-Time Processing Job

Create a Flink job to process the order events:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 
public class ECommerceDataPipeline {
    
    public static void main(String[] args) throws Exception {
        // Create the execution environments
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        
        // Configure checkpointing
        env.enableCheckpointing(30000); // 30-second checkpoint interval
        
        // 1. Read order events from Kafka
        tableEnv.executeSql(
            "CREATE TABLE order_events (" +
            "  order_id BIGINT," +
            "  customer_id BIGINT," +
            "  product_id BIGINT," +
            "  product_name STRING," +
            "  quantity INT," +
            "  price DECIMAL(10, 2)," +
            "  total_amount DECIMAL(10, 2)," +
            "  event_time TIMESTAMP(3)," +
            "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'order-events'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'format' = 'json'," +
            "  'scan.startup.mode' = 'latest-offset'" +
            ")"
        );
        
        // 2. Create the ProtonBase sink table
        tableEnv.executeSql(
            "CREATE TABLE protonbase_orders (" +
            "  order_id BIGINT," +
            "  customer_id BIGINT," +
            "  product_id BIGINT," +
            "  quantity INT," +
            "  price DECIMAL(10, 2)," +
            "  total_amount DECIMAL(10, 2)," +
            "  order_status STRING," +
            "  created_at TIMESTAMP(3)" +
            ") WITH (" +
            "  'connector' = 'jdbc'," +
            "  'url' = 'jdbc:postgresql://protonbase-host:5432/ecommerce'," +
            "  'table-name' = 'orders'," +
            "  'username' = 'flink_user'," +
            "  'password' = 'flink_password'," +
            "  'driver' = 'org.postgresql.Driver'," +
            "  'sink.buffer-flush.max-rows' = '1000'," +
            "  'sink.buffer-flush.interval' = '1s'," +
            "  'sink.max-retries' = '3'" +
            ")"
        );
        
        // 3. Create the real-time metrics output table (written to Kafka)
        tableEnv.executeSql(
            "CREATE TABLE real_time_metrics (" +
            "  metric_name STRING," +
            "  metric_value DECIMAL(12, 2)," +
            "  event_time TIMESTAMP(3)" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'real-time-metrics'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'format' = 'json'" +
            ")"
        );
        
        // Collect all INSERT statements into a single StatementSet so they are submitted as one job
        StatementSet stmtSet = tableEnv.createStatementSet();

        // 4. Process order events and write them to ProtonBase
        stmtSet.addInsertSql(
            "INSERT INTO protonbase_orders " +
            "SELECT " +
            "  order_id," +
            "  customer_id," +
            "  product_id," +
            "  quantity," +
            "  price," +
            "  total_amount," +
            "  'pending' as order_status," +
            "  event_time as created_at " +
            "FROM order_events"
        );
        
        // 5. Compute real-time metrics and emit them to Kafka
        stmtSet.addInsertSql(
            "INSERT INTO real_time_metrics " +
            "SELECT " +
            "  'hourly_order_count' as metric_name," +
            "  COUNT(*) as metric_value," +
            "  TUMBLE_END(event_time, INTERVAL '1' HOUR) as event_time " +
            "FROM order_events " +
            "GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)"
        );
        
        // 6. Compute hourly statistics and write them to ProtonBase
        tableEnv.executeSql(
            "CREATE TABLE protonbase_hourly_stats (" +
            "  stat_hour TIMESTAMP(3)," +
            "  order_count BIGINT," +
            "  total_revenue DECIMAL(12, 2)," +
            "  avg_order_value DECIMAL(10, 2)" +
            ") WITH (" +
            "  'connector' = 'jdbc'," +
            "  'url' = 'jdbc:postgresql://protonbase-host:5432/ecommerce'," +
            "  'table-name' = 'order_hourly_stats'," +
            "  'username' = 'flink_user'," +
            "  'password' = 'flink_password'," +
            "  'driver' = 'org.postgresql.Driver'" +
            ")"
        );
        
        stmtSet.addInsertSql(
            "INSERT INTO protonbase_hourly_stats " +
            "SELECT " +
            "  TUMBLE_START(event_time, INTERVAL '1' HOUR) as stat_hour," +
            "  COUNT(*) as order_count," +
            "  SUM(total_amount) as total_revenue," +
            "  AVG(total_amount) as avg_order_value " +
            "FROM order_events " +
            "GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR)"
        );
        
        env.execute("E-Commerce Real-time Data Pipeline");
    }
}

5. Interactive Analysis with Flink SQL

In addition to the streaming job above, you can also use Flink SQL for interactive analysis:

-- Start the Flink SQL client
./bin/sql-client.sh
 
-- Set up the ProtonBase catalog
CREATE CATALOG protonbase WITH (
  'type'='jdbc',
  'default-database'='ecommerce',
  'username'='analyst',
  'password'='analyst_password',
  'base-url'='jdbc:postgresql://protonbase-host:5432'
);
 
USE CATALOG protonbase;
 
-- Query hot products in real time
SELECT 
    p.product_name,
    SUM(oi.quantity) as total_sold,
    SUM(oi.quantity * oi.unit_price) as total_revenue
FROM order_items oi
JOIN products p ON oi.product_id = p.product_id
JOIN orders o ON oi.order_id = o.order_id
WHERE o.order_date >= CURRENT_DATE - INTERVAL '7' DAY
GROUP BY p.product_name
ORDER BY total_sold DESC
LIMIT 10;
 
-- Monitor order status in real time
SELECT 
    order_status,
    COUNT(*) as order_count,
    AVG(total_amount) as avg_order_value
FROM orders
WHERE created_at >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR
GROUP BY order_status;

6. Monitoring and Alerting

Create monitoring queries to track system performance:

-- Create a system monitoring view
CREATE VIEW system_monitoring AS
SELECT 
    'order_processing_rate' as metric_name,
    COUNT(*) as metric_value,
    CURRENT_TIMESTAMP as timestamp
FROM orders
WHERE created_at >= CURRENT_TIMESTAMP - INTERVAL '1' MINUTE;
 
-- Query the processing latency
SELECT 
    AVG(EXTRACT(EPOCH FROM (CURRENT_TIMESTAMP - created_at))) as avg_processing_delay_seconds
FROM orders
WHERE created_at >= CURRENT_TIMESTAMP - INTERVAL '10' MINUTE;

Monitoring and Operations

SQL Job Monitoring

The Flink Web UI lets you monitor:

  • Write rate (records/s)
  • Backpressure of each operator
  • Checkpoint status

Logging Configuration

# log4j.properties
log4j.logger.org.apache.flink.table.runtime=INFO
log4j.logger.org.postgresql=INFO

Best Practices

1. Performance Optimization

  1. Parallelism tuning

    • Set parallelism appropriately for the data volume and processing requirements
    • Monitor resource usage to avoid wasting resources
  2. Batch write tuning

    • Set reasonable batch write parameters
    • Use a connection pool to reduce connection overhead
  3. Memory management

    • Configure Flink memory parameters appropriately
    • Monitor GC behavior so that frequent GC does not hurt performance

2. Data Consistency

  1. Exactly-once semantics

    • Enable checkpointing to ensure data consistency
    • Use a sink connector that supports transactions (see the sketch below)
  2. Idempotent processing

    • Use unique constraints in ProtonBase to avoid duplicate data
    • Design idempotent processing logic
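
For end-to-end exactly-once JDBC writes, Flink also provides JdbcSink.exactlyOnceSink, which ties XA transactions to checkpoints. A rough sketch, assuming ProtonBase supports PostgreSQL XA (prepared) transactions; verify that support before relying on it:

userStream.addSink(JdbcSink.exactlyOnceSink(
    "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
    (ps, user) -> {
        ps.setInt(1, user.getId());
        ps.setString(2, user.getName());
        ps.setInt(3, user.getAge());
    },
    JdbcExecutionOptions.builder()
        .withMaxRetries(0)            // retries must be disabled for the XA sink
        .build(),
    JdbcExactlyOnceOptions.defaults(),
    () -> {
        // XA-capable PostgreSQL data source
        org.postgresql.xa.PGXADataSource ds = new org.postgresql.xa.PGXADataSource();
        ds.setUrl("jdbc:postgresql://protonbase-host:5432/dbname");
        ds.setUser("username");
        ds.setPassword("password");
        return ds;
    }));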

3. Fault Tolerance and Recovery

  1. Checkpoint configuration

    • Choose an appropriate checkpoint interval
    • Configure checkpoint timeouts and retry policies
  2. State management

    • Keep the size of Flink state under control
    • Clean up expired state regularly

Troubleshooting

Type mapping issues

Symptom: Writes fail because of mismatched field types.

Solution:

  1. Specify the type mapping explicitly in the DDL:
CREATE TABLE protonbase_sink (
  id INT,
  name VARCHAR(100),  -- specify an explicit length
  create_time TIMESTAMP(3)
) WITH (...);
  2. Use CAST to convert the type:
INSERT INTO protonbase_sink
SELECT id, name, CAST(create_time AS TIMESTAMP(3)) FROM source_table;

Performance bottlenecks

Symptom: Low SQL job throughput.

Solution:

  1. Increase the parallelism
  2. Tune the batch write parameters
  3. Check the load on the ProtonBase cluster
  4. Optimize the SQL query logic