Я собираюсь сделать следующее.
Я пытался сделать это, используя pg-query-stream, чтобы считывать данные в виде потока, а затем подсчитывать эти записи в пакетах, например. 1000 за раз, и как только мы достигнем предела пакетов, затем используйте pg-promise pgp.helpers.insert для вставки данных.
У меня проблема в том, что я не могу понять, как правильно приостановить поток, чтобы вставка завершилась, прежде чем продолжить. Особенно на on.end()
Код, который я пробовал, приведен ниже
const { performance } = require('perf_hooks')
const QueryStream = require('pg-query-stream')
const batchInsertData = (tenant, stream, records, insertColumnSet, options = {}) => {
stream.pause()
const t0 = performance.now()
let query = tenant.db.$config.pgp.helpers.insert(records, insertColumnSet)
if (options.onConflictExpression) {
query += options.onConflictExpression
}
tenant.db.none(query)
.then(() => {
let t1 = performance.now()
console.info('Inserted ' + records.length + ' records done in ' + ((t1 - t0) / 1000) + ' (seconds).')
stream.resume()
})
.catch(error => {
throw error
})
}
module.exports = (tenant, sql, columnSet, recordMapper, options = {}) => {
try {
return new Promise((resolve, reject) => {
const query = new QueryStream(sql)
// Set options as required
options.batchSize = parseInt(options.batchSize) || 1000
options.onConflictExpression = options.onConflictExpression || null
let records = []
let batchNumber = 1
let recordCount = 0
let t0 = performance.now()
tenant.db.stream(query, (stream) => {
stream.on('data', (record) => {
const mappedRecord = recordMapper(record)
records.push(mappedRecord)
recordCount++
if (records.length === options.batchSize) {
batchInsertData(tenant, stream, records, columnSet, options)
records = []
console.info(`Batch ${batchNumber} done`)
batchNumber++
}
})
stream.on('end', () => {
// If any records are left that are not part of a batch insert here.
if (records.length !== 0) {
batchInsertData(tenant, stream, records, columnSet, options)
records = []
console.info(`Batch ${batchNumber} done`)
batchNumber++
console.info('Total Records: ' + recordCount)
let t1 = performance.now()
console.info('Duration:', ((t1 - t0) / 1000) + ' (seconds).')
} else {
console.info('Total Records: ' + recordCount)
let t1 = performance.now()
console.info('Duration:', ((t1 - t0) / 1000) + ' (seconds).')
}
})
stream.on('error', (error) => {
throw error
})
})
.then(data => {
resolve()
})
.catch(error => {
console.info('ERROR:', error)
reject(error)
})
})
} catch (err) {
throw err
}
}
Я не уверен, что подход, который я пытаюсь, является лучшим. Я пробовал несколько разных вещей, основанных на документации, которую я могу найти по pg-promise и потокам, но не получил удовольствия.
Любая помощь/совет приветствуется.
Спасибо
Павел
Попытка 2
Ниже моя вторая попытка использовать getNextData и последовательность в соответствии со страницей импорта данных. Пытаюсь определить, как подключить к нему поток, чтобы извлекать только пакеты данных за раз перед вставкой.
const { performance } = require('perf_hooks')
const QueryStream = require('pg-query-stream')
module.exports = (tenant, sql, columnSet, recordMapper, options = {}) => {
try {
// Set options as required
options.batchSize = parseInt(options.batchSize) || 1000
options.onConflictExpression = options.onConflictExpression || null
const query = new QueryStream(sql)
function getNextData(transaction, index) {
return new Promise(async (resolve, reject) => {
if (index < options.batchSize) {
let count = 1
await transaction.stream(query, async (stream) => {
let records = []
await tenant.db.$config.pgp.spex.stream.read.call(transaction, stream, function (streamIndex, streamData) {
stream.resume()
count++
console.info(count, streamIndex, streamData)
records.push(streamData[0])
if (records.length === options.batchSize) {
stream.pause()
resolve(records)
}
}, {readChunks: true})
})
}
resolve(null)
})
}
return tenant.db.tx('massive-insert', (transaction) => {
return transaction.sequence((index) => {
return getNextData(transaction, index)
.then((records) => {
if (records > 0) {
let query = tenant.db.$config.pgp.helpers.insert(records, columnSet)
if (options.onConflictExpression) {
query += options.onConflictExpression
}
const i0 = performance.now()
return transaction.none(query)
.then(() => {
let i1 = performance.now()
console.info('Inserted ' + records.length + ' records done in ' + ((i1 - i0) / 1000) + ' (seconds).')
})
}
})
})
})
} catch (err) {
throw err
}
}
Привет, я просмотрел импорт данных и изучил примеры страниц, но не могу понять, как привязать QueryStream к функциям sequence/getNextData, чтобы при необходимости извлекать пакеты данных из потока. Я прочитал документы, искал здесь и просмотрел примеры на Github, но не понимаю, как это понять.
Сегодня я провел некоторое время, работая с примерами импорта данных и пробуя их. Я опубликовал одну из более близких попыток, которые я получил выше, как попытку 2. Я пробовал все, чтобы заставить поток работать, но изо всех сил пытался понять, как анализировать его партиями с помощью getNextData. У вас есть какие-либо примеры, показывающие, как использовать getNextData с данными, поступающими из потока, или какие-либо дополнительные советы, которые вы могли бы дать?
В примерах показаны данные, поступающие из обещания, которое может быть любым, включая поток.
Я потратил время на чтение примеров и документации и не нашел ничего похожего на то, что я пытаюсь сделать. Основная проблема, с которой я сталкиваюсь, заключается в том, как подключить поток, который будет использоваться pg-promise в пакетах внутри getNextData. Получатель streamRead просто хочет обработать весь поток. Я не видел способа получить данные из самого приемника или как разрешить их обработку партиями. Я буду продолжать пытаться понять, смогу ли я это понять. Спасибо за помощь





