diff --git a/src/BinaryDataDecoder.js b/src/BinaryDataDecoder.js deleted file mode 100644 index bfcb462..0000000 --- a/src/BinaryDataDecoder.js +++ /dev/null @@ -1,92 +0,0 @@ -/* - * This file is a part of "NMIG" - the database migration tool. - * - * Copyright (C) 2016 - present, Anatoly Khaytovich - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program (please see the "LICENSE.md" file). - * If not, see . - * - * @author Anatoly Khaytovich - */ -'use strict'; - -const generateError = require('./ErrorGenerator'); -const log = require('./Logger'); -const connect = require('./Connector'); - -/** - * Decodes binary data from from textual representation in string. - * - * @param {Conversion} conversion - * - * @returns {Promise} - */ -module.exports = conversion => { - log(conversion, '\t--[decodeBinaryData] Decodes binary data from textual representation in string.'); - - return connect(conversion).then(() => { - return new Promise(resolve => { - conversion._pg.connect((error, client, release) => { - if (error) { - generateError(conversion, '\t--[decodeBinaryData] Cannot connect to PostgreSQL server...'); - return resolve(conversion); - } - - const sql = `SELECT table_name, column_name - FROM information_schema.columns - WHERE table_catalog = '${ conversion._targetConString.database }' - AND table_schema = '${ conversion._schema }' - AND data_type IN ('bytea', 'geometry');`; - - client.query(sql, (err, data) => { - release(); - - if (err) { - generateError(conversion, `\t--[decodeBinaryData] ${ err }`, sql); - return resolve(conversion); - } - - const decodePromises = []; - - for (let i = 0; i < data.rows.length; ++i) { - decodePromises.push(new Promise(resolveDecode => { - conversion._pg.connect((connectionError, pgClient, clientRelease) => { - if (connectionError) { - generateError(conversion, '\t--[decodeBinaryData] Cannot connect to PostgreSQL server...'); - return resolveDecode(); - } - - const tableName = data.rows[i].table_name; - const columnName = data.rows[i].column_name; - const sqlDecode = `UPDATE ${ conversion._schema }."${ tableName }" - SET "${ columnName }" = DECODE(ENCODE("${ columnName }", 'escape'), 'hex');`; - - pgClient.query(sqlDecode, decodeError => { - clientRelease(); - - if (decodeError) { - generateError(conversion, `\t--[decodeBinaryData] ${ decodeError }`, sqlDecode); - } - - resolveDecode(); - }); - }); - })); - } - - Promise.all(decodePromises).then(() => resolve(conversion)); - }); - }); - }); - }); -}; diff --git a/src/BinaryDataDecoder.ts b/src/BinaryDataDecoder.ts new file mode 100644 index 0000000..02af134 --- /dev/null +++ b/src/BinaryDataDecoder.ts @@ -0,0 +1,53 @@ +/* + * This file is a part of "NMIG" - the database migration tool. + * + * Copyright (C) 2016 - present, Anatoly Khaytovich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program (please see the "LICENSE.md" file). + * If not, see . + * + * @author Anatoly Khaytovich + */ +import log from './Logger'; +import Conversion from './Conversion'; +import DBAccess from './DBAccess'; +import DBAccessQueryResult from './DBAccessQueryResult'; +import DBVendors from './DBVendors'; + +/** + * Decodes binary data from from textual representation in string. + */ +export default async function (conversion: Conversion): Promise { + log(conversion, '\t--[BinaryDataDecoder::decodeBinaryData] Decodes binary data from textual representation in string.'); + + const dbAccess: DBAccess = new DBAccess(conversion); + const sql: string = `SELECT table_name, column_name + FROM information_schema.columns + WHERE table_catalog = '${ conversion._targetConString.database }' + AND table_schema = '${ conversion._schema }' + AND data_type IN ('bytea', 'geometry');`; + + const result: DBAccessQueryResult = await dbAccess.query('BinaryDataDecoder::decodeBinaryData', sql, DBVendors.PG, false, true); + + const decodePromises: Promise[] = result.data.rows.map(async (row: any) => { + const tableName: string = row.table_name; + const columnName: string = row.column_name; + const sqlDecode: string = `UPDATE ${ conversion._schema }."${ tableName }" + SET "${ columnName }" = DECODE(ENCODE("${ columnName }", 'escape'), 'hex');`; + + await dbAccess.query('BinaryDataDecoder::decodeBinaryData', sqlDecode, DBVendors.PG, false, false, result.client); + }); + + await Promise.all(decodePromises); + return conversion; +} diff --git a/src/BootProcessor.js b/src/BootProcessor.js deleted file mode 100644 index dbdda2f..0000000 --- a/src/BootProcessor.js +++ /dev/null @@ -1,93 +0,0 @@ -/* - * This file is a part of "NMIG" - the database migration tool. - * - * Copyright (C) 2016 - present, Anatoly Khaytovich - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program (please see the "LICENSE.md" file). - * If not, see . - * - * @author Anatoly Khaytovich - */ -'use strict'; - -const connect = require('./Connector'); - -/** - * Boot the migration. - * - * @param {Conversion} self - * - * @returns {Promise} - */ -module.exports = self => { - return connect(self).then(() => { - return new Promise(resolve => { - self._pg.connect((error, client, done) => { - if (error) { - console.log('\t--[boot] Cannot connect to PostgreSQL server...\n' + error); - done(); - process.exit(); - } else { - const sql = 'SELECT EXISTS(SELECT 1 FROM information_schema.tables ' - + 'WHERE table_schema = \'' + self._schema - + '\' AND table_name = \'state_logs_' + self._schema + self._mySqlDbName + '\');'; - - client.query(sql, (err, result) => { - done(); - - if (err) { - console.log('\t--[boot] Error when executed query:\n' + sql + '\nError message:\n' + err); - process.exit(); - } else { - const isExists = !!result.rows[0].exists; - const message = (isExists - ? '\n\t--[boot] NMIG is ready to restart after some failure.' - + '\n\t--[boot] Consider checking log files at the end of migration.' - : '\n\t--[boot] NMIG is ready to start.') + '\n\t--[boot] Proceed? [Y/n]'; - - const logo = '\n\t/\\_ |\\ /\\/\\ /\\___' - + '\n\t| \\ | |\\ | | | __' - + '\n\t| |\\\\| || | | | \\_ \\' - + '\n\t| | \\| || | | |__/ |' - + '\n\t\\| \\/ /_|/______/' - + '\n\n\tNMIG - the database migration tool' - + '\n\tCopyright (C) 2016 - present, Anatoly Khaytovich \n\n' - + '\t--[boot] Configuration has been just loaded.' - + message; - - console.log(logo); - process - .stdin - .resume() - .setEncoding(self._encoding) - .on('data', stdin => { - if (stdin.indexOf('n') !== -1) { - console.log('\t--[boot] Migration aborted.\n'); - process.exit(); - } else if (stdin.indexOf('Y') !== -1) { - resolve(self); - } else { - const hint = '\t--[boot] Unexpected input ' + stdin + '\n' - + '\t--[boot] Expected input is upper case Y\n' - + '\t--[boot] or lower case n\n' + message; - - console.log(hint); - } - }); - } - }); - } - }); - }); - }); -}; diff --git a/src/BootProcessor.ts b/src/BootProcessor.ts new file mode 100644 index 0000000..89989d9 --- /dev/null +++ b/src/BootProcessor.ts @@ -0,0 +1,71 @@ +/* + * This file is a part of "NMIG" - the database migration tool. + * + * Copyright (C) 2016 - present, Anatoly Khaytovich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program (please see the "LICENSE.md" file). + * If not, see . + * + * @author Anatoly Khaytovich + */ +import Conversion from './Conversion'; +import DBAccess from './DBAccess'; +import DBAccessQueryResult from './DBAccessQueryResult'; +import DBVendors from './DBVendors'; + +/** + * Boots the migration. + */ +export default async (conversion: Conversion): Promise => { + const dbAccess: DBAccess = new DBAccess(conversion); + const sql: string = `SELECT EXISTS(SELECT 1 FROM information_schema.tables + WHERE table_schema = '${ conversion._schema }' + AND table_name = 'state_logs_${ conversion._schema }${ conversion._mySqlDbName }');`; + + const result: DBAccessQueryResult = await dbAccess.query('Boot', sql, DBVendors.PG, true, false); + const isExists: boolean = !!result.data.rows[0].exists; + const message: string = `${ (isExists + ? '\n\t--[boot] NMIG is ready to restart after some failure.\n\t--[boot] Consider checking log files at the end of migration.' + : '\n\t--[boot] NMIG is ready to start.') } \n\t--[boot] Proceed? [Y/n]`; + + const logo: string = '\n\t/\\_ |\\ /\\/\\ /\\___' + + '\n\t| \\ | |\\ | | | __' + + '\n\t| |\\\\| || | | | \\_ \\' + + '\n\t| | \\| || | | |__/ |' + + '\n\t\\| \\/ /_|/______/' + + '\n\n\tNMIG - the database migration tool' + + '\n\tCopyright (C) 2016 - present, Anatoly Khaytovich \n\n' + + '\t--[boot] Configuration has been just loaded.' + + message; + + console.log(logo); + + process + .stdin + .resume() + .setEncoding(conversion._encoding) + .on('data', (stdin: string) => { + if (stdin.indexOf('n') !== -1) { + console.log('\t--[boot] Migration aborted.\n'); + process.exit(); + } else if (stdin.indexOf('Y') !== -1) { + return conversion; + } else { + const hint: string = `\t--[boot] Unexpected input ${ stdin }\n + \t--[boot] Expected input is upper case Y\n + \t--[boot] or lower case n\n${ message }`; + + console.log(hint); + } + }); +} diff --git a/src/Classes/BufferStream.js b/src/BufferStream.js similarity index 100% rename from src/Classes/BufferStream.js rename to src/BufferStream.js diff --git a/src/ColumnsDataArranger.js b/src/ColumnsDataArranger.ts similarity index 52% rename from src/ColumnsDataArranger.js rename to src/ColumnsDataArranger.ts index 5b22c76..853b17d 100644 --- a/src/ColumnsDataArranger.js +++ b/src/ColumnsDataArranger.ts @@ -18,16 +18,11 @@ * * @author Anatoly Khaytovich */ -'use strict'; /** - * Define if given type is one of MySQL spacial types. - * - * @param {String} type - * - * @returns {Boolean} + * Defines if given type is one of MySQL spacial types. */ -const isSpacial = type => { +const isSpacial = (type: string): boolean => { return type.indexOf('geometry') !== -1 || type.indexOf('point') !== -1 || type.indexOf('linestring') !== -1 @@ -35,69 +30,50 @@ const isSpacial = type => { }; /** - * Define if given type is one of MySQL binary types. - * - * @param {String} type - * - * @returns {Boolean} + * Defines if given type is one of MySQL binary types. */ -const isBinary = type => { +const isBinary = (type: string): boolean => { return type.indexOf('blob') !== -1 || type.indexOf('binary') !== -1; }; /** - * Define if given type is one of MySQL bit types. - * - * @param {String} type - * - * @returns {Boolean} + * Defines if given type is one of MySQL bit types. */ -const isBit = type => { +const isBit = (type: string): boolean => { return type.indexOf('bit') !== -1; }; /** - * Define if given type is one of MySQL date-time types. - * - * @param {String} type - * - * @returns {Boolean} + * Defines if given type is one of MySQL date-time types. */ -const isDateTime = type => { +const isDateTime = (type: string): boolean => { return type.indexOf('timestamp') !== -1 || type.indexOf('date') !== -1; }; /** * Arranges columns data before loading. - * - * @param {Array} arrTableColumns - * @param {Number} mysqlVersion - * - * @returns {String} */ -module.exports = (arrTableColumns, mysqlVersion) => { - let strRetVal = ''; - const arrTableColumnsLength = arrTableColumns.length; - const wkbFunc = mysqlVersion >= 5.76 ? 'ST_AsWKB' : 'AsWKB'; +export default (arrTableColumns: any[], mysqlVersion: string|number): string => { + let strRetVal: string = ''; + const wkbFunc: string = mysqlVersion >= 5.76 ? 'ST_AsWKB' : 'AsWKB'; - for (let i = 0; i < arrTableColumnsLength; ++i) { - const field = arrTableColumns[i].Field; - const type = arrTableColumns[i].Type; + arrTableColumns.forEach((column: any) => { + const field: string = column.Field; + const type: string = column.Type; if (isSpacial(type)) { // Apply HEX(ST_AsWKB(...)) due to the issue, described at https://bugs.mysql.com/bug.php?id=69798 - strRetVal += 'HEX(' + wkbFunc + '(`' + field + '`)) AS `' + field + '`,'; + strRetVal += `HEX(${ wkbFunc }(\`${ field }\`)) AS \`${ field }\`,`; } else if (isBinary(type)) { - strRetVal += 'HEX(`' + field + '`) AS `' + field + '`,'; + strRetVal += `HEX(\`${ field }\`) AS \`${ field }\`,`; } else if (isBit(type)) { - strRetVal += 'BIN(`' + field + '`) AS `' + field + '`,'; + strRetVal += `BIN(\`${ field }\`) AS \`${ field }\`,`; } else if (isDateTime(type)) { - strRetVal += 'IF(`' + field + '` IN(\'0000-00-00\', \'0000-00-00 00:00:00\'), \'-INFINITY\', CAST(`' - + field + '` AS CHAR)) AS `' + field + '`,'; + strRetVal += `IF(\`${ field }\` IN('0000-00-00', '0000-00-00 00:00:00'), '-INFINITY', CAST(\`${ field }\` AS CHAR)) AS \`${ field }\`,`; } else { - strRetVal += '`' + field + '` AS `' + field + '`,'; + strRetVal += `\`${ field }\` AS \`${ field }\`,`; } - } + }); return strRetVal.slice(0, -1); -}; +} diff --git a/src/ConnectionEmitter.js b/src/ConnectionEmitter.js deleted file mode 100644 index 6109916..0000000 --- a/src/ConnectionEmitter.js +++ /dev/null @@ -1,151 +0,0 @@ -/* - * This file is a part of "NMIG" - the database migration tool. - * - * Copyright (C) 2016 - present, Anatoly Khaytovich - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program (please see the "LICENSE.md" file). - * If not, see . - * - * @author Anatoly Khaytovich - */ -'use strict'; - -const mysql = require('mysql'); -const pg = require('pg'); -const generateError = require('./ErrorGenerator'); - -module.exports = class ConnectionEmitter { - - /** - * ConnectionEmitter constructor. - * - * @param {Conversion} conversion - */ - constructor(conversion) { - this._conversion = conversion; - } - - /** - * Ensure MySQL connection pool existence. - * - * @returns {undefined} - */ - _getMysqlConnection() { - if (!this._conversion._mysql) { - this._conversion._sourceConString.connectionLimit = this._conversion._maxDbConnectionPoolSize; - const pool = mysql.createPool(this._conversion._sourceConString); - - if (!pool) { - generateError(this._conversion, '\t--[getMysqlConnection] Cannot connect to MySQL server...'); - process.exit(); - } - - this._conversion._mysql = pool; - } - } - - /** - * Ensure PostgreSQL connection pool existence. - * - * @returns {undefined} - */ - _getPgConnection() { - if (!this._conversion._pg) { - this._conversion._targetConString.max = this._conversion._maxDbConnectionPoolSize; - const pool = new pg.Pool(this._conversion._targetConString); - - if (!pool) { - generateError(this._conversion, '\t--[getPgConnection] Cannot connect to PostgreSQL server...'); - process.exit(); - } - - this._conversion._pg = pool; - - this._conversion._pg.on('error', error => { - const message = `Cannot connect to PostgreSQL server...\n${ error.message }\n${ error.stack }`; - generateError(this._conversion, message); - process.exit(); - }); - } - } - - /** - * Obtain Connection instance. - * - * @returns {Promise} - */ - async getMysqlClient() { - try { - this._getMysqlConnection(); - return await this._conversion._mysql.getConnection(); - } catch (error) { - generateError(this._conversion, `\t--[getMysqlClient] Cannot connect to PostgreSQL server...\n${ error }`); - process.exit(); - } - } - - /** - * Obtain pg.Client instance. - * - * @returns {Promise} - */ - async getPgClient() { - try { - this._getPgConnection(); - return await this._conversion._pg.connect(); - } catch (error) { - generateError(this._conversion, `\t--[getPgClient] Cannot connect to PostgreSQL server...\n${ error }`); - process.exit(); - } - } - - /** - * Runs a query on the first available idle client and returns its result. - * Note, the pool does the acquiring and releasing of the client internally. - * - * @param {String} sql - * - * @returns {Promise} - */ - async runPgPoolQuery(sql) { - try { - this._getPgConnection(); - return await this._conversion._pg.query(sql); - } catch (error) { - generateError(this._conversion, `\t--[pgPoolQuery] Cannot connect to PostgreSQL server...\n${ error }`); - process.exit(); - } - } - - /** - * Releases MySQL Client back to the pool. - * - * @param {Connection} mysqlClient - * - * @returns {undefined} - */ - releaseMysqlClient(mysqlClient) { - mysqlClient.release(); - } - - /** - * Releases pg.Client back to the pool. - * - * @param {pg.Client} pgClient - * - * @returns {undefined} - */ - releasePgClient(pgClient) { - pgClient.release(); - } -}; diff --git a/src/Connector.js b/src/Connector.js deleted file mode 100644 index 65d5ea0..0000000 --- a/src/Connector.js +++ /dev/null @@ -1,86 +0,0 @@ -/* - * This file is a part of "NMIG" - the database migration tool. - * - * Copyright (C) 2016 - present, Anatoly Khaytovich - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program (please see the "LICENSE.md" file). - * If not, see . - * - * @author Anatoly Khaytovich - */ -'use strict'; - -const mysql = require('mysql'); -const pg = require('pg'); -const log = require('./Logger'); -const generateError = require('./ErrorGenerator'); -const generateReport = require('./ReportGenerator'); - -/** - * Check if both servers are connected. - * If not, than create connections. - * Kill current process if can not connect. - * - * @param {Conversion} self - * - * @returns {Promise} - */ -module.exports = self => { - return new Promise(resolve => { - const mysqlConnectionPromise = new Promise((mysqlResolve, mysqlReject) => { - if (!self._mysql) { - self._sourceConString.connectionLimit = self._maxDbConnectionPoolSize; - self._sourceConString.multipleStatements = true; - const pool = mysql.createPool(self._sourceConString); - - if (pool) { - self._mysql = pool; - mysqlResolve(); - } else { - log(self, '\t--[connect] Cannot connect to MySQL server...'); - mysqlReject(); - } - } else { - mysqlResolve(); - } - }); - - const pgConnectionPromise = new Promise((pgResolve, pgReject) => { - if (!self._pg) { - self._targetConString.max = self._maxDbConnectionPoolSize; - const pool = new pg.Pool(self._targetConString); - - if (pool) { - self._pg = pool; - - self._pg.on('error', error => { - const message = 'Cannot connect to PostgreSQL server...\n' + error.message + '\n' + error.stack; - generateError(self, message); - generateReport(self, message); - }); - - pgResolve(); - } else { - log(self, '\t--[connect] Cannot connect to PostgreSQL server...'); - pgReject(); - } - } else { - pgResolve(); - } - }); - - Promise.all([mysqlConnectionPromise, pgConnectionPromise]) - .then(() => resolve()) - .catch(() => process.exit()); - }); -}; diff --git a/src/Connector.ts b/src/Connector.ts new file mode 100644 index 0000000..db9ac4a --- /dev/null +++ b/src/Connector.ts @@ -0,0 +1,82 @@ +/* + * This file is a part of "NMIG" - the database migration tool. + * + * Copyright (C) 2016 - present, Anatoly Khaytovich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program (please see the "LICENSE.md" file). + * If not, see . + * + * @author Anatoly Khaytovich + */ +import { Pool as MySQLPool } from 'mysql'; +import * as mysql from 'mysql'; +import { Pool as PgPool } from 'pg'; +import log from './Logger'; +import generateError from './ErrorGenerator'; +import generateReport from './ReportGenerator'; +import Conversion from './Conversion'; + +/** + * Check if both servers are connected. + * If not, than create connections. + * Kill current process if can not connect. + */ +export default (conversion: Conversion): Promise => { + return new Promise(resolve => { + const mysqlConnectionPromise: Promise = new Promise((mysqlResolve, mysqlReject) => { + if (!conversion._mysql) { + conversion._sourceConString.connectionLimit = conversion._maxDbConnectionPoolSize; + conversion._sourceConString.multipleStatements = true; + const pool: MySQLPool = mysql.createPool(conversion._sourceConString); + + if (pool) { + conversion._mysql = pool; + mysqlResolve(); + } else { + log(conversion, '\t--[connect] Cannot connect to MySQL server...'); + mysqlReject(); + } + } else { + mysqlResolve(); + } + }); + + const pgConnectionPromise: Promise = new Promise((pgResolve, pgReject) => { + if (!conversion._pg) { + conversion._targetConString.max = conversion._maxDbConnectionPoolSize; + const pool: PgPool = new PgPool(conversion._targetConString); + + if (pool) { + conversion._pg = pool; + + conversion._pg.on('error', (error: Error) => { + const message: string = `Cannot connect to PostgreSQL server...\n' ${ error.message }\n${ error.stack }`; + generateError(conversion, message); + generateReport(conversion, message); + }); + + pgResolve(); + } else { + log(conversion, '\t--[connect] Cannot connect to PostgreSQL server...'); + pgReject(); + } + } else { + pgResolve(); + } + }); + + Promise.all([mysqlConnectionPromise, pgConnectionPromise]) + .then(() => resolve(conversion)) + .catch(() => process.exit()); + }); +} diff --git a/src/ConstraintsProcessor.js b/src/ConstraintsProcessor.js deleted file mode 100644 index 29bf169..0000000 --- a/src/ConstraintsProcessor.js +++ /dev/null @@ -1,134 +0,0 @@ -/* - * This file is a part of "NMIG" - the database migration tool. - * - * Copyright (C) 2016 - present, Anatoly Khaytovich - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program (please see the "LICENSE.md" file). - * If not, see . - * - * @author Anatoly Khaytovich - */ -'use strict'; - -const sequencesProcessor = require('./SequencesProcessor'); -const dataPoolManager = require('./DataPoolManager'); -const runVacuumFullAndAnalyze = require('./VacuumProcessor'); -const migrationStateManager = require('./MigrationStateManager'); -const generateReport = require('./ReportGenerator'); -const processEnum = require('./EnumProcessor'); -const processNull = require('./NullProcessor'); -const processDefault = require('./DefaultProcessor'); -const processIndexAndKey = require('./IndexAndKeyProcessor'); -const processComments = require('./CommentsProcessor'); -const processForeignKey = require('./ForeignKeyProcessor'); -const processViews = require('./ViewGenerator'); -const { dropDataChunkIdColumn } = require('./ConsistencyEnforcer'); - -/** - * Continues migration process after data loading, when migrate_only_data is true. - * - * @param {Conversion} self - * - * @returns {undefined} - */ -const continueProcessAfterDataLoadingShort = self => { - const promises = []; - - for (let i = 0; i < self._tablesToMigrate.length; ++i) { - const tableName = self._tablesToMigrate[i]; - promises.push( - dropDataChunkIdColumn(self, tableName).then(() => { - return sequencesProcessor.setSequenceValue(self, tableName); - }) - ); - } - - Promise.all(promises).then(() => { - return dataPoolManager.dropDataPoolTable(self); - }).then(() => { - return runVacuumFullAndAnalyze(self); - }).then(() => { - return migrationStateManager.dropStateLogsTable(self); - }).then( - () => generateReport(self, 'NMIG migration is accomplished.') - ); -} - -/** - * Continues migration process after data loading, when migrate_only_data is false. - * - * @param {Conversion} self - * - * @returns {undefined} - */ -const continueProcessAfterDataLoadingLong = self => { - migrationStateManager.get(self, 'per_table_constraints_loaded').then(isTableConstraintsLoaded => { - const promises = []; - - if (!isTableConstraintsLoaded) { - for (let i = 0; i < self._tablesToMigrate.length; ++i) { - const tableName = self._tablesToMigrate[i]; - promises.push( - dropDataChunkIdColumn(self, tableName).then(() => { - return processEnum(self, tableName); - }).then(() => { - return processNull(self, tableName); - }).then(() => { - return processDefault(self, tableName); - }).then(() => { - return sequencesProcessor.createSequence(self, tableName); - }).then(() => { - return processIndexAndKey(self, tableName); - }).then(() => { - return processComments(self, tableName); - }) - ); - } - } - - Promise.all(promises).then(() => { - migrationStateManager.set(self, 'per_table_constraints_loaded').then(() => { - return processForeignKey(self); - }).then(() => { - return migrationStateManager.set(self, 'foreign_keys_loaded'); - }).then(() => { - return dataPoolManager.dropDataPoolTable(self); - }).then(() => { - return processViews(self); - }).then(() => { - return migrationStateManager.set(self, 'views_loaded'); - }).then(() => { - return runVacuumFullAndAnalyze(self); - }).then(() => { - return migrationStateManager.dropStateLogsTable(self); - }).then( - () => generateReport(self, 'NMIG migration is accomplished.') - ); - }); - }); -} - -/** - * Continues migration process after data loading. - * - * @param {Conversion} self - * - * @returns {undefined} - */ -module.exports = self => { - if (self._migrateOnlyData) { - continueProcessAfterDataLoadingShort(self); - } else { - continueProcessAfterDataLoadingLong(self); - } -}; diff --git a/src/ConstraintsProcessor.ts b/src/ConstraintsProcessor.ts new file mode 100644 index 0000000..3cf7682 --- /dev/null +++ b/src/ConstraintsProcessor.ts @@ -0,0 +1,91 @@ +/* + * This file is a part of "NMIG" - the database migration tool. + * + * Copyright (C) 2016 - present, Anatoly Khaytovich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program (please see the "LICENSE.md" file). + * If not, see . + * + * @author Anatoly Khaytovich + */ +import sequencesProcessor from './SequencesProcessor'; +import * as dataPoolManager from './DataPoolManager'; +import runVacuumFullAndAnalyze from './VacuumProcessor'; +import * as migrationStateManager from './MigrationStateManager'; +import generateReport from './ReportGenerator'; +import processEnum from './EnumProcessor'; +import processNull from './NullProcessor'; +import processDefault from './DefaultProcessor'; +import processIndexAndKey from './IndexAndKeyProcessor'; +import processComments from './CommentsProcessor'; +import processForeignKey from './ForeignKeyProcessor'; +import processViews from './ViewGenerator'; +import { dropDataChunkIdColumn } from './ConsistencyEnforcer'; +import Conversion from './Conversion'; + +/** + * Continues migration process after data loading, when migrate_only_data is true. + */ +async function continueProcessAfterDataLoadingShort(conversion: Conversion): Promise { + const promises: Promise[] = conversion._tablesToMigrate.map(async (tableName: string) => { + await dropDataChunkIdColumn(conversion, tableName); + return sequencesProcessor.setSequenceValue(conversion, tableName); + }); + + await Promise.all(promises); + await dataPoolManager.dropDataPoolTable(conversion); + await runVacuumFullAndAnalyze(conversion); + await migrationStateManager.dropStateLogsTable(conversion); + generateReport(conversion, 'NMIG migration is accomplished.'); +} + +/** + * Continues migration process after data loading, when migrate_only_data is false. + */ +async function continueProcessAfterDataLoadingLong(conversion: Conversion): Promise { + const isTableConstraintsLoaded: boolean = await migrationStateManager.get(conversion, 'per_table_constraints_loaded'); + const promises: Promise[] = conversion._tablesToMigrate.map(async (tableName: string) => { + if (!isTableConstraintsLoaded) { + await dropDataChunkIdColumn(conversion, tableName); + await processEnum(conversion, tableName); + await processNull(conversion, tableName); + await processDefault(conversion, tableName); + await sequencesProcessor.createSequence(conversion, tableName); + await processIndexAndKey(conversion, tableName); + await processComments(conversion, tableName); + } + }); + + await Promise.all(promises); + await migrationStateManager.set(conversion, 'per_table_constraints_loaded'); + await processForeignKey(conversion); + await migrationStateManager.set(conversion, 'foreign_keys_loaded'); + await dataPoolManager.dropDataPoolTable(conversion); + await processViews(conversion); + await migrationStateManager.set(conversion, 'views_loaded'); + await runVacuumFullAndAnalyze(conversion); + await migrationStateManager.dropStateLogsTable(conversion); + generateReport(conversion, 'NMIG migration is accomplished.'); +} + +/** + * Continues migration process after data loading. + */ +export default async function(conversion: Conversion): Promise { + if (conversion._migrateOnlyData) { + await continueProcessAfterDataLoadingShort(conversion); + return; + } + + await continueProcessAfterDataLoadingLong(conversion); +} diff --git a/src/Classes/Conversion.ts b/src/Conversion.ts similarity index 95% rename from src/Classes/Conversion.ts rename to src/Conversion.ts index 17775d5..eef3b1c 100644 --- a/src/Classes/Conversion.ts +++ b/src/Conversion.ts @@ -19,7 +19,9 @@ * @author Anatoly Khaytovich */ import * as path from 'path'; -import {EventEmitter} from "events"; +import { EventEmitter } from 'events'; +import { Pool as MySQLPool } from 'mysql'; +import { Pool as PgPool } from 'pg'; export default class Conversion { /** @@ -115,17 +117,17 @@ export default class Conversion { /** * Current version of source (MySQL) db. */ - public _mysqlVersion: string; + public _mysqlVersion: string|number; /** * Node-MySQL connections pool. */ - public _mysql: any; + public _mysql?: MySQLPool; /** * Node-Postgres connection pool. */ - public _pg: any; + public _pg?: PgPool; /** * An object, representing additional configuration options. @@ -187,10 +189,15 @@ export default class Conversion { */ public _eventEmitter: EventEmitter|null; + /** + * The data types map. + */ + public _dataTypesMap: any; + /** * Constructor. */ - constructor(config: any) { + public constructor(config: any) { this._config = config; this._sourceConString = this._config.source; this._targetConString = this._config.target; @@ -206,8 +213,6 @@ export default class Conversion { this._dataChunkSize = this._config.data_chunk_size === undefined ? 1 : +this._config.data_chunk_size; this._dataChunkSize = this._dataChunkSize <= 0 ? 1 : this._dataChunkSize; this._0777 = '0777'; - this._mysql = null; - this._pg = null; this._mysqlVersion = '5.6.21'; // Simply a default value. this._extraConfig = this._config.extraConfig === undefined ? false : this._config.extraConfig; this._tablesToMigrate = []; @@ -240,7 +245,7 @@ export default class Conversion { /** * Checks if given value is integer number. */ - isIntNumeric(value: any): boolean { + private isIntNumeric(value: any): boolean { return !isNaN(parseInt(value)) && isFinite(value); } -}; +} diff --git a/src/DBAccess.ts b/src/DBAccess.ts new file mode 100644 index 0000000..9142667 --- /dev/null +++ b/src/DBAccess.ts @@ -0,0 +1,217 @@ +/* + * This file is a part of "NMIG" - the database migration tool. + * + * Copyright (C) 2016 - present, Anatoly Khaytovich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program (please see the "LICENSE.md" file). + * If not, see . + * + * @author Anatoly Khaytovich + */ +import * as mysql from 'mysql'; +import { Pool as MySQLPool, PoolConnection, MysqlError } from 'mysql'; +import { Pool as PgPool, PoolClient, QueryResult } from 'pg'; +import generateError from './ErrorGenerator'; +import Conversion from './Conversion'; +import generateReport from './ReportGenerator'; +import DBVendors from './DBVendors'; +import DBAccessQueryResult from './DBAccessQueryResult'; + +export default class DBAccess { + /** + * Conversion instance. + */ + private readonly _conversion: Conversion; + + /** + * DBAccess constructor. + */ + public constructor(conversion: Conversion) { + this._conversion = conversion; + } + + /** + * Ensure MySQL connection pool existence. + */ + private _getMysqlConnection(): void { + if (!this._conversion._mysql) { + this._conversion._sourceConString.connectionLimit = this._conversion._maxDbConnectionPoolSize; + this._conversion._sourceConString.multipleStatements = true; + const pool: MySQLPool = mysql.createPool(this._conversion._sourceConString); + + if (!pool) { + generateError(this._conversion, '\t--[getMysqlConnection] Cannot connect to MySQL server...'); + process.exit(); + } + + this._conversion._mysql = pool; + } + } + + /** + * Ensure PostgreSQL connection pool existence. + */ + private _getPgConnection(): void { + if (!this._conversion._pg) { + this._conversion._targetConString.max = this._conversion._maxDbConnectionPoolSize; + const pool: PgPool = new PgPool(this._conversion._targetConString); + + if (!pool) { + generateError(this._conversion, '\t--[getPgConnection] Cannot connect to PostgreSQL server...'); + process.exit(); + } + + this._conversion._pg = pool; + + this._conversion._pg.on('error', (error: Error) => { + const message: string = `Cannot connect to PostgreSQL server...\n' ${ error.message }\n${ error.stack }`; + generateError(this._conversion, message); + generateReport(this._conversion, message); + }); + } + } + + /** + * Obtain PoolConnection instance. + */ + public getMysqlClient(): Promise { + this._getMysqlConnection(); + + return new Promise((resolve, reject) => { + (this._conversion._mysql).getConnection((err: MysqlError|null, connection: PoolConnection) => { + return err ? reject(err) : resolve(connection); + }); + }); + } + + /** + * Obtain PoolClient instance. + */ + public getPgClient(): Promise { + this._getPgConnection(); + return (this._conversion._pg).connect(); + } + + /** + * Runs a query on the first available idle client and returns its result. + * Note, the pool does the acquiring and releasing of the client internally. + */ + public runPgPoolQuery(sql: string): Promise { + this._getPgConnection(); + return (this._conversion._pg).query(sql); + } + + /** + * Releases MySQL connection back to the pool. + */ + public releaseMysqlClient(connection: PoolConnection): void { + connection.release(); + } + + /** + * Releases PostgreSQL connection back to the pool. + */ + public releasePgClient(pgClient: PoolClient): void { + pgClient.release(); + } + + /** + * Sends given SQL query to specified DB. + * Performs appropriate actions (requesting/releasing client) against target connections pool. + */ + public async query( + caller: string, + sql: string, + vendor: DBVendors, + processExitOnError: boolean, + shouldReturnClient: boolean, + client?: PoolConnection|PoolClient + ): Promise { + // Checks if there is an available client. + if (!client) { + try { + // Client is undefined. + // It must be requested from the connections pool. + client = vendor === DBVendors.PG ? await this.getPgClient() : await this.getMysqlClient(); + } catch (error) { + // Client request failed. + // Must exit the function. + return processExitOnError ? process.exit() : { client: client, data: error }; + } + } + + return vendor === DBVendors.PG + ? this._queryPG(caller, sql, processExitOnError, shouldReturnClient, (client)) + : this._queryMySQL(caller, sql, processExitOnError, shouldReturnClient, (client)); + } + + /** + * Sends given SQL query to MySQL. + */ + private _queryMySQL( + caller: string, + sql: string, + processExitOnError: boolean, + shouldReturnClient: boolean, + client?: PoolConnection + ): Promise { + return new Promise(async (resolve, reject) => { + (client).query(sql, (error: MysqlError|null, data: any) => { + // Checks if there are more queries to be sent using current client. + if (!shouldReturnClient) { + // No more queries to be sent using current client. + // The client must be released. + this.releaseMysqlClient((client)); + client = undefined; + } + + if (error) { + // An error occurred during DB querying. + generateError(this._conversion, `\t--[${ caller }] ${ error }`, sql); + return processExitOnError ? process.exit() : reject({ client: client, data: error }); + } + + return resolve({ client: client, data: data }); + }); + }); + } + + /** + * Sends given SQL query to PostgreSQL. + */ + private async _queryPG( + caller: string, + sql: string, + processExitOnError: boolean, + shouldReturnClient: boolean, + client?: PoolClient + ): Promise { + try { + const data: any = await (client).query(sql); + + // Checks if there are more queries to be sent using current client. + if (!shouldReturnClient) { + // No more queries to be sent using current client. + // The client must be released. + this.releasePgClient((client)); + client = undefined; + } + + return { client: client, data: data }; + } catch (error) { + // An error occurred during DB querying. + generateError(this._conversion, `\t--[${ caller }] ${ error }`, sql); + return processExitOnError ? process.exit() : { client: client, data: error }; + } + } +} diff --git a/src/Classes/Table.js b/src/DBAccessQueryResult.ts similarity index 72% rename from src/Classes/Table.js rename to src/DBAccessQueryResult.ts index 8e471a2..ef56013 100644 --- a/src/Classes/Table.js +++ b/src/DBAccessQueryResult.ts @@ -18,18 +18,18 @@ * * @author Anatoly Khaytovich */ -'use strict'; +import { PoolClient } from 'pg'; +import { PoolConnection } from 'mysql'; -module.exports = class Table { +export default interface DBAccessQueryResult { /** - * This function represents table related metadata. - * Constructor. - * - * @param {String} tableLogPath + * MySQL's or PostgreSQL's client instance. + * The client may be undefined. */ - constructor(tableLogPath) { - this.tableLogPath = tableLogPath; - this.arrTableColumns = []; - this.totalRowsInserted = 0; - } -}; + client?: PoolConnection|PoolClient; + + /** + * Query result. + */ + data: any; +} diff --git a/src/DBVendors.ts b/src/DBVendors.ts new file mode 100644 index 0000000..c34a0fc --- /dev/null +++ b/src/DBVendors.ts @@ -0,0 +1,26 @@ +/* + * This file is a part of "NMIG" - the database migration tool. + * + * Copyright (C) 2016 - present, Anatoly Khaytovich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program (please see the "LICENSE.md" file). + * If not, see . + * + * @author Anatoly Khaytovich + */ +enum DBVendors { + MYSQL, + PG, +} + +export default DBVendors; diff --git a/src/DataChunksProcessor.js b/src/DataChunksProcessor.js deleted file mode 100644 index 401dce7..0000000 --- a/src/DataChunksProcessor.js +++ /dev/null @@ -1,148 +0,0 @@ -/* - * This file is a part of "NMIG" - the database migration tool. - * - * Copyright (C) 2016 - present, Anatoly Khaytovich - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program (please see the "LICENSE.md" file). - * If not, see . - * - * @author Anatoly Khaytovich - */ -'use strict'; - -const connect = require('./Connector'); -const log = require('./Logger'); -const generateError = require('./ErrorGenerator'); -const arrangeColumnsData = require('./ColumnsDataArranger'); -const extraConfigProcessor = require('./ExtraConfigProcessor'); - -/** - * Prepares an array of tables and chunk offsets. - * - * @param {Conversion} self - * @param {String} tableName - * @param {Boolean} haveDataChunksProcessed - * - * @returns {Promise} - */ -module.exports = (self, tableName, haveDataChunksProcessed) => { - return connect(self).then(() => { - return new Promise(resolve => { - if (haveDataChunksProcessed) { - return resolve(); - } - - self._mysql.getConnection((error, connection) => { - if (error) { - // The connection is undefined. - generateError(self, '\t--[prepareDataChunks] Cannot connect to MySQL server...\n\t' + error); - resolve(); - } else { - // Determine current table size, apply "chunking". - const originalTableName = extraConfigProcessor.getTableName(self, tableName, true); - let sql = "SELECT (data_length / 1024 / 1024) AS size_in_mb " - + "FROM information_schema.tables " - + "WHERE table_schema = '" + self._mySqlDbName + "' " - + "AND table_name = '" + originalTableName + "';"; - - connection.query(sql, (err, rows) => { - if (err) { - connection.release(); - generateError(self, '\t--[prepareDataChunks] ' + err, sql); - resolve(); - } else { - const tableSizeInMb = +rows[0].size_in_mb; - rows = null; - sql = 'SELECT COUNT(1) AS rows_count FROM `' + originalTableName + '`;'; - const strSelectFieldList = arrangeColumnsData( - self._dicTables[tableName].arrTableColumns, - self._mysqlVersion - ); - - connection.query(sql, (err2, rows2) => { - connection.release(); - - if (err2) { - generateError(self, '\t--[prepareDataChunks] ' + err2, sql); - resolve(); - } else { - const rowsCnt = rows2[0].rows_count; - rows2 = null; - let chunksCnt = tableSizeInMb / self._dataChunkSize; - chunksCnt = chunksCnt < 1 ? 1 : chunksCnt; - const rowsInChunk = Math.ceil(rowsCnt / chunksCnt); - const arrDataPoolPromises = []; - const msg = '\t--[prepareDataChunks] Total rows to insert into ' - + '"' + self._schema + '"."' + tableName + '": ' + rowsCnt; - - log(self, msg, self._dicTables[tableName].tableLogPath); - - for (let offset = 0; offset < rowsCnt; offset += rowsInChunk) { - arrDataPoolPromises.push(new Promise(resolveDataUnit => { - self._pg.connect((error, client, done) => { - if (error) { - generateError(self, '\t--[prepareDataChunks] Cannot connect to PostgreSQL server...\n' + error); - resolveDataUnit(); - } else { - const strJson = '{"_tableName":"' + tableName - + '","_selectFieldList":"' + strSelectFieldList + '",' - + '"_offset":' + offset + ',' - + '"_rowsInChunk":' + rowsInChunk + ',' - + '"_rowsCnt":' + rowsCnt + '}'; - - /* - * Define current data chunk size in MB. - * If there is only one chunk, then its size is equal to the table size. - * If there are more than one chunk, - * then a size of each chunk besides the last one is equal to "data_chunk_size", - * and a size of the last chunk is either "data_chunk_size" or tableSizeInMb % chunksCnt. - */ - let currentChunkSizeInMb = 0; - - if (chunksCnt === 1) { - currentChunkSizeInMb = tableSizeInMb; - } else if (offset + rowsInChunk >= rowsCnt) { - currentChunkSizeInMb = tableSizeInMb % chunksCnt; - currentChunkSizeInMb = currentChunkSizeInMb || self._dataChunkSize; - } else { - currentChunkSizeInMb = self._dataChunkSize; - } - - sql = 'INSERT INTO "' + self._schema + '"."data_pool_' + self._schema - + self._mySqlDbName + '"("is_started", "json", "size_in_mb")' - + ' VALUES(FALSE, $1, $2);'; - - client.query(sql, [strJson, currentChunkSizeInMb], err => { - done(); - - if (err) { - generateError(self, '\t--[prepareDataChunks] INSERT failed...\n' + err, sql); - } - - resolveDataUnit(); - }); - } - }); - })); - } - - Promise.all(arrDataPoolPromises).then(() => resolve()); - } - }); - } - }); - } - }); - }); - }); -}; diff --git a/src/DataChunksProcessor.ts b/src/DataChunksProcessor.ts new file mode 100644 index 0000000..c99868c --- /dev/null +++ b/src/DataChunksProcessor.ts @@ -0,0 +1,101 @@ +/* + * This file is a part of "NMIG" - the database migration tool. + * + * Copyright (C) 2016 - present, Anatoly Khaytovich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program (please see the "LICENSE.md" file). + * If not, see . + * + * @author Anatoly Khaytovich + */ +import log from './Logger'; +import arrangeColumnsData from './ColumnsDataArranger'; +import * as extraConfigProcessor from './ExtraConfigProcessor'; +import Conversion from './Conversion'; +import DBAccess from './DBAccess'; +import DBAccessQueryResult from './DBAccessQueryResult'; +import DBVendors from './DBVendors'; + +/** + * Prepares an array of tables and chunk offsets. + */ +export default async (conversion: Conversion, tableName: string, haveDataChunksProcessed: boolean): Promise => { + if (haveDataChunksProcessed) { + return; + } + + // Determine current table size, apply "chunking". + const originalTableName: string = extraConfigProcessor.getTableName(conversion, tableName, true); + let sql: string = `SELECT (data_length / 1024 / 1024) AS size_in_mb FROM information_schema.tables + WHERE table_schema = '${ conversion._mySqlDbName }' AND table_name = '${ originalTableName }';`; + + const dbAccess: DBAccess = new DBAccess(conversion); + const sizeQueryResult: DBAccessQueryResult = await dbAccess.query( + 'DataChunksProcessor::default', + sql, + DBVendors.MYSQL, + true, + true + ); + + const tableSizeInMb: number = +sizeQueryResult.data[0].size_in_mb; + const strSelectFieldList: string = arrangeColumnsData(conversion._dicTables[tableName].arrTableColumns, conversion._mysqlVersion); + sql = `SELECT COUNT(1) AS rows_count FROM \`${ originalTableName }\`;`; + const countResult: DBAccessQueryResult = await dbAccess.query( + 'DataChunksProcessor::default', + sql, + DBVendors.MYSQL, + true, + false, + sizeQueryResult.client + ); + + const rowsCnt: number = countResult.data[0].rows_count; + let chunksCnt: number = tableSizeInMb / conversion._dataChunkSize; + chunksCnt = chunksCnt < 1 ? 1 : chunksCnt; + const rowsInChunk: number = Math.ceil(rowsCnt / chunksCnt); + const arrDataPoolPromises: Promise[] = []; + const msg: string = `\t--[prepareDataChunks] Total rows to insert into "${ conversion._schema }"."${ tableName }": ${ rowsCnt }`; + log(conversion, msg, conversion._dicTables[tableName].tableLogPath); + + for (let offset: number = 0; offset < rowsCnt; offset += rowsInChunk) { + arrDataPoolPromises.push(new Promise(async resolveDataUnit => { + const strJson: string = `{"_tableName":"${ tableName }","_selectFieldList":"${ strSelectFieldList }", + "_offset":${ offset },"_rowsInChunk":${ rowsInChunk },"_rowsCnt":${ rowsCnt }`; + + // Define current data chunk size in MB. + // If there is only one chunk, then its size is equal to the table size. + // If there are more than one chunk, + // then a size of each chunk besides the last one is equal to "data_chunk_size", + // and a size of the last chunk is either "data_chunk_size" or tableSizeInMb % chunksCnt. + let currentChunkSizeInMb: number = 0; + + if (chunksCnt === 1) { + currentChunkSizeInMb = tableSizeInMb; + } else if (offset + rowsInChunk >= rowsCnt) { + currentChunkSizeInMb = tableSizeInMb % chunksCnt; + currentChunkSizeInMb = currentChunkSizeInMb || conversion._dataChunkSize; + } else { + currentChunkSizeInMb = conversion._dataChunkSize; + } + + sql = `INSERT INTO "${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }" + ("is_started", "json", "size_in_mb") VALUES (FALSE, '${ strJson }', ${ currentChunkSizeInMb });`; + + await dbAccess.query('DataChunksProcessor::default', sql, DBVendors.PG,true,false); + resolveDataUnit(); + })); + } + + await Promise.all(arrDataPoolPromises); +} diff --git a/src/DataLoader.js b/src/DataLoader.js index 81d309c..1e7437a 100644 --- a/src/DataLoader.js +++ b/src/DataLoader.js @@ -26,11 +26,11 @@ const csvStringify = require('./CsvStringifyModified'); const log = require('./Logger'); const generateError = require('./ErrorGenerator'); const connect = require('./Connector'); -const Conversion = require('./Classes/Conversion'); -const MessageToMaster = require('./Classes/MessageToMaster'); +const Conversion = require('./Conversion'); +const MessageToMaster = require('./MessageToMaster'); const { enforceConsistency } = require('./ConsistencyEnforcer'); const extraConfigProcessor = require('./ExtraConfigProcessor'); -const BufferStream = require('./Classes/BufferStream'); +const BufferStream = require('./BufferStream'); process.on('message', signal => { const self = new Conversion(signal.config); diff --git a/src/DataPipeManager.js b/src/DataPipeManager.js deleted file mode 100644 index 2a25980..0000000 --- a/src/DataPipeManager.js +++ /dev/null @@ -1,191 +0,0 @@ -/* - * This file is a part of "NMIG" - the database migration tool. - * - * Copyright (C) 2016 - present, Anatoly Khaytovich - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program (please see the "LICENSE.md" file). - * If not, see . - * - * @author Anatoly Khaytovich - */ -'use strict'; - -const childProcess = require('child_process'); -const path = require('path'); -const log = require('./Logger'); -const generateError = require('./ErrorGenerator'); -const MessageToDataLoader = require('./Classes/MessageToDataLoader'); -const processConstraints = require('./ConstraintsProcessor'); -const decodeBinaryData = require('./BinaryDataDecoder'); - -/** - * Kill a process specified by the pid. - * - * @param {Number} pid - * - * @returns {undefined} - */ -const killProcess = pid => { - try { - process.kill(pid); - } catch (killError) { - generateError(self, '\t--[killProcess] ' + killError); - } -}; - -/** - * Check if all data chunks were processed. - * - * @param {Conversion} self - * - * @returns {Boolean} - */ -const dataPoolProcessed = self => { - return self._processedChunks === self._dataPool.length; -}; - -/** - * Get a size (in MB) of the smallest, non processed data chunk. - * If all data chunks are processed then return 0. - * - * @param {Conversion} self - * - * @returns {Number} - */ -const getSmallestDataChunkSizeInMb = self => { - for (let i = self._dataPool.length - 1; i >= 0; --i) { - if (self._dataPool[i]._processed === false) { - return self._dataPool[i]._size_in_mb; - } - } - - return 0; -}; - -/** - * Create an array of indexes, that point to data chunks, that will be processed during current COPY operation. - * - * @param {Conversion} self - * - * @returns {Array} - */ -const fillBandwidth = self => { - const dataChunkIndexes = []; - - /* - * Loop through the data pool from the beginning to the end. - * Note, the data pool is created with predefined order, the order by data chunk size descending. - * Note, the "bandwidth" variable represents an actual amount of data, that will be loaded during current COPY operation. - */ - for (let i = 0, bandwidth = 0; i < self._dataPool.length; ++i) { - /* - * Check if current chunk has already been marked as "processed". - * If yes, then continue to the next iteration. - */ - if (self._dataPool[i]._processed === false) { - // Sum a size of data chunks, that are yet to be processed. - bandwidth += self._dataPool[i]._size_in_mb; - - if (self._dataChunkSize - bandwidth >= getSmallestDataChunkSizeInMb(self)) { - /* - * Currently, the bandwidth is smaller than "data_chunk_size", - * and the difference between "data_chunk_size" and the bandwidth - * is larger or equal to currently-smallest data chunk. - * This means, that more data chunks can be processed during current COPY operation. - */ - dataChunkIndexes.push(i); - self._dataPool[i]._processed = true; - continue; - } - - if (self._dataChunkSize >= bandwidth) { - /* - * Currently, the "data_chunk_size" is greater or equal to the bandwidth. - * This means, that no more data chunks can be processed during current COPY operation. - * Current COPY operation will be performed with maximal possible bandwidth capacity. - */ - dataChunkIndexes.push(i); - self._dataPool[i]._processed = true; - break; - } - - /* - * This data chunk will not be processed during current COPY operation, because when it is added - * to the bandwidth, the bandwidth's size may become larger than "data_chunk_size". - * The bandwidth's value should be decreased prior the next iteration. - */ - bandwidth -= self._dataPool[i]._size_in_mb; - } - } - - return dataChunkIndexes; -}; - -/** - * Instructs DataLoader which data chunks should be loaded. - * No need to check the state-log. - * If dataPool's length is zero, then nmig will proceed to the next step. - * - * @param {Conversion} self - * @param {String} strDataLoaderPath - * @param {Object} options - * - * @returns {undefined} - */ -const pipeData = (self, strDataLoaderPath, options) => { - if (dataPoolProcessed(self)) { - return decodeBinaryData(self).then(processConstraints); - } - - const loaderProcess = childProcess.fork(strDataLoaderPath, options); - const bandwidth = fillBandwidth(self); - const chunksToLoad = bandwidth.map(index => { - return self._dataPool[index]; - }); - - loaderProcess.on('message', signal => { - if (typeof signal === 'object') { - self._dicTables[signal.tableName].totalRowsInserted += signal.rowsInserted; - const msg = '\t--[pipeData] For now inserted: ' + self._dicTables[signal.tableName].totalRowsInserted + ' rows, ' - + 'Total rows to insert into "' + self._schema + '"."' + signal.tableName + '": ' + signal.totalRowsToInsert; - - log(self, msg); - } else { - killProcess(loaderProcess.pid); - self._processedChunks += chunksToLoad.length; - return pipeData(self, strDataLoaderPath, options); - } - }); - - loaderProcess.send(new MessageToDataLoader(self._config, chunksToLoad)); -}; - -/** - * Manage the DataPipe. - * - * @param {Conversion} self - * - * @returns {undefined} - */ -module.exports = self => { - if (dataPoolProcessed(self)) { - return decodeBinaryData(self).then(processConstraints); - } - - const strDataLoaderPath = path.join(__dirname, 'DataLoader.js'); - const options = self._loaderMaxOldSpaceSize === 'DEFAULT' - ? Object.create(null) - : { execArgv: ['--max-old-space-size=' + self._loaderMaxOldSpaceSize] }; - - return pipeData(self, strDataLoaderPath, options); -}; diff --git a/src/DataPipeManager.ts b/src/DataPipeManager.ts new file mode 100644 index 0000000..b87267c --- /dev/null +++ b/src/DataPipeManager.ts @@ -0,0 +1,155 @@ +/* + * This file is a part of "NMIG" - the database migration tool. + * + * Copyright (C) 2016 - present, Anatoly Khaytovich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program (please see the "LICENSE.md" file). + * If not, see . + * + * @author Anatoly Khaytovich + */ +import { ChildProcess } from 'child_process'; +import * as childProcess from 'child_process'; +import * as path from 'path'; +import log from './Logger'; +import Conversion from './Conversion'; +import generateError from './ErrorGenerator'; +import MessageToDataLoader from './MessageToDataLoader'; +import processConstraints from './ConstraintsProcessor'; +import decodeBinaryData from './BinaryDataDecoder'; + +/** + * Kills a process specified by the pid. + */ +function killProcess(pid: number, conversion: Conversion): void { + try { + process.kill(pid); + } catch (killError) { + generateError(conversion, `\t--[killProcess] ${ killError }`); + } +} + +/** + * Checks if all data chunks were processed. + */ +function dataPoolProcessed(conversion: Conversion): boolean { + return conversion._processedChunks === conversion._dataPool.length; +} + +/** + * Gets a size (in MB) of the smallest, non processed data chunk. + * If all data chunks are processed then returns 0. + */ +function getSmallestDataChunkSizeInMb(conversion: Conversion): number { + for (let i: number = conversion._dataPool.length - 1; i >= 0; --i) { + if (conversion._dataPool[i]._processed === false) { + return conversion._dataPool[i]._size_in_mb; + } + } + + return 0; +} + +/** + * Creates an array of indexes, that point to data chunks, that will be processed during current COPY operation. + */ +function fillBandwidth(conversion: Conversion): number[] { + const dataChunkIndexes: number[] = []; + + // Loop through the data pool from the beginning to the end. + // Note, the data pool is created with predefined order, the order by data chunk size descending. + // Note, the "bandwidth" variable represents an actual amount of data, that will be loaded during current COPY operation. + for (let i: number = 0, bandwidth = 0; i < conversion._dataPool.length; ++i) { + // Check if current chunk has already been marked as "processed". + // If yes, then continue to the next iteration. + if (conversion._dataPool[i]._processed === false) { + // Sum a size of data chunks, that are yet to be processed. + bandwidth += conversion._dataPool[i]._size_in_mb; + + if (conversion._dataChunkSize - bandwidth >= getSmallestDataChunkSizeInMb(conversion)) { + // Currently, the bandwidth is smaller than "data_chunk_size", + // and the difference between "data_chunk_size" and the bandwidth + // is larger or equal to currently-smallest data chunk. + // This means, that more data chunks can be processed during current COPY operation. + dataChunkIndexes.push(i); + conversion._dataPool[i]._processed = true; + continue; + } + + if (conversion._dataChunkSize >= bandwidth) { + // Currently, the "data_chunk_size" is greater or equal to the bandwidth. + // This means, that no more data chunks can be processed during current COPY operation. + // Current COPY operation will be performed with maximal possible bandwidth capacity. + dataChunkIndexes.push(i); + conversion._dataPool[i]._processed = true; + break; + } + + // This data chunk will not be processed during current COPY operation, because when it is added + // to the bandwidth, the bandwidth's size may become larger than "data_chunk_size". + // The bandwidth's value should be decreased prior the next iteration. + bandwidth -= conversion._dataPool[i]._size_in_mb; + } + } + + return dataChunkIndexes; +} + +/** + * Instructs DataLoader which data chunks should be loaded. + * No need to check the state-log. + * If dataPool's length is zero, then nmig will proceed to the next step. + */ +async function pipeData(conversion: Conversion, dataLoaderPath: string, options: any): Promise { + if (dataPoolProcessed(conversion)) { + conversion = await decodeBinaryData(conversion); + return processConstraints(conversion); + } + + const loaderProcess: ChildProcess = childProcess.fork(dataLoaderPath, options); + const bandwidth: number[] = fillBandwidth(conversion); + const chunksToLoad: any[] = bandwidth.map((index: number) => conversion._dataPool[index]); + + loaderProcess.on('message', (signal: any) => { + if (typeof signal === 'object') { + conversion._dicTables[signal.tableName].totalRowsInserted += signal.rowsInserted; + const msg: string = `\t--[pipeData] For now inserted: ${ conversion._dicTables[signal.tableName].totalRowsInserted } rows, + Total rows to insert into "${ conversion._schema }"."${ signal.tableName }": ${ signal.totalRowsToInsert }`; + + log(conversion, msg); + } else { + killProcess(loaderProcess.pid, conversion); + conversion._processedChunks += chunksToLoad.length; + return pipeData(conversion, dataLoaderPath, options); + } + }); + + loaderProcess.send(new MessageToDataLoader(conversion._config, chunksToLoad)); +} + +/** + * Manages the DataPipe. + */ +export default async function(conversion: Conversion): Promise { + if (dataPoolProcessed(conversion)) { + conversion = await decodeBinaryData(conversion); + return processConstraints(conversion); + } + + const dataLoaderPath: string = path.join(__dirname, 'DataLoader.js'); + const options: any = conversion._loaderMaxOldSpaceSize === 'DEFAULT' + ? Object.create(null) + : { execArgv: [`--max-old-space-size=${ conversion._loaderMaxOldSpaceSize }`] }; + + return pipeData(conversion, dataLoaderPath, options); +} diff --git a/src/DataPoolManager.js b/src/DataPoolManager.js deleted file mode 100644 index 93d0ca5..0000000 --- a/src/DataPoolManager.js +++ /dev/null @@ -1,138 +0,0 @@ -/* - * This file is a part of "NMIG" - the database migration tool. - * - * Copyright (C) 2016 - present, Anatoly Khaytovich - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program (please see the "LICENSE.md" file). - * If not, see . - * - * @author Anatoly Khaytovich - */ -'use strict'; - -const connect = require('./Connector'); -const log = require('./Logger'); -const generateError = require('./ErrorGenerator'); - -/** - * Create the "{schema}"."data_pool_{self._schema + self._mySqlDbName} temporary table." - * - * @param {Conversion} self - * - * @returns {Promise} - */ -module.exports.createDataPoolTable = self => { - return connect(self).then(() => { - return new Promise(resolve => { - self._pg.connect((error, client, done) => { - if (error) { - generateError(self, '\t--[DataPoolManager.createDataPoolTable] Cannot connect to PostgreSQL server...\n' + error); - process.exit(); - } else { - const sql = 'CREATE TABLE IF NOT EXISTS "' + self._schema + '"."data_pool_' + self._schema + self._mySqlDbName - + '"("id" BIGSERIAL, "json" TEXT, "is_started" BOOLEAN, "size_in_mb" DOUBLE PRECISION);'; - - client.query(sql, err => { - done(); - - if (err) { - generateError(self, '\t--[DataPoolManager.createDataPoolTable] ' + err, sql); - process.exit(); - } else { - log(self, '\t--[DataPoolManager.createDataPoolTable] table "' + self._schema + '"."data_pool_' + self._schema + self._mySqlDbName + '" is created...'); - resolve(self); - } - }); - } - }); - }); - }); -}; - -/** - * Drop the "{schema}"."data_pool_{self._schema + self._mySqlDbName} temporary table." - * - * @param {Conversion} self - * - * @returns {Promise} - */ -module.exports.dropDataPoolTable = self => { - return connect(self).then(() => { - return new Promise(resolve => { - self._pg.connect((error, client, done) => { - if (error) { - generateError(self, '\t--[DataPoolManager.dropDataPoolTable] Cannot connect to PostgreSQL server...\n' + error); - resolve(); - } else { - const sql = 'DROP TABLE "' + self._schema + '"."data_pool_' + self._schema + self._mySqlDbName + '";'; - - client.query(sql, err => { - done(); - - if (err) { - generateError(self, '\t--[DataPoolManager.dropDataPoolTable] ' + err, sql); - } else { - log(self, '\t--[DataPoolManager.dropDataPoolTable] table "' + self._schema + '"."data_pool_' + self._schema + self._mySqlDbName + '" is dropped...'); - } - - resolve(); - }); - } - }); - }); - }); -}; - -/** - * Reads temporary table, and generates Data-pool. - * - * @param {Conversion} self - * - * @returns {Promise} - */ -module.exports.readDataPool = self => { - return connect(self).then(() => { - return new Promise(resolve => { - self._pg.connect((error, client, done) => { - if (error) { - generateError(self, '\t--[DataPoolManager.readDataPool] Cannot connect to PostgreSQL server...\n' + error); - process.exit(); - } else { - const sql = 'SELECT id AS id, json AS json, size_in_mb AS size_in_mb FROM "' - + self._schema + '"."data_pool_' + self._schema + self._mySqlDbName - + '" ORDER BY size_in_mb DESC;'; - - client.query(sql, (err, arrDataPool) => { - done(); - - if (err) { - generateError(self, '\t--[DataPoolManager.readDataPool] ' + err, sql); - process.exit(); - } - - for (let i = 0; i < arrDataPool.rows.length; ++i) { - const obj = JSON.parse(arrDataPool.rows[i].json); - obj._id = arrDataPool.rows[i].id; - obj._size_in_mb = +arrDataPool.rows[i].size_in_mb; - obj._processed = false; - self._dataPool.push(obj); - } - - log(self, '\t--[DataPoolManager.readDataPool] Data-Pool is loaded...'); - resolve(self); - }); - } - }); - }); - }); -}; diff --git a/src/DataPoolManager.ts b/src/DataPoolManager.ts new file mode 100644 index 0000000..4dead1f --- /dev/null +++ b/src/DataPoolManager.ts @@ -0,0 +1,71 @@ +/* + * This file is a part of "NMIG" - the database migration tool. + * + * Copyright (C) 2016 - present, Anatoly Khaytovich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program (please see the "LICENSE.md" file). + * If not, see . + * + * @author Anatoly Khaytovich + */ +import DBAccess from './DBAccess'; +import DBAccessQueryResult from './DBAccessQueryResult'; +import DBVendors from './DBVendors'; +import log from './Logger'; +import Conversion from './Conversion'; + +/** + * Creates the "{schema}"."data_pool_{self._schema + self._mySqlDbName}" temporary table. + */ +export async function createDataPoolTable(conversion: Conversion): Promise { + const dbAccess: DBAccess = new DBAccess(conversion); + const table: string = `"${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"`; + const sql: string = `CREATE TABLE IF NOT EXISTS ${ table } + ("id" BIGSERIAL, "json" TEXT, "is_started" BOOLEAN, "size_in_mb" DOUBLE PRECISION);`; + + await dbAccess.query('DataPoolManager::createDataPoolTable', sql, DBVendors.PG, true, false); + log(conversion, `\t--[DataPoolManager.createDataPoolTable] table ${ table } is created...`); + return conversion; +} + +/** + * Drops the "{schema}"."data_pool_{self._schema + self._mySqlDbName}" temporary table. + */ +export async function dropDataPoolTable(conversion: Conversion): Promise { + const dbAccess: DBAccess = new DBAccess(conversion); + const table: string = `"${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"`; + const sql: string = `DROP TABLE ${ table };`; + await dbAccess.query('DataPoolManager::dropDataPoolTable', sql, DBVendors.PG, false, false); + log(conversion, `\t--[DataPoolManager.dropDataPoolTable] table ${ table } is dropped...`); +} + +/** + * Reads temporary table, and generates Data-pool. + */ +export async function readDataPool(conversion: Conversion): Promise { + const dbAccess: DBAccess = new DBAccess(conversion); + const table: string = `"${ conversion._schema }"."data_pool_${ conversion._schema }${ conversion._mySqlDbName }"`; + const sql: string = `SELECT id AS id, json AS json, size_in_mb AS size_in_mb FROM ${ table } ORDER BY size_in_mb DESC;`; + const result: DBAccessQueryResult = await dbAccess.query('DataPoolManager::dropDataPoolTable', sql, DBVendors.PG, true, false); + + result.data.rows.forEach((row: any) => { + const obj: any = JSON.parse(row.json); + obj._id = row.id; + obj._size_in_mb = +row.size_in_mb; + obj._processed = false; + conversion._dataPool.push(obj); + }); + + log(conversion, '\t--[DataPoolManager.readDataPool] Data-Pool is loaded...'); + return conversion; +} diff --git a/src/DataTypesMapReader.js b/src/DataTypesMapReader.ts similarity index 72% rename from src/DataTypesMapReader.js rename to src/DataTypesMapReader.ts index d7521a1..8e153fa 100644 --- a/src/DataTypesMapReader.js +++ b/src/DataTypesMapReader.ts @@ -18,29 +18,23 @@ * * @author Anatoly Khaytovich */ -'use strict'; - -const fs = require('fs'); +import * as fs from 'fs'; +import Conversion from './Conversion'; /** * Reads "./config/data_types_map.json" and converts its json content to js object. - * Appends this object to "FromMySQL2PostgreSQL" instance. - * - * @param {Conversion} self - * - * @returns {Promise} */ -module.exports = self => { +export default (conversion: Conversion): Promise => { return new Promise(resolve => { - fs.readFile(self._dataTypesMapAddr, (error, data) => { + fs.readFile(conversion._dataTypesMapAddr, (error: Error, data: Buffer) => { if (error) { - console.log('\t--[readDataTypesMap] Cannot read "DataTypesMap" from ' + self._dataTypesMapAddr); + console.log(`\t--[readDataTypesMap] Cannot read "DataTypesMap" from ${conversion._dataTypesMapAddr}`); process.exit(); } - self._dataTypesMap = JSON.parse(data); + conversion._dataTypesMap = JSON.parse(data.toString()); console.log('\t--[readDataTypesMap] Data Types Map is loaded...'); - resolve(self); + resolve(conversion); }); }); -}; +} diff --git a/src/EnumProcessor.js b/src/EnumProcessor.js deleted file mode 100644 index bae8eb1..0000000 --- a/src/EnumProcessor.js +++ /dev/null @@ -1,95 +0,0 @@ -/* - * This file is a part of "NMIG" - the database migration tool. - * - * Copyright (C) 2016 - present, Anatoly Khaytovich - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program (please see the "LICENSE.md" file). - * If not, see . - * - * @author Anatoly Khaytovich - */ -'use strict'; - -const connect = require('./Connector'); -const log = require('./Logger'); -const generateError = require('./ErrorGenerator'); -const extraConfigProcessor = require('./ExtraConfigProcessor'); - -/** - * Define which columns of the given table are of type "enum". - * Set an appropriate constraint, if need. - * - * @param {Conversion} self - * @param {String} tableName - * - * @returns {Promise} - */ -module.exports = (self, tableName) => { - return connect(self).then(() => { - return new Promise(resolve => { - log(self, '\t--[processEnum] Defines "ENUMs" for table "' + self._schema + '"."' + tableName + '"', self._dicTables[tableName].tableLogPath); - const processEnumPromises = []; - const originalTableName = extraConfigProcessor.getTableName(self, tableName, true); - - for (let i = 0; i < self._dicTables[tableName].arrTableColumns.length; ++i) { - if (self._dicTables[tableName].arrTableColumns[i].Type.indexOf('(') !== -1) { - const arrType = self._dicTables[tableName].arrTableColumns[i].Type.split('('); - - if (arrType[0] === 'enum') { - processEnumPromises.push( - new Promise(resolveProcessEnum => { - self._pg.connect((error, client, done) => { - if (error) { - const msg = '\t--[processEnum] Cannot connect to PostgreSQL server...\n' + error; - generateError(self, msg); - resolveProcessEnum(); - } else { - const columnName = extraConfigProcessor.getColumnName( - self, - originalTableName, - self._dicTables[tableName].arrTableColumns[i].Field, - false - ); - - const sql = 'ALTER TABLE "' + self._schema + '"."' + tableName + '" ' - + 'ADD CHECK ("' + columnName + '" IN (' + arrType[1] + ');'; - - client.query(sql, err => { - done(); - - if (err) { - const msg2 = '\t--[processEnum] Error while setting ENUM for "' + self._schema + '"."' - + tableName + '"."' + columnName + '"...\n' + err; - - generateError(self, msg2, sql); - resolveProcessEnum(); - } else { - const success = '\t--[processEnum] Set "ENUM" for "' + self._schema + '"."' + tableName - + '"."' + columnName + '"...'; - - log(self, success, self._dicTables[tableName].tableLogPath); - resolveProcessEnum(); - } - }); - } - }); - }) - ); - } - } - } - - Promise.all(processEnumPromises).then(() => resolve()); - }); - }); -}; diff --git a/src/EnumProcessor.ts b/src/EnumProcessor.ts new file mode 100644 index 0000000..a291432 --- /dev/null +++ b/src/EnumProcessor.ts @@ -0,0 +1,58 @@ +/* + * This file is a part of "NMIG" - the database migration tool. + * + * Copyright (C) 2016 - present, Anatoly Khaytovich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program (please see the "LICENSE.md" file). + * If not, see . + * + * @author Anatoly Khaytovich + */ +import log from './Logger'; +import Conversion from './Conversion'; +import DBAccess from './DBAccess'; +import DBVendors from './DBVendors'; +import * as extraConfigProcessor from './ExtraConfigProcessor'; + +/** + * Defines which columns of the given table are of type "enum". + * Sets an appropriate constraint, if need. + */ +export default async function(conversion: Conversion, tableName: string): Promise { + const dbAccess: DBAccess = new DBAccess(conversion); + const msg: string = `\t--[EnumProcessor] Defines "ENUMs" for table "${ conversion._schema }"."${ tableName }"`; + log(conversion, msg, conversion._dicTables[tableName].tableLogPath); + const originalTableName: string = extraConfigProcessor.getTableName(conversion, tableName, true); + + const processEnumPromises: Promise[] = conversion._dicTables[tableName].arrTableColumns.map(async (column: any) => { + if (column.Type.indexOf('(') !== -1) { + const arrType: string[] = column.Type.split('('); + + if (arrType[0] === 'enum') { + const columnName: string = extraConfigProcessor.getColumnName( + conversion, + originalTableName, + column.Field, + false + ); + + const sql: string = `ALTER TABLE "${ conversion._schema }"."${ tableName }" ADD CHECK ("${ columnName }" IN (${ arrType[1] });`; + await dbAccess.query('EnumProcessor', sql, DBVendors.PG, false, false); + const successMsg: string = `\t--[EnumProcessor] Set "ENUM" for "${ conversion._schema }"."${ tableName }"."${ columnName }"...`; + log(conversion, successMsg, conversion._dicTables[tableName].tableLogPath); + } + } + }); + + await Promise.all(processEnumPromises); +} diff --git a/src/ErrorGenerator.js b/src/ErrorGenerator.ts similarity index 72% rename from src/ErrorGenerator.js rename to src/ErrorGenerator.ts index fe70ec0..8bbbe9d 100644 --- a/src/ErrorGenerator.js +++ b/src/ErrorGenerator.ts @@ -18,26 +18,19 @@ * * @author Anatoly Khaytovich */ -'use strict'; - -const fs = require('fs'); -const log = require('./Logger'); +import * as fs from 'fs'; +import log from './Logger'; +import Conversion from './Conversion'; /** * Writes a ditailed error message to the "/errors-only.log" file - * - * @param {Conversion} self - * @param {String} message - * @param {String} sql - * - * @returns {undefined} */ -module.exports = (self, message, sql = '') => { - message += '\n\n\tSQL: ' + sql + '\n\n'; - const buffer = Buffer.from(message, self._encoding); - log(self, message, undefined, true); +export default (conversion: Conversion, message: string, sql: string = ''): void => { + message += `\n\n\tSQL: ${sql}\n\n`; + const buffer: Buffer = Buffer.from(message, conversion._encoding); + log(conversion, message, undefined, true); - fs.open(self._errorLogsPath, 'a', self._0777, (error, fd) => { + fs.open(conversion._errorLogsPath, 'a', conversion._0777, (error: Error, fd: number) => { if (!error) { fs.write(fd, buffer, 0, buffer.length, null, () => { fs.close(fd, () => { @@ -46,4 +39,4 @@ module.exports = (self, message, sql = '') => { }); } }); -}; +} diff --git a/src/ExtraConfigProcessor.js b/src/ExtraConfigProcessor.js deleted file mode 100644 index 48e85fc..0000000 --- a/src/ExtraConfigProcessor.js +++ /dev/null @@ -1,100 +0,0 @@ -/* - * This file is a part of "NMIG" - the database migration tool. - * - * Copyright (C) 2016 - present, Anatoly Khaytovich - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program (please see the "LICENSE.md" file). - * If not, see . - * - * @author Anatoly Khaytovich - */ -'use strict'; - -/** - * Get current table's name. - * - * @param {Conversion} self - * @param {String} currentTableName - * @param {Boolean} shouldGetOriginal - * - * @returns {String} - */ -module.exports.getTableName = (self, currentTableName, shouldGetOriginal) => { - if (self._extraConfig !== null && 'tables' in self._extraConfig) { - for (let i = 0; i < self._extraConfig.tables.length; ++i) { - if ((shouldGetOriginal ? self._extraConfig.tables[i].name.new : self._extraConfig.tables[i].name.original) === currentTableName) { - return shouldGetOriginal ? self._extraConfig.tables[i].name.original : self._extraConfig.tables[i].name.new; - } - } - } - - return currentTableName; -}; - -/** - * Get current column's name. - * - * @param {Conversion} self - * @param {String} originalTableName - * @param {String} currentColumnName - * @param {Boolean} shouldGetOriginal - * - * @returns {String} - */ -module.exports.getColumnName = (self, originalTableName, currentColumnName, shouldGetOriginal) => { - if (self._extraConfig !== null && 'tables' in self._extraConfig) { - for (let i = 0; i < self._extraConfig.tables.length; ++i) { - if (self._extraConfig.tables[i].name.original === originalTableName && 'columns' in self._extraConfig.tables[i]) { - for (let columnsCount = 0; columnsCount < self._extraConfig.tables[i].columns.length; ++columnsCount) { - if (self._extraConfig.tables[i].columns[columnsCount].original === currentColumnName) { - return shouldGetOriginal - ? self._extraConfig.tables[i].columns[columnsCount].original - : self._extraConfig.tables[i].columns[columnsCount].new; - } - } - } - } - } - - return currentColumnName; -}; - -/** - * Parse the extra_config foreign_keys attributes and generate - * an output array required by ForeignKeyProcessor::processForeignKeyWorker. - * - * @param {Conversion} self - * @param {String} tableName - * - * @returns {Array} - */ -module.exports.parseForeignKeys = (self, tableName) => { - const retVal = []; - - if (self._extraConfig !== null && 'foreign_keys' in self._extraConfig) { - for (let i = 0; i < self._extraConfig.foreign_keys.length; ++i) { - if (self._extraConfig.foreign_keys[i].table_name === tableName) { - // There may be several FKs in a single table. - const objFk = Object.create(null); - - for (const attribute in self._extraConfig.foreign_keys[i]) { - objFk[attribute.toUpperCase()] = self._extraConfig.foreign_keys[i][attribute]; - } - - retVal.push(objFk); - } - } - } - - return retVal; -}; diff --git a/src/ExtraConfigProcessor.ts b/src/ExtraConfigProcessor.ts new file mode 100644 index 0000000..8f0a11d --- /dev/null +++ b/src/ExtraConfigProcessor.ts @@ -0,0 +1,82 @@ +/* + * This file is a part of "NMIG" - the database migration tool. + * + * Copyright (C) 2016 - present, Anatoly Khaytovich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program (please see the "LICENSE.md" file). + * If not, see . + * + * @author Anatoly Khaytovich + */ +import Conversion from './Conversion'; + +/** + * Retrieves current table's name. + */ +export function getTableName(conversion: Conversion, currentTableName: string, shouldGetOriginal: boolean): string { + if (conversion._extraConfig !== null && 'tables' in conversion._extraConfig) { + for (let i: number = 0; i < conversion._extraConfig.tables.length; ++i) { + if ((shouldGetOriginal ? conversion._extraConfig.tables[i].name.new : conversion._extraConfig.tables[i].name.original) === currentTableName) { + return shouldGetOriginal ? conversion._extraConfig.tables[i].name.original : conversion._extraConfig.tables[i].name.new; + } + } + } + + return currentTableName; +} + +/** + * Retrieves current column's name. + */ +export function getColumnName(conversion: Conversion, originalTableName: string, currentColumnName: string, shouldGetOriginal: boolean): string { + if (conversion._extraConfig !== null && 'tables' in conversion._extraConfig) { + for (let i: number = 0; i < conversion._extraConfig.tables.length; ++i) { + if (conversion._extraConfig.tables[i].name.original === originalTableName && 'columns' in conversion._extraConfig.tables[i]) { + for (let columnsCount: number = 0; columnsCount < conversion._extraConfig.tables[i].columns.length; ++columnsCount) { + if (conversion._extraConfig.tables[i].columns[columnsCount].original === currentColumnName) { + return shouldGetOriginal + ? conversion._extraConfig.tables[i].columns[columnsCount].original + : conversion._extraConfig.tables[i].columns[columnsCount].new; + } + } + } + } + } + + return currentColumnName; +} + +/** + * Parses the extra_config foreign_keys attributes and generate + * an output array required by ForeignKeyProcessor::processForeignKeyWorker. + */ +export function parseForeignKeys(conversion: Conversion, tableName: string): any[] { + const retVal: any[] = []; + + if (conversion._extraConfig !== null && 'foreign_keys' in conversion._extraConfig) { + for (let i: number = 0; i < conversion._extraConfig.foreign_keys.length; ++i) { + if (conversion._extraConfig.foreign_keys[i].table_name === tableName) { + // There may be several FKs in a single table. + const objFk: any = Object.create(null); + + for (const attribute in conversion._extraConfig.foreign_keys[i]) { + objFk[attribute.toUpperCase()] = conversion._extraConfig.foreign_keys[i][attribute]; + } + + retVal.push(objFk); + } + } + } + + return retVal; +} diff --git a/src/Logger.ts b/src/Logger.ts index f00e3b7..6142cd4 100644 --- a/src/Logger.ts +++ b/src/Logger.ts @@ -19,33 +19,26 @@ * @author Anatoly Khaytovich */ import * as fs from 'fs'; -import Conversion from './Classes/Conversion'; +import Conversion from './Conversion'; /** * Outputs given log. * Writes given log to the "/all.log" file. * If necessary, writes given log to the "/{tableName}.log" file. - * - * @param {Conversion} self - * @param {String} log - * @param {String} tableLogPath - * @param {Boolean} isErrorLog - * - * @returns {void} */ -export default (self: Conversion, log: string, tableLogPath?: string, isErrorLog?: boolean): void => { - const buffer: Buffer = Buffer.from(log + '\n\n', self._encoding); +export default (conversion: Conversion, log: string, tableLogPath?: string, isErrorLog?: boolean): void => { + const buffer: Buffer = Buffer.from(`${log}\n\n`, conversion._encoding); if (!isErrorLog) { console.log(log); } - fs.open(self._allLogsPath, 'a', self._0777, (error, fd) => { + fs.open(conversion._allLogsPath, 'a', conversion._0777, (error: Error, fd: number) => { if (!error) { fs.write(fd, buffer, 0, buffer.length, null, () => { fs.close(fd, () => { if (tableLogPath) { - fs.open(tableLogPath, 'a', self._0777, (error, fd) => { + fs.open(tableLogPath, 'a', conversion._0777, (error: Error, fd: number) => { if (!error) { fs.write(fd, buffer, 0, buffer.length, null, () => { fs.close(fd, () => { @@ -59,4 +52,4 @@ export default (self: Conversion, log: string, tableLogPath?: string, isErrorLog }); } }); -}; +} diff --git a/src/Main.ts b/src/Main.ts index a228b0e..929c133 100644 --- a/src/Main.ts +++ b/src/Main.ts @@ -21,7 +21,7 @@ import * as fs from 'fs'; import * as path from 'path'; import readDataTypesMap from './DataTypesMapReader'; -import Conversion from './Classes/Conversion'; +import Conversion from './Conversion'; import SchemaProcessor from './SchemaProcessor'; import loadStructureToMigrate from './StructureLoader'; import pipeData from './DataPipeManager'; @@ -29,24 +29,25 @@ import boot from './BootProcessor'; import { createStateLogsTable } from './MigrationStateManager'; import { createDataPoolTable, readDataPool } from './DataPoolManager'; import log from './Logger'; +import { Stats } from 'fs'; const Main = class { /** * Read the configuration file. */ - readConfig(baseDir: string, configFileName: string = 'config.json'): Promise { + public readConfig(baseDir: string, configFileName: string = 'config.json'): Promise { return new Promise(resolve => { const strPathToConfig = path.join(baseDir, 'config', configFileName); - fs.readFile(strPathToConfig, (error: Error, data: any) => { + fs.readFile(strPathToConfig, (error: Error, data: Buffer) => { if (error) { console.log(`\n\t--Cannot run migration\nCannot read configuration info from ${ strPathToConfig }`); process.exit(); } - const config = JSON.parse(data); - config.logsDirPath = path.join(baseDir, 'logs_directory'); + const config: any = JSON.parse(data.toString()); + config.logsDirPath = path.join(baseDir, 'logs_directory'); config.dataTypesMapAddr = path.join(baseDir, 'config', 'data_types_map.json'); resolve(config); }); @@ -56,7 +57,7 @@ const Main = class { /** * Read the extra configuration file, if necessary. */ - readExtraConfig(config: any, baseDir: string): Promise { + public readExtraConfig(config: any, baseDir: string): Promise { return new Promise(resolve => { if (config.enable_extra_config !== true) { config.extraConfig = null; @@ -65,13 +66,13 @@ const Main = class { const strPathToExtraConfig = path.join(baseDir, 'config', 'extra_config.json'); - fs.readFile(strPathToExtraConfig, (error: Error, data: any) => { + fs.readFile(strPathToExtraConfig, (error: Error, data: Buffer) => { if (error) { console.log(`\n\t--Cannot run migration\nCannot read configuration info from ${ strPathToExtraConfig }`); process.exit(); } - config.extraConfig = JSON.parse(data); + config.extraConfig = JSON.parse(data.toString()); resolve(config); }); }); @@ -80,17 +81,17 @@ const Main = class { /** * Initialize Conversion instance. */ - initializeConversion(config: any): Promise { + public initializeConversion(config: any): Promise { return Promise.resolve(new Conversion(config)); } /** * Creates logs directory. */ - createLogsDirectory(self: Conversion): Promise { + public createLogsDirectory(self: Conversion): Promise { return new Promise(resolve => { console.log('\t--[DirectoriesManager.createLogsDirectory] Creating logs directory...'); - fs.stat(self._logsDirPath, (directoryDoesNotExist, stat) => { + fs.stat(self._logsDirPath, (directoryDoesNotExist: Error, stat: Stats) => { if (directoryDoesNotExist) { fs.mkdir(self._logsDirPath, self._0777, e => { if (e) { @@ -117,20 +118,16 @@ const Main = class { }; module.exports = Main; -const app = new Main(); -const baseDir = path.join(__dirname, '..', '..'); +const app = new Main(); +const baseDir: string = path.join(__dirname, '..', '..'); app.readConfig(baseDir) - .then(config => { - return app.readExtraConfig(config, baseDir); - }) + .then(config => app.readExtraConfig(config, baseDir)) .then(app.initializeConversion) .then(boot) .then(readDataTypesMap) .then(app.createLogsDirectory) - .then(conversion => { - return (new SchemaProcessor(conversion)).createSchema(); - }) + .then(conversion => (new SchemaProcessor(conversion)).createSchema()) .then(createStateLogsTable) .then(createDataPoolTable) .then(loadStructureToMigrate) diff --git a/src/Classes/MessageToDataLoader.js b/src/MessageToDataLoader.ts similarity index 75% rename from src/Classes/MessageToDataLoader.js rename to src/MessageToDataLoader.ts index 49c0257..392fd0f 100644 --- a/src/Classes/MessageToDataLoader.js +++ b/src/MessageToDataLoader.ts @@ -18,19 +18,23 @@ * * @author Anatoly Khaytovich */ -'use strict'; +export default class MessageToDataLoader { + /** + * Parsed Nmig's configuration object. + */ + public readonly config: any; + + /** + * An array of data chunks. + */ + public readonly chunks: any[]; -module.exports = class MessageToDataLoader { /** * Representation of a message of the master process to DataLoader process. - * Contents migration's configuration and an array of "data-chunks". - * Constructor. - * - * @param {Object} config - * @param {Array} chunks + * Contains migration's configuration and an array of "data-chunks". */ - constructor(config, chunks) { + public constructor(config: any, chunks: any[]) { this.config = config; this.chunks = chunks; } -}; +} diff --git a/src/Classes/MessageToMaster.js b/src/MessageToMaster.js similarity index 100% rename from src/Classes/MessageToMaster.js rename to src/MessageToMaster.js diff --git a/src/MigrationStateManager.js b/src/MigrationStateManager.js deleted file mode 100644 index 4083abf..0000000 --- a/src/MigrationStateManager.js +++ /dev/null @@ -1,195 +0,0 @@ -/* - * This file is a part of "NMIG" - the database migration tool. - * - * Copyright (C) 2016 - present, Anatoly Khaytovich - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program (please see the "LICENSE.md" file). - * If not, see . - * - * @author Anatoly Khaytovich - */ -'use strict'; - -const connect = require('./Connector'); -const log = require('./Logger'); -const generateError = require('./ErrorGenerator'); - -/** - * Get state-log. - * - * @param {Conversion} self - * @param {String} param - * - * @returns {Promise} - */ -module.exports.get = (self, param) => { - return connect(self).then(() => { - return new Promise(resolve => { - self._pg.connect((error, client, done) => { - if (error) { - generateError(self, '\t--[MigrationStateManager.get] Cannot connect to PostgreSQL server...\n' + error); - resolve(false); - } else { - const sql = 'SELECT ' + param + ' FROM "' + self._schema + '"."state_logs_' + self._schema + self._mySqlDbName + '";'; - - client.query(sql, (err, data) => { - done(); - - if (err) { - generateError(self, '\t--[MigrationStateManager.get] ' + err, sql); - resolve(false); - } else { - resolve(data.rows[0][param]); - } - }); - } - }); - }); - }); -}; - -/** - * Update the state-log. - * - * @param {Conversion} self - * @param {String} param - * - * @returns {Promise} - */ -module.exports.set = (self, param) => { - return connect(self).then(() => { - return new Promise(resolve => { - self._pg.connect((error, client, done) => { - if (error) { - generateError(self, '\t--[MigrationStateManager.set] Cannot connect to PostgreSQL server...\n' + error); - resolve(); - } else { - const sql = 'UPDATE "' + self._schema + '"."state_logs_' - + self._schema + self._mySqlDbName + '" SET ' + param + ' = TRUE;'; - - client.query(sql, err => { - done(); - - if (err) { - generateError(self, '\t--[MigrationStateManager.set] ' + err, sql); - } - - resolve(); - }); - } - }); - }); - }); -}; - -/** - * Create the "{schema}"."state_logs_{self._schema + self._mySqlDbName} temporary table." - * - * @param {Conversion} self - * - * @returns {Promise} - */ -module.exports.createStateLogsTable = self => { - return connect(self).then(() => { - return new Promise(resolve => { - self._pg.connect((error, client, done) => { - if (error) { - generateError(self, '\t--[createStateLogsTable] Cannot connect to PostgreSQL server...\n' + error); - process.exit(); - } else { - let sql = 'CREATE TABLE IF NOT EXISTS "' + self._schema + '"."state_logs_' + self._schema + self._mySqlDbName - + '"(' - + '"tables_loaded" BOOLEAN,' - + '"per_table_constraints_loaded" BOOLEAN,' - + '"foreign_keys_loaded" BOOLEAN,' - + '"views_loaded" BOOLEAN' - + ');'; - - client.query(sql, err => { - if (err) { - done(); - generateError(self, '\t--[createStateLogsTable] ' + err, sql); - process.exit(); - } else { - sql = 'SELECT COUNT(1) AS cnt FROM "' + self._schema + '"."state_logs_' + self._schema + self._mySqlDbName + '";'; - client.query(sql, (errorCount, result) => { - if (errorCount) { - done(); - generateError(self, '\t--[createStateLogsTable] ' + errorCount, sql); - process.exit(); - } else if (+result.rows[0].cnt === 0) { - sql = 'INSERT INTO "' + self._schema + '"."state_logs_' + self._schema + self._mySqlDbName - + '" VALUES(FALSE, FALSE, FALSE, FALSE);'; - - client.query(sql, errorInsert => { - done(); - - if (errorInsert) { - generateError(self, '\t--[createStateLogsTable] ' + errorInsert, sql); - process.exit(); - } else { - const msg = '\t--[createStateLogsTable] table "' + self._schema + '"."state_logs_' - + self._schema + self._mySqlDbName + '" is created...'; - - log(self, msg); - resolve(self); - } - }); - } else { - const msg2 = '\t--[createStateLogsTable] table "' + self._schema + '"."state_logs_' - + self._schema + self._mySqlDbName + '" is created...'; - - log(self, msg2); - resolve(self); - } - }); - } - }); - } - }); - }); - }); -}; - -/** - * Drop the "{schema}"."state_logs_{self._schema + self._mySqlDbName} temporary table." - * - * @param {Conversion} self - * - * @returns {Promise} - */ -module.exports.dropStateLogsTable = self => { - return connect(self).then(() => { - return new Promise(resolve => { - self._pg.connect((error, client, done) => { - if (error) { - generateError(self, '\t--[dropStateLogsTable] Cannot connect to PostgreSQL server...\n' + error); - resolve(); - } else { - const sql = 'DROP TABLE "' + self._schema + '"."state_logs_' + self._schema + self._mySqlDbName + '";'; - client.query(sql, err => { - done(); - - if (err) { - generateError(self, '\t--[dropStateLogsTable] ' + err, sql); - } else { - log(self, '\t--[dropStateLogsTable] table "' + self._schema + '"."state_logs_' + self._schema + self._mySqlDbName + '" is dropped...'); - } - - resolve(); - }); - } - }); - }); - }); -}; diff --git a/src/MigrationStateManager.ts b/src/MigrationStateManager.ts new file mode 100644 index 0000000..2e29ccd --- /dev/null +++ b/src/MigrationStateManager.ts @@ -0,0 +1,78 @@ +/* + * This file is a part of "NMIG" - the database migration tool. + * + * Copyright (C) 2016 - present, Anatoly Khaytovich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program (please see the "LICENSE.md" file). + * If not, see . + * + * @author Anatoly Khaytovich + */ +import DBAccess from './DBAccess'; +import DBAccessQueryResult from './DBAccessQueryResult'; +import DBVendors from './DBVendors'; +import log from './Logger'; +import Conversion from './Conversion'; + +/** + * Retrieves state-log. + */ +export async function get(conversion: Conversion, param: string): Promise { + const dbAccess: DBAccess = new DBAccess(conversion); + const sql: string = `SELECT ${ param } FROM "${ conversion._schema }"."state_logs_${ conversion._schema }${ conversion._mySqlDbName }";`; + const result: DBAccessQueryResult = await dbAccess.query('MigrationStateManager::get', sql, DBVendors.PG, false, false); + return result.data.rows[0][param]; +} + +/** + * Updates the state-log. + */ +export async function set(conversion: Conversion, param: string): Promise { + const dbAccess: DBAccess = new DBAccess(conversion); + const sql: string = `UPDATE "${ conversion._schema }"."state_logs_${ conversion._schema }${ conversion._mySqlDbName }" SET ${ param } = TRUE;`; + await dbAccess.query('MigrationStateManager::set', sql, DBVendors.PG, false, false); +} + +/** + * Creates the "{schema}"."state_logs_{self._schema + self._mySqlDbName}" temporary table. + */ +export async function createStateLogsTable(conversion: Conversion): Promise { + const dbAccess: DBAccess = new DBAccess(conversion); + let sql: string = `CREATE TABLE IF NOT EXISTS "${ conversion._schema }"."state_logs_${ conversion._schema }${ conversion._mySqlDbName }"( + "tables_loaded" BOOLEAN, "per_table_constraints_loaded" BOOLEAN, "foreign_keys_loaded" BOOLEAN, "views_loaded" BOOLEAN');`; + + let result: DBAccessQueryResult = await dbAccess.query('MigrationStateManager::createStateLogsTable', sql, DBVendors.PG, true, true); + sql = `SELECT COUNT(1) AS cnt FROM "${ conversion._schema }"."state_logs_${ conversion._schema }${ conversion._mySqlDbName }";`; + result = await dbAccess.query('MigrationStateManager::createStateLogsTable', sql, DBVendors.PG, true, true, result.client); + + if (+result.data.rows[0].cnt === 0) { + sql = `INSERT INTO "${ conversion._schema }"."state_logs_${ conversion._schema }${ conversion._mySqlDbName }" VALUES (FALSE, FALSE, FALSE, FALSE);`; + await await dbAccess.query('MigrationStateManager::createStateLogsTable', sql, DBVendors.PG, true, false, result.client); + return conversion; + } + + const msg: string = '\t--[MigrationStateManager::createStateLogsTable] table ' + + '"${ conversion._schema }"."state_logs_${ conversion._schema }${ conversion._mySqlDbName }" is created...'; + + log(conversion, msg); + return conversion; +} + +/** + * Drop the "{schema}"."state_logs_{self._schema + self._mySqlDbName}" temporary table. + */ +export async function dropStateLogsTable(conversion: Conversion): Promise { + const dbAccess: DBAccess = new DBAccess(conversion); + const sql: string = `DROP TABLE "${ conversion._schema }"."state_logs_${ conversion._schema }${ conversion._mySqlDbName }";`; + await dbAccess.query('MigrationStateManager::dropStateLogsTable', sql, DBVendors.PG, false, false); +} diff --git a/src/ReportGenerator.js b/src/ReportGenerator.js deleted file mode 100644 index c02b005..0000000 --- a/src/ReportGenerator.js +++ /dev/null @@ -1,54 +0,0 @@ -/* - * This file is a part of "NMIG" - the database migration tool. - * - * Copyright (C) 2016 - present, Anatoly Khaytovich - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program (please see the "LICENSE.md" file). - * If not, see . - * - * @author Anatoly Khaytovich - */ -'use strict'; - -const log = require('./Logger'); - -/** - * Generates a summary report. - * - * @param {Conversion} self - * @param {String} endMsg - * - * @returns {undefined} - */ -module.exports = (self, endMsg) => { - let differenceSec = ((new Date()) - self._timeBegin) / 1000; - let seconds = Math.floor(differenceSec % 60); - differenceSec = differenceSec / 60; - let minutes = Math.floor(differenceSec % 60); - let hours = Math.floor(differenceSec / 60); - hours = hours < 10 ? '0' + hours : hours; - minutes = minutes < 10 ? '0' + minutes : minutes; - seconds = seconds < 10 ? '0' + seconds : seconds; - const output = '\t--[generateReport] ' + endMsg - + '\n\t--[generateReport] Total time: ' + hours + ':' + minutes + ':' + seconds - + '\n\t--[generateReport] (hours:minutes:seconds)'; - - log(self, output); - - if (self._runsInTestMode) { - self._eventEmitter.emit(self._migrationCompletedEvent); - return; - } - - process.exit(); -}; diff --git a/src/ReportGenerator.ts b/src/ReportGenerator.ts new file mode 100644 index 0000000..097faa5 --- /dev/null +++ b/src/ReportGenerator.ts @@ -0,0 +1,49 @@ +/* + * This file is a part of "NMIG" - the database migration tool. + * + * Copyright (C) 2016 - present, Anatoly Khaytovich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program (please see the "LICENSE.md" file). + * If not, see . + * + * @author Anatoly Khaytovich + */ +import log from './Logger'; +import Conversion from './Conversion'; +import { EventEmitter } from 'events'; + +/** + * Generates a summary report. + */ +export default (conversion: Conversion, endMsg: string): void => { + let differenceSec: number = ((new Date()).getTime() - conversion._timeBegin.getTime()) / 1000; + const seconds: number = Math.floor(differenceSec % 60); + differenceSec = differenceSec / 60; + const minutes: number = Math.floor(differenceSec % 60); + const hours: number = Math.floor(differenceSec / 60); + const formattedHours: string = hours < 10 ? `0${ hours }` : `${ hours }`; + const formattedMinutes: string = minutes < 10 ? `0${ minutes }` : `${ minutes }`; + const formattedSeconds: string = seconds < 10 ? `0${ seconds }` : `${ seconds }`; + const output: string = `\t--[generateReport] ${ endMsg } + \n\t--[generateReport] Total time: ${ formattedHours }:${ formattedMinutes }:${ formattedSeconds } + \n\t--[generateReport] (hours:minutes:seconds)`; + + log(conversion, output); + + if (conversion._runsInTestMode) { + (conversion._eventEmitter).emit(conversion._migrationCompletedEvent); + return; + } + + process.exit(); +} diff --git a/src/SchemaProcessor.ts b/src/SchemaProcessor.ts index 99756b2..f9c064a 100644 --- a/src/SchemaProcessor.ts +++ b/src/SchemaProcessor.ts @@ -18,9 +18,10 @@ * * @author Anatoly Khaytovich */ -import ConnectionEmitter from './ConnectionEmitter'; -import generateError from './ErrorGenerator'; -import Conversion from './Classes/Conversion'; +import Conversion from './Conversion'; +import DBAccess from './DBAccess'; +import DBAccessQueryResult from './DBAccessQueryResult'; +import DBVendors from './DBVendors'; export default class SchemaProcessor { /** @@ -29,39 +30,31 @@ export default class SchemaProcessor { private readonly _conversion: Conversion; /** - * An instance of "ConnectionEmitter". + * An instance of "DBAccess". */ - private readonly _connectionEmitter: ConnectionEmitter; + private readonly _dbAccess: DBAccess; /** * SchemaProcessor constructor. */ public constructor(conversion: Conversion) { - this._conversion = conversion; - this._connectionEmitter = new ConnectionEmitter(this._conversion); + this._conversion = conversion; + this._dbAccess = new DBAccess(this._conversion); } /** * Create a new database schema if it does not exist yet. */ - public async createSchema(): Promise { - const client: any = await this._connectionEmitter.getPgClient(); + public async createSchema(): Promise { let sql: string = `SELECT schema_name FROM information_schema.schemata WHERE schema_name = '${ this._conversion._schema }';`; - try { - const result: any = await client.query(sql); + const result: DBAccessQueryResult = await this._dbAccess.query('SchemaProcessor::createSchema', sql, DBVendors.PG, true, true); - if (result.rows.length === 0) { - sql = `CREATE SCHEMA "${ this._conversion._schema }";`; - await client.query(sql); - } - - this._connectionEmitter.releasePgClient(client); - return Promise.resolve(this._conversion); - - } catch (err) { - generateError(this._conversion, `\t--[createSchema] ${ err }`, sql); - return Promise.reject(err); + if (result.data.rows.length === 0) { + sql = `CREATE SCHEMA "${ this._conversion._schema }";`; + await this._dbAccess.query('SchemaProcessor::createSchema', sql, DBVendors.PG, true, false, result.client); } + + return this._conversion; } -}; +} diff --git a/src/StructureLoader.js b/src/StructureLoader.js deleted file mode 100644 index 4f93789..0000000 --- a/src/StructureLoader.js +++ /dev/null @@ -1,152 +0,0 @@ -/* - * This file is a part of "NMIG" - the database migration tool. - * - * Copyright (C) 2016 - present, Anatoly Khaytovich - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program (please see the "LICENSE.md" file). - * If not, see . - * - * @author Anatoly Khaytovich - */ -'use strict'; - -const Table = require('./Classes/Table'); -const { createTable } = require('./TableProcessor'); -const connect = require('./Connector'); -const log = require('./Logger'); -const generateError = require('./ErrorGenerator'); -const prepareDataChunks = require('./DataChunksProcessor'); -const migrationStateManager = require('./MigrationStateManager'); -const extraConfigProcessor = require('./ExtraConfigProcessor'); - -/** - * Processes current table before data loading. - * - * @param {Conversion} self - * @param {String} tableName - * @param {Boolean} stateLog - * - * @returns {Promise} - */ -const processTableBeforeDataLoading = (self, tableName, stateLog) => { - return connect(self).then(() => { - return createTable(self, tableName); - }).then(() => { - return prepareDataChunks(self, tableName, stateLog); - }).catch(() => { - generateError(self, '\t--[processTableBeforeDataLoading] Cannot create table "' + self._schema + '"."' + tableName + '"...'); - }); -} - -/** - * Get the MySQL version. - * - * @param {Conversion} self - * - * @returns {Promise} - */ -const getMySqlVersion = self => { - return connect(self).then(() => { - return new Promise(resolve => { - self._mysql.getConnection((error, connection) => { - if (error) { - // The connection is undefined. - generateError(self, '\t--[getMySqlVersion] Cannot connect to MySQL server...\n' + error); - resolve(); - } else { - const sql = 'SELECT VERSION() AS mysql_version;'; - connection.query(sql, (err, rows) => { - connection.release(); - - if (err) { - generateError(self, '\t--[getMySqlVersion] ' + err, sql); - resolve(); - } else { - const arrVersion = rows[0].mysql_version.split('.'); - const majorVersion = arrVersion[0]; - const minorVersion = arrVersion.slice(1).join(''); - self._mysqlVersion = +(majorVersion + '.' + minorVersion); - resolve(); - } - }); - } - }); - }); - }); -} - -/** - * Load source tables and views, that need to be migrated. - * - * @param {Conversion} self - * - * @returns {Promise} - */ -module.exports = self => { - return getMySqlVersion(self).then(() => { - return migrationStateManager.get(self, 'tables_loaded').then(haveTablesLoaded => { - return new Promise(resolve => { - self._mysql.getConnection((error, connection) => { - if (error) { - // The connection is undefined. - generateError(self, '\t--[loadStructureToMigrate] Cannot connect to MySQL server...\n' + error); - process.exit(); - } else { - const sql = 'SHOW FULL TABLES IN `' + self._mySqlDbName + '`;'; - connection.query(sql, (strErr, rows) => { - connection.release(); - - if (strErr) { - generateError(self, '\t--[loadStructureToMigrate] ' + strErr, sql); - process.exit(); - } else { - let tablesCnt = 0; - let viewsCnt = 0; - const processTablePromises = []; - - for (let i = 0; i < rows.length; ++i) { - let relationName = rows[i]['Tables_in_' + self._mySqlDbName]; - - if (rows[i].Table_type === 'BASE TABLE' && self._excludeTables.indexOf(relationName) === -1) { - relationName = extraConfigProcessor.getTableName(self, relationName, false); - self._tablesToMigrate.push(relationName); - self._dicTables[relationName] = new Table(self._logsDirPath + '/' + relationName + '.log'); - processTablePromises.push(processTableBeforeDataLoading(self, relationName, haveTablesLoaded)); - tablesCnt++; - } else if (rows[i].Table_type === 'VIEW') { - self._viewsToMigrate.push(relationName); - viewsCnt++; - } - } - - rows = null; - let message = '\t--[loadStructureToMigrate] Source DB structure is loaded...\n' - + '\t--[loadStructureToMigrate] Tables to migrate: ' + tablesCnt + '\n' - + '\t--[loadStructureToMigrate] Views to migrate: ' + viewsCnt; - - log(self, message); - - Promise.all(processTablePromises).then( - () => { - migrationStateManager.set(self, 'tables_loaded').then(() => resolve(self)); - }, - () => process.exit() - ); - } - }); - } - }); - }); - }); - }); -}; diff --git a/src/StructureLoader.ts b/src/StructureLoader.ts new file mode 100644 index 0000000..056cb5f --- /dev/null +++ b/src/StructureLoader.ts @@ -0,0 +1,89 @@ +/* + * This file is a part of "NMIG" - the database migration tool. + * + * Copyright (C) 2016 - present, Anatoly Khaytovich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program (please see the "LICENSE.md" file). + * If not, see . + * + * @author Anatoly Khaytovich + */ +import DBAccess from './DBAccess'; +import DBAccessQueryResult from './DBAccessQueryResult'; +import DBVendors from './DBVendors'; +import log from './Logger'; +import Conversion from './Conversion'; +import Table from './Table'; +import { createTable } from './TableProcessor'; +import prepareDataChunks from './DataChunksProcessor'; +import * as migrationStateManager from './MigrationStateManager'; +import * as extraConfigProcessor from './ExtraConfigProcessor'; + +/** + * Processes current table before data loading. + */ +async function processTableBeforeDataLoading(conversion: Conversion, tableName: string, stateLog: boolean): Promise { + await createTable(conversion, tableName); + await prepareDataChunks(conversion, tableName, stateLog); +} + +/** + * Retrieves the source db (MySQL) version. + */ +async function getMySqlVersion(conversion: Conversion): Promise { + const dbAccess: DBAccess = new DBAccess(conversion); + const sql: string = 'SELECT VERSION() AS mysql_version;'; + const result: DBAccessQueryResult = await dbAccess.query('StructureLoader::getMySqlVersion', sql, DBVendors.MYSQL, false, false); + const arrVersion: string[] = result.data[0].mysql_version.split('.'); + const majorVersion: string = arrVersion[0]; + const minorVersion: string = arrVersion.slice(1).join(''); + conversion._mysqlVersion = +(`${ majorVersion }.${ minorVersion }`); +} + +/** + * Loads source tables and views, that need to be migrated. + */ +export default async (conversion: Conversion): Promise => { + await getMySqlVersion(conversion); + const dbAccess: DBAccess = new DBAccess(conversion); + const haveTablesLoaded: boolean = await migrationStateManager.get(conversion, 'tables_loaded'); + const sql: string = `SHOW FULL TABLES IN \`${ conversion._mySqlDbName }\`;`; + const result: DBAccessQueryResult = await dbAccess.query('StructureLoader::default', sql, DBVendors.MYSQL, true, false); + let tablesCnt: number = 0; + let viewsCnt: number = 0; + const processTablePromises: Promise[] = []; + + result.data.forEach((row: any) => { + let relationName: string = row[`Tables_in_${ conversion._mySqlDbName }`]; + + if (row.Table_type === 'BASE TABLE' && conversion._excludeTables.indexOf(relationName) === -1) { + relationName = extraConfigProcessor.getTableName(conversion, relationName, false); + conversion._tablesToMigrate.push(relationName); + conversion._dicTables[relationName] = new Table(`${ conversion._logsDirPath }/${ relationName }.log`); + processTablePromises.push(processTableBeforeDataLoading(conversion, relationName, haveTablesLoaded)); + tablesCnt++; + } else if (row.Table_type === 'VIEW') { + conversion._viewsToMigrate.push(relationName); + viewsCnt++; + } + }); + + const message: string = `\t--[loadStructureToMigrate] Source DB structure is loaded...\n + \t--[loadStructureToMigrate] Tables to migrate: ${ tablesCnt }\n + \t--[loadStructureToMigrate] Views to migrate: ${ viewsCnt }`; + + log(conversion, message); + await Promise.all(processTablePromises); + await migrationStateManager.set(conversion, 'tables_loaded'); + return conversion; +} diff --git a/src/Table.ts b/src/Table.ts new file mode 100644 index 0000000..cbf145e --- /dev/null +++ b/src/Table.ts @@ -0,0 +1,45 @@ +/* + * This file is a part of "NMIG" - the database migration tool. + * + * Copyright (C) 2016 - present, Anatoly Khaytovich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program (please see the "LICENSE.md" file). + * If not, see . + * + * @author Anatoly Khaytovich + */ +export default class Table { + /** + * The path to current table's log file. + */ + public readonly tableLogPath: string; + + /** + * Representation of given table's columns metadata. + */ + public readonly arrTableColumns: any[]; + + /** + * Total rows inserted into given table. + */ + public totalRowsInserted: number; + + /** + * Constructor. + */ + public constructor(tableLogPath: string) { + this.tableLogPath = tableLogPath; + this.arrTableColumns = []; + this.totalRowsInserted = 0; + } +} diff --git a/src/VacuumProcessor.js b/src/VacuumProcessor.js deleted file mode 100644 index c95621e..0000000 --- a/src/VacuumProcessor.js +++ /dev/null @@ -1,78 +0,0 @@ -/* - * This file is a part of "NMIG" - the database migration tool. - * - * Copyright (C) 2016 - present, Anatoly Khaytovich - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program (please see the "LICENSE.md" file). - * If not, see . - * - * @author Anatoly Khaytovich - */ -'use strict'; - -const connect = require('./Connector'); -const log = require('./Logger'); -const generateError = require('./ErrorGenerator'); -const extraConfigProcessor = require('./ExtraConfigProcessor'); - -/** - * Runs "vacuum full" and "analyze". - * - * @param {Conversion} self - * - * @returns {Promise} - */ -module.exports = self => { - return connect(self).then(() => { - return new Promise(resolve => { - const vacuumPromises = []; - - for (let i = 0; i < self._tablesToMigrate.length; ++i) { - if (self._noVacuum.indexOf(extraConfigProcessor.getTableName(self, self._tablesToMigrate[i], true)) === -1) { - const msg = '\t--[runVacuumFullAndAnalyze] Running "VACUUM FULL and ANALYZE" query for table "' - + self._schema + '"."' + self._tablesToMigrate[i] + '"...'; - - log(self, msg); - vacuumPromises.push( - new Promise(resolveVacuum => { - self._pg.connect((error, client, done) => { - if (error) { - generateError(self, '\t--[runVacuumFullAndAnalyze] Cannot connect to PostgreSQL server...'); - resolveVacuum(); - } else { - const sql = 'VACUUM (FULL, ANALYZE) "' + self._schema + '"."' + self._tablesToMigrate[i] + '";'; - client.query(sql, err => { - done(); - - if (err) { - generateError(self, '\t--[runVacuumFullAndAnalyze] ' + err, sql); - resolveVacuum(); - } else { - const msg2 = '\t--[runVacuumFullAndAnalyze] Table "' + self._schema - + '"."' + self._tablesToMigrate[i] + '" is VACUUMed...'; - - log(self, msg2); - resolveVacuum(); - } - }); - } - }); - }) - ); - } - } - - Promise.all(vacuumPromises).then(() => resolve()); - }); - }); -}; diff --git a/src/VacuumProcessor.ts b/src/VacuumProcessor.ts new file mode 100644 index 0000000..69f8dc7 --- /dev/null +++ b/src/VacuumProcessor.ts @@ -0,0 +1,48 @@ +/* + * This file is a part of "NMIG" - the database migration tool. + * + * Copyright (C) 2016 - present, Anatoly Khaytovich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program (please see the "LICENSE.md" file). + * If not, see . + * + * @author Anatoly Khaytovich + */ +import log from './Logger'; +import Conversion from './Conversion'; +import DBAccess from './DBAccess'; +import DBVendors from './DBVendors'; +import * as extraConfigProcessor from './ExtraConfigProcessor'; + +/** + * Runs "vacuum full" and "analyze". + */ +export default async function(conversion: Conversion): Promise { + const dbAccess: DBAccess = new DBAccess(conversion); + + const vacuumPromises: Promise[] = conversion._tablesToMigrate.map(async (table: string) => { + if (conversion._noVacuum.indexOf(extraConfigProcessor.getTableName(conversion, table, true)) === -1) { + const msg: string = `\t--[runVacuumFullAndAnalyze] Running "VACUUM FULL and ANALYZE" query for table + "${ conversion._schema }"."${ table }"...`; + + log(conversion, msg); + + const sql: string = `VACUUM (FULL, ANALYZE) "${ conversion._schema }"."${ table }";`; + await dbAccess.query('runVacuumFullAndAnalyze', sql, DBVendors.PG, false, false); + const msgSuccess: string = `\t--[runVacuumFullAndAnalyze] Table "${ conversion._schema }"."${ table }" is VACUUMed...`; + log(conversion, msgSuccess); + } + }); + + await Promise.all(vacuumPromises); +} diff --git a/tsconfig.json b/tsconfig.json index 9a45c76..a118bd1 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -3,12 +3,14 @@ "removeComments": true, "module": "commonjs", "allowJs": true, - "target": "es6", + "target": "es2017", "sourceMap": true, "outDir": "./dist", "strict": true, "types": [ - "node" + "node", + "pg", + "mysql" ] },