logo
咨询企业版

技术分享

PySpark + GraphX 尝鲜图算法

NebulaGraph Spark Connector

最近业务需要,需要对图进行一些计算,由于对 scala 不熟悉,对官方 nebula-algorithm 的 Algo 用 Python 包装之后想要自定义一些遍历需求发现无从下手,于是翻阅了一些资料,用 Python 实现这些。本文便是我的实践记录,希望对你有所启发。

环境准备 / Part.01

NebulaGraph Spark Connector 是一个 Spark 连接器,提供通过 Spark 标准形式读写 NebulaGraph 数据的能力。NebulaGraph Spark Connector 由 Reader 和 Writer 两部分组成。

  • Reader:提供一个 Spark SQL 接口,用户可以使用该接口编程读取 NebulaGraph 图数据,单次读取一个点或 Edge type 的数据,并将读取的结果组装成 Spark 的 DataFrame。

  • Writer:提供一个 Spark SQL 接口,用户可以使用该接口编程将 DataFrame 格式的数据逐条或批量写入 NebulaGraph。

详情参见此处: https://docs.nebula-graph.com.cn/3.4.1/nebula-spark-connector/

GraphFrames 是 ApacheSpark 的一个包,它提供了基于 DataFrame 的 Graphs。它在 Scala、 Java 和 Python 中提供高级 API。它旨在提供 GraphX 的功能和利用 Spark DataFrames 的扩展功能。此扩展功能包括主题查找、基于 DataFrame 的序列化和高度表达式的图形查询。

  • GraphFrames GitHub:https://github.com/graphframes/graphframes

  • GraphFrames 如何提交以及版本对应参考此处:https://spark-packages.org/package/graphframes/graphframes

PySpark 是 Spark 在外层封装了 Python 接口,主要是借助 Py4J 实现 Python 和 Java 的交互。这样 Python 使用者就不用多学一门 Java,轻松使用 Python 进行大数据开发。

安装方法 pip install pyspark==,PySpark 在 Spark 2.x 和 3.x 之间并不兼容,所以需要根据你的 Spark 版本安装。

如何提交 / Part.02

主要参数如下:

`spark2-submit \

--master yarn \

--num-executors 20 \

--executor-cores 5 \

--executor-memory 20g \

--driver-cores 1 \

--driver-memory 5g \

--archives /bussiness_jars/bigdata/spark-jar/path/graphx-pyspark.zip#python \

--conf spark.executorEnv.ELKLOG_LEVEL=INFO \

--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python/graphx/bin/python \

--conf spark.jars.packages=graphframes:graphframes:0.8.2-spark2.4-s_2.11 \

--conf spark.executorEnv.ELKLOG_BUSINESS_NAME=nebula340_graphx_degree_graphx_degree_1hop \

/bussiness_jars/bigdata/spark-jar/path/nebula340/nebula340_algo_degree_centrality_v2.py main 1 Relation \`

代码展示 / Part.03

第一步,读取 nebula 数据构建 GraphFrame。如果需要读取多种 Tag 和 Edge 则需要遍历多次读取之后 union 到一起。

def readNebula(self) -> GraphFrame:
    if self.space in ["Relation", "Shareholding"]:
        vertexs = ["Company", "Person"]
        edges = ["Invest", "Employ", "Legal"]
    else:
        vertexs = ["Company", "GroupTag"]
        edges = ["HasCompany", "Controlled"]

    vertex = self.spark.createDataFrame([("", "", "")], ["id", "keyno", "name"])
    edge = self.spark.createDataFrame([("", "", "", "")], ["src", "dst", "_rank", "role"])

    for v in vertexs:
        df = (
            spark.read.format("com.vesoft.nebula.connector.NebulaDataSource")
            .option("type", "vertex")
            .option("spaceName", space)
            .option("label", v)
            .option("returnCols", "keyno,name")
            .option("metaAddress", metaHost[cluster])
            .option("partitionNumber", 1000)
            .option("operateType", "read")
            .load()
        )
        print(f"read nebula {v} type done, {df.show(n=2)}")
        vertex = vertex.union(df)

    print(f"read nebula vertex done, {vertex.show(n=3)}")

    for v in edges:
        df = (
            spark.read.format("com.vesoft.nebula.connector.NebulaDataSource")
            .option("type", "edge")
            .option("spaceName", space)
            .option("label", v)
            .option("returnCols", "role")
            .option("metaAddress", metaHost[cluster])
            .option("partitionNumber", 1000)
            .option("operateType", "read")
            .load()
        )
        print(f"read nebula {v} type done, {df.show(n=2)}")
        edge = edge.union(df)

    print(f"read nebula edge done, {edge.show(n=3, truncate=False)}")

    g = GraphFrame(vertex, edge)
    return g

第二步,获取度数:GraphFrames 支持 vertexid 为 string 类型,如果 NebulaGraph 中 Vertex 是 fixed-string 不需要做 map。

`def degree_of_hop(self, graph: GraphFrame):
if self.hop==1:
    print("one-hop-degree======")
    ######################### 1-hop degreee #########################
    indegree = g.inDegrees
    # print(indegree.show(truncate=False))
    outdegree = g.outDegrees
    # print(outdegree.show(truncate=False))
    degree = g.degrees
    # filter specific id degree
    degree.filter("id='4a788c2a83870742bb1a35074efc33f3'").show(truncate=False)
    total = degree.join(indegree, on="id", how="left")

    total = total.join(outdegree, on="id", how="left").fillna(0)
    total.write.mode("overwrite").saveAsTable("hive_tmp.nebula_degree_centrality")
    sorted_df = total.orderBy("degree", ascending=False)
    print(sorted_df.show(truncate=False))
    ######################### 1-hop degreee #########################`

