现在的位置: 首页 > 综合 > 正文

MapReduce Design Patterns(chapter 2 (part 2))(三)

2017年11月15日 ⁄ 综合 ⁄ 共 7020字 ⁄ 字号 评论关闭

Median and standard deviation

中值和标准差的计算比前面的例子复杂一点。因为这种运算是非关联的,它们不是那么容易的能从combiner中获益。中值是将数据集一分为两等份的数值类型,一份比中值大,一部分比中值小。这需要数据集按顺序完成清洗。数据必须是排序的,但存在一定障碍,因为MapReduce不会根据values排序。

 

方差告诉我们数据跟平均值之间的差异程度。这就要求我们之前要先找到平均值。执行这种操作最容易的方法是复制值得列表到临时列表,以便找到中值,或者再一次迭代集合所有数据得到标准差。对大的数据量,这种实现可能导致java堆空间的问题,引文每个输入组的每个值都放进内存处理。下一个例子就是针对这种问题的。

 

问题:给出用户评论,计算一天中每个小时评论长度的中值和标准差。

 

Mapper codeMapper会处理每条输入记录计算一天内每个小时评论长度的中值(貌似事实不是这样)。输出键是小时,输出值是评论长度。

 

public static class
MedianStdDevMapper extends

Mapper<Object,
Text,
IntWritable,
IntWritable> {

private
IntWritable outHour =
new IntWritable();

private
IntWritable outCommentLength =
new IntWritable();

private final static
SimpleDateFormat frmt =
new SimpleDateFormat(

"yyyy-MM-dd'T'HH:mm:ss.SSS");

public
void
map(Object key,
Text value,
Context context)

throws
IOException,
InterruptedException {

Map<String,
String>
parsed = transformXmlToMap(value.toString());

// Grab the "CreationDate" field,

// since it is what we are grouping by

String strDate
= parsed.get("CreationDate");

// Grab the comment to find the length

String text
= parsed.get("Text");

// get the hour this comment was posted in

Date creationDate
= frmt.parse(strDate);

outHour.set(creationDate.getHours());

// set the comment length

outCommentLength.set(text.length());

// write out the user ID with min max dates and count

context.write(outHour,
outCommentLength);

}

}

 

Reducer codeReducer会迭代给定值得集合,并把每个值加到内存列表里。同时也会计算一个动态的sumcount。迭代之后,评论长度被排序,以便找出中值。如果数量是偶数,中值是中间两个数的平均值。下面,根据动态的sumcount计算出平均值,然后迭代排序的列表计算出标准差。每个数跟平均值的差的平方累加求和保存在一个动态sum中,这个sum的平方根就是标准差。最后输出key,中值和标准差。

 

public static class
MedianStdDevReducer extends

Reducer<IntWritable,
IntWritable,

IntWritable,
MedianStdDevTuple> {

private
MedianStdDevTuple result =
new MedianStdDevTuple();

private
ArrayList<Float>
commentLengths =
new ArrayList<Float>();

public
void
reduce(IntWritable key,
Iterable<IntWritable>
values,

Context context)
throws IOException,
InterruptedException {

float
sum =
0;

float
count =
0;

commentLengths.clear();

result.setStdDev(0);

// Iterate through all input values for this key

for
(IntWritable val
: values) {

commentLengths.add((float)
val.get());

sum
+= val.get();

++count;

}

// sort commentLengths to calculate median

Collections.sort(commentLengths);

// if commentLengths is an even value, average middle two elements

if
(count
% 2
== 0) {

result.setMedian((commentLengths.get((int)
count /
2 - 1) +

commentLengths.get((int)
count /
2)) / 2.0f);

}
else {

// else, set median to middle value

result.setMedian(commentLengths.get((int)
count /
2));

}

// calculate standard deviation

float
mean =
sum /
count;

float
sumOfSquares =
0.0f;

for
(Float f
: commentLengths) {

sumOfSquares
+= (f
- mean) * (f
- mean);

}

result.setStdDev((float)
Math.sqrt(sumOfSquares
/ (count
- 1)));

context.write(key,
result);

}

}

 

Combiner optimization。这种情况下不能用combinerreducer需要所有的值去计算中值和标准差。因为combiner仅仅在一个map本地处理中间键值对。计算完整的中值,和标准值是不可能的。下面的例子是一种复杂一点的使用自定义的combiner的实现。

 

Memory-conscious median and standard deviation

下面的例子跟前一个不同,并减少了内存的使用。把值放进列表会导致很多重复的元素。一种去重的方法是标记元素的个数。例如,对于列表< 1, 1, 1, 1, 2, 2, 3,4, 5, 5, 5 >,可以用一个sorted map保存:(14,
2
2, 31, 41, 53)。核心的原理是一样的:reduce阶段会迭代所有值并放入内存数据结构中。数据结构和搜索的方式是改变的地方。Map很大程度上减少了内存的使用。前一个例子使用list,复杂度为On),n是评论条数,本例使用map,使用键值对,为Omaxm)),m是评论长度的最大值。作为额外的补充,combiner的使用能帮助聚合评论长度的数目,并通过writable对象输出reducer端将要使用的这个map

 

问题:同前一个。

 

Mapper codeMapper处理输入记录,输出键是小时,值是sortedmapwritable对象,包含一个元素:评论长度和计数1.这个mapreducercombiner里多处用到。

 

public static class
MedianStdDevMapper extends

Mapper<lObject,
Text,
IntWritable,
SortedMapWritable> {

private
IntWritable commentLength =
new IntWritable();

private static final
LongWritable ONE =
new LongWritable(1);

private
IntWritable outHour =
new IntWritable();

private final static
SimpleDateFormat frmt =
new SimpleDateFormat(

"yyyy-MM-dd'T'HH:mm:ss.SSS");

public
void
map(Object key,
Text value,
Context context)

throws
IOException,
InterruptedException {

Map<String,
String>
parsed = transformXmlToMap(value.toString());

// Grab the "CreationDate" field,

// since it is what we are grouping by

String strDate
= parsed.get("CreationDate");

// Grab the comment to find the length

String text
= parsed.get("Text");

// Get the hour this comment was posted in

Date creationDate
= frmt.parse(strDate);

outHour.set(creationDate.getHours());

commentLength.set(text.length());

SortedMapWritable outCommentLength
= new
SortedMapWritable();

outCommentLength.put(commentLength,
ONE);

// Write out the user ID with min max dates and count

context.write(outHour,
outCommentLength);

}

}

 

Reducer codeReducer通过迭代上面的map生成一个大的treemapkey是评论长度,value是这个长度的评论的数目。

 

迭代以后,中值被计算出来。中值的索引由评论总数除以2得出。然后迭代treemapentrySet找到key,需满足条件为:previousCommentCount
medianIndex < commentCount
,把treeMap的值加到每一步迭代的评论里。一旦条件满足,如果有偶数条评论且中值索引等于前一条评论的,中值取前一个的长度和当前长度的平均值。否则,中值就是当前评论的长度。

 

接下来,再一次迭代treemap,计算出平方和,确保相关联的评论长度和数目相乘。标准差就根据平方和算出来了。中值和标准差就随着key一块输出。

public static class
MedianStdDevReducer extends

Reducer<IntWritable,
SortedMapWritable,

IntWritable,
MedianStdDevTuple> {

private
MedianStdDevTuple result =
new MedianStdDevTuple();

private
TreeMap<Integer,
Long>
commentLengthCounts =

new
TreeMap<Integer,
Long>();

public
void
reduce(IntWritable key,
Iterable<SortedMapWritable>
values,

Context context)
throws IOException,
InterruptedException {

float
sum =
0;

long
totalComments =
0;

commentLengthCounts.clear();

result.setMedian(0);

result.setStdDev(0);

for
(SortedMapWritable v
: values) {

for
(Entry<WritableComparable,
Writable>
entry : v.entrySet()) {

int
length = ((IntWritable)
entry.getKey()).get();

long
count = ((LongWritable)
entry.getValue()).get();

totalComments
+= count;

sum
+= length
* count;

Long storedCount
= commentLengthCounts.get(length);

if
(storedCount
== null) {

commentLengthCounts.put(length,
count);

}
else {

commentLengthCounts.put(length,
storedCount +
count);

}

}

}

long
medianIndex =
totalComments /
2L;

long
previousComments =
0;

long
comments =
0;

int
prevKey =
0;

for
(Entry<Integer,
Long>
entry : commentLengthCounts.entrySet()) {

comments
= previousComments
+ entry.getValue();

if
(previousComments
medianIndex
&& medianIndex
< comments) {

if
(totalComments
% 2
== 0 &&
previousComments ==
medianIndex) {

result.setMedian((float)
(
entry.getKey() +
prevKey) /
2.0f);

}
else {

result.setMedian(entry.getKey());

}

break;

}

previousComments
= comments;

prevKey
= entry.getKey();

}

// calculate standard deviation

float
mean =
sum /
totalComments;

float
sumOfSquares =
0.0f;

for
(Entry<Integer,
Long>
entry : commentLengthCounts.entrySet()) {

sumOfSquares
+= (entry.getKey() -
mean) * (entry.getKey() -
mean) *

entry.getValue();

}

result.setStdDev((float)
Math.sqrt(sumOfSquares
/ (totalComments
- 1)));

context.write(key,
result);

}

}

 

Combiner optimization。跟前面的例子不同,这里combiner的逻辑跟reducer不同。Reducer计算中值和标准差,而combiner对每个本地map的中间键值对聚合sortedMapWritable条目。代码解析这些条目并在本地map聚合它们,这跟前面部分的reducer代码是相同的。这里用一个hashmap替换treemap,因为不需要排序,且hashmap更快。Reducer使用map计算中值和标准差,而combiner是用sortedMapWritable序列化为reduce阶段做准备。

 

public static class
MedianStdDevCombiner extends

Reducer<IntWritable,
SortedMapWritable,
IntWritable,
SortedMapWritable> {

protected
void
reduce(IntWritable key,

Iterable<SortedMapWritable>
values,
Context context)

throws
IOException,
InterruptedException {

SortedMapWritable outValue
= new
SortedMapWritable();

for
(SortedMapWritable v
: values) {

for
(Entry<WritableComparable,
Writable>
entry : v.entrySet()) {

LongWritable count
= (LongWritable)
outValue.get(entry.getKey());

if
(count
!= null) {

count.set(count.get()

+ ((LongWritable)
entry.getValue()).get());

}
else {

outValue.put(entry.getKey(),
new LongWritable(

((LongWritable)
entry.getValue()).get()));

}

}

}

context.write(key,
outValue);

}

}

 

Data flow diagram。图2-4展示了例子的数据流程图

Figure 2-4. Data flow for the standard deviation example

抱歉!评论已关闭.