Смоделируйте несколько вызовов функций с потоком (безопасным способом FP)

Учитывая функцию A => IO[B] (также известную как Kleisli[IO, A, B]), которая предназначена для многократного вызова и имеет побочные эффекты, такие как обновление БД, как делегировать такие множественные вызовы в поток (я думаю, Pipe[IO, A, B]) (fs2, monix observable/iterant )? Причина этого заключается в том, чтобы иметь возможность накапливать состояние, пакетные вызовы вместе в течение временного окна и т. д.

Более конкретно, для сервера http4s требуется Request => IO[Response], поэтому я ищу, как работать с потоками (для вышеуказанных преимуществ), но в конечном итоге предоставлю такую ​​функцию для http4s.

Я подозреваю, что за кулисами потребуется некоторый идентификатор корреляции, и меня это устраивает, меня больше интересует, как сделать это безопасно и правильно с точки зрения FP.

В конечном счете, подпись, которую я ожидаю, будет выглядеть примерно так:

Pipe[IO, A, B] => (A => IO[B]), так что вызовы Kleisli передаются по конвейеру.

В качестве запоздалой мысли, возможно ли вообще противодавить?

Если я правильно понимаю, вы хотите накапливать состояние между вызовами HTTP? Потому что я не понимаю, что значит "группировать вызовы вместе", когда вы находитесь за http4s?

Yuval Itzchakov 25.02.2019 12:56

Да, накопление состояния — это общая идея. Например, вы можете захотеть объединить несколько GET в течение 2 секунд, которые запрашивают одну и ту же таблицу в один запрос, используя предложение in, чтобы увеличить пропускную способность (за счет задержки).

V-Lamp 25.02.2019 13:20

Почему не подходят StateT-подобные трансформеры? Именно они обеспечивают возможность вычислений с сохранением состояния.

Some Name 25.02.2019 13:35

Нет, я думал о StateT, но он не соответствует ни требованию иметь Клейсли с прозрачным состоянием, ни доступу к последующим вызовам оконных операций.

V-Lamp 25.02.2019 14:03

Я нахожу немного странным, что вы накапливаете состояние, чтобы группировать запросы для службы HTTP и из-за этого откладываете каждый запрос. Возможно, я не в том контексте, но зачем вам это?

Yuval Itzchakov 25.02.2019 14:28

Если это поможет, akka http также моделирует сервер как запрос-ответ, существуют различные мотивы для его моделирования как такового.

V-Lamp 25.02.2019 14:39

Поправьте меня, если я ошибаюсь, но у Kleisli есть подпись A => IO[B], а вам нужно либо A => IO[A] (цепочка), либо Seq[A] => IO[Seq[B]] (пакетная обработка). Последнее полезно, только если эффект действительно поддерживает Seq.

Markus Appel 27.02.2019 16:23
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
4
7
201
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Одна из идей состоит в том, чтобы смоделировать это с помощью MPSC (Multiple Publisher Single Consumer). Я приведу пример с Monix, так как я с ним лучше знаком, но идея остается той же, даже если вы используете FS2.

object MPSC extends App {

  sealed trait Event
  object Event {
    // You'll need a promise in order to send the response back to user
    case class SaveItem(num: Int, promise: Deferred[Task, Int]) extends Event
  }

  // For backpressure, take a look at `PublishSubject`.
  val cs = ConcurrentSubject[Event](MulticastStrategy.Publish)

  def pushEvent(num: Int) = {
    for {
      promise <- Deferred[Task, Int]
      _ <- Task.delay(cs.onNext(SaveItem(num, promise)))
    } yield promise
  }

  // You get a list of events now since it is buffered
  // Monix has a lot of buffer strategies, check the docs for more details
  def processEvents(items: Seq[Event]): Task[Unit] = {
    Task.delay(println(s"Items: $items")) >>
      Task.traverse(items) {
        case SaveItem(_, promise) => promise.complete(Random.nextInt(100))
      }.void
  }

  val app = for {
    // Start the stream in the background
    _ <- cs
      .bufferTimed(3.seconds) // Buffer all events within 3 seconds
      .filter(_.nonEmpty)
      .mapEval(processEvents)
      .completedL
      .startAndForget

    _ <- Task.sleep(1.second)
    p1 <- pushEvent(10)
    p2 <- pushEvent(20)
    p3 <- pushEvent(30)

    // Wait for the promise to complete, you'll do this for each request
    x <- p1.get
    y <- p2.get
    z <- p3.get

    _ <- Task.delay(println(s"Completed promise: [$x, $y, $z]"))
  } yield ()

  app.runSyncUnsafe()
}

Другие вопросы по теме