现在的位置: 首页 > 综合 > 正文

OpenMP: 多线程文件操作

2013年10月26日 ⁄ 综合 ⁄ 共 13286字 ⁄ 字号 评论关闭

项目背景:

      为了提升项目的运行效率,考虑多线程技术。最近OpenMP技术很热,咱也凑凑热闹,也为了充分发挥电脑硬件的能力。

硬件:

      酷睿2双核 2.2GHz

      3G 内存

软件:

      Visual Studio 2010 旗舰版

      Windows 7 旗舰版 32bit

难点:

      由于多个线程操作同一个文件,很有可能存在线程冲突。

OpenMP:

      1. 必须的头文件 <omp.h>

      2. #pragma omp 预处理指示符指定要采用OpenMP。 例如通过 #pragma om parallel for 来指定下方的for循环采用多线程执行,此时编译器会根据CPU的个数来创建线程数。对于双核系统,编译器会默认创建两个线程执行并行区域的代码。

      示例代码:

 

#include <iostream>
#include <stdio.h>
#include <omp.h> // OpenMP编译需要包含的头文件

int main()
{
   #pragma omp parallel for
   for (int i = 0; i < 100; ++i)
   {
      std::cout << i << std::endl;
   }

   return 0;
}

      3. OpenMP 常用库函数

      函数原型                                         功能

      int omp_get_num_procs(void)      返回当前可用的处理器个数

      int omp_get_num_threads(void)  返回当前并行区域中活动线程的个数,如果在并行区域外部调用,返回1

      int omp_get_thread_num(void)    返回当前的线程号(omp_get_thread_ID更好一些)

      int omp_set_num_threads(void)   设置进入并行区域时,将要创建的线程个数

      3.1 并行区域

#pragma omp parallel  //大括号内为并行区域
{
    //put parallel code here.
}

      3.2 库函数示例

#include <iostream>   
#include <omp.h>   
  
int main()  
{  
   std::cout << "Processors Number: " << omp_get_num_procs() << std::endl;  
  
   std::cout << "Parallel area 1" << std::endl;  
   #pragma omp parallel   
   {  
      std::cout << "Threads number: " << omp_get_num_threads() << std::endl;  
      std::cout << "; this thread ID is " << omp_get_thread_num() << std::endl;  
   }  
     
   std::cout << "Parallel area 2" << std::endl;  
   #pragma omp parallel   
   {  
      std::cout << "Number of threads: " << omp_get_num_threads() << std::endl;  
      std::cout << "; this thread ID is " << omp_get_thread_num() << std::endl;  
   }  
  
   return 0;  
}  

      3.3 for循环并行化的基本用法

      3.3.1 数据不相关性

      利用openmp实现for循环的并行化,需满足数据的不相关性。

      在循环并行化时,多个线程同时执行循环,迭代的顺序是不确定的。如果数据是非相关的,那么可以采用基本的 #pragma omp parallel for 预处理指示符。

      如果语句S2与语句S1相关,那么必然存在以下两种情况之一:

      1. 语句S1在一次迭代中访问存储单元L,而S2在随后的一次迭代中访问同一存储单元,称之为循环迭代相关(loop carried dependence);

      2. S1和S2在同一循环迭代中访问同一存储单元L,但S1的执行在S2之前,称之为非循环迭代相关(loop-independent dependence)。

    

       3.3.2 for循环并行化的几种声明形式

    

#include <iostream>   
#include <omp.h>   
  
int main()  
{  
   //声明形式一   
   #pragma omp parallel   
   {  
      #pragma omp for   
      for (int i = 0; i < 10; ++i)  
      {  
         std::cout << i << std::endl;  
      }  
   }  
  
   //声明形式二   
   #pragma omp parallel for   
   for (int i = 0; i < 10; ++i)  
   {  
      std::cout << i << std:: endl;  
   }  
  
   return 0;  
}  

       上面代码的两种声明形式是一样的,可见第二种形式更为简洁。不过,第一种形式有一个好处:可以在并行区域内、for循环以外插入其他并行代码。

