Я пытаюсь использовать фабрику данных Azure для чтения данных с сервера FHIR и преобразования результатов в файлы JSON (ndjson) с разделителями новой строки в хранилище BLOB-объектов Azure. В частности, если вы запросите сервер FHIR, вы можете получить что-то вроде:
{
"resourceType": "Bundle",
"id": "som-id",
"type": "searchset",
"link": [
{
"relation": "next",
"url": "https://fhirserver/?ct=token"
},
{
"relation": "self",
"url": "https://fhirserver/"
}
],
"entry": [
{
"fullUrl": "https://fhirserver/Organization/1234",
"resource": {
"resourceType": "Organization",
"id": "1234",
// More fields
},
{
"fullUrl": "https://fhirserver/Organization/456",
"resource": {
"resourceType": "Organization",
"id": "456",
// More fields
},
// More resources
]
}
По сути набор ресурсов. Я хотел бы преобразовать это в файл с разделителями новой строки (также известный как ndjson), где каждая строка является просто json для ресурса:
{"resourceType": "Organization", "id": "1234", // More fields }
{"resourceType": "Organization", "id": "456", // More fields }
// More lines with resources
Я могу настроить коннектор REST, и он может запрашивать сервер FHIR (включая разбивку на страницы), но независимо от того, что я пытаюсь сделать, я не могу сгенерировать нужный мне вывод. Я настроил набор данных хранилища BLOB-объектов Azure:
{
"name": "AzureBlob1",
"properties": {
"linkedServiceName": {
"referenceName": "AzureBlobStorage1",
"type": "LinkedServiceReference"
},
"type": "AzureBlob",
"typeProperties": {
"format": {
"type": "JsonFormat",
"filePattern": "setOfObjects"
},
"fileName": "myout.json",
"folderPath": "outfhirfromadf"
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
И настройте действие копирования:
{
"name": "pipeline1",
"properties": {
"activities": [
{
"name": "Copy Data1",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "RestSource",
"httpRequestTimeout": "00:01:40",
"requestInterval": "00.00:00:00.010"
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"translator": {
"type": "TabularTranslator",
"schemaMapping": {
"resource": "resource"
},
"collectionReference": "$.entry"
}
},
"inputs": [
{
"referenceName": "FHIRSource",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "AzureBlob1",
"type": "DatasetReference"
}
]
}
]
},
"type": "Microsoft.DataFactory/factories/pipelines"
}
Но в конце (несмотря на настройку сопоставления схемы) конечным результатом в большом двоичном объекте всегда является исходный пакет, возвращенный с сервера. Если я настрою выходной большой двоичный объект как текст с разделителями-запятыми, я смогу извлечь поля и создать плоское табличное представление, но это не совсем то, что мне нужно.
Любые предложения будут высоко ценится.
@Kzrystof, спасибо за ваш комментарий. Это был бы один из возможных вариантов. Думаю, я пытался посмотреть, как далеко я смогу продвинуться с ADF на этом пути. Так что да, определенно вариант, но я также хотел бы знать, возможно ли это (или будет ли) с ADF.
О, я понимаю, что вы имеете в виду :) Я на самом деле задавал аналогичный вопрос несколько недель назад о том, как действие копирования может исключать строки, которые не соответствуют определенным критериям...
Как кратко обсуждалось в комментарии, Copy Activity
не предоставляет много функций, кроме отображения данных. Как указано в документации, Копирование действий выполняет следующие операции:
- Reads data from a source data store.
- Performs serialization/deserialization, compression/decompression, column mapping, etc. It does these operations based on the configurations of the input dataset, output dataset, and Copy Activity.
- Writes data to the sink/destination data store.
Не похоже, что Copy Activity
делает что-то еще, кроме эффективного копирования материала.
То, что я обнаружил, работало, так это использовать Databrick.
Вот шаги:
Сценарий будет следующим:
Вы можете протестировать свой сценарий оттуда и, когда он будет готов, вы можете вернуться к своему конвейеру и создать Записная книжка, который будет указывать на вашу записную книжку, содержащую сценарий.
Я изо всех сил пытался кодировать на Scala, но оно того стоило :)
Так что вроде нашел решение. Если я выполняю первоначальный шаг преобразования, при котором пакеты просто сбрасываются в файл JSON, а затем выполняю другое преобразование из файла JSON в то, что я притворяюсь текстовым файлом, в другой большой двоичный объект, я могу создать файл njson.
По сути, определите другой набор данных больших двоичных объектов:
{
"name": "AzureBlob2",
"properties": {
"linkedServiceName": {
"referenceName": "AzureBlobStorage1",
"type": "LinkedServiceReference"
},
"type": "AzureBlob",
"structure": [
{
"name": "Prop_0",
"type": "String"
}
],
"typeProperties": {
"format": {
"type": "TextFormat",
"columnDelimiter": ",",
"rowDelimiter": "",
"quoteChar": "",
"nullValue": "\\N",
"encodingName": null,
"treatEmptyAsNull": true,
"skipLineCount": 0,
"firstRowAsHeader": false
},
"fileName": "myout.json",
"folderPath": "adfjsonout2"
}
},
"type": "Microsoft.DataFactory/factories/datasets"
}
Обратите внимание, что это TextFormat
, а также обратите внимание, что quoteChar
пусто. Если я затем добавлю еще одно действие копирования:
{
"name": "pipeline1",
"properties": {
"activities": [
{
"name": "Copy Data1",
"type": "Copy",
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "RestSource",
"httpRequestTimeout": "00:01:40",
"requestInterval": "00.00:00:00.010"
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"translator": {
"type": "TabularTranslator",
"schemaMapping": {
"['resource']": "resource"
},
"collectionReference": "$.entry"
}
},
"inputs": [
{
"referenceName": "FHIRSource",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "AzureBlob1",
"type": "DatasetReference"
}
]
},
{
"name": "Copy Data2",
"type": "Copy",
"dependsOn": [
{
"activity": "Copy Data1",
"dependencyConditions": [
"Succeeded"
]
}
],
"policy": {
"timeout": "7.00:00:00",
"retry": 0,
"retryIntervalInSeconds": 30,
"secureOutput": false,
"secureInput": false
},
"typeProperties": {
"source": {
"type": "BlobSource",
"recursive": true
},
"sink": {
"type": "BlobSink"
},
"enableStaging": false,
"translator": {
"type": "TabularTranslator",
"columnMappings": {
"resource": "Prop_0"
}
}
},
"inputs": [
{
"referenceName": "AzureBlob1",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "AzureBlob2",
"type": "DatasetReference"
}
]
}
]
},
"type": "Microsoft.DataFactory/factories/pipelines"
}
Тогда все получается. Это не идеально, поскольку теперь у меня есть две копии данных в блобах, но я полагаю, что одну можно легко удалить.
Я все равно хотел бы услышать об этом, если у кого-то есть одношаговое решение.
Для тех, кто найдет этот пост в будущем, вы можете просто использовать вызов API $export для этого. Обратите внимание, что у вас должна быть учетная запись хранения, связанная с вашим сервером Fhir.
https://build.fhir.org/ig/HL7/bulk-data/export.html#endpoint---системный-уровень-экспорт
До сих пор мой опыт работы с действием копирования фабрики данных Azure заключался в том, что он только копировал данные с одного места на другое, и каждый раз, когда мне нужно было какое-то преобразование, мне было больно :) Будет ли это приемлемо для вас, чтобы рассмотреть Databricks и использовать некоторые сценарии python/scala? сделать преобразование вам нужно?