大文件异步分片上传到Seaweed服务器

2022-05-05 14:57:52 浏览数 (1)

大文件异步分片上传到Seaweed服务器

大文件分片上传到服务器临时目录

主要过程

客户端把大文件分片上传, 服务器接收到文件后, 按照每段的序号和每段大小重新拼接成完整的临时文件. 然后再将临时文件上传到文件服务器(Seaweed).

大文件上传到临时目录

接受文件的类

代码语言:javascript复制
/**
 * 文件传输对象
 */
@ApiModel("大文件分片入参实体")
@Data
public class MultipartFileParam {
    @ApiModelProperty("文件传输任务ID")
    private String taskId;

    @ApiModelProperty("当前为第几分片")
    private int chunk;

    @ApiModelProperty("每个分块的大小")
    private long size;

    @ApiModelProperty("分片总数")
    private int chunkTotal;

    @ApiModelProperty("分块文件传输对象")
    private MultipartFile file;
}

文件处理工具类

所有文件上传完毕后, 返回临时文件的存放路径, 否则返回空字符串

代码语言:javascript复制
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Method;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.commons.io.FileUtils;

import com.gato.cloud.sppc.common.bean.MultipartFileParam;

public class FileUtil {

    private static final Logger logger = Logger.getLogger(FileUtil.class.getName());

