解析opencv中Box Filter的实现并提出进一步加速的方案(源码共享)。

说明:本文所有算法的涉及到的优化均指在PC上进行的,对于其他构架是否合适未知,请自行试验。

Box Filter,最经典的一种领域操作,在无数的场合中都有着广泛的应用,作为一个很基础的函数,其性能的好坏也直接影响着其他相关函数的性能,最典型莫如现在很好的EPF滤波器:GuideFilter。因此其优化的档次和程度是非常重要的,网络上有很多相关的代码和博客对该算法进行讲解和优化,提出了不少O(1)算法,但所谓的0(1)算法也有优劣之分,0(1)只是表示执行时间和某个参数无关,但本身的耗时还是有区别。比较流行的就是累积直方图法,其实这个是非常不可取的,因为稍大的图就会导致溢出,这里我们解析下 opencv的相关代码,看看他是如何进行优化的。

首先找到opencv的代码的位置,其在\opencv\sources\modules\imgproc\src\smooth.cpp中。

Box Filter 是一种行列可分离的滤波,因此,opencv也是这样处理的,先进行行方向的滤波,得到中间结果,然后再对中间结果进行列方向的处理,得到最终的结果。

opencv 行方向处理的相关代码如下:

template<typename T, typename ST>
struct RowSum :
        public BaseRowFilter
{
    RowSum( int _ksize, int _anchor ) :
        BaseRowFilter()
    {
        ksize = _ksize;
        anchor = _anchor;
    }

    virtual void operator()(const uchar* src, uchar* dst, int width, int cn)
    {
        const T* S = (const T*)src;
        ST* D = (ST*)dst;
        int i = 0, k, ksz_cn = ksize*cn;

        width = (width - 1)*cn;
        for( k = 0; k < cn; k++, S++, D++ )
        {
            ST s = 0;
            for( i = 0; i < ksz_cn; i += cn )
                s += S[i];
            D[0] = s;
            for( i = 0; i < width; i += cn )
            {
                s += S[i + ksz_cn] - S[i];
                D[i+cn] = s;
            }
        }
    }
};

  这个代码考虑了多个通道以及多种数据类型的情况,为了分析方便我们重写下单通道时的核心代码。

for(Z = 0, Value = 0; Z < Size; Z++)
    Value += RowData[Z];
LinePD[0] = Value;

for(X = 1; X < Width; X ++)
{
    Value += RowData[X + Size - 1] - RowData[X - 1];
    LinePD[X] = Value;
}

  上述代码中RowData是指对某一行像素进行扩展后的像素值,其宽度为Width + Radius + Radius, Radius是值滤波的半径, 而代码中Size = 2 * Radius + 1,LinePD即所谓的中间结果,考虑数据类型能表达的范围,必须使用 int类型。

对于每行第一个点很简单,直接用for计算出行方向的指定半径内的累加值。而之后所有点,用前一个累计值加上新加入的点,然后去除掉移出的哪一个点,得到新的累加值。这个方法在很多文献中都有提及,并没有什么新鲜之处,这里不多说。

按照正常的思维,在列方向的处理应该和行方向完全相同,只是处理的方向的不同、处理的数据源的不同以及最后需要对计算结果多一个归一化的过程而已。事实也是如此,有很多算法都是这样做的,但是由于CPU构架的一些原因(主要是cache miss的增加),同样的算法沿列方向处理总是会比沿行方向慢一个档次,解决方式有很多,例如先对中间结果进行转置,然后再按照行方向的规则进行处理,处理完后在将数据转置回去。转置过程是有非常高效的处理方式的,借助于SSE以及Cache优化,能实现比原始两重for循环快4倍以上的效果。还有一种方式就正如opencv中列处理过程所示,这正是下面将要重点描述的。