第三步,获取二跳度数。上一步(step2)中只是获取了某个实体一跳的子图来预估实体的大小,实际业务中需要用两跳子图来评估其大小。

由于没有现成的 2-hop degree 方法,查看了 GraphFrames 方法后,决定用 motif-finding 的方式用 path 的数量来近似表示其数量。

`elif self.hop ==2:
    print("two-hop-one-direction-path-cnt=======")
    data = graph.find("(a)-[e]->(b); (b)-[e2]->(c)")
    data.show(truncate=False)
    # 此处计算路径的条数,而不是点的个数
    two_hop = data.groupBy("a.id").agg(count("*")).union(data.groupBy("c.id")
    .agg(count("*"))).groupBy("id").agg({"count(1)": "sum"})

    two_hop = two_hop.withColumnRenamed("sum(count(1))", "cnt")
    two_hop.show(truncate=False)

    print("two-hop-both-direction-path-cnt=======")
    # c<-a->b->f
    # data = graph.find("(a)-[e]->(b); (b)-[]->(); (a)-[e3]->(f)").filter("b.id='A' and f.id != 'A' ")
    data = graph.find("(a)-[e]->(b); (b)-[]->(); (a)-[e2]->(c)").filter("b.id != c.id").distinct()
    # data.show(truncate=False)
    two_hop_2 = data.groupBy("b.id").agg(count("*"))
    two_hop_2 = two_hop_2.withColumnRenamed("count(1)", "cnt")
    two_hop_2.show(truncate=False)
    # two-hop total
    total_two_hop = two_hop.union(two_hop_2).groupBy("id").agg({"cnt":"sum"})
    total_two_hop = total_two_hop.withColumnRenamed("sum(cnt)", "cnt")
    print("two_hop path union")
    total_two_hop.show(truncate=False)

    total_two_hop.write.mode("overwrite").saveAsTable("hive_tmp.nebula_degree_two_hop")`

在业务中,如果两跳子图太大 nebula 查询超时,可以通过对子图太大的数据通过图计算预先计算用 T+1 的数据来给业务呈现。

其他基础图算法 / Part.04

需要注意的是此处的 BFS 并不是给定指定的点,向外遍历 maxPathLength,而是查找从一个顶点(或一组顶点)到另一个顶点(或一组顶点)的最短路径。

广度优先遍历 BFS

需要注意的是此处的 BFS 并不是给定指定的点,向外遍历 maxPathLength,而是查找从一个顶点(或一组顶点)到另一个顶点(或一组顶点)的最短路径。

`from graphframes.examples import Graphs
g = Graphs.friends()  # Get example graph

# Search from "Esther" for users of age < 32.
paths = g.bfs("name = 'Esther'", "age < 32")
paths.show()

# Specify edge filters or max path lengths.
g.bfs("name = 'Esther'", "age < 32",\
  edgeFilter="relationship != 'friend'", maxPathLength=3)`

连通分量 Connected Components

`from graphframes.examples import Graphs
g = Graphs.friends()  # Get example graph

result = g.connectedComponents()
result.select("id", "component").orderBy("component").show()`

最短路径 Shortest Paths

计算从每个顶点到给定 landmarks 顶点集的最短路径,其中 landmarks 由顶点 ID 指定。请注意,这需要考虑边缘方向。

`from graphframes.examples import Graphs
g = Graphs.friends()  # Get example graph

results = g.shortestPaths(landmarks=["a"])
results.select("id", "distances").show()

# OUTPUT:
# +---+-----+---+---------+
# | id| name|age|distances|
# +---+-----+---+---------+
# |  d|David| 29| {a -> 1}|
# |  a|Alice| 34| {a -> 0}|
# +---+-----+---+---------+`

其他更多参考 graphframe-graph-algorithm:https://graphframes.github.io/graphframes/docs/_site/user-guide.html#graph-algorithms

总结 / Part.5

整体上 NebulaGraph 官方提供好用的的 nebula-spark-connector,在和其他图计算框架结合使用时提供了便利,用户可以很方便的根据自己的需求跑相应的图算法或者图遍历。