Сопоставление string с tuple2 <string, long> в spark + java

Я пытаюсь научиться использовать Spark, кодируя на Java (пожалуйста, без кода Scala). Я пытаюсь реализовать очень простой пример Привет, мир Spark, подсчет слов.

Я позаимствовал код из документации Spark быстрый старт:

/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    long numAs = logData.filter(s -> s.contains("a")).count();
    long numBs = logData.filter(s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}

Все хорошо, теперь я хочу заменить filter на flatMap, а затем на map. У меня пока есть flatMap:

    logData.flatMap((FlatMapFunction<String, String>) l -> {
                          return Arrays.asList(l.split(" ")).iterator();
                    }, Encoders.STRING());

Теперь я хочу сопоставить каждое слово с Tuple2 (word, 1), а затем сгруппировать их по ключу. Но проблема в том, что я не могу найти, как перейти с String на (String, Long). В большинстве документов говорится о mapToPair, но в Dataset такого метода нет!

Может ли кто-нибудь помочь мне сопоставить String с Tuple2<String, Long>? Кстати, я даже не уверен, ищу ли я Tuple2 или какой-нибудь другой класс.

[ОБНОВИТЬ]

Основываясь на предложении @mangusta, я попробовал это:

    logData.flatMap((FlatMapFunction<String, String>) l -> {
        return Arrays.asList(l.split(" ")).iterator();
    }, Encoders.STRING())
    .map(new Function<String, Tuple2<String, Long>>() {
        public Tuple2<String, Long> call(String str) {
            return new Tuple2<String, Long>(str, 1L);
        }
    })
    .count()

И столкнулся с этой ошибкой компиляции:

Error:(108, 17) java: no suitable method found for map(<anonymous org.apache.spark.api.java.function.Function<java.lang.String,scala.Tuple2<java.lang.String,java.lang.Long>>>)
    method org.apache.spark.sql.Dataset.<U>map(scala.Function1<java.lang.String,U>,org.apache.spark.sql.Encoder<U>) is not applicable
      (cannot infer type-variable(s) U
        (actual and formal argument lists differ in length))
    method org.apache.spark.sql.Dataset.<U>map(org.apache.spark.api.java.function.MapFunction<java.lang.String,U>,org.apache.spark.sql.Encoder<U>) is not applicable
      (cannot infer type-variable(s) U
        (actual and formal argument lists differ in length))

Похоже, функция map принимает два параметра. Я не уверен, что передать вторым параметром.

В ссылке для быстрого запуска попробуйте посмотреть на пример groupByKey(identity).count.

OneCricketeer 26.10.2018 05:34

Спасибо, но это только код Scala и Python. Я не вижу примеров Java для groupByKey!

Mehran 26.10.2018 05:36

Это не значит, что метода не существует;)

OneCricketeer 26.10.2018 05:38

В документации для groupByKey написано When called on a dataset of (K, V) pairs, но мои предметы по-прежнему одиночные (а не пары). Не думаете ли вы, что я должен сначала преобразовать их в пары, прежде чем я смогу вызвать groupbyKey?

Mehran 26.10.2018 05:47

Кстати, если вы используете Java 8, почему вы вводите каждый анонимный класс?

OneCricketeer 26.10.2018 06:49
0
5
1 830
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Если вам нужно использовать Tuple2, вы должны использовать библиотеку Scala для Java, то есть scala-library.jar

Чтобы подготовить кортежи из некоторого JavaRDD<String> data, вы можете применить к этому RDD следующую функцию:

JavaRDD<Tuple2<String,Long>> tupleRDD  =  data.map(

new Function<String, Tuple2<String, Long>>() {

            public Tuple2<String, Long> call(String str) {

              return new Tuple2<String, Long>(str, 1L);

            }//end call

          }//end function

        );//end map

Спасибо, но как это поможет?

Mehran 26.10.2018 05:49

