Использование соединителя ADF REST для чтения и преобразования данных FHIR

Я пытаюсь использовать фабрику данных Azure для чтения данных с сервера FHIR и преобразования результатов в файлы JSON (ndjson) с разделителями новой строки в хранилище BLOB-объектов Azure. В частности, если вы запросите сервер FHIR, вы можете получить что-то вроде:

{
    "resourceType": "Bundle",
    "id": "som-id",
    "type": "searchset",
    "link": [
        {
            "relation": "next",
            "url": "https://fhirserver/?ct=token"
        },
        {
            "relation": "self",
            "url": "https://fhirserver/"
        }
    ],
    "entry": [
        {
            "fullUrl": "https://fhirserver/Organization/1234",
            "resource": {
                "resourceType": "Organization",
                "id": "1234",
                // More fields
        },
        {
            "fullUrl": "https://fhirserver/Organization/456",
            "resource": {
                "resourceType": "Organization",
                "id": "456",
                // More fields
        },

        // More resources
    ]
}

По сути набор ресурсов. Я хотел бы преобразовать это в файл с разделителями новой строки (также известный как ndjson), где каждая строка является просто json для ресурса:

{"resourceType": "Organization", "id": "1234", // More fields }
{"resourceType": "Organization", "id": "456", // More fields }
// More lines with resources

Я могу настроить коннектор REST, и он может запрашивать сервер FHIR (включая разбивку на страницы), но независимо от того, что я пытаюсь сделать, я не могу сгенерировать нужный мне вывод. Я настроил набор данных хранилища BLOB-объектов Azure:

{
    "name": "AzureBlob1",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureBlobStorage1",
            "type": "LinkedServiceReference"
        },
        "type": "AzureBlob",
        "typeProperties": {
            "format": {
                "type": "JsonFormat",
                "filePattern": "setOfObjects"
            },
            "fileName": "myout.json",
            "folderPath": "outfhirfromadf"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

И настройте действие копирования:

{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Copy Data1",
                "type": "Copy",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "RestSource",
                        "httpRequestTimeout": "00:01:40",
                        "requestInterval": "00.00:00:00.010"
                    },
                    "sink": {
                        "type": "BlobSink"
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "schemaMapping": {
                            "resource": "resource"
                        },
                        "collectionReference": "$.entry"
                    }
                },
                "inputs": [
                    {
                        "referenceName": "FHIRSource",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "AzureBlob1",
                        "type": "DatasetReference"
                    }
                ]
            }
        ]
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}

Но в конце (несмотря на настройку сопоставления схемы) конечным результатом в большом двоичном объекте всегда является исходный пакет, возвращенный с сервера. Если я настрою выходной большой двоичный объект как текст с разделителями-запятыми, я смогу извлечь поля и создать плоское табличное представление, но это не совсем то, что мне нужно.

Любые предложения будут высоко ценится.

До сих пор мой опыт работы с действием копирования фабрики данных Azure заключался в том, что он только копировал данные с одного места на другое, и каждый раз, когда мне нужно было какое-то преобразование, мне было больно :) Будет ли это приемлемо для вас, чтобы рассмотреть Databricks и использовать некоторые сценарии python/scala? сделать преобразование вам нужно?

Kzrystof 25.04.2019 22:51

@Kzrystof, спасибо за ваш комментарий. Это был бы один из возможных вариантов. Думаю, я пытался посмотреть, как далеко я смогу продвинуться с ADF на этом пути. Так что да, определенно вариант, но я также хотел бы знать, возможно ли это (или будет ли) с ADF.

MichaelHansen 26.04.2019 00:26

О, я понимаю, что вы имеете в виду :) Я на самом деле задавал аналогичный вопрос несколько недель назад о том, как действие копирования может исключать строки, которые не соответствуют определенным критериям...

Kzrystof 26.04.2019 00:43
Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
Как установить LAMP Stack - Security 5/5 на виртуальную машину Azure Linux VM
В предыдущей статье мы завершили установку базы данных, для тех, кто не знает.
Как установить LAMP Stack 1/2 на Azure Linux VM
Как установить LAMP Stack 1/2 на Azure Linux VM
В дополнение к нашему предыдущему сообщению о намерении Azure прекратить поддержку Azure Database для MySQL в качестве единого сервера после 16...
3
3
758
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Как кратко обсуждалось в комментарии, Copy Activity не предоставляет много функций, кроме отображения данных. Как указано в документации, Копирование действий выполняет следующие операции:

  1. Reads data from a source data store.
  2. Performs serialization/deserialization, compression/decompression, column mapping, etc. It does these operations based on the configurations of the input dataset, output dataset, and Copy Activity.
  3. Writes data to the sink/destination data store.

