diff --git a/migration/fmtp/FromMySQL2PostgreSQL.js b/migration/fmtp/FromMySQL2PostgreSQL.js index cf0f7e4..04cde79 100644 --- a/migration/fmtp/FromMySQL2PostgreSQL.js +++ b/migration/fmtp/FromMySQL2PostgreSQL.js @@ -90,7 +90,7 @@ FromMySQL2PostgreSQL.prototype.boot = function(self) { self.readDataTypesMap ).then( function() { - return new Promise(function(resolveBoot) { + return new Promise(function(resolveBoot, rejectBoot) { console.log('\t--[boot] Boot is accomplished...'); resolveBoot(self); }); @@ -231,9 +231,11 @@ FromMySQL2PostgreSQL.prototype.createTemporaryDirectory = function(self) { resolve(self); } }); + } else if (!stat.isDirectory()) { self.log(self, '\t--[createTemporaryDirectory] Cannot perform a migration due to unexpected error'); reject(); + } else { self.log(self, '\t--[createTemporaryDirectory] Temporary directory already exists...'); resolve(self); @@ -249,12 +251,12 @@ FromMySQL2PostgreSQL.prototype.createTemporaryDirectory = function(self) { * @returns {Promise} */ FromMySQL2PostgreSQL.prototype.removeTemporaryDirectory = function(self) { - return new Promise(function(resolve) { + return new Promise(function(resolve, reject) { fs.rmdir(self._tempDirPath, function(error) { var msg; if (error) { - msg = '\t--[removeTemporaryDirectory] Note, TemporaryDirectory located at "' + self._tempDirPath + '" is not removed'; + msg = '\t--[removeTemporaryDirectory] Note, TemporaryDirectory located at "' + self._tempDirPath + '" is nor removed'; } else { msg = '\t--[removeTemporaryDirectory] TemporaryDirectory located at "' + self._tempDirPath + '" is removed'; } @@ -283,14 +285,17 @@ FromMySQL2PostgreSQL.prototype.createLogsDirectory = function(self) { + '"logs_directory": ' + self._logsDirPath ); reject(); + } else { self.log(self, '\t--[createLogsDirectory] Logs directory is created...'); resolve(self); } }); + } else if (!stat.isDirectory()) { console.log('\t--[createLogsDirectory] Cannot perform a migration due to unexpected error'); reject(); + } else { self.log(self, '\t--[createLogsDirectory] Logs directory already exists...'); resolve(self); @@ -310,9 +315,9 @@ FromMySQL2PostgreSQL.prototype.createLogsDirectory = function(self) { * @returns {Promise} */ FromMySQL2PostgreSQL.prototype.log = function(self, log, isErrorLog) { - var buffer = new Buffer(log + '\n\n'); - - return new Promise(function(resolve) { + var buffer = new Buffer(log + '\n\n'); + + return new Promise(function(resolve, reject) { if (isErrorLog === undefined || isErrorLog === false) { console.log(log); } @@ -321,42 +326,47 @@ FromMySQL2PostgreSQL.prototype.log = function(self, log, isErrorLog) { fs.open(self._allLogsPath, 'a', self._0777, function(error, fd) { if (!error) { self._allLogsPathFd = fd; - fs.write(self._allLogsPathFd, buffer, 0, buffer.length, null, function() { + fs.write(self._allLogsPathFd, buffer, 0, buffer.length, null, function(error) { resolve(self); }); + } else { resolve(self); } }); + } else { - fs.write(self._allLogsPathFd, buffer, 0, buffer.length, null, function() { + fs.write(self._allLogsPathFd, buffer, 0, buffer.length, null, function(error) { resolve(self); }); } + }).then( - function() { - return new Promise(function(resolveTableLog) { - if (self._clonedSelfTableNamePath === undefined) { - resolveTableLog(self); - } else if (self._clonedSelfTableNamePathFd === undefined) { - fs.open(self._clonedSelfTableNamePath, 'a', self._0777, function(error, fd) { - if (!error) { - self._clonedSelfTableNamePathFd = fd; - fs.write(self._clonedSelfTableNamePathFd, buffer, 0, buffer.length, null, function() { - resolveTableLog(self); - }); - } else { - resolveTableLog(self); - } - }); - } else { - fs.write(self._clonedSelfTableNamePathFd, buffer, 0, buffer.length, null, function() { - resolveTableLog(self); - }); + function(self) { + return new Promise(function(resolveTableLog, rejectTableLog) { + if (self._clonedSelfTableNamePath === undefined) { + resolveTableLog(self); + } else if (self._clonedSelfTableNamePathFd === undefined) { + fs.open(self._clonedSelfTableNamePath, 'a', self._0777, function(error, fd) { + if (!error) { + self._clonedSelfTableNamePathFd = fd; + fs.write(self._clonedSelfTableNamePathFd, buffer, 0, buffer.length, null, function(error) { + resolveTableLog(self); + }); + + } else { + resolveTableLog(self); + } + }); + + } else { + fs.write(self._clonedSelfTableNamePathFd, buffer, 0, buffer.length, null, function(error) { + resolveTableLog(self); + }); + } + }); } - }); - } - ); + ); }; /** @@ -368,7 +378,7 @@ FromMySQL2PostgreSQL.prototype.log = function(self, log, isErrorLog) { * @returns {Promise} */ FromMySQL2PostgreSQL.prototype.generateError = function(self, message, sql) { - return new Promise(function(resolve) { + return new Promise(function(resolve, reject) { message += sql === undefined ? '' : '\n\tSQL: ' + sql + '\n\n'; var buffer = new Buffer(message); self.log(self, message, true); @@ -377,7 +387,7 @@ FromMySQL2PostgreSQL.prototype.generateError = function(self, message, sql) { fs.open(self._errorLogsPath, 'a', self._0777, function(error, fd) { if (!error) { self._errorLogsPathFd = fd; - fs.write(self._errorLogsPathFd, buffer, 0, buffer.length, null, function() { + fs.write(self._errorLogsPathFd, buffer, 0, buffer.length, null, function(error) { resolve(self); }); @@ -387,7 +397,7 @@ FromMySQL2PostgreSQL.prototype.generateError = function(self, message, sql) { }); } else { - fs.write(self._errorLogsPathFd, buffer, 0, buffer.length, null, function() { + fs.write(self._errorLogsPathFd, buffer, 0, buffer.length, null, function(error) { resolve(self); }); } @@ -416,6 +426,7 @@ FromMySQL2PostgreSQL.prototype.connect = function(self) { self.log(self, '\t--[connect] Cannot connect to MySQL server...'); reject(self); } + } else { resolve(self); } @@ -457,6 +468,7 @@ FromMySQL2PostgreSQL.prototype.createSchema = function(self) { resolve(self); } }); + } else { resolve(self); } @@ -473,15 +485,16 @@ FromMySQL2PostgreSQL.prototype.createSchema = function(self) { * @returns {Promise} */ FromMySQL2PostgreSQL.prototype.loadStructureToMigrate = function(self) { - return new Promise(function(resolve) { + return new Promise(function(resolve, reject) { resolve(self); }).then( self.connect, function() { self.log(self, '\t--[loadStructureToMigrate] Cannot establish DB connections...'); } + ).then( - function() { + function(self) { return new Promise(function(resolve, reject) { var sql = 'SHOW FULL TABLES IN `' + self._mySqlDbName + '`;'; self._mysql.getConnection(function(error, connection) { @@ -501,9 +514,8 @@ FromMySQL2PostgreSQL.prototype.loadStructureToMigrate = function(self) { var viewsCnt = 0; var processTablePromises = []; var createViewPromises = []; - + for (var i = 0; i < rows.length; i++) { - //if (i < 3) // TODO. if (rows[i].Table_type === 'BASE TABLE') { self._tablesToMigrate.push(rows[i]); tablesCnt++; @@ -527,19 +539,15 @@ FromMySQL2PostgreSQL.prototype.loadStructureToMigrate = function(self) { Promise.all( processTablePromises - ).then( - function() { - resolve(self); - }, + ).then( + self.cleanupLocal, // TODO. function() { reject(); } ).then( - // TODO: treat FKs, views etc.... - function() { + function(self) { resolve(self); }, - // TODO: probably one o tables was not created, do not create FKs, views etc.... function() { reject(); } @@ -553,6 +561,26 @@ FromMySQL2PostgreSQL.prototype.loadStructureToMigrate = function(self) { ); }; +/** + * Cleanup resources related to given table. + * + * @param {FromMySQL2PostgreSQL} self + * @returns {Promise} + */ +FromMySQL2PostgreSQL.prototype.cleanupLocal = function(self) { + return new Promise(function(resolve, reject) { + if (self._clonedSelfTableNamePathFd === undefined) { + self.log(self, '\t--[cleanupLocal] Finished processing table `' + self._clonedSelfTableName + '`...'); + resolve(self); + } else { + fs.close(self._clonedSelfTableNamePathFd, function() { + self.log(self, '\t--[cleanupLocal] Finished processing table `' + self._clonedSelfTableName + '`...'); + resolve(self); + }); + } + }); +}; + /** * Migrates structure of a single table to PostgreSql server. * @@ -560,12 +588,12 @@ FromMySQL2PostgreSQL.prototype.loadStructureToMigrate = function(self) { * @returns {Promise} */ FromMySQL2PostgreSQL.prototype.createTable = function(self) { - return new Promise(function(resolve) { + return new Promise(function(resolve, reject) { resolve(self); }).then( self.connect ).then( - function() { + function(self) { return new Promise(function(resolveCreateTable, rejectCreateTable) { self.log(self, '\t--[createTable] Currently creating table: `' + self._clonedSelfTableName + '`'); var sql = 'SHOW COLUMNS FROM `' + self._clonedSelfTableName + '`;'; @@ -590,7 +618,7 @@ FromMySQL2PostgreSQL.prototype.createTable = function(self) { } sql = sql.slice(0, -1) + ');'; - + pg.connect(self._targetConString, function(error, client, done) { if (error) { done(); @@ -629,12 +657,12 @@ FromMySQL2PostgreSQL.prototype.createTable = function(self) { * @returns {Promise} */ FromMySQL2PostgreSQL.prototype.populateTable = function(self) { - return new Promise(function(resolve) { + return new Promise(function(resolve, reject) { resolve(self); }).then( self.connect ).then( - function() { + function(self) { return new Promise(function(resolvePopulateTable, rejectPopulateTable) { self.log(self, '\t--[populateTable] Currently populating table: `' + self._clonedSelfTableName + '`'); @@ -685,7 +713,7 @@ FromMySQL2PostgreSQL.prototype.populateTable = function(self) { } Promise.all(populateTableWorkers).then( - function() { + function(self) { resolvePopulateTable(self); }, function() { @@ -716,13 +744,13 @@ FromMySQL2PostgreSQL.prototype.populateTable = function(self) { * @returns {Promise} */ FromMySQL2PostgreSQL.prototype.populateTableWorker = function(self, offset, rowsInChunk, rowsCnt) { - return new Promise(function(resolve) { + return new Promise(function(resolve, reject) { resolve(self); }).then( self.connect ).then( - function() { - return new Promise(function(resolvePopulateTableWorker) { + function(self) { + return new Promise(function(resolvePopulateTableWorker, rejectPopulateTableWorker) { var csvAddr = self._tempDirPath + '/' + self._clonedSelfTableName + offset + '.csv'; var sql = 'SELECT * FROM `' + self._clonedSelfTableName + '` LIMIT ' + offset + ',' + rowsInChunk + ';'; @@ -743,8 +771,8 @@ FromMySQL2PostgreSQL.prototype.populateTableWorker = function(self, offset, rows // Sanitize records. // When sanitized - write them to a csv file. rowsInChunk = rows.length; // Must check amount of rows BEFORE sanitizing. - var sanitizedRecords = []; - + var sanitizedRecords = []; + for (var cnt = 0; cnt < rows.length; cnt++) { var sanitizedRecord = Object.create(null); @@ -754,7 +782,7 @@ FromMySQL2PostgreSQL.prototype.populateTableWorker = function(self, offset, rows sanitizedRecords.push(sanitizedRecord); } - + csvStringify(sanitizedRecords, function(csvError, csvString) { var buffer = new Buffer(csvString); @@ -783,26 +811,25 @@ FromMySQL2PostgreSQL.prototype.populateTableWorker = function(self, offset, rows client.query(sql, function(err, result) { done(); - + if (err) { - self.generateError(self, '\t--[populateTableWorker] ' + err, sql); + self.generateError(self, '\t--[populateTableWorker] ' + err, sql); resolvePopulateTableWorker(self); } else { self._totalRowsInserted += result.rowCount; var msg = '\t--[populateTableWorker] For now inserted: ' + self._totalRowsInserted + ' rows, ' - + 'Total rows to insert into "' + self._schema + '"."' + self._clonedSelfTableName + '": ' + rowsCnt; + + 'Total rows in "' + self._schema + '"."' + self._clonedSelfTableName + '": ' + rowsCnt; self.log(self, msg); - fs.unlink(csvAddr, function() { fs.close(fd, function() { resolvePopulateTableWorker(self); }); }); } - }); + }); } - }); + }); } }); } @@ -843,12 +870,13 @@ FromMySQL2PostgreSQL.prototype.sanitizeValue = function(value) { * @returns {Promise} */ FromMySQL2PostgreSQL.prototype.processTable = function(self, tableName) { - return new Promise(function(resolve) { + return new Promise(function(resolve, reject) { self = self.clone(self); self._clonedSelfTableName = tableName; self._totalRowsInserted = 0; - self._clonedSelfTableNamePath = self._logsDirPath + '/' + tableName + '.log'; + self._clonedSelfTableNamePath = self._logsDirPath + '/' + tableName + '.log'; resolve(self); + }).then( self.connect ).then( @@ -857,36 +885,13 @@ FromMySQL2PostgreSQL.prototype.processTable = function(self, tableName) { self.log(self, '\t--[processTable] Cannot establish DB connections...'); } ).then( - self.populateTable, // TODO: Promise chain MUST continue, even after unsuccessfull 'self.populateTable'. + self.populateTable, function() { self.log(self, '\t--[processTable] Cannot create table "' + self._schema + '"."' + self._clonedSelfTableName + '"...'); - self.cleanupLocal(self); } - ).then( - self.cleanupLocal ); }; -/** - * Cleanup resources related to given table. - * - * @param {FromMySQL2PostgreSQL} self - * @returns {Promise} - */ -FromMySQL2PostgreSQL.prototype.cleanupLocal = function(self) { - return new Promise(function(resolve) { - if (self._clonedSelfTableNamePathFd === undefined) { - self.log(self, '\t--[cleanupLocal] Finished processing table `' + self._clonedSelfTableName + '`...'); - resolve(self); - } else { - fs.close(self._clonedSelfTableNamePathFd, function() { - self.log(self, '\t--[cleanupLocal] Finished processing table `' + self._clonedSelfTableName + '`...'); - resolve(self); - }); - } - }); -}; - /** * Closes DB connections. * @@ -894,7 +899,7 @@ FromMySQL2PostgreSQL.prototype.cleanupLocal = function(self) { * @returns {Promise} */ FromMySQL2PostgreSQL.prototype.closeConnections = function(self) { - return new Promise(function(resolve) { + return new Promise(function(resolve, reject) { if (self._mysql) { self._mysql.end(function(error) { if (error) { @@ -905,6 +910,7 @@ FromMySQL2PostgreSQL.prototype.closeConnections = function(self) { pg.end(); resolve(self); }); + } else { self.log(self, '\t--[closeConnections] All DB connections to both MySQL and PostgreSQL servers have been closed...'); pg.end(); @@ -920,7 +926,7 @@ FromMySQL2PostgreSQL.prototype.closeConnections = function(self) { * @returns {Promise} */ FromMySQL2PostgreSQL.prototype.closeLogFiles = function(self) { - return new Promise(function(resolveAllLogs) { + return new Promise(function(resolveAllLogs, rejectAllLogs) { if (self._allLogsPathFd) { fs.close(self._allLogsPathFd, function() { resolveAllLogs(self); @@ -929,8 +935,8 @@ FromMySQL2PostgreSQL.prototype.closeLogFiles = function(self) { resolveAllLogs(self); } }).then( - function() { - return new Promise(function(resolveErrorLogs) { + function(self) { + return new Promise(function(resolveErrorLogs, rejectErrorLogs) { if (self._errorLogsPathFd) { fs.close(self._errorLogsPathFd, function() { resolveErrorLogs(self); @@ -950,7 +956,7 @@ FromMySQL2PostgreSQL.prototype.closeLogFiles = function(self) { * @returns {Promise} */ FromMySQL2PostgreSQL.prototype.cleanup = function(self) { - return new Promise(function(resolve) { + return new Promise(function(resolve, reject) { self.log(self, '\t--[cleanup] Cleanup resources...'); resolve(self); }).then( @@ -960,9 +966,9 @@ FromMySQL2PostgreSQL.prototype.cleanup = function(self) { ).then( self.closeLogFiles ).then( - function() { - return new Promise(function(resolve) { - self.log(self, '\t--[cleanup] Cleanup finished...'); + function(self) { + return new Promise(function(resolve, reject) { + self.log(self, '\t--[cleanup] Cleanup finished...'); resolve(self); }); } @@ -978,7 +984,7 @@ FromMySQL2PostgreSQL.prototype.cleanup = function(self) { FromMySQL2PostgreSQL.prototype.run = function(config) { var self = this; self._config = config; - var promise = new Promise(function(resolve) { + var promise = new Promise(function(resolve, reject) { resolve(self); }); @@ -989,20 +995,23 @@ FromMySQL2PostgreSQL.prototype.run = function(config) { function() { console.log('\t--[run] Failed to boot migration'); } + ).then( self.createTemporaryDirectory, function() { self.log(self, '\t--[run] Logs directory was not created...'); } + ).then( self.createSchema, function() { self.log(self, '\t--[run] Temporary directory was not created...'); } + ).then( self.loadStructureToMigrate, function() { - return new Promise(function(resolveError) { + return new Promise(function(resolveError, rejectError) { resolveError(self); }).then( function() { @@ -1011,21 +1020,21 @@ FromMySQL2PostgreSQL.prototype.run = function(config) { } ); } + ).then( function() { - return new Promise(function(resolve) { + return new Promise(function(resolve, reject) { resolve(self); }).then( self.cleanup ).then( - function() { + function(self) { var timeTaken = (new Date()) - self._timeBegin; var hours = Math.floor(timeTaken / 1000 / 3600); timeTaken -= hours * 1000 * 3600; var minutes = Math.floor(timeTaken / 1000 / 60); timeTaken -= minutes * 1000 * 60; var seconds = Math.ceil(timeTaken / 1000); - hours = hours < 10 ? '0' + hours : hours; minutes = minutes < 10 ? '0' + minutes : minutes; seconds = seconds < 10 ? '0' + seconds : seconds; @@ -1038,7 +1047,7 @@ FromMySQL2PostgreSQL.prototype.run = function(config) { ); }, function() { - return new Promise(function(resolveErr) { + return new Promise(function(resolveErr, rejectErr) { resolveErr(self); }).then( function() { @@ -1052,7 +1061,6 @@ FromMySQL2PostgreSQL.prototype.run = function(config) { var minutes = Math.floor(timeTaken / 1000 / 60); timeTaken -= minutes * 1000 * 60; var seconds = Math.ceil(timeTaken / 1000); - hours = hours < 10 ? '0' + hours : hours; minutes = minutes < 10 ? '0' + minutes : minutes; seconds = seconds < 10 ? '0' + seconds : seconds; @@ -1068,5 +1076,3 @@ FromMySQL2PostgreSQL.prototype.run = function(config) { }; module.exports.FromMySQL2PostgreSQL = FromMySQL2PostgreSQL; - -