Skip to content

Commit

Permalink
Jak w poprzednim commicie, tylko dodane pliki .cs
Browse files Browse the repository at this point in the history
  • Loading branch information
kudkudak committed May 21, 2013
1 parent 6538987 commit 07dc6c1
Show file tree
Hide file tree
Showing 5 changed files with 820 additions and 0 deletions.
Binary file modified server-predictor-tests/server-predictor-tests.v11.suo
Binary file not shown.
114 changes: 114 additions & 0 deletions server-predictor-tests/server-predictor-tests/DataSystem.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Collections;
using System.Collections.Specialized;
using System.Xml;
using System.IO;
using System.Xml.Schema;


namespace server_predictor_tests
{
class DataSystem
{
// for demonstration version, we are reading the observed data from .csv files and formatting it for TPM1 //
public int time_start = 0;
public string get_simulation_data_file()
{
return "data/traffic_test.txt";
}
public double[] get_current_X_TPM1(int current_time, int look_behind = 30, int link_count = 20)
{

double[][] X_transposed = this.readMatrixCsv(this.get_simulation_data_file(), false, look_behind, link_count, " ",current_time);
double[] X = new double[X_transposed.Length * X_transposed[0].Length];
int cnt = 0;
for (int i = 0; i < X_transposed[0].Length; ++i)
{
for (int j = 0; j < X_transposed.Length; ++j)
{
X[cnt++] = X_transposed[j][i];
}
}
return X;
}

public string prepare_train_cars()
{
//constant in the demonstration version
return "data/model0b.data.RDa";
}
public string prepare_train_velocity()
{
//constant in the demonstration version
return "data/model0b.data.RDa";
}
public string prepare_train_pairwise_cars()
{
//can be easily calculated using function from general.r file//
return "data/model0b.smrCars.observationalSample.txt";
}
public int get_train_pairwise_cars_lines()
{
//constant int he demonstration version
return 1000;
}
public int get_train_pairwise_velocity()
{
//constant in the demonstartion version
return 1000;
}
public string prepare_train_pairwise_velocity()
{
return "data/model0b.data.RDa";
}

public double[][] readMatrixCsv(string file_name, bool header, int nrow, int ncol, string sep = " ", int skip = 0)
{
double[][] matrix = new double[nrow][]; for (int i = 0; i < nrow; ++i) matrix[i] = new double[ncol];

var reader = new StreamReader(File.OpenRead(file_name));
if (header) reader.ReadLine();
int row = 0, col = 0;
int skipped = 0;
while (!reader.EndOfStream)
{


//TODO: replace with smarter code
++skipped;
if (skipped <= skip) continue;


var line = reader.ReadLine();
var values = line.Split(sep.ToCharArray());

if (line == "") continue; //skip blanks

try
{
col = 0;
foreach (string val in values)
{
matrix[row][col++] = double.Parse(val, System.Globalization.CultureInfo.InvariantCulture);
}

Logger.Instance.log_ifn(ncol == col, "wrong column number specified!"); //warning message
++row;
}
catch (Exception e)
{
//skip this line

}

if (row == nrow) break;
}
return matrix;
}

}
}
116 changes: 116 additions & 0 deletions server-predictor-tests/server-predictor-tests/PredictionDaemon.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;

