Kafka to OpenTSDB with Flink: java.lang.NoClassDefFoundError: Could not initialize class org.apache.http.conn.ssl.SSLConnectionSocketFactory

This is my code:

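import org.apache.http.client.config.RequestConfig
import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
def saveToOpenTSDB(rows: Iterator[String], url: String): Unit = {
  // 2-second socket, connect and connection-request timeouts (not yet passed to the client below)
  val requestConfig: RequestConfig = RequestConfig.custom().setSocketTimeout(2000).setConnectTimeout(2000).setConnectionRequestTimeout(2000).build()
  val httpClient: CloseableHttpClient = HttpClients.createDefault() // throws the NoClassDefFoundError (CommonDeploy.scala:21)
}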

Error stack trace:

2018-10-25 12:40:39,323 INFO org.apache.flink.client.cli.CliFrontend - --------------------------------------------------------------------------------
2018-10-25 12:40:39,324 INFO org.apache.flink.client.cli.CliFrontend - Starting Command Line Client (Version: 1.6.1, Rev:23e2636, Date:14.09.2018 @ 19:56:46 UTC)
2018-10-25 12:40:39,324 INFO org.apache.flink.client.cli.CliFrontend - OS current user: root
2018-10-25 12:40:39,676 INFO org.apache.flink.client.cli.CliFrontend - Current Hadoop/Kerberos user: root
2018-10-25 12:40:39,676 INFO org.apache.flink.client.cli.CliFrontend - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.172-b11
2018-10-25 12:40:39,676 INFO org.apache.flink.client.cli.CliFrontend - Maximum heap size: 7136 MiBytes
2018-10-25 12:40:39,676 INFO org.apache.flink.client.cli.CliFrontend - JAVA_HOME: /usr/java/jdk1.8.0_172
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend - Hadoop version: 2.6.5
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend - JVM Options:
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend - -Dlog.file=/root/flink-1.6.1/log/flink-root-client-cuiyk-cdn-test-10.log
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend - -Dlog4j.configuration=file:/root/flink-1.6.1/conf/log4j-cli.properties
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend - -Dlogback.configurationFile=file:/root/flink-1.6.1/conf/logback.xml
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend - Program Arguments:
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend - run
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend - -c
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend - dataclean.FlinkDataCleanDemo
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend - --parallelism
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend - 4
2018-10-25 12:40:39,678 INFO org.apache.flink.client.cli.CliFrontend - --jobmanager
2018-10-25 12:40:39,679 INFO org.apache.flink.client.cli.CliFrontend - cuiyk-cdn-test-4:42115
2018-10-25 12:40:39,679 INFO org.apache.flink.client.cli.CliFrontend - ./flinkkafka2tsdb.jar
2018-10-25 12:40:39,679 INFO org.apache.flink.client.cli.CliFrontend - Classpath: /root/flink-1.6.1/lib/flink-python_2.11-1.6.1.jar:/root/flink-1.6.1/lib/flink-shaded-hadoop2-uber-1.6.1.jar:/root/flink-1.6.1/lib/log4j-1.2.17.jar:/root/flink-1.6.1/lib/slf4j-log4j12-1.7.7.jar:/root/flink-1.6.1/lib/flink-dist_2.11-1.6.1.jar::/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/lib/hadoop/etc/hadoop:
2018-10-25 12:40:39,679 INFO org.apache.flink.client.cli.CliFrontend - --------------------------------------------------------------------------------
2018-10-25 12:40:39,682 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: classloader.resolve-order, parent-first
2018-10-25 12:40:39,682 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2018-10-25 12:40:39,682 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2018-10-25 12:40:39,682 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2018-10-25 12:40:39,682 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2018-10-25 12:40:39,683 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2018-10-25 12:40:39,683 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2018-10-25 12:40:39,683 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081
2018-10-25 12:40:39,696 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn properties file under /tmp/.yarn-properties-root.
2018-10-25 12:40:39,914 INFO org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set to root (auth:SIMPLE)
2018-10-25 12:40:39,936 INFO org.apache.flink.client.cli.CliFrontend - Running 'run' command.
2018-10-25 12:40:39,940 INFO org.apache.flink.client.cli.CliFrontend - Building program from JAR file
2018-10-25 12:40:40,314 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
2018-10-25 12:40:40,316 INFO org.apache.flink.client.cli.CliFrontend - Starting execution of program
2018-10-25 12:40:40,316 INFO org.apache.flink.client.program.rest.RestClusterClient - Starting program in interactive mode (detached: false)
2018-10-25 12:40:40,346 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Ignoring configured key DeSerializer (key.deserializer)
2018-10-25 12:40:40,346 WARN org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 - Ignoring configured value DeSerializer (value.deserializer)
2018-10-25 12:40:40,532 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: classloader.resolve-order, parent-first
2018-10-25 12:40:40,532 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2018-10-25 12:40:40,532 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2018-10-25 12:40:40,532 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2018-10-25 12:40:40,532 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2018-10-25 12:40:40,532 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2018-10-25 12:40:40,532 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2018-10-25 12:40:40,533 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081
2018-10-25 12:40:40,540 INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 1b45b5780e0e7067fae20be1db8e7c2f (detached: false).
2018-10-25 12:41:38,177 INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
2018-10-25 12:41:38,179 INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.
2018-10-25 12:41:38,180 ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 1b45b5780e0e7067fae20be1db8e7c2f)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
    at dataclean.FlinkDataCleanDemo$.main(FlinkDataCleanDemo.scala:162)
    at dataclean.FlinkDataCleanDemo.main(FlinkDataCleanDemo.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.http.conn.ssl.SSLConnectionSocketFactory
    at org.apache.http.impl.client.HttpClientBuilder.build(HttpClientBuilder.java:912)
    at org.apache.http.impl.client.HttpClients.createDefault(HttpClients.java:58)
    at dataclean.CommonDeploy$.saveToOpenTSDB(CommonDeploy.scala:21)
    at dataclean.FlinkDataCleanDemo$$anonfun$main$1.apply(FlinkDataCleanDemo.scala:152)
    at dataclean.FlinkDataCleanDemo$$anonfun$main$1.apply(FlinkDataCleanDemo.scala:150)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$4.map(DataStream.scala:607)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:663)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:663)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:73)
    at scala.collection.mutable.MutableList.foreach(MutableList.scala:30)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:663)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
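A note on reading the "Caused by" line: "Could not initialize class X" is what the JVM reports when the static initializer of X already failed earlier in the same classloader. The very first failure is thrown as an ExceptionInInitializerError that carries the real cause, so the TaskManager log from the first failing attempt is usually the place to look for it. The self-contained sketch below reproduces that two-step pattern with plain JVM behavior and no httpclient at all; FragileInit and InitFailureDemo are hypothetical names, and FragileInit merely stands in for SSLConnectionSocketFactory.

```scala
// Hypothetical stand-in for a class whose static initializer fails,
// e.g. because something it needs at init time is missing or incompatible.
object FragileInit {
  // Evaluated once, while the JVM initializes the class FragileInit$; always throws here.
  val value: Int = throw new IllegalStateException("static init failed")
}

object InitFailureDemo {
  // Touch FragileInit and describe whatever is thrown as "SimpleName: detail".
  // Catching Throwable is deliberate: both errors below are LinkageErrors, not NonFatal.
  def touch(): String =
    try FragileInit.value.toString
    catch {
      case t: Throwable =>
        // ExceptionInInitializerError has no message of its own, so fall back to its cause
        val detail = Option(t.getMessage).getOrElse(String.valueOf(t.getCause))
        s"${t.getClass.getSimpleName}: $detail"
    }

  def main(args: Array[String]): Unit = {
    println(touch()) // first access: ExceptionInInitializerError wrapping the IllegalStateException
    println(touch()) // every later access: NoClassDefFoundError "Could not initialize class FragileInit$"
  }
}
```

In the job this means the NoClassDefFoundError in the trace is a symptom; the interesting exception is whatever broke SSLConnectionSocketFactory's initialization the first time.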

Related dependencies:
    <dependency>
        <groupId>commons-httpclient</groupId>
        <artifactId>commons-httpclient</artifactId>
        <version>3.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.4</version>
    </dependency>
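To check which copy of the org.apache.http classes is actually used at runtime, a small diagnostic may help. The log shows classloader.resolve-order set to parent-first and flink-shaded-hadoop2-uber-1.6.1.jar on Flink's classpath, so one plausible but unverified explanation is that some org.apache.http classes are resolved from Flink's lib directory rather than from the job jar built against httpclient 4.4. The sketch below is a hypothetical helper (WhichJar and locate are my own names, not part of Flink or httpclient): it loads a class by name with Class.forName(name, false, loader), which does not run the static initializer, and reports the jar or directory it came from.

```scala
// Hypothetical diagnostic helper (not part of Flink or httpclient).
object WhichJar {
  // Where was `className` loaded from? initialize = false, so the class's static
  // initializer (the part failing in the stack trace) is NOT executed.
  // Missing classes, linkage failures (e.g. a missing superclass) and classes without
  // a code source (such as JDK bootstrap classes) are all reported as None.
  def locate(className: String, loader: ClassLoader = getClass.getClassLoader): Option[String] = {
    val cls =
      try Some(Class.forName(className, false, loader))
      catch { case _: ClassNotFoundException | _: LinkageError => None }
    cls
      .flatMap(c => Option(c.getProtectionDomain.getCodeSource))
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)
  }

  def main(args: Array[String]): Unit = {
    val names = Seq(
      "org.apache.http.conn.ssl.SSLConnectionSocketFactory", // httpclient (from the stack trace)
      "org.apache.http.impl.client.HttpClientBuilder",       // httpclient (from the stack trace)
      "org.apache.http.HttpRequest"                          // httpcore, which httpclient depends on
    )
    names.foreach(n => println(s"$n -> ${locate(n).getOrElse("not found or no code source")}"))
  }
}
```

Run from main, this only describes the client's classpath. To see what a TaskManager resolves, the same locate call would have to run inside the operator (for example in the map that calls saveToOpenTSDB) and be logged there. If the classes come from different jars, that points to a version mix.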

Thank you very much. That's exactly what I wanted to say!

chen chen 25.10.2018 09:31

This is how I solved the problem.

chen chen 25.10.2018 14:43

@GhostCat Oh, you're right, thanks for the kind comment!

chen chen 27.10.2018 05:46

And you can finally delete the comments that are no longer needed...

GhostCat 27.10.2018 07:04

1 Answer

I just solved this problem by adding the following to the <configuration> element of the maven-shade-plugin:

    <relocations>
        <relocation>
            <pattern>org.apache.http</pattern>
            <shadedPattern>shade.org.apache.http</shadedPattern>
        </relocation>
    </relocations>
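Why relocation helps: with classloader.resolve-order set to parent-first (as in the log), a class name that exists both on Flink's own classpath and in the job jar is looked up through the parent classloader first. Relocating renames the job's bundled copy, so the two can no longer collide. The toy function below only models the class-name mapping that the <pattern>/<shadedPattern> pair describes; RelocationSketch and relocate are hypothetical names, and treating the pattern as a whole-package prefix is my simplifying assumption, not a statement about the plugin's exact matching rules.

```scala
object RelocationSketch {
  // Toy model of a package relocation: a fully qualified class name in package `pattern`
  // (or one of its sub-packages) is rewritten to start with `shadedPattern` instead.
  // This shows only the renaming; the real maven-shade-plugin also rewrites every
  // reference to the relocated classes inside the bytecode packed into the shaded jar.
  def relocate(className: String, pattern: String, shadedPattern: String): String =
    if (className == pattern || className.startsWith(pattern + "."))
      shadedPattern + className.substring(pattern.length)
    else className

  def main(args: Array[String]): Unit = {
    val rename = (n: String) => relocate(n, "org.apache.http", "shade.org.apache.http")
    println(rename("org.apache.http.conn.ssl.SSLConnectionSocketFactory"))
    // shade.org.apache.http.conn.ssl.SSLConnectionSocketFactory
    println(rename("org.apache.hadoop.conf.Configuration")) // unchanged: different package
  }
}
```

This only works if the httpclient and httpcore classes are actually packed into the shaded jar; a dependency marked as provided would not be relocated.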

Nice! You can accept the answer once the site allows it. You could also tidy up the code formatting a bit: 4 spaces of indentation are enough to mark code, so there is no need for 15 or 30, which just pushes everything far to the right. And welcome to upvote privileges!

GhostCat 27.10.2018 07:04
