转载

MapReduce读取Mysql统计分析再将结果写入mysql中供动态可视化

MapReduce读取Mysql统计分析再将结果写入mysql中供动态可视化

1.前言

最近做一个小项目,其中一个功能是,使用hadoop的MapReduce程序来读取Mysql数据库的某表数据,然后MR进行类别统计,然后再将统计结果写入mysql的另一张表中,最后使用jsp页面调用Echarts读取数据库来动态可视化结果。

先上一张效果图:
最终效果图
本篇先介绍MapReduce读取Mysql统计分析再将结果写入mysql中。
另一篇则是web项目中使用jsp调用Echarts读取数据库来动态可视化结果。

2.MapReduce读取Mysql

搜编百度,都没有一篇完整的mapreduce读取mysql数据,统计分析,最后写入mysql的文章。(so,是不是要点个关注?)

本机环境:

  • centos6.9 64x
  • mysql5.5
  • hadoop2.7
  • eclipse4.5
  • echarts

2.1、建立数据库表:

在mysql中新建了一个数据库test,然后在test中新建了两张表,分别叫mptest,xieru表。

建表语句:

-- MySQL dump 10.13 Distrib 5.5.55, for Linux (x86_64)
--
-- Host: localhost Database: test
-- -----------------------zoutao-------------------------------
-- Server version 5.5.55-log

