forked from jubenjum/compute_scores
-
Notifications
You must be signed in to change notification settings - Fork 0
/
compute_cov.py
executable file
·257 lines (202 loc) · 9.5 KB
/
compute_cov.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
#!/usr/bin/env python
import sys
import os
import logging
import codecs
from bisect import bisect_left, bisect_right, bisect
from itertools import combinations, count
from collections import defaultdict
import argparse
import ipdb
import numpy as np
import pandas as pd
from utils import *
# Load the configuration from environment variables.
# PHON_GOLD is mandatory: cov_from_class() hands it to read_gold_intervals()
# as the phoneme gold reference.
try:
    PHON_GOLD = os.environ['PHON_GOLD']
except KeyError:
    print("PHON_GOLD not set")
    # a missing mandatory setting is a failure, so report a non-zero status
    sys.exit(1)

# LOG_COV is optional; when it is not set LOG falls back to 'test.log'.
# NOTE(review): LOG is not read anywhere in the visible code (get_logger()
# writes to stdout) -- confirm whether it is still needed.
LOG = os.environ.get('LOG_COV', 'test.log')

# Logging threshold handed to get_logger() by the command-line entry point;
# use logging.INFO or logging.ERROR for quieter runs.
LOG_LEV = logging.DEBUG
def get_logger(level=logging.WARNING):
    '''Configure the root logger to write timestamped records to stdout.

    Every record is tagged with the module-level ``disc_class`` value, so
    that global must be defined before this function is called (the
    command-line entry point sets it to the class file being scored).

    Parameters
    ----------
    level : int
        Threshold passed to ``logging.basicConfig``
        (default ``logging.WARNING``).
    '''
    # '%' is doubled so that a file name containing it is printed literally
    # instead of being parsed as a %-style logging directive
    label = '{}'.format(disc_class).replace('%', '%%')
    log_format = '%(asctime)s - {} - %(levelname)s - %(message)s'.format(label)
    logging.basicConfig(stream=sys.stdout, format=log_format, level=level)
def _discover_class(classes, gold, gold_trs, undisc_trs, n_pairs):
    '''Remove from undisc_trs the gold phones covered by one class.

    Returns the number of intervals of the class that were processed.
    '''
    for fname, onset, offset in classes:
        # presumably get_intervals returns the (onset, offset) pairs of the
        # gold phones covered by the interval -- verify against utils
        covered, _ = get_intervals(fname, float(onset), float(offset),
                                   gold, gold_trs)
        for on, off in covered:
            # a phone found by an earlier interval is already gone
            undisc_trs.pop((fname, on, off), None)

        # it will show some work has been done ...
        n_total = next(n_pairs)
        if n_total > 0 and n_total % 10000 == 0:
            logging.debug("done %s intervals", n_total)
    return len(classes)


def cov_from_class(classes_file):
    ''' Compute the coverage (COV) from the tde class file.

    The gold read from PHON_GOLD provides a dictionary whose keys are
    (file name, phone onset, phone offset) triplets. A copy of it is made
    and, while the class file is parsed, each phone found in a class is
    removed from the copy. The coverage is the fraction of gold entries
    that were removed.

    The class file is read line by line: lines starting with ``Class`` are
    ignored, an empty line closes the current class, and every other line
    must be ``fname onset offset``. Identical intervals inside one class are
    counted once. A last class that is not followed by an empty line is
    still taken into account.

    Parameters
    ----------
    classes_file : str
        Path of the class file, decoded as UTF-8.

    Returns
    -------
    float
        Fraction of gold phones discovered, 0.0 when the gold has no phones.
        The value is also logged at INFO level together with the number of
        intervals processed.
    '''
    # reading the phoneme gold
    gold, gold_trs, _, _ = read_gold_intervals(PHON_GOLD)
    undisc_trs = gold_trs.copy()
    tot_n_ph = len(gold_trs)  # total number of discoverable phones

    # TODO: this code assumes that the class file is well formed
    logging.info("Parsing class file %s", classes_file)

    classes = set()    # intervals of the class being read
    n_pairs = count()  # running counter used for the progress messages
    n_overall = 0      # number of intervals processed

    # the file is decoded line by line and each class is processed as soon
    # as it ends, to avoid holding the whole file in memory
    with codecs.open(classes_file, encoding='utf8') as cfile:
        for raw_line in cfile:
            line = raw_line.strip()
            if not line:
                # an empty line means that the class has ended
                n_overall += _discover_class(classes, gold, gold_trs,
                                             undisc_trs, n_pairs)
                classes = set()
            elif line.startswith('Class'):
                # class header: the class + number + ngram if available
                continue
            else:
                fname, start, end = line.split()
                classes.add((fname, float(start), float(end)))

    # the file may end without the empty line that closes the last class
    if classes:
        n_overall += _discover_class(classes, gold, gold_trs,
                                     undisc_trs, n_pairs)

    remain_ph = len(undisc_trs)  # phones that were never discovered
    discovered = tot_n_ph - remain_ph
    # an empty gold has nothing to cover; avoid dividing by zero
    cov_overall = float(discovered) / tot_n_ph if tot_n_ph else 0.0

    logging.info('overall: COV=%.3f intervals=%d', cov_overall, n_overall)
    return cov_overall