//声明形式一   
#pragma omp parallel   
{  
   std::cout << "OK." << std::endl;  
   #pragma omp for   
   for(int i = 0; i < 10; ++i)  
   {  
      std::cout << i << std::endl;  
   }  
}  
  
  
//声明形式二   
#pragma omp parallel for   
//std::cout << "OK." << std::endl;      // error!   
for(int i = 0; i < 10; ++i)  
{  
   std::cout << i << std::endl;  
}   

       3.3.3 for 循环并行化的约束条件

       尽管OpenMP可以很方便地对for循环进行并行化,但并不是所有的for循环都可以并行化。下面几种情形的for循环便不可以:

       1. for循环的循环变量必须是有符号型。例如,for(unsigned int i = 0; i < 10; ++i){...}编译不通过。

       2. for循环的比较操作符必须是<, <=, >, >=。例如,for(int i = 0; i != 10; i++)编译不通过。

       3. for循环的增量必须是整数的加减,而且必须是一个循环不变量。例如,for(int i = 0; i < 10; i = i+1)编译不通过,感觉只能++i, i++, --i, i--。

       4. for循环的比较操作符如果是<, <=,那么循环变量只能增加。例如,for(int i = 0; i != 10; --i)编译不通过。

       5. 循环必须是单入口,单出口。循环内部不允许能够达到循环以外的跳出语句,exit除外。异常的处理也不必须在循环体内部处理。例如,如循环体内的break或者goto语句,会导致编译不通过。

 

       3.3.4 基本for循环并行化示例

#include <iostream>   
#include <omp.h>   
  
int main()  
{  
   int a[10] = {1};  
   int b[10] = {2};  
   int c[10] = {3};  
  
#pragma omp parallel   
{  
   #pragma omp for   
   for(int i = 0; i < 10; ++i)  
   {  
      //c[i]只与a[i]和b[i]相关   
      c[i] = a[i] + b[i];  
   }  
}  
  
   return 0;  
}  

 

       3.3.5 嵌套for循环

#include <iostream>   
#include <omp.h>   
  
int main()  
{  
   #pragma omp parallel   
   {  
      #pragma omp for   
      for(int i = 0; i < 10; ++i)  
      {  
         for(int j = 0; j < 10; ++j)  
         {  
            c[i][j] = a[i][j] + b[i][j];  
         }  
      }  
   }  
  
   return 0;  
}   

       编译器会让第一个CPU完成

for(int i = 0; i < 5; ++i)  
{  
    for(int j = 0; j < 5; ++j)  
    {  
       c[i][j] = a[i][j] + b[i][j];  
    }  
}   

       让第二个CPU完成

for(int i = 5; i < 10; ++i)  
{  
   for(int j = 5; j < 10; ++j)  
   {  
      c[i][j] = a[i][j] + b[i][j];  
   }  
}  

 

 

数据的共享和私有化
1. 引言
    在并行区域内,若多个线程共同访问同一个存储单元,并且至少会有一个线程更新数据单元中的内容时,会发生数据竞争。本节的数据共享和私有化对数据竞争做一个初步探讨,后续会涉及同步、互斥的内容。
 
2. 并行区域内的变量的共享和私有
    除了以下三种情况外,并行区域中的所有变量都是共享的:
    > 并行区域中定义的变量
    > 多个线程用来完成循环的循环变量
    > private、firstprivate、lastprivate、reduction修饰的变量
    例如,

#include <iostream>  
#include <omp.h>  
  
int main()  
{  
   int share_a = 0; // 共享变量  
    int share_to_private_b = 1;  
  
   #pragma omp parallel  
   {  
      int private_c = 2;  
   //通过private修饰后在并行区域内变为私有变量  
   #pragma omp for private(share_to_private_b)  
      for(int i = 0; i < 10; ++i)  
      {//该循环变量是私有的,若为两个线程,则一个执行0<=i<5,另一个执行5<=i<10  
         std::cout << i << std::endl;  
      }  
   }  
  
   return 0;  
}  

3. 共享与私有变量声明的方法
    private(val1, val2, ...)          并行区域中变量val是私有的,即每个线程拥有该变量的一个copy
    firstprivate(val1, val2, ...)    与private不同,每个线程在开始的时候都会对该变量进行一次初始化
    lastprivate(val1, val2, ...)    与private不同,并发执行的最后一次循环的私有变量将会copy到val
    shared(val1, val2, ...)          声明val是共享的