在opencv的代码中,并没有直接沿列方向处理,而是继续沿着行的方向一行一行处理,我先贴出我自己翻译的有关纯C的代码进行解说:

    for (Y = 0; Y < Size - 1; Y++)            //    注意没有最后一项哦
    {
        int *LinePS = (int *)(Sum->Data + ColPos[Y] * Sum->WidthStep);
        for(X = 0; X < Width; X++)    ColSum[X] += LinePS[X];
    }

    for (Y = 0; Y < Height; Y++)
    {
        unsigned char* LinePD    = Dest->Data + Y * Dest->WidthStep;
        int *AddPos              = (int*)(Sum->Data + ColPos[Y + Size - 1] * Sum->WidthStep);
        int *SubPos              = (int*)(Sum->Data + ColPos[Y] * Sum->WidthStep);

        for(X = 0; X < Width; X++)
        {
            Value = ColSum[X] + AddPos[X];
            LinePD[X] = Value * Scale;
            ColSum[X] = Value - SubPos[X];
        }
    }

上述代码中定义了一个ColSum用于保存每行某个位置处在列方向上指定半径内各中间元素的累加值,对于第一行,做特殊处理,其他行的每个元素的处理方式和BaseRowFilter里的处理方式很像,只是在书写上有所区别,特别注意的是对第一行的累加时,最后一个元素并没有计算在内,这个处理技巧在下面的X循环里有体现,请大家仔细体味下。

上述代码这样做的好处是,能有效的减少CPU的cache miss,但是总的计算量是没有改变的,因此能有效的提高速度。

针对PC,在opencv内部,其在列方向上采用了SSE进行进一步的优化,我们贴出其处理uchar类型的代码:

  代码比较多,我稍微精简下(注意,精简后的是不可运行的,只是更清晰的表达了思路)。

virtual void operator()(const uchar** src, uchar* dst, int dststep, int count, int width)
{
    int i;
    int* SUM;
    bool haveScale = scale != 1;
    double _scale = scale;
    if( width != (int)sum.size() )
    {
        sum.resize(width);
        sumCount = 0;
    }

    SUM = &sum[0];
    if( sumCount == 0 )
    {
        memset((void*)SUM, 0, width*sizeof(int));
        for( ; sumCount < ksize - 1; sumCount++, src++ )
        {
            const int* Sp = (const int*)src[0];
            i = 0;

            for( ; i <= width-4; i+=4 )
            {
                __m128i _sum = _mm_loadu_si128((const __m128i*)(SUM+i));
                __m128i _sp = _mm_loadu_si128((const __m128i*)(Sp+i));
                _mm_storeu_si128((__m128i*)(SUM+i),_mm_add_epi32(_sum, _sp));
            }
            for( ; i < width; i++ )
                SUM[i] += Sp[i];
        }
    }
    else
    {
        src += ksize-1;
    }

    for( ; count--; src++ )
    {
        const int* Sp = (const int*)src[0];
        const int* Sm = (const int*)src[1-ksize];
        uchar* D = (uchar*)dst;

        i = 0;
        const __m128 scale4 = _mm_set1_ps((float)_scale);
        for( ; i <= width-8; i+=8 )
        {
            __m128i _sm  = _mm_loadu_si128((const __m128i*)(Sm+i));
            __m128i _sm1  = _mm_loadu_si128((const __m128i*)(Sm+i+4));

            __m128i _s0  = _mm_add_epi32(_mm_loadu_si128((const __m128i*)(SUM+i)),
                                         _mm_loadu_si128((const __m128i*)(Sp+i)));
            __m128i _s01  = _mm_add_epi32(_mm_loadu_si128((const __m128i*)(SUM+i+4)),
                                          _mm_loadu_si128((const __m128i*)(Sp+i+4)));

            __m128i _s0T = _mm_cvtps_epi32(_mm_mul_ps(scale4, _mm_cvtepi32_ps(_s0)));
            __m128i _s0T1 = _mm_cvtps_epi32(_mm_mul_ps(scale4, _mm_cvtepi32_ps(_s01)));

            _s0T = _mm_packs_epi32(_s0T, _s0T1);

            _mm_storel_epi64((__m128i*)(D+i), _mm_packus_epi16(_s0T, _s0T));

            _mm_storeu_si128((__m128i*)(SUM+i), _mm_sub_epi32(_s0,_sm));
            _mm_storeu_si128((__m128i*)(SUM+i+4),_mm_sub_epi32(_s01,_sm1));
        }
        for( ; i < width; i++ )
        {
            int s0 = SUM[i] + Sp[i];
            D[i] = saturate_cast<uchar>(s0*_scale);
            SUM[i] = s0 - Sm[i];
        }

        dst += dststep;
    }
}

