How to use Dynamic Time warping with kNN in python
You can use a custom metric for KNN. Therefore you only need to implement DTW yourself (or use/adapt any existing DTW implementation in python) [gist of this code].
import numpy as np
from scipy.spatial import distance
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report
#toy dataset
X = np.random.random((100,10))
y = np.random.randint(0,2, (100))
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)
#custom metric
def DTW(a, b):
an = a.size
bn = b.size
pointwise_distance = distance.cdist(a.reshape(-1,1),b.reshape(-1,1))
cumdist = np.matrix(np.ones((an+1,bn+1)) * np.inf)
cumdist[0,0] = 0
for ai in range(an):
for bi in range(bn):
minimum_cost = np.min([cumdist[ai, bi+1],
cumdist[ai+1, bi],
cumdist[ai, bi]])
cumdist[ai+1, bi+1] = pointwise_distance[ai,bi] + minimum_cost
return cumdist[an, bn]
#train
parameters = {'n_neighbors':[2, 4, 8]}
clf = GridSearchCV(KNeighborsClassifier(metric=DTW), parameters, cv=3, verbose=1)
clf.fit(X_train, y_train)
#evaluate
y_pred = clf.predict(X_test)
print(classification_report(y_test, y_pred))
Which yields
Fitting 3 folds for each of 3 candidates, totalling 9 fits
[Parallel(n_jobs=1)]: Done 9 out of 9 | elapsed: 29.0s finished
precision recall f1-score support
0 0.57 0.89 0.70 18
1 0.60 0.20 0.30 15
avg / total 0.58 0.58 0.52 33
Use dtaidistance. This is the simplified pipeline of what I'm using in order to find the best fit for all windows with lengths between 1 and 20:
from dtaidistance import dtw
from sklearn.metrics import f1_score
def knn(trainX,trainY,testX,testY,w):
predictions = np.zeros(len(testX))
for testSampleIndex,testSample in enumerate(testX):
minimumDistance = float('inf')
for trainingSampleIndex, trainingSample in enumerate(trainX):
distanceBetweenTestAndTrainingAScan = dtw.distance(testSample,trainingSample,use_c=True,window=w,max_dist=minimumDistance)
if (distanceBetweenTestAndTrainingAScan < minimumDistance):
minimumDistance = distanceBetweenTestAndTrainingAScan
predictions[testSampleIndex] = trainY[trainingSampleIndex]
return [testY,predictions]
def DTWForCurrentDataSet(testX,testY,trainX,trainY,testDataSetID):
testDataSetBestF1Score = -float("inf")
testDataSetBestPredictions = []
for w in range(1,21):
[testY,predictions] = knn(trainX,trainY,testX,testY,w)
microF1Score = f1_score(testY, predictions, average='micro')
if (microF1Score > testDataSetBestF1Score):
testDataSetBestF1Score = microF1Score
testDataSetBestPredictions = predictions
return testDataSetBestPredictions
def runDTW(database):
for testDataSetID in database:
[testX,testY,trainX,trainY,patientIDsForTraining] = createTestingAndTrainingSets(database,testDataSetID)
testDataSetBestPredictions = DTWForCurrentDataSet(testX,testY,trainX,trainY,testDataSetID)
The tslearn library has DTW metric and can be used with sklearn.
from tslearn.metrics import dtw
clf = KNeighborsClassifier(n_neighbors=10, metric=dtw)