Я пытаюсь реализовать алгоритм Apriori при использовании Hadoop. Я уже реализовал нераспространяемую версию алгоритма Apriori, но мое незнание Hadoop и MapReduce вызвало ряд проблем.
Я хочу реализовать алгоритм в два этапа:
1) На первом этапе задание уменьшения карты будет работать с исходным набором данных транзакции. Результатом этого этапа является файл, содержащий все наборы из 1 элементов и их поддержку 1.
2) На втором этапе я хочу прочитать выходные данные предыдущего этапа, а затем построить новые наборы элементов. Важно отметить, что затем в картографе я хочу определить, есть ли какие-либо новые наборы элементов в наборе данных. Я предполагаю, что если я отправлю исходный набор данных в качестве входных данных для сопоставителя, он разделит исходный файл, так что каждый сопоставитель будет сканировать только частичный набор данных. Однако список кандидатов должен быть составлен из всех результатов предыдущей фазы. Затем это будет повторяться в цикле для фиксированного количества проходов.
Моя проблема заключается в том, чтобы выяснить, как конкретно гарантировать, что я могу получить доступ ко всем наборам элементов в каждом картографе, а также получить доступ к исходному набору данных для расчета новой поддержки на каждой фазе.
Спасибо за любые советы, комментарии, предложения или ответы.
Обновлено: Основываясь на отзывах, я просто хочу уточнить, о чем я здесь спрашиваю.
Перед тем как начать, я предлагаю вам прочитать Учебное пособие по Hadoop Map-Reduce.
Шаг 1: Загрузите файл данных в HDFS. Предположим, что ваши данные - это текстовый файл, а каждый набор представляет собой строку.
a b c
a c d e
a e f
a f z
...
Шаг 2: Следуйте руководству Map-Reduce, чтобы создать свой собственный класс Apriori.
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
// Seprate the line into tokens by space
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
// Add the token into a writable set
... put the element into a writable set ...
}
context.write(word, one);
}
Шаг 3: Запустите jar-файл mapreduce. Результат будет в файле в HDFS. У вас будет что-то вроде:
a b 3 (number of occurrence)
a b c 5
a d 2
...
На основе выходного файла вы можете рассчитать взаимосвязь.
Спасибо за ваши Коментарии. К сожалению, по назначению я должен использовать MapReduce.
Я реализовал алгоритм AES как в Apache Spark, так и в Hadoop MapReduce, используя Hadoop Streaming. Я знаю, что это не то же самое, что Apriori, но вы можете попробовать использовать мой подход.
Простой пример AES, реализованный с использованием Hadoop Streming MapReduce.
Структура проекта для AES Hadoop Streaming
1n_reducer.py / 1n_combiner - это тот же код, но без ограничений.
import sys
CONSTRAINT = 1000
def do_reduce(word, _values):
return word, sum(_values)
prev_key = None
values = []
for line in sys.stdin:
key, value = line.split("\t")
if key != prev_key and prev_key is not None:
result_key, result_value = do_reduce(prev_key, values)
if result_value > CONSTRAINT:
print(result_key + "\t" + str(result_value))
values = []
prev_key = key
values.append(int(value))
if prev_key is not None:
result_key, result_value = do_reduce(prev_key, values)
if result_value > CONSTRAINT:
print(result_key + "\t" + str(result_value))
base_mapper.py:
import sys
def count_usage():
for line in sys.stdin:
elements = line.rstrip("\n").rsplit(",")
for item in elements:
print("{item}\t{count}".format(item=item, count=1))
if __name__ == "__main__":
count_usage()
2n_mapper.py использует результат предыдущей итерации. Отвечая на ваш вопрос, вы можете прочитать вывод предыдущей итерации, чтобы сформировать наборы элементов таким образом.
import itertools
import sys
sys.path.append('.')
N_DIM = 2
def get_2n_items():
items = set()
with open("part-00000") as inf:
for line in inf:
parts = line.split('\t')
if len(parts) > 1:
items.add(parts[0])
return items
def count_usage_of_2n_items():
all_items_set = get_2n_items()
for line in sys.stdin:
items = line.rstrip("\n").rsplit(",") # 74743 43355 53554
exist_in_items = set()
for item in items:
if item in all_items_set:
exist_in_items.add(item)
for combination in itertools.combinations(exist_in_items, N_DIM):
combination = sorted(combination)
print("{el1},{el2}\t{count}".format(el1=combination[0], el2=combination[1], count=1))
if __name__ == "__main__":
count_usage_of_2n_items()
По моему опыту, алгоритм Apriori не подходит для Hadoop, если количество уникальных комбинаций (наборов элементов) слишком велико (100K +). Если вы нашли элегантное решение для реализации алгоритма Apriori с использованием Hadoop MapReduce (реализация Streaming или Java MapReduce), поделитесь с сообществом.
PS. Если вам нужно больше фрагментов кода, попросите.
Я знаю, что ваш вопрос касается априорного алгоритма. Но я настоятельно рекомендую лучше использовать
FP Growth Algorithm
из-за многократного повторения времени, которое алгоритм apriori должен делать в процессе. Такой алгоритм не рекомендуется для длинных конвейеров обработки данных.