logo
企业版

技术分享

使用 nebula-spark-connector 3.6 版本搭建备集群实践

nebula-br local-store

看过我之前文章的读者可能知道我之前用 nebula-br 搭建过主备集群,具体实践过程不做赘述,有兴趣的小伙伴可以看下这个链接:nebula-br local-store 模式搭建主备集群实践 🔗

那么问题来了,为什么在已有 nebula-br 搭建集群的情况下,我又造了个“新轮子”,让 spark-connector 走上 nebula-br 的老路呢?在这里我来讲讲二者的优劣势:

!注意 文章内容仅代表个人观点,仅供参考。

主备集群搭建的选择

先从目前个人已知的可搭建备集群的开箱即用工具讲起,NebulaGraph 社区的相关搭建工具有:

  • nebula-br
  • nebula-spark-connector
  • nebula-flink-connector

由于 nebula-flink-connector 和 nebula-spark-connector 实践原理较为相似,本文就不单独地讲述 nebula-flink-connector 如何搭建主备集群。

搭建备集群实践

nebula-br 主备集群是通过本地生成 snapshot 备份,然后传输到备集群进行恢复的方式。

  • 优点:

    • 搭建速度快;
  • 缺点:

    • 生成备份文件的时候会导致 Nebula 不可写,影响在线业务;
    • 环境搭建麻烦,需要在主备集群上运行 Agent 和 BR,此外恢复时需要读取备份文件,这些文件要么通过 S3 或者 NTFS 共享,要么就手动拷贝;
    • 从源集群备份还原到目标集群,目标集群将会被覆盖(也就是跟源集群保持一致,可以理解为目标集群将会被清空,然后还原源集群数据到目标集群)。

spark-connector 搭建集群的优劣势

nebula-spark-connector 主备集群是通过 nebula-client 去 scan nebula 的数据,然后生成 INSERT 语句在备集群进行重放。

  • 优点:

    • 开箱即用,针对小数据集群(低于 1 亿)基本上不需要任何开发直接使用;大数据集群(大于 1 亿)可能需要额外流量控制逻辑;
    • 同步时对源集群的读写影响较小(前提是需要控制导出并发控制);
    • 同步粒度更精细,可以指定同步 Space、Tag、Edge;
  • 缺点:

    • 导出导入的方式同步数据比较慢。就速度这点和官方交流了下,官方表示之前在某家公司业务上实测过速度,nebula-spark-connector 做同步是要比 nebula-br 快 3-5 倍。可能环境和参数有差异,如果读者你选择工具的时候,可以先测试下相关工具的性能数据。
    • 需要 Spark 环境支持;
    • 无法中断重启时接着上次 cursor 继续 scan;

这里稍微说下 nebula-flink-connector,个人感觉它跟 nebula-spark-connector 最大的区别在于 Spark 需要读完整个分区才会做 Write 操作,nebula-flink-connector 可以做到边读边写,理论上效率会高一点。

nebula-spark-connector 原理

nebula-spark-connector

本图有任何理解错误的地方,麻烦在留言帮忙指出,感谢。

搭建备集群实践

nebula-spark-connector 的 GitHub 仓库地址:https://github.com/vesoft-inc/nebula-spark-connector,本文采用 nebula-spark-connector 的 release-3.6 分支,并以 Spark 3.x 为例。

首先,准备相关的运行环境:

`$ git clone -b release-3.6 https://github.com/vesoft-inc/nebula-spark-connector.git
$ cd nebula-spark-connector
$ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true -pl nebula-spark-connector_3.0,example -am -Pscala-2.12 -Pspark-3.0`

NebulaGraph 的导入导出可以直接使用 example 中的 Nebula2Nebula.scala 完成,通过上面 maven 打包命令后在 example 的 target 目录可得到 example-3.0-SNAPSHOT-jar-with-dependencies.jar 文件,提交 Spark 任务命令:

