技术分享
PySpark + GraphX 尝鲜图算法
最近业务需要,需要对图进行一些计算,由于对 scala 不熟悉,对官方 nebula-algorithm 的 Algo 用 Python 包装之后想要自定义一些遍历需求发现无从下手,于是翻阅了一些资料,用 Python 实现这些。本文便是我的实践记录,希望对你有所启发。
环境准备 / Part.01
NebulaGraph Spark Connector 是一个 Spark 连接器,提供通过 Spark 标准形式读写 NebulaGraph 数据的能力。NebulaGraph Spark Connector 由 Reader 和 Writer 两部分组成。
Reader:提供一个 Spark SQL 接口,用户可以使用该接口编程读取 NebulaGraph 图数据,单次读取一个点或 Edge type 的数据,并将读取的结果组装成 Spark 的 DataFrame。
Writer:提供一个 Spark SQL 接口,用户可以使用该接口编程将 DataFrame 格式的数据逐条或批量写入 NebulaGraph。
详情参见此处: https://docs.nebula-graph.com.cn/3.4.1/nebula-spark-connector/
GraphFrames 是 ApacheSpark 的一个包,它提供了基于 DataFrame 的 Graphs。它在 Scala、 Java 和 Python 中提供高级 API。它旨在提供 GraphX 的功能和利用 Spark DataFrames 的扩展功能。此扩展功能包括主题查找、基于 DataFrame 的序列化和高度表达式的图形查询。
GraphFrames GitHub:https://github.com/graphframes/graphframes
GraphFrames 如何提交以及版本对应参考此处:https://spark-packages.org/package/graphframes/graphframes
PySpark 是 Spark 在外层封装了 Python 接口,主要是借助 Py4J 实现 Python 和 Java 的交互。这样 Python 使用者就不用多学一门 Java,轻松使用 Python 进行大数据开发。
安装方法 pip install pyspark==
如何提交 / Part.02
主要参数如下:
`spark2-submit \
--master yarn \
--num-executors 20 \
--executor-cores 5 \
--executor-memory 20g \
--driver-cores 1 \
--driver-memory 5g \
--archives /bussiness_jars/bigdata/spark-jar/path/graphx-pyspark.zip#python \
--conf spark.executorEnv.ELKLOG_LEVEL=INFO \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python/graphx/bin/python \
--conf spark.jars.packages=graphframes:graphframes:0.8.2-spark2.4-s_2.11 \
--conf spark.executorEnv.ELKLOG_BUSINESS_NAME=nebula340_graphx_degree_graphx_degree_1hop \
/bussiness_jars/bigdata/spark-jar/path/nebula340/nebula340_algo_degree_centrality_v2.py main 1 Relation \`
代码展示 / Part.03
第一步,读取 nebula 数据构建 GraphFrame。如果需要读取多种 Tag 和 Edge 则需要遍历多次读取之后 union 到一起。
def readNebula(self) -> GraphFrame:
if self.space in ["Relation", "Shareholding"]:
vertexs = ["Company", "Person"]
edges = ["Invest", "Employ", "Legal"]
else:
vertexs = ["Company", "GroupTag"]
edges = ["HasCompany", "Controlled"]
vertex = self.spark.createDataFrame([("", "", "")], ["id", "keyno", "name"])
edge = self.spark.createDataFrame([("", "", "", "")], ["src", "dst", "_rank", "role"])
for v in vertexs:
df = (
spark.read.format("com.vesoft.nebula.connector.NebulaDataSource")
.option("type", "vertex")
.option("spaceName", space)
.option("label", v)
.option("returnCols", "keyno,name")
.option("metaAddress", metaHost[cluster])
.option("partitionNumber", 1000)
.option("operateType", "read")
.load()
)
print(f"read nebula {v} type done, {df.show(n=2)}")
vertex = vertex.union(df)
print(f"read nebula vertex done, {vertex.show(n=3)}")
for v in edges:
df = (
spark.read.format("com.vesoft.nebula.connector.NebulaDataSource")
.option("type", "edge")
.option("spaceName", space)
.option("label", v)
.option("returnCols", "role")
.option("metaAddress", metaHost[cluster])
.option("partitionNumber", 1000)
.option("operateType", "read")
.load()
)
print(f"read nebula {v} type done, {df.show(n=2)}")
edge = edge.union(df)
print(f"read nebula edge done, {edge.show(n=3, truncate=False)}")
g = GraphFrame(vertex, edge)
return g
第二步,获取度数:GraphFrames 支持 vertexid 为 string 类型,如果 NebulaGraph 中 Vertex 是 fixed-string 不需要做 map。
`def degree_of_hop(self, graph: GraphFrame):
if self.hop==1:
print("one-hop-degree======")
######################### 1-hop degreee #########################
indegree = g.inDegrees
# print(indegree.show(truncate=False))
outdegree = g.outDegrees
# print(outdegree.show(truncate=False))
degree = g.degrees
# filter specific id degree
degree.filter("id='4a788c2a83870742bb1a35074efc33f3'").show(truncate=False)
total = degree.join(indegree, on="id", how="left")
total = total.join(outdegree, on="id", how="left").fillna(0)
total.write.mode("overwrite").saveAsTable("hive_tmp.nebula_degree_centrality")
sorted_df = total.orderBy("degree", ascending=False)
print(sorted_df.show(truncate=False))
######################### 1-hop degreee #########################`
第三步,获取二跳度数。上一步(step2)中只是获取了某个实体一跳的子图来预估实体的大小,实际业务中需要用两跳子图来评估其大小。
由于没有现成的 2-hop degree 方法,查看了 GraphFrames 方法后,决定用 motif-finding 的方式用 path 的数量来近似表示其数量。
`elif self.hop ==2:
print("two-hop-one-direction-path-cnt=======")
data = graph.find("(a)-[e]->(b); (b)-[e2]->(c)")
data.show(truncate=False)
# 此处计算路径的条数,而不是点的个数
two_hop = data.groupBy("a.id").agg(count("*")).union(data.groupBy("c.id")
.agg(count("*"))).groupBy("id").agg({"count(1)": "sum"})
two_hop = two_hop.withColumnRenamed("sum(count(1))", "cnt")
two_hop.show(truncate=False)
print("two-hop-both-direction-path-cnt=======")
# c<-a->b->f
# data = graph.find("(a)-[e]->(b); (b)-[]->(); (a)-[e3]->(f)").filter("b.id='A' and f.id != 'A' ")
data = graph.find("(a)-[e]->(b); (b)-[]->(); (a)-[e2]->(c)").filter("b.id != c.id").distinct()
# data.show(truncate=False)
two_hop_2 = data.groupBy("b.id").agg(count("*"))
two_hop_2 = two_hop_2.withColumnRenamed("count(1)", "cnt")
two_hop_2.show(truncate=False)
# two-hop total
total_two_hop = two_hop.union(two_hop_2).groupBy("id").agg({"cnt":"sum"})
total_two_hop = total_two_hop.withColumnRenamed("sum(cnt)", "cnt")
print("two_hop path union")
total_two_hop.show(truncate=False)
total_two_hop.write.mode("overwrite").saveAsTable("hive_tmp.nebula_degree_two_hop")`
在业务中,如果两跳子图太大 nebula 查询超时,可以通过对子图太大的数据通过图计算预先计算用 T+1 的数据来给业务呈现。
其他基础图算法 / Part.04
需要注意的是此处的 BFS 并不是给定指定的点,向外遍历 maxPathLength,而是查找从一个顶点(或一组顶点)到另一个顶点(或一组顶点)的最短路径。
广度优先遍历 BFS
需要注意的是此处的 BFS 并不是给定指定的点,向外遍历 maxPathLength,而是查找从一个顶点(或一组顶点)到另一个顶点(或一组顶点)的最短路径。
`from graphframes.examples import Graphs
g = Graphs.friends() # Get example graph
# Search from "Esther" for users of age < 32.
paths = g.bfs("name = 'Esther'", "age < 32")
paths.show()
# Specify edge filters or max path lengths.
g.bfs("name = 'Esther'", "age < 32",\
edgeFilter="relationship != 'friend'", maxPathLength=3)`
连通分量 Connected Components
`from graphframes.examples import Graphs
g = Graphs.friends() # Get example graph
result = g.connectedComponents()
result.select("id", "component").orderBy("component").show()`
最短路径 Shortest Paths
计算从每个顶点到给定 landmarks 顶点集的最短路径,其中 landmarks 由顶点 ID 指定。请注意,这需要考虑边缘方向。
`from graphframes.examples import Graphs
g = Graphs.friends() # Get example graph
results = g.shortestPaths(landmarks=["a"])
results.select("id", "distances").show()
# OUTPUT:
# +---+-----+---+---------+
# | id| name|age|distances|
# +---+-----+---+---------+
# | d|David| 29| {a -> 1}|
# | a|Alice| 34| {a -> 0}|
# +---+-----+---+---------+`
其他更多参考 graphframe-graph-algorithm:https://graphframes.github.io/graphframes/docs/_site/user-guide.html#graph-algorithms
总结 / Part.5
整体上 NebulaGraph 官方提供好用的的 nebula-spark-connector,在和其他图计算框架结合使用时提供了便利,用户可以很方便的根据自己的需求跑相应的图算法或者图遍历。