/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.vm;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.GTable;
import org.apache.flink.runtime.state.gemini.engine.MemoryInfo;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.vm.CacheManager;
import org.apache.flink.runtime.state.gemini.engine.vm.CacheStats;
import org.apache.flink.runtime.state.gemini.engine.vm.EvictPolicy;
import org.apache.flink.runtime.state.gemini.engine.vm.EvictPolicySepImpl;
import org.apache.flink.runtime.state.gemini.engine.vm.WaterMark;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CacheManagerImpl
implements CacheManager {
    private static final Logger LOG = LoggerFactory.getLogger(CacheManagerImpl.class);
    private final GContext gContext;
    private final long totalMemSize;
    private final long totalMemLowMark;
    private final long totalMemMiddleMark;
    private final long totalMemHighMark;
    private final long totalIndexCountHighMark;
    private final long totalIndexCountLowMark;
    private EvictPolicy evictPolicy;
    private final CacheStats cacheStats = new CacheStats();
    private final ExecutorService cacheManagerExecutor;
    private final int readPageCacheLRUSize;
    private volatile long currentTickTime;
    private final int timePerTick;
    private final ScheduledExecutorService timeTickService;

    public CacheManagerImpl(GContext gContext) {
        this.gContext = gContext;
        MemoryInfo memoryInfo = gContext.getMemoryInfo();
        this.totalMemSize = memoryInfo.isUseOffHeap() ? memoryInfo.getTotalOffheapSize() : memoryInfo.getTotalHeapSize();
        this.totalMemLowMark = (long)((float)this.totalMemSize * gContext.getGConfiguration().getTotalHeapLowMarkRate());
        this.totalMemMiddleMark = (long)((float)this.totalMemSize * gContext.getGConfiguration().getTotalHeapMiddleMarkRate());
        this.totalMemHighMark = (long)((float)this.totalMemSize * gContext.getGConfiguration().getTotalHeapHighMarkRate());
        this.totalIndexCountHighMark = gContext.getGConfiguration().getTotalIndexCountHighMark();
        this.totalIndexCountLowMark = gContext.getGConfiguration().getTotalIndexCountLowMark();
        String prefix = gContext.getGConfiguration().getExecutorPrefixName();
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat(prefix + "GeminiCacheManagerImpl-%d").build();
        this.cacheManagerExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(Short.MAX_VALUE), namedThreadFactory);
        int configReadPageCacheLRUSize = Math.min(20000, gContext.getGConfiguration().getTotalReadPageLRUNum());
        this.readPageCacheLRUSize = memoryInfo.isUseOffHeap() || memoryInfo.isUseOffheapForRead() ? Math.max(128, configReadPageCacheLRUSize) : configReadPageCacheLRUSize;
        this.timePerTick = gContext.getGConfiguration().getTimePerTick();
        this.currentTickTime = System.currentTimeMillis() / 1000L / (long)this.timePerTick;
        this.evictPolicy = new EvictPolicySepImpl(this.gContext, this);
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(prefix + "TickTimeService-%d").build();
        this.timeTickService = new ScheduledThreadPoolExecutor(1, threadFactory);
        LOG.info("CacheManagerImpl{}, offheap={}, totalMemSize={}, lowMark={}, MiddleMark={}, HighMark={}, indexLowMark={}, indexHighMark={}, dbSlots={}, readPageCacheLRUSize={}, checksumEnable={}, timePerTick={}", new Object[]{this, memoryInfo.isUseOffHeap(), this.totalMemSize, this.totalMemLowMark, this.totalMemMiddleMark, this.totalMemHighMark, this.totalIndexCountLowMark, this.totalIndexCountHighMark, gContext.getGConfiguration().getNumberSlots(), this.readPageCacheLRUSize, gContext.getGConfiguration().isChecksumEnable(), this.timePerTick});
    }

    @Override
    public void start() {
        this.cacheManagerExecutor.submit(this.evictPolicy);
        this.timeTickService.scheduleAtFixedRate(() -> {
            long curTick = System.currentTimeMillis() / 1000L / (long)this.timePerTick;
            if (curTick > this.currentTickTime) {
                this.currentTickTime = curTick;
            }
        }, 1L, 1L, TimeUnit.SECONDS);
        if (this.gContext.getCacheMetric() != null) {
            this.gContext.getCacheMetric().getMetricGroup().gauge("totalMemHighMark", () -> this.totalMemHighMark);
            this.gContext.getCacheMetric().getMetricGroup().gauge("totalMemMiddleMark", () -> this.totalMemMiddleMark);
            this.gContext.getCacheMetric().getMetricGroup().gauge("totalMemLowMark", () -> this.totalMemLowMark);
            this.gContext.getCacheMetric().registerMetricsCacheStat(this.cacheStats);
        }
        LOG.info("CacheManager is initialized.");
    }

    @Override
    public void addIndexCapacity(int indexCapacity) {
        this.cacheStats.addIndexCapacity(indexCapacity);
    }

    @Override
    public long getPageUsedMemory() {
        return this.cacheStats.getTotalPageUsedMemory();
    }

    @Override
    public boolean forbidIndexExpand() {
        return this.cacheStats.getTotalIndexCapacity() >= this.totalIndexCountHighMark * 2L;
    }

    @Override
    public WaterMark getIndexCapacityWaterMark() {
        if (this.cacheStats.getTotalIndexCapacity() >= this.totalIndexCountHighMark) {
            return WaterMark.High;
        }
        if (this.cacheStats.getTotalIndexCapacity() >= this.totalIndexCountLowMark) {
            return WaterMark.Low;
        }
        return WaterMark.Normal;
    }

    @Override
    public WaterMark getMemWaterMark(int addEstimatedSize) {
        if (this.cacheStats.getTotalPageUsedMemory() + (long)addEstimatedSize >= this.totalMemHighMark) {
            return WaterMark.High;
        }
        if (this.cacheStats.getTotalPageUsedMemory() + (long)addEstimatedSize >= this.totalMemMiddleMark) {
            return WaterMark.Middle;
        }
        if (this.cacheStats.getTotalPageUsedMemory() + (long)addEstimatedSize >= this.totalMemLowMark) {
            return WaterMark.Low;
        }
        return WaterMark.Normal;
    }

    @Override
    public CacheStats getCacheStats() {
        return this.cacheStats;
    }

    @Override
    public void close() {
        this.cacheManagerExecutor.shutdownNow();
        this.evictPolicy.shutdown();
        this.timeTickService.shutdownNow();
    }

    @Override
    public void addTable(GTable gTable) {
    }

    @Override
    public EvictPolicy getEvictPolicy() {
        return this.evictPolicy;
    }

    @Override
    public void addRegion(GRegion gRegion) {
        this.evictPolicy.addRegion(gRegion);
    }

    @Override
    public long getMemLowMark() {
        return this.totalMemLowMark;
    }

    @Override
    public long getMemMidMark() {
        return this.totalMemMiddleMark;
    }

    @Override
    public long getMemHighMark() {
        return this.totalMemHighMark;
    }

    @Override
    public int getReadPageCacheLRUSize() {
        return this.readPageCacheLRUSize;
    }

    @Override
    public long getCurrentTickTime() {
        return this.currentTickTime;
    }
}