DROP TABLE IF EXISTS `mptest`;
CREATE TABLE `mptest` (
  `id` varchar(30) NOT NULL DEFAULT '',
  `name` varchar(255) DEFAULT NULL,
  `txt` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

LOCK TABLES `mptest` WRITE;
INSERT INTO `mptest` VALUES ('1','zhangsan','javascript'),('2','lisi','C'),('3','wangwu','C++'),('4','chenliu','java'),('5','zoutao5','java'),('6','zoutao','python');
UNLOCK TABLES;

DROP TABLE IF EXISTS `xieru`;
CREATE TABLE `xieru` (
  `word` varchar(255) DEFAULT NULL COMMENT 'leibie',
  `count` int(11) DEFAULT NULL COMMENT 'shul'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='xie';

如图:
在这里插入图片描述
xieru表:在这里插入图片描述
最后我们的数据库里面:
在这里插入图片描述
为了防止有数据,我们先清空一下的xieru表:

truncate table xieru;	
select * from xieru;

如图:在这里插入图片描述

2.2、创建mapreduce程序

打开位于liunx系统上的eclipse,创建一个mapreduce项目:
在这里插入图片描述
随便取名字吧,能看懂这个文章的也不可能不会这些操作,我就不写了。
项目创建完成以后:大概是这样的
在这里插入图片描述

2.3、导入mysql的驱动jar包

安装了mysql的,里面有个驱动包。没有的你就百度自行下载一个就是了。
mysql-connector-java-x.x.xxx.jar,把这个jar包导入到刚才新建的mp程序里面去。

在这里插入图片描述
开始写代码,太墨迹了。

2.4、创建实体类

新建包,新建MyDBWritable.java。

MyDBWritable :

package com.xfbshop;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/** * * @author zoutao * 实体类-来自于mptest表和xieru表 * 不同表的字段我也写在了一起。没关系 */
public class MyDBWritable implements DBWritable, Writable {
	
	private String id;
	private String name;
	private String txt;
	
	//xieru
	private String word = ""; 
	private int count = 0;


	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getTxt() {
		return txt;
	}

	public void setTxt(String txt) {
		this.txt = txt;
	}
	
	//xiugai
	public String getWord() {
		return word;
	}

	public void setWord(String word) {
		this.word = word;
	}

	public int getCount() {
		return count;
	}

	public void setCount(int count) {
		this.count = count;
	}
	
	
//防止空
	@Override
	public boolean equals(Object o) {
		if (this == o)
			return true;
		if (o == null || getClass() != o.getClass())
			return false;

		MyDBWritable that = (MyDBWritable) o;
		if (id != null ? !id.equals(that.id) : that.id != null)
			return false;
		if (name != null ? !name.equals(that.name) : that.name != null)
			return false;
		return txt != null ? txt.equals(that.txt) : that.txt == null;
	}

// hashCode方法的主要作用是为了配合基于散列的集合一起正常运行
	@Override
	public int hashCode() {
		int result = id != null ? id.hashCode() : 0;
		result = 31 * result + (name != null ? name.hashCode() : 0);
		result = 31 * result + (txt != null ? txt.hashCode() : 0);
		return result;
	}

	// 串行化-xiugai
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(id);
		out.writeUTF(name);
		out.writeUTF(txt);
		
		out.writeUTF(word);
		out.writeInt(count);
	}

	// 反串行化-xiugai
	@Override
	public void readFields(DataInput in) throws IOException {
		id = in.readUTF();
		name = in.readUTF();
		txt = in.readUTF();
		
		word = in.readUTF();
		count = in.readInt();
	}

	// 写入DB--xiugai
	@Override
	public void write(PreparedStatement ps) throws SQLException {
		/*ps.setString(1, id); ps.setString(2, name); ps.setString(3, txt);*/
		
		ps.setString(1,word);
		ps.setInt(2,count);
	}

	// 从DB读取
	@Override
	public void readFields(ResultSet rs) throws SQLException {
		id = rs.getString(1);
		name = rs.getString(2);
		txt = rs.getString(3);
	}
}

2.5、Mapper 类

接下里是mapper读取数据库内容,获取需要统计的字段,转换输出格式为text—intwritable。

这几个地方的类型要一致:
在这里插入图片描述
JDBCMapper.java:

package com.xfbshop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/** * @author zoutao * @date:2019/2/14 */
public class JDBCMapper extends Mapper<LongWritable, MyDBWritable, Text, IntWritable>{
	@Override
	protected void map(LongWritable key, MyDBWritable value, Context context) throws IOException, InterruptedException {
		System.out.println("key--->"+key);
		//拿到一行文本内容,记得要是String类型 
		String line = value.getTxt();
		System.out.println(value.getId() + "-->" + value.getName()+"--->"+value.getTxt());
		//将这行文本切分成单词,以空格切分
		String[] arr = line.split(" ");
		for(String s : arr){
			//<单词,1>
			context.write(new Text(s),new IntWritable(1));
		}
	}
}

2.6、reducer类

reducer进行聚合统计单词的个数,
这几个地方的类型要一致:
在这里插入图片描述
JDBCReducer.java:


package com.xfbshop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/** * @author zoutao * @date:2019/2/14 */
//xiugai
public class JDBCReducer extends Reducer<Text, IntWritable, MyDBWritable, NullWritable>  {
	protected void reduce(Text key,Iterable<IntWritable> values, Context context) 
    		throws IOException, InterruptedException {
    	System.out.print("reduce被执行了》》》》》》》》》》》》》》》》》》》》》》》》》");
		//定义一个计数器
		int count = 0;
		//遍历这一组kv的所有v,累加到count中
		for (IntWritable value : values) {
			count += value.get();
		}
		System.out.print("总数:"+count);
		//context.write(key, new IntWritable(count));
		// xiugai
		MyDBWritable keyOut = new MyDBWritable();
		keyOut.setWord(key.toString());
		keyOut.setCount(count);
		System.out.print("测试:"+NullWritable.get());
		context.write(keyOut, NullWritable.get());
	}
}

2.7、Run主类

这里输出格式一定要注意对应。还要设置好保存的数量。
MapReduce默认提供了DBInputFormat和DBOutputFormat,分别用于数据库的读取和数据库的写入。

读取mptest表:name、txt字段,统计txt中的数据类型和总数。
写入xieru表:word、conut字段。存放统计的结果。

JDBCApp.java源码:

package com.xfbshop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;


import java.io.IOException;

/** * @author zoutao * @date:2019/2/14 */
public class JDBCApp {
	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {
		// 单例作业
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "file:///");
		Job job = Job.getInstance(conf);
		System.setProperty("hadoop.home.dir", "/app/hadoop-2.7.1");

		// 设置job的各种属性
		job.setJobName("MySQLApp"); // 设置job名称
		job.setJarByClass(JDBCApp.class); // 设置搜索类
		job.setInputFormatClass(DBInputFormat.class); //read
		
		String driverClass = "com.mysql.jdbc.Driver";
		String url = "jdbc:mysql://192.168.0.81:3306/test";
		String userName = "mysql";
		String passWord = "";
		// 设置数据库配置
		DBConfiguration.configureDB(job.getConfiguration(), driverClass, url,
				userName, passWord);
		
		// 设置数据输入内容-sql查询数据作为输入数据
		DBInputFormat.setInput(job, MyDBWritable.class,
				"select id,name,txt from mptest",
				"select count(*) from mptest");

		// 设置输出路径-输出一个结果文件到本地
		Path path = new Path("/home/zoutao");
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(path)) {
			fs.delete(path, true);
		}
		
		//FileOutputFormat.setOutputPath(job, path);
		//把结果写入到数据表中对应字段
		DBOutputFormat.setOutput(job,"xieru","word","count");
		
		job.setMapperClass(JDBCMapper.class); // 设置mapper类
		job.setReducerClass(JDBCReducer.class); // 设置reduecer类
		job.setMapOutputKeyClass(Text.class); // 设置之map输出key
		job.setMapOutputValueClass(IntWritable.class); // 设置map输出value
		job.setOutputKeyClass(Text.class); // 设置reduce 输出key
		job.setOutputValueClass(IntWritable.class); // 设置reduce输出value
		// zhixing
		job.setNumReduceTasks(1);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
		//job.waitForCompletion(true);
	}

}

完成代码的编写以后。
运行hadoop。

2.8、启动hadoop集群:

启动你的hadoop集群,start-all.sh,然后jps查看一下是否已经启动:在这里插入图片描述

2.9、运行mapreduce程序:

我是搭建好环境了的,于是在eclipse里面直接运行。右键run as:

读取过程:在这里插入图片描述
计算过程:在这里插入图片描述
写入完成:
在这里插入图片描述
运行完成。

3.0、查看结果:

首先看一下我们本地产生的文件:(可以不看)
在这里插入图片描述
再去mysql里面查看一下是否完成?
将统计分析出来的word和count,(类型和总数),又写入到新的数据表xieru中:
在这里插入图片描述上述代码整个贴出了,实现的是从MapReduce读取Mysql统计分析再将结果写入mysql,顺便还写了保存到本地生成txt文本。

下一篇是 :Jsp+Servlet+Echarts实现动态数据可视化


整个项目后期会提供下载地址,如果你还是不会,那么可以留下邮箱,发给你。
正文到此结束
本文目录