/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.vm;

import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.filecache.FileCache;
import org.apache.flink.runtime.state.gemini.engine.handler.GeminiEventExecutorGroup;
import org.apache.flink.runtime.state.gemini.engine.page.LogicalPageChain;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddressCompositeImpl;
import org.apache.flink.runtime.state.gemini.engine.page.PageContext;
import org.apache.flink.runtime.state.gemini.engine.rm.GByteBuffer;
import org.apache.flink.runtime.state.gemini.engine.vm.CacheStats;
import org.apache.flink.runtime.state.gemini.engine.vm.DataPageLRU;
import org.apache.flink.runtime.state.gemini.engine.vm.FetchPolicy;
import org.apache.flink.runtime.state.gemini.engine.vm.FutureDataPage;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link FetchPolicy} that serves data pages from an in-memory LRU of {@link FutureDataPage}s and
 * falls back to the {@link FileCache} on a miss.
 *
 * <p>On a miss in {@link #fetch}, earlier entries of the logical page chain can additionally be
 * loaded in the background on a dedicated executor. The number of background loads in flight is
 * tracked by a counter and checked against a configured cap before each new load is scheduled.
 */
public class FetchPolicyImpl implements FetchPolicy {
    private static final Logger LOG = LoggerFactory.getLogger(FetchPolicyImpl.class);

    private final DataPageLRU<PageAddress, DataPageLRU.PageWithContext> readPageCacheLRU;
    private final CacheStats cacheStats;
    private final FileCache fileCache;
    private final ExecutorService preFetchExecutor;
    /** Number of background loads that were submitted and have not finished yet. */
    private final AtomicInteger currentPrefetchingNum = new AtomicInteger(0);
    /** Prefetching stops scheduling new loads once the in-flight count exceeds this value. */
    private final int maxPrefetchingNum;

    /**
     * Creates the policy and its prefetch executor.
     *
     * @param gContext engine context supplying the file cache and the configuration
     * @param cacheStats sink for cache hit/miss counters
     * @param readPageCacheLRU LRU that holds the cached (or still loading) pages
     */
    public FetchPolicyImpl(GContext gContext, CacheStats cacheStats, DataPageLRU<PageAddress, DataPageLRU.PageWithContext> readPageCacheLRU) {
        this.readPageCacheLRU = readPageCacheLRU;
        this.cacheStats = cacheStats;
        this.fileCache = gContext.getSupervisor().getFileCache();
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(gContext.getGConfiguration().getExecutorPrefixName() + "geminiPrefetch-%d").build();
        // NOTE(review): the pool is sized from the flush thread count and the prefetch cap comes
        // from the max compaction threshold; confirm these are the intended configuration values.
        this.preFetchExecutor = new GeminiEventExecutorGroup(gContext.getGConfiguration().getFlushThreadNum(), namedThreadFactory, gContext.getGConfiguration().getFetchThreadSleepTimeNS(), gContext);
        this.maxPrefetchingNum = gContext.getGConfiguration().getMaxCompactionThreshold();
    }

    /**
     * Schedules background loads for the pages of {@code logicalPageChain}, walking the chain from
     * {@code startIndex} down to index 0 while the DB reports a normal state.
     *
     * <p>Pages that already carry their data are skipped. The walk stops entirely when the
     * in-flight cap is exceeded, when a page already has a pending or successful entry in the LRU,
     * or when the executor refuses new work.
     *
     * @param logicalPageChain chain whose pages should be prefetched
     * @param logicPageChainIndex index of the chain, recorded in the cached page context
     * @param startIndex first chain position to prefetch; nothing happens if it is negative
     * @param gRegionContext region the pages belong to
     */
    void preFetch(LogicalPageChain logicalPageChain, int logicPageChainIndex, int startIndex, GRegionContext gRegionContext) {
        int chainPosition = startIndex;
        while (chainPosition >= 0 && gRegionContext.getGContext().isDBNormal()) {
            PageAddress requested = logicalPageChain.getPageAddress(chainPosition);
            --chainPosition;
            Iterator<PageAddress> pages = requested.pageIteratorOrdered();
            while (pages.hasNext()) {
                PageAddress pageAddress = pages.next();
                if (pageAddress.hasDataPage()) {
                    continue;
                }
                if (this.currentPrefetchingNum.get() > this.maxPrefetchingNum) {
                    return;
                }
                DataPageLRU.PageWithContext cached = this.readPageCacheLRU.get(pageAddress);
                if (cached != null) {
                    FutureDataPage existing = cached.getFutureDataPage();
                    // Only a failed load is retried; anything else ends the prefetch walk.
                    if (existing != null && !existing.isFail()) {
                        return;
                    }
                }
                FutureDataPage futureDataPage = new FutureDataPage(pageAddress.getDataLen());
                PageContext pageContext = PageContext.of(gRegionContext.getRegionId(), logicPageChainIndex, logicalPageChain.hashCode(), PageContext.CacheStatus.IN_LRU);
                this.readPageCacheLRU.put(pageAddress, new DataPageLRU.PageWithContext(pageContext, futureDataPage));
                // Count the load only once everything that may throw before submission is done,
                // so the counter cannot leak when the entry could not be created.
                this.currentPrefetchingNum.incrementAndGet();
                try {
                    this.preFetchExecutor.submit(() -> this.asyncFetch(futureDataPage, pageAddress, gRegionContext));
                }
                catch (RejectedExecutionException e) {
                    // The task will never run, so undo the bookkeeping here: release the slot and
                    // fail the placeholder so that it is not mistaken for a load in progress.
                    this.currentPrefetchingNum.decrementAndGet();
                    futureDataPage.completeExceptionally(e);
                    LOG.debug("Prefetch of page {} was rejected by the executor, stopping prefetch", pageAddress, e);
                    return;
                }
            }
        }
    }

    /**
     * Loads one page through the file cache and completes {@code futureDataPage} with the result
     * or with the exception that was raised. Always releases the in-flight slot afterwards.
     */
    private void asyncFetch(FutureDataPage futureDataPage, PageAddress pageAddress, GRegionContext gRegionContext) {
        try {
            GByteBuffer dataPage = this.readFromFileCache(pageAddress, gRegionContext);
            futureDataPage.complete(dataPage);
            // NOTE(review): presumably complete() keeps its own reference to the buffer, which
            // would make this release drop only the loader's reference - confirm.
            dataPage.release();
        }
        catch (Exception e) {
            futureDataPage.completeExceptionally(e);
        }
        finally {
            this.currentPrefetchingNum.decrementAndGet();
        }
    }

    /**
     * Returns the data page for the given address, from the LRU if a finished load is available
     * there and otherwise from the file cache.
     *
     * <p>For a {@link PageAddressCompositeImpl} the main page address is the one that is looked up
     * and read. A buffer served from the LRU is retained once before it is returned. On a miss the
     * optional prefetch is triggered before the requested page itself is read.
     *
     * @param pageAddressRequested address of the wanted page
     * @param logicalPageChain chain the page belongs to
     * @param logicPageChainIndex index of the chain, recorded in the cached page context
     * @param startIndex chain position of the page; prefetching starts at {@code startIndex - 1}
     * @param gRegionContext region the page belongs to
     * @param prefetch whether a miss should trigger {@link #preFetch}
     * @param doCache whether a page read from the file cache is put into the LRU (and retained
     *     once more for it)
     * @return the data page buffer
     */
    @Override
    public GByteBuffer fetch(PageAddress pageAddressRequested, LogicalPageChain logicalPageChain, int logicPageChainIndex, int startIndex, GRegionContext gRegionContext, boolean prefetch, boolean doCache) {
        PageAddress pageAddress = pageAddressRequested instanceof PageAddressCompositeImpl
                ? ((PageAddressCompositeImpl)pageAddressRequested).getMainPageAddress()
                : pageAddressRequested;
        GByteBuffer cachedPage = this.lookupCompletedPage(pageAddress);
        if (cachedPage != null) {
            this.cacheStats.addPageCacheLRUHitCount();
            return cachedPage;
        }
        if (prefetch) {
            this.preFetch(logicalPageChain, logicPageChainIndex, startIndex - 1, gRegionContext);
        }
        GByteBuffer dataPage = this.readFromFileCache(pageAddress, gRegionContext);
        if (doCache) {
            PageContext pageContext = PageContext.of(gRegionContext.getRegionId(), logicPageChainIndex, logicalPageChain.hashCode(), PageContext.CacheStatus.IN_LRU);
            this.cachePage(pageAddress, pageContext, dataPage);
        }
        return dataPage;
    }

    /**
     * Returns the sub page for the given address, from the LRU if a finished load is available
     * there and otherwise from the file cache. Hits and misses are recorded in the cache stats.
     *
     * @param pageAddress address of the wanted sub page
     * @param pageContext context stored alongside the page when it is cached
     * @param gRegionContext region the page belongs to
     * @param doCache whether a page read from the file cache is put into the LRU (and retained
     *     once more for it)
     * @return the data page buffer; retained once by this method when served from the LRU
     */
    @Override
    public GByteBuffer fetchSubPage(PageAddress pageAddress, PageContext pageContext, GRegionContext gRegionContext, boolean doCache) {
        GByteBuffer cachedPage = this.lookupCompletedPage(pageAddress);
        if (cachedPage != null) {
            this.cacheStats.addSubPageCacheHitCount();
            return cachedPage;
        }
        this.cacheStats.addSubPageCacheMissCount();
        GByteBuffer dataPage = this.readFromFileCache(pageAddress, gRegionContext);
        if (doCache) {
            this.cachePage(pageAddress, pageContext, dataPage);
        }
        return dataPage;
    }

    /** Returns the LRU backing this policy. */
    @Override
    public DataPageLRU<PageAddress, DataPageLRU.PageWithContext> getDataPageLRU() {
        return this.readPageCacheLRU;
    }

    /** Stops the prefetch executor and clears the LRU. */
    @Override
    public void close() {
        this.preFetchExecutor.shutdownNow();
        this.readPageCacheLRU.clear();
    }

    /**
     * Returns the retained buffer of a finished load cached for {@code pageAddress}, or
     * {@code null} if there is no entry, the load is still running, or its result is unusable.
     */
    private GByteBuffer lookupCompletedPage(PageAddress pageAddress) {
        DataPageLRU.PageWithContext cached = this.readPageCacheLRU.get(pageAddress);
        if (cached == null) {
            return null;
        }
        FutureDataPage futureDataPage = cached.getFutureDataPage();
        if (futureDataPage == null || !futureDataPage.isDone()) {
            return null;
        }
        try {
            GByteBuffer dataPage = futureDataPage.get();
            dataPage.retain();
            return dataPage;
        }
        catch (Exception e) {
            // Best effort: a failed or unusable cache entry must not fail the read, the caller
            // simply reads the page directly instead.
            LOG.debug("Cached page {} is unusable, reading it from the file cache instead", pageAddress, e);
            return null;
        }
    }

    /** Reads a page through the file cache, using the next executor of the flush executor group. */
    private GByteBuffer readFromFileCache(PageAddress pageAddress, GRegionContext gRegionContext) {
        return this.fileCache.getPage(pageAddress, gRegionContext, gRegionContext.getGContext().getSupervisor().getFlushExecutorGroup().next());
    }

    /** Puts an already loaded page into the LRU and retains the buffer once for the cache. */
    private void cachePage(PageAddress pageAddress, PageContext pageContext, GByteBuffer dataPage) {
        this.readPageCacheLRU.put(pageAddress, new DataPageLRU.PageWithContext(pageContext, new FutureDataPage(dataPage)));
        dataPage.retain();
    }
}

