从 ElasticSearch 迁移

从 ElasticSearch 迁移到 ProtonBase 指南

概述

Elasticsearch(下文简称 ES)是一个基于 Lucene 的开源、分布式、RESTful 搜索引擎,由 Elastic 公司开发。它以其快速搜索、高扩展性和实时分析能力而闻名。Elasticsearch 广泛应用于搜索、全文检索、日志分析等领域,支持大规模数据集的实时索引和搜索,同时提供强大的数据分析和可视化功能。

本文介绍如何将 ES 的数据迁移到 ProtonBase 中,包括数据导出、表结构设计、数据导入和应用改造等内容。

准备工作

环境要求

  • ElasticSearch 6.x 或更高版本
  • ProtonBase 数据库实例
  • Node.js 环境(用于数据导出工具)
  • 网络连通性(建议内网连接以获得最佳性能)

权限配置

确保具有以下权限:

  1. 访问 ElasticSearch 集群的权限
  2. 在 ProtonBase 中创建数据库和表的权限
  3. 执行数据导入操作的权限

网络配置

确保源 ElasticSearch 集群和目标 ProtonBase 实例之间的网络连通性。

迁移步骤

1. 评估源数据

在开始迁移之前,需要对 ElasticSearch 中的数据进行全面评估:

# 查看集群健康状态
curl -X GET "localhost:9200/_cluster/health?pretty"
 
# 查看所有索引
curl -X GET "localhost:9200/_cat/indices?v"
 
# 查看特定索引的映射
curl -X GET "localhost:9200/index_name/_mapping?pretty"
 
# 查看索引统计信息
curl -X GET "localhost:9200/index_name/_stats?pretty"

2. 准备目标环境

在 ProtonBase 中创建相应的数据库和表结构:

-- 在 ProtonBase 中创建数据库
CREATE DATABASE search_engine;
 
-- 创建用户和权限
CREATE USER "es_migration" WITH PASSWORD 'secure_password';
GRANT ALL PRIVILEGES ON DATABASE search_engine TO "es_migration";

3. 数据导出

安装导出工具

  1. 安装 Node.js

  2. 安装 elasticdump:

npm install -g elasticdump

导出数据

# 导出全部数据
elasticdump \
--input 'http://localhost:9200/' \
--output 'dump.json' \
--timeout 600000
 
# 导出特定 index 数据
INDEX_NAME="gharchive"
elasticdump \
--input "http://localhost:9200/${INDEX_NAME}" \
--output 'dump.json' \
--timeout 600000

4. 创建目标表

根据导出的数据结构在 ProtonBase 中创建表:

CREATE TABLE gharchive(
  id TEXT NOT NULL PRIMARY KEY,
  doc JSONB
) USING hybrid; -- 根据需要选择存储格式,可选值为 [row, columnar, hybrid]

5. ETL 导入数据

使用 Python 脚本将导出的数据导入到 ProtonBase:

import json
import psycopg2
 
def make_connection():
    return psycopg2.connect('host=... port=5432 dbname=postgres username=... password=...')
    
    
def main():
    inited_tables = set()
    with open('dump.json', 'r', encoding='utf8') as fd, make_connection() as conn:
        with conn.cursor() as cursor:
            for line in fd:
                line = line.strip()
                if not line:
                    continue
                payload = json.loads(line)
                index_name = payload['_index']
                document_id = payload['_id']
                document = payload['_source']
                if index_name not in inited_tables:
                    # 自动初始化表
                    init_sql = f'''CREATE TABLE IF NOT EXISTS {index_name}(id TEXT NOT NULL PRIMARY KEY, doc JSONB) USING hybrid;'''
                    cursor.execute(init_sql)
                    conn.commit()
                # 写入 document
                insert_sql = f'''INSERT INTO {index_name}(id, doc) VALUES(%s, %s)'''
                cursor.execute(insert_sql, (document_id, json.dumps(document)))
                conn.commit()
 
if __name__ == '__main__':
    main()

应用改造

连接配置更新

将应用程序中的数据库连接配置从 ElasticSearch 更新为 ProtonBase:

# 原 ElasticSearch 配置
elasticsearch.host=localhost
elasticsearch.port=9200
 
# 新 ProtonBase 配置
jdbc.url=jdbc:postgresql://protonbase-host:5432/search_engine
jdbc.username=protonbase_user
jdbc.password=protonbase_password

CRUD 操作

写入文档

ElasticSearch 示例

# 写入文档
curl -X POST "localhost:9200/index_name/_doc/123" -H 'Content-Type: application/json' -d'
{
  "key": "val"
}
'

ProtonBase 示例

-- 写入文档
INSERT INTO index_name(id, doc)
VALUES('123', '{"key": "val"}'::JSONB);
 
-- 更新文档
UPDATE index_name
    SET doc='{"key": "val2", "flag": false}'::JSONB
WHERE id='123';
 
-- upsert 文档
INSERT INTO index_name(id, doc)
VALUES('123', '{"key": "val"}'::JSONB)
ON CONFLICT(id) DO UPDATE SET doc=EXCLUDED.doc;
 
-- partial update
UPDATE index_name
    SET doc=jsonb_set(doc, '{flag}', 'false')
WHERE id='123';

取回文档

ElasticSearch 示例

# 取回整个文档
curl http://localhost:9200/index_name/_doc/document_id
 
# 取回文档部分内容
curl http://localhost:9200/index_name/_doc/document_id?_source_includes=id,type

ProtonBase 示例

-- 取回整个文档
SELECT doc FROM index_name
WHERE id='123';
 
-- 取回文档部分内容
SELECT
    doc->>'id' AS "id",
    doc->>'type' AS "type"
FROM index_name
WHERE id='123';

删除文档

ElasticSearch 示例

curl -XDELETE http://localhost:9200/index_name/_doc/document_id

ProtonBase 示例

DELETE FROM index_name
WHERE id='123';

搜索操作

索引查询

ElasticSearch 示例

# 搜索以下内容
# 1. q=key:val ==> 索引 key 的值为 val
# 2. sort=weight:desc ==> 按照索引 weight 进行反向排序
# 3. size=100 ==> 取回 100 条结果
curl "http://localhost:9200/index_name/_search?q=key:val&sort=weight:desc&size=100"

ProtonBase 示例

SELECT doc FROM index_name
WHERE (doc->>'key') = 'val'
ORDER BY weight DESC
LIMIT 100

全文检索

ElasticSearch 示例

# 精确文本匹配
curl -X GET "localhost:9200/index_name/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match": {
      "key": "val"
    }
  }
}
'
 
# 全文检索
curl -X GET "localhost:9200/index_name/_search" -H 'Content-Type: application/json' -d'
{
  "query": {
    "match": {
      "content": "search terms"
    }
  }
}
'

ProtonBase 示例

-- 若需使用全文检索功能,需提前创建 split_gin 索引,详情可参考 [索引设计](/guides/table/index-design)
-- 1. 使用 tsvector 方式进行搜索,建议建立索引 
--    CREATE INDEX ON index_name USING split_gin(to_tsvector('english', doc->>'key'), tsvector_ops);
-- 2. 使用 trgm 方式进行搜索,建议建立索引
--    CREATE INDEX ON index_name USING split_gin(doc->>'key', gin_trgm_ops);
 
-- 精确文本匹配
SELECT doc FROM index_name
WHERE (doc->>'key') = 'val'
LIMIT 100;
 
-- 使用 like 进行模糊匹配
SELECT doc FROM index_name
WHERE (doc->>'key') LIKE '%val%'
LIMIT 100;
 
-- 使用 tsvector 进行全文检索
SELECT doc FROM index_name
WHERE to_tsvector('english', (doc->>'key')) @@ to_tsquery('english', 'val')
LIMIT 100;
 
-- 使用 trgm 进行文本相似度搜索
SELECT doc FROM index_name
WHERE (doc->>'key') % 'val' -- % 是 trgm 扩展的运算符
LIMIT 100

聚合查询

NDV 统计

ElasticSearch 示例

{
   "aggs":{
      "distinct_name_count":{"cardinality":{"field":"fees"}}
   }
}

ProtonBase 示例

-- 迁移后 SQL
SELECT COUNT(DISTINCT (doc->>'fees')) AS ndv
FROM index_name

最大聚合 & 最小聚合

ElasticSearch 示例

{
   "aggs" : {
      "fees_stats" : { "max" : { "field" : "fees" } }
   }
}

ProtonBase 示例

SELECT max(fees) FROM index_name;
{
   "aggs" : {
      "min_fees" : { "min" : { "field" : "fees" } }
   }
}

ProtonBase 示例

SELECT min(fees) FROM index_name;

汇总聚合

ElasticSearch 示例

{
   "aggs" : {
      "total_fees" : { "sum" : { "field" : "fees" } }
   }
}

ProtonBase 示例

SELECT sum(fees) FROM index_name;

迁移最佳实践

1. 迁移前准备

  1. 数据评估

    • 评估 ElasticSearch 集群大小和索引数量
    • 分析文档结构和字段类型
    • 识别大文档和复杂嵌套结构
  2. 制定迁移计划

    • 确定迁移时间窗口(低峰期)
    • 准备回滚方案
    • 制定详细的测试计划
    • 安排团队培训和知识转移

2. 迁移过程中

  1. 分阶段迁移

    • 先迁移非关键业务数据
    • 逐步迁移核心业务数据
    • 并行运行新旧系统进行验证
  2. 实时监控

    • 监控数据导出和导入进度
    • 监控系统性能指标
    • 记录迁移过程中的问题和解决方案

3. 迁移后优化

  1. 性能调优

    • 根据查询模式创建合适的索引
    • 优化表存储模式(行存/列存/混存)
    • 调整系统参数以适应工作负载
  2. 安全加固

    • 配置 IP 白名单限制访问
    • 设置细粒度的用户权限
    • 启用审计日志记录关键操作

常见问题与解决方案

1. 数据类型映射问题

问题:ElasticSearch 的动态类型与 ProtonBase 的强类型系统不匹配。

解决方案

  • 使用 JSONB 类型存储动态结构数据
  • 在应用层进行类型转换和验证
  • 建立数据验证规则确保一致性

2. 全文检索性能问题

问题:从 ElasticSearch 的倒排索引转换为 ProtonBase 的全文检索可能影响性能。

解决方案

  • 合理设计全文检索索引策略
  • 使用适当的文本搜索配置
  • 考虑使用物化视图优化复杂查询

3. 聚合查询差异

问题:ElasticSearch 的聚合功能与 SQL 聚合函数存在差异。

解决方案

  • 熟悉 ProtonBase 的聚合函数语法
  • 在应用层适配聚合查询逻辑
  • 使用窗口函数实现复杂聚合需求

4. 分页查询问题

问题:ElasticSearch 的分页机制与 SQL 分页语法不同。

解决方案

-- ElasticSearch 使用 from 和 size 参数
-- ProtonBase 使用 OFFSET 和 LIMIT
SELECT * FROM index_name OFFSET 20 LIMIT 10;

通过遵循这些步骤和最佳实践,您可以顺利完成从 ElasticSearch 到 ProtonBase 的迁移,并充分利用 ProtonBase 的分布式数据库优势。