/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.temptable;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.partition.external.PersistentFileType;
import org.apache.flink.service.LifeCycleAware;
import org.apache.flink.table.runtime.functions.aggfunctions.cardinality.MurmurHash;
import org.apache.flink.table.temptable.TableServiceOptions;
import org.apache.flink.table.temptable.util.BytesUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * File-backed storage for table partitions.
 *
 * <p>Every (table, partition) pair maps to a directory below {@code storagePath}. Written bytes
 * are appended to segment files inside that directory, and the segments of each partition
 * directory are tracked in {@code partitionSegmentTracker}, keyed by the logical offset at which
 * the segment starts.
 *
 * <p>{@link #open(Configuration)} must be called before any other method. Only the recursive
 * delete and the root directory creation are guarded by a lock (on {@code TableStorage.class});
 * the remaining methods perform no synchronization of their own.
 */
public class TableStorage implements LifeCycleAware {
    private static final Logger logger = LoggerFactory.getLogger(TableStorage.class);

    /** Maximum number of bytes stored in one segment file before a new one is started. */
    private long maxSegmentSize;

    /** Root directory of this storage instance. */
    private String storagePath;

    /** Partition directory path -> (logical start offset of segment -> segment file). */
    private Map<String, NavigableMap<Long, File>> partitionSegmentTracker;

    /**
     * Resolves the storage directory from {@code config}, wipes anything already present at that
     * location, recreates it and resets the segment tracker.
     *
     * @param config source of the table service id and of the storage root path; a random UUID
     *     and the {@code user.dir} system property are used as the respective defaults
     */
    @Override
    public void open(Configuration config) {
        String tableServiceId = config.getString(TableServiceOptions.TABLE_SERVICE_ID, UUID.randomUUID().toString());
        String rootPath = config.getString(TableServiceOptions.TABLE_SERVICE_STORAGE_ROOT_PATH, System.getProperty("user.dir"));
        this.storagePath = rootPath + File.separator + "tableservice_" + tableServiceId;
        // Effectively disables segment rolling: one segment can hold any amount of data.
        this.maxSegmentSize = Long.MAX_VALUE;
        this.deleteAll(this.storagePath);
        this.createDirs(this.storagePath);
        this.partitionSegmentTracker = new ConcurrentHashMap<>();
        logger.info("TableStorage opened with storage path: {}", this.storagePath);
    }

    /**
     * Forgets all tracked segments and removes the storage directory with everything in it.
     * Does nothing if {@link #open(Configuration)} was never called.
     */
    @Override
    public void close() {
        if (this.partitionSegmentTracker != null) {
            this.partitionSegmentTracker.clear();
        }
        if (this.storagePath != null) {
            this.deleteAll(this.storagePath);
        }
    }

    /**
     * Recursively deletes {@code root} and everything below it. Deletion is best effort: entries
     * that cannot be removed are logged and skipped.
     *
     * @param root path of the file or directory to remove
     */
    private void deleteAll(String root) {
        // The monitor is reentrant, so the recursive calls below do not block on themselves.
        synchronized (TableStorage.class) {
            logger.info("delete file under: {}", root);
            File file = new File(root);
            if (!file.exists()) {
                return;
            }
            File[] subFiles = file.listFiles();
            if (subFiles != null) {
                for (File subFile : subFiles) {
                    if (subFile.isDirectory()) {
                        this.deleteAll(subFile.getAbsolutePath());
                    } else if (!subFile.delete()) {
                        logger.warn("failed to delete file: {}", subFile.getAbsolutePath());
                    }
                }
            }
            logger.info("delete file:{}", root);
            if (!file.delete()) {
                logger.warn("failed to delete file: {}", root);
            }
        }
    }

    /**
     * Creates the directory {@code root}, including missing parents, unless it already exists.
     *
     * @param root path of the directory to create
     */
    private void createDirs(String root) {
        synchronized (TableStorage.class) {
            File dir = new File(root);
            if (!dir.exists() && !dir.mkdirs()) {
                logger.warn("failed to create directory: {}", root);
            }
        }
    }

    /**
     * Appends {@code content} to the given partition, creating the partition directory and the
     * first segment file ({@code 0.data}) on demand.
     *
     * @param tableName name of the table
     * @param partitionId id of the partition
     * @param content bytes to append
     * @throws RuntimeException if writing to a segment file fails
     */
    public void write(String tableName, int partitionId, byte[] content) {
        String baseDirPath = this.getPartitionDirPath(tableName, partitionId);
        File baseDir = new File(baseDirPath);
        if (!baseDir.exists()) {
            baseDir.mkdirs();
        }
        NavigableMap<Long, File> offsetMap =
                this.partitionSegmentTracker.computeIfAbsent(baseDirPath, d -> new ConcurrentSkipListMap<>());

        File lastFile;
        long lastFileOffset;
        Map.Entry<Long, File> lastEntry = offsetMap.lastEntry();
        if (lastEntry == null) {
            lastFile = new File(baseDirPath + File.separator + 0 + ".data");
            lastFileOffset = 0L;
            offsetMap.put(0L, lastFile);
        } else {
            lastFile = lastEntry.getValue();
            lastFileOffset = lastEntry.getKey();
        }

        int offset = 0;
        while (offset < content.length) {
            // Fill the current segment up to maxSegmentSize, then roll over to a new one.
            int writeBytes = (int) Math.min(this.maxSegmentSize - lastFile.length(), (long) (content.length - offset));
            this.writeFile(lastFile, content, offset, writeBytes);
            offset += writeBytes;
            if (offset < content.length) {
                lastFileOffset += this.maxSegmentSize;
                // NOTE(review): rolled segments are named by their start offset without the
                // ".data" suffix used for the first segment. Kept as is, because the on-disk
                // layout may be relied upon elsewhere - confirm before changing.
                lastFile = new File(baseDirPath + File.separator + lastFileOffset);
                offsetMap.put(lastFileOffset, lastFile);
            }
        }
    }

    /**
     * Removes all files of the given partition and stops tracking its segments.
     *
     * @param tableName name of the table
     * @param partitionId id of the partition
     */
    public void delete(String tableName, int partitionId) {
        logger.debug("delete table, tableName: {}, partitionId: {}", tableName, partitionId);
        String baseDirPath = this.getPartitionDirPath(tableName, partitionId);
        // Drop the segment bookkeeping as well, otherwise it would keep pointing at deleted files.
        if (this.partitionSegmentTracker != null) {
            this.partitionSegmentTracker.remove(baseDirPath);
        }
        this.deleteAll(baseDirPath);
    }

    /**
     * Writes the {@code finished} marker file and the {@code 0.index} file into the partition
     * directory, replacing any previous content of those files.
     *
     * @param tableName name of the table
     * @param partitionId id of the partition
     * @throws RuntimeException if either file cannot be written
     */
    public void finishPartition(String tableName, int partitionId) {
        String baseDirPath = this.getPartitionDirPath(tableName, partitionId);

        byte[] finishData = this.generateFinishData();
        File finishFile = new File(baseDirPath + File.separator + "finished");
        this.writeFile(finishFile, finishData, 0, finishData.length, false);

        byte[] indexData = this.generateIndexData(baseDirPath);
        File indexFile = new File(baseDirPath + File.separator + "0.index");
        this.writeFile(indexFile, indexData, 0, indexData.length, false);
    }

    /**
     * Builds the 24-byte content of the index file: an int {@code 1} (size), an int {@code 0}
     * (partition id), a long {@code 0} (start offset) and a long holding the current length of
     * {@code 0.data}.
     *
     * <p>NOTE(review): only the first segment file is taken into account - confirm this is
     * intended should segment rolling ever be enabled.
     *
     * @param baseDirPath partition directory containing {@code 0.data}
     * @return the encoded index record
     */
    private byte[] generateIndexData(String baseDirPath) {
        String dataFilePath = baseDirPath + File.separator + "0.data";
        long fileLength = new File(dataFilePath).length();

        byte[] size = BytesUtil.intToBytes(1);
        byte[] partitionId = BytesUtil.intToBytes(0);
        byte[] startOffset = BytesUtil.longToBytes(0L);
        byte[] length = BytesUtil.longToBytes(fileLength);

        byte[] indexData = new byte[24];
        int offset = 0;
        System.arraycopy(size, 0, indexData, offset, 4);
        offset += 4;
        System.arraycopy(partitionId, 0, indexData, offset, 4);
        offset += 4;
        System.arraycopy(startOffset, 0, indexData, offset, 8);
        offset += 8;
        System.arraycopy(length, 0, indexData, offset, 8);
        return indexData;
    }

    /**
     * Builds the content of the {@code finished} marker file: an int {@code 1} (version), the
     * byte length of the file type name, the UTF-8 bytes of
     * {@code PersistentFileType.HASH_PARTITION_FILE.toString()}, an int {@code 1} (spill count)
     * and an int {@code 1} (number of subpartitions).
     *
     * @return the encoded marker record
     */
    private byte[] generateFinishData() {
        byte[] fileTypeBytes = PersistentFileType.HASH_PARTITION_FILE.toString().getBytes(StandardCharsets.UTF_8);
        byte[] versionBytes = BytesUtil.intToBytes(1);
        byte[] fileTypeLengthBytes = BytesUtil.intToBytes(fileTypeBytes.length);
        byte[] spillCountBytes = BytesUtil.intToBytes(1);
        byte[] subpartitionNumBytes = BytesUtil.intToBytes(1);

        // version (4) + type length (4) + type name + spill count (4) + subpartition count (4)
        byte[] data = new byte[8 + fileTypeBytes.length + 4 + 4];
        int offset = 0;
        System.arraycopy(versionBytes, 0, data, offset, 4);
        offset += 4;
        System.arraycopy(fileTypeLengthBytes, 0, data, offset, 4);
        offset += 4;
        System.arraycopy(fileTypeBytes, 0, data, offset, fileTypeBytes.length);
        offset += fileTypeBytes.length;
        System.arraycopy(spillCountBytes, 0, data, offset, 4);
        offset += 4;
        System.arraycopy(subpartitionNumBytes, 0, data, offset, 4);
        return data;
    }

    /**
     * Appends {@code len} bytes of {@code content}, starting at {@code offset}, to {@code file}.
     *
     * @throws RuntimeException if the file cannot be opened, written or closed
     */
    private void writeFile(File file, byte[] content, int offset, int len) {
        this.writeFile(file, content, offset, len, true);
    }

    /**
     * Writes {@code len} bytes of {@code content}, starting at {@code offset}, to {@code file}.
     * The file is created if it does not exist.
     *
     * @param append {@code true} to append to existing content, {@code false} to replace it
     * @throws RuntimeException if the file cannot be opened, written or closed
     */
    private void writeFile(File file, byte[] content, int offset, int len, boolean append) {
        // try-with-resources makes a failure while closing the stream surface as an error
        // instead of being silently dropped.
        try (FileOutputStream output = new FileOutputStream(file, append)) {
            output.write(content, offset, len);
        } catch (IOException | RuntimeException e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException("write file error", e);
        }
    }

    /**
     * Reads up to {@code readCount} bytes of the partition, starting at the logical
     * {@code offset}, into {@code buffer} beginning at index 0.
     *
     * @param tableName name of the table
     * @param partitionId id of the partition
     * @param offset logical byte offset inside the partition to start reading from
     * @param readCount maximum number of bytes to read
     * @param buffer destination of the bytes read
     * @return the number of bytes read; {@code -1} if the end of the data was hit before any byte
     *     could be read; {@code 0} if no segment covers {@code offset} or {@code readCount} is
     *     not positive
     * @throws RuntimeException if the partition has never been written to, or if reading fails
     */
    @VisibleForTesting
    public int read(String tableName, int partitionId, int offset, int readCount, byte[] buffer) {
        String baseDirPath = this.getPartitionDirPath(tableName, partitionId);
        NavigableMap<Long, File> offsetMap = this.partitionSegmentTracker.get(baseDirPath);
        if (offsetMap == null) {
            logger.error("file: " + baseDirPath + " is not ready for read.");
            throw new RuntimeException("file: " + baseDirPath + " is not ready for read.");
        }

        int totalRead = 0;
        boolean endOfData = false;
        while (totalRead < readCount) {
            // Resolve the segment from the current offset on every pass, so the position inside
            // the segment is always consistent with the number of bytes consumed so far.
            Map.Entry<Long, File> segmentEntry = offsetMap.floorEntry((long) offset);
            if (segmentEntry == null) {
                break;
            }
            long offsetInSegment = (long) offset - segmentEntry.getKey();
            long readLimit = this.maxSegmentSize - offsetInSegment;
            if (readLimit <= 0L) {
                // The offset lies beyond the capacity of the last tracked segment.
                break;
            }
            int readBytes = (int) Math.min(readLimit, (long) (readCount - totalRead));
            int nRead = this.readFile(segmentEntry.getValue(), offsetInSegment, readBytes, buffer, totalRead);
            if (nRead < 0) {
                endOfData = true;
                break;
            }
            totalRead += nRead;
            offset += nRead;
            if (nRead < readBytes) {
                // The segment ended early; a segment that is not full is the last one.
                break;
            }
        }
        return totalRead == 0 && endOfData ? -1 : totalRead;
    }

    /**
     * Reads up to {@code readCount} bytes from {@code file}, starting at {@code fileOffset}, into
     * {@code buffer} at {@code bufferOffset}. Keeps reading until the requested amount is
     * available or the end of the file is reached.
     *
     * @return the number of bytes read, or {@code -1} if {@code readCount} is positive and the
     *     end of the file was reached before any byte could be read
     * @throws RuntimeException if the file cannot be opened or read
     */
    private int readFile(File file, long fileOffset, int readCount, byte[] buffer, int bufferOffset) {
        try (FileInputStream input = new FileInputStream(file)) {
            // skip() may skip fewer bytes than requested, so repeat until positioned.
            long toSkip = fileOffset;
            while (toSkip > 0L) {
                long skipped = input.skip(toSkip);
                if (skipped <= 0L) {
                    return readCount > 0 ? -1 : 0;
                }
                toSkip -= skipped;
            }
            int total = 0;
            while (total < readCount) {
                int n = input.read(buffer, bufferOffset + total, readCount - total);
                if (n < 0) {
                    break;
                }
                total += n;
            }
            return total == 0 && readCount > 0 ? -1 : total;
        } catch (IOException | RuntimeException e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException("read file error", e);
        }
    }

    /**
     * Resets the given partition: drops its tracked segments, deletes its directory and creates
     * an empty directory in its place.
     *
     * @param tableName name of the table
     * @param partitionId id of the partition
     */
    public void initializePartition(String tableName, int partitionId) {
        String partitionPath = this.getPartitionDirPath(tableName, partitionId);
        this.partitionSegmentTracker.remove(partitionPath);
        this.deleteAll(partitionPath);
        File partitionDir = new File(partitionPath);
        partitionDir.mkdirs();
    }

    /** Exposes the live segment tracker (not a copy) for tests. */
    @VisibleForTesting
    Map<String, NavigableMap<Long, File>> getPartitionSegmentTracker() {
        return this.partitionSegmentTracker;
    }

    /**
     * Returns the directory of a partition:
     * {@code <storagePath>/<non-negative hash of tableName>/<partitionId>}.
     *
     * @param tableName name of the table; only its hash becomes part of the path
     * @param partitionId id of the partition
     */
    @VisibleForTesting
    String getPartitionDirPath(String tableName, int partitionId) {
        // Masking with Integer.MAX_VALUE clears the sign bit so the directory name is never negative.
        long tableHashId = MurmurHash.hash(tableName) & Integer.MAX_VALUE;
        return this.storagePath + File.separator + tableHashId + File.separator + partitionId;
    }
}

