Как сгруппировать и дросселировать объект по ID Rx

У меня есть входящие объекты одного и того же типа, но если для свойства объекта IsThrottlable установлено значение false независимо от идентификатора, я НЕ хочу его регулировать, но если для IsThrottlable установлено значение true, я хотел бы регулировать объект каждые 3 секунды по идентификатору . Поэтому, если объект с одним и тем же идентификатором появляется 50 раз за 3 секунды, я хотел бы отправить HTTPSend для последнего объекта.

namespace BoatThrottle
{
    class MData
    {
        public int ID { get; set; }
        public bool IsThrottlable { get; set; }
        public string Description { get; set; }
    }
    class Program
    {
        static void Main(string[] args)
        {
            Random rand = new Random();

            while (true)
            {
                var data = GenerateRandomObj(rand);
                SendData(data);
                Task.Delay(rand.Next(100, 2000));
            }
        }

        static MData GenerateRandomObj(Random rand)
        {
            return new MData() { ID = rand.Next(1, 20), Description = "Notification....", IsThrottlable = (rand.Next(2) == 1) };
        }

        static void SendData(MData mData)
        {
            if (mData.IsThrottlable)
            {
                _doValues.OnNext(mData);
                var dd = ThrottledById(DoValues);

                var observable =
                   dd
                    .Throttle(TimeSpan.FromMilliseconds(3000.0))
                    .ObserveOn(Scheduler.ThreadPool.DisableOptimizations());

                _subscription =
                    observable
                    .ObserveOn(Scheduler.ThreadPool.DisableOptimizations())
                        .Subscribe(y =>
                        {
                            HTTPSend(y);
                        });

            }
            else
            {
                // MData object coming in IsThrottlable set to false always send this data NO throttling
                HTTPSend(mData);
            }

        }
        private static IDisposable? _subscription = null;

        public static IObservable<MData> ThrottledById(IObservable<MData> observable)
        {
            return observable.Buffer(TimeSpan.FromSeconds(3))
                .SelectMany(x =>
                    x.GroupBy(y => y.ID)
                    .Select(y => y.Last()));
        }

        private static readonly Subject<MData> _doValues = new Subject<MData>();

        public static IObservable<MData> DoValues { get { return _doValues; } }

        static void HTTPSend(MData mData)
        {
            Console.WriteLine("===============HTTP===>>  " + mData.ID + "  " + mData.Description + " " + mData.IsThrottlable);
        }
    }
}

Обновлено:

например ВСЕ получены в течение 3 секунд

  • MData ID = 1, IsThrottlable = False, Description = "Уведомлять"

  • MData ID = 2, IsThrottlable = True, Description = "Notify1"

  • MData ID = 2, IsThrottlable = True, Description = "Notify2"

  • MData ID = 9, IsThrottlable = False, Description = "Notify2"

  • MData ID = 2, IsThrottlable = True, Description = "Notify3"

  • MData ID = 2, IsThrottlable = True, Description = "Notify4"

  • MData ID = 3, IsThrottlable = True, Description = "Уведомить"

  • MData ID = 4, IsThrottlable = True, Description = "Уведомить"

  • MData ID = 5, IsThrottlable = True, Description = "Notify1"

  • MData ID = 5, IsThrottlable = True, Description = "Notify2"

  • MData ID = 8, IsThrottlable = True, Description = "Notify1"

  • MData ID = 8, IsThrottlable = True, Description = "Notify2"

  • MData ID = 8, IsThrottlable = True, Description = "Notify3"

  • MData ID = 8, IsThrottlable = True, Description = "Notify4"

  • MData ID = 8, IsThrottlable = True, Description = "Notify5"

  • MData ID = 8, IsThrottlable = True, Description = "Notify6"

