improved disaster recovery mechanism

AnatolyUss 2017-01-29 01:10:01 +02:00
parent 46f1848950
commit 86d872ea83
6 changed files with 130 additions and 32 deletions
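In short, the new mechanism works like this: every target table is created with a temporary "<schema>_<original_table>_data_chunk_id_temp" BIGINT column, the data loader stamps each chunk's data-pool id into that column while copying, and on a rerun after a crash a chunk is loaded only if it was never started, or was started but its rows never actually reached PostgreSQL. Once all data is in, the temporary column is dropped. The snippet below is a minimal sketch of that decision flow, written with async/await purely for readability; the actual module (ConsistencyEnforcer, shown further down) uses Promise callbacks, and the helper names mirror the functions it introduces.

// A minimal sketch of the recovery decision added by this commit. Not the real
// implementation: the helpers stand in for getIsStarted(), hasCurrentChunkLoaded()
// and updateConsistencyState() from ConsistencyEnforcer below.
const shouldProcessChunk = async (self, chunk) => {
    // Was this chunk already marked as started before the previous run died?
    const isStarted = await getIsStarted(self, chunk._id);

    if (isStarted) {
        // Probe the temporary "<schema>_<table>_data_chunk_id_temp" column to see
        // whether the chunk's rows actually reached PostgreSQL.
        const alreadyLoaded = await hasCurrentChunkLoaded(self, chunk);

        if (alreadyLoaded) {
            return false; // skip this chunk to avoid duplicating its data
        }
    }

    // Normal migration flow: (re)mark the chunk as started, then load it.
    await updateConsistencyState(self, chunk._id);
    return true;
};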

View file

@@ -61,7 +61,7 @@ from MySQL to PostgreSQL as easy and smooth as possible.</p>
 <a href="mailto:anatolyuss@gmail.com?subject=NMIG">anatolyuss@gmail.com</a></p>
 <h3>VERSION</h3>
-<p>Current version is 2.4.0<br />
+<p>Current version is 2.5.0<br />
 (major version . improvements . bug fixes)</p>

View file

@@ -20,6 +20,9 @@
  */
 'use strict';
+const generateError = require('./ErrorGenerator');
+const extraConfigProcessor = require('./ExtraConfigProcessor');
 /**
  * Update consistency state.
  *
@@ -53,14 +56,15 @@ const updateConsistencyState = (self, dataPoolId) => {
 }
 /**
- * Get consistency state.
+ * Get the `is_started` value of current chunk.
  *
  * @param {Conversion} self
  * @param {Number} dataPoolId
  *
  * @returns {Promise}
  */
-const getConsistencyState = (self, dataPoolId) => {
+const getIsStarted = (self, dataPoolId) => {
     return new Promise(resolve => {
         self._pg.connect((error, client, done) => {
             if (error) {
@@ -85,6 +89,63 @@ const getConsistencyState = (self, dataPoolId) => {
     });
 }
+/**
+ * Current data chunk runs after a disaster recovery.
+ * Must determine if current chunk has already been loaded.
+ * This is in order to prevent possible data duplications.
+ *
+ * @param {Conversion} self
+ * @param {Object} chunk
+ *
+ * @returns {Promise}
+ */
+const hasCurrentChunkLoaded = (self, chunk) => {
+    return new Promise(resolve => {
+        self._pg.connect((pgError, client, done) => {
+            if (pgError) {
+                generateError(self, '\t--[ConsistencyEnforcer::hasCurrentChunkLoaded] Cannot connect to PostgreSQL server...\n' + pgError);
+                resolve(true);
+            } else {
+                const originalTableName = extraConfigProcessor.getTableName(self, chunk._tableName, true);
+                const sql = 'SELECT EXISTS(SELECT 1 FROM "' + self._schema + '"."' + chunk._tableName
+                    + '" WHERE "' + self._schema + '_' + originalTableName + '_data_chunk_id_temp" = ' + chunk._id + ');';
+                client.query(sql, (err, result) => {
+                    done();
+                    if (err) {
+                        generateError(self, '\t--[ConsistencyEnforcer::hasCurrentChunkLoaded] ' + err, sql);
+                        resolve(true);
+                    } else {
+                        resolve(!!result.rows[0].exists);
+                    }
+                });
+            }
+        });
+    });
+}
+/**
+ * Get consistency state.
+ *
+ * @param {Conversion} self
+ * @param {Object} chunk
+ *
+ * @returns {Promise}
+ */
+const getConsistencyState = (self, chunk) => {
+    return new Promise(resolve => {
+        getIsStarted(self, chunk._id).then(isStarted => {
+            if (isStarted) {
+                hasCurrentChunkLoaded(self, chunk).then(result => resolve(result));
+            } else {
+                // Normal migration flow.
+                resolve(false);
+            }
+        });
+    });
+}
 /**
  * Enforce consistency before processing a chunk of data.
  * Ensure there are no any data duplications.
@@ -92,20 +153,59 @@ const getConsistencyState = (self, dataPoolId) => {
  * In case of rerunning nmig after unexpected failure - it is absolutely mandatory.
  *
  * @param {Conversion} self
- * @param {Number} chunkId
+ * @param {Object} chunk
  *
  * @returns {Promise}
  */
-module.exports = (self, chunkId) => {
+module.exports.enforceConsistency = (self, chunk) => {
     return new Promise(resolve => {
-        getConsistencyState(self, chunkId).then(isStarted => {
-            if (isStarted) {
-                // Current data chunk runs after a disaster recovery.
+        getConsistencyState(self, chunk).then(hasAlreadyBeenLoaded => {
+            if (hasAlreadyBeenLoaded) {
+                /*
+                 * Current data chunk runs after a disaster recovery.
+                 * It has already been loaded.
+                 */
                 resolve(false);
             } else {
                 // Normal migration flow.
-                updateConsistencyState(self, chunkId).then(() => resolve(true));
+                updateConsistencyState(self, chunk._id).then(() => resolve(true));
             }
         })
     });
 };
+/**
+ * Drop the {self._schema + '_' + originalTableName + '_data_chunk_id_temp'} column from current table.
+ *
+ * @param {Conversion} self
+ * @param {String} tableName
+ *
+ * @returns {Promise}
+ */
+module.exports.dropDataChunkIdColumn = (self, tableName) => {
+    return new Promise(resolve => {
+        self._pg.connect((pgError, client, done) => {
+            if (pgError) {
+                generateError(self, '\t--[ConsistencyEnforcer::dropDataChunkIdColumn] Cannot connect to PostgreSQL server...\n' + pgError);
+                resolve();
+            } else {
+                const originalTableName = extraConfigProcessor.getTableName(self, tableName, true);
+                const columnToDrop = self._schema + '_' + originalTableName + '_data_chunk_id_temp';
+                const sql = 'ALTER TABLE "' + self._schema + '"."' + tableName + '" DROP COLUMN "' + columnToDrop + '";';
+                client.query(sql, (err, result) => {
+                    done();
+                    if (err) {
+                        const errMsg = '\t--[ConsistencyEnforcer::dropDataChunkIdColumn] Failed to drop column "' + columnToDrop + '"\n'
+                            + '\t--[ConsistencyEnforcer::dropDataChunkIdColumn] ' + err;
+                        generateError(self, errMsg, sql);
+                    }
+                    resolve();
+                });
+            }
+        });
+    });
+};
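Note the fail-safe direction of the error handling above: when hasCurrentChunkLoaded cannot connect or its query fails, it resolves with true, so the chunk is treated as already loaded and skipped instead of risking a duplicate load; dropDataChunkIdColumn likewise resolves even on failure so that post-processing can continue. The remaining files wire these two exports in: the data loader calls enforceConsistency per chunk and stamps the chunk id into the rows it copies, table creation adds the temporary column up front, and the post-load steps drop it again.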

View file

@@ -33,6 +33,8 @@ const processIndexAndKey = require('./IndexAndKeyProcessor');
 const processComments = require('./CommentsProcessor');
 const processForeignKey = require('./ForeignKeyProcessor');
 const processViews = require('./ViewGenerator');
+const consistencyEnforcer = require('./ConsistencyEnforcer');
+const dropDataChunkIdColumn = consistencyEnforcer.dropDataChunkIdColumn;
 /**
  * Continues migration process after data loading, when migrate_only_data is true.
@@ -46,7 +48,11 @@ const continueProcessAfterDataLoadingShort = self => {
     for (let i = 0; i < self._tablesToMigrate.length; ++i) {
         const tableName = self._tablesToMigrate[i];
-        promises.push(sequencesProcessor.setSequenceValue(self, tableName));
+        promises.push(
+            dropDataChunkIdColumn(self, tableName).then(() => {
+                return sequencesProcessor.setSequenceValue(self, tableName);
+            })
+        );
     }
     Promise.all(promises).then(() => {
@@ -77,7 +83,9 @@ const continueProcessAfterDataLoadingLong = self => {
     for (let i = 0; i < self._tablesToMigrate.length; ++i) {
         const tableName = self._tablesToMigrate[i];
         promises.push(
-            processEnum(self, tableName).then(() => {
+            dropDataChunkIdColumn(self, tableName).then(() => {
+                return processEnum(self, tableName);
+            }).then(() => {
                 return processNull(self, tableName);
             }).then(() => {
                 return processDefault(self, tableName);

View file

@@ -29,7 +29,8 @@ const generateError = require('./ErrorGenerator');
 const connect = require('./Connector');
 const Conversion = require('./Conversion');
 const MessageToMaster = require('./MessageToMaster');
-const enforceConsistency = require('./ConsistencyEnforcer');
+const consistencyEnforcer = require('./ConsistencyEnforcer');
+const enforceConsistency = consistencyEnforcer.enforceConsistency;
 const extraConfigProcessor = require('./ExtraConfigProcessor');
 const copyFrom = pgCopyStreams.from;
 const getBuffer = +process.version.split('.')[0].slice(1) < 6
@@ -44,7 +45,7 @@ process.on('message', signal => {
     for (let i = 0; i < signal.chunks.length; ++i) {
         promises.push(
             connect(self).then(() => {
-                return enforceConsistency(self, signal.chunks[i]._id);
+                return enforceConsistency(self, signal.chunks[i]);
             }).then(isNormalFlow => {
                 if (isNormalFlow) {
                     return populateTableWorker(
@@ -58,19 +59,6 @@
                     );
                 }
-                const sql = buildChunkQuery(
-                    extraConfigProcessor.getTableName(self, signal.chunks[i]._tableName, true),
-                    signal.chunks[i]._selectFieldList,
-                    signal.chunks[i]._offset,
-                    signal.chunks[i]._rowsInChunk
-                );
-                const strTwelveSpaces = '            ';
-                const rejectedData = '\n\t--[loadData] Possible data duplication alert!\n\t ' + strTwelveSpaces
-                    + 'Data, retrievable by following MySQL query:\n' + sql + '\n\t ' + strTwelveSpaces
-                    + 'may already be migrated.\n\t' + strTwelveSpaces + ' Please, check it.';
-                log(self, rejectedData, path.join(self._logsDirPath, signal.chunks[i]._tableName + '.log'));
                 return deleteChunk(self, signal.chunks[i]._id);
             })
         );
@@ -225,8 +213,9 @@ const populateTableWorker = (self, tableName, strSelectFieldList, offset, rowsIn
                 generateError(self, '\t--[populateTableWorker] Cannot connect to MySQL server...\n\t' + error);
                 resolvePopulateTableWorker();
             } else {
                 const csvAddr = path.join(self._tempDirPath, tableName + offset + '.csv');
-                const sql = buildChunkQuery(extraConfigProcessor.getTableName(self, tableName, true), strSelectFieldList, offset, rowsInChunk);
+                const originalTableName = extraConfigProcessor.getTableName(self, tableName, true);
+                const sql = buildChunkQuery(originalTableName, strSelectFieldList, offset, rowsInChunk);
                 connection.query(sql, (err, rows) => {
                     connection.release();
@@ -235,7 +224,8 @@ const populateTableWorker = (self, tableName, strSelectFieldList, offset, rowsIn
                         generateError(self, '\t--[populateTableWorker] ' + err, sql);
                         resolvePopulateTableWorker();
                     } else {
                         rowsInChunk = rows.length;
+                        rows[0][self._schema + '_' + originalTableName + '_data_chunk_id_temp'] = dataPoolId;
                         csvStringify(rows, (csvError, csvString) => {
                             rows = null;
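Worth noting in the hunk above: only the first row of each fetched chunk receives the temporary chunk-id value, which is all the EXISTS probe in ConsistencyEnforcer needs in order to recognize a previously loaded chunk. A hypothetical helper expressing the same stamping step (the inline assignment above is the actual code):

// Hypothetical helper, equivalent to the single inline assignment in the diff above:
// mark the first row of the chunk with its data-pool id, so that a later
// SELECT EXISTS(...) probe can tell this chunk has already been copied.
const stampChunkId = (rows, schema, originalTableName, dataPoolId) => {
    if (rows.length !== 0) {
        rows[0][schema + '_' + originalTableName + '_data_chunk_id_temp'] = dataPoolId;
    }

    return rows;
};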

View file

@@ -126,8 +126,8 @@ module.exports.createTable = (self, tableName) => {
                         + '" ' + mapDataTypes(self._dataTypesMap, rows[i].Type) + ',';
                 }
-                rows = null;
-                sql = sql.slice(0, -1) + ');';
+                sql += '"' + self._schema + '_' + originalTableName + '_data_chunk_id_temp" BIGINT);';
                 client.query(sql, err => {
                     done();

View file

@@ -1,6 +1,6 @@
 {
   "name": "nmig",
-  "version": "2.4.0",
+  "version": "2.5.0",
   "description": "The database migration app",
   "author": "Anatoly Khaytovich<anatolyuss@gmail.com>",
   "dependencies": {