周边工具
高性能图计算系统 Plato 在 NebulaGraph 中的实践
1.图计算介绍
1.1 图数据库 vs 图计算
图数据库是面向 OLTP 场景,强调增删改查,并且一个查询往往只涉及到全图中的少量数据,而图计算是面向 OLAP 场景,往往是针对全图数据进行分析计算。
1.2 图计算系统分布架构
按照分布架构,图计算系统分为单机和分布式。
单机图计算系统优势在于模型简单,无需考虑分布式通讯,也无需进行图切分,但受制于单机系统资源,无法进行更大规模的图数据分析。
分布式图计算平台将图数据划分到多个机器上,从而处理更大规模的图数据,但不可避免的引入了分布式通讯的开销问题。
1.3 图的划分
图划分主要有两种方式边切割(Edge- Cut)和点切割(Vertex-Cut)。
边分割:每个点的数据只会存储在一台机器上,但有的边会被打断分到多台机器上。 如图(a)所示,点 A 的数据只存放在机器 1 上,点B 的数据只存放在机器 2 上。对于边 AB 而言,会存储在机器 1 和机器 2 上。由于点 A 和点 B 分布在不同的机器上,在迭代计算过程中,会带来通讯上的开销。
点分割:每条边只会存储在一台机器上,但有的点有可能分割,分配在多台机器上。 如图(b)所示, 边 AB 存储在机器 1 上,边 BC 存储在机器 2 上,边 CD 存储在机器 3 上,而点 B 被分配到了 1, 2 两台机器上,点 C 被分配到了 2,3 两台机器上。由于点被存储在多台机器上,维护顶点数据的一致性同样也会带来通讯上的开销。
1.4 计算模型
编程模型是针对图计算应用开发者,可分为以节点为中心的编程模型、以边或路径为中心的编程模型、以子图为中心的编程模型。
计算模型是图计算系统开发者面临的问题,主要有同步执行模型和异步执行模型。比较常见的有 BSP 模型(Bulk Synchronous Parallel Computing Model)和 GAS 模型。
BSP 模型:BSP 模型的计算过程是由一系列的迭代步组成,每个迭代步被称为超步。采用 BSP 模型的系统主要有 Pregel、Hama、Giraph 等。 BSP 模型具有水平和垂直两个方面的结构。垂直上看,BSP 模型有一系列串行的超步组成。水平上看(如图所示),一个超步又分三个阶段:
- 本地计算阶段,每个处理器只对存储本地内存中的数据进行计算。
- 全局通信阶段,机器节点之间相互交换数据。
- 栅栏同步阶段,等待所有通信行为的结束。
GAS 模型:GAS 模型是在 PowerGraph 系统提出,分为信息收集阶段(Gather)、应用阶段(Apply)和分发阶段(Scatter)。
- Gather 阶段,负责从邻居顶点收集信息。
- Apply 阶段,负责将收集的信息在本地处理,更新到顶点上。
- Scatter 阶段,负责发送新的信息给邻居顶点。
2. Gemini 图计算系统介绍
Gemini 在工业界较有影响力,它的主要技术点包括:CSR/CSC、push/pull、master 和 mirror、稀疏和稠密图、通信与计算协同工作、chunk-based 式分区、NUMA 感知的子分区等。
Gemini 采用边切割方式将图数据按照 chunk-based 的方式分区,并支持 Numa 结构。分区后的数据,用 CSR 存储出边信息,用 CSC 存储入边信息。在迭代计算过程中,对稀疏图采用 push 的方式更新其出边邻居,对稠密图采用 pull 的方式拉取入边邻居的信息。
如果一条边被切割,边的一端顶点为 master,另一端顶点则为 mirror。mirror 被称为占位符(placeholder) ,在 pull 的计算过程中,各个机器上的 mirror 顶点会拉取其入边邻居 master 顶点的信息进行一次计算,在 BSP 的计算模型下通过网络同步给其 master 顶点。在 push 的计算过程中,各个机器的 master 顶点会将其信息先同步给它的 mirror 顶点,再由 mirror 更新其出边邻居。
在 BSP 的通信阶段,每台机器 Node_i
发送给它的下一个机器 Node_i+1
,最后一个机器会发送给第一个机器。在每台机器发送的同时也会收到 Node_i-1
的信息,收到信息后会立即执行本地计算。通讯和计算的重叠可以隐藏通信时间,提升整体的效率。
更多细节可以参考论文《Gemini: A Computation-Centric Distributed Graph Processing System》。
3. Plato 图计算系统与 NebulaGraph 的集成
3.1 Plato 图计算系统介绍
Plato 是腾讯开源的基于 Gemni 论文实现的工业级图计算系统。Plato 可运行在通用的 x86 集群,如 Kubernetes 集群、Yarn 集群等。在文件系统层面,Plato 提供了多种接口支持主流的文件系统,如 HDFS、Ceph 等等。
3.2 与 NebulaGraph 的集成
我们基于 Plato 做了二次开发,以接入 NebulaGraph 数据源。
3.2.1 NebulaGraph 作为输入和输出数据源
增加 Plato 的数据源,支持将 NebulaGraph 作为输入和输出数据源,直接从 NebulaGraph 中读取数据进行图计算,并将计算结果直接写回到 NebulaGraph 中。
NebulaGraph 的存储层提供了针对 partition 的 scan 接口,很容易通过该接口批量扫出顶点和边数据:
ScanEdgeIter scanEdgeWithPart(std::string spaceName,
int32_t partID,
std::string edgeName,
std::vector<std::string> propNames,
int64_t limit = DEFAULT_LIMIT,
int64_t startTime = DEFAULT_START_TIME,
int64_t endTime = DEFAULT_END_TIME,
std::string filter = "",
bool onlyLatestVersion = false,
bool enableReadFromFollower = true);
ScanVertexIter scanVertexWithPart(std::string spaceName,
int32_t partId,
std::string tagName,
std::vector<std::string> propNames,
int64_t limit = DEFAULT_LIMIT,
int64_t startTime = DEFAULT_START_TIME,
int64_t endTime = DEFAULT_END_TIME,
std::string filter = "",
bool onlyLatestVersion = false,
bool enableReadFromFollower = true);
实践中,我们首先获取指定 space 下的 partition 分布情况,并将每个 partition 的 scan 任务分别分配给 Plato 集群的各个节点上,每个节点再进一步将 partition 的 scan 任务分配给运行在该节点的各个线程上,以达到并行快速的读取数据。图计算完成之后,将计算结果通过 Nebula client 并行写入 NebulaGraph。
3.2.2 分布式 ID 编码器
Gemini 和 Plato 的要求顶点 ID 从 0 开始连续递增,但绝大多数的真实数据顶点 ID 并不满足这个需求,尤其是 NebulaGraph 从 2.0 版本开始支持 string 类型 ID。
因此,在计算之前,我们需要将原始的 ID 从 int 或 string 类型转换为从 0 开始连续递增的 int。Plato 内部实现了一个单机版的 ID 编码器,即 Plato 集群的每台机器均冗余存储所有 ID 的映射关系。当点的数量比较多时,每台机器仅 ID 映射表的存储就需上百 GB 的内存,因为我们需要实现分布式的 ID 映射器,将 ID 映射关系切成多份,分开存储。
我们通过哈希将原始 ID 打散在不同的机器,并行地分配全局从 0 开始连续递增的 ID。生成 ID 映射关系后,每台机器都会存有 ID 映射表的一部分。随后再将边数据分别按起点和终点哈希,发送到对应的机器进行编码,最终得到的数据即为可用于计算的数据。当计算运行结束后,需要数据需要映射回业务 ID,其过程和上述也是类似的。
3.2.3 补充算法
我们在 Plato 的基础上增加了 sssp、apsp、jaccard similarity、三角计数等算法,并为每个算法增加了输入和输出到 NebulaGraph 数据源的支持。目前支持的算法有:
文件名 | 算法名称 | 分类 |
---|---|---|
apsp.cc | 全对最短路径 | 路径 |
sssp.cc | 单源最短路径 | 路径 |
tree_stat.cc | 树深度/宽度 | 图特征 |
nstepdegrees.cc | n阶度 | 图特征 |
hyperanf.cc | 图平均距离估算 | 图特征 |
triangle_count.cc | 三角计数 | 图特征 |
kcore.cc | 节点中心性 | |
pagerank.cc | Pagerank | 节点中心性 |
bnc.cc | Betweenness | 节点中心性 |
cnc.cc | 接近中心性(Closeness Centrality) | 节点中心性 |
cgm.cc | 连通分量计算 | 社区发现 |
lpa.cc | 标签传播 | 社区发现 |
hanp.cc | HANP | 社区发现 |
metapath_randomwalk.cc | 图表示学习 | |
node2vec_randomwalk.cc | 图表示学习 | |
fast_unfolding.cc | louvain | 聚类 |
infomap_simple.cc | 聚类 | |
jaccard_similarity.cc | 相似度 | |
mutual.cc | 其他 | |
torch.cc | 其他 | |
bfs.cc | 广度优先遍历 | 其他 |
4. Plato 部署安装与运行
4.1 集群部署
Plato 采用 MPI 进行进程间通信,在集群上部署 Plato 时,需要将 Plato 安装在相同的目录下,或者使用 NFS。操作方法见:https://mpitutorial.com/tutorials/running-an-mpi-cluster-within-a-lan/
4.2 运行算法的脚本及配置文件
scripts/run_pagerank_local.sh
#!/bin/bash
PROJECT="$(cd "$(dirname "$0")" && pwd)/.."
MAIN="./bazel-bin/example/pagerank" # process name
WNUM=3
WCORES=8
#INPUT=${INPUT:="$PROJECT/data/graph/v100_e2150_ua_c3.csv"}
INPUT=${INPUT:="nebula:${PROJECT}/scripts/nebula.conf"}
#OUTPUT=${OUTPUT:='hdfs://192.168.8.149:9000/_test/output'}
OUTPUT=${OUTPUT:="nebula:$PROJECT/scripts/nebula.conf"}
IS_DIRECTED=${IS_DIRECTED:=true} # let plato auto add reversed edge or not
NEED_ENCODE=${NEED_ENCODE:=true}
VTYPE=${VTYPE:=uint32}
ALPHA=-1
PART_BY_IN=false
EPS=${EPS:=0.0001}
DAMPING=${DAMPING:=0.8}
ITERATIONS=${ITERATIONS:=5}
export MPIRUN_CMD=${MPIRUN_CMD:="${PROJECT}/3rd/mpich-3.2.1/bin/mpiexec.hydra"}
PARAMS+=" --threads ${WCORES}"
PARAMS+=" --input ${INPUT} --output ${OUTPUT} --is_directed=${IS_DIRECTED} --need_encode=${NEED_ENCODE} --vtype=${VTYPE}"
PARAMS+=" --iterations ${ITERATIONS} --eps ${EPS} --damping ${DAMPING}"
# env for JAVA && HADOOP
export LD_LIBRARY_PATH=${JAVA_HOME}/jre/lib/amd64/server:${LD_LIBRARY_PATH}
# env for hadoop
export CLASSPATH=${HADOOP_HOME}/etc/hadoop:`find ${HADOOP_HOME}/share/hadoop/ | awk '{path=path":"$0}END{print path}'`
export LD_LIBRARY_PATH="${HADOOP_HOME}/lib/native":${LD_LIBRARY_PATH}
chmod 777 ./${MAIN}
${MPIRUN_CMD} -n ${WNUM} -f ${PROJECT}/scripts/cluster ./${MAIN} ${PARAMS}
exit $?
参数说明
INPUT
参数和OUPUT
参数分别指定算法的输入数据源和输出数据源,目前支持本地 csv 文件、HDFS文件、 NebulaGraph。当输入输出数据源为 NebulaGraph 时,INPUT
和OUPUT
形式为nebula:/path/to/nebula.conf
- WNUM 为集群所有机器所运行的进程数之和,推荐每台机器运行为 1 或者 NUMA node 数个进程,WCORE 为每个进程的线程数,推荐最大设置为机器的硬件线程数。
scripts/nebula.conf
## read/write
--retry=3 # 连接 NebulaGraph 时的重试次数
--space=sf30 # 要读取或写入的 space 名称
## read from nebula
--meta_server_addrs=192.168.8.94:9559 # NebulaGraph 的 metad 服务地址
--edge=LIKES # 要读取的边的名称
#--edge_data_field # 要读取的作为边的权重属性的名称
--read_batch_size=10000 # 每次 scan 时的 batch 的大小
## write to nebula
--graph_server_addrs=192.168.8.94:9669 # NebulaGraph 的 graphd 服务地址
--user=root # graphd 服务的登陆用户名
--password=nebula # graphd 服务的登陆密码
# insert or update
--mode=insert # 写回 NebulaGraph 时采用的模式: insert/update
--tag=pagerank # 写回到 NebulaGraph 的 tag 名称
--prop=pr # 写回到 NebulaGraph 的 tag 对应的属性名称
--type=double # 写回到 NebulaGraph 的 tag 对应的属性的类型
--write_batch_size=1000 # 写回时的 batch 大小
--err_file=/home/plato/err.txt # 写回失败的数据所存储的文件
scripts/cluster
cluster 文件指定要运行该算法所在的集群机器的 IP
192.168.15.3
192.168.15.5
192.168.15.6
以上为 Plato 在 NebulaGraph 中的应用,目前该功能集成在 NebulaGraph 企业版中,如果你使用的是开源版本的 NebulaGraph,需按照自己的需求自己对接 Plato。
交流图数据库技术?加入 Nebula 交流群请先填写下你的 Nebula 名片,Nebula 小助手会拉你进群~~