4. private示例
    如果使用private,无论该变量在并行区域外是否初始化,在进入并行区域后,该变量均不会初始化。
    在VS2010下,会因为private所导致的私有变量未初始化而出现错误。例如:

#include <iostream>  
#include <omp.h>  
  
int main()  
{  
   //通过private修饰该变量之后在并行区域内变为私有变量,进入并行  
    //区域后每个线程拥有该变量的拷贝,并且都不会初始化  
   int shared_to_private = 1;  
  
#pragma omp parallel for private(shared_to_private)  
   for(int i = 0; i < 10; ++i)  
   {  
      std::cout << shared_to_private << std::endl;  
   }  
  
   return 0;  
}  

F5调试由于变量shared_to_rivate未初始化而崩掉。
5. firstprivate示例

#include <iostream>  
#include <omp.h>  
  
int main()  
{  
   //通过firstprivate修饰该变量之后在并行区域内变为私有变量,  
    //进入并行区域后每个线程拥有该变量的拷贝,并且会初始化  
   int share_to_first_private = 1;  
  
#pragma omp parallel for firstprivate(share_to_first_private)  
   for(int i = 0; i < 10; ++i)  
   {  
      std::cout << ++share_to_first_private << std::endl;  
   }  
  
   return 0;  
}  

    运行程序,可以看到每个线程对应的私有变量share_to_first_private都初始化为1,并且每次循环各自增加1.
 
6. lastprivate示例

#include <iostream>  
#include <omp.h>  
  
int main()  
{  
   //通过lastprivate修饰后在并行区域内变为私有变量,进入并行区域  
    //后变为私有变量,进入并行区域后每个线程拥有该变量的拷贝,并且会初始化  
    int share_to_last_private = 1;  
  
   std::cout << "Before: " << share_to_last_private << std::endl;  
#pragma omp parallel for lastprivate(share_to_last_private)firstprivate(share_to_last_private)  
   for(int i = 0; i < 11; ++i)  
   {  
      std::cout << ++share_to_last_private << std::endl;  
   }  
  
   std::cout << "After: " << share_to_last_private << std::endl;  
   return 0;  
}  

同样,仍然需要通过firstprivate来初始化并行区域中的变量,否则运行会出错。
在运行前后,share_to_last_private变量的值变了,其值最后变成最后一次循环的值,即多个线程最后一次修改的share_to_last_private(是share_to_last_private的copy)值会赋给share_to_last_private.
 
7. shared示例

#include <iostream>  
#include <omp.h>  
  
int main()  
{  
   int sum = 0;  
   std::cout << "Before: " << sum << std::endl;  
#pragma omp parallel for shared(sum)  
   for(int i = 0; i < 10; ++i)  
   {  
      sum += i;  
      std::cout << sum << std::endl;  
   }  
   std::cout << "After: " << sum << std::endl;  
   return 0;  
}  

上面的代码中,sum本身就是共享的,这里的shared的声明作为演示用。上面的代码因为sum是共享的,多个线程对sum的操作会引起数据竞争,后续在做介绍。
 
8. reduction的用法

#include <iostream>  
#include <omp.h>  
  
int main()  
{  
   int sum = 0;  
   std::cout << "Before: " << sum << std::endl;  
  
#pragma omp parallel for reduction(+:sum)  
   for(int i = 0; i < 10; ++i)  
   {  
      sum = sum + i;  
      std::cout << sum << std::endl;  
   }  
  
   std::cout << "After: " << sum << std::endl;  
   return 0;  
}  

其中sum是共享的,采用reduction之后,每个线程根据reduction(+:sum)的声明算出自己的sum,然后再将每个线程的sum加起来。
运行程序,发现第一个线程sum的值依次为0、1、3、6、10;第二个线程sum的值依次为5、11、18、26、35;最后10+35=45。
若果将其中reduction声明去掉,则会输出:
 
