HouSisong@263.net 2007.01.20
英特尔最近举办了一次多核平台编码优化大赛,我也参加了这次比赛;大赛代码提交阶段已经结束,
所以也可以公开自己的优化过程;
去年末双核处理器出货量开始超过单核处理器出货量,英特尔也发布了4核CPU;AMD今年也将发布4核,
并计划今年下半年发布8核,多核已经大势所趋;而以前的CPU性能增长,软件界都能很自然的获得相应的
性能增长,而对于这次多核方式带来的CPU性能爆炸性增长,软件界的免费午餐没有了,程序开发人员必须
手工的完成软件的并行化;而很多程序员、计算机教育界等对并行编程的准备并不充足(包括我);英特尔
在这种背景下举办多核平台编码优化大赛是很有现实意义的;
本文章主要介绍了自己参加竞赛过程中,对代码的优化过程;
我测试和优化过程中用的 CPU:AMD64x2 3600+ (双核CPU) (开发初期还使用过AMD3000+)
操作系统:Windows XP 32bit
编译器:Visual Studio 2005
大赛公布的原始代码:
(组织者后来要求计算和输出精度到小数点后7位,这里的输出代码做了相应的调整。)
//* particles interacting via pairwise potential */
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <windows.h>
#include <time.h>
#define NPARTS 1000
#define NITER 201
#define DIMS 3
int rand( void );
int computePot(void);
void initPositions(void);
void updatePositions(void);
double r[DIMS][NPARTS];
double pot;
double distx, disty, distz, dist; int main() {
int i;
clock_t start, stop;
initPositions();
updatePositions();
start
for( i=0; i<NITER; i++ ) {
pot = 0.0;
computePot();
if (i%10 == 0) printf("%5d: Potential: %10.7f ", i, pot);
updatePositions();
}
stop=clock();
printf ("Seconds = %10.9f ",(double)(stop-start)/ CLOCKS_PER_SEC);
}
void initPositions() {
int i, j;
for( i=0; i<DIMS; i++ )
for( j=0; j<NPARTS; j++ )
r[i][j] = 0.5 + ( (double) rand() / (double) RAND_MAX );
}
void updatePositions() {
int i, j;
for( i=0; i<DIMS; i++ )
for( j=0; j<NPARTS; j++ )
r[i][j] -= 0.5 + ( (double) rand() / (double) RAND_MAX );
}
int computePot() {
int i, j;
for( i=0; i<NPARTS; i++ ) {
for( j=0; j<i-1; j++ ) {
distx = pow( (r[0][j] - r[0][i]), 2 );
disty = pow( (r[1][j] - r[1][i]), 2 );
distz = pow( (r[2][j] - r[2][i]), 2 );
dist = sqrt( distx + disty + distz );
pot += 1.0 / dist;
}
}
return 0;
}
代码执行时间 3.97秒
整个代码的代码量还是很小的,而且很容易发现computePot是一个O(N*N)的算法,运行时间上它的消耗最大,
占了总运行时间的95%以上(建议使用工具来定位程序的热点);
对于computePot函数的优化,最容易看到的优化点就是pow函数的调用,pow(x,y)函数是一个的很耗时的操
作(很多库,会针对y为整数做优化),把pow(x,y)改成x*x的方式;
修改后的computePot代码如下(其它代码不变):
int computePot() {
int i, j;
for( i=0; i<NPARTS; i++ ) {
for( j=0; j<i-1; j++ ) {
distx = pow2( (r[0][j] - r[0][i]) );
disty = pow2( (r[1][j] - r[1][i]) );
distz = pow2( (r[2][j] - r[2][i]) );
dist = sqrt( distx + disty + distz );
pot += 1.0 / dist;
}
}
return 0;
}
代码执行时间 2.50秒 该修改加速比:158.8% 相对于原始代码加速比:158.8%
由于computePot中对数组r的访问顺序和r定义的顺序方式不一致,尝试修改r的数据组织结构;
将double r[DIMS][NPARTS] 的定义 改为 double r[NPARTS][DIMS];
(后面证明这个修改使自己走了很大的弯路;原来的向量格式在使用SSE2优化的时候,代码上会稍微简单一点)
也相应的修改了其他访问r的代码:
#include <stdlib.h>
#include <math.h>
#include <windows.h>
#include <time.h>
#define NPARTS 1000
#define NITER 201
#define DIMS 3
int rand( void );
int computePot(void);
void initPositions(void);
void updatePositions(void);
double r[NPARTS][DIMS];
double pot;
double distx, disty, distz, dist; int main() {
int i;
clock_t start, stop;
initPositions();
updatePositions();
start
for( i=0; i<NITER; i++ ) {
pot = 0.0;
computePot();
if (i%10 == 0) printf("%5d: Potential: %10.7f ", i, pot);
updatePositions();
}
stop=clock();
printf ("Seconds = %10.9f ",(double)(stop-start)/ CLOCKS_PER_SEC);
getchar();
}
void initPositions() {
int i, j;
for( i=0; i<DIMS; i++ )
for( j=0; j<NPARTS; j++ )
r[j][i] = 0.5 + ( (double) rand() / (double) RAND_MAX );
}
void updatePositions() {
int i, j;
for( i=0; i<DIMS; i++ )
for( j=0; j<NPARTS; j++ )
r[j][i] -= 0.5 + ( (double) rand() / (double) RAND_MAX );
}
inline double pow2(const double x) { return x*x; }
int computePot() {
int i, j;
for( i=0; i<NPARTS; i++ ) {
for( j=0; j<i-1; j++ ) {
distx = pow2( (r[j][0] - r[i][0]) );
disty = pow2( (r[j][1] - r[i][1]) );
distz = pow2( (r[j][2] - r[i][2]) );
dist = sqrt( distx + disty + distz );
pot += 1.0 / dist;
}
}
return 0;
}
代码执行时间 2.39秒 该修改加速比:104.6% 相对于原始代码加速比:166.1%
r可以看作一个3维空间点的数组,computePot函数就是求这些点之间距离的某种势能:)
为了将每个点的内存访问对齐到16byte边界,
将double r[NPARTS][DIMS] 的定义 改为 __declspec(align(16))double r[NPARTS][DIMS+1];
(其它代码不变)(速度略有下降,为使用SSE2优化作准备)
代码执行时间 2.42秒 该修改加速比: 98.8% 相对于原始代码加速比:164.0%
使用SSE2来优化computePot函数;SSE2是一种单指令流多数据流指令集,里面包含有能同时处理两个64bit浮点数的指令;
代码需要 #include <emmintrin.h> 文件,从而利用编译器来优化SSE2指令,就不用手工编写汇编代码了;
修改后的computePot代码如下(其它代码不变):
int i, j;
__m128d lcpot;
lcpot
=_mm_setzero_pd();for( i=0; i<NPARTS; i++ ) {
__m128d posi_l=*(__m128d*)(&r[i][0]);
__m128d posi_h=*(__m128d*)(&r[i][2]);
j=0;
for(;j+1<i;++j)
{
__m128d dm0_l=_mm_sub_pd(*(__m128d*)(&r[j][0]),posi_l);
__m128d dm0_h=_mm_sub_pd(*(__m128d*)(&r[j][2]),posi_h);
dm0_l=_mm_mul_pd(dm0_l,dm0_l);
dm0_h=_mm_mul_pd(dm0_h,dm0_h);
__m128d dm0
=_mm_add_pd(dm0_l,dm0_h);__m128d dm1;
dm1=_mm_unpackhi_pd(dm0,dm0);
dm0=_mm_add_sd(dm0,dm1);
dm1
=_mm_sqrt_sd(dm0,dm0);dm0=_mm_div_sd(dm1,dm0);
lcpot=_mm_add_sd(lcpot,dm0);
}
}
__m128d dm1;
dm1
lcpot=_mm_add_sd(lcpot,dm1);
pot+=*(double*)(&lcpot);
return 0;
}
代码执行时间 2.31秒 该修改加速比:104.8% 相对于原始代码加速比:171.9%
(时间提高不多,主要时间耗在了_mm_sqrt_sd、_mm_div_sd指令上,所以SSE2的优化效果没有体现出来)
SSE3中有一条_mm_hadd_pd指令(水平加),可以用来改善一点点代码,
程序需要#include <intrin.h>文件:
int i, j;
__m128d lcpot;
lcpot
=_mm_setzero_pd();for( i=0; i<NPARTS; i++ ) {
__m128d posi_l=*(__m128d*)(&r[i][0]);
__m128d posi_h=*(__m128d*)(&r[i][2]);
j=0;
for(;j+1<i;++j)
{
__m128d dm0_l=_mm_sub_pd(*(__m128d*)(&r[j][0]),posi_l);
__m128d dm0_h=_mm_sub_pd(*(__m128d*)(&r[j][2]),posi_h);
dm0_l=_mm_mul_pd(dm0_l,dm0_l);
dm0_h=_mm_mul_pd(dm0_h,dm0_h);
__m128d dm0
=_mm_add_pd(dm0_l,dm0_h);__m128d dm1;
//dm1=_mm_unpackhi_pd(dm0,dm0);
//dm0=_mm_add_sd(dm0,dm1);
dm0=_mm_hadd_pd(dm0,dm0); //SSE3
dm1=_mm_sqrt_sd(dm0,dm0);
dm0=_mm_div_sd(dm1,dm0);
lcpot=_mm_add_sd(lcpot,dm0);
}
}
__m128d dm1;
dm1
lcpot=_mm_add_sd(lcpot,dm1);
pot+=*(double*)(&lcpot);
return 0;
}
代码执行时间 2.22秒 该修改加速比:104.1% 相对于原始代码加速比:178.8%
线程化computePot函数来达到并行使用两个CPU的目的;
也就是将computePot的内部运算改写为支持并行运算的形式,并合理划分成多个任务交给线程执行;
程序实现了一个比较通用的工作线程池,来利用多核CPU完成
代码需要 #include <vector> #include <process.h>
并在编译器中设定项目的属性:运行时库 为 多线程(/MT)
直接给出全部实现代码:代码执行时间 1.16秒 该修改加速比:191.4% 相对于原始代码加速比:342.2%
并行代码使程序性能得到了巨大的提高!!
//* particles interacting via pairwise potential */
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <windows.h>
#include <time.h>
#include <emmintrin.h> //使用SSE2
#include <intrin.h> //使用SSE3
#include <vector>
#include <process.h> //使用线程
#define NPARTS 1000
#define NITER 201
#define DIMS 3 const int DefthreadCount=2; //设为1则为单线程执行,如果有4个核可以设为4:)
int rand( void );
int computePot(void);
void initPositions(void);
void updatePositions(void);
__declspec(align(16))double r[NPARTS][DIMS+1]; double pot;
double distx, disty, distz, dist; int main() {
int i;
clock_t start, stop;
initPositions();
updatePositions();
start
for( i=0; i<NITER; i++ ) {
pot = 0.0;
computePot();
if (i%10 == 0) printf("%5d: Potential: %10.7f ", i, pot);
updatePositions();
}
stop=clock();
printf ("Seconds = %10.9f ",(double)(stop-start)/ CLOCKS_PER_SEC);
getchar();
}
void initPositions() {
int i, j;
for( i=0; i<DIMS; i++ )
for( j=0; j<NPARTS; j++ )
r[j][i] = 0.5 + ( (double) rand() / (double) RAND_MAX );
}
void updatePositions() {
int i, j;
for( i=0; i<DIMS; i++ )
for( j=0; j<NPARTS; j++ )
r[j][i] -= 0.5 + ( (double) rand() / (double) RAND_MAX );
}
struct TWorkData
{
long iBegin;
long iEnd;
double fResult;
}; void computePotPart(TWorkData* work_data) {
int i, j;
__m128d lcpot;
lcpot
=_mm_setzero_pd();for( i=work_data->iBegin; i<work_data->iEnd; ++i ) {
__m128d posi_l=*(__m128d*)(&r[i][0]);
__m128d posi_h=*(__m128d*)(&r[i][2]);
j=0;
for(;j+1<i;++j)
{
__m128d dm0_l=_mm_sub_pd(*(__m128d*)(&r[j][0]),posi_l);
__m128d dm0_h=_mm_sub_pd(*(__m128d*)(&r[j][2]),posi_h);
dm0_l=_mm_mul_pd(dm0_l,dm0_l);
dm0_h=_mm_mul_pd(dm0_h,dm0_h);
__m128d dm0
=_mm_add_pd(dm0_l,dm0_h);__m128d dm1;
//dm1=_mm_unpackhi_pd(dm0,dm0);
//dm0=_mm_add_sd(dm0,dm1);
dm0=_mm_hadd_pd(dm0,dm0); //SSE3
dm1=_mm_sqrt_sd(dm0,dm0);
dm0=_mm_div_sd(dm1,dm0);
lcpot=_mm_add_sd(lcpot,dm0);
}
}
__m128d dm1;
dm1
lcpot=_mm_add_sd(lcpot,dm1);
work_data->fResult=*(double*)(&lcpot);
} /////////////////////////////////////////////////////////////
//工作线程池 TWorkThreadPool
//用于把一个任务拆分成多个线程任务
//要求每个小任务任务量相近
//todo:改成任务领取模式
class TWorkThreadPool;
typedef
void (*TThreadCallBack)(void * pData);enum TThreadState{ thrStartup=0, thrReady, thrBusy, thrTerminate, thrDeath }; class TWorkThread
{
public:
volatile HANDLE thread_handle;
volatile enum TThreadState state;
volatile TThreadCallBack func;
volatile void * pdata; //work data
volatile HANDLE waitfor_event;
TWorkThreadPool* pool;
volatile DWORD thread_ThreadAffinityMask;
TWorkThread() { memset(
this,0,sizeof(TWorkThread)); }}; void do_work_end(TWorkThread* thread_data); void __cdecl thread_dowork(TWorkThread*