Я провел часы, просматривая документацию и учебные пособия, но не могу понять, как использовать ReactiveX для опроса внешнего ресурса или чего-либо еще, каждый с определенным интервалом. Ниже приведен код, который я написал для получения информации из REST API с интервалом.
open System
open System.Reactive.Linq
module MyObservable =
let getResources =
async {
use client = new HttpClient()
let! response = client.GetStringAsync("http://localhost:8080/items") |> Async.AwaitTask
return response
} |> Async.StartAsTask
let getObservable (interval: TimeSpan) =
let f () = getResources.Result
Observable.Interval(interval)
|> Observable.map(fun _ -> f ())
Чтобы проверить это, я попробовал подписаться на Observable и подождать пять секунд. Он получает что-то каждую секунду в течение пяти секунд, но getResources
вызывается только в первый раз, а затем результат просто используется в каждом интервале. Как я могу изменить это, чтобы сделать вызов REST с каждым интервалом, а не только результат первого вызова, который используется снова и снова?
let mutable res = Seq.empty
getObservable (new TimeSpan(0,0,1))
|> Observable.subscribe(fun (x: seq<string>) -> res <- res |> Seq.append x;)
|> ignore
Threading.Thread.Sleep(5000)
Не используйте Task
. Задачи — это то, что мы называем «горячими», это означает, что если у вас в руке значение типа Task
, это означает, что задача уже запущена, и вы ничего не можете с этим поделать. В частности, это означает, что вы не можете перезапустить его или запустить второй экземпляр. Как только Task
создан, уже слишком поздно.
В вашем конкретном случае это означает, что getResources
— это не «способ запуска задачи», а просто «задача». Уже начал, уже работает.
Если вы хотите каждый раз запускать новую задачу, у вас есть два варианта:
Первый (худшая альтернатива), вы можете сделать getResources
функцию, а не значение, что вы можете сделать, задав ей параметр:
let getResources () =
async { ...
А затем вызовите его с этим параметром:
let f () = getResources().Result
Это будет запускать функцию getResources
заново каждый раз, когда вы вызываете f()
, которая каждый раз будет создавать новую Task
и запускать ее.
Второй (лучший вариант), вообще не используйте Task
. Вы создаете совершенно хорошее вычисление async
, а затем превращаете его в Task
только для того, чтобы заблокировать получение результата. Почему? Вы также можете заблокировать результат async
!
let getResources = async { ... }
let getObservable interval =
let f () = getResources |> Async.RunSynchronously
...
Это работает, хотя getResources
не является функцией, потому что async
s, в отличие от Task
s, мы называем «холодными». Это означает, что если у вас в руке есть async
, это не значит, что он уже запущен. async
, в отличие от Task
, представляет собой не «уже запущенное» вычисление, а скорее «способ начать вычисление». Следствием этого является то, что вы можете запускать его несколько раз с одного и того же значения async
.
Один из способов начать это через Async.RunSynchronously
, как я делаю в моем примере выше. Это не лучший способ, потому что он блокирует текущий поток до тех пор, пока не будут выполнены вычисления, но это эквивалентно тому, что вы делали с доступом к свойству Task.Result
, которое также блокируется до тех пор, пока Task
не будет выполнено.