Я пытаюсь использовать UserCodeClassLoader flink, но поскольку я новичок в Flink, я не мог точно понять, как его использовать.
Сценарий: В методе open () функции RichFilterFunction () я хочу загрузить внешнюю банку.
Для этого в open () я делаю следующее:
@Override
public void open(Configuration parameters) throws Exception {
ClassLoader userClassLoader = getRuntimeContext().getUserCodeClassLoader();
URL url = userClassLoader.getResource("/tmp/rohit/FilterTest/FilterTest.jar");
klazz = userClassLoader.loadClass("FilterTest");
Constructor<?> ctor = klazz.getConstructor();
Object obj = ctor.newInstance(new Object[] {});
control = (MyRichFilterInterface)obj;
... так далее
Однако я получаю ClassNotFoundException:
Caused by: java.lang.ClassNotFoundException: FilterTest
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at MyRichFilterFunction.open(MyRichFilterFunction.java:24)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Моя версия флинка - 1.4.0, и она установлена в /root/flink-1.4.0/. Я не менял никаких параметров в конфигурации flink, относящихся к этой проблеме.
Если кто-нибудь знает, что мне не хватает, это было бы здорово !!! Я использовал URLClassLoader ранее, но я не уверен, как использовать предоставленный Flink UserCodeClassLoader.
Почему вы хотите явно использовать загрузчик классов? Это довольно сложная вещь, которую следует использовать только в том случае, если вы действительно знаете, что происходит под ней.
Привет, TobiSH - я могу подтвердить, что после добавления jar в путь к классам он работает. Так что спасибо за это. Я новичок в Flink / Java, поэтому, наверное, я это пропустил.
Чтобы ответить, почему я явно использую загрузчик классов, мне нужно немного рассказать о том, чего я пытаюсь достичь. Цель состоит в том, чтобы разрешить потоковые запросы в реальном времени к потоку данных без простоев. Для этого я подключаю поток управления и данных и делаю coFlatMap. Вывод находится в формате Tuple2 <Control, data>. Итак, если есть 3 запроса и входящие данные совпадают с первыми двумя, то я отключу два элемента <control1, data1> и <control2, data1> Все операции с данными (например, filter, select, group и window) инкапсулируются внутри элемента управления (продолжение ....)
(..продолжено) Например: для выбора ключа я бы сделал что-то вроде этого: return control.getKey (data) Явно используя загрузчик классов, я могу разделить выполнение задания flink и реализацию объекта управления. Задание Flink будет написано с некоторым интерфейсом, а фактическая реализация может быть оставлена пользователям (в данном случае бизнес-единицам), поэтому пользователи могут продолжать добавлять новые реализации, не беспокоясь о базовом задании flink, которое останется неизменным. Я надеюсь, что то, что я описал, имеет смысл, и я смог объяснить, почему я хотел динамически загружать классы.
Может быть, есть какой-то другой способ / подход, с помощью которого мы сможем достичь того, что я только что описал. Мне было бы очень интересно услышать от вас, ребята.
Как запустить флинк? В каком-то кластере или в локальной среде? И, возможно, глупый вопрос: почему бы вам просто не добавить свой FilterTest.jar в путь к классам?