现在的位置: 首页 > 综合 > 正文

hadoop介绍(关于hadoop技术知识的学习笔记)

2014年09月05日 ⁄ 综合 ⁄ 共 27151字 ⁄ 字号 评论关闭

1.1            云平台hadoop基础信息

Hadoop[1]是一个能够对大量数据进行分布式处理的软件框架。主要是由HDFS和MapReduce组成。

1.1.1       hadoop概念

Hadoop是N个开源项目的总称[4]。主要是由HDFS和MapReduce组成。

HDFS是Google File System(GFS)的开源实现。

MapReduce是Google MapReduce的开源实现。这个分布式框架很有创造性,而且有极大的扩展性,使得Google在系统吞吐量上有很大的竞争力。因此Apache基金会用Java实现了一个开源版本,支持Fedora、Ubuntu等Linux平台。雅虎和硅谷风险投资公司Benchmark Capital 联合成立一家名为Hortonworks的新公司,接管被广泛应用的数据分析软件Hadoop的开发工作。

Hadoop实现了HDFS文件系统和MapRecue。用户只要继承MapReduceBase,提供分别实现Map和Reduce的两个类,并注册Job即可自动分布式运行。

 

hadoop其他开源子项目的名称:

HBase: 类似Google BigTable的分布式NoSQL列数据库。(HBaseAvro已经于20105月成为顶级Apache项目)

Hive:数据仓库工具,由Facebook贡献。

Zookeeper:分布式锁设施,提供类似Google Chubby的功能,由Facebook贡献。

Avro:新的数据序列化格式与传输工具,将逐步取代Hadoop原有的IPC机制。

Pig:大数据分析平台,为用户提供多种接口。

Ambari[5]Hadoop管理工具,可以快捷的监控、部署、管理集群。

Sqoop:于在HADOOP与传统的数据库间进行数据的传递。

1.1.2       hadoop优点

Hadoop[1]是一个能够对大量数据进行分布式处理的软件框架。但是 Hadoop是以一种可靠、高效、可伸缩的方式进行处理的。Hadoop是可靠的,因为它假设计算元素和存储会失败,因此它维护多个工作数据副本,确保能够针对失败的节点重新分布处理。Hadoop是高效的,因为它以并行的方式工作,通过并行处理加快处理速度。Hadoop还是可伸缩的,能够处理
PB
级数据。此外,Hadoop
依赖于社区服务器,因此它的成本比较低,任何人都可以使用。

Hadoop是一个能够让用户轻松架构和使用的分布式计算平台。用户可以轻松地在Hadoop上开发和运行处理海量数据的应用程序。它主要有以下几个优点:

⒈高可靠性。Hadoop按位存储和处理数据的能力值得人们信赖。

⒉高扩展性。Hadoop是在可用的计算机集簇间分配数据并完成计算任务的,这些集簇可以方便地扩展到数以千计的节点中。

⒊高效性。Hadoop能够在节点之间动态地移动数据,并保证各个节点的动态平衡,因此处理速度非常快。

⒋高容错性。Hadoop能够自动保存数据的多个副本,并且能够自动将失败的任务重新分配。

Hadoop带有用 Java语言编写的框架,因此运行在 Linux生产平台上是非常理想的。Hadoop上的应用程序也可以使用其他语言编写,比如
C++

1.1.3       hadoop集群

Google的数据中心使用廉价的Linux PC机组成集群,在上面运行各种应用。即使是分布式开发的新手也可以迅速使用Google的基础设施。核心组件是3个:

GFSGoogleFile System)。一个分布式文件系统,隐藏下层负载均衡,冗余复制等细节,对上层程序提供一个统一的文件系统API接口。Google根据自己的需求对它进行了特别优化,包括:超大文件的访问,读操作比例远超过写操作,PC机极易发生故障造成节点失效等。GFS把文件分成64MB的块,分布在集群的机器上,使用Linux的文件系统存放。同时每块文件至少有3份以上的冗余。中心是一个Master节点,根据文件索引,找寻文件块。详见Google的工程师发布的GFS论文。

MapReduceGoogle发现大多数分布式运算可以抽象为MapReduce操作。Map是把输入Input分解成中间的Key/Value对,ReduceKey/Value合成最终输出Output。这两个函数由程序员提供给系统,下层设施把MapReduce操作分布在集群上运行,并把结果存储在GFS上。

BigTable。一个大型的分布式数据库,这个数据库不是关系式的数据库。像它的名字一样,就是一个巨大的表格,用来存储结构化的数据。

以上三个设施Google均有论文发表。

1.1.4       hadoop应用 (MapReduce)

Hadoop 的最常见用法之一是 Web搜索。虽然它不是惟一的软件框架应用程序,但作为一个并行数据处理引擎,它的表现非常突出。Hadoop最有趣的方面之一是
Map andReduce
流程,它受到Google开发的启发。这个流程称为创建索引,它将Web爬行器检索到的文本 Web页面作为输入,并且将这些页面上的单词的频率报告作为结果。然后可以在整个
Web
搜索过程中使用这个结果从已定义的搜索参数中识别内容。

MapReduce

使用这个抽象模型,我们只要表述我们想要执行的简单运算即可,而不必关心并行计算、容错、数据分布、负载均衡等复杂的细节,这些问题都被封装在了一个库里面。设计这个抽象模型的灵感来自Lisp和许多其他函数式语言的MapReduce的原语。我们意识到我们大多数的运算都包含这样的操作:在输入数据的“逻辑”记录上应用Map操作得出一个中间key/value
pair
集合,然后在所有具有相同key值的value值上应用Reduce操作,从而达到合并中间的数据,得到一个想要的结果的目的。使用MapReduce模型,再结合用户实现的MapReduce函数,我们就可以非常容易的实现大规模并行化计算;通过MapReduce模型自带的“再次执行”(re-execution)功能,也提供了初级的容灾实现方案。

MapReduce

最简单的 MapReduce应用程序至少包含 3个部分:一个 Map
函数、一个 Reduce
函数和一个 main函数。main
函数将作业控制和文件输入/输出结合起来。在这点上,Hadoop提供了大量的接口和抽象类,从而为 Hadoop应用程序开发人员提供许多工具,可用于调试和性能度量等。

MapReduce 本身就是用于并行处理大数据集的软件框架。MapReduce的根源是函数性编程中的 mapreduce
函数。它由两个可能包含有许多实例(许多 Map Reduce)的操作组成。Map函数接受一组数据并将其转换为一个键/值对列表,输入域中的每个元素对应一个键/值对。Reduce函数接受
Map
函数生成的列表,然后根据它们的键(为每个键生成一个键/值对)缩小键/值对列表。

