Spaces:
Paused
Paused
| ; | |
| const process = require('process'); | |
| const Timers = require('timers'); | |
| const Readable = require('stream').Readable; | |
| const Command = require('./command.js'); | |
| const Packets = require('../packets/index.js'); | |
| const getTextParser = require('../parsers/text_parser.js'); | |
| const ServerStatus = require('../constants/server_status.js'); | |
// Shared packet that carries no payload. The 4-byte buffer only leaves room
// for the packet header (presumably filled in when the packet is written —
// confirm against connection.writePacket). It is written to the server to
// finish, or to refuse, a LOCAL INFILE upload.
const emptyPacketBuffer = Buffer.allocUnsafe(4);
const EmptyPacket = new Packets.Packet(
  0,
  emptyPacketBuffer,
  0,
  emptyPacketBuffer.length
);
/**
 * Text-protocol query command.
 * Protocol reference: http://dev.mysql.com/doc/internals/en/com-query.html
 *
 * The command is driven as a state machine: `start` and every packet handler
 * return the handler for the next packet, or `null` once the command is over.
 * Results are delivered either through the `callback` passed to the
 * constructor or, when no callback is given, through the 'fields', 'result'
 * and 'error' events.
 */
class Query extends Command {
  /**
   * @param {object} options - Query options. Reads `sql`, `values`,
   *   `namedPlaceholders`, `timeout` (milliseconds) and
   *   `infileStreamFactory`; the whole object is later merged over the
   *   connection config in `start`.
   * @param {Function} [callback] - Called as `(err, rows, fields)`.
   */
  constructor(options, callback) {
    super();
    this.sql = options.sql;
    this.values = options.values;
    this._queryOptions = options;
    this.namedPlaceholders = options.namedPlaceholders || false;
    this.onResult = callback;
    this.timeout = options.timeout;
    this.queryTimeout = null;
    this._fieldCount = 0;
    this._rowParser = null;
    this._fields = [];
    this._rows = [];
    this._receivedFieldsCount = 0;
    this._resultIndex = 0;
    this._localStream = null;
    // Set when a LOCAL INFILE stream or the row parser fails; checked in doneInsert.
    this._localStreamError = null;
    this._unpipeStream = function () { };
    this._streamFactory = options.infileStreamFactory;
    this._connection = null;
  }

  /**
   * Guard against treating this command as a promise: logs an explanation
   * and throws. Also installed as `catch` on the prototype.
   *
   * @throws {Error} Always.
   */
  then() {
    const err =
      "You have tried to call .then(), .catch(), or invoked await on the result of query that is not a promise, which is a programming error. Try calling con.promise().query(), or require('mysql2/promise') instead of 'mysql2' for a promise-compatible version of the query interface. To learn how to use async/await or Promises check out documentation at https://sidorares.github.io/node-mysql2/docs#using-promise-wrapper, or the mysql2 documentation at https://sidorares.github.io/node-mysql2/docs/documentation/promise-wrapper";
    // eslint-disable-next-line
    console.log(err);
    throw new Error(err);
  }

  /**
   * Sends the query packet and arms the optional timeout.
   *
   * @param {*} _packet - Unused.
   * @param {object} connection - Connection the command runs on.
   * @returns {Function} Handler for the first response packet.
   */
  /* eslint no-unused-vars: ["error", { "argsIgnorePattern": "^_" }] */
  start(_packet, connection) {
    if (connection.config.debug) {
      // eslint-disable-next-line
      console.log(' Sending query command: %s', this.sql);
    }
    this._connection = connection;
    // Per-query options take precedence over the connection-level config.
    this.options = Object.assign({}, connection.config, this._queryOptions);
    this._setTimeout();
    const cmdPacket = new Packets.Query(
      this.sql,
      connection.config.charsetNumber
    );
    connection.writePacket(cmdPacket.toPacket(1));
    return Query.prototype.resultsetHeader;
  }

  /**
   * Finishes the command successfully and hands the accumulated results to
   * the callback on the next tick.
   *
   * @returns {null} Always `null` (no further packets expected).
   */
  done() {
    this._unpipeStream();
    // A configured timeout with no pending timer means the timeout handler
    // already reported to the caller; do not report a second time.
    if (this.timeout && !this.queryTimeout) {
      return null;
    }
    this._clearQueryTimeout();
    if (this.onResult) {
      let rows, fields;
      if (this._resultIndex === 0) {
        // Single result set: unwrap it instead of returning one-element arrays.
        rows = this._rows[0];
        fields = this._fields[0];
      } else {
        rows = this._rows;
        fields = this._fields;
      }
      if (fields) {
        process.nextTick(() => {
          this.onResult(null, rows, fields);
        });
      } else {
        process.nextTick(() => {
          this.onResult(null, rows);
        });
      }
    }
    return null;
  }

