余子越的博客
Toggle navigation
余子越的博客
主页
计算机网络
大数据分析
系统与工具
编程之路
容器引擎
作者
归档
标签
Hadoop Java 接口示例
2020-12-25 08:42:45
12
0
0
yuziyue
[TOC]

# 一. 添加依赖

```
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>3.1.1</version>
</dependency>
```

# 二. 编写代码

```
package com;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

/**
 * Demo driver: exercises each HDFS operation once, then releases the client.
 */
public class HadoopHello {

    public static void main(String[] args) throws Exception {
        System.out.println("");
        HadoopClient hc = new HadoopClient();
        hc.prepare();
        hc.mkDir();
        hc.createThenWrite();
        hc.exists();
        hc.readToString();
        hc.rename();
        hc.upload();
        hc.download();
        hc.destroy();
    }
}

/**
 * Thin wrapper around the Hadoop {@link FileSystem} API showing the most
 * common HDFS operations (mkdir, create/write, read, rename, delete,
 * upload, download). Call {@link #prepare()} before any other method and
 * {@link #destroy()} when done.
 */
class HadoopClient {

    private static final String HDFS_PATH = "hdfs://192.168.1.10:8020";
    private static final String HDFS_USER = "hdfs";
    private static FileSystem fileSystem;

    /**
     * Obtain the shared FileSystem handle.
     *
     * Fails fast instead of swallowing the exception: a silently-null
     * fileSystem would only resurface later as an NPE in every other method.
     */
    void prepare() {
        try {
            Configuration configuration = new Configuration();
            configuration.set("dfs.replication", "1");
            configuration.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
            configuration.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
            fileSystem = FileSystem.get(new URI(HDFS_PATH), configuration, HDFS_USER);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("Interrupted while connecting to " + HDFS_PATH, e);
        } catch (URISyntaxException | IOException e) {
            throw new IllegalStateException("Cannot connect to " + HDFS_PATH, e);
        }
    }

    /**
     * Release the FileSystem. close() frees the underlying connection;
     * merely nulling the reference would leak it.
     */
    void destroy() {
        IOUtils.closeStream(fileSystem); // closes quietly, tolerates null
        fileSystem = null;
    }

    /**
     * Create a directory with default permissions.
     *
     * @throws Exception Exception
     */
    void mkDir() throws Exception {
        fileSystem.mkdirs(new Path("/user/work/hdfs-api"));
    }

    /**
     * Create a directory with explicit permissions (rw- for owner,
     * r-- for group and others).
     *
     * @throws Exception Exception
     */
    void mkDirWithPermission() throws Exception {
        fileSystem.mkdirs(new Path("/user/work/hdfs-api"),
                new FsPermission(FsAction.READ_WRITE, FsAction.READ, FsAction.READ));
    }

    /**
     * Create a file and write some lines into it.
     *
     * @throws Exception Exception
     */
    void createThenWrite() throws Exception {
        // Second argument: overwrite if the file already exists;
        // third argument: buffer size.
        try (FSDataOutputStream out = fileSystem.create(new Path("/user/work/hdfs-api/a.txt"), true, 4096)) {
            int n = 0;
            while (n < 10) {
                // Explicit charset: getBytes() without one depends on the platform default.
                out.write(("hello hadoop!" + n + "\n").getBytes(StandardCharsets.UTF_8));
                n += 1;
            }
            out.flush();
        }
    }

    /**
     * Check whether a file exists.
     *
     * @throws Exception Exception
     */
    void exists() throws Exception {
        boolean exists = fileSystem.exists(new Path("/user/work/hdfs-api/a.txt"));
        System.out.println(exists);
    }

    /**
     * Print the content of a file line by line.
     * try-with-resources closes the stream (the original leaked it).
     *
     * @throws Exception Exception
     */
    void readToString() throws Exception {
        try (FSDataInputStream inputStream = fileSystem.open(new Path("/user/work/hdfs-api/a.txt"));
             Scanner scanner = new Scanner(inputStream, StandardCharsets.UTF_8.name())) {
            while (scanner.hasNextLine()) {
                String context = scanner.nextLine();
                System.out.println(context);
            }
        }
    }

    /**
     * Rename a file.
     *
     * @throws Exception Exception
     */
    void rename() throws Exception {
        Path oldPath = new Path("/user/work/hdfs-api/a.txt");
        Path newPath = new Path("/user/work/hdfs-api/b.txt");
        boolean result = fileSystem.rename(oldPath, newPath);
        System.out.println(result);
    }

    /**
     * Delete a file or directory.
     *
     * @throws Exception Exception
     */
    void delete() throws Exception {
        /*
         * Second argument: recursive delete.
         * If path is a directory and recursive is true, the directory and
         * everything inside it are removed; if it is a directory and
         * recursive is false, an exception is thrown.
         */
        boolean result = fileSystem.delete(new Path("/user/work/hdfs-api/b.txt"), true);
        System.out.println(result);
    }

    /**
     * Upload a local file to HDFS, printing a rough progress percentage.
     */
    void upload() throws Exception {
        File file = new File("/Users/yzy/Downloads/VMware-Fusion-11.5.6-16696540.dmg");
        final float fileSize = file.length();
        try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
            FSDataOutputStream out = fileSystem.create(new Path("/user/work/jdk.tgz"), new Progressable() {
                long fileCount = 0;

                // progress() is invoked roughly once per ~64 KB uploaded.
                public void progress() {
                    fileCount++;
                    int processing = (int) ((fileCount * 64 * 1024 / fileSize) * 100);
                    processing = Math.min(processing, 100);
                    System.out.print("processing:" + processing + "%\r");
                }
            });
            // Fourth argument true: close both streams when the copy finishes
            // (the 3-arg overload leaves them open — that leaked `out`).
            IOUtils.copyBytes(in, out, 4096, true);
        }
    }

    /**
     * Download a file from HDFS to the local filesystem.
     */
    void download() throws Exception {
        Path src = new Path("/user/work/jdk.tgz");
        Path dst = new Path("/tmp/");
        // First argument (delSrc): whether to delete the source after the copy.
        // Note: the default (the overload without this flag) is false — the
        // source is kept, not deleted.
        fileSystem.copyToLocalFile(false, src, dst);
    }
}
```
<br><br><br>
上一篇:
SSH远程登录慢问题解决
下一篇:
HBase Java 接口示例
0
赞
12 人读过
新浪微博
微信
腾讯微博
QQ空间
人人网
文档导航