Поддерживают ли rpc в google appengine функцию ожидания с тайм-аутом?

Я пытаюсь реализовать следующую логику в Google Appengine:

rpc = call_external_service(timeout=T)
rpc.wait(timeout=T/2)
if rpc.done:
   return rpc.result
rpc2 = call_backup_service(timeout=T/2)
finished_rpc = wait_any([rpc, rpc2], timeout=T/2)
return finished_rpc.result

То есть вызовите службу (используя urlfetch) с таймаутом T. Если она не завершится в T/2, попробуйте вызвать службу резервного копирования и затем дождитесь завершения любого из них.

Проблема в том, что механизм RPC, похоже, не предлагает примитив «ожидание с тайм-аутом». То есть, если я создаю RPC с крайним сроком T, я не могу сказать «подождите T / 2 секунды и посмотрите, завершился ли RPC».

Есть ли у кого-нибудь обходной путь для этого?


Редактировать: @TarunLalwani опубликовал потенциальное решение. Идея состоит в том, чтобы иметь специальный обработчик, который спит в течение заранее определенного времени (что-то вроде /sleep?delay=5), и добавить его в качестве второго параметра к UserRPC.wait_any. Т.е. что-то вроде:

rpc = call_external_service(timeout=T)
rpc2 = create_wait_rpc(timeout=T/2)
finished_rpc = wait_any([rpc, rpc2])
if finished_rpc == finished_rpc:
  return rpc.result
rpc2 = call_backup_service(timeout=T/2)
finished_rpc = wait_any([rpc, rpc2])
return finished_rpc.result

К сожалению, похоже, что UserRPC.wait_any реализован примерно так:

def wait_any(rpcs):
    last_rpc = rpcs[-1]
    last_rpc.wait()
    return last_rpc

То есть он всегда ожидает завершения последний RPC, что в нашем случае является проблемой, потому что, если первоначальный вызов завершается менее чем за время T / 2, мы хотели бы вернуть результат немедленно, а не ждать минимум Т / 2. Я тестировал это как на локальном dev_appserver, так и в производственной среде (код быстрого тестирования можно получить из https://github.com/cdman/gae-rpc-test).

Это все еще можно заставить работать, используя очень маленький тайм-аут для rpc2, что-то вроде:

rpc = call_external_service(timeout=T)
end_time = time.time() + T/2
while time.time() < end_time:
    wait_any([rpc, create_wait_rpc(timeout=0.1)])
    if rpc.status == 2:
        return rpc.result
# else, call backup service

Однако здесь я все еще искусственно ограничиваю свое временное разрешение до 100 мс (поэтому, если первоначальный вызов завершается через 230 мс, мы возвращаем результат только через 300 мс), и я буду спамить свои журналы с много запросов к /sleep. Кроме того, это может увеличить затраты на ведение проекта.

В качестве альтернативы, если бы был какой-то RPC без операций / с низкими накладными расходами, который можно было бы передать в качестве второго параметра UserRPC.wait_any, чтобы поддерживать движение цикла событий, тогда это решение с полупустым ожиданием, возможно, могло бы работать :-)


Редактировать 2: Я реализовал версию «занято-ожидание», используя асинхронную версию memcache.get из ndb. Вы можете посмотреть исходник здесь: https://github.com/cdman/gae-rpc-test/blob/ndb-async/main.py

