周边工具
复杂业务隔离需求下的 NebulaGraph 集群迁移与版本升级全攻略
导读:在数据库的运维实践中,集群迁移与版本升级是及时满足业务需求的关键环节。这篇经验贴分享了使用 NebulaGraph Spark Connector 将多个业务的集群从 NebulaGraph 2.6.2 向 3.8.0 版本迁移,并部署到各自对应新集群中的完整方案,涉及业务隔离集群搭建、跨版本数据转运、脏数据处理等核心要点。
一、背景
多个业务使用同一套 NebulaGraph 集群,当其中某一个业务有大量耗时查询或者大量数据加载时,这时此集群的压力可能会急剧上升,导致所有的业务出现服务性能极度下降的情况;
NebulaGraph 使用古早的 2.6.2 版本,很多业务需求较难快速实现。
二、需求
- 为每个业务搭建一套属于自己的 NebulaGraph 集群;
- 将旧集群中的各个业务的数据迁移到自己业务对应的新集群中;
- 新集群需要使用 3.8.0 版本 NebulaGraph;
- 旧的 NebulaGraph 集群下线。
三、数据迁移工具选型
通过 NebulaGraph 官网查看有没有现成的数据迁移工具,能直接实现我们的需求,调研如下:
- 社区版的 NebulaGraph BR:限制是版本为 3.x 才能用;只能恢复到原集群,不可跨集群等等。遂放弃。
- NebulaGraph Importer:导入数据使用。这个之前没怎么用过,应该是使用官方提供的工具执行相应的命令将数据发送到对应的集群。考虑到失败无法重试的缘故,遂放弃。
- NebulaGraph Exchange:这个主要考虑到配置文件的繁琐性,且我们集群上的 Spark 客户端一般不暴露出来直接使用,都是通过 dolphinscheduler 使用的。遂放弃。
- NebulaGraph Spark Connector:连接器使用,通过官网的 demo 发现这个可以将 NebulaGraph 数据写成 DF 格式,还可以将 DF 格式数据写入 NebulaGraph;且通过 dolphinscheduler 好控制。因此,我们决定使用 NebulaGraph Spark Connector 进行数据迁移。
四、具体步骤
(一)NebulaGraph to HDFS
运行 NebulaGraph to HDFS 对应的工作流,将 NebulaGraph 数据备份到 HDFS。
edge 类型导入 HDFS 为例:
package nebula
import com.vesoft.nebula.connector.connector.{NebulaDataFrameReader, NebulaDataFrameWriter}
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.sql.SparkSession
object Edge2HDFS {
def main(args: Array[String]): Unit = {
try {
if (args.length < 4) {
println("Usage: <space> <edge> <graph_host> <hdfs path>")
System.exit(-1)
}
val space = args(0)
val edge = args(1)
val graph_host = args(2)
val hdfsPath = args(3)
val spark:SparkSession =SparkSession.builder()
.appName(space + "_" + edge)
.getOrCreate()
val config = NebulaConnectionConfig
.builder()
.withMetaAddress(graph_host)
.withConenctionRetry(2)
.withExecuteRetry(2)
.withTimeout(36000000)
.build()
val edge_config = ReadNebulaConfig
.builder()
.withSpace(space)
.withLabel(edge)
.withNoColumn(false)
.withPartitionNum(400)
.build()
val edges = spark.read.nebula(config, edge_config).loadEdgesToDF()
edges.repartition(400).write.
format("json").
mode("overwrite").save(hdfsPath)
} catch {
case e: Exception => {
// 在这里捕获异常,并进行相应的处理
// 例如,记录异常信息、进行异常恢复操作等
e.printStackTrace() // 这里只是简单地打印异常信息,你可以根据需要进行适当的处理
}
}
}
}
(二)HDFS to NebulaGraph
运行 HDFS to NebulaGraph, 对应的工作流把备份在 HDFS 上的数据加载到 3.8.0 的 NebulaGraph.
edge 类型导入 NebulaGraph 为例:
package com.nebula
import com.vesoft.nebula.connector.connector.NebulaDataFrameWriter
import com.vesoft.nebula.connector.{NebulaConnectionConfig, WriteNebulaEdgeConfig}
import org.apache.spark.sql.SparkSession
object Edge2nebula {
def main(args: Array[String]): Unit = {
try {
if (args.length < 5) {
println("Usage: <space> <edge> <graph_host> <hdfsPath> <meta_host>")
System.exit(-1)
}
val space = args(0)
val edge = args(1)
val graph_host = args(2)
val hdfsPath = args(3)
val meta_host = args(4)
val _SPARK_SESSION: SparkSession = SparkSession.builder()
.appName(edge)
.getOrCreate()
val config = NebulaConnectionConfig
.builder()
.withMetaAddress(meta_host)
.withGraphAddress(graph_host)
.withConenctionRetry(2)
.withTimeout(36000000)
.build()
// HDFS 目录路径
val edgeHdfsPath = s"hdfs://HACluster" + hdfsPath + "/*.json"
var edgeDF = _SPARK_SESSION.read
.format("json")
.load(edgeHdfsPath)
edgeDF = edgeDF.withColumnRenamed("_srcId", "_vertexId")
edgeDF = edgeDF.withColumnRenamed("_dstId", "dstId")
edgeDF = edgeDF.withColumnRenamed("_rank", "rank")
edgeDF.show(10)
val edgeCount = edgeDF.count()
// 打印记录总数
println(s"Total number of rows: $edgeCount")
val nebulaWriteContainConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig
.builder()
.withSpace(space)
.withEdge(edge)
.withSrcIdField("_vertexId")
.withSrcPolicy(null)
.withDstIdField("dstId")
.withDstPolicy(null)
.withRankField("rank")
.withUser("root")
.withPasswd("nebula")
.withBatch(50)
.build()
edgeDF.write.nebula(config, nebulaWriteContainConfig).writeEdges()
} catch {
case e: Exception => {
// 在这里捕获异常,并进行相应的处理
// 例如,记录异常信息、进行异常恢复操作等
e.printStackTrace() // 这里只是简单地打印异常信息,你可以根据需要进行适当的处理
}
}
}
}
将代码写好编译成 jar 包上传到 dolphinscheduler 并通过 dolphinscheduler 创建对应的工作流就可以用了。
贴一下 Spark 参数:
(三)数据量与准确性检验
数据量检验可通过执行 SHOW STATS
命令,对比新旧集群的统计信息,确保数据完整无缺失。准确性验证则通过在新旧 NebulaGraph 集群上执行相同的 nGQL 查询语句,对比返回结果是否一致。通过这两种方式,全面验证数据迁移的完整性和准确性,确保业务在新集群上可正常运行。
五、遇到的问题
1.读写时可能会遇到
com.vesoft.nebula.client.graph.exception.IOErrorException: java.net.SocketTimeoutException: Read timed out
这是我们设置的超时时间太短了。我们把withTimeout(36000000)设置得大一些就好了。
2.导入数据时可能会遇到
RaftPart buffer is full
这是我们并发设置得太大了,调整 Spark 参数,降低 Executor 数量,batch 数量等,减小并发。
3.如果遇到 NebulaGraph 中创建 tag 和 edge 使用的属性类型和数据类型不一致的脏数据的话,就无法正常读取数据到 HDFS.
这时候需要下载官网的 nebula-spark-connector 源码调整相应的部分,编译打包上传到自己的 maven 库引用,然后将脏数据找到,进行手动处理,再重新跑一遍 NebulaGraph to HDFS 对应的工作流。
六、总结
通过以上步骤,可使用 NebulaGraph Spark Connector 顺利实现复杂业务隔离需求下的 NebulaGraph 集群迁移与版本升级。NebulaGraph Spark Connector 是一个强大的生态工具,能够将 NebulaGraph 中的数据读取为 DF 格式,并支持将 DF 数据写入 NebulaGraph, 具有良好的扩展性和易用性,非常适合大规模数据迁移任务。希望本文能为其他有类似需求的社区用户提供参考与帮助。
可参考官方文档,进一步了解 NebulaGraph Spark Connector:
https://docs.nebula-graph.com.cn/3.6.0/connector/nebula-spark-connector/