`spark-submit --master local \
 --conf spark.driver.extraClassPath=./ \
 --conf spark.executor.extraClassPath=./  \
 --class com.vesoft.nebula.examples.connector.Nebula2Nebula \
 example-3.0-SNAPSHOT-jar-with-dependencies.jar \
 -sourceMeta "xxxxx:9559" -targetMeta "yyyyy:9559" -targetGraph "yyyy:9669" -sourceSpace "source" -targetSpace "target" -includeTag "user" -excludeTags "aa,bb" -excludeEdges "cc,dd"  -limit 2  -batch 2 -p 8 -timeout 50000 -u root -passwd nebula`

参数解释:

  • sourceMeta:源 nebula 服务 metad 地址(从哪里导出)
  • targetMeta:目标 nebula 服务 metad 地址
  • targetGraph:目标 nebula 服务 graphd 地址,多个地址以英文逗号隔开(导出数据重新组装 INSERT 语句将通过此地址进行提交)
  • sourceSpace:源 nebula 的 Space,也就是要导出的 Space 名称
  • targetSpace:目标 nebula 的 Space,也就是要导入的 Space 名称
  • includeTag:指定导出的 Tag,不指定就全部 Tag 导出
  • excludeTags:排除哪些 Tag 不进行导出同步
  • excludeEdges:排除哪些 Edge 不进行导出同步
  • limit:从源 nebula 导出时每次 scan 多少数据,值越大单次 scan 的数据量越大,对源 nebula 服务器的压力越大,在没有达到磁盘和网络 IO 的瓶颈前,理论上值越大导出更快
  • batch:导出的数据多少一批打包成一条 INSERT 语句发送到目标 nebula 服务的 graphd 机器上进行执行
  • p:也叫 writeParallel, 导出的数据拆分成多少个任务并行写,值越大速度越快,但是目标 nebula 服务器压力越大
  • timeout:每次 scan 源数据的超时时间

优化改造

这里讲下为什么要做这些优化改造:

1.经我们线上验证,Nebula2Nebula 示例是一次所有 Partition 导出后才做导入操作,对于数据量非常大的情况下并不友好、效率太低,此外对 Spark 来说会占用太多的内存可能会导致内存不足;不过,官方表示他们不是所有的 Partition 导出之后才进行导入,而是边读边写,且消耗内存很小。因为我这边已经完成了相关的开发工作,边读边写的操作就留给读者你来实践了。

2.无法支持源 NebulaGraph storaged 地址为 127.0.0.1 的场景(一般属于单机场景),因为 Spark 任务导出的 storaged 地址是通过 metad 元数据获取,就会导致导出任务会请求 127.0.0.1 的 storaged;

3.缺少导出和导入统计,不知道当前已经导出多少数据量; 下面开始进入改造(解决上述问题)部分:

导出优化

本次的优化改造的导出优化:将 Partition 进行拆分,改成多线程的方式,每个线程处理一个 Partition。

Nebula2Nebula.scala 改造:增加 readParallel 参数,用于指定多少个线程可同时读取 nebula,控制读取速率,不然可能会对线上服务器照成影响:

`val readParallelOption = new Option("rp", "readParallelNum", true, "parallel for read data")
options.addOption(readParallelOption)

val readParallelNum: Int =
      if (cli.hasOption("readParallelNum")) cli.getOptionValue("readParallelNum").toInt else partitions
for (partitionId <- 1 to readPartition) {
      val task = new Runnable {
        def run(): Unit = {
          syncTagPartitionData(spark,
            sourceConfig,
            sourceSpace,
            limit,
            readPartition,
            targetConfig,
            targetSpace,
            batch,
            tag,
            writeParallel,
            user,
            passwd,
            overwrite,
            partitionBatch,
            eachScanWaitTime,
            partitionId
          )
        }
      }
      threadPool.execute(task);
    }`

NebulaOptions.scala 改造:增加参数可指定 Partition 读取,代码示例:

`var readPartitionId: Int = parameters.getOrElse(READ_PARTITION_ID, 0).toString.toInt`

SimpleScanBuilder.scala 改造:

