migrating to typescript

Anatoly 2018-05-18 20:00:50 +03:00
parent 7141c1be92
commit f69f807e53
43 changed files with 1438 additions and 1872 deletions


@ -1,92 +0,0 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
const generateError = require('./ErrorGenerator');
const log = require('./Logger');
const connect = require('./Connector');
/**
* Decodes binary data from textual representation in string.
*
* @param {Conversion} conversion
*
* @returns {Promise<Conversion>}
*/
module.exports = conversion => {
log(conversion, '\t--[decodeBinaryData] Decodes binary data from textual representation in string.');
return connect(conversion).then(() => {
return new Promise(resolve => {
conversion._pg.connect((error, client, release) => {
if (error) {
generateError(conversion, '\t--[decodeBinaryData] Cannot connect to PostgreSQL server...');
return resolve(conversion);
}
const sql = `SELECT table_name, column_name
FROM information_schema.columns
WHERE table_catalog = '${ conversion._targetConString.database }'
AND table_schema = '${ conversion._schema }'
AND data_type IN ('bytea', 'geometry');`;
client.query(sql, (err, data) => {
release();
if (err) {
generateError(conversion, `\t--[decodeBinaryData] ${ err }`, sql);
return resolve(conversion);
}
const decodePromises = [];
for (let i = 0; i < data.rows.length; ++i) {
decodePromises.push(new Promise(resolveDecode => {
conversion._pg.connect((connectionError, pgClient, clientRelease) => {
if (connectionError) {
generateError(conversion, '\t--[decodeBinaryData] Cannot connect to PostgreSQL server...');
return resolveDecode();
}
const tableName = data.rows[i].table_name;
const columnName = data.rows[i].column_name;
const sqlDecode = `UPDATE ${ conversion._schema }."${ tableName }"
SET "${ columnName }" = DECODE(ENCODE("${ columnName }", 'escape'), 'hex');`;
pgClient.query(sqlDecode, decodeError => {
clientRelease();
if (decodeError) {
generateError(conversion, `\t--[decodeBinaryData] ${ decodeError }`, sqlDecode);
}
resolveDecode();
});
});
}));
}
Promise.all(decodePromises).then(() => resolve(conversion));
});
});
});
});
};

src/BinaryDataDecoder.ts Normal file (+53 lines)

@ -0,0 +1,53 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import log from './Logger';
import Conversion from './Conversion';
import DBAccess from './DBAccess';
import DBAccessQueryResult from './DBAccessQueryResult';
import DBVendors from './DBVendors';
/**
* Decodes binary data from textual representation in string.
*/
export default async function (conversion: Conversion): Promise<Conversion> {
log(conversion, '\t--[BinaryDataDecoder::decodeBinaryData] Decodes binary data from textual representation in string.');
const dbAccess: DBAccess = new DBAccess(conversion);
const sql: string = `SELECT table_name, column_name
FROM information_schema.columns
WHERE table_catalog = '${ conversion._targetConString.database }'
AND table_schema = '${ conversion._schema }'
AND data_type IN ('bytea', 'geometry');`;
const result: DBAccessQueryResult = await dbAccess.query('BinaryDataDecoder::decodeBinaryData', sql, DBVendors.PG, false, false);
const decodePromises: Promise<void>[] = result.data.rows.map(async (row: any) => {
const tableName: string = row.table_name;
const columnName: string = row.column_name;
const sqlDecode: string = `UPDATE ${ conversion._schema }."${ tableName }"
SET "${ columnName }" = DECODE(ENCODE("${ columnName }", 'escape'), 'hex');`;
// Each concurrent UPDATE acquires its own client from the pool; reusing the SELECT's client
// across parallel queries would fail as soon as the first of them releases it.
await dbAccess.query('BinaryDataDecoder::decodeBinaryData', sqlDecode, DBVendors.PG, false, false);
});
await Promise.all(decodePromises);
return conversion;
}
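
For orientation, a hedged usage sketch (the wiring below is hypothetical; names are taken from this commit): the decoder runs once after all data chunks have been copied, turning the hex text loaded into every bytea/geometry column back into real bytes via DECODE(ENCODE(col, 'escape'), 'hex').

import Conversion from './Conversion';
import decodeBinaryData from './BinaryDataDecoder';

async function afterDataLoading(conversion: Conversion): Promise<void> {
    // After this resolves, bytea/geometry columns hold binary values,
    // not their hexadecimal text representation.
    await decodeBinaryData(conversion);
}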


@ -1,93 +0,0 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
const connect = require('./Connector');
/**
* Boot the migration.
*
* @param {Conversion} self
*
* @returns {Promise}
*/
module.exports = self => {
return connect(self).then(() => {
return new Promise(resolve => {
self._pg.connect((error, client, done) => {
if (error) {
console.log('\t--[boot] Cannot connect to PostgreSQL server...\n' + error);
done();
process.exit();
} else {
const sql = 'SELECT EXISTS(SELECT 1 FROM information_schema.tables '
+ 'WHERE table_schema = \'' + self._schema
+ '\' AND table_name = \'state_logs_' + self._schema + self._mySqlDbName + '\');';
client.query(sql, (err, result) => {
done();
if (err) {
console.log('\t--[boot] Error when executed query:\n' + sql + '\nError message:\n' + err);
process.exit();
} else {
const isExists = !!result.rows[0].exists;
const message = (isExists
? '\n\t--[boot] NMIG is ready to restart after some failure.'
+ '\n\t--[boot] Consider checking log files at the end of migration.'
: '\n\t--[boot] NMIG is ready to start.') + '\n\t--[boot] Proceed? [Y/n]';
const logo = '\n\t/\\_ |\\ /\\/\\ /\\___'
+ '\n\t| \\ | |\\ | | | __'
+ '\n\t| |\\\\| || | | | \\_ \\'
+ '\n\t| | \\| || | | |__/ |'
+ '\n\t\\| \\/ /_|/______/'
+ '\n\n\tNMIG - the database migration tool'
+ '\n\tCopyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>\n\n'
+ '\t--[boot] Configuration has been just loaded.'
+ message;
console.log(logo);
process
.stdin
.resume()
.setEncoding(self._encoding)
.on('data', stdin => {
if (stdin.indexOf('n') !== -1) {
console.log('\t--[boot] Migration aborted.\n');
process.exit();
} else if (stdin.indexOf('Y') !== -1) {
resolve(self);
} else {
const hint = '\t--[boot] Unexpected input ' + stdin + '\n'
+ '\t--[boot] Expected input is upper case Y\n'
+ '\t--[boot] or lower case n\n' + message;
console.log(hint);
}
});
}
});
}
});
});
});
};

src/BootProcessor.ts Normal file (+71 lines)

@ -0,0 +1,71 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import Conversion from './Conversion';
import DBAccess from './DBAccess';
import DBAccessQueryResult from './DBAccessQueryResult';
import DBVendors from './DBVendors';
/**
* Boots the migration.
*/
export default async (conversion: Conversion): Promise<Conversion> => {
const dbAccess: DBAccess = new DBAccess(conversion);
const sql: string = `SELECT EXISTS(SELECT 1 FROM information_schema.tables
WHERE table_schema = '${ conversion._schema }'
AND table_name = 'state_logs_${ conversion._schema }${ conversion._mySqlDbName }');`;
const result: DBAccessQueryResult = await dbAccess.query('Boot', sql, DBVendors.PG, true, false);
const isExists: boolean = !!result.data.rows[0].exists;
const message: string = `${ (isExists
? '\n\t--[boot] NMIG is ready to restart after some failure.\n\t--[boot] Consider checking log files at the end of migration.'
: '\n\t--[boot] NMIG is ready to start.') } \n\t--[boot] Proceed? [Y/n]`;
const logo: string = '\n\t/\\_ |\\ /\\/\\ /\\___'
+ '\n\t| \\ | |\\ | | | __'
+ '\n\t| |\\\\| || | | | \\_ \\'
+ '\n\t| | \\| || | | |__/ |'
+ '\n\t\\| \\/ /_|/______/'
+ '\n\n\tNMIG - the database migration tool'
+ '\n\tCopyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>\n\n'
+ '\t--[boot] Configuration has been just loaded.'
+ message;
console.log(logo);
// Wrap the stdin listener in a promise; a bare "return" from inside the "data"
// callback would never resolve the enclosing function's promise.
return new Promise<Conversion>(resolve => {
process
.stdin
.resume()
.setEncoding(conversion._encoding)
.on('data', (stdin: string) => {
if (stdin.indexOf('n') !== -1) {
console.log('\t--[boot] Migration aborted.\n');
process.exit();
} else if (stdin.indexOf('Y') !== -1) {
resolve(conversion);
} else {
const hint: string = `\t--[boot] Unexpected input ${ stdin }\n
\t--[boot] Expected input is upper case Y\n
\t--[boot] or lower case n\n${ message }`;
console.log(hint);
}
});
});
}
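
A hedged caller sketch (the local name "boot" is assumed): the returned promise settles only once the user confirms with "Y".

import boot from './BootProcessor';

// "conversion" is an already configured Conversion instance.
boot(conversion).then((confirmed: Conversion) => {
    // The user pressed "Y"; schema creation and data loading may begin.
});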


@ -18,16 +18,11 @@
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
/**
* Define if given type is one of MySQL spatial types.
*
* @param {String} type
*
* @returns {Boolean}
* Defines if given type is one of MySQL spatial types.
*/
const isSpacial = type => {
const isSpacial = (type: string): boolean => {
return type.indexOf('geometry') !== -1
|| type.indexOf('point') !== -1
|| type.indexOf('linestring') !== -1
@ -35,69 +30,50 @@ const isSpacial = type => {
};
/**
* Define if given type is one of MySQL binary types.
*
* @param {String} type
*
* @returns {Boolean}
* Defines if given type is one of MySQL binary types.
*/
const isBinary = type => {
const isBinary = (type: string): boolean => {
return type.indexOf('blob') !== -1 || type.indexOf('binary') !== -1;
};
/**
* Define if given type is one of MySQL bit types.
*
* @param {String} type
*
* @returns {Boolean}
* Defines if given type is one of MySQL bit types.
*/
const isBit = type => {
const isBit = (type: string): boolean => {
return type.indexOf('bit') !== -1;
};
/**
* Define if given type is one of MySQL date-time types.
*
* @param {String} type
*
* @returns {Boolean}
* Defines if given type is one of MySQL date-time types.
*/
const isDateTime = type => {
const isDateTime = (type: string): boolean => {
return type.indexOf('timestamp') !== -1 || type.indexOf('date') !== -1;
};
/**
* Arranges columns data before loading.
*
* @param {Array} arrTableColumns
* @param {Number} mysqlVersion
*
* @returns {String}
*/
module.exports = (arrTableColumns, mysqlVersion) => {
let strRetVal = '';
const arrTableColumnsLength = arrTableColumns.length;
const wkbFunc = mysqlVersion >= 5.76 ? 'ST_AsWKB' : 'AsWKB';
export default (arrTableColumns: any[], mysqlVersion: string|number): string => {
let strRetVal: string = '';
const wkbFunc: string = +mysqlVersion >= 5.76 ? 'ST_AsWKB' : 'AsWKB'; // coerce, since _mysqlVersion may be a string
for (let i = 0; i < arrTableColumnsLength; ++i) {
const field = arrTableColumns[i].Field;
const type = arrTableColumns[i].Type;
arrTableColumns.forEach((column: any) => {
const field: string = column.Field;
const type: string = column.Type;
if (isSpacial(type)) {
// Apply HEX(ST_AsWKB(...)) due to the issue, described at https://bugs.mysql.com/bug.php?id=69798
strRetVal += 'HEX(' + wkbFunc + '(`' + field + '`)) AS `' + field + '`,';
strRetVal += `HEX(${ wkbFunc }(\`${ field }\`)) AS \`${ field }\`,`;
} else if (isBinary(type)) {
strRetVal += 'HEX(`' + field + '`) AS `' + field + '`,';
strRetVal += `HEX(\`${ field }\`) AS \`${ field }\`,`;
} else if (isBit(type)) {
strRetVal += 'BIN(`' + field + '`) AS `' + field + '`,';
strRetVal += `BIN(\`${ field }\`) AS \`${ field }\`,`;
} else if (isDateTime(type)) {
strRetVal += 'IF(`' + field + '` IN(\'0000-00-00\', \'0000-00-00 00:00:00\'), \'-INFINITY\', CAST(`'
+ field + '` AS CHAR)) AS `' + field + '`,';
strRetVal += `IF(\`${ field }\` IN('0000-00-00', '0000-00-00 00:00:00'), '-INFINITY', CAST(\`${ field }\` AS CHAR)) AS \`${ field }\`,`;
} else {
strRetVal += '`' + field + '` AS `' + field + '`,';
strRetVal += `\`${ field }\` AS \`${ field }\`,`;
}
}
});
return strRetVal.slice(0, -1);
};
}
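
To illustrate the arranger, a hypothetical input and its result (the column objects mirror MySQL's SHOW COLUMNS rows; concrete values here are made up):

import arrangeColumnsData from './ColumnsDataArranger';

const columns: any[] = [
    { Field: 'id',   Type: 'int(11)' },
    { Field: 'geom', Type: 'geometry' },
    { Field: 'doc',  Type: 'mediumblob' },
];

// With mysqlVersion >= 5.76 the spatial helper is ST_AsWKB, so this returns:
// "`id` AS `id`,HEX(ST_AsWKB(`geom`)) AS `geom`,HEX(`doc`) AS `doc`"
const selectFieldList: string = arrangeColumnsData(columns, 5.76);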


@ -1,151 +0,0 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
const mysql = require('mysql');
const pg = require('pg');
const generateError = require('./ErrorGenerator');
module.exports = class ConnectionEmitter {
/**
* ConnectionEmitter constructor.
*
* @param {Conversion} conversion
*/
constructor(conversion) {
this._conversion = conversion;
}
/**
* Ensure MySQL connection pool existence.
*
* @returns {undefined}
*/
_getMysqlConnection() {
if (!this._conversion._mysql) {
this._conversion._sourceConString.connectionLimit = this._conversion._maxDbConnectionPoolSize;
const pool = mysql.createPool(this._conversion._sourceConString);
if (!pool) {
generateError(this._conversion, '\t--[getMysqlConnection] Cannot connect to MySQL server...');
process.exit();
}
this._conversion._mysql = pool;
}
}
/**
* Ensure PostgreSQL connection pool existence.
*
* @returns {undefined}
*/
_getPgConnection() {
if (!this._conversion._pg) {
this._conversion._targetConString.max = this._conversion._maxDbConnectionPoolSize;
const pool = new pg.Pool(this._conversion._targetConString);
if (!pool) {
generateError(this._conversion, '\t--[getPgConnection] Cannot connect to PostgreSQL server...');
process.exit();
}
this._conversion._pg = pool;
this._conversion._pg.on('error', error => {
const message = `Cannot connect to PostgreSQL server...\n${ error.message }\n${ error.stack }`;
generateError(this._conversion, message);
process.exit();
});
}
}
/**
* Obtain Connection instance.
*
* @returns {Promise<Connection>}
*/
async getMysqlClient() {
try {
this._getMysqlConnection();
return await this._conversion._mysql.getConnection();
} catch (error) {
generateError(this._conversion, `\t--[getMysqlClient] Cannot connect to MySQL server...\n${ error }`);
process.exit();
}
}
/**
* Obtain pg.Client instance.
*
* @returns {Promise<pg.Client>}
*/
async getPgClient() {
try {
this._getPgConnection();
return await this._conversion._pg.connect();
} catch (error) {
generateError(this._conversion, `\t--[getPgClient] Cannot connect to PostgreSQL server...\n${ error }`);
process.exit();
}
}
/**
* Runs a query on the first available idle client and returns its result.
* Note, the pool does the acquiring and releasing of the client internally.
*
* @param {String} sql
*
* @returns {Promise<pg.Result>}
*/
async runPgPoolQuery(sql) {
try {
this._getPgConnection();
return await this._conversion._pg.query(sql);
} catch (error) {
generateError(this._conversion, `\t--[pgPoolQuery] Cannot connect to PostgreSQL server...\n${ error }`);
process.exit();
}
}
/**
* Releases MySQL Client back to the pool.
*
* @param {Connection} mysqlClient
*
* @returns {undefined}
*/
releaseMysqlClient(mysqlClient) {
mysqlClient.release();
}
/**
* Releases pg.Client back to the pool.
*
* @param {pg.Client} pgClient
*
* @returns {undefined}
*/
releasePgClient(pgClient) {
pgClient.release();
}
};


@ -1,86 +0,0 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
const mysql = require('mysql');
const pg = require('pg');
const log = require('./Logger');
const generateError = require('./ErrorGenerator');
const generateReport = require('./ReportGenerator');
/**
* Check if both servers are connected.
* If not, then create connections.
* Kill the current process if it cannot connect.
*
* @param {Conversion} self
*
* @returns {Promise}
*/
module.exports = self => {
return new Promise(resolve => {
const mysqlConnectionPromise = new Promise((mysqlResolve, mysqlReject) => {
if (!self._mysql) {
self._sourceConString.connectionLimit = self._maxDbConnectionPoolSize;
self._sourceConString.multipleStatements = true;
const pool = mysql.createPool(self._sourceConString);
if (pool) {
self._mysql = pool;
mysqlResolve();
} else {
log(self, '\t--[connect] Cannot connect to MySQL server...');
mysqlReject();
}
} else {
mysqlResolve();
}
});
const pgConnectionPromise = new Promise((pgResolve, pgReject) => {
if (!self._pg) {
self._targetConString.max = self._maxDbConnectionPoolSize;
const pool = new pg.Pool(self._targetConString);
if (pool) {
self._pg = pool;
self._pg.on('error', error => {
const message = 'Cannot connect to PostgreSQL server...\n' + error.message + '\n' + error.stack;
generateError(self, message);
generateReport(self, message);
});
pgResolve();
} else {
log(self, '\t--[connect] Cannot connect to PostgreSQL server...');
pgReject();
}
} else {
pgResolve();
}
});
Promise.all([mysqlConnectionPromise, pgConnectionPromise])
.then(() => resolve())
.catch(() => process.exit());
});
};

src/Connector.ts Normal file (+82 lines)

@ -0,0 +1,82 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import { Pool as MySQLPool } from 'mysql';
import * as mysql from 'mysql';
import { Pool as PgPool } from 'pg';
import log from './Logger';
import generateError from './ErrorGenerator';
import generateReport from './ReportGenerator';
import Conversion from './Conversion';
/**
* Check if both servers are connected.
* If not, then create connections.
* Kill the current process if it cannot connect.
*/
export default (conversion: Conversion): Promise<Conversion> => {
return new Promise(resolve => {
const mysqlConnectionPromise: Promise<void> = new Promise((mysqlResolve, mysqlReject) => {
if (!conversion._mysql) {
conversion._sourceConString.connectionLimit = conversion._maxDbConnectionPoolSize;
conversion._sourceConString.multipleStatements = true;
const pool: MySQLPool = mysql.createPool(conversion._sourceConString);
if (pool) {
conversion._mysql = pool;
mysqlResolve();
} else {
log(conversion, '\t--[connect] Cannot connect to MySQL server...');
mysqlReject();
}
} else {
mysqlResolve();
}
});
const pgConnectionPromise: Promise<void> = new Promise((pgResolve, pgReject) => {
if (!conversion._pg) {
conversion._targetConString.max = conversion._maxDbConnectionPoolSize;
const pool: PgPool = new PgPool(conversion._targetConString);
if (pool) {
conversion._pg = pool;
conversion._pg.on('error', (error: Error) => {
const message: string = `Cannot connect to PostgreSQL server...\n${ error.message }\n${ error.stack }`;
generateError(conversion, message);
generateReport(conversion, message);
});
pgResolve();
} else {
log(conversion, '\t--[connect] Cannot connect to PostgreSQL server...');
pgReject();
}
} else {
pgResolve();
}
});
Promise.all([mysqlConnectionPromise, pgConnectionPromise])
.then(() => resolve(conversion))
.catch(() => process.exit());
});
}
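
A minimal caller sketch (hypothetical): connect() is idempotent, each pool is created at most once, so awaiting it before any pool-dependent step is safe.

import connect from './Connector';

// "conversion" is an already configured Conversion instance.
connect(conversion).then((c: Conversion) => {
    // conversion._mysql and conversion._pg now exist,
    // otherwise the process has already exited.
});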


@ -1,134 +0,0 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
const sequencesProcessor = require('./SequencesProcessor');
const dataPoolManager = require('./DataPoolManager');
const runVacuumFullAndAnalyze = require('./VacuumProcessor');
const migrationStateManager = require('./MigrationStateManager');
const generateReport = require('./ReportGenerator');
const processEnum = require('./EnumProcessor');
const processNull = require('./NullProcessor');
const processDefault = require('./DefaultProcessor');
const processIndexAndKey = require('./IndexAndKeyProcessor');
const processComments = require('./CommentsProcessor');
const processForeignKey = require('./ForeignKeyProcessor');
const processViews = require('./ViewGenerator');
const { dropDataChunkIdColumn } = require('./ConsistencyEnforcer');
/**
* Continues migration process after data loading, when migrate_only_data is true.
*
* @param {Conversion} self
*
* @returns {undefined}
*/
const continueProcessAfterDataLoadingShort = self => {
const promises = [];
for (let i = 0; i < self._tablesToMigrate.length; ++i) {
const tableName = self._tablesToMigrate[i];
promises.push(
dropDataChunkIdColumn(self, tableName).then(() => {
return sequencesProcessor.setSequenceValue(self, tableName);
})
);
}
Promise.all(promises).then(() => {
return dataPoolManager.dropDataPoolTable(self);
}).then(() => {
return runVacuumFullAndAnalyze(self);
}).then(() => {
return migrationStateManager.dropStateLogsTable(self);
}).then(
() => generateReport(self, 'NMIG migration is accomplished.')
);
}
/**
* Continues migration process after data loading, when migrate_only_data is false.
*
* @param {Conversion} self
*
* @returns {undefined}
*/
const continueProcessAfterDataLoadingLong = self => {
migrationStateManager.get(self, 'per_table_constraints_loaded').then(isTableConstraintsLoaded => {
const promises = [];
if (!isTableConstraintsLoaded) {
for (let i = 0; i < self._tablesToMigrate.length; ++i) {
const tableName = self._tablesToMigrate[i];
promises.push(
dropDataChunkIdColumn(self, tableName).then(() => {
return processEnum(self, tableName);
}).then(() => {
return processNull(self, tableName);
}).then(() => {
return processDefault(self, tableName);
}).then(() => {
return sequencesProcessor.createSequence(self, tableName);
}).then(() => {
return processIndexAndKey(self, tableName);
}).then(() => {
return processComments(self, tableName);
})
);
}
}
Promise.all(promises).then(() => {
migrationStateManager.set(self, 'per_table_constraints_loaded').then(() => {
return processForeignKey(self);
}).then(() => {
return migrationStateManager.set(self, 'foreign_keys_loaded');
}).then(() => {
return dataPoolManager.dropDataPoolTable(self);
}).then(() => {
return processViews(self);
}).then(() => {
return migrationStateManager.set(self, 'views_loaded');
}).then(() => {
return runVacuumFullAndAnalyze(self);
}).then(() => {
return migrationStateManager.dropStateLogsTable(self);
}).then(
() => generateReport(self, 'NMIG migration is accomplished.')
);
});
});
}
/**
* Continues migration process after data loading.
*
* @param {Conversion} self
*
* @returns {undefined}
*/
module.exports = self => {
if (self._migrateOnlyData) {
continueProcessAfterDataLoadingShort(self);
} else {
continueProcessAfterDataLoadingLong(self);
}
};


@ -0,0 +1,91 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import sequencesProcessor from './SequencesProcessor';
import * as dataPoolManager from './DataPoolManager';
import runVacuumFullAndAnalyze from './VacuumProcessor';
import * as migrationStateManager from './MigrationStateManager';
import generateReport from './ReportGenerator';
import processEnum from './EnumProcessor';
import processNull from './NullProcessor';
import processDefault from './DefaultProcessor';
import processIndexAndKey from './IndexAndKeyProcessor';
import processComments from './CommentsProcessor';
import processForeignKey from './ForeignKeyProcessor';
import processViews from './ViewGenerator';
import { dropDataChunkIdColumn } from './ConsistencyEnforcer';
import Conversion from './Conversion';
/**
* Continues migration process after data loading, when migrate_only_data is true.
*/
async function continueProcessAfterDataLoadingShort(conversion: Conversion): Promise<void> {
const promises: Promise<void>[] = conversion._tablesToMigrate.map(async (tableName: string) => {
await dropDataChunkIdColumn(conversion, tableName);
return sequencesProcessor.setSequenceValue(conversion, tableName);
});
await Promise.all(promises);
await dataPoolManager.dropDataPoolTable(conversion);
await runVacuumFullAndAnalyze(conversion);
await migrationStateManager.dropStateLogsTable(conversion);
generateReport(conversion, 'NMIG migration is accomplished.');
}
/**
* Continues migration process after data loading, when migrate_only_data is false.
*/
async function continueProcessAfterDataLoadingLong(conversion: Conversion): Promise<void> {
const isTableConstraintsLoaded: boolean = await migrationStateManager.get(conversion, 'per_table_constraints_loaded');
const promises: Promise<void>[] = conversion._tablesToMigrate.map(async (tableName: string) => {
if (!isTableConstraintsLoaded) {
await dropDataChunkIdColumn(conversion, tableName);
await processEnum(conversion, tableName);
await processNull(conversion, tableName);
await processDefault(conversion, tableName);
await sequencesProcessor.createSequence(conversion, tableName);
await processIndexAndKey(conversion, tableName);
await processComments(conversion, tableName);
}
});
await Promise.all(promises);
await migrationStateManager.set(conversion, 'per_table_constraints_loaded');
await processForeignKey(conversion);
await migrationStateManager.set(conversion, 'foreign_keys_loaded');
await dataPoolManager.dropDataPoolTable(conversion);
await processViews(conversion);
await migrationStateManager.set(conversion, 'views_loaded');
await runVacuumFullAndAnalyze(conversion);
await migrationStateManager.dropStateLogsTable(conversion);
generateReport(conversion, 'NMIG migration is accomplished.');
}
/**
* Continues migration process after data loading.
*/
export default async function(conversion: Conversion): Promise<void> {
if (conversion._migrateOnlyData) {
await continueProcessAfterDataLoadingShort(conversion);
return;
}
await continueProcessAfterDataLoadingLong(conversion);
}


@ -19,7 +19,9 @@
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import * as path from 'path';
import {EventEmitter} from "events";
import { EventEmitter } from 'events';
import { Pool as MySQLPool } from 'mysql';
import { Pool as PgPool } from 'pg';
export default class Conversion {
/**
@ -115,17 +117,17 @@ export default class Conversion {
/**
* Current version of source (MySQL) db.
*/
public _mysqlVersion: string;
public _mysqlVersion: string|number;
/**
* Node-MySQL connections pool.
*/
public _mysql: any;
public _mysql?: MySQLPool;
/**
* Node-Postgres connection pool.
*/
public _pg: any;
public _pg?: PgPool;
/**
* An object, representing additional configuration options.
@ -187,10 +189,15 @@ export default class Conversion {
*/
public _eventEmitter: EventEmitter|null;
/**
* The data types map.
*/
public _dataTypesMap: any;
/**
* Constructor.
*/
constructor(config: any) {
public constructor(config: any) {
this._config = config;
this._sourceConString = this._config.source;
this._targetConString = this._config.target;
@ -206,8 +213,6 @@ export default class Conversion {
this._dataChunkSize = this._config.data_chunk_size === undefined ? 1 : +this._config.data_chunk_size;
this._dataChunkSize = this._dataChunkSize <= 0 ? 1 : this._dataChunkSize;
this._0777 = '0777';
this._mysql = null;
this._pg = null;
this._mysqlVersion = '5.6.21'; // Simply a default value.
this._extraConfig = this._config.extraConfig === undefined ? false : this._config.extraConfig;
this._tablesToMigrate = [];
@ -240,7 +245,7 @@ export default class Conversion {
/**
* Checks if given value is integer number.
*/
isIntNumeric(value: any): boolean {
private isIntNumeric(value: any): boolean {
return !isNaN(parseInt(value)) && isFinite(value);
}
};
}

src/DBAccess.ts Normal file (+217 lines)

@ -0,0 +1,217 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import * as mysql from 'mysql';
import { Pool as MySQLPool, PoolConnection, MysqlError } from 'mysql';
import { Pool as PgPool, PoolClient, QueryResult } from 'pg';
import generateError from './ErrorGenerator';
import Conversion from './Conversion';
import generateReport from './ReportGenerator';
import DBVendors from './DBVendors';
import DBAccessQueryResult from './DBAccessQueryResult';
export default class DBAccess {
/**
* Conversion instance.
*/
private readonly _conversion: Conversion;
/**
* DBAccess constructor.
*/
public constructor(conversion: Conversion) {
this._conversion = conversion;
}
/**
* Ensure MySQL connection pool existence.
*/
private _getMysqlConnection(): void {
if (!this._conversion._mysql) {
this._conversion._sourceConString.connectionLimit = this._conversion._maxDbConnectionPoolSize;
this._conversion._sourceConString.multipleStatements = true;
const pool: MySQLPool = mysql.createPool(this._conversion._sourceConString);
if (!pool) {
generateError(this._conversion, '\t--[getMysqlConnection] Cannot connect to MySQL server...');
process.exit();
}
this._conversion._mysql = pool;
}
}
/**
* Ensure PostgreSQL connection pool existence.
*/
private _getPgConnection(): void {
if (!this._conversion._pg) {
this._conversion._targetConString.max = this._conversion._maxDbConnectionPoolSize;
const pool: PgPool = new PgPool(this._conversion._targetConString);
if (!pool) {
generateError(this._conversion, '\t--[getPgConnection] Cannot connect to PostgreSQL server...');
process.exit();
}
this._conversion._pg = pool;
this._conversion._pg.on('error', (error: Error) => {
const message: string = `Cannot connect to PostgreSQL server...\n${ error.message }\n${ error.stack }`;
generateError(this._conversion, message);
generateReport(this._conversion, message);
});
}
}
/**
* Obtain PoolConnection instance.
*/
public getMysqlClient(): Promise<PoolConnection> {
this._getMysqlConnection();
return new Promise((resolve, reject) => {
(<MySQLPool>this._conversion._mysql).getConnection((err: MysqlError|null, connection: PoolConnection) => {
return err ? reject(err) : resolve(connection);
});
});
}
/**
* Obtain PoolClient instance.
*/
public getPgClient(): Promise<PoolClient> {
this._getPgConnection();
return (<PgPool>this._conversion._pg).connect();
}
/**
* Runs a query on the first available idle client and returns its result.
* Note, the pool does the acquiring and releasing of the client internally.
*/
public runPgPoolQuery(sql: string): Promise<QueryResult> {
this._getPgConnection();
return (<PgPool>this._conversion._pg).query(sql);
}
/**
* Releases MySQL connection back to the pool.
*/
public releaseMysqlClient(connection: PoolConnection): void {
connection.release();
}
/**
* Releases PostgreSQL connection back to the pool.
*/
public releasePgClient(pgClient: PoolClient): void {
pgClient.release();
}
/**
* Sends given SQL query to specified DB.
* Performs appropriate actions (requesting/releasing client) against target connections pool.
*/
public async query(
caller: string,
sql: string,
vendor: DBVendors,
processExitOnError: boolean,
shouldReturnClient: boolean,
client?: PoolConnection|PoolClient
): Promise<DBAccessQueryResult> {
// Checks if there is an available client.
if (!client) {
try {
// Client is undefined.
// It must be requested from the connections pool.
client = vendor === DBVendors.PG ? await this.getPgClient() : await this.getMysqlClient();
} catch (error) {
// Client request failed.
// Must exit the function.
return processExitOnError ? process.exit() : { client: client, data: error };
}
}
return vendor === DBVendors.PG
? this._queryPG(caller, sql, processExitOnError, shouldReturnClient, (<PoolClient>client))
: this._queryMySQL(caller, sql, processExitOnError, shouldReturnClient, (<PoolConnection>client));
}
/**
* Sends given SQL query to MySQL.
*/
private _queryMySQL(
caller: string,
sql: string,
processExitOnError: boolean,
shouldReturnClient: boolean,
client?: PoolConnection
): Promise<DBAccessQueryResult> {
return new Promise<DBAccessQueryResult>((resolve, reject) => {
(<PoolConnection>client).query(sql, (error: MysqlError|null, data: any) => {
// Checks if there are more queries to be sent using current client.
if (!shouldReturnClient) {
// No more queries to be sent using current client.
// The client must be released.
this.releaseMysqlClient((<PoolConnection>client));
client = undefined;
}
if (error) {
// An error occurred during DB querying.
generateError(this._conversion, `\t--[${ caller }] ${ error }`, sql);
return processExitOnError ? process.exit() : reject({ client: client, data: error });
}
return resolve({ client: client, data: data });
});
});
}
/**
* Sends given SQL query to PostgreSQL.
*/
private async _queryPG(
caller: string,
sql: string,
processExitOnError: boolean,
shouldReturnClient: boolean,
client?: PoolClient
): Promise<DBAccessQueryResult> {
try {
const data: any = await (<PoolClient>client).query(sql);
// Checks if there are more queries to be sent using current client.
if (!shouldReturnClient) {
// No more queries to be sent using current client.
// The client must be released.
this.releasePgClient((<PoolClient>client));
client = undefined;
}
return { client: client, data: data };
} catch (error) {
// An error occurred during DB querying.
generateError(this._conversion, `\t--[${ caller }] ${ error }`, sql);
return processExitOnError ? process.exit() : { client: client, data: error };
}
}
}
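
To make the client-handling contract concrete, a hedged sketch (the caller name and SQL are hypothetical; assumes an async context and a configured Conversion instance): keep the client while more queries follow, and release it with the last one.

const dbAccess: DBAccess = new DBAccess(conversion);

// shouldReturnClient = true keeps the acquired client on the result...
const first: DBAccessQueryResult = await dbAccess.query('Example::caller', 'SELECT 1;', DBVendors.PG, false, true);

// ...so a follow-up query can reuse it; shouldReturnClient = false releases it afterwards.
await dbAccess.query('Example::caller', 'SELECT 2;', DBVendors.PG, false, false, first.client);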


@ -18,18 +18,18 @@
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
import { PoolClient } from 'pg';
import { PoolConnection } from 'mysql';
module.exports = class Table {
export default interface DBAccessQueryResult {
/**
* This function represents table related metadata.
* Constructor.
*
* @param {String} tableLogPath
* MySQL's or PostgreSQL's client instance.
* The client may be undefined.
*/
constructor(tableLogPath) {
this.tableLogPath = tableLogPath;
this.arrTableColumns = [];
this.totalRowsInserted = 0;
}
};
client?: PoolConnection|PoolClient;
/**
* Query result.
*/
data: any;
}

src/DBVendors.ts Normal file (+26 lines)

@ -0,0 +1,26 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
enum DBVendors {
MYSQL,
PG,
}
export default DBVendors;


@ -1,148 +0,0 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
const connect = require('./Connector');
const log = require('./Logger');
const generateError = require('./ErrorGenerator');
const arrangeColumnsData = require('./ColumnsDataArranger');
const extraConfigProcessor = require('./ExtraConfigProcessor');
/**
* Prepares an array of tables and chunk offsets.
*
* @param {Conversion} self
* @param {String} tableName
* @param {Boolean} haveDataChunksProcessed
*
* @returns {Promise}
*/
module.exports = (self, tableName, haveDataChunksProcessed) => {
return connect(self).then(() => {
return new Promise(resolve => {
if (haveDataChunksProcessed) {
return resolve();
}
self._mysql.getConnection((error, connection) => {
if (error) {
// The connection is undefined.
generateError(self, '\t--[prepareDataChunks] Cannot connect to MySQL server...\n\t' + error);
resolve();
} else {
// Determine current table size, apply "chunking".
const originalTableName = extraConfigProcessor.getTableName(self, tableName, true);
let sql = "SELECT (data_length / 1024 / 1024) AS size_in_mb "
+ "FROM information_schema.tables "
+ "WHERE table_schema = '" + self._mySqlDbName + "' "
+ "AND table_name = '" + originalTableName + "';";
connection.query(sql, (err, rows) => {
if (err) {
connection.release();
generateError(self, '\t--[prepareDataChunks] ' + err, sql);
resolve();
} else {
const tableSizeInMb = +rows[0].size_in_mb;
rows = null;
sql = 'SELECT COUNT(1) AS rows_count FROM `' + originalTableName + '`;';
const strSelectFieldList = arrangeColumnsData(
self._dicTables[tableName].arrTableColumns,
self._mysqlVersion
);
connection.query(sql, (err2, rows2) => {
connection.release();
if (err2) {
generateError(self, '\t--[prepareDataChunks] ' + err2, sql);
resolve();
} else {
const rowsCnt = rows2[0].rows_count;
rows2 = null;
let chunksCnt = tableSizeInMb / self._dataChunkSize;
chunksCnt = chunksCnt < 1 ? 1 : chunksCnt;
const rowsInChunk = Math.ceil(rowsCnt / chunksCnt);
const arrDataPoolPromises = [];
const msg = '\t--[prepareDataChunks] Total rows to insert into '
+ '"' + self._schema + '"."' + tableName + '": ' + rowsCnt;
log(self, msg, self._dicTables[tableName].tableLogPath);
for (let offset = 0; offset < rowsCnt; offset += rowsInChunk) {
arrDataPoolPromises.push(new Promise(resolveDataUnit => {
self._pg.connect((error, client, done) => {
if (error) {
generateError(self, '\t--[prepareDataChunks] Cannot connect to PostgreSQL server...\n' + error);
resolveDataUnit();
} else {
const strJson = '{"_tableName":"' + tableName
+ '","_selectFieldList":"' + strSelectFieldList + '",'
+ '"_offset":' + offset + ','
+ '"_rowsInChunk":' + rowsInChunk + ','
+ '"_rowsCnt":' + rowsCnt + '}';
/*
* Define current data chunk size in MB.
* If there is only one chunk, then its size is equal to the table size.
* If there are more than one chunk,
* then a size of each chunk besides the last one is equal to "data_chunk_size",
* and a size of the last chunk is either "data_chunk_size" or tableSizeInMb % chunksCnt.
*/
let currentChunkSizeInMb = 0;
if (chunksCnt === 1) {
currentChunkSizeInMb = tableSizeInMb;
} else if (offset + rowsInChunk >= rowsCnt) {
currentChunkSizeInMb = tableSizeInMb % chunksCnt;
currentChunkSizeInMb = currentChunkSizeInMb || self._dataChunkSize;
} else {
currentChunkSizeInMb = self._dataChunkSize;
}
sql = 'INSERT INTO "' + self._schema + '"."data_pool_' + self._schema
+ self._mySqlDbName + '"("is_started", "json", "size_in_mb")'
+ ' VALUES(FALSE, $1, $2);';
client.query(sql, [strJson, currentChunkSizeInMb], err => {
done();
if (err) {
generateError(self, '\t--[prepareDataChunks] INSERT failed...\n' + err, sql);
}
resolveDataUnit();
});
}
});
}));
}
Promise.all(arrDataPoolPromises).then(() => resolve());
}
});
}
});
}
});
});
});
};

src/DataChunksProcessor.ts Normal file (+101 lines)

@ -0,0 +1,101 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import log from './Logger';
import arrangeColumnsData from './ColumnsDataArranger';
import * as extraConfigProcessor from './ExtraConfigProcessor';
import Conversion from './Conversion';
import DBAccess from './DBAccess';
import DBAccessQueryResult from './DBAccessQueryResult';
import DBVendors from './DBVendors';
/**
* Prepares an array of tables and chunk offsets.
*/
export default async (conversion: Conversion, tableName: string, haveDataChunksProcessed: boolean): Promise<void> => {
if (haveDataChunksProcessed) {
return;
}
// Determine current table size, apply "chunking".
const originalTableName: string = extraConfigProcessor.getTableName(conversion, tableName, true);
let sql: string = `SELECT (data_length / 1024 / 1024) AS size_in_mb FROM information_schema.tables
WHERE table_schema = '${ conversion._mySqlDbName }' AND table_name = '${ originalTableName }';`;
const dbAccess: DBAccess = new DBAccess(conversion);
const sizeQueryResult: DBAccessQueryResult = await dbAccess.query(
'DataChunksProcessor::default',
sql,
DBVendors.MYSQL,
true,
true
);
const tableSizeInMb: number = +sizeQueryResult.data[0].size_in_mb;
const strSelectFieldList: string = arrangeColumnsData(conversion._dicTables[tableName].arrTableColumns, conversion._mysqlVersion);
sql = `SELECT COUNT(1) AS rows_count FROM \`${ originalTableName }\`;`;
const countResult: DBAccessQueryResult = await dbAccess.query(
'DataChunksProcessor::default',
sql,
DBVendors.MYSQL,
true,
false,
sizeQueryResult.client
);
const rowsCnt: number = countResult.data[0].rows_count;
let chunksCnt: number = tableSizeInMb / conversion._dataChunkSize;
chunksCnt = chunksCnt < 1 ? 1 : chunksCnt;
const rowsInChunk: number = Math.ceil(rowsCnt / chunksCnt);
const arrDataPoolPromises: Promise<void>[] = [];
const msg: string = `\t--[prepareDataChunks] Total rows to insert into "${ conversion._schema }"."${ tableName }": ${ rowsCnt }`;
log(conversion, msg, conversion._dicTables[tableName].tableLogPath);
for (let offset: number = 0; offset < rowsCnt; offset += rowsInChunk) {
arrDataPoolPromises.push(new Promise<void>(async resolveDataUnit => {
const strJson: string = `{"_tableName":"${ tableName }","_selectFieldList":"${ strSelectFieldList }",
"_offset":${ offset },"_rowsInChunk":${ rowsInChunk },"_rowsCnt":${ rowsCnt }`;
// Define current data chunk size in MB.
// If there is only one chunk, then its size is equal to the table size.
// If there is more than one chunk,
// then the size of each chunk besides the last one is equal to "data_chunk_size",
// and the size of the last chunk is either "data_chunk_size" or tableSizeInMb % chunksCnt.
let currentChunkSizeInMb: number = 0;
if (chunksCnt === 1) {
currentChunkSizeInMb = tableSizeInMb;
} else if (offset + rowsInChunk >= rowsCnt) {
currentChunkSizeInMb = tableSizeInMb % chunksCnt;
currentChunkSizeInMb = currentChunkSizeInMb || conversion._dataChunkSize;
} else {
currentChunkSizeInMb = conversion._dataChunkSize;
}
sql = `INSERT INTO "${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"
("is_started", "json", "size_in_mb") VALUES (FALSE, '${ strJson }', ${ currentChunkSizeInMb });`;
await dbAccess.query('DataChunksProcessor::default', sql, DBVendors.PG, true, false);
resolveDataUnit();
}));
}
await Promise.all(arrDataPoolPromises);
}
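
A worked sketch of the chunking arithmetic above, with hypothetical numbers:

const tableSizeInMb: number = 10;
const dataChunkSize: number = 4;     // "data_chunk_size" from the config
const rowsCnt: number = 1000;

let chunksCnt: number = tableSizeInMb / dataChunkSize;      // 2.5
chunksCnt = chunksCnt < 1 ? 1 : chunksCnt;
const rowsInChunk: number = Math.ceil(rowsCnt / chunksCnt); // 400

// The loop yields offsets 0, 400 and 800 - three chunks.
// The first two are sized at data_chunk_size (4 MB); for the last one
// tableSizeInMb % chunksCnt happens to be 0, so it falls back to data_chunk_size too.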


@ -26,11 +26,11 @@ const csvStringify = require('./CsvStringifyModified');
const log = require('./Logger');
const generateError = require('./ErrorGenerator');
const connect = require('./Connector');
const Conversion = require('./Classes/Conversion');
const MessageToMaster = require('./Classes/MessageToMaster');
const Conversion = require('./Conversion');
const MessageToMaster = require('./MessageToMaster');
const { enforceConsistency } = require('./ConsistencyEnforcer');
const extraConfigProcessor = require('./ExtraConfigProcessor');
const BufferStream = require('./Classes/BufferStream');
const BufferStream = require('./BufferStream');
process.on('message', signal => {
const self = new Conversion(signal.config);


@ -1,191 +0,0 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
const childProcess = require('child_process');
const path = require('path');
const log = require('./Logger');
const generateError = require('./ErrorGenerator');
const MessageToDataLoader = require('./Classes/MessageToDataLoader');
const processConstraints = require('./ConstraintsProcessor');
const decodeBinaryData = require('./BinaryDataDecoder');
/**
* Kill a process specified by the pid.
*
* @param {Number} pid
*
* @returns {undefined}
*/
const killProcess = pid => {
try {
process.kill(pid);
} catch (killError) {
generateError(self, '\t--[killProcess] ' + killError);
}
};
/**
* Check if all data chunks were processed.
*
* @param {Conversion} self
*
* @returns {Boolean}
*/
const dataPoolProcessed = self => {
return self._processedChunks === self._dataPool.length;
};
/**
* Get a size (in MB) of the smallest, non processed data chunk.
* If all data chunks are processed then return 0.
*
* @param {Conversion} self
*
* @returns {Number}
*/
const getSmallestDataChunkSizeInMb = self => {
for (let i = self._dataPool.length - 1; i >= 0; --i) {
if (self._dataPool[i]._processed === false) {
return self._dataPool[i]._size_in_mb;
}
}
return 0;
};
/**
* Create an array of indexes, that point to data chunks, that will be processed during current COPY operation.
*
* @param {Conversion} self
*
* @returns {Array}
*/
const fillBandwidth = self => {
const dataChunkIndexes = [];
/*
* Loop through the data pool from the beginning to the end.
* Note, the data pool is created with predefined order, the order by data chunk size descending.
* Note, the "bandwidth" variable represents an actual amount of data, that will be loaded during current COPY operation.
*/
for (let i = 0, bandwidth = 0; i < self._dataPool.length; ++i) {
/*
* Check if current chunk has already been marked as "processed".
* If yes, then continue to the next iteration.
*/
if (self._dataPool[i]._processed === false) {
// Sum a size of data chunks, that are yet to be processed.
bandwidth += self._dataPool[i]._size_in_mb;
if (self._dataChunkSize - bandwidth >= getSmallestDataChunkSizeInMb(self)) {
/*
* Currently, the bandwidth is smaller than "data_chunk_size",
* and the difference between "data_chunk_size" and the bandwidth
* is larger or equal to currently-smallest data chunk.
* This means, that more data chunks can be processed during current COPY operation.
*/
dataChunkIndexes.push(i);
self._dataPool[i]._processed = true;
continue;
}
if (self._dataChunkSize >= bandwidth) {
/*
* Currently, the "data_chunk_size" is greater or equal to the bandwidth.
* This means, that no more data chunks can be processed during current COPY operation.
* Current COPY operation will be performed with maximal possible bandwidth capacity.
*/
dataChunkIndexes.push(i);
self._dataPool[i]._processed = true;
break;
}
/*
* This data chunk will not be processed during current COPY operation, because when it is added
* to the bandwidth, the bandwidth's size may become larger than "data_chunk_size".
* The bandwidth's value should be decreased prior the next iteration.
*/
bandwidth -= self._dataPool[i]._size_in_mb;
}
}
return dataChunkIndexes;
};
/**
* Instructs DataLoader which data chunks should be loaded.
* No need to check the state-log.
* If dataPool's length is zero, then nmig will proceed to the next step.
*
* @param {Conversion} self
* @param {String} strDataLoaderPath
* @param {Object} options
*
* @returns {undefined}
*/
const pipeData = (self, strDataLoaderPath, options) => {
if (dataPoolProcessed(self)) {
return decodeBinaryData(self).then(processConstraints);
}
const loaderProcess = childProcess.fork(strDataLoaderPath, options);
const bandwidth = fillBandwidth(self);
const chunksToLoad = bandwidth.map(index => {
return self._dataPool[index];
});
loaderProcess.on('message', signal => {
if (typeof signal === 'object') {
self._dicTables[signal.tableName].totalRowsInserted += signal.rowsInserted;
const msg = '\t--[pipeData] For now inserted: ' + self._dicTables[signal.tableName].totalRowsInserted + ' rows, '
+ 'Total rows to insert into "' + self._schema + '"."' + signal.tableName + '": ' + signal.totalRowsToInsert;
log(self, msg);
} else {
killProcess(loaderProcess.pid);
self._processedChunks += chunksToLoad.length;
return pipeData(self, strDataLoaderPath, options);
}
});
loaderProcess.send(new MessageToDataLoader(self._config, chunksToLoad));
};
/**
* Manage the DataPipe.
*
* @param {Conversion} self
*
* @returns {undefined}
*/
module.exports = self => {
if (dataPoolProcessed(self)) {
return decodeBinaryData(self).then(processConstraints);
}
const strDataLoaderPath = path.join(__dirname, 'DataLoader.js');
const options = self._loaderMaxOldSpaceSize === 'DEFAULT'
? Object.create(null)
: { execArgv: ['--max-old-space-size=' + self._loaderMaxOldSpaceSize] };
return pipeData(self, strDataLoaderPath, options);
};

src/DataPipeManager.ts Normal file (+155 lines)

@ -0,0 +1,155 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import { ChildProcess } from 'child_process';
import * as childProcess from 'child_process';
import * as path from 'path';
import log from './Logger';
import Conversion from './Conversion';
import generateError from './ErrorGenerator';
import MessageToDataLoader from './MessageToDataLoader';
import processConstraints from './ConstraintsProcessor';
import decodeBinaryData from './BinaryDataDecoder';
/**
* Kills a process specified by the pid.
*/
function killProcess(pid: number, conversion: Conversion): void {
try {
process.kill(pid);
} catch (killError) {
generateError(conversion, `\t--[killProcess] ${ killError }`);
}
}
/**
* Checks if all data chunks were processed.
*/
function dataPoolProcessed(conversion: Conversion): boolean {
return conversion._processedChunks === conversion._dataPool.length;
}
/**
 * Gets the size (in MB) of the smallest non-processed data chunk.
 * Returns 0 if all data chunks are processed.
*/
function getSmallestDataChunkSizeInMb(conversion: Conversion): number {
for (let i: number = conversion._dataPool.length - 1; i >= 0; --i) {
if (conversion._dataPool[i]._processed === false) {
return conversion._dataPool[i]._size_in_mb;
}
}
return 0;
}
/**
 * Creates an array of indexes that point to the data chunks, which will be processed during the current COPY operation.
*/
function fillBandwidth(conversion: Conversion): number[] {
const dataChunkIndexes: number[] = [];
// Loop through the data pool from the beginning to the end.
    // Note: the data pool is created in a predefined order, namely by data chunk size in descending order.
    // Note: the "bandwidth" variable represents the actual amount of data that will be loaded during the current COPY operation.
for (let i: number = 0, bandwidth = 0; i < conversion._dataPool.length; ++i) {
        // Check if the current chunk has already been marked as "processed".
        // If so, skip it and move on to the next one.
if (conversion._dataPool[i]._processed === false) {
            // Sum the sizes of the data chunks that are yet to be processed.
bandwidth += conversion._dataPool[i]._size_in_mb;
if (conversion._dataChunkSize - bandwidth >= getSmallestDataChunkSizeInMb(conversion)) {
                // Currently, the bandwidth is smaller than "data_chunk_size",
                // and the difference between "data_chunk_size" and the bandwidth
                // is larger than or equal to the currently-smallest data chunk.
                // This means that more data chunks can be processed during the current COPY operation.
dataChunkIndexes.push(i);
conversion._dataPool[i]._processed = true;
continue;
}
if (conversion._dataChunkSize >= bandwidth) {
// Currently, the "data_chunk_size" is greater or equal to the bandwidth.
// This means, that no more data chunks can be processed during current COPY operation.
// Current COPY operation will be performed with maximal possible bandwidth capacity.
dataChunkIndexes.push(i);
conversion._dataPool[i]._processed = true;
break;
}
            // This data chunk will not be processed during the current COPY operation, because adding it
            // to the bandwidth might make the bandwidth's size larger than "data_chunk_size".
            // The bandwidth's value should be decreased prior to the next iteration.
bandwidth -= conversion._dataPool[i]._size_in_mb;
}
}
return dataChunkIndexes;
}
/**
* Instructs DataLoader which data chunks should be loaded.
* No need to check the state-log.
* If dataPool's length is zero, then nmig will proceed to the next step.
*/
async function pipeData(conversion: Conversion, dataLoaderPath: string, options: any): Promise<void> {
if (dataPoolProcessed(conversion)) {
conversion = await decodeBinaryData(conversion);
return processConstraints(conversion);
}
const loaderProcess: ChildProcess = childProcess.fork(dataLoaderPath, options);
const bandwidth: number[] = fillBandwidth(conversion);
const chunksToLoad: any[] = bandwidth.map((index: number) => conversion._dataPool[index]);
loaderProcess.on('message', (signal: any) => {
if (typeof signal === 'object') {
conversion._dicTables[signal.tableName].totalRowsInserted += signal.rowsInserted;
const msg: string = `\t--[pipeData] For now inserted: ${ conversion._dicTables[signal.tableName].totalRowsInserted } rows,
Total rows to insert into "${ conversion._schema }"."${ signal.tableName }": ${ signal.totalRowsToInsert }`;
log(conversion, msg);
} else {
killProcess(loaderProcess.pid, conversion);
conversion._processedChunks += chunksToLoad.length;
return pipeData(conversion, dataLoaderPath, options);
}
});
loaderProcess.send(new MessageToDataLoader(conversion._config, chunksToLoad));
}
/**
* Manages the DataPipe.
*/
export default async function(conversion: Conversion): Promise<void> {
if (dataPoolProcessed(conversion)) {
conversion = await decodeBinaryData(conversion);
return processConstraints(conversion);
}
const dataLoaderPath: string = path.join(__dirname, 'DataLoader.js');
const options: any = conversion._loaderMaxOldSpaceSize === 'DEFAULT'
? Object.create(null)
: { execArgv: [`--max-old-space-size=${ conversion._loaderMaxOldSpaceSize }`] };
return pipeData(conversion, dataLoaderPath, options);
}
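
The parent above forks DataLoader.js, hands it the configuration plus a batch of chunks via MessageToDataLoader, treats every object signal as per-chunk progress, and takes any non-object signal to mean the batch is finished. The child side is outside this part of the diff, so the following is only a sketch of its half of the contract, inferred from pipeData; the chunk property names are assumptions.

// Hypothetical child-side counterpart (NOT the real DataLoader.ts):
process.on('message', async (message: any) => {
    // message is a MessageToDataLoader: { config: any, chunks: any[] }
    for (const chunk of message.chunks) {
        // ... run the actual COPY for this chunk (omitted) ...
        const rowsInserted: number = 0; // placeholder for the real row count
        // An object signal reports per-chunk progress to pipeData's "message" handler.
        if (process.send) {
            process.send({
                tableName: chunk._tableName,        // assumed property name
                rowsInserted: rowsInserted,
                totalRowsToInsert: chunk._totalRows // assumed property name
            });
        }
    }
    // Any non-object signal tells the parent the batch is finished,
    // so it kills this process and forks a fresh one for the next batch.
    if (process.send) {
        process.send('processed');
    }
});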

View file

@ -1,138 +0,0 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
const connect = require('./Connector');
const log = require('./Logger');
const generateError = require('./ErrorGenerator');
/**
* Create the "{schema}"."data_pool_{self._schema + self._mySqlDbName} temporary table."
*
* @param {Conversion} self
*
* @returns {Promise}
*/
module.exports.createDataPoolTable = self => {
return connect(self).then(() => {
return new Promise(resolve => {
self._pg.connect((error, client, done) => {
if (error) {
generateError(self, '\t--[DataPoolManager.createDataPoolTable] Cannot connect to PostgreSQL server...\n' + error);
process.exit();
} else {
const sql = 'CREATE TABLE IF NOT EXISTS "' + self._schema + '"."data_pool_' + self._schema + self._mySqlDbName
+ '"("id" BIGSERIAL, "json" TEXT, "is_started" BOOLEAN, "size_in_mb" DOUBLE PRECISION);';
client.query(sql, err => {
done();
if (err) {
generateError(self, '\t--[DataPoolManager.createDataPoolTable] ' + err, sql);
process.exit();
} else {
log(self, '\t--[DataPoolManager.createDataPoolTable] table "' + self._schema + '"."data_pool_' + self._schema + self._mySqlDbName + '" is created...');
resolve(self);
}
});
}
});
});
});
};
/**
* Drop the "{schema}"."data_pool_{self._schema + self._mySqlDbName} temporary table."
*
* @param {Conversion} self
*
* @returns {Promise}
*/
module.exports.dropDataPoolTable = self => {
return connect(self).then(() => {
return new Promise(resolve => {
self._pg.connect((error, client, done) => {
if (error) {
generateError(self, '\t--[DataPoolManager.dropDataPoolTable] Cannot connect to PostgreSQL server...\n' + error);
resolve();
} else {
const sql = 'DROP TABLE "' + self._schema + '"."data_pool_' + self._schema + self._mySqlDbName + '";';
client.query(sql, err => {
done();
if (err) {
generateError(self, '\t--[DataPoolManager.dropDataPoolTable] ' + err, sql);
} else {
log(self, '\t--[DataPoolManager.dropDataPoolTable] table "' + self._schema + '"."data_pool_' + self._schema + self._mySqlDbName + '" is dropped...');
}
resolve();
});
}
});
});
});
};
/**
* Reads temporary table, and generates Data-pool.
*
* @param {Conversion} self
*
* @returns {Promise}
*/
module.exports.readDataPool = self => {
return connect(self).then(() => {
return new Promise(resolve => {
self._pg.connect((error, client, done) => {
if (error) {
generateError(self, '\t--[DataPoolManager.readDataPool] Cannot connect to PostgreSQL server...\n' + error);
process.exit();
} else {
const sql = 'SELECT id AS id, json AS json, size_in_mb AS size_in_mb FROM "'
+ self._schema + '"."data_pool_' + self._schema + self._mySqlDbName
+ '" ORDER BY size_in_mb DESC;';
client.query(sql, (err, arrDataPool) => {
done();
if (err) {
generateError(self, '\t--[DataPoolManager.readDataPool] ' + err, sql);
process.exit();
}
for (let i = 0; i < arrDataPool.rows.length; ++i) {
const obj = JSON.parse(arrDataPool.rows[i].json);
obj._id = arrDataPool.rows[i].id;
obj._size_in_mb = +arrDataPool.rows[i].size_in_mb;
obj._processed = false;
self._dataPool.push(obj);
}
log(self, '\t--[DataPoolManager.readDataPool] Data-Pool is loaded...');
resolve(self);
});
}
});
});
});
};

71
src/DataPoolManager.ts Normal file
View file

@ -0,0 +1,71 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import DBAccess from './DBAccess';
import DBAccessQueryResult from './DBAccessQueryResult';
import DBVendors from './DBVendors';
import log from './Logger';
import Conversion from './Conversion';
/**
* Creates the "{schema}"."data_pool_{self._schema + self._mySqlDbName}" temporary table.
*/
export async function createDataPoolTable(conversion: Conversion): Promise<Conversion> {
const dbAccess: DBAccess = new DBAccess(conversion);
const table: string = `"${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"`;
const sql: string = `CREATE TABLE IF NOT EXISTS ${ table }
("id" BIGSERIAL, "json" TEXT, "is_started" BOOLEAN, "size_in_mb" DOUBLE PRECISION);`;
await dbAccess.query('DataPoolManager::createDataPoolTable', sql, DBVendors.PG, true, false);
log(conversion, `\t--[DataPoolManager.createDataPoolTable] table ${ table } is created...`);
return conversion;
}
/**
* Drops the "{schema}"."data_pool_{self._schema + self._mySqlDbName}" temporary table.
*/
export async function dropDataPoolTable(conversion: Conversion): Promise<void> {
const dbAccess: DBAccess = new DBAccess(conversion);
const table: string = `"${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"`;
const sql: string = `DROP TABLE ${ table };`;
await dbAccess.query('DataPoolManager::dropDataPoolTable', sql, DBVendors.PG, false, false);
log(conversion, `\t--[DataPoolManager.dropDataPoolTable] table ${ table } is dropped...`);
}
/**
 * Reads the temporary table and generates the Data-pool.
*/
export async function readDataPool(conversion: Conversion): Promise<Conversion> {
const dbAccess: DBAccess = new DBAccess(conversion);
const table: string = `"${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"`;
const sql: string = `SELECT id AS id, json AS json, size_in_mb AS size_in_mb FROM ${ table } ORDER BY size_in_mb DESC;`;
    const result: DBAccessQueryResult = await dbAccess.query('DataPoolManager::readDataPool', sql, DBVendors.PG, true, false);
result.data.rows.forEach((row: any) => {
const obj: any = JSON.parse(row.json);
obj._id = row.id;
obj._size_in_mb = +row.size_in_mb;
obj._processed = false;
conversion._dataPool.push(obj);
});
log(conversion, '\t--[DataPoolManager.readDataPool] Data-Pool is loaded...');
return conversion;
}
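
DBAccess.ts itself is not shown in this portion of the diff, so here is the query signature as it can be reconstructed from the call sites above. Only the call shape and the result shape (result.data, result.client) are taken from the code; the names and meanings of the two boolean flags are assumptions.

import Conversion from './Conversion';
import DBVendors from './DBVendors';

// Assumed result shape: PG rows live under result.data.rows, MySQL rows under result.data,
// and result.client is populated when the caller asked to keep the connection.
interface AssumedDBAccessQueryResult {
    data: any;
    client?: any;
}

// Assumed signature, reconstructed from calls such as
// dbAccess.query('DataPoolManager::createDataPoolTable', sql, DBVendors.PG, true, false):
declare class AssumedDBAccess {
    constructor(conversion: Conversion);
    query(
        caller: string,              // logging prefix, e.g. 'DataPoolManager::readDataPool'
        sql: string,
        vendor: DBVendors,           // DBVendors.PG or DBVendors.MYSQL
        processExitOnError: boolean, // assumed meaning of the fourth argument
        shouldReturnClient: boolean, // assumed meaning of the fifth argument
        client?: any                 // optional client, reused across consecutive queries
    ): Promise<AssumedDBAccessQueryResult>;
}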

View file

@ -18,29 +18,23 @@
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
const fs = require('fs');
import * as fs from 'fs';
import Conversion from './Conversion';
/**
* Reads "./config/data_types_map.json" and converts its json content to js object.
* Appends this object to "FromMySQL2PostgreSQL" instance.
*
* @param {Conversion} self
*
* @returns {Promise}
*/
module.exports = self => {
export default (conversion: Conversion): Promise<Conversion> => {
return new Promise(resolve => {
fs.readFile(self._dataTypesMapAddr, (error, data) => {
fs.readFile(conversion._dataTypesMapAddr, (error: Error, data: Buffer) => {
if (error) {
console.log('\t--[readDataTypesMap] Cannot read "DataTypesMap" from ' + self._dataTypesMapAddr);
console.log(`\t--[readDataTypesMap] Cannot read "DataTypesMap" from ${conversion._dataTypesMapAddr}`);
process.exit();
}
self._dataTypesMap = JSON.parse(data);
conversion._dataTypesMap = JSON.parse(data.toString());
console.log('\t--[readDataTypesMap] Data Types Map is loaded...');
resolve(self);
resolve(conversion);
});
});
};
}
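
For orientation, a sketch of how the loaded map is meant to be consumed downstream; the entry shape shown in the comment is hypothetical, since config/data_types_map.json itself is not part of this diff.

import Conversion from './Conversion';
import readDataTypesMap from './DataTypesMapReader';

// Hypothetical entry in config/data_types_map.json (the real keys may differ):
//   "tinyint": { "type": "smallint" }
async function exampleUsage(conversion: Conversion): Promise<void> {
    const withMap: Conversion = await readDataTypesMap(conversion);
    // Later steps can translate a MySQL type into its PostgreSQL counterpart:
    const pgMapping: any = withMap._dataTypesMap['tinyint']; // hypothetical key
    console.log(pgMapping);
}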

View file

@ -1,95 +0,0 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
const connect = require('./Connector');
const log = require('./Logger');
const generateError = require('./ErrorGenerator');
const extraConfigProcessor = require('./ExtraConfigProcessor');
/**
* Define which columns of the given table are of type "enum".
 * Set an appropriate constraint, if needed.
*
* @param {Conversion} self
* @param {String} tableName
*
* @returns {Promise}
*/
module.exports = (self, tableName) => {
return connect(self).then(() => {
return new Promise(resolve => {
log(self, '\t--[processEnum] Defines "ENUMs" for table "' + self._schema + '"."' + tableName + '"', self._dicTables[tableName].tableLogPath);
const processEnumPromises = [];
const originalTableName = extraConfigProcessor.getTableName(self, tableName, true);
for (let i = 0; i < self._dicTables[tableName].arrTableColumns.length; ++i) {
if (self._dicTables[tableName].arrTableColumns[i].Type.indexOf('(') !== -1) {
const arrType = self._dicTables[tableName].arrTableColumns[i].Type.split('(');
if (arrType[0] === 'enum') {
processEnumPromises.push(
new Promise(resolveProcessEnum => {
self._pg.connect((error, client, done) => {
if (error) {
const msg = '\t--[processEnum] Cannot connect to PostgreSQL server...\n' + error;
generateError(self, msg);
resolveProcessEnum();
} else {
const columnName = extraConfigProcessor.getColumnName(
self,
originalTableName,
self._dicTables[tableName].arrTableColumns[i].Field,
false
);
const sql = 'ALTER TABLE "' + self._schema + '"."' + tableName + '" '
+ 'ADD CHECK ("' + columnName + '" IN (' + arrType[1] + ');';
client.query(sql, err => {
done();
if (err) {
const msg2 = '\t--[processEnum] Error while setting ENUM for "' + self._schema + '"."'
+ tableName + '"."' + columnName + '"...\n' + err;
generateError(self, msg2, sql);
resolveProcessEnum();
} else {
const success = '\t--[processEnum] Set "ENUM" for "' + self._schema + '"."' + tableName
+ '"."' + columnName + '"...';
log(self, success, self._dicTables[tableName].tableLogPath);
resolveProcessEnum();
}
});
}
});
})
);
}
}
}
Promise.all(processEnumPromises).then(() => resolve());
});
});
};

58
src/EnumProcessor.ts Normal file
View file

@ -0,0 +1,58 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import log from './Logger';
import Conversion from './Conversion';
import DBAccess from './DBAccess';
import DBVendors from './DBVendors';
import * as extraConfigProcessor from './ExtraConfigProcessor';
/**
* Defines which columns of the given table are of type "enum".
 * Sets an appropriate constraint, if needed.
*/
export default async function(conversion: Conversion, tableName: string): Promise<void> {
const dbAccess: DBAccess = new DBAccess(conversion);
const msg: string = `\t--[EnumProcessor] Defines "ENUMs" for table "${ conversion._schema }"."${ tableName }"`;
log(conversion, msg, conversion._dicTables[tableName].tableLogPath);
const originalTableName: string = extraConfigProcessor.getTableName(conversion, tableName, true);
const processEnumPromises: Promise<void>[] = conversion._dicTables[tableName].arrTableColumns.map(async (column: any) => {
if (column.Type.indexOf('(') !== -1) {
const arrType: string[] = column.Type.split('(');
if (arrType[0] === 'enum') {
const columnName: string = extraConfigProcessor.getColumnName(
conversion,
originalTableName,
column.Field,
false
);
const sql: string = `ALTER TABLE "${ conversion._schema }"."${ tableName }" ADD CHECK ("${ columnName }" IN (${ arrType[1] });`;
await dbAccess.query('EnumProcessor', sql, DBVendors.PG, false, false);
const successMsg: string = `\t--[EnumProcessor] Set "ENUM" for "${ conversion._schema }"."${ tableName }"."${ columnName }"...`;
log(conversion, successMsg, conversion._dicTables[tableName].tableLogPath);
}
}
});
await Promise.all(processEnumPromises);
}
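
The split('(') trick above deserves a worked example: the enum definition's own closing parenthesis travels along in arrType[1] and balances the generated CHECK clause. Table and column names below are invented.

// Worked example of the Type parsing above (names invented):
const columnType: string = "enum('pending','active','closed')";
const arrType: string[] = columnType.split('(');
// arrType[0] === 'enum'
// arrType[1] === "'pending','active','closed')"  <-- keeps the trailing ')'
const sql: string = `ALTER TABLE "public"."orders" ADD CHECK ("status" IN (${ arrType[1] });`;
// => ALTER TABLE "public"."orders" ADD CHECK ("status" IN ('pending','active','closed'));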

View file

@ -18,26 +18,19 @@
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
const fs = require('fs');
const log = require('./Logger');
import * as fs from 'fs';
import log from './Logger';
import Conversion from './Conversion';
/**
 * Writes a detailed error message to the "/errors-only.log" file.
*
* @param {Conversion} self
* @param {String} message
* @param {String} sql
*
* @returns {undefined}
*/
module.exports = (self, message, sql = '') => {
message += '\n\n\tSQL: ' + sql + '\n\n';
const buffer = Buffer.from(message, self._encoding);
log(self, message, undefined, true);
export default (conversion: Conversion, message: string, sql: string = ''): void => {
message += `\n\n\tSQL: ${sql}\n\n`;
const buffer: Buffer = Buffer.from(message, conversion._encoding);
log(conversion, message, undefined, true);
fs.open(self._errorLogsPath, 'a', self._0777, (error, fd) => {
fs.open(conversion._errorLogsPath, 'a', conversion._0777, (error: Error, fd: number) => {
if (!error) {
fs.write(fd, buffer, 0, buffer.length, null, () => {
fs.close(fd, () => {
@ -46,4 +39,4 @@ module.exports = (self, message, sql = '') => {
});
}
});
};
}

View file

@ -1,100 +0,0 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
/**
* Get current table's name.
*
* @param {Conversion} self
* @param {String} currentTableName
* @param {Boolean} shouldGetOriginal
*
* @returns {String}
*/
module.exports.getTableName = (self, currentTableName, shouldGetOriginal) => {
if (self._extraConfig !== null && 'tables' in self._extraConfig) {
for (let i = 0; i < self._extraConfig.tables.length; ++i) {
if ((shouldGetOriginal ? self._extraConfig.tables[i].name.new : self._extraConfig.tables[i].name.original) === currentTableName) {
return shouldGetOriginal ? self._extraConfig.tables[i].name.original : self._extraConfig.tables[i].name.new;
}
}
}
return currentTableName;
};
/**
* Get current column's name.
*
* @param {Conversion} self
* @param {String} originalTableName
* @param {String} currentColumnName
* @param {Boolean} shouldGetOriginal
*
* @returns {String}
*/
module.exports.getColumnName = (self, originalTableName, currentColumnName, shouldGetOriginal) => {
if (self._extraConfig !== null && 'tables' in self._extraConfig) {
for (let i = 0; i < self._extraConfig.tables.length; ++i) {
if (self._extraConfig.tables[i].name.original === originalTableName && 'columns' in self._extraConfig.tables[i]) {
for (let columnsCount = 0; columnsCount < self._extraConfig.tables[i].columns.length; ++columnsCount) {
if (self._extraConfig.tables[i].columns[columnsCount].original === currentColumnName) {
return shouldGetOriginal
? self._extraConfig.tables[i].columns[columnsCount].original
: self._extraConfig.tables[i].columns[columnsCount].new;
}
}
}
}
}
return currentColumnName;
};
/**
* Parse the extra_config foreign_keys attributes and generate
* an output array required by ForeignKeyProcessor::processForeignKeyWorker.
*
* @param {Conversion} self
* @param {String} tableName
*
* @returns {Array}
*/
module.exports.parseForeignKeys = (self, tableName) => {
const retVal = [];
if (self._extraConfig !== null && 'foreign_keys' in self._extraConfig) {
for (let i = 0; i < self._extraConfig.foreign_keys.length; ++i) {
if (self._extraConfig.foreign_keys[i].table_name === tableName) {
// There may be several FKs in a single table.
const objFk = Object.create(null);
for (const attribute in self._extraConfig.foreign_keys[i]) {
objFk[attribute.toUpperCase()] = self._extraConfig.foreign_keys[i][attribute];
}
retVal.push(objFk);
}
}
}
return retVal;
};

View file

@ -0,0 +1,82 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import Conversion from './Conversion';
/**
* Retrieves current table's name.
*/
export function getTableName(conversion: Conversion, currentTableName: string, shouldGetOriginal: boolean): string {
if (conversion._extraConfig !== null && 'tables' in conversion._extraConfig) {
for (let i: number = 0; i < conversion._extraConfig.tables.length; ++i) {
if ((shouldGetOriginal ? conversion._extraConfig.tables[i].name.new : conversion._extraConfig.tables[i].name.original) === currentTableName) {
return shouldGetOriginal ? conversion._extraConfig.tables[i].name.original : conversion._extraConfig.tables[i].name.new;
}
}
}
return currentTableName;
}
/**
* Retrieves current column's name.
*/
export function getColumnName(conversion: Conversion, originalTableName: string, currentColumnName: string, shouldGetOriginal: boolean): string {
if (conversion._extraConfig !== null && 'tables' in conversion._extraConfig) {
for (let i: number = 0; i < conversion._extraConfig.tables.length; ++i) {
if (conversion._extraConfig.tables[i].name.original === originalTableName && 'columns' in conversion._extraConfig.tables[i]) {
for (let columnsCount: number = 0; columnsCount < conversion._extraConfig.tables[i].columns.length; ++columnsCount) {
if (conversion._extraConfig.tables[i].columns[columnsCount].original === currentColumnName) {
return shouldGetOriginal
? conversion._extraConfig.tables[i].columns[columnsCount].original
: conversion._extraConfig.tables[i].columns[columnsCount].new;
}
}
}
}
}
return currentColumnName;
}
/**
 * Parses the extra_config foreign_keys attributes and generates
 * an output array required by ForeignKeyProcessor::processForeignKeyWorker.
*/
export function parseForeignKeys(conversion: Conversion, tableName: string): any[] {
const retVal: any[] = [];
if (conversion._extraConfig !== null && 'foreign_keys' in conversion._extraConfig) {
for (let i: number = 0; i < conversion._extraConfig.foreign_keys.length; ++i) {
if (conversion._extraConfig.foreign_keys[i].table_name === tableName) {
// There may be several FKs in a single table.
const objFk: any = Object.create(null);
for (const attribute in conversion._extraConfig.foreign_keys[i]) {
objFk[attribute.toUpperCase()] = conversion._extraConfig.foreign_keys[i][attribute];
}
retVal.push(objFk);
}
}
}
return retVal;
}
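
Putting the three helpers together, here is the extra_config shape they imply. Every property access above (tables[i].name.original/new, tables[i].columns[j].original/new, foreign_keys[i].table_name) is honored, while the concrete values and the extra foreign-key attributes are invented.

// Example extra_config, shaped after the property accesses in this file (values invented):
const extraConfigExample: any = {
    tables: [{
        name: { original: 'Users', new: 'users' },
        columns: [{ original: 'UserID', new: 'user_id' }]
    }],
    foreign_keys: [{
        table_name: 'users',           // matched by parseForeignKeys
        column_name: 'user_id',        // hypothetical attribute; uppercased to COLUMN_NAME
        referenced_table_name: 'roles' // hypothetical attribute
    }]
};
// getTableName(conversion, 'users', true)  => 'Users' (maps the new name back to the original)
// getTableName(conversion, 'Users', false) => 'users' (maps the original name to the new one)
// parseForeignKeys(conversion, 'users')    => [{ TABLE_NAME: 'users', COLUMN_NAME: 'user_id', REFERENCED_TABLE_NAME: 'roles' }]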

View file

@ -19,33 +19,26 @@
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import * as fs from 'fs';
import Conversion from './Classes/Conversion';
import Conversion from './Conversion';
/**
* Outputs given log.
* Writes given log to the "/all.log" file.
* If necessary, writes given log to the "/{tableName}.log" file.
*
* @param {Conversion} self
* @param {String} log
* @param {String} tableLogPath
* @param {Boolean} isErrorLog
*
* @returns {void}
*/
export default (self: Conversion, log: string, tableLogPath?: string, isErrorLog?: boolean): void => {
const buffer: Buffer = Buffer.from(log + '\n\n', self._encoding);
export default (conversion: Conversion, log: string, tableLogPath?: string, isErrorLog?: boolean): void => {
const buffer: Buffer = Buffer.from(`${log}\n\n`, conversion._encoding);
if (!isErrorLog) {
console.log(log);
}
fs.open(self._allLogsPath, 'a', self._0777, (error, fd) => {
fs.open(conversion._allLogsPath, 'a', conversion._0777, (error: Error, fd: number) => {
if (!error) {
fs.write(fd, buffer, 0, buffer.length, null, () => {
fs.close(fd, () => {
if (tableLogPath) {
fs.open(tableLogPath, 'a', self._0777, (error, fd) => {
fs.open(tableLogPath, 'a', conversion._0777, (error: Error, fd: number) => {
if (!error) {
fs.write(fd, buffer, 0, buffer.length, null, () => {
fs.close(fd, () => {
@ -59,4 +52,4 @@ export default (self: Conversion, log: string, tableLogPath?: string, isErrorLog
});
}
});
};
}

View file

@ -21,7 +21,7 @@
import * as fs from 'fs';
import * as path from 'path';
import readDataTypesMap from './DataTypesMapReader';
import Conversion from './Classes/Conversion';
import Conversion from './Conversion';
import SchemaProcessor from './SchemaProcessor';
import loadStructureToMigrate from './StructureLoader';
import pipeData from './DataPipeManager';
@ -29,24 +29,25 @@ import boot from './BootProcessor';
import { createStateLogsTable } from './MigrationStateManager';
import { createDataPoolTable, readDataPool } from './DataPoolManager';
import log from './Logger';
import { Stats } from 'fs';
const Main = class {
/**
* Read the configuration file.
*/
readConfig(baseDir: string, configFileName: string = 'config.json'): Promise<any> {
public readConfig(baseDir: string, configFileName: string = 'config.json'): Promise<any> {
return new Promise(resolve => {
const strPathToConfig = path.join(baseDir, 'config', configFileName);
fs.readFile(strPathToConfig, (error: Error, data: any) => {
fs.readFile(strPathToConfig, (error: Error, data: Buffer) => {
if (error) {
console.log(`\n\t--Cannot run migration\nCannot read configuration info from ${ strPathToConfig }`);
process.exit();
}
const config = JSON.parse(data);
config.logsDirPath = path.join(baseDir, 'logs_directory');
const config: any = JSON.parse(data.toString());
config.logsDirPath = path.join(baseDir, 'logs_directory');
config.dataTypesMapAddr = path.join(baseDir, 'config', 'data_types_map.json');
resolve(config);
});
@ -56,7 +57,7 @@ const Main = class {
/**
* Read the extra configuration file, if necessary.
*/
readExtraConfig(config: any, baseDir: string): Promise<any> {
public readExtraConfig(config: any, baseDir: string): Promise<any> {
return new Promise(resolve => {
if (config.enable_extra_config !== true) {
config.extraConfig = null;
@ -65,13 +66,13 @@ const Main = class {
const strPathToExtraConfig = path.join(baseDir, 'config', 'extra_config.json');
fs.readFile(strPathToExtraConfig, (error: Error, data: any) => {
fs.readFile(strPathToExtraConfig, (error: Error, data: Buffer) => {
if (error) {
console.log(`\n\t--Cannot run migration\nCannot read configuration info from ${ strPathToExtraConfig }`);
process.exit();
}
config.extraConfig = JSON.parse(data);
config.extraConfig = JSON.parse(data.toString());
resolve(config);
});
});
@ -80,17 +81,17 @@ const Main = class {
/**
* Initialize Conversion instance.
*/
initializeConversion(config: any): Promise<Conversion> {
public initializeConversion(config: any): Promise<Conversion> {
return Promise.resolve(new Conversion(config));
}
/**
* Creates logs directory.
*/
createLogsDirectory(self: Conversion): Promise<Conversion> {
public createLogsDirectory(self: Conversion): Promise<Conversion> {
return new Promise(resolve => {
console.log('\t--[DirectoriesManager.createLogsDirectory] Creating logs directory...');
fs.stat(self._logsDirPath, (directoryDoesNotExist, stat) => {
fs.stat(self._logsDirPath, (directoryDoesNotExist: Error, stat: Stats) => {
if (directoryDoesNotExist) {
fs.mkdir(self._logsDirPath, self._0777, e => {
if (e) {
@ -117,20 +118,16 @@ const Main = class {
};
module.exports = Main;
const app = new Main();
const baseDir = path.join(__dirname, '..', '..');
const app = new Main();
const baseDir: string = path.join(__dirname, '..', '..');
app.readConfig(baseDir)
.then(config => {
return app.readExtraConfig(config, baseDir);
})
.then(config => app.readExtraConfig(config, baseDir))
.then(app.initializeConversion)
.then(boot)
.then(readDataTypesMap)
.then(app.createLogsDirectory)
.then(conversion => {
return (new SchemaProcessor(conversion)).createSchema();
})
.then(conversion => (new SchemaProcessor(conversion)).createSchema())
.then(createStateLogsTable)
.then(createDataPoolTable)
.then(loadStructureToMigrate)

View file

@ -18,19 +18,23 @@
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
export default class MessageToDataLoader {
/**
* Parsed Nmig's configuration object.
*/
public readonly config: any;
/**
* An array of data chunks.
*/
public readonly chunks: any[];
module.exports = class MessageToDataLoader {
/**
* Representation of a message of the master process to DataLoader process.
* Contents migration's configuration and an array of "data-chunks".
* Constructor.
*
* @param {Object} config
* @param {Array} chunks
* Contains migration's configuration and an array of "data-chunks".
*/
constructor(config, chunks) {
public constructor(config: any, chunks: any[]) {
this.config = config;
this.chunks = chunks;
}
};
}

View file

@ -1,195 +0,0 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
const connect = require('./Connector');
const log = require('./Logger');
const generateError = require('./ErrorGenerator');
/**
* Get state-log.
*
* @param {Conversion} self
* @param {String} param
*
* @returns {Promise}
*/
module.exports.get = (self, param) => {
return connect(self).then(() => {
return new Promise(resolve => {
self._pg.connect((error, client, done) => {
if (error) {
generateError(self, '\t--[MigrationStateManager.get] Cannot connect to PostgreSQL server...\n' + error);
resolve(false);
} else {
const sql = 'SELECT ' + param + ' FROM "' + self._schema + '"."state_logs_' + self._schema + self._mySqlDbName + '";';
client.query(sql, (err, data) => {
done();
if (err) {
generateError(self, '\t--[MigrationStateManager.get] ' + err, sql);
resolve(false);
} else {
resolve(data.rows[0][param]);
}
});
}
});
});
});
};
/**
* Update the state-log.
*
* @param {Conversion} self
* @param {String} param
*
* @returns {Promise}
*/
module.exports.set = (self, param) => {
return connect(self).then(() => {
return new Promise(resolve => {
self._pg.connect((error, client, done) => {
if (error) {
generateError(self, '\t--[MigrationStateManager.set] Cannot connect to PostgreSQL server...\n' + error);
resolve();
} else {
const sql = 'UPDATE "' + self._schema + '"."state_logs_'
+ self._schema + self._mySqlDbName + '" SET ' + param + ' = TRUE;';
client.query(sql, err => {
done();
if (err) {
generateError(self, '\t--[MigrationStateManager.set] ' + err, sql);
}
resolve();
});
}
});
});
});
};
/**
* Create the "{schema}"."state_logs_{self._schema + self._mySqlDbName} temporary table."
*
* @param {Conversion} self
*
* @returns {Promise}
*/
module.exports.createStateLogsTable = self => {
return connect(self).then(() => {
return new Promise(resolve => {
self._pg.connect((error, client, done) => {
if (error) {
generateError(self, '\t--[createStateLogsTable] Cannot connect to PostgreSQL server...\n' + error);
process.exit();
} else {
let sql = 'CREATE TABLE IF NOT EXISTS "' + self._schema + '"."state_logs_' + self._schema + self._mySqlDbName
+ '"('
+ '"tables_loaded" BOOLEAN,'
+ '"per_table_constraints_loaded" BOOLEAN,'
+ '"foreign_keys_loaded" BOOLEAN,'
+ '"views_loaded" BOOLEAN'
+ ');';
client.query(sql, err => {
if (err) {
done();
generateError(self, '\t--[createStateLogsTable] ' + err, sql);
process.exit();
} else {
sql = 'SELECT COUNT(1) AS cnt FROM "' + self._schema + '"."state_logs_' + self._schema + self._mySqlDbName + '";';
client.query(sql, (errorCount, result) => {
if (errorCount) {
done();
generateError(self, '\t--[createStateLogsTable] ' + errorCount, sql);
process.exit();
} else if (+result.rows[0].cnt === 0) {
sql = 'INSERT INTO "' + self._schema + '"."state_logs_' + self._schema + self._mySqlDbName
+ '" VALUES(FALSE, FALSE, FALSE, FALSE);';
client.query(sql, errorInsert => {
done();
if (errorInsert) {
generateError(self, '\t--[createStateLogsTable] ' + errorInsert, sql);
process.exit();
} else {
const msg = '\t--[createStateLogsTable] table "' + self._schema + '"."state_logs_'
+ self._schema + self._mySqlDbName + '" is created...';
log(self, msg);
resolve(self);
}
});
} else {
const msg2 = '\t--[createStateLogsTable] table "' + self._schema + '"."state_logs_'
+ self._schema + self._mySqlDbName + '" is created...';
log(self, msg2);
resolve(self);
}
});
}
});
}
});
});
});
};
/**
* Drop the "{schema}"."state_logs_{self._schema + self._mySqlDbName} temporary table."
*
* @param {Conversion} self
*
* @returns {Promise}
*/
module.exports.dropStateLogsTable = self => {
return connect(self).then(() => {
return new Promise(resolve => {
self._pg.connect((error, client, done) => {
if (error) {
generateError(self, '\t--[dropStateLogsTable] Cannot connect to PostgreSQL server...\n' + error);
resolve();
} else {
const sql = 'DROP TABLE "' + self._schema + '"."state_logs_' + self._schema + self._mySqlDbName + '";';
client.query(sql, err => {
done();
if (err) {
generateError(self, '\t--[dropStateLogsTable] ' + err, sql);
} else {
log(self, '\t--[dropStateLogsTable] table "' + self._schema + '"."state_logs_' + self._schema + self._mySqlDbName + '" is dropped...');
}
resolve();
});
}
});
});
});
};

View file

@ -0,0 +1,78 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import DBAccess from './DBAccess';
import DBAccessQueryResult from './DBAccessQueryResult';
import DBVendors from './DBVendors';
import log from './Logger';
import Conversion from './Conversion';
/**
* Retrieves state-log.
*/
export async function get(conversion: Conversion, param: string): Promise<boolean> {
const dbAccess: DBAccess = new DBAccess(conversion);
const sql: string = `SELECT ${ param } FROM "${ conversion._schema }"."state_logs_${ conversion._schema }${ conversion._mySqlDbName }";`;
const result: DBAccessQueryResult = await dbAccess.query('MigrationStateManager::get', sql, DBVendors.PG, false, false);
return result.data.rows[0][param];
}
/**
* Updates the state-log.
*/
export async function set(conversion: Conversion, param: string): Promise<void> {
const dbAccess: DBAccess = new DBAccess(conversion);
const sql: string = `UPDATE "${ conversion._schema }"."state_logs_${ conversion._schema }${ conversion._mySqlDbName }" SET ${ param } = TRUE;`;
await dbAccess.query('MigrationStateManager::set', sql, DBVendors.PG, false, false);
}
/**
* Creates the "{schema}"."state_logs_{self._schema + self._mySqlDbName}" temporary table.
*/
export async function createStateLogsTable(conversion: Conversion): Promise<Conversion> {
const dbAccess: DBAccess = new DBAccess(conversion);
let sql: string = `CREATE TABLE IF NOT EXISTS "${ conversion._schema }"."state_logs_${ conversion._schema }${ conversion._mySqlDbName }"(
"tables_loaded" BOOLEAN, "per_table_constraints_loaded" BOOLEAN, "foreign_keys_loaded" BOOLEAN, "views_loaded" BOOLEAN');`;
let result: DBAccessQueryResult = await dbAccess.query('MigrationStateManager::createStateLogsTable', sql, DBVendors.PG, true, true);
sql = `SELECT COUNT(1) AS cnt FROM "${ conversion._schema }"."state_logs_${ conversion._schema }${ conversion._mySqlDbName }";`;
result = await dbAccess.query('MigrationStateManager::createStateLogsTable', sql, DBVendors.PG, true, true, result.client);
if (+result.data.rows[0].cnt === 0) {
sql = `INSERT INTO "${ conversion._schema }"."state_logs_${ conversion._schema }${ conversion._mySqlDbName }" VALUES (FALSE, FALSE, FALSE, FALSE);`;
        await dbAccess.query('MigrationStateManager::createStateLogsTable', sql, DBVendors.PG, true, false, result.client);
return conversion;
}
    const msg: string = `\t--[MigrationStateManager::createStateLogsTable] table "${ conversion._schema }"."state_logs_${ conversion._schema }${ conversion._mySqlDbName }" is created...`;
log(conversion, msg);
return conversion;
}
/**
* Drop the "{schema}"."state_logs_{self._schema + self._mySqlDbName}" temporary table.
*/
export async function dropStateLogsTable(conversion: Conversion): Promise<void> {
const dbAccess: DBAccess = new DBAccess(conversion);
const sql: string = `DROP TABLE "${ conversion._schema }"."state_logs_${ conversion._schema }${ conversion._mySqlDbName }";`;
await dbAccess.query('MigrationStateManager::dropStateLogsTable', sql, DBVendors.PG, false, false);
}
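
The get/set pair above is the backbone of nmig's resumability. A minimal sketch of the intended pattern, using the real "views_loaded" flag from the CREATE TABLE statement above; the phase body itself is omitted.

import Conversion from './Conversion';
import * as migrationStateManager from './MigrationStateManager';

// Sketch: every BOOLEAN column of the state-log table guards one migration phase,
// so a re-run of nmig skips work that already finished during a previous run.
async function runPhaseOnce(conversion: Conversion): Promise<void> {
    const alreadyDone: boolean = await migrationStateManager.get(conversion, 'views_loaded');
    if (alreadyDone) {
        return; // this phase completed during a previous run
    }
    // ... perform the phase (e.g. create the views) ...
    await migrationStateManager.set(conversion, 'views_loaded');
}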

View file

@ -1,54 +0,0 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
const log = require('./Logger');
/**
* Generates a summary report.
*
* @param {Conversion} self
* @param {String} endMsg
*
* @returns {undefined}
*/
module.exports = (self, endMsg) => {
let differenceSec = ((new Date()) - self._timeBegin) / 1000;
let seconds = Math.floor(differenceSec % 60);
differenceSec = differenceSec / 60;
let minutes = Math.floor(differenceSec % 60);
let hours = Math.floor(differenceSec / 60);
hours = hours < 10 ? '0' + hours : hours;
minutes = minutes < 10 ? '0' + minutes : minutes;
seconds = seconds < 10 ? '0' + seconds : seconds;
const output = '\t--[generateReport] ' + endMsg
+ '\n\t--[generateReport] Total time: ' + hours + ':' + minutes + ':' + seconds
+ '\n\t--[generateReport] (hours:minutes:seconds)';
log(self, output);
if (self._runsInTestMode) {
self._eventEmitter.emit(self._migrationCompletedEvent);
return;
}
process.exit();
};

49
src/ReportGenerator.ts Normal file
View file

@ -0,0 +1,49 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import log from './Logger';
import Conversion from './Conversion';
import { EventEmitter } from 'events';
/**
* Generates a summary report.
*/
export default (conversion: Conversion, endMsg: string): void => {
let differenceSec: number = ((new Date()).getTime() - conversion._timeBegin.getTime()) / 1000;
const seconds: number = Math.floor(differenceSec % 60);
differenceSec = differenceSec / 60;
const minutes: number = Math.floor(differenceSec % 60);
const hours: number = Math.floor(differenceSec / 60);
const formattedHours: string = hours < 10 ? `0${ hours }` : `${ hours }`;
const formattedMinutes: string = minutes < 10 ? `0${ minutes }` : `${ minutes }`;
const formattedSeconds: string = seconds < 10 ? `0${ seconds }` : `${ seconds }`;
const output: string = `\t--[generateReport] ${ endMsg }
\n\t--[generateReport] Total time: ${ formattedHours }:${ formattedMinutes }:${ formattedSeconds }
\n\t--[generateReport] (hours:minutes:seconds)`;
log(conversion, output);
if (conversion._runsInTestMode) {
(<EventEmitter>conversion._eventEmitter).emit(conversion._migrationCompletedEvent);
return;
}
process.exit();
}
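
A worked example of the duration arithmetic above (input value invented):

// differenceSec = 3725.4 seconds since _timeBegin
// seconds = Math.floor(3725.4 % 60) = 5
// differenceSec = 3725.4 / 60 = 62.09
// minutes = Math.floor(62.09 % 60) = 2
// hours   = Math.floor(62.09 / 60) = 1
// => "Total time: 01:02:05"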

View file

@ -18,9 +18,10 @@
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import ConnectionEmitter from './ConnectionEmitter';
import generateError from './ErrorGenerator';
import Conversion from './Classes/Conversion';
import Conversion from './Conversion';
import DBAccess from './DBAccess';
import DBAccessQueryResult from './DBAccessQueryResult';
import DBVendors from './DBVendors';
export default class SchemaProcessor {
/**
@ -29,39 +30,31 @@ export default class SchemaProcessor {
private readonly _conversion: Conversion;
/**
* An instance of "ConnectionEmitter".
* An instance of "DBAccess".
*/
private readonly _connectionEmitter: ConnectionEmitter;
private readonly _dbAccess: DBAccess;
/**
* SchemaProcessor constructor.
*/
public constructor(conversion: Conversion) {
this._conversion = conversion;
this._connectionEmitter = new ConnectionEmitter(this._conversion);
this._conversion = conversion;
this._dbAccess = new DBAccess(this._conversion);
}
/**
* Create a new database schema if it does not exist yet.
*/
public async createSchema(): Promise<Conversion> {
const client: any = await this._connectionEmitter.getPgClient();
public async createSchema(): Promise<Conversion|void> {
let sql: string = `SELECT schema_name FROM information_schema.schemata WHERE schema_name = '${ this._conversion._schema }';`;
try {
const result: any = await client.query(sql);
const result: DBAccessQueryResult = await this._dbAccess.query('SchemaProcessor::createSchema', sql, DBVendors.PG, true, true);
if (result.rows.length === 0) {
sql = `CREATE SCHEMA "${ this._conversion._schema }";`;
await client.query(sql);
}
this._connectionEmitter.releasePgClient(client);
return Promise.resolve(this._conversion);
} catch (err) {
generateError(this._conversion, `\t--[createSchema] ${ err }`, sql);
return Promise.reject(err);
if (result.data.rows.length === 0) {
sql = `CREATE SCHEMA "${ this._conversion._schema }";`;
await this._dbAccess.query('SchemaProcessor::createSchema', sql, DBVendors.PG, true, false, result.client);
}
return this._conversion;
}
};
}

View file

@ -1,152 +0,0 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
const Table = require('./Classes/Table');
const { createTable } = require('./TableProcessor');
const connect = require('./Connector');
const log = require('./Logger');
const generateError = require('./ErrorGenerator');
const prepareDataChunks = require('./DataChunksProcessor');
const migrationStateManager = require('./MigrationStateManager');
const extraConfigProcessor = require('./ExtraConfigProcessor');
/**
* Processes current table before data loading.
*
* @param {Conversion} self
* @param {String} tableName
* @param {Boolean} stateLog
*
* @returns {Promise}
*/
const processTableBeforeDataLoading = (self, tableName, stateLog) => {
return connect(self).then(() => {
return createTable(self, tableName);
}).then(() => {
return prepareDataChunks(self, tableName, stateLog);
}).catch(() => {
generateError(self, '\t--[processTableBeforeDataLoading] Cannot create table "' + self._schema + '"."' + tableName + '"...');
});
}
/**
* Get the MySQL version.
*
* @param {Conversion} self
*
* @returns {Promise}
*/
const getMySqlVersion = self => {
return connect(self).then(() => {
return new Promise(resolve => {
self._mysql.getConnection((error, connection) => {
if (error) {
// The connection is undefined.
generateError(self, '\t--[getMySqlVersion] Cannot connect to MySQL server...\n' + error);
resolve();
} else {
const sql = 'SELECT VERSION() AS mysql_version;';
connection.query(sql, (err, rows) => {
connection.release();
if (err) {
generateError(self, '\t--[getMySqlVersion] ' + err, sql);
resolve();
} else {
const arrVersion = rows[0].mysql_version.split('.');
const majorVersion = arrVersion[0];
const minorVersion = arrVersion.slice(1).join('');
self._mysqlVersion = +(majorVersion + '.' + minorVersion);
resolve();
}
});
}
});
});
});
}
/**
* Load source tables and views, that need to be migrated.
*
* @param {Conversion} self
*
* @returns {Promise}
*/
module.exports = self => {
return getMySqlVersion(self).then(() => {
return migrationStateManager.get(self, 'tables_loaded').then(haveTablesLoaded => {
return new Promise(resolve => {
self._mysql.getConnection((error, connection) => {
if (error) {
// The connection is undefined.
generateError(self, '\t--[loadStructureToMigrate] Cannot connect to MySQL server...\n' + error);
process.exit();
} else {
const sql = 'SHOW FULL TABLES IN `' + self._mySqlDbName + '`;';
connection.query(sql, (strErr, rows) => {
connection.release();
if (strErr) {
generateError(self, '\t--[loadStructureToMigrate] ' + strErr, sql);
process.exit();
} else {
let tablesCnt = 0;
let viewsCnt = 0;
const processTablePromises = [];
for (let i = 0; i < rows.length; ++i) {
let relationName = rows[i]['Tables_in_' + self._mySqlDbName];
if (rows[i].Table_type === 'BASE TABLE' && self._excludeTables.indexOf(relationName) === -1) {
relationName = extraConfigProcessor.getTableName(self, relationName, false);
self._tablesToMigrate.push(relationName);
self._dicTables[relationName] = new Table(self._logsDirPath + '/' + relationName + '.log');
processTablePromises.push(processTableBeforeDataLoading(self, relationName, haveTablesLoaded));
tablesCnt++;
} else if (rows[i].Table_type === 'VIEW') {
self._viewsToMigrate.push(relationName);
viewsCnt++;
}
}
rows = null;
let message = '\t--[loadStructureToMigrate] Source DB structure is loaded...\n'
+ '\t--[loadStructureToMigrate] Tables to migrate: ' + tablesCnt + '\n'
+ '\t--[loadStructureToMigrate] Views to migrate: ' + viewsCnt;
log(self, message);
Promise.all(processTablePromises).then(
() => {
migrationStateManager.set(self, 'tables_loaded').then(() => resolve(self));
},
() => process.exit()
);
}
});
}
});
});
});
});
};

89
src/StructureLoader.ts Normal file
View file

@ -0,0 +1,89 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import DBAccess from './DBAccess';
import DBAccessQueryResult from './DBAccessQueryResult';
import DBVendors from './DBVendors';
import log from './Logger';
import Conversion from './Conversion';
import Table from './Table';
import { createTable } from './TableProcessor';
import prepareDataChunks from './DataChunksProcessor';
import * as migrationStateManager from './MigrationStateManager';
import * as extraConfigProcessor from './ExtraConfigProcessor';
/**
* Processes current table before data loading.
*/
async function processTableBeforeDataLoading(conversion: Conversion, tableName: string, stateLog: boolean): Promise<void> {
await createTable(conversion, tableName);
await prepareDataChunks(conversion, tableName, stateLog);
}
/**
* Retrieves the source db (MySQL) version.
*/
async function getMySqlVersion(conversion: Conversion): Promise<void> {
const dbAccess: DBAccess = new DBAccess(conversion);
const sql: string = 'SELECT VERSION() AS mysql_version;';
const result: DBAccessQueryResult = await dbAccess.query('StructureLoader::getMySqlVersion', sql, DBVendors.MYSQL, false, false);
const arrVersion: string[] = result.data[0].mysql_version.split('.');
const majorVersion: string = arrVersion[0];
const minorVersion: string = arrVersion.slice(1).join('');
conversion._mysqlVersion = +(`${ majorVersion }.${ minorVersion }`);
}
/**
 * Loads the source tables and views that need to be migrated.
*/
export default async (conversion: Conversion): Promise<Conversion> => {
await getMySqlVersion(conversion);
const dbAccess: DBAccess = new DBAccess(conversion);
const haveTablesLoaded: boolean = await migrationStateManager.get(conversion, 'tables_loaded');
const sql: string = `SHOW FULL TABLES IN \`${ conversion._mySqlDbName }\`;`;
const result: DBAccessQueryResult = await dbAccess.query('StructureLoader::default', sql, DBVendors.MYSQL, true, false);
let tablesCnt: number = 0;
let viewsCnt: number = 0;
const processTablePromises: Promise<void>[] = [];
result.data.forEach((row: any) => {
let relationName: string = row[`Tables_in_${ conversion._mySqlDbName }`];
if (row.Table_type === 'BASE TABLE' && conversion._excludeTables.indexOf(relationName) === -1) {
relationName = extraConfigProcessor.getTableName(conversion, relationName, false);
conversion._tablesToMigrate.push(relationName);
conversion._dicTables[relationName] = new Table(`${ conversion._logsDirPath }/${ relationName }.log`);
processTablePromises.push(processTableBeforeDataLoading(conversion, relationName, haveTablesLoaded));
tablesCnt++;
} else if (row.Table_type === 'VIEW') {
conversion._viewsToMigrate.push(relationName);
viewsCnt++;
}
});
const message: string = `\t--[loadStructureToMigrate] Source DB structure is loaded...\n
\t--[loadStructureToMigrate] Tables to migrate: ${ tablesCnt }\n
\t--[loadStructureToMigrate] Views to migrate: ${ viewsCnt }`;
log(conversion, message);
await Promise.all(processTablePromises);
await migrationStateManager.set(conversion, 'tables_loaded');
return conversion;
}
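
Note how getMySqlVersion flattens a dotted version into one comparable number. A worked example (version string invented):

// VERSION() returns e.g. '5.7.21'
// arrVersion   = ['5', '7', '21']
// majorVersion = '5'
// minorVersion = '721'   (all remaining parts joined without dots)
// conversion._mysqlVersion = +('5.721') = 5.721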

45
src/Table.ts Normal file
View file

@ -0,0 +1,45 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
export default class Table {
/**
* The path to current table's log file.
*/
public readonly tableLogPath: string;
/**
* Representation of given table's columns metadata.
*/
public readonly arrTableColumns: any[];
/**
* Total rows inserted into given table.
*/
public totalRowsInserted: number;
/**
* Constructor.
*/
public constructor(tableLogPath: string) {
this.tableLogPath = tableLogPath;
this.arrTableColumns = [];
this.totalRowsInserted = 0;
}
}

View file

@ -1,78 +0,0 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
'use strict';
const connect = require('./Connector');
const log = require('./Logger');
const generateError = require('./ErrorGenerator');
const extraConfigProcessor = require('./ExtraConfigProcessor');
/**
* Runs "vacuum full" and "analyze".
*
* @param {Conversion} self
*
* @returns {Promise}
*/
module.exports = self => {
    return connect(self).then(() => {
        return new Promise(resolve => {
            const vacuumPromises = [];

            for (let i = 0; i < self._tablesToMigrate.length; ++i) {
                if (self._noVacuum.indexOf(extraConfigProcessor.getTableName(self, self._tablesToMigrate[i], true)) === -1) {
                    const msg = '\t--[runVacuumFullAndAnalyze] Running "VACUUM FULL and ANALYZE" query for table "'
                        + self._schema + '"."' + self._tablesToMigrate[i] + '"...';

                    log(self, msg);
                    vacuumPromises.push(
                        new Promise(resolveVacuum => {
                            self._pg.connect((error, client, done) => {
                                if (error) {
                                    generateError(self, '\t--[runVacuumFullAndAnalyze] Cannot connect to PostgreSQL server...');
                                    resolveVacuum();
                                } else {
                                    const sql = 'VACUUM (FULL, ANALYZE) "' + self._schema + '"."' + self._tablesToMigrate[i] + '";';
                                    client.query(sql, err => {
                                        done();

                                        if (err) {
                                            generateError(self, '\t--[runVacuumFullAndAnalyze] ' + err, sql);
                                            resolveVacuum();
                                        } else {
                                            const msg2 = '\t--[runVacuumFullAndAnalyze] Table "' + self._schema
                                                + '"."' + self._tablesToMigrate[i] + '" is VACUUMed...';

                                            log(self, msg2);
                                            resolveVacuum();
                                        }
                                    });
                                }
                            });
                        })
                    );
                }
            }

            Promise.all(vacuumPromises).then(() => resolve());
        });
    });
};

48
src/VacuumProcessor.ts Normal file

@ -0,0 +1,48 @@
/*
* This file is a part of "NMIG" - the database migration tool.
*
* Copyright (C) 2016 - present, Anatoly Khaytovich <anatolyuss@gmail.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program (please see the "LICENSE.md" file).
* If not, see <http://www.gnu.org/licenses/gpl.txt>.
*
* @author Anatoly Khaytovich <anatolyuss@gmail.com>
*/
import log from './Logger';
import Conversion from './Conversion';
import DBAccess from './DBAccess';
import DBVendors from './DBVendors';
import * as extraConfigProcessor from './ExtraConfigProcessor';
/**
* Runs "vacuum full" and "analyze".
*/
export default async function(conversion: Conversion): Promise<void> {
    const dbAccess: DBAccess = new DBAccess(conversion);

    const vacuumPromises: Promise<void>[] = conversion._tablesToMigrate.map(async (table: string) => {
        if (conversion._noVacuum.indexOf(extraConfigProcessor.getTableName(conversion, table, true)) === -1) {
            const msg: string = `\t--[runVacuumFullAndAnalyze] Running "VACUUM FULL and ANALYZE" query for table "${ conversion._schema }"."${ table }"...`;
            log(conversion, msg);
            const sql: string = `VACUUM (FULL, ANALYZE) "${ conversion._schema }"."${ table }";`;
            await dbAccess.query('runVacuumFullAndAnalyze', sql, DBVendors.PG, false, false);
            const msgSuccess: string = `\t--[runVacuumFullAndAnalyze] Table "${ conversion._schema }"."${ table }" is VACUUMed...`;
            log(conversion, msgSuccess);
        }
    });

    await Promise.all(vacuumPromises);
}
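One design point worth noting: Array.prototype.map with an async callback starts every VACUUM immediately, so Promise.all above runs them concurrently. Since VACUUM FULL takes an exclusive lock per table and is I/O-heavy, a one-at-a-time variant may be gentler on the server. The sketch below is a hypothetical alternative, not the project's code; it reuses only the DBAccess.query signature seen above, and the function name runVacuumSequentially is invented for illustration.

// Hypothetical sequential alternative: one VACUUM at a time instead of Promise.all.
export async function runVacuumSequentially(conversion: Conversion): Promise<void> {
    const dbAccess: DBAccess = new DBAccess(conversion);

    for (const table of conversion._tablesToMigrate) {
        if (conversion._noVacuum.indexOf(extraConfigProcessor.getTableName(conversion, table, true)) !== -1) {
            continue; // the table is excluded from vacuuming via configuration
        }

        const sql: string = `VACUUM (FULL, ANALYZE) "${ conversion._schema }"."${ table }";`;
        await dbAccess.query('runVacuumSequentially', sql, DBVendors.PG, false, false);
    }
}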

tsconfig.json

@ -3,12 +3,14 @@
"removeComments": true,
"module": "commonjs",
"allowJs": true,
"target": "es6",
"target": "es2017",
"sourceMap": true,
"outDir": "./dist",
"strict": true,
"types": [
"node"
"node",
"pg",
"mysql"
]
},