Adding the Hadoop Environment Variable on Windows
Download the hadoop-3.1.0 package shared in the group and unpack it to a local directory; I put it directly under the C drive.
Under Environment Variables → System variables, add:
Variable name: HADOOP_HOME
Variable value: C:\hadoop-3.1.0 (replace this with your actual installation directory; the above is my path)
Then append %HADOOP_HOME%\bin to Path.
After making these changes, it is best to restart the computer before continuing.
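Before moving on, it can help to confirm that the variable is actually visible to a Java process. The small check below is only an illustrative sketch (the class name is made up); it also looks for winutils.exe, which Hadoop clients on Windows generally need under %HADOOP_HOME%\bin:

```java
import java.io.File;

// Quick sanity check (illustrative only): print HADOOP_HOME and verify winutils.exe exists.
public class CheckHadoopHome {
    public static void main(String[] args) {
        String home = System.getenv("HADOOP_HOME");
        if (home == null) {
            System.out.println("HADOOP_HOME is not set - restart the IDE/terminal after adding it.");
            return;
        }
        System.out.println("HADOOP_HOME = " + home);
        File winutils = new File(home, "bin" + File.separator + "winutils.exe");
        System.out.println("winutils.exe present: " + winutils.exists());
    }
}
```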
Setting Up MapReduce
Create a Maven Project
Edit pom.xml
Add the following configuration to the pom file:
hadoop-client: dependency for connecting to the Hadoop cluster
junit: unit-testing dependency (not actually used in this project)
org.slf4j: logging dependency
build: plugins for packaging the Maven project into a jar
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>2.0.5</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
Configure Logging
In the project, go to src/main/resources, create a new file named log4j.properties under resources, and add the following configuration to set the log level to INFO:
```properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
```
Delegate IDEA's Build and Run to Maven
Open Settings → Maven → Runner, check "Delegate IDE build/run actions to Maven", and click OK.
MapReduce Programs
With the steps above, the skeleton of the Maven project is in place; next we write the code for the actual task.
Example program: merge two files and remove duplicate entries.
Task: given two input files A and B, write a MapReduce program that merges them into a new output file C with duplicates removed; in other words, any line that appears in both A and B (or more than once in either) should appear exactly once in C.
Local Test
Create a package com.zjl under the java directory of the project structure, then create a Java class in it. The code for experiment one is as follows:
```java
package com.zjl.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class Merge {

    // Mapper: emit each input line as the key with an empty value.
    // The shuffle phase groups identical keys, so duplicates collapse naturally.
    public static class Map extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, new Text(""));
        }
    }

    // Reducer: write each distinct key exactly once, which removes the duplicates.
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Merge.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        // The reducer only re-emits its key, so it can also serve as the combiner.
        job.setCombinerClass(Reduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // args[0] = input directory, args[1] = output directory (must not exist yet)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
Run the program; once the console shows that the job succeeded, look inside the specified output directory:
.crc: checksum files, they can be ignored
_SUCCESS: an empty marker file indicating the job completed successfully
part-r-00000: contains the actual result
Note: put the two input files under C:\input and pass that directory as the first program argument. Do not create the output directory yourself; just pass it as the second argument. If the specified output directory already exists, the program will fail; a small sketch for clearing it automatically is shown below. Experiment two follows the same steps, so it is not demonstrated here.
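One way to avoid the "output directory already exists" error between repeated local runs is to delete the directory programmatically before submitting the job. This is only a sketch of my own (the OutputCleaner helper is not part of the original code), using the standard FileSystem API:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative helper: call OutputCleaner.deleteIfExists(conf, args[1]) at the start of
// main(), before job.waitForCompletion(), so an old output directory does not abort the run.
public class OutputCleaner {
    public static void deleteIfExists(Configuration conf, String dir) throws IOException {
        Path outputPath = new Path(dir);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true); // true = delete recursively
        }
    }
}
```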
Cluster Test
- Prerequisite: start Hadoop's HDFS and YARN services.
```bash
cd /opt/module/hadoop-3.3.4
```
Start HDFS: start-dfs.sh
Start YARN: start-yarn.sh
(Screenshot: the "Browsing HDFS" page of the NameNode web UI.)
Upload the input files from the virtual machine to the HDFS cluster; a programmatic sketch of this step is shown below.
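The upload is normally done on the cluster with hdfs dfs -put; as a rough alternative, the same thing can be scripted through the FileSystem API. The snippet below is only a sketch: the NameNode address, local file names, and paths are placeholders, not values from this setup.

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative sketch: copy local files A.txt and B.txt into the HDFS input directory.
public class UploadToHdfs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Replace with your NameNode address; hdfs://namenode:8020 is a placeholder.
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
        fs.mkdirs(new Path("/usr/hadoop/input1"));
        fs.copyFromLocalFile(new Path("/opt/data/A.txt"), new Path("/usr/hadoop/input1/A.txt"));
        fs.copyFromLocalFile(new Path("/opt/data/B.txt"), new Path("/usr/hadoop/input1/B.txt"));
        fs.close();
    }
}
```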
Modify the program's input and output paths (here they are taken from the command-line arguments, so the same code works on the cluster):
```java
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
```
Package the program into a jar and upload it to the virtual machine.
Build the jar with the package phase under Maven's Lifecycle.
Locate the generated jar (under the project's target directory) and upload it to /opt/module/hadoop-3.3.4/ on the cluster.
Rename the jar to merge.jar.
Run the jar:
```bash
hadoop jar merge.jar com.zjl.demo.Merge /usr/hadoop/input1 /usr/hadoop/output1
```
Check the job output.
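The result can be viewed with hdfs dfs -cat or through the NameNode web UI; if you prefer to read it from code, the following sketch (again with placeholders for the NameNode address and path) prints the first result file to stdout:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Illustrative sketch: stream part-r-00000 from the job output directory to the console.
public class CatResult {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
        try (FSDataInputStream in = fs.open(new Path("/usr/hadoop/output1/part-r-00000"))) {
            IOUtils.copyBytes(in, System.out, 4096, false);
        }
        fs.close();
    }
}
```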
Now repeat the same steps for the MergeSort program, which reads integers from the input files, sorts them, and writes each value prefixed with an increasing line number. Its code is as follows:
```java
package com.zjl.demo1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MergeSort {

    // Mapper: parse each line as an integer and emit it as the key.
    // MapReduce sorts keys during the shuffle, which does the actual sorting.
    public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {

        private static final IntWritable data = new IntWritable();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String text = value.toString();
            data.set(Integer.parseInt(text));
            context.write(data, new IntWritable(1));
        }
    }

    // Reducer: keys arrive in sorted order; write "line number <TAB> value" for each occurrence.
    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        private static IntWritable line_num = new IntWritable(1);

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(line_num, key);
                line_num = new IntWritable(line_num.get() + 1);
            }
        }
    }

    // Partitioner: split the key range into contiguous buckets so that partition i
    // holds smaller numbers than partition i + 1, keeping the partitions ordered.
    public static class Partition extends Partitioner<IntWritable, IntWritable> {
        public int getPartition(IntWritable key, IntWritable value, int num_Partition) {
            int Maxnumber = 65223; // assumed upper bound of the input values
            int bound = Maxnumber / num_Partition + 1;
            int keynumber = key.get();
            for (int i = 0; i < num_Partition; i++) {
                if (keynumber < bound * (i + 1) && keynumber >= bound * i) {
                    return i;
                }
            }
            return -1;
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MergeSort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setPartitionerClass(Partition.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```