Flatmap для streamtorx никогда не завершается

У меня есть этот тест

const Rx = require('rx')
const fs = require('fs')
const {streamToRx} = require('rxjs-stream')

it('should not be infinite', done => {
  let streamObservable = streamToRx(fs.createReadStream('/some/file.txt'));
  Rx.Observable.of(1).flatMap(any => streamObservable)
  // streamObservable
    .map(any => 'file processed')
    .subscribe(x => console.log('next', x), err => {
        console.error(err)
        done(err)
      },
      () => {
        console.log('complete!')
        done()
      }
    )
})

Этот тестовый тайм-аут - означает, что поток никогда не завершается. Однако, когда я не использую flatMap следующим образом:

const Rx = require('rx')
const fs = require('fs')
const {streamToRx} = require('rxjs-stream')

it('should not be infinite', done => {
  let streamObservable = streamToRx(fs.createReadStream('/some/file.txt'));
  // Rx.Observable.of(1).flatMap(any => streamObservable)
  streamObservable
    .map(any => 'file processed')
    .subscribe(x => console.log('next', x), err => {
        console.error(err)
        done(err)
      },
      () => {
        console.log('complete!')
        done()
      }
    )
})

тогда вывод:

next file processed
complete!

Что я делаю не так, связывая эти наблюдаемые в цепочку? Кажется, это происходит только тогда, когда второй конвертируется из потока с помощью rxjs-stream.

это похоже на ошибку в rxjs-stream? Пробовал с RxNode.fromStream, и это сработало

macias 11.04.2018 12:12

Технически разницы быть не должно.

S.D. 11.04.2018 13:58

Я взглянул на код RxNode.fromStream и rxjs-stream.streamToRx, и RxNode.fromStream создает и возвращает наблюдаемый холодный, тогда как streamToRx неправильно возвращает наблюдаемый горячий (в данном случае субъект). Таким образом, в вашем примере createReadStream выполняется и разрешается в точке вызова, а последующие подписки, следовательно, подписываются на fs.readStream, который уже разрешен. Короче это ошибка в streamToRx.

Jeremy 11.04.2018 15:08

эй @ Джереми, спасибо, ответь, я бы принял это

macias 11.04.2018 16:44

эй @macias. Я сделал свой комментарий ответом.

Jeremy 19.04.2018 10:43
0
5
49
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я взглянул на код RxNode.fromStream и rxjs-stream.streamToRx, и RxNode.fromStream создает и возвращает холодный Observable, тогда как streamToRx неправильно возвращает горячий Observable (в данном случае Subject).

Таким образом, в вашем примере createReadStream выполняется и разрешается в точке вызова, а последующие подписки, следовательно, подписываются на fs.readStream, который уже разрешен. Короче это ошибка в streamToRx.

Другие вопросы по теме