Загрузка пользовательских классов во flink RichFilterFunction с использованием загрузчика классов пользовательского кода

Я пытаюсь использовать UserCodeClassLoader flink, но поскольку я новичок в Flink, я не мог точно понять, как его использовать.

Сценарий: В методе open () функции RichFilterFunction () я хочу загрузить внешнюю банку.

Для этого в open () я делаю следующее:

@Override
public void open(Configuration parameters) throws Exception {
ClassLoader userClassLoader = getRuntimeContext().getUserCodeClassLoader();
URL url = userClassLoader.getResource("/tmp/rohit/FilterTest/FilterTest.jar");
klazz = userClassLoader.loadClass("FilterTest");
Constructor<?> ctor = klazz.getConstructor();
Object obj = ctor.newInstance(new Object[] {});
control = (MyRichFilterInterface)obj; 

... так далее

Однако я получаю ClassNotFoundException:

Caused by: java.lang.ClassNotFoundException: FilterTest
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at MyRichFilterFunction.open(MyRichFilterFunction.java:24)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)

Моя версия флинка - 1.4.0, и она установлена ​​в /root/flink-1.4.0/. Я не менял никаких параметров в конфигурации flink, относящихся к этой проблеме.

Если кто-нибудь знает, что мне не хватает, это было бы здорово !!! Я использовал URLClassLoader ранее, но я не уверен, как использовать предоставленный Flink UserCodeClassLoader.

Как запустить флинк? В каком-то кластере или в локальной среде? И, возможно, глупый вопрос: почему бы вам просто не добавить свой FilterTest.jar в путь к классам?

TobiSH 18.03.2018 17:43

Почему вы хотите явно использовать загрузчик классов? Это довольно сложная вещь, которую следует использовать только в том случае, если вы действительно знаете, что происходит под ней.

Dawid Wysakowicz 19.03.2018 09:02

Привет, TobiSH - я могу подтвердить, что после добавления jar в путь к классам он работает. Так что спасибо за это. Я новичок в Flink / Java, поэтому, наверное, я это пропустил.

Rohit Gupta 21.03.2018 13:00

Чтобы ответить, почему я явно использую загрузчик классов, мне нужно немного рассказать о том, чего я пытаюсь достичь. Цель состоит в том, чтобы разрешить потоковые запросы в реальном времени к потоку данных без простоев. Для этого я подключаю поток управления и данных и делаю coFlatMap. Вывод находится в формате Tuple2 <Control, data>. Итак, если есть 3 запроса и входящие данные совпадают с первыми двумя, то я отключу два элемента <control1, data1> и <control2, data1> Все операции с данными (например, filter, select, group и window) инкапсулируются внутри элемента управления (продолжение ....)

Rohit Gupta 21.03.2018 13:05

(..продолжено) Например: для выбора ключа я бы сделал что-то вроде этого: return control.getKey (data) Явно используя загрузчик классов, я могу разделить выполнение задания flink и реализацию объекта управления. Задание Flink будет написано с некоторым интерфейсом, а фактическая реализация может быть оставлена ​​пользователям (в данном случае бизнес-единицам), поэтому пользователи могут продолжать добавлять новые реализации, не беспокоясь о базовом задании flink, которое останется неизменным. Я надеюсь, что то, что я описал, имеет смысл, и я смог объяснить, почему я хотел динамически загружать классы.

Rohit Gupta 21.03.2018 13:08

Может быть, есть какой-то другой способ / подход, с помощью которого мы сможем достичь того, что я только что описал. Мне было бы очень интересно услышать от вас, ребята.

Rohit Gupta 21.03.2018 13:13
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
6
501
0

Другие вопросы по теме