У меня есть файл .csv, который выглядит так:
message_id, hashtag
id_1, hashtag_1
id_2, hashtag_1
...
id_k, hashtag_m
....
Я пытаюсь найти меру косинуса между каждой парой хэштегов в CSV, выполнив следующие действия:
def isHeader(s: String):Boolean = {
s.take(1) == "m"
}
def cs(pair: ( (String, String), (List[String], List[String]) ) ) = {
val msgs1 = pair._2._1.toSet
val msgs2 = pair._2._2.toSet
val numer = msgs1.intersect(msgs2).size
val denom = Math.sqrt(msgs1.size*msgs2.size)
(pair._1._1, pair._1._2, numer / denom)
}
def to_csv_cos(t: (String, String, Double)): String = {
t._1 ++ "," ++ t._2 ++ "," ++ t._3.toString
}
val messages = sc.textFile("....csv")
val msgData1 = messages.filter(x => !isHeader(x))
val data = msgData1.map(x => x.split(','))
val pairs = data.map(x => (x(0), List(x(1)))).reduceByKey((a,b) => a ++ b).flatMap(x => x._2.combinations(2).toList).map(x => (x(0), x(1)))
val msgs = data.map( x => (x(1), List(x(0)))).reduceByKey((a,b) => a++b)
val pairs_mapped = pairs.join(msgs).map{
case (x, (y,z)) => (y, (x,z))
}.join(msgs).map{
case (x, ( (y,z),t) ) => ( (x,y), (t,z) )
}
val res = pairs_mapped.map{
x => cs(x)
}.map(x => to_csv_cos(x)).saveAsTextFile("F:\\Scala\\result")
Идея такая:
- Creating pairs from the elements (pairs)
- Finding all messages for each hashtag (msgs)
- Creating pairs: ( (hashtag_i, hashtag_j), (messages_with_hashtag_i, messages_with_hashtag_j) ) (pairs_mapped)
- Calculating a measure(res)
Что ж, я думаю, мой код - просто мусор, поскольку я новичок в scala, spark и концепции функционального программирования, но он работает для небольшого csv (я пробовал со 100 строками). Но я должен рассчитать его для csv с ~ 25 миллионами строк, и в этом проблема. Процесс останавливается на saveAsTextFile (или SparkHadoopWriter.scala в пользовательском интерфейсе Spark) и не продолжается даже через 30 минут, а затем вылетает с различными ошибками (ошибки памяти, иногда просто «прерывание соединения» ...)
Я обнаружил на веб-сайте, что мы можем вычислить косинусную меру, используя фреймы данных, но я не совсем понимаю, как создать правильный фрейм данных из моих данных.
Так что, пожалуйста, не могли бы вы дать мне несколько советов, как изменить мой код, чтобы он работал, или как создать правильный фрейм данных из CSV или чего-то еще?
Буду благодарен за любую помощь!





Это типичное поведение, когда машине не хватает памяти.
Я предлагаю вам попробовать разделить рабочую нагрузку на партии, с которыми ваша машина сможет справиться.
Вы говорите, что 100 работает легко. Попробуйте 1000. Это работает? Изучите, что возможно, а затем соответствующим образом организуйте свою программу.
У RDD ленивые и активные операции. Активные операции: saveToTextFile, persist, cash, collect, top, take, foreach. Ленивые операции - это «карта», «фильтр», «группа», «присоединение» и многие другие. В вашей программе почти все операции ленивы. Ленивую коллекцию можно вычислить много раз. Вы должны использовать кеширование, чтобы оно оценивалось только один раз.
Когда вы работаете с большое количество данных, вы должны использовать память скудно. Если ваши теги или идентификаторы - Int, вы должны использовать _.toInt. Избегайте использования текстовых файлов для программ, не связанных с обучением, и целей без тестирования. Текстовый файл довольно хорош для человека, но он может быть медленным для ПК: например, Double занимает 8 байтов, но если вы записываете 0,4082482904638631 в текстовый файл, этот Double занимает 18 символов (36 байтов). Кроме того, если ваш тег или идентификатор имеют постоянный размер, вы не можете писать запятую.
Извините за мой плохой английский.
type Id = String
type Tag = String
type Measure = Double
val messages: RDD[String] = sc.textFile("1.csv").filter(_.head != 'm')
val data: RDD[(Tag, Id)] = messages.map(line => line.split(","))
.map(pair => pair(1) -> pair(0))
val tagIds: RDD[(Tag, Set[Id])] = data.groupByKey()
.mapValues(_.toSet)
.persist(StorageLevel.MEMORY_AND_DISK)
val tagIds1TagIds2: RDD[((Tag, Set[Id]), (Tag, Set[Id]))] = tagIds.cartesian(tagIds).filter({
case ((t1,s1), (t2,s2)) => t1 < t2
})
val tagPairsWithMeasure: RDD[(Tag, Tag, Measure)] = tagIds1TagIds2.map({
case ((t1,l1), (t2,l2)) => (t1,t2, {
val numer = l1.intersect(l2).size
val denom = Math.sqrt(l1.size*l2.size)
numer.toDouble / denom
})
})
val lines: RDD[String] = tagPairsWithMeasure.map({
case (t1, t2, m) => s"$t1,$t2,$m"
})
Контрольная работа:
id1,tag1
id1,tag2
id3,tag3
id3,tag2
id5,tag3
id6,tag1
id7,tag1
id8,tag2
Отвечать:
tag2,tag3,0.4082482904638631 // 1/sqrt(3*2)
tag1,tag2,0.3333333333333333 // 1/sqrt(3*3)
tag1,tag3,0.0 // 0/sqrt(3*2)
@elfinorr Вы можете заменить aggregateByKey на groupByKey и mapValues (_. toSet)
извините, я не вижу, есть ли разница ... Разве aggregateByKey не похож на groupByKey, а с каким-то начальным значением?
@elfinorr Это похоже, но groupByKey короче (а может и быстрее). теперь я не знаю, почему tagIdsZipped пуст. Сегодня могу более подробно ответить.
получил, но пусть будет aggregateByKey, потому что работает :) Насчет пустого tagIdsZipped имеет смысл. Мы сжимаем два равных списка, поэтому первые элементы каждой пары также равны, и все элементы фильтруются.
@elfinorr я исправил свой ответ
Извините, забыл отметить. Большое спасибо, вы спасли мне жизнь! :)
Большое спасибо за ответ! Мне пришлось импортировать RDD и StorageLevel, но теперь он работает. Но tagIdzZipped в этом случае всегда пуст, потому что zip создает там пары на равных элементах. Я пытаюсь понять, как это изменить, но все же спасибо!