现在的位置: 首页 > 综合 > 正文

MapReduce 直接连接 MySQL 获取数据

2014年07月27日 ⁄ 综合 ⁄ 共 7114字 ⁄ 字号 评论关闭

MySQL 中的数据：

 

Sql代码  收藏代码
  1. mysql> select * from lxw_tbls;  
  2. +---------------------+----------------+  
  3. | TBL_NAME            | TBL_TYPE       |  
  4. +---------------------+----------------+  
  5. | lxw_test_table      | EXTERNAL_TABLE |  
  6. | lxw_t               | MANAGED_TABLE  |  
  7. | lxw_t1              | MANAGED_TABLE  |  
  8. | tt                  | MANAGED_TABLE  |  
  9. | tab_partition       | MANAGED_TABLE  |  
  10. | lxw_hbase_table_1   | MANAGED_TABLE  |  
  11. | lxw_hbase_user_info | MANAGED_TABLE  |  
  12. | t                   | EXTERNAL_TABLE |  
  13. | lxw_jobid           | MANAGED_TABLE  |  
  14. +---------------------+----------------+  
  15. 9 rows in set (0.01 sec)
  16.   
  17. mysql> select * from lxw_tbls where TBL_NAME like 'lxw%' order by TBL_NAME;  
  18. +---------------------+----------------+  
  19. | TBL_NAME            | TBL_TYPE       |  
  20. +---------------------+----------------+  
  21. | lxw_hbase_table_1   | MANAGED_TABLE  |  
  22. | lxw_hbase_user_info | MANAGED_TABLE  |  
  23. | lxw_jobid           | MANAGED_TABLE  |  
  24. | lxw_t               | MANAGED_TABLE  |  
  25. | lxw_t1              | MANAGED_TABLE  |  
  26. | lxw_test_table      | EXTERNAL_TABLE |  
  27. +---------------------+----------------+  
  28. 6 rows in set (0.00 sec)

 

MapReduce 程序代码（ConnMysql.java）：

 

