Я создаю заглушку, которая подключается к серверу, который передает данные с определенным интервалом, а затем загружаю их в TSDB. Я реализовал пакетную обработку для оптимизации загрузки, но если объем данных, передаваемых за один интервал, не соответствует размеру пакета, некоторые данные не будут загружены до следующего интервала, что мне не нужно. Есть ли способ на заглушке gRPC проверить, пуст ли поток?
class DialInClient(object):
def __init__(self, host, port, timeout=100000000, user='root', password='lablab'):
self._host = host
self._port = port
self._timeout = float(timeout)
self._channel = None
self._cisco_ems_stub = None
self._connected = False
self._metadata = [('username', user), ('password', password)]
def subscribe(self, sub_id):
sub_args = CreateSubsArgs(ReqId=1, encode=3, subidstr=sub_id)
stream = self._cisco_ems_stub.CreateSubs(sub_args, timeout=self._timeout, metadata=self._metadata)
for segment in stream:
yield segment
def connect(self):
self._channel = grpc.insecure_channel(':'.join([self._host,self._port]))
try:
grpc.channel_ready_future(self._channel).result(timeout=10)
self._connected = True
except grpc.FutureTimeoutError as e:
raise DeviceFailedToConnect from e
else:
self._cisco_ems_stub = gRPCConfigOperStub(self._channel)
Если я установил низкий тайм-аут, весь канал отключается, я хочу добавить какой-то тайм-аут в цикле for для потоковой передачи, чтобы увидеть, не получу ли я еще один сегмент за 1 секунду, yield None
, чтобы сообщить моей другой части, что это конец и загрузить без полного размера пакета.
Такой механизм изначально не существует в GRPC, но библиотека threading
должна позволять вам отправлять пакеты до того, как они заполнятся. Я включил модифицированную версию Пример python GRPC hello world, чтобы дать вам представление о том, как это можно сделать.
from __future__ import print_function
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
import threading
from six.moves import queue
import time
# 10 second batches
BATCH_PERIOD = 10.0
def collect_responses(resp_queue, finished):
with grpc.insecure_channel('localhost:50051') as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
for i, response in enumerate(stub.SayHello(helloworld_pb2.HelloRequest(name='you', num_greetings = "100"))):
resp_queue.put(response)
finished.set()
def is_batch_end(batch_start):
return time.time() - batch_start < BATCH_PERIOD
def get_remaining_time(time_start):
return (time_start + BATCH_PERIOD) - time.time()
def batch_responses(resp_queue, finished):
batch_num = 0
while True:
batch_resps = []
batch_start = time.time()
remaining_time = get_remaining_time(batch_start)
while remaining_time > 0.0 and not finished.is_set():
try:
batch_resps.append(resp_queue.get())
except queue.Empty:
pass
finally:
remaining_time = get_remaining_time(batch_start)
print("Batch {} ({}):".format(batch_num + 1, len(batch_resps)))
for resp in batch_resps:
print(" '{}'".format(resp.message))
batch_num += 1
def run():
resp_queue = queue.Queue()
finished = threading.Event()
client_thread = threading.Thread(target=collect_responses, args=(resp_queue, finished))
client_thread.start()
batch_responses(resp_queue, finished)
client_thread.join()
if __name__ == '__main__':
run()