Какой самый быстрый способ найти случайный индекс в списке Python большое количество раз?

Каков наилучший (самый быстрый) способ извлечения случайного значения из списка, большое количество (> 1M) раз?

В настоящее время я нахожусь в ситуации, когда у меня есть граф, представленный в виде списка смежности, внутренние списки которого могут иметь очень разную длину (в диапазоне [2, потенциально 100 КБ]).

Мне нужно перебрать этот список, чтобы генерировать случайные блуждания, поэтому мое текущее решение соответствует строкам

  1. Получить случайный узел
  2. Выберите случайный индекс из списка смежности этого узла.
  3. Перейти к новому узлу
  4. Перейти к 2
  5. Повторяйте до тех пор, пока случайное блуждание не станет необходимой длины.
  6. Перейти к 1

Это работает достаточно хорошо, когда граф не слишком велик, однако теперь я работаю с графом, который содержит> 440 тыс. узлов, с очень большой разницей в количестве ребер, которые имеет каждый узел.

Функция, которую я использую в данный момент для извлечения случайного индекса,

node_neighbors[int(random.random() * number_neighbors_of_node)]

Это ускорило вычисления по сравнению с моей предыдущей реализацией, однако по-прежнему неприемлемо медленно для моих целей.

Количество соседей узла может варьироваться от 2 до десятков тысяч, я не могу удалять маленькие узлы, мне приходится генерировать в этой среде десятки тысяч случайных блужданий.

При профилировании кода большая часть времени генерации тратится на поиск этих индексов, поэтому я ищу метод, который может сократить время, затрачиваемое на это. Однако, если можно полностью обойти это, изменив алгоритм, это тоже было бы здорово.

Спасибо!

Обновлено: из любопытства я протестировал три варианта одного и того же кода, используя timeit, и вот результаты:

setup='''
import numpy as np
import random

# generate a random adjacency list, nodes have a number of neighbors between 2 and 10000

l = [list(range(random.randint(2, 10000))) for _ in range(10000)]
'''

for _ in range(1000):    
    v = l[random.randint(0, 10000-1)] # Get a random node adj list 
    vv = v[random.randint(0, len(v)-1)] # Find a random neighbor in v

0.29709450000001425

for _ in range(1000):    
    v = l[random.randint(0, 10000-1)]
    vv = v[np.random.choice(v)]

26.760767499999986

for _ in range(1000):    
    v = l[random.randint(0, 10000-1)]
    vv = v[int(random.random()*(len(v)))]

0.19086300000000733

for _ in range(1000):    
    v = l[random.randint(0, 10000-1)]
    vv = v[int(random.choice(v))]

0.24351880000000392
random.choice…?
deceze 11.12.2020 19:08

как упоминалось @deceze, вы можете использовать random.choice или вы можете использовать numpy.random.choice

Pawan 11.12.2020 19:09

@deceze choice очень медленный по сравнению с random.

superb rain 11.12.2020 19:14

Насколько быстрее вам это нужно?

superb rain 11.12.2020 19:14

@Pawan Для тебя это быстро? Я протестировал его с array([0] * 1000), и он был примерно в 20 раз медленнее, чем OP на [0] * 1000.

superb rain 11.12.2020 19:27

Я не уверен, правильно ли я понимаю. Но мне кажется, что вы создаете массив, а затем используете np.random.choice. Чтобы использовать numpy.random.choice, вам не нужно создавать массив, он будет работать и со списком.

Pawan 11.12.2020 19:31

@Pawan Я сделал это, потому что в списке он был в 378 раз (!) медленнее, чем OP.

superb rain 11.12.2020 19:33
numpy.random.choice() был моим первым выбором (!), Я также протестировал random.randint(), прежде чем найти текущее решение после некоторого поиска в Google, причем решение в посте было самым быстрым. @superbrain в идеале быстрее, чем это, однако я начинаю думать, что проблема связана с моей общей реализацией, а не с этим конкретным фрагментом кода.
ThCP 11.12.2020 21:25

Вы можете сделать их все немного быстрее, используя функции напрямую, например, l[randint(0, 10000-1)]. Лучше также включить random.choice. Куда идет шаг 4 «Повторить»? Шаг 1 или шаг 2? Если второе, то наверное v не нужно быть быстрым, только vv? Как долго ваши прогулки?