Ожидается на Первые 3 секунды:

  • MData ID = 1, IsThrottlable = False, Description = "Уведомлять"
  • MData ID = 9, IsThrottlable = False, Description = "Notify2"
  • MData ID = 2, IsThrottlable = True, Description = "Notify4"
  • MData ID = 3, IsThrottlable = True, Description = "Уведомить"
  • MData ID = 4, IsThrottlable = True, Description = "Уведомить"
  • MData ID = 5, IsThrottlable = True, Description = "Notify2"
  • MData ID = 8, IsThrottlable = True, Description = "Notify6"

Если ваша окончательная реализация является ответом на ваш собственный вопрос, опубликуйте ее как один. Не ставьте это в вопросе.

Enigmativity 08.05.2022 02:21

@Enigmativity Я удалил это из вопроса, спасибо

Dev 09.05.2022 12:19
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
2
97
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Один из способов сделать это — сгруппировать последовательность по свойству IsThrottlable. Таким образом, вы получите вложенную последовательность, содержащую две подпоследовательности: одну, содержащую регулируемые элементы, и другую, содержащую нерегулируемые элементы. Затем вы можете соответствующим образом преобразовать каждую из двух подпоследовательностей и, наконец, использовать оператор SelectMany, чтобы сгладить вложенную последовательность обратно в плоскую последовательность, содержащую элементы, испускаемые двумя преобразованными подпоследовательностями.

Подпоследовательность, содержащая нерегулируемые элементы, не нуждается в преобразовании, поэтому вы можете вернуть ее как есть.

Подпоследовательность, содержащую дросселируемые элементы, необходимо дополнительно сгруппировать по свойству ID, создавая еще более тонкие подпоследовательности, содержащие только регулируемые элементы с одинаковым идентификатором. Вот последовательности, которые необходимо регулировать:

IObservable<MData> throttled = source
    .GroupBy(x => x.IsThrottlable)
    .SelectMany(g1 =>
    {
        if (!g1.Key) return g1; // Not throttleable, return it as is.
        return g1
            .GroupBy(x => x.ID)
            .SelectMany(g2 => g2.Throttle(TimeSpan.FromSeconds(3)));
    });

В конце вы получите плоскую последовательность, содержащую как регулируемые, так и нерегулируемые элементы, причем регулируемые элементы уже регулируются идентификатором.

Оператор SelectMany по сути является комбинацией операторов Select+Merge.

У вас есть пример использования в контексте?

Dev 06.05.2022 19:59

@Dev, я собирался показать основную идею. Я позволю вам иметь удовольствие адаптировать его к контексту. :-)

Theodor Zoulias 06.05.2022 20:10

согласен :) Я имел ввиду как разделить IsThrottlable подписки

Dev 06.05.2022 20:12

@Dev, ты не мог бы просто подписаться только на получившуюся последовательность throttled? Таким образом, вы получите как регулируемые, так и нерегулируемые элементы, но регулируемые элементы уже будут регулироваться. Разве это не то, что вы хотите?

Theodor Zoulias 06.05.2022 20:16

Ах, теперь это имеет смысл для меня, спасибо.

Dev 06.05.2022 20:18

Может быть, вы правы, это не то, что я хочу, или, возможно, я неправильно его использую. Но это означает, что нужно сгруппировать по IsThrottlable Затем сгруппировать по идентификатору и дросселировать каждую группу. Поэтому дросселируйте, только если IsThrottlable истинно, а затем группируйте по идентификатору, если идентификатор объекта совпадает с дросселем. Так, например, представьте, что отдельные входящие объекты являются объектами пользователя, и если IsThrottlable = true, если запрос исходит от одного и того же идентификатора пользователя, HTTPотправьте последний объект каждого уникального пользователя.

Dev 06.05.2022 22:23

Я отредактировал вопрос с примером 1 экземпляра 3-секундного цикла, это то, для чего предназначен ответ? потратил несколько часов на настройку, но не получил желаемого результата

Dev 06.05.2022 22:39

@Dev Теперь я думаю, что понял. Я обновил ответ соответственно.

Theodor Zoulias 06.05.2022 23:11

Я добавил свою окончательную реализацию (отредактировал свой вопрос внизу), потому что это только Http-отправка, только если IsThrottlable = False. IsThrottlable вообще не печатает.

