Эффективное чтение, обработка и вставка данных с помощью PG-Promise и PG-Query-Stream

Я собираюсь сделать следующее.

  1. Запросите большую таблицу с группой по запросу, чтобы выполнить суммирование значений.
  2. Запустите эти записи через процедуру, чтобы добавить некоторые дополнительные данные.
  3. Эффективно вставьте их в БД.

Я пытался сделать это, используя pg-query-stream, чтобы считывать данные в виде потока, а затем подсчитывать эти записи в пакетах, например. 1000 за раз, и как только мы достигнем предела пакетов, затем используйте pg-promise pgp.helpers.insert для вставки данных.

У меня проблема в том, что я не могу понять, как правильно приостановить поток, чтобы вставка завершилась, прежде чем продолжить. Особенно на on.end()

Код, который я пробовал, приведен ниже

const { performance } = require('perf_hooks')
const QueryStream = require('pg-query-stream')

const batchInsertData = (tenant, stream, records, insertColumnSet, options = {}) => {
  stream.pause()
  const t0 = performance.now()
  let query = tenant.db.$config.pgp.helpers.insert(records, insertColumnSet)

  if (options.onConflictExpression) {
    query += options.onConflictExpression
  }

  tenant.db.none(query)
    .then(() => {
      let t1 = performance.now()
      console.info('Inserted ' + records.length + ' records done in ' + ((t1 - t0) / 1000) + ' (seconds).')
      stream.resume()
    })
    .catch(error => {
      throw error
    })
}

module.exports = (tenant, sql, columnSet, recordMapper, options = {}) => {
  try {
    return new Promise((resolve, reject) => {
      const query = new QueryStream(sql)

      // Set options as required
      options.batchSize = parseInt(options.batchSize) || 1000
      options.onConflictExpression = options.onConflictExpression || null

      let records = []
      let batchNumber = 1
      let recordCount = 0

      let t0 = performance.now()
      tenant.db.stream(query, (stream) => {
        stream.on('data', (record) => {
          const mappedRecord = recordMapper(record)
          records.push(mappedRecord)
          recordCount++

          if (records.length === options.batchSize) {
            batchInsertData(tenant, stream, records, columnSet, options)
            records = []
            console.info(`Batch ${batchNumber} done`)
            batchNumber++
          }
        })
        stream.on('end', () => {
        // If any records are left that are not part of a batch insert here.
          if (records.length !== 0) {
            batchInsertData(tenant, stream, records, columnSet, options)
            records = []
            console.info(`Batch ${batchNumber} done`)
            batchNumber++
            console.info('Total Records: ' + recordCount)
            let t1 = performance.now()
            console.info('Duration:', ((t1 - t0) / 1000) + ' (seconds).')
          } else {
            console.info('Total Records: ' + recordCount)
            let t1 = performance.now()
            console.info('Duration:', ((t1 - t0) / 1000) + ' (seconds).')
          }
        })
        stream.on('error', (error) => {
          throw error
        })
      })
        .then(data => {
          resolve()
        })
        .catch(error => {
          console.info('ERROR:', error)
          reject(error)
        })
    })
  } catch (err) {
    throw err
  }
}

Я не уверен, что подход, который я пытаюсь, является лучшим. Я пробовал несколько разных вещей, основанных на документации, которую я могу найти по pg-promise и потокам, но не получил удовольствия.

Любая помощь/совет приветствуется.

Спасибо

Павел

Попытка 2

Ниже моя вторая попытка использовать getNextData и последовательность в соответствии со страницей импорта данных. Пытаюсь определить, как подключить к нему поток, чтобы извлекать только пакеты данных за раз перед вставкой.

const { performance } = require('perf_hooks')
const QueryStream = require('pg-query-stream')

module.exports = (tenant, sql, columnSet, recordMapper, options = {}) => {

  try {
    // Set options as required
    options.batchSize = parseInt(options.batchSize) || 1000
    options.onConflictExpression = options.onConflictExpression || null

    const query = new QueryStream(sql)

    function getNextData(transaction, index) {
      return new Promise(async (resolve, reject) => {
        if (index < options.batchSize) {
          let count = 1
          await transaction.stream(query, async (stream) => {
            let records = []
            await tenant.db.$config.pgp.spex.stream.read.call(transaction, stream, function (streamIndex, streamData) {  
              stream.resume()
              count++
              console.info(count, streamIndex, streamData)        

              records.push(streamData[0])

              if (records.length === options.batchSize) {
                stream.pause()
                resolve(records)
              }
            }, {readChunks: true})

          })  
        }
        resolve(null)
      })
    }

    return tenant.db.tx('massive-insert', (transaction) => {
      return transaction.sequence((index) => {          
        return getNextData(transaction, index)
          .then((records) => {
            if (records > 0) {
              let query = tenant.db.$config.pgp.helpers.insert(records, columnSet)

              if (options.onConflictExpression) {
                query += options.onConflictExpression
              }

              const i0 = performance.now()
              return transaction.none(query)
                .then(() => {
                  let i1 = performance.now()
                  console.info('Inserted ' + records.length + ' records done in ' + ((i1 - i0) / 1000) + ' (seconds).')
                })
            }
          })
      })
    })
  } catch (err) {
    throw err
  }
}