Теоретически это должно быть бесплатно (см. https://cloud.google.com/appengine/pricing#other-resources), но все равно похоже на взлом.


Редактировать 3: Похоже, следующая работа должен:

from google.appengine.ext.ndb import eventloop
# ...
ev = eventloop.get_event_loop()
while time.time() < end_time:
  ev.run1()
  if rpc.done():
    break
  time.sleep(0.001)

(это явно запускает цикл событий, проверьте RPC, и это не сделано, немного поспите и повторите попытку)

К сожалению, шаг «запустить цикл событий» просто блокируется до тех пор, пока urlfetch не завершится в определенный момент :(

Когда вы говорите timeout, я предполагаю, что вы используете параметр deadline?

Tarun Lalwani 15.04.2018 21:59

@TarunLalwani - да, я просто хотел, чтобы это было общим.

Grey Panther 16.04.2018 06:59

Разместил мои наблюдения по тому же, пожалуйста, посмотрите

Tarun Lalwani 16.04.2018 11:29
3
3
237
2

Ответы 2

Вы можете установить тайм-аут пользовательского запроса, используя следующий urlfetch функция:

urlfetch.set_default_fetch_deadline(value)

Обратите внимание, что эта функция сохраняет новый крайний срок по умолчанию в локальной переменной потока, поэтому он должен быть установлен для каждого запроса, например, в настраиваемом промежуточном программном обеспечении. Параметр Value - это крайний срок выполнения операции в секундах; по умолчанию - крайний срок, зависящий от системы (обычно 5 секунд).

Фактическая реализация будет зависеть от вашего языка, но как только вы установите настраиваемый тайм-аут, вы можете легко установить значение крайнего срока / 2 для вашего вызова в службу резервного копирования.

да, я знаю об этой опции (и я даже могу настроить тайм-аут для каждого запроса). Однако его использование заставляет меня выбирать между: (1) отказаться от первоначального вызова после T / 2 или (2) заблокировать на время T Я хочу (1) запустить начальный вызов (2) проверить его статус после T / 2 раза. если он не завершен, НЕ прерывайте, а запустите второй запрос. (3) после дополнительного времени T / 2 проверьте статус обоих запросов и, если какой-либо из них завершился, верните результат. Таким образом, у исходного вызова есть самое большее время для завершения T, в то время как у резервного вызова есть время для завершения T / 2.

Grey Panther 16.04.2018 06:59

Также для меня важно: если какой-либо из вызовов завершится раньше, я хотел бы «проснуться» раньше, чем слепо ждать T или T / 2 секунды - чтобы я мог вернуть результат как можно скорее.

Grey Panther 16.04.2018 07:00

Это не сработает, я что-то протестировал, скоро опубликую свой вывод

Tarun Lalwani 16.04.2018 07:13

TL; DR;

После того, как я покопался в исходном коде python appengine sdk, ниже приведены мои наблюдения.

wait_any - это не то, на что похоже

Когда вы используете wait_any на двух RPCs, вам нужен тот, который закончил первым, но логика этого не кажется такой.

assert iter(rpcs) is not rpcs, 'rpcs must be a collection, not an iterator'
finished, running = cls.__check_one(rpcs)
if finished is not None:
  return finished
if running is None:
  return None
try:
  cls.__local.may_interrupt_wait = True
  try:
    running.__rpc.Wait()
  except apiproxy_errors.InterruptedError, err:
    err.rpc._exception = None
    err.rpc._traceback = None
finally:
  cls.__local.may_interrupt_wait = False
finished, runnning = cls.__check_one(rpcs)
return finished

В приведенной ниже строке кода

finished, running = cls.__check_one(rpcs)

Код метода __check_one приведен ниже.

rpc = None
for rpc in rpcs:
  assert isinstance(rpc, cls), repr(rpc)
  state = rpc.__rpc.state
  if state == apiproxy_rpc.RPC.FINISHING:
    rpc.__call_user_callback()
    return rpc, None
  assert state != apiproxy_rpc.RPC.IDLE, repr(rpc)
return None, rpc

Таким образом, он просто проверяет, завершен ли какой-либо из них, и если нет, он возвращает последний из коллекции in, последний return None, rpc

Затем wait_any обращается к running.__rpc.Wait(). Итак, был создан простой обработчик sleep / delay для того же

класс SleepHandler (webapp2.RequestHandler):

def get(self):
    delay = float(self.request.get('delay')) if self.request.get('delay') else 10
    sleep(delay)
    self.response.status_int = 200
    self.response.write('Response delayed by {}'.format(delay))

И добавлен ниже MainHandler для тестирования deadline

class MainHandler(webapp2.RequestHandler):
    def get(self):
        # rpc = UserRPC('dummywait', 5, stubmap=MyStubMap)

        rpc = urlfetch.create_rpc(deadline=2.0)
        rpc2 = urlfetch.create_rpc(deadline=6.0)
        urlfetch.make_fetch_call(rpc, self.request.host_url + "/sleep?delay=1")
        urlfetch.make_fetch_call(rpc2, self.request.host_url + "/sleep?delay=5")
        try:
            print(datetime.now())
            finished = apiproxy_stub_map.UserRPC.wait_any([rpc, rpc2])
            print(finished.request.url_)
            print(datetime.now())
            i = 0
        except Exception as ex:
            print_exception(ex)
        # ... do other things ...
        try:
            print(datetime.now())
            result = finished.get_result()
            print(datetime.now())
            if result.status_code == 200:
                text = result.content
                self.response.write(text)
            else:
                self.response.status_int = result.status_code
                self.response.write('URL returned status code {}'.format(
                    result.status_code))
        except urlfetch.DownloadError:
            print(datetime.now())
            self.response.status_int = 500
            self.response.write('Error fetching URL')


app = webapp2.WSGIApplication([
    ('/', MainHandler),
    ('/sleep', SleepHandler),
], debug=True)

Итак, ниже приведены точки в приведенном выше коде

  • rpc имеет крайний срок 2.0 секунд, и фактический запрос завершается в 1.0.
  • rpc2 имеет крайний срок 6.0 секунд, и фактический запрос завершается в 5.0.

Теперь в идеале можно ожидать, что мы получим rpc как завершенную задачу и данные, отображаемые в ответе по URL-адресу. Но на выходе

RPC response

Теперь, если мы изменим порядок аргументов wait_any с

finished = apiproxy_stub_map.UserRPC.wait_any([rpc, rpc2])

к

finished = apiproxy_stub_map.UserRPC.wait_any([rpc2, rpc])

Результат изменится на ниже

RPC wins

Это означает, что если вы создадите крайний срок для T и T/2, то ваше минимальное время ожидания всегда будет T/2, если вы используете его в качестве последнего параметра.

Так что какие бы решения вы ни пытались решить в Google appengine, они все равно будут грязными уловками. Теперь возможный трюк - делать интервалы. Ниже один такой образец

    T = 10.0
    # Deadline T
    rpc_main = urlfetch.create_rpc(deadline=T)

    # Deadline T/2
    rpc_backup = urlfetch.create_rpc(deadline=T / 2)

    urlfetch.make_fetch_call(rpc_main, self.request.host_url + "/sleep?delay=7")

    i = 0.0

    while i < T / 2:
        rpc_compare = urlfetch.create_rpc()
        urlfetch.make_fetch_call(rpc_compare, self.request.host_url + "/sleep?delay=0.5")

        finished = apiproxy_stub_map.UserRPC.wait_any([rpc_main, rpc_compare])
        i += 0.5
        if finished == rpc_main:
            break

    if finished != rpc_main:
        # we need to fire a backup request
        urlfetch.make_fetch_call(rpc_backup, self.request.host_url + "/sleep?delay=1")

        finished = apiproxy_stub_map.UserRPC.wait_any([rpc_backup, rpc_main])

    try:
        finished.get_result()
    except DeadlineExceededError as ex:
        # Rpc main finished with error then we need to switch to Backup request 
        finished = rpc_backup

Здесь мы отдаем приоритет rpc_main вместо резервного копирования, хотя в этом случае резервное копирование завершается первым, мы получаем ответ от rpc_main.

RPC Main

Теперь, если я изменю основной RPC, чтобы он упал ниже крайнего срока

    urlfetch.make_fetch_call(rpc_main, self.request.host_url + "/sleep?delay=20")

Выход изменится на

RPC Backup wins

Таким образом, он показывает как опрос, так и наихудший сценарий ожидания. Это единственный возможный обходной путь / реализация, которую я смог отработать, глядя на исходный исходный код.

Спасибо. Таким образом, основной трюк состоит в том, чтобы (а) создать другой RPC, нацеленный на конечную точку, которая задерживает засыпание на определенное время, и (б) выполнить wait_any с обоими RPC, но убедитесь, что тот RPC, который вы ожидаете завершить первым, будет первый. Я пропустил вторую часть, когда проводил эксперименты.

Grey Panther 16.04.2018 13:59

Последующий вопрос: знаете ли вы о других RPC, которые я мог бы использовать для ожидания определенного времени, кроме urlfetch? (возможно, я могу использовать хранилище данных / memcache / etc RPC). Просто кажется глупым (и потенциально дорогостоящим!) Запускать внешний URL-адрес всякий раз, когда я хочу немного поспать.

Grey Panther 16.04.2018 14:13

Это не внешний, а внутренний вызов одного и того же обработчика. Поэтому я использовал обработчик в том же проекте. Но я не уверен в том, как это повлияет на стоимость, вы можете попробовать демо и посмотреть, как это пойдет.

Tarun Lalwani 16.04.2018 14:16

Я опубликовал редактирование вопроса с некоторыми обновлениями. Я думаю, что мы движемся в правильном направлении, но мы еще не достигли цели.

Grey Panther 17.04.2018 16:34

@GreyPanther, я видел, как это работает, так это то, что основной код, который выполняется, на самом деле выполняет обратный вызов бэкэнду администратора и передает протобуфер администратору и запрашивает его на самом деле сделать вызов api и установить крайний срок. У меня есть возможное обходное решение, которое я изучаю, но я не уверен, как это повлияет на параллельные запросы.

Tarun Lalwani 17.04.2018 17:40

Пожалуйста, ознакомьтесь с моими последними изменениями - мне удалось реализовать версию с напряженным ожиданием, но я с нетерпением жду возможности проверить, что вы придумали!

Grey Panther 17.04.2018 22:27

На самом деле я смотрел google/appengine/ext/remote_api/remote_api_stub.py, encoded_response = self._server.Send(self._path, encoded_request), вы можете отправить сюда timeout. Но я думаю, что тайм-аут может повлиять на оба вызова.

Tarun Lalwani 17.04.2018 22:44

@GreyPanther, я думаю, у меня нет дальнейшего обновления / обходного пути. Дайте мне знать, как вы хотите двигаться дальше, хотите ли вы, чтобы я обновил свои ответы вашими выводами, или вы хотите дать другой ответ?

Tarun Lalwani 20.04.2018 13:40

Я просто принял ваш ответ, потому что - хотя он не дал окончательного решения - он очень помог мне прояснить мои мысли, и также я думаю, что нашел ответ! Я скоро добавлю ответ.

Grey Panther 20.04.2018 19:21

Рад слышать это и тоже жду вашего ответа :-)

Tarun Lalwani 20.04.2018 19:22

К сожалению, я не нашел работоспособного решения - см. Последнее изменение. И также кажется, что хотя операции с кэшем памяти не оплачиваются, на них определенно есть квота, и, вероятно, агрессивное выполнение memcache.get в цикле быстро приведет к тому, что я исчерпаю цитату, и вызовы станут ограниченными по скорости. Я постараюсь связаться со службой поддержки Google.

Grey Panther 20.04.2018 19:55

Да, держите нас в курсе, когда получите ответ от службы поддержки Google. Этого нелегко достичь без таймера типа события, в случае Node вы просто запустите setTimeout для запуска с T/2, и все станет намного проще. Но с Python становится немного сложно

Tarun Lalwani 20.04.2018 20:04

Мой контакт в Google только что подтвердил, что это невозможно с Python 2.7 в стандарте Google AppEngine, однако мой вариант использования был отмечен, и они рассматривают возможность добавления его поддержки: |

Grey Panther 11.05.2018 19:53

Спасибо за обновление @GreyPanther, надеюсь, они скоро добавят его для вас :-)

Tarun Lalwani 11.05.2018 19:55

Другие вопросы по теме