帮助合并向量的算法
我需要一个非常快速的算法来完成以下任务。我已经实现了几种算法来完成它,但是它们对于我需要的性能来说太慢了。它应该足够快,以使算法可以在现代CPU上每秒至少运行100,000次。它将用C ++实现。
我正在使用跨度/范围,即在一条线上具有起点和终点坐标的结构。
我有两个跨度的向量(动态数组),我需要将它们合并。一个向量是src,另一向量是dst。这些向量按跨度开始坐标排序,并且跨度在一个向量内不重叠。
src向量中的跨度必须与dst向量中的跨度合并,这样所得的向量仍会被排序并且没有重叠。 IE。如果在合并过程中检测到重叠,则将两个跨度合并为一个。 (合并两个跨度只不过是更改结构中的坐标而已。)
现在,还有一个问题,在合并过程中,必须"扩大" src向量中的跨度。这意味着一个常量将被添加到src中每个跨度的开始坐标处,另一个(较大)常量将被添加到结束坐标中。这意味着在src跨度扩大后,它们可能会重叠。
到目前为止,我得出的结论是它无法完全就地完成,需要某种临时存储。我认为在src和dst元素的总和上,它应该在线性时间内可行。
任何临时存储都可能在算法的多次运行之间共享。
我尝试过的两种主要方法太慢了:
- 将src的所有元素添加到dst,然后在添加每个元素之前将其扩展。然后运行就地排序。最后,使用"读"和"写"指针对结果向量进行迭代,使读指针在写指针之前运行,并合并跨度。当所有元素都已合并(读取指针到达末尾)时,dst将被截断。
- 创建一个临时工作向量。如上所述,通过反复从src或者dst中选取下一个元素并合并到工作向量中,进行幼稚的合并。完成后,将工作向量复制到dst,并将其替换。
第一种方法的问题是排序是O((m + n)* log(m + n))而不是O(m + n),并且有一些开销。这也意味着dst向量必须变得比实际需要大得多。
第二个主要问题是大量复制并再次分配/取消分配内存。
如果我们认为需要,可以更改用于存储/管理跨度/向量的数据结构。
更新:忘记说数据集有多大。最常见的情况是任一向量中的4到30个元素之间,并且dst为空或者src和dst的跨度之间存在大量重叠。
解决方案
没有重复分配的第二种方法怎么样-换句话说,一次分配临时向量,而不再分配它?或者,如果输入向量足够小(但不是恒定大小),则仅使用alloca代替malloc。
另外,在速度方面,我们可能要确保代码使用CMOV进行排序,因为如果代码实际上是为mergesort的每个单次迭代而分支的:
if(src1[x] < src2[x]) dst[x] = src1[x]; else dst[x] = src2[x];
分支预测将在50%的时间内失败,这将对性能造成巨大影响。有条件的移动可能会做得更好,因此请确保编译器正在执行此操作,否则请尝试哄骗它这样做。
我们可以将方法1中提到的排序减少为线性时间(从我们描述的对数线性开始),因为两个输入列表已经进行了排序。只需执行merge-sort的合并步骤即可。使用输入跨度矢量的适当表示形式(例如,单链接列表),可以就地完成此操作。
http://en.wikipedia.org/wiki/Merge_sort
我认为严格的线性解决方案是不可能的,因为在最坏的情况下扩宽src向量跨度可能会导致它们全部重叠(取决于所添加常数的大小)
问题可能出在实现中,而不是算法中;我建议为我们以前的解决方案分析代码,以了解花在哪里的时间
推理:
对于运行在3.2GHz的真正"现代" CPU(例如Intel Core 2 Extreme QX9770),可以期望约59,455 MIPS
对于100,000个向量,我们将必须以594,550个指令处理每个向量。那是很多指令。
参考:维基百科MIPS
另外,请注意,向src向量跨度添加常量不会对其进行重新排序,因此我们可以独立地对src向量跨度进行规范化,然后将其与dst向量跨度合并;这应该减少原始算法的工作量
我们知道绝对最佳的运行时是O(m + n),这是因为我们至少必须扫描所有数据才能合并列表。鉴于此,第二种方法应为我们提供这种类型的行为。
我们是否描述了第二种方法来找出瓶颈?实际上,根据所讨论的数据量,实际上不可能在指定的时间内执行所需的操作。一种验证方法是做一些简单的事情,例如对循环中每个向量中跨度的所有开始和结束值求和,并对时间进行计时。基本上,这里我们对向量中的每个元素所做的工作最少。这将为我们提供可望获得的最佳性能的基准。
除此之外,我们可以避免使用stl swap方法逐个复制vectors元素,并且可以将temp向量预分配为一定大小,以避免合并元素时触发数组扩展。
我们可能会考虑在系统中使用2个向量,并且每当需要进行合并时,都将合并到未使用的向量中,然后交换(这类似于图形中使用的双缓冲)。这样,我们不必在每次合并时都重新分配向量。
但是,最好先进行概要分析,然后找出瓶颈所在。如果与实际的合并过程相比分配量很少,那么我们需要弄清楚如何更快地进行分配。
直接访问向量原始数据可能会带来一些额外的加速,从而避免每次访问数据时进行边界检查。
1完全排序比合并两个排序的列表慢。
因此,我们正在查看调整2(或者全新的内容)的方法。
如果将数据结构更改为双向链表,则可以在恒定的工作空间中合并它们。
对列表节点使用固定大小的堆分配器,既可以减少每个节点的内存使用,又可以提高节点在内存中靠拢的机会,从而减少页面遗漏。
我们可能可以在线或者在我们喜欢的算法书中找到代码以优化链接列表合并。我们将需要对此进行自定义,以便在列表合并的同时进行跨度合并。
为了优化合并,首先请注意,对于来自同一侧的值的每次运行而没有来自另一侧的值的运行,我们可以一次将整个运行插入到dst列表中,而不必依次插入每个节点。而且我们可以在正常的列表操作中为每次插入节省一次写入操作,只需在结尾处保持"悬挂"即可,因为我们知道稍后会对其进行修补。并且只要我们不删除应用程序中的其他任何地方,该列表就可以单链接,这意味着每个节点一次写入。
至于10微秒的运行时间则取决于n和m ...
如果我们最近的实现仍然不够快,我们可能最终不得不考虑其他方法。
我们将此功能的输出用于什么?
我针对此算法编写了一个新的容器类,以适应需求。这也使我有机会调整程序周围的其他代码,同时使速度有所提高。
这比使用STL向量的旧实现要快得多,但是在其他方面基本上是相同的。但是,虽然速度更快,但仍然还不够快...不幸的是。
分析不再揭示什么是真正的瓶颈。 MSVC探查器有时有时会将错误归咎于错误的调用(假设相同的运行分配了截然不同的运行时间),并且大多数调用已合并为一个大缺陷。
查看生成的代码的反汇编表明,生成的代码中存在大量跳跃,我认为这可能是现在速度缓慢的主要原因。
class SpanBuffer { private: int *data; size_t allocated_size; size_t count; inline void EnsureSpace() { if (count == allocated_size) Reserve(count*2); } public: struct Span { int start, end; }; public: SpanBuffer() : data(0) , allocated_size(24) , count(0) { data = new int[allocated_size]; } SpanBuffer(const SpanBuffer &src) : data(0) , allocated_size(src.allocated_size) , count(src.count) { data = new int[allocated_size]; memcpy(data, src.data, sizeof(int)*count); } ~SpanBuffer() { delete [] data; } inline void AddIntersection(int x) { EnsureSpace(); data[count++] = x; } inline void AddSpan(int s, int e) { assert((count & 1) == 0); assert(s >= 0); assert(e >= 0); EnsureSpace(); data[count] = s; data[count+1] = e; count += 2; } inline void Clear() { count = 0; } inline size_t GetCount() const { return count; } inline int GetIntersection(size_t i) const { return data[i]; } inline const Span * GetSpanIteratorBegin() const { assert((count & 1) == 0); return reinterpret_cast<const Span *>(data); } inline Span * GetSpanIteratorBegin() { assert((count & 1) == 0); return reinterpret_cast<Span *>(data); } inline const Span * GetSpanIteratorEnd() const { assert((count & 1) == 0); return reinterpret_cast<const Span *>(data+count); } inline Span * GetSpanIteratorEnd() { assert((count & 1) == 0); return reinterpret_cast<Span *>(data+count); } inline void MergeOrAddSpan(int s, int e) { assert((count & 1) == 0); assert(s >= 0); assert(e >= 0); if (count == 0) { AddSpan(s, e); return; } int *lastspan = data + count-2; if (s > lastspan[1]) { AddSpan(s, e); } else { if (s < lastspan[0]) lastspan[0] = s; if (e > lastspan[1]) lastspan[1] = e; } } inline void Reserve(size_t minsize) { if (minsize <= allocated_size) return; int *newdata = new int[minsize]; memcpy(newdata, data, sizeof(int)*count); delete [] data; data = newdata; allocated_size = minsize; } inline void SortIntersections() { assert((count & 1) == 0); std::sort(data, data+count, std::less<int>()); assert((count & 1) == 0); } inline void Swap(SpanBuffer &other) { std::swap(data, other.data); std::swap(allocated_size, other.allocated_size); std::swap(count, other.count); } }; struct ShapeWidener { // How much to widen in the X direction int widen_by; // Half of width difference of src and dst (width of the border being produced) int xofs; // Temporary storage for OverlayScanline, so it doesn't need to reallocate for each call SpanBuffer buffer; inline void OverlayScanline(const SpanBuffer &src, SpanBuffer &dst); ShapeWidener(int _xofs) : xofs(_xofs) { } }; inline void ShapeWidener::OverlayScanline(const SpanBuffer &src, SpanBuffer &dst) { if (src.GetCount() == 0) return; if (src.GetCount() + dst.GetCount() == 0) return; assert((src.GetCount() & 1) == 0); assert((dst.GetCount() & 1) == 0); assert(buffer.GetCount() == 0); dst.Swap(buffer); const int widen_s = xofs - widen_by; const int widen_e = xofs + widen_by; size_t resta = src.GetCount()/2; size_t restb = buffer.GetCount()/2; const SpanBuffer::Span *spa = src.GetSpanIteratorBegin(); const SpanBuffer::Span *spb = buffer.GetSpanIteratorBegin(); while (resta > 0 || restb > 0) { if (restb == 0) { dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e); --resta, ++spa; } else if (resta == 0) { dst.MergeOrAddSpan(spb->start, spb->end); --restb, ++spb; } else if (spa->start < spb->start) { dst.MergeOrAddSpan(spa->start+widen_s, spa->end+widen_e); --resta, ++spa; } else { dst.MergeOrAddSpan(spb->start, spb->end); --restb, ++spb; } } buffer.Clear(); }
我将始终保持跨度矢量排序。这使得算法的实现变得更加容易,并且有可能在线性时间内完成。
确定,所以我将基于以下内容对跨度进行排序:
- 最小跨度按升序排列
- 然后以降序跨越最大值
我们需要创建一个函数来执行此操作。
然后,我将使用std :: set_union合并向量(在继续之前,我们可以合并多个向量)。
然后,对于具有相同最小值的每个连续范围跨度,保留第一个范围并删除其余部分(它们是第一个范围的子范围)。
然后,我们需要合并跨度。现在应该可以在线性时间内实现。
好的,这就是窍门。不要尝试就地执行此操作。使用一个或者多个临时向量(并提前保留足够的空间)。然后最后,调用std :: vector :: swap将结果放入我们选择的输入向量中。
我希望这足以使我们前进。
目标系统是什么?是多核吗?如果是这样,我们可以考虑对该算法进行多线程处理