
GPU Sorting


For a plain CPU version of bitonic sort, see http://blog.csdn.net/sunmenggmail/article/details/42869235

Refer again to the figure there (the bitonic sorting network diagram).

The idea behind the CUDA bitonic sort is as follows:

Assign one thread per element. If there are more than 1024 elements, still launch only 1024 threads, because __syncthreads only synchronizes threads within a block and a block can have at most 1024 threads; when the element count exceeds 1024, each thread may therefore have to handle the comparisons for more than one element.

In terms of the figure, each rectangle represents one round of parallel comparisons, so the whole network needs only 6 such rounds to produce the output on the right.
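
For reference, here is a minimal CPU sketch of the same compare-exchange network (my own addition, not part of the original code). The partner of element i at distance j is i ^ j, and the bit i & k decides whether the pair is compared in ascending or descending order; the CUDA kernels below use exactly this pairing, with one thread per comparison.

#include <algorithm>
#include <vector>

// CPU reference for the bitonic network, assuming a.size() is a power of two.
void bitonicSortHost(std::vector<unsigned> &a) {
    const size_t n = a.size();
    for (size_t k = 2; k <= n; k <<= 1)             // length of the sorted runs being built
        for (size_t j = k >> 1; j > 0; j >>= 1)     // compare distance within a run
            for (size_t i = 0; i < n; ++i) {
                size_t partner = i ^ j;             // visit each pair once, from the smaller index
                if (partner > i) {
                    bool ascending = ((i & k) == 0);
                    if (ascending ? (a[i] > a[partner]) : (a[i] < a[partner]))
                        std::swap(a[i], a[partner]);
                }
            }
}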


#include <vector>
#include <algorithm>
#include <iostream>
#include <time.h>
#include <sys/time.h>
#include <string.h>
#include <math.h>
#include <stdlib.h>
#include <stdio.h>

using namespace std;

#define CHECK_EQ1(a,b) do { \
	if ((a) != (b)) { \
		cout << __FILE__ << " : " << __LINE__ << " : check failed because " << (a) << " != " << (b) << endl; \
		cout << cudaGetErrorString(a) << endl; \
		exit(1); \
	} \
} while(0)

#define CUDA_CHECK(condition)\
do {\
	cudaError_t error = condition;\
	CHECK_EQ1(error, cudaSuccess);\
} while(0)

// Return the index of the highest set bit of a word, e.g. __btflo(0b00000110) == 2.
static __device__ __forceinline__ unsigned int __btflo(unsigned int word)
{
    unsigned int ret;
    asm volatile("bfind.u32 %0, %1;" : "=r"(ret) : "r"(word));
    return ret;
}

// For more than 1024 elements: a single block where each thread handles several elements.
__global__ void bigBinoticSort(unsigned int *arr, int len, unsigned int *buf) {
	unsigned len2 = 1 << (__btflo(len-1u) + 1); // smallest power of two >= len
	
	unsigned int MAX = 0xffffffffu;
	unsigned id = threadIdx.x;
	if (id >= len2) return;
	unsigned iter = blockDim.x;
	for (unsigned i = id; i < len2; i += iter) {
		if (i >= len) {
			buf[i-len] = MAX;
		}
	}
	
	__syncthreads();
	
	for (unsigned k = 2; k <= len2; k*=2) {
		for (unsigned j = k >> 1; j > 0; j >>= 1) {
			for (unsigned i = id; i < len2; i += iter) {
				unsigned swapIdx = i ^ j;
				
				if (swapIdx > i) {
					unsigned myelem, other;
					if (i < len) myelem = arr[i];
					else myelem = buf[i-len];
					
					if (swapIdx < len) other = arr[swapIdx];
					else other = buf[swapIdx-len];
					
					bool swap = false;
					
					if ((i & k)==0 && myelem > other) swap = true;
					if ((i & k) == k && myelem < other) swap = true;
					
					if (swap) {
						if (swapIdx < len) arr[swapIdx] = myelem; 
						else buf[swapIdx-len] = myelem;
						
						if (i < len) arr[i] = other;
						else buf[i-len] = other;
					}
				}
			}
			__syncthreads();
			
			
		}
	}
}

// For 1024 elements or fewer: a single block using shared memory.
__global__ void binoticSort(unsigned int *arr, int len) {
	__shared__  unsigned int buf[1024];
	buf[threadIdx.x] = (threadIdx.x < len ? arr[threadIdx.x] : 0xffffffffu);
	__syncthreads();
	
	for (unsigned k = 2; k <= blockDim.x; k*=2) {// build bitonic runs of length k (alternating ascending/descending)
		for (unsigned j = k >> 1; j > 0; j >>= 1) {// merge each run by halving the compare distance
			unsigned swapIdx = threadIdx.x ^ j;
			unsigned myelem = buf[threadIdx.x];
			unsigned other = buf[swapIdx];
			
			__syncthreads();
			
			unsigned ascend = k * (swapIdx < threadIdx.x);
			unsigned descend = k * (swapIdx > threadIdx.x);
			// If this thread holds the earlier index of the pair: ascend == 0, descend == k.
			// If it holds the later index: ascend == k, descend == 0.
			bool swap = false;
			if ((threadIdx.x & k) == ascend) {
				if (myelem > other) swap = true;
			}
			if ((threadIdx.x & k) == descend) {
				if (myelem < other) swap = true;
			}
			
			if (swap) buf[swapIdx] = myelem;
			__syncthreads();
		}
	}
	
	if (threadIdx.x < len) arr[threadIdx.x] = buf[threadIdx.x];
}

template<class T>
inline void printVec(T *vec, int len) {
	for (int i = 0; i < len; ++i) cout <<vec[i] << "\t";
	cout << endl;
}

template<class T>
inline void printVecg(T *gvec, int len) {
	T *vec = (T*)malloc(sizeof(T)*len);
	CUDA_CHECK(cudaMemcpy(vec,gvec,sizeof(T)*len,cudaMemcpyDeviceToHost));
	printVec(vec,len);
	free(vec);
}

void lineSize(int N, int &nblocks, int &nthreads) {
	if (N <= 1024) {
		nblocks = 1;
		nthreads = (N + 32 - 1)/32*32; // round up to a multiple of the warp size
	}
	else {
		nblocks = (N + 1024 - 1)/1024;
		nthreads = 1024;
	}
}

bool validate(unsigned *gvec, int len) {
	unsigned *vec = (unsigned*)malloc(sizeof(unsigned)*len);
	CUDA_CHECK(cudaMemcpy(vec,gvec,sizeof(unsigned)*len,cudaMemcpyDeviceToHost));
	bool ok = true;
	for (int i = 1; i < len; ++i) {
		if (vec[i] <= vec[i-1]) { ok = false; break; } // the test data is a permutation, so strictly ascending is expected
	}
	free(vec);
	return ok;
}

inline int roundUpPower2(int v) {
	v--;
	v |= v >> 1;
	v |= v >> 2;
	v |= v >> 4;
	v |= v >> 8;
	v |= v >> 16;

	v++;
	return v;
}



int main(int argc, char *argv[]) {
	if (argc != 2) {
		cout << "usage: " << argv[0] << " len\n";
		return 1;
	}
	int len = atoi(argv[1]);
	unsigned int *arr = (unsigned int*)malloc(sizeof(unsigned int)*len);
	for (int i = 0; i < len; ++i) arr[i] = i;
	srand((unsigned int)time(NULL));
	for (int i = len; i >= 2; --i) {
		int j = rand() % i;
		swap(arr[i-1], arr[j]);
	}
	
	
	
	
	unsigned int* darr, *buf;
	CUDA_CHECK(cudaMalloc((void**)&darr, sizeof(unsigned int)*len));
	CUDA_CHECK(cudaMalloc((void**)&buf, sizeof(unsigned int)*len));
	CUDA_CHECK(cudaMemcpy(darr, arr, sizeof(unsigned int)*len, cudaMemcpyHostToDevice));
	
	
	bigBinoticSort<<<1,1024>>>(darr,len, buf);
	CUDA_CHECK(cudaPeekAtLastError());
	CUDA_CHECK(cudaDeviceSynchronize());
	
	if (validate(darr, len))
		cout << "yes\n";
	else
		cout << "no\n";
	
	
	CUDA_CHECK(cudaFree(darr));
	CUDA_CHECK(cudaFree(buf));
	free(arr);
	return 0;
}

The listing above contains two bitonic sort implementations. One handles at most 1024 elements and uses shared memory to speed up access; that said, if you only need to sort 1024 elements or fewer, a CPU quicksort is the better choice, since the GPU version has no clear speed advantage and can even be slower than the CPU.

Arrays with more than 1024 elements use the other kernel. Its drawback is obvious: no matter how many elements there are, the computation runs in a single block, and a block can use at most 1024 threads. Up to roughly ten thousand elements, though, this is probably the fastest approach on the GPU.

In my own tests (which included thrust's sort, a radix sort), GPU sorting only shows a clear advantage once the element count exceeds about 5,000. At around 100,000 elements, the GPU sort is roughly a hundred times faster than the CPU.
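
A comparison along those lines can be reproduced with a small timing harness like the sketch below (my own addition, not from the original post; the exact numbers naturally depend on the GPU, the CPU and the data):

#include <thrust/device_vector.h>
#include <thrust/sort.h>
#include <algorithm>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <vector>

int main() {
    const int n = 100000;
    std::vector<unsigned> h(n);
    for (int i = 0; i < n; ++i) h[i] = rand();

    thrust::device_vector<unsigned> d(h.begin(), h.end());  // copy the keys to the device
    cudaDeviceSynchronize();                                 // keep transfer/startup cost out of the timing

    auto t0 = std::chrono::high_resolution_clock::now();
    thrust::sort(d.begin(), d.end());                        // radix sort for unsigned keys
    cudaDeviceSynchronize();
    auto t1 = std::chrono::high_resolution_clock::now();

    std::sort(h.begin(), h.end());
    auto t2 = std::chrono::high_resolution_clock::now();

    printf("thrust::sort %.3f ms, std::sort %.3f ms\n",
           std::chrono::duration<double, std::milli>(t1 - t0).count(),
           std::chrono::duration<double, std::milli>(t2 - t1).count());
    return 0;
}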

The rest of this post covers quicksort on the GPU. GPU quicksort can handle very large inputs, but it is limited by recursion depth; once that limit is exceeded, the bitonic sort described above takes over. My tests show it is still somewhat faster than thrust.

The GPU quicksort is based mainly on the sample NVIDIA_CUDA-6.5_Samples/6_Advanced/cdpAdvancedQuicksort.

Quicksort is only applied to segments with more than 1024 elements. Each pass partitions the segment into left and right subarrays: a subarray longer than 1024 elements is handled by recursively launching quicksort through dynamic parallelism, while a shorter one is handed to the bitonic sort. If the recursion depth has already reached the maximum (CUDA allows a nesting depth of up to 64, but the practical limit is the memory consumed at each level), the bitonic sort is called directly.

The most interesting part of this program is the partition step.

The segment is carved up by warp, with each warp handling 32 elements. Using a global atomicAdd, each warp obtains the output offsets for its elements that are less than and greater than the pivot; note that within a warp these base offsets are the same for every thread. Each thread then writes its own element relative to that base, which completes the partition.
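
Stripped of the quicksort bookkeeping, the offset computation looks roughly like the sketch below (my own addition, using the same pre-Volta intrinsics as the code that follows; warpPartition and counters are made-up names, and both counters must be zero before launch):

// Partition in[0..len) around pivot into out: elements <= pivot fill the front,
// elements > pivot fill the back. One atomicAdd per warp per side.
__global__ void warpPartition(const unsigned *in, unsigned *out, unsigned len,
                              unsigned pivot, unsigned *counters) {
    unsigned tid = blockIdx.x * blockDim.x + threadIdx.x;
    if (tid >= len) return;
    unsigned lane = threadIdx.x & (warpSize - 1);

    unsigned v = in[tid];
    unsigned greater = (v > pivot);
    unsigned gt_mask = __ballot(greater);           // one bit per active lane with v > pivot
    unsigned lt_mask = __ballot(!greater);

    unsigned lt_base, gt_base;
    if (lane == 0) {                                // lane 0 reserves space for the whole warp
        lt_base = atomicAdd(&counters[0], __popc(lt_mask));
        gt_base = len - atomicAdd(&counters[1], __popc(gt_mask)) - __popc(gt_mask);
    }
    lt_base = __shfl((int)lt_base, 0);              // broadcast the warp-wide base offsets from lane 0
    gt_base = __shfl((int)gt_base, 0);

    unsigned lane_mask_lt;                          // bits set for lanes below mine
    asm("mov.u32 %0, %%lanemask_lt;" : "=r"(lane_mask_lt));
    unsigned my_mask = greater ? gt_mask : lt_mask;
    unsigned rank = __popc(my_mask & lane_mask_lt); // my slot inside the warp's reservation

    out[(greater ? gt_base : lt_base) + rank] = v;
}

The real kernel below additionally flips the comparison to >= when an entire warp sees only elements <= pivot, so that every partition step is guaranteed to make progress.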

Note that this quicksort is not in-place, and because it recurses, the original array and the scratch buffer have to be swapped between levels.

Since CUDA has no explicit locks, the code uses a special ring buffer to manage its per-segment state; in my view it could misbehave in extreme cases.

(The code below has a bug: it does not handle swapping the original array and the scratch buffer, and is only meant to aid understanding. See the CUDA sample for the correct version.)

#define QSORT_BLOCKSIZE_SHIFT   9
#define QSORT_BLOCKSIZE         (1 << QSORT_BLOCKSIZE_SHIFT)
#define BITONICSORT_LEN         1024            // Must be power of 2!
#define QSORT_MAXDEPTH          16              // Will force final bitonic stage at depth QSORT_MAXDEPTH+1
#define QSORT_STACK_ELEMS   1*1024*1024 // One million stack elements is a HUGE number.

typedef struct __align__(128) qsortAtomicData_t
{
    volatile unsigned int lt_offset;    // Current output offset for <pivot
    volatile unsigned int gt_offset;    // Current output offset for >pivot
    volatile unsigned int sorted_count; // Total count sorted, for deciding when to launch next wave
    volatile unsigned int index;        // Ringbuf tracking index. Can be ignored if not using ringbuf.
} qsortAtomicData;

////////////////////////////////////////////////////////////////////////////////
// A ring-buffer for rapid stack allocation
////////////////////////////////////////////////////////////////////////////////
typedef struct qsortRingbuf_t
{
    volatile unsigned int head;         // Head pointer - we allocate from here (starts at 1)
    volatile unsigned int tail;         // Tail pointer - last still-in-use element (starts at 0)
    volatile unsigned int count;        // Total number of chunks freed so far (starts at 0)
    volatile unsigned int max;          // Highest index among freed chunks, plus one (starts at 0)
    unsigned int stacksize;             // Wrap-around size of buffer (must be power of 2)
    volatile void *stackbase;           // Pointer to the stack we're allocating from
} qsortRingbuf;


/*
CUDA has no explicit lock, so the ring buffer works as follows:
allocating advances head; freeing (eventually) advances tail,
so [tail, head) covers the chunks still in use and head points to the next chunk to hand out.
count records how many chunks have been freed so far.
There are stacksize chunks, but a chunk's index keeps growing each time it is re-allocated.
max records the highest index (plus one) among the chunks freed so far.
Only when every chunk below max has been freed, i.e. max == count, can tail be advanced.
*/
template<class T>
static __device__ void ringbufFree(qsortRingbuf *ringbuf, T *data) {
	unsigned index = data->index;                                     // index assigned when this chunk was allocated
	unsigned count = atomicAdd((unsigned*)&(ringbuf->count), 1) + 1;  // total chunks freed, including this one
	unsigned max = atomicMax((unsigned*)&(ringbuf->max), index + 1);  // atomicMax returns the old value
	
	if (max < (index + 1)) max = index + 1;
	if (max == count) {
		// every chunk below max has been freed, so the tail may catch up
		atomicMax((unsigned*)&(ringbuf->tail), count);
	}
}

template<class T>
static __device__ T* ringbufAlloc(qsortRingbuf *ringbuf) {
	// Spin while the ring buffer is full, giving up after a bounded number of tries.
	unsigned int loop = 10000;
	while (((ringbuf->head - ringbuf->tail) >= ringbuf->stacksize) && (--loop > 0));
	if (loop == 0) return NULL;
	
	unsigned index = atomicAdd((unsigned*)&ringbuf->head, 1);
	T *ret = (T*)(ringbuf->stackbase) + (index & (ringbuf->stacksize - 1)); // stacksize must be a power of two
	ret->index = index;
	
	return ret;
}



__global__ void qsort_warp(unsigned *indata,
                           unsigned *outdata,
                           unsigned int offset,//0
                           unsigned int len,//
                           qsortAtomicData *atomicData,//stack
                           qsortRingbuf *atomicDataStack,//ringbuf
                           unsigned int source_is_indata,//true
                           unsigned int depth) 
{
	//printf("depth = %d", depth);
	    // Find my data offset, based on warp ID
    unsigned int thread_id = threadIdx.x + (blockIdx.x << QSORT_BLOCKSIZE_SHIFT);
    //unsigned int warp_id = threadIdx.x >> 5;   // Used for debug only
    unsigned int lane_id = threadIdx.x & (warpSize-1);// %32

    // Exit if I'm outside the range of sort to be done
    if (thread_id >= len)
        return;

    //
    // First part of the algorithm. Each warp counts the number of elements that are
    // greater/less than the pivot.
    //
    // When a warp knows its count, it updates an atomic counter.
    //

    // Read in the data and the pivot. Arbitrary pivot selection for now.
    unsigned pivot = indata[offset + len/2];
    unsigned data  = indata[offset + thread_id];

    // Count how many are <= and how many are > pivot.
    // If all are <= pivot then we adjust the comparison
    // because otherwise the sort will move nothing and
    // we'll iterate forever.
    unsigned int greater = (data > pivot);
    unsigned int gt_mask = __ballot(greater); // bit N is set iff lane N is active and its predicate is non-zero
	
	if (gt_mask == 0) {
		greater = (data >= pivot);
		gt_mask = __ballot(greater);
	}
	
	unsigned lt_mask = __ballot(!greater);
	unsigned gt_count = __popc(gt_mask);//count number of 1 in a warp;
	unsigned lt_count = __popc(lt_mask);
	
	// Only lane 0 reserves output space for the whole warp: one atomicAdd per side.
	unsigned lt_oft, gt_oft;
	if (lane_id == 0) {
		if (lt_count > 0)
			lt_oft = atomicAdd((unsigned*)&atomicData->lt_offset, lt_count); // atomicAdd returns the old value, i.e. this warp's base offset
		if (gt_count > 0)
			gt_oft = len - (atomicAdd((unsigned*) &atomicData->gt_offset, gt_count) + gt_count); // >pivot elements fill the segment from the back
		
		//printf("depth = %d\n", depth);
		//printf("pivot = %u\n", pivot);
		//printf("lt_count %u lt_oft %u gt_count %u gt_oft %u  atomicDataGtOffset %u\n", lt_count,lt_oft, gt_count,gt_oft, atomicData->gt_offset);
	}
	
	lt_oft = __shfl((int)lt_oft, 0);
	gt_oft = __shfl((int)gt_oft, 0);//Everyone pulls the offsets from lane 0
	
	__syncthreads();
	 // Now compute my own personal offset within this. I need to know how many
    // threads with a lane ID less than mine are going to write to the same buffer
    // as me. We can use popc to implement a single-operation warp scan in this case.
    unsigned lane_mask_lt;
    asm("mov.u32 %0, %%lanemask_lt;" : "=r"(lane_mask_lt)); // mask with a bit set for every lane whose ID is lower than mine
	
	unsigned my_mask = greater ? gt_mask : lt_mask;
	unsigned my_oft = __popc(my_mask & lane_mask_lt); // how many lower lanes write to the same side as me
	
	//move data
	my_oft += greater ? gt_oft : lt_oft;
	outdata[offset + my_oft] = data;
	__syncthreads();
	
	//if (lane_id == 0) printf("pivot = %d", pivot);
	if (lane_id == 0) {
		/*if (blockIdx.x == 0) {
		printf("depth = %d\n", depth);
		for (int i = 0; i < len; ++i)
			printf("%u ", outdata[offset+i]);
		printf("\n");
		}*/
		
		
		unsigned mycount = lt_count + gt_count;
		// If this warp is the last one to finish for this segment, launch the children.
		if (atomicAdd((unsigned*)&atomicData->sorted_count, mycount) + mycount == len) {
			unsigned lt_len = atomicData->lt_offset;
			unsigned gt_len = atomicData->gt_offset;
				
			cudaStream_t lstream, rstream;
            cudaStreamCreateWithFlags(&lstream, cudaStreamNonBlocking);
            cudaStreamCreateWithFlags(&rstream, cudaStreamNonBlocking);
			
			ringbufFree<qsortAtomicData>(atomicDataStack, atomicData);
			
			if (lt_len == 0) return; // nothing smaller than the pivot: stop here (note this also skips the right-hand launch)
			//////////////////////////// left
			if (lt_len > BITONICSORT_LEN) {
				if (depth >= QSORT_MAXDEPTH) {
					bigBinoticSort<<<1, BITONICSORT_LEN,0, rstream>>>(outdata + offset, lt_len, indata + offset);
				}
				else {
					
					
					if ((atomicData = ringbufAlloc<qsortAtomicData>(atomicDataStack)) == NULL)
                        printf("Stack-allocation error. Failing left child launch.\n");
					else {
						atomicData->lt_offset = atomicData->gt_offset = atomicData->sorted_count = 0;
                        unsigned int numblocks = (unsigned int)(lt_len+(QSORT_BLOCKSIZE-1)) >> QSORT_BLOCKSIZE_SHIFT;
                        qsort_warp<<< numblocks, QSORT_BLOCKSIZE, 0, lstream >>>(outdata, indata, offset, lt_len, atomicData, atomicDataStack, true, depth+1);
					}
					
					
				}
			}
			else if (lt_len > 1) {
				unsigned int bitonic_len = 1 << (__btflo(lt_len-1U)+1);
                binoticSort<<< 1, bitonic_len, 0, lstream >>>(outdata + offset,lt_len);
			}
			////////////////////////// right
			if (gt_len > BITONICSORT_LEN) {
				if (depth >= QSORT_MAXDEPTH)
					bigBinoticSort<<<1, BITONICSORT_LEN,0, rstream>>>(outdata + offset + lt_len, gt_len, indata + offset + lt_len);
				else {
					
					if ((atomicData = ringbufAlloc<qsortAtomicData>(atomicDataStack)) == NULL)
                        printf("Stack allocation error! Failing right-side launch.\n");
					else {
						atomicData->lt_offset = atomicData->gt_offset = atomicData->sorted_count = 0;
                        unsigned int numblocks = (unsigned int)(gt_len+(QSORT_BLOCKSIZE-1)) >> QSORT_BLOCKSIZE_SHIFT;
                        qsort_warp<<< numblocks, QSORT_BLOCKSIZE, 0, rstream >>>(outdata, indata, offset+lt_len, gt_len, atomicData, atomicDataStack, true, depth+1);
					}
					
				}
			}
			else if (gt_len > 1) {
				unsigned int bitonic_len = 1 << (__btflo(gt_len-1U)+1);
                binoticSort<<< 1, bitonic_len, 0, rstream >>>(outdata + offset + lt_len,gt_len);
			}
			
		}
	}
}

void runqsort(unsigned *gpudata, unsigned *scratchdata, unsigned int count, cudaStream_t stream) {
	unsigned int stacksize = QSORT_STACK_ELEMS;//1*1024*1024

    // This is the stack, for atomic tracking of each sort's status
    qsortAtomicData *gpustack;
    CUDA_CHECK(cudaMalloc((void **)&gpustack, stacksize * sizeof(qsortAtomicData)));
    CUDA_CHECK(cudaMemset(gpustack, 0, sizeof(qsortAtomicData)));     // Only need set first entry to 0

    // Create the memory ringbuffer used for handling the stack.
    // Initialise everything to where it needs to be.
    qsortRingbuf buf;
    qsortRingbuf *ringbuf;
    CUDA_CHECK(cudaMalloc((void **)&ringbuf, sizeof(qsortRingbuf)));
    buf.head = 1;           // We start with one allocation
    buf.tail = 0;
    buf.count = 0;
    buf.max = 0;
    buf.stacksize = stacksize;
    buf.stackbase = gpustack;
    CUDA_CHECK(cudaMemcpy(ringbuf, &buf, sizeof(buf), cudaMemcpyHostToDevice));
	
	if (count > BITONICSORT_LEN)//1024
    {//QSORT_BLOCKSIZE = 2^9 = 512
		
        unsigned int numblocks = (unsigned int)(count+(QSORT_BLOCKSIZE-1)) >> QSORT_BLOCKSIZE_SHIFT;
        qsort_warp<<< numblocks, QSORT_BLOCKSIZE, 0, stream >>>(gpudata, scratchdata, 0U, count, gpustack, ringbuf, true, 0);
		
   }
    else
    {
        binoticSort<<< 1, BITONICSORT_LEN >>>(gpudata, count);
		CUDA_CHECK(cudaMemcpy(scratchdata, gpudata, sizeof(unsigned)*count, cudaMemcpyDeviceToDevice));
    }
	
	cudaDeviceSynchronize();
}
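
A possible host-side driver for runqsort, reusing the headers and the CUDA_CHECK macro from the first listing (my own sketch; as noted above, a complete implementation would also have to track which buffer ends up holding the sorted data). Dynamic parallelism needs relocatable device code, e.g. nvcc -arch=sm_35 -rdc=true gpu_qsort.cu -lcudadevrt.

int main(int argc, char *argv[]) {
	int len = (argc > 1) ? atoi(argv[1]) : 100000;

	unsigned *h = (unsigned*)malloc(sizeof(unsigned)*len);
	for (int i = 0; i < len; ++i) h[i] = rand();

	unsigned *gpudata, *scratchdata;
	CUDA_CHECK(cudaMalloc((void**)&gpudata, sizeof(unsigned)*len));
	CUDA_CHECK(cudaMalloc((void**)&scratchdata, sizeof(unsigned)*len));
	CUDA_CHECK(cudaMemcpy(gpudata, h, sizeof(unsigned)*len, cudaMemcpyHostToDevice));

	runqsort(gpudata, scratchdata, len, 0);   // 0 = default stream
	CUDA_CHECK(cudaDeviceSynchronize());

	// Check the result in gpudata (a full implementation would know which buffer to read).
	CUDA_CHECK(cudaMemcpy(h, gpudata, sizeof(unsigned)*len, cudaMemcpyDeviceToHost));
	bool ok = true;
	for (int i = 1; i < len; ++i) if (h[i] < h[i-1]) { ok = false; break; }
	printf(ok ? "sorted\n" : "not sorted\n");

	free(h);
	CUDA_CHECK(cudaFree(gpudata));
	CUDA_CHECK(cudaFree(scratchdata));
	return ok ? 0 : 1;
}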