У меня это работает, используя немного другой подход, более сосредоточенный на непосредственном использовании потоков, но все еще использующий pg-promise для работы со стороной БД.
const BatchStream = require('batched-stream')
const { performance } = require('perf_hooks')
const { Transform, Writable } = require('stream')
module.exports = async (tenant, sql, columnSet, recordMapper, options = {}) => {
try {
// Set options as required
options.batchSize = parseInt(options.batchSize) || 1000
options.onConflictExpression = options.onConflictExpression || null
const query = new tenant.lib.QueryStream(sql)
const stream = tenant.db.client.query(query)
return new Promise((resolve, reject) => {
// We want to process this in batches
const batch = new BatchStream({size : options.batchSize, objectMode: true, strictMode: false})
// We use a write stream to insert the batch into the database
let insertDatabase = new Writable({
objectMode: true,
write(records, encoding, callback) {
(async () => {
try {
/*
If we have a record mapper then do it here prior to inserting the records.
This way is much quicker than doing it as a transform stream below by
about 10 seconds for 100,000 records
*/
if (recordMapper) {
records = records.map(record => recordMapper(record))
}
let query = tenant.lib.pgp.helpers.insert(records, columnSet)
if (options.onConflictExpression) {
query += options.onConflictExpression
}
const i0 = performance.now()
await tenant.db.none(query)
.then(() => {
let i1 = performance.now()
console.info('Inserted ' + records.length + ' records in ' + ((i1 - i0) / 1000) + ' (seconds).')
})
} catch(e) {
return callback(e)
}
callback()
})()
}
})
// Process the stream
const t0 = performance.now()
stream
// Break it down into batches
.pipe(batch)
// Insert those batches into the database
.pipe(insertDatabase)
// Once we get here we are done :)
.on('finish', () => {
const t1 = performance.now()
console.info('Finished insert in ' + ((t1 - t0) / 1000) + ' (seconds).')
resolve()
})
.on('error', (error) => {
reject(error)
})
})
} catch (err) {
throw err
}
}
Вы пробовали подход, описанный в Импорт данных? Он приостанавливает чтение данных из файлов по мере создания вставок, поэтому логика, основанная на промисах, остается той же. Только источник данных другой.