Вы пробовали подход, описанный в Импорт данных? Он приостанавливает чтение данных из файлов по мере создания вставок, поэтому логика, основанная на промисах, остается той же. Только источник данных другой.

vitaly-t 22.02.2019 17:46

Привет, я просмотрел импорт данных и изучил примеры страниц, но не могу понять, как привязать QueryStream к функциям sequence/getNextData, чтобы при необходимости извлекать пакеты данных из потока. Я прочитал документы, искал здесь и просмотрел примеры на Github, но не понимаю, как это понять.

Paul Mowat 23.02.2019 22:51

Сегодня я провел некоторое время, работая с примерами импорта данных и пробуя их. Я опубликовал одну из более близких попыток, которые я получил выше, как попытку 2. Я пробовал все, чтобы заставить поток работать, но изо всех сил пытался понять, как анализировать его партиями с помощью getNextData. У вас есть какие-либо примеры, показывающие, как использовать getNextData с данными, поступающими из потока, или какие-либо дополнительные советы, которые вы могли бы дать?

Paul Mowat 24.02.2019 17:07

В примерах показаны данные, поступающие из обещания, которое может быть любым, включая поток.

vitaly-t 24.02.2019 17:10

Я потратил время на чтение примеров и документации и не нашел ничего похожего на то, что я пытаюсь сделать. Основная проблема, с которой я сталкиваюсь, заключается в том, как подключить поток, который будет использоваться pg-promise в пакетах внутри getNextData. Получатель streamRead просто хочет обработать весь поток. Я не видел способа получить данные из самого приемника или как разрешить их обработку партиями. Я буду продолжать пытаться понять, смогу ли я это понять. Спасибо за помощь

Paul Mowat 24.02.2019 17:30
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
3
5
1 136
1

Ответы 1

У меня это работает, используя немного другой подход, более сосредоточенный на непосредственном использовании потоков, но все еще использующий pg-promise для работы со стороной БД.

const BatchStream = require('batched-stream')
const { performance } = require('perf_hooks')
const { Transform, Writable } = require('stream')

module.exports = async (tenant, sql, columnSet, recordMapper, options = {}) => {

  try {
    // Set options as required
    options.batchSize = parseInt(options.batchSize) || 1000
    options.onConflictExpression = options.onConflictExpression || null

    const query = new tenant.lib.QueryStream(sql)

    const stream = tenant.db.client.query(query)

    return new Promise((resolve, reject) => {
      // We want to process this in batches
      const batch = new BatchStream({size : options.batchSize, objectMode: true, strictMode: false})

      // We use a write stream to insert the batch into the database
      let insertDatabase = new Writable({
        objectMode: true,
        write(records, encoding, callback) {
          (async () => {

            try {
              /*
                If we have a record mapper then do it here prior to inserting the records.
                This way is much quicker than doing it as a transform stream below by
                about 10 seconds for 100,000 records
              */
              if (recordMapper) {
                records = records.map(record => recordMapper(record))
              }

              let query = tenant.lib.pgp.helpers.insert(records, columnSet)

              if (options.onConflictExpression) {
                query += options.onConflictExpression
              }

              const i0 = performance.now()
              await tenant.db.none(query)
                .then(() => {
                  let i1 = performance.now()
                  console.info('Inserted ' + records.length + ' records in ' + ((i1 - i0) / 1000) + ' (seconds).')
                })

            } catch(e) {
              return callback(e)
            }

            callback()
          })()
        }
      })

      // Process the stream
      const t0 = performance.now()
      stream
        // Break it down into batches
        .pipe(batch)
        // Insert those batches into the database
        .pipe(insertDatabase)
        // Once we get here we are done :)
        .on('finish', () => {
          const t1 = performance.now()
          console.info('Finished insert in ' + ((t1 - t0) / 1000) + ' (seconds).')
          resolve()
        })
        .on('error', (error) => {
          reject(error)
        })

    })
  } catch (err) {
    throw err
  }
}

Другие вопросы по теме