现在的位置: 首页 > 综合 > 正文

英特尔多核平台编码优化大赛的优化过程

2013年10月10日 ⁄ 综合 ⁄ 共 9832字 ⁄ 字号 评论关闭
 英特尔多核平台编码优化大赛的优化过程
                        
HouSisong@263.net  2007.01.20

    英特尔最近举办了一次多核平台编码优化大赛,我也参加了这次比赛;大赛代码提交阶段已经结束,
所以也可以公开自己的优化过程;
    去年末双核处理器出货量开始超过单核处理器出货量,英特尔也发布了4核CPU;AMD今年也将发布4核,
并计划今年下半年发布8核,多核已经大势所趋;而以前的CPU性能增长,软件界都能很自然的获得相应的
性能增长,而对于这次多核方式带来的CPU性能爆炸性增长,软件界的免费午餐没有了,程序开发人员必须
手工的完成软件的并行化;而很多程序员、计算机教育界等对并行编程的准备并不充足(包括我);英特尔
在这种背景下举办多核平台编码优化大赛是很有现实意义的;
    本文章主要介绍了自己参加竞赛过程中,对代码的优化过程;
   
    我测试和优化过程中用的 CPU:AMD64x2 3600+ (双核CPU)  (开发初期还使用过AMD3000+)
                      操作系统:Windows XP 32bit
                        编译器:Visual Studio 2005
                       

大赛公布的原始代码:
(组织者后来要求计算和输出精度到小数点后7位,这里的输出代码做了相应的调整。)

//* compute the potential energy of a collection of */
//* particles interacting via pairwise potential    */

 
#include 
<stdio.h>

#include 
<stdlib.h>
#include 
<math.h>
#include 
<windows.h>
#include 
<time.h>
 
#define NPARTS 1000
#define NITER 201
#define DIMS 3
 
int rand( void );
int computePot(void
);
void initPositions(void
);
void updatePositions(void
);
 
double
 r[DIMS][NPARTS];
double
 pot;
double
 distx, disty, distz, dist;

int main() {
   
int
 i;
   clock_t start, stop;

   initPositions();
   updatePositions();
 
   start=
clock();
   
for( i=0; i<NITER; i++
 ) {
      pot 
= 0.0
;
      computePot();
      
if (i%10 == 0) printf("%5d: Potential: %10.7f  "
, i, pot);
      updatePositions();
   }
   stop
=
clock();
   printf (
"Seconds = %10.9f  ",(double)(stop-start)/
 CLOCKS_PER_SEC);
}
 
 
void
 initPositions() {
   
int
 i, j;
 
   
for( i=0; i<DIMS; i++
 )
      
for( j=0; j<NPARTS; j++
 ) 
         r[i][j] 
= 0.5 + ( (double) rand() / (double
) RAND_MAX );
}
 
 
void
 updatePositions() {
   
int
 i, j;
 
   
for( i=0; i<DIMS; i++
 )
      
for( j=0; j<NPARTS; j++
 )
         r[i][j] 
-= 0.5 + ( (double) rand() / (double
) RAND_MAX );
}
 
 
int
 computePot() {
   
int
 i, j;

   for( i=0; i<NPARTS; i++ ) {
      
for( j=0; j<i-1; j++
 ) {
        distx 
= pow( (r[0][j] - r[0][i]), 2
 );
        disty 
= pow( (r[1][j] - r[1][i]), 2
 );
        distz 
= pow( (r[2][j] - r[2][i]), 2
 );
        dist 
= sqrt( distx + disty +
 distz );
        pot 
+= 1.0 /
 dist;
      }
   }
   
return 0
;
}

代码执行时间          3.97秒

整个代码的代码量还是很小的,而且很容易发现computePot是一个O(N*N)的算法,运行时间上它的消耗最大,
占了总运行时间的95%以上(建议使用工具来定位程序的热点);
对于computePot函数的优化,最容易看到的优化点就是pow函数的调用,pow(x,y)函数是一个的很耗时的操
作(很多库,会针对y为整数做优化),把pow(x,y)改成x*x的方式
修改后的computePot代码如下(其它代码不变):

