Каков наилучший (самый быстрый) способ извлечения случайного значения из списка, большое количество (> 1M) раз?
В настоящее время я нахожусь в ситуации, когда у меня есть граф, представленный в виде списка смежности, внутренние списки которого могут иметь очень разную длину (в диапазоне [2, потенциально 100 КБ]).
Мне нужно перебрать этот список, чтобы генерировать случайные блуждания, поэтому мое текущее решение соответствует строкам
Это работает достаточно хорошо, когда граф не слишком велик, однако теперь я работаю с графом, который содержит> 440 тыс. узлов, с очень большой разницей в количестве ребер, которые имеет каждый узел.
Функция, которую я использую в данный момент для извлечения случайного индекса,
node_neighbors[int(random.random() * number_neighbors_of_node)]
Это ускорило вычисления по сравнению с моей предыдущей реализацией, однако по-прежнему неприемлемо медленно для моих целей.
Количество соседей узла может варьироваться от 2 до десятков тысяч, я не могу удалять маленькие узлы, мне приходится генерировать в этой среде десятки тысяч случайных блужданий.
При профилировании кода большая часть времени генерации тратится на поиск этих индексов, поэтому я ищу метод, который может сократить время, затрачиваемое на это. Однако, если можно полностью обойти это, изменив алгоритм, это тоже было бы здорово.
Спасибо!
Обновлено: из любопытства я протестировал три варианта одного и того же кода, используя timeit
, и вот результаты:
setup='''
import numpy as np
import random
# generate a random adjacency list, nodes have a number of neighbors between 2 and 10000
l = [list(range(random.randint(2, 10000))) for _ in range(10000)]
'''
for _ in range(1000):
v = l[random.randint(0, 10000-1)] # Get a random node adj list
vv = v[random.randint(0, len(v)-1)] # Find a random neighbor in v
0.29709450000001425
for _ in range(1000):
v = l[random.randint(0, 10000-1)]
vv = v[np.random.choice(v)]
26.760767499999986
for _ in range(1000):
v = l[random.randint(0, 10000-1)]
vv = v[int(random.random()*(len(v)))]
0.19086300000000733
for _ in range(1000):
v = l[random.randint(0, 10000-1)]
vv = v[int(random.choice(v))]
0.24351880000000392
как упоминалось @deceze, вы можете использовать random.choice
или вы можете использовать numpy.random.choice
@deceze choice
очень медленный по сравнению с random
.
Насколько быстрее вам это нужно?
@Pawan Для тебя это быстро? Я протестировал его с array([0] * 1000)
, и он был примерно в 20 раз медленнее, чем OP на [0] * 1000
.
Я не уверен, правильно ли я понимаю. Но мне кажется, что вы создаете массив, а затем используете np.random.choice
. Чтобы использовать numpy.random.choice
, вам не нужно создавать массив, он будет работать и со списком.
@Pawan Я сделал это, потому что в списке он был в 378 раз (!) медленнее, чем OP.
numpy.random.choice()
был моим первым выбором (!), Я также протестировал random.randint()
, прежде чем найти текущее решение после некоторого поиска в Google, причем решение в посте было самым быстрым. @superbrain в идеале быстрее, чем это, однако я начинаю думать, что проблема связана с моей общей реализацией, а не с этим конкретным фрагментом кода.
Вы можете сделать их все немного быстрее, используя функции напрямую, например, l[randint(0, 10000-1)]
. Лучше также включить random.choice
. Куда идет шаг 4 «Повторить»? Шаг 1 или шаг 2? Если второе, то наверное v
не нужно быть быстрым, только vv
? Как долго ваши прогулки?
@superbrain обновил шаги. В вызове nprandom.choice
была опечатка, однако, на удивление, это не сильно изменило результат.
random.choice
все еще нет. И nprandom.choice(v)
тоже должно работать.
Ваше решение (sol3) уже является самым быстрым с большим отрывом, чем предполагают ваши тесты. Я адаптировал измерения производительности, чтобы исключить произвольный выбор узлов в пользу обхода пути, который будет намного ближе к заявленной цели.
Вот улучшенные тесты производительности и результат. Я добавил sol5(), чтобы увидеть, будет ли иметь значение предварительное вычисление списка случайных значений (я надеялся, что numpy векторизует это, но это не пошло быстрее).
Настраивать
import numpy as np
import random
# generate a random adjacency list, nodes have a number of neighbors between 2 and 10000
nodes = [list(range(random.randint(2, 10000))) for _ in range(10000)]
pathLen = 1000
Решения
def sol1():
node = nodes[0]
for _ in range(pathLen):
node = nodes[random.randint(0, len(node)-1)] # move to a random neighbor
def sol2():
node = nodes[0]
for _ in range(pathLen):
node = nodes[np.random.choice(node)]
def sol3():
node = nodes[0]
for _ in range(pathLen):
node = nodes[int(random.random()*(len(node)))]
def sol4():
node = nodes[0]
for _ in range(pathLen):
node = nodes[int(random.choice(node))]
def sol5():
node = nodes[0]
for rng in np.random.random_sample(pathLen):
node = nodes[int(rng*len(node))]
Измерения
from timeit import timeit
count = 100
print("sol1",timeit(sol1,number=count))
print("sol2",timeit(sol2,number=count))
print("sol3",timeit(sol3,number=count))
print("sol4",timeit(sol4,number=count))
print("sol5",timeit(sol5,number=count))
sol1 0.12516996199999975
sol2 30.445685411
sol3 0.03886452900000137
sol4 0.1244026900000037
sol5 0.05330073100000021
numpy не очень хорошо справляется с манипулированием матрицами, которые имеют переменные размеры (например, ваши списки соседей), но, возможно, способ ускорить процесс — векторизовать выбор следующего узла. Назначив случайное число с плавающей запятой каждому узлу в массиве numpy, вы можете использовать его для навигации между узлами, пока ваш путь не вернется к уже посещенному узлу. Только тогда вам нужно будет сгенерировать новое случайное значение для этого узла. Предположительно, и в зависимости от длины пути, таких «столкновений» будет относительно небольшое количество.
Используя ту же идею и используя векторизацию numpy, вы можете сделать несколько обходов параллельно, создав матрицу идентификаторов узлов (столбцов), где каждая строка является параллельным обходом.
Чтобы проиллюстрировать это, вот функция, которая продвигает несколько «муравьев» по их индивидуальным случайным путям через узлы:
import numpy as np
import random
nodes = [list(range(random.randint(2, 10000))) for _ in range(10000)]
nbLinks = np.array(list(map(len,nodes)),dtype=np.int) # number of neighbors per node
npNodes = np.array([nb+[-1]*(10000-len(nb)) for nb in nodes]) # fixed sized rows for numpy
def moveAnts(antCount=12,stepCount=8,antPos=None,allPaths=False):
if antPos is None:
antPos = np.random.choice(len(nodes),antCount)
paths = antPos[:,None]
for _ in range(stepCount):
nextIndex = np.random.random_sample(size=(antCount,))*nbLinks[antPos]
antPos = npNodes[antPos,nextIndex.astype(np.int)]
if allPaths:
paths = np.append(paths,antPos[:,None],axis=1)
return paths if allPaths else antPos
Пример: 12 муравьев случайным образом продвигаются на 8 шагов из случайных начальных мест.
print(moveAnts(12,8,allPaths=True))
"""
[[8840 1302 3438 4159 2983 2269 1284 5031 1760]
[4390 5710 4981 3251 3235 2533 2771 6294 2940]
[3610 2059 1118 4630 2333 552 1375 4656 6212]
[9238 1295 7053 542 6914 2348 2481 718 949]
[5308 2826 2622 17 78 976 13 1640 561]
[5763 6079 1867 7748 7098 4884 2061 432 1827]
[3196 3057 27 440 6545 3629 243 6319 427]
[7694 1260 1621 956 1491 2258 676 3902 582]
[1590 4720 772 1366 2112 3498 1279 5474 3474]
[2587 872 333 1984 7263 168 3782 823 9]
[8525 193 449 982 4521 449 3811 2891 3353]
[6824 9221 964 389 4454 720 1898 806 58]]
"""
Производительность не лучше для отдельных муравьев, но параллельно время на одного муравья намного лучше.
from timeit import timeit
count = 100
antCount = 100
stepCount = 1000
ap = np.random.choice(len(nodes),antCount)
t = timeit(lambda:moveAnts(antCount,stepCount,ap),number=count)
print(t) # 0.9538277329999989 / 100 --> 0.009538277329999989 per ant
[EDIT] Я подумал о лучшей модели массива для строк переменного размера и придумал подход, который не будет тратить память в (в основном пустой) матрице фиксированного размера. Подход заключается в использовании одномерного массива для последовательного хранения ссылок всех узлов и двух дополнительных массивов для хранения начальной позиции и количества соседей. Эта структура данных работает даже быстрее, чем двумерная матрица фиксированного размера.
import numpy as np
import random
nodes = [list(range(random.randint(2, 10000))) for _ in range(10000)]
links = np.array(list(n for neighbors in nodes for n in neighbors))
linkCount = np.array(list(map(len,nodes)),dtype=np.int) # number of neighbors for each node
firstLink = np.insert(np.add.accumulate(linkCount),0,0) # index of first link for each node
def moveAnts(antCount=12,stepCount=8,antPos=None,allPaths=False):
if antPos is None:
antPos = np.random.choice(len(nodes),antCount)
paths = antPos[:,None]
for _ in range(stepCount):
nextIndex = np.random.random_sample(size=(antCount,))*linkCount[antPos]
antPos = links[firstLink[antPos]+nextIndex.astype(np.int)]
if allPaths:
paths = np.append(paths,antPos[:,None],axis=1)
return paths if allPaths else antPos
from timeit import timeit
count = 100
antCount = 100
stepCount = 1000
ap = np.random.choice(len(nodes),antCount)
t = timeit(lambda:moveAnts(antCount,stepCount,ap),number=count)
print(t) # 0.7157810379999994 / 100 --> 0.007157810379999994 per ant
Производительность «на одного муравья» улучшается по мере добавления их до определенного предела (примерно в 10 раз быстрее, чем sol3):
antCount = 1000
stepCount = 1000
ap = np.random.choice(len(nodes),antCount)
t = timeit(lambda:moveAnts(antCount,stepCount,ap),number=count)
print(t,t/antCount) #3.9749405650000007, 0.0039749405650000005 per ant
antCount = 10000
stepCount = 1000
ap = np.random.choice(len(nodes),antCount)
t = timeit(lambda:moveAnts(antCount,stepCount,ap),number=count)
print(t,t/antCount) #32.688697579, 0.0032688697579 per ant
Спасибо за ответ, я уже думал, что numpy не будет так хорошо работать в ситуации с такой изменчивостью размера массивов, и мне было интересно, есть ли способ обойти проблему. Я не уверен, что смогу сразу реализовать ваше решение в своем коде, но в любом случае еще раз спасибо за то, что вы нашли потенциально более быструю технику, чем та, что у меня уже есть.
См. мой РЕДАКТИРОВАТЬ для более эффективного способа управления матрицей переменного размера с использованием numpy (и лучшей производительности для загрузки).
@Alain T. уже отвечает на конкретный вопрос о графиках.
Мой ответ будет посвящен общему вопросу: «Какой самый быстрый способ найти случайный индекс в списке Python большое количество раз?», что означает создание случайной подвыборки элементов N
из заданного списка или массива NumPy.
Мы будем оценивать следующие методы:
## Returns Python list
random_subset = random.sample(data, N)
## Returns numpy.ndarray
random_selection = np.random.choice(data, N)
## Obtain a single random idx using random.randrange, N times, used to index data
random_selection = [data[idx] for idx in (random.randrange(0, len(data)) for _ in range(N))]
## Obtain a single random idx using using np.random.randint, N times, used to index data
random_selection = [data[idx] for idx in (np.random.randint(0, len(data)) for _ in range(N))]
## Create an NumPy array of `N` random indexes using np.random.randint, used to index data
random_selection = [data[idx] for idx in np.random.randint(0, len(data), N)]
Предположим, у нас есть значения данных 1MM (точные значения не имеют значения).
data_nparray = np.random.random(int(1e6))
data_list = list(data_nparray)
Как оказалось, при оценке самого быстрого способа генерации серии случайных элементов ответ меняется, если данные хранятся в виде списка или массива NumPy.
random.sample
является самым быстрым (т.е. random.sample(data, N)
)N
случайных индексов, используя np.random.randint
(т.е. [data[idx] for idx in np.random.randint(0, len(data), N)]
)np.random.choice
здесь работает очень медленно, так как каждый раз преобразует данные в массив NumPy. Однако для > 100 000 вариантов это становится самым быстрым вариантом.random.sample
повсюду.random.sample
не работает с массивами NumPy, только со списками. Преобразование элементов размером 1 мм из массива NumPy в сам список занимает в среднем ~ 23 000 микросекунд, поэтому мы пропустим random.sample
.N
случайные индексы один за другим, используя random.randrange
(т. е. [data[idx] for idx in (random.randrange(0, len(data)) for _ in range(N))]
)N
случайных индексов, используя np.random.randint
(т. е. [data[idx] for idx in np.random.randint(0, len(data), N)]
)np.random.choice
(т. е. np.random.choice(data, N)
), который также возвращает массив NumPy, а не список.random.randrange
для <25 элементов, а затем переключитесь на np.random.choice
.import time
import random
import numpy as np
import pandas as pd
from IPython.display import display
def time_fn(fn, num_runs=7, num_loops=1_000):
run_times = []
for num_run_i in range(num_runs):
start = time.time()
for _ in range(num_loops):
fn()
end = time.time()
run_times.append((end-start)/num_loops)
return run_times
def create_times_df_row(data, N, fn, fn_name, num_runs=7, num_loops=1_000):
try:
run_times = time_fn(fn, num_runs=num_runs, num_loops=num_loops)
return [
{
'N': N,
'function': fn_name,
'run_time_avg': np.mean(run_times),
'run_time_std': np.std(run_times),
'num_runs': num_runs,
'num_loops': num_loops,
'input_type': str(type(data)),
'return_type': str(type(fn())),
}
]
except Exception as e:
print(e)
return []
data_nparray = np.random.random(int(1e4))
data_list = list(data_nparray)
all_times_df = []
for N in list(range(1, 10)) + list(range(10, 25, 5)) + list(range(25, 100, 25)) + list(range(100, 1000+1, 100)):
times_df = []
for data in [data_nparray, data_list]:
num_loops = 1_000
if N >= 100:
num_loops = 100
if N >= 1_000:
num_loops = 10
times_df.extend(create_times_df_row(
data=data,
N=N,
fn=lambda: np.random.choice(data, N),
fn_name='np.random.choice',
num_loops=num_loops if isinstance(data, np.ndarray) else 10,
))
times_df.extend(create_times_df_row(
data=data,
N=N,
fn=lambda: random.sample(data, N),
fn_name='random.sample',
num_loops=num_loops,
))
times_df.extend(create_times_df_row(
data=data,
N=N,
fn=lambda: [data[idx] for idx in (random.randrange(0, len(data)) for _ in range(N))],
fn_name='random.randrange',
num_loops=num_loops,
))
times_df.extend(create_times_df_row(
data=data,
N=N,
fn=lambda: [data[idx] for idx in (np.random.randint(0, len(data)) for _ in range(N))],
fn_name='np.random.randint (single idx)',
num_loops=num_loops,
))
times_df.extend(create_times_df_row(
data=data,
N=N,
fn=lambda: [data[idx] for idx in np.random.randint(0, len(data), N)],
fn_name='np.random.randint (idx array)',
num_loops=num_loops,
))
print(N, end=' ')
times_df = pd.DataFrame(times_df)
display(times_df)
all_times_df.append(times_df)
all_times_df = pd.concat(all_times_df).reset_index(drop=True)
from __future__ import division
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
import time
from typing import *
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
# Assignment Constants
RANDOM_STATE = 10
FIGSIZE = (12.0, 9.0)
#### Use the following line before plt.plot(....) to increase the plot size ####
plt.figure(figsize=FIGSIZE)
### Adjust some plot display options:
sns.set(rc = {
'figure.figsize':FIGSIZE,
'axes.facecolor':'white',
'figure.facecolor':'white',
'grid.color': 'gainsboro',
})
sns.set_context("talk")
def time_to_ms(row):
row['run_time_avg'] = 1000*row['run_time_avg']
row['run_time_std'] = 1000*row['run_time_std']
return row
def time_to_us(row):
row['run_time_avg'] = 1e6*row['run_time_avg']
row['run_time_std'] = 1e6*row['run_time_std']
return row
cmap = dict(zip(all_times_df['function'].unique(), sns.color_palette('tab10')))
xticks = [1, 2, 3, 5, 7, 10, 15, 25, 50, 100, 200, 500, 1000,]
## Plot for Python list
line_plot = sns.lineplot(
data=all_times_df.query(f'''input_type == "<class 'list'>"''').apply(time_to_us, axis=1),
x='N', y='run_time_avg', hue='function', linewidth=3, palette=cmap,
)
scatter_plot = sns.scatterplot(
data=all_times_df.query(f'''input_type == "<class 'list'>"''').apply(time_to_us, axis=1),
x='N', y='run_time_avg', hue='function', linewidth=3, palette=cmap,
legend=False,
)
line_plot.set_xscale("log")
line_plot.set_yscale("log")
plt.xticks(xticks)
line_plot.get_xaxis().set_major_formatter(matplotlib.ticker.ScalarFormatter())
plt.xlabel("Size")
plt.ylabel("Time (µs)")
line_plot.set_title("Random selection from a Python list")
plt.legend(bbox_to_anchor=(0.5,0.5))
plt.ylim(7e-1, 2e5)
display(line_plot)
## Plot for Numpy array
line_plot = sns.lineplot(
data=all_times_df.query(f'''input_type == "<class 'numpy.ndarray'>"''').apply(time_to_us, axis=1),
x='N', y='run_time_avg', hue='function', linewidth=3, palette=cmap,
# legend=False,
)
sns.scatterplot(
data=all_times_df.query(f'''input_type == "<class 'numpy.ndarray'>"''').apply(time_to_us, axis=1),
x='N', y='run_time_avg', hue='function', linewidth=3, palette=cmap,
legend=False,
)
line_plot.set_xscale("log")
line_plot.set_yscale("log")
plt.xticks(xticks)
line_plot.get_xaxis().set_major_formatter(matplotlib.ticker.ScalarFormatter())
plt.xlabel("Size")
plt.ylabel("Time (µs)")
line_plot.set_title("Random selection from a NumPy array")
plt.legend(bbox_to_anchor=(1, 1))
plt.ylim(7e-1, 2e5)
display(line_plot)
random.choice
…?