在行方向上,ColSum每个元素的更新相互之间是没有任何关系的,因此,借助于SSE可以实现一次性的四个元素的更新,并且上述代码还将第一行的特殊计算也用SSE实现了,虽然这部分计算量是非常小的。

在具体的SSE细节上,对于uchar类型,虽然中间结果是用int类型表达的,但是由于SSE没有整形的除法指令(是否有?),因此上面借用了浮点数的乘法SSE指令实现,当然就多了_mm_cvtepi32_ps 以及 _mm_cvtps_epi32这样的类型转换的函数。如果有整形除法,那就能更好了。

SSE的实现,无非也就是用_mm_loadu_si128加载数据,用_mm_add_epi32, _mm_mul_ps之类的函数进行基本的运算,用_mm_storeu_si128来保存数据,并没有什么特别复杂的部分,注意到考虑到数据的普遍性,不一定都是16字节对齐的,因此在加载和保存是需要使用u方面的函数,其实现在的_mm_loadu_si128和_mm_load_si128在处理速度上已经没有特别明显的区别了。

注意到在每个SSE代码后面,总还有部分C代码,这是因为我们实际数据宽度并不一定是4的整数倍,未被SSE处理的部分必须使用C实现,这其实对读者来说是非常好的事情,因为我们能从这部分C代码中搞明白上述的SSE代码是干什么的。

  以上就是opencv的Box Filter实现的关键代码,如果读者去看具体细节,opencv还有针对手机上的Neon优化,其实这个和SSE的意思基本是一样的。

  那是否还有改进的空间呢,从我的认知中,在对垂直方向的处理上,应该没有了,但是水平方向呢, SSE有没有用武之地,经过我的思考我认为还是有的。

  在行方向的计算中,这个for循环是主要的计算。

for(X = 1; X < Width; X ++)
{
    Value += RowData[X + Size - 1] - RowData[X - 1];
    LinePD[X] = Value;
}

本人认为虽然这里的操作是前后依赖的,全局无法并行化,但是观察这一行代码:

Value += RowData[X + Size - 1] - RowData[X - 1];    

其中RowData[X + Size - 1] - RowData[X - 1]; 并不是前后依赖的,是可以并行的,因此如果提前快速的计算出所有的差值,那么提速的空间还比较大,即需要提前计算出下面的函数:

 for(X = 0; X < (Width - 1); X++)
     Diff[X] = AddPos[X] - SubPos[X];

  这里用SSE实现则非常简单了:

        unsigned char *AddPos = RowData + Size * Channel;
        unsigned char *SubPos = RowData;
        X = 0;                    //    注意这个赋值在下面的循环外部,这可以避免当Width<8时第二个for循环循环变量未初始化
        __m128i Zero = _mm_setzero_si128();
        for (; X <= (Width - 1) * Channel - 8; X += 8)
        {
            __m128i Add = _mm_unpacklo_epi8(_mm_loadl_epi64((__m128i const *)(AddPos + X)), Zero);
            __m128i Sub = _mm_unpacklo_epi8(_mm_loadl_epi64((__m128i const *)(SubPos + X)), Zero);
            _mm_store_si128((__m128i *)(Diff + X + 0), _mm_sub_epi32(_mm_unpacklo_epi16(Add, Zero), _mm_unpacklo_epi16(Sub, Zero)));        //    由于采用了_aligned_malloc函数分配内存,可是使用_mm_store_si128
            _mm_store_si128((__m128i *)(Diff + X + 4), _mm_sub_epi32(_mm_unpackhi_epi16(Add, Zero), _mm_unpackhi_epi16(Sub, Zero)));
        }
        for(; X < (Width - 1) * Channel; X++)
            Diff[X] = AddPos[X] - SubPos[X];

  和列方向的SSE处理代码不同的是,这里加载的是uchar数据,因此加载的函数就有所不同,处理的方式也有区别,上面几个SSE函数各位查查MSDN就能理解其意思,还是很有味道的。

  经过这样的处理,经过测试发现,速度能够比opencv的版本在提高30%,也是额外的惊喜。