def find_ngrams(input_list, n=3):
    '''Return the list of the consecutive n-grams of input_list.

    Each n-gram is a tuple of n neighbouring items; a sequence shorter than
    n gives an empty list.

    Parameters
    ----------
    input_list : sequence
        Any sliceable sequence (list, string, numpy array, ...).
    n : int
        Size of the n-grams (default 3).
    '''
    # http://locallyoptimal.com/blog/2013/01/20/elegant-n-gram-generation-in-python/
    # zip is lazy on Python 3: materialise it so that the result can be
    # indexed, measured and iterated more than once
    return list(zip(*[input_list[i:] for i in range(n)]))
def find_mask_ngrams(gold, ngrams=3):
    '''Create a mask over the gold phones for n-grams found more than once.

    The phones of all the entries of gold are numbered consecutively, in the
    iteration order of gold. The phones of every n-gram that occurs at
    least twice in the corpus are set to 1 in the mask, including the phones
    of its first occurrence.

    Parameters
    ----------
    gold : dict
        Maps a file name to a dictionary holding at least the sequences
        'start' and 'phon'; both are assumed to have the same length.
    ngrams : int
        Size of the n-grams (default 3).

    Returns
    -------
    numpy.ndarray
        Vector of 0/1 floats with one position per gold phone.
    '''
    n_phones = sum(len(gold[k]['start']) for k in gold)
    mask = np.zeros(n_phones)
    all_grams = defaultdict(int)  # n-gram key -> times seen so far
    seen_once = {}  # n-gram key -> corpus indexes of its first occurrence
    high_index = 0  # corpus index of the first phone of the current file

    for k in gold:
        phns = gold[k]['phon']

        # n-grams of phones and of their indexes with respect to all corpus;
        # they are walked in lockstep so no indexing of the n-gram
        # collections is needed
        indexes = np.arange(high_index, high_index + len(phns))
        index_ngrams = find_ngrams(indexes, ngrams)
        phon_ngrams = find_ngrams(phns, ngrams)
        high_index += len(phns)

        for position, n_gram in zip(index_ngrams, phon_ngrams):
            # convert the n-gram to a single hash/key
            n_g = ' '.join(str(x) for x in n_gram)
            all_grams[n_g] += 1

            if all_grams[n_g] > 1:
                # seen more than once: include it in the mask; the slice
                # ends one past the last index so that the last phone of
                # the n-gram is covered too
                mask[position[0]:position[-1] + 1] = 1

                # also include the first occurrence of the n-gram
                first = seen_once.pop(n_g, None)
                if first is not None:
                    mask[first[0]:first[-1] + 1] = 1
            else:
                # first time that the n-gram has been seen
                seen_once[n_g] = position

    return mask
def read_gold_class(class_gold, gold_phn):
    '''Read the class gold file and return a mask over the gold phonemes.

    The phonemes of all the speakers of gold_phn are numbered consecutively,
    in the iteration order of gold_phn. The mask has 1s at the positions of
    the phonemes that lie inside a token of the class gold file.

    Parameters
    ----------
    class_gold : str
        Path of the class file, decoded as UTF-8. Empty lines and lines
        starting with ``Class`` are skipped; every other line must be
        ``fname onset offset``.
    gold_phn : dict
        Maps a speaker/file name to a dictionary holding the sorted
        sequences 'start' and 'end' of its phonemes.

    Returns
    -------
    numpy.ndarray
        Vector of 0/1 floats with one position per gold phoneme.

    Raises
    ------
    KeyError
        If the class file names a speaker that is not in gold_phn.
    '''
    # position in the mask of the first phoneme of each speaker; without it
    # the per-speaker bisect indexes would all land at the start of the mask
    offsets = {}
    n_phones = 0
    for speaker in gold_phn:
        offsets[speaker] = n_phones
        n_phones += len(gold_phn[speaker]['start'])

    # gold tokens could cover less than the found phonemes
    mask = np.zeros(n_phones)

    # decode class gold file and store intervals by speaker
    tokens_by_speaker = defaultdict(list)
    with codecs.open(class_gold, encoding='utf8') as cfile:
        for raw_line in cfile:
            line = raw_line.strip()
            if not line or line.startswith('Class'):
                continue
            fname, start, end = line.split()
            tokens_by_speaker[fname].append((float(start), float(end)))

    # find all the fragments in gold and mark them in mask
    for speaker, intervals in tokens_by_speaker.items():
        starts = gold_phn[speaker]['start']
        ends = gold_phn[speaker]['end']
        base = offsets[speaker]
        for onset, offset in intervals:
            # phonemes that start at or after the onset and end at or
            # before the offset
            b1_ = bisect_left(starts, onset)
            e1_ = bisect_right(ends, offset)
            mask[base + b1_:base + e1_] = 1

    return mask
if __name__ == '__main__':
    # command-line entry point: compute the COV of a single class file
    usage_example = '''example:
compute_cov.py file.class
'''
    cli = argparse.ArgumentParser(epilog=usage_example)
    cli.add_argument(
        'fclass',
        metavar='CLASS_FILE',
        nargs=1,
        help='Class file in tde format',
    )
    options = cli.parse_args()

    # TODO: check file
    # nargs=1 gives a one-element list; disc_class has to be a module-level
    # name because get_logger() reads it to tag the log records
    disc_class, = options.fclass

    get_logger(level=LOG_LEV)

    logging.info("Begining computing COV for %s", disc_class)
    cov_from_class(disc_class)
    logging.info('Finished computing COV for %s', disc_class)