feat: add MapReduce examples
TomShiDi committed Feb 29, 2024
1 parent 51b448a commit d0f362c
Showing 7 changed files with 197 additions and 4 deletions.
DiyGrouping.java
@@ -0,0 +1,22 @@
package com.tomshidi.hadoop.mapreduce.grouping;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Custom grouping comparator that compares only one field (orderId) of the key,
 * so all records of the same order end up in the same reduce() call.
 * @author TomShiDi
 * @since 2024/2/27 21:12
 */
public class DiyGrouping extends WritableComparator {
    public DiyGrouping() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean bean1 = (OrderBean) a;
        OrderBean bean2 = (OrderBean) b;
        return bean1.getOrderId().compareTo(bean2.getOrderId());
    }
}
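A minimal sketch (a hypothetical helper class, not part of this commit) of what the comparator does: two OrderBean keys that share an orderId but differ in price compare as equal, so the shuffle hands their records to the same reduce() call.

package com.tomshidi.hadoop.mapreduce.grouping;

public class DiyGroupingSketch {
    public static void main(String[] args) {
        OrderBean a = new OrderBean();
        a.setOrderId("order000000001");
        a.setPrice(188.0);

        OrderBean b = new OrderBean();
        b.setOrderId("order000000001");
        b.setPrice(33.0);

        // Same orderId, different price: the grouping comparator returns 0,
        // so both records are handed to the same reduce() call.
        System.out.println(new DiyGrouping().compare(a, b));
    }
}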
OrderBean.java
@@ -0,0 +1,59 @@
package com.tomshidi.hadoop.mapreduce.grouping;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Composite key: orderId plus price, with a custom sort order.
 * @author TomShiDi
 * @since 2024/2/27 20:46
 */
public class OrderBean implements WritableComparable<OrderBean> {
    private String orderId;
    private Double price;

    @Override
    public int compareTo(OrderBean o) {
        // Compare orderId first; the arguments are reversed, so keys sort by orderId in descending order
        int result = o.getOrderId().compareTo(this.orderId);
        if (result == 0) {
            // Same order: sort by price in descending order, so the highest-priced record comes first
            return o.getPrice().compareTo(this.price);
        }
        return result;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.price = in.readDouble();
    }

    @Override
    public String toString() {
        return orderId + "\t" + price;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }
}
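A small sketch (hypothetical class, not in the commit) of the sort order compareTo produces: keys with the same orderId end up adjacent, with the highest price first, which is what the reducer later relies on.

package com.tomshidi.hadoop.mapreduce.grouping;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class OrderBeanSortSketch {
    public static void main(String[] args) {
        List<OrderBean> keys = new ArrayList<>();
        keys.add(newBean("order000000001", 33.0));
        keys.add(newBean("order000000001", 188.0));
        keys.add(newBean("order000000002", 600.33));
        keys.add(newBean("order000000002", 22.1));

        // compareTo sorts orderIds in descending order and, within one order,
        // prices in descending order, so each order's maximum price comes first.
        Collections.sort(keys);
        keys.forEach(System.out::println);
    }

    private static OrderBean newBean(String orderId, double price) {
        OrderBean bean = new OrderBean();
        bean.setOrderId(orderId);
        bean.setPrice(price);
        return bean;
    }
}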
OrderDriver.java
@@ -0,0 +1,54 @@
package com.tomshidi.hadoop.mapreduce.grouping;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author TomShiDi
 * @since 2023/5/24 15:51
 */
public class OrderDriver {

    /**
     * Pipeline: data input, MapTask, partitioning (Partitioner), sorting, Combiner, grouping, ReduceTask
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Get the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "grouping");
        // 2. Set the jar by class
        job.setJarByClass(OrderDriver.class);
        // 3. Associate the mapper and reducer
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);
        // Set the grouping comparator
        job.setGroupingComparatorClass(DiyGrouping.class);
        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(Text.class);
        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Combine small input files into fewer splits
        job.setInputFormatClass(CombineTextInputFormat.class);
        // Set the maximum size of a logical input split (4194304 bytes = 4 MB)
        // The number of logical splits determines the number of MapTasks; it is not dictated by the physical HDFS blocks
        // Split boundary rule: if the remaining data exceeds the split size by no more than 10%, it is not split again and stays in the last split
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\Personal-Projects\\tomshidi-springcloud-demo\\hadoop-demo\\src\\main\\resources\\input\\order"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\Personal-Projects\\tomshidi-springcloud-demo\\hadoop-demo\\src\\main\\resources\\output\\order"));
        // 7. Submit the job
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}
OrderMapper.java
@@ -0,0 +1,27 @@
package com.tomshidi.hadoop.mapreduce.grouping;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author TomShiDi
 * @since 2024/2/27 21:35
 */
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, Text> {

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, OrderBean, Text>.Context context) throws IOException, InterruptedException {
        // Expected record format: orderId goodsId price (space-separated)
        String data = value.toString();
        String[] fields = data.split(" ");
        if (fields.length < 3) {
            // Skip malformed or empty lines
            return;
        }
        OrderBean orderBean = new OrderBean();
        orderBean.setOrderId(fields[0]);
        orderBean.setPrice(Double.valueOf(fields[2]));
        // Emit the composite key and the original line as the value
        context.write(orderBean, value);
    }
}
OrderReducer.java
@@ -0,0 +1,21 @@
package com.tomshidi.hadoop.mapreduce.grouping;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author TomShiDi
 * @since 2024/2/27 21:27
 */
public class OrderReducer extends Reducer<OrderBean, Text, Text, NullWritable> {

    @Override
    protected void reduce(OrderBean key, Iterable<Text> values, Reducer<OrderBean, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // Records are sorted during the shuffle phase, so the first record in each group carries the maximum price
        Text first = values.iterator().next();
        context.write(first, NullWritable.get());
    }
}
WordCountDriver.java
@@ -20,10 +20,6 @@ public class WordCountDriver {

    /**
     * Pipeline: data input, MapTask, partitioning (Partitioner), sorting, Combiner, grouping, ReduceTask
     * @param args
     * @throws IOException
     * @throws InterruptedException
     * @throws ClassNotFoundException
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Get the job
@@ -34,6 +30,8 @@ public static void main(String[] args) throws IOException, InterruptedException,
        // 3. Associate the mapper and reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Set the Combiner (reuse the reducer as the combiner; see the sketch after this diff)
        job.setCombinerClass(WordCountReducer.class);
        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
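The new setCombinerClass call reuses the reducer as a combiner. This is only valid because the reducer's input and output types are both (Text, IntWritable) and summation is associative and commutative. WordCountReducer itself is not shown in this diff; it presumably has roughly the following shape (an assumption, not the committed code):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountCombinerSketch extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Partial sums computed on the map side are safe because addition is associative and commutative
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}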
12 changes: 12 additions & 0 deletions hadoop-demo/src/main/resources/input/order/data.txt
@@ -0,0 +1,12 @@
order000000001 good1 188
order000000002 good1 22.1
order000000001 good1 33
order000000002 good1 600.33
order000000003 good1 100
order000000001 good1 100
order000000003 good1 888.1
order000000002 good1 66
order000000002 good1 60
order000000003 good1 100
order000000003 good1 70.1
order000000001 good1 100
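For reference, with this input and the code above, the job should emit one line per orderId: the record with that order's highest price. Assuming the default single reduce task and the descending orderId sort, part-r-00000 should look roughly like:

order000000003 good1 888.1
order000000002 good1 600.33
order000000001 good1 188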
