Вызывает ли async* функцию лениво?

При написании юнит-тестов для своего проекта я столкнулся с интересной проблемой.

Вот карта, которую пользователь может использовать для размещения маркеров:

class DomainMap {
  static const _DEFAULT_COORDINATE = const Coordinate(40.73, -73.93);
  final ReverseGeocodingStrategy _geocodingStrategy;
  final RouteDefinitionStrategy _assemblyStrategy;
  final List<_IdentifiedCoordinate> _addressed = [];
  final List<Coordinate> _markers = [];
  final _Route _route = _Route();

  Coordinate get defaultCoordinate => _DEFAULT_COORDINATE;

  DomainMap(this._geocodingStrategy, this._assemblyStrategy);

  Stream<MarkersUpdateEvent> mark(Coordinate coordinate) async* {
    _markers.add(coordinate);
    yield _assembleMarkersUpdate();
    final Address address = await _geocodingStrategy.geocode(coordinate);
    _addressed.add(_IdentifiedCoordinate(coordinate, address));
    if (_addressed.length > 1) {
      final Iterable<Coordinate> assembledPolyline =
          await _assemblyStrategy.buildRoute(BuiltList(_addressed
              .map((identifiedCoordinate) => identifiedCoordinate.address)));
      assembledPolyline.forEach(_route.add);
      yield _assembleMarkersUpdate();
    }
  }

  MarkersUpdateEvent _assembleMarkersUpdate() =>
      MarkersUpdateEvent(BuiltList.from(_markers), _route.readOnly);
}

class _Route {
  final List<Coordinate> _points = [];

  Iterable<Coordinate> get readOnly => BuiltList(_points);

  void add(final Coordinate coordinate) => _points.add(coordinate);

  void addAll(final Iterable<Coordinate> coordinate) => _points.addAll(coordinate);
}

И вот модульный тест для него, который проверяет, что на второй метке здесь должен быть возвращен маршрут:

test("mark, assert that on second mark at first just markers update is published, and then the polyline update too", () async {
  final Coordinate secondCoordinate = plus(givenCoordinate, 1);
  final givenRoute = [
    givenCoordinate,
    minus(givenCoordinate, 1),
    plus(givenCoordinate, 1)
  ];
  when(geocodingStrategy.geocode(any)).thenAnswer((invocation) => Future.value(Address(invocation.positionalArguments[0].toString())));
  when(assemblyStrategy.buildRoute(any))
    .thenAnswer((_) => Future.value(givenRoute));
  final expectedFirstUpdate =
    MarkersUpdateEvent([givenCoordinate, secondCoordinate], []);
  final expectedSecondUpdate =
    MarkersUpdateEvent([givenCoordinate, secondCoordinate], givenRoute);
  final DomainMap map = domainMap();
  map.mark(givenCoordinate)
  //.forEach(print) //Important
  ;
  expect(map.mark(secondCoordinate),
    emitsInOrder([expectedFirstUpdate, expectedSecondUpdate]));
}, timeout: const Timeout(const Duration(seconds: 10)));

Когда я запускаю его таким образом, тест завершается с ошибкой и говорит, что поток выдал только одно значение — событие обновления только с непустым полем markers, которое содержит только secondCoordinate. Но когда я раскомментирую forEach, тест проходит.

Насколько я понимаю - метод async* не вызывается до тех пор, пока значения потока не будут запрошены, поэтому при вызове forEach функция выполняется до конца. Поэтому, если я запрашиваю все значения потока (который был возвращен из первого вызова) - метод выполняется, список markers заполняется, а второе выполнение выполняется в ожидаемом состоянии.

Я правильно понимаю async* семантику? И есть ли способ сделать эту функцию активной, а не ленивой (я не хочу запрашивать ненужные значения потока)?

Асинхронная передача данных с помощью sendBeacon в JavaScript
Асинхронная передача данных с помощью sendBeacon в JavaScript
В современных веб-приложениях отправка данных из JavaScript на стороне клиента на сервер является распространенной задачей. Одним из популярных...
0
0
19
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Да, async* лениво вызывает функцию после того, как вы вызвали listen в возвращаемом потоке. Если вы никогда не слушаете, то ничего не происходит. Он даже делает это асинхронно, а не напрямую в ответ на вызов listen.

Итак, если вам определенно нужно, чтобы что-то произошло, но, возможно, вам нужно только посмотреть на ответ, вы не можете использовать функцию async*, чтобы что-то сделать.

Что вы, вероятно, захотите сделать, так это заполнить поток условно, но только если поток действительно прослушивается. Это нетрадиционная последовательность операций, которая не соответствует async* или даже async семантике. Вы должны быть готовы к тому, что операция завершится, а потом поток прослушивается позже. Это предполагает разделение операции на две части: одну async для запроса и одну async* для ответа, и разделение будущего между ними двумя, что означает прослушивание одного и того же будущего дважды, что явно не-async поведение.

Я бы рекомендовал разделить поведение потока и использовать для этого StreamController.

Stream<MarkersUpdateEvent> mark(Coordinate coordinate) {
  var result = StreamController<MarkersUpdateEvent>();
  () async {
    _markers.add(coordinate);
    result.add(_assembleMarkersUpdate());
    final Address address = await _geocodingStrategy.geocode(coordinate);
    _addressed.add(_IdentifiedCoordinate(coordinate, address));
    if (_addressed.length > 1) {
      final Iterable<Coordinate> assembledPolyline =
          await _assemblyStrategy.buildRoute(BuiltList(_addressed
              .map((identifiedCoordinate) => identifiedCoordinate.address)));
      assembledPolyline.forEach(_route.add);
      result.add(_assembleMarkersUpdate());
    }
    result.close();
  }().catchError(result.addError);
  return result.stream;
}

Таким образом, логика программы работает независимо от того, слушает ли кто-нибудь поток. Вы по-прежнему буферизуете все потоковые события. Нет реального способа избежать этого, если только вы не сможете вычислить их позже, потому что вы не можете знать, когда кто-то может прослушать возвращенный поток. Это не должно происходить сразу же после его возвращения.

Другие вопросы по теме