Не похоже, что Copy Activity делает что-то еще, кроме эффективного копирования материала.

То, что я обнаружил, работало, так это использовать Databrick.

Вот шаги:

  1. Добавьте учетную запись Databricks в свою подписку;
  2. Перейдите на страницу Databricks, нажав кнопку авторинга;
  3. Создать блокнот;
  4. Напишите скрипт (Scala, Python или .Net был недавно анонсирован).

Сценарий будет следующим:

  1. Чтение данных из хранилища BLOB-объектов;
  2. Отфильтруйте и преобразуйте данные по мере необходимости;
  3. Запишите данные обратно в хранилище BLOB-объектов;

Вы можете протестировать свой сценарий оттуда и, когда он будет готов, вы можете вернуться к своему конвейеру и создать Записная книжка, который будет указывать на вашу записную книжку, содержащую сценарий.

Я изо всех сил пытался кодировать на Scala, но оно того стоило :)

Ответ принят как подходящий

Так что вроде нашел решение. Если я выполняю первоначальный шаг преобразования, при котором пакеты просто сбрасываются в файл JSON, а затем выполняю другое преобразование из файла JSON в то, что я притворяюсь текстовым файлом, в другой большой двоичный объект, я могу создать файл njson.

По сути, определите другой набор данных больших двоичных объектов:

{
    "name": "AzureBlob2",
    "properties": {
        "linkedServiceName": {
            "referenceName": "AzureBlobStorage1",
            "type": "LinkedServiceReference"
        },
        "type": "AzureBlob",
        "structure": [
            {
                "name": "Prop_0",
                "type": "String"
            }
        ],
        "typeProperties": {
            "format": {
                "type": "TextFormat",
                "columnDelimiter": ",",
                "rowDelimiter": "",
                "quoteChar": "",
                "nullValue": "\\N",
                "encodingName": null,
                "treatEmptyAsNull": true,
                "skipLineCount": 0,
                "firstRowAsHeader": false
            },
            "fileName": "myout.json",
            "folderPath": "adfjsonout2"
        }
    },
    "type": "Microsoft.DataFactory/factories/datasets"
}

Обратите внимание, что это TextFormat, а также обратите внимание, что quoteChar пусто. Если я затем добавлю еще одно действие копирования:

{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Copy Data1",
                "type": "Copy",
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "RestSource",
                        "httpRequestTimeout": "00:01:40",
                        "requestInterval": "00.00:00:00.010"
                    },
                    "sink": {
                        "type": "BlobSink"
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "schemaMapping": {
                            "['resource']": "resource"
                        },
                        "collectionReference": "$.entry"
                    }
                },
                "inputs": [
                    {
                        "referenceName": "FHIRSource",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "AzureBlob1",
                        "type": "DatasetReference"
                    }
                ]
            },
            {
                "name": "Copy Data2",
                "type": "Copy",
                "dependsOn": [
                    {
                        "activity": "Copy Data1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "typeProperties": {
                    "source": {
                        "type": "BlobSource",
                        "recursive": true
                    },
                    "sink": {
                        "type": "BlobSink"
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "columnMappings": {
                            "resource": "Prop_0"
                        }
                    }
                },
                "inputs": [
                    {
                        "referenceName": "AzureBlob1",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "AzureBlob2",
                        "type": "DatasetReference"
                    }
                ]
            }
        ]
    },
    "type": "Microsoft.DataFactory/factories/pipelines"
}

Тогда все получается. Это не идеально, поскольку теперь у меня есть две копии данных в блобах, но я полагаю, что одну можно легко удалить.

Я все равно хотел бы услышать об этом, если у кого-то есть одношаговое решение.

Для тех, кто найдет этот пост в будущем, вы можете просто использовать вызов API $export для этого. Обратите внимание, что у вас должна быть учетная запись хранения, связанная с вашим сервером Fhir.

https://build.fhir.org/ig/HL7/bulk-data/export.html#endpoint---системный-уровень-экспорт

Другие вопросы по теме