feat: add MapReduce usage examples
TomShiDi committed Feb 27, 2024
1 parent 77b7d99 commit 51b448a
Showing 11 changed files with 341 additions and 1 deletion.
@@ -0,0 +1,58 @@
package com.tomshidi.hadoop.mapreduce.wordcount;

import com.tomshidi.hadoop.mapreduce.wordcount.partitioner.WordCountPartitioner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* @author TomShiDi
* @since 2023/5/24 15:51
*/
public class WordCountDriver {

/**
* Pipeline stages: data input, MapTask, partitioning (Partitioner), sorting, Combiner, grouping, ReduceTask
* @param args
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1. Get the Job instance
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration, "wordcount");
// 2. Set the jar by the driver class
job.setJarByClass(WordCountDriver.class);
// 3. Wire up the mapper and reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4. Set the map output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5. Set the final output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Merge small input files into fewer splits with CombineTextInputFormat
job.setInputFormatClass(CombineTextInputFormat.class);
// Maximum split size: 4194304 bytes = 4 MB
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
// Set the custom partitioner class
job.setPartitionerClass(WordCountPartitioner.class);
// Number of reduce tasks; must match the number of partitions produced by WordCountPartitioner
job.setNumReduceTasks(5);
// 6. Set input and output paths
FileInputFormat.setInputPaths(job, new Path("D:\\Personal-Projects\\tomshidi-springcloud-demo\\hadoop-demo\\src\\main\\resources\\input\\wordcount"));
FileOutputFormat.setOutputPath(job, new Path("D:\\Personal-Projects\\tomshidi-springcloud-demo\\hadoop-demo\\src\\main\\resources\\output\\wordcount"));
// 7. Submit the job and wait for completion
boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);
}
}
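The input and output paths above are hard-coded Windows paths, so this driver only runs locally from the IDE. A minimal sketch of an alternative (not part of this commit), assuming the input and output directories are passed as the first two command-line arguments so the same driver could be packaged and submitted with hadoop jar:

// Hypothetical variant: read the paths from args instead of hard-coding them,
// e.g. args[0] = input directory, args[1] = output directory.
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));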
@@ -0,0 +1,31 @@
package com.tomshidi.hadoop.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* @author TomShiDi
* @date 2023/5/24 15:26
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

private Text outK = new Text();

private IntWritable outV = new IntWritable();

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");

for (String word : words) {
outK.set(word);
outV.set(1);
context.write(outK, outV);
}
}
}
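Note that the mapper splits each line on a single space, so tabs or repeated spaces would produce empty tokens. A hedged alternative for that one line (not part of this commit), if more tolerant tokenization were wanted:

// Hypothetical alternative: split on any run of whitespace
String[] words = line.trim().split("\\s+");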
@@ -0,0 +1,27 @@
package com.tomshidi.hadoop.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


/**
* @author TomShiDi
* @since 2023/5/24 15:27
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable outV = new IntWritable();

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum = sum + value.get();
}
outV.set(sum);
context.write(key, outV);
}
}
@@ -0,0 +1,26 @@
package com.tomshidi.hadoop.mapreduce.wordcount.combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
* @author TomShiDi
* @since 2023/5/25 21:31
*/
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable outV = new IntWritable();

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable value : values) {
count = count + value.get();
}
outV.set(count);
context.write(key, outV);
}
}
@@ -0,0 +1,46 @@
package com.tomshidi.hadoop.mapreduce.wordcount.combiner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* @author TomShiDi
* @since 2023/5/24 15:51
*/
public class WordCountDriver {

public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1. Get the Job instance
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2. Set the jar by the driver class
job.setJarByClass(WordCountDriver.class);
// 3. Wire up the mapper and reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4. Set the map output key/value types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5. Set the final output key/value types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// With a combiner set, a preliminary local merge runs on each MapTask, reducing the data shuffled to the ReduceTask
job.setCombinerClass(WordCountReducer.class);

// 6. Set input and output paths
FileInputFormat.setInputPaths(job, new Path("D:\\Personal-Projects\\hdfsclient\\src\\main\\resources\\input.txt"));
FileOutputFormat.setOutputPath(job, new Path("D:\\Personal-Projects\\hdfsclient\\src\\main\\resources\\output-combiner"));
// 7. Submit the job and wait for completion
boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);
}
}
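This driver reuses WordCountReducer as the combiner; the dedicated WordCountCombiner class above is functionally identical, so either choice works. Reusing the reducer is safe here because the combine step is an associative, commutative sum whose input and output types (Text, IntWritable) match the map output types. A sketch of the alternative wiring, assuming the same job setup:

// Alternative: register the dedicated combiner class instead of reusing the reducer
job.setCombinerClass(WordCountCombiner.class);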
@@ -0,0 +1,31 @@
package com.tomshidi.hadoop.mapreduce.wordcount.combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* @author TomShiDi
* @date 2023/5/24 15:26
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

private Text outK = new Text();

private IntWritable outV = new IntWritable();

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");

for (String word : words) {
outK.set(word);
outV.set(1);
context.write(outK, outV);
}
}
}
@@ -0,0 +1,27 @@
package com.tomshidi.hadoop.mapreduce.wordcount.combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


/**
* @author TomShiDi
* @since 2023/5/24 15:27
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

private IntWritable outV = new IntWritable();

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum = sum + value.get();
}
outV.set(sum);
context.write(key, outV);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.tomshidi.hadoop.mapreduce.wordcount.partitioner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;


/**
* Custom partitioning logic
* @author TomShiDi
* @since 2024/2/27 15:57
*/
public class WordCountPartitioner extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text text, IntWritable intWritable, int i) {
String value = text.toString();
if (value == null || value.length() == 0) {
return 0;
}
// Partition by the first character; the hard-coded modulus 5 must stay in sync with job.setNumReduceTasks(5) in WordCountDriver
char firstChar = value.charAt(0);
return firstChar % 5;
}
}
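For reference, a hypothetical helper (not part of this commit) that prints the partition index WordCountPartitioner assigns to a few words from the sample data.txt, assuming the five reduce tasks configured in WordCountDriver. For example, "Hello" starts with 'H' (code 72), so it lands in partition 72 % 5 = 2.

package com.tomshidi.hadoop.mapreduce.wordcount.partitioner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**
* Hypothetical demo class, not part of the commit: shows which partition each sample word maps to.
*/
public class WordCountPartitionerDemo {

public static void main(String[] args) {
WordCountPartitioner partitioner = new WordCountPartitioner();
IntWritable one = new IntWritable(1);
for (String word : new String[]{"Hello", "World", "Hadoop", "This", "text"}) {
// numPartitions = 5, matching job.setNumReduceTasks(5) in the driver
int partition = partitioner.getPartition(new Text(word), one, 5);
System.out.println(word + " -> partition " + partition);
}
}
}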
3 changes: 3 additions & 0 deletions hadoop-demo/src/main/resources/input/wordcount/data.txt
@@ -0,0 +1,3 @@
Hello World
This is a simple word text
Hello Hadoop
28 changes: 28 additions & 0 deletions hadoop-demo/src/main/resources/log4j.properties
@@ -0,0 +1,28 @@
log4j.rootLogger=INFO, Console

#Console
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n


# Custom tweaks
#log4j.logger.com.codahale.metrics=WARN
#log4j.logger.com.ryantenney=WARN
#log4j.logger.com.zaxxer=WARN
#log4j.logger.org.apache=WARN
#log4j.logger.org.hibernate=WARN
#log4j.logger.org.hibernate.engine.internal=WARN
#log4j.logger.org.hibernate.validator=WARN
#log4j.logger.org.springframework=WARN
#log4j.logger.org.springframework.web=WARN
#log4j.logger.org.springframework.security=WARN

# log file
#log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
#log4j.appender.D.File = D://log.log
#log4j.appender.D.Append = true
#log4j.appender.D.Threshold = DEBUG
#log4j.appender.D.layout = org.apache.log4j.PatternLayout
#log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n

@@ -10,6 +10,8 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
* @author TomShiDi
@@ -20,9 +22,10 @@ public class HadoopCurdTest {
private static FileSystem fileSystem;

@BeforeAll
public static void init() throws IOException {
public static void init() throws IOException, URISyntaxException, InterruptedException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS","hdfs://node1:8020");
// FileSystem.get(new URI("hdfs://node1:8020"), conf, "root");
fileSystem = FileSystem.get(conf);
}
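The commented-out line above hints at an alternative way to obtain the FileSystem: pass the NameNode URI and a user name explicitly instead of relying on fs.defaultFS. A minimal sketch of that variant (assuming the same node1:8020 NameNode and the root user):

// Hypothetical alternative body for init(): connect with an explicit URI and user
fileSystem = FileSystem.get(new URI("hdfs://node1:8020"), conf, "root");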

@@ -37,6 +40,14 @@ public void listFiles() throws IOException {
while (files.hasNext()) {
LocatedFileStatus fileStatus = files.next();
System.out.println(fileStatus.getPath().toString());

// Linux command to create a file of a given size: dd if=/dev/zero of=my.txt bs=100M count=1
// Get the file's block location info
BlockLocation[] locations = fileStatus.getBlockLocations();
for (BlockLocation location : locations) {
System.out.println(location.toString());
}
System.out.println("******************");
}
fileSystem.close();
}
@@ -50,6 +61,10 @@ public void createDir() throws IOException {
fileSystem.create(new Path("/xxx/yyy/ccc"));
}

/**
* Download a file to the local file system via a raw stream copy
* @throws IOException on I/O error
*/
@Test
public void downloadFile1() throws IOException {
FileOutputStream fileOutputStream = new FileOutputStream(new File("E:\\hadoop-data.txt"));
@@ -59,16 +74,40 @@ public void downloadFile1() throws IOException {
IOUtils.closeQuietly(inputStream);
}

/**
* Download a file using copyToLocalFile
* @throws IOException on I/O error
*/
@Test
public void downloadFile2() throws IOException {
fileSystem.copyToLocalFile(new Path("/data.txt"), new Path("E:\\hadoop-data.txt"));
}

/**
* Upload a file to HDFS
* @throws IOException on I/O error
*/
@Test
public void uploadFile() throws IOException {
fileSystem.copyFromLocalFile(new Path("E:\\hadoop-conf.zip"), new Path("/hadoop-conf.zip"));
}

/**
* Merge local files into a single HDFS file
* @throws IOException on I/O error
*/
@Test
public void mergeFile() throws IOException {
FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path("/mergefile.txt"));
LocalFileSystem localFileSystem = FileSystem.getLocal(new Configuration());
FileStatus[] fileStatuses = localFileSystem.listStatus(new Path("E:\\hadoop-2.7.5\\input-data"));
for (FileStatus fileStatus : fileStatuses) {
FSDataInputStream inputStream = localFileSystem.open(fileStatus.getPath());
IOUtils.copy(inputStream, fsDataOutputStream);
IOUtils.closeQuietly(inputStream);
}
// Close the merged HDFS output stream so the data is flushed
fsDataOutputStream.close();
}

@AfterAll
public static void destroy() throws IOException {
fileSystem.close();
