00001 #ifndef TABLE_STREAM_READER_H
00002 #define TABLE_STREAM_READER_H
00003 #include "Misc.h"
00004 #include <libxml/parser.h>
00005 #include <libxml/tree.h>
00006 #include "ASDM.h"
00007 #include "Entity.h"
00008 #include "EndianStream.h"
00009 #include "ConversionException.h"
00010 #include <sstream>
00011 #include <sys/types.h>
00012 #include <sys/stat.h>
00013 #include <unistd.h>
00014
00015 #define READBUFFERSIZE ( 50 * 1024 * 1024 )
00016 namespace asdm {
00039 template<class T, class R> class TableStreamReader {
00040 public:
00044 TableStreamReader(){currentState = S_CLOSED; readBuffer = (char *) malloc (READBUFFERSIZE); boundary_1 = "" ;}
00045
00049 virtual ~TableStreamReader(){;}
00050
00051
00052
00058 void open(const std::string& directory){
00059 checkState(T_OPEN, "TableStreamReader::open");
00060
00061 tablePath = directory + "/"+ T::name() + ".bin";
00062 tableFile.open(tablePath.c_str(), ios::in|ios::binary);
00063 if (!tableFile.is_open())
00064 throw asdm::ConversionException("Could not open file " + tablePath, T::name());
00065
00066
00067 struct stat filestatus;
00068 stat( tablePath.c_str(), &filestatus);
00069 fileSizeInBytes = filestatus.st_size;
00070
00071
00072
00073 boundary_1 = requireMIMEHeader();
00074
00075
00076 requireBoundary(boundary_1, 0);
00077
00078 skipUntilEmptyLine(10);
00079 std::string xmlHeader = accumulateUntilBoundary(boundary_1, 100);
00080
00081
00082
00083
00084 xmlDoc *doc;
00085 doc = xmlReadMemory(xmlHeader.data(), xmlHeader.size(), "BinaryTableHeader.xml", NULL, XML_PARSE_NOBLANKS);
00086 if ( doc == NULL )
00087 throw asdm::ConversionException("Failed to parse the xmlHeader into a DOM structure.", T::name());
00088
00089 xmlNode* root_element = xmlDocGetRootElement(doc);
00090 if ( root_element == NULL || root_element->type != XML_ELEMENT_NODE )
00091 throw asdm::ConversionException("Failed to parse the xmlHeader into a DOM structure.", T::name());
00092
00093 const ByteOrder* byteOrder = NULL;
00094 if ( std::string("ASDMBinaryTable").compare((const char*) root_element->name) == 0) {
00095
00096
00097 byteOrder = asdm::ByteOrder::Big_Endian;
00098 attributesSeq = T::defaultAttributesNamesInBin();
00099 }
00100 else if (std::string(T::name()+"Table").compare((const char*) root_element->name) == 0) {
00101
00102
00103
00104
00105 xmlNode* bulkStoreRef = 0;
00106 xmlNode* child = root_element->children;
00107
00108
00109 bulkStoreRef = (child == 0) ? 0 : ( (child->next) == 0 ? 0 : child->next->next );
00110
00111 if ( bulkStoreRef == 0 || (bulkStoreRef->type != XML_ELEMENT_NODE) || (std::string("BulkStoreRef").compare((const char*) bulkStoreRef->name) != 0))
00112 throw asdm::ConversionException ("Could not find the element '/"+T::name()+"Table/BulkStoreRef'. Invalid XML header '"+ xmlHeader + "'.", T::name());
00113
00114
00115 _xmlAttr* byteOrderAttr = 0;
00116 for (struct _xmlAttr* attr = bulkStoreRef->properties; attr; attr = attr->next)
00117 if (string("byteOrder").compare((const char*) attr->name) == 0) {
00118 byteOrderAttr = attr;
00119 break;
00120 }
00121
00122 if (byteOrderAttr == 0)
00123 throw asdm::ConversionException("Could not find the element '/"+T::name()+"Table/BulkStoreRef/@byteOrder'. Invalid XML header '" + xmlHeader +"'.", T::name());
00124
00125 string byteOrderValue = std::string((const char*) byteOrderAttr->children->content);
00126 if (!(byteOrder = asdm::ByteOrder::fromString(byteOrderValue)))
00127 throw asdm::ConversionException("No valid value retrieved for the element '/"+T::name()+"Table/BulkStoreRef/@byteOrder'. Invalid XML header '" + xmlHeader + "'.", T::name());
00128
00129
00130
00131
00132 xmlNode* attributes = bulkStoreRef->next;
00133 if ( attributes == 0 || (attributes->type != XML_ELEMENT_NODE) || (string("Attributes").compare((const char*) attributes->name) != 0))
00134 throw asdm::ConversionException ("Could not find the element '/"+T::name()+"Table/Attributes'. Invalid XML header '"+ xmlHeader + "'.", T::name());
00135
00136 xmlNode* childOfAttributes = attributes->children;
00137
00138 while ( childOfAttributes != 0 && (childOfAttributes->type == XML_ELEMENT_NODE) ) {
00139 attributesSeq.push_back(string((const char*) childOfAttributes->name));
00140 childOfAttributes = childOfAttributes->next;
00141 }
00142 }
00143
00144 skipUntilEmptyLine(10);
00145
00146
00147 eifs = asdm::EndianIFStream (&tableFile, byteOrder);
00148
00149 asdm::Entity entity = Entity::fromBin((EndianIStream &)eifs);
00150
00151
00152 asdm::Entity containerEntity = Entity::fromBin((EndianIStream &)eifs);
00153
00154
00155 int numRows = ((EndianIStream &)eifs).readInt();
00156
00157
00158 whereRowsStart = tableFile.tellg();
00159
00160
00161 currentState = S_OPENED;
00162 }
00163
00168 void reset() {
00169 checkState(T_RESET, "TableStreamReader::reset");
00170 clear();
00171 tableFile.seekg(whereRowsStart);
00172 }
00173
00180 const std::vector<R*>& nextNRows(unsigned int nRows) {
00181 checkState(T_READ, "TableStreamReader::nextNRows");
00182 clear();
00183 unsigned int nread = 0;
00184 T& tableRef = (T&) asdm.getTable(T::name());
00185 while ( hasRows() && nread < nRows ) {
00186 rows.push_back(R::fromBin((EndianIStream&) eifs, tableRef, attributesSeq));
00187 nread++;
00188 }
00189 return rows;
00190 }
00191
00200 const std::vector<R*>& untilNBytes(unsigned int nBytes) {
00201 checkState(T_READ, "TableStreamReader::untilNBytes");
00202 clear();
00203 off_t whereAmI = tableFile.tellg();
00204 if (!hasRows()) return rows;
00205
00206 T& tableRef = (T&) asdm.getTable(T::name());
00207 do {
00208 rows.push_back(R::fromBin((EndianIStream&) eifs, tableRef , attributesSeq));
00209 }
00210 while (((tableFile.tellg() - whereAmI) < nBytes) && hasRows());
00211 return rows;
00212 }
00213
00217 bool hasRows() {
00218 checkState(T_CHECK, "TableStreamReader::hasRows");
00219 return tableFile.tellg() < (fileSizeInBytes - 19);
00220 }
00221
00225 void close() {
00226 checkState(T_CLOSE, "TableStreamReader::close");
00227 clear();
00228 if (tableFile.is_open()) tableFile.close();
00229 free(readBuffer);
00230
00231 currentState = S_CLOSED;
00232 }
00233
00234 private:
00235 std::string tablePath;
00236 std::ifstream tableFile;
00237 std::string currentLine;
00238 std::string boundary_1;
00239
00240 off_t fileSizeInBytes;
00241 asdm::EndianIFStream eifs;
00242 std::vector<std::string> attributesSeq;
00243 asdm::ASDM asdm;
00244 std::vector<R*> rows;
00245
00246 char* readBuffer;
00247
00248 streampos whereRowsStart;
00249
00250 enum State {S_CLOSED, S_OPENED};
00251 enum Transition {T_OPEN, T_CHECK, T_RESET, T_READ, T_CLOSE};
00252 State currentState;
00253
00254 void checkState(Transition t, const std::string& methodName) const {
00255 switch (currentState) {
00256 case S_CLOSED:
00257 if (t == T_OPEN) return;
00258
00259 case S_OPENED:
00260 if (t == T_CHECK || t == T_RESET || t == T_READ || t == T_CLOSE) return;
00261 }
00262 throw asdm::ConversionException("Invalid call of method '" + methodName + "' in the current context.", T::name());
00263 }
00267 void clear() {
00268 for (unsigned int i = 0; i < rows.size(); i++)
00269 if (rows[i]) delete rows[i];
00270 rows.clear();
00271 }
00272
00273 void skipUntilEmptyLine(int maxSkips) {
00274
00275 int numSkips = 0;
00276 std::string line;
00277 do {
00278 line = trim(nextLine());
00279 numSkips++;
00280 }
00281 while (line.size() != 0 && numSkips <= maxSkips);
00282
00283 if (numSkips > maxSkips) {
00284 ostringstream oss;
00285 oss << "could not find an empty line is less than " << maxSkips + 1 << " lines." << endl;
00286 throw asdm::ConversionException(oss.str(), T::name());
00287 }
00288
00289 }
00290
00291 std::string nextLine() {
00292 unsigned long long whereAmI = tableFile.tellg();
00293 getline(tableFile, currentLine);
00294 if (tableFile.fail()) {
00295 std::ostringstream oss ;
00296 oss << "TableStreamReader::nextLine() : I could not read a line in '" << tablePath << "' at position " << whereAmI << ".";
00297 throw asdm::ConversionException(oss.str(), T::name());
00298 }
00299
00300 return currentLine;
00301 }
00302
00303 pair<std::string, std::string> headerField2Pair(const std::string& hf){
00304 std::string name, value;
00305 size_t colonIndex = hf.find(":");
00306 if (colonIndex == std::string::npos)
00307 throw asdm::ConversionException(" could not detect a well formed MIME header field in '"+hf+"'", T::name());
00308
00309 if (colonIndex > 0) {
00310 name = hf.substr(0, colonIndex);
00311 trim(name);
00312 }
00313
00314 if (colonIndex < hf.size()) {
00315 value = hf.substr(colonIndex+1);
00316 trim(value);
00317 }
00318
00319 return make_pair(name, value);
00320 }
00321
00322 std::string requireMIMEHeader() {
00323
00324 pair<std::string, std::string>name_value(headerField2Pair(nextLine()));
00325
00326
00327 if (! boost::algorithm::iends_with(currentLine, "IME-Version: 1.0"))
00328 throw asdm::ConversionException("'MIME-Version: 1.0' missing at the very beginning of the file '"+ tablePath +"'.", T::name());
00329
00330
00331 boundary_1 = requireBoundaryInCT(requireHeaderField("CONTENT-TYPE").second);
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342 skipUntilEmptyLine(20);
00343
00344 return boundary_1;
00345 }
00346
00347 pair<std::string, std::string> requireHeaderField(const std::string & hf) {
00348 std::string s = trim_copy(nextLine());
00349 while (boost::algorithm::iends_with(s, ";")) {
00350 s += trim_copy(nextLine());
00351 }
00352 pair<std::string, std::string> hf2pair(headerField2Pair(s));
00353 if (to_upper_copy(hf2pair.first) != hf)
00354 throw asdm::ConversionException("read '" + currentLine + "'. Was expecting '" + hf + "'...", T::name());
00355 return hf2pair;
00356 }
00357
00358 void requireBoundary(const std::string& boundary, int maxLines) {
00359
00360 int numLines = 0;
00361 std::string dashdashBoundary = "--"+boundary;
00362 std::string line = nextLine();
00363 while ((numLines <= maxLines) && (line.compare(dashdashBoundary) != 0)) {
00364 numLines++;
00365 line = nextLine();
00366 }
00367
00368 if (numLines > maxLines) {
00369 ostringstream oss;
00370 oss << "could not find the boundary std::string '"<< boundary << "' in less than " << maxLines + 1 << " lines." << endl;
00371 throw asdm::ConversionException(oss.str(), T::name());
00372 }
00373 }
00374
00375 std::string accumulateUntilBoundary(const std::string& boundary, int maxLines) {
00376
00377 int numLines = 0;
00378 std::string line ;
00379 std::string result;
00380 line=trim(nextLine());
00381 while ( numLines <= maxLines && line.find("--"+boundary) == std::string::npos ) {
00382 result += line;
00383 line=trim(nextLine());
00384 numLines++;
00385 }
00386
00387 if (numLines > maxLines) {
00388 ostringstream oss;
00389 oss << "could not find the boundary std::string '"<< boundary << "' in less than " << maxLines + 1 << " lines." << endl;
00390 throw asdm::ConversionException(oss.str(), T::name());
00391 }
00392 return result;
00393 }
00394
00395 std::string requireBoundaryInCT(const std::string& ctValue) {
00396 vector<std::string> cvValueItems;
00397
00398 split (cvValueItems, ctValue, is_any_of(";"));
00399 vector<std::string> cvValueItemsNameValue;
00400 for ( vector<std::string>::const_iterator iter = cvValueItems.begin(); iter != cvValueItems.end() ; iter++ ) {
00401 cvValueItemsNameValue.clear();
00402 split(cvValueItemsNameValue, *iter, is_any_of("="));
00403 string boundary;
00404 if ((cvValueItemsNameValue.size() > 1) && (to_upper_copy(trim_copy(cvValueItemsNameValue[0])) == "BOUNDARY") && (unquote(cvValueItemsNameValue[1], boundary).size() > 0))
00405 return boundary;
00406 }
00407 throw asdm::ConversionException("could not find a boundary definition in '" + ctValue + "'.", T::name());
00408 }
00409 string unquote(const string& s, string& unquoted) {
00410 if (s.size() >= 2)
00411 if (((s.at(0) == '"') && (s.at(s.size()-1) == '"')) || ((s.at(0) == '\'') && (s.at(s.size()-1) == '\''))) {
00412 if (s.size() == 2)
00413 unquoted = "";
00414 else
00415 unquoted = s.substr(1, s.size() - 2);
00416 }
00417 else
00418 unquoted = s;
00419 else
00420 unquoted = s;
00421 return unquoted;
00422 }
00423
00424 };
00425 }
00426 #endif