现在的位置: 首页 > 综合 > 正文

hadoop下kmeans算法实现四

2018年04月13日 ⁄ 综合 ⁄ 共 5907字 ⁄ 字号 评论关闭

KMapper.java

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


/**
 * Map stage of one k-means iteration.
 *
 * <p>{@code setup} loads the current cluster centers from the center file, and {@code map}
 * assigns every point found in an input line to its nearest center, emitting
 * {@code (center, point)} pairs.
 */
public class KMapper extends Mapper<LongWritable, Text, Text, Text> {

    /** Center tokens exactly as read from the center file, e.g. "(10,30)"; emitted as output keys. */
    private String[] center;

    /** Coordinates of {@code center[i]}, parsed once in setup instead of once per input point. */
    private float[][] centerCoords;

    /**
     * Reads the whitespace-separated center list from the center file into {@code center}
     * and caches the parsed coordinates.
     *
     * @param context task context supplied by the framework (not used here)
     * @throws IOException if the center file cannot be read or contains no centers
     * @throws NumberFormatException if a center coordinate is not a valid float
     */
    protected void setup(Context context) throws IOException,InterruptedException
    {
        String centerlist = "hdfs://localhost:9000/home/administrator/hadoop/kmeans/input2/3.txt"; // center file
        Configuration conf1 = new Configuration();
        conf1.set("hadoop.job.ugi", "hadoop-user,hadoop-user");
        FileSystem fs = FileSystem.get(URI.create(centerlist), conf1);
        FSDataInputStream in = null;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            in = fs.open(new Path(centerlist));
            IOUtils.copyBytes(in, out, 100, false);
        } finally {
            IOUtils.closeStream(in);
        }

        // Decode with an explicit charset and split on any whitespace run, so a trailing
        // newline or a doubled space cannot produce an empty or newline-tainted center token.
        String content = new String(out.toByteArray(), java.nio.charset.StandardCharsets.UTF_8).trim();
        if (content.isEmpty()) {
            throw new IOException("Center file contains no centers: " + centerlist);
        }
        center = content.split("\\s+");
        centerCoords = new float[center.length][];
        for (int i = 0; i < center.length; i++) {
            centerCoords[i] = parsePoint(center[i]);
        }
    }

    /**
     * Assigns each whitespace-separated point of the input line, e.g.
     * {@code (2,3) (10,30) (34,40) (1,1)}, to its nearest center.
     *
     * @param key     input key supplied by the framework (not used)
     * @param value   one line of input holding zero or more points
     * @param context sink for the {@code (center, point)} output pairs
     * @throws IllegalArgumentException if a center has fewer coordinates than a point
     * @throws NumberFormatException if a point coordinate is not a valid float
     */
    public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
    {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens())
        {
            String outValue = itr.nextToken();
            float[] point = parsePoint(outValue);

            // Start with the first center as the best candidate; a later center wins only
            // when it is strictly closer, so ties keep the lowest index.
            int pos = 0;
            float min = squaredDistance(point, centerCoords[0]);
            for (int i = 1; i < centerCoords.length; i++)
            {
                float distance = squaredDistance(point, centerCoords[i]);
                if (min > distance)
                {
                    min = distance;
                    pos = i;
                }
            }

            context.write(new Text(center[pos]), new Text(outValue)); // output: center, assigned point
            System.out.println("中心点"+center[pos]+"对应坐标"+outValue);
            System.out.println("Mapper输出:"+center[pos]+" "+outValue);
        }
    }

    /** Parses a token such as "(2,3)" into its float coordinates. */
    private static float[] parsePoint(String token)
    {
        String[] parts = token.replace("(", "").replace(")", "").split(",");
        float[] coords = new float[parts.length];
        for (int i = 0; i < parts.length; i++) {
            coords[i] = Float.parseFloat(parts[i]);
        }
        return coords;
    }

    /**
     * Returns the squared Euclidean distance over the point's dimensions; the square root
     * is omitted because only the ordering of distances matters.
     */
    private static float squaredDistance(float[] point, float[] centerPoint)
    {
        if (centerPoint.length < point.length) {
            throw new IllegalArgumentException("Center has " + centerPoint.length
                    + " coordinates but point has " + point.length);
        }
        float sum = 0;
        for (int j = 0; j < point.length; j++) {
            float diff = point[j] - centerPoint[j];
            sum += diff * diff;
        }
        return sum;
    }

}