  /**
   * Records a result that has no columns (a result set header with zero
   * fields, or the reply to a LOCAL INFILE upload), or reports a previously
   * stored stream/parser error.
   *
   * @param {object|null} rs - Result set header; `null` when called from the
   *   row-parsing error path.
   * @returns {Function|null} `resultsetHeader` when the server announced
   *   more results, otherwise `null`.
   */
  doneInsert(rs) {
    if (this._localStreamError) {
      // The command ends here, so release the infile stream and the timer
      // exactly as done() does. Otherwise the pending timer would fire later
      // and report a second (timeout) error for the same query.
      this._unpipeStream();
      if (this.timeout && !this.queryTimeout) {
        // Timeout already reported to the caller.
        return null;
      }
      this._clearQueryTimeout();
      if (this.onResult) {
        this.onResult(this._localStreamError, rs);
      } else {
        this.emit('error', this._localStreamError);
      }
      return null;
    }
    this._rows.push(rs);
    this._fields.push(void 0);
    this.emit('fields', void 0);
    this.emit('result', rs);
    if (rs.serverStatus & ServerStatus.SERVER_MORE_RESULTS_EXISTS) {
      this._resultIndex++;
      return this.resultsetHeader;
    }
    return this.done();
  }

  /**
   * Handles the first packet of a result and picks the next state from its
   * field count: 0 means no columns, `null` means the server asks for a
   * local file, anything else is followed by column definitions.
   *
   * @param {object} packet
   * @param {object} connection
   * @returns {Function|null} Next packet handler.
   */
  resultsetHeader(packet, connection) {
    const rs = new Packets.ResultSetHeader(packet, connection);
    this._fieldCount = rs.fieldCount;
    if (connection.config.debug) {
      // eslint-disable-next-line
      console.log(
        ` Resultset header received, expecting ${rs.fieldCount} column definition packets`
      );
    }
    if (this._fieldCount === 0) {
      return this.doneInsert(rs);
    }
    if (this._fieldCount === null) {
      return this._streamLocalInfile(connection, rs.infileName);
    }
    this._receivedFieldsCount = 0;
    this._rows.push([]);
    this._fields.push([]);
    return this.readField;
  }

  /**
   * Streams the file requested by the server through `infileStreamFactory`,
   * or answers with an empty packet and stores an error when no factory was
   * provided.
   *
   * @param {object} connection
   * @param {string} path - File name requested by the server.
   * @returns {Function} `infileOk`, the handler for the server's reply.
   */
  _streamLocalInfile(connection, path) {
    if (this._streamFactory) {
      this._localStream = this._streamFactory(path);
    } else {
      this._localStreamError = new Error(
        `As a result of LOCAL INFILE command server wants to read ${path} file, but as of v2.0 you must provide streamFactory option returning ReadStream.`
      );
      connection.writePacket(EmptyPacket);
      return this.infileOk;
    }
    const onConnectionError = () => {
      this._unpipeStream();
    };
    // Propagate backpressure from the connection stream to the local stream.
    const onDrain = () => {
      this._localStream.resume();
    };
    const onPause = () => {
      this._localStream.pause();
    };
    const onData = data => {
      // Leave 4 bytes in front of the chunk for the packet header.
      const dataWithHeader = Buffer.allocUnsafe(data.length + 4);
      data.copy(dataWithHeader, 4);
      connection.writePacket(
        new Packets.Packet(0, dataWithHeader, 0, dataWithHeader.length)
      );
    };
    const onEnd = () => {
      connection.removeListener('error', onConnectionError);
      connection.writePacket(EmptyPacket);
    };
    const onError = err => {
      this._localStreamError = err;
      connection.removeListener('error', onConnectionError);
      connection.writePacket(EmptyPacket);
    };
    this._unpipeStream = () => {
      connection.stream.removeListener('pause', onPause);
      connection.stream.removeListener('drain', onDrain);
      this._localStream.removeListener('data', onData);
      this._localStream.removeListener('end', onEnd);
      this._localStream.removeListener('error', onError);
      // Also drop the connection-level listener so it does not outlive the
      // query (removing an already-removed listener is a no-op).
      connection.removeListener('error', onConnectionError);
    };
    connection.stream.on('pause', onPause);
    connection.stream.on('drain', onDrain);
    this._localStream.on('data', onData);
    this._localStream.on('end', onEnd);
    this._localStream.on('error', onError);
    connection.once('error', onConnectionError);
    return this.infileOk;
  }

  /**
   * Handles one column definition packet of the current result set.
   *
   * @param {object} packet
   * @param {object} connection
   * @returns {Function} `readField` until all definitions have arrived,
   *   then `fieldsEOF`.
   */
  readField(packet, connection) {
    this._receivedFieldsCount++;
    // Often there is much more data in the column definition than in the row itself
    // If you set manually _fields[0] to array of ColumnDefinition's (from previous call)
    // you can 'cache' result of parsing. Field packets still received, but ignored in that case
    // this is the reason _receivedFieldsCount exist (otherwise we could just use current length of fields array)
    if (this._fields[this._resultIndex].length !== this._fieldCount) {
      const field = new Packets.ColumnDefinition(
        packet,
        connection.clientEncoding
      );
      this._fields[this._resultIndex].push(field);
      if (connection.config.debug) {
        /* eslint-disable no-console */
        console.log(' Column definition:');
        console.log(` name: ${field.name}`);
        console.log(` type: ${field.columnType}`);
        console.log(` flags: ${field.flags}`);
        /* eslint-enable no-console */
      }
    }
    // last field received
    if (this._receivedFieldsCount === this._fieldCount) {
      const fields = this._fields[this._resultIndex];
      this.emit('fields', fields);
      this._rowParser = new (getTextParser(fields, this.options, connection.config))(fields);
      return Query.prototype.fieldsEOF;
    }
    return Query.prototype.readField;
  }