superb rain 11.12.2020 21:44

@superbrain обновил шаги. В вызове nprandom.choice была опечатка, однако, на удивление, это не сильно изменило результат.

ThCP 11.12.2020 22:24

random.choice все еще нет. И nprandom.choice(v) тоже должно работать.

superb rain 11.12.2020 22:41
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
4
11
1 043
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Ваше решение (sol3) уже является самым быстрым с большим отрывом, чем предполагают ваши тесты. Я адаптировал измерения производительности, чтобы исключить произвольный выбор узлов в пользу обхода пути, который будет намного ближе к заявленной цели.

Вот улучшенные тесты производительности и результат. Я добавил sol5(), чтобы увидеть, будет ли иметь значение предварительное вычисление списка случайных значений (я надеялся, что numpy векторизует это, но это не пошло быстрее).

Настраивать

import numpy as np
import random

# generate a random adjacency list, nodes have a number of neighbors between 2 and 10000

nodes     = [list(range(random.randint(2, 10000))) for _ in range(10000)]
pathLen   = 1000

Решения

def sol1():
    node = nodes[0]
    for _ in range(pathLen):
        node = nodes[random.randint(0, len(node)-1)] # move to a random neighbor

def sol2():
    node = nodes[0]
    for _ in range(pathLen):
        node = nodes[np.random.choice(node)]

def sol3():
    node = nodes[0]
    for _ in range(pathLen):
        node = nodes[int(random.random()*(len(node)))]

def sol4():
    node = nodes[0]
    for _ in range(pathLen):
        node = nodes[int(random.choice(node))]

def sol5():
    node = nodes[0]
    for rng in np.random.random_sample(pathLen):
        node = nodes[int(rng*len(node))]

Измерения

from timeit import timeit
count = 100

print("sol1",timeit(sol1,number=count))
print("sol2",timeit(sol2,number=count))
print("sol3",timeit(sol3,number=count))
print("sol4",timeit(sol4,number=count))
print("sol5",timeit(sol5,number=count))

sol1 0.12516996199999975
sol2 30.445685411
sol3 0.03886452900000137
sol4 0.1244026900000037
sol5 0.05330073100000021

numpy не очень хорошо справляется с манипулированием матрицами, которые имеют переменные размеры (например, ваши списки соседей), но, возможно, способ ускорить процесс — векторизовать выбор следующего узла. Назначив случайное число с плавающей запятой каждому узлу в массиве numpy, вы можете использовать его для навигации между узлами, пока ваш путь не вернется к уже посещенному узлу. Только тогда вам нужно будет сгенерировать новое случайное значение для этого узла. Предположительно, и в зависимости от длины пути, таких «столкновений» будет относительно небольшое количество.

Используя ту же идею и используя векторизацию numpy, вы можете сделать несколько обходов параллельно, создав матрицу идентификаторов узлов (столбцов), где каждая строка является параллельным обходом.

Чтобы проиллюстрировать это, вот функция, которая продвигает несколько «муравьев» по ​​их индивидуальным случайным путям через узлы:

import numpy as np
import random

nodes   = [list(range(random.randint(2, 10000))) for _ in range(10000)]
nbLinks = np.array(list(map(len,nodes)),dtype=np.int)         # number of neighbors per node
npNodes = np.array([nb+[-1]*(10000-len(nb)) for nb in nodes]) # fixed sized rows for numpy

def moveAnts(antCount=12,stepCount=8,antPos=None,allPaths=False):
    if antPos is None:
        antPos = np.random.choice(len(nodes),antCount)
    paths = antPos[:,None]

    for _ in range(stepCount):
        nextIndex = np.random.random_sample(size=(antCount,))*nbLinks[antPos]
        antPos    = npNodes[antPos,nextIndex.astype(np.int)]
        if allPaths:
            paths = np.append(paths,antPos[:,None],axis=1)
        
    return paths if allPaths else antPos

Пример: 12 муравьев случайным образом продвигаются на 8 шагов из случайных начальных мест.

print(moveAnts(12,8,allPaths=True))