ну, вы спросили: «Теперь я хочу отобразить каждое слово в Tuple2», поэтому, если вы хотите это сделать, вам нужно инициализировать Tuple2 как new Tuple2<String, Long>(your_word, 1L)

mangusta 26.10.2018 05:52

Но какой (и как) метод мне следует использовать для преобразования слова в Tuple2? Вопрос не в том, как создать экземпляр объекта Tuple2. Но как перейти с String на Tuple2<String, Long> и не для отдельного элемента, а для всего набора данных.

Mehran 26.10.2018 05:58

см. обновленный ответ. Я не использовал лямбда-выражение, но при необходимости его просто изменить на лямбда

mangusta 26.10.2018 06:08

Спасибо, я попробовал это и столкнулся с ошибкой компиляции. Пожалуйста, найдите это в моем обновленном вопросе.

Mehran 26.10.2018 06:21

моя карта должна применяться к строке RDD. кажется, что ваш "logData.flatMap ()" возвращает RDD для списка строк, а не строку RDD

mangusta 26.10.2018 06:30

Нет. logData и возвращаемое значение flatMap имеют тип Dataset<String>, and not RDD. And I believe the code you've provided is correct but not complete. Even the flatMap` (как вы можете видеть в данном коде) принимает два параметра. В то время как для flatMap я мог догадаться, что мне для этого нужно, в данном случае я понятия не имею!

Mehran 26.10.2018 06:35

правильно, вы используете Dataset вместо RDD. До сих пор я никогда не использовал Dataset, но подозреваю, что первый аргумент метода карты набора данных может быть идентичен тому, который был в моем ответе, а второй аргумент org.apache.spark.sql.Encoder<U> мог быть Encoders.STRING(). После получения кортежей вы вызываете reduceByKey( ) или groupByKey( ) или любой другой метод сокращения и указываете лямбда как что-то вроде (a,b) -> a + b

mangusta 26.10.2018 08:27

вызов count( ) после карты не имеет смысла, поскольку ваша цель - подсчитывать по ключам, а не считать все

mangusta 26.10.2018 08:28
Ответ принят как подходящий

Я не уверен в причине ошибки, но вы можете попробовать этот код

final String sparkHome = "/usr/local/Cellar/apache-spark/2.3.2";
SparkConf conf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("spark-example")
        .setSparkHome(sparkHome + "/libexec");

SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
Dataset<Row> df = spark.read().textFile(sparkHome + "/README.md")
        .flatMap(line -> Arrays.asList(line.split(" ")).iterator(), Encoders.STRING())
        .filter(s -> !s.isEmpty())
        .map(word -> new Tuple2<>(word.toLowerCase(), 1L), Encoders.tuple(Encoders.STRING(), Encoders.LONG()))
        .toDF("word", "count")
        .groupBy("word")
        .sum("count").orderBy(new Column("sum(count)").desc()).withColumnRenamed("sum(count)", "_cnt");

df.show(false);

И вы должны ожидать этого результата

+-------------+----+
|word         |_cnt|
+-------------+----+
|the          |25  |
|to           |19  |
|spark        |16  |
|for          |15  |
|and          |10  |
|a            |9   |
|##           |9   |
|you          |8   |
|run          |7   |
|on           |7   |
|can          |7   |
|is           |6   |
|in           |6   |
|of           |5   |
|using        |5   |
|including    |4   |
|if           |4   |
|with         |4   |
|documentation|4   |
|an           |4   |
+-------------+----+
only showing top 20 rows

ошибка произошла из-за того, что он не вставил Encoders.tuple(Encoders.STRING(), Encoders.LONG()), указанный во втором аргументе вашей карты

mangusta 26.10.2018 09:37

Попробуй это

 logData.flatMap((FlatMapFunction<String,String>)line -> Arrays.asList(line.split(" ")).iterator(), Encoders.STRING()).groupBy("value").count().show();

Другие вопросы по теме