计算步骤如下:
第一个线程sum=0,第二个线程sum=5
第一个线程sum=2+12=14;第二个线程sum=7+14=21
第一个线程sum=3+21=24;第二个线程sum=8+24=32
第一个线程sum=4+32=36;第二个线程sum=9+36=45
尽管结果是对的,但是两个线程对共享的sum的操作时不确定的,会引发数据竞争,例如计算步骤可能如下:
第一个线程sum=0,第二个线程sum=5
第一个线程sum=1+5=6;第二个线程sum=6+6=12
第一个线程sum=2+12=14;第二个线程sum=7+14=21
第一个线程sum=3+21=24;第二个线程sum=8+21=29 //在第一个线程没有将sum更改为24时,第二个线程读取了sum的值
第一个线程sum=4+29=33;第二个线程sum=9+33=42 //导致结果错误。
 
9. reduction声明可以看作:
     1. 保证了对sum的原则操作
     2. 多个线程的执行结果通过reduction中声明的操作符进行计算,以加法操作符为例:
假设sum的初始化为10,reduction(+:sum)声明的并行区域中每个线程的sum初始化为0(规定),并行处理结束之后,会将sum的初始化值10以及每个线程所计算的sum值相加。
 
10. reduction的声明形式
      其具体如下:
      reduction(operator: val1, val2, ...)
其中operator以及约定变量的初始值如下:
      运算符                    数据类型                        默认初始值
      +                          整数、浮点                      0
      -                           整数、浮点                      0
      *                          整数、浮点                      1
      &                          整数                               所有位均为1
      |                           整数                               0
      ^                          整数                               0
      &&                        整数                               1
      ||                          整数                               0
 

1. 引言

    在OpenMP中,线程同步机制包括互斥锁同步机制和事件同步机制。

2. 互斥锁同步

    互斥锁同步的概念类似于Windows中的临界区(CriticalSection)以及Windows和Linux中的Mutex以及VxWorks中的SemTake和SemGive(初始化时信号量为满),即对某一块代码操作进行保护,以保证同时只能有一个线程执行该段代码。

 

3. atomic(原子)同步语法

    #pragma omp atomic

    x < + or * or - or * or / or & or | or << or >> >=expt

    (例如,x<<=1; or x*=2;)

    或

    #prgma omp atomic

    x++ or x-- or --x or ++x

    可以看到atomic的操作仅适用于两种情况:

    1. 自加减操作;

    2. x<上述列出的操作符>=expr;

4. 示例

#include <iostream>   
#include <omp.h>   
  
int main()  
{  
   int sum = 0;  
   std::cout << "Before: " << sum << std::endl;  
  
#pragma omp parallel for   
   for(int i = 0; i < 2000; ++i)  
   {  
   #pragma omp atomic   
      sum++;  
   }  
  
   std::cout << "After: " << sum << std::endl;  
   return 0;  
}   

输出2000,如果将#pragma omp atomic声明去掉,则结果不确定。

 

5. critical同步机制

    本节介绍互斥锁机制的使用方法,类似于windows下的CriticalSection。

5.1 临界区声明方法

     #pragma omp critical [(name)] //[]表示名字可选

     {

           //并行程序块,同时只能有一个线程能访问该并行程序块

     }

     例如,

    #pragma omp critial (tst)

    a = b + c;

5.2 critical与atomic的区别

     临界区critical可以对某个并行程度块进行保护,atomic所能保护的仅为一句代码。

5.3 critical示例

#include <iostream>   
#include <omp.h>   
  
int main()  
{  
   int sum = 0;  
   std::cout << "Before: " << sum << std::endl;  
  
#pragma omp parallel for   
   for(int i = 0; i < 10; ++i)  
   {  
   #pragma omp critial (a)   
      {  
       sum = sum + i;  
       sum = sum + i*2;  
      }  
   }  
   std::cout << "After: " << sum << std::endl;  
   return 0;  
}  

线程同步之互斥锁函数

前文介绍了互斥锁同步的两种方法:atomic和critical,本章介绍OpenMP提供的互斥锁函数。互斥锁函数类似于Windows、Linux下的mutex。

 

1. 互斥锁函数

  函数声明                                                                   功能

  void omp_init_lock(omp_lock*)                               初始化互斥器

  void omp_destroy_lock(omp_lock*)                        销毁互斥器

  void omp_set_lock(omp_lock*)                               获得互斥器

  void omp_unset_lock(omp_lock*)                           释放互斥器

  void omp_test_lock(omp_lock*)                              试图获得互斥器,如果获得成功则返回true,否则返回false

 

