Потоковая передача запросов клиента gRPC: — «Клиент сбросил поток запросов»

Я пытаюсь реализовать потоковую передачу запросов от моего клиента Python на мой сервер С#, используя gRPC. Это мой профиль:

    syntax = "proto3";
        
    service PerceiveAPIDataService {
      rpc UploadResource (stream UploadResourceRequest) returns (UploadResourceResponse);
    }
    
    message UploadResourceRequest {
      oneof request_data {
        ResourceChunk resource_chunk = 1;
        UploadResourceParameters parameters = 2;
      }
    }
    
    message ResourceChunk {
        bytes content = 1; 
    }
    
    message UploadResourceParameters {
        string path = 1;
    }

Это моя реализация С#:

    public override async Task<UploadResourceResponse> UploadResource(IAsyncStreamReader<UploadResourceRequest> requestStream, ServerCallContext context)
    {
        if (!await requestStream.MoveNext())
        {
            throw new RpcException(new Status(StatusCode.FailedPrecondition, "No upload parameters found."));
        }

        var initialMessage = requestStream.Current;
        if (initialMessage.RequestDataCase != UploadResourceRequest.RequestDataOneofCase.Parameters)
        {
            throw new RpcException(new Status(StatusCode.FailedPrecondition, "First message must contain upload parameters."));
        }

        var path = initialMessage.Parameters.Path;
        if (string.IsNullOrWhiteSpace(path))
        {
            throw new RpcException(new Status(StatusCode.InvalidArgument, "Upload path is required."));
        }

        using (var ms = new MemoryStream())
        {
            while (await requestStream.MoveNext())
            {
                var chunk = requestStream.Current.ResourceChunk;
                if (chunk == null)
                {
                    continue;  // Skip any messages that are not resource chunks
                }

                await ms.WriteAsync(chunk.Content.ToByteArray().AsMemory(0, chunk.Content.Length));
            }

            ms.Seek(0, SeekOrigin.Begin); // Reset memory stream position to the beginning for reading during upload
            var uploadResult = await _dataService.UploadResourceAsync(path, ms);
            return new UploadResourceResponse { Succeeded = uploadResult.IsSuccessful };
        }
    }

И это мой клиентский код Python:

    def generate_request(self, data: bytearray, next_cloud_path: str) -> Generator:
        first_req = perceive_api_data_service_pb2.UploadResourceRequest(
            parameters=perceive_api_data_service_pb2.UploadResourceParameters(path=next_cloud_path)
        )
        yield first_req
        print("Sent initial request with path:", next_cloud_path)
        
        chunk_size = 2048
        total_chunks = (len(data) + chunk_size - 1) // chunk_size  # Ceiling division to get total number of chunks
        print(f"Data size: {len(data)} bytes, chunk size: {chunk_size} bytes, total chunks: {total_chunks}")
        
        for i in range(0, len(data), chunk_size):
            chunk = data[i:i+chunk_size]
            yield perceive_api_data_service_pb2.UploadResourceRequest(
                resource_chunk=perceive_api_data_service_pb2.ResourceChunk(content=chunk)
            )
            print(f"Sent chunk {((i // chunk_size) + 1)} of {total_chunks}")
    
    async def upload_file(self, data: bytearray, next_cloud_path: str) -> bool:
        
        async with grpc.aio.insecure_channel("localhost:5228") as channel:
            stub = perceive_api_data_service_pb2_grpc.PerceiveAPIDataServiceStub(channel)
            
            request_iterator = self.generate_request(data, next_cloud_path)
            
            response = await stub.UploadResource(request_iterator)
                        
            return response.succeeded

Ошибка, которую я получаю на сервере, заключается в следующем:

Grpc.AspNetCore.Server.ServerCallHandler: Ошибка: ошибка при выполнении метода службы «UploadResource».

System.IO.IOException: клиент сбросил поток запросов. в System.IO.Pipelines.Pipe.GetReadResult(ReadResult& результат) в System.IO.Pipelines.Pipe.GetReadAsyncResult()

И ошибка, которую я получаю на клиенте, заключается в следующем:

Файл "C:\Users\имя_пользователя\AppData\Local\Programs\Python\Python311\Lib\site-packages\grpc\aio_call.py", строка 690 в _conduct_rpc сериализованный_ответ = ожидание self._cython_call.stream_unary( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Файл "src\python\grpcio\grpc_cython_cygrpc/aio/call .pyx.pxi", строка 458, в файлеstream_unary "src\python\grpcio\grpc_cython_cygrpc/aio/callback_common.pyx.pxi", строка 166 в файле _receive_initial_metadata "src\python\grpcio\grpc_cython_cygrpc/aio/callback_common.pyx.pxi", строка 99 в Execute_batch asyncio.Exceptions.CancelledError

Первое сообщение отправлено успешно. Итак параметры. Однако как только сервер пытается выполнить requestStream.MoveNext(), он выдает эту ошибку.

Я уже пробовал множество разных решений, но не могу найти ничего подходящего. Кто-нибудь видит, где я делаю ошибку?

Ваш вопрос не является минимальным воспроизводимым примером; вы упускаете значительную часть решения. Я не разработчик .NET, но ваш код кажется слишком сложным. См. этот пример сервера C#, реализующего потоковую передачу клиента , и клиента Python, реализующего потоковую передачу запросов, который может помочь.

DazWilkin 18.04.2024 16:46
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
1
149
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Оказывается, для потоковой передачи файла он должен сначала быть закодирован в Base64. В конце концов, в этом и заключалась вся проблема.

Другие вопросы по теме