COMBINATORIAL_BLAS 1.6
 
Loading...
Searching...
No Matches
MMConverter.h
Go to the documentation of this file.
1#ifndef _MM_CONVERTER_
2#define _MM_CONVERTER_
3
4#include <iostream>
5#include <vector>
6#include <algorithm>
7#include <numeric>
8#include <iterator>
9#include <omp.h>
10#include <sys/stat.h>
11#include <string.h>
12#include <omp.h>
13#include "mmio.h"
14#include <zlib.h>
15#include "Tommy/tommyhashdyn.h"
16#include "compress_string.h"
17#include "TommyObj.h"
18using namespace std;
19
20#define BATCH 100000000 // 100MB
21#define MAXLINELENGTH 512
22#define VERTEX_HEAD ""
23#define MAXVERTNAME 128
24
25string chop_head(const string & full_str, const string & head_str)
26{
27 if (full_str.compare(0, head_str.length(), head_str) == 0) // begins_with
28 {
29 return full_str.substr(head_str.length(), string::npos);
30 }
31 else
32 {
33 cout << "String doesn't start with " << head_str << endl;
34 return full_str;
35 }
36}
37
38
39// typedef void tommy_foreach_arg_func(void* arg, void* obj);
40
41void* shuffledprintfunc(void* arg, void* obj)
42{
43 pair<vector<uint32_t>*, ofstream*> * mypair = (pair<vector<uint32_t>*, ofstream*> *) arg; // cast argument
44 vector<uint32_t> * shuffler = mypair->first;
45 ofstream * out = mypair->second;
46#ifdef COMPRESS_STRING
47 (*out) << (*shuffler)[((tommy_object *) obj)->vid] << "\t" << decompress_string(((tommy_object *) obj)->vname) << "\n";
48#else
49 (*out) << (*shuffler)[((tommy_object *) obj)->vid] << "\t" << ((tommy_object *) obj)->vname << "\n";
50#endif
51}
52
53
54template <typename IT1, typename NT1, typename IT2, typename NT2>
55void push_to_vectors(vector<IT1> & rows, vector<IT1> & cols, vector<NT1> & vals, IT2 ii, IT2 jj, NT2 vv)
56{
57 rows.push_back(ii);
58 cols.push_back(jj);
59 vals.push_back(vv);
60}
61
62template <typename IT1, typename NT1>
63void ProcessLines(vector<IT1> & rows, vector<IT1> & cols, vector<NT1> & vals, vector<string> & lines, tommy_hashdyn & hashdyn, const vector<uint32_t> & shuffler)
64{
65 char from[MAXVERTNAME];
66 char to[MAXVERTNAME];
67 double vv;
68 for (vector<string>::iterator itr=lines.begin(); itr != lines.end(); ++itr)
69 {
70 // string::c_str() -> Returns a pointer to an array that contains a null-terminated sequence of characters (i.e., a C-string)
71 sscanf(itr->c_str(), "%s %s %lg", from, to, &vv);
72 string s_from = chop_head(string(from), VERTEX_HEAD);
73 string s_to = chop_head(string(to), VERTEX_HEAD);
74
75 tommy_object * obj1 = (tommy_object *) tommy_hashdyn_search(&hashdyn, compare, &s_from, tommy_hash_u32(0, from, strlen(from)));
76 tommy_object * obj2 = (tommy_object *) tommy_hashdyn_search(&hashdyn, compare, &s_to, tommy_hash_u32(0, to, strlen(to)));
77
78
79 uint32_t fr_id = shuffler[obj1->vid];
80 uint32_t to_id = shuffler[obj2->vid];
81 push_to_vectors(rows, cols, vals, fr_id, to_id, vv);
82 }
83 vector<string>().swap(lines);
84}
85
86
87void check_newline(int *bytes_read, int bytes_requested, char *buf)
88{
89 if ((*bytes_read) < bytes_requested) {
90 // fewer bytes than expected, this means EOF
91 if (buf[(*bytes_read) - 1] != '\n') {
92 // doesn't terminate with a newline, add one to prevent infinite loop later
93 buf[(*bytes_read) - 1] = '\n';
94 cout << "Error in input format, appending missing newline at end of file" << endl;
95 (*bytes_read)++;
96 }
97 }
98}
99
100// updates to curpos are reflected in the caller function
101bool FetchBatch(FILE * f_local, long int & curpos, long int end_fpos, bool firstcall, vector<string> & lines)
102{
103 size_t bytes2fetch = BATCH; // we might read more than needed but no problem as we won't process them
104 bool begfile = (curpos == 0);
105 if(firstcall && (!begfile))
106 {
107 curpos -= 1; // first byte is to check whether we started at the beginning of a line
108 bytes2fetch += 1;
109 }
110 char * buf = new char[bytes2fetch]; // needs to happen **after** bytes2fetch is updated
111 char * originalbuf = buf; // so that we can delete it later because "buf" will move
112
113 int seekfail = fseek(f_local, curpos, SEEK_SET); // move the file pointer to the beginning of thread data
114 if(seekfail != 0)
115 cout << "fseek failed to move to " << curpos << endl;
116
117 int bytes_read = fread(buf, sizeof(char), bytes2fetch, f_local); // read byte by byte
118 if(!bytes_read)
119 {
120 delete [] originalbuf;
121 return true; // done
122 }
123 check_newline(&bytes_read, bytes2fetch, buf);
124 if(firstcall && (!begfile))
125 {
126 if(buf[0] == '\n') // we got super lucky and hit the line break
127 {
128 buf += 1;
129 bytes_read -= 1;
130 curpos += 1;
131 }
132 else // skip to the next line and let the preceeding thread take care of this partial line
133 {
134 char *c = (char*)memchr(buf, '\n', MAXLINELENGTH); // return a pointer to the matching byte or NULL if the character does not occur
135 if (c == NULL) {
136 cout << "Unexpected line without a break" << endl;
137 }
138 int n = c - buf + 1;
139 bytes_read -= n;
140 buf += n;
141 curpos += n;
142 }
143 }
144 while(bytes_read > 0 && curpos < end_fpos) // this will also finish the last line
145 {
146 char *c = (char*)memchr(buf, '\n', bytes_read); // return a pointer to the matching byte or NULL if the character does not occur
147 if (c == NULL) {
148 delete [] originalbuf;
149 return false; // if bytes_read stops in the middle of a line, that line will be re-read next time since curpos has not been moved forward yet
150 }
151 int n = c - buf + 1;
152
153 // string constructor from char * buffer: copies the first n characters from the array of characters pointed by s
154 lines.push_back(string(buf, n-1)); // no need to copy the newline character
155 bytes_read -= n; // reduce remaining bytes
156 buf += n; // move forward the buffer
157 curpos += n;
158 }
159 delete [] originalbuf;
160 if (curpos >= end_fpos) return true; // don't call it again, nothing left to read
161 else return false;
162}
163
164void MMConverter(const string & filename, ofstream & dictout, const string & outprefix)
165{
166 FILE *f;
167 if ((f = fopen(filename.c_str(), "r")) == NULL)
168 {
169 printf("file can not be found\n");
170 exit(1);
171 }
172
173 // Use fseek again to go backwards two bytes and check that byte with fgetc
174 struct stat st; // get file size
175 if (stat(filename.c_str(), &st) == -1)
176 {
177 exit(1);
178 }
179 int64_t file_size = st.st_size;
180 cout << "File is " << file_size << " bytes" << endl;
181 long int ffirst = ftell(f); // doesn't change
182 long int fpos = ffirst;
183 long int end_fpos = file_size;
184
185 vector<string> lines;
186 bool finished = FetchBatch(f, fpos, end_fpos, true, lines); // fpos will move
187 int64_t entriesread = lines.size();
188
189 tommy_hashdyn hashdyn;
190 tommy_hashdyn_init(&hashdyn);
191
192 char from[MAXVERTNAME];
193 char to[MAXVERTNAME];
194 double vv;
195 uint32_t vertexid = 0;
196 for (vector<string>::iterator itr=lines.begin(); itr != lines.end(); ++itr)
197 {
198 // string::c_str() -> Returns a pointer to an array that contains a null-terminated sequence of characters (i.e., a C-string)
199 sscanf(itr->c_str(), "%s %s %lg", from, to, &vv);
200 string s_from = chop_head(string(from), VERTEX_HEAD);
201 string s_to = chop_head(string(to), VERTEX_HEAD);
202
203 tommy_object* obj1 = (tommy_object*) tommy_hashdyn_search(&hashdyn, compare, &s_from, tommy_hash_u32(0, from, strlen(from)));
204 if(!obj1)
205 {
206 tommy_object * obj1 = new tommy_object(vertexid, s_from); // vertexid is the vid
207 tommy_hashdyn_insert(&hashdyn, &(obj1->node), obj1, tommy_hash_u32(0, from, strlen(from)));
208 ++vertexid; // new entry
209 }
210
211 tommy_object* obj2 = (tommy_object*) tommy_hashdyn_search(&hashdyn, compare, &s_to, tommy_hash_u32(0, to, strlen(to)));
212 if(!obj2)
213 {
214 tommy_object* obj2 = new tommy_object(vertexid, s_to); // vertexid is the vid
215 tommy_hashdyn_insert(&hashdyn, &(obj2->node), obj2, tommy_hash_u32(0, to, strlen(to)));
216 ++vertexid; // new entry
217 }
218 }
219 vector<string>().swap(lines);
220
221 while(!finished)
222 {
223 finished = FetchBatch(f, fpos, end_fpos, false, lines);
224 entriesread += lines.size();
225 cout << "entriesread: " << entriesread << ", current vertex id: " << vertexid << endl;
226
227 // Process files
228 char from[MAXVERTNAME];
229 char to[MAXVERTNAME];
230 double vv;
231 for (vector<string>::iterator itr=lines.begin(); itr != lines.end(); ++itr)
232 {
233 // string::c_str() -> Returns a pointer to an array that contains a null-terminated sequence of characters (i.e., a C-string)
234 sscanf(itr->c_str(), "%s %s %lg", from, to, &vv);
235
236 string s_from = chop_head(string(from), VERTEX_HEAD);
237 string s_to = chop_head(string(to), VERTEX_HEAD);
238
239 tommy_object* obj1 = (tommy_object*) tommy_hashdyn_search(&hashdyn, compare, &s_from, tommy_hash_u32(0, from, strlen(from)));
240 if(!obj1)
241 {
242 tommy_object* obj1 = new tommy_object(vertexid, s_from); // vertexid is the vid
243 tommy_hashdyn_insert(&hashdyn, &(obj1->node), obj1, tommy_hash_u32(0, from, strlen(from)));
244 ++vertexid; // new entry
245 }
246
247 tommy_object* obj2 = (tommy_object*) tommy_hashdyn_search(&hashdyn, compare, &s_to, tommy_hash_u32(0, to, strlen(to)));
248 if(!obj2)
249 {
250 tommy_object* obj2 = new tommy_object(vertexid, s_to); // vertexid is the vid
251 tommy_hashdyn_insert(&hashdyn, &(obj2->node), obj2, tommy_hash_u32(0, to, strlen(to)));
252 ++vertexid; // new entry
253 }
254 }
255 vector<string>().swap(lines);
256 }
257 cout << "There are " << vertexid << " vertices and " << entriesread << " edges" << endl;
258
259#ifdef SUBGRAPHS
260#define NSUBGRAPHS 6
261 uint32_t ranges[NSUBGRAPHS] = {vertexid, vertexid/2, vertexid/4, vertexid/8, vertexid/16, vertexid/32};
262 cout << "Printing submatrices with the following numbers of vertices: ";
263 copy(ranges, ranges+NSUBGRAPHS, ostream_iterator<uint32_t>(cout," ")); cout << endl;
264#endif
265
266 uint32_t nvertices = vertexid;
267 vector< uint32_t > shuffler(nvertices);
268 iota(shuffler.begin(), shuffler.end(), static_cast<uint32_t>(0));
269 random_shuffle ( shuffler.begin(), shuffler.end() );
270
271 pair< vector<uint32_t>*, ofstream*> mypair(&shuffler, &dictout);
273
274 cout << "Shuffled and wrote dictionary " << endl;
275 fclose(f);
276
277
278#pragma omp parallel
279 {
280 long int fpos, end_fpos; // override
281 int this_thread = omp_get_thread_num();
282 int num_threads = omp_get_num_threads();
283
284 if(this_thread == 0) fpos = ffirst;
285 else fpos = this_thread * file_size / num_threads;
286
287
288#ifdef SUBGRAPHS
289 string names[NSUBGRAPHS];
290 ofstream outfiles[NSUBGRAPHS];
291 for(int i= 0; i<NSUBGRAPHS; i++)
292 {
293 names[i] = "Renamed_subgraph";
294 names[i] += std::to_string(i);
295 names[i] += "_";
296 names[i] += outprefix;
297 names[i] += std::to_string(this_thread);
298 cout << names[i] << endl;
299 outfiles[i].open(names[i]);
300 }
301#else
302 string name;
303 ofstream outfile;
304 name = "Renamed_graph_";
305 name += outprefix;
306 name += std::to_string(this_thread);
307 cout << name<< endl;
308 outfile.open(name);
309#endif
310
311 if(this_thread != (num_threads-1)) end_fpos = (this_thread + 1) * file_size / num_threads;
312 else end_fpos = file_size;
313
314 FILE * f_perthread = fopen(filename.c_str(), "rb"); // reopen
315 vector<string> lines;
316 bool finished = FetchBatch(f_perthread, fpos, end_fpos, true, lines);
317 size_t nnz = lines.size();
318 vector<uint32_t> rows;
319 vector<uint32_t> cols;
320 vector<float> vals;
321 ProcessLines(rows, cols, vals, lines, hashdyn, shuffler);
322
323 if(this_thread == 0)
324 {
325 cout << "there are " << num_threads << " threads" << endl;
326#ifdef SUBGRAPHS
327 for(int i= 0; i<NSUBGRAPHS; i++)
328 {
329 outfiles[i] << "%%MatrixMarket matrix coordinate real symmetric\n";
330 outfiles[i] << ranges[i] << "\t" << ranges[i] << "\t" << entriesread << "\n";
331 }
332#else
333 outfile << "%%MatrixMarket matrix coordinate real symmetric\n";
334 outfile << nvertices << "\t" << nvertices << "\t" << entriesread << "\n";
335#endif
336 }
337 for(size_t k=0; k< nnz; ++k)
338 {
339#ifdef SUBGRAPHS
340 for(int i= 0; i<NSUBGRAPHS; i++)
341 {
342 if(rows[k] < ranges[i] && cols[k] < ranges[i])
343 outfiles[i] << rows[k] << "\t" << cols[k] << "\t" << vals[k] << "\n";
344 }
345#else
346 outfile << rows[k] << "\t" << cols[k] << "\t" << vals[k] << "\n";
347#endif
348
349 }
350 rows.clear();
351 cols.clear();
352 vals.clear();
353
354 while(!finished)
355 {
356 finished = FetchBatch(f_perthread, fpos, end_fpos, false, lines);
357 nnz = lines.size(); // without this in this exact place, it is buggy
358 ProcessLines(rows, cols, vals, lines, hashdyn, shuffler);
359
360 for(size_t k=0; k< nnz; ++k)
361 {
362#ifdef SUBGRAPHS
363 for(int i= 0; i<NSUBGRAPHS; i++)
364 {
365 if(rows[k] < ranges[i] && cols[k] < ranges[i])
366 outfiles[i] << rows[k] << "\t" << cols[k] << "\t" << vals[k] << "\n";
367 }
368#else
369 outfile << rows[k] << "\t" << cols[k] << "\t" << vals[k] << "\n";
370#endif
371 }
372 rows.clear();
373 cols.clear();
374 vals.clear();
375 }
376#ifdef SUBGRAPHS
377 for(int i= 0; i<NSUBGRAPHS; i++)
378 {
379 outfiles[i].close();
380 }
381#else
382 outfile.close();
383#endif
384
385 }
386
387
388 tommy_hashdyn_foreach(&hashdyn, operator delete);
389 tommy_hashdyn_done(&hashdyn);
390}
391
392#endif
393
#define MAXLINELENGTH
Definition MMConverter.h:21
void ProcessLines(vector< IT1 > &rows, vector< IT1 > &cols, vector< NT1 > &vals, vector< string > &lines, tommy_hashdyn &hashdyn, const vector< uint32_t > &shuffler)
Definition MMConverter.h:63
void check_newline(int *bytes_read, int bytes_requested, char *buf)
Definition MMConverter.h:87
void * shuffledprintfunc(void *arg, void *obj)
Definition MMConverter.h:41
#define MAXVERTNAME
Definition MMConverter.h:23
void MMConverter(const string &filename, ofstream &dictout, const string &outprefix)
bool FetchBatch(FILE *f_local, long int &curpos, long int end_fpos, bool firstcall, vector< string > &lines)
void push_to_vectors(vector< IT1 > &rows, vector< IT1 > &cols, vector< NT1 > &vals, IT2 ii, IT2 jj, NT2 vv)
Definition MMConverter.h:55
#define BATCH
Definition MMConverter.h:20
#define VERTEX_HEAD
Definition MMConverter.h:22
string chop_head(const string &full_str, const string &head_str)
Definition MMConverter.h:25
int compare(const void *arg, const void *obj)
Definition TommyObj.h:33
string decompress_string(const string &str)
tommy_uint32_t tommy_hash_u32(tommy_uint32_t init_val, const void *void_key, tommy_size_t key_len)
void tommy_hashdyn_foreach(tommy_hashdyn *hashdyn, tommy_foreach_func *func)
tommy_inline void * tommy_hashdyn_search(tommy_hashdyn *hashdyn, tommy_search_func *cmp, const void *cmp_arg, tommy_hash_t hash)
void tommy_hashdyn_init(tommy_hashdyn *hashdyn)
void tommy_hashdyn_done(tommy_hashdyn *hashdyn)
void tommy_hashdyn_insert(tommy_hashdyn *hashdyn, tommy_hashdyn_node *node, void *data, tommy_hash_t hash)
void tommy_hashdyn_foreach_arg(tommy_hashdyn *hashdyn, tommy_foreach_arg_func *func, void *arg)
void tommy_foreach_arg_func(void *arg, void *obj)
Definition tommytypes.h:291
void iota(_ForwardIter __first, _ForwardIter __last, T __value)
unsigned int uint32_t
Definition stdint.h:80
uint32_t vid
Definition TommyObj.h:10
tommy_node node
Definition TommyObj.h:9