因需要将服务器读取文件的同步IO改写为异步IO,因此在自己的机器上编写了这个根据文件列表随机、不间断地读取文件块的程序
环境:CentOS 4.8,512M内存,2个1.8GHz CPU
编译器:g++4.5
库:librt
文件列表格式:文件绝对路径+文件名 空格 文件总长度
提示:最好选取大一点的文件(文件长度应不小于读取块大小 block,否则程序中的 rand()%(filesize/block) 会对 0 取模)
/root/work/data/[www.dy2018.com]jixieshi.rmvb 371324360
/root/work/data/oracle9i_system01.dbf 398467072
编译: g++4.5 chlaws_aio.c -lrt
程序如下:
程序还可以大幅度优化,这里只是做了一个简单实现;有问题可以留言,转载请注明文章出处。
//FILE *fp=fopen(file,"rb+");
int ix=0;
int nread=0,nret=0;
//char rbuf[100][1024*256];
//fd_map:filename-fd
map<string,int> fd_map;
fd_map.clear();
struct AIO_INFO aio_info;
list<struct AIO_INFO> aio_list;
list<int>err_fd;
srand(time(NULL));
int last_time = 0;
int now;
int count = 0;
while(1)
{
now = time(NULL);
if(now - last_time > 120)
{
cout << "aio_list size:" << aio_list.size() << endl;
count ++;
if (count >= 5)
break;
last_time = now;
}
fd_map.clear();
aio_list.clear();
ix = 0;
aio_list.clear();
list<int>::iterator tmp_fd = err_fd.begin();
cout << "err_fd size:" << err_fd.size() << endl;
while( tmp_fd != err_fd.end())
{
close(*tmp_fd);
err_fd.erase(tmp_fd++);
}
for( ; ix<100; ix++){
map<string,int>::iterator file_iter = file_list.begin();
int req = rand()%file_list.size();
//add fd to fd_map
int fd=-1;
//int need_insert_fd_map=0;
while(req--)file_iter++;
string reqfile;
if(file_iter != file_list.end())
reqfile=file_iter->first;
else
continue;
if(reqfile.size() == 0) break;
map<string,int>::iterator fd_iter = fd_map.find(reqfile);
if(fd_iter != fd_map.end())
{
fd = fd_iter->second;
}
else
{
fd = open(reqfile.c_str(),O_RDONLY);
if(fd == -1) {cout << "open error:" << strerror(errno) << endl; exit(-1);}
else fd_map.insert(make_pair(reqfile,fd));
//need_insert_fd_map = 1;
}
int filesize = file_iter->second;
int blk = rand()%(filesize/block);
//cout << "req file:" << reqfile << "/tblock:" << blk << endl;
//struct aiocb *my_aiocb =(struct aiocb*)malloc(sizeof(struct aiocb));
//bzero(&aio_info.my_aiocb,sizeof(struct aiocb));
//aio_info.my_aiocb = (struct aiocb*)malloc(sizeof(struct aiocb));
bzero(&aiocb_list[ix],sizeof(struct aiocb));
aio_info.my_aiocb = &aiocb_list[ix];
aio_info.my_aiocb->aio_fildes = fd;
aio_info.my_aiocb->aio_buf = (char*)malloc(block+1);
aio_info.my_aiocb->aio_nbytes = block;
aio_info.my_aiocb->aio_offset = blk*block;
//memcpy((char*)&(aio_info.my_aiocb),(char*)&my_aiocb,sizeof (my_aiocb));
//aio_info.my_aiocb = &my_aiocb;
aio_info.reqblock=blk;
aio_info.filename=reqfile;
aio_info.filesize=filesize;
//aio_list.insert(aio_list.begin(),aio_info);
//printf("aio_read , aiocb address:%x/n",&aio_info.my_aiocb);
int ret = aio_read(&aiocb_list[ix]);
if(ret == -1)
{
cout << "aio_read error " << "reqfile:" << reqfile << " block:" << blk << endl;
//free((void*)aio_info.my_aiocb);
//bzero(&aio_info,sizeof(struct AIO_INFO));
/*
map<string,int>::iterator del_fd = fd_map.find(reqfile);
if(del_fd != fd_map.end())
{
close(del_fd->second);
fd_map.erase(del_fd);
}
*/
err_fd.insert(err_fd.begin(),fd);
continue;
}
cout << "req file:" << reqfile << "/tblock:" << blk << endl;
aio_list.insert(aio_list.begin(),aio_info);
//if(need_insert_fd_map == 1) fd_map.insert(make_pair(reqfile,fd));
/*
while(aio_error(&my_aiocb) == EINPROGRESS);
ret = aio_return(&my_aiocb);
if(ret > 0)
cout << "aio_return ok, ret=" << ret <<"/n";
else
cout << "aio_return error/n";
*/
}
if(aio_list.size() == 0)
{
continue;
}
int ret = -1;
int wait_last = 0;
int wait_now = 0;
count = 0;
while(1)
{
wait_now = time(NULL);
if(wait_now - wait_last > 3)
{
cout << "[in while] wait 30s,aio_list size:" << aio_list.size()
<< "fd_map size:" << fd_map.size() << endl;
if( count ++ > 3)
{
cout << "timeout exit while loop /n";
list<struct AIO_INFO>::iterator del_it = aio_list.begin();
while(del_it!=aio_list.end())
{
free((void*)del_it->my_aiocb->aio_buf);
//free((void*)del_it->my_aiocb);
aio_list.erase(del_it++);
}
break;
}
if(count == 1) usleep(1000);
wait_last = wait_now;
}
if(aio_list.size() == 0)
{
cout << "[in while] the aio_list size : 0,exit while loop/n";
break;
}
list<struct AIO_INFO>::iterator aio_iter = aio_list.begin();
for( ;aio_iter!=aio_list.end();)
{
ret = -1;
// cout << "in aio_list,fd:" << aio_iter->my_aiocb.aio_fildes
// << " filename:" << aio_iter->filename << " block:" << aio_iter->reqblock << endl;
//printf("aio_read , aiocb address:%x/n",&aio_iter->my_aiocb);
if( (ret=aio_error((aio_iter->my_aiocb))) == -1)
{
free((void*)aio_iter->my_aiocb->aio_buf);
//free((void*)aio_iter->my_aiocb);
aio_list.erase(aio_iter++);
cout << "aio_error" << endl;
continue;
}
else if( ret == EINPROGRESS )
{
aio_iter++;
//cout << "async io this req not ready" << endl;
continue;
}
else if(ret == 0)
{
//cout << "request :"<<aio_iter->filename << "/tblock:" << aio_iter->reqblock << "/taio_error ok/n";
ret = aio_return((aio_iter->my_aiocb));
if(ret == block)
{
cout << "aio_return ok, this req:" << aio_iter->filename
<< "/treq block:" << aio_iter->reqblock << endl;
free((void*)aio_iter->my_aiocb->aio_buf);
//free((void*)aio_iter->my_aiocb);
aio_list.erase(aio_iter++);
}
else
{
cout << "aio_return error,ret :" << ret << endl;
free((void*)aio_iter->my_aiocb->aio_buf);
//free((void*)aio_iter->my_aiocb);
aio_list.erase(aio_iter++);
}
}
}
}//end while wait aio return
//free buf
list<struct AIO_INFO>::iterator aio_iter = aio_list.begin();
for( ; aio_iter!=aio_list.end() ; )
{
free((void*)aio_iter->my_aiocb->aio_buf);
//free((void*)aio_iter->my_aiocb);
aio_list.erase(aio_iter++);
}
if(aio_list.size() != 0)
{
cout << "error:aio_list size != 0" << endl;
}
//close fd
map<string,int>::iterator fd_iter = fd_map.begin();
for( ; fd_iter!=fd_map.end() ; )
{
cout << "close fd:" << fd_iter->second << endl;
close(fd_iter->second);
fd_map.erase(fd_iter++);
}
usleep(2*1000);
}
return 0;
}