"""
    [[8840 1302 3438 4159 2983 2269 1284 5031 1760]
     [4390 5710 4981 3251 3235 2533 2771 6294 2940]
     [3610 2059 1118 4630 2333  552 1375 4656 6212]
     [9238 1295 7053  542 6914 2348 2481  718  949]
     [5308 2826 2622   17   78  976   13 1640  561]
     [5763 6079 1867 7748 7098 4884 2061  432 1827]
     [3196 3057   27  440 6545 3629  243 6319  427]
     [7694 1260 1621  956 1491 2258  676 3902  582]
     [1590 4720  772 1366 2112 3498 1279 5474 3474]
     [2587  872  333 1984 7263  168 3782  823    9]
     [8525  193  449  982 4521  449 3811 2891 3353]
     [6824 9221  964  389 4454  720 1898  806   58]]
"""

Производительность не лучше для отдельных муравьев, но параллельно время на одного муравья намного лучше.

from timeit import timeit
count = 100

antCount  = 100
stepCount = 1000
ap = np.random.choice(len(nodes),antCount)

t = timeit(lambda:moveAnts(antCount,stepCount,ap),number=count)

print(t) # 0.9538277329999989 / 100 --> 0.009538277329999989 per ant

[EDIT] Я подумал о лучшей модели массива для строк переменного размера и придумал подход, который не будет тратить память в (в основном пустой) матрице фиксированного размера. Подход заключается в использовании одномерного массива для последовательного хранения ссылок всех узлов и двух дополнительных массивов для хранения начальной позиции и количества соседей. Эта структура данных работает даже быстрее, чем двумерная матрица фиксированного размера.

import numpy as np
import random

nodes     = [list(range(random.randint(2, 10000))) for _ in range(10000)]
links     = np.array(list(n for neighbors in nodes for n in neighbors))
linkCount = np.array(list(map(len,nodes)),dtype=np.int) # number of neighbors for each node
firstLink = np.insert(np.add.accumulate(linkCount),0,0) # index of first link for each node



def moveAnts(antCount=12,stepCount=8,antPos=None,allPaths=False):
    if antPos is None:
        antPos = np.random.choice(len(nodes),antCount)
    paths = antPos[:,None]

    for _ in range(stepCount):
        nextIndex = np.random.random_sample(size=(antCount,))*linkCount[antPos]
        antPos    = links[firstLink[antPos]+nextIndex.astype(np.int)]
        if allPaths:
            paths = np.append(paths,antPos[:,None],axis=1)
        
    return paths if allPaths else antPos

from timeit import timeit
count = 100

antCount  = 100
stepCount = 1000
ap = np.random.choice(len(nodes),antCount)

t = timeit(lambda:moveAnts(antCount,stepCount,ap),number=count)

print(t) # 0.7157810379999994 / 100 --> 0.007157810379999994 per ant

Производительность «на одного муравья» улучшается по мере добавления их до определенного предела (примерно в 10 раз быстрее, чем sol3):

antCount  = 1000
stepCount = 1000
ap = np.random.choice(len(nodes),antCount)

t = timeit(lambda:moveAnts(antCount,stepCount,ap),number=count)

print(t,t/antCount) #3.9749405650000007, 0.0039749405650000005 per ant

antCount  = 10000
stepCount = 1000
ap = np.random.choice(len(nodes),antCount)

t = timeit(lambda:moveAnts(antCount,stepCount,ap),number=count)

print(t,t/antCount) #32.688697579, 0.0032688697579 per ant

Спасибо за ответ, я уже думал, что numpy не будет так хорошо работать в ситуации с такой изменчивостью размера массивов, и мне было интересно, есть ли способ обойти проблему. Я не уверен, что смогу сразу реализовать ваше решение в своем коде, но в любом случае еще раз спасибо за то, что вы нашли потенциально более быструю технику, чем та, что у меня уже есть.

ThCP 16.12.2020 23:03

См. мой РЕДАКТИРОВАТЬ для более эффективного способа управления матрицей переменного размера с использованием numpy (и лучшей производительности для загрузки).

Alain T. 17.12.2020 18:49

@Alain T. уже отвечает на конкретный вопрос о графиках.

Мой ответ будет посвящен общему вопросу: «Какой самый быстрый способ найти случайный индекс в списке Python большое количество раз?», что означает создание случайной подвыборки элементов N из заданного списка или массива NumPy.

Мы будем оценивать следующие методы:

## Returns Python list
random_subset = random.sample(data, N)  

## Returns numpy.ndarray
random_selection = np.random.choice(data, N)  

## Obtain a single random idx using random.randrange, N times, used to index data
random_selection = [data[idx] for idx in (random.randrange(0, len(data)) for _ in range(N))]  

## Obtain a single random idx using using np.random.randint, N times, used to index data
random_selection = [data[idx] for idx in (np.random.randint(0, len(data)) for _ in range(N))]

## Create an NumPy array of `N` random indexes using np.random.randint, used to index data
random_selection = [data[idx] for idx in np.random.randint(0, len(data), N)]

Предположим, у нас есть значения данных 1MM (точные значения не имеют значения).

data_nparray = np.random.random(int(1e6))  
data_list = list(data_nparray)  

Как оказалось, при оценке самого быстрого способа генерации серии случайных элементов ответ меняется, если данные хранятся в виде списка или массива NumPy.

Случайный выбор из списка или кортежа Python

  • На приведенном выше графике показано увеличение времени (в микросекундах) при случайном выборе элементов из списка Python.
    • Для <50 элементов random.sample является самым быстрым (т.е. random.sample(data, N))
    • Для >= 50 элементов быстрее всего создать массив N случайных индексов, используя np.random.randint (т.е. [data[idx] for idx in np.random.randint(0, len(data), N)])
    • np.random.choice здесь работает очень медленно, так как каждый раз преобразует данные в массив NumPy. Однако для > 100 000 вариантов это становится самым быстрым вариантом.
    • Вероятно, вам сойдет с рук использование random.sample повсюду.
    • Было обнаружено, что те же результаты справедливы и для кортежей Python.

Случайный выбор из массива NumPy

  • График выше показывает время увеличения (микросекунды) при случайном выборе элементов из массива NumPy.
    • random.sample не работает с массивами NumPy, только со списками. Преобразование элементов размером 1 мм из массива NumPy в сам список занимает в среднем ~ 23 000 микросекунд, поэтому мы пропустим random.sample.
    • Для <10 элементов быстрее всего выбирать N случайные индексы один за другим, используя random.randrange (т. е. [data[idx] for idx in (random.randrange(0, len(data)) for _ in range(N))])
    • Для 10–50 элементов быстрее всего создать массив N случайных индексов, используя np.random.randint (т. е. [data[idx] for idx in np.random.randint(0, len(data), N)])
    • Для > 50 элементов быстрее всего использовать np.random.choice (т. е. np.random.choice(data, N)), который также возвращает массив NumPy, а не список.
    • Возможно, вам сойдет с рук использование random.randrange для <25 элементов, а затем переключитесь на np.random.choice.

Код для воспроизведения

import time
import random
import numpy as np
import pandas as pd
from IPython.display import display
def time_fn(fn, num_runs=7, num_loops=1_000):
    run_times = [] 
    for num_run_i in range(num_runs):
        start = time.time()
        for _ in range(num_loops):
            fn()
        end = time.time()
        run_times.append((end-start)/num_loops)
    return run_times

def create_times_df_row(data, N, fn, fn_name, num_runs=7, num_loops=1_000):
    try:
        run_times = time_fn(fn, num_runs=num_runs, num_loops=num_loops)
        return [
            {
                'N': N,
                'function': fn_name,
                'run_time_avg': np.mean(run_times),
                'run_time_std': np.std(run_times),
                'num_runs': num_runs,
                'num_loops': num_loops,
                'input_type': str(type(data)),
                'return_type': str(type(fn())),
            }
        ]
    except Exception as e:
        print(e)
    return []

data_nparray = np.random.random(int(1e4))
data_list = list(data_nparray)
all_times_df = []
for N in list(range(1, 10)) + list(range(10, 25, 5)) + list(range(25, 100, 25)) + list(range(100, 1000+1, 100)):
    times_df = []
    for data in [data_nparray, data_list]:
        num_loops = 1_000
        if N >= 100:
            num_loops = 100
        if N >= 1_000:
            num_loops = 10
        times_df.extend(create_times_df_row(
            data=data,
            N=N,
            fn=lambda: np.random.choice(data, N),
            fn_name='np.random.choice',
            num_loops=num_loops if isinstance(data, np.ndarray) else 10,
        ))
        times_df.extend(create_times_df_row(
            data=data,
            N=N,
            fn=lambda: random.sample(data, N),
            fn_name='random.sample',
            num_loops=num_loops,
        ))
        times_df.extend(create_times_df_row(
            data=data,
            N=N,
            fn=lambda: [data[idx] for idx in (random.randrange(0, len(data)) for _ in range(N))],
            fn_name='random.randrange',
            num_loops=num_loops,
        ))
        times_df.extend(create_times_df_row(
            data=data,
            N=N,
            fn=lambda: [data[idx] for idx in (np.random.randint(0, len(data)) for _ in range(N))],
            fn_name='np.random.randint (single idx)',
            num_loops=num_loops,
        ))
        times_df.extend(create_times_df_row(
            data=data,
            N=N,
            fn=lambda: [data[idx] for idx in np.random.randint(0, len(data), N)],
            fn_name='np.random.randint (idx array)',
            num_loops=num_loops,
        ))
    print(N, end=' ')
    times_df = pd.DataFrame(times_df)
    display(times_df)
    all_times_df.append(times_df)
    
all_times_df = pd.concat(all_times_df).reset_index(drop=True)


from __future__ import division
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import time

from typing import *
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt 
# Assignment Constants
RANDOM_STATE = 10
FIGSIZE = (12.0, 9.0)

#### Use the following line before plt.plot(....) to increase the plot size ####
plt.figure(figsize=FIGSIZE)
### Adjust some plot display options:
sns.set(rc = {
    'figure.figsize':FIGSIZE, 
    'axes.facecolor':'white', 
    'figure.facecolor':'white', 
    'grid.color': 'gainsboro',
})
sns.set_context("talk")

def time_to_ms(row):
    row['run_time_avg'] = 1000*row['run_time_avg']
    row['run_time_std'] = 1000*row['run_time_std']
    return row

def time_to_us(row):
    row['run_time_avg'] = 1e6*row['run_time_avg']
    row['run_time_std'] = 1e6*row['run_time_std']
    return row

cmap = dict(zip(all_times_df['function'].unique(), sns.color_palette('tab10')))

xticks = [1, 2, 3, 5, 7, 10, 15, 25, 50, 100, 200, 500, 1000,]


## Plot for Python list
line_plot = sns.lineplot(
    data=all_times_df.query(f'''input_type == "<class 'list'>"''').apply(time_to_us, axis=1),
    x='N', y='run_time_avg', hue='function', linewidth=3, palette=cmap,
)
scatter_plot = sns.scatterplot(
    data=all_times_df.query(f'''input_type == "<class 'list'>"''').apply(time_to_us, axis=1),
    x='N', y='run_time_avg', hue='function', linewidth=3, palette=cmap,
    legend=False,
)

line_plot.set_xscale("log")
line_plot.set_yscale("log")
plt.xticks(xticks)
line_plot.get_xaxis().set_major_formatter(matplotlib.ticker.ScalarFormatter())
plt.xlabel("Size")
plt.ylabel("Time (µs)")
line_plot.set_title("Random selection from a Python list")
plt.legend(bbox_to_anchor=(0.5,0.5))
plt.ylim(7e-1, 2e5)
display(line_plot)


## Plot for Numpy array
line_plot = sns.lineplot(
    data=all_times_df.query(f'''input_type == "<class 'numpy.ndarray'>"''').apply(time_to_us, axis=1),
    x='N', y='run_time_avg', hue='function', linewidth=3, palette=cmap,
    # legend=False,
)
sns.scatterplot(
    data=all_times_df.query(f'''input_type == "<class 'numpy.ndarray'>"''').apply(time_to_us, axis=1),
    x='N', y='run_time_avg', hue='function', linewidth=3, palette=cmap,
    legend=False,
)
line_plot.set_xscale("log")
line_plot.set_yscale("log")
plt.xticks(xticks)
line_plot.get_xaxis().set_major_formatter(matplotlib.ticker.ScalarFormatter())

plt.xlabel("Size")
plt.ylabel("Time (µs)")
line_plot.set_title("Random selection from a NumPy array")
plt.legend(bbox_to_anchor=(1, 1))
plt.ylim(7e-1, 2e5)
display(line_plot)

Другие вопросы по теме