从 PostgreSQL 迁移到 ProtonBase 指南
概述
ProtonBase 支持从多种 PostgreSQL 的数据源进行迁入,包括阿里云 ADB for PostgreSQL、PolarDB PG版、RDS PG、以及其他类似云平台及各类自建PostgreSQL 数据源和其他兼容PG生态的数据产品。
本文以阿里云 RDS PG 为例,介绍如何通过 ProtonBase 同步工具 Teleport 将数据同步到 ProtonBase 中。
有关 Teleport 的更多文档,请参考 Teleport 数据同步。
准备工作
环境要求
- 源 PostgreSQL 数据库(支持 PostgreSQL 9.6 及以上版本)
- ProtonBase 数据库实例
- 网络连通性(建议内网连接以获得最佳性能)
- Teleport 数据同步工具访问权限
权限配置
在 PostgreSQL 中创建专门用于数据同步的用户:
-- 使用具有足够权限的账户登录 PostgreSQL
psql postgres://${user}:${password}@${host}:5432/${your_database_name}
-- 创建CDC专用同步账号
CREATE USER ${cdc_user} WITH REPLICATION ENCRYPTED PASSWORD '${cdc_user_passord}';
-- 对新cdc专用同步账号赋权
source_db=# GRANT SELECT ON ALL TABLES IN SCHEMA schema1, [other_schema] TO ${cdc_user};
source_db=# GRANT USAGE ON SCHEMA schema1, [other_schema] TO ${cdc_user};
网络配置
注意:此步骤非必须,也可以走公网进行数据同步。但从同步速度和费用角度,建议进行打通,参考文档数据同步网络配置。
迁移步骤
1. 评估源数据库
在开始迁移之前,需要对源 PostgreSQL 数据库进行全面评估:
-- 检查数据库大小
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size
FROM pg_tables
WHERE schemaname NOT IN ('information_schema', 'pg_catalog')
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
-- 检查表依赖关系
SELECT
tc.table_schema,
tc.constraint_name,
tc.table_name,
kcu.column_name,
ccu.table_schema AS foreign_table_schema,
ccu.table_name AS foreign_table_name,
ccu.column_name AS foreign_column_name
FROM information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
AND ccu.table_schema = tc.table_schema
WHERE tc.constraint_type = 'FOREIGN KEY';
2. 准备目标环境
在 ProtonBase 中创建相应的数据库和表结构:
-- 在ProtonBase中创建数据库
CREATE DATABASE ecommerce;
-- 创建必要的schema
CREATE SCHEMA sales;
CREATE SCHEMA inventory;
CREATE SCHEMA customers;
-- 创建用户和权限
CREATE USER "ecommerce_app" WITH PASSWORD 'secure_password';
GRANT ALL PRIVILEGES ON SCHEMA sales, inventory, customers TO "ecommerce_app";
3. 创建 PUBLICATION
在源 PostgreSQL 中创建用于数据同步的 PUBLICATION:
-- 高权限账号登录
psql -h ${host} -p 5432 -U ${user} "dbname=${your_database_name} replication=database" -W
-- 以下命令二选一
-- 对所有表创建订阅
CREATE PUBLICATION teleport_pub FOR ALL TABLES;
-- 只对同步表创建订阅
CREATE PUBLICATION teleport_pub FOR TABLE ${your_schema}.${your_table}, ...;
4. 设置 WAL_LEVEL
设置 wal_level = logical:
- 自建服务,请修改 postgresql.conf 文件中 wal_level 值后重启
- 云厂商:
- 阿里云,请参考官网设置 (opens in a new tab)
- 其他,请参考相应云厂商文档
5. 配置数据同步
在 Teleport 中配置数据同步任务:
- 登录 ProtonBase 控制台
- 导航到"数据同步" -> "数据导入"
- 创建新的同步任务
- 配置源 PostgreSQL 连接信息:
- 主机地址
- 端口(通常为5432)
- 数据库名
- 用户名(具有 REPLICATION 权限的用户)
- 密码
- 配置目标 ProtonBase 连接信息
- 选择要同步的数据库和表
- 配置在目标端的新的 database 和 schema 名称(可选)
- 确定同步模式和脏数据策略
6. 启动同步
在 Teleport 界面启动同步任务,一般任务初始化需要一些时间进行资源准备。
资源准备完成后,根据页面的提示可以看到当前所处节点。当进行当增量阶段显示运行中或显示具体时延时,意味着已经在实施同步。
实际迁移示例:电商系统迁移
以下是一个完整的电商系统从 PostgreSQL 迁移到 ProtonBase 的示例,展示如何进行端到端的数据迁移。
1. 系统评估
评估源数据库结构
-- 检查数据库大小
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size
FROM pg_tables
WHERE schemaname NOT IN ('information_schema', 'pg_catalog')
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC;
-- 检查表依赖关系
SELECT
tc.table_schema,
tc.constraint_name,
tc.table_name,
kcu.column_name,
ccu.table_schema AS foreign_table_schema,
ccu.table_name AS foreign_table_name,
ccu.column_name AS foreign_column_name
FROM information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
AND ccu.table_schema = tc.table_schema
WHERE tc.constraint_type = 'FOREIGN KEY';
准备目标环境
-- 在ProtonBase中创建数据库
CREATE DATABASE ecommerce;
-- 创建必要的schema
CREATE SCHEMA sales;
CREATE SCHEMA inventory;
CREATE SCHEMA customers;
-- 创建用户和权限
CREATE USER "ecommerce_app" WITH PASSWORD 'secure_password';
GRANT ALL PRIVILEGES ON SCHEMA sales, inventory, customers TO "ecommerce_app";
2. 表结构迁移
创建核心表结构
-- 用户表
CREATE TABLE customers.users (
user_id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL UNIQUE,
email VARCHAR(100) NOT NULL UNIQUE,
first_name VARCHAR(50),
last_name VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 产品表
CREATE TABLE inventory.products (
product_id SERIAL PRIMARY KEY,
product_name VARCHAR(200) NOT NULL,
description TEXT,
price NUMERIC(10, 2) NOT NULL,
stock_quantity INTEGER NOT NULL DEFAULT 0,
category_id INTEGER,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 订单表
CREATE TABLE sales.orders (
order_id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES customers.users(user_id),
order_status VARCHAR(20) NOT NULL DEFAULT 'pending',
total_amount NUMERIC(10, 2) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 订单项表
CREATE TABLE sales.order_items (
item_id SERIAL PRIMARY KEY,
order_id INTEGER NOT NULL REFERENCES sales.orders(order_id),
product_id INTEGER NOT NULL REFERENCES inventory.products(product_id),
quantity INTEGER NOT NULL,
unit_price NUMERIC(10, 2) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
3. 数据迁移配置
创建 Publication
-- 在源 PostgreSQL 中创建 Publication
CREATE PUBLICATION ecommerce_pub FOR TABLE
customers.users,
inventory.products,
sales.orders,
sales.order_items;
配置 Teleport 同步任务
- 登录 ProtonBase 控制台
- 导航到"数据同步" -> "数据导入"
- 创建新的同步任务,命名为"ecommerce_migration"
- 配置源 PostgreSQL 连接信息:
- 主机地址
- 端口(通常为5432)
- 数据库名
- 用户名(具有 REPLICATION 权限的用户)
- 密码
- 配置目标 ProtonBase 连接信息
- 选择要同步的表:
- customers.users
- inventory.products
- sales.orders
- sales.order_items
- 配置目标映射:
- customers.users -> customers.users
- inventory.products -> inventory.products
- sales.orders -> sales.orders
- sales.order_items -> sales.order_items
- 设置同步模式为"全量+增量"
- 配置脏数据处理策略
4. 启动和监控迁移
启动同步任务
-- 在 Teleport 界面启动同步任务
-- 监控同步进度和状态
验证数据一致性
-- 检查源端和目标端数据行数
-- 源端 PostgreSQL
SELECT COUNT(*) FROM customers.users;
SELECT COUNT(*) FROM inventory.products;
SELECT COUNT(*) FROM sales.orders;
SELECT COUNT(*) FROM sales.order_items;
-- 目标端 ProtonBase
SELECT COUNT(*) FROM customers.users;
SELECT COUNT(*) FROM inventory.products;
SELECT COUNT(*) FROM sales.orders;
SELECT COUNT(*) FROM sales.order_items;
-- 抽样验证数据
SELECT * FROM customers.users LIMIT 5;
SELECT * FROM inventory.products ORDER BY product_id DESC LIMIT 5;
5. 应用程序切换
更新应用程序配置
# 原 PostgreSQL 配置
# jdbc:postgresql://old-postgres-host:5432/ecommerce
# 新 ProtonBase 配置
jdbc:postgresql://protonbase-host:5432/ecommerce
测试应用程序功能
-- 测试用户注册功能
INSERT INTO customers.users (username, email, first_name, last_name)
VALUES ('testuser', 'test@example.com', 'Test', 'User');
-- 测试产品查询功能
SELECT product_name, price FROM inventory.products WHERE category_id = 1;
-- 测试订单创建功能
INSERT INTO sales.orders (user_id, total_amount) VALUES (1, 99.99);
应用改造
连接配置更新
将应用程序中的数据库连接配置从 PostgreSQL 更新为 ProtonBase:
# 原 PostgreSQL 配置
spring.datasource.url=jdbc:postgresql://old-postgres-host:5432/ecommerce
spring.datasource.username=postgres_user
spring.datasource.password=postgres_password
# 新 ProtonBase 配置
spring.datasource.url=jdbc:postgresql://protonbase-host:5432/ecommerce
spring.datasource.username=protonbase_user
spring.datasource.password=protonbase_password
SQL 语法适配
虽然 ProtonBase 与 PostgreSQL 高度兼容,但仍有一些细微差异需要注意:
序列操作
-- PostgreSQL 中获取序列下一个值
SELECT nextval('user_id_seq');
-- ProtonBase 中获取序列下一个值(相同语法)
SELECT nextval('user_id_seq');
数组操作
-- PostgreSQL 中数组操作
SELECT ARRAY[1,2,3];
SELECT array_append(ARRAY[1,2], 3);
-- ProtonBase 中数组操作(相同语法)
SELECT ARRAY[1,2,3];
SELECT array_append(ARRAY[1,2], 3);
代码示例
Java (Spring Boot)
@Repository
public class UserRepository {
@Autowired
private JdbcTemplate jdbcTemplate;
public User findById(Long id) {
String sql = "SELECT user_id, username, email FROM customers.users WHERE user_id = ?";
return jdbcTemplate.queryForObject(sql, new Object[]{id}, new UserRowMapper());
}
public List<User> findAll() {
String sql = "SELECT user_id, username, email FROM customers.users ORDER BY user_id";
return jdbcTemplate.query(sql, new UserRowMapper());
}
public void save(User user) {
String sql = "INSERT INTO customers.users (username, email) VALUES (?, ?)";
jdbcTemplate.update(sql, user.getUsername(), user.getEmail());
}
}
迁移最佳实践
1. 迁移前准备
-
详细评估:
- 评估数据量大小和复杂度
- 识别自定义函数、存储过程和触发器
- 检查扩展插件使用情况
- 评估应用程序 SQL 兼容性
-
制定迁移计划:
- 确定迁移时间窗口(低峰期)
- 准备回滚方案
- 制定详细的测试计划
- 安排团队培训和知识转移
2. 迁移过程中
-
分阶段迁移:
- 先迁移非关键业务数据
- 逐步迁移核心业务数据
- 并行运行新旧系统进行验证
-
实时监控:
- 监控数据同步延迟
- 监控系统性能指标
- 记录迁移过程中的问题和解决方案
3. 迁移后优化
-
性能调优:
- 根据查询模式创建合适的索引
- 优化表存储模式(行存/列存/混存)
- 调整系统参数以适应工作负载
-
安全加固:
- 配置 IP 白名单限制访问
- 设置细粒度的用户权限
- 启用审计日志记录关键操作
常见问题和解决方案
1. Publication 创建失败
问题:创建 Publication 时提示权限不足。
解决方案:
-- 确保使用具有足够权限的用户创建 Publication
-- 授予必要的权限
GRANT SELECT ON ALL TABLES IN SCHEMA customers, inventory, sales TO your_user;
GRANT USAGE ON SCHEMA customers, inventory, sales TO your_user;
2. 数据同步延迟
问题:数据同步存在较大延迟。
解决方案:
-- 检查源端 WAL 设置
SHOW wal_level;
SHOW max_replication_slots;
SHOW max_wal_senders;
-- 检查复制槽状态
SELECT * FROM pg_replication_slots;
3. 数据类型不兼容
问题:某些 PostgreSQL 特有数据类型在 ProtonBase 中不支持。
解决方案:
-- 使用兼容的数据类型替换
-- PostgreSQL 的数组类型
-- 原:tags TEXT[]
-- 新:tags JSONB
-- PostgreSQL 的自定义类型
-- 原:status order_status
-- 新:status VARCHAR(20) CHECK (status IN ('pending', 'processing', 'shipped', 'delivered'))
注意事项
-
Teleport 支持常见的 DDL 的变更的自动同步,比如
CREATE TABLE
/TRUNCATE TABLE
/DROP TABLE
/ADD COLUMN
/ALTER COLUMN TYPE
等,但是不支持约束变更、DROP COLUMN
等的自动同步。在源端有不支持自动同步的 DDL 行为发生时,可能导致 Teleport 同步链路停止。 -
启动后可能报错:message: ERROR: permission denied for database source_demo. Please check and retry。一般为 PUBLICATION 没有创建。请参考前面步骤进行检查。
-
同步状态一直显示运行中,没有显示具体的延迟。原因为 Teleport 目前的实现机制需要在增量阶段至少有新数据写入时,才能计算出延迟。
通过遵循这些步骤和最佳实践,您可以顺利完成从 PostgreSQL 到 ProtonBase 的迁移,并充分利用 ProtonBase 的分布式数据库优势。