Создание конвейера с запланированным триггером с ADFV2

Я пытаюсь перенести конвейер, который уже существует в ADFV1, на ADFV2, и у меня есть некоторые проблемы с концепцией триггеров. У моего конвейера есть два действия: первое - это действие Azure Data Lake Analytics, а второе - действие копирования. Первое действие запускает сценарий usql, в котором данные считываются из разделенной папки / {yyyy} / {MM} / {dd} /, обрабатывают их и записывают в папку / {yyyy} - {MM} - {dd} /. Вот несколько файлов JSON с моей фабрики (конвейер, триггер и наборы данных).

Трубопровод:

{
"name": "StreamCompressionBlob2SQL",
"properties": {
    "activities": [
        {
            "name": "compress",
            "type": "DataLakeAnalyticsU-SQL",
            "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
            },
            "typeProperties": {
                "scriptPath": "d00044653/azure-configurations/usql-scripts/stream/compression.usql",
                "scriptLinkedService": {
                    "referenceName": "AzureBlobStorage",
                    "type": "LinkedServiceReference"
                },
                "parameters": {
                    "Year": {
                        "value": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')",
                        "type": "Expression"
                    },
                    "Month": {
                        "value": "@formatDateTime(pipeline().parameters.windowStartTime,'MM')",
                        "type": "Expression"
                    },
                    "Day": {
                        "value": "@formatDateTime(pipeline().parameters.windowStartTime,'dd')",
                        "type": "Expression"
                    }
                }
            },
            "linkedServiceName": {
                "referenceName": "AzureDataLakeAnalytics1",
                "type": "LinkedServiceReference"
            }
        },
        {
            "name": "Blob2SQL",
            "type": "Copy",
            "dependsOn": [
                {
                    "activity": "compress",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
            ],
            "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
            },
            "typeProperties": {
                "source": {
                    "type": "BlobSource",
                    "recursive": true
                },
                "sink": {
                    "type": "SqlSink",
                    "writeBatchSize": 10000
                },
                "enableStaging": false,
                "dataIntegrationUnits": 0,
                "translator": {
                    "type": "TabularTranslator",
                    "columnMappings": {
                        "tag": "TAG",
                        "device_id": "DEVICE_ID",
                        "system_id": "SYSTEM_ID",
                        "utc": "UTC",
                        "ts": "TS",
                        "median": "MEDIAN",
                        "min": "MIN",
                        "max": "MAX",
                        "avg": "AVG",
                        "stdev": "STDEV",
                        "first_value": "FIRST_VALUE",
                        "last_value": "LAST_VALUE",
                        "message_count": "MESSAGE_COUNT"
                    }
                }
            },
            "inputs": [
                {
                    "referenceName": "AzureBlobDataset_COMPRESSED_ASA_v1",
                    "type": "DatasetReference"
                }
            ],
            "outputs": [
                {
                    "referenceName": "AzureSQLDataset_T_ASSET_MONITORING_WARM_ASA_v1",
                    "type": "DatasetReference"
                }
            ]
        }
    ],
    "parameters": {
        "windowStartTime": {
            "type": "String"
        }
    }
}

}

Курок:

{
"name": "trigger1",
"properties": {
    "runtimeState": "Started",
    "pipelines": [
        {
            "pipelineReference": {
                "referenceName": "StreamCompressionBlob2SQL",
                "type": "PipelineReference"
            },
            "parameters": {
                "windowStartTime": "@trigger().scheduledTime"
            }
        }
    ],
    "type": "ScheduleTrigger",
    "typeProperties": {
        "recurrence": {
            "frequency": "Day",
            "interval": 1,
            "startTime": "2018-08-17T10:46:00.000Z",
            "endTime": "2018-11-04T10:46:00.000Z",
            "timeZone": "UTC"
        }
    }
}

}

Входной набор данных для операции копирования:

