У меня есть поток GetFile-> ConvertRecord-> splittext-> PutdatabaseRecord. Мой CSV-файл, который я отправляю в GetFile, содержит следующие поля:
ID TIME M00B01 M00B02 M00B03
1 2018-09-27 10:44:23.972 3242 35 335
2 2018-09-21 11:44:23.972 323 24 978
Скелет таблицы моей базы данных в MYSQL выглядит следующим образом:
Create table test(ID INT,TIME DATETIME(3),MxB01 INT,MxB02 INT,MxB03 INT);
Примечание: я заменил названия заголовков на MxB00, MxB01 и т. д.
У меня есть ошибка в моем процессоре convertRecord, поскольку я читаю как CSVReader и пишу как CSVSetWritter. Я прилагаю для справки конфигурацию обоих.
Проблема в том, что он читает файл CSV, но из-за изменения имен заголовков он дает все другие поля как пустые (я изменил имена заголовков, потому что мне нужно записать имя заголовка как MxB00, чтобы соответствовать заголовкам, определенным в таблице MySQL). Я получаю значение идентификатора и времени, потому что я не изменил имя заголовка этих полей в определении таблицы CSVWritter и MySQL. поэтому эти значения я получаю, но для других значений я становлюсь пустым, потому что он запутывается из-за изменения имени.
Как я могу решить эту проблему? Любая помощь очень ценится. Спасибо!






Если вы хотите использовать создать собственный заголовок для выходного файла CSV, настройте службу контроллера Читатель CSV, как показано ниже.
Поскольку мы используем стратегию доступа к схеме как текст схемы и даем схему как
{
"type": "record",
"name": "SQLSchema",
"fields" : [
{"name": "ID", "type": ["null","int"]},
{"name": "TIME", "type": ["null","string"]},
{"name": "MxB01", "type": ["null","int"]},
{"name": "MxB02", "type": ["null","int"]},
{"name": "MxB03", "type": ["null","int"]}
]
}
А мы - обработка первой строки данных csv как заголовка и игнорирование имен столбцов заголовка CSV, поэтому выходной потоковый файл будет иметь схему, которую мы определили выше.
Конфигурации CsvWriter:
Поскольку мы наследуем стратегию записи схемы, выходной файл потока будет иметь тот же заголовок, который мы указали в считывателе.
Кроме того Я не уверен, почему вы используете процессор SplitText после ConvertRecord в качестве процессора PutDatabaseRecord, предназначенного для работы с частями записей одновременно.
Даже если вы можете настроить процессор PutDatabaseRecord с помощью Упомянутая выше служба контроллера CsvReader, тогда ваш поток будет:
Поток:
GetFile -> PutDatabaseRecord
Примечание:
Поскольку я не использовал логические типы Avro для поля отметки времени, если вы используете логические типы, измените конфигурацию службы контроллера соответствующим образом.
Кроме того, в полях MxB00, MxB01 и т. д. Я получаю все значения 0, а не фактические значения, присутствующие в файле CSV. @Shu
@ shrads, Использование логические типы avro немного сложно, я пробовал с приведенным выше примером и добавил шаблон по этой ссылке: github.com/shureddy/NiFiTemplates/blob/master/… .. загрузите и загрузите этот шаблон в свой экземпляр nifi и измените его в соответствии с вашим вариантом использования.
Попробуйте GetFile-> ReplaceText -> ConvertRecord-> splittext-> PutdatabaseRecord.
config:
Значение поиска: заголовки ввода, значение замены: Новые заголовки, Стратегия замены: буквальная замена, Оценочный режим; Весь текст
Спасибо за ваш ответ. Я использую {"name": "TimeOfDay", "type": "string", "logicalType": "timestamp-millis"} в качестве своего логического типа, но я получаю ошибку при преобразовании записи "приведет к ошибке: для строка ввода «2018-09-27 10: 44: 23.971». Я установил настройки CSVReader и Writer так же, как описано в u. Пожалуйста, помогите