exp_MapReduce

本文最后更新于:2023年7月13日 晚上

windows添加Hadoop环境变量

  1. 下载群中软件包:hadoop-3.1.0 到 本地某个目录中—我是直接放在了C盘下

  2. 在环境变量–系统变量中添加

    变量名:HADOOP_HOME

    变量值:C:\hadoop-3.1.0 (注意替换为实际安装目录,上述为本人路径)

    在Path中添加 %HADOOP_HOME%\bin

  3. 修改后最好重启电脑进行下一步操作

MapReduce搭建

创建Maven项目

编辑pom.xml

在pom文件中添加以下配置信息

hadoop-client : 连接hadoo集群相关依赖

junit : 单元测试依赖 (本项目没用到)

org.slf4j : 打印日志相关依赖

build : 用于maven项目打成jar包相关依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>2.0.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

配置日志打印

–找到工程目录–src–main–resources – 在resources目录下创建File – 名:log4j.properties–添加相关配置信息–控制日志为INFO级别–

1
2
3
4
5
6
7
8
log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

将IDEA的构建和运行托管到maven下面

– settings–Runner–勾选Delegate IDE build/run actions to Maven–ok–

MapReduce程序

经过上述步骤,我们已将maven工程的框架搭建成功,下面根据具体的需求编写代码

示例程序:编程实现文件合并和去重操作

任务要求:对于两个输入文件,即文件A和文件B,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C。

本地测试

在项目结构的 java 目录下创建com.zjl 包,然后创建Java class,实验一代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package com.zjl.demo;
/*
对于两个输入文件,即文件A和文件B,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C。
*/
//hadoop jar test6.jar com.zjl.demo.Merge /usr/hadoop/input1 /usr/hadoop/output1

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Merge {

/**
*
*/
//重载map函数,直接将输入中的value复制到输出数据的key上
public static class Map extends Mapper<Object, Text, Text, Text> {

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
context.write(value, new Text(""));
//System.out.println("MAP success!!!");
}
}

//重载reduce函数,直接将输入中的key复制到输出数据的key上
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
context.write(key, new Text(""));
//System.out.println("REDUCE success!!!");
}
}

public static void main(String[] args) throws Exception {


// TODO Auto-generated method stub
//1、 获取配置信息以及获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 关联本Driver程序的jar
job.setJarByClass(Merge.class);
// 3 关联Mapper和Reducer的jar
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setCombinerClass(Reduce.class);


// 5 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//使用逗号,设置多个路径
// FileInputFormat.addInputPath(job, new Path("C:\\input"));
// FileOutputFormat.setOutputPath(job, new Path("C:\\output"));

// 7 提交job
System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

运行程序,控制台显示成功信息,然后去指定目录(output)查看

.crc : 是校验文件,不用管

_SUCCESS : 标志着运行成功

part-r-00000 : 里面是结果信息

注意: 将两个输入文件放在C:\\input目录下 , 输出文件不需要自己创建,只需要程序中指定,如果指定目录已存在,程序会报错。 实验二步骤相同,不再演示!!

image-20221211221931884

集群测试

  • 前提:打开hadoop的hdfs和yarn相关服务
1
cd /opt/module/hadoop-3.3.4

​ 开启hdfs服务:start-dfs.sh 开启yarn服务: start-yarn.sh

Browsing HDFS

  1. 将虚拟机上的文件上传的HDFS集群

    image-20221211222450950

  2. 修改程序的输入输出路径

    1
    2
    3
    //将本地设置输入输出路径的语句替换成下面,通过命令行输入参数,这是放在Merge.java文件末尾的
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
  3. 程序打成jar包,上传到虚拟机

    通过Maven的生命周期下的package进行打包

    找到jar包所在地,上传到集群/opt/module/hadoop-3.3.4/ 下

    修改jar包的名字为merge.java

  4. 运行jar

    1
    hadoop jar merge.jar com.zjl.demo.Merge /usr/hadoop/input1 /usr/hadoop/output1
  5. 查看输出信息

    image-20221211223723558

下面对MergeSort文件来一遍同样的操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package com.zjl.demo1;


//hadoop jar test6.jar com.zjl.demo1.MergeSort /usr/hadoop/input2 /usr/hadoop/output2

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MergeSort {
/**
* 输入多个文件,每个文件中的每行内容均为一个整数
* 输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数
*/
//map函数读取输入中的value,将其转化成IntWritable类型,最后作为输出key
public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {

private static final IntWritable data = new IntWritable();

public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String text = value.toString();
data.set(Integer.parseInt(text));
context.write(data, new IntWritable(1));
}
}

//reduce函数将map输入的key复制到输出的value上,然后根据输入的value-list中元素的个数决定key的输出次数,定义一个全局变量line_num来代表key的位次
public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
private static IntWritable line_num = new IntWritable(1);

public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
for (IntWritable val : values) {
context.write(line_num, key);
line_num = new IntWritable(line_num.get() + 1);
}
}
}

//自定义Partition函数,此函数根据输入数据的最大值和MapReduce框架中Partition的数量获取将输入数据按照大小分块的边界,然后根据输入数值和边界的关系返回对应的Partition ID
public static class Partition extends Partitioner<IntWritable, IntWritable> {
public int getPartition(IntWritable key, IntWritable value, int num_Partition) {
int Maxnumber = 65223;//int型的最大数值
int bound = Maxnumber / num_Partition + 1;
int keynumber = key.get();
for (int i = 0; i < num_Partition; i++) {
if (keynumber < bound * (i + 1) && keynumber >= bound * i) {
return i;
}
}
return -1;
}
}

public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();

Job job = Job.getInstance(conf);
// 2 关联本Driver程序的jar
job.setJarByClass(MergeSort.class);
// 3 关联Mapper和Reducer的jar
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setPartitionerClass(Partition.class);
// 5 设置最终输出kv类型
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);


// 6 设置输入和输出路径
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// FileInputFormat.addInputPath(job, new Path("C:\\input2"));
// FileOutputFormat.setOutputPath(job, new Path("C:\\output2"));

// 7 提交job
System.exit(job.waitForCompletion(true) ? 0 : 1);

}
}

exp_MapReduce
https://jialiangz.github.io/2022/12/11/exp-MapReduce/
作者
爱吃菠萝
发布于
2022年12月11日
更新于
2023年7月13日
许可协议