Что я пытаюсь сделать: прочитать тело звонка в плагине и, если тело содержит определенный контент, остановить дальнейшую обработку звонка и немедленно ответить.
Вот мой код, и он делает то, что мне нужно, но он также генерирует java.util.concurrent.CancellationException: Parent job is Cancelling под капотом. Я так понимаю, что в этом случае нужно остановить "нормальный" пайплайн, но как это сделать?
@KtorDsl
class TestPluginConfig
val TestPlugin: RouteScopedPlugin<TestPluginConfig> = createRouteScopedPlugin(
"TestPlugin",
::TestPluginConfig
) {
on(ReceiveBytes) { call, body ->
val bodyBytes = body.readFully()
if (String(bodyBytes) == "1")
call.respond(HttpStatusCode.NotAcceptable, "You wish")
ByteReadChannel(bodyBytes)
}
}
private object ReceiveBytes : Hook<suspend (call: ApplicationCall, body: ByteReadChannel) -> ByteReadChannel> {
override fun install(
pipeline: ApplicationCallPipeline,
handler: suspend (call: ApplicationCall, body: ByteReadChannel) -> ByteReadChannel
) {
pipeline.receivePipeline.intercept(ApplicationReceivePipeline.Before) { body ->
if (body !is ByteReadChannel) return@intercept
val newBody = handler(call, body)
proceedWith(newBody)
}
}
}
private suspend fun ByteReadChannel.readFully(): ByteArray {
var array = ByteArray(0)
while (!this.isClosedForRead) {
val packet = this.readRemaining(DEFAULT_BUFFER_SIZE.toLong())
while (!packet.isEmpty) {
array += packet.readBytes()
}
}
return array
}
@AlekseiTirman оказывается немного сложнее. В моей логике POST есть еще один асинхронный {} блок, и я не жду его завершения. Именно этот блок отменяется с вышеупомянутым исключением. Вот суть, содержащая исполняемый пример gist.github.com/agathius/… Вся моя идея заключалась в том, чтобы ИЗБЕГАТЬ выполнения этой логики и возвращать кешированный ответ. Если вы запустите curl -i -XPOST http://localhost:8082/123 -d "1", вы увидите, что блок все равно выполняется
К сожалению, я не могу воспроизвести CancellationException, используя ваш код. Как вы запускаете сервер?
@AlekseiTirman я тоже, я не могу понять, почему это происходит в моем коде. Но все же не в этом дело. Если вы посмотрите, как работает POST, вы увидите, что строки 42-45 сути все еще выполняются, хотя сервер УЖЕ ответил на вызов в строке 65. Я бы хотел избежать этого выполнения.
Вы можете сохранить контекст сопрограммы обработчика в Attributes, чтобы отменить его позже. Вот суть с измененным кодом gist.github.com/Stexxe/53f0e29e25ba683ba465a3f96f6c1f23
@АлексейТирман, хм. Это означает, что post<path> {body()} на самом деле выполняется ДО моего плагина, поэтому он в любом случае начнет выполнение. Определенно не то, что я имел в виду. Можно ли перехватить конвейер до того, как тело метода начнет свое выполнение?





this.call.receiveText() запускает выполнение ApplicationReceivePipeline, которое перехватывается хуком ReceiveBytes. Чтобы избежать выполнения блока обработчика маршрута, вам нужно поместить вызов receiveText() в его начало, чтобы отменить выполнение сопрограммы обработчика до того, как будет выполнен любой его код. Вы можете сохранить coroutineContext обработчика в Attributes вызова, чтобы отменить сопрограмму обработчика в обработчике хука.
import io.ktor.http.*
import io.ktor.server.application.*
import io.ktor.server.engine.*
import io.ktor.server.locations.*
import io.ktor.server.locations.post
import io.ktor.server.netty.*
import io.ktor.server.request.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.util.*
import io.ktor.utils.io.*
import io.ktor.utils.io.core.*
import kotlin.text.String
import io.ktor.server.plugins.callid.*
import io.ktor.server.plugins.callloging.*
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlin.coroutines.CoroutineContext
@OptIn(KtorExperimentalLocationsAPI::class)
object Paths {
@Location("/123")
class Root
}
private val contextKey = AttributeKey<CoroutineContext>("context")
@OptIn(KtorExperimentalLocationsAPI::class)
fun main() {
embeddedServer(Netty, port = 8082) {
install(Locations)
install(CallLogging)
install(TestPlugin)
routing {
trace {
application.log.trace(it.buildText())
}
post<Paths.Root> {
call.attributes.put(contextKey, coroutineContext) // Save current coroutine context
val text = this.call.receiveText()
// this async logic is being started even though the call is already answered
// it is much more complicated in my original code, and it's being killed with `java.util.concurrent.CancellationException: Parent job is Cancelling`
async {
println("start delay")
delay(10000)
println("end delay")
}
call.respondText(text)
}
}
}.start(wait = true)
}
@KtorDsl
class TestPluginConfig
val TestPlugin: RouteScopedPlugin<TestPluginConfig> = createRouteScopedPlugin(
"TestPlugin",
::TestPluginConfig
) {
on(ReceiveBytes) { call, body ->
val bodyBytes = body.readFully()
if (String(bodyBytes) == "1") {
call.attributes[contextKey].cancel() // Cancel a route's handler children jobs
call.respond(HttpStatusCode.NotAcceptable, "You wish")
}
ByteReadChannel(bodyBytes)
}
}
private object ReceiveBytes : Hook<suspend (call: ApplicationCall, body: ByteReadChannel) -> ByteReadChannel> {
override fun install(
pipeline: ApplicationCallPipeline,
handler: suspend (call: ApplicationCall, body: ByteReadChannel) -> ByteReadChannel
) {
pipeline.receivePipeline.intercept(ApplicationReceivePipeline.Before) { body ->
if (body !is ByteReadChannel) return@intercept
val newBody = handler(call, body)
proceedWith(newBody)
}
}
}
private suspend fun ByteReadChannel.readFully(): ByteArray {
var array = ByteArray(0)
while (!this.isClosedForRead) {
val packet = this.readRemaining(DEFAULT_BUFFER_SIZE.toLong())
while (!packet.isEmpty) {
array += packet.readBytes()
}
}
return array
}
Благодаря @AlekseiTirman я нашел совершенно другое решение.
Основы:
val callIsProcessedAttributeKey: AttributeKey<Boolean> = AttributeKey("CallIsProcessedAttrKey")
@KtorExperimentalLocationsAPI
inline fun <reified T : Any, reified Q : Serializable> Route.typedPost(
noinline handler: suspend PipelineContext<Unit, ApplicationCall>.(Q) -> Unit
): Route =
post<T> {
// will start ApplicationReceivePipeline
val obj = call.receive<Q>()
val callShouldBeHandled = !Try { call.attributes[callIsProcessedAttributeKey] }
.getOrElse(false)
// decides if we want to proceed with the request or it is already answered with the plugin
if (callShouldBeHandled)
handler(obj)
}
call.attributes.put(callIsProcessedAttributeKey, true)
call.respond(HttpStatusCode.NotAcceptable, "You wish")
Не могли бы вы поделиться фрагментом кода, где установлен
TestPlugin, и HTTP-запросом, вызывающимCancellationException?