import java.io.IOException; import java.io.InputStream; import java.security.PrivilegedExceptionAction; import java.text.SimpleDateFormat; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import bean.TableStatistic; @Controller @RequestMapping("/dfview") public class DataFrameViewController extends BaseController { private ConcurrentMap<String, UserGroupInformation> cache = new ConcurrentHashMap<String, UserGroupInformation>(); private ConcurrentMap<String, FileSystem> fileSystemCache = new ConcurrentHashMap<String, FileSystem>(); private Configuration hadoopConf = new Configuration(); private static final String HDFS_JSON_NAME = "jsonObj"; @RequestMapping(value = "/getDFviewOfColumn", method = { RequestMethod.GET }) @ResponseBody public TableStatistic getDFviewOfTable(String tableName) throws Exception { String user = "bi"; String dirpath = "/user/cbt/datax/temp_transfer/zzzdes"; Path homePath = new Path(dirpath); FileSystem fs = this.createFileSystem(user); FileStatus[] stats = fs.listStatus(homePath); StringBuffer txtContent = new StringBuffer(); for (int i = 0; i < stats.length; ++i) { if (stats[i].isFile()) { FileStatus file = stats[i]; if( HDFS_JSON_NAME.equalsIgnoreCase(file.getPath().getName())){ InputStream in = fs.open(file.getPath()); byte[] b = new byte[1]; while (in.read(b) != -1) { // 字符串拼接 txtContent.append(new String(b)); } in.close(); break; } } } TableStatistic ts = JSON.parseObject(txtContent.toString(), TableStatistic.class); return ts; } public static void main(String[] args) throws Exception { DataFrameViewController aaa = new DataFrameViewController(); FileSystem fs = aaa.createFileSystem("bi"); Path homePath = new Path("/user/cbt/datax/temp_transfer/zzzdes"); System.out.println("***********************************"); FileStatus[] stats = fs.listStatus(homePath); for (int i = 0; i < stats.length; ++i) { if (stats[i].isFile()) { FileStatus file = stats[i]; StringBuffer txtContent = new StringBuffer(); if( "jsonObj".equalsIgnoreCase(file.getPath().getName())){ InputStream in = fs.open(file.getPath()); byte[] b = new byte[1]; while (in.read(b) != -1) { // 字符串拼接 txtContent.append(new String(b)); } // IOUtils.copyBytes(fs.open(file.getPath()), System.out, 4096,false); in.close(); // fs.close(); } System.out.print(txtContent.toString()); System.out .println("************************************************"); JSONObject jb = JSON.parseObject(txtContent.toString()); System.out.println("********!!!!! : " + jb.get("colUnique")); TableStatistic ts = JSON.parseObject(txtContent.toString(), TableStatistic.class); System.out.println("********!!!!! : " + ts.getColUnique().toString()); } else if (stats[i].isDirectory()) { System.out.println(stats[i].getPath().toString()); } else if (stats[i].isSymlink()) { System.out.println("&&&&&&&&" + stats[i].getPath().toString()); } } FsStatus fsStatus = fs.getStatus(homePath); } public FileSystem createFileSystem(String user) throws Exception { final Configuration conf = loadHadoopConf(); conf.set("hadoop.job.ugi", user); // conf.set("HADOOP_USER_NAME", user); if (fileSystemCache.get(user) != null) { return fileSystemCache.get(user); } UserGroupInformation ugi = getProxyUser(user); FileSystem fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { public FileSystem run() throws Exception { return FileSystem.get(conf); } }); fileSystemCache.put(user, fs); return fs; } public static final ThreadLocal<SimpleDateFormat> appDateFormat = new ThreadLocal<SimpleDateFormat>() { @Override public SimpleDateFormat initialValue() { SimpleDateFormat dateformat = new java.text.SimpleDateFormat( "yyyy-MM-dd HH:mm:ss"); return dateformat; } }; private static final String[] HADOOP_CONF_FILES = { "core-site.xml", "hdfs-site.xml" }; private Configuration loadHadoopConf() { if (hadoopConf != null) { return hadoopConf; } Configuration conf = new Configuration(); for (String fileName : HADOOP_CONF_FILES) { try { InputStream inputStream = DataFrameViewController.class .getClassLoader().getResourceAsStream(fileName); conf.addResource(inputStream); } catch (Exception ex) { } } return conf; } public void destroy() { for (UserGroupInformation ugi : cache.values()) { try { FileSystem.closeAllForUGI(ugi); } catch (IOException ioe) { // Logger.error("Exception occurred while closing filesystems for " // + ugi.getUserName(), ioe); } } cache.clear(); } private UserGroupInformation getProxyUser(String user) throws IOException { cache.putIfAbsent(user, UserGroupInformation.createRemoteUser(user)); return cache.get(user); } }
时间: 2024-12-23 16:43:12