Dev 07.05.2022 01:00

@Dev Я разместил онлайн-демонстрацию здесь. Вывод соответствует тому, что ожидается в вопросе.

Theodor Zoulias 07.05.2022 01:37

Давайте продолжить обсуждение в чате.

Dev 07.05.2022 08:41

Я решил взять вашу окончательную реализацию, как указано в вашем вопросе, но это должен быть ответ, и очистить запрос для вас таким образом, который является наиболее идиоматичным способом Rx.

Вот моя версия вашего кода:

public MainWindow()
{
    InitializeComponent();

    Debug.Print("======================= = ");

    _subscription =
        Observable
            .Generate(0, x => true, x => x + 1,
                x => new MData() { ID = Random.Shared.Next(1, 3), Description = "Notification....", IsThrottlable = Random.Shared.Next(2) == 1 },
                x => TimeSpan.FromMilliseconds(Random.Shared.Next(100, 2000)))
            .GroupBy(m => m.IsThrottlable)
            .SelectMany(g =>
                g.Key
                ? g.GroupBy(x => x.ID).SelectMany(g2 => g2.Throttle(TimeSpan.FromSeconds(3.0)))
                : g)
            .SelectMany(m => Observable.Start(() => HTTPSend(m)))
            .Subscribe();
}

Финал .SelectMany(m => Observable.Start(() => HTTPSend(m))), возможно, нужно будет записать как .Select(m => Observable.Start(() => HTTPSend(m))).Merge(1).

+1. Возможно, вам придется обернуть Observable.Start в Observable.Defer, особенно если вы собираетесь лениво объединять подпоследовательности с помощью Merge(1). В противном случае HTTPSend может быть вызван до подписки на подпоследовательность.

Theodor Zoulias 08.05.2022 11:14

@TheodorZoulias - Спасибо. Но почему HTTPSend может быть вызван до подписки?

Enigmativity 08.05.2022 23:51

Посмотрите демоверсию это. sequence не подписан. Тем не менее, действие вызывается.

Theodor Zoulias 08.05.2022 23:57

@TheodorZoulias - Что ж, это интересно. Беглый взгляд на источник показывает, что это тоже правда. Это заканчивается кэшированием результата для всех будущих вызывающих.

Enigmativity 09.05.2022 08:55

Observable.Start — это синхронная версия Observable.StartAsync. Насколько я вижу, синхронной версии Observable.FromAsync не существует. Возможно, они не смогли выбрать подходящее название для этого метода.

Theodor Zoulias 09.05.2022 12:34

Этот экземпляр Observable .Generate(0, x => true, x => x + 1, x => new MData() { ID = 5, Description = "Notification....", IsThrottlable = true }, x => TimeSpan.FromMilliseconds(rand.Next(100, 2000))) вообще не печатает.

Dev 09.05.2022 12:45

@TheodorZoulias - Нам нужно public static class ObservableEx { public static IObservable<T> From<T>(this Func<T> factory) => Observable.FromAsync(() => Task.Run(factory)); }

Enigmativity 09.05.2022 13:18

@Dev - у меня это работает без изменений по сравнению с кодом, который вы разместили в комментарии. Имейте в виду, что там нет ничего, что «печатало бы», но если вы подпишетесь, оно выдаст значения.

Enigmativity 09.05.2022 13:20

@Enigmativity извиняется, это работает (так как оно генерирует объекты), но я имел в виду, что я заменил случайные элементы Observable.Generate, чтобы всегда иметь только IsThrottlable = true и ID = 5, чтобы гарантировать, что HTTPSend работает только с 1 тип/группа, поэтому он должен печатать последнее значение каждый интервал.

Dev 09.05.2022 13:33

Загадочность, да, это тоже должно работать. Реализация, которую я имел в виду: From<T>(this Func<T> action) => Observable.Defer(() => Observable.Start(action));

Theodor Zoulias 09.05.2022 16:26

Другие вопросы по теме