package org.apache.flink.runtime.state.gemini.engine.vm;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.GTable;
import org.apache.flink.runtime.state.gemini.engine.MemoryInfo;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.metrics.CacheMetrics;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/vm/CacheManagerImpl.class */
/**
 * {@link CacheManager} implementation that derives memory and index water marks from the
 * {@link GContext} configuration, tracks usage through a {@link CacheStats} instance, runs an
 * {@link EvictPolicy} on a dedicated single-threaded executor and maintains a coarse "tick" clock
 * that is refreshed once per second by a scheduled task.
 */
public class CacheManagerImpl implements CacheManager {
    private static final Logger LOG = LoggerFactory.getLogger(CacheManagerImpl.class);

    /** Capacity of the bounded work queue backing the cache manager executor. */
    private static final int EXECUTOR_QUEUE_CAPACITY = 32767;

    /** Upper bound applied to the configured read-page LRU size. */
    private static final int MAX_READ_PAGE_LRU_SIZE = 20000;

    private final GContext gContext;
    private final long totalMemSize;
    private final long totalMemLowMark;
    private final long totalMemMiddleMark;
    private final long totalMemHighMark;
    private final long totalIndexCountHighMark;
    private final long totalIndexCountLowMark;
    private final EvictPolicy evictPolicy;
    private final CacheStats cacheStats = new CacheStats();
    private final ExecutorService cacheManagerExecutor;
    private final int readPageCacheLRUSize;
    /** Last published tick; written by the tick service thread, read by any thread. */
    private volatile long currentTickTime;
    private final int timePerTick;
    private final ScheduledExecutorService timeTickService;

    /**
     * Builds the manager from the given context. No thread is started and no task is scheduled
     * here; that happens in {@link #start()}.
     *
     * @param gContext context supplying the memory info, configuration and (optional) cache metrics
     */
    public CacheManagerImpl(GContext gContext) {
        this.gContext = gContext;
        MemoryInfo memoryInfo = gContext.getMemoryInfo();
        this.totalMemSize = memoryInfo.isUseOffHeap() ? memoryInfo.getTotalOffheapSize() : memoryInfo.getTotalHeapSize();
        // The products are floating point; an explicit narrowing cast is required to store them in long fields.
        this.totalMemLowMark = (long) (((float) this.totalMemSize) * gContext.getGConfiguration().getTotalHeapLowMarkRate());
        this.totalMemMiddleMark = (long) (((float) this.totalMemSize) * gContext.getGConfiguration().getTotalHeapMiddleMarkRate());
        this.totalMemHighMark = (long) (((float) this.totalMemSize) * gContext.getGConfiguration().getTotalHeapHighMarkRate());
        this.totalIndexCountHighMark = gContext.getGConfiguration().getTotalIndexCountHighMark();
        this.totalIndexCountLowMark = gContext.getGConfiguration().getTotalIndexCountLowMark();

        String executorPrefixName = gContext.getGConfiguration().getExecutorPrefixName();
        this.cacheManagerExecutor = new ThreadPoolExecutor(
            1,
            1,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(EXECUTOR_QUEUE_CAPACITY),
            new ThreadFactoryBuilder().setNameFormat(executorPrefixName + "GeminiCacheManagerImpl-%d").build());

        int cappedLruSize = Math.min(MAX_READ_PAGE_LRU_SIZE, gContext.getGConfiguration().getTotalReadPageLRUNum());
        // When off-heap memory is involved the LRU size is additionally floored.
        // NOTE(review): the floor reuses KeyGroupRangeAssignment's constant - confirm that is the intended source.
        boolean usesOffHeap = memoryInfo.isUseOffHeap() || memoryInfo.isUseOffheapForRead();
        this.readPageCacheLRUSize = usesOffHeap
            ? Math.max(KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM, cappedLruSize)
            : cappedLruSize;

        this.timePerTick = gContext.getGConfiguration().getTimePerTick();
        this.currentTickTime = computeTick();
        this.evictPolicy = new EvictPolicySepImpl(this.gContext, this);
        this.timeTickService = new ScheduledThreadPoolExecutor(
            1,
            new ThreadFactoryBuilder().setNameFormat(executorPrefixName + "TickTimeService-%d").build());

        LOG.info("CacheManagerImpl{}, offheap={}, totalMemSize={}, lowMark={}, MiddleMark={}, HighMark={}, indexLowMark={}, indexHighMark={}, dbSlots={}, readPageCacheLRUSize={}, checksumEnable={}, timePerTick={}",
            this,
            memoryInfo.isUseOffHeap(),
            this.totalMemSize,
            this.totalMemLowMark,
            this.totalMemMiddleMark,
            this.totalMemHighMark,
            this.totalIndexCountLowMark,
            this.totalIndexCountHighMark,
            gContext.getGConfiguration().getNumberSlots(),
            this.readPageCacheLRUSize,
            gContext.getGConfiguration().isChecksumEnable(),
            this.timePerTick);
    }

