/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.vm;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.filecache.FileCache;
import org.apache.flink.runtime.state.gemini.engine.handler.GeminiEventExecutorGroup;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.LogicChainedPage;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.rm.ReferenceCount;
import org.apache.flink.runtime.state.gemini.engine.vm.CacheStats;
import org.apache.flink.runtime.state.gemini.engine.vm.DataPageLRU;
import org.apache.flink.runtime.state.gemini.engine.vm.FetchPolicy;
import org.apache.flink.runtime.state.gemini.engine.vm.FutureDataPage;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link FetchPolicy} implementation backed by a read-page LRU cache and a {@link FileCache}.
 *
 * <p>{@link #fetch} first consults the LRU; on a miss (or a failed cached future) it reads the
 * page through the file cache. When asked to, it also schedules asynchronous prefetch of the
 * pages at lower indices of the given {@link LogicChainedPage} on a dedicated executor.
 */
public class FetchPolicyImpl implements FetchPolicy {
    private static final Logger LOG = LoggerFactory.getLogger(FetchPolicyImpl.class);

    /** Cache of (possibly still pending) pages keyed by their address. */
    private final DataPageLRU<PageAddress, FutureDataPage> readPageCacheLRU;
    private final CacheStats cacheStats;
    private final FileCache fileCache;
    /** Executor that runs the asynchronous prefetch tasks. */
    private final ExecutorService preFetchExecutor;
    /** Number of prefetch tasks that have been scheduled but have not finished yet. */
    private final AtomicInteger currentPrefetchingNum = new AtomicInteger(0);
    /** Soft upper bound for {@link #currentPrefetchingNum}; checked before scheduling. */
    private final int maxPrefetchingNum;

    /**
     * Creates the policy and its prefetch executor.
     *
     * @param gContext engine context providing the configuration and the file cache
     * @param cacheStats statistics sink updated on LRU hits
     * @param readPageCacheLRU LRU cache shared with the caller (also exposed via
     *     {@link #getDataPageLRU()})
     */
    public FetchPolicyImpl(GContext gContext, CacheStats cacheStats, DataPageLRU<PageAddress, FutureDataPage> readPageCacheLRU) {
        this.readPageCacheLRU = readPageCacheLRU;
        this.cacheStats = cacheStats;
        this.fileCache = gContext.getSupervisor().getFileCache();
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
            .setNameFormat(gContext.getGConfiguration().getExcetorPrefixName() + "geminiPrefetch-%d")
            .build();
        // NOTE(review): the prefetch pool is sized with the *flush* thread number - confirm this is intended.
        this.preFetchExecutor = new GeminiEventExecutorGroup(
            gContext.getGConfiguration().getFlushThreadNum(),
            namedThreadFactory,
            gContext.getGConfiguration().getFetchThreadSleepTimeNS(),
            gContext);
        // NOTE(review): the prefetch limit reuses the max compaction chain threshold - confirm this is intended.
        this.maxPrefetchingNum = gContext.getGConfiguration().getMaxCompactionChainThreshold();
    }

    /**
     * Schedules asynchronous loads for the pages of {@code logicChainedPage} from
     * {@code startIndex} down to index 0.
     *
     * <p>Addresses that already hold a data page are skipped. Scheduling stops as soon as the DB
     * is no longer normal, the in-flight limit is exceeded, a non-failed future for an address is
     * already cached, or the executor rejects a task.
     *
     * @param logicChainedPage chain whose page addresses are walked
     * @param startIndex first index to consider; indices are visited in descending order
     * @param gRegionContext region context handed to the file cache
     */
    void preFetch(LogicChainedPage logicChainedPage, int startIndex, GRegionContext gRegionContext) {
        while (startIndex >= 0 && gRegionContext.getGContext().isDBNormal()) {
            final PageAddress pageAddress = logicChainedPage.getPageAddress(startIndex);
            --startIndex;
            if (pageAddress.hasDataPage()) {
                continue;
            }
            if (this.currentPrefetchingNum.get() > this.maxPrefetchingNum) {
                return;
            }
            FutureDataPage futureDataPageOld = this.readPageCacheLRU.get(pageAddress);
            if (futureDataPageOld != null && !futureDataPageOld.isFail()) {
                return;
            }
            this.currentPrefetchingNum.incrementAndGet();
            final FutureDataPage futureDataPage = new FutureDataPage(pageAddress.getDataLen());
            this.readPageCacheLRU.put(pageAddress, futureDataPage);
            try {
                this.preFetchExecutor.submit(() -> this.asyncFetch(futureDataPage, pageAddress, gRegionContext));
            } catch (RejectedExecutionException e) {
                // The task will never run, so asyncFetch's finally block cannot undo the
                // bookkeeping done above. Roll it back here and fail the cached future so a later
                // prefetch may retry (failed futures are not treated as "already cached").
                this.currentPrefetchingNum.decrementAndGet();
                futureDataPage.completeExceptionally(e);
                LOG.warn("Prefetch task for page {} was rejected, stop prefetching.", pageAddress, e);
                return;
            }
        }
    }

    /**
     * Loads one page through the file cache and completes {@code futureDataPage} with it.
     *
     * <p>If the future cannot be completed with the loaded page, the reference obtained for the
     * page is released again. Any exception fails the future. The in-flight counter is always
     * decremented.
     */
    private void asyncFetch(FutureDataPage futureDataPage, PageAddress pageAddress, GRegionContext gRegionContext) {
        try {
            DataPage dataPage = this.fileCache.getPage(
                pageAddress,
                gRegionContext,
                gRegionContext.getGContext().getSupervisor().getFlushExecutorGroup().next());
            if (!futureDataPage.complete(dataPage)) {
                dataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal);
            }
        } catch (Exception e) {
            futureDataPage.completeExceptionally(e);
        } finally {
            this.currentPrefetchingNum.decrementAndGet();
        }
    }

    /**
     * Returns the data page for {@code pageAddress}.
     *
     * <p>A completed, successful future in the LRU is served directly (its reference count is
     * incremented and an LRU hit is recorded). Otherwise the page is read through the file cache
     * and, when {@code doCache} is set, stored in the LRU with an extra reference.
     *
     * @param pageAddress address of the page to load
     * @param logicChainedPage chain used for prefetching
     * @param startIndex index of the requested page in the chain; prefetch starts at
     *     {@code startIndex - 1}
     * @param gRegionContext region context handed to the file cache
     * @param prefetch whether to schedule prefetch of lower-index pages on a cache miss
     * @param doCache whether to put the freshly read page into the LRU
     * @return the loaded data page
     */
    @Override
    public DataPage fetch(PageAddress pageAddress, LogicChainedPage logicChainedPage, int startIndex, GRegionContext gRegionContext, boolean prefetch, boolean doCache) {
        FutureDataPage futureDataPage = this.readPageCacheLRU.get(pageAddress);
        if (futureDataPage != null && futureDataPage.isDone()) {
            try {
                DataPage cachedPage = futureDataPage.get();
                cachedPage.addReferenceCount();
                this.cacheStats.addPageCacheLRUHitCount();
                return cachedPage;
            } catch (Exception e) {
                // Best effort: the cached future is unusable, fall through to a direct read.
                LOG.debug("Cached page {} is not usable, reading it directly.", pageAddress, e);
            }
        }
        if (prefetch) {
            this.preFetch(logicChainedPage, startIndex - 1, gRegionContext);
        }
        DataPage dataPage = this.fileCache.getPage(
            pageAddress,
            gRegionContext,
            gRegionContext.getGContext().getSupervisor().getFlushExecutorGroup().next());
        if (doCache) {
            // NOTE(review): the page becomes visible in the LRU before the extra reference is
            // taken - confirm a concurrent eviction cannot release it in between.
            this.readPageCacheLRU.put(pageAddress, new FutureDataPage(dataPage));
            dataPage.addReferenceCount();
        }
        return dataPage;
    }

    /** Returns the LRU cache this policy reads from and writes to. */
    @Override
    public DataPageLRU<PageAddress, FutureDataPage> getDataPageLRU() {
        return this.readPageCacheLRU;
    }

    /** Shuts down the prefetch executor immediately via {@code shutdownNow()}. */
    @Override
    public void close() {
        this.preFetchExecutor.shutdownNow();
    }
}

