说明:本文所有算法的涉及到的优化均指在PC上进行的,对于其他构架是否合适未知,请自行试验。
Box Filter,最经典的一种领域操作,在无数的场合中都有着广泛的应用,作为一个很基础的函数,其性能的好坏也直接影响着其他相关函数的性能,最典型莫如现在很好的EPF滤波器:GuideFilter。因此其优化的档次和程度是非常重要的,网络上有很多相关的代码和博客对该算法进行讲解和优化,提出了不少O(1)算法,但所谓的0(1)算法也有优劣之分,0(1)只是表示执行时间和某个参数无关,但本身的耗时还是有区别。比较流行的就是累积直方图法,其实这个是非常不可取的,因为稍大的图就会导致溢出,这里我们解析下 opencv的相关代码,看看他是如何进行优化的。
首先找到opencv的代码的位置,其在\opencv\sources\modules\imgproc\src\smooth.cpp中。
Box Filter 是一种行列可分离的滤波,因此,opencv也是这样处理的,先进行行方向的滤波,得到中间结果,然后再对中间结果进行列方向的处理,得到最终的结果。
opencv 行方向处理的相关代码如下:
template<typename T, typename ST> struct RowSum : public BaseRowFilter { RowSum( int _ksize, int _anchor ) : BaseRowFilter() { ksize = _ksize; anchor = _anchor; } virtual void operator()(const uchar* src, uchar* dst, int width, int cn) { const T* S = (const T*)src; ST* D = (ST*)dst; int i = 0, k, ksz_cn = ksize*cn; width = (width - 1)*cn; for( k = 0; k < cn; k++, S++, D++ ) { ST s = 0; for( i = 0; i < ksz_cn; i += cn ) s += S[i]; D[0] = s; for( i = 0; i < width; i += cn ) { s += S[i + ksz_cn] - S[i]; D[i+cn] = s; } } } };
这个代码考虑了多个通道以及多种数据类型的情况,为了分析方便我们重写下单通道时的核心代码。
for(Z = 0, Value = RowData[0]; Z < Size; Z++) Value += RowData[Z]; LinePD[0] = Value; for(X = 1; X < Width; X ++) { Value += RowData[X + Size - 1] - RowData[X - 1]; LinePD[X] = Value; }
上述代码中RowData是指对某一行像素进行扩展后的像素值,其宽度为Width + Radius + Radius, Radius是值滤波的半径, 而代码中Size = 2 * Radius + 1,LinePD即所谓的中间结果,考虑数据类型能表达的范围,必须使用 int类型。
对于每行第一个点很简单,直接用for计算出行方向的指定半径内的累加值。而之后所有点,用前一个累计值加上新加入的点,然后去除掉移出的哪一个点,得到新的累加值。这个方法在很多文献中都有提及,并没有什么新鲜之处,这里不多说。
按照正常的思维,在列方向的处理应该和行方向完全相同,只是处理的方向的不同、处理的数据源的不同以及最后需要对计算结果多一个归一化的过程而已。事实也是如此,有很多算法都是这样做的,但是由于CPU构架的一些原因(主要是cache miss的增加),同样的算法沿列方向处理总是会比沿行方向慢一个档次,解决方式有很多,例如先对中间结果进行转置,然后再按照行方向的规则进行处理,处理完后在将数据转置回去。转置过程是有非常高效的处理方式的,借助于SSE以及Cache优化,能实现比原始两重for循环快4倍以上的效果。还有一种方式就正如opencv中列处理过程所示,这正是下面将要重点描述的。
在opencv的代码中,并没有直接沿列方向处理,而是继续沿着行的方向一行一行处理,我先贴出我自己翻译的有关纯C的代码进行解说:
for (Y = 0; Y < Size - 1; Y++) // 注意没有最后一项哦 { int *LinePS = (int *)(Sum->Data + ColPos[Y] * Sum->WidthStep); for(X = 0; X < Width * Channel; X++) ColSum[X] += LinePS[X]; } for (Y = 0; Y < Height; Y++) { unsigned char* LinePD = Dest->Data + Y * Dest->WidthStep; int *AddPos = (int*)(Sum->Data + ColPos[Y + Size - 1] * Sum->WidthStep); int *SubPos = (int*)(Sum->Data + ColPos[Y] * Sum->WidthStep); for(X = 0; X < Width * Channel; X++) { Value = ColSum[X] + AddPos[X]; LinePD[X] = Value * Scale; // / Amount 这样写在PC上会更慢 ColSum[X] = Value - SubPos[X]; } }
接着我们看opencv的列方向上的代码,这里只贴出他针对uchar类型的图像数据的处理。
template<> struct ColumnSum<int, uchar> : public BaseColumnFilter { ColumnSum( int _ksize, int _anchor, double _scale ) : BaseColumnFilter() { ksize = _ksize; anchor = _anchor; scale = _scale; sumCount = 0; } virtual void reset() { sumCount = 0; } virtual void operator()(const uchar** src, uchar* dst, int dststep, int count, int width) { int i; int* SUM; bool haveScale = scale != 1; double _scale = scale; #if CV_SSE2 bool haveSSE2 = checkHardwareSupport(CV_CPU_SSE2); #endif if( width != (int)sum.size() ) { sum.resize(width); sumCount = 0; } SUM = &sum[0]; if( sumCount == 0 ) { memset((void*)SUM, 0, width*sizeof(int)); for( ; sumCount < ksize - 1; sumCount++, src++ ) { const int* Sp = (const int*)src[0]; i = 0; #if CV_SSE2 if(haveSSE2) { for( ; i <= width-4; i+=4 ) { __m128i _sum = _mm_loadu_si128((const __m128i*)(SUM+i)); __m128i _sp = _mm_loadu_si128((const __m128i*)(Sp+i)); _mm_storeu_si128((__m128i*)(SUM+i),_mm_add_epi32(_sum, _sp)); } } #elif CV_NEON for( ; i <= width - 4; i+=4 ) vst1q_s32(SUM + i, vaddq_s32(vld1q_s32(SUM + i), vld1q_s32(Sp + i))); #endif for( ; i < width; i++ ) SUM[i] += Sp[i]; } } else { CV_Assert( sumCount == ksize-1 ); src += ksize-1; } for( ; count--; src++ ) { const int* Sp = (const int*)src[0]; const int* Sm = (const int*)src[1-ksize]; uchar* D = (uchar*)dst; if( haveScale ) { i = 0; #if CV_SSE2 if(haveSSE2) { const __m128 scale4 = _mm_set1_ps((float)_scale); for( ; i <= width-8; i+=8 ) { __m128i _sm = _mm_loadu_si128((const __m128i*)(Sm+i)); __m128i _sm1 = _mm_loadu_si128((const __m128i*)(Sm+i+4)); __m128i _s0 = _mm_add_epi32(_mm_loadu_si128((const __m128i*)(SUM+i)), _mm_loadu_si128((const __m128i*)(Sp+i))); __m128i _s01 = _mm_add_epi32(_mm_loadu_si128((const __m128i*)(SUM+i+4)), _mm_loadu_si128((const __m128i*)(Sp+i+4))); __m128i _s0T = _mm_cvtps_epi32(_mm_mul_ps(scale4, _mm_cvtepi32_ps(_s0))); __m128i _s0T1 = _mm_cvtps_epi32(_mm_mul_ps(scale4, _mm_cvtepi32_ps(_s01))); _s0T = _mm_packs_epi32(_s0T, _s0T1); _mm_storel_epi64((__m128i*)(D+i), _mm_packus_epi16(_s0T, _s0T)); _mm_storeu_si128((__m128i*)(SUM+i), _mm_sub_epi32(_s0,_sm)); _mm_storeu_si128((__m128i*)(SUM+i+4),_mm_sub_epi32(_s01,_sm1)); } } #elif CV_NEON float32x4_t v_scale = vdupq_n_f32((float)_scale); for( ; i <= width-8; i+=8 ) { int32x4_t v_s0 = vaddq_s32(vld1q_s32(SUM + i), vld1q_s32(Sp + i)); int32x4_t v_s01 = vaddq_s32(vld1q_s32(SUM + i + 4), vld1q_s32(Sp + i + 4)); uint32x4_t v_s0d = cv_vrndq_u32_f32(vmulq_f32(vcvtq_f32_s32(v_s0), v_scale)); uint32x4_t v_s01d = cv_vrndq_u32_f32(vmulq_f32(vcvtq_f32_s32(v_s01), v_scale)); uint16x8_t v_dst = vcombine_u16(vqmovn_u32(v_s0d), vqmovn_u32(v_s01d)); vst1_u8(D + i, vqmovn_u16(v_dst)); vst1q_s32(SUM + i, vsubq_s32(v_s0, vld1q_s32(Sm + i))); vst1q_s32(SUM + i + 4, vsubq_s32(v_s01, vld1q_s32(Sm + i + 4))); } #endif for( ; i < width; i++ ) { int s0 = SUM[i] + Sp[i]; D[i] = saturate_cast<uchar>(s0*_scale); SUM[i] = s0 - Sm[i]; } } else { i = 0; #if CV_SSE2 if(haveSSE2) { for( ; i <= width-8; i+=8 ) { __m128i _sm = _mm_loadu_si128((const __m128i*)(Sm+i)); __m128i _sm1 = _mm_loadu_si128((const __m128i*)(Sm+i+4)); __m128i _s0 = _mm_add_epi32(_mm_loadu_si128((const __m128i*)(SUM+i)), _mm_loadu_si128((const __m128i*)(Sp+i))); __m128i _s01 = _mm_add_epi32(_mm_loadu_si128((const __m128i*)(SUM+i+4)), _mm_loadu_si128((const __m128i*)(Sp+i+4))); __m128i _s0T = _mm_packs_epi32(_s0, _s01); _mm_storel_epi64((__m128i*)(D+i), _mm_packus_epi16(_s0T, _s0T)); _mm_storeu_si128((__m128i*)(SUM+i), _mm_sub_epi32(_s0,_sm)); _mm_storeu_si128((__m128i*)(SUM+i+4),_mm_sub_epi32(_s01,_sm1)); } } #elif CV_NEON for( ; i <= width-8; i+=8 ) { int32x4_t v_s0 = vaddq_s32(vld1q_s32(SUM + i), vld1q_s32(Sp + i)); int32x4_t v_s01 = vaddq_s32(vld1q_s32(SUM + i + 4), vld1q_s32(Sp + i + 4)); uint16x8_t v_dst = vcombine_u16(vqmovun_s32(v_s0), vqmovun_s32(v_s01)); vst1_u8(D + i, vqmovn_u16(v_dst)); vst1q_s32(SUM + i, vsubq_s32(v_s0, vld1q_s32(Sm + i))); vst1q_s32(SUM + i + 4, vsubq_s32(v_s01, vld1q_s32(Sm + i + 4))); } #endif for( ; i < width; i++ ) { int s0 = SUM[i] + Sp[i]; D[i] = saturate_cast<uchar>(s0); SUM[i] = s0 - Sm[i]; } } dst += dststep; } } double scale; int sumCount; std::vector<int> sum; };
代码比较多,我稍微精简下(注意,精简后的是不可运行的,只是更清晰的表达了思路)。
virtual void operator()(const uchar** src, uchar* dst, int dststep, int count, int width) { int i; int* SUM; bool haveScale = scale != 1; double _scale = scale; if( width != (int)sum.size() ) { sum.resize(width); sumCount = 0; } SUM = &sum[0]; if( sumCount == 0 ) { memset((void*)SUM, 0, width*sizeof(int)); for( ; sumCount < ksize - 1; sumCount++, src++ ) { const int* Sp = (const int*)src[0]; i = 0; for( ; i <= width-4; i+=4 ) { __m128i _sum = _mm_loadu_si128((const __m128i*)(SUM+i)); __m128i _sp = _mm_loadu_si128((const __m128i*)(Sp+i)); _mm_storeu_si128((__m128i*)(SUM+i),_mm_add_epi32(_sum, _sp)); } for( ; i < width; i++ ) SUM[i] += Sp[i]; } } else { src += ksize-1; } for( ; count--; src++ ) { const int* Sp = (const int*)src[0]; const int* Sm = (const int*)src[1-ksize]; uchar* D = (uchar*)dst; i = 0; const __m128 scale4 = _mm_set1_ps((float)_scale); for( ; i <= width-8; i+=8 ) { __m128i _sm = _mm_loadu_si128((const __m128i*)(Sm+i)); __m128i _sm1 = _mm_loadu_si128((const __m128i*)(Sm+i+4)); __m128i _s0 = _mm_add_epi32(_mm_loadu_si128((const __m128i*)(SUM+i)), _mm_loadu_si128((const __m128i*)(Sp+i))); __m128i _s01 = _mm_add_epi32(_mm_loadu_si128((const __m128i*)(SUM+i+4)), _mm_loadu_si128((const __m128i*)(Sp+i+4))); __m128i _s0T = _mm_cvtps_epi32(_mm_mul_ps(scale4, _mm_cvtepi32_ps(_s0))); __m128i _s0T1 = _mm_cvtps_epi32(_mm_mul_ps(scale4, _mm_cvtepi32_ps(_s01))); _s0T = _mm_packs_epi32(_s0T, _s0T1); _mm_storel_epi64((__m128i*)(D+i), _mm_packus_epi16(_s0T, _s0T)); _mm_storeu_si128((__m128i*)(SUM+i), _mm_sub_epi32(_s0,_sm)); _mm_storeu_si128((__m128i*)(SUM+i+4),_mm_sub_epi32(_s01,_sm1)); } for( ; i < width; i++ ) { int s0 = SUM[i] + Sp[i]; D[i] = saturate_cast<uchar>(s0*_scale); SUM[i] = s0 - Sm[i]; } dst += dststep; } }
opencv的代码在这里就没有继续优化了,本人认为虽然这里的操作是前后依赖的,全局无法并行化,但是观察这一行代码:
Value += RowData[X + Size - 1] - RowData[X - 1];
其中RowData[X + Size - 1] - RowData[X - 1]; 并不是前后依赖的,是可以并行的,是可以借助于因此如果提前计算出所有的差值,