namespace server_predictor_tests
{

//implementation of prediction daemon taht will be runned on the server, for demonstration purposes merged with client//
class PredictionDaemon
{
ReaderWriterLockSlim current_prediction_lock = new ReaderWriterLockSlim(); //read write lock for reading current prediction
DataSystem ds; //data system reference - for demonstration purposes simplifed
TrafficPredictionModel1 tpm1; //traffic prediction object
private double[,] current_prediction;
private static PredictionDaemon m_instance;
private PredictionDaemon() { ds = new DataSystem(); tpm1 = new TrafficPredictionModel1(ds); }
private volatile int last_stamp_prediction_backing = -1;
private int last_stamp_prediction { get { return last_stamp_prediction_backing; } set { last_stamp_prediction_backing = value; } }


//=== public methods ====//
public Thread getDaemon()
{
return new Thread(new ThreadStart(run));
}

public static PredictionDaemon Instance
{
get
{
if (m_instance == null)
{
m_instance = new PredictionDaemon();
}
return m_instance;
}
}

//for demonstration purposes merged with client//
public void makePrediction()
{
current_prediction_lock.EnterWriteLock();
Logger.Instance.log(" ==== Making prediction for "+ ServerCore.Instance.current_time_stamp.ToString() +" ==== ");
current_prediction = this.tpm1.makePrediction(ds.get_current_X_TPM1(ServerCore.Instance.current_time_stamp));
current_prediction_lock.ExitWriteLock();
}

/// <summary>
/// Used by ServerCore , waits for the first prediction
/// </summary>
/// <returns> Matrix of prediction </returns>
public double[,] fetchPrediction()
{

while (this.last_stamp_prediction == -1) Thread.Sleep(2000); //wait for the first predicition
current_prediction_lock.EnterReadLock();
double [,] prediction_clone = (double[,])this.current_prediction.Clone();
current_prediction_lock.ExitReadLock();
return prediction_clone;
}

public void run()
{
Logger.Instance.log(" ==== Executed prediction service ==== ");
//=====Initialize prediction systems====//
try
{
tpm1.init(); //for demonstration purposes
}
catch (Exception e)
{
Logger.Instance.log_error("Failed to init prediction submodule: " + e.ToString());
throw (e);
}
Logger.Instance.log("==== Initiliazed predcition systems successfully =====");
tpm1.smrCars.isTrained = true;
tpm1.smrVelocity.isTrained = true;


Logger.Instance.log("===== Compiling inference engine =======");
try
{
tpm1.compileEngine(); //compile Infer.NET engine
}
catch (Exception e)
{
Logger.Instance.log_error("Failed to compile prediction infering algorithm: " + e.ToString());
throw (e);
}
Logger.Instance.log("===== Compiled infering engine successfully =======");


while (true)
{
// Check if the current time stamp has changed //

if (ServerCore.Instance.current_time_stamp != last_stamp_prediction)
{

makePrediction(); //make prediction every 1 min
if (last_stamp_prediction == -1)
{
Logger.Instance.log(" ==== First prediction and compilation was successful ==== ");
}
last_stamp_prediction = ServerCore.Instance.current_time_stamp;
}
Thread.Sleep(1000);
}
}


}
}
85 changes: 85 additions & 0 deletions server-predictor-tests/server-predictor-tests/ServerCore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;
namespace server_predictor_tests
{
class ServerConfiguration{
public int
simulation_start_minute { get; set; }
}

///<summary>
/// Class biding all submodules belonging to the server
///</summary>
class ServerCore
{
private Stopwatch stopWatch;
private static ServerCore m_instance;
private static readonly ReaderWriterLockSlim slimLock = new ReaderWriterLockSlim();
private double elapsed_minutes_start_backing;
/// <summary>
/// current time_stamp </summary>
public double elapsed_minutes_start {
get{ slimLock.EnterReadLock(); var x = elapsed_minutes_start_backing; slimLock.ExitReadLock(); return x;}
set{ slimLock.EnterWriteLock(); elapsed_minutes_start_backing=value; slimLock.ExitWriteLock();}
}
public int current_time_stamp { get { return (int)(elapsed_minutes_start + serverConfiguration.simulation_start_minute); } }
public double[,] current_prediction { get { return PredictionDaemon.Instance.fetchPrediction(); } }

/// <summary>
/// Wrapper for current_prediction (for server)
/// </summary>
/// <returns> Matrix of prediction (using ordering of links specified in matching file) </returns>
public double[,] fetchPrediction() { return this.current_prediction; }
/// <summary>
/// Wrapper current_time_stamp (for server)
/// </summary>
/// <returns> Int time stamp (minute) </returns>
public int fetchCurrentTimeStamp() { return this.current_time_stamp; }
private ServerCore()
{
this.elapsed_minutes_start = 0.0;
this.stopWatch = new Stopwatch();
this.stopWatch.Start();
}

public ServerConfiguration serverConfiguration { get; set; }

public static ServerCore Instance
{
get
{
if (m_instance == null)
{
m_instance = new ServerCore();
}
return m_instance;
}
}

private void runServerDaemons()
{
PredictionDaemon.Instance.getDaemon().Start();
}

private void run()
{
this.runServerDaemons();
while (true)
{
Thread.Sleep(1000);
this.elapsed_minutes_start = (this.stopWatch.Elapsed.Minutes + this.stopWatch.Elapsed.Seconds / 60.0); //update current time_stamp
}
}

public Thread getDaemon()
{
return new Thread(new ThreadStart(run));
}

}
}
Loading

0 comments on commit 07dc6c1

Please sign in to comment.