本文介绍大数据平台数据交换案例,本案例为结构化集群HDFS client API 文件上传,结构化集群HDFS client API 文件下载和非结构化集群HDFS client API 文件上传/下载在此不做介绍(做法类似,自行脑补)。
大数据平台数据交互整体架构
主要讲解一下思路,给出部分代码,可根据实际情况进行修改或优化。
定时任务:时间为每天上午10:00:00,task()方法为要执行的任务(文件上传)
class="java" name="code">//定时任务主流程
public void timer() {
Calendar calendar = Calendar.getInstance();
calendar.set(Calendar.HOUR_OF_DAY, 10); // 控制时
calendar.set(Calendar.MINUTE, 00); // 控制分
calendar.set(Calendar.SECOND, 00); // 控制秒
Date time = calendar.getTime(); // 获取执行任务的时间
Timer timer = new Timer();
//执行定时任务
timer.scheduleAtFixedRate(new TimerTask() {
public void run() {
try {
task();
} catch (SQLException e) {
log.error("获取数据失败!");
//失败后的处理(如:入库记录,然后手工推送或者查询数据库自动推送)
//do something for resolve error
e.printStackTrace();
}
}
}, time, 1000 * 60 * 60 * 24);// 这里设定循环时间为每天
}
task()执行的任务:此处为部分代码,笔者只为介绍流程,不提供完整代码
/**
* windows环境生成文件路径
* **/
String dbFilename = "F:/bigdata_file/KKPAY_ROUTE_INFO-a10.TXT";
String okFilename = "F:/bigdata_file/KKPAY_ROUTE_INFO-a10.OK";
/**
* linux环境生成文件路径
* **/
//String dbFilename = "/data/app/bigdata/KKPAY_ROUTE_INFO-a10.TXT";
//String okFilename = "/data/app/bigdata/KKPAY_ROUTE_INFO-a10.OK";
createFile(dbData,dbFilename);
createFile(OkData,okFilename);
//上传数据文件和标志性文件,如果失败则5分钟后再次上传,直到上传成功为止
while(uploadFlag){
upload(dbFilename,okFilename);
try {
Thread.sleep(1000*60*5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
获取数据:这里主要是注意防止数据串行,另外这里我对每个字段之间加了分隔符进行分割
DBUtil dbUtil = new DBUtil();
Connection conn = dbUtil.getConnection();
//PreparedStatement是statement的子类,提高了效率,扩展性,安全性
PreparedStatement preparedState = null;
ResultSet resultSet = null;
StringBuffer dbData = new StringBuffer();
try {
preparedState = (PreparedStatement) conn.prepareStatement("select * from kkpay_route_info");
resultSet = preparedState.executeQuery();
while(resultSet.next()){
for(int i = 1;i <= resultSet.getMetaData().getColumnCount();i++){
if(i == resultSet.getMetaData().getColumnCount()){
dbData.append(resultSet.getString(i).replace("\r\n", "").replace("\r\t", "") + "\r\n");
}else{
dbData.append(resultSet.getString(i).replace("\r\n", "").replace("\r\t", "") + "\001");
}
}
datenum ++;
}
} catch (SQLException e) {
throw e;
}finally{
if(resultSet!=null){
log.info("获取数据成功!");
try {
resultSet.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if(preparedState!=null){
try {
preparedState.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
调用API上传文件:上传数据文件和标志性文件
//调用API上传文件
public void upload(String dbFileSourcepath,String OkFileSourcepath){
HdfsClient client = HdfsClientInstance.getInstance();
HdfsClientConf conf = new HdfsClientConf();
/**
* linux环境配置文件读取路径
* **/
//conf.setConfigFilePath("/data/app/bigdata/conf");
//conf.setKeyTabFile("/data/app/bigdata/conf/pcc_user.keytab");
/**
* windows环境配置文件读取路径
* **/
conf.setConfigFilePath("F:/conf");
conf.setKeyTabFile("F:/conf/pcc_user.keytab");
conf.setKeyTabUserName("pcc_user@TDH");
client.setHdfsClientConf(conf);
//上传数据文件
Map<String,String> dbFileRetrunMsg = client.putFileHdfs(dbFileSourcepath,"PCC");
if("0".equals(dbFileRetrunMsg.get("code"))){
Map<String,String> okFileRetrunMsg = null;
//上传标记性文件(由于这时数据文件已经上传成功,所以标志性文件必须有重发机制保证两个文件的同步)
while(uploadFlag){
okFileRetrunMsg = client.putFileHdfs(OkFileSourcepath, "PCC");
if("0".equals(okFileRetrunMsg.get("code"))){
uploadFlag = false;
log.info("数据文件(" + dbFileSourcepath +")、标志性文件(" + OkFileSourcepath + ")上传成功!");
}
}
}else{
log.error(dbFileRetrunMsg.get("ERROR_DESC"));
}
}
另外需要做以下配置:
1.配置大数据平台Guardian 文件
2.配置大数据平台HOSTS(windows hosts文件默认路径如图所示,linux一般在/etc目录下)
3.配置大数据平台
XML及
KEYTAB文件(以笔者代码所指路径为例如图)
eclipse打成可运行jar包,如图
运行方式:java -jar *.jar(*.jar指的是你的jar包名称)
以下分别为eclipse控制台、windows(cmd)环境、linux环境输出结果截图:
本文永久地址:http://jsonliangyoujun.iteye.com/blog/2360795
- 大小: 254.9 KB
- 大小: 33.9 KB
- 大小: 26.1 KB
- 大小: 22 KB
- 大小: 126.1 KB
- 大小: 109.7 KB
- 大小: 782.3 KB
- 大小: 38.2 KB