Помощь с алгоритмом объединения векторов

Мне нужен очень быстрый алгоритм для следующей задачи. Я уже реализовал несколько алгоритмов, которые его завершают, но все они слишком медленные для необходимой мне производительности. Он должен быть достаточно быстрым, чтобы алгоритм можно было запускать не менее 100 000 раз в секунду на современном процессоре. Он будет реализован на C++.

Я работаю с промежутками / диапазонами, структурой, у которой есть начальная и конечная координаты на линии.

У меня есть два вектора (динамические массивы) пролетов, и мне нужно их объединить. Один вектор - src, а другой - dst. Векторы сортируются по начальным координатам диапазона, и диапазоны не перекрываются в пределах одного вектора.

Интервалы в векторе src должны быть объединены с промежутками в векторе dst, чтобы результирующий вектор все еще был отсортирован и не имел перекрытий. Т.е. если во время слияния обнаруживаются перекрытия, два пролета объединяются в один. (Объединение двух пролетов - это всего лишь вопрос изменения координат в конструкции.)

Теперь есть еще одна загвоздка: промежутки в векторе src должны быть «расширены» во время слияния. Это означает, что константа будет добавлена ​​к началу, а другая (большая) константа к конечной координате каждого диапазона в src. Это означает, что после расширения диапазонов src они могут перекрываться.


До сих пор я пришел к выводу, что это невозможно сделать полностью на месте, необходимо какое-то временное хранилище. Я думаю, что это должно быть выполнено за линейное время по количеству суммированных элементов src и dst.

Любое временное хранилище, вероятно, может использоваться несколькими запусками алгоритма.

Я испробовал два основных подхода, которые слишком медленны:

  1. Добавьте все элементы src в dst, расширяя каждый элемент перед его добавлением. Затем запустите сортировку на месте. Наконец, выполните итерацию по полученному вектору, используя указатели «чтения» и «записи», причем указатель чтения идет впереди указателя записи, объединяя промежутки по мере их продвижения. Когда все элементы были объединены (указатель чтения достигает конца), dst обрезается.

  2. Создайте временный рабочий вектор. Выполните наивное слияние, как описано выше, многократно выбирая следующий элемент из src или dst и сливаясь с рабочим вектором. Когда закончите, скопируйте рабочий вектор в dst, заменив его.

Первый метод имеет проблему с сортировкой O ((m + n) * log (m + n)) вместо O (m + n) и имеет некоторые накладные расходы. Это также означает, что вектор dst должен вырасти намного больше, чем ему действительно нужно.

Во втором случае основная проблема заключается в частом копировании и повторном выделении / освобождении памяти.

Структуры данных, используемые для хранения / управления промежутками / векторами, могут быть изменены, если вы считаете, что это необходимо.

Обновление: забыл сказать, насколько велики наборы данных. Наиболее распространенные случаи - от 4 до 30 элементов в любом векторе, и либо dst пуст, либо существует большое перекрытие между промежутками в src и dst.

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
3
0
2 185
9

Ответы 9

Как насчет второго метода без повторного выделения - другими словами, выделить временный вектор один раз и никогда не выделять его снова? Или, если входные векторы достаточно малы (но не постоянного размера), просто используйте alloca вместо malloc.

Кроме того, с точки зрения скорости вы можете убедиться, что ваш код использует CMOV для сортировки, поскольку, если код действительно ветвится для каждой отдельной итерации сортировки слиянием:

if (src1[x] < src2[x])
    dst[x] = src1[x];
else
    dst[x] = src2[x];

Прогнозирование ветвления не выполняется в 50% случаев, что оказывает огромное влияние на производительность. Условное перемещение, вероятно, будет намного лучше, поэтому убедитесь, что компилятор это делает, а если нет, попробуйте уговорить его сделать это.

Сортировка, которую вы упоминаете в подходе 1, может быть сокращена до линейного времени (от лог-линейной, как вы ее описываете), потому что два входных списка уже отсортированы. Просто выполните этап слияния сортировки слиянием. При соответствующем представлении входных векторов диапазона (например, односвязных списков) это можно сделать на месте.