`override def planInputPartitions(): Array[InputPartition] = {
 //读取指定的分区
  Array(NebulaPartitionBatch(Array(nebulaOptions.readPartitionId)))
}
case class NebulaPartitionBatch(partitions: Array[Int]) extends InputPartition`

NebulaPartitionReaderFactory.scala 改造:读取指定分区数据,代码示例:

`class NebulaPartitionReaderFactory(private val nebulaOptions: NebulaOptions,
                                   private val schema: StructType)
    extends PartitionReaderFactory {

  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  override def createReader(inputPartition: InputPartition): PartitionReader[InternalRow] = {
    val partitions = inputPartition.asInstanceOf[NebulaPartitionBatch].partitions
    if (DataTypeEnum.VERTEX.toString.equals(nebulaOptions.dataType)) {
      LOG.info(s"Create NebulaVertexPartitionReader partitions:${partitions}")
      new NebulaVertexPartitionReader(partitions, nebulaOptions, schema)
    } else {
      LOG.info(s"Create NebulaEdgePartitionReader partitions:${partitions}")
      new NebulaEdgePartitionReader(partitions, nebulaOptions, schema)
    }
  }
}`

支持本地地址

下面开始支持导出 storaged 的地址为 127.0.0.1,示例代码参见:

Nebula2Nebula 改造:支持配置源 nebula 的 storaged 连接地址,示例代码见下:

`val sourceStorageOption =
      new Option("sourceStorage", "sourceStorageAddress", true, "source nebulagraph storage address")
options.addOption(sourceStorageOption)
NebulaReader.scala 改造:替换原来的 StorageClient,示例代码见下:
this.storageClient = new SpecialStorageClient(nebulaOptions, address.asJava, nebulaOptions.timeout)`

增加 SpecialStorageClient.scala: 用于获取 storaged 地址的时候返回指定的 storageAddress,示例代码见下:

`class SpecialStorageClient(nebulaOptions: NebulaOptions, addresses: util.List[HostAddress], timeout: Int = 10000, connectionRetry: Int = 3,
                           executionRetry: Int = 3, enableSSL: Boolean = false, sslParam: SSLParam = null)
  extends StorageClient(addresses, timeout, connectionRetry, executionRetry, enableSSL, sslParam) {

  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  override def connect(): Boolean = {
    val success = super.connect()
    if (!success) {
      return success
    }
    val storageAddress = nebulaOptions.getStorageAddress
    if (storageAddress.isEmpty) {
      return success
    }
    setStorageAddress(storageAddress)
    LOG.info(s"Set special storage address:${storageAddress.asJava} | value:${nebulaOptions.storageAddress}")
    success
  }

  private def setStorageAddress(storageAddress: ListBuffer[HostAddr]) = {
    val clazz = this.getClass
    var field = classOf[StorageClient].getDeclaredField("metaManager")
    field.setAccessible(true)
    var metaManager = new SpecialStorageMetaManager(addresses, timeout, connectionRetry, executionRetry, enableSSL, sslParam)
    metaManager.setStorageAddress(storageAddress)
    field.set(this, metaManager)
  }
}

class SpecialStorageMetaManager(address: util.List[HostAddress], timeout: Int, connectionRetry: Int,
                                executionRetry: Int, enableSSL: Boolean, sslParam: SSLParam) extends
  MetaManager(address, timeout, connectionRetry, executionRetry, enableSSL, sslParam) {

  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)

  var storageAddress: ListBuffer[HostAddr] = null;

  override def getPartsAlloc(spaceName: String): util.Map[Integer, util.List[HostAddr]] = {
    val partMap: util.Map[Integer, util.List[HostAddr]] = getPartsAlloc(spaceName)
    if (storageAddress != null && !storageAddress.isEmpty) {
      partMap.keySet().forEach(partId => {
        LOG.error(s"Storage partId:${partId} address replace fix value:${storageAddress.asJava}")
        partMap.put(partId, storageAddress.asJava)
      })
    }
    partMap
  }

  override def listHosts(): util.Set[HostAddr] = {
    if (storageAddress != null && !storageAddress.isEmpty) {
      LOG.error(s"[listHosts] Storage address replace fix value:${storageAddress.asJava}")
      return storageAddress.toSet.asJava
    }
    super.listHosts()
  }


  override def getLeader(spaceName: String, part: Int): HostAddr = {
    if (storageAddress != null && !storageAddress.isEmpty) {
      LOG.error(s"[getLeader] Storage address replace fix value:${storageAddress.asJava}")
      return storageAddress.last
    }
    super.getLeader(spaceName, part)
  }

  def setStorageAddress(storageAddress: ListBuffer[HostAddr]): Unit = {
    this.storageAddress = storageAddress
  }
}`