    /**
     * Submits the evict policy to the cache manager executor, schedules the once-per-second tick
     * refresh and, when the context exposes a cache metric, registers the three memory-mark gauges
     * and the cache statistics with it.
     */
    @Override
    public void start() {
        this.cacheManagerExecutor.submit(this.evictPolicy);
        this.timeTickService.scheduleAtFixedRate(() -> {
            long tick = computeTick();
            // Only ever move forward, so a wall-clock step backwards cannot rewind the tick.
            if (tick > this.currentTickTime) {
                this.currentTickTime = tick;
            }
        }, 1L, 1L, TimeUnit.SECONDS);
        if (this.gContext.getCacheMetric() != null) {
            this.gContext.getCacheMetric().getMetricGroup().gauge(CacheMetrics.TOTAL_MEM_HIGH_MARK, () -> {
                return Long.valueOf(this.totalMemHighMark);
            });
            this.gContext.getCacheMetric().getMetricGroup().gauge(CacheMetrics.TOTAL_MEM_MIDDLE_MARK, () -> {
                return Long.valueOf(this.totalMemMiddleMark);
            });
            this.gContext.getCacheMetric().getMetricGroup().gauge(CacheMetrics.TOTAL_MEM_LOW_MARK, () -> {
                return Long.valueOf(this.totalMemLowMark);
            });
            this.gContext.getCacheMetric().registerMetricsCacheStat(this.cacheStats);
        }
        LOG.info("CacheManager is initialized.");
    }

    /**
     * Forwards an index capacity change to the cache statistics.
     *
     * @param i the capacity delta passed on to {@link CacheStats}
     */
    @Override
    public void addIndexCapacity(int i) {
        this.cacheStats.addIndexCapacity(i);
    }

    /** @return the total page memory currently reported by the cache statistics */
    @Override
    public long getPageUsedMemory() {
        return this.cacheStats.getTotalPageUsedMemory();
    }

    /** @return {@code true} once the total index capacity has reached twice the index high mark */
    @Override
    public boolean forbidIndexExpand() {
        return this.cacheStats.getTotalIndexCapacity() >= this.totalIndexCountHighMark * 2;
    }

    /**
     * Classifies the current total index capacity against the configured index marks.
     *
     * @return {@link WaterMark#High} at or above the high mark, {@link WaterMark#Low} at or above
     *     the low mark, otherwise {@link WaterMark#Normal}
     */
    @Override
    public WaterMark getIndexCapacityWaterMark() {
        // Read the counter once so both comparisons see the same value.
        long indexCapacity = this.cacheStats.getTotalIndexCapacity();
        if (indexCapacity >= this.totalIndexCountHighMark) {
            return WaterMark.High;
        }
        if (indexCapacity >= this.totalIndexCountLowMark) {
            return WaterMark.Low;
        }
        return WaterMark.Normal;
    }

    /**
     * Classifies the used page memory plus {@code i} against the configured memory marks.
     *
     * @param i amount added to the currently used page memory before comparing
     * @return {@link WaterMark#High}, {@link WaterMark#Middle} or {@link WaterMark#Low} for the
     *     highest mark that is reached, otherwise {@link WaterMark#Normal}
     */
    @Override
    public WaterMark getMemWaterMark(int i) {
        // Read the counter once so all three comparisons see the same value.
        long projectedUsage = this.cacheStats.getTotalPageUsedMemory() + ((long) i);
        if (projectedUsage >= this.totalMemHighMark) {
            return WaterMark.High;
        }
        if (projectedUsage >= this.totalMemMiddleMark) {
            return WaterMark.Middle;
        }
        if (projectedUsage >= this.totalMemLowMark) {
            return WaterMark.Low;
        }
        return WaterMark.Normal;
    }

    /** @return the statistics object owned by this manager */
    @Override
    public CacheStats getCacheStats() {
        return this.cacheStats;
    }

    /**
     * Stops the cache manager executor, shuts down the evict policy and stops the tick service,
     * in that order.
     */
    @Override
    public void close() {
        this.cacheManagerExecutor.shutdownNow();
        this.evictPolicy.shutdown();
        this.timeTickService.shutdownNow();
    }

    /**
     * Intentionally does nothing in this implementation.
     *
     * @param gTable ignored
     */
    @Override
    public void addTable(GTable gTable) {
    }

    /** @return the evict policy created by this manager */
    @Override
    public EvictPolicy getEvictPolicy() {
        return this.evictPolicy;
    }

    /**
     * Hands the region to the evict policy.
     *
     * @param gRegion region forwarded to {@link EvictPolicy#addRegion}
     */
    @Override
    public void addRegion(GRegion gRegion) {
        this.evictPolicy.addRegion(gRegion);
    }

    /** @return the low memory mark computed at construction time */
    @Override
    public long getMemLowMark() {
        return this.totalMemLowMark;
    }

    /** @return the middle memory mark computed at construction time */
    @Override
    public long getMemMidMark() {
        return this.totalMemMiddleMark;
    }

    /** @return the high memory mark computed at construction time */
    @Override
    public long getMemHighMark() {
        return this.totalMemHighMark;
    }

    /** @return the read-page LRU size computed at construction time */
    @Override
    public int getReadPageCacheLRUSize() {
        return this.readPageCacheLRUSize;
    }

    /** @return the most recently published tick value */
    @Override
    public long getCurrentTickTime() {
        return this.currentTickTime;
    }

    /** Converts the current wall-clock time to a tick: whole seconds divided by {@code timePerTick}. */
    private long computeTick() {
        return (System.currentTimeMillis() / 1000) / this.timePerTick;
    }
}