http://en.wikipedia.org/wiki/Merge_sort

Я не думаю, что возможно строго линейное решение, потому что расширение диапазона векторов src может в худшем случае привести к тому, что все они будут перекрываться (в зависимости от величины добавляемой константы)

проблема может быть в реализации, а не в алгоритме; я бы предложил профилировать код для ваших предыдущих решений, чтобы увидеть, на что тратится время

рассуждение:

для действительно «современного» процессора, такого как Intel Core 2 Extreme QX9770, работающего на частоте 3,2 ГГц, можно ожидать около 59 455 MIPS.

для 100 000 векторов вам придется обработать каждый вектор за 594 550 инструкций. Это МНОГО инструкций.

ссылка: википедия MIPS

кроме того, обратите внимание, что добавление константы к диапазонам векторов src не приводит к их сортировке, поэтому вы можете нормализовать диапазоны векторов src независимо, а затем объединить их с диапазонами векторов dst; это должно снизить нагрузку на ваш исходный алгоритм

100k на самом деле может быть недооценкой, я еще не подсчитал это число. Кроме того, когда я сказал «современный» процессор, я на самом деле имел в виду «что-то возрастное до 5 лет», Athlon XP 3000+ не является нереальной целью.

jfs 18.09.2008 06:38

Мы знаем, что в наилучшем случае время выполнения - O (m + n), это связано с тем, что вам, по крайней мере, нужно сканировать все данные, чтобы иметь возможность объединить списки. Учитывая это, ваш второй метод должен дать вам такое поведение.

Вы проанализировали свой второй метод, чтобы выяснить, в чем заключаются узкие места? Вполне возможно, что в зависимости от количества данных, о которых вы говорите, на самом деле невозможно сделать то, что вы хотите, за указанный промежуток времени. Один из способов проверить это - сделать что-то простое, например, просуммировать все начальные и конечные значения промежутков в каждом векторе в цикле и отсчитать время. По сути, здесь вы выполняете минимальный объем работы для каждого элемента в векторах. Это обеспечит вам основу для наилучшей производительности, которую вы можете ожидать.

Кроме того, вы можете избежать копирования векторов элемент за элементом, используя метод stl swap, и вы можете предварительно выделить временный вектор до определенного размера, чтобы избежать запуска расширения массива при объединении элементов.

Вы можете рассмотреть возможность использования двух векторов в своей системе, и всякий раз, когда вам нужно выполнить слияние, вы объединяете его в неиспользуемый вектор, а затем меняете местами (это похоже на двойную буферизацию, используемую в графике). Таким образом, вам не нужно перераспределять векторы каждый раз, когда вы выполняете слияние.

Тем не менее, вам лучше сначала профилировать и выяснить, в чем ваше узкое место. Если выделения минимальны по сравнению с фактическим процессом слияния, вам нужно выяснить, как сделать это быстрее.

Некоторое возможное дополнительное ускорение может быть получено за счет прямого доступа к необработанным данным векторов, что позволяет избежать проверки границ при каждом доступе к данным.

Спасибо, что напомнили мне о std :: swap, на самом деле это может помешать сделке. Я вернусь после тестирования;)

jfs 18.09.2008 06:32

1 сразу - полная сортировка медленнее, чем объединение двух отсортированных списков.

Итак, вы смотрите на настройку 2 (или что-то совершенно новое).

Если вы измените структуры данных на двусвязные списки, вы можете объединить их в постоянном рабочем пространстве.

Используйте распределитель кучи фиксированного размера для узлов списка, чтобы уменьшить использование памяти на узел и повысить вероятность того, что узлы расположены близко друг к другу в памяти, уменьшая количество пропусков страниц.

Вы можете найти код в Интернете или в своей любимой книге алгоритмов для оптимизации слияния связанных списков. Вы захотите настроить это, чтобы объединять диапазоны одновременно с объединением списков.

