У нас есть сетевая ситуация, когда у него четыре узла — скажем, A
, B
, C
и D
. Проблема - A
, B
, C
и D
не фиксированы, но имеют кучу возможностей.
Скажем, A
может иметь значения A1
, A2
до An
.
То же самое для B
- B1
, B2
, Bn
.
То же самое касается C
и D
. Каждый из An
, Bn
, Cn
и Dn
имеет координаты в качестве свойств (кортеж — (21.2134, -32.1189)
).
Чего мы пытаемся достичь — найти лучшую комбинацию An
, Bn
, Cn
и Dn
, которые наиболее близки друг к другу, если проходить в таком порядке (A -> B -> C -> D
). Например, это может быть A11 -> B1 -> C9 -> D4
— по сути, выберите по одному варианту из каждого из A
, B
, C
и D
из всех соответствующих возможных значений, чтобы комбинация этих значений была наиболее близкой друг к другу среди всех таких комбинаций.
Мы, конечно, пробуем метод грубой силы, но он очень дорогой, так как его временная сложность N(A) * N(A) * N(C) * N(D)
.
Итак, мы ищем какое-то решение на основе графа, где мы можем использовать график, сохраняя в нем все точки с координатами, и во время выполнения мы можем просто передавать все наборы точек (все точки n
для A
, все точки k
для B
, все j
точки для C
и все l
точки для D
) в него, и он выведет нужную точку для каждого из тех классов (A
, B
, C
и D
), для которых маршрут наиболее оптимален.
Если граф необходимо сохранить в памяти, это тоже нормально, чем быстрее время отклика среды выполнения, тем лучше, память не является ограничением, а также, существует ли маршрут между An
и Bn
, здесь не имеет значения, все, что мы хотим сделать это определение близости на основе координат (при условии линейного маршрута между каждым набором точек).
Кроме того, это не обязательно, на самом деле почти всегда будет так, что запрос времени выполнения включает только несколько категорий мест, например, в исчерпывающем списке есть A, B, C и D, но запрос времени выполнения может попытаться чтобы найти комбинацию только B и C, или A и D, или A, B и D и т. д.
Это, конечно, выполнимо, поскольку это будет происходить в автономном режиме, но как это сделать или какую библиотеку использовать, чтобы это сделать и как - мне нужна помощь.
Привет @ravenspoint, да, мы want to choose one of the possibilities for each node that gives the shortest total path
. И да, я mean output one of the possible co-ordinates for each node
, и optimal combination of alternative node co-ordinates
- типа A1
, B2
, C1
и D12
.
@ravenspoint: хотя я в целом с вами согласен, думаю, в данном случае цель (минимальный общий путь) была достаточно ясна.
@Swifty, ты хочешь получить точные ответы, твои вопросы должны быть кристально ясными. Что же нам следует понимать из вопроса, требующего «правдоподобного» ответа? Похоже на тот ответ, который вы ожидаете от ИИ.
Да, я согласен, что проблема должна быть достаточно ясна в названии, а не только в описании.
Привет @ravenspoint, спасибо, редактирую вопрос. Можно и так сказать немного вежливо, не надо ругать. :)
Намного лучше. Вы находитесь на этом сайте уже 12 лет – удивительно, что ваша кожа не стала толще.
@ravenspoint согласился, у меня тонкая кожа, хотя Stackoverflow много раз нарушал меня! :)
Вам нужно «абсолютно оптимальное» или просто «очень хорошее» решение? Возможно, вы могли бы использовать современную концепцию оптимизации, такую как генетический алгоритм. Что бы ни. Я думаю, вам следует сначала создать и сохранить информацию о расстоянии (между последовательными наборами).
Постановка задачи настолько ясна, что практически можно написать решение на основе networkx. Каковы мощности A/B/C/D? Являются ли эти местоположения узлов неизменными в течение 6 месяцев или года? или они меняются каждые 5 мс, 15 секунд или 2 часа? Существуют ли какие-либо правила, которые помогут установить вес, эвристику или жесткую границу на каждом ребре? Или пороговое значение? подпрыгивая на комментарий @lastchance: Всегда ли вам нужен оптимальный путь (я полагаю, он основан на расстоянии?)? Или было бы лучше иметь под рукой популяцию достаточно хорошего пути («достаточно хорошего» вам еще предстоит определить)?
О Боже, ситуация быстро обострилась, ха-ха. Однако ответ на большинство ваших вопросов — нет, @LoneWanderer.
Я написал короткий пример, используя networkx
(основы которого я изучил для целей этого ответа); код, вероятно, неуклюж и его можно сократить, но он должен предоставить хотя бы основу для написания вашего кода.
Я построил график на основе этого рисунка:
import networkx as nx
import math
nodes = {
"A": [("A_1", (0, 2)), ("A_2", (0, 5)), ("A_3", (0, 8)), ],
"B": [("B_1", (1, 1)), ("B_2", (1, 3)), ("B_3", (1, 5)), ("B_4", (1, 7)), ],
"C": [("C_1", (2, 0)), ("C_2", (2, 5)), ("C_3", (2, 9)), ],
"D": [("D_1", (3, 1)), ("D_2", (3, 5)), ("D_3", (3, 7)), ],
}
G = nx.Graph()
G.add_nodes_from(["in", "out"])
def add_edge(n1, n2):
origin = G._node[n1]["coords"]
extremity = G._node[n2]["coords"]
G.add_edge(n1, n2, weight=math.sqrt((origin[0] - extremity[0])**2 + (origin[1] - extremity[1])**2))
for node in nodes["A"]:
G.add_node(node[0], coords=node[1])
G.add_edge("in", node[0], weight=0)
for node in nodes["B"]:
G.add_node(node[0], coords=node[1])
for a_node in nodes["A"]:
add_edge(a_node[0], node[0])
for node in nodes["C"]:
G.add_node(node[0], coords=node[1])
for b_node in nodes["B"]:
add_edge(b_node[0], node[0])
for node in nodes["D"]:
G.add_node(node[0], coords=node[1])
G.add_edge(node[0], "out", weight=0)
for c_node in nodes["C"]:
add_edge(c_node[0], node[0])
print(nx.shortest_path(G, "in", "out", weight = "weight"))
Результат:
['in', 'A_2', 'B_3', 'C_2', 'D_2', 'out']
Выглядит многообещающе. Зачем добавлять ребра, поскольку реальный маршрут между ними здесь не имеет значения, простого декартова расстояния может быть достаточно?
Насколько я знаю, графовые алгоритмы Networkx полагаются на ребра, и им необходимо знать, что переходить можно только от A к B, от B к C... Но, как я уже сказал, я почти не знаю Networkx, поэтому могут быть и другие соответствующие способы.
понятно. В любом случае большое спасибо!
Одним из вариантов может быть создание KDTree для каждой категории и запроса чередующегося ближайшего места:
from scipy.spatial import KDTree
kdtrees = {cat: KDTree(list(coords.values())) for cat, coords in places.items()}
def shortest_comb(places, categories, kdtrees):
min_dis = float("inf")
for init_name, init_coo in places[categories[0]].items():
route, total_dis = [init_name], 0
curr = init_coo
for next_cat in categories[1:]: # from B and further..
distance, index = kdtrees[next_cat].query(curr)
following = list(places[next_cat].keys())[index]
curr = places[next_cat][following]
route.append(following)
total_dis += distance
if total_dis < min_dis:
min_dis = total_dis
mpr = route
return mpr, min_dis
mpr, min_dis = shortest_comb(places, categories, kdtrees)
print(f"most plausible route: {'->'.join(mpr)}, distance = {min_dis/1000:.2f}km")
# most plausible route: A6->B3->C3->D4, distance=43.50km
Используемые входы:
import random
random.seed(1)
def rnd_points(n, cat, bbox=(21, -32, 22, -31), as_latlon=False):
min_lon, min_lat, max_lon, max_lat = bbox
points = [
(
random.uniform(min_lat, max_lat),
random.uniform(min_lon, max_lon)
) for _ in range(n)
]
if not as_latlon:
from pyproj import Transformer
transformer = Transformer.from_crs(4326, 32734)
points = [pt for pt in transformer.itransform(points)]
return {f"{cat}{i}": coo for i, coo in enumerate(points, 1)}
categories = "ABCD"
places = {cat: rnd_points(n, cat) for cat, n in zip(categories, (9, 7, 5, 6))}
Привет. Выглядит многообещающе. Таким образом, это KDTree должно храниться в памяти (или регулярно обновляться, когда появляется больше мест) и запрашиваться во время выполнения без списка мест из запроса, верно? Какую производительность во время выполнения мы рассматриваем, учитывая сотни миллионов мест по всему миру, а также объем памяти?
Привет, еще один вопрос, здесь один и тот же ABCD
используется как общие данные, так и входные. Такое случается редко, поскольку наш исчерпывающий набор данных состоит из сотен миллионов мест (все As, Bs, Cs... вместе взятые). Общее количество мест может ABCDEFGH...
, но оптимальную комбинацию можно искать просто B (B1, B2...Bn), D (D1, D2...Dn) и F (F1, F2...Fn).
Не уверен, что я следую за тобой. В любом случае, не зацикливайтесь на вводе моего примера. это просто для того, чтобы дать вам общую логику;). Я не знаю точно, как вы готовите свои данные. Кроме того, код в ответе определенно можно оптимизировать дальше, но опять же, это зависит от ваших данных/рабочего процесса. Что касается вашего последнего пункта, вы никогда не упоминали об этом в своем исходном сообщении.
Хм, хорошо, но зависит ли ваш код от того, что все категории мест (A, B, C и D) также присутствуют в запросе во время выполнения?
Допустим, у вас есть только категории A и B. Тогда поиск оптимального A для каждого B будет стоить вам N(A) * N(B) (в худшем случае — более сложная структура данных вполне может обойтись). Теперь добавьте категорию C. За время N(B) * N( C), для каждого C вы можете попробовать каждый B и объединить его с оптимальным путем к этому B (уже решенным). Теперь для каждого C у вас есть оптимальная тройка A->B->C.
И так далее. Существует количество (в худшем случае) шагов за квадратичное время, на единицу меньше числа категорий. Работая на этапе (скажем) K, вам нужно запомнить N(K) оптимальных путей, по одному для каждого K: для этого K попробуйте каждый J, а затем соедините его с уже вычисленным оптимальным путем, заканчивающимся этим J.
Это форма «динамического программирования».
Вот полный код, показывающий грубую силу и вышеописанное; предложение намного быстрее:
Обновлено: заменено quicker
на более экономичную перезапись:
from random import random, seed
from math import dist, inf
seed(12345678987654321)
def getnum():
return round(random(), 3)
def getpoints(n):
return [(getnum(), getnum())
for i in range(n)]
NCAT = 5
NP = 30
categories = [getpoints(NP) for i in range(NCAT)]
def brute(categories):
from itertools import product
smallest = inf
for t in product(*categories):
tot = 0.0
it = iter(t)
last = next(it)
for this in it:
tot += dist(last, this)
last = this
if tot < smallest:
smallest = tot
best = t
return smallest, best
def quicker(categories):
old_dists = [0.0] * len(categories[0])
old_paths = [[p] for p in categories[0]]
for i in range(1, len(categories)):
new_dists = []
new_paths = []
for new in categories[i]:
smallest = inf
for oldi, (old, old_dist) in enumerate(zip(categories[i-1],
old_dists)):
test = dist(old, new) + old_dist
if test < smallest:
smallest = test
best_old = oldi
new_dists.append(smallest)
new_paths.append(old_paths[best_old] + [new])
old_dists = new_dists
old_paths = new_paths
winner = min(range(len(old_dists)), key=old_dists.__getitem__)
return old_dists[winner], old_paths[winner]
from time import perf_counter as now
for f in brute, quicker:
start = now()
x = f(categories)
finish = now()
print(f.__name__, "ran in", finish - start)
print(" ", x)
Обратите внимание, что я использовал случайное начальное число, поэтому вы должны получить тот же результат, что и я (хотя, конечно, время может отличаться):
brute ran in 18.69626900000003
(0.17976537535418252, ((0.339, 0.496), (0.333, 0.436),
(0.333, 0.401), (0.318, 0.477), (0.318, 0.47)))
quicker ran in 0.0008059000028879382
(0.17976537535418252, [(0.339, 0.496), (0.333, 0.436),
(0.333, 0.401), (0.318, 0.477), (0.318, 0.47)])
Результат тот же, но quicker
на много порядков быстрее.
Я советую не использовать общий алгоритм графа «кратчайшего пути». В вашем графике огромное количество структур, которые, как указано выше, легко использовать. Другой способ использовать это — выполнить поиск с возвратом по всему пространству продукта, но прерывая его раньше, когда расстояние, для которого это значение, >= лучшего результата, наблюдаемого на данный момент. В моих небольших тестовых случаях он работает на удивление хорошо. YMMV — очень зависит от данных.
def backtrack(categories):
path = []
best_path = []
smallest = inf
nc = len(categories)
def inner(i, sofar):
nonlocal smallest, best_path
if i >= nc:
smallest = sofar
best_path = path[:]
return
if i:
last = path[-1]
path.append(None)
for new in categories[i]:
if i:
test = sofar + dist(last, new)
else:
test = 0;0
if test < smallest:
path[-1] = new
inner(i+1, test)
path.pop()
inner(0, 0.0)
return smallest, best_path
Другой поворот заключается в том, чтобы сначала использовать другой подход, чтобы найти «довольно хороший» путь, чтобы backtrack()
мог раньше прервать бесполезные поиски.
Например, храните категории в виде k-d деревьев. Выберите наугад какую-нибудь точку P. Затем постройте путь от ближайшей точки к P в каждой из категорий (дерево k-d может возвращать самое близкое по логарифму времени, и для этой цели должно быть достаточно использовать опцию более быстрого поиска дерева «согласовать аппроксимацию»).
Если повезет, поскольку все точки пути «близки» к P, они также будут близки друг к другу. Вспеньте, промойте и повторяйте, пока не перестанете добиваться «разумного» прогресса. Тогда начните backtrack()
с лучшего, что вы видели на данный момент.
Если вы хотите выполнить поиск в литературе, ваша проблема — это пример кратчайшего пути на «направленных, взвешенных, многоступенчатых» графах. «Многоступенчатость» там самое главное качество.
Динамическое программирование — это предпочтительное решение, и я уже привел для него полный код. Это чрезвычайно эффективно, сокращая время выполнения с экспоненциального до (в основном) квадратичного.
Но в комментариях вы говорили о «сотнях миллионов» очков. В этом случае проблема меняется с «сложной» на «ничего страшного — вы никогда не найдете ничего, что могло бы решить столь большие проблемы точно и быстро». quicker()
просто непрактично, если у вас есть хотя бы миллион баллов в какой-то категории (квадрат этого числа равен триллиону).
Если вы хотите получить результаты в разумные сроки, вам придется согласиться на приблизительные значения. Насколько я могу судить, нет полезной литературы по аппроксимации кратчайшего пути для многоэтапных графов.
Если вы хотите продолжить это, я уже дал несколько советов. Однако вам придется сделать гораздо больше. На вашем месте я бы запустил два процесса. Один бежит backtrack()
. Другой (или более одного) систематически выбирает точки P из сетки, покрывающей пространство XY, и строит пути-кандидаты из ближайших (к P) точек в каждой категории. Если они находят новое лучшее, они убивают процесс backtrack()
и перезапускают его с новым лучшим. Процесс, который привел к выигрышу P, возможно, пожелает начать на новой, меньшей сетке с центром в P (приняв своего рода эвристику восхождения на холм).
backtrack()
в таких случаях, вероятно, никогда не закончишь, пока у тебя не кончится терпение. В этот момент лучшее, что вы можете сделать, — это согласиться на лучшее, что удалось найти на данный момент. Вы не будете знать, лучшее ли это или даже «близкое». Такие вот дела. Неразрешимое есть неразрешимое :-)
Такие трюки, как генетическое программирование или имитация отжига, мне не кажутся многообещающими. Локальных минимумов предостаточно, а «глубокие» локальные минимумы разделены высокими холмами. «Невозможно добраться отсюда» через правдоподобную последовательность небольших локальных изменений.
Вероятный способ получить «довольно хороший» путь — использовать жадный алгоритм. Выберите один из категории A. Затем выберите тот B, который ближе всего к этому. Затем выберите тот C, который ближе всего к выбранному B. И так далее.
На самом деле это работает не очень хорошо. Однако это можно сделать намного лучше, переписав quicker()
так, чтобы после обработки каждой категории запоминались только лучшие LIM
пути. Так, например, при работе на этапе K вместо рассмотрения возможностей len(J) * len(K) мы рассматриваем не более LIM
* len(K) и запоминаем максимум только LIM
лучшие из них. Это делает quicker()
линейное время (а не квадратичным), но тогда нет никакой гарантии оптимальности.
Кажется, это хороший способ найти хороший путь, с которого можно начать backtrack()
. Поскольку он рандомизирует точки LIM
категории A, с которых он начинается, его многократное выполнение может привести к разным путям.
def quicker_lim(categories, LIM=100):
from heapq import nsmallest
from random import sample
old_info = [(0.0, [p])
for p in sample(categories[0],
min(len(categories[0]), LIM))]
for i in range(1, len(categories)):
new_info = []
for new in categories[i]:
smallest = inf
for old_dist, p in old_info:
test = dist(p[-1], new) + old_dist
if test < smallest:
smallest = test
bestp = p
new_info.append((smallest, bestp + [new]))
old_info = nsmallest(LIM, new_info)
return old_info[0]
Это наиболее многообещающая схема аппроксимации, которую я придумал. Как и прежде, действуйте поэтапно, по одной категории за раз, сохраняя не более LIM
лучшего с каждого этапа.
На следующем этапе для каждого наилучшего пути попробуйте добавить FANOUT
ближайшие (к концу пути) точки в текущей категории. Деревья kd делают это быстро и легко, при условии, что дерево kd уже построено. Приведенный ниже код создает их «на лету» и тратит большую часть времени на построение деревьев (для категории с n
баллами затраты составляют примерно O(n log n)
).
Я использую очень короткий класс на чистом Python KDTree
(код здесь не приведен) из
https://github.com/Vectorized/Python-KD-Tree
Это прототип просто для проверки идеи. В производстве, если это вообще возможно, было бы лучше построить деревья заранее и использовать их повторно.
def kdgreedy(categories, LIM=100, FANOUT=3):
from heapq import nsmallest
from random import sample
old_info = [(0.0, [p])
for p in sample(categories[0],
min(len(categories[0]), LIM))]
for i in range(1, len(categories)):
new_info = []
print("cat", i, end = " ")
kdcat = KDTree(categories[i], 2)
print("built")
for old_dist, path in old_info:
p = path[-1]
for q in kdcat.get_knn(p, FANOUT, False):
new_info.append((old_dist + dist(p, q),
path + [q]))
old_info = nsmallest(LIM, new_info)
return old_info[0]
Самая большая «дыра» в том, что он пытается набрать максимум LIM
очков только из первой категории. Если оптимальный путь не начинается с одного из них, он не будет найден. При повторном запуске будет выбран другой (хотя, возможно, и перекрывающийся) набор отправных точек. Опять же, это предназначено для поиска достойного пути для backtrack()
для начала.
Идея: сделать возврат, но на каждом уровне стараться не больше, чем LIM
ближайших соседей (через k-d деревья). Это не гарантирует оптимальности, но во всех случаях, которые я пробовал до сих пор, оптимальный путь находится гораздо быстрее, чем полный возврат.
kds
хранит предварительно вычисленные деревья kd при входе; kds[i]
это KDTree(categories[i[, 2)
.
def backtrack_lim(categories, kds, LIM=5):
best_path = None
smallest = inf
nc = len(kds)
path = [None] * nc
def inner(i, sofar):
nonlocal smallest, best_path
if i >= nc:
smallest = sofar
best_path = path[:]
print("new smallest", smallest)
return
last = path[i-1]
for new in kds[i].get_knn(last, LIM, False):
test = sofar + dist(last, new)
if test < smallest:
path[i] = new
inner(i+1, test)
n = len(categories[0])
for j, path[0] in enumerate(categories[0]):
if not j & 0x1ff:
print(f"{j:_} {j/n:.2%}", end = "\r")
inner(1, 0.0)
print()
return smallest, best_path
Его можно использовать как есть или, например, переработать в генератор, чтобы с самого начала находить всё более лучшие пути для полноценного backtrack()
.
Это может помочь запомнить следующее:
Во всех случаях brute()
занимает время, пропорциональное
prod(map(len, categories))
Катастрофа.
Во всех случаях quicker()
занимает время, пропорциональное
sum(len(a) * len(b) for a, b in zip(categories, categories[1:]))
Не только намного быстрее, но и оптимально в худшем случае. Он смотрит на каждое ребро ровно один раз. Чтобы добиться большего, алгоритм должен быть в состоянии сделать вывод, что на некоторые ребра никогда не нужно смотреть. Но в худшем случае все ребра все-таки придется осмотреть.
backtrack()
сильно зависит от данных. В худшем случае это может занять до brute()
времени, хотя я не уверен, что вообще можно построить такие случаи. По мере увеличения количества баллов в категориях он, скорее всего, будет работать быстрее, чем quicker()
. В лучшем случае оно может завершиться за время, пропорциональное len(categories) + len(categories[0]) * len(categories[1])
. Например, рассмотрим 100 категорий, которые попарно не пересекаются, за исключением того, что первая точка в каждом списке категорий — p
. Первый путь, который он пробует, — это [p] * len(categories)
, общая длина которого равна 0. Затем все остальные пути отсекаются, как только будут выбраны первые две точки. Алгоритм можно было бы изменить так, чтобы он немедленно останавливался, если бы он обнаружил, что общая длина равна 0, но это почти наверняка будет контрпродуктивно, поскольку циклы записи будут проверяться на то, что почти наверняка никогда не произойдет.
Все остальные методы, приведенные здесь, не гарантируют оптимальности.
Во-первых, после дальнейшего тестирования backtrack_lim()
является явным победителем, поскольку относительно быстро получает отличные результаты. Он остается эффективным, даже если LIM
сократить с 5 до 2.
Во-вторых, большой сюрприз в PyPy 3.10.12: он math.dist
намного медленнее, чем CPython, примерно в 8 раз медленнее. PyPy почти всегда значительно быстрее CPython, но не в этом случае. В этом случае это можно исправить, используя вместо этого следующее:
def dist(x, y):
return sqrt((t := x[0] - y[0]) * t + (t := x[1] - y[1]) * t)
хотя это ограничено 2D-точками и не защищено от ложного переполнения/недополнения для очень больших/маленьких чисел с плавающей запятой. Обратите внимание, что ** 2
вместо умножения t*t
работает медленнее - универсальный int.__pow()
требует значительных накладных расходов, чтобы добраться до специального (для него) случая «о, нужно только одно умножение».
Не будет ли здесь уместен алгоритм Джикстры? Конечно, сначала вам придется построить данные о расстояниях, сложность этой задачи составит N(A) * N(B) + N(B) * N(C) + N(C) * N(D).