-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPeriEventTraceFuncLib.py
More file actions
997 lines (780 loc) · 44.1 KB
/
PeriEventTraceFuncLib.py
File metadata and controls
997 lines (780 loc) · 44.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed May 29 22:07:25 2019
@author: thugwithyoyo
"""
import numpy as np
import CalciumImagingBehaviorAnalysis as CIBA
from collections import defaultdict
import CalciumImagingFluorProcessing as CIFP
import PartialLeastSquares as PLS
import hwfunclib as HFL
import os
import shelve
from tkinter import *
from tkinter import filedialog
import matplotlib.pyplot as plt
#filename, file_extension = os.path.splitext('/path/to/somefile.ext')
# Generate dictionary of reference events
def BehavDictGen(PathToBehavFile):
    """Parse a Plexon-generated behavioral event file into a dictionary.

    Dispatches on the file extension to the matching parser in
    CalciumImagingBehaviorAnalysis (CIBA): ``.csv`` ->
    ``CinePlexCSV_parser``, ``.json`` -> ``CinePlexFromMatJSON_parser``.

    Parameters
    ----------
    PathToBehavFile : str
        Path to the behavioral events file (.csv or .json).

    Returns
    -------
    dict
        Behavioral event timestamps keyed by event name, in the form
        expected by the processing/decoding routines in this library.

    Raises
    ------
    ValueError
        If the extension is neither .csv nor .json.  (The original code
        printed a message and then crashed with ``NameError`` on the
        ``return`` statement because ``BehavDict`` was never bound;
        raising makes the failure explicit.)
    """
    # Determine the particular subroutine to parse behavioral events.
    _, file_extension = os.path.splitext(PathToBehavFile)

    if file_extension == '.csv':
        BehavDict = CIBA.CinePlexCSV_parser(PathToBehavFile)
    elif file_extension == '.json':
        BehavDict = CIBA.CinePlexFromMatJSON_parser(PathToBehavFile)
    else:
        raise ValueError(
            'Behavioral events: unrecognized file format: '
            + repr(file_extension))

    return BehavDict
# Generate dataframe of calcium events
def CellFluorTraces_FrameGen(PathToFluorFile):
    """Parse a calcium fluorescence trace file into a pandas DataFrame.

    Dispatches on the file extension to the matching parser in
    CalciumImagingFluorProcessing (CIFP): ``.csv`` ->
    ``IDPS_CellFluorTracesParser``, ``.json`` -> ``FrameFromJSON``.
    The DataFrame format is required by the processing/decoding
    routines in this library.

    Parameters
    ----------
    PathToFluorFile : str
        Path to the fluorescence trace file (.csv or .json).

    Returns
    -------
    pandas.DataFrame
        Calcium trace data as produced by the selected CIFP parser.

    Raises
    ------
    ValueError
        If the extension is neither .csv nor .json.  (The original code
        printed a message and then crashed with ``NameError`` on the
        ``return`` statement because ``CellFluorTraces_Frame`` was
        never bound.)
    """
    # Determine the particular subroutine to parse calcium traces.
    _, file_extension = os.path.splitext(PathToFluorFile)

    if file_extension == '.csv':
        CellFluorTraces_Frame = CIFP.IDPS_CellFluorTracesParser(PathToFluorFile)
    elif file_extension == '.json':
        CellFluorTraces_Frame = CIFP.FrameFromJSON(PathToFluorFile)
    else:
        raise ValueError(
            'Calcium fluorescence traces: unrecognized file format: '
            + repr(file_extension))

    return CellFluorTraces_Frame
def RemoveRepeatTargetEntries(BehavDict, RefEventsList, RelativeTolWindow):
    """Build filters that flag timestamps repeating within a tolerance window.

    For each event list named in RefEventsList, the timestamp list from
    BehavDict is compared against itself with CIBA.EventComparator to
    detect "rapid-repeat" occurrences, and a removal filter is then
    generated via CIBA.RemoveEventsInTolWindowFiltGen.

    Parameters
    ----------
    BehavDict : dict
        Event-name -> timestamp series (each must expose ``.values``).
    RefEventsList : list
        Names of the event timestamp lists in BehavDict to filter.
    RelativeTolWindow : sequence
        Relative tolerance window used to define a rapid repeat.

    Returns
    -------
    collections.defaultdict
        Maps each event name in RefEventsList to its removal filter.
    """
    # One filter entry per reference event type.
    EventFilters = defaultdict(dict)

    for EventName in RefEventsList:
        Timestamps = BehavDict[EventName].values

        # Self-comparison flags events that follow an earlier occurrence
        # within the tolerance window.
        RapidRepeatDict = CIBA.EventComparator(
            Timestamps, Timestamps, RelativeTolWindow)

        EventFilters[EventName] = CIBA.RemoveEventsInTolWindowFiltGen(
            RapidRepeatDict)

    return EventFilters
def PeriEventExtractor_Trace(BehavDict, CellFluorTraces_Frame, RefEventsDict, BoundaryWindow):
    """Assemble peri-event fluorescence snippets into PLS decoder inputs.

    For each reference event type, trace snippets around every event
    timestamp are extracted (CIFP.PeriEventTraceExtractor), stacked
    vertically into one activity matrix, and paired with a vector of
    integer target labels.  The output dict is the input format expected
    by PLS_DecoderPerformance, PLS_MonteCarlo and PLS_Shuffle.

    Parameters
    ----------
    BehavDict : dict
        Event-name -> timestamp series (each must expose ``.values``).
    CellFluorTraces_Frame : pandas.DataFrame
        Calcium fluorescence traces.
    RefEventsDict : dict
        'RefEventsList': names of events to decode; 'AssignedEventVals':
        integer label for each event type (parallel lists).
    BoundaryWindow : sequence
        Peri-event domain around each event from which trace activity
        is extracted.

    Returns
    -------
    dict
        'PEA_Array'              : (nTrials, nFeatures) activity matrix,
        'TargetsVec'             : (nTrials, 1) int label vector,
        'RefEventsDict'          : pass-through of the input dict,
        'TrialIndicesByEventDict': event name -> row indices in PEA_Array.
    """
    # Index over the event types to be decoded.
    EventIndices = np.arange(0, len(RefEventsDict['RefEventsList']))

    # Peri-event snippet array for each reference event type.
    PeriEventActivityArraysDict = defaultdict()

    # Placeholder initializations; replaced with correctly-sized empty
    # arrays on the first loop pass (i == 0) once the trace length is known.
    PEA_Array = np.array([[]]).transpose()
    TargetsVec = np.array([[]], dtype=int).transpose()

    # Row indices into PEA_Array belonging to each event type.
    TrialIndicesByEventDict = defaultdict()
    nTrialsPerEvent = np.empty((EventIndices.size,), dtype=int)

    # Stack peri-event snippet sets vertically, one event type at a time.
    for i in EventIndices:

        # Hoist the repeated nested lookup of the current event name.
        EventName = RefEventsDict['RefEventsList'][i]

        # Extract peri-event snippets about each event of the current type.
        PeriEventActivityArraysDict[EventName] = CIFP.PeriEventTraceExtractor(
            CellFluorTraces_Frame, BehavDict[EventName].values,
            BoundaryWindow)

        # Number of trials in the current set, and samples per trace.
        (nTrialsPerEvent[i], nSamplesPerTrace) = \
            PeriEventActivityArraysDict[EventName].shape

        # First pass: size the stacking arrays now that the trace
        # length is known.
        if i == 0:
            PEA_Array = np.empty((0, nSamplesPerTrace))
            TargetsVec = np.empty((0, 1))

        # Record which rows of the composite array this event occupies.
        (nTotalTrials, nSamplesPerTrace) = PEA_Array.shape
        TrialIndicesByEventDict[EventName] = np.arange(
            nTotalTrials, nTotalTrials + nTrialsPerEvent[i])

        # Append the current peri-event activity set to the composite array.
        PEA_Array = np.vstack((PEA_Array,
                               PeriEventActivityArraysDict[EventName]))

        # Each event type is assigned its own unique integer label; stack
        # labels in the same order as the rows of PEA_Array.
        RefEventVec = RefEventsDict['AssignedEventVals'][i]*np.ones(
            (nTrialsPerEvent[i], 1), dtype=int)
        TargetsVec = np.vstack((TargetsVec, RefEventVec))

    return {
        'PEA_Array': PEA_Array,
        'TargetsVec': np.asarray(TargetsVec, dtype=int),
        'RefEventsDict': RefEventsDict,
        'TrialIndicesByEventDict': TrialIndicesByEventDict
    }
def PLS_DecoderPerformance(PeriEventExtractorDict, nLatents, **kwargs):
    """Evaluate PLS decoding with leave-one-out cross validation.

    Builds a confusion matrix by holding out each trial in turn, fitting
    a PLS1 model on the remaining trials, and classifying the held-out
    trace by nearest (Euclidean) assigned event value.  After the LOO
    pass, a final model is fit on all selected trials and its per-trial
    predictions are returned alongside performance and mutual-information
    summaries of the confusion matrix.

    Parameters
    ----------
    PeriEventExtractorDict : dict
        Output of PeriEventExtractor_Trace: 'PEA_Array', 'TargetsVec',
        'RefEventsDict', 'TrialIndicesByEventDict'.
    nLatents : int
        Number of latent variables for the PLS model.
    **kwargs
        Optional 'trials_to_include': integer array of trial indices to
        use (used by the PLS_MonteCarlo bootstrap); defaults to all trials.

    Returns
    -------
    dict
        'confusion_matrix', 'performance', 'mutual_info', 'targets',
        'predicted', and the final model matrices 'B', 'B_0'.
    """
    # Unpack dictionary contents.
    PEA_Array = PeriEventExtractorDict['PEA_Array']
    TargetsVec = PeriEventExtractorDict['TargetsVec']
    RefEventsDict = PeriEventExtractorDict['RefEventsDict']
    TrialIndicesByEventDict = PeriEventExtractorDict['TrialIndicesByEventDict']

    # Index over the included event types.
    EventIndices = np.arange(0, len(RefEventsDict['RefEventsList']))

    # Confusion matrix from which performance/mutual info are derived.
    ConfusionMatrix = np.zeros((len(RefEventsDict['RefEventsList']),
                                len(RefEventsDict['RefEventsList'])), dtype=int)

    #### LEAVE ONE OUT DECODER PERFORMANCE EVALUATION
    (nTotalTrials, nTotalFeatures) = PEA_Array.shape
    PEA_ArrayRowIndices = np.arange(0, nTotalTrials)

    # Use the caller-supplied trial subset if present, otherwise all trials.
    if np.isin('trials_to_include', np.array(list(kwargs.keys()))):
        TrialSetIndices = kwargs['trials_to_include']
    else:
        TrialSetIndices = np.arange(0, nTotalTrials)

    # Hold out each trial in turn; fit on the remaining (n - 1) trials.
    for TestTrialIndex in TrialSetIndices:

        # Find the event pool that contains the current trial.  NOTE: if
        # the trial matched no pool, CurrentEvent would keep its value
        # from the previous iteration (or be unbound on the first).
        for Event in RefEventsDict['RefEventsList']:
            if np.isin(TestTrialIndex, TrialIndicesByEventDict[Event]):
                CurrentEvent = Event

        # The held-out test trace (kept 2-D: shape (1, nFeatures)).
        TestActivity = np.array([PEA_Array[TestTrialIndex, :]])

        # Boolean filter selecting ALL OTHER rows of PEA_Array.  Under
        # bootstrap resampling a duplicated index still excludes only
        # that single row position.
        ComplementFilt = ~(PEA_ArrayRowIndices == TestTrialIndex)

        # Fit PLS1 on the complement set of peri-event activity traces.
        B, B_0 = PLS.PLS1(PEA_Array[ComplementFilt,:], TargetsVec[ComplementFilt,:],
                          nLatents)

        # Predict the event for the held-out trace.
        ModelOutput = TestActivity @ B + B_0

        #### "Discriminant analysis": classify by nearest assigned value.
        EuclideanDiffVals = np.empty((EventIndices.size,), dtype=float)

        for i in EventIndices:
            EuclideanDiffVals[i] = HFL.EuclideanDistanceCalculator(np.array(
                RefEventsDict['AssignedEventVals'][i]), ModelOutput)

        # Index of the closest assigned event value = predicted event.
        PredictedEventIndex = np.argmin(EuclideanDiffVals)

        # Index of the actual event.  Relies on elementwise comparison of
        # the event-name list against the current event name.
        TrueEventIndex = EventIndices[RefEventsDict['RefEventsList'] == np.array(CurrentEvent)][0]

        # Increment the corresponding confusion-matrix cell (diagonal =
        # correct, off-diagonal = incorrect).
        ConfusionMatrix[TrueEventIndex, PredictedEventIndex] += 1

    ######## End of leave-one-out cross validation. #########

    # Fit the PLS model on the entire selected trial set.
    B, B_0 = PLS.PLS1(PEA_Array[TrialSetIndices, :],
                      TargetsVec[TrialSetIndices, :], nLatents)

    # Decoder output for every selected trial.
    ModelOutput = PEA_Array[TrialSetIndices, :] @ B + B_0

    # Per-trial predictions of the final model.
    # NOTE(review): this loop runs over nTotalTrials while ModelOutput has
    # len(TrialSetIndices) rows; these only agree because callers always
    # pass a trials_to_include of size nTotalTrials — confirm before
    # using smaller subsets.  It also reuses EuclideanDiffVals allocated
    # inside the LOO loop, which would be unbound if TrialSetIndices
    # were empty.
    Predicted = np.empty((nTotalTrials,1), dtype=int)

    for j in np.arange(0, nTotalTrials):

        for i in EventIndices:
            EuclideanDiffVals[i] = HFL.EuclideanDistanceCalculator(np.array(
                RefEventsDict['AssignedEventVals'][i]), ModelOutput[j])

        PredictedEventIndex = np.argmin(EuclideanDiffVals)

        # Record the predicted assigned event value for this trial.
        Predicted[j,:] = RefEventsDict['AssignedEventVals'][PredictedEventIndex]

    return {
        'confusion_matrix' : ConfusionMatrix,
        'performance' : HFL.PerformanceCalculator(ConfusionMatrix),
        'mutual_info' : HFL.MutualInformationFromConfusionMatrix(ConfusionMatrix),
        'targets' : np.asarray(TargetsVec, dtype=int),
        'predicted' : Predicted,
        'B' : B,
        'B_0' : B_0
    }
#nIncorrects = np.sum(np.abs(np.squeeze(np.round(Predicted)) - TargetsVec.transpose()[0]))
##### PLS_MonteCarlo
def PLS_MonteCarlo(PeriEventExtractorDict, nLatents, nRepetitions, ConfLevel, ObservedPerfDict):
    """Non-parametric bootstrap of the PLS decoding performance.

    Resamples trials with replacement nRepetitions times, re-runs
    PLS_DecoderPerformance on each resample, and summarizes the
    resulting distributions (medians, basic-bootstrap confidence
    limits, standard errors) relative to the observed results in
    ObservedPerfDict.

    Parameters
    ----------
    PeriEventExtractorDict : dict
        Output of PeriEventExtractor_Trace.
    nLatents : int
        Number of PLS latent variables.
    nRepetitions : int
        Number of bootstrap resamples.
    ConfLevel : float
        Confidence level (e.g. 0.95) for the interval limits.
    ObservedPerfDict : dict
        Output of PLS_DecoderPerformance on the un-resampled data; its
        observed values pivot the basic-bootstrap CIs and SE bounds.

    Returns
    -------
    dict
        Bootstrap statistics (see the return statement for keys).
    """
    # Quantile indices into the sorted bootstrap distributions.
    # NOTE(review): alphas[1] = floor(nRepetitions*(1+ConfLevel)/2) can
    # equal nRepetitions, which would index one past the end of an
    # nRepetitions-length sorted array — confirm intended off-by-one.
    alphas = np.empty((2,), dtype=int)
    alphas[0] = np.ceil(nRepetitions*(1. - ConfLevel)/2.)
    alphas[1] = np.floor(nRepetitions*(1. + ConfLevel)/2.)

    # Unpack needed dictionary contents.
    PEA_Array = PeriEventExtractorDict['PEA_Array']
    RefEventsDict = PeriEventExtractorDict['RefEventsDict']

    # Arrays holding the bootstrap output distributions.
    nRefEvents = len(RefEventsDict['RefEventsList'])
    ConfusionMatrixSimDist = np.empty((nRefEvents, nRefEvents, nRepetitions),
                                      dtype=int)
    PerformanceSimDist = np.empty((nRepetitions,), dtype=float)
    MutualInfoSimDist = np.empty((nRepetitions,), dtype=float)

    # Total number of trials available for resampling.
    (nTotalTrials, nTotalFeatures) = PEA_Array.shape

    # Repeat PLS decoding over the specified number of repetitions.
    for i in np.arange(0, nRepetitions):

        # Draw, with replacement, N random trials (N = total trials).
        InclusionSet = np.random.randint(0, high=nTotalTrials,
                                         size=(nTotalTrials,))

        # Run the PLS decoder on the bootstrap-sampled trial set.
        PerformanceStruct = PLS_DecoderPerformance(PeriEventExtractorDict,
                                                   nLatents, trials_to_include=InclusionSet)

        # Record this repetition's confusion matrix, performance and
        # mutual information.
        ConfusionMatrixSimDist[:,:,i] = PerformanceStruct['confusion_matrix']
        PerformanceSimDist[i] = PerformanceStruct['performance']
        MutualInfoSimDist[i] = PerformanceStruct['mutual_info']

    ### Compute statistics from bootstrapping. ###
    # Median of the bootstrapped confusion matrices.
    confusion_matrix_median = np.median(ConfusionMatrixSimDist, axis=-1)

    # Basic-bootstrap CIs: 2*observed - bootstrap quantiles, re-sorted so
    # the lower limit comes first.
    confusion_matrix_CLs = np.sort(2*ObservedPerfDict['confusion_matrix'] -
                                   np.sort(ConfusionMatrixSimDist, axis=-1)[:,:,alphas],
                                   axis=-1)

    # Basic-bootstrap CI limits for performance.
    performance_CLs = np.sort(2.*ObservedPerfDict['performance'] -
                              np.sort(PerformanceSimDist, axis=-1)[alphas],
                              axis=-1)

    # Basic-bootstrap CI limits for mutual information.
    mutual_info_CLs = np.sort(2.*ObservedPerfDict['mutual_info'] -
                              np.sort(MutualInfoSimDist, axis=-1)[alphas],
                              axis=-1)

    # Central-tendency summaries of the performance distribution.
    performance_median = np.median(PerformanceSimDist, axis=-1)
    performance_mean = np.mean(PerformanceSimDist, axis=-1)

    # Sorted distribution and quantile values for bias visualization.
    performance_SortedSimDist = np.sort(PerformanceSimDist, axis=-1)
    performance_quantiles = performance_SortedSimDist[alphas]

    # Standard error of performance = SD of the bootstrap distribution.
    performance_SE = np.std(PerformanceSimDist, axis=-1, ddof=1)

    # Standard-error bars centered on the OBSERVED performance.
    performance_SE_bounds = np.array([ObservedPerfDict['performance'] - performance_SE,
                                      ObservedPerfDict['performance'] + performance_SE])

    # Mutual information: median, SE, and SE bars about the observed value.
    mutual_info_median = np.median(MutualInfoSimDist, axis=-1)
    mutual_info_SE = np.std(MutualInfoSimDist, axis=-1, ddof=1)
    mutual_info_SE_bounds = np.array([ObservedPerfDict['mutual_info'] - mutual_info_SE,
                                      ObservedPerfDict['mutual_info'] + mutual_info_SE])

    # Return bootstrap statistics in dictionary form.
    # NOTE(review): the key 'performance_quanitles' is misspelled but kept
    # as-is because downstream callers may rely on it.
    return {
        'confusion_matrix_median' : confusion_matrix_median,
        'confusion_matrix_CLs' : confusion_matrix_CLs,
        'performance_median' : performance_median,
        'performance_mean' : performance_mean,
        'performance_quanitles' : performance_quantiles,
        'performance_CLs' : performance_CLs,
        'performance_SE' : performance_SE,
        'performance_SortedSimDist' : performance_SortedSimDist,
        'performance_SE_bounds' : performance_SE_bounds,
        'mutual_info_median' : mutual_info_median,
        'mutual_info_CLs' : mutual_info_CLs,
        'mutual_info_SE' : mutual_info_SE,
        'mutual_info_SE_bounds' : mutual_info_SE_bounds
    }
##### SHUFFLE CONTROL
# To test validity of decoding, shuffle event lables prior to running decoder
def PLS_Shuffle(PeriEventExtractorDict, nLatents, nRepetitions, ConfLevel):
    """Shuffle-control for the PLS decoder (DEPRECATED — use PLS_Shuffle2).

    Repeatedly shuffles the event labels and re-runs the decoder,
    summarizing the resulting null distributions.  The original author
    notes this routine is outdated because it largely duplicates the
    PLS_MonteCarlo bootstrap machinery; it is retained for reference.

    WARNING: this routine shuffles PeriEventExtractorDict['TargetsVec']
    IN PLACE on every repetition, so the caller's dict is permanently
    (and cumulatively) re-ordered as a side effect.

    Parameters
    ----------
    PeriEventExtractorDict : dict
        Output of PeriEventExtractor_Trace (mutated in place — see above).
    nLatents : int
        Number of PLS latent variables.
    nRepetitions : int
        Number of shuffle repetitions.
    ConfLevel : float
        Confidence level for the interval limits.

    Returns
    -------
    dict
        Null-distribution statistics (see the return statement for keys).
    """
    # Quantile indices into the sorted null distributions.
    # NOTE(review): alphas[1] can equal nRepetitions and index past the
    # end of an nRepetitions-length sorted array — confirm intended.
    alphas = np.empty((2,), dtype=int)
    alphas[0] = np.ceil(nRepetitions*(1. - ConfLevel)/2.)
    alphas[1] = np.floor(nRepetitions*(1. + ConfLevel)/2.)

    # Unpack needed dictionary contents.
    PEA_Array = PeriEventExtractorDict['PEA_Array']
    RefEventsDict = PeriEventExtractorDict['RefEventsDict']

    # Arrays holding the shuffled-label output distributions.
    nRefEvents = len(RefEventsDict['RefEventsList'])
    ConfusionMatrixSimDist = np.empty((nRefEvents, nRefEvents, nRepetitions),
                                      dtype=int)
    PerformanceSimDist = np.empty((nRepetitions,), dtype=float)
    MutualInfoSimDist = np.empty((nRepetitions,), dtype=float)

    # Index set whose order is re-shuffled on every repetition.
    (nTotalTrials, nTotalFeatures) = PEA_Array.shape
    ShuffledIndices = np.arange(0, nTotalTrials)

    for i in np.arange(0, nRepetitions):

        # Shuffle labels relative to traces (in place; cumulative across
        # repetitions), then decode the shuffled dataset.
        np.random.shuffle(ShuffledIndices)
        PeriEventExtractorDict['TargetsVec'] = \
            PeriEventExtractorDict['TargetsVec'][ShuffledIndices]

        PerformanceStruct = PLS_DecoderPerformance(PeriEventExtractorDict,
                                                   nLatents)

        ConfusionMatrixSimDist[:,:,i] = PerformanceStruct['confusion_matrix']
        PerformanceSimDist[i] = PerformanceStruct['performance']
        MutualInfoSimDist[i] = PerformanceStruct['mutual_info']

    # Compute statistics from the shuffled-label distributions.  CIs are
    # basic-bootstrap style, pivoted on the distribution medians.
    confusion_matrix_median = np.median(ConfusionMatrixSimDist, axis=-1)
    confusion_matrix_CLs = np.sort(2.*confusion_matrix_median -
                                   np.sort(ConfusionMatrixSimDist, axis=-1)[:,:,alphas], axis=-1)

    performance_median = np.median(PerformanceSimDist, axis=-1)
    performance_mean = np.mean(PerformanceSimDist, axis=-1)
    performance_SortedSimDist = np.sort(PerformanceSimDist, axis=-1)
    performance_quantiles = performance_SortedSimDist[alphas]
    performance_CLs = np.sort(2.*performance_median -
                              performance_SortedSimDist[alphas], axis=-1)
    performance_SE = np.std(PerformanceSimDist, axis=-1, ddof=1)
    performance_SE_bounds = np.array([performance_median - performance_SE,
                                      performance_median + performance_SE])

    mutual_info_median = np.median(MutualInfoSimDist, axis=-1)
    mutual_info_CLs = np.sort(2.*mutual_info_median -
                              np.sort(MutualInfoSimDist, axis=-1)[alphas], axis=-1)
    mutual_info_SE = np.std(MutualInfoSimDist, axis=-1, ddof=1)
    mutual_info_SE_bounds = np.array([mutual_info_median - mutual_info_SE,
                                      mutual_info_median + mutual_info_SE])

    # NOTE(review): the key 'performance_quanitles' is misspelled but kept
    # as-is because downstream callers may rely on it.
    return {
        'confusion_matrix_median' : confusion_matrix_median,
        'confusion_matrix_CLs' : confusion_matrix_CLs,
        'performance_median' : performance_median,
        'performance_mean' : performance_mean,
        'performance_quanitles' : performance_quantiles,
        'performance_CLs' : performance_CLs,
        'performance_SE' : performance_SE,
        'performance_SortedSimDist' : performance_SortedSimDist,
        'performance_SE_bounds' : performance_SE_bounds,
        'mutual_info_median' : mutual_info_median,
        'mutual_info_CLs' : mutual_info_CLs,
        'mutual_info_SE' : mutual_info_SE,
        'mutual_info_SE_bounds' : mutual_info_SE_bounds
    }
def PLS_Shuffle2(PeriEventExtractorDict, nLatents, nRepetitions, ConfLevel):
    """Shuffle-control test of PLS decoding via the Monte Carlo bootstrap.

    Shuffles the event labels once to destroy the correspondence between
    peri-event fluorescence snippets and the events around which they
    were extracted, runs PLS_DecoderPerformance on the shuffled dataset
    ("observed" in the bootstrap's sense), then delegates to
    PLS_MonteCarlo for the non-parametric bootstrap statistics.

    Fixes relative to the original: the caller's dict is no longer
    mutated (the shuffle is applied to a shallow copy), and a block of
    dead locals (alphas, ConfusionMatrixSimDist, PerformanceSimDist,
    MutualInfoSimDist) that PLS_MonteCarlo recomputes internally has
    been removed.

    Parameters
    ----------
    PeriEventExtractorDict : dict
        Output of PeriEventExtractor_Trace (not modified).
    nLatents : int
        Number of PLS latent variables.
    nRepetitions : int
        Number of bootstrap resamples.
    ConfLevel : float
        Confidence level for the interval limits.

    Returns
    -------
    dict
        PLS_MonteCarlo's bootstrap statistics for the shuffled dataset.
    """
    # Number of trials available for shuffling.
    (nTotalTrials, nTotalFeatures) = PeriEventExtractorDict['PEA_Array'].shape

    # Shuffle the label order to destroy the snippet/event correspondence.
    ShuffledIndices = np.arange(0, nTotalTrials)
    np.random.shuffle(ShuffledIndices)

    # Shallow-copy the input dict and replace only the targets, so the
    # caller's PeriEventExtractorDict is left untouched.
    ShuffledDict = dict(PeriEventExtractorDict)
    ShuffledDict['TargetsVec'] = \
        PeriEventExtractorDict['TargetsVec'][ShuffledIndices]

    # Decode the shuffled dataset; this plays the role of the "observed"
    # performance for the Monte Carlo bootstrap that follows.
    ObservedPerfDict = PLS_DecoderPerformance(ShuffledDict, nLatents)

    # Non-parametric bootstrap on the shuffled dataset.
    return PLS_MonteCarlo(ShuffledDict, nLatents, nRepetitions,
                          ConfLevel, ObservedPerfDict)
#TargetsVec = TargetsVec.transpose()[0]
#np.random.shuffle(TargetsVec)
#TargetsVec = np.array([TargetsVec]).transpose()
#### The following function does not work as intended!!! ####
# See: ShelveWorkspaceScript.py
#def ShelveWorkspace(SavePath):
#
# ScriptDir = os.getcwd()
#
# # Attempt to use GUI navigator dialog to guide save. Drop for batch processing.
## root = Tk()
## root.filename = filedialog.asksaveasfilename(
## initialdir = "/home/thugwithyoyo/Desktop",
## title = "Enter name of file to save",
## filetypes = (("out files","*.out"), ("all files","*.*")))
##
## if root.filename is None: # asksaveasfile return `None` if dialog closed with "cancel".
## return
##
## LoadFilePath = root.filename
## root.destroy()
#
# # Use path from function argument to change to save directory and write file
# # with name contained in Filename.
# (PathToFile, Filename) = os.path.split(SavePath)
#
# os.chdir(PathToFile)
#
# # Open a shelf object to contain workspace variables.
# my_shelf = shelve.open(Filename)
# #PathToSaveFile = root.filename
# #my_shelf = shelve.open(PathToSaveFile, 'n') # 'n' for new
#
# # Iterate through list of global variables and write to file.
# for key in dir():
#
# try:
#
# my_shelf[key] = globals()[key]
#
# except TypeError:
# #
# # __builtins__, my_shelf, and imported modules can not be shelved.
# #
# print('ERROR shelving: {0}'.format(key))
#
# #root.destroy()
#
# # Close shelf object after variables have been written to file
# my_shelf.close()
#
# # Return to directory that contains this script.
# os.chdir(ScriptDir)
#### The following function does not work as intended!!! ####
# See: RestoreShelvedWorkspaceScript.py
#def RestoreShelvedWorkspace(RestoreFilePath):
#
# ScriptDir = os.getcwd()
#
## root = Tk()
## root.filename = filedialog.askopenfilename(
## initialdir = "/home/thugwithyoyo/Desktop",
## title = "Select file to open",
## filetypes = (("dat files","*.dat"),("all files","*.*")))
##
## RestoreFilePath = root.filename
## root.destroy()
# (PathToFile, Filename) = os.path.split(RestoreFilePath)
#
# os.chdir(PathToFile)
#
# my_shelf = shelve.open(os.path.splitext(Filename)[0])
#
# for key in my_shelf:
#
# globals()[key]=my_shelf[key]
#
# my_shelf.close()
#
# os.chdir(ScriptDir)
def GenerateConfIntsPlot(ArrayOfConfIntsDicts, ArrayOfPerformanceDicts, PlotSpecDict, AxesHandle, Direction):
    # Generate a trace of decoding performance (or mutual information) over
    # event-relative time, drawn as a solid line for the observed values with
    # an underlying filled band for the error bounds. Input arrays are the
    # outputs of SlidingWindowAnalysisFunc or VariableWindowAnalysisFunc.
    #
    # Arguments:
    #   ArrayOfConfIntsDicts    -- 1-D object array of dicts; each must contain
    #       'PeriEventDomain' plus the keys named by
    #       PlotSpecDict['measure_bars'] (2-element error-band bounds).
    #   ArrayOfPerformanceDicts -- 1-D object array of dicts; each must contain
    #       the observed value under the key PlotSpecDict['measure'].
    #   PlotSpecDict            -- directives for the plot: key names 'measure'
    #       and 'measure_bars', and a 'color' for the trace and band.
    #   AxesHandle              -- pre-existing matplotlib axes to draw on.
    #   Direction               -- 'bw' (backwards-cumulative windows),
    #       'fw' (forwards-cumulative) or 'fw_sliding' (sliding window).
    #
    # Raises ValueError if fewer than two datapoints are supplied or Direction
    # is unrecognized.
    # Count number of datapoints/confidence intervals to plot.
    (nDicts,) = ArrayOfConfIntsDicts.shape
    # StepSize is derived from the first two entries, so at least two
    # datapoints are required (previously this surfaced as a bare IndexError).
    if nDicts < 2:
        raise ValueError('GenerateConfIntsPlot requires at least two entries in ArrayOfConfIntsDicts.')
    # Validate Direction up front; an unknown value previously surfaced later
    # as a confusing NameError on TraceLabel/TitleLabel.
    if Direction not in ('bw', 'fw', 'fw_sliding'):
        raise ValueError("Direction must be one of 'bw', 'fw' or 'fw_sliding'.")
    # Determine the spacing between successive datapoints.
    StepSize = np.max(np.abs(ArrayOfConfIntsDicts[1]['PeriEventDomain'] -
                             ArrayOfConfIntsDicts[0]['PeriEventDomain']))
    # Initialize data summary arrays to send to plotting routines.
    X = np.empty(ArrayOfConfIntsDicts.shape, dtype=float)
    Y_BarBounds = np.empty((nDicts, 2), dtype=float)
    Y_test = np.empty(ArrayOfConfIntsDicts.shape, dtype=float)
    # Obtain the y-axis name. A trailing '_median' marks a shuffled-control
    # trace; strip it for display and label the trace accordingly.
    OrdinateName = PlotSpecDict['measure']
    if OrdinateName.endswith('_median'):
        OrdinateName = OrdinateName[:-7]
        TraceType = 'Shuffled outcomes'
    else:
        TraceType = 'Observed outcomes'
    # Place each datapoint along the time axis and compose the labels for the
    # time axis and plot title.
    for i in np.arange(0, nDicts):
        if (Direction == 'bw'):
            # Backwards-cumulative domain: mark the datapoint half a step
            # above the window floor.
            X[i] = np.min(ArrayOfConfIntsDicts[i]['PeriEventDomain']) + 0.5*StepSize
            TraceLabel = 'neg. cumulative span from '+ \
                str(ArrayOfConfIntsDicts[i]['PeriEventDomain'][1]) +' sec.' \
                + '('+ TraceType + ')'
            TitleLabel = ' dependence on span of activity window'
        elif (Direction == 'fw'):
            # Forwards-cumulative domain: mark the datapoint half a step
            # below the window ceiling.
            X[i] = np.max(ArrayOfConfIntsDicts[i]['PeriEventDomain']) - 0.5*StepSize
            TraceLabel = 'pos. cumulative span from '+ \
                str(ArrayOfConfIntsDicts[i]['PeriEventDomain'][0]) +' sec.' \
                + '('+ TraceType + ')'
            TitleLabel = ' dependence on span of activity window'
        else:
            # 'fw_sliding': mark the datapoint at the window center.
            X[i] = np.mean(np.array(ArrayOfConfIntsDicts[i]['PeriEventDomain']))
            WindowWidth = np.diff(np.array(ArrayOfConfIntsDicts[0]['PeriEventDomain']))[0]
            TraceLabel = TraceType
            TitleLabel = ': window of width '+str(round(WindowWidth, 3))+' sec. slid over ' \
                +str(round(StepSize, 3))+' sec. increments.'
        # Error-band bounds (e.g. confidence limits or standard errors) and
        # observed value of the measure for this datapoint.
        Y_BarBounds[i,:] = ArrayOfConfIntsDicts[i][PlotSpecDict['measure_bars']]
        Y_test[i] = ArrayOfPerformanceDicts[i][PlotSpecDict['measure']]
    # Plot error band on specified axes object.
    AxesHandle.fill_between(X, Y_BarBounds.transpose()[0,:], Y_BarBounds.transpose()[1,:],
                            label=TraceLabel, alpha=0.7, color=PlotSpecDict['color'])
    # Plot observed performance measure on specified axes object.
    AxesHandle.plot(X, Y_test, '.-', color=PlotSpecDict['color'])
    # Add label to x axis.
    AxesHandle.set_xlabel('time relative to target entry (sec.)')
    # Add label to y axis.
    AxesHandle.set_ylabel(OrdinateName)
    # Title the axes object.
    AxesHandle.set_title(OrdinateName + TitleLabel)
    # Attach the legend to the axes actually drawn on (plt.legend targeted
    # whichever axes happened to be current, which need not be AxesHandle).
    AxesHandle.legend(loc='lower right')
def CumulativeWindowGen(BoundaryWindow, StepWidth, Direction):
    # Generate an N-by-2 array of domains of increasing width spanning the
    # user-specified domain BoundaryWindow. Each cumulatively-growing window
    # is one StepWidth longer than the one preceding it. Direction selects
    # whether windows grow from the floor ('positive') or the ceiling
    # ('negative') of the full domain.
    #
    # Raises ValueError for an unrecognized Direction (previously this fell
    # through and raised UnboundLocalError at the return statement).
    if (Direction == 'positive'):
        # Ceiling of each domain in the list; the round() snaps values onto
        # the StepWidth grid to suppress floating-point drift from np.arange.
        ForwardArray = np.round(np.arange(BoundaryWindow[0] + StepWidth,
                                          BoundaryWindow[1] + 0.5*StepWidth,
                                          StepWidth)/StepWidth)*StepWidth
        # Pair the common floor with each ceiling and transpose into an
        # N-by-2 array.
        ArrayOfWindows = np.array([BoundaryWindow[0]*np.ones_like(ForwardArray),
                                   ForwardArray]).transpose()
    elif (Direction == 'negative'):
        # Floor of each domain, in descending order so successive windows
        # grow downward from the common ceiling.
        BackwardArray = np.arange(BoundaryWindow[0], BoundaryWindow[1],
                                  StepWidth)[::-1]
        # Pair each floor with the common ceiling and transpose into an
        # N-by-2 array.
        ArrayOfWindows = np.array([BackwardArray, BoundaryWindow[1]*np.ones_like(
                                   BackwardArray)]).transpose()
    else:
        raise ValueError("Direction must be 'positive' or 'negative'.")
    return ArrayOfWindows
def SlidingWindowGen(BoundaryWindow, StepWidth, WindowWidth):
    # Build an N-by-2 array of [floor, ceiling] domain bounds produced by
    # sliding a window of width WindowWidth across the wide domain
    # BoundaryWindow in increments of StepWidth.
    # Floor of each successive window position.
    Floors = np.arange(BoundaryWindow[0],
                       BoundaryWindow[1] - WindowWidth + StepWidth,
                       StepWidth)
    # Matching ceiling, one WindowWidth above each floor.
    Ceilings = np.arange(BoundaryWindow[0] + WindowWidth,
                         BoundaryWindow[1] + StepWidth,
                         StepWidth)
    # Pair corresponding floors and ceilings column-wise.
    return np.column_stack((Floors, Ceilings))
def PeriEventExtractorDictJoiner(PE_dict1, PE_dict2):
    # Join two PeriEventExtractorDict dictionaries produced by the
    # PeriEventExtractor_Trace function and called by
    # SlidingWindowAnalysisFunc.py, producing a combined dataset on which to
    # run subsequent decoding analyses. The joined fields are 'PEA_Array',
    # 'TargetsVec' and the per-event index arrays in
    # 'TrialIndicesByEventDict'; all other fields are carried over from
    # PE_dict1.
    #
    # Returns a new dictionary. Bug fix: the original aliased PE_dict1
    # (CombinedDict = PE_dict1) and therefore mutated the caller's input,
    # including its nested TrialIndicesByEventDict; shallow copies are now
    # taken so both inputs are left untouched.
    (D1_NumTraces, D1_NumSamples) = PE_dict1['PEA_Array'].shape
    (D2_NumTraces, D2_NumSamples) = PE_dict2['PEA_Array'].shape
    # Shallow-copy the first dict (and its index sub-dict, which is written
    # to below) so the inputs are not modified in place.
    CombinedDict = dict(PE_dict1)
    CombinedDict['TrialIndicesByEventDict'] = dict(PE_dict1['TrialIndicesByEventDict'])
    # Attempt to join peri-event activity arrays from the two dictionaries. To
    # be successfully joined, the two arrays must have the same number of
    # columns; joining operations are therefore wrapped in try/except blocks.
    try:
        # Join PEA_Array elements.
        CombinedDict['PEA_Array'] = np.vstack([PE_dict1['PEA_Array'], PE_dict2['PEA_Array']])
    except ValueError:
        print('Error: PEA_Array fields from the two dicts likely have different number of columns.')
    try:
        # Join TargetsVec elements.
        CombinedDict['TargetsVec'] = np.vstack([PE_dict1['TargetsVec'], PE_dict2['TargetsVec']])
    except ValueError:
        print('Error: TargetVec fields from the two dicts likely have different number of columns.')
    # Join TrialIndicesByEvent sub-dicts. Iterate through entries in
    # RefEventsList, joining like entries from both dicts while adjusting the
    # second dict's indices.
    for RefEvent in CombinedDict['RefEventsDict']['RefEventsList']:
        # Offset the second set of indices by the number of traces in the
        # first set, then concatenate onto the first set.
        IndicesToAppend = PE_dict2['TrialIndicesByEventDict'][RefEvent] + \
            D1_NumTraces*np.ones_like(PE_dict2['TrialIndicesByEventDict'][RefEvent])
        CombinedDict['TrialIndicesByEventDict'][RefEvent] = \
            np.hstack([PE_dict1['TrialIndicesByEventDict'][RefEvent],
                       IndicesToAppend])
    return CombinedDict
def UniquePairsGenerator(CellIDs):
    # Return (UniqueIndexPairsList, UniquePairsList): every order-independent
    # pairing of distinct entries of CellIDs, both as index pairs into the
    # (de-duplicated) CellIDs array and as pairs of the labels themselves.
    # Make sure that the datatype of the CellIDs list is a numpy array.
    CellIDs = np.array(CellIDs, dtype=object)
    # Entries must be unique; warn the user and filter if they are not.
    if (len(CellIDs) != len(np.unique(CellIDs))):
        print("Warning: input argument list contains duplicate entries.\nFiltering list before proceeding...")
        CellIDs = np.unique(CellIDs)
    # Count number of elements in the (now unique) list.
    (nIDs,) = CellIDs.shape
    # Enumerate every ordered (row, column) index pair of the nIDs x nIDs
    # grid. Index pairs are used instead of the labels themselves because
    # numpy's unique method can only row-sort numeric arrays, and the labels
    # are guaranteed unique above, so indices identify them unambiguously.
    RowIndices = np.repeat(np.arange(0, nIDs), nIDs)
    ColIndices = np.tile(np.arange(0, nIDs), nIDs)
    IndexPairsList = np.stack([RowIndices, ColIndices], axis=-1)
    # Sort within each pair so that (j, i) collapses onto (i, j), then keep a
    # single representative of each pair. The C-contiguous copy speeds up
    # np.unique's row comparisons.
    IndexPairsList.sort(axis=-1)
    UniqueIndexPairsList = np.unique(np.ascontiguousarray(IndexPairsList), axis=0)
    # Discard self-pairings (i, i).
    KeepFilt = (UniqueIndexPairsList[:, 0] != UniqueIndexPairsList[:, 1])
    UniqueIndexPairsList = UniqueIndexPairsList[KeepFilt, :]
    # Map the surviving index pairs back onto their cell identity labels.
    UniquePairsList = np.array([CellIDs[UniqueIndexPairsList[:, 0]],
                                CellIDs[UniqueIndexPairsList[:, 1]]]).transpose()
    return UniqueIndexPairsList, UniquePairsList