Skip to content

Commit

Permalink
Exposing Scala DataFrames in PySpark (#214); resolves #209.
Browse files Browse the repository at this point in the history
* DataFrameLoader - provides bridge to PySpark.
* Initial python classes for aut.
* Better packaging of Python modules.
  • Loading branch information
lintool authored and ruebot committed May 2, 2018
1 parent ef76758 commit 505c47a
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 0 deletions.
23 changes: 23 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,29 @@
<artifactId>build-helper-maven-plugin</artifactId>
<version>${build-helper.plugin.version}</version>
</plugin>

<!-- this is to create a zip of PySpark modules -->
<plugin>
<!-- No <groupId>: Maven resolves core plugins to org.apache.maven.plugins by default. -->
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<!-- Descriptor zips src/main/python/aut/ so the modules can be shipped to PySpark via --py-files. -->
<descriptors>
<descriptor>src/main/assembly/python.xml</descriptor>
</descriptors>
<!-- Produce target/aut.zip rather than aut-<version>-python.zip: -->
<finalName>aut</finalName>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<!-- Bind to `package` so the zip is built on every normal `mvn package`. -->
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>

</plugins>
</build>

Expand Down
13 changes: 13 additions & 0 deletions src/main/assembly/python.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<!-- Assembly descriptor: packages the PySpark modules for distribution.
     The pom sets <appendAssemblyId>false</appendAssemblyId> and <finalName>aut</finalName>,
     so this produces target/aut.zip (the "python" id is not part of the filename). -->
<id>python</id>
<formats>
<format>zip</format>
</formats>
<fileSets>
<fileSet>
<!-- Contents of the aut package land at the zip root, suitable for spark-submit --py-files. -->
<directory>src/main/python/aut/</directory>
<outputDirectory>/</outputDirectory>
</fileSet>
</fileSets>
</assembly>
5 changes: 5 additions & 0 deletions src/main/python/aut/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""Public Python API for the Archives Unleashed Toolkit (aut).

Re-exports the PySpark-facing helpers so callers can simply write
``from aut import WebArchive, extract_domain``.
"""
from aut.common import WebArchive
from aut.udfs import extract_domain

# Names exported by `from aut import *`.
__all__ = ['WebArchive', 'extract_domain']

15 changes: 15 additions & 0 deletions src/main/python/aut/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from pyspark.sql import DataFrame

class WebArchive:
    """Python handle on a web archive, bridged to the Scala side.

    Wraps the JVM ``io.archivesunleashed.DataFrameLoader`` so that the
    Scala-built DataFrames can be used from PySpark.
    """

    def __init__(self, sc, sqlContext, path):
        """Create a handle for the archive at *path*.

        :param sc: the active SparkContext (its JVM gateway is used to
            instantiate the Scala-side loader).
        :param sqlContext: SQLContext used to wrap returned DataFrames.
        :param path: location of the web archive data.
        """
        self.sc = sc
        self.sqlContext = sqlContext
        self.path = path
        # Instantiate the Scala DataFrameLoader through the Py4J gateway.
        self.loader = sc._jvm.io.archivesunleashed.DataFrameLoader(sc._jsc.sc())

    def _wrap(self, jdf):
        # Wrap a Java-side DataFrame handle in a Python DataFrame.
        return DataFrame(jdf, self.sqlContext)

    def pages(self):
        """Return a DataFrame of valid pages extracted from the archive."""
        return self._wrap(self.loader.extractValidPages(self.path))

    def links(self):
        """Return a DataFrame of hyperlinks extracted from the archive."""
        return self._wrap(self.loader.extractHyperlinks(self.path))

11 changes: 11 additions & 0 deletions src/main/python/aut/udfs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def extract_domain_func(url):
    """Return the bare domain of *url*.

    Strips a leading ``http://`` or ``https://`` scheme, drops everything
    after the first ``/``, and removes a leading ``www.`` prefix.

    :param url: URL string, or None (Spark columns are nullable and this
        function backs a UDF, so null-safe behavior is required).
    :return: the domain as a string; '' when *url* is None.
    """
    # Null guard: a UDF receives None for null cells; the original code
    # raised AttributeError on them.
    if url is None:
        return ''
    url = url.replace('http://', '').replace('https://', '')
    if '/' in url:
        url = url.split('/')[0]
    # Strip only a *leading* 'www.' — the previous replace('www.', '')
    # also removed interior occurrences (e.g. 'en.www.example.com'
    # wrongly became 'en.example.com').
    if url.startswith('www.'):
        url = url[4:]
    return url

# Spark SQL UDF wrapper: usable directly in DataFrame expressions,
# e.g. df.select(extract_domain(df['url'])). Returns a StringType column.
extract_domain = udf(extract_domain_func, StringType())
16 changes: 16 additions & 0 deletions src/main/scala/io/archivesunleashed/DataFrameLoader.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.archivesunleashed

import org.apache.spark.SparkContext
import org.apache.spark.sql._

/** Bridge that exposes DataFrame extraction to PySpark.
  *
  * Instantiated from Python through the Py4J gateway; each method loads
  * the archives at `path` and returns a DataFrame the Python side can wrap.
  *
  * @param sc the active SparkContext
  */
class DataFrameLoader(sc: SparkContext) {
  /** Extract valid pages from the archives at `path` as a DataFrame. */
  def extractValidPages(path: String): DataFrame =
    RecordLoader.loadArchives(path, sc).extractValidPagesDF()

  /** Extract hyperlinks from the archives at `path` as a DataFrame. */
  def extractHyperlinks(path: String): DataFrame =
    RecordLoader.loadArchives(path, sc).extractHyperlinksDF()
}

0 comments on commit 505c47a

Please sign in to comment.