Как передать параметр скрипту Python в Nifi

Может быть, это глупый вопрос, но я должен спросить.

У меня есть процессор Collect_data в Nifi, и он передает сообщения в другой процесс, который использует скрипт python для его анализа и создания файла json. Проблема в том, что я не знаю, что является вводом для функции в скрипте Python. Как передать эти сообщения (16-значные числа) из процессора Collect_data в следующий процессор, содержащий скрипт Python. Есть ли на этот счет какой-нибудь хороший, простой пример?

Я уже искал несколько примеров в Интернете, но не совсем понял.

import datetime
import hashlib
from urlparse import urlparse, parse_qs
import sys
from urlparse import urlparse, parse_qs
from datetime import *
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from time import time


def parse_zap(inputStream, outputStream):
    data = inputStream
    buf = (hashlib.sha256(bytearray.fromhex(data)).hexdigest())
    buf = int(buf, 16)
    buf_check = str(buf)
    if buf_check[17] == 2:
        pass
    datetime_now = datetime.now()
    log_date = datetime_now.isoformat()
    try:
        mac = buf_check[7:14].upper()
        ams_id = buf_check[8:]
        action = buf_check[3:4]
        time_a = int(time())
        dict_test = {
        "user": {
            "guruq" : 'false'
        },
        "device" : {
            "type" : "siolbox",
            "mac": mac
        },
        "event" : {
            "origin" : "iptv",
            "timestamp": time_a,
            "type": "zap",
            "product-type" : "tv-channel",
            "channel": {
                "id" : 'channel_id',
                "ams-id": ams_id
            },
            "content": {
                "action": action
            }
        }
        }
        return dict_test
    except Exception as e:
        print('%s nod PARSE 500 \"%s\"' % (log_date, e))

Благодарю, я читаю правильно, но теперь не могу создать вывод. Заранее спасибо.

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
2
0
3 016
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Думаю, я понимаю ваш вопрос, но он несколько неоднозначен в отношении вашего потока. Я отвечаю на несколько возможных сценариев.

  1. У вас есть процессор, который получает данные из источника (например, FetchFTP) и подключен к процессору ExecuteScript, который содержит скрипт Python для преобразования этих значений. В этом случае сценарий Python может работать с атрибутами и содержимым потокового файла напрямую, используя стандартный API. См. В Блог Мэтта Берджесса множество примеров написания пользовательских сценариев для работы с данными.
  2. У вас есть процессор, который получает данные из источника и подключен к процессору ExecuteStreamCommand, который вызывает внешний скрипт Python с помощью такой команды, как python my_external_script.py arg1 arg2 .... В этом случае содержимое потокового файла передается в STDIN процессором ExecuteStreamCommand, поэтому ваш сценарий должен использовать его таким образом. Этот ответ объясняет больше об использовании ExecuteStreamCommand со скриптами Python.
  3. У вас есть собственный процессор, который внутренне вызывает отдельный процесс Python. Это плохая идея, и ее следует переделать на одну из других моделей. Это нарушает разделение задач, теряет поддержку жизненного цикла процессора, скрывает обработку потоков и синхронизацию, не обеспечивает видимости происхождения и идет вразрез с моделью разработки NiFi.

Если ваш сценарий Python очень прост, вы можете поместить его в ScriptedRecordWriter и использовать его для одновременной обработки нескольких «записей», чтобы получить преимущества в производительности. Это может быть расширено для вашего варианта использования, в зависимости от того, как выглядит ваш поток и входящие данные.

Обновление 2018-10-03 10:50

Попробуйте использовать этот скрипт в теле ExecuteScript:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        result = parse_zap(text)

        outputStream.write(bytearray(result.encode('utf-8')))

flowFile = session.get()
if (flowFile != None):
    flowFile = session.write(flowFile,PyStreamCallback())
    flowFile = session.putAttribute(flowFile, "parsed_zap", "true")
    session.transfer(flowFile, REL_SUCCESS)

// Your parse_zap() method here, with the signature changed to just accept a single string
...

Во-первых, большое спасибо за помощь. Но я все еще не понимаю. Я должен для этого создать класс или нет? Я обновил свой вопрос с помощью скрипта Python, который я использую в окне процессора Nifi, вы можете дать мне прямой намек?

jovicbg 03.10.2018 15:18
Ответ принят как подходящий

Взгляните на этот сценарий:

import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    text = IOUtils.readLines(inputStream, StandardCharsets.UTF_8)
    for line in text[1:]:
        outputStream.write(line + "\n") 

flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile,PyStreamCallback())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
  session.transfer(flowFile, REL_SUCCESS)

Требуется от свойства количество строк, которые нужно удалить из потокового файла, а затем взять потоковый файл и записать его снова без этих строк, это простой и хороший пример того, как использовать свойства и как использовать потоковый файл. .

Основываясь на обновленном коде, ваш код должен выглядеть так:

import datetime
import hashlib
from urlparse import urlparse, parse_qs
import sys
from urlparse import urlparse, parse_qs
from datetime import *
import json
import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
from time import time


class PyStreamCallback(StreamCallback):
  def __init__(self):
        pass
  def process(self, inputStream, outputStream):
    data = inputStream
    buf = (hashlib.sha256(bytearray.fromhex(data)).hexdigest())
    buf = int(buf, 16)
    buf_check = str(buf)
    if buf_check[17] == 2:
        pass
    datetime_now = datetime.now()
    log_date = datetime_now.isoformat()
    try:
        mac = buf_check[7:14].upper()
        ams_id = buf_check[8:]
        action = buf_check[3:4]
        time_a = int(time())
        dict_test = {
        "user": {
            "guruq" : 'false'
        },
        "device" : {
            "type" : "siolbox",
            "mac": mac
        },
        "event" : {
            "origin" : "iptv",
            "timestamp": time_a,
            "type": "zap",
            "product-type" : "tv-channel",
            "channel": {
                "id" : 'channel_id',
                "ams-id": ams_id
            },
            "content": {
                "action": action
            }
        }
        }
        return dict_test
    except Exception as e:
        print('%s nod PARSE 500 \"%s\"' % (log_date, e))

flowFile = session.get()
if (flowFile != None):
  flowFile = session.write(flowFile,PyStreamCallback())
  flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json')
  session.transfer(flowFile, REL_SUCCESS)        

Я обновил свой вопрос кодом из скрипта Python, используемого в окне Nifi. Не уверены, нужно ли мне создать класс и поместить внутрь функцию или нет? Любая прямая подсказка, основанная на моем коде, будет действительно полезна. Кстати, спасибо :)

jovicbg 03.10.2018 15:22

Другие вопросы по теме