Я работаю над проектом Python, который генерирует случайные карты плиток с помощью NumPy. Хотя мой код работоспособен, в настоящее время он работает слишком медленно, и я изо всех сил пытаюсь его эффективно векторизовать, чтобы повысить производительность.
Я начинаю с небольшой сетки (например, 4х4) и размещаю несколько тайлов земли. Затем карта масштабируется путем разделения каждого фрагмента на 4 меньших фрагмента. Для каждой из 4 новых плиток код решает, сохранить ли ее как землю или превратить в воду. Это решение основано на окружающих плитках: чем больше соседних плиток с водой, тем больше шансов превратиться в воду, а чем больше соседних плиток с землей, тем выше вероятность того, что земля останется. Этот процесс приводит к нечеткому типу масштабирования по принципу ближайшего соседа.
Этот процесс масштабирования повторяется, постепенно увеличивая размер сетки до достижения желаемого размера карты (например, с 4x4 до 16x16, до 32x32 и так далее). Конечным результатом является массив суши с границами, которые выглядят естественно текстурированными благодаря процессу масштабирования.
В процессе масштабирования водные плитки не могут превратиться в землю, поэтому они не проверяются. Поскольку потенциально в воду могут превратиться только тайлы земли, а тайл, полностью окруженный землей, не может стать водой, коду не нужно проверять каждый тайл земли. Это уменьшает количество плиток, которые необходимо проверить. Код отслеживает граничные плитки (примыкающие к воде) и обновляет их только с помощью метода «check_tile».
Тем не менее, я изо всех сил пытаюсь векторизовать метод «check_tile», который эффективно потребляет 90% времени выполнения с помощью NumPy.
Вот код (требуется версия numpy >= 2.0):
import numpy as np
class MapDataGen:
"""
Procedurally generates a world map and returns a numpy array representation of it.
Water proceeds from borders to inland.
Water percentage increases with each iteration.
"""
def __init__(self, start_size: int, seed: int) -> None:
"""
Initialize starting world map.
start_size: size of the map (start_size x start_size).
seed: used for reproducible randomness.
"""
# Starting map, filled with 0, start_size by start_size big.
self.world_map = np.zeros((start_size, start_size), dtype=np.uint8)
# Random number generator.
self.rng = np.random.default_rng(seed)
# List to store border tile indexes.
self.borders = []
self.newborders = []
def add_land(self, land_id, from_index, to_index):
"""
Add land to the world map at any time and at any map resolution.
land_id: non-zero uint8, id of the new land tile (0 is reserved for water).
from_index: starting index (inclusive) for the land area.
to_index: ending index (exclusive) for the land area.
"""
row_size, column_size = self.world_map.shape
from_row = max(0, from_index[0])
to_row = min(to_index[0], row_size)
from_col = max(0, from_index[1])
to_col = min(to_index[1], column_size)
self.world_map[
from_row:to_row,
from_col:to_col,
] = land_id
for row in range(from_row, to_row):
for column in range(from_col, to_col):
self.borders.append((row, column))
def neighbours(self, index, radius) -> np.ndarray:
"""
Returns neighbour tiles within the given radius of the index.
index: tuple representing the index of the tile.
radius: the radius to search for neighbours.
"""
row_size, column_size = self.world_map.shape
return self.world_map[
max(0, index[0] - radius) : min(index[0] + radius + 1, row_size),
max(0, index[1] - radius) : min(index[1] + radius + 1, column_size),
]
def upscale_map(self) -> np.ndarray:
"""
Divide each tile into 4 pieces and upscale the map by a factor of 2.
"""
row, column = self.world_map.shape
rs, cs = self.world_map.strides
x = np.lib.stride_tricks.as_strided(
self.world_map, (row, 2, column, 2), (rs, 0, cs, 0)
)
self.newmap = x.reshape(row * 2, column * 2)
# /Old version/.
# self.newmap = np.repeat(np.repeat(self.worldmap, 2, axis=0), 2, axis=1)
def check_tile(self, index: tuple):
"""
Texturize borders and update new borders.
index: tuple representing the index of the tile to check.
"""
# Corresponding land id to current working tile.
tile = self.world_map[index]
# If tile at the index is surrounded by identical tiles within a 2-tile range.
if np.all(self.neighbours(index, 2) == tile):
# Don't store it; this tile cannot become water because it is surrounded by 2-tile wide same land as itself.
pass
else:
# The values of unique tiles and their counts.
values, counts = np.unique_counts(self.neighbours(index, 1))
# Calculate the probability of turning into other tiles for descended tiles.
probability = counts.astype(np.float16)
probability /= np.sum(probability)
# Randomly change each of the 4 newly descended tiles of the original tile to either water or not.
for row in range(2):
for column in range(2):
# One of the four descended tile's index.
new_tile_index = (index[0] * 2 + row, index[1] * 2 + column)
# Randomly chose a value according to its probability.
random = self.rng.choice(values, p=probability)
if random == 0: # If tile at the index became water.
# Update it on the new map.
self.newmap[new_tile_index] = random
# Don't store it because it is a water tile and no longer a border tile.
elif random == tile: # If the tile remained the same.
# Store it because it is still a border tile.
self.newborders.append(new_tile_index)
else: # If the tile changed to a different land.
# Update it on the new map.
self.newmap[new_tile_index] = random
# Store it because it is still a border tile.
self.newborders.append(new_tile_index)
def default_procedure(self):
"""
Default procedure: upscale (or zoom into) the map and texturize borders.
"""
self.upscale_map()
for index in self.borders:
self.check_tile(index)
self.borders = self.newborders
self.newborders = []
self.world_map = self.newmap
Чтобы увидеть, что он делает:
import time
from matplotlib import pyplot as plt
from matplotlib import colors
wmg = MapDataGen(13, 3)
wmg.add_land(1, (1, 1), (7, 7))
wmg.add_land(1, (8, 8), (12, 12))
plt.title(f"Starting Map")
colormap = colors.ListedColormap(
[[21.0 / 255, 128.0 / 255, 209.0 / 255], [227.0 / 255, 217.0 / 255, 159.0 / 255]]
)
plt.imshow(wmg.world_map, interpolation = "nearest", cmap=colormap)
plt.savefig(f"iteration {0}.png")
plt.show()
for i in range(13):
start = time.time()
wmg.default_procedure()
end = time.time()
plt.title(f"iteration {i+1} took {end-start} seconds")
plt.imshow(wmg.world_map, interpolation = "nearest", cmap=colormap)
plt.savefig(f"{i}.png")
plt.show()
Результаты:
Я проверил реализации NumPy Game of Life, но все они проверяют все сетки, но этот код знает, какие индексы плиток проверять, поэтому они неприменимы.
Как я могу оптимизировать этот алгоритм с помощью NumPy, особенно с векторизацией, чтобы повысить его производительность? Есть ли в NumPy какие-то конкретные стратегии или функции, которые могли бы помочь?
Любые предложения или примеры будут с благодарностью приняты!
Примечание для других, пытающихся запустить этот код. Он использует np.unique_counts, для которого требуется версия numpy >= 2.0.
Примечание. Я свел объяснения к минимуму, но этот ответ все еще длинный. Если вы ищете только код, просто прокрутите до конца.
По моему мнению, ключом к векторизации является извлечение частей, которые можно векторизовать.
Но перед этим есть одна часть, которую необходимо исправить. Следующий код делает что-то очень избыточное.
values, counts = np.unique_counts(self.neighbours(index, 1))
probability = counts.astype(np.float16)
probability /= np.sum(probability)
random = self.rng.choice(values, p=probability)
Его можно заменить следующим:
random = self.rng.choice(self.neighbours(index, 1).ravel())
Например, если соседей 9 и 3 из них — вода, вероятность выбора воды будет 3/9. Однако обратите внимание, что результат не будет таким же, как оригинал, хотя математически он делает то же самое.
Теперь, поскольку операция, которую вы хотели выполнить, заключалась в случайном выборе одного из соседей, вы можете напрямую получить доступ к целевой плитке следующим образом:
neighbour_offset = np.array([(dx, dy) for dx in (-1, 0, 1) for dy in (-1, 0, 1)])
neighbour_index = self.rng.choice(neighbour_offset) + index
random = self.world_map[tuple(neighbour_index)]
Вы можете видеть, что rng.choice
нужно выбрать только один из neighbour_offset
, который не меняется во время цикла.
Следовательно, это можно вычислить вне цикла. Ниже приведены необходимые изменения:
def check_tile(self, index: tuple, neighbour_indexes):
"""
Texturize borders and update new borders.
index: tuple representing the index of the tile to check.
"""
# Corresponding land id to current working tile.
tile = self.world_map[index]
# If tile at the index is surrounded by identical tiles within a 2-tile range.
if np.all(self.neighbours(index, 2) == tile):
# Don't store it; this tile cannot become water because it is surrounded by 2-tile wide same land as itself.
pass
else:
# Randomly change each of the 4 newly descended tiles of the original tile to either water or not.
for row in range(2):
for column in range(2):
# One of the four descended tile's index.
new_tile_index = (index[0] * 2 + row, index[1] * 2 + column)
# Randomly chose a value according to its probability.
neighbour_index = neighbour_indexes[row * 2 + column]
random = self.world_map[tuple(neighbour_index)]
if random == 0: # If tile at the index became water.
# Update it on the new map.
self.newmap[new_tile_index] = random
# Don't store it because it is a water tile and no longer a border tile.
elif random == tile: # If the tile remained the same.
# Store it because it is still a border tile.
self.newborders.append(new_tile_index)
else: # If the tile changed to a different land.
# Update it on the new map.
self.newmap[new_tile_index] = random
# Store it because it is still a border tile.
self.newborders.append(new_tile_index)
def default_procedure(self):
"""
Default procedure: upscale (or zoom into) the map and texturize borders.
"""
self.upscale_map()
# Choose 4 random neighbours for each border tile.
# Note that below is NOT taking into account the case where the land is at the edge of the map.
neighbour_offset = np.array([(dx, dy) for dx in (-1, 0, 1) for dy in (-1, 0, 1)])
random_neighbour_offsets = self.rng.choice(neighbour_offset, size=(len(self.borders), 4))
random_neighbour_indexes = np.asarray(self.borders)[:, None, :] + random_neighbour_offsets
for index, neighbour_index in zip(self.borders, random_neighbour_indexes):
self.check_tile(index, neighbour_index)
self.borders = self.newborders
self.newborders = []
self.world_map = self.newmap
(Обратите внимание, что в этом коде не учитываются тайлы земли на краю карты. Я опустил здесь этот случай, потому что это предотвратит векторизацию. Если вам нужно с этим справиться, я рекомендую разделить процесс на несколько шагов. и обрабатываем края отдельно.)
Благодаря этому rng.choice
теперь векторизован. На моем ПК только этот шаг сделал его в 8 раз быстрее, чем оригинал.
Вы можете продолжать этот процесс неоднократно. Наверное, мне следует все объяснить, но это будет очень долго, поэтому, пожалуйста, позвольте мне пропустить до конца.
Это полностью векторизованный код:
def default_procedure(self):
"""
Default procedure: upscale (or zoom into) the map and texturize borders.
"""
self.upscale_map()
def take(arr, indexes):
return arr[indexes[..., 0], indexes[..., 1]]
def put(arr, indexes, values):
arr[indexes[..., 0], indexes[..., 1]] = values
land_tile_indexes = np.asanyarray(self.borders)
land_tiles = take(self.world_map, land_tile_indexes)
# Skip tiles that are surrounded by water.
neighbour_25_windows = np.lib.stride_tricks.sliding_window_view(
np.pad(self.world_map, pad_width=5 // 2, mode='edge'), # Pad the map in order to skip edge cases.
window_shape=(5, 5),
)
neighbour_25_windows = take(neighbour_25_windows, land_tile_indexes) # Filter only the border tiles.
is_not_surrounded_tile = np.any(neighbour_25_windows != land_tiles[..., None, None], axis=(-2, -1))
# Filter land_tile_indexes by the condition.
land_tile_indexes = land_tile_indexes[is_not_surrounded_tile]
# Choose 4 random neighbours for each border tile.
# Note that below is NOT taking into account the case where the land is at the edge of the map.
neighbour_offset = np.array([(dx, dy) for dx in (-1, 0, 1) for dy in (-1, 0, 1)])
random_neighbour_offsets = self.rng.choice(neighbour_offset, size=(len(land_tile_indexes), 4))
random_neighbour_indexes = land_tile_indexes[:, None, :] + random_neighbour_offsets
random_neighbour_tiles = take(self.world_map, random_neighbour_indexes)
# Pre-calculate all the new tile indexes here.
upscaling_offset = np.array([(dx, dy) for dx in range(2) for dy in range(2)])
new_tile_indexes = (land_tile_indexes * 2)[:, None, :] + upscaling_offset[None]
# Put the new tile if it did NOT remain the same.
# In other words, this covers below cases:
# - If the tile became water.
# - If the tile changed to a different land.
condition = random_neighbour_tiles != take(self.world_map, land_tile_indexes)[..., None]
put(self.newmap, new_tile_indexes[condition], random_neighbour_tiles[condition])
# Update the border tiles if it did NOT become water.
# In other words, this covers below cases:
# - If the tile remained the same.
# - If the tile changed to a different land.
target_indexes = new_tile_indexes[random_neighbour_tiles != 0]
self.borders = target_indexes
self.world_map = self.newmap
На данный момент узкое место self.upscale_map()
.
На моем ПК этот процесс занял более 80% времени выполнения.
Самый простой способ решить эту проблему — использовать OpenCV.
def upscale_map(self) -> None:
"""
Divide each tile into 4 pieces and upscale the map by a factor of 2.
"""
self.newmap = cv2.resize(self.world_map, (0, 0), fx=2, fy=2, interpolation=cv2.INTER_NEAREST)
Вот результат после 13 итераций.
Наконец, в качестве альтернативы вы можете использовать Нумба. С Numba реализация может оказаться более интуитивно понятной. Однако объединение Numba с классами может быть немного сложным, поэтому вам все равно придется переписать большую часть кода. Тем не менее, на это стоит обратить внимание.
«Сверхбыстро, спасибо! За последние несколько дней я внес некоторые улучшения, и теперь он может обрабатывать края плитки, используя уже дополненную карту. Обязательно ли заполнение? Следующим узким местом, похоже, является память. Рекомендуете ли вы использовать SciPy разреженный массивы?"
Я не думаю, что этот код точно воспроизводит ваши правила, но я думаю, что идея реализации этого с помощью высокоуровневых операций масштабирования, фильтрации и бинаризации может помочь вам добиться большей производительности, чем узкие for
циклы.
import numpy as np
from scipy import ndimage
import matplotlib.pyplot as plt
rng = np.random.default_rng(1)
A = np.zeros((4, 4)).astype(bool)
A[1:3, 1:3] = 1
for j in range(8):
A = ndimage.zoom(A, 2, order=0)
B = ndimage.uniform_filter(A.astype(float))
z = rng.random(A.sum())
A[A] = B[A] > z
plt.imshow(A)
Это выполняет массу избыточной работы, но я думаю, что это все равно немного быстрее, чем у вас. Если добавить немного дополнительной логики, чтобы избежать фильтрации больших блоков, состоящих из воды, это, вероятно, может быть довольно быстрым, особенно если вы можете использовать cupyx.scipy.ndimage
с графическим процессором.
Рассчитали ли вы время своего кода, чтобы убедиться, что check_tile является узким местом? В любом случае вы вычисляете вероятность 4 раза, а нужно сделать это всего один раз. Должно быть возможно выполнить check_tile без цикла.