2. 互斥锁示例

#include <iostream>   
#include <omp.h>   
  
static omp_lock_t lock;  
  
int main()  
{  
   omp_init_lock(&lock); //初始化互斥锁   
  
#pragma omp parallel for   
   for(int i = 0; i < 5; ++i)  
   {  
      omp_set_lock(&lock);   //获得互斥器   
       std::cout << omp_get_thread_num() << "+" << std::endl;  
      std::cout << omp_get_thread_num() << "-" << std::endl;  
      omp_unset_lock(&lock); //释放互斥器   
    }  
  
   omp_destroy_lock(&lock);  //销毁互斥器   
    return 0;  
}  

上边的示例对for循环中的所有内容进行加锁保护,同时只能有一个线程执行for循环中的内容。

线程1或线程2在执行for循环内部代码时不会被打断。如果删除代码中的获得锁释放锁的代码,则相当于没有互斥锁。

互斥锁函数中只有omp_test_lock函数是带有返回值的,该函数可以看作是omp_set_lock的非阻塞版本。

线程同步之事件同步机制

1. 引言

前边已经提到,线程的同步机制包括互斥锁同步和事件同步。互斥锁同步包括atomic、critical、mutex函数,其机制与普通多线程同步的机制类似。而事件同步则通过nowait、sections、single、master等预处理指示符声明来完成。

2. 隐式栅障

      在开始之前,先介绍一下并行区域中的隐式栅障。

      栅障(Barrier)是OpenMP用于线程同步的一种方法。线程遇到栅障时必须等待,直到并行的所有线程都到达同一点。

      注意:

      在任务分配for循环和任务分配section结构中隐含了栅障,在parallel, for, sections, single结构的最后,也会有一个隐式的栅障。

隐式的栅障。

      隐式的栅障会使线程等到所有的线程继续完成当前的循环、结构化块或并行区,再继续执行后续工作。可以使用nowait去掉这个隐式的栅障。

3. nowait事件同步

    nowait用来取消栅障,其用法如下:

    #pragma omp for nowait  //不能使用#pragma omp parallel for nowait

    或

    #pragma omp single nowait

    示例:

#include <iostream>   
#include <omp.h>   
  
int main()  
{  
    #pragma omp parallel   
   {  
      #pragma omp for nowait   
      for(int i = 0; i < 1000; ++i)  
      {  
         std::cout << i << "+" << std::endl;  
      }  
  
      #pragma omp for   
      for(int j = 0; j < 10; ++j)  
      {  
         std::cout << j << "-" << std::endl;  
      }  
   }  
   return 0;  
}  

运行程序,可以看到第一个for循环的两个线程中的一个执行完之后,继续向下执行,因此同时打印了第一个循环的+和第二个循环的-。

如果去掉第一个for循环的nowait生命,则第一个for循环的两个线程都执行完之后,才开始同时执行第二个for循环。也就是说,通过#pragma omp for声明的for循环结束时有一个默认的隐式栅障。

4. 显示同步栅障 #pragma omp barrier

#include <iostream>   
#include <omp.h>   
  
int main()  
{  
    #pragma omp parallel   
    {  
        for(int i = 0; i < 100; ++i)  
        {  
            std::cout << i << "+" << std::endl;  
        }  
          
        #pragma om barrier   
        for(int j = 0; j < 10; ++j)  
        {  
            std::cout << j << "-" << std::endl;  
        }  
    }  
  
    return 0;  
}

运行程序,可以看出两个线程执行了第一个for循环,当两个线程同时执行完第一个for循环之后,在barrier处进行了同步,然后执行后边的for循环。

5. master事件同步

    通过#pragma om master来声明对应的并行程序块只有主线程完成。

#include <iostream>   
#include <omp.h>   
  
int main()  
{  
#pragma omp parallel   
{  
   #pragma omp master   
   {  
      for(int j = 0; j < 10; ++j)  
      {  
         std::cout << j << "-" << std::endl;  
      }  
   }  
  
   std::cout << "This will printed twice." << std::endl;  
}  
   return 0;  
}  

运行程序,可以看到,进入parallel声明的并行区域之后,创建了两个线程。主线程执行了for循环,而另一个线程没有执行for循环,而直接进入了for循环之后的打印语句,然后执行for循环的线程随后还会再执行一次后边的打印语句。