Чтобы оптимизировать слияние, сначала обратите внимание, что для каждого прогона значений, приходящих с одной стороны, но не поступающих с другой стороны, вы можете вставить весь прогон в список dst за один раз, вместо того, чтобы вставлять каждый узел по очереди. И вы можете сэкономить одну запись на каждую вставку по сравнению с обычной операцией со списком, оставив конец «висящим», зная, что вы исправите его позже. И при условии, что вы не выполняете удаления где-либо еще в своем приложении, список может быть односвязным, что означает одну запись на узел.

Что касается времени выполнения 10 микросекунд - вроде зависит от n и m ...

Если ваша последняя реализация все еще недостаточно быстра, вам, возможно, придется искать альтернативные подходы.

Для чего вы используете выходы этой функции?

Что ж, вы упускаете одну вещь, здесь не одна «дельта», а две. Левая и правая части диапазона имеют разные добавленные значения, в частности, правая часть имеет большую добавленную стоимость, чем левая.

jfs 18.09.2008 07:06

Я написал новый класс контейнера только для этого алгоритма, адаптированный к потребностям. Это также дало мне возможность настроить другой код моей программы, который в то же время немного увеличил скорость.

Это значительно быстрее, чем старая реализация с использованием векторов STL, но в остальном это было в основном то же самое. Но хотя он и быстрее, он все же недостаточно быстр ... к сожалению.

Профилирование больше не показывает, что является настоящим узким местом. Профилировщик MSVC, кажется, иногда возлагает «вину» на неправильные вызовы (предположительно идентичные запуски определяют сильно разное время выполнения), и большинство вызовов объединяются в одну большую щель.

Глядя на дизассемблирование сгенерированного кода, видно, что в сгенерированном коде очень много скачков, я думаю, что это может быть основной причиной медлительности сейчас.

class SpanBuffer {
private:
    int *data;
    size_t allocated_size;
    size_t count;

    inline void EnsureSpace()
    {
        if (count == allocated_size)
            Reserve(count*2);
    }

public:
    struct Span {
        int start, end;
    };

public:
    SpanBuffer()
        : data(0)
        , allocated_size(24)
        , count(0)
    {
        data = new int[allocated_size];
    }

    SpanBuffer(const SpanBuffer &src)
        : data(0)
        , allocated_size(src.allocated_size)
        , count(src.count)
    {
        data = new int[allocated_size];
        memcpy(data, src.data, sizeof(int)*count);
    }

    ~SpanBuffer()
    {
        delete [] data;
    }

    inline void AddIntersection(int x)
    {
        EnsureSpace();
        data[count++] = x;
    }

    inline void AddSpan(int s, int e)
    {
        assert((count & 1) == 0);
        assert(s >= 0);
        assert(e >= 0);
        EnsureSpace();
        data[count] = s;
        data[count+1] = e;
        count += 2;
    }

    inline void Clear()
    {
        count = 0;
    }

    inline size_t GetCount() const
    {
        return count;
    }

    inline int GetIntersection(size_t i) const
    {
        return data[i];
    }

    inline const Span * GetSpanIteratorBegin() const
    {
        assert((count & 1) == 0);
        return reinterpret_cast<const Span *>(data);
    }

    inline Span * GetSpanIteratorBegin()
    {
        assert((count & 1) == 0);
        return reinterpret_cast<Span *>(data);
    }

    inline const Span * GetSpanIteratorEnd() const
    {
        assert((count & 1) == 0);
        return reinterpret_cast<const Span *>(data+count);
    }

    inline Span * GetSpanIteratorEnd()
    {
        assert((count & 1) == 0);
        return reinterpret_cast<Span *>(data+count);
    }

    inline void MergeOrAddSpan(int s, int e)
    {
        assert((count & 1) == 0);
        assert(s >= 0);
        assert(e >= 0);

        if (count == 0)
        {
            AddSpan(s, e);
            return;
        }

        int *lastspan = data + count-2;

        if (s > lastspan[1])
        {
            AddSpan(s, e);
        }
        else
        {
            if (s < lastspan[0])
                lastspan[0] = s;
            if (e > lastspan[1])
                lastspan[1] = e;
        }
    }