再有的优化可能提速有限,比如把剩下的一些for循环内部分成四路等等。

在我的I5台式机中,使用上述算法对1024*1024的三通道彩色图像进行半径为5的测试,进行100次的计算纯算法部分的耗时为800ms,如果是纯C版本大概为1800ms;当半径为200时,SSE版本约为950ms, C版本约1900ms;当半径为400时,SSE版本约为1000ms, C版本约2100ms;可见,半径增大,耗时稍有增加,这主要是由于算法中有部分初始化的计算和半径有关,但是这些都是不重要的。

在不使用多线程(虽然本算法非常适合多线程计算),不使用GPU,只使用单线程\CPU进行计算的情况下,个人觉得目前我这个代码是很难有质的超越的,从某个方面讲,代码中的用于计算的时间占用的时间比从内存等待数据的时间可能还要短,而似乎也没有更好的算法能进一步减少内存数据的访问量。

本人在这里邀请各位高手对目前我优化的这个代码进行进一步的优化,希望高人不要谦虚。

源代码下载地址(VS2010编写):Box Filter

  运行界面:

  

  本文的代码是针对常用的图像数据进行的优化处理,在很多场合下,需要对int或者float类型进行处理,比如GuideFilter,如果读者理解了本文解读的代码的原理,更改代码以便适应他们则是一件非常简单的事情,如果您不会,那么也请你不要给我留言,或者我可以有偿提供,因为从本质上讲我喜欢提供渔,而不是鱼,渔具已经送给你了,你却找我要鱼,那只能..............。

****************************作者: laviewpbt   时间: 2015.12.17    联系QQ:  33184777 转载请保留本行信息**********************

时间: 2024-12-25 22:26:37

解析opencv中Box Filter的实现并提出进一步加速的方案(源码共享)。的相关文章

解析opencv的 Box Filter的实现并提出进一步加速的方案。

说明:本文所有算法的涉及到的优化均指在PC上进行的,对于其他构架是否合适未知,请自行试验. Box Filter,最经典的一种领域操作,在无数的场合中都有着广泛的应用,作为一个很基础的函数,其性能的好坏也直接影响着其他相关函数的性能,最典型莫如现在很好的EPF滤波器:GuideFilter.因此其优化的档次和程度是非常重要的,网络上有很多相关的代码和博客对该算法进行讲解和优化,提出了不少O(1)算法,但所谓的0(1)算法也有优劣之分,0(1)只是表示执行时间和某个参数无关,但本身的耗时还是有区别

String源码分析之Java中的String为什么是不可变的以及replace方法源码分析

什么是不可变对象? 众所周知, 在Java中, String类是不可变的.那么到底什么是不可变的对象呢? 可以这样认为:如果一个对象,在它创建完成之后,不能再改变它的状态,那么这个对象就是不可变的.不能改变状态的意思是,不能改变对象内的成员变量,包括基本数据类型的值不能改变,引用类型的变量不能指向其他的对象,引用类型指向的对象的状态也不能改变. 区分对象和对象的引用 对于Java初学者, 对于String是不可变对象总是存有疑惑.看下面代码: String s = "ABCabc";