  /**
   * Expects the EOF packet that terminates the column definitions.
   *
   * @param {object} packet
   * @param {object} connection
   * @returns {*} `row` handler, or the result of `connection.protocolError`
   *   when the packet is not EOF.
   */
  fieldsEOF(packet, connection) {
    // check EOF
    if (!packet.isEOF()) {
      return connection.protocolError('Expected EOF packet');
    }
    return this.row;
  }

  /**
   * Handles a row packet, or the EOF packet that ends the current result set.
   *
   * @param {object} packet
   * @param {object} _connection - Unused.
   * @returns {Function|null} Next packet handler.
   */
  /* eslint no-unused-vars: ["error", { "argsIgnorePattern": "^_" }] */
  row(packet, _connection) {
    if (packet.isEOF()) {
      const status = packet.eofStatusFlags();
      const moreResults = status & ServerStatus.SERVER_MORE_RESULTS_EXISTS;
      if (moreResults) {
        this._resultIndex++;
        return Query.prototype.resultsetHeader;
      }
      return this.done();
    }
    let row;
    try {
      row = this._rowParser.next(
        packet,
        this._fields[this._resultIndex],
        this.options
      );
    } catch (err) {
      // Reuse the stored-error path of doneInsert to report parser failures.
      this._localStreamError = err;
      return this.doneInsert(null);
    }
    if (this.onResult) {
      // Callback mode: buffer rows until done().
      this._rows[this._resultIndex].push(row);
    } else {
      // Emitter mode: hand rows over one at a time.
      this.emit('result', row);
    }
    return Query.prototype.row;
  }

  /**
   * Handles the server's reply after a LOCAL INFILE upload.
   *
   * @param {object} packet
   * @param {object} connection
   * @returns {Function|null} Next packet handler.
   */
  infileOk(packet, connection) {
    const rs = new Packets.ResultSetHeader(packet, connection);
    return this.doneInsert(rs);
  }

  /**
   * Exposes the query's rows as an object-mode Readable stream. The
   * connection is paused while the stream's buffer is full and resumed when
   * the consumer reads again.
   *
   * @param {object} [options] - Readable options; `objectMode` is always
   *   forced to `true`. The passed object is not modified.
   * @returns {Readable}
   */
  stream(options) {
    // Copy instead of mutating the caller's object (which may be shared or frozen).
    const streamOptions = Object.assign({}, options, { objectMode: true });
    const stream = new Readable(streamOptions);
    stream._read = () => {
      this._connection && this._connection.resume();
    };
    this.on('result', row => {
      if (!stream.push(row)) {
        this._connection.pause();
      }
      stream.emit('result', row); // replicate old emitter
    });
    this.on('error', err => {
      stream.emit('error', err); // Pass on any errors
    });
    this.on('end', () => {
      stream.push(null); // pushing null, indicating EOF
    });
    this.on('fields', fields => {
      stream.emit('fields', fields); // replicate old emitter
    });
    stream.on('end', () => {
      stream.emit('close');
    });
    return stream;
  }

  /**
   * Arms the query timeout when `options.timeout` was provided.
   */
  _setTimeout() {
    if (this.timeout) {
      const timeoutHandler = this._handleTimeoutError.bind(this);
      this.queryTimeout = Timers.setTimeout(
        timeoutHandler,
        this.timeout
      );
    }
  }

  /**
   * Cancels the pending query timer, if any, and forgets its handle.
   */
  _clearQueryTimeout() {
    if (this.queryTimeout) {
      Timers.clearTimeout(this.queryTimeout);
      this.queryTimeout = null;
    }
  }

  /**
   * Reports a 'Query inactivity timeout' error through the callback or the
   * 'error' event.
   */
  _handleTimeoutError() {
    this._clearQueryTimeout();
    const err = new Error('Query inactivity timeout');
    err.errorno = 'PROTOCOL_SEQUENCE_TIMEOUT';
    err.code = 'PROTOCOL_SEQUENCE_TIMEOUT';
    err.syscall = 'query';
    if (this.onResult) {
      this.onResult(err);
    } else {
      this.emit('error', err);
    }
  }
}
// `.catch()` reuses the `.then()` guard, so promise-style misuse of a
// callback-based query fails with the same explanatory error.
const { then: rejectPromiseUsage } = Query.prototype;
Query.prototype.catch = rejectPromiseUsage;

module.exports = Query;