package org.apache.flink.runtime.state.gemini.engine.filecache;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.fs.FileReader;
import org.apache.flink.runtime.state.gemini.engine.fs.FileWriter;
import org.apache.flink.runtime.state.gemini.engine.metrics.FileCacheMetrics;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.compress.GCompressAlgorithm;
import org.apache.flink.runtime.state.gemini.engine.rm.ReferenceCount;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/filecache/InfiniteCapacityFileCache.class */
public class InfiniteCapacityFileCache extends FileCache {
    private static final Logger LOG = LoggerFactory.getLogger(InfiniteCapacityFileCache.class);

    /** Engine context; this class reads the configuration, the file cache metrics and the access number from it. */
    private final GContext gContext;

    /** Size threshold: a writer whose size reaches this value is closed and replaced by a new one. */
    private final long maxFileSize;

    /** File manager backing the local copies of pages. */
    private final FileManager localFileManager;

    /** Current local file writer per event executor. Assigned once in the constructor, hence final. */
    private final Map<EventExecutor, FileWriter> localFileWriters;

    /** File manager backing the DFS copies of pages. */
    private final FileManager dfsFileManager;

    /** Current DFS file writer per event executor. Assigned once in the constructor, hence final. */
    private final Map<EventExecutor, FileWriter> dfsFileWriters;

    /** Set to true by {@code close()}; checked before a writer is handed out. */
    private volatile boolean closed;

    /**
     * Creates a file cache with an unbounded capacity ({@code Long.MAX_VALUE}) and a fresh {@link FileCacheStat}.
     *
     * @param gContext     engine context; must not be null and must provide a positive max log-structure file size
     * @param fileManager  file manager used for the local copies of pages; must not be null
     * @param fileManager2 file manager used for the DFS copies of pages; must not be null
     * @throws NullPointerException     if any argument is null
     * @throws IllegalArgumentException if the configured max file size is not positive
     */
    public InfiniteCapacityFileCache(GContext gContext, FileManager fileManager, FileManager fileManager2) {
        super(Long.MAX_VALUE, new FileCacheStat());
        this.gContext = Preconditions.checkNotNull(gContext);
        this.maxFileSize = gContext.getGConfiguration().getMaxLogStructureFileSize();
        Preconditions.checkArgument(this.maxFileSize > 0, "Max file size should be positive");
        this.localFileManager = Preconditions.checkNotNull(fileManager);
        this.dfsFileManager = Preconditions.checkNotNull(fileManager2);
        // Diamond instead of the raw type, so the assignments are checked by the compiler.
        this.localFileWriters = new ConcurrentHashMap<>();
        this.dfsFileWriters = new ConcurrentHashMap<>();
        // Metrics are optional: the stat object is only registered when a metrics holder exists.
        FileCacheMetrics fileCacheMetrics = gContext.getFileCacheMetrics();
        if (fileCacheMetrics != null) {
            fileCacheMetrics.register(this.fileCacheStat);
        }
        this.closed = false;
        LOG.info("InfiniteCapacityFileCache created, LocalFileManager {}, DfsFileManager {}", fileManager, fileManager2);
    }

    /**
     * Reports whether the page counts as cached, which here means its local address is marked valid.
     *
     * @param pageAddress address record of the page to check
     * @return {@code true} if {@code pageAddress.isLocalValid()} holds
     */
    @Override
    public boolean isCached(PageAddress pageAddress) {
        final boolean presentLocally = pageAddress.isLocalValid();
        return presentLocally;
    }