    inline void Reserve(size_t minsize)
    {
        if (minsize <= allocated_size)
            return;

        int *newdata = new int[minsize];

        memcpy(newdata, data, sizeof(int)*count);

        delete [] data;
        data = newdata;

        allocated_size = minsize;
    }

    inline void SortIntersections()
    {
        assert((count & 1) == 0);
        std::sort(data, data+count, std::less<int>());
        assert((count & 1) == 0);
    }

    inline void Swap(SpanBuffer &other)
    {
        std::swap(data, other.data);
        std::swap(allocated_size, other.allocated_size);
        std::swap(count, other.count);
    }
};


struct ShapeWidener {
    // How much to widen in the X direction
    int widen_by;
    // Half of width difference of src and dst (width of the border being produced)
    int xofs;

    // Temporary storage for OverlayScanline, so it doesn't need to reallocate for each call
    SpanBuffer buffer;

    inline void OverlayScanline(const SpanBuffer &src, SpanBuffer &dst);

    ShapeWidener(int _xofs) : xofs(_xofs) { }
};


inline void ShapeWidener::OverlayScanline(const SpanBuffer &src, SpanBuffer &dst)
{
    if (src.GetCount() == 0) return;
    if (src.GetCount() + dst.GetCount() == 0) return;

    assert((src.GetCount() & 1) == 0);
    assert((dst.GetCount() & 1) == 0);

    assert(buffer.GetCount() == 0);

    dst.Swap(buffer);

    const int widen_s = xofs - widen_by;
    const int widen_e = xofs + widen_by;

    size_t resta = src.GetCount()/2;
    size_t restb = buffer.GetCount()/2;
    const SpanBuffer::Span *spa = src.GetSpanIteratorBegin();
    const SpanBuffer::Span *spb = buffer.GetSpanIteratorBegin();

    while (resta > 0 || restb > 0)
    {
        if (restb == 0)
        {
            dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
            --resta, ++spa;
        }
        else if (resta == 0)
        {
            dst.MergeOrAddSpan(spb->start, spb->end);
            --restb, ++spb;
        }
        else if (spa->start < spb->start)
        {
            dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e);
            --resta, ++spa;
        }
        else
        {
            dst.MergeOrAddSpan(spb->start, spb->end);
            --restb, ++spb;
        }
    }

    buffer.Clear();
}

Попробуйте использовать deque вместо vector. Deque выделяет меньше памяти за счет того, что она не является непрерывной памятью.

moswald 18.09.2008 10:28

Я бы всегда держал свой вектор пролетов отсортированным. Это НАМНОГО упрощает реализацию алгоритмов - и позволяет делать это за линейное время.

Хорошо, поэтому я бы отсортировал промежутки на основе:

  • минимальный интервал в порядке возрастания
  • затем охватите максимум в порядке убывания

Для этого вам нужно создать функцию.

Затем я бы использовал std :: set_union для объединения векторов (вы можете объединить более одного, прежде чем продолжить).

Затем для каждого последующего набора пролетов с одинаковыми минимумами вы оставляете первый и удаляете остальные (они являются частями первого пролета).

Затем вам нужно объединить свои пролеты. Это должно быть вполне выполнимо сейчас и выполнимо за линейное время.

Хорошо, вот трюк. Не пытайтесь делать это на месте. Используйте один или несколько временных векторов (и заранее зарезервируйте достаточно места). Затем, в конце, вызовите std :: vector :: swap, чтобы поместить результаты во входной вектор по вашему выбору.

Надеюсь, этого достаточно, чтобы вы начали.

Какая у вас целевая система? Это многоядерный? Если это так, вы можете рассмотреть возможность многопоточности этого алгоритма.

Моя целевая система - настольная система за последние 5 лет или около того, я не могу ничего предположить о поддержке SMP или наборах инструкций SIMD. (Ну, я могу предположить MMX на x86, но это все.)

jfs 19.09.2008 00:58

Другие вопросы по теме