Учитывая функцию A => IO[B] (также известную как Kleisli[IO, A, B]), которая предназначена для многократного вызова и имеет побочные эффекты, такие как обновление БД, как делегировать такие множественные вызовы в поток (я думаю, Pipe[IO, A, B]) (fs2, monix observable/iterant )? Причина этого заключается в том, чтобы иметь возможность накапливать состояние, пакетные вызовы вместе в течение временного окна и т. д.
Более конкретно, для сервера http4s требуется Request => IO[Response], поэтому я ищу, как работать с потоками (для вышеуказанных преимуществ), но в конечном итоге предоставлю такую функцию для http4s.
Я подозреваю, что за кулисами потребуется некоторый идентификатор корреляции, и меня это устраивает, меня больше интересует, как сделать это безопасно и правильно с точки зрения FP.
В конечном счете, подпись, которую я ожидаю, будет выглядеть примерно так:
Pipe[IO, A, B] => (A => IO[B]), так что вызовы Kleisli передаются по конвейеру.
В качестве запоздалой мысли, возможно ли вообще противодавить?
Да, накопление состояния — это общая идея. Например, вы можете захотеть объединить несколько GET в течение 2 секунд, которые запрашивают одну и ту же таблицу в один запрос, используя предложение in, чтобы увеличить пропускную способность (за счет задержки).
Почему не подходят StateT-подобные трансформеры? Именно они обеспечивают возможность вычислений с сохранением состояния.
Нет, я думал о StateT, но он не соответствует ни требованию иметь Клейсли с прозрачным состоянием, ни доступу к последующим вызовам оконных операций.
Я нахожу немного странным, что вы накапливаете состояние, чтобы группировать запросы для службы HTTP и из-за этого откладываете каждый запрос. Возможно, я не в том контексте, но зачем вам это?
Если это поможет, akka http также моделирует сервер как запрос-ответ, существуют различные мотивы для его моделирования как такового.
Поправьте меня, если я ошибаюсь, но у Kleisli есть подпись A => IO[B], а вам нужно либо A => IO[A] (цепочка), либо Seq[A] => IO[Seq[B]] (пакетная обработка). Последнее полезно, только если эффект действительно поддерживает Seq.





Одна из идей состоит в том, чтобы смоделировать это с помощью MPSC (Multiple Publisher Single Consumer). Я приведу пример с Monix, так как я с ним лучше знаком, но идея остается той же, даже если вы используете FS2.
object MPSC extends App {
sealed trait Event
object Event {
// You'll need a promise in order to send the response back to user
case class SaveItem(num: Int, promise: Deferred[Task, Int]) extends Event
}
// For backpressure, take a look at `PublishSubject`.
val cs = ConcurrentSubject[Event](MulticastStrategy.Publish)
def pushEvent(num: Int) = {
for {
promise <- Deferred[Task, Int]
_ <- Task.delay(cs.onNext(SaveItem(num, promise)))
} yield promise
}
// You get a list of events now since it is buffered
// Monix has a lot of buffer strategies, check the docs for more details
def processEvents(items: Seq[Event]): Task[Unit] = {
Task.delay(println(s"Items: $items")) >>
Task.traverse(items) {
case SaveItem(_, promise) => promise.complete(Random.nextInt(100))
}.void
}
val app = for {
// Start the stream in the background
_ <- cs
.bufferTimed(3.seconds) // Buffer all events within 3 seconds
.filter(_.nonEmpty)
.mapEval(processEvents)
.completedL
.startAndForget
_ <- Task.sleep(1.second)
p1 <- pushEvent(10)
p2 <- pushEvent(20)
p3 <- pushEvent(30)
// Wait for the promise to complete, you'll do this for each request
x <- p1.get
y <- p2.get
z <- p3.get
_ <- Task.delay(println(s"Completed promise: [$x, $y, $z]"))
} yield ()
app.runSyncUnsafe()
}
Если я правильно понимаю, вы хотите накапливать состояние между вызовами HTTP? Потому что я не понимаю, что значит "группировать вызовы вместе", когда вы находитесь за http4s?