Я использую модуль запроса для загрузки zip-файла, который содержит файл .csv, затем использую pipe для чтения содержимого с помощью модулей unzip и split, а затем анализирую и записываю результат в mongodb с помощью модуля mongoose-object-stream. Мой код:
//index.js
var request = require('request');
var bun = require('bun');
var split = require('split');
var unzip = require('./lib/unzip');
var tomongo = require('./lib/tomongo');
var pipeline = bun([ unzip(), split()]);
request.get( "http://someurl/somefile.zip" )
.pipe( pipeline )
.pipe( tomongo() );
//tomongo.js
var mySchema = require('../schema.json');
var through = require('through2');
var mos = require('mongoose-object-stream');
var mongoose = require('mongoose');
var models = require('../models')
const dbpath = "mongodb://localhost:27017/test";
const mongo = mongoose.connect(dbpath, {useNewUrlParser: true });
mongo.then(() => {
console.info('mongoDB connected');
}).catch((err) => {
console.info('err', err);
});
var db = mongoose.connection;
db.on('error', console.error.bind(console, 'connection error:'));
var modelStream = new mos(models.books);
function parser(){
var columns = mySchema;
var parseandwrite = function( chunk, _, cb ){
var row = {}, cells = chunk.toString('utf-8').split('\t');
cells.forEach( function( cell, i ){
row[ columns[ i ] ] = ( cell || '' ).trim();
});
if ( !!chunk ){
modelStream.write( row );
}
cb();
};
return through.obj( parseandwrite );
}
module.exports = parser;
Я хочу что-то сделать, когда поток заканчивается и все записи хранятся в базе данных.
Я попытался добавить в канал .on ('finish', function () {process.exit ()}) или .on ('end', function () {process.exit ()}), но узел продолжал работать.
извини, я ошибся написал, поправляю пост
Если вы закроете открытые ресурсы, Узел выйдет автоматически. Лучше всего всегда аккуратно закрывать ресурсы, использование process.exit
обычно указывает на утечку памяти. Когда вы закончите запись данных, закройте соединение с БД.
как я могу увидеть, когда все данные записаны?
Я думаю, что могу подсчитать строки в предыдущем канале, затем передать эти данные в tomongo () и закрыть базу данных, когда я напишу последнюю строку .. это решение или есть какие-то лучшие?
Предполагая, что ваш метод parser
здесь не является проблемой, я бы предложил перенести логику подключения к базе данных в ваш индекс, вы должны подключиться к БД, прежде чем пытаться передавать в нее данные. Если вы заключите логику потоковой передачи в Promise
, вы можете выполнить логику обработки соединения с БД в одной цепочке Promise
.
Вот пример того, как это может выглядеть:
var Promise = require('bluebird');
var mongoose = require('mongoose');
var MongooseObjectStream = require('mongoose-object-stream');
var request = require('request');
var split = require('split');
var through = require('through2');
var unzip = require('unzip-stream');
function streamToDB(url) {
return new Promise((resolve, reject) => {
request.get(url)
.pipe(unzip.Parse())
.pipe(through.obj(function (entry, enc, cb) {
if (entry.path === 'file_with_content') {
entry.on('end', cb)
.on('error', cb)
.on('data', (data) => this.push(data));
} else {
entry.autodrain()
.on('error', cb)
.on('finish', cb);
}
}))
.pipe(split())
.pipe(through.obj((line, enc, cb) => {
cb(null, line.split('\t')); // Convert to "real" object here
}))
.pipe(new MongooseObjectStream(mongoose, 'Model', {}, { strict: false }))
.on('error', reject)
.on('finish', resolve);
});
}
mongoose.connect('mongodb://localhost:27017/test', {
useNewUrlParser: true,
promiseLibrary: Promise
}).then(() => {
return streamToDB('http://someurl/somefile.zip')
.finally(() => mongoose.disconnect());
}).catch((err) => {
console.error(err);
});
он возвращает ошибку после подключения к базе данных: mongoDB connected internal / streams / legacy.js: 57 throw er; // Необработанная ошибка потока в канале. ^ Ошибка: запись после завершения в writeAfterEnd (D: \ tests \ app \ node_modules \ readable-stream \ lib_stream_writab le.js: 144: 12) в BunWrapper.Writable.write (D: \ tests \ app \ node_modules \ readable- stream \ lib_stream_writab le.js: 192: 5) в Request.ondata (internal / streams / legacy.js: 15: 31) в Request.emit (events.js: 182: 13) в IncomingMessage. <anonymous> (D : \ tests \ app \ node_modules \ request \ request.js: 1076: 12) .....
и в коллекции mongodb я вижу только первый документ
Моя вина! У меня осталась строка, которая вызывает двойной запрос ... теперь он работает, но база данных остается подключенной, а узел не выходит, как и до того, как я изменил код с помощью обещания
Вероятно, вам нужно закрыть объект modelStream
с помощью modelStream.end()
, чтобы можно было закрыть соединение с БД.
Я полностью удалил модуль mongoose-object-stream, теперь данные сохраняются с помощью команды mongoose save: new someModel (row) .save (); База данных заполнена, но команда .on ('end', function ()) не работает, обещание не выполняется. Я не могу понять, почему это не заканчивается
Я смотрел на это больше, проблема была с bun
, для меня он запускал событие finish
задолго до того, как потоковая передача была действительно закончена. Я обновил свой ответ рабочим кодом.
спасибо большое, я решил по-другому (не знаю, видели ли вы мой ответ) но ваш совет оказался очень полезным
Я действительно видел, рад, что вы смогли заставить его работать, я в основном делился своим решением, потому что оригинал был не совсем правильным. У меня есть для вас отзывы, которые я опубликую о вашем решении.
Я это сделал! Through2 необходимо .on ("data", function () {}) перед .on ("end" ... Теперь процесс корректно отключит базу данных и завершится.
var request = require('request');
var bun = require('bun');
var split = require('split');
var unzip = require('./lib/unzip');
var tomongo = require('./lib/tomongo');
var pipeline = bun([unzip(), split()]);
function streamToDB(url) {
return new Promise((resolve, reject) => {
request.get(url)
.pipe(pipeline)
.pipe(tomongo())
.on("data", function(data){
new aModel( data ).save();}) //here i save to the db
.on("error", reject)
.on("end", resolve);
});
}
mongoose.connect("mongodb://localhost:27017/test", {
useNewUrlParser: true
}).then(() => {
console.info('mongoDB connected');
return streamToDB("http://someurl/somefile.zip")
}).catch((err) => {
console.info('err', err);
}).then(() => {
return mongoose.disconnect();
});
//tomongo.js
var parseandwrite = function( chunk, _, cb ){
var row = {}, cells = chunk.toString('utf-8').split('\t');
cells.forEach( function( cell, i ){
row[ columns[ i ] ] = ( cell || '' ).trim();
});
if ( !!chunk ){
this.push( row ); //here i push the row to the stream
}
cb();
};
Вы можете рассмотреть возможность использования mongoose-object-stream
или создания аналогичных функций с помощью through2
. Метод save()
является асинхронным, поэтому вы должны привязать его к потоку, это предотвратит завершение потока до тех пор, пока все сохранения не будут сброшены в базу данных. Использование через это будет выглядеть так: .pipe(through.obj((row, enc, cb) => new aModel(row).save(cb)))
process.exit([code])
...exit
- это функция. из вашего вопроса кажется, что вы не называете это так. Ссылка на документ: nodejs.org/api/process.html#process_exit_codes