inline pow2(const double x) { return x*x; } 
int
 computePot() {
   
int
 i, j;

   for( i=0; i<NPARTS; i++ ) {
      
for( j=0; j<i-1; j++
 ) {
        distx 
= pow2( (r[0][j] - r[0
][i]) );
        disty 
= pow2( (r[1][j] - r[1
][i]) );
        distz 
= pow2( (r[2][j] - r[2
][i]) );
        dist 
= sqrt( distx + disty +
 distz );
        pot 
+= 1.0 /
 dist;
      }
   }
   
return 0
;
}

代码执行时间          2.50秒    该修改加速比:158.8%    相对于原始代码加速比:158.8%

由于computePot中对数组r的访问顺序和r定义的顺序方式不一致,尝试修改r的数据组织结构;
将double r[DIMS][NPARTS] 的定义 改为 double r[NPARTS][DIMS];
(后面证明这个修改使自己走了很大的弯路;原来的向量格式在使用SSE2优化的时候,代码上会稍微简单一点)
也相应的修改了其他访问r的代码: 

#include <stdio.h>
#include 
<stdlib.h>
#include 
<math.h>
#include 
<windows.h>
#include 
<time.h>
 
#define NPARTS 1000
#define NITER 201
#define DIMS 3
 
int rand( void );
int computePot(void
);
void initPositions(void
);
void updatePositions(void
);
 
double
 r[NPARTS][DIMS];
double
 pot;
double
 distx, disty, distz, dist;

int main() {
   
int
 i;
   clock_t start, stop;

   initPositions();
   updatePositions();
 
   start=
clock();
   
for( i=0; i<NITER; i++
 ) {
      pot 
= 0.0
;
      computePot();
      
if (i%10 == 0) printf("%5d: Potential: %10.7f  "
, i, pot);
      updatePositions();
   }
   stop
=
clock();
   printf (
"Seconds = %10.9f  ",(double)(stop-start)/
 CLOCKS_PER_SEC);
   getchar();
}
 
 
void
 initPositions() {
   
int
 i, j;
 
   
for( i=0; i<DIMS; i++
 )
      
for( j=0; j<NPARTS; j++
 ) 
         r[j][i] 
= 0.5 + ( (double) rand() / (double
) RAND_MAX );
}
 
 
void
 updatePositions() {
   
int
 i, j;
 
   
for( i=0; i<DIMS; i++
 )
      
for( j=0; j<NPARTS; j++
 )
         r[j][i] 
-= 0.5 + ( (double) rand() / (double
) RAND_MAX );
}
 