    /**
     * Asynchronously writes a page to the local file cache unless it already has a valid local copy.
     *
     * <p>If the local copy is already valid, the callback is invoked immediately on the calling thread with
     * {@code (true, null)}. Otherwise the work is submitted to {@code eventExecutor}: the in-memory page is
     * used if present, else the page is read back from DFS when a DFS copy is valid.
     *
     * <p>Cleanup runs in a single {@code finally} block, so the page reference is released once and the
     * callback is invoked once per task, even when the callback itself throws.
     *
     * @param pageAddress    address record of the page to cache locally
     * @param gRegionContext region context passed through to page (de)serialization
     * @param eventExecutor  executor on which the write runs; also the key of the per-executor file writer
     * @param biConsumer     optional completion callback receiving {@code (success, failure)}; may be null.
     *                       For a non-{@link Exception} throwable it receives {@code (false, null)} and the
     *                       throwable propagates out of the task.
     */
    @Override
    public void addPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor, BiConsumer<Boolean, Throwable> biConsumer) {
        if (pageAddress.isLocalValid()) {
            if (biConsumer != null) {
                biConsumer.accept(true, null);
            }
            return;
        }
        // NOTE(review): one reference is released below for a page obtained here, so getDataPage()
        // presumably hands the page out with a reference held - confirm against PageAddress.
        final DataPage inMemoryPage = pageAddress.getDataPage();
        eventExecutor.submit(() -> {
            DataPage page = inMemoryPage;
            boolean success = false;
            Throwable failure = null;
            try {
                // Re-check inside the task: the local copy may have become valid since submission.
                if (!pageAddress.isLocalValid()) {
                    if (page == null && pageAddress.isDfsValid()) {
                        page = getDataPage(this.dfsFileManager, gRegionContext, pageAddress::getDfsAddress, pageAddress, false);
                    }
                    if (page == null) {
                        throw new GeminiRuntimeException("data page does not exist");
                    }
                    internalAddPage(this.localFileManager, getOrCreateFileWriter(this.localFileWriters, this.localFileManager, eventExecutor), pageAddress, page, gRegionContext, true, true);
                }
                success = true;
            } catch (Exception e) {
                failure = e;
                // The throwable goes in as the trailing argument so SLF4J logs the stack trace;
                // the old message left a literal "{}" in the output.
                LOG.error("error when adding page to cache", e);
            } finally {
                if (page != null) {
                    page.delReferenceCount(ReferenceCount.ReleaseType.Normal);
                }
                if (biConsumer != null) {
                    biConsumer.accept(success, failure);
                }
            }
        });
    }

    /**
     * Loads a page from the local file cache, falling back to DFS when no local copy is valid.
     *
     * <p>A local read is counted as a hit. A DFS read is counted as a miss, and the page is additionally
     * written to the local cache in the background on {@code eventExecutor}. An extra reference is taken
     * for that background task and released when it finishes, whether or not the write succeeded.
     *
     * @param pageAddress    address record of the page to load
     * @param gRegionContext region context passed through to page (de)serialization
     * @param eventExecutor  executor used for the background local write
     * @return the loaded page, never null
     * @throws GeminiRuntimeException if the page cannot be read or exists neither locally nor on DFS
     */
    @Override
    public DataPage getPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor) {
        try {
            DataPage dataPage = null;
            if (pageAddress.isLocalValid()) {
                dataPage = getDataPage(this.localFileManager, gRegionContext, pageAddress::getLocalAddress, pageAddress, true);
                if (dataPage != null) {
                    this.fileCacheStat.addHitSize(dataPage.getSize());
                }
            } else if (pageAddress.isDfsValid()) {
                dataPage = getDataPage(this.dfsFileManager, gRegionContext, pageAddress::getDfsAddress, pageAddress, false);
                if (dataPage != null) {
                    // The lambda needs an effectively final reference; dataPage itself is reassigned above.
                    final DataPage fetchedPage = dataPage;
                    // Keep the page alive for the background write; released in the task's finally.
                    fetchedPage.addReferenceCount();
                    eventExecutor.submit(() -> {
                        try {
                            internalAddPage(this.localFileManager, getOrCreateFileWriter(this.localFileWriters, this.localFileManager, eventExecutor), pageAddress, fetchedPage, gRegionContext, true, true);
                        } catch (Exception e) {
                            // Best effort: failing to populate the local cache must not fail the read.
                            LOG.error("cache data failed", e);
                        } finally {
                            fetchedPage.delReferenceCount(ReferenceCount.ReleaseType.Normal);
                        }
                    });
                    this.fileCacheStat.addMissSize(dataPage.getSize());
                }
            }
            Preconditions.checkNotNull(dataPage, "no page exists on local and dfs");
            return dataPage;
        } catch (Exception e) {
            LOG.error("exception when get page", e);
            throw new GeminiRuntimeException("exception when get page", e);
        }
    }

    /**
     * Marks a page as invalid and drops the file references held for its local and DFS copies.
     *
     * <p>The status flip and the snapshot of both addresses happen under the monitor of {@code pageAddress};
     * the reference decrements run after the monitor is released. A page that is already invalid is ignored.
     *
     * @param pageAddress    address record of the page to discard
     * @param gRegionContext not used by this implementation
     * @param eventExecutor  not used by this implementation
     */
    @Override
    public void discardPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor) {
        final boolean localValid;
        final boolean dfsValid;
        final long localAddress;
        final long dfsAddress;
        synchronized (pageAddress) {
            if (!pageAddress.isPageValid()) {
                // Already discarded: nothing to release.
                return;
            }
            pageAddress.setPageStatus(false);
            localValid = pageAddress.isLocalValid();
            dfsValid = pageAddress.isDfsValid();
            localAddress = localValid ? pageAddress.getLocalAddress() : -1L;
            dfsAddress = dfsValid ? pageAddress.getDfsAddress() : -1L;
        }

        final long accessNumber = this.gContext.getAccessNumber();
        final long now = System.currentTimeMillis();
        if (localValid) {
            this.localFileManager.decDBReference(localAddress, accessNumber, now, pageAddress.getDataLen());
        }
        if (dfsValid) {
            this.dfsFileManager.decDBReference(dfsAddress, accessNumber, now, pageAddress.getDataLen());
        }
    }

    /**
     * Asynchronously writes a page to DFS.
     *
     * <p>When {@code z} is false and the page already has a valid DFS copy, nothing is written and the
     * callback is invoked immediately with {@code (true, null)}. Otherwise a task is submitted to
     * {@code eventExecutor}. If no in-memory page is available, the task reads the page from the local file
     * or, failing that, from DFS; in the DFS case it also schedules a background write to the local cache.
     *
     * <p>The whole task body is covered by one try/catch/finally, so a failure releases the page reference
     * and reaches the callback instead of escaping the task.
     *
     * @param pageAddress    address record of the page to flush
     * @param gRegionContext region context passed through to page (de)serialization
     * @param eventExecutor  executor on which the flush runs; also the key of the per-executor file writer
     * @param z              when true, flush even if a valid DFS copy already exists
     * @param biConsumer     optional completion callback receiving {@code (success, failure)}; may be null.
     *                       For a non-{@link Exception} throwable it receives {@code (false, null)} and the
     *                       throwable propagates out of the task.
     */
    @Override
    public void flushPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor, boolean z, BiConsumer<Boolean, Throwable> biConsumer) {
        if (!z && pageAddress.isDfsValid()) {
            if (biConsumer != null) {
                biConsumer.accept(true, null);
            }
            return;
        }
        // NOTE(review): one reference is released below for a page obtained here, so getDataPage()
        // presumably hands the page out with a reference held - confirm against PageAddress.
        final DataPage inMemoryPage = pageAddress.getDataPage();
        eventExecutor.submit(() -> {
            boolean success = false;
            Throwable failure = null;
            boolean pageWasNull = false;
            DataPage page = inMemoryPage;
            try {
                if (page == null) {
                    pageWasNull = true;
                    if (pageAddress.isLocalValid()) {
                        page = getDataPage(this.localFileManager, gRegionContext, pageAddress::getLocalAddress, pageAddress, true);
                    } else if (pageAddress.isDfsValid()) {
                        page = getDataPage(this.dfsFileManager, gRegionContext, pageAddress::getDfsAddress, pageAddress, false);
                        if (page != null) {
                            // The nested lambda needs an effectively final reference.
                            final DataPage fetchedPage = page;
                            // Extra reference for the background local write; released in its finally.
                            fetchedPage.addReferenceCount();
                            eventExecutor.submit(() -> {
                                try {
                                    internalAddPage(this.localFileManager, getOrCreateFileWriter(this.localFileWriters, this.localFileManager, eventExecutor), pageAddress, fetchedPage, gRegionContext, true, true);
                                } catch (Exception e) {
                                    // Best effort: a failed local cache write must not fail the flush.
                                    LOG.error("cache data failed", e);
                                } finally {
                                    fetchedPage.delReferenceCount(ReferenceCount.ReleaseType.Normal);
                                }
                            });
                        }
                    }
                }
                Preconditions.checkNotNull(page, "Data page is null");
                internalAddPage(this.dfsFileManager, getOrCreateFileWriter(this.dfsFileWriters, this.dfsFileManager, eventExecutor), pageAddress, page, gRegionContext, false, false);
                success = true;
            } catch (Exception e) {
                failure = e;
                LOG.error("error when adding page to cache: pageIsNull={}, {}", pageWasNull, e.getMessage(), e);
            } finally {
                if (page != null) {
                    page.delReferenceCount(ReferenceCount.ReleaseType.Normal);
                }
                if (biConsumer != null) {
                    biConsumer.accept(success, failure);
                }
            }
        });
    }

    /**
     * Syncs the DFS file writer currently associated with the given executor, if there is one.
     *
     * @param eventExecutor executor whose DFS writer should be synced
     * @throws IOException declared by the overridden method; may come from the writer's {@code sync()}
     */
    @Override
    public void sync(EventExecutor eventExecutor) throws IOException {
        final FileWriter dfsWriter = this.dfsFileWriters.get(eventExecutor);
        if (dfsWriter == null) {
            // No DFS writer has been created for this executor yet.
            return;
        }
        dfsWriter.sync();
    }

    /**
     * Closes every local and DFS file writer held by this cache and marks the cache as closed.
     *
     * <p>The method is idempotent: a second call only logs a warning. The closed flag is set before the
     * writers are closed, so {@code getOrCreateFileWriter} rejects new requests from that point on.
     *
     * @throws IOException declared by {@link java.io.Closeable#close()}
     */
    @Override
    public void close() throws IOException {
        synchronized (this) {
            if (this.closed) {
                // The message used to name a different class ("NoCapacityFileCache").
                LOG.warn("InfiniteCapacityFileCache has been closed");
                return;
            }
            this.closed = true;
            for (FileWriter localWriter : this.localFileWriters.values()) {
                this.localFileManager.closeFileWriter(localWriter);
            }
            this.localFileWriters.clear();
            for (FileWriter dfsWriter : this.dfsFileWriters.values()) {
                this.dfsFileManager.closeFileWriter(dfsWriter);
            }
            this.dfsFileWriters.clear();
            LOG.info("InfiniteCapacityFileCache is closed");
        }
    }

    /**
     * Writes a page through the given writer and publishes the new address in {@code pageAddress}.
     *
     * <p>Steps of one attempt: write the page, optionally flush the writer, add a file reference for the new
     * address, then update the address record under its monitor. Afterwards one reference is dropped
     * again: the new one if the page was discarded meanwhile, otherwise the one of the copy that was
     * replaced (if there was a valid one).
     *
     * <p>An attempt that throws an {@link Exception} is retried with the same writer. After the fourth
     * failed attempt the last exception is logged and rethrown.
     *
     * @param fileManager    manager owning the file references for the target (local or DFS)
     * @param fileWriter     writer the page is appended to
     * @param pageAddress    address record updated with the new address
     * @param dataPage       page to write
     * @param gRegionContext region context providing serde, compression and checksum settings
     * @param z              true to publish the address as the local copy, false as the DFS copy;
     *                       compression is only applied for the local copy
     * @param z2             true to flush the writer right after the write
     * @throws Exception the failure of the last attempt once the retries are used up
     */
    private void internalAddPage(FileManager fileManager, FileWriter fileWriter, PageAddress pageAddress, DataPage dataPage, GRegionContext gRegionContext, boolean z, boolean z2) throws Exception {
        final boolean toLocal = z;
        final boolean flushAfterWrite = z2;
        final int maxRetries = 3;
        // Taken once before the first attempt, so the recorded write time includes failed attempts.
        final long startNanos = System.nanoTime();
        Exception lastFailure = null;
        int failedAttempts = 0;
        do {
            try {
                long newAddress = fileWriter.getAddress();
                int writtenBytes = dataPage.write(fileWriter, gRegionContext.getPageSerdeFlink(), pageAddress, toLocal ? gRegionContext.getGContext().getFlushWholePageGCompressAlgorithm() : GCompressAlgorithm.None, gRegionContext.getGContext().getGConfiguration().isChecksumEnable());
                fileWriter.resetFailCount();
                updateWriteStat(writtenBytes, dataPage.getSize(), System.nanoTime() - startNanos, toLocal);
                if (flushAfterWrite) {
                    fileWriter.flush();
                }
                fileManager.incDBReference(newAddress, pageAddress.getDataLen());

                boolean pageValid;
                boolean hadPreviousCopy = false;
                long previousAddress = 0;
                synchronized (pageAddress) {
                    pageAddress.afterFlush(writtenBytes, gRegionContext.getGContext().getFlushWholePageGCompressAlgorithm());
                    pageValid = pageAddress.isPageValid();
                    if (pageValid) {
                        // Remember the copy being replaced so its file reference can be dropped below.
                        hadPreviousCopy = toLocal ? pageAddress.isLocalValid() : pageAddress.isDfsValid();
                        previousAddress = toLocal ? pageAddress.getLocalAddress() : pageAddress.getDfsAddress();
                    }
                    if (toLocal) {
                        pageAddress.setLocalAddress(newAddress);
                        pageAddress.setLocalStatus(true);
                    } else {
                        pageAddress.setDfsAddress(newAddress);
                        pageAddress.setDfsStatus(true);
                    }
                }

                if (!pageValid) {
                    // The page was discarded meanwhile: give back the reference just taken.
                    fileManager.decDBReference(newAddress, this.gContext.getAccessNumber(), System.currentTimeMillis(), pageAddress.getDataLen());
                } else if (hadPreviousCopy) {
                    fileManager.decDBReference(previousAddress, this.gContext.getAccessNumber(), System.currentTimeMillis(), pageAddress.getDataLen());
                }
                return;
            } catch (Exception e) {
                fileWriter.increasFailCount();
                failedAttempts++;
                // Kept outside the catch scope so it can be logged and rethrown after the loop.
                lastFailure = e;
            }
        } while (failedAttempts <= maxRetries);
        // The throwable is the trailing argument so SLF4J logs its stack trace.
        LOG.error("internal add page exception: {}, {}", fileWriter, pageAddress, lastFailure);
        throw lastFailure;
    }

    /**
     * Reads a page from the file the given address points to, retrying on failure.
     *
     * <p>After every failed read the address is fetched again from {@code callable}. A failure where the
     * address is unchanged counts as unexpected; a failure where the address has changed counts as
     * expected. The read is retried with the refreshed address until there are 3 unexpected or 10
     * expected failures, at which point the last exception is logged and rethrown.
     *
     * @param fileManager    manager used to resolve the reader and the offset for an address
     * @param gRegionContext region context providing the page serde
     * @param callable       supplier of the page's current address; called once up front and after each failure
     * @param pageAddress    address record of the page being read
     * @param z              true if the read is local, false if it is a DFS read (used for read statistics only)
     * @return the page produced by the file manager's data page util
     * @throws Exception the last read failure once the retry budget is used up, or any failure of {@code callable}
     */
    private DataPage getDataPage(FileManager fileManager, GRegionContext gRegionContext, Callable<Long> callable, PageAddress pageAddress, boolean z) throws Exception {
        final int maxUnexpectedFailures = 3;
        final int maxExpectedFailures = 10;
        int unexpectedFailures = 0;
        int expectedFailures = 0;
        long address = callable.call().longValue();
        while (true) {
            try {
                FileReader fileReader = fileManager.getFileReader(address);
                long fileOffset = fileManager.getFileOffset(address);
                long startNanos = System.nanoTime();
                DataPage page = fileManager.getDataPageUtil().getDataPageFromReader(gRegionContext.getPageSerdeFlink(), fileReader, (int) fileOffset, pageAddress);
                updateReadStat(pageAddress.getOnDiskDataLen(), System.nanoTime() - startNanos, z);
                return page;
            } catch (Exception e) {
                long refreshedAddress = callable.call().longValue();
                if (refreshedAddress == address) {
                    unexpectedFailures++;
                } else {
                    expectedFailures++;
                }
                address = refreshedAddress;
                if (unexpectedFailures >= maxUnexpectedFailures || expectedFailures >= maxExpectedFailures) {
                    LOG.error("get page failed, try {} times unexpectedly, and try {} times expectedly", unexpectedFailures, expectedFailures, e);
                    throw e;
                }
                // Otherwise retry with the refreshed address.
            }
        }
    }

    /**
     * Returns the writer registered for the executor, replacing it first when it can no longer be used.
     *
     * <p>A registered writer is reused only while it is valid and smaller than {@code maxFileSize};
     * otherwise it is closed through the file manager, removed from the map and a new one is registered.
     *
     * @param map           per-executor writer registry to look up and update
     * @param fileManager   manager used to close the old writer and create the new one
     * @param eventExecutor executor the writer belongs to
     * @return a writer registered in {@code map} for {@code eventExecutor}
     * @throws GeminiRuntimeException if this cache has already been closed
     */
    private FileWriter getOrCreateFileWriter(Map<EventExecutor, FileWriter> map, FileManager fileManager, EventExecutor eventExecutor) {
        if (this.closed) {
            throw new GeminiRuntimeException("InfiniteCapacityFileCache has been closed.");
        }

        final FileWriter existing = map.get(eventExecutor);
        final boolean reusable = existing != null && existing.isValid() && existing.size() < this.maxFileSize;
        if (reusable) {
            return existing;
        }

        if (existing != null) {
            // Invalid or full: retire the old writer before creating its replacement.
            fileManager.closeFileWriter(existing);
            map.remove(eventExecutor);
            LOG.debug("close file writer {}/{} in {}", existing.getFileID(), Boolean.valueOf(existing.isValid()), eventExecutor);
        }

        final FileWriter created = fileManager.createNewFileWriter();
        map.put(eventExecutor, created);
        LOG.debug("create new file writer {} in {}", created.getFileID(), eventExecutor);
        return created;
    }

    /**
     * Records a finished write in the cache statistics.
     *
     * @param j  first value forwarded to {@code addLocalWrite}; not reported for DFS writes
     * @param j2 second value, forwarded for both local and DFS writes
     * @param j3 third value, forwarded for both local and DFS writes
     * @param z  true to record a local write, false to record a DFS write
     */
    private void updateWriteStat(long j, long j2, long j3, boolean z) {
        if (!z) {
            // The DFS statistic does not take the first value.
            this.fileCacheStat.addDFSWrite(j2, j3);
            return;
        }
        this.fileCacheStat.addLocalWrite(j, j2, j3);
    }

    /**
     * Records a finished read in the cache statistics.
     *
     * <p>Note the argument order: the statistic methods receive {@code j2} first and {@code j} second.
     *
     * @param j  value passed as the second argument of the statistic call
     * @param j2 value passed as the first argument of the statistic call
     * @param z  true to record a local read, false to record a DFS read
     */
    private void updateReadStat(long j, long j2, boolean z) {
        if (!z) {
            this.fileCacheStat.addDFSRead(j2, j);
            return;
        }
        this.fileCacheStat.addLocalRead(j2, j);
    }
}
