У меня есть входящие объекты одного и того же типа, но если для свойства объекта IsThrottlable
установлено значение false независимо от идентификатора, я НЕ хочу его регулировать, но если для IsThrottlable
установлено значение true, я хотел бы регулировать объект каждые 3 секунды по идентификатору . Поэтому, если объект с одним и тем же идентификатором появляется 50 раз за 3 секунды, я хотел бы отправить HTTPSend для последнего объекта.
namespace BoatThrottle
{
class MData
{
public int ID { get; set; }
public bool IsThrottlable { get; set; }
public string Description { get; set; }
}
class Program
{
static void Main(string[] args)
{
Random rand = new Random();
while (true)
{
var data = GenerateRandomObj(rand);
SendData(data);
Task.Delay(rand.Next(100, 2000));
}
}
static MData GenerateRandomObj(Random rand)
{
return new MData() { ID = rand.Next(1, 20), Description = "Notification....", IsThrottlable = (rand.Next(2) == 1) };
}
static void SendData(MData mData)
{
if (mData.IsThrottlable)
{
_doValues.OnNext(mData);
var dd = ThrottledById(DoValues);
var observable =
dd
.Throttle(TimeSpan.FromMilliseconds(3000.0))
.ObserveOn(Scheduler.ThreadPool.DisableOptimizations());
_subscription =
observable
.ObserveOn(Scheduler.ThreadPool.DisableOptimizations())
.Subscribe(y =>
{
HTTPSend(y);
});
}
else
{
// MData object coming in IsThrottlable set to false always send this data NO throttling
HTTPSend(mData);
}
}
private static IDisposable? _subscription = null;
public static IObservable<MData> ThrottledById(IObservable<MData> observable)
{
return observable.Buffer(TimeSpan.FromSeconds(3))
.SelectMany(x =>
x.GroupBy(y => y.ID)
.Select(y => y.Last()));
}
private static readonly Subject<MData> _doValues = new Subject<MData>();
public static IObservable<MData> DoValues { get { return _doValues; } }
static void HTTPSend(MData mData)
{
Console.WriteLine("===============HTTP===>> " + mData.ID + " " + mData.Description + " " + mData.IsThrottlable);
}
}
}
Обновлено:
например ВСЕ получены в течение 3 секунд
MData ID = 1, IsThrottlable = False, Description = "Уведомлять"
MData ID = 2, IsThrottlable = True, Description = "Notify1"
MData ID = 2, IsThrottlable = True, Description = "Notify2"
MData ID = 9, IsThrottlable = False, Description = "Notify2"
MData ID = 2, IsThrottlable = True, Description = "Notify3"
MData ID = 2, IsThrottlable = True, Description = "Notify4"
MData ID = 3, IsThrottlable = True, Description = "Уведомить"
MData ID = 4, IsThrottlable = True, Description = "Уведомить"
MData ID = 5, IsThrottlable = True, Description = "Notify1"
MData ID = 5, IsThrottlable = True, Description = "Notify2"
MData ID = 8, IsThrottlable = True, Description = "Notify1"
MData ID = 8, IsThrottlable = True, Description = "Notify2"
MData ID = 8, IsThrottlable = True, Description = "Notify3"
MData ID = 8, IsThrottlable = True, Description = "Notify4"
MData ID = 8, IsThrottlable = True, Description = "Notify5"
MData ID = 8, IsThrottlable = True, Description = "Notify6"
Ожидается на Первые 3 секунды:
@Enigmativity Я удалил это из вопроса, спасибо
Один из способов сделать это — сгруппировать последовательность по свойству IsThrottlable
. Таким образом, вы получите вложенную последовательность, содержащую две подпоследовательности: одну, содержащую регулируемые элементы, и другую, содержащую нерегулируемые элементы. Затем вы можете соответствующим образом преобразовать каждую из двух подпоследовательностей и, наконец, использовать оператор SelectMany
, чтобы сгладить вложенную последовательность обратно в плоскую последовательность, содержащую элементы, испускаемые двумя преобразованными подпоследовательностями.
Подпоследовательность, содержащая нерегулируемые элементы, не нуждается в преобразовании, поэтому вы можете вернуть ее как есть.
Подпоследовательность, содержащую дросселируемые элементы, необходимо дополнительно сгруппировать по свойству ID
, создавая еще более тонкие подпоследовательности, содержащие только регулируемые элементы с одинаковым идентификатором. Вот последовательности, которые необходимо регулировать:
IObservable<MData> throttled = source
.GroupBy(x => x.IsThrottlable)
.SelectMany(g1 =>
{
if (!g1.Key) return g1; // Not throttleable, return it as is.
return g1
.GroupBy(x => x.ID)
.SelectMany(g2 => g2.Throttle(TimeSpan.FromSeconds(3)));
});
В конце вы получите плоскую последовательность, содержащую как регулируемые, так и нерегулируемые элементы, причем регулируемые элементы уже регулируются идентификатором.
Оператор SelectMany
по сути является комбинацией операторов Select
+Merge
.
У вас есть пример использования в контексте?
@Dev, я собирался показать основную идею. Я позволю вам иметь удовольствие адаптировать его к контексту. :-)
согласен :) Я имел ввиду как разделить IsThrottlable
подписки
@Dev, ты не мог бы просто подписаться только на получившуюся последовательность throttled
? Таким образом, вы получите как регулируемые, так и нерегулируемые элементы, но регулируемые элементы уже будут регулироваться. Разве это не то, что вы хотите?
Ах, теперь это имеет смысл для меня, спасибо.
Может быть, вы правы, это не то, что я хочу, или, возможно, я неправильно его использую. Но это означает, что нужно сгруппировать по IsThrottlable
Затем сгруппировать по идентификатору и дросселировать каждую группу. Поэтому дросселируйте, только если IsThrottlable
истинно, а затем группируйте по идентификатору, если идентификатор объекта совпадает с дросселем. Так, например, представьте, что отдельные входящие объекты являются объектами пользователя, и если IsThrottlable
= true, если запрос исходит от одного и того же идентификатора пользователя, HTTPотправьте последний объект каждого уникального пользователя.
Я отредактировал вопрос с примером 1 экземпляра 3-секундного цикла, это то, для чего предназначен ответ? потратил несколько часов на настройку, но не получил желаемого результата
@Dev Теперь я думаю, что понял. Я обновил ответ соответственно.
Я добавил свою окончательную реализацию (отредактировал свой вопрос внизу), потому что это только Http-отправка, только если IsThrottlable
= False. IsThrottlable
вообще не печатает.
@Dev Я разместил онлайн-демонстрацию здесь. Вывод соответствует тому, что ожидается в вопросе.
Давайте продолжить обсуждение в чате.
Я решил взять вашу окончательную реализацию, как указано в вашем вопросе, но это должен быть ответ, и очистить запрос для вас таким образом, который является наиболее идиоматичным способом Rx.
Вот моя версия вашего кода:
public MainWindow()
{
InitializeComponent();
Debug.Print("======================= = ");
_subscription =
Observable
.Generate(0, x => true, x => x + 1,
x => new MData() { ID = Random.Shared.Next(1, 3), Description = "Notification....", IsThrottlable = Random.Shared.Next(2) == 1 },
x => TimeSpan.FromMilliseconds(Random.Shared.Next(100, 2000)))
.GroupBy(m => m.IsThrottlable)
.SelectMany(g =>
g.Key
? g.GroupBy(x => x.ID).SelectMany(g2 => g2.Throttle(TimeSpan.FromSeconds(3.0)))
: g)
.SelectMany(m => Observable.Start(() => HTTPSend(m)))
.Subscribe();
}
Финал .SelectMany(m => Observable.Start(() => HTTPSend(m)))
, возможно, нужно будет записать как .Select(m => Observable.Start(() => HTTPSend(m))).Merge(1)
.
+1. Возможно, вам придется обернуть Observable.Start
в Observable.Defer
, особенно если вы собираетесь лениво объединять подпоследовательности с помощью Merge(1). В противном случае HTTPSend
может быть вызван до подписки на подпоследовательность.
@TheodorZoulias - Спасибо. Но почему HTTPSend
может быть вызван до подписки?
Посмотрите демоверсию это. sequence
не подписан. Тем не менее, действие вызывается.
@TheodorZoulias - Что ж, это интересно. Беглый взгляд на источник показывает, что это тоже правда. Это заканчивается кэшированием результата для всех будущих вызывающих.
Observable.Start
— это синхронная версия Observable.StartAsync
. Насколько я вижу, синхронной версии Observable.FromAsync
не существует. Возможно, они не смогли выбрать подходящее название для этого метода.
Этот экземпляр Observable .Generate(0, x => true, x => x + 1, x => new MData() { ID = 5, Description = "Notification....", IsThrottlable = true }, x => TimeSpan.FromMilliseconds(rand.Next(100, 2000)))
вообще не печатает.
@TheodorZoulias - Нам нужно public static class ObservableEx { public static IObservable<T> From<T>(this Func<T> factory) => Observable.FromAsync(() => Task.Run(factory)); }
@Dev - у меня это работает без изменений по сравнению с кодом, который вы разместили в комментарии. Имейте в виду, что там нет ничего, что «печатало бы», но если вы подпишетесь, оно выдаст значения.
@Enigmativity извиняется, это работает (так как оно генерирует объекты), но я имел в виду, что я заменил случайные элементы Observable.Generate, чтобы всегда иметь только IsThrottlable
= true и ID = 5, чтобы гарантировать, что HTTPSend работает только с 1 тип/группа, поэтому он должен печатать последнее значение каждый интервал.
Загадочность, да, это тоже должно работать. Реализация, которую я имел в виду: From<T>(this Func<T> action) => Observable.Defer(() => Observable.Start(action));
Если ваша окончательная реализация является ответом на ваш собственный вопрос, опубликуйте ее как один. Не ставьте это в вопросе.