inline 
double pow2(const double x) { return x*
x; } 
int
 computePot() {
   
int
 i, j;

   for( i=0; i<NPARTS; i++ ) {
      
for( j=0; j<i-1; j++
 ) {
        distx 
= pow2( (r[j][0- r[i][0
]) );
        disty 
= pow2( (r[j][1- r[i][1
]) );
        distz 
= pow2( (r[j][2- r[i][2
]) );
        dist 
= sqrt( distx + disty +
 distz );
        pot 
+= 1.0 /
 dist;
      }
   }
   
return 0
;
}

代码执行时间          2.39秒    该修改加速比:104.6%    相对于原始代码加速比:166.1%

r可以看作一个3维空间点的数组,computePot函数就是求这些点之间距离的某种势能:)
为了将每个点的内存访问对齐到16byte边界,
将double r[NPARTS][DIMS] 的定义 改为 __declspec(align(16))double r[NPARTS][DIMS+1];
(其它代码不变)(速度略有下降,为使用SSE2优化作准备)

代码执行时间          2.42秒    该修改加速比: 98.8%    相对于原始代码加速比:164.0%

使用SSE2来优化computePot函数;SSE2是一种单指令流多数据流指令集,里面包含有能同时处理两个64bit浮点数的指令;
代码需要 #include <emmintrin.h> 文件,从而利用编译器来优化SSE2指令,就不用手工编写汇编代码了;
修改后的computePot代码如下(其它代码不变):

int computePot() {
   
int
 i, j;
   __m128d lcpot;

  lcpot=_mm_setzero_pd();
  
for( i=0; i<NPARTS; i++
 ) {
      __m128d posi_l
=*(__m128d*)(&r[i][0
]);
      __m128d posi_h
=*(__m128d*)(&r[i][2
]);
      j
=0
;
      
for(;j+1<i;++
j)
      {
          __m128d dm0_l
=_mm_sub_pd(*(__m128d*)(&r[j][0
]),posi_l);
          __m128d dm0_h
=_mm_sub_pd(*(__m128d*)(&r[j][2
]),posi_h);
          dm0_l
=
_mm_mul_pd(dm0_l,dm0_l);
          dm0_h
=
_mm_mul_pd(dm0_h,dm0_h);

          __m128d dm0=_mm_add_pd(dm0_l,dm0_h);
          __m128d dm1;
          dm1
=
_mm_unpackhi_pd(dm0,dm0);
          dm0
=
_mm_add_sd(dm0,dm1);

          dm1=_mm_sqrt_sd(dm0,dm0); 
          dm0
=
_mm_div_sd(dm1,dm0); 
         
          lcpot
=
_mm_add_sd(lcpot,dm0);
      }
   }

   __m128d dm1;
   dm1=
_mm_unpackhi_pd(lcpot,lcpot);
   lcpot
=
_mm_add_sd(lcpot,dm1);
   pot
+=*(double*)(&
lcpot);
   
return 0
;
}

代码执行时间          2.31秒    该修改加速比:104.8%    相对于原始代码加速比:171.9%
(时间提高不多,主要时间耗在了_mm_sqrt_sd、_mm_div_sd指令上,所以SSE2的优化效果没有体现出来)

SSE3中有一条_mm_hadd_pd指令(水平加),可以用来改善一点点代码,
程序需要#include <intrin.h>文件:

int computePot() {
   
int
 i, j;
   __m128d lcpot;

  lcpot=_mm_setzero_pd();
  
for( i=0; i<NPARTS; i++
 ) {
      __m128d posi_l
=*(__m128d*)(&r[i][0
]);
      __m128d posi_h
=*(__m128d*)(&r[i][2
]);
      j
=0
;
      
for(;j+1<i;++
j)
      {
          __m128d dm0_l
=_mm_sub_pd(*(__m128d*)(&r[j][0
]),posi_l);
          __m128d dm0_h
=_mm_sub_pd(*(__m128d*)(&r[j][2
]),posi_h);
          dm0_l
=
_mm_mul_pd(dm0_l,dm0_l);
          dm0_h
=
_mm_mul_pd(dm0_h,dm0_h);

          __m128d dm0=_mm_add_pd(dm0_l,dm0_h);
          __m128d dm1;
          
//
dm1=_mm_unpackhi_pd(dm0,dm0);
          
//dm0=_mm_add_sd(dm0,dm1);

          dm0=_mm_hadd_pd(dm0,dm0); //SSE3

          dm1
=_mm_sqrt_sd(dm0,dm0); 
          dm0
=
_mm_div_sd(dm1,dm0); 
         
          lcpot
=
_mm_add_sd(lcpot,dm0);
      }
   }

   __m128d dm1;
   dm1=
_mm_unpackhi_pd(lcpot,lcpot);
   lcpot
=
_mm_add_sd(lcpot,dm1);
   pot
+=*(double*)(&
lcpot);
   
return 0
;
}

代码执行时间          2.22秒   该修改加速比:104.1%    相对于原始代码加速比:178.8%

线程化computePot函数来达到并行使用两个CPU的目的
也就是将computePot的内部运算改写为支持并行运算的形式,并合理划分成多个任务交给线程执行;
程序实现了一个比较通用的工作线程池,来利用多核CPU完成
代码需要 #include   <vector> #include   <process.h>
并在编译器中设定项目的属性:运行时库 为 多线程(/MT)

直接给出全部实现代码:代码执行时间          1.16秒    该修改加速比:191.4%    相对于原始代码加速比:342.2%
并行代码使程序性能得到了巨大的提高!!

//* compute the potential energy of a collection of */
//* particles interacting via pairwise potential    */
 
#include 
<stdio.h>
#include 
<stdlib.h>
#include 
<math.h>
#include 
<windows.h>
#include 
<time.h>
#include   
<emmintrin.h> //使用SSE2
#include   <intrin.h>    //使用SSE3
#include   <vector>
#include   
<process.h> //使用线程

#define NPARTS 1000
#define NITER 201
#define DIMS 3

const int DefthreadCount=2//设为1则为单线程执行,如果有4个核可以设为4:)

int rand( void );
int computePot(void);
void initPositions(void);
void updatePositions(void);
 
__declspec(align(
16))double r[NPARTS][DIMS+1];

double pot;
double distx, disty, distz, dist;

int main() {
   
int i;
   clock_t start, stop;

   initPositions();
   updatePositions();
 
   start=clock();
   
for( i=0; i<NITER; i++ ) {
      pot 
= 0.0;
      computePot();
      
if (i%10 == 0) printf("%5d: Potential: %10.7f  ", i, pot);
      updatePositions();
   }
   stop
=clock();
   printf (
"Seconds = %10.9f  ",(double)(stop-start)/ CLOCKS_PER_SEC);
   getchar();
}
 

void initPositions() {
   
int i, j;
 
   
for( i=0; i<DIMS; i++ )
      
for( j=0; j<NPARTS; j++ ) 
         r[j][i] 
= 0.5 + ( (double) rand() / (double) RAND_MAX );
}
 
 
void updatePositions() {
   
int i, j;
 
   
for( i=0; i<DIMS; i++ )
      
for( j=0; j<NPARTS; j++ )
         r[j][i] 
-= 0.5 + ( (double) rand() / (double) RAND_MAX );
}
 

struct TWorkData
{
    
long     iBegin;
    
long     iEnd;
    
double    fResult;
};

void computePotPart(TWorkData* work_data) {
   
int i, j;
   __m128d lcpot;

  lcpot=_mm_setzero_pd();
  
for( i=work_data->iBegin; i<work_data->iEnd; ++i ) {
      __m128d posi_l
=*(__m128d*)(&r[i][0]);
      __m128d posi_h
=*(__m128d*)(&r[i][2]);
      j
=0;
      
for(;j+1<i;++j)
      {
          __m128d dm0_l
=_mm_sub_pd(*(__m128d*)(&r[j][0]),posi_l);
          __m128d dm0_h
=_mm_sub_pd(*(__m128d*)(&r[j][2]),posi_h);
          dm0_l
=_mm_mul_pd(dm0_l,dm0_l);
          dm0_h
=_mm_mul_pd(dm0_h,dm0_h);

          __m128d dm0=_mm_add_pd(dm0_l,dm0_h);
          __m128d dm1;
          
//dm1=_mm_unpackhi_pd(dm0,dm0);
          
//dm0=_mm_add_sd(dm0,dm1);
          dm0=_mm_hadd_pd(dm0,dm0); //SSE3

          dm1
=_mm_sqrt_sd(dm0,dm0); 
          dm0
=_mm_div_sd(dm1,dm0); 
         
          lcpot
=_mm_add_sd(lcpot,dm0);
      }
   }

   __m128d dm1;
   dm1=_mm_unpackhi_pd(lcpot,lcpot);
   lcpot
=_mm_add_sd(lcpot,dm1);
   work_data
->fResult=*(double*)(&lcpot);
}

/////////////////////////////////////////////////////////////
//工作线程池 TWorkThreadPool
//用于把一个任务拆分成多个线程任务
//要求每个小任务任务量相近
//todo:改成任务领取模式
class TWorkThreadPool;

typedef void (*TThreadCallBack)(void * pData);
enum TThreadState{ thrStartup=0, thrReady,  thrBusy, thrTerminate, thrDeath };

class TWorkThread
{
public:
    
volatile HANDLE             thread_handle;
    
volatile enum TThreadState  state;
    
volatile TThreadCallBack    func;
    
volatile void *             pdata;  //work data     
    volatile HANDLE             waitfor_event;
    TWorkThreadPool
*            pool;
    
volatile DWORD              thread_ThreadAffinityMask;

    TWorkThread() { memset(this,0,sizeof(TWorkThread));  }
};

void do_work_end(TWorkThread* thread_data);

void __cdecl thread_dowork(TWorkThread*

【上篇】
【下篇】

抱歉!评论已关闭.