Java代码  收藏代码
  1. package com.lxw.study;  
  2.   
  3. import java.io.DataInput;  
  4. import java.io.DataOutput;  
  5. import java.io.IOException;  
  6. import java.net.URI;  
  7. import java.sql.PreparedStatement;  
  8. import java.sql.ResultSet;  
  9. import java.sql.SQLException;  
  10. import java.util.Iterator;  
  11.   
  12. import org.apache.hadoop.conf.Configuration;  
  13. import org.apache.hadoop.filecache.DistributedCache;  
  14. import org.apache.hadoop.fs.FileSystem;  
  15. import org.apache.hadoop.fs.Path;  
  16. import org.apache.hadoop.io.LongWritable;  
  17. import org.apache.hadoop.io.Text;  
  18. import org.apache.hadoop.io.Writable;  
  19. import org.apache.hadoop.mapreduce.Job;  
  20. import org.apache.hadoop.mapreduce.Mapper;  
  21. import org.apache.hadoop.mapreduce.Reducer;  
  22. import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;  
  23. import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;  
  24. import org.apache.hadoop.mapreduce.lib.db.DBWritable;  
  25. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  26.   
  27. public class ConnMysql {  
  28.           
  29.         private static Configuration conf = new Configuration();  
  30.           
  31.         static {  
  32.                 conf.addResource(new Path("F:/lxw-hadoop/hdfs-site.xml"));  
  33.                 conf.addResource(new Path("F:/lxw-hadoop/mapred-site.xml"));  
  34.                 conf.addResource(new Path("F:/lxw-hadoop/core-site.xml"));  
  35.                 conf.set("mapred.job.tracker""10.133.103.21:50021");  
  36.         }  
  37.           
  38.         public static class TblsRecord implements Writable, DBWritable {  
  39.                 String tbl_name;  
  40.                 String tbl_type;  
  41.   
  42.                 public TblsRecord() {  
  43.   
  44.                 }  
  45.   
  46.                 @Override  
  47.                 public void write(PreparedStatement statement) throws SQLException {  
  48.                         // TODO Auto-generated method stub  
  49.                         statement.setString(1this.tbl_name);  
  50.                         statement.setString(2this.tbl_type);  
  51.                 }  
  52.   
  53.                 @Override  
  54.                 public void readFields(ResultSet resultSet) throws SQLException {  
  55.                         // TODO Auto-generated method stub  
  56.                         this.tbl_name = resultSet.getString(1);  
  57.                         this.tbl_type = resultSet.getString(2);  
  58.                 }  
  59.   
  60.                 @Override  
  61.                 public void write(DataOutput out) throws IOException {  
  62.                         // TODO Auto-generated method stub  
  63.                         Text.writeString(out, this.tbl_name);  
  64.                         Text.writeString(out, this.tbl_type);  
  65.                 }  
  66.   
  67.                 @Override  
  68.                 public void readFields(DataInput in) throws IOException {  
  69.                         // TODO Auto-generated method stub  
  70.                         this.tbl_name = Text.readString(in);  
  71.                         this.tbl_type = Text.readString(in);  
  72.                 }  
  73.   
  74.                 public String toString() {  
  75.                         return new String(this.tbl_name + " " + this.tbl_type);  
  76.                 }  
  77.   
  78.         }  
  79.   
  80.         public static class ConnMysqlMapper extends Mapper<LongWritable,TblsRecord,Text,Text> {  
  81.                 public void map(LongWritable key,TblsRecord values,Context context)   
  82.                                 throws IOException,InterruptedException {  
  83.                         context.write(new Text(values.tbl_name), new Text(values.tbl_type));  
  84.                 }  
  85.         }  
  86.           
  87.         public static class ConnMysqlReducer extends Reducer<Text,Text,Text,Text> {  
  88.                 public void reduce(Text key,Iterable<Text> values,Context context)   
  89.                                 throws IOException,InterruptedException {  
  90.                         for(Iterator<Text> itr = values.iterator();itr.hasNext();) {  
  91.                                 context.write(key, itr.next());  
  92.                         }  
  93.                 }  
  94.         }  
  95.           
  96.         public static void main(String[] args) throws Exception {  
  97.                 Path output = new Path("/user/lxw/output/");  
  98.                   
  99.                 FileSystem fs = FileSystem.get(URI.create(output.toString()), conf);  
  100.                 if (fs.exists(output)) {  
  101.                         fs.delete(output);  
  102.                 }  
  103.                   
  104.                 //mysql的jdbc驱动  
  105.                 DistributedCache.addFileToClassPath(new Path(    
  106.                           "hdfs://hd022-test.nh.sdo.com/user/liuxiaowen/mysql-connector-java-5.1.13-bin.jar"), conf);    
  107.                   
  108.                 DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",    
  109.                           "jdbc:mysql://10.133.103.22:3306/hive""hive""hive");    
  110.                   
  111.                 Job job = new Job(conf,"test mysql connection");  
  112.                 job.setJarByClass(ConnMysql.class);  
  113.                   
  114.                 job.setMapperClass(ConnMysqlMapper.class);  
  115.                 job.setReducerClass(ConnMysqlReducer.class);  
  116.                   
  117.                 job.setOutputKeyClass(Text.class);  
  118.                 job.setOutputValueClass(Text.class);  
  119.                   
  120.                 job.setInputFormatClass(DBInputFormat.class);  
  121.                 FileOutputFormat.setOutputPath(job, output);  
  122.                   
  123.                 //列名  
  124.                 String[] fields = { "TBL_NAME""TBL_TYPE" };   
  125.                 //六个参数分别为:  
  126.                 //1.Job;2.Class<? extends DBWritable>  
  127.                 //3.表名;4.where条件  
  128.                 //5.order by语句;6.列名  
  129.                 DBInputFormat.setInput(job, TblsRecord.class,  
  130.                      "lxw_tbls""TBL_NAME like 'lxw%'""TBL_NAME", fields);    
  131.                   
  132.                 System.exit(job.waitForCompletion(true) ? 0 : 1);  
  133.         }  
  134.           
  135. }  

 

运行结果:

 

Java代码  收藏代码
  1. [lxw@hd025-test ~]$ hadoop fs -cat /user/lxw/output/part-r-00000  
  2. lxw_hbase_table_1       MANAGED_TABLE  
  3. lxw_hbase_user_info     MANAGED_TABLE  
  4. lxw_jobid       MANAGED_TABLE  
  5. lxw_t   MANAGED_TABLE  
  6. lxw_t1  MANAGED_TABLE  
  7. lxw_test_table  EXTERNAL_TABLE  

 

抱歉!评论已关闭.