使用MapReduce读取HBase数据存储到MySQL

2023-06-13,,

Mapper读取HBase数据

package MapReduce;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper; import java.io.IOException; public class CallMapper extends TableMapper<phoneInfoDBWritable,phoneInfoDBWritable>{ //将log的caller,callee,time,dur提取出来,相当于将每一行数据读取出来放入到 phoneInfo 对象中。
private phoneInfo pp = new phoneInfo();
private phoneInfoDBWritable pDB = null;
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { //获取rowkey
String rowkey = new String(key.get());
//获取一行数据
Cell[] cells = value.rawCells();
// 获取的数据,通话时长,日期
String caller = "";
String callee = "";
String time = "";
String dur = "";
String flag = "";
String dateCallk = "";
//循环取出
for (Cell cell :cells){
// 取出行名称
String lineName = new String(CellUtil.cloneQualifier(cell)); // 判断打电话的人
if(lineName.equals("caller")){
caller = new String(CellUtil.cloneValue(cell));
}
// 接电话的人
if(lineName.equals("callee")){
callee = new String(CellUtil.cloneValue(cell));
}
// 判断日期
if(lineName.equals("time")){
time = new String(CellUtil.cloneValue(cell));
}
// 判断时长
if(lineName.equals("dur")){
dur = new String(CellUtil.cloneValue(cell));
}
// 判断日期
if(lineName.equals("flag")){
flag = new String(CellUtil.cloneValue(cell));
}
//01_手机号_yyyMMddhhmmss_1
String[] split = rowkey.split("_");
//截取打电话的人的电话号码
String phoneNum = split[1];
//拼接key
dateCallk = phoneNum + "_" + split[2].substring(0, 6);
//输出到文件 }
//测试输出内容
pp.setCaller(caller);
pp.setCallee(callee);
pp.setTime(time);
pp.setDur(dur);
pp.setFlag(flag);
//System.err.println("rowkey: " + rowkey + "-" +caller+ "-" +callee+ "-" + time + "-" +dur+ "-" +flag);
//String string = "rowkey: " + rowkey + "-" +caller+ "-" +callee+ "-" + time + "-" +dur+ "-" +flag;
//将数据写入到mysql中
pDB = new phoneInfoDBWritable(pp);
context.write(pDB,null);
}
}

Driver配置分发任务

package MapReduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; public class MRRunner { public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); //创建configuration
conf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3");
conf.set("hbase.zookeeper.property.clientPort", "2181");
Job job = Job.getInstance(conf, "db store"); //实现与数据库的连接
DBConfiguration.configureDB(job.getConfiguration(), "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/callphone", "root","root");
//将从HBase表中获取的数据封装写入到数据库表的格式
DBOutputFormat.setOutput(job, "phone", "caller", "callee", "time", "dur","flag"); //设置Driver
job.setJarByClass(MRRunner.class);
//设置数据输出学出到mysql的类格式
job.setOutputFormatClass(DBOutputFormat.class); //扫描HBase表
Scan scan = new Scan();
scan.setCacheBlocks(false);
scan.setCaching(500); //设置Mapper
job.setMapperClass(CallMapper.class);
TableMapReduceUtil.initTableMapperJob(
"phone:log",
scan,
CallMapper.class,
phoneInfoDBWritable.class,
phoneInfoDBWritable.class,
job); // 设置Reduce数量,没有使用到Reducer
job.setNumReduceTasks(0); System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

phoneInfo封装读取到的HBase

package MapReduce;

/**
* 构建phoneInfo类,将HBase表中的数据存储到phoneInfo对象中
* 实现封装数据
*/
public class phoneInfo{ private String caller;
private String callee;
private String time;
private String dur;
private String flag; public String getCaller() {
return caller;
} public void setCaller(String caller) {
this.caller = caller;
} public String getCallee() {
return callee;
} public void setCallee(String callee) {
this.callee = callee;
} public String getTime() {
return time;
} public void setTime(String time) {
this.time = time;
} public String getDur() {
return dur;
} public void setDur(String dur) {
this.dur = dur;
} public String getFlag() {
return flag;
} public void setFlag(String flag) {
this.flag = flag;
}
}

phoneInfoDBWritable实现DBWritable用于存放phoneInfo对象

package MapReduce;

import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException; /**
* 编写phoneInfoDBWritable类实现DBWritable,完成HBase的数据写入到指定的MySQL的序列化
*/
public class phoneInfoDBWritable implements DBWritable { private phoneInfo phoneinfo; public phoneInfoDBWritable() { } public phoneInfoDBWritable(phoneInfo phoneinfo) {
this.phoneinfo = phoneinfo;
}
public void write(PreparedStatement statement) throws SQLException {
statement.setString(1, phoneinfo.getCaller());
statement.setString(2, phoneinfo.getCallee());
statement.setString(3, phoneinfo.getTime());
statement.setString(4, phoneinfo.getDur());
statement.setString(5, phoneinfo.getFlag());
} public void readFields(ResultSet resultSet) throws SQLException { }
}

使用MapReduce读取HBase数据存储到MySQL的相关教程结束。

《使用MapReduce读取HBase数据存储到MySQL.doc》

下载本文的Word格式文档,以方便收藏与打印。