C#中Stack&lt;T&gt;类的使用及部分成员函数的源码分析

Stack<T>类 Stack<T> 作为数组来实现. Stack<T> 的容量是 Stack<T> 可以包含的元素数. 当向 Stack<T> 中添加元素时,将通过重新分配内部数组来根据需要自动增大容量. 可通过调用 TrimExcess 来减少容量. 如果 Count 小于堆栈的容量,则 Push 的运算复杂度是 O(1). 如果需要增加容量以容纳新元素,则 Push 的运算复杂度成为 O(n),其中 n 为 Count. Pop 的运算复杂

解析大象跳转如何在线生成微信自动跳转第三方浏览器源码

源码使用场景: 1.用来实现微信自动跳转外部浏览器下载app2.用来实现微信内打开网页链接自动跳转浏览器访问指定页面3.防止网页链接由于被微信拦截,导致用户无法正常在微信内打开 源码说明: 适用安卓和苹果系统,支持任何网页链接.并且无论链接是否已经被微信拦截,均可实现微信内自动跳转浏览器打开. 生成的跳转链接具有极佳的防拦截效果. 源码体验: 1.复制分享链接(app下载页链接或网页链接),然后在浏览器中打开大象跳转地址:http://www.go51w.cn/ 2.在工具的输入框中粘贴我们刚才

源码解析Android中View的layout布局过程

Android中的Veiw从内存中到呈现在UI界面上需要依次经历三个阶段:量算 -> 布局 -> 绘图,关于View的量算.布局.绘图的总体机制可参见博文 < Android中View的布局及绘图机制>.量算是布局的基础,如果想了解量算的细节,可参见博文<源码解析Android中View的measure量算过程>.本文将从源码角度解析View的布局layout过程,本文会详细介绍View布局过程中的关键方法,并对源码加上了注释以进行说明. 对View进行布局的目的是计算

SpringSecurity的Filter执行顺序在源码中的体现

在网上看各种SpringSecurity教程时,都讲到了SpringSecurity的Filter顺序.但是一直不知道这个顺序在源码中是如何体现的.今天一步一步的查找,最终找到顺序是在FilterComparator中定义的. 先看一下代码: /** * An internal use only {@link Comparator} that sorts the Security {@link Filter} * instances to ensure they are in the corre

JUC中Lock和ReentrantLock介绍及源码解析

Lock框架是jdk1.5新增的,作用和synchronized的作用一样,所以学习的时候可以和synchronized做对比.在这里先和synchronized做一下简单对比,然后分析下Lock接口以及ReentrantLock的源码和说明.具体的其他的Lock实现的分析在后面会慢慢介绍. Lock框架和synchronized 有关synchronized的作用和用法不在具体说明,应该都很熟悉了.而Lock有着和synchronized一样的语意,但是比synchronized多了一些功能,

深入源码解析Android中的Handler,Message,MessageQueue,Looper

本文主要是对Handler和消息循环的实现原理进行源码分析,如果不熟悉Handler可以参见博文< Android中Handler的使用>,里面对Android为何以引入Handler机制以及如何使用Handler做了讲解. 概括来说,Handler是Android中引入的一种让开发者参与处理线程中消息循环的机制.我们在使用Handler的时候与Message打交道最多,Message是Hanlder机制向开发人员暴露出来的相关类,可以通过Message类完成大部分操作Handler的功能.但

SpringSecurity 依据用户请求的过程进行源码解析

SpringSecurity实现安全管理主要通过滤器(filter).验证器(AuthenticationManager).用户数据提供器(ProviderManager).授权器(accessDecisionManager).投票器(AccessDecisionVoter)这几个基本模块协作完成的.大概分为两个部分 用户验证 和授权 这个两个部分.这个部分主要在AuthenticationProcessingFilter和AbstractSecurityInterceptor中完成. 使用过S