支持导入打印统计

新增导出导入统计打印,方便知道当前进度。

增加 ReadWriteStats.scala:用于统计,示例代码见下:

`object ReadWriteStats {

  private var readCounter = new ConcurrentHashMap[String, Int]();
  private var writeCounter = new ConcurrentHashMap[String, Int]();

  def incrementReadCount(key: String, num: Int): Int = {
    readCounter.compute(key, (_, count) => count + num)
  }

  def getReadCount(key: String): Int = {
    readCounter.getOrDefault(key, 0)
  }

  def incrementWriteCount(key: String, num: Int): Int = {
    writeCounter.compute(key, (_, count) => count + num)
  }

  def getWriteCount(key: String): Int = {
    writeCounter.getOrDefault(key, 0)
  }

}`

NebulaReader.scala 改造:增加 scan 读取统计数据输出,示例代码见下:

`protected def getRow(): InternalRow = {
    ......
    val totalCount = ReadWriteStats.incrementReadCount(nebulaOptions.label, 1)
    if (totalCount > 10000 && totalCount % 10000 == 0) {
      LOG.info(s"Reader vertex:${nebulaOptions.label} , totalCount:${totalCount}")
    }
    mutableRow
  }

protected def hasNextVertexRow: Boolean = {
    .....
    if (!hashNext) {
      LOG.info(s"Finish read vertex:${nebulaOptions.label} , readTotalCount:${ReadWriteStats.getReadCount(nebulaOptions.label)}")
    }
    hashNext
  }  

protected def hasNextEdgeRow: Boolean = {
    ....
    if (!hashNext) {
      LOG.info(s"Finish read edge:${nebulaOptions.label} , readTotalCount:${ReadWriteStats.getReadCount(nebulaOptions.label)}")
    }
    hashNext
  }`

NebulaVertexWriter.scala 改造:增加写入统计数据输出,示例代码见下:

`def execute(): Unit = {
    val nebulaVertices = NebulaVertices(propNames, vertices.toList, policy)
    val exec = nebulaOptions.writeMode match {
      case WriteMode.INSERT =>
        val vertexCount = nebulaVertices.values.length
        val totalCount = ReadWriteStats.incrementWriteCount(nebulaOptions.label,vertexCount)
        if (totalCount > nebulaOptions.printRWRowCount && totalCount % nebulaOptions.printRWRowCount == 0) {
          LOG.info(s"Batch insert vertex:${nebulaOptions.label} , count: ${vertexCount} , totalCount:${totalCount}")
        }
        ...
    }
}  

override def commit(): WriterCommitMessage = {
    if (vertices.nonEmpty) {
      execute()
    }
    LOG.error(s"Write vertex:${nebulaOptions.label} task finished. Haven write totalCount:${ReadWriteStats.getWriteCount(nebulaOptions.label)}")
    .....
}`

改造完毕之后重新使用 maven 命令打包,再使用 Spark 命令提交任务即可。

这里打包了相关的代码:请点击阅读原文查看,如果你有需要可以拷贝到你的运行环境,自行替换。

以上,感谢你的阅读。

如果本文有任何技术错误,麻烦评论指出,谢谢你的反馈。

关于 NebulaGraph

NebulaGraph 是一款开源的分布式图数据库,自 2019 年开源以来,先后被美团、京东、360 数科、快手、众安金融等多家企业采用,应用在智能推荐、金融风控、数据治理、知识图谱等等应用场景。GitHub 地址:https://github.com/vesoft-inc/nebula