/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.filecache;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.filecache.FileCache;
import org.apache.flink.runtime.state.gemini.engine.filecache.FileCacheStat;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.fs.FileReader;
import org.apache.flink.runtime.state.gemini.engine.fs.FileWriter;
import org.apache.flink.runtime.state.gemini.engine.metrics.FileCacheMetrics;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.compress.GCompressAlgorithm;
import org.apache.flink.runtime.state.gemini.engine.rm.ReferenceCount;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NoCapacityFileCache
extends FileCache {
    private static final Logger LOG = LoggerFactory.getLogger(NoCapacityFileCache.class);
    private final GContext gContext;
    private final long maxFileSize;
    private final FileManager dfsFileManager;
    private volatile boolean closed;
    private Map<EventExecutor, FileWriter> destFileWriters;

    public NoCapacityFileCache(GContext gContext, FileManager dfsFileManager) {
        super(0L, new FileCacheStat());
        this.gContext = (GContext)Preconditions.checkNotNull((Object)gContext);
        this.maxFileSize = gContext.getGConfiguration().getMaxLogStructureFileSize();
        this.dfsFileManager = (FileManager)Preconditions.checkNotNull((Object)dfsFileManager);
        this.destFileWriters = new ConcurrentHashMap<EventExecutor, FileWriter>();
        FileCacheMetrics fileCacheMetrics = gContext.getFileCacheMetrics();
        if (fileCacheMetrics != null) {
            fileCacheMetrics.register(this.fileCacheStat);
        }
        this.closed = false;
        LOG.info("NoCapacityFileCache created, DestFileManager {}", (Object)dfsFileManager);
    }

    @Override
    public boolean isCached(PageAddress pageAddress) {
        return pageAddress.isDfsValid();
    }

    @Override
    public void addPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor, BiConsumer<Boolean, Throwable> callBack) {
        this.flushPage(pageAddress, gRegionContext, eventExecutor, false, callBack);
    }

    @Override
    public DataPage getPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor) {
        Preconditions.checkState((boolean)pageAddress.isDfsValid(), (Object)"dfs address should be valid");
        try {
            DataPage dataPage = this.getDataPage(this.dfsFileManager, gRegionContext, pageAddress::getDfsAddress, pageAddress);
            Preconditions.checkNotNull((Object)dataPage, (String)"no page exists on dfs");
            return dataPage;
        }
        catch (Exception e) {
            LOG.error("exception when get page, {}", (Throwable)e);
            throw new GeminiRuntimeException("exception when get page", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void discardPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor) {
        boolean pageValid;
        boolean dfsValid = false;
        long dfsAddress = -1L;
        PageAddress pageAddress2 = pageAddress;
        synchronized (pageAddress2) {
            pageValid = pageAddress.isPageValid();
            if (pageValid) {
                pageAddress.setPageStatus(false);
                dfsValid = pageAddress.isDfsValid();
                if (dfsValid) {
                    dfsAddress = pageAddress.getDfsAddress();
                }
            }
        }
        if (pageValid && dfsValid) {
            long accessNumber = this.gContext.getAccessNumber();
            long ts = System.currentTimeMillis();
            this.dfsFileManager.decDBReference(dfsAddress, accessNumber, ts, pageAddress.getDataLen());
        }
    }

    @Override
    public void flushPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor, boolean force, BiConsumer<Boolean, Throwable> callBack) {
        if (!force && pageAddress.isDfsValid()) {
            if (callBack != null) {
                callBack.accept(true, null);
            }
            return;
        }
        DataPage oriDataPage = pageAddress.getDataPage();
        eventExecutor.submit(() -> {
            boolean success = false;
            Exception throwable = null;
            boolean pageIsNull = false;
            DataPage dataPage = oriDataPage;
            try {
                if (force || !pageAddress.isDfsValid()) {
                    if (dataPage == null) {
                        pageIsNull = true;
                        if (pageAddress.isDfsValid()) {
                            dataPage = this.getDataPage(this.dfsFileManager, gRegionContext, pageAddress::getDfsAddress, pageAddress);
                        }
                    }
                    Preconditions.checkNotNull((Object)dataPage, (String)"Data page is null");
                    FileWriter fileWriter = this.getOrCreateFileWriter(this.destFileWriters, this.dfsFileManager, eventExecutor);
                    this.internalAddPage(this.dfsFileManager, fileWriter, pageAddress, dataPage, gRegionContext, false, true);
                }
                success = true;
            }
            catch (Exception e) {
                success = false;
                throwable = e;
                LOG.error("error when adding page to cache: pageIsNull={}, {}", new Object[]{pageIsNull, e.getMessage(), e});
            }
            finally {
                if (dataPage != null) {
                    dataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal);
                }
                if (callBack != null) {
                    callBack.accept(success, throwable);
                }
            }
        });
    }

    @Override
    public void sync(EventExecutor eventExecutor) throws IOException {
        FileWriter fileWriter = this.destFileWriters.get(eventExecutor);
        if (fileWriter != null) {
            fileWriter.sync();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        NoCapacityFileCache noCapacityFileCache = this;
        synchronized (noCapacityFileCache) {
            if (this.closed) {
                LOG.warn("NoCapacityFileCache has been closed");
                return;
            }
            this.closed = true;
        }
        for (FileWriter fileWriter : this.destFileWriters.values()) {
            this.dfsFileManager.closeFileWriter(fileWriter);
        }
        this.destFileWriters.clear();
        LOG.info("NoCapacityFileCache is closed");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void internalAddPage(FileManager fileManager, FileWriter fileWriter, PageAddress pageAddress, DataPage dataPage, GRegionContext gRegionContext, boolean isLocal, boolean flushForce) throws Exception {
        boolean pageValid;
        int onDiskLen;
        long address;
        long startTime = System.nanoTime();
        int numRetires = 0;
        while (true) {
            try {
                address = fileWriter.getAddress();
                onDiskLen = dataPage.write(fileWriter, gRegionContext.getPageSerdeFlink(), pageAddress, isLocal ? gRegionContext.getGContext().getFlushWholePageGCompressAlgorithm() : GCompressAlgorithm.None, gRegionContext.getGContext().getGConfiguration().isChecksumEnable());
                fileWriter.resetFailCount();
            }
            catch (IOException e) {
                fileWriter.increasFailCount();
                if (++numRetires <= 3) continue;
                LOG.error("internal add page exception: {}, {}", (Object)fileWriter, (Object)pageAddress);
                throw e;
            }
            break;
        }
        if (flushForce) {
            fileWriter.flush();
        }
        this.fileCacheStat.addDFSWrite(dataPage.getSize(), System.nanoTime() - startTime);
        fileManager.incDBReference(address, pageAddress.getDataLen());
        boolean hasOldAddress = false;
        long oldAddress = 0L;
        PageAddress pageAddress2 = pageAddress;
        synchronized (pageAddress2) {
            pageAddress.afterFlush(onDiskLen, gRegionContext.getGContext().getFlushWholePageGCompressAlgorithm());
            pageValid = pageAddress.isPageValid();
            if (pageValid) {
                hasOldAddress = isLocal ? pageAddress.isLocalValid() : pageAddress.isDfsValid();
                long l = oldAddress = isLocal ? pageAddress.getLocalAddress() : pageAddress.getDfsAddress();
            }
            if (isLocal) {
                pageAddress.setLocalAddress(address);
                pageAddress.setLocalStatus(true);
            } else {
                pageAddress.setDfsAddress(address);
                pageAddress.setDfsStatus(true);
            }
        }
        if (pageValid) {
            if (hasOldAddress) {
                fileManager.decDBReference(oldAddress, this.gContext.getAccessNumber(), System.currentTimeMillis(), pageAddress.getDataLen());
            }
        } else {
            fileManager.decDBReference(address, this.gContext.getAccessNumber(), System.currentTimeMillis(), pageAddress.getDataLen());
        }
    }

    private DataPage getDataPage(FileManager fileManager, GRegionContext gRegionContext, Callable<Long> addressCallable, PageAddress pageAddress) throws Exception {
        int unexpectedTries = 0;
        int expectedTries = 0;
        long address = addressCallable.call();
        while (true) {
            try {
                FileReader fileReader = fileManager.getFileReader(address);
                long offset = fileManager.getFileOffset(address);
                long startTime = System.nanoTime();
                DataPage dataPage = fileManager.getDataPageUtil().getDataPageFromReader(gRegionContext.getPageSerdeFlink(), fileReader, (int)offset, pageAddress);
                this.fileCacheStat.addDFSRead(pageAddress.getOnDiskDataLen(), System.nanoTime() - startTime);
                return dataPage;
            }
            catch (Exception e) {
                long oldAddress = address;
                address = addressCallable.call();
                if (oldAddress == address) {
                    ++unexpectedTries;
                    continue;
                }
                ++expectedTries;
                if (unexpectedTries < 3 && expectedTries < 10) continue;
                LOG.error("get page failed, try " + unexpectedTries + " times unexpectedly, and try " + expectedTries + " times expectedly, last exception " + e);
                throw e;
            }
            break;
        }
    }

    private FileWriter getOrCreateFileWriter(Map<EventExecutor, FileWriter> fileWriterMap, FileManager fileManager, EventExecutor eventExecutor) {
        if (this.closed) {
            throw new GeminiRuntimeException("NoCapacityFileCache has been closed");
        }
        FileWriter fileWriter = fileWriterMap.get(eventExecutor);
        if (!(fileWriter == null || fileWriter.isValid() && (long)fileWriter.size() < this.maxFileSize)) {
            fileManager.closeFileWriter(fileWriter);
            fileWriterMap.remove(eventExecutor);
            LOG.info("close file writer {}/{} in {}", new Object[]{fileWriter.getFileID(), fileWriter.isValid(), eventExecutor});
            fileWriter = null;
        }
        if (fileWriter == null) {
            fileWriter = fileManager.createNewFileWriter();
            fileWriterMap.put(eventExecutor, fileWriter);
            LOG.info("create new file writer {} in {}", (Object)fileWriter.getFileID(), (Object)eventExecutor);
        }
        return fileWriter;
    }
}