这里提供一个示例,帮助您理解它。假设输入域是one small step for man,one giant leap for mankind。在这个域上运行 Map函数将得出以下的键/值对列表:

one,1(small,1 (step,1
(for,1
(man,1

MapReduce 流程的概念流

 

  MapReduce 流程的概念流

(one,1 (giant,1 (leap,1(for,1
(mankind,1

如果对这个键/值对列表应用Reduce函数,将得到以下一组键/值对:

one,2(small,1 (step,1
(for,2
(man,1)(giant,1(leap,1
(mankind,1

结果是对输入域中的单词进行计数,这无疑对处理索引十分有用。但是,假

显示处理和存储的物理分布的 Hadoop集群

 

  显示处理和存储的物理分布的 Hadoop集群

设有两个输入域,第一个是 one smallstep for man,第二个是 one giant leap for mankind。您可以在每个域上执行
Map
函数和 Reduce函数,然后将这两个键/值对列表应用到另一个 Reduce函数,这时得到与前面一样的结果。换句话说,可以在输入域并行使用相同的操作,得到的结果是一样的,但速度更快。这便是
MapReduce
的威力;它的并行功能可在任意数量的系统上使用。图2以区段和迭代的形式演示这种思想。

回到 Hadoop 上,它是如何实现这个功能的?一个代表客户机在单个主系统上启动的 MapReduce应用程序称为 JobTracker。类似于NameNode,它是
Hadoop
集群中惟一负责控制 MapReduce应用程序的系统。在应用程序提交之后,将提供包含在 HDFS中的输入和输出目录。JobTracker使用文件块信息(物理量和位置)确定如何创建其他TaskTracker从属任务。MapReduce应用程序被复制到每个出现输入文件块的节点。将为特定节点上的每个文件块创建一个惟一的从属任务。每个
TaskTracker
将状态和完成信息报告给 JobTracker。图 3显示一个示例集群中的工作分布。

Hadoop 的这个特点非常重要,因为它并没有将存储移动到某个位置以供处理,而是将处理移动到存储。这通过根据集群中的节点数调节处理,因此支持高效的数据处理。

1.2            云平台hadoop子项目MapReduce

1.2.1       MapReduce基础概念

MapReduce是一种可用于数据处理的编程模型,支持多种语言,包括JavaRubyPythonC++,最重要的是MapReduce程序的本质是并行运行,可以将大规模的数据分析任务交给任何一个拥有足够多机器的运营商。

1.2.2       MapReduce简单样例

最简单的 MapReduce应用程序至少包含 3个部分:一个
Map
函数(个人理解为取出key/value的映射函数)、一个 Reduce函数(value进行处理比较,得到目标的key/vlaue)和一个
main
函数。main
函数将作业控制和文件输入/输出结合起来。在这点上,Hadoop提供了大量的接口和抽象类,从而为 Hadoop应用程序开发人员提供许多工具,可用于调试和性能度量等。

一个MapReduce关于气象数据收集处理的小实例:

http://www.china-cloud.com/yunzixun/yunjisuanxinwen/20111101_7321.htm

如上实例也是一个纵向扩展的实例。

 

1.2.3       MapReduce横向扩展

为了实现横向扩展(scaling out),我们需要把数据存储在分布式文件系统中,一般为HDFS,由此允许Hadoop将MapReduce 计算移到存储有部分数据的各台机器上。下面我们看看具体过程。

数据流

  首先定义一些术语。MapReduce作业(job) 是客户端需要执行的一个工作单元:它包括输入数据、MapReduce程序和配置信息。Hadoop将作业分成若干个小任务(task)来执行,其中包括两类任务:map任务和reduce任务。

 

  有两类节点控制着作业执行过程:一个jobtracker及一系列tasktracker。jobtracker通过调度tasktracker上运行的任务,来协调所有运行在系统上的作业。tasktracker在运行任务的同时将运行进度报告发送给jobtracker,jobtracker由此记录每项作业任务的整体进度情况。如果其中一个任务失败,jobtracker可以在另外一个tasktracker节点上重新调度该任务。

 

  Hadoop将MapReduce的输入数据划分成等长的小数据块,称为输入分片(input split)或简称分片。Hadoop为每个分片构建一个map任务,并由该任务来运行用户自定义的map 函数从而处理分片中的每条记录。

 

  拥有许多分片,意味着处理每个分片所需要的时间少于处理整个输入数据所花的时间。因此,如果我们并行处理每个分片,且每个分片数据比较小,那么整个处理过程将获得更好的负载平衡,因为一台较快的计算机能够处理的数据分片比一台较慢的计算机更多,且成一定的比例。即使使用相同的机器,处理失败的作业或其他同时运行的作业也能够实现负载平衡,并且如果分片被切分得更细,负载平衡的质量会更好。

 

1.2.4       MapReduce气象实例(C++)

我们将用C++重写贯穿本章的示例,然后,我们将看到如何使用Pipes来运行它。例 2-12 显示了用C++语言编写的map函数和reduce 函数的源代码。

 

  例2-12. 用C++语言编写的MaxTemperature程序

 

  #include

 

  #include

 

  #include

 

  #include

 

  #include "hadoop/Pipes.hh"

 

  #include "hadoop/TemplateFactory.hh"

 

  #include "hadoop/StringUtils.hh"

 

  class MaxTemperatureMapper : public HadoopPipes::Mapper {

 

  public:

 

  MaxTemperatureMapper(HadoopPipes::TaskContext& context) {

 

  }

 

  voidmap(HadoopPipes::MapContext& context) {

 

  std::string line = context.getInputValue();

 

  std::string year = line.substr(15, 4);

 

  std::string airTemperature = line.substr(87, 5);

 

  std::string q = line.substr(92, 1);

 

  if (airTemperature != "+9999" &&

 

  (q == "0" || q == "1" || q == "4" || q== "5" || q == "9")) {

 

  context.emit(year, airTemperature);

 

  }

 

  }

 

  };

 

  class MapTemperatureReducer : public HadoopPipes::Reducer {

 

  public:

 

  MapTemperatureReducer(HadoopPipes::TaskContext& context) {

 

  }

 

  voidreduce(HadoopPipes::ReduceContext& context) {

 

  int maxValue = INT_MIN;

 

  while (context.nextValue()) {

 

  maxValue = std::max(maxValue,HadoopUtils::toInt(context.getInputValue()));

 

  }

 

  context.emit(context.getInputKey(),HadoopUtils::toString(maxValue));

 

  }

 

  };

 

  intmain(int argc, char *argv[]) {

 

  returnHadoopPipes::runTask(HadoopPipes::TemplateFactory());

 

  }

 

  应用程序对Hadoop C++库链接提供了一个与tasktracker 子进程进行通信的简单封装。通过扩展HadoopPipes命名空间中定义的mapper和reducer两个类,我们定义了map()和reduce()方法,同时我们提供各种情况下map()和reduce()方法的实现。这些方法采用了上下文对象(MapContext类型或ReduceContext类型),进而提供了读取输入数据和写入输出数据,以及通过JobConf类来访问作业配置信息的功能。本例中的处理过程类似于Java的处理方式。

 

  与Java接口不同,C++接口中的键和值按字节缓冲,用标准模板库(StandardTemplate Library,STL)中的字符串表示。这样做简化了接口,但把更重的负担留给了应用程序开发人员,因为开发人员必须来回封送(marshall)字符串与特定应用领域内使用的具体类型。这一点在MapTemperatureReducer中有所体现,我们必须把输入值转换为整型值(通过HadoopUtils中定义的方法),然后将找到的最大值转化为字符串后再输出。在某些情况下,我们可以省略这类转化,如MaxTemperatureMapper
中的airTemperature值无需转换为整型,因为map()方法并不将它当作数值类型来处理。

 

  这个应用程序的入口点是main()方法。它调用HadoopPipes::runTask,该函数连接到Java父进程,并在mapper和reducer之间来回封送数据。runTask()方法被传入一个Factory参数,由此新建mapper或reducer实例。新建mapper还是创建reducer,Java父进程可通过套接字连接进行控制。我们可以用重载模板factory来设置combiner、partitioner、record reader或record writer。

 

  编译运行

 

  现在我们可以用Makerfile编译连接例2-13中的程序。

 

  例2-13. C++版本MapReduce程序的Makefile

 

  CC = g++

 

  CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include

 

  max_temperature: max_temperature.cpp

 

  $ (CC) $(CPPFLAGS) $< -Wall-L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib

 

  -lhadooppipes \ -lhadooputils -lpthread -g -O2 -o $@

 

1.2.5       MapReduce工作流

在气象实例中主要是一个单独简单的作业,即比较最大值。

当工作任务分解时需要分解成多个时,需要形成MapReduce工作流。

例子:

假设我们想找到每个气象台每年每天的最高气温记录的均值。例如,要计算029070-99999气象台的1月1日的每日最高气温的均值,我们将从这个气象台的1901年1月1日,1902年1月1日,直到2000年的1月1日的气温中找出每日最高气温的均值。

我们如何使用MapReduce来计算它呢?计算自然分解为下面两个阶段。

(1) 计算每对station-date的每日最高气温。

本例中的MapReduce程序是最高气温程序的一个变种,不同之处在于本例中的键是一个综合的station-date对,而不只是年份。

(2) 计算每个station-day-month键的每日最高气温的均值。

mapper从上一个作业得到输出记录(station-date,最高气温值),丢掉年份部分,将其值投影到记录(station-day-month,最高气温值)。然后reducer为每个station-day-month键计算最高气温值的均值。

第一阶段的输出看上去就是我们想要的气象台的信息。(示例的mean_max_daily_temp.sh脚本提供了Hadoop Streaming中的一个实现)

1.3            云平台hadoop子项目HDFS

当数据集的大小超过一台独立计算机的存储能力时,就有必要对它进行分区(partition)并存储在若干台单独的计算机上。管理网络中的跨多台计算机存储的文件系统称为分布式的文件系统(distributed filesystem)。hadoop子项目HDFS为hadoop distributed filesystem。

HDFS的设计:

1、超大文件。TB、PB的单个文件存储在集群上。

2、流式数据访问。HDFS的构建思想:一次写入,多次读取是最高效的访问模式。数据集通常是数据源生成或者从数据源复制而来,接着长时间的进行读取做各种分析。

3、硬件环境。便宜的普通硬件,虽然节点故障率高,但是其中设计有容灾容错,用户感知不到。

4、数据访问延迟较大。HDFS是为高数据吞度量而设计优化的,这可能会以高时间延迟为代价。

5、多用户写入,任意修改文件。HDFS中文件可能只有一个writer,而且写操作总是将数据添加在文件的末尾。它不支持多个写入者的操作,也不支持在文件中任意位置进行修改。

 

HDFS的块的概念默认是64MB,目的是为了最小化寻址开销。

HDFS的备份和容错能力,将少数块赋值到几个独立的机器上(默认是3个)。

HDFS有两类节点,并以管理者和工作者模式运行,即一个namenode(管理者)和多个datanode(工作者)。namenode管理文件系统的命名空间,它维护着整个文件系统树及整棵树内所有的文件和目录。

名字节点和数据节点

HDFS是一个的主从结构,一个HDFS集群是由一个名字节点,它是一个管理文件命名空间和调节客户端访问文件的主服务器,当然还有一些数据节点,通常是一个节点一个机器,它来管理对应节点的存储。HDFS对外开放文件命名空间并允许用户数据以文件形式存储。

namenode数据至关重要,他的数据丢失意味着分布式文件系统的丢失,所以需要做好及时的备份和镜像备份。

Hadoop系统中的文件系统fs 命令有着和ls 等操作系统类似的命令。

文件系统的权限三种:只读、写入、可执行。每个文件都有所属用户(owner)、组别(group)以及模式(mode)。这个模式是由所属用户的权限、组内成员的权限以及其他用户的权限组成的。

 

hadoop文件接口操作提供JAVA、以及Clibhdfs等。

FUSE:用户空间文件系统,允许把按照用户空间实现的文件系统整合成一个UNIX文件系统。通过使用hadoop的FUSE的功能,任意一个hadoop文件系可以和unix系统进行交互挂载。

HDFS上文件交互接口HTTP和FTP接口定义。

HDFS提供数据操作、写入、文件目录读取,以及数据同步备份、及时写入等操作。

HDFS存储小文件会非常低效,每个块的元数据都会占用namenode管理的内存空间。

Hadoop读取HDFS文件系统中的压缩文件时,需要进行文件压缩的格式需要支持分片。因为mapReduce输入是分片的,hdfs数据读取也是分片的。

压缩与输入分片。

序列化,将结构化对象转化为字节流,以便在网络上传输和写到磁盘上进行永久存储。同时还有反序列化。hadoop有属于自己的序列化的格式writable。

Avro 是一个独立于编程语言的数据序列化系统。该项目,需要解决writable的可移植性的不足,允许C和C++、python、ruby和hadoop进行交互。google的protocal Buffers 相比,avro用语言无关的模式进行定义。有Avro自己的编程特性和特点,有自己定义的avro IDL描述语言。提供丰富的序列化和反序列化的接口。

KerberOS和hadoop管理提供用户安全验证交互的管理服务。

dfsadmin用来可以进行管理hadoop的相关集群、fsck等检查整个文件系统的健康状况。

 

1.4            云平台hadoop子项目PIG

 

介绍pig,一个不得不说的hadoop的扩展。

1.2 什么是pig

 

Pig是一个基于Hadoop的大规模数据分析平台,它提供的SQL-LIKE语言叫Pig Latin,该语言的编译器会把类SQL的数据分析请求转换为一系列经过优化处理的MapReduce运算。Pig为复杂的海量数据并行计算提供了一个简单的操作和编程接口。

 

1.3 pig的特点

 

1、专注于于大量数据集分析(ad-hoc analysis , ad-hoc 代表:a solution that has been customdesigned for a specific problem );

      2、运行在集群的计算架构上,Yahoo Pig 提供了多层抽象,简化并行计算让普通用户使用;这些抽象完成自动把用户请求queries翻译成有效的并行评估计划,然后在物理集群上执行这些计划;

     3、提供类似 SQL 的操作语法;

     4、开放源代码;

1.4 pig的主要用户

 

1、yahoo

 

2、twitter

1.5 关于pig和hive

 

对于开发人员,直接使用Java APIs可能是乏味或容易出错的,同时也限制了Java程序员在Hadoop上编程的运用灵活性。于是Hadoop提供了两个解决方案,使得Hadoop编程变得更加容易。

 

?Pig是一种编程语言,它简化了Hadoop常见的工作任务。Pig可加载数据、表达转换数据以及存储最终结果。Pig内置的操作使得半结构化数据变得有意义(如日志文件)。同时Pig可扩展使用Java中添加的自定义数据类型并支持数据转换。

 

?Hive在Hadoop中扮演数据仓库的角色。Hive添加数据的结构在HDFS(hive superimposes structure ondata in HDFS),并允许使用类似于SQL语法进行数据查询。与Pig一样,Hive的核心功能是可扩展的。

 

Pig和Hive总是令人困惑的。Hive更适合于数据仓库的任务,Hive主要用于静态的结构以及需要经常分析的工作。Hive与SQL相似促使 其成为Hadoop与其他BI工具结合的理想交集。Pig赋予开发人员在大数据集领域更多的灵活性,并允许开发简洁的脚本用于转换数据流以便嵌入到较大的 应用程序。Pig相比Hive相对轻量,它主要的优势是相比于直接使用Hadoop Java APIs可大幅削减代码量。正因为如此,Pig仍然是吸引大量的软件开发人员。

 

 

 

第2章 安装pig

2.1 下载pig

 

下载pig的最新版本:

 

http://www.apache.org/dyn/closer.cgi/pig

 

我下载的是pig-0.10.0.tar.gz 

2.2 安装pig

 

解压缩

 

tar zxvf pig-0.10.0.tar.gz

 

进入目录

 

cd pig-0.10.0

 

注意,pig是hadoop的工具,所以不需要修改原hadoop的配置。

 

将pig加入到环境变量中:

 

输入

 

cd ~

 

进入到用户主目录

 

vi .bashrc

 

最下边加入环境变量的配置

 

保存然后执行

 

. .bashrc

 

输入 pig -help进行测试,如果设置成功,则出现如下界面

 

如果想获取pig的源码,可以使用svn下载

 

http://svn.apache.org/repos/asf/pig/trunk

2.3 配置hadoop

 

进入目录$PIG_HOME/conf

 

修改配置文件,在pig.properties中加入

 

fs.default.name=hdfs://localhost:9000

 

mapred.job.tracker=localhost:9001

 

指向本地伪分布式的hdfs和mapreduce

 

在本地运行pig

 

pig -x local

 

得到如下界面

和hadoop一起运行

 

直接输入pig或者pig -x mapreduce

 

有可能出现下面的错误

 

Cannot find hadoop configurations in classpath(neither hadoop-site.xml nor core-site.xml was found in the classpath).

 

需要配置~/.bashrc或者/etc/profile,建议配置.bashrc文件,加入

 

export HADOOP_HOME=/home/hadoop/hadoop-1.0.3

 

export PIG_CLASSPATH=$HADOOP_HOME/conf

 

配置完成后可正常进入

 

 

1.5            云平台hadoop子项目HIVE

   hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。 其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。

对于mapreduce的深度封装的接口。

Hadoop是一个存储计算框架,主要由两部分组成:

 

1,存储(Hadoop分布式文件系统-HDFS)

 

2,计算(MapReduce计算框架)

 

 

1,Hadoop分布式文件系统

 

这是一种文件系统实现,类似于NTFS,ext3,ext4等等,不过它是建立在更高的层次之上的。在HDFS上存储的文件被分成块(每块默认未64M,比一般的文件系统块大小大的多,可调)分布在多台机器上,其中的每块又会有多块的冗余备份(默认为3),以增强文件系统的容错能力。这种存储模式与后面将要说明的MapReduce计算模型相得益彰。HDFS在具体实现中主要有以下几个部分:

 

一、名称节点(NameNode):它的职责在于存储整个文件系统的元数据,这是个非常重要的角色。元数据在集群启动时会加载到内存中,元数据的改变也会写到磁盘上的一个文件系统映像文件中(同时还会维护一个对元数据的编辑日志)。目前名称节点还是一个单点。因为HDFS存储文件的时候是将文件划分成逻辑上的块来存储的,模个文件对应那些块都存储在名称节点上,所以如果它有损坏整个集群的数据将不可用。当然我们可以采取一些措施来备份名称节点的元数据(文件系统映像文件),比如可以将名称节点目录同时设置到本地目录和一个NFS目录,这样任何元数据的改变将写入到两个位置做冗余备份,向两个目录冗余写的过程是原子的。这样,在使用中的名称节点宕机之后,我们可以使用NFS上的备份文件来恢复文件系统。

 

二、第二名称节点(SecondaryNameNode):这个角色的作用就是定期通过编辑日志合并命名空间映像,防止编辑日志过大。不过第二名称节点的状态是滞后于主名称节点的,所以如果主名称节点挂掉,也必定会有一些文件损失。

 

三、数据节点(DataNode):这是HDFS中具体存储数据的地方,一般有多台机器。除了提供存储服务,它们还定时向名称节点发送它们存储的块的列表,所以名称节点没有必要永久保存每个文件的每个块所在的数据节点,这些信息会在系统启动后由数据节点重建。

 

2,MapReduce计算框架

 

这是一种分布式计算模型,其核心就是将任务分解成小任务由不同的计算者同时参与计算,并将各个计算者的计算结果合并得出最终的结果。模型本身非常简单,一般只需要实现两个接口即可;问题的关键在于怎样将实际问题转化为MapReduce任务。Hadoop的MapReduce部分主要由以下几部分组成:

 

一、作业跟踪节点(JobTracker):负责任务的调度(可以设置不同的调度策略)、状态跟踪。它的角色有点类似于HDFS中的名称节点,JobTracker也是一个单点,在未来的版本中可能会有所改进。

 

二、任务跟踪节点(TaskTracker):负责具体的任务的执行。它通过“心跳”的方式告知JobTracker其状态,并由JobTracker根据其报告的状态为其分配任务,TaskTracker会启动一个新的JVM来运行一个任务,当然JVM实例也可以被重用。

 

以上就是对于Hadoop最重要的两个部分的简介,Hadoop存在的理由就是它适应于大数据的存储计算。一个Hadoop集群可以看成是一个存储、计算“数据”的“库”。

 

Hive是一个构建于Hadoop集群之上的“数据仓库”应用

 

Hive是Facebook开发的构建于Hadoop集群之上的数据仓库应用,它提供了类似于SQL语法的HQL语句作为数据访问接口,这使得普通分析人员的应用Hadoop的学习曲线变缓。至于Facebook为什么使用Hadoop和Hive组建其数据仓库,其内部人员分享了他们的一些经历,大致的过程是如下的:

 

1,Facebook的数据仓库一开始是构建于MySQL之上的,但是随着数据量的增加某些查询需要几个小时甚至几天的时间才能完成。

 

2,当数据量接近1T的时候,mysqld后台进程宕掉,这时他们决定将他们数据仓库转移到Oracle。当然这次转移的过程也是付出了很大的代价的,比如支持的SQL方言不同,修改以前的运行脚本等等。

 

3,Oracle应付几T的数据还是没有问题的,但是在开始收集用户点击流的数据(每天大约400G)之后,Oracle也开始撑不住了,由此又要考虑新的数据仓库方案。

 

4,内部开发人员花了几周的时间建立了一个并行日志处理系统Cheetah,这样的话勉强可以在24小时之内处理完一天的点击流数据。

 

5,Cheetah也存在许多缺点。后来发现了Hadoop项目,并开始试着将日志数据同时载入Cheetah和Hadoop做对比,Hadoop在处理大规模数据时更具优势,后来将所有的工作流都从Cheetah转移到了Hadoop,并基于Hadoop做了很多有价值的分析。

 

6,后来为了使组织中的多数人能够使用Hadoop,开发了Hive,Hive提供了类似于SQL的查询接口,非常方便。与此同时还开发了一些其它工具。

 

7,现在集群存储2.5PB的数据,并且以每天15TB的数据在增长,每天提交3000个以上的作业,大约处理55TB的数据...

 

现在很多大的互联网公司出于成本考虑都在研究、使用Hadoop;数据的价值正得到越来越多的人的重视,而这种重视,又体现出Hadoop存在的巨大价值。

第一部分:Hive简介

什么是Hive

?Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供类SQL查询功能。

?本质是将SQL转换为MapReduce程序

  

第二部分:为什么使用Hive

面临的问题

  人员学习成本太高

  项目周期要求太短

  我只是需要一个简单的环境

 MapReduce  如何搞定

  复杂查询好难

  Join如何实现

为什么要使用Hive

?操作接口采用类SQL语法,提供快速开发的能力

?避免了去写MapReduce,减少开发人员的学习成本

?扩展功能很方便

 

Hive的特点

?可扩展

Hive可以自由的扩展集群的规模,一般情况下不需要重启服务

?延展性

Hive支持用户自定义函数,用户可以根据自己的需求来实现自己的函数

?容错

良好的容错性,节点出现问题SQL仍可完成执行

 

第三部分:Hive与Hadoop的关系

  

第四部分:Hive与传统数据库对比

     Hive     RDBMS

查询语言   HQL     SQL

数据存储   HDFS   Raw Device or Local FS

执行    MapReduce        Excutor

执行延迟   高 低

处理数据规模 大 小

索引    0.8版本后加入位图索引      有复杂的索引

 

第五部分:Hive的历史

?由FaceBook 实现并开源

?2011年3月,0.7.0版本 发布,此版本为重大升级版本,增加了简单索引,HAING等众多高级特性

?2011年06月,0.7.1 版本发布,修复了一些BUG,如在Windows上使用JDBC的的问题

? 2011年12月,0.8.0版本发布,此版本为重大升级版本,增加了insert into 、HA等众多高级特性

?2012年2月5日,0.8.1版本发布,修复了一些BUG,如 使 Hive 可以同时运行在 Hadoop0.20.x 与 0.23.0

?2012年4月30日,0.9.0版本发布,重大改进版本,增加了对Hadoop 1.0.0的支持、实现BETWEEN等特性

  

第六部分:Hive的未来发展 

?增加更多类似传统数据库的功能,如存储过程

?提高转换成的MapReduce性能

?拥有真正的数据仓库的能力

?UI部分加强

 

1.6            云平台hadoop子项目HBASE

HBASE是一个在HDFS上开发的面向列的分布式数据库,可以随时读写超大规模的数据集。

1.6.1       HBase简介

HBase – HadoopDatabase,是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建起大规模结构化存储集群。

HBase是GoogleBigtable的开源实现,类似Google Bigtable利用GFS作为其文件存储系统,HBase利用HadoopHDFS作为其文件存储系统;Google运行MapReduce来处理Bigtable中的海量数据,HBase同样利用Hadoop MapReduce来处理HBase中的海量数据;Google Bigtable利用 Chubby作为协同服务,HBase利用Zookeeper作为对应。

上图描述了HadoopEcoSystem中的各层系统,其中HBase位于结构化存储层,Hadoop HDFS为HBase提供了高可靠性的底层存储支持,Hadoop MapReduce为HBase提供了高性能的计算能力,Zookeeper为HBase提供了稳定服务和failover机制。

此外,Pig和Hive还为HBase提供了高层语言支持,使得在HBase上进行数据统计处理变的非常简单。 Sqoop则为HBase提供了方便的RDBMS数据导入功能,使得传统数据库数据向HBase中迁移变的非常方便。

1.6.2       HBase访问接口

 

HBase访问接口

1.      Native Java API,最常规和高效的访问方式,适合Hadoop MapReduce Job并行批处理HBase表数据

2.      HBase Shell,HBase的命令行工具,最简单的接口,适合HBase管理使用

3.      Thrift Gateway,利用Thrift序列化技术,支持C++,PHP,Python等多种语言,适合其他异构系统在线访问HBase表数据

4.      REST Gateway,支持REST 风格的Http API访问HBase,解除了语言限制

5.      Pig,可以使用Pig Latin流式编程语言来操作HBase中的数据,和Hive类似,本质最终也是编译成MapReduce Job来处理HBase表数据,适合做数据统计

6.      Hive,当前Hive的Release版本尚没有加入对HBase的支持,但在下一个版本Hive 0.7.0中将会支持HBase,可以使用类似SQL语言来访问HBase

1.6.3       HBase数据模型

HBase数据模型

Table & ColumnFamily

Row Key

Timestamp

Column Family

URI

Parser

r1

t3

url=http://www.taobao.com

title=天天特价

t2

host=taobao.com

 

t1

 

 

r2

t5

url=http://www.alibaba.com

content=每天…

t4

host=alibaba.com

 

Ø  Row Key: 行键,Table的主键,Table中的记录按照Row Key排序

Ø  Timestamp: 时间戳,每次数据操作对应的时间戳,可以看作是数据的version number

Ø  Column Family:列簇,Table在水平方向有一个或者多个Column Family组成,一个Column Family中可以由任意多个Column组成,即Column Family支持动态扩展,无需预先定义Column的数量以及类型,所有Column均以二进制格式存储,用户需要自行进行类型转换。

Table & Region

当Table随着记录数不断增加而变大后,会逐渐分裂成多份splits,成为regions,一个region由[startkey,endkey)表示,不同的region会被Master分配给相应的RegionServer进行管理:

-ROOT- &&.META. Table

HBase中有两张特殊的Table,-ROOT-和.META.

Ø  .META.:记录了用户表的Region信息,.META.可以有多个regoin

Ø  -ROOT-:记录了.META.表的Region信息,-ROOT-只有一个region

Ø  Zookeeper中记录了-ROOT-表的location

Client访问用户数据之前需要首先访问zookeeper,然后访问-ROOT-表,接着访问.META.表,最后才能找到用户数据的位置去访问,中间需要多次网络操作,不过client端会做cache缓存。

1.6.4       MapReduce on HBase

MapReduce on HBase

在HBase系统上运行批处理运算,最方便和实用的模型依然是MapReduce,如下图:

HBase Table和Region的关系,比较类似HDFS File和Block的关系,HBase提供了配套的TableInputFormat和TableOutputFormat API,可以方便的将HBase Table作为Hadoop MapReduce的Source和Sink,对于MapReduceJob应用开发人员来说,基本不需要关注HBase系统自身的细节。

1.6.5       HBase系统架构

HBase系统架构

Client

HBase Client使用HBase的RPC机制与HMaster和HRegionServer进行通信,对于管理类操作,Client与HMaster进行RPC;对于数据读写类操作,Client与HRegionServer进行RPC

1.6.6       Zookeeper

Zookeeper

Zookeeper Quorum中除了存储了-ROOT-表的地址和HMaster的地址,HRegionServer也会把自己以Ephemeral方式注册到 Zookeeper中,使得HMaster可以随时感知到各个HRegionServer的健康状态。此外,Zookeeper也避免了HMaster的 单点问题,见下文描述

1.6.7       HMaster

HMaster

HMaster没有单点问题,HBase中可以启动多个HMaster,通过Zookeeper的MasterElection机制保证总有一个Master运行,HMaster在功能上主要负责Table和Region的管理工作:

1.      管理用户对Table的增、删、改、查操作

2.      管理HRegionServer的负载均衡,调整Region分布

3.      在RegionSplit后,负责新Region的分配

4.      在HRegionServer停机后,负责失效HRegionServer 上的Regions迁移

1.6.8       HRegionServer

HRegionServer

HRegionServer主要负责响应用户I/O请求,向HDFS文件系统中读写数据,是HBase中最核心的模块。

HRegionServer内部管理了一系列HRegion对象,每个HRegion对应了Table中的一个Region,HRegion中由多 个HStore组成。每个HStore对应了Table中的一个ColumnFamily的存储,可以看出每个Column Family其实就是一个集中的存储单元,因此最好将具备共同IO特性的column放在一个ColumnFamily中,这样最高效。

HStore存储是HBase存储的核心了,其中由两部分组成,一部分是MemStore,一部分是StoreFiles。MemStore是 Sorted Memory Buffer,用户写入的数据首先会放入MemStore,当MemStore满了以后会Flush成一个StoreFile(底层实现是HFile), 当StoreFile文件数量增长到一定阈值,会触发Compact合并操作,将多个StoreFiles合并成一个StoreFile,合并过程中会进 行版本合并和数据删除,因此可以看出HBase其实只有增加数据,所有的更新和删除操作都是在后续的compact过程中进行的,这使得用户的写操作只要
进入内存中就可以立即返回,保证了HBaseI/O的高性能。当StoreFiles Compact后,会逐步形成越来越大的StoreFile,当单个StoreFile大小超过一定阈值后,会触发Split操作,同时把当前 Region Split成2个Region,父Region会下线,新Split出的2个孩子Region会被HMaster分配到相应的HRegionServer 上,使得原先1个Region的压力得以分流到2个Region上。下图描述了Compaction和Split的过程:

在理解了上述HStore的基本原理后,还必须了解一下HLog的功能,因为上述的HStore在系统正常工作的前提下是没有问题的,但是在分布式系统环境中,无法避免系统出错或者宕机,因此一旦HRegionServer意外退出,MemStore中的内存数据将会丢失,这就需要引入HLog了。 每个HRegionServer中都有一个HLog对象,HLog是一个实现Write Ahead Log的类,在每次用户操作写入MemStore的同时,也会写一份数据到HLog文件中(HLog文件格式见后续),HLog文件定期会滚动出新的,并
删除旧的文件(已持久化到StoreFile中的数据)。当HRegionServer意外终止后,HMaster会通过Zookeeper感知 到,HMaster首先会处理遗留的 HLog文件,将其中不同Region的Log数据进行拆分,分别放到相应region的目录下,然后再将失效的region重新分配,领取 到这些region的HRegionServer在Load Region的过程中,会发现有历史HLog需要处理,因此会Replay HLog中的数据到MemStore中,然后flush到StoreFiles,完成数据恢复。

1.6.9       HBase存储格式

HBase存储格式

HBase中的所有数据文件都存储在HadoopHDFS文件系统上,主要包括上述提出的两种文件类型:

1.      HFile, HBase中KeyValue数据的存储格式,HFile是Hadoop的二进制格式文件,实际上StoreFile就是对HFile做了轻量级包装,即StoreFile底层就是HFile

2.      HLog File,HBase中WAL(WriteAhead Log) 的存储格式,物理上是Hadoop的SequenceFile

HFile

下图是HFile的存储格式:

首先HFile文件是不定长的,长度固定的只有其中的两块:Trailer和FileInfo。正如图中所示的,Trailer中有指针指向其他数 据块的起始点。File Info中记录了文件的一些Meta信息,例如:AVG_KEY_LEN, AVG_VALUE_LEN,LAST_KEY, COMPARATOR, MAX_SEQ_ID_KEY等。Data Index和Meta Index块记录了每个Data块和Meta块的起始点。

Data Block是HBase I/O的基本单元,为了提高效率,HRegionServer中有基于LRU的Block Cache机制。每个Data块的大小可以在创建一个Table的时候通过参数指定,大号的Block有利于顺序Scan,小号Block利于随机查询。 每个Data块除了开头的Magic以外就是一个个KeyValue对拼接而成, Magic内容就是一些随机数字,目的是防止数据损坏。后面会详细介绍每个KeyValue对的内部构造。

HFile里面的每个KeyValue对就是一个简单的byte数组。但是这个byte数组里面包含了很多项,并且有固定的结构。我们来看看里面的具体结构:

开始是两个固定长度的数值,分别表示Key的长度和Value的长度。紧接着是Key,开始是固定长度的数值,表示RowKey的长度,紧接着是 RowKey,然后是固定长度的数值,表示Family的长度,然后是Family,接着是Qualifier,然后是两个固定长度的数值,表示Time Stamp和Key Type(Put/Delete)。Value部分没有这么复杂的结构,就是纯粹的二进制数据了。

HLogFile

上图中示意了HLog文件的结构,其实HLog文件就是一个普通的Hadoop Sequence File,Sequence File 的Key是HLogKey对象,HLogKey中记录了写入数据的归属信息,除了table和region名字外,同时还包括 sequence number和timestamp,timestamp是“写入时间”,sequence number的起始值为0,或者是最近一次存入文件系统中sequence number。

HLog Sequece File的Value是HBase的KeyValue对象,即对应HFile中的KeyValue,可参见上文描述。

结束

本文对HBase技术在功能和设计上进行了大致的介绍,由于篇幅有限,本文没有过多深入地描述HBase的一些细节技术。目前一淘的存储系统就是基于HBase技术搭建的,后续将介绍“一淘分布式存储系统”,通过实际案例来更多的介绍HBase应用。

 

 

1.7            云平台hadoop子项目ZOOKEEPER

zookeeper简介

 

zookeeper是一个开源分布式的服务,它提供了分布式协作,分布式同步,配置管理等功能,是做为hadoop的分布式协调服务的. 其实现的功能与google的chubby基本一致.zookeeper的官方网站已经写了一篇非常经典的概述性文章,请大家参阅:ZooKeeper: A DistributedCoordination Service for Distributed Applications

在此我仅花少量笔墨介绍下本文相关的内容。

在zookeeper的集群中,各个节点共有下面3种角色和4种状态:

 

    角色:leader,follower,observer

   状态:leading,following,observing,looking

 

除了observer和observing之外,其它的角色和状态与下面将要介绍的Paxos算法中的角色与状态一一对应,我们将在下文中具体描述.

observer是zookeeper-3.3版本新添加的一个角色,在这里有相关的介绍. 他们的引入是为了解决zookeeper集群扩大后,由于网络可靠性下降可能导致的拜占庭将军问题. observer的行为在大多数情况下与follower完全一致, 但是他们不参加选举和投票, 而仅仅接受(observing)选举和投票的结果.

 

zookeeper实现了一个层次名字空间(hierarchal name space)的数据模型, 它特别象一个文件系统, 每个文件被称为znode, 一个znode除了自己包含一些数据外,还能拥有孩子节点.

存在下述的3种类型znode:

 

    Persistent Nodes: 永久有效地节点,除非client显式的删除,否则一直存在

    Ephemeral Nodes: 临时节点,仅在创建该节点client保持连接期间有效,一旦连接丢失,zookeeper会自动删除该节点

    Sequence Nodes: 顺序节点,client申请创建该节点时,zk会自动在节点路径末尾添加递增序号,这种类型是实现分布式锁,分布式queue等特殊功能的关键

 

Zookeeper Watch 定义如下:

 

    A watch event is one-time trigger, sent tothe client that set the watch, which occurs when the data for which the watchwas set changes.

 

在我看来,watch可以理解为一个分布式的回调,当client关心的znodes发生变化时,zookeeper将会把消息传回到client,并导致client的消息处理函数得到调用.zk的任何一个读操作都能够设置watch,例如:getData(), getChildren(), andexists()

可以watch的event包括如下的二种:

 

   KeeperState:Disconnected,SyncConnected,Expired

   EventType:None,NodeCreated,NodeDeleted,NodeDataChanged,NodeChildrenChanged

 

这些状态是很容易理解的. watch的实现只言片语没法说清楚,后面我可能会专门写一篇文章讲述这个实现.

 

Paxos算法

 

说到zookeeper,我们不得不提起Paxos算法和LesileLamport.

Paxos算法是zookeeper的灵魂,这个算法是Leslie Lamport在1990年提出的一种基于消息传递的一致性算法.Paxos 算法解决的问题是一个分布式系统如何就某个值(决议)达成一致。一个典型的场景就是:”在zookeepercluster中谁是leader?”。

该算法由Leslie于1990年在文章The Part-Time Parliament中首次提出,但是这篇文章相当的晦涩难懂(也有一些轶事,可以看文章链接中Leslie自己写的内容),于是,Lesilie在2001年写下了Paxos Made Simple.他对此解释道:

 

    At the PODC 2001 conference, I got tired ofeveryone saying how difficult it was to understand the Paxos algorithm,published in [122]. Although people got so hung up in the pseudo-Greek namesthat they found the paper hard to understand, the algorithm
itself is verysimple. So, I cornered a couple of people at the conference and explained the algorithmto them orally, with no paper. When I got home, I wrote down the explanation asa short note, which I later revised based on comments from Fred Schneider andButler
Lampson. The current version is 13 pages long, and contains no formulamore complicated than n1 > n2.

 

Paxos MadeSimple的abstract只有一句话:

 

    The Paxos algorithm, when presented inplain English, is very simple.

 

可见这位Lamport老兄是多么的有意思. 顺便说一句,这位老哥就是LaTex中的”La”.

在上文中是这样描述Paxos算法执行过程的:

 

    Phase 1.

    (a) A proposer selects a proposal number nand sends a prepare request with number n to a majority of acceptors.

    (b) If an acceptor receives a preparerequest with number n greater than that of any prepare request to which it hasalready responded, then it responds to the request with a promise not to acceptany more proposals numbered less than n and with the highest-numbered
proposal(if any) that it has accepted.

    Phase 2.

    (a) If the proposer receives a response toits prepare requests (numbered n) from a majority of acceptors, then it sendsan accept request to each of those acceptors for a proposal numbered n with avalue v, where v is the value of the highest-numbered
proposal among theresponses, or is any value if the responses reported no proposals.

    (b) If an acceptor receives an acceptrequest for a proposal numbered n, it accepts the proposal unless it hasalready responded to a prepare request having a number greater than n.

 

这几乎就是Paxos的全部了.具体的执行过程举例可以在Zookeeper全解析——Paxos作为灵魂中找到,在此不再赘述.

Zookeeper完全实现了Paxos算法,zk cluster中每个节点都保持了一份完整的数据模型,当任何一个client通过某集群节点向集群发起读写请求时,该节点会向Leader节点发出投票请求,如果投票通过(超过一半节点同意)则该请求被执行,否则该请求被驳回. 通过paxos算法,zookeeper的保持了数据模型的一致性,同时保持了任何操作的原子性.

 

分布式选举

 

介绍完了Paxos算法, 分布式选举几乎是顺理成章的, 因为分布式选举不过是Paxos算法的一次或者若干次执行, 所不同的只是proposal内容为:”谁是Leader”.下面这两个图解释了zookeeper集群在正常工作和选举时各个节点状态的异同:

zookeeper状态示意图

 

zookeeper状态示意图

 

zookeeper采用org.apache.zookeeper.server.quorum.FastLeaderElection作为其缺省选举算法,关于这个算法的具体执行流程可以参考淘宝核心系统段飞同学的文章“paxos实现”.或者也可以直接阅读源代码. zookeeper源代码量不大,结构清晰,注释充分,阅读体验超好~ 我就不在这里越俎代庖了.

 

zookeeper应用

 

拥有了zookeeper如此强大的分布式协作系统后,我们可以很容易的实现大量的分布式应用,包括了分布式锁,分布式队列,分布式Barrier,双阶段提交等等. 这些应用可以帮我们改进很多复杂系统的协作方式,将这些系统的实现变得更加优雅而高效.

鉴于篇幅,本文仅介绍分布式锁的实现.

利用了前文提到的sequence nodes可以非常容易的实现分布式锁. 实现分布式锁的基本步骤如下(这些步骤需要在所有需要锁的客户端执行):

 

    client调用create()创建名为”_locknode_/lock-”的节点,注意需要设置sequence和ephemeral属性

    client调用getChildren(“_locknode_”),注意不能设置watch,这样才能避免羊群效应

    如果步骤1中创建的节点序号最低,则该client获得锁,开始执行其它程序

    client对lock-xxx中序号仅次于自己创建节点的那个节点调用exists(),并设置watch

    如果exist()返回false(节点不存在)则回到步骤2,否则等待步骤4中的watch被触发并返回步骤2

 

分布式锁在zookeeper的源代码中已经有实现,可以参考org.apache.zookeeper.recipes.lock

 

下面是一个使用分布式锁的样例,这段程序摘自一个hadoopreduce的configure函数, 使用分布式锁的目的是确保一台机器上的所有reduce进程中,只有一个reduce进程会执行某些初始化代码. 同时其它reduce在总和初始化完成之前不会继续执行.

     

class zkWatcherimplements Watcher {

     //watch回调函数

    public void process(WatchedEvent event) {

         if (event.getType() ==EventType.NodeCreated) {

            if (event.getPath() =="balbalbal.init_done"

            //如果回调信息是节点创建,且创建的节点是init成功节点,则触发latch

                  gcihInitLatch.countDown();

        } else if (event.getState() ==KeeperState.SyncConnected) {

            //server连接成功,触发连接成功latch

            zkConnectedLatch.countDown();

         }

    }

}

public voidconfigure(String conf) {

    try {

 

        //zookeeper服务器列表,节点间用,分隔

        String keepers = "zk_server0:port,zk_server1:port,zk_server2:port";

        String Init_Done ="/full-dump-gcih/"

                +InetAddress.getLocalHost().getHostName() + ".init_done";

        String HostName =InetAddress.getLocalHost().getHostName();

 

        // 初始化一个Watch

        zkWatcher zkw = new zkWatcher();

        //异步创建连接, 并设置zkw为watch回调

        ZooKeeper zk = new ZooKeeper(keepers,5000, zkw);

        //等待zookeeper创建连接成功

        zkConnectedLatch.await();

        //创建分布式锁

        WriteLock gcih_lock = new WriteLock(zk,"/full-dump-gcih/" + HostName, null);

        //检测初始化成功标识是否存在,并设置watch

        if (null == zk.exists(Init_Done, true)){

            // if the init_done node not existswe try to init

            if (gcih_lock.lock()) {

                //获取锁成功,初始化数据

                initializeData(conf);

                //创建初始化成功标识,注意这个标志是永久节点

                zk.create(Init_Done, null,Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

                //工作完成,释放锁

                gcih_lock.unlock();

            } else {

                //未获取锁,说明已经有reduce在做初始化了,释放锁

                gcih_lock.unlock();

                if (!gcihInitLatch.await(30,TimeUnit.MINUTES))

                    throw new IOException(

                            "Init UDP timeout, critical error");

                else {

                    //latch成功返回,说明the one 初始化成功了

                    initializeData(null);

                }

            }

        } else {// if init_done exists wesimply load data from gcih

            initializeData(null);

        }

     }catch (Exception e) {

        .....

    }

  }

 

多个reduce分别获取锁后,加锁节点的子节点信息如下所示

1

2

     

[zk:localhost:2181(CONNECTED) 31] ls /full-dump-gcih/xxxxx.cm2

[x-84692699318388014-0000000001,x-84692699318387993-0000000000]

 

这些节点全部是Sequence+Ephemeral 属性的节点, 其中

1

2

     

x-84692699318388014-000000000

name-zk_session_id-sequence_number

 

这个节点名称是org.apache.zookeeper.recipes.lock中使用的名称,可以根据需要自己重新实现相关代码,进而设计一个专用的锁.

 

 

1.8            云平台hadoop子项目SQOOP

 

Sqoop是一个用来将Hadoop和关系型数据库中的数据相互转移的工具,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。

详细的介绍和使用手册。

http://archive.cloudera.com/cdh/3/sqoop-1.2.0-CDH3B4/SqoopUserGuide.html

抱歉!评论已关闭.