# cornell_data_utils.py
import re
from collections import Counter

import numpy as np

import config
def get_conversations():
    '''
    Helper function (Cornell dataset only) that extracts conversations from the raw file.

    Each line of movie_conversations.txt ends with a list of line IDs,
    e.g. ['L194', 'L195', 'L196']; they are returned as a list of ID strings per conversation.
    '''
    conversations = []
    with open('raw_cornell_data/movie_conversations.txt', 'r') as f:
        for line in f.readlines():
            # The conversation is the last ' +++$+++ '-separated field, e.g. "['L194', 'L195']\n"
            conversation = line.split(' +++$+++ ')[-1]
            conversation = conversation.replace("'", "")
            conversation = conversation[1:-2]  # strip the surrounding brackets and the trailing newline
            conversation = conversation.split(", ")
            conversations.append(conversation)

    return conversations
def get_movie_lines():
    '''
    Helper function used to extract movie lines from the Cornell dataset.

    Returns a dict mapping a line ID (e.g. 'L194') to the text of that line.
    '''
    sentences = {}
    with open('raw_cornell_data/movie_lines.txt', 'r') as f:
        for line in f.readlines():
            parts = line.split(' +++$+++ ')
            # First field is the line ID, last field is the spoken text.
            sentences[parts[0]] = parts[-1].replace('\n', "")

    return sentences
def questions_vs_answers(convs, lines):
    '''
    Save questions and answers extracted from the raw files to disk. VERSION 1

    Only conversations with an even number of lines are used; even-indexed lines
    become questions and odd-indexed lines become answers.
    '''
    with open('movie_questions.txt', 'a') as questions_file, open('movie_answers.txt', 'a') as answers_file:
        for conversation in convs:
            if len(conversation) % 2 == 0:
                for line in range(len(conversation)):
                    if line % 2 == 0:
                        questions_file.write(lines[conversation[line]] + "\n")
                    else:
                        answers_file.write(lines[conversation[line]] + "\n")
def questions_vs_answers_v2(convs, lines):
    '''
    Save questions and answers extracted from the raw files to disk. VERSION 2

    Every consecutive pair of lines in a conversation becomes a (question, answer) pair.
    '''
    with open('movie_questions_2.txt', 'a') as questions_file, open('movie_answers_2.txt', 'a') as answers_file:
        for conversation in convs:
            for line in range(len(conversation) - 1):
                questions_file.write(lines[conversation[line]] + "\n")
                answers_file.write(lines[conversation[line + 1]] + "\n")
def cornell_tokenizer(text):
    '''
    Basic starting tokenizer used for sentence preprocessing.

    Expands common English contractions, pads punctuation with spaces and
    finally strips the remaining special characters.
    '''
    # Expand contractions
    text = re.sub(r"\'m", " am", text)
    text = re.sub(r"\'s", " is", text)
    text = re.sub(r"\'re", " are", text)
    text = re.sub(r"\'ll", " will", text)
    text = re.sub(r"\'d", " would", text)
    text = re.sub(r"won't", "will not", text)
    text = re.sub(r"can't", "cannot", text)
    # Pad punctuation with spaces so each mark becomes a separate token
    text = re.sub(r"\.", " . ", text)
    text = re.sub(r"\?", " ? ", text)
    text = re.sub(r"!", " ! ", text)
    text = re.sub(r"/", " / ", text)
    text = re.sub(r",", " , ", text)
    text = re.sub(r'"', ' " ', text)
    text = re.sub(r"-", " - ", text)
    # Drop the remaining special characters (note: this also removes the
    # '?' and '-' padded above, so those marks do not survive tokenization)
    text = re.sub(r"[-<>{}+=|?'()\:@]", "", text)
    return text.replace('\n', '')
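
# Illustrative example of the tokenizer above (a sketch, not part of the original file):
#   cornell_tokenizer("I'm sure he's fine.")
# returns roughly
#   "I am sure he is fine . "
# i.e. contractions are expanded and sentence punctuation is split into its own
# token, while characters in the final character class (e.g. ' ? -) are removed.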
def clean_data():
    '''
    Raw data cleaner: tokenizes the question/answer files produced by
    questions_vs_answers_v2 and returns the cleaned sentences.
    '''
    cleaned_questions = []
    cleaned_answers = []

    with open('movie_questions_2.txt', 'r') as f:
        for line in f.readlines():
            cleaned_questions.append(cornell_tokenizer(line))

    with open('movie_answers_2.txt', 'r') as f:
        for line in f.readlines():
            cleaned_answers.append(cornell_tokenizer(line))

    return cleaned_questions, cleaned_answers
def create_vocab(questions, answers):
    '''
    Create the vocabulary plus the word_to_id and id_to_word dicts from the
    cleaned data returned by clean_data().

    Words that appear fewer than config.VOCAB_THRESHOLD times are dropped,
    and the special tokens <PAD>, <GO>, <UNK> and <EOS> are prepended.
    '''
    assert len(questions) == len(answers)

    vocab = []
    for i in range(len(questions)):
        vocab.extend(questions[i].split())
        vocab.extend(answers[i].split())

    vocab = Counter(vocab)

    # Keep only words that appear often enough
    new_vocab = [word for word, count in vocab.items() if count >= config.VOCAB_THRESHOLD]
    new_vocab = ['<PAD>', '<GO>', '<UNK>', '<EOS>'] + new_vocab

    word_to_id = {word: i for i, word in enumerate(new_vocab)}
    id_to_word = {i: word for i, word in enumerate(new_vocab)}

    return new_vocab, word_to_id, id_to_word
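
# Note on the special tokens above: because they are prepended to the vocabulary,
# their ids are fixed when create_vocab is used as defined here, e.g.
#   word_to_id['<PAD>'] == 0, word_to_id['<GO>'] == 1,
#   word_to_id['<UNK>'] == 2, word_to_id['<EOS>'] == 3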
def encoder(data, word_to_id, targets=False):
    '''
    Use the word_to_id dictionary to map each word in a sample to its integer representation.

    Unknown words are mapped to <UNK>; target sequences get an <EOS> token appended.
    '''
    encoded_data = []
    for i in range(len(data)):
        encoded_line = []
        for word in data[i].split():
            encoded_line.append(word_to_id.get(word, word_to_id['<UNK>']))
        if targets:
            encoded_line.append(word_to_id['<EOS>'])
        encoded_data.append(encoded_line)

    # dtype=object is needed because the encoded lines have different lengths
    return np.array(encoded_data, dtype=object)
def pad_data(data, word_to_id, max_len, target=False):
    '''
    If the sentence is shorter than the wanted length, pad it to that length.

    Targets (decoder sequences) are padded at the end, encoder inputs at the front.
    '''
    if target:
        return data + [word_to_id['<PAD>']] * (max_len - len(data))
    else:
        return [word_to_id['<PAD>']] * (max_len - len(data)) + data
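
# Illustrative example (assuming word_to_id['<PAD>'] == 0, as produced by create_vocab):
#   pad_data([5, 6, 7], word_to_id, 5)               -> [0, 0, 5, 6, 7]   # encoder input, front-padded
#   pad_data([5, 6, 7], word_to_id, 5, target=True)  -> [5, 6, 7, 0, 0]   # decoder target, end-padded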
def bucket_data(questions, answers, word_to_id):
    '''
    If you prefer the bucketing version of padding, use this function to create buckets of your data.

    Each (question, answer) pair is placed in the first bucket from config.BUCKETS
    that is large enough to hold it, and padded to that bucket's sizes.
    '''
    assert len(questions) == len(answers)

    bucketed_data = []
    already_added = set()  # indices of pairs already assigned to a bucket

    for bucket in config.BUCKETS:
        data_for_bucket = []
        encoder_max, decoder_max = bucket

        for i in range(len(questions)):
            if len(questions[i]) <= encoder_max and len(answers[i]) <= decoder_max:
                if i not in already_added:
                    data_for_bucket.append((pad_data(questions[i], word_to_id, encoder_max),
                                            pad_data(answers[i], word_to_id, decoder_max, True)))
                    already_added.add(i)

        bucketed_data.append(data_for_bucket)

    return bucketed_data
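

if __name__ == '__main__':
    # Minimal usage sketch (not part of the original pipeline): it assumes the raw
    # Cornell files exist under raw_cornell_data/ and that config defines
    # VOCAB_THRESHOLD and BUCKETS.
    convs = get_conversations()
    lines = get_movie_lines()

    # Write the question/answer files (run once; the files are opened in append mode).
    questions_vs_answers_v2(convs, lines)

    # Clean the text, build the vocabulary and encode both sides.
    questions, answers = clean_data()
    vocab, word_to_id, id_to_word = create_vocab(questions, answers)
    encoded_questions = encoder(questions, word_to_id)
    encoded_answers = encoder(answers, word_to_id, targets=True)

    # Bucket (and pad) the encoded pairs according to config.BUCKETS.
    buckets = bucket_data(encoded_questions, encoded_answers, word_to_id)
    print([len(bucket) for bucket in buckets])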