Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions dataset/transport-nodes.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
id,latitude,longitude,population
"Amsterdam",52.379189,4.899431,821752
"Utrecht",52.092876,5.104480,334176
"Den Haag",52.078663,4.288788,514861
"Immingham",53.61239,-0.22219,9642
"Doncaster",53.52285,-1.13116,302400
"Hoek van Holland",51.9775,4.13333,9382
"Felixstowe",51.96375,1.3511,23689
"Ipswich",52.05917,1.15545,133384
"Colchester",51.88921,0.90421,104390
"London",51.509865,-0.118092,8787892
"Rotterdam",51.9225,4.47917,623652
"Gouda",52.01667,4.70833,70939
16 changes: 16 additions & 0 deletions dataset/transport-relationships.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
src,dst,relationship,cost
"Amsterdam","Utrecht","EROAD",46
"Amsterdam","Den Haag","EROAD",59
"Den Haag","Rotterdam","EROAD",26
"Amsterdam","Immingham","EROAD",369
"Immingham","Doncaster","EROAD",74
"Doncaster","London","EROAD",277
"Hoek van Holland","Den Haag","EROAD",27
"Felixstowe","Hoek van Holland","EROAD",207
"Ipswich","Felixstowe","EROAD",22
"Colchester","Ipswich","EROAD",32
"London","Colchester","EROAD",106
"Gouda","Rotterdam","EROAD",25
"Gouda","Utrecht","EROAD",35
"Den Haag","Gouda","EROAD",32
"Hoek van Holland","Rotterdam","EROAD",33
49 changes: 49 additions & 0 deletions graphframe/path_finding_algo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# -*- coding:utf-8 -*-
from pyspark.sql.types import *
from graphframes import *
from pyspark.sql import SparkSession


def create_transport_graph(spark: SparkSession) -> GraphFrame:
"""加载nodes和边relationships的csv,创建transport graph,用union是不分区边的direct"""
node_fields = [
StructField("id", StringType(), True),
StructField("latitude", FloatType(), True),
StructField("longitude", FloatType(), True),
StructField("population", IntegerType(), True)
]
nodes = spark.read.csv("../dataset/transport-nodes.csv", header=True,
schema=StructType(node_fields))
# 读取加载交通关系relationships.csv内容,并构成graphframe
rels = spark.read.csv("../dataset/transport-relationships.csv", header=True)
reversed_rels = (rels.withColumn("newSrc", rels.dst).withColumn("newDst", rels.src) \
.drop("dst", "src").withColumnRenamed("newSrc", "src") \
.withColumnRenamed("newDst", "dst") \
.select("src", "dst", "relationship", "cost"))

relationships = rels.union(reversed_rels)
return GraphFrame(nodes, relationships)


def getSparkContext() -> SparkSession:
builder = SparkSession.builder.appName("pandas-on-spark")\
.master("local")
builder = builder.config("spark.sql.execution.arrow.pyspark.enabled", "true")
# Pandas API on Spark automatically uses this Spark session with the configurations set.
return builder.getOrCreate()


if __name__ == '__main__':
graph = create_transport_graph(getSparkContext())
# 先从所有点vertex中找到人口介于10万~30万的城市,并用show()方法展示出来
graph.vertices.filter("population > 100000 and population < 300000") \
.sort("population") \
.show()
# 从Den Haag到一个中型城市的最短路径,result.columns展示的是结果集中的列
from_expr: str = "id='Den Haag'"
to_expr: str = "population > 100000 and population < 300000 and id <> 'Den Haag'"
result = graph.bfs(from_expr, to_expr)
print(result.columns)
# 以e开头的列代表关系(边),而以v开头的列代表节点(顶点),我们仅对节点感兴趣
columns = [column for column in result.columns if not column.startswith("e")]
result.select(columns).show()
12 changes: 0 additions & 12 deletions neo4j/neo4j_graph_data_analytics.cql

This file was deleted.

63 changes: 63 additions & 0 deletions neo4j_analytics/path_finding_algo/neo4j_path_finding.cql
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
-- Load Node data from transport-nodes.csv
LOAD CSV WITH HEADERS FROM "file:/data/transport-nodes.csv" AS row
merge (place:Place {id:row.id})
set place.latitude = toFloat(row.latitude),
place.longitude = toFloat(row.longitude),
place.population = toInteger(row.population)

-- Load relation data from transport-relationships.csv
LOAD CSV WITH HEADERS FROM "file:/data/transport-relationships.csv" AS rows
match(origin:Place {id: row.src})
match(destination:Place {id: row.dst})
merge(origin)-[:EROAD {distance: toInteger(row.cost)}]->(destination)

-- #1. 用neo4j计算从Amsterdam->London的跳数(hop),将null作为第3个参数
match(source:Place {id: 'Amsterdam'}), (dest :Place {id: 'London'})
call algo.shortestPath.stream(source, dest, null)
yield nodeId, cost
return algo.getNodeById(nodeId).id as place, cost
-- ### place cost
-- "Amsterdam" 0.0
-- "Immingham" 1.0
-- "Doncaster" 2.0
-- "London" 3.0

-- #2. 计算从Amsterdam->London的总距离,该路线通过的城市最少,总代价为720千米,cypher语句有些复杂
match(source:Place {id: 'Amsterdam'}), (dest :Place {id: 'London'})
call algo.shortestPath.stream(source, dest, null)
yield nodeId, cost

// 用collect()将从Amsterdam->London的所有点 收集起来
with collect(algo.getNodeById(nodeId)) as path
unwind range(0, size(path)-1) as index
with path[index] as current, path[index+1] as next
// 匹配current-next路径,并且取得r上distance数值
with current, next, [(current)-[r:EROAD]-(next)|r.distance][0] as distance

with collect({current:current, next:next, distance:distance}) as stops
unwind range(0, size(stops)-1) as index
with stops[index] as location, stops, index
// 这块不太懂,complex problem
return location.current.id as place,
reduce(acc=0.0,
distance in [stop in stops[0..index] |stop.distance] |acc + distance) as cost
--### place cost
-- "Amsterdam" 0.0
-- "Immingham" 369.0
-- "Doncaster" 443.0
-- "London" 720.0

-- #3. 用Neo4j实现加权最短路径,计算从Amsterdam->London的最短路径,有权重值为453km
match(source:Place {id: 'Amsterdam'}), (dest :Place {id: 'London'})
call algo.shortestPath.stream(source, dest, "distance")
yield nodeId, cost
return algo.getNodeById(nodeId).id as place, cost
--### place cost
-- "Amsterdam" 0.0
-- "Den Haag" 59.0
-- "Hoek van Holland" 86.0
-- "Felixstowe" 293.0
-- "Ipswich" 315.0
-- "Colchester" 347.0
-- "London" 453.0