-
Notifications
You must be signed in to change notification settings - Fork 4
/
timeseries_kNN.py
228 lines (189 loc) · 8.97 KB
/
timeseries_kNN.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
from heapq import nsmallest, nlargest
from sklearn.neighbors import NearestNeighbors
import numpy as np
import pandas as pd
from sys import stdout
from random import shuffle
from datetime import datetime as dt
from threading import Thread
from multiprocessing.pool import ThreadPool
from multiprocessing import Process, cpu_count, Pool, Queue
import json
import math
from itertools import chain
import concurrent
import traceback
import dtw
def get_time():
    """Return the current local time formatted as ``HHMM.SS``.

    Hour, minute and second are each zero-padded to two digits, so the
    result is always seven characters long (for example ``'0907.05'``).

    :return: the formatted timestamp string
    """
    return dt.now().strftime('%H%M.%S')
class timeseries_kNN(object):
    """k-nearest-neighbour classifier over rows of pandas DataFrames.

    Each row of the training frame is one sample; the label frame supplies the
    class of each row in its first column.  Neighbours vote with weight
    ``1 / distance``.  Distances are either Euclidean or delegated to
    ``dtw.dtw_dist``.
    """

    def __init__(self, data=None, labels=None, verbose=True):
        """Create an unfitted classifier.

        :param data: accepted for backward compatibility; not stored here,
            training data is supplied through :meth:`fit`
        :param labels: accepted for backward compatibility; not stored here
        :param verbose: when true, progress messages are printed
        """
        self.k = None
        self.data = None
        self.train_labels = None
        self.test_labels = None
        self.train_df = None
        self.test_df = None
        self.classes_set = None
        self.verbose = verbose
        self.predictions = []
        self.distance_metric = None
        self.dtw_width = None

    def fit(self, x, labels=None, dist_metric='euclidean', w=None, k_neigbors=1):
        """Store the training data and the classifier settings.

        :param x: training samples, one per row (``pandas.DataFrame``)
        :param labels: class labels, first column is used (``pandas.DataFrame``)
        :param dist_metric: ``'euclidean'`` or ``'dtw'`` (case-insensitive)
        :param w: width argument forwarded to ``dtw.dtw_dist``
        :param k_neigbors: number of neighbours that vote
        :raises TypeError: if ``x`` or ``labels`` is not a DataFrame
        """
        if self.verbose:
            print('In fit: {}'.format(get_time()))
        if not isinstance(x, pd.DataFrame) or not isinstance(labels, pd.DataFrame):
            raise TypeError("Pandas Dataframe must be used for dataframes")
        self.data = x
        # astype returns a new frame; keep it, otherwise the cast is a no-op.
        self.train_labels = labels.astype(int)
        # Distinct classes taken from the first label column.
        self.classes_set = set(self.train_labels.iloc[:, 0].tolist())
        self.distance_metric = dist_metric.lower()
        self.dtw_width = w
        self.k = k_neigbors

    def predict(self, test_set, parallel=False, n_subprocesses=cpu_count()-1):
        """Classify every row of ``test_set``.

        Predictions are returned in the row order of ``test_set`` for both the
        serial and the parallel path.  Rows whose classification raised inside
        :meth:`kNN` are skipped there; in parallel mode that trips the final
        count assertion.

        :param test_set: samples to classify, one per row (``pandas.DataFrame``)
        :param parallel: when true, split the rows across a process pool
        :param n_subprocesses: number of worker processes / chunks
        :return: list of predicted classes
        """
        self.test_df = test_set
        verbose = self.verbose

        if not parallel:
            # One slice covering every row; results are already in row order.
            results = self.kNN(slice(0, self.test_df.shape[0]))
            return [yhat for _, yhat in results]

        # cpu_count() - 1 is 0 on a single-core machine; always use >= 1 worker.
        num_of_chunks = max(1, n_subprocesses)
        n_test_cases = self.test_df.shape[0]
        if num_of_chunks > cpu_count():
            print("There might be an error due to number of subprocesses")
        chunks = self.work_chunks(self.test_df, num_of_chunks)
        if verbose:
            print("Classifying in parallel with {} subprocesses;\ttime: {}".format(num_of_chunks, get_time()))
            print("The index slices are:\n{}".format(chunks))
            print("The number of test cases are: {}".format(n_test_cases))

        # Each worker returns (row label, prediction) pairs for its chunk.
        pool = Pool(num_of_chunks)
        try:
            results_zipped = pool.map(self.kNN, chunks)
        finally:
            # Release the workers even when a chunk fails.
            pool.close()
            pool.join()

        if verbose:
            print('Converting tuples to list of predictions: {}'.format(get_time()))
        tup_of_tups_list = list(results_zipped)
        if verbose:
            print('Flattening to list of tuples: {}'.format(get_time()))
        # Pool.map keeps chunk order and each chunk keeps row order, so the
        # flattened list is already in test_set row order.  Sorting by index
        # label would reorder rows whenever the index is not monotonic.
        tup_list = list(chain.from_iterable(tup_of_tups_list))
        if verbose:
            print('Collected tuples: number of tuples: {}, time: {}'.format(len(tup_list), get_time()))
            print('Extracting predictions: {}'.format(get_time()))
        prediction_list = [yhat for _, yhat in tup_list]

        # Making sure we have the right number of predictions
        n_predictions = len(prediction_list)
        assert n_predictions == n_test_cases, "Number of predictions: {}\tNumber of tests: {}\n{}".format(
            n_predictions, n_test_cases, tup_list)
        return prediction_list

    def work_chunks(self, df, n_chunks):
        """Split the row range of ``df`` into at most ``n_chunks`` slices.

        :param df: any sized object; only ``len(df)`` is used
        :param n_chunks: requested number of chunks (values below 1 act as 1)
        :return: list of ``slice`` objects covering ``0 .. len(df)``; empty
            when ``df`` has no rows
        """
        num_rows = len(df)
        if num_rows == 0:
            # range() rejects a zero step, so there is nothing to split.
            return []
        steps = math.ceil(num_rows / max(1, n_chunks))
        # The last slice may run past num_rows; slicing clips it.
        return [slice(start, start + steps) for start in range(0, num_rows, steps)]

    def kNN(self, a_chunk):
        """Run kNN on one positional chunk of the stored test set.

        A row whose classification raises is reported on stdout and skipped,
        so the result can hold fewer pairs than the chunk has rows.

        :param a_chunk: positional ``slice`` into ``self.test_df``
        :return: iterator of ``(row index label, predicted class)`` pairs
        """
        if self.verbose:
            print('Running kNN')
        predictions = []
        prediction_id_list = []
        df = self.test_df.iloc[a_chunk]  # partition of test_df
        n_records = len(df)
        if self.verbose:
            print("The shape of df is {}".format(df.shape))

        # name is None because of pickling issue with parallel
        for position, test_i in enumerate(df.itertuples(name=None)):
            row_label = test_i[0]
            try:
                # Look the test row up once instead of once per training row.
                test_row = df.iloc[position]
                distances = self.data.apply(
                    lambda row: self._pairwise_distance(row, test_row), axis=1)
                # Pair each distance with its row *position* in the training
                # data: predict_yhat reads labels with .iloc, so index labels
                # would be wrong for any non-default index.
                neighbors = nsmallest(self.k, enumerate(distances.values.tolist()),
                                      key=lambda pair: pair[1])
                if not neighbors:
                    raise ValueError('No nearest neightbors were found')
                yhat = self.predict_yhat(neighbors)
            except Exception:
                # Best effort: report the failing row and carry on.
                print('Exception:\n{}'.format(traceback.format_exc()))
                print("chunk slice:{};\tindex:{};\ti:{};\tnumber of records:{}".format(
                    a_chunk, row_label, position, n_records))
                continue

            predictions.append(yhat)
            prediction_id_list.append(row_label)
            # Progress is keyed on the row position so that non-numeric index
            # labels cannot break logging.
            if self.verbose and position % 20 == 0:
                print("chunk slice: {};\ttime is:{};\tindex: {};\ti: {};\tnumber of records: {};\t{}% complete; "
                      "\t yhat: {};".
                      format(a_chunk, get_time(), row_label, position, n_records,
                             round(position / n_records, 2) * 100, yhat))
        return zip(prediction_id_list, predictions)

    def predict_yhat(self, neighbors):
        """Pick a class by inverse-distance weighted voting.

        Votes are accumulated per class label, so labels may be negative or
        non-contiguous.  A neighbour at distance 0 votes with infinite weight.
        Ties go to the smallest class label.

        :param neighbors: iterable of ``(row position in train_labels, distance)``
        :return: the winning class as ``int``
        :raises ValueError: if ``neighbors`` is empty
        """
        weighted_votes = {}
        for neighbor in neighbors:
            nbor_class = int(self.train_labels.iloc[neighbor[0]].values[0])
            distance = neighbor[1]
            # smaller the distance the larger the vote; an exact match must not
            # divide by zero.
            vote = float('inf') if distance == 0 else 1 / distance
            weighted_votes[nbor_class] = weighted_votes.get(nbor_class, 0.0) + vote
        if not weighted_votes:
            raise ValueError('No nearest neightbors were found')
        # Highest vote first, then lowest class label.
        return min(weighted_votes, key=lambda cls: (-weighted_votes[cls], cls))

    def dist(self, x, y_index):
        """Distance between ``x`` and the test row labelled ``y_index``.

        :param x: one training row
        :param y_index: index label of a row in ``self.test_df``
        :return: the distance under the configured metric
        :raises ValueError: if the configured metric is not supported
        """
        y = self.test_df.loc[y_index, :]
        return self._pairwise_distance(x, y)

    def _pairwise_distance(self, x, y):
        """Distance between two rows under ``self.distance_metric``."""
        if self.distance_metric == 'euclidean':
            diff = x - y
            return np.sqrt(np.sum(diff * diff))  # sqrt( sum( (xi-yi)^2) )
        if self.distance_metric == 'dtw':
            # dtw_dist's result is unpacked into three values; only the
            # distance is used here.
            dist, path, iterations = dtw.dtw_dist(self.verbose, x, y, self.dtw_width)
            return dist
        print(self.distance_metric)
        raise ValueError('Choose a distance metric of either euclidean or dtw')