У меня есть PythonRDD. Мне нужно выполнить добавление элементов для нескольких списков. Добавьте элемент 1 списка 1 к элементу 1 списка 2, затем добавьте к элементу 1 списка 3. Для Канады добавьте 47,59,77 в качестве первого элемента, 97,98,63 в качестве второго элемента и так далее.
Я попытался сгладить список, чтобы добавить их, и попытался преобразовать в фреймворк данных, но мне это не удалось. И я хочу сделать это всеми тремя способами
countryCounts = [
('CANADA','47;97;33;94;6'),
('CANADA','59;98;24;83;3'),
('CANADA','77;63;93;86;62'),
('CHINA','86;71;72;23;27'),
('CHINA','74;69;72;93;7'),
('CHINA','58;99;90;93;41'),
('ENGLAND','40;13;85;75;90'),
('ENGLAND','39;13;33;29;14'),
('ENGLAND','99;88;57;69;49'),
('GERMANY','67;93;90;57;3'),
('GERMANY','9;15;20;19'),
('GERMANY','77;64;46;95;48'),
('INDIA','90;49;91;14;70'),
('INDIA','70;83;38;27;16'),
('INDIA','86;21;19;59;4')
]
countryCountsRdd = sc.parallelize(countryCounts)
countryCountsSplit.collect()
countryCountsGroup=countryCountsSplit.groupByKey().mapValues(list)
countryCountsGroup.collect()
CountsSplit=countryCountsRdd.map(lambda x : (x[0], ",".join(x[1].split(';'))))
countryCountsSplit.collect()
Inputs :
Way 1
[('CANADA', [47, 97, 33, 94, 6]), ('CANADA', [59, 98, 24, 83, 3]), ('CANADA', [77, 63, 93, 86, 62]), ('CHINA', [86, 71, 72, 23, 27]), ('CHINA', [74, 69, 72, 93, 7]), ('CHINA', [58, 99, 90, 93, 41]), ('ENGLAND', [40, 13, 85, 75, 90]), ('ENGLAND', [39, 13, 33, 29, 14]), ('ENGLAND', [99, 88, 57, 69, 49]), ('GERMANY', [67, 93, 90, 57, 3]), ('GERMANY', [9, 15, 20, 19]), ('GERMANY', [77, 64, 46, 95, 48]), ('INDIA', [90, 49, 91, 14, 70]), ('INDIA', [70, 83, 38, 27, 16]), ('INDIA', [86, 21, 19, 59, 4])]
Way 2:
[('CANADA', [[47, 97, 33, 94, 6], [59, 98, 24, 83, 3], [77, 63, 93, 86, 62]]), ('CHINA', [[86, 71, 72, 23, 27], [74, 69, 72, 93, 7], [58, 99, 90, 93, 41]]), ('INDIA', [[90, 49, 91, 14, 70], [70, 83, 38, 27, 16], [86, 21, 19, 59, 4]]), ('ENGLAND', [[40, 13, 85, 75, 90], [39, 13, 33, 29, 14], [99, 88, 57, 69, 49]]), ('GERMANY', [[67, 93, 90, 57, 3], [9, 15, 20, 19], [77, 64, 46, 95, 48]])]
Way 3:
[('CANADA', '47 ,97 ,33 ,94 ,6'), ('CANADA', '59 ,98 ,24 ,83 ,3'), ('CANADA', '77 ,63 ,93 ,86 ,62'), ('CHINA', '86 ,71 ,72 ,23 ,27'), ('CHINA', '74 ,69 ,72 ,93 ,7'), ('CHINA', '58 ,99 ,90 ,93 ,41'), ('ENGLAND', '40 ,13 ,85 ,75 ,90'), ('ENGLAND', '39 ,13 ,33 ,29 ,14'), ('ENGLAND', '99 ,88 ,57 ,69 ,49'), ('GERMANY', '67 ,93 ,90 ,57 ,3'), ('GERMANY', '9 ,15 ,20 ,19'), ('GERMANY', '77 ,64 ,46 ,95 ,48'), ('INDIA', '90 ,49 ,91 ,14 ,70'), ('INDIA', '70 ,83 ,38 ,27 ,16'), ('INDIA', '86 ,21 ,19 ,59 ,4')]
Require same output for all 3 :
[('CANADA','183;258;150;263;71)]
[('CHINA','218,239,234,209,75')]
[('ENGLAND','178,114,175,173,153')]
[('GERMANY','144,166,151,172,70')]
[('INDIA','246,153,148,100,90')]
Ваш вывод для Германии выглядит неверным. Похоже, что в одной из ваших записей всего 4 элемента.
Да Полт. Ты прав. Вывод для Германии неверный.
Таким образом, вы можете сделать это, используя простую операцию reduceByKey на RDD.
ВХОД СДР — СДР[СТРОКА, СПИСОК]
Выходной СДР — input.reduceByKey(x,y -> addFunction(x,y))
addFunction (x, y) выполняет итерацию по входным спискам, добавляет элементы по индексу и возвращает добавленный список.
Вы хотите объединить значения для данного ключа, взяв сумму. Это именно то, что делает reduceByKey
. Вам просто нужно определить ассоциативную и коммутативную функцию редукции, чтобы комбинировать значения по желанию.
def myReducer(a, b):
a, b = map(int, a.split(";")), map(int, b.split(";"))
maxLength = max(len(a), len(b))
if len(a) < len(b):
a = a + [0]*(len(b)-len(a))
elif len(b) < len(a):
b = b + [0]*(len(a)-len(b))
return ";".join([str(a[i] + b[i]) for i in range(maxLength)])
Единственная действительно сложная часть здесь заключается в том, что ваши примеры входных списков имеют разный размер. В этом случае я определил функцию для заполнения нулями более короткого списка.
Теперь звоните reduceByKey
:
countryCountsRdd.reduceByKey(myReducer).collect()
#[('CANADA', '183;258;150;263;71'),
# ('CHINA', '218;239;234;209;75'),
# ('INDIA', '246;153;148;100;90'),
# ('ENGLAND', '178;114;175;173;153'),
# ('GERMANY', '153;172;156;171;51')]
Спасибо, Пол. Если данные неверны, логика будет исправлена заменой NULL на 0.
Пол, не могли бы вы поделиться кодом для myCombiner. Потому что, когда я использую countryCountsReduceByKey=countryCountsSplit.reduceByKey(lambda x,y:x+y), это дает мне [('КАНАДА', [47, 97, 33, 94, 6, 59, 98, 24, 83, 3, 77, 63, 93, 86, 62])], что является неожиданной конкатенацией.
@AbhinavSingh myCombiner
была опечаткой, так и должно было быть myReducer
(исправлено). Вы наносите его на countryCountsRdd
, а НЕ на countCountsSplit
. Почему вы пробуете свою собственную функцию? x+y
для списков будет объединять списки.
Я попытался сгруппировать по «countryCountsGroup», но он создал вложенный список, который я не могу суммировать.