Skip to content
This repository has been archived by the owner on Oct 14, 2023. It is now read-only.

Commit

Permalink
graphframes_python
Browse files Browse the repository at this point in the history
  • Loading branch information
Tourgito committed May 5, 2021
1 parent fc80a23 commit f76d6c8
Show file tree
Hide file tree
Showing 4 changed files with 471 additions and 0 deletions.
126 changes: 126 additions & 0 deletions algorithmos
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@

Estw

(1, 2, 0.80)
(5, 1, 0.56)
(1, 40, 0.44)
(2, 5, 0.40)
(6, 2, 0.79)
(2, 40, 0.30)



1) map(lambda x: (x.smaller, (x.bigger, x.probability)))

To apotelesma meta to 1) 8a einai to parakatw RDD:

(1, (2, 0.80))
(1, (5, 0.56))
(1, (40, 0.44))
(2, (5, 0.40))
(2, (6, 0.79))
(2, (40, 0.30))


2) groupBykey()

To apotelesma meta to 2) 8a einai to parakatw RDD:

(1, [(2, 0.80),(5, 0.56),(40, 0.44)])
(2, [(5, 0.40),(6, 0.79),(40, 0.30)])


3) flatMap(function),

def function(x):
lista = sorted(list(x_value), key=lambda y: y[0])
listaToReturn = list()
for index,i in enumerate(lista):
y = i_key
listaToReturn.append( ((x_key,y), (i_value,-1)) ) # Ta edges pou uparxoun ontws. Example, ( (1,2),(0.80,-1) ). To "-1" dhlwnnei oti uparxei ontws auto to edge.
for o in lista[index+1:]: #oles ta edges pou 8elw na 4a3w an uparxoun wste na kanw tripletes
edge = ( (y,o_key), (2,x_key) ) #H timh "2" einai dummy kai thn bazw wste na exoun thn idia morfh me thn grammh 40). Example ( (2,5), (2.00, 1))
listaToReturn.append(edge)
return listaToReturn

To apotelesma meta to 3) 8a einai to parakatw RDD:

( (1,2), (0.80,-1))
( (1,5), (0.56,-1))
( (1,40), (0.44,-1))
( (2,5), (0.40,-1))
( (2,6), (0.79,-1))
( (2,40), (0.30, -1))
( (2,5), (2.00, 1))
( (2,40), (2.00, 1))
( (5,40), (2.00, 1))
( (5,6), (2.00, 2))
( (5,40), (2.00, 2))
( (6,40), (2.00, 2))



4) groupByKey()

To apotelesma meta to 4) 8a einai to parakatw RDD:

((1,2), [(0.80,-1)])
((1,5), [(0.56,-1)])
((1,40), [(0.44,-1)])
((2,5), [(2.00,1), (0.40,-1)]) #Den 3erw an mpainoun me authn thn seira sthn lista
((2,40), [(2.00,1), (0.30,-1)]) #Den 3erw an mpainoun me authn thn seira sthn lista
((2,6), [(0.79,-1)])
((5,40), [(2.00,1), (2.00,2)]) #Den 3erw an mpainoun me authn thn seira sthn lista
((6,40), [(2.00,2)]) #Den 3erw an mpainoun me authn thn seira sthn lista
((5,6), [(2.00,2)])



5) flatMap(func)

def function(x):
lista = list(x_value) #endexomenos den xreiazetai epeidi einai hdh lista

lista.sort(key=lambda x: x[1]) # in-place sort: entries with flag -1 (real edges) come first
probabilityOfEdge = lista[0][0]
edge = x_key

if lista[0][1] == -1: # Uparxei to edge ston grafo
listToReturn = list()
for index,i in enumerate(lista):
if index == 0:
a = tuple(x_key_key,(edge,probabilityOfEdge))
listToReturn.append(a)
else:
a = tuple(i_value,(edge,probabilityOfEdge))
listToReturn.append(a)
return listToReturn
else: # Den uparxei to edge ston grafo
return []



To apotelesma meta to 5) 8a einai to parakatw RDD:

(1,((1,2),0.80))
(1,((1,5),0.56))
(1,((1,40),0.44))
(2,((2,5),0.40))
(1,((2,5),0.40))
(2,((2,40),0.30))
(1,((2,40),0.30))
(2,((2,6),0.79))


6) groupByKey()

To apotelesma meta to 6) 8a einai to parakatw RDD:

(1, [ ((1,2),0.80), ((1,5),0.56), ((1,40),0.44), ((2,5),0.40), ((2,40),0.30) ])
(2, [ ((2,5),0.40), ((2,40),0.30), ((2,6),0.79) ])