6. sections用来指定不同的线程执行不同的部分

    下面通过一个实例来说明其使用方法:

#include <iostream>   
#include <omp.h>   
  
int main()  
{  
  
//声明该并行区域分为若干个section,section之间的运行顺序为并行   
//的关系   
#pragma omp parallel sections   
   for(int i = 0; i < 5; ++i)  
   {  
       std::cout << i << "+" << std::endl;  
   }  
  
#pragma omp section   //第一个section,由某个线程单独完成   
    for(int j = 0; j < 5; ++j)  
   {  
       std::cout << j << "-" << std::endl;  
   }  
  
   return 0;  
}  

可以看到,并行区域中有两个线程,所以两个section同时执行。

线程的调度优化
1. 引言

    通过前边的介绍,知道了并行区域,默认情况下会自动生成与CPU个数相等的线程,然后并行执行并行区域中的代码。对于并行区域中的for循环有特殊的声明方式,这样不同的线程可以分别运行for循环变量的不同部分。通过锁同步(atomic、critical、mutex函数)或事件同步(nowait、single、section、master)来实现并行区域的同步控制。

    那么系统是如何对线程进行调度的呢?具体的调度策略均有底层完成,本节介绍几种for可以在上层对for循环进行控制的调度策略。

2. 调度策略

    调度策略                  功能                                                                                                 适用场合

    static                     循环变量区域分为n等份,每个线程平分n份任务                                       各个cpu的性能差别不大

    dynamic                 循环变量区域分为n等份,某个线程执行完1份之后执行其他需要执行的         cpu之间运行能力差异很大

                                  那一份任务

    guided                   循环变量区域由大到小分为不等的n份,运行方法类似dynamic                   由于任务比dynamic不同,

                                                                                                                                         所以可以减少调度开销

    runtime                  在运行时来适用上述三种调度策略中的一种,默认使用static

    示例:

3.1. static

#include <iostream>   
#include <omp.h>   
  
int main()  
{  
  
//static调度策略,for循环每两次迭代分为一个任务   
#pragma omp parallel for schedule(static, 2)   
    for(int i = 0; i < 10; ++i)  
    {  
    //被分为5个任务,其中循环0~1,4~5, 8~9分配给了第一个线程,   
     //其余的分配给第二个线程   
         std::cout << "thread id: " << omp_get_thread_num() << " value: " << i << std::endl;  
    }  
  
    return 0;  
}  

3.2. dynamic

#include <iostream>   
#include <omp.h>   
  
int main()  
{  
    //dynamic调度策略,for循环每两次迭代分为一个任务   
    #pragma om parallel for schedule(dnamic, 2)   
    for(int i = 0; i < 10; ++i)  
    {  
    //分为5个任务,只要有任务并且线程空闲,那么该线程会执行该任务   
         std::cout << "thread id: " << omp_get_thread_num() << " value: " << i << std::endl;  
    }  
  
    return 0;  
}  

3.3. guided

    guided调度策略与dynamic区别在于,所分的任务块是从大到小排列的。具体分块算法为:每块的任务大小为:【迭代次数/线程个数的二倍】。其中每个任务的最小迭代次数由guided声明设定,默认为1。

    举例说明:

#pragma omp for schedule [guided, 80]   
  
for(int i = 0; i < 800; ++i)  
{  
    // .....   
}  

两个cpu,那么任务分配如下:

第一个任务: [800/(2*2)] = 200

第二个任务:第一个任务分了200,还有600,那么[600/(2*2)] = 150

第三个任务:第二个任务分了150,还有450,那么[450/2*2)] = 113

第四个人任务:第三个任务分了113,还有337,那么[337/(2*2)] = 85

第五个任务:第四个任务分了85,还有252,那么[252/(2*2)] = 63, 小于声明的80,那么这里为80

第六个任务:第五个任务分了80,还有172,根据声明,这里为80(因为会小于80)

第七个任务:第六个任务分了80,还有92,根据声明,这里为80(因为会小于80)

第八个任务:第七个任务分了80,还有12,根据声明,这里为12(因为不够80)

 

抱歉!评论已关闭.