Сбой плоской карты RxJava даже при наличии onError

Я пытаюсь отладить сбой на Android, который возникает, когда у устройства нет сетевого подключения. Сбой происходит только иногда (~ 50% случаев), и я не уверен, почему. Если кто-нибудь сможет объяснить, что происходит, или дать решение, я буду очень признателен.

Я использую плоскую карту для параллельной выборки изображений и подписываюсь с блоком onError (). Несмотря на это, приложение все равно вылетает.

Вот код (я пометил строку, в которой происходит сбой, «Сбой здесь:»):

fun downloadImages() {
    var progress = 0
    var maxProgress = 0

    downloadModel.postValue(
            Resource.inProgress(getString(R.string.downloading_offline_data)))

    disposables.add(Observable
            .create<Pair<String, File>> { emitter ->
                val files = readFileDirs()

                for (file in files) {
                  val urlList = readUrlListFrom(file)
                  maxProgress += urlList.size

                  for (url in urlList) {
                    emitter.onNext(url to file)
                  }
                }
                emitter.onComplete()
            }
            .flatMap { (url, file) ->
                downloadImage(url, file)
                        .subscribeOn(Schedulers.computation())
                        .andThen(Observable.fromCallable {
                            ++progress
                        })
            }
            .doOnDispose {
                downloadModel.postValue(Resource.idle())
            }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({
                Log.d(TAG, "Done downloading $it")

                if (it % 100 == 0) {
                    refreshCacheSize()
                }

                downloadModel.postValue(Resource.inProgress(
                        getString(R.string.downloading_offline_data),
                        it,
                        maxProgress
                ))
            }, { error ->
                val errorId = when (error) {
                    is SocketTimeoutException ->
                        NETWORK_TIMEOUT_ERROR
                    is InterruptedIOException -> {
                        // do nothing
                        return@subscribe
                    }
                    is UnknownHostException ->
                        NETWORK_ERROR
                    is IOException ->
                        UNKNOWN_ERROR
                    is UnsupportedServerVersionException ->
                        UNSUPPORTED_VERSION_ERROR
                    else -> {
                        UNKNOWN_ERROR
                    }
                }
                downloadModel.postValue(Resource.error(errorId))
            }, {
                refreshCacheSize()

                downloadModel.postValue(Resource.success())
            })
    )
}

private fun downloadImage(url: String, outFile: File): Completable {
    val outFileTmp = File(outFile.canonicalPath + ".tmp")
    return Completable
            .create { emitter ->
                val request = Request.Builder()
                        .url(url)
                        .build()
                try {
Crash here:          val response = Client.get().newCall(request).execute()
                    val sink: BufferedSink
                    try {
                        Log.d(TAG, "Writing to file")
                        val body = response.body()
                        if (body != null) {
                            sink = Okio.buffer(Okio.sink(outFileTmp))
                            sink.writeAll(body.source())
                            sink.close()
                        } else {
                            if (!emitter.isDisposed) {
                                emitter.onError(DownloadImageException("Body was null."))
                            }
                        }
                    } finally {
                        response.body()?.close()
                    }

                    outFileTmp.renameTo(outFile)

                    emitter.onComplete()
                } catch (e: Exception) {
                    Log.e(TAG, "Could not save splash", e)
                    emitter.tryOnError(e)
                }
            }
            .doOnDispose {
                outFileTmp.delete()
            }
            .doOnError {
                outFileTmp.delete()
            }
}

Трассировки стека:

2018-10-08 02:50:07.055 21984-22263/com.ggstudios.lolcatalyst E/AndroidRuntime: FATAL EXCEPTION: RxComputationThreadPool-2
    Process: com.ggstudios.lolcatalyst, PID: 21984
    java.lang.Throwable: Unable to resolve host "mywebsite.com": No address associated with hostname
        at java.net.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:157)
        at java.net.Inet6AddressImpl.lookupAllHostAddr(Inet6AddressImpl.java:105)
        at java.net.InetAddress.getAllByName(InetAddress.java:1154)
        at okhttp3.Dns$1.lookup(Dns.java:40)
        at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:185)
        at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:149)
        at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:84)
        at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:214)
        at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:135)
        at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:114)
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:126)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200)
        at okhttp3.RealCall.execute(RealCall.java:77)
        at com.ggstudios.lolcatalyst.offline.OfflineDownloadViewModel$downloadImage$1.subscribe(OfflineDownloadViewModel.kt:372)
        at io.reactivex.internal.operators.completable.CompletableCreate.subscribeActual(CompletableCreate.java:39)
        at io.reactivex.Completable.subscribe(Completable.java:1918)
        at hu.akarnokd.rxjava2.debug.CompletableOnAssembly.subscribeActual(CompletableOnAssembly.java:39)
        at io.reactivex.Completable.subscribe(Completable.java:1918)
        at io.reactivex.internal.operators.completable.CompletableSubscribeOn$SubscribeOnObserver.run(CompletableSubscribeOn.java:64)
        at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
        at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641)
        at java.lang.Thread.run(Thread.java:764)
     Caused by: java.lang.Throwable: android_getaddrinfo failed: EAI_NODATA (No address associated with hostname)
        at libcore.io.Linux.android_getaddrinfo(Native Method)
        at libcore.io.BlockGuardOs.android_getaddrinfo(BlockGuardOs.java:172)
        at java.net.Inet6AddressImpl.lookupHostByName(Inet6AddressImpl.java:137)
        at java.net.Inet6AddressImpl.lookupAllHostAddr(Inet6AddressImpl.java:105) 
        at java.net.InetAddress.getAllByName(InetAddress.java:1154) 
        at okhttp3.Dns$1.lookup(Dns.java:40) 
        at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:185) 
        at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:149) 
        at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:84) 
        at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:214) 
        at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:135) 
        at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:114) 
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) 
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) 
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) 
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:126) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147) 
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121) 
        at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200) 
        at okhttp3.RealCall.execute(RealCall.java:77) 
        at com.ggstudios.lolcatalyst.offline.OfflineDownloadViewModel$downloadImage$1.subscribe(OfflineDownloadViewModel.kt:372) 
        at io.reactivex.internal.operators.completable.CompletableCreate.subscribeActual(CompletableCreate.java:39) 
        at io.reactivex.Completable.subscribe(Completable.java:1918) 
        at hu.akarnokd.rxjava2.debug.CompletableOnAssembly.subscribeActual(CompletableOnAssembly.java:39) 
        at io.reactivex.Completable.subscribe(Completable.java:1918) 
        at io.reactivex.internal.operators.completable.CompletableSubscribeOn$SubscribeOnObserver.run(CompletableSubscribeOn.java:64) 
        at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38) 
        at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26) 
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301) 
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167) 
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) 
        at java.lang.Thread.run(Thread.java:764) 

Видеть это. stackoverflow.com/questions/43135211/…

Naveen 08.10.2018 12:35

Возможный дубликат stackoverflow.com/questions/52468708/…

Ahmed Ashraf 08.10.2018 14:05

@Ahmed Ashraf Gamal не дубликат этого, так как я уже передаю лямбду для обработки ошибок.

idunnololz 08.10.2018 19:19

О, я понимаю, значит, это должно быть "tryOnError", насколько мне известно, должно быть "onError".

Ahmed Ashraf 08.10.2018 19:28

Попробуйте вызвать по сети с включенным режимом полета

EpicPandaForce 10.10.2018 17:52
0
5
754
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я выяснил как причину, так и возможные решения.

Начнем сначала с причины.

Мой код можно упростить следующим образом: Observable # 1 запускает n других Observable, которые выполняются в других потоках через flatMap().

Для простоты предположим, что все Observables выполняются в собственном потоке. Затем, когда ошибка выдается одним наблюдаемым объектом, ошибка распространяется на все другие наблюдаемые объекты и удаляет их.

Проблема в том, что это распространение не синхронизировано (похоже, из соображений производительности). Из-за этого tryOnError() может пройти проверку isDisposed(), затем быть удален, а затем выдать ошибку. Это объясняет, почему исключение все еще возникает, даже если используется tryOnError().

Некоторые решения

В конечном итоге после некоторого исследования и обсуждения я пришел к двум основным решениям.

Один из способов - преобразовать все ошибки в результаты в Observables через onErrorReturn(). Это решение работает, но может быть не лучшим вариантом, если вы хотите остановить выполнение, как только произойдет ошибка.

Другой способ - зарегистрировать глобальный обработчик ошибок и просто игнорировать эти ошибки.

Я выбрал последнее решение, но не полностью удовлетворен. Я действительно хотел бы, чтобы был способ зарегистрировать локальный обработчик ошибок. Например. у Observables есть метод игнорирования ошибок, возникающих после удаления Observable.

Ну что ж.

Другие вопросы по теме