7) flatMap(func).sort().take(k)

func = mia function pou parnei tis listesw apo to rdd sto 6) kai briskei tis tripletes mazi me ths pi8anothtes tous kai tis bash se mia lista wste na ginoun flatmap.


102 changes: 102 additions & 0 deletions graphframe_bs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf,lit,col,when
from graphframes.examples import Graphs
from graphframes import *
from functools import reduce
import sys
import time
#from pyspark.sql import *







def main(TopK:str):
    """Compute and print the top-k most probable triangles of the graph in ./ex.csv.

    TopK -- number of triangles to report, passed as a string (raw CLI argument).
    """
    spark = SparkSession.builder.appName("Top-k most probable triangles").getOrCreate()

    # Load the edge list (src, dst, probability) into a DataFrame.
    edgesDF = spark.read.option("header",True).option("inferSchema",True).csv("./ex.csv")

    @udf("string")
    def lesserEndpoint(a:str, b:str)-> str:
        # The lexicographically smaller of the two endpoint ids.
        return a if a < b else b

    @udf("string")
    def greaterEndpoint(a:str, b:str)-> str:
        # The lexicographically larger of the two endpoint ids.
        return b if a < b else a

    # Canonicalize every edge so that src always holds the smaller id.
    edgesDF = edgesDF.select(
        lesserEndpoint(edgesDF.src, edgesDF.dst).alias("src"),
        greaterEndpoint(edgesDF.src, edgesDF.dst).alias("dst"),
        "probability",
    )

    # Vertex set: the distinct ids appearing on either side of an edge.
    nodesDF = (
        edgesDF.select(edgesDF.src)
        .union(edgesDF.select(edgesDF.dst))
        .distinct()
        .withColumnRenamed("src", "id")
    )

    # Build the graph.
    graph = GraphFrame(nodesDF, edgesDF)

    # Motif query matching every triangle a-b-c (edges e, e2, e3).
    triangles = graph.find("(a)-[e]->(b); (b)-[e2]->(c); (a)-[e3]->(c)")

    @udf("string")
    def triangleName(node1:str, node2:str, node3:str)-> str:
        # Canonical triangle name: the three vertex ids concatenated in sorted order.
        return "".join(sorted([node1, node2, node3]))

    def accumulateProbability(acc, edgeColumn):
        # reduce() step: multiply the running product by this edge's probability.
        return acc * col(edgeColumn)["probability"]

    # Name each triangle, drop duplicate matches of the same triangle, attach
    # the product of its three edge probabilities, sort descending, keep top k.
    topTriangles = (
        triangles
        .withColumn("Triangle", triangleName(triangles.a["id"], triangles.b["id"], triangles.c["id"]))
        .dropDuplicates(["Triangle"])
        .withColumn("Triangle_Prob", reduce(accumulateProbability, ["e", "e2", "e3"], lit(1)))
        .sort("Triangle_Prob", ascending=False)
        .take(int(TopK))
    )

    print(topTriangles)

    spark.stop()


if __name__ == "__main__":
    # CLI: <script> <k> — k is the number of top triangles to report.
    if len(sys.argv) < 2:
        # Usage errors belong on stderr and must exit with a non-zero status
        # (the original printed to stdout and exited 0).
        print("Give k as input", file=sys.stderr)
        sys.exit(1)
    start = time.time()
    main(sys.argv[1])
    end = time.time()
    print("Execution time : " + str(end - start))
125 changes: 125 additions & 0 deletions graphframe_ex1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf,lit,col,when
from pyspark import StorageLevel
from graphframes.examples import Graphs
from graphframes import *
from functools import reduce
import sys
import time
#from pyspark.sql import *







