-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.c
282 lines (265 loc) · 11 KB
/
main.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include <pthread.h>
#include <limits.h>
#include "fingerprints.h"
#include "karp_rabin_hash.h"
#include "kgram.h"
// Modulus handed to fingerprints_create() (table size) and create_hash_generator().
#define PRIME 105943
// Timing: elapsed GetTimeBase() values are divided by g_processor_frequency when
// printed. NOTE(review): on BG/Q, GetTimeBase presumably comes from A2_inlines.h and
// counts cycles (hence the 1.6 GHz divisor); elsewhere it is MPI_Wtime, which
// already returns seconds, so the divisor is 1.0.
#ifdef BGQ
double g_processor_frequency = 1600000000.0; // processing speed for BG/Q
#include<hwi/include/bqc/A2_inlines.h>
#else
double g_processor_frequency = 1.0;
// Fall back to MPI's wall-clock timer off BG/Q.
#define GetTimeBase MPI_Wtime
#endif
//MPI Inits
// Communicator size and this process's rank in MPI_COMM_WORLD (set in main).
int mpi_commsize, mpi_myrank;
//Arg inits
// Set from --window/-w and --kgram/-k; no defaults are assigned here.
int window_size, k_gram_size;
// Non-option command line arguments, one per input file.
char** file_names;
//Global vars
// Number of file bytes this rank reads for the current file.
int chars_per_chunk;
int next_buffer_count; //This is the count for the carry-over from the next rank
// NOTE(review): the three timing globals below are not referenced by the code in
// this file; they may be used elsewhere or be leftovers.
double g_time_in_secs = 0;
unsigned long long g_start_cycles=0;
unsigned long long g_end_cycles=0;
//Thread stuff
// Worker threads per stage; overridden by --threads/-t.
int num_threads = 4;
// Holds this rank's slice of the current file.
typedef struct _chunk_data_t {
char* buffer;
} chunk_data_t;
// Per-thread argument block shared by both stages (generate_fingerprint, winnow).
// Each stage only fills in the fields it uses.
typedef struct {
int thread_num; // index of this worker within its stage
char* text; // rank's chunk buffer (generate_fingerprint stage)
fingerprint_t* unwinnowed_fingerprints; // one candidate per chunk position: written by generate_fingerprint, read by winnow
table_t* fingerprints_db; // shared fingerprint table (set for the winnow stage; winnow does not currently read it)
fingerprint_t** winnowed_fingerprints; // winnow output: pointers into unwinnowed_fingerprints, indexed by position
char* file_name; // source file name passed to the k-gram generator
long start; // first chunk position handled by this thread (inclusive)
long finish; // end of this thread's range (exclusive)
} thread_args;
// Thread entry points. Each receives a heap-allocated thread_args* and returns it
// so the joining thread in main can free it.
void* generate_fingerprint(void* arg);
void* winnow(void* arg);
int main(int argc, char** argv){
MPI_Init( &argc, &argv);
MPI_Comm_size( MPI_COMM_WORLD, &mpi_commsize);
MPI_Comm_rank( MPI_COMM_WORLD, &mpi_myrank);
// command line parsing
file_names = (char **) calloc(argc, sizeof(char*));
int num_files = 0;
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "--window") == 0 || strcmp(argv[i], "-w") == 0) {
window_size = atoi(argv[i+1]);
++i;
continue;
}
if (strcmp(argv[i], "--kgram") == 0 || strcmp(argv[i], "-k") == 0) {
k_gram_size = atoi(argv[i+1]);
++i;
continue;
}
if (strcmp(argv[i], "--threads") == 0 || strcmp(argv[i], "-t") == 0) {
num_threads = atoi(argv[i+1]);
++i;
continue;
}
file_names[num_files] = argv[i];
num_files++;
}
printf("k=%d, w=%d, threads=%d, num_files=%d\n", k_gram_size, window_size, num_threads, num_files);
long int filesize;
table_t* fingerprints_db; //Stores all winnowed fingerprints
fingerprints_create(&fingerprints_db, PRIME);
chunk_data_t* local_chunk = (chunk_data_t*) malloc(sizeof(struct _chunk_data_t)); //Allocate the local chunk of data
for(int n = 0; n < num_files; n++){
printf("File %d, %s\n", n, file_names[n]);
if(mpi_myrank == 0){
FILE* fp = fopen(file_names[n], "r");
fseek(fp, 0L, SEEK_END);
filesize = ftell(fp);
printf("Filesize of %s: %ld\n", file_names[n], filesize);
fclose(fp);
}
//Collective operation to get total filesize.
MPI_Bcast(&filesize, 1, MPI_LONG, 0, MPI_COMM_WORLD);
chars_per_chunk = filesize / mpi_commsize;
next_buffer_count = k_gram_size - 1;
local_chunk->buffer = (char*) malloc(sizeof(char) * chars_per_chunk + next_buffer_count);
MPI_File * fh = malloc(sizeof * fh);
//Opens file, right now it's only doing one file.
MPI_File_open(MPI_COMM_WORLD, file_names[n], MPI_MODE_RDONLY, MPI_INFO_NULL, fh);
//Read file at the correct location.
MPI_File_read_at(*fh, mpi_myrank * chars_per_chunk, local_chunk->buffer, chars_per_chunk, MPI_CHAR, MPI_STATUS_IGNORE);
MPI_File_close(fh);
free(fh);
MPI_Request send_req, recv_req;
//Sends the nessesary k-gram from next rank to previous
if(mpi_myrank != mpi_commsize - 1){
// all but last rank will receive from next rank
MPI_Irecv(&local_chunk->buffer[chars_per_chunk], next_buffer_count, MPI_CHAR, mpi_myrank + 1, 0, MPI_COMM_WORLD, &recv_req);
MPI_Wait(&recv_req, MPI_STATUS_IGNORE); }
if(mpi_myrank != 0){
// all but first rank will send to next rank
MPI_Isend(local_chunk->buffer, next_buffer_count, MPI_CHAR, mpi_myrank - 1, 0, MPI_COMM_WORLD, &send_req);
}
// allocate space for all candidate fingerprints
fingerprint_t* unwinnowed_fingerprints = (fingerprint_t*) calloc(chars_per_chunk, sizeof(struct _fingerprint_t));
/****************************************************
* Multithreading for kgram and hashes
* **************************************************/
//Generates fingerprints and hashes and store them in fingerprints
pthread_t threads[num_threads];
long chars_per_thread = chars_per_chunk / num_threads;
double start_time = GetTimeBase();
for(int i = 0; i < num_threads; i++){
//All the args to path to pthread are in this struct
thread_args* arg = (thread_args*) malloc(sizeof(thread_args));
arg->thread_num = i;
arg->text = local_chunk->buffer;
arg->unwinnowed_fingerprints = unwinnowed_fingerprints;
arg->start = i * chars_per_thread;
arg->finish = (i+1) * chars_per_thread;
arg->file_name = file_names[n];
pthread_create(&threads[i], NULL, generate_fingerprint, (void*) arg);
}
for (int j = 0; j < num_threads; j++) {
void * ret;
pthread_join(threads[j], &ret);
free(ret);
}
double end_time = GetTimeBase();
double kgram_hash_time = end_time - start_time;
if (mpi_myrank == 0) {
printf("kgram_hash_time: %lf\n", kgram_hash_time / g_processor_frequency);
}
/****************************************************
* Multithreading for winnowing
* **************************************************/
//This is double pointer because it stores the pointer to the fingerprints in
//unwinnowed_fingerprints
//TODO: Switch this to an array of pointers
start_time = GetTimeBase();
fingerprint_t** winnowed_fingerprints = (fingerprint_t**) calloc(chars_per_chunk, sizeof(fingerprint_t*));
for(int i = 0; i < num_threads; i++){
thread_args* arg = (thread_args*) malloc(sizeof(thread_args));
arg->unwinnowed_fingerprints = unwinnowed_fingerprints;
arg->winnowed_fingerprints = winnowed_fingerprints;
arg->fingerprints_db = fingerprints_db;
arg->start = i * chars_per_thread;
arg->finish = (i+1) * chars_per_thread;
arg->thread_num = i;
pthread_create(&threads[i], NULL, winnow, (void*) arg);
}
for (int j = 0; j < num_threads; j++) {
void * ret;
pthread_join(threads[j], &ret);
free(ret);
}
end_time = GetTimeBase();
double winnowing_time = end_time - start_time;
if (mpi_myrank == 0) {
printf("winnow_time: %lf\n", winnowing_time / g_processor_frequency);
}
/****************************************************
* adding fingerprint to DB and querying across ranks
* **************************************************/
// fingerprint_t query[mpi_commsize];
// for(int i = 0; i < chars_per_chunk - k_gram_size - 1; i++){
// if(winnowed_fingerprints[i] == NULL){
// continue;
// }
// // MPI_Bcast(&query[mpi_myrank], 1, MPI
// location_list_t* locations_found;
// int status = fingerprints_get(fingerprints_db, winnowed_fingerprints[i]->hash, &locations_found);
// if(status == 1){
// printf("Found hash %d at %d locations: \n", winnowed_fingerprints[i]->hash, locations_found->size);
// location_node_t* curr = locations_found->head;
// for(int i = 0; i < locations_found->size; i++){
// printf("Position: %ld, Src: %s\n", curr->location.pos, curr->location.source_file);
// curr = curr->next;
// }
// } else {
// fingerprints_add(fingerprints_db, winnowed_fingerprints[i]->hash, winnowed_fingerprints[i]->location);
// }
// }
free(local_chunk->buffer);
free(unwinnowed_fingerprints);
}
free(fingerprints_db->buckets);
free(fingerprints_db);
free(local_chunk);
free(file_names);
MPI_Barrier( MPI_COMM_WORLD );
MPI_Finalize();
}
void* winnow(void* arg){
//An array of potential fingerprints
fingerprint_t* fingerprints = ((thread_args*) arg)->unwinnowed_fingerprints;
int r = 0; // index to window
int min = 0; // index of minimum element in window
fingerprint_t** h = (fingerprint_t**) malloc(window_size * sizeof(fingerprint_t*)); // rolling window of hashes
// Initialize the window
for(int i = 0; i < window_size; i++){
fingerprint_t* f = (fingerprint_t*) malloc(sizeof(fingerprint_t));
f->hash = INT_MAX;
h[i] = f;
}
for(int i = ((thread_args*) arg)->start; i < ((thread_args*) arg)->finish; i++){
if(i >= chars_per_chunk - k_gram_size - 1){
//Protection against index out of bound
break;
}
if(fingerprints[i].hash == 0){
continue;
}
r = (r + 1) % window_size;
h[r] = &fingerprints[i];
int status = -1;
if(min == r){
// iteration complete; reset hashes
min = 0;
for(int j = 0; j != window_size; j++){
if(h[j]->hash < h[min]->hash){
min = j;
}
}
((thread_args*) arg)->winnowed_fingerprints[i] = h[min];
} else {
if(h[r]->hash <= h[min]->hash) {
min = r;
((thread_args*) arg)->winnowed_fingerprints[i] = h[min];
}
}
}
return arg;
}
void* generate_fingerprint(void* arg){
//right now the first k-gram and hash is being calculated seperately. Might refactor this
kgram_gen_t* kgram_gen;
create_kgram_generator(&kgram_gen, ((thread_args*) arg)->file_name, ((thread_args*) arg)->text, ((thread_args*) arg)->start, ((thread_args*) arg)->finish, k_gram_size);
kgram_t* first_kgram;
generate_next_kgram(kgram_gen, &first_kgram);
printf("rank %d first kgram: %s\n", mpi_myrank, first_kgram->kgram);
hash_gen_t* hash_gen;
create_hash_generator(&hash_gen, k_gram_size, PRIME, first_kgram->kgram);
char* text = ((thread_args*) arg)->text;
int thread_num = ((thread_args*) arg)->thread_num;
for(long i = ((thread_args*) arg)->start; i < ((thread_args*) arg)->finish; i++){
kgram_t* new_kgram;
int status = generate_next_kgram(kgram_gen, &new_kgram);
if(status == 0){
hash_t new_hash = generate_next_hash(hash_gen, new_kgram->kgram);
free(new_kgram);
fingerprint_t fingerprint;
fingerprint.hash = new_hash;
fingerprint.location = *new_kgram->location;
((thread_args*) arg)->unwinnowed_fingerprints[i] = fingerprint;
}
}
free(kgram_gen);
free(hash_gen);
return arg;
}