CRUD operations with the HDFS Java API
Three things need special attention in this experiment (the relevant commands are sketched right after this list):
1. HDFS safe mode must be turned off. Command: ./hadoop dfsadmin -safemode leave
2. The Hadoop version the project depends on must match the cluster's, otherwise you will get a version mismatch error.
3. Mind the Hadoop cluster's user permissions, and know what each directory is used for.
Why these three problems happen in the first place is still to be investigated!!!
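For reference, here are the commands behind items 1 and 3 (the chmod target is an assumption based on the /user/root path used later in this post; adjust it for your own cluster):

./hadoop dfsadmin -safemode leave     # item 1: leave safe mode
./hadoop dfsadmin -safemode get       # check the current safe-mode state
./hadoop fs -chmod -R 755 /user/root  # item 3: loosen permissions on the working directory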
I haven't verified whether the Hadoop release I'm using (release-0.20.0) supports WebHDFS; in any case, I could not connect to it no matter what!!!
Judging from the page below, 0.20.0 is probably not supported:
https://jira.springsource.org/browse/IMPALA-15?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
Serengeti Distro:
Apache Hadoop: 1.0.1
GreenPlum HD: 1.1 (Apache Hadoop 1.0.0)
CloudEra: CDH3 (Apache Hadoop 0.20.2, WebHDFS is not supported in this version)
Hortonworks: 1.0.7 (Apache Hadoop 1.0.2)
The steps are as follows.
The project structure is shown in the figure:

(figure: project structure)
On to the code O(∩_∩)O haha~
pom.xml is configured as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.yun.hdfs</groupId>
	<artifactId>hdfs</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	</properties>
	<build>
		<plugins>
			<plugin>
				<artifactId>maven-assembly-plugin</artifactId>
				<configuration>
					<appendAssemblyId>false</appendAssemblyId>
					<descriptorRefs>
						<descriptorRef>jar-with-dependencies</descriptorRef>
					</descriptorRefs>
					<archive>
						<manifest>
							<mainClass>com.yun.hdfs.WangPan</mainClass>
						</manifest>
					</archive>
				</configuration>
				<executions>
					<execution>
						<id>make-assembly</id>
						<phase>package</phase>
						<goals>
							<goal>assembly</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
	<dependencies>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-core</artifactId>
			<version>0.20.2</version>
			<type>jar</type>
			<scope>compile</scope>
		</dependency>
	</dependencies>
</project>
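As note 2 at the top says, the hadoop-core version here (0.20.2) must match what the cluster runs. If the cluster were on, say, Apache Hadoop 1.0.1 from the distro list above, the dependency would change accordingly (the version is shown only as an example):

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-core</artifactId>
			<version>1.0.1</version>
		</dependency>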

WangPan.java, the main class used to dispatch the operations:

package com.yun.hdfs;
import java.io.IOException;
public class WangPan {
	/** Description of the run result. */
	private static String result = "";
	public static void main(String[] args) {
		try {
			// check that a command argument was supplied (the length check must come first)
			if (args.length > 0 && args[0] != null && !"".equals(args[0])) {
				if ("upload".equals(args[0])) {
					result = "upload:" + WangPanUtils.uploadFile(args);
				} else if ("delete".equals(args[0])) {
					result = "delete:" + WangPanUtils.deleteFile(args);
				} else if ("query".equals(args[0])) {
					if (WangPanUtils.listFile(args) == null) {
						result = "query:fail!";
					} else {
						result = "query:success";
					}
				} else if ("read".equals(args[0])) {
					result = "read:" + WangPanUtils.readFile(args);
				} else if ("mkdir".equals(args[0])) {
					// dispatch for the mkdir command documented in WangPanUtils
					result = "mkdir:" + WangPanUtils.mkdir(args);
				} else {
					System.out.println("sorry, we have no such service!");
				}
				System.out.println(result);
			} else {
				System.out.println("fail!");
				System.exit(1);
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

WangPanUtils.java, which implements the CRUD operations:

package com.yun.hdfs;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class WangPanUtils {
	/**
	 * Upload a file. java -jar /root/hdfs-0.0.1-SNAPSHOT.jar upload /root/test-hdfs.txt
	 * hdfs://hadoopm:9000/user/root/upload/12390po.txt
	 *
	 * @param args
	 * @return
	 * @throws IOException
	 */
	public static String uploadFile(String[] args) throws IOException {
		// check the argument count before indexing into args
		if (args.length < 3) {
			return "fail";
		}
		String localSrc = args[1];
		String dst = args[2];
		InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(dst), conf);
		OutputStream out = fs.create(new Path(dst));
		IOUtils.copyBytes(in, out, 4096, true);
		return "success";
	}
	/**
	 * List files. java -jar /root/hdfs-0.0.1-SNAPSHOT.jar query
	 * hdfs://hadoopm:9000/user/root/
	 *
	 * @param args
	 * @return
	 * @throws IOException
	 */
	public static Path[] listFile(String[] args) throws IOException {
		if (args.length < 2) {
			return null;
		}
		String dst = args[1];
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(dst), conf);
		FileStatus[] status = fs.listStatus(new Path(dst));
		Path[] listPaths = FileUtil.stat2Paths(status);
		return listPaths;
	}
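	// Note: WangPan.main only null-checks what listFile returns; to actually
	// print the listing, the caller could iterate the result, e.g. (sketch):
	//   for (Path p : WangPanUtils.listFile(args)) { System.out.println(p); }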
	/**
	 * Delete a file.
	 * java -jar /root/hdfs-0.0.1-SNAPSHOT.jar delete hdfs://hadoopm:9000/user/root/upload/12390po.txt
	 *
	 * @param args
	 * @return
	 * @throws IOException
	 */
	public static String deleteFile(String[] args) throws IOException {
		if (args.length < 2) {
			return "fail";
		}
		String fileName = args[1];
		Configuration config = new Configuration();
		FileSystem hdfs = FileSystem.get(URI.create(fileName), config);
		Path path = new Path(fileName);
		if (!hdfs.exists(path)) {
			return "fail";
		}
		boolean isDeleted = hdfs.delete(path, false);
		if (isDeleted) {
			return "success";
		} else {
			return "fail";
		}
	}
	/**
	 * Read a file (copy it from HDFS to a local path).
	 * java -jar /root/hdfs-0.0.1-SNAPSHOT.jar read hdfs://hadoopm:9000/user/root/upload/123.txt /root/test-readfile898.txt
	 *
	 * @param args
	 * @return
	 * @throws IOException
	 */
	public static String readFile(String[] args) throws IOException {
		if(args.length < 3){
			return "fail";
		}
		String dst = args[1];
		String newPath = args[2];
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(dst), conf);
		FSDataInputStream hdfsInStream = fs.open(new Path(dst));
		OutputStream out = new FileOutputStream(newPath);
		byte[] ioBuffer = new byte[1024];
		int readLen = hdfsInStream.read(ioBuffer);
		while (-1 != readLen) {
			out.write(ioBuffer, 0, readLen);
			readLen = hdfsInStream.read(ioBuffer);
		}
		out.close();
		hdfsInStream.close();
		fs.close();
		return "success";
	}
	/**
	 * Create a directory.
	 * java -jar /root/hdfs-0.0.1-SNAPSHOT.jar mkdir hdfs://hadoopm:9000/user/root/upload/test909
	 *
	 * @param args
	 * @return
	 * @throws IOException
	 */
	public static String mkdir(String[] args) throws IOException {
		if (args.length < 2) {
			return "fail";
		}
		String dst = args[1];
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(dst), conf);
		Path path = new Path(dst);
		// if the directory already exists, report failure instead of recreating it
		if (fs.exists(path)) {
			return "fail";
		}
		fs.mkdirs(path);
		return "success";
	}
}
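The title promises create/delete/update/query, but no "update" operation appears above. A minimal rename sketch in the same style could fill that gap (renameFile and its "rename" command are hypothetical additions, not part of the original project; a matching "rename" branch in WangPan.main would also be needed):

	/**
	 * Rename/move a file (sketch, not in the original code).
	 * java -jar /root/hdfs-0.0.1-SNAPSHOT.jar rename hdfs://hadoopm:9000/user/root/upload/old.txt hdfs://hadoopm:9000/user/root/upload/new.txt
	 *
	 * @param args
	 * @return
	 * @throws IOException
	 */
	public static String renameFile(String[] args) throws IOException {
		if (args.length < 3) {
			return "fail";
		}
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(args[1]), conf);
		// FileSystem.rename moves or renames a path within the same filesystem
		boolean renamed = fs.rename(new Path(args[1]), new Path(args[2]));
		return renamed ? "success" : "fail";
	}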

PS: Note that this project needs to be packaged with Maven into a runnable jar, using the command below:

(figure: packaging command)
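With the assembly plugin bound to the package phase as configured above, the packaging command is presumably:

mvn clean package

Because appendAssemblyId is false and the descriptor is jar-with-dependencies, this produces a single runnable target/hdfs-0.0.1-SNAPSHOT.jar with hadoop-core bundled in, matching the java -jar /root/hdfs-0.0.1-SNAPSHOT.jar invocations in the javadoc examples.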


The invocation for each operation is given in its method's javadoc above; the results look like this:
(figure: CRUD results)
Finally, open http://hadoopm:50070/ -> Browse the filesystem to check whether the HDFS file operations really succeeded.
(figure: HDFS web UI)
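You can also double-check from the shell on the cluster (path as in the upload example above):

./hadoop fs -ls /user/root/upload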