Changes from all commits (45 commits)
6c2e2a6
reworked bgtest to take residuals, added bptest
Jun 12, 2015
75e9932
cleaned up after adding bptest/bgtest
Jun 12, 2015
148a1fc
starting on ewma
Jun 12, 2015
4a19951
add some initial code for EWMA
Jun 12, 2015
a70d8ab
first working version of EWMA
josepablocam Jun 15, 2015
121c9ef
small style edits
josepablocam Jun 15, 2015
7fdbafa
cleaned up additional style issues
Jun 15, 2015
e7ed3ae
added EWMA functionality and tests
Jun 16, 2015
c6b5e55
Merge pull request #26 from josepablocam/master
sryza Jun 16, 2015
dcc429c
cleaned up additional style issues
Jun 15, 2015
5cfc79b
Add Hadoop dependency
sryza Jun 16, 2015
1aff3f4
Add seed generation in historical var example
sryza Jun 16, 2015
e1487d0
Convert tab to spaces
sryza Jun 16, 2015
db6a716
changed SSE calculation to use while loop rather than map + sum
Jun 17, 2015
fd45327
changed EWMA fitting to use gradient descent
Jun 17, 2015
7adc5d0
style changes
Jun 17, 2015
88bd843
added note stating unbounded optimization for ewma
Jun 17, 2015
fab45bc
Merge pull request #27 from josepablocam/ewma
sryza Jun 18, 2015
e942aae
Document more functionality in README
sryza Jun 19, 2015
0767f31
added holt model basics: adding and removing time dependent effects
josepablocam Jun 22, 2015
04ba448
added getHoltComponents, and changed other methods to use this functi…
josepablocam Jun 22, 2015
7dec14c
improved documentation of holt model
josepablocam Jun 22, 2015
4e56a07
Merge remote-tracking branch 'origin/master'
Jun 22, 2015
d151a6f
Copyright bump
cdalzell Jun 23, 2015
3bd0c28
Quick find & replace: toSamples -> toInstants
cdalzell Jun 23, 2015
fd50705
fixing merge conflict
Jun 23, 2015
ae91035
Merge pull request #28 from cdalzell/feature/#23-rename_toSamples
sryza Jun 23, 2015
5a4aa29
added up/down sampling and cubic spline interpolation
Jun 25, 2015
4b98b63
added up/down sampling tests
Jun 25, 2015
48d9251
Harmonize MLlib dependency version
sryza Jun 26, 2015
dd80326
started on acf plot
Jun 27, 2015
1c595b0
Mention R's equivalents in README
sryza Jun 27, 2015
0655b07
acf plot done
josepablocam Jun 28, 2015
432b10d
added pacf, refactored acf
josepablocam Jun 28, 2015
3c316ee
added scaladocs to acf/pacf
josepablocam Jun 28, 2015
78b5d77
added scaladocs to acf/pacf
josepablocam Jun 28, 2015
7ccc531
Merge remote-tracking branch 'origin/acf_pacf' into acf_pacf
josepablocam Jun 28, 2015
315ff27
changed up/downSample function names to up/downsample (along with rel…
Jun 30, 2015
f939146
Merge pull request #30 from josepablocam/acf_pacf
sryza Jun 30, 2015
87418e8
added scaladoc comments for up/down sampling
Jul 1, 2015
496dc92
Merge pull request #31 from josepablocam/docsampling
sryza Jul 2, 2015
bec7089
Renamed labels to keys
cdalzell Jul 10, 2015
1a15e0f
Merge pull request #33 from cdalzell/feature/#24-TimeSeries_label_to_key
sryza Jul 10, 2015
e2ab12a
holt: added bobyqa and conjugate gradient descent parameter fitting, …
josepablocam Jul 16, 2015
529eddf
Merge remote-tracking branch 'upstream/master' into dewma
josepablocam Jul 16, 2015
20 changes: 15 additions & 5 deletions README.md
@@ -6,10 +6,12 @@ A Scala / Java library for interacting with time series data on Apache Spark.
Scaladoc is available at http://cloudera.github.io/spark-timeseries.

The aim here is to provide
* A set of abstractions for transforming and summarizing large time series data sets, similar to
* A set of abstractions for manipulating large time series data sets, similar to
what's provided for smaller data sets in
[Pandas](http://pandas.pydata.org/pandas-docs/dev/timeseries.html) and
[Matlab](http://www.mathworks.com/help/matlab/time-series.html).
[Pandas](http://pandas.pydata.org/pandas-docs/dev/timeseries.html),
[Matlab](http://www.mathworks.com/help/matlab/time-series.html), and R's
[zoo](http://cran.r-project.org/web/packages/zoo/index.html) and
[xts](http://cran.r-project.org/web/packages/xts/index.html) packages.
* Models, tests, and functions that enable dealing with time series from a statistical perspective,
similar to what's provided in [StatsModels](http://statsmodels.sourceforge.net/devel/tsa.html)
and a variety of Matlab and R packages.
@@ -52,16 +54,24 @@ TimeSeriesRDDs then support efficient series-wise operations like slicing, imput
val residuals = filled.mapSeries(series => ar(series, 1).removeTimeDependentEffects(series))


Statistical Functionality
Functionality
--------------------------

### Time Series
### Time Series Manipulation
* Aligning
* Slicing by date-time
* Missing value imputation

### Time Series Math and Stats

* Exponentially weighted moving average
* Autoregressive models
* GARCH models
* Missing data imputation
* Augmented Dickey-Fuller test
* Durbin-Watson test
* Breusch-Godfrey test
* Breusch-Pagan test
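
For example, the EWMA support added in this PR can be driven like so (a minimal
sketch, assuming `series` is a Breeze `Vector[Double]`, as in the snippet further up):

    val model = EWMA.fitModel(series)
    val smoothed = DenseVector.zeros[Double](series.length)
    model.addTimeDependentEffects(series, smoothed)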

### General Prob / Stats

15 changes: 11 additions & 4 deletions pom.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.

Cloudera, Inc. licenses this file to you under the Apache License,
Version 2.0 (the "License"). You may not use this file except in
@@ -13,7 +13,9 @@
the specific language governing permissions and limitations under the
License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cloudera.datascience</groupId>
<artifactId>sparktimeseries</artifactId>
@@ -160,6 +162,12 @@
</build>

<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>2.6.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
@@ -198,8 +206,7 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.minor.version}</artifactId>
<!-- TODO -->
<version>1.2.0</version>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
HistoricalValueAtRiskExample.scala
@@ -25,18 +25,19 @@ import com.cloudera.sparkts.TimeSeriesRDD._

import com.github.nscala_time.time.Imports._

import org.apache.commons.math3.random.RandomGenerator
import org.apache.commons.math3.random.{MersenneTwister, RandomGenerator}
import org.apache.commons.math3.stat.regression.OLSMultipleLinearRegression

import org.apache.spark.{SparkConf, SparkContext}

object HistoricalValueAtRiskExample {
def main(args: Array[String]): Unit = {
// Read parameters
val numTrials = if (args.length > 0) args(0).toInt else 10000
val parallelism = if (args.length > 1) args(1).toInt else 10
val factorsDir = if (args.length > 2) args(2) else "factors/"
val instrumentsDir = if (args.length > 3) args(3) else "instruments/"
val factorsDir = if (args.length > 0) args(0) else "factors/"
val instrumentsDir = if (args.length > 1) args(1) else "instruments/"
val numTrials = if (args.length > 2) args(2).toInt else 10000
val parallelism = if (args.length > 3) args(3).toInt else 10
val horizon = if (args.length > 4) args(4).toInt else 1

// Initialize Spark
val conf = new SparkConf().setMaster("local").setAppName("Historical VaR")
@@ -66,13 +67,18 @@

// Fit an AR(1) + GARCH(1, 1) model to each factor
val garchModels = factorReturns.mapValues(ARGARCH.fitModel(_)).toMap
val iidFactorReturns = factorReturns.mapSeriesWithLabel { case (symbol, series) =>
val iidFactorReturns = factorReturns.mapSeriesWithKey { case (symbol, series) =>
val model = garchModels(symbol)
model.removeTimeDependentEffects(series, DenseVector.zeros[Double](series.length))
}

// Generate an RDD of simulations
// val rand = new MersenneTwister()
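    // Seeding each trial's generator with its trial index keeps the simulation
    // reproducible and gives every trial an independent random stream, instead of
    // sharing one driver-side generator across partitions.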
val seeds = sc.parallelize(0 until numTrials, parallelism)
seeds.map { seed =>
val rand = new MersenneTwister(seed)
val factorPaths = simulatedFactorReturns(horizon, rand, iidFactorReturns, garchModels)
}

// val factorsDist = new FilteredHistoricalFactorDistribution(rand, iidFactorReturns.toArray,
// garchModels.asInstanceOf[Array[TimeSeriesFilter]])
// val returns = simulationReturns(0L, factorsDist, numTrials, parallelism, sc,
@@ -101,7 +107,7 @@
mat(i, ::) := iidSeries.data(rand.nextInt(iidSeries.data.rows), ::)
}
(0 until models.size).foreach { i =>
models(iidSeries.labels(i)).addTimeDependentEffects(mat(::, i), mat(::, i))
models(iidSeries.keys(i)).addTimeDependentEffects(mat(::, i), mat(::, i))
}
mat
}
SimpleTickDataExample.scala
@@ -71,6 +71,6 @@ object SimpleTickDataExample {
val iidRdd = slicedRdd.mapSeries(series => ar(series, 1).removeTimeDependentEffects(series))

// Regress a stock against all the others
val samples = iidRdd.toSamples()
val samples = iidRdd.toInstants()
}
}
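The example stops at collecting the instants. As a purely hypothetical continuation (not in this PR; it assumes `toInstants` yields (timestamp, observation-vector) pairs with the regressed stock in column 0, and uses commons-math3's OLSMultipleLinearRegression, as imported in the VaR example above):

    // hypothetical sketch, not part of this diff
    val obs = samples.map { case (_, vec) =>
      (vec(0), (1 until vec.length).map(vec(_)).toArray)
    }.collect()
    val regression = new OLSMultipleLinearRegression()
    regression.newSampleData(obs.map(_._1), obs.map(_._2))
    val betas = regression.estimateRegressionParameters()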
144 changes: 144 additions & 0 deletions src/main/scala/com/cloudera/sparkts/EWMA.scala
@@ -0,0 +1,144 @@
/**
* Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
*
* Cloudera, Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"). You may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for
* the specific language governing permissions and limitations under the
* License.
*/

package com.cloudera.sparkts

import breeze.linalg._

import org.apache.commons.math3.analysis.{MultivariateFunction, MultivariateVectorFunction}
import org.apache.commons.math3.optim.{InitialGuess, MaxEval, MaxIter, SimpleValueChecker}
import org.apache.commons.math3.optim.nonlinear.scalar.{GoalType, ObjectiveFunction,
ObjectiveFunctionGradient}
import org.apache.commons.math3.optim.nonlinear.scalar.gradient.NonLinearConjugateGradientOptimizer

/**
 * Fits an Exponentially Weighted Moving Average (EWMA) model, also known as simple
 * exponential smoothing, to a time series. The model is defined as
 * S_t = a * X_t + (1 - a) * S_{t - 1}, where a is the smoothing parameter, X is the original
 * series, and S is the smoothed series (matching the recursion implemented in
 * addTimeDependentEffects below). For more information, please see
 * https://en.wikipedia.org/wiki/Exponential_smoothing.
 */
object EWMA {
  /**
   * Fits an EWMA model to a time series, using the first point in the series as the starting
   * smoothed value and minimizing the sum of squared one-step-ahead forecast errors to find the
   * smoothing parameter. The model is defined recursively as S_t = a * X_t + (1 - a) * S_{t - 1},
   * where a is the smoothing parameter, X is the original series, and S is the smoothed series.
   * Note that the optimization is performed as unbounded optimization, although in its formal
   * definition the smoothing parameter lies in [0, 1], which corresponds to an inequality-bounded
   * optimization. Given this, the resulting smoothing parameter should always be sanity-checked.
   * See https://en.wikipedia.org/wiki/Exponential_smoothing.
   * @param ts the time series to which we want to fit an EWMA model
   * @return EWMA model
   */
def fitModel(ts: Vector[Double]): EWMAModel = {
val optimizer = new NonLinearConjugateGradientOptimizer(
NonLinearConjugateGradientOptimizer.Formula.FLETCHER_REEVES,
new SimpleValueChecker(1e-6, 1e-6))
val gradient = new ObjectiveFunctionGradient(new MultivariateVectorFunction() {
def value(params: Array[Double]): Array[Double] = {
val g = new EWMAModel(params(0)).gradient(ts)
Array(g)
}
})
val objectiveFunction = new ObjectiveFunction(new MultivariateFunction() {
def value(params: Array[Double]): Double = {
new EWMAModel(params(0)).sse(ts)
}
})
// optimization parameters
val initGuess = new InitialGuess(Array(.94))
val maxIter = new MaxIter(10000)
val maxEval = new MaxEval(10000)
val goal = GoalType.MINIMIZE
// optimization step
val optimal = optimizer.optimize(objectiveFunction, goal, gradient, initGuess, maxIter, maxEval)
val params = optimal.getPoint
new EWMAModel(params(0))
}
}

class EWMAModel(val smoothing: Double) extends TimeSeriesModel {

/**
   * Calculates the SSE for a given time series ts, given the smoothing parameter of the current
   * model. The forecast for the observation at period t + 1 is the smoothed value at time t.
   * Source: http://people.duke.edu/~rnau/411avg.htm
   * @param ts the time series to fit an EWMA model to
   * @return the sum of squared one-step-ahead forecast errors
*/
private[sparkts] def sse(ts: Vector[Double]): Double = {
val n = ts.length
val smoothed = new DenseVector(Array.fill(n)(0.0))
addTimeDependentEffects(ts, smoothed)

var i = 0
var error = 0.0
var sqrErrors = 0.0
while (i < n - 1) {
error = ts(i + 1) - smoothed(i)
sqrErrors += error * error
i += 1
}

sqrErrors
}

/**
   * Calculates the gradient of the SSE cost function with respect to the smoothing parameter
   * @param ts the time series over which the gradient is evaluated
   * @return the gradient of the SSE with respect to the smoothing parameter
*/
private[sparkts] def gradient(ts: Vector[Double]): Double = {
val n = ts.length
val smoothed = new DenseVector(Array.fill(n)(0.0))
addTimeDependentEffects(ts, smoothed)

var error = 0.0
var prevSmoothed = ts(0)
var prevDSda = 0.0 // derivative of the EWMA function at time t - 1: (d S(t - 1)/ d smoothing)
var dSda = 0.0 // derivative of the EWMA function at time t: (d S(t) / d smoothing)
var dJda = 0.0 // derivative of our SSE cost function
var i = 0

while (i < n - 1) {
error = ts(i + 1) - smoothed(i)
dSda = ts(i) - prevSmoothed + (1 - smoothing) * prevDSda
dJda += error * dSda
prevDSda = dSda
prevSmoothed = smoothed(i)
i += 1
}
    // the sign matters: d(error)/da = -dS_t/da, so dJ/da = -2 * sum(error * dS/da)
    -2 * dJda
}
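
  // Derivation sketch for the gradient above (assuming S_t = a * X_t + (1 - a) * S_{t-1}):
  //   J(a)    = sum_{t=0}^{n-2} (X_{t+1} - S_t)^2
  //   dS_t/da = X_t - S_{t-1} + (1 - a) * dS_{t-1}/da, with dS_0/da = 0
  //   dJ/da   = -2 * sum_t (X_{t+1} - S_t) * dS_t/da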

  override def removeTimeDependentEffects(ts: Vector[Double], dest: Vector[Double] = null)
    : Vector[Double] = {
    // guard against the default null destination so callers can omit it without an NPE
    val out = if (dest == null) DenseVector.zeros[Double](ts.length) else dest
    out(0) = ts(0) // by definition in our model S_0 = X_0

    for (i <- 1 until ts.length) {
      out(i) = (ts(i) - (1 - smoothing) * ts(i - 1)) / smoothing
    }
    out
  }

override def addTimeDependentEffects(ts: Vector[Double], dest: Vector[Double]): Vector[Double] = {
dest(0) = ts(0) // by definition in our model S_0 = X_0

for (i <- 1 until ts.length) {
dest(i) = smoothing * ts(i) + (1 - smoothing) * dest(i - 1)
}
dest
}
}
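
To make the new API concrete, here is a minimal usage sketch (not part of the diff; the values are made up, and only `EWMA.fitModel`, the `smoothing` member, and `addTimeDependentEffects` from the file above are assumed):

    import breeze.linalg.DenseVector
    import com.cloudera.sparkts.EWMA

    object EWMAUsageSketch {
      def main(args: Array[String]): Unit = {
        // made-up observations; any Vector[Double] works
        val ts = new DenseVector(Array(7.0, 6.5, 7.2, 8.1, 7.9, 8.4, 8.0, 8.6))
        val model = EWMA.fitModel(ts)
        println(s"fitted smoothing parameter: ${model.smoothing}")

        // recover the smoothed series S from the original series X
        val smoothed = DenseVector.zeros[Double](ts.length)
        model.addTimeDependentEffects(ts, smoothed)
        println(s"smoothed series: $smoothed")
      }
    }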
68 changes: 67 additions & 1 deletion src/main/scala/com/cloudera/sparkts/EasyPlot.scala
@@ -18,6 +18,8 @@ package com.cloudera.sparkts
import breeze.linalg._
import breeze.plot._

import org.apache.commons.math3.distribution.NormalDistribution

object EasyPlot {
def ezplot(vec: Vector[Double], style: Char): Unit = {
val f = Figure()
@@ -30,7 +32,7 @@
def ezplot(arr: Array[Double], style: Char): Unit = {
val f = Figure()
val p = f.subplot(0)
p += plot((0 until arr.length).map(_.toDouble).toArray, arr, style = style)
p += plot(arr.indices.map(_.toDouble).toArray, arr, style = style)
}

def ezplot(arr: Array[Double]): Unit = ezplot(arr, '-')
@@ -45,4 +47,68 @@
}

def ezplot(vecs: Seq[Vector[Double]]): Unit = ezplot(vecs, '-')

/**
* Autocorrelation function plot
* @param data array of data to analyze
* @param maxLag maximum lag for autocorrelation
* @param conf confidence bounds to display
*/
def acfPlot(data: Array[Double], maxLag: Int, conf: Double = 0.95): Unit = {
// calculate correlations and confidence bound
val autoCorrs = UnivariateTimeSeries.autocorr(data, maxLag)
val confVal = calcConfVal(conf, data.length)

// Basic plot information
val f = Figure()
val p = f.subplot(0)
p.title = "Autocorrelation function"
p.xlabel = "Lag"
p.ylabel = "Autocorrelation"
drawCorrPlot(autoCorrs, confVal, p)
}

/**
* Partial autocorrelation function plot
* @param data array of data to analyze
* @param maxLag maximum lag for partial autocorrelation function
* @param conf confidence bounds to display
*/
def pacfPlot(data: Array[Double], maxLag: Int, conf: Double = 0.95): Unit = {
// create AR(maxLag) model, retrieve coefficients and calculate confidence bound
val model = Autoregression.fitModel(new DenseVector(data), maxLag)
    val pCorrs = model.coefficients // approximates partial autocorrelations with the coefficients of a single AR(maxLag) fit
val confVal = calcConfVal(conf, data.length)

// Basic plot information
val f = Figure()
val p = f.subplot(0)
p.title = "Partial autocorrelation function"
p.xlabel = "Lag"
p.ylabel = "Partial Autocorrelation"
drawCorrPlot(pCorrs, confVal, p)
}

  private[sparkts] def calcConfVal(conf: Double, n: Int): Double = {
val stdNormDist = new NormalDistribution(0, 1)
val pVal = (1 - conf) / 2.0
stdNormDist.inverseCumulativeProbability(1 - pVal) / Math.sqrt(n)
}
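
  // Rationale: under the white-noise null, sample autocorrelations are approximately
  // N(0, 1/n), so the two-sided bound at level conf is z_{1 - (1 - conf)/2} / sqrt(n).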

private[sparkts] def drawCorrPlot(corrs: Array[Double], confVal: Double, p: Plot): Unit = {
// make decimal ticks visible
p.setYAxisDecimalTickUnits()
// plot correlations as vertical lines
val verticalLines = corrs.zipWithIndex.map { case (corr, ix) =>
(Array(ix.toDouble + 1, ix.toDouble + 1), Array(0, corr))
}
verticalLines.foreach { case (xs, ys) => p += plot(xs, ys) }
// plot confidence intervals as horizontal lines
val n = corrs.length
Array(confVal, -1 * confVal).foreach { conf =>
val xs = (0 to n).toArray.map(_.toDouble)
val ys = Array.fill(n + 1)(conf)
p += plot(xs, ys, '-', colorcode = "red")
}
}
}
Loading