    private static String OS = System.getProperty("os.name").toLowerCase();
    // 文件临时存放位置
    private static final String URL_PROPERTIES_UNIX = "/opt/big_file_tmp";
    private static final String URL_PROPERTIES_WIN = "c:\big_file_tmp";
    private static String basePath = URL_PROPERTIES_UNIX;
    static {
        try {
            if (isWindows()) {
                basePath = URL_PROPERTIES_WIN;
            }
        } catch (Exception e) {
            logger.log(Level.SEVERE, e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    // 第一步:获取RandomAccessFile,随机访问文件类的对象
    // 第二步:调用RandomAccessFile的getChannel()方法,打开文件通道 FileChannel
    // 第三步:获取当前是第几个分块,计算文件的最后偏移量
    // 第四步:获取当前文件分块的字节数组,用于获取文件字节长度
    // 第五步:使用文件通道FileChannel类的 map()方法创建直接字节缓冲器 MappedByteBuffer
    // 第六步:将分块的字节数组放入到当前位置的缓冲区内 mappedByteBuffer.put(byte[] b);
    // 第七步:释放缓冲区
    // 第八步:检查文件是否全部完成上传
    public static String uploadFileByMappedByteBuffer(MultipartFileParam param) throws IOException {
        if (param.getTaskId() == null || "".equals(param.getTaskId())) {
            param.setTaskId(UUID.randomUUID().toString());
        }
        /**
         * 1:原文件名改为UUID 2:创建临时文件,和源文件一个路径 3:如果文件路径不存在重新创建
         */
        String fileName = param.getFile().getOriginalFilename();
        // fileName.substring(fileName.lastIndexOf(".")) 这个地方可以直接写死 写成你的上传路径
        String tempFileName = param.getTaskId()   fileName.substring(fileName.lastIndexOf("."))   "_tmp";
        String filePath = basePath   "/original";
        File fileDir = new File(filePath);
        if (!fileDir.exists()) {
            fileDir.mkdirs();
        }
        File tempFile = new File(filePath, tempFileName);
        try (RandomAccessFile raf = new RandomAccessFile(tempFile, "rw"); // 第一步
            FileChannel fileChannel = raf.getChannel()) // 第二步
        {
            // 第三步
            long offset = param.getChunk() * param.getSize();
            // 第四步
            byte[] fileData = param.getFile().getBytes();
            // 第五步
            MappedByteBuffer mappedByteBuffer =
                fileChannel.map(FileChannel.MapMode.READ_WRITE, offset, fileData.length);
            // 第六步
            mappedByteBuffer.put(fileData);
            // 第七步
            freedMappedByteBuffer(mappedByteBuffer);
        } catch (IOException e) {
            logger.log(Level.SEVERE, "上传大文件出错", e);
        }
        // 第八步
        boolean isComplete = checkUploadStatus(param, fileName, filePath);
        if (isComplete) {
            String storeName = param.getTaskId()   fileName.substring(fileName.lastIndexOf("."));
            boolean flag = renameFile(tempFile, storeName);
            if (flag) {
                // 返回临时文件存放路径
                return filePath   "/"   storeName;
            }
        }
        return "";
    }

    /**
     * 文件重命名
     *
     * @param toBeRenamed
     *            将要修改名字的文件
     * @param toFileNewName
     *            新的名字
     * @return
     */
    public static boolean renameFile(File toBeRenamed, String toFileNewName) {
        // 检查要重命名的文件是否存在,是否是文件
        if (!toBeRenamed.exists() || toBeRenamed.isDirectory()) {
            return false;
        }
        String p = toBeRenamed.getParent();
        File newFile = new File(p   File.separatorChar   toFileNewName);
        // 修改文件名
        return toBeRenamed.renameTo(newFile);
    }

    /**
     * 检查文件上传进度
     *
     * @return
     */
    public static boolean checkUploadStatus(MultipartFileParam param, String fileName, String filePath)
        throws IOException {
        File confFile = new File(filePath, fileName   ".conf");
        try (RandomAccessFile confAccessFile = new RandomAccessFile(confFile, "rw")) {
            // 设置文件长度
            confAccessFile.setLength(param.getChunkTotal());
            // 设置起始偏移量
            confAccessFile.seek(param.getChunk());
            // 将指定的一个字节写入文件中 127,
            confAccessFile.write(Byte.MAX_VALUE);
        } catch (IOException e) {
            logger.log(Level.SEVERE, "读取上传大文件临时文件异常", e);
            return false;
        }
        byte[] completeStatusList = FileUtils.readFileToByteArray(confFile);
        byte isComplete = Byte.MAX_VALUE;
        // 创建conf文件文件长度为总分片数,
        // 每上传一个分块即向conf文件中写入一个127,
        // 那么没上传的位置就是默认的0,已上传的就是Byte.MAX_VALUE 127
        for (int i = 0; i < completeStatusList.length && isComplete == Byte.MAX_VALUE; i  ) {
            // 按位与运算,将&两边的数转为二进制进行比较,有一个为0结果为0,全为1结果为1 eg.3&5 即 0000 0011 & 0000 0101 = 0000 0001 因此,3&5的值得1。
            isComplete = (byte)(isComplete & completeStatusList[i]);
            System.out.println("check part "   i   " complete?:"   completeStatusList[i]);
        }
        if (isComplete == Byte.MAX_VALUE) {
            // 如果全部文件上传完成,删除conf文件
            confFile.delete();
            return true;
        }
        return false;
    }

    public static boolean isWindows() {
        return (OS.indexOf("win") >= 0);
    }

    // 在MappedByteBuffer释放后再对它进行读操作的话就会引发jvm crash,在并发情况下很容易发生
    // 正在释放时另一个线程正开始读取,于是crash就发生了。
    // 所以为了系统稳定性释放前一般需要检查是否还有线程在读或写
    public static void freedMappedByteBuffer(final MappedByteBuffer mappedByteBuffer) {
        try {
            if (mappedByteBuffer == null) {
                return;
            }
            mappedByteBuffer.force();
            AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        Method getCleanerMethod = mappedByteBuffer.getClass().getMethod("cleaner", new Class[0]);
                        // 可以访问private的权限
                        getCleanerMethod.setAccessible(true);
                        // 在具有指定参数的 方法对象上调用此 方法对象表示的底层方法
                        sun.misc.Cleaner cleaner =
                            (sun.misc.Cleaner)getCleanerMethod.invoke(mappedByteBuffer, new Object[0]);
                        cleaner.clean();
                    } catch (Exception e) {
                        logger.log(Level.SEVERE, "clean MappedByteBuffer error!!!", e);
                    }
                    logger.info("clean MappedByteBuffer completed!!!");
                    return null;
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

临时文件上传到文件服务器(Seaweed)

代码语言:javascript复制
     /**
     * 文件系统上传文件的地址
     */
	@Value("${custom.fileTargetUrl}")
    private String FILE_TARGET_URL ;

	public String uploadFile(MultipartFileParam param) throws IOException {
        String filePath = FileUtil.uploadFileByMappedByteBuffer(param);
        if (StringUtils.isNotEmpty(filePath)) {
            // 文件已经整合完了, 上传文件到服务器
            String result = sendToFileStoreFromLocalPath(filePath);
            String url = JSONObject.parseObject(result).get("fileUrl").toString();
            // 文件上传完后, 删除临时文件
            File temFile = new File(filePath);
            temFile.delete();
            return url;
        }
        return null;
    }

	/**
     * 获取本地文件,转存到文件系统
     * @param filPath 本地文件地址
     * @return
     */
    public static String sendToFileStoreFromLocalPath(String filPath) {
        if (filPath == null || "".equals(filPath)) {
            return null;
        }
        HttpPost postRequest = new HttpPost(FILE_TARGET_URL);
        try(
                //获取本地文件输入流
                InputStream inputStream = new FileInputStream(filPath);
                CloseableHttpClient client = HttpClientBuilder.create().build()
            ) {
            // 将流写入文件系统
            MultipartEntityBuilder builder = MultipartEntityBuilder.create();
            builder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
            builder.addBinaryBody("file", inputStream);

            postRequest.setEntity(builder.build());

            HttpResponse postResponse = client.execute(postRequest);

            // 获取响应信息
            String result = EntityUtils.toString(postResponse.getEntity());
            postRequest.releaseConnection();
            return result;
        } catch (Exception e) {
            log.info("获取本地文件,转存到文件系统出错:{}",e.getMessage(),e);
            return null;
        } finally {
            postRequest.releaseConnection();
        }
    }

0 人点赞