从 ElasticSearch 迁移到 ProtonBase 指南
概述
Elasticsearch(下文简称 ES)是一个基于 Lucene 的开源、分布式、RESTful 搜索引擎,由 Elastic 公司开发。它以其快速搜索、高扩展性和实时分析能力而闻名。Elasticsearch 广泛应用于搜索、全文检索、日志分析等领域,支持大规模数据集的实时索引和搜索,同时提供强大的数据分析和可视化功能。
本文介绍如何将 ES 的数据迁移到 ProtonBase 中,包括数据导出、表结构设计、数据导入和应用改造等内容。
准备工作
环境要求
- ElasticSearch 6.x 或更高版本
- ProtonBase 数据库实例
- Node.js 环境(用于数据导出工具)
- 网络连通性(建议内网连接以获得最佳性能)
权限配置
确保具有以下权限:
- 访问 ElasticSearch 集群的权限
- 在 ProtonBase 中创建数据库和表的权限
- 执行数据导入操作的权限
网络配置
确保源 ElasticSearch 集群和目标 ProtonBase 实例之间的网络连通性。
迁移步骤
1. 评估源数据
在开始迁移之前,需要对 ElasticSearch 中的数据进行全面评估:
# 查看集群健康状态
curl -X GET "localhost:9200/_cluster/health?pretty"
# 查看所有索引
curl -X GET "localhost:9200/_cat/indices?v"
# 查看特定索引的映射
curl -X GET "localhost:9200/index_name/_mapping?pretty"
# 查看索引统计信息
curl -X GET "localhost:9200/index_name/_stats?pretty"
2. 准备目标环境
在 ProtonBase 中创建相应的数据库和表结构:
-- 在 ProtonBase 中创建数据库
CREATE DATABASE search_engine;
-- 创建用户和权限
CREATE USER "es_migration" WITH PASSWORD 'secure_password';
GRANT ALL PRIVILEGES ON DATABASE search_engine TO "es_migration";
3. 数据导出
安装导出工具
-
安装 Node.js
- Windows: 安装包可以从 Node.js 官网 (opens in a new tab) 获取
- Linux: 从仓库中安装或自行编译
-
安装 elasticdump:
npm install -g elasticdump
导出数据
# 导出全部数据
elasticdump \
--input 'http://localhost:9200/' \
--output 'dump.json' \
--timeout 600000
# 导出特定 index 数据
INDEX_NAME="gharchive"
elasticdump \
--input "http://localhost:9200/${INDEX_NAME}" \
--output 'dump.json' \
--timeout 600000
4. 创建目标表
根据导出的数据结构在 ProtonBase 中创建表:
CREATE TABLE gharchive(
id TEXT NOT NULL PRIMARY KEY,
doc JSONB
) USING hybrid; -- 根据需要选择存储格式,可选值为 [row, columnar, hybrid]
5. ETL 导入数据
使用 Python 脚本将导出的数据导入到 ProtonBase:
import json
import psycopg2
def make_connection():
return psycopg2.connect('host=... port=5432 dbname=postgres username=... password=...')
def main():
inited_tables = set()
with open('dump.json', 'r', encoding='utf8') as fd, make_connection() as conn:
with conn.cursor() as cursor:
for line in fd:
line = line.strip()
if not line:
continue
payload = json.loads(line)
index_name = payload['_index']
document_id = payload['_id']
document = payload['_source']
if index_name not in inited_tables:
# 自动初始化表
init_sql = f'''CREATE TABLE IF NOT EXISTS {index_name}(id TEXT NOT NULL PRIMARY KEY, doc JSONB) USING hybrid;'''
cursor.execute(init_sql)
conn.commit()
# 写入 document
insert_sql = f'''INSERT INTO {index_name}(id, doc) VALUES(%s, %s)'''
cursor.execute(insert_sql, (document_id, json.dumps(document)))
conn.commit()
if __name__ == '__main__':
main()
应用改造
连接配置更新
将应用程序中的数据库连接配置从 ElasticSearch 更新为 ProtonBase:
# 原 ElasticSearch 配置
elasticsearch.host=localhost
elasticsearch.port=9200
# 新 ProtonBase 配置
jdbc.url=jdbc:postgresql://protonbase-host:5432/search_engine
jdbc.username=protonbase_user
jdbc.password=protonbase_password
CRUD 操作
写入文档
ElasticSearch 示例
# 写入文档
curl -X POST "localhost:9200/index_name/_doc/123" -H 'Content-Type: application/json' -d'
{
"key": "val"
}
'
ProtonBase 示例
-- 写入文档
INSERT INTO index_name(id, doc)
VALUES('123', '{"key": "val"}'::JSONB);
-- 更新文档
UPDATE index_name
SET doc='{"key": "val2", "flag": false}'::JSONB
WHERE id='123';
-- upsert 文档
INSERT INTO index_name(id, doc)
VALUES('123', '{"key": "val"}'::JSONB)
ON CONFLICT(id) DO UPDATE SET doc=EXCLUDED.doc;
-- partial update
UPDATE index_name
SET doc=jsonb_set(doc, '{flag}', 'false')
WHERE id='123';
取回文档
ElasticSearch 示例
# 取回整个文档
curl http://localhost:9200/index_name/_doc/document_id
# 取回文档部分内容
curl http://localhost:9200/index_name/_doc/document_id?_source_includes=id,type
ProtonBase 示例
-- 取回整个文档
SELECT doc FROM index_name
WHERE id='123';
-- 取回文档部分内容
SELECT
doc->>'id' AS "id",
doc->>'type' AS "type"
FROM index_name
WHERE id='123';
删除文档
ElasticSearch 示例
curl -XDELETE http://localhost:9200/index_name/_doc/document_id
ProtonBase 示例
DELETE FROM index_name
WHERE id='123';
搜索操作
索引查询
ElasticSearch 示例
# 搜索以下内容
# 1. q=key:val ==> 索引 key 的值为 val
# 2. sort=weight:desc ==> 按照索引 weight 进行反向排序
# 3. size=100 ==> 取回 100 条结果
curl "http://localhost:9200/index_name/_search?q=key:val&sort=weight:desc&size=100"
ProtonBase 示例
SELECT doc FROM index_name
WHERE (doc->>'key') = 'val'
ORDER BY weight DESC
LIMIT 100
全文检索
ElasticSearch 示例
# 精确文本匹配
curl -X GET "localhost:9200/index_name/_search" -H 'Content-Type: application/json' -d'
{
"query": {
"match": {
"key": "val"
}
}
}
'
# 全文检索
curl -X GET "localhost:9200/index_name/_search" -H 'Content-Type: application/json' -d'
{
"query": {
"match": {
"content": "search terms"
}
}
}
'
ProtonBase 示例
-- 若需使用全文检索功能,需提前创建 split_gin 索引,详情可参考 [索引设计](/guides/table/index-design)
-- 1. 使用 tsvector 方式进行搜索,建议建立索引
-- CREATE INDEX ON index_name USING split_gin(to_tsvector('english', doc->>'key'), tsvector_ops);
-- 2. 使用 trgm 方式进行搜索,建议建立索引
-- CREATE INDEX ON index_name USING split_gin(doc->>'key', gin_trgm_ops);
-- 精确文本匹配
SELECT doc FROM index_name
WHERE (doc->>'key') = 'val'
LIMIT 100;
-- 使用 like 进行模糊匹配
SELECT doc FROM index_name
WHERE (doc->>'key') LIKE '%val%'
LIMIT 100;
-- 使用 tsvector 进行全文检索
SELECT doc FROM index_name
WHERE to_tsvector('english', (doc->>'key')) @@ to_tsquery('english', 'val')
LIMIT 100;
-- 使用 trgm 进行文本相似度搜索
SELECT doc FROM index_name
WHERE (doc->>'key') % 'val' -- % 是 trgm 扩展的运算符
LIMIT 100
聚合查询
NDV 统计
ElasticSearch 示例
{
"aggs":{
"distinct_name_count":{"cardinality":{"field":"fees"}}
}
}
ProtonBase 示例
-- 迁移后 SQL
SELECT COUNT(DISTINCT (doc->>'fees')) AS ndv
FROM index_name
最大聚合 & 最小聚合
ElasticSearch 示例
{
"aggs" : {
"fees_stats" : { "max" : { "field" : "fees" } }
}
}
ProtonBase 示例
SELECT max(fees) FROM index_name;
{
"aggs" : {
"min_fees" : { "min" : { "field" : "fees" } }
}
}
ProtonBase 示例
SELECT min(fees) FROM index_name;
汇总聚合
ElasticSearch 示例
{
"aggs" : {
"total_fees" : { "sum" : { "field" : "fees" } }
}
}
ProtonBase 示例
SELECT sum(fees) FROM index_name;
迁移最佳实践
1. 迁移前准备
-
数据评估:
- 评估 ElasticSearch 集群大小和索引数量
- 分析文档结构和字段类型
- 识别大文档和复杂嵌套结构
-
制定迁移计划:
- 确定迁移时间窗口(低峰期)
- 准备回滚方案
- 制定详细的测试计划
- 安排团队培训和知识转移
2. 迁移过程中
-
分阶段迁移:
- 先迁移非关键业务数据
- 逐步迁移核心业务数据
- 并行运行新旧系统进行验证
-
实时监控:
- 监控数据导出和导入进度
- 监控系统性能指标
- 记录迁移过程中的问题和解决方案
3. 迁移后优化
-
性能调优:
- 根据查询模式创建合适的索引
- 优化表存储模式(行存/列存/混存)
- 调整系统参数以适应工作负载
-
安全加固:
- 配置 IP 白名单限制访问
- 设置细粒度的用户权限
- 启用审计日志记录关键操作
常见问题与解决方案
1. 数据类型映射问题
问题:ElasticSearch 的动态类型与 ProtonBase 的强类型系统不匹配。
解决方案:
- 使用 JSONB 类型存储动态结构数据
- 在应用层进行类型转换和验证
- 建立数据验证规则确保一致性
2. 全文检索性能问题
问题:从 ElasticSearch 的倒排索引转换为 ProtonBase 的全文检索可能影响性能。
解决方案:
- 合理设计全文检索索引策略
- 使用适当的文本搜索配置
- 考虑使用物化视图优化复杂查询
3. 聚合查询差异
问题:ElasticSearch 的聚合功能与 SQL 聚合函数存在差异。
解决方案:
- 熟悉 ProtonBase 的聚合函数语法
- 在应用层适配聚合查询逻辑
- 使用窗口函数实现复杂聚合需求
4. 分页查询问题
问题:ElasticSearch 的分页机制与 SQL 分页语法不同。
解决方案:
-- ElasticSearch 使用 from 和 size 参数
-- ProtonBase 使用 OFFSET 和 LIMIT
SELECT * FROM index_name OFFSET 20 LIMIT 10;
通过遵循这些步骤和最佳实践,您可以顺利完成从 ElasticSearch 到 ProtonBase 的迁移,并充分利用 ProtonBase 的分布式数据库优势。