Мера искрового косинуса двух элементов

У меня есть файл .csv, который выглядит так:

message_id, hashtag
id_1, hashtag_1
id_2, hashtag_1
...
id_k, hashtag_m
....

Я пытаюсь найти меру косинуса между каждой парой хэштегов в CSV, выполнив следующие действия:

def isHeader(s: String):Boolean = {
    s.take(1) == "m"
}
def cs(pair: ( (String, String), (List[String], List[String]) ) ) = {
    val msgs1 = pair._2._1.toSet
    val msgs2 = pair._2._2.toSet
    val numer = msgs1.intersect(msgs2).size
    val denom = Math.sqrt(msgs1.size*msgs2.size)
    (pair._1._1, pair._1._2, numer / denom)
}
def to_csv_cos(t: (String, String, Double)): String = {
    t._1 ++ "," ++ t._2 ++ "," ++ t._3.toString
}
val messages = sc.textFile("....csv")
val msgData1 = messages.filter(x => !isHeader(x))
val data = msgData1.map(x => x.split(','))
val pairs = data.map(x => (x(0), List(x(1)))).reduceByKey((a,b) => a ++ b).flatMap(x => x._2.combinations(2).toList).map(x => (x(0), x(1)))
val msgs = data.map( x => (x(1), List(x(0)))).reduceByKey((a,b) => a++b)
val pairs_mapped = pairs.join(msgs).map{
     case (x, (y,z)) => (y, (x,z))
}.join(msgs).map{
    case (x, ( (y,z),t) ) => ( (x,y), (t,z) )
}
val res = pairs_mapped.map{
    x => cs(x)  
}.map(x => to_csv_cos(x)).saveAsTextFile("F:\\Scala\\result")

Идея такая:

  1. Creating pairs from the elements (pairs)
  2. Finding all messages for each hashtag (msgs)
  3. Creating pairs: ( (hashtag_i, hashtag_j), (messages_with_hashtag_i, messages_with_hashtag_j) ) (pairs_mapped)
  4. Calculating a measure(res)

Что ж, я думаю, мой код - просто мусор, поскольку я новичок в scala, spark и концепции функционального программирования, но он работает для небольшого csv (я пробовал со 100 строками). Но я должен рассчитать его для csv с ~ 25 миллионами строк, и в этом проблема. Процесс останавливается на saveAsTextFile (или SparkHadoopWriter.scala в пользовательском интерфейсе Spark) и не продолжается даже через 30 минут, а затем вылетает с различными ошибками (ошибки памяти, иногда просто «прерывание соединения» ...)

Я обнаружил на веб-сайте, что мы можем вычислить косинусную меру, используя фреймы данных, но я не совсем понимаю, как создать правильный фрейм данных из моих данных. Так что, пожалуйста, не могли бы вы дать мне несколько советов, как изменить мой код, чтобы он работал, или как создать правильный фрейм данных из CSV или чего-то еще?
Буду благодарен за любую помощь!

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
119
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Это типичное поведение, когда машине не хватает памяти.

Я предлагаю вам попробовать разделить рабочую нагрузку на партии, с которыми ваша машина сможет справиться.

Вы говорите, что 100 работает легко. Попробуйте 1000. Это работает? Изучите, что возможно, а затем соответствующим образом организуйте свою программу.

Ответ принят как подходящий

У RDD ленивые и активные операции. Активные операции: saveToTextFile, persist, cash, collect, top, take, foreach. Ленивые операции - это «карта», «фильтр», «группа», «присоединение» и многие другие. В вашей программе почти все операции ленивы. Ленивую коллекцию можно вычислить много раз. Вы должны использовать кеширование, чтобы оно оценивалось только один раз.

Когда вы работаете с большое количество данных, вы должны использовать память скудно. Если ваши теги или идентификаторы - Int, вы должны использовать _.toInt. Избегайте использования текстовых файлов для программ, не связанных с обучением, и целей без тестирования. Текстовый файл довольно хорош для человека, но он может быть медленным для ПК: например, Double занимает 8 байтов, но если вы записываете 0,4082482904638631 в текстовый файл, этот Double занимает 18 символов (36 байтов). Кроме того, если ваш тег или идентификатор имеют постоянный размер, вы не можете писать запятую.

Извините за мой плохой английский.

  type Id = String
  type Tag = String
  type Measure = Double

  val messages: RDD[String] = sc.textFile("1.csv").filter(_.head != 'm')
  val data: RDD[(Tag, Id)] = messages.map(line => line.split(","))
    .map(pair => pair(1) -> pair(0))
  val tagIds: RDD[(Tag, Set[Id])] = data.groupByKey()
    .mapValues(_.toSet)
    .persist(StorageLevel.MEMORY_AND_DISK)
  val tagIds1TagIds2: RDD[((Tag, Set[Id]), (Tag, Set[Id]))] = tagIds.cartesian(tagIds).filter({
    case ((t1,s1), (t2,s2)) => t1 < t2
  })
  val tagPairsWithMeasure: RDD[(Tag, Tag, Measure)] = tagIds1TagIds2.map({
    case ((t1,l1), (t2,l2)) => (t1,t2, {
      val numer = l1.intersect(l2).size
      val denom = Math.sqrt(l1.size*l2.size)
      numer.toDouble / denom
    })
  })
  val lines: RDD[String] = tagPairsWithMeasure.map({
    case (t1, t2, m) => s"$t1,$t2,$m"
  })

Контрольная работа:

id1,tag1
id1,tag2
id3,tag3
id3,tag2
id5,tag3
id6,tag1
id7,tag1
id8,tag2

Отвечать:

tag2,tag3,0.4082482904638631 // 1/sqrt(3*2)
tag1,tag2,0.3333333333333333 // 1/sqrt(3*3)
tag1,tag3,0.0                // 0/sqrt(3*2)

Большое спасибо за ответ! Мне пришлось импортировать RDD и StorageLevel, но теперь он работает. Но tagIdzZipped в этом случае всегда пуст, потому что zip создает там пары на равных элементах. Я пытаюсь понять, как это изменить, но все же спасибо!

elfinorr 14.03.2018 06:47

@elfinorr Вы можете заменить aggregateByKey на groupByKey и mapValues ​​(_. toSet)

Mikhail Ionkin 14.03.2018 07:01

извините, я не вижу, есть ли разница ... Разве aggregateByKey не похож на groupByKey, а с каким-то начальным значением?

elfinorr 14.03.2018 08:53

@elfinorr Это похоже, но groupByKey короче (а может и быстрее). теперь я не знаю, почему tagIdsZipped пуст. Сегодня могу более подробно ответить.

Mikhail Ionkin 14.03.2018 10:18

получил, но пусть будет aggregateByKey, потому что работает :) Насчет пустого tagIdsZipped имеет смысл. Мы сжимаем два равных списка, поэтому первые элементы каждой пары также равны, и все элементы фильтруются.

elfinorr 14.03.2018 18:38

@elfinorr я исправил свой ответ

Mikhail Ionkin 14.03.2018 20:38

Извините, забыл отметить. Большое спасибо, вы спасли мне жизнь! :)

elfinorr 15.03.2018 20:54

Другие вопросы по теме