def main(TopK:str):
    """Print the top-k most probable triangles of the graph stored in ./ex.csv.

    Strategy: try increasingly permissive edge-probability thresholds
    (``spaces``); stop as soon as a pass yields k triangles, otherwise fall
    through to the final, unfiltered pass.

    :param TopK: number of triangles to report, as a string (raw CLI argument).
    """

    # SparkSession (named ``sc`` although it is a session, not a SparkContext).
    sc = SparkSession.builder.appName("Top-k most probable triangles").getOrCreate()


    #load dataset(edge list) to dataframe; expected columns: src, dst, probability
    edgesDF = sc.read.option("header",True).option("inferSchema",True).csv("./ex.csv")

    edgesDF.printSchema()

    # returns the smallest string between 2 strings
    @udf("string")
    def edgeSrc(src:str,dst:str)-> str:
        if src < dst:
            return src
        return dst

    # returns the biggest string between 2 strings
    @udf("string")
    def edgeDst(src:str,dst:str)-> str:
        if src < dst:
            return dst
        return src


    # re-order the source and destination of the edges. Source always the smallest id
    # NOTE(review): the comparison is lexicographic ("10" < "2") — confirm ids
    # are meant to be compared as strings.
    edgesDF = edgesDF \
        .select(edgeSrc(edgesDF.src,edgesDF.dst).alias("src"),edgeDst(edgesDF.src,edgesDF.dst).alias("dst"),"probability")

    # create dataframe that consists of the nodes (distinct ids from both endpoints)
    nodesDF= edgesDF \
        .select(edgesDF.src) \
        .union(edgesDF.select(edgesDF.dst)) \
        .distinct() \
        .withColumnRenamed("src", "id")

    #edgesDF.printSchema()

    # Create the Graph (persisted: its edge DataFrame is re-filtered on every loop pass)
    #g = GraphFrame(nodesDF,edgesDF)
    #g = GraphFrame(nodesDF,edgesDF).cache()
    g = GraphFrame(nodesDF,edgesDF).persist(StorageLevel.MEMORY_AND_DISK)

    # Decreasing probability thresholds; the final 0 keeps every edge.
    #spaces = [0.9,0.8,0.7,0.6,0.5,0.4,0.3,0.2,0.1,0]
    spaces = [0.7,0.4,0]
    #spaces = [0.5,0]

    # Finds all the triangles, "subgraph" = Dataframe
    for index,space in enumerate(spaces):

        # Keep only edges above the current threshold, then match triangles.
        newGraph = GraphFrame(nodesDF, g.edges.filter("probability > " + str(space)))
        #newGraph = g.filterEdges("probability > " + str(space))
        subgraph = newGraph.find("(a)-[e]->(b); (b)-[e2]->(c); (a)-[e3]->(c)")

        # Concatenate 3 strings: canonical triangle name, ids in sorted order
        @udf("string")
        def triangleName(node1:str, node2:str, node3:str)-> str:
            triangle = ""
            for node in sorted([str(node1),str(node2),str(node3)]):
                triangle += node
            return triangle


        """
        # Multiply 3 float
        @udf("float")
        def triangleProbCalc(edge1Prob, edge2Prob, edge3Prob)-> float:
            return float(edge1Prob) * float(edge2Prob) * float(edge3Prob)
        ` """

        # calculates the triangle probability (reduce() step: running product
        # times this edge's "probability" field)
        def triangleProbCalc(cnt:float, edge:str) -> float:
            probability = col(edge)["probability"]
            return cnt * probability

        # creates new column ("Triangle") that contains the name of the triangle. Example, "143"
        # removes duplicates from the dataframe based on the "Triangle" column
        # creates new column ("Triangle_Prob") that contains the probability of the triangle
        # sorts the dataframe
        # Take the first k elements
        TopKTriangles = subgraph \
            .withColumn("Triangle",triangleName(subgraph.a["id"],subgraph.b["id"],subgraph.c["id"])) \
            .dropDuplicates(["Triangle"]) \
            .withColumn("Triangle_Prob", reduce(triangleProbCalc, ["e", "e2", "e3"], lit(1))) \
            .sort("Triangle_Prob",ascending=False) \
            .select("Triangle", "Triangle_Prob") \
            .take(int(TopK))


        #.withColumn("Triangle",triangleName(subgraph.a["id"],subgraph.b["id"],subgraph.c["id"])) \


        # Enough triangles at this threshold: report and stop early.
        # NOTE(review): the early stop assumes the true top-k all survive the
        # current edge filter — a high-product triangle containing one
        # below-threshold edge would be missed; confirm this is acceptable.
        if len(TopKTriangles) == int(TopK):
            print(TopKTriangles)
            #print("Edw stamathse : " + str(index))
            break

    # Fewer than k triangles exist even with no edge filter: report what we have.
    if len(TopKTriangles) < int(TopK):
        print(TopKTriangles)
    sc.stop()

if __name__ == "__main__":
    # CLI: <script> <k> — k is the number of top triangles to report.
    if len(sys.argv) < 2:
        # Usage errors belong on stderr and must exit with a non-zero status
        # (the original printed to stdout and exited 0).
        print("Give k as input", file=sys.stderr)
        sys.exit(1)
    start = time.time()
    main(sys.argv[1])
    end = time.time()
    print("Execution time : " + str(end - start))
Loading

0 comments on commit f76d6c8

Please sign in to comment.