Aws lambda для активации конвейера данных

Я пытаюсь активировать конвейер данных на основе наличия файлов *.tar в S3. Я создал функцию Lambda и написал код Python Boto 3 для активации конвейера данных. Я протестировал функцию Lambda и обнаружил, что она работает, когда файл .tar существует, конвейер данных активирован, если конвейер данных не существует, не активируется.

Я пытаюсь понять причину этих проблем:

  1. Если в папке s3 нет файлов tar, print ("datapipeline not activated") не печатается в журналах.
  2. Если я прерываю конвейер данных в предыдущем запуске, и он помечается как завершенный до завершения конвейера данных, затем снова запускаю лямбда-функцию, я получаю следующую ошибку.

    ERROR: The field 'maxActiveInstances' can be set only on the Default object for On-demand pipelines

  3. Когда я попытался установить maxActiveInstances для ресурса EMR в конвейере данных,

    { "errorMessage": "An error occurred (InvalidRequestException) when calling the ActivatePipeline operation: Web service limit exceeded: Exceeded number of concurrent executions. Please set the field 'maxActiveInstances' to a higher value in your pipeline or wait for the currenly running executions to complete before trying again", "errorType": "InvalidRequestException", "stackTrace": [ [ "/var/task/lambda_function.py", 21, "lambda_handler", "activate = client.activate_pipeline(pipelineId=data_pipeline_id,parameterValues=[])" ], [ "/var/runtime/botocore/client.py", 314, "_api_call", "return self._make_api_call(operation_name, kwargs)" ], [ "/var/runtime/botocore/client.py", 612, "_make_api_call", "raise error_class(parsed_response, operation_name)" ] ] }

Это сценарий Python, пожалуйста, дайте рекомендации по устранению этих проблем.

import boto3
import logging
logger = logging.getLogger()

def lambda_handler(event, context):
client = boto3.client('datapipeline')
s3_client = boto3.client('s3')
#client = boto3.client('datapipeline')
data_pipeline_id="df-xxxxxxxx"
bucket = 'xxxxx'
prefix = 'xxxx/xxxxx/'
paginator = s3_client.get_paginator('list_objects_v2')
response_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)
response_pipeline = client.describe_pipelines(pipelineIds=[data_pipeline_id])
for response in response_iterator:
for object_data in response['Contents']:
key = object_data['Key']
    #print (key)
if key.endswith('.tar'):
if(response_pipeline):
activate = client.activate_pipeline(pipelineId=data_pipeline_id,parameterValues=[])
print ("activated")
else:
print ("datapipeline not activated")
0
0
1 894
1

Ответы 1

Думаю, я только что видел те же симптомы. Надеюсь, мы поделимся нашим исправлением, и это поможет вам?

Мы отменили экземпляр конвейера, и нам нужно было снова включить конвейер, чтобы избежать этой ошибки.

Другие вопросы по теме