Я пытаюсь получить распределенную настройку для запуска ignite-connector. К сожалению, это не работает. Мне удалось получить журнал создания коннектора через API.
/connectors
{
"name": "ignite-connector",
"config": {
"connector.class": "org.apache.ignite.stream.kafka.connect.IgniteSinkConnector",
"tasks.max": "2",
"topics": "someTopic1",
"cacheName": "myCache",
"cacheAllowOverwrite": true,
"igniteCfg":"/opt/ignite/examples/config/example-cache.xml"}
}
}
Я установил ignite-connector в качестве плагина. Я создал uber-jar из репозитория, поместил его в отдельный каталог и включил в качестве плагина в файл .properties
, который я использую для запуска connect-distributed.sh
.
Я установил путь к классам для заданий как для connetor, так и для kafka, которыми я управляю с помощью systemd:
Environment=CLASSPATH=/opt/kafka/ignite-connector/*
После полного журнала ошибок:
[2022-11-17 19:49:30,268] INFO [ignite-connector|worker] SinkConnectorConfig values:
config.action.reload = restart
connector.class = org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = ignite-connector
predicates = []
tasks.max = 2
topics = [someTopic1]
topics.regex =
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.SinkConnectorConfig:376)
[2022-11-17 19:49:30,272] INFO [ignite-connector|worker] EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = ignite-connector
predicates = []
tasks.max = 2
topics = [someTopic1]
topics.regex =
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:376)
[2022-11-17 19:49:30,276] INFO [ignite-connector|worker] Instantiated connector ignite-connector with version 3.3.1 of type class org.apache.ignite.stream.kafka.connect.IgniteSinkConnector (org.apache.kafka.connect.runtime.Worker:322)
[2022-11-17 19:49:30,276] INFO [ignite-connector|worker] Finished creating connector ignite-connector (org.apache.kafka.connect.runtime.Worker:347)
[2022-11-17 19:49:30,277] ERROR [ignite-connector|worker] WorkerConnector{id=ignite-connector} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector:201)
java.lang.NoClassDefFoundError: org/apache/ignite/internal/util/typedef/internal/A
at org.apache.ignite.stream.kafka.connect.IgniteSinkConnector.start(IgniteSinkConnector.java:55)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:193)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:218)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:363)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:346)
at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:146)
at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:123)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2022-11-17 19:49:30,277] INFO [Worker clientId=connect-1, groupId=connect-cluster] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1687)
[2022-11-17 19:49:30,280] ERROR [ignite-connector|worker] [Worker clientId=connect-1, groupId=connect-cluster] Failed to start connector 'ignite-connector' (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1811)
org.apache.kafka.connect.errors.ConnectException: Failed to start connector: ignite-connector
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$startConnector$35(DistributedHerder.java:1782)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:349)
at org.apache.kafka.connect.runtime.WorkerConnector.doRun(WorkerConnector.java:146)
at org.apache.kafka.connect.runtime.WorkerConnector.run(WorkerConnector.java:123)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to transition connector ignite-connector to state STARTED
... 8 more
Caused by: java.lang.NoClassDefFoundError: org/apache/ignite/internal/util/typedef/internal/A
at org.apache.ignite.stream.kafka.connect.IgniteSinkConnector.start(IgniteSinkConnector.java:55)
at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:193)
at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:218)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:363)
at org.apache.kafka.connect.runtime.WorkerConnector.doTransitionTo(WorkerConnector.java:346)
... 7 more
Упомянутый класс (A) включен в ignite-core-2.9.1.jar, который находится в uberJar в каталоге плагинов.
Любые указатели приветствуются
@inetphantom, насколько я знаю, модуль ignite-kafka-ext
был исключен из бинарного дистрибутива Ignite, начиная с версии 2.10.0. Вы сами собрали его, используя исходный код из репозитория ignite-extensions?
Я думаю, вы должны создать исходники, как уже упоминалось. Затем вы можете упаковать uber jar со всеми включенными зависимостями. Точнее, ваши версии неверны. Вы должны использовать ignite-core 2.9.1, как указано в столбце версии mvnrepository.com/artifact/org.apache.ignite/ignite-kafka/2.9.1; вы можете получить аналогичную ошибку, но процент ошибки выше, когда вы смешиваете версии, поскольку они не будут компилироваться вместе
В любом случае вы можете попробовать установить переменную окружения CLASSPATH
так, чтобы она включала /plugin_path/*.jar
, и перезапустить воркер подключения.
Конфигурация пути плагина сканирует только классы коннекторов, коннекторы, конвертеры и т. д., а не все классы из jar-файлов в папке. Для этого вам нужно отредактировать CLASSPATH
@OneCricketeer Я перепроверил свой встроенный uberJar, добавил старые банки, и теперь он, кажется, работает - с хаком classpath. Я не понимаю, почему, как говорится в официальном документе , а также confluent, вы должны делать это не так, а как плагин. Спасибо за вашу помощь, пожалуйста, добавьте свои комментарии в качестве ответа.
Кажется, существует недопонимание того, что такое «плагины». Это только классы, определенные как реализации преобразователей, преобразований и соединителей.
Внутренние классы Ignite не являются ни одним из них, поэтому они не будут загружены в загрузчик классов plugin.path
.
Чтобы исправить это, вам нужно убедиться, что вы export CLASSPATH=/path/to/ignite-files/*.jar
и можете использовать команды jar -tf
для проверки существования класса в любом конкретном JAR перед запуском процесса подключения.
Это не взлом; именно так работает Java Classloader.
Как ты устанавливал коннектор? Все ли JAR-файлы зажигания имеют одинаковые разрешения? Можешь показать
ls -laR <plugin.path>
?