/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.filecache;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.filecache.FileCache;
import org.apache.flink.runtime.state.gemini.engine.filecache.FileCacheStat;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.fs.FileReader;
import org.apache.flink.runtime.state.gemini.engine.fs.FileWriter;
import org.apache.flink.runtime.state.gemini.engine.metrics.FileCacheMetrics;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.compress.GCompressAlgorithm;
import org.apache.flink.runtime.state.gemini.engine.rm.ReferenceCount;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InfiniteCapacityFileCache
extends FileCache {
    private static final Logger LOG = LoggerFactory.getLogger(InfiniteCapacityFileCache.class);
    private final GContext gContext;
    private final long maxFileSize;
    private final FileManager localFileManager;
    private Map<EventExecutor, FileWriter> localFileWriters;
    private final FileManager dfsFileManager;
    private Map<EventExecutor, FileWriter> dfsFileWriters;
    private volatile boolean closed;

    public InfiniteCapacityFileCache(GContext gContext, FileManager localFileManager, FileManager dfsFileManager) {
        super(Long.MAX_VALUE, new FileCacheStat());
        this.gContext = (GContext)Preconditions.checkNotNull((Object)gContext);
        this.maxFileSize = gContext.getGConfiguration().getMaxLogStructureFileSize();
        Preconditions.checkArgument((this.maxFileSize > 0L ? 1 : 0) != 0, (Object)"Max file size should be positive");
        this.localFileManager = (FileManager)Preconditions.checkNotNull((Object)localFileManager);
        this.dfsFileManager = (FileManager)Preconditions.checkNotNull((Object)dfsFileManager);
        this.localFileWriters = new ConcurrentHashMap<EventExecutor, FileWriter>();
        this.dfsFileWriters = new ConcurrentHashMap<EventExecutor, FileWriter>();
        FileCacheMetrics fileCacheMetrics = gContext.getFileCacheMetrics();
        if (fileCacheMetrics != null) {
            fileCacheMetrics.register(this.fileCacheStat);
        }
        this.closed = false;
        LOG.info("InfiniteCapacityFileCache created, LocalFileManager {}, DfsFileManager {}", (Object)localFileManager, (Object)dfsFileManager);
    }

    @Override
    public boolean isCached(PageAddress pageAddress) {
        return pageAddress.isLocalValid();
    }

    @Override
    public void addPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor flushEventExecutor, BiConsumer<Boolean, Throwable> callBack) {
        if (pageAddress.isLocalValid()) {
            if (callBack != null) {
                callBack.accept(true, null);
            }
            return;
        }
        DataPage oriDataPage = pageAddress.getDataPage();
        flushEventExecutor.submit(() -> {
            boolean success = false;
            Exception throwable = null;
            DataPage dataPage = oriDataPage;
            try {
                if (!pageAddress.isLocalValid()) {
                    if (dataPage == null && pageAddress.isDfsValid()) {
                        dataPage = this.getDataPage(this.dfsFileManager, gRegionContext, pageAddress::getDfsAddress, pageAddress, false);
                    }
                    if (dataPage != null) {
                        FileWriter fileWriter = this.getOrCreateFileWriter(this.localFileWriters, this.localFileManager, flushEventExecutor);
                        this.internalAddPage(this.localFileManager, fileWriter, pageAddress, dataPage, gRegionContext, true, true);
                    } else {
                        throw new GeminiRuntimeException("data page does not exist");
                    }
                }
                success = true;
            }
            catch (Exception e) {
                success = false;
                throwable = e;
                LOG.error("error when adding page to cache: {}", (Throwable)e);
            }
            finally {
                if (dataPage != null) {
                    dataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal);
                }
                if (callBack != null) {
                    callBack.accept(success, throwable);
                }
            }
        });
    }

    @Override
    public DataPage getPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor flushEventExecutor) {
        try {
            DataPage dataPage = null;
            if (pageAddress.isLocalValid()) {
                dataPage = this.getDataPage(this.localFileManager, gRegionContext, pageAddress::getLocalAddress, pageAddress, true);
                if (dataPage != null) {
                    this.fileCacheStat.addHitSize(dataPage.getSize());
                }
            } else if (pageAddress.isDfsValid()) {
                dataPage = this.getDataPage(this.dfsFileManager, gRegionContext, pageAddress::getDfsAddress, pageAddress, false);
                if (dataPage != null) {
                    DataPage cacheDataPage = dataPage;
                    cacheDataPage.addReferenceCount();
                    flushEventExecutor.submit(() -> {
                        try {
                            FileWriter fileWriter = this.getOrCreateFileWriter(this.localFileWriters, this.localFileManager, flushEventExecutor);
                            this.internalAddPage(this.localFileManager, fileWriter, pageAddress, cacheDataPage, gRegionContext, true, true);
                        }
                        catch (Exception e) {
                            LOG.error("cache data failed, {}", (Throwable)e);
                        }
                        finally {
                            cacheDataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal);
                        }
                    });
                    this.fileCacheStat.addMissSize(dataPage.getSize());
                }
            }
            Preconditions.checkNotNull(dataPage, (String)"no page exists on local and dfs");
            return dataPage;
        }
        catch (Exception e) {
            LOG.error("exception when get page, {}", (Throwable)e);
            throw new GeminiRuntimeException("exception when get page", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void discardPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor) {
        boolean pageValid;
        boolean localValid = false;
        long localAddress = -1L;
        boolean dfsValid = false;
        long dfsAddress = -1L;
        PageAddress pageAddress2 = pageAddress;
        synchronized (pageAddress2) {
            pageValid = pageAddress.isPageValid();
            if (pageValid) {
                pageAddress.setPageStatus(false);
                localValid = pageAddress.isLocalValid();
                dfsValid = pageAddress.isDfsValid();
                if (localValid) {
                    localAddress = pageAddress.getLocalAddress();
                }
                if (dfsValid) {
                    dfsAddress = pageAddress.getDfsAddress();
                }
            }
        }
        if (pageValid) {
            long accessNumber = this.gContext.getAccessNumber();
            long ts = System.currentTimeMillis();
            if (localValid) {
                this.localFileManager.decDBReference(localAddress, accessNumber, ts, pageAddress.getDataLen());
            }
            if (dfsValid) {
                this.dfsFileManager.decDBReference(dfsAddress, accessNumber, ts, pageAddress.getDataLen());
            }
        }
    }

    @Override
    public void flushPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor, boolean force, BiConsumer<Boolean, Throwable> callBack) {
        if (!force && pageAddress.isDfsValid()) {
            if (callBack != null) {
                callBack.accept(true, null);
            }
            return;
        }
        DataPage oriDataPage = pageAddress.getDataPage();
        eventExecutor.submit(() -> {
            boolean success = false;
            Exception throwable = null;
            boolean pageIsNull = false;
            DataPage dataPage = oriDataPage;
            try {
                if (force || !pageAddress.isDfsValid()) {
                    if (dataPage == null) {
                        pageIsNull = true;
                        if (pageAddress.isLocalValid()) {
                            dataPage = this.getDataPage(this.localFileManager, gRegionContext, pageAddress::getLocalAddress, pageAddress, true);
                        } else if (pageAddress.isDfsValid()) {
                            dataPage = this.getDataPage(this.dfsFileManager, gRegionContext, pageAddress::getDfsAddress, pageAddress, false);
                            if (dataPage != null) {
                                DataPage cacheDataPage = dataPage;
                                cacheDataPage.addReferenceCount();
                                eventExecutor.submit(() -> {
                                    try {
                                        FileWriter fileWriter = this.getOrCreateFileWriter(this.localFileWriters, this.localFileManager, eventExecutor);
                                        this.internalAddPage(this.localFileManager, fileWriter, pageAddress, cacheDataPage, gRegionContext, true, true);
                                    }
                                    catch (Exception e) {
                                        LOG.error("cache data failed, {}", (Throwable)e);
                                    }
                                    finally {
                                        cacheDataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal);
                                    }
                                });
                            }
                        }
                    }
                    Preconditions.checkNotNull((Object)dataPage, (String)"Data page is null");
                    FileWriter fileWriter = this.getOrCreateFileWriter(this.dfsFileWriters, this.dfsFileManager, eventExecutor);
                    this.internalAddPage(this.dfsFileManager, fileWriter, pageAddress, dataPage, gRegionContext, false, false);
                }
                success = true;
            }
            catch (Exception e) {
                success = false;
                throwable = e;
                LOG.error("error when adding page to cache: pageIsNull={}, {}", new Object[]{pageIsNull, e.getMessage(), e});
            }
            finally {
                if (dataPage != null) {
                    dataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal);
                }
                if (callBack != null) {
                    callBack.accept(success, throwable);
                }
            }
        });
    }

    @Override
    public void sync(EventExecutor eventExecutor) throws IOException {
        FileWriter fileWriter = this.dfsFileWriters.get(eventExecutor);
        if (fileWriter != null) {
            fileWriter.sync();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws IOException {
        Iterator<FileWriter> iterator = this;
        synchronized (iterator) {
            if (this.closed) {
                LOG.warn("NoCapacityFileCache has been closed");
                return;
            }
            this.closed = true;
        }
        for (FileWriter fileWriter : this.localFileWriters.values()) {
            this.localFileManager.closeFileWriter(fileWriter);
        }
        this.localFileWriters.clear();
        for (FileWriter fileWriter : this.dfsFileWriters.values()) {
            this.dfsFileManager.closeFileWriter(fileWriter);
        }
        this.dfsFileWriters.clear();
        LOG.info("InfiniteCapacityFileCache is closed");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void internalAddPage(FileManager fileManager, FileWriter fileWriter, PageAddress pageAddress, DataPage dataPage, GRegionContext gRegionContext, boolean isLocal, boolean flushForce) throws Exception {
        boolean pageValid;
        int diskDataLen;
        long address;
        long startTime = System.nanoTime();
        int numRetires = 0;
        while (true) {
            try {
                address = fileWriter.getAddress();
                diskDataLen = dataPage.write(fileWriter, gRegionContext.getPageSerdeFlink(), pageAddress, isLocal ? gRegionContext.getGContext().getFlushWholePageGCompressAlgorithm() : GCompressAlgorithm.None, gRegionContext.getGContext().getGConfiguration().isChecksumEnable());
                fileWriter.resetFailCount();
            }
            catch (Exception e) {
                fileWriter.increasFailCount();
                if (++numRetires <= 3) continue;
                LOG.error("internal add page exception: {}, {}, {}", new Object[]{fileWriter, pageAddress, e});
                throw e;
            }
            break;
        }
        this.updateWriteStat(diskDataLen, dataPage.getSize(), System.nanoTime() - startTime, isLocal);
        if (flushForce) {
            fileWriter.flush();
        }
        fileManager.incDBReference(address, pageAddress.getDataLen());
        boolean hasOldAddress = false;
        long oldAddress = 0L;
        PageAddress pageAddress2 = pageAddress;
        synchronized (pageAddress2) {
            pageAddress.afterFlush(diskDataLen, gRegionContext.getGContext().getFlushWholePageGCompressAlgorithm());
            pageValid = pageAddress.isPageValid();
            if (pageValid) {
                hasOldAddress = isLocal ? pageAddress.isLocalValid() : pageAddress.isDfsValid();
                long l = oldAddress = isLocal ? pageAddress.getLocalAddress() : pageAddress.getDfsAddress();
            }
            if (isLocal) {
                pageAddress.setLocalAddress(address);
                pageAddress.setLocalStatus(true);
            } else {
                pageAddress.setDfsAddress(address);
                pageAddress.setDfsStatus(true);
            }
        }
        if (pageValid) {
            if (hasOldAddress) {
                fileManager.decDBReference(oldAddress, this.gContext.getAccessNumber(), System.currentTimeMillis(), pageAddress.getDataLen());
            }
        } else {
            fileManager.decDBReference(address, this.gContext.getAccessNumber(), System.currentTimeMillis(), pageAddress.getDataLen());
        }
    }

    private DataPage getDataPage(FileManager fileManager, GRegionContext gRegionContext, Callable<Long> addressCallable, PageAddress pageAddress, boolean isLocal) throws Exception {
        int unexpectedTries = 0;
        int expectedTries = 0;
        long address = addressCallable.call();
        while (true) {
            try {
                FileReader fileReader = fileManager.getFileReader(address);
                long offset = fileManager.getFileOffset(address);
                long startTime = System.nanoTime();
                DataPage dataPage = fileManager.getDataPageUtil().getDataPageFromReader(gRegionContext.getPageSerdeFlink(), fileReader, (int)offset, pageAddress);
                this.updateReadStat(pageAddress.getOnDiskDataLen(), System.nanoTime() - startTime, isLocal);
                return dataPage;
            }
            catch (Exception e) {
                long oldAddress = address;
                address = addressCallable.call();
                if (oldAddress == address) {
                    ++unexpectedTries;
                    continue;
                }
                ++expectedTries;
                if (unexpectedTries < 3 && expectedTries < 10) continue;
                LOG.error("get page failed, try " + unexpectedTries + " times unexpectedly, and try " + expectedTries + " times expectedly, last exception " + e);
                throw e;
            }
            break;
        }
    }

    private FileWriter getOrCreateFileWriter(Map<EventExecutor, FileWriter> fileWriterMap, FileManager fileManager, EventExecutor eventExecutor) {
        if (this.closed) {
            throw new GeminiRuntimeException("InfiniteCapacityFileCache has been closed.");
        }
        FileWriter fileWriter = fileWriterMap.get(eventExecutor);
        if (!(fileWriter == null || fileWriter.isValid() && (long)fileWriter.size() < this.maxFileSize)) {
            fileManager.closeFileWriter(fileWriter);
            fileWriterMap.remove(eventExecutor);
            LOG.debug("close file writer {}/{} in {}", new Object[]{fileWriter.getFileID(), fileWriter.isValid(), eventExecutor});
            fileWriter = null;
        }
        if (fileWriter == null) {
            fileWriter = fileManager.createNewFileWriter();
            fileWriterMap.put(eventExecutor, fileWriter);
            LOG.debug("create new file writer {} in {}", (Object)fileWriter.getFileID(), (Object)eventExecutor);
        }
        return fileWriter;
    }

    private void updateWriteStat(long diskLen, long size, long time, boolean isLocal) {
        if (isLocal) {
            this.fileCacheStat.addLocalWrite(diskLen, size, time);
        } else {
            this.fileCacheStat.addDFSWrite(size, time);
        }
    }

    private void updateReadStat(long time, long size, boolean isLocal) {
        if (isLocal) {
            this.fileCacheStat.addLocalRead(size, time);
        } else {
            this.fileCacheStat.addDFSRead(size, time);
        }
    }
}

