公司业务上传支付宝、微信交易记录,并和系统进行对账 功能
个人使用了,java 的nio? 和 多线程进行扫描文件 并装载bean对象,具体代码如下:
附件有对应的工具类,一直上传失败,放到百度云盘,有需要的可以取下载:链接: https://pan.baidu.com/s/1y-I36iUAQbx2_Ss1Ih8ASQ 提取码: meng?
?
@RequestMapping(value = "checkReconciliationData/{channel}", method = RequestMethod.POST)
public CResponse checkReconciliationDataNew(@RequestParam("file") MultipartFile file, @PathVariable String channel)throws IOException {
if (file != null && !file.isEmpty()) {
String fileName = file.getOriginalFilename();
UploadPayDataRequest uploadPayDataRequest = new UploadPayDataRequest();
uploadPayDataRequest.setFileName(fileName);
List<UploadPayDataEntity> uploadPayDataEntities = uploadPayDataService.selectInfoByList(uploadPayDataRequest);
if (uploadPayDataEntities.size() > 0) {
return CResponse.error(ResultCodeEnum.ERROR_DATA_FORMAT, "文件已存在,无需重复上传!");
}
String keyName = MD5.MD5Encode(fileName);
System.out.println("keyName:" + keyName);
String keyValue = jedisCluster.get(keyName);
if (keyValue == null) {
jedisCluster.set(keyName, "PROCESSING");//文件处理中
ExecutorService executorService = Executors.newSingleThreadExecutor();
UploadPayDataDto uploadPayDataDto = new UploadPayDataDto();
executorService.execute(() -> {
try {
CommonsMultipartFile cf = (CommonsMultipartFile) file;
DiskFileItem fi = (DiskFileItem) cf.getFileItem();
File f = fi.getStoreLocation();
BigFileReader.Builder builder = new BigFileReader.Builder(f, line -> {
if (line.indexOf("渠道") == -1) {
String[] strs = line.split(",");
String json = JSONObject.toJSONString(setItem(strs, channel, fileName));
jedisCluster.lpush(fileName, json);
}
});
BigFileReader bigFileReader = builder
.threadPoolSize(10)
.charset(Charset.forName("GBK"))
.bufferSize(1024).build();
bigFileReader.start(jedisCluster, keyName);
upload(file);
} catch (Exception e) {
log.error("异步操作文件内容失败,原因:" + e.getMessage());
jedisCluster.del(keyName);
}
});
BufferedInputStream bis = new BufferedInputStream(file.getInputStream());
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(bis, "GBK"), 30 * 1024 * 1024);//10M缓存
uploadPayDataDto.setTotalNum(bufferedReader.lines().count());
uploadPayDataDto.setUrl(ALIYUN_IMAGE_URL + fileName);
uploadPayDataDto.setFileName(fileName);
return CResponse.success(uploadPayDataDto);
} else {
return CResponse.error(ResultCodeEnum.ERROR_DATA_FORMAT, "文件处理中");
}
} else {
return CResponse.error(ResultCodeEnum.ERROR_DATA_FORMAT, "文件数据为空");
}
}
public class BigFileReader { private int threadPoolSize; private Charset charset; private int bufferSize; private IFileHandle handle; private ExecutorService executorService; private long fileLength; private RandomAccessFile rAccessFile; private Set<StartEndPair> startEndPairs; private CyclicBarrier cyclicBarrier; private AtomicLong counter = new AtomicLong(0); public BigFileReader(File file, IFileHandle handle, Charset charset, int bufferSize, int threadPoolSize) { this.fileLength = file.length(); this.handle = handle; this.charset = charset; this.bufferSize = bufferSize; this.threadPoolSize = threadPoolSize; try { this.rAccessFile = new RandomAccessFile(file, "r"); } catch (FileNotFoundException e) { e.printStackTrace(); } this.executorService = Executors.newFixedThreadPool(threadPoolSize); startEndPairs = new HashSet<>(); } public void start(JedisCluster jedisCluster, String fileName) { long everySize = this.fileLength / this.threadPoolSize; try { calculateStartEnd(0, everySize); } catch (IOException e) { e.printStackTrace(); return; } final long startTime = System.currentTimeMillis(); cyclicBarrier = new CyclicBarrier(startEndPairs.size(), () -> { System.out.println("use time: " + (System.currentTimeMillis() - startTime)); System.out.println("all line: " + counter.get()); System.out.println(fileName); jedisCluster.set(fileName, "SUCCESS"); shutdown(); }); for (StartEndPair pair : startEndPairs) { System.out.println("分配分片:" + pair); this.executorService.execute(new SliceReaderTask(pair)); } } private void calculateStartEnd(long start, long size) throws IOException { if (start > fileLength - 1) { return; } StartEndPair pair = new StartEndPair(); pair.start = start; long endPosition = start + size - 1; if (endPosition >= fileLength - 1) { pair.end = fileLength - 1; startEndPairs.add(pair); return; } rAccessFile.seek(endPosition); byte tmp = (byte) rAccessFile.read(); while (tmp != '\n' && tmp != '\r') { endPosition++; if (endPosition >= fileLength - 1) { endPosition = fileLength - 1; break; } rAccessFile.seek(endPosition); tmp = (byte) rAccessFile.read(); } pair.end = endPosition; startEndPairs.add(pair); calculateStartEnd(endPosition + 1, size); } public void shutdown() { try { this.rAccessFile.close(); } catch (IOException e) { e.printStackTrace(); } this.executorService.shutdown(); } private void handle(byte[] bytes) throws UnsupportedEncodingException { String line = null; if (this.charset == null) { line = new String(bytes); } else { line = new String(bytes, charset); } if (line != null && !"".equals(line)) { this.handle.handle(line); counter.incrementAndGet(); } } private static class StartEndPair { public long start; public long end; @Override public String toString() { return "star=" + start + ";end=" + end; } } private class SliceReaderTask implements Runnable { private long start; private long sliceSize; private byte[] readBuff; public SliceReaderTask(StartEndPair pair) { this.start = pair.start; this.sliceSize = pair.end - pair.start + 1; this.readBuff = new byte[bufferSize]; } @Override public void run() { try { MappedByteBuffer mapBuffer = rAccessFile.getChannel().map(FileChannel.MapMode.READ_ONLY, start, this.sliceSize); ByteArrayOutputStream bos = new ByteArrayOutputStream(); for (int offset = 0; offset < sliceSize; offset += bufferSize) { int readLength; if (offset + bufferSize <= sliceSize) { readLength = bufferSize; } else { readLength = (int) (sliceSize - offset); } mapBuffer.get(readBuff, 0, readLength); for (int i = 0; i < readLength; i++) { byte tmp = readBuff[i]; //碰到换行符 if (tmp == '\n' || tmp == '\r') { handle(bos.toByteArray()); bos.reset(); } else { bos.write(tmp); } } } if (bos.size() > 0) { handle(bos.toByteArray()); } cyclicBarrier.await();//测试性能用 } catch (Exception e) { e.printStackTrace(); } } } public static class Builder { private int threadSize = 1; private Charset charset; private int bufferSize = 1024 * 1024; private IFileHandle handle; private File file; public Builder(File file, IFileHandle handle) { this.file = file; if (!this.file.exists()) throw new IllegalArgumentException("文件不存在!"); this.handle = handle; } public Builder threadPoolSize(int size) { this.threadSize = size; return this; } public Builder charset(Charset charset) { this.charset = charset; return this; } public Builder bufferSize(int bufferSize) { this.bufferSize = bufferSize; return this; } public BigFileReader build() { return new BigFileReader(this.file, this.handle, this.charset, this.bufferSize, this.threadSize); } } }
public interface IFileHandle {
void handle(String line);
}