{
"name": "AzureBlobDataset_COMPRESSED_ASA_v1",
"properties": {
    "linkedServiceName": {
        "referenceName": "AzureBlobStorage",
        "type": "LinkedServiceReference"
    },
    "parameters": {
        "Year": {
            "type": "String",
            "defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')"
        },
        "Month": {
            "type": "String",
            "defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')"
        },
        "Day": {
            "type": "String",
            "defaultValue": "@formatDateTime(pipeline().parameters.windowStartTime,'yyyy')"
        }
    },
    "type": "AzureBlob",
    "structure": [
        {
            "name": "tag",
            "type": "String"
        },
        {
            "name": "device_id",
            "type": "String"
        },
        {
            "name": "system_id",
            "type": "String"
        },
        {
            "name": "utc",
            "type": "DateTime"
        },
        {
            "name": "ts",
            "type": "DateTime"
        },
        {
            "name": "median",
            "type": "Double"
        },
        {
            "name": "min",
            "type": "Double"
        },
        {
            "name": "max",
            "type": "Double"
        },
        {
            "name": "avg",
            "type": "Double"
        },
        {
            "name": "stdev",
            "type": "Double"
        },
        {
            "name": "first_value",
            "type": "Double"
        },
        {
            "name": "last_value",
            "type": "Double"
        },
        {
            "name": "message_count",
            "type": "Int16"
        }
    ],
    "typeProperties": {
        "format": {
            "type": "TextFormat",
            "columnDelimiter": ";",
            "nullValue": "\\N",
            "treatEmptyAsNull": true,
            "skipLineCount": 0,
            "firstRowAsHeader": true
        },
        "fileName": "",
        "folderPath": {
            "value": "@concat('d00044653/processed/stream/compressed',dataset().Year,'-',dataset().Month,'-',dataset().Day)",
            "type": "Expression"
        }
    }
},
"type": "Microsoft.DataFactory/factories/datasets"

}

Выходной набор данных для операции копирования:

{
"name": "AzureSQLDataset_T_ASSET_MONITORING_WARM_ASA_v1",
"properties": {
    "linkedServiceName": {
        "referenceName": "AzureSqlDatabase1",
        "type": "LinkedServiceReference"
    },
    "type": "AzureSqlTable",
    "structure": [
        {
            "name": "TAG",
            "type": "String"
        },
        {
            "name": "DEVICE_ID",
            "type": "String"
        },
        {
            "name": "SYSTEM_ID",
            "type": "String"
        },
        {
            "name": "UTC",
            "type": "DateTime"
        },
        {
            "name": "TS",
            "type": "DateTime"
        },
        {
            "name": "MEDIAN",
            "type": "Decimal"
        },
        {
            "name": "MIN",
            "type": "Decimal"
        },
        {
            "name": "MAX",
            "type": "Decimal"
        },
        {
            "name": "AVG",
            "type": "Decimal"
        },
        {
            "name": "STDEV",
            "type": "Decimal"
        },
        {
            "name": "FIRST_VALUE",
            "type": "Decimal"
        },
        {
            "name": "LAST_VALUE",
            "type": "Decimal"
        },
        {
            "name": "MESSAGE_COUNT",
            "type": "Int32"
        }
    ],
    "typeProperties": {
        "tableName": "[dbo].[T_ASSET_MONITORING_WARM]"
    }
},
"type": "Microsoft.DataFactory/factories/datasets"

}

Моя проблема в том, что после публикации ничего не происходит. Какие-либо предложения??

Вы запустили конвейер и когда вы запустили конвейер? "startTime": "2018-08-17T10:46:00.000Z", "endTime": "2018-11-04T10:46:00.000Z", Это закончилось 4 ноября?

Bo Xiao 14.11.2018 03:24
Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
В предыдущей статье мы завершили установку базы данных, для тех, кто не знает.
Как установить LAMP Stack 1/2 на Azure Linux VM
Как установить LAMP Stack 1/2 на Azure Linux VM
В дополнение к нашему предыдущему сообщению о намерении Azure прекратить поддержку Azure Database для MySQL в качестве единого сервера после 16...
0
1
664
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Триггер расписания не поддерживает сценарий обратной засыпки (в зависимости от определения триггера - вы начинаете с 17 августа 2018 г.). В триггере расписания запуски конвейера могут выполняться только в периоды времени от текущее время и в будущем.

В вашем случае для сценариев обратной засыпки используйте триггер Tumbling window.

Спасибо, что это было. Я не знал об этом, и с листанием окон он работает.

Veysel Ko 21.11.2018 10:03

Рад, что помог. Не могли бы вы отметить это как ответ, если он решил ваш вопрос.

databash 21.11.2018 10:23

Другие вопросы по теме