KReducer.java

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class KReducer extends Reducer<Text, Text, Text, Text> {
    //<中心点类别,中心点对应的坐标集合>,每个中心点类别的坐标集合求新的中心点
    
    public void reduce(Text key,Iterable<Text> value,Context context) throws IOException,InterruptedException
    {
        String outVal = "";
        int count=0;
        String center="";
        System.out.println("Reduce过程第一次");
        System.out.println(key.toString()+"Reduce");
        int length = key.toString().replace("(", "").replace(")", "").replace(":", "").split(",").length;
        float[] ave = new float[Float.SIZE*length];
        for(int i=0;i<length;i++)
            ave[i]=0; 
        for(Text val:value)
        {
            System.out.println("val:"+val.toString());
            System.out.println("values:"+value.toString());
            outVal += val.toString()+" ";
            String[] tmp = val.toString().replace("(", "").replace(")", "").split(",");
            System.out.println("temlength:"+tmp.length);
            for(int i=0;i<tmp.length;i++)
                ave[i] += Float.parseFloat(tmp[i]);
            count ++;
        }
        System.out.println("count:"+count);
        System.out.println("outVal:"+outVal+"/outVal");
        for (int i=0;i<2;i++)
        {
            System.out.println("ave"+i+"i"+ave[i]);
        }
        //ave[0]存储X坐标之和,ave[1]存储Y坐标之和
        for(int i=0;i<length;i++)
        {
            ave[i]=ave[i]/count;
            if(i==0)
                center += "("+ave[i]+",";
            else {
                if(i==length-1)
                    center += ave[i]+")";
                else {
                    center += ave[i]+",";
                }
            }
        }
        System.out.println("写入part:"+key+" "+outVal+" "+center);
        context.write(key, new Text(outVal+center));
    }

}


NewCenter.java

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;


/**
 * Post-processes the reducer output of one k-means iteration: extracts the new centers,
 * writes them to the center file, and reports how far the centers moved.
 */
public class NewCenter {
	
	/** Maximum number of result lines (clusters) to read from the reducer output. */
	int k = 2;
	/** Largest squared distance any center moved during the most recent {@code run}. */
	float shold=Integer.MIN_VALUE;
	/** Lines of the reducer output read by the most recent {@code run}. */
	String[] line;
	/** Space-separated new centers produced by the most recent {@code run}. */
	String newcenter = new String("");
	
	/**
	 * Reads {@code args[2]/part-r-00000}, writes the new centers to
	 * {@code args[1]/center/3.txt}, and returns the largest squared distance between a
	 * previous center and its replacement.
	 *
	 * <p>Each result line is expected to be {@code oldCenter<TAB>point point ... newCenter}.
	 *
	 * @param args {@code args[1]} is the base path for the center file and {@code args[2]}
	 *             is the reducer output directory
	 * @return the largest squared center displacement of this iteration
	 * @throws IOException if the result file cannot be read, holds no centers, or the
	 *                     center file cannot be written
	 */
	public float run(String[] args) throws IOException,InterruptedException
	{
		// Start every run from a clean state so that reusing one instance across
		// iterations neither keeps a stale maximum nor appends to the previous centers.
		shold = Integer.MIN_VALUE;
		newcenter = "";

		String resultPath = args[2]+"/part-r-00000";
		Configuration conf = new Configuration();
		conf.set("hadoop.job.ugi", "hadoop,hadoop"); 
		FileSystem fs = FileSystem.get(URI.create(resultPath),conf);
		FSDataInputStream in = null;
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		try {
			in = fs.open(new Path(resultPath));
			IOUtils.copyBytes(in,out,50,false);
		} finally {
			IOUtils.closeStream(in);
		}
		line = new String(out.toByteArray(), java.nio.charset.StandardCharsets.UTF_8).split("\n");

		System.out.println("上一次MapReduce结果:第一行:"+line[0]);
		if (line.length > 1) {
			System.out.println("第二行:"+line[1]);
		}
		System.out.println("。");

		// The reducer output can hold fewer than k lines (for example when a cluster
		// received no points), so never index past what was actually read.
		int rows = Math.min(k, line.length);
		if (rows < k) {
			System.out.println("Warning: expected " + k + " result lines but found " + line.length);
		}

		StringBuilder centers = new StringBuilder();
		int processed = 0;
		for (int i = 0; i < rows; i++)
		{
			String row = line[i].trim();
			if (row.isEmpty()) {
				continue;
			}
			// Key and value are separated by a tab and the values by spaces, so split on
			// any whitespace run.
			String[] l = row.split("\\s+");
			// First token: the center used in this iteration, e.g. (10,30).
			String[] startCenter = l[0].replace("(", "").replace(")", "").split(",");
			// Last token: the newly computed center, e.g. (22,35).
			String[] finalCenter = l[l.length-1].replace("(", "").replace(")", "").split(",");

			// Squared Euclidean distance between the old and the new center.
			float tmp = 0;
			for (int j = 0; j < startCenter.length; j++) {
				tmp += Math.pow(Float.parseFloat(startCenter[j])-Float.parseFloat(finalCenter[j]), 2);
			}

			centers.append(l[l.length - 1]).append(' ');
			if (shold <= tmp) {
				shold = tmp;
			}
			processed++;
			System.out.println(i+"坐标距离:"+tmp);
		}
		if (processed == 0) {
			throw new IOException("No centers found in " + resultPath);
		}

		newcenter = centers.toString();
		System.out.println("新中心点:"+newcenter);
		OutputStream out2 = fs.create(new Path(args[1]+"/center/3.txt") ); 
		// copyBytes closes both streams because the last argument is true.
		IOUtils.copyBytes(new ByteArrayInputStream(newcenter.getBytes(java.nio.charset.StandardCharsets.UTF_8)), out2, 4096,true);
		return shold;
	}

}

抱歉!评论已关闭.