Ответить на звонок из плагина Ktor

Что я пытаюсь сделать: прочитать тело звонка в плагине и, если тело содержит определенный контент, остановить дальнейшую обработку звонка и немедленно ответить.

Вот мой код, и он делает то, что мне нужно, но он также генерирует java.util.concurrent.CancellationException: Parent job is Cancelling под капотом. Я так понимаю, что в этом случае нужно остановить "нормальный" пайплайн, но как это сделать?

@KtorDsl
class TestPluginConfig

val TestPlugin: RouteScopedPlugin<TestPluginConfig> = createRouteScopedPlugin(
    "TestPlugin",
    ::TestPluginConfig
) {
    on(ReceiveBytes) { call, body ->
        val bodyBytes = body.readFully()

        if (String(bodyBytes) == "1")
            call.respond(HttpStatusCode.NotAcceptable, "You wish")

        ByteReadChannel(bodyBytes)
    }
}

private object ReceiveBytes : Hook<suspend (call: ApplicationCall, body: ByteReadChannel) -> ByteReadChannel> {
    override fun install(
        pipeline: ApplicationCallPipeline,
        handler: suspend (call: ApplicationCall, body: ByteReadChannel) -> ByteReadChannel
    ) {
        pipeline.receivePipeline.intercept(ApplicationReceivePipeline.Before) { body ->
            if (body !is ByteReadChannel) return@intercept
            val newBody = handler(call, body)
            proceedWith(newBody)
        }
    }
}

private suspend fun ByteReadChannel.readFully(): ByteArray {
    var array = ByteArray(0)
    while (!this.isClosedForRead) {
        val packet = this.readRemaining(DEFAULT_BUFFER_SIZE.toLong())
        while (!packet.isEmpty) {
            array += packet.readBytes()
        }
    }
    return array
}

Не могли бы вы поделиться фрагментом кода, где установлен TestPlugin, и HTTP-запросом, вызывающим CancellationException?

Aleksei Tirman 14.12.2022 12:21

@AlekseiTirman оказывается немного сложнее. В моей логике POST есть еще один асинхронный {} блок, и я не жду его завершения. Именно этот блок отменяется с вышеупомянутым исключением. Вот суть, содержащая исполняемый пример gist.github.com/agathius/… Вся моя идея заключалась в том, чтобы ИЗБЕГАТЬ выполнения этой логики и возвращать кешированный ответ. Если вы запустите curl -i -XPOST http://localhost:8082/123 -d "1", вы увидите, что блок все равно выполняется

agathis 14.12.2022 14:00

К сожалению, я не могу воспроизвести CancellationException, используя ваш код. Как вы запускаете сервер?

Aleksei Tirman 14.12.2022 19:45

@AlekseiTirman я тоже, я не могу понять, почему это происходит в моем коде. Но все же не в этом дело. Если вы посмотрите, как работает POST, вы увидите, что строки 42-45 сути все еще выполняются, хотя сервер УЖЕ ответил на вызов в строке 65. Я бы хотел избежать этого выполнения.

agathis 14.12.2022 21:00

Вы можете сохранить контекст сопрограммы обработчика в Attributes, чтобы отменить его позже. Вот суть с измененным кодом gist.github.com/Stexxe/53f0e29e25ba683ba465a3f96f6c1f23

Aleksei Tirman 15.12.2022 09:13

@АлексейТирман, хм. Это означает, что post<path> {body()} на самом деле выполняется ДО моего плагина, поэтому он в любом случае начнет выполнение. Определенно не то, что я имел в виду. Можно ли перехватить конвейер до того, как тело метода начнет свое выполнение?

agathis 15.12.2022 15:04
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
6
101
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

this.call.receiveText() запускает выполнение ApplicationReceivePipeline, которое перехватывается хуком ReceiveBytes. Чтобы избежать выполнения блока обработчика маршрута, вам нужно поместить вызов receiveText() в его начало, чтобы отменить выполнение сопрограммы обработчика до того, как будет выполнен любой его код. Вы можете сохранить coroutineContext обработчика в Attributes вызова, чтобы отменить сопрограмму обработчика в обработчике хука.

import io.ktor.http.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.locations.*
import io.ktor.server.locations.post
import io.ktor.server.netty.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.util.*
import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import kotlin.text.String
import io.ktor.server.plugins.callid.*
import io.ktor.server.plugins.callloging.*
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlin.coroutines.CoroutineContext


@OptIn(KtorExperimentalLocationsAPI::class)
object Paths {
    @Location("/123")
    class Root
}

private val contextKey = AttributeKey<CoroutineContext>("context")

@OptIn(KtorExperimentalLocationsAPI::class)
fun main() {
    embeddedServer(Netty, port = 8082) {
        install(Locations)
        install(CallLogging)
        install(TestPlugin)

        routing {
            trace {
                application.log.trace(it.buildText())
            }

            post<Paths.Root> {
                call.attributes.put(contextKey, coroutineContext) // Save current coroutine context
                val text = this.call.receiveText()
                // this async logic is being started even though the call is already answered
                // it is much more complicated in my original code, and it's being killed with `java.util.concurrent.CancellationException: Parent job is Cancelling`
                async {
                    println("start delay")
                    delay(10000)
                    println("end delay")
                }
                call.respondText(text)
            }
        }

    }.start(wait = true)
}


@KtorDsl
class TestPluginConfig

val TestPlugin: RouteScopedPlugin<TestPluginConfig> = createRouteScopedPlugin(
    "TestPlugin",
    ::TestPluginConfig
) {
    on(ReceiveBytes) { call, body ->
        val bodyBytes = body.readFully()

        if (String(bodyBytes) == "1") {
            call.attributes[contextKey].cancel() // Cancel a route's handler children jobs
            call.respond(HttpStatusCode.NotAcceptable, "You wish")
        }

        ByteReadChannel(bodyBytes)
    }
}

private object ReceiveBytes : Hook<suspend (call: ApplicationCall, body: ByteReadChannel) -> ByteReadChannel> {
    override fun install(
        pipeline: ApplicationCallPipeline,
        handler: suspend (call: ApplicationCall, body: ByteReadChannel) -> ByteReadChannel
    ) {
        pipeline.receivePipeline.intercept(ApplicationReceivePipeline.Before) { body ->
            if (body !is ByteReadChannel) return@intercept
            val newBody = handler(call, body)
            proceedWith(newBody)
        }
    }
}

private suspend fun ByteReadChannel.readFully(): ByteArray {
    var array = ByteArray(0)
    while (!this.isClosedForRead) {
        val packet = this.readRemaining(DEFAULT_BUFFER_SIZE.toLong())
        while (!packet.isEmpty) {
            array += packet.readBytes()
        }
    }
    return array
}

Благодаря @AlekseiTirman я нашел совершенно другое решение.

Основы:

  1. Пользовательская функция POST
val callIsProcessedAttributeKey: AttributeKey<Boolean> = AttributeKey("CallIsProcessedAttrKey")

@KtorExperimentalLocationsAPI
inline fun <reified T : Any, reified Q : Serializable> Route.typedPost(
    noinline handler: suspend PipelineContext<Unit, ApplicationCall>.(Q) -> Unit
): Route =
    post<T> {
        // will start ApplicationReceivePipeline
        val obj = call.receive<Q>()

        val callShouldBeHandled = !Try { call.attributes[callIsProcessedAttributeKey] }
            .getOrElse(false)
        
        // decides if we want to proceed with the request or it is already answered with the plugin
        if (callShouldBeHandled) 
            handler(obj)
    }
  1. Прежде чем ответить в плагине, я установил для этого атрибута вызова значение true
   call.attributes.put(callIsProcessedAttributeKey, true)
   call.respond(HttpStatusCode.NotAcceptable, "You wish")

Другие вопросы по теме