У меня есть рабочий процесс, состоящий из направленного ациклического графа (DAG) с несколькими узлами, каждый из которых имеет случайное время выполнения. Я хотел бы определить максимальный параллелизм узлов в этом рабочем процессе. Что я должен делать?
В настоящее время мы тщательно просматриваем все возможные сценарии выполнения, чтобы найти максимальное значение параллелизма. Однако этот подход требует много времени при работе с большим количеством узлов или ребер. Существуют ли алгоритмы меньшей сложности для решения этой проблемы?
Например:
На этой диаграмме я перечислил 5 простых примеров.
В действительности наша группа обеспечения доступности баз данных может иметь несколько сотен или даже тысяч узлов, а также более сложные отношения зависимостей.
Максимальное значение параллелизма относится к максимальному количеству узлов, которые могут выполняться одновременно на основе отношений зависимости в группе обеспечения доступности баз данных, либо от корневого узла, либо при работе вниз. Например, в первом сценарии 1, 2, 5 и 6 могут выполняться одновременно, что представляет собой максимальный параллелизм среди всех возможных параллельных выполнения. Во втором сценарии, если между 2 и 5 вводится зависимость, максимальное одновременное выполнение будет либо 1, 2, 5, либо 1, 5, 6, либо 1 2 4, либо 3 5 6.
@Хуан Лопес ----------------------- 19.08.2024 добавить
Большое спасибо за ваше руководство. Я кратко изучил алгоритм задачи максимального потока и нашел несколько общих алгоритмов, включая метод Форда Фулкерсона, алгоритм Эдмондса Карпа, алгоритм Диника и алгоритм Push label.
Следуя предоставленной вами демонстрации Python, я усердно учился и с помощью )
Я добавил в свой проект несколько сравнительных тестов и обнаружил, что результаты весьма точны.
Однако есть еще одна проблема: две мои реализации алгоритма имеют очень низкую производительность в случае последовательной зависимости.
пример1:
201 узел, зависимость такая: 1 -> 2 -> 3 -> 4 -> 5 -> xxxxxx -> 201
вывод (dag Calcal Max. Эта строка представляет собой реализацию полного обхода + обрезки HashSet):
2024-08-19 16:11:24 CST UTC+08:00 INFO - test DinicSolver alg
2024-08-19 16:11:52 CST UTC+08:00 INFO - test DinicSolver alg return 1, cost time: 28177ms
2024-08-19 16:11:52 CST UTC+08:00 INFO - test FordFulkersonSolver alg
2024-08-19 16:11:53 CST UTC+08:00 INFO - test FordFulkersonSolver alg return 1, cost time: 372ms
2024-08-19 16:11:53 CST UTC+08:00 INFO - dag calculate max concurrency 1 cost time: 7ms
И я обнаружил, что затраты времени не фиксированы, и чем длиннее имя узла, тем больше затраты времени.
используйте номер в качестве имени узла:
используйте одну букву + цифру в качестве имени узла:
используйте две буквы + цифру в качестве имени узла:
используйте три буквы + цифру в качестве имени узла:
пример2:
201 узел, зависимость как на этой картинке:
вывод (dag Calcal Max. Эта строка представляет собой реализацию полного обхода + обрезки HashSet):
2024-08-19 16:11:55 CST UTC+08:00 INFO - test DinicSolver alg
2024-08-19 16:11:55 CST UTC+08:00 INFO - test DinicSolver alg return 201, cost time: 6ms
2024-08-19 16:11:55 CST UTC+08:00 INFO - test FordFulkersonSolver alg
2024-08-19 16:11:55 CST UTC+08:00 INFO - test FordFulkersonSolver alg return 201, cost time: 2ms
2024-08-19 16:11:55 CST UTC+08:00 INFO - dag calculate max concurrency 10 cost time: 660ms
добавить мою реализацию Java-кода
Сообщ. Форда-Фалкерсона:
public class FordFulkersonSolver<T> {
private final Map<String, Map<String, Integer>> network;
private final Set<String> visited;
public FordFulkersonSolver() {
network = new HashMap<>();
visited = new HashSet<>();
}
private int dfs(String source, String sink, int flow) {
visited.add(source);
if (source.equals(sink)) {
return flow;
}
for (Map.Entry<String, Integer> entry : network.get(source).entrySet()) {
String neighbor = entry.getKey();
Integer capacity = entry.getValue();
if (capacity <= 0 || visited.contains(neighbor)) continue;
int sent = dfs(neighbor, sink, Math.min(flow, capacity));
if (sent == 0) continue;
network.get(source).put(neighbor, capacity - sent);
network.get(neighbor).put(source, network.get(neighbor).getOrDefault(source, 0) + sent);
return sent;
}
return 0;
}
private Set<String> reach(Map<T, Set<T>> graph, T t, Set<String> visited) {
Queue<T> queue = new LinkedList<>();
queue.add(t);
while (!queue.isEmpty()) {
T current = queue.poll();
String currentKey = "A" + current.toString();
visited.add(currentKey);
for (T neighbor : graph.get(current)) {
String neighborKey = "B" + neighbor.toString();
if (!visited.contains(neighborKey)) {
queue.add(neighbor);
visited.add(neighborKey);
}
}
}
return visited;
}
private void addEdge(String from, String to, int capacity) {
network.computeIfAbsent(from, k -> new HashMap<>()).put(to, capacity);
network.computeIfAbsent(to, k -> new HashMap<>()).put(from, 0);
}
public int solve(Map<T, Set<T>> graph) {
for (T t : graph.keySet()) {
addEdge("src", "A" + t.toString(), 1);
addEdge("B" + t, "sink", 1);
// Corrected here to pass the correct generic type for visited
Set<String> visitedSubset = new HashSet<>();
for (String u : reach(graph, t, visitedSubset)) {
addEdge("A" + t, u, 1);
}
}
int total = 0;
while (true) {
this.visited.clear(); // Clear visited set for each iteration
int sent = dfs("src", "sink", Integer.MAX_VALUE);
if (sent == 0) break;
total += sent;
}
return graph.size() - total;
}
}
Динические условия:
public class DinicSolver<T> {
private final Map<String, Map<String, Integer>> network;
private List<String> nodes;
private int[] level;
public DinicSolver() {
network = new HashMap<>();
nodes = new ArrayList<>();
nodes.add("src");
nodes.add("sink");
}
private void bfs(String source) {
level = new int[nodes.size()];
Arrays.fill(level, -1);
level[nodes.indexOf(source)] = 0;
Queue<String> queue = new LinkedList<>();
queue.offer(source);
while (!queue.isEmpty()) {
String u = queue.poll();
for (Map.Entry<String, Integer> entry : network.get(u).entrySet()) {
String v = entry.getKey();
int capacity = entry.getValue();
if (capacity > 0 && level[nodes.indexOf(v)] == -1) {
level[nodes.indexOf(v)] = level[nodes.indexOf(u)] + 1;
queue.offer(v);
}
}
}
}
private int dfs(String u, int flow, String sink) {
if (u.equals(sink)) {
return flow;
}
for (Map.Entry<String, Integer> entry : network.get(u).entrySet()) {
String v = entry.getKey();
int capacity = entry.getValue();
if (capacity > 0 && level[nodes.indexOf(u)] < level[nodes.indexOf(v)]) {
int sent = dfs(v, Math.min(flow, capacity), sink);
if (sent > 0) {
network.get(u).put(v, capacity - sent);
network.get(v).put(u, network.get(v).getOrDefault(u, 0) + sent);
return sent;
}
}
}
return 0;
}
private void addEdge(String from, String to, int capacity) {
network.computeIfAbsent(from, k -> new HashMap<>()).put(to, capacity);
network.computeIfAbsent(to, k -> new HashMap<>()).put(from, 0);
if (!nodes.contains(from)) nodes.add(from);
if (!nodes.contains(to)) nodes.add(to);
}
private Set<String> reach(Map<T, Set<T>> graph, T t, Set<String> visited) {
Queue<T> queue = new LinkedList<>();
queue.add(t);
while (!queue.isEmpty()) {
T current = queue.poll();
String currentKey = "A" + current.toString();
visited.add(currentKey);
for (T neighbor : graph.get(current)) {
String neighborKey = "B" + neighbor.toString();
if (!visited.contains(neighborKey)) {
queue.add(neighbor);
visited.add(neighborKey);
}
}
}
return visited;
}
public int solve(Map<T, Set<T>> graph) {
for (T t : graph.keySet()) {
addEdge("src", "A" + t.toString(), 1);
addEdge("B" + t, "sink", 1);
Set<String> visitedSubset = new HashSet<>();
for (String u : reach(graph, t, visitedSubset)) {
addEdge("A" + t, u, 1);
}
}
int maxFlow = 0;
while (true) {
bfs("src");
if (level[nodes.indexOf("sink")] == -1) break;
int flow;
while ((flow = dfs("src", Integer.MAX_VALUE, "sink")) > 0) {
maxFlow += flow;
}
}
return graph.size() - maxFlow;
}
}
Этот вопрос похож на: Нахождение ширины ориентированного ациклического графа... только с возможностью найти родителей . Если вы считаете, что это другое, отредактируйте вопрос, поясните, чем он отличается и/или как ответы на этот вопрос не помогают решить вашу проблему.
Проблема, описанная по ссылке stackoverflow.com/questions/2790682/…, похоже, связана с поиском самого широкого слоя в DAG. Мне нужен самый широкий сценарий во всех случаях (без учета слоев), а проблема, описанная в ссылке, не имеет точного решения. Автор поста тоже склонен думать, что такая ширина не обязательна.
Это эквивалентно поиску максимальной антицепи в частично упорядоченном наборе (также известном как ширина). Это максимальное подмножество, в котором никакие два элемента не сравнимы, что в данном случае означает, что ни один узел не зависит от другого.
Чтобы найти это, мы можем использовать теорему Дилворта, которая гласит, что ширина частично упорядоченного множества равна размеру минимального разложения множества на цепочки, то есть подмножества элементов, в которых каждая пара элементов сопоставимо. Мы можем решить эту проблему, смоделировав граф как двудольный с ребром для каждого отношения «происходит до» и решив задачу максимального соответствия. Важно: DAG — это сжатая форма отношения порядка, в нем отсутствуют многие ребра. Смоделируйте двудольный граф из транзитивного замыкания DAG!
Вот пример на Python, в котором используется Форд-Фалкерсон (для простоты), но вы можете подключить свой любимый алгоритм максимального соответствия.
from collections import defaultdict
def dfs(network, source, sink, flow, visited):
visited.add(source)
if source == sink: return flow
for neighbor, capacity in network[source].items():
if capacity <= 0 or neighbor in visited: continue
sent = dfs(network, neighbor, sink, min(flow, capacity), visited)
if not sent: continue
network[source][neighbor] -= sent
network[neighbor][source] += sent
return sent
return 0
def reach(graph, source, visited):
for neighbor in graph[source]:
if neighbor not in visited:
visited.add(neighbor)
reach(graph, neighbor, visited)
return visited
def solve(graph):
network = defaultdict(lambda: defaultdict(lambda: 0))
for v in graph:
network['src']['A' + str(v)] = 1
network['B' + str(v)]['sink'] = 1
for u in reach(graph, v, set()):
network['A' + str(v)]['B' + str(u)] = 1
total = 0
while True:
sent = dfs(network, 'src', 'sink', float('+Inf'), set())
if sent == 0: break
total += sent
return len(graph) - total
print(solve({
1: [3],
2: [3],
3: [7],
4: [5, 6],
5: [7],
6: [7],
7: []
})) #4
print(solve({
1: [2],
2: [3],
3: []
})) #1
print(solve({
1: [3],
2: [3, 5],
3: [7],
4: [5, 6],
5: [7],
6: [7],
7: []
})) #3
print(solve({
1: [4],
2: [4],
3: [4],
4: [],
})) #3
print(solve({
1: [4, 2],
2: [4],
3: [4],
4: [],
})) #2
print(solve({
1: [3],
2: [3],
3: [5, 6, 7],
4: [5],
5: [8],
6: [8],
7: [8],
8: []
})) #3
Кроме того, вот пример представления исходного графа, его транзитивного замыкания и двудольного преобразования:
пожалуйста, посмотрите мой ответ
@хехе, извини, я совсем забыл важный шаг в алгоритме: тебе нужно создать двудольный граф на основе частичного порядка. DAG опускает многие ребра порядка. На практике это означает, что перед началом работы вам необходимо вычислить транзитивное замыкание графа. Я отредактировал свой ответ.
пожалуйста, посмотрите мой вопрос, 2024-08-91, добавьте, спасибо
Максимальное значение параллелизма относится к максимальному количеству узлов, которые могут выполняться одновременно на основе отношений зависимости в группе обеспечения доступности баз данных, либо от корневого узла, либо при работе вниз. Например, в первом сценарии 1, 2, 5 и 6 могут выполняться одновременно, что представляет собой максимальный параллелизм среди всех возможных параллельных выполнения. Во втором сценарии, если между 2 и 5 вводится зависимость, максимальное одновременное выполнение будет либо 1, 2, 5, либо 1, 5, 6, либо 1 2 4, либо 3 5 6.