package org.apache.flink.runtime.state.gemini.engine.dbms;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.gemini.engine.GConfiguration;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.GRegionID;
import org.apache.flink.runtime.state.gemini.engine.GTable;
import org.apache.flink.runtime.state.gemini.engine.filecache.FileCache;
import org.apache.flink.runtime.state.gemini.engine.filecompaction.FileCompaction;
import org.apache.flink.runtime.state.gemini.engine.filecompaction.FileCompactionImpl;
import org.apache.flink.runtime.state.gemini.engine.filecompaction.FileCompactionPageTransfer;
import org.apache.flink.runtime.state.gemini.engine.filecompaction.NoFileCompaction;
import org.apache.flink.runtime.state.gemini.engine.fs.FileCleaner;
import org.apache.flink.runtime.state.gemini.engine.fs.FileCleanerImpl;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManagerImpl;
import org.apache.flink.runtime.state.gemini.engine.fs.PersistenceStrategy;
import org.apache.flink.runtime.state.gemini.engine.fs.PersistenceStrategyFactory;
import org.apache.flink.runtime.state.gemini.engine.handler.GeminiEventExecutorGroup;
import org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManager;
import org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManagerImpl;
import org.apache.flink.runtime.state.gemini.engine.page.DfsDataPageUtil;
import org.apache.flink.runtime.state.gemini.engine.page.LocalDataPageUtil;
import org.apache.flink.runtime.state.gemini.engine.page.LogicalPageChain;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.PageContext;
import org.apache.flink.runtime.state.gemini.engine.page.PageIndex;
import org.apache.flink.runtime.state.gemini.engine.page.PageStatus;
import org.apache.flink.runtime.state.gemini.engine.rm.Allocator;
import org.apache.flink.runtime.state.gemini.engine.rm.GByteBuffer;
import org.apache.flink.runtime.state.gemini.engine.rm.GarbageReleaseManager;
import org.apache.flink.runtime.state.gemini.engine.rm.GarbageReleaseManagerImpl;
import org.apache.flink.runtime.state.gemini.engine.rm.LeakDetector;
import org.apache.flink.runtime.state.gemini.engine.rm.LeakDetectorImpl;
import org.apache.flink.runtime.state.gemini.engine.rm.PoolAllocatorNettyImpl;
import org.apache.flink.runtime.state.gemini.engine.rm.UnpoolAllocatorImpl;
import org.apache.flink.runtime.state.gemini.engine.snapshot.BackendSnapshotMeta;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManagerImpl;
import org.apache.flink.runtime.state.gemini.engine.vm.BloomFilterManager;
import org.apache.flink.runtime.state.gemini.engine.vm.BloomFilterManagerImpl;
import org.apache.flink.runtime.state.gemini.engine.vm.CacheManager;
import org.apache.flink.runtime.state.gemini.engine.vm.CacheManagerImpl;
import org.apache.flink.runtime.state.gemini.engine.vm.DataPageLRU;
import org.apache.flink.runtime.state.gemini.engine.vm.FetchPolicy;
import org.apache.flink.runtime.state.gemini.engine.vm.FetchPolicyImpl;
import org.apache.flink.runtime.state.gemini.engine.vm.NoBloomFilterManagerImpl;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutorGroup;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/dbms/SupervisorImpl.class */
public class SupervisorImpl implements Supervisor {
    private static final Logger LOG = LoggerFactory.getLogger(SupervisorImpl.class);
    private final GContext gContext;
    private final Allocator allocator;
    private final Allocator defaultAllocator = new UnpoolAllocatorImpl();
    private final Allocator forReadAllocator;
    private final WriteBufferManager writeBufferManager;
    private final CacheManager cacheManager;
    private final SnapshotManager snapshotManager;
    private final FileManager localFileManager;
    private final FileManager dfsFileManager;
    private final FileCache fileCache;
    private final FileCleaner fileCleaner;
    private final FileCompaction fileCompaction;
    private final EventExecutorGroup regionExecutorGroup;
    private final EventExecutorGroup flusherExecutorGroup;
    private final EventExecutorGroup snapshotExecutorGroup;
    private final EventExecutorGroup compactionExecutorGroup;
    private final EventExecutorGroup lruIntoMainCacheExecutorGroup;
    private final FetchPolicy fetchPolicy;
    private final GarbageReleaseManager<GByteBuffer> garbageReleaseManager;
    private final LeakDetector leakDetector;
    private final PersistenceStrategy persistenceStrategy;
    private final BloomFilterManager bloomFilterManager;

    public SupervisorImpl(GContext gContext) {
        gContext.setSupervisor(this);
        this.gContext = (GContext) Preconditions.checkNotNull(gContext);
        this.writeBufferManager = new WriteBufferManagerImpl(gContext);
        GConfiguration gConfiguration = gContext.getGConfiguration();
        this.cacheManager = new CacheManagerImpl(this.gContext);
        if (gContext.getMemoryInfo().isUseOffHeap()) {
            this.garbageReleaseManager = new GarbageReleaseManagerImpl(this.gContext);
            this.leakDetector = new LeakDetectorImpl();
            this.forReadAllocator = new PoolAllocatorNettyImpl(gContext, this.garbageReleaseManager, this.leakDetector);
            this.allocator = this.forReadAllocator;
        } else {
            this.allocator = new UnpoolAllocatorImpl();
            if (gContext.getMemoryInfo().isUseOffheapForRead()) {
                this.garbageReleaseManager = new GarbageReleaseManagerImpl(this.gContext);
                this.leakDetector = new LeakDetectorImpl();
                this.forReadAllocator = new PoolAllocatorNettyImpl(gContext, this.garbageReleaseManager, this.leakDetector);
            } else {
                this.garbageReleaseManager = null;
                this.leakDetector = null;
                this.forReadAllocator = this.allocator;
            }
        }
        this.fileCleaner = new FileCleanerImpl(gContext);
        this.localFileManager = new FileManagerImpl(gContext, "local", new Path(gConfiguration.getLocalPath()), false, new LocalDataPageUtil(this.forReadAllocator, gConfiguration.isChecksumEnable()));
        this.fileCleaner.registerFileManager(this.localFileManager);
        this.dfsFileManager = new FileManagerImpl(gContext, "dfs", new Path(gConfiguration.getDfsPath()), true, new DfsDataPageUtil(gConfiguration.isChecksumEnable()));
        this.fileCleaner.registerFileManager(this.dfsFileManager);
        this.fileCache = FileCache.createFileCache(gContext, this.localFileManager, this.dfsFileManager);
        if (gConfiguration.isFileCompactionEnabled()) {
            this.fileCompaction = new FileCompactionImpl(gContext, (FileCompactionPageTransfer) this.fileCache);
        } else {
            this.fileCompaction = new NoFileCompaction();
        }
        this.snapshotManager = new SnapshotManagerImpl(this.gContext, this.writeBufferManager, this.localFileManager, this.dfsFileManager);
        this.fetchPolicy = new FetchPolicyImpl(gContext, this.cacheManager.getCacheStats(), new DataPageLRU(gContext.getStartRegionId(), gContext.getEndRegionId(), this.cacheManager.getReadPageCacheLRUSize(), createLRUFunction(), gConfiguration.isEnableLruAccessMode(), gConfiguration.isEnableEvictRegionEven()));
        String executorPrefixName = gConfiguration.getExecutorPrefixName();
        this.regionExecutorGroup = new GeminiEventExecutorGroup(gContext.getGConfiguration().getRegionThreadNum(), new ThreadFactoryBuilder().setNameFormat(executorPrefixName + "geminiRegion-%d").build(), gContext.getGConfiguration().getCommonThreadSleepTimeNs(), gContext);
        this.flusherExecutorGroup = new GeminiEventExecutorGroup(gContext.getGConfiguration().getFlushThreadNum(), new ThreadFactoryBuilder().setNameFormat(executorPrefixName + "geminiFlush-%d").build(), gContext.getGConfiguration().getCommonThreadSleepTimeNs(), gContext);
        this.snapshotExecutorGroup = new GeminiEventExecutorGroup(gContext.getGConfiguration().getSnapshotThreadNum(), new ThreadFactoryBuilder().setNameFormat(executorPrefixName + "geminiSnapshot-%d").build(), gContext.getGConfiguration().getCommonThreadSleepTimeNs(), gContext);
        this.compactionExecutorGroup = new GeminiEventExecutorGroup(gContext.getGConfiguration().getCompactionThreadNum(), new ThreadFactoryBuilder().setNameFormat(executorPrefixName + "geminiCompaction-%d").build(), gContext.getGConfiguration().getCommonThreadSleepTimeNs(), gContext);
        this.persistenceStrategy = PersistenceStrategyFactory.INSTANCE.create(gContext);
        this.lruIntoMainCacheExecutorGroup = new GeminiEventExecutorGroup(gContext.getGConfiguration().getLruIntoMainCacheThreadNum(), new ThreadFactoryBuilder().setNameFormat(executorPrefixName + "lruIntoMain-%d").build(), gContext.getGConfiguration().getCommonThreadSleepTimeNs(), gContext);
        if (gConfiguration.isEnableBloomFilter()) {
            this.bloomFilterManager = new BloomFilterManagerImpl(((float) gContext.getMemoryInfo().getTotalHeapSize()) * gConfiguration.getBloomFilterMemRate(), gConfiguration.getRegionThreadNum() + 1);
        } else {
            this.bloomFilterManager = new NoBloomFilterManagerImpl();
        }
        LOG.info("Supervisor is created");
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public void start() {
        this.cacheManager.start();
        this.fileCleaner.start();
        this.localFileManager.start();
        this.dfsFileManager.start();
        this.fileCompaction.start();
        if (this.garbageReleaseManager != null) {
            this.garbageReleaseManager.start();
        }
        LOG.info("Supervisor is started");
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public void stop() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(this.regionExecutorGroup.shutdownGracefully(100L, 500L, TimeUnit.MILLISECONDS));
        arrayList.add(this.flusherExecutorGroup.shutdownGracefully(100L, 500L, TimeUnit.MILLISECONDS));
        arrayList.add(this.compactionExecutorGroup.shutdownGracefully(100L, 500L, TimeUnit.MILLISECONDS));
        arrayList.add(this.snapshotExecutorGroup.shutdownGracefully(100L, 500L, TimeUnit.MILLISECONDS));
        closeQueitly(this.fetchPolicy, "FetchPolicy");
        closeQueitly(this.cacheManager, "CacheManager");
        closeQueitly(this.snapshotManager, "SnapshotManager");
        closeQueitly(this.fileCompaction, "FileCompaction");
        closeQueitly(this.fileCache, "FileCache");
        closeQueitly(this.localFileManager, "LocalFileManager");
        closeQueitly(this.dfsFileManager, "DFSFileManager");
        closeQueitly(this.fileCleaner, "FileCleaner");
        closeQueitly(this.bloomFilterManager, "BloomFilterManager");
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((Future) it.next()).awaitUninterruptibly(1L, TimeUnit.MINUTES);
        }
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public void close() {
        if (this.garbageReleaseManager != null) {
            closeQueitly(this.garbageReleaseManager, "GarbageReleaseManager");
        }
        if (this.leakDetector != null) {
            closeQueitly(this.leakDetector, "LeakDetector");
        }
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public void startSnapshot(BackendSnapshotMeta backendSnapshotMeta) throws IOException {
        this.snapshotManager.startSnapshot(backendSnapshotMeta);
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public SnapshotManager.PendingSnapshot getPendingSnapshot(long j) {
        return this.snapshotManager.getPendingSnapshot(j);
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public Allocator getAllocator() {
        return this.allocator;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public Allocator getDefaultAllocator() {
        return this.defaultAllocator;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public Allocator getForReadAllocator() {
        return this.forReadAllocator;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public WriteBufferManager getWriteBufferManager() {
        return this.writeBufferManager;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public CacheManager getCacheManager() {
        return this.cacheManager;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public SnapshotManager getSnapshotManager() {
        return this.snapshotManager;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public FileManager getLocalFileManager() {
        return this.localFileManager;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public FileManager getDfsFileManager() {
        return this.dfsFileManager;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public FileCache getFileCache() {
        return this.fileCache;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public FileCleaner getFileCleaner() {
        return this.fileCleaner;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public FileCompaction getFileCompaction() {
        return this.fileCompaction;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public EventExecutorGroup getRegionExecutorGroup() {
        return this.regionExecutorGroup;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public EventExecutorGroup getFlushExecutorGroup() {
        return this.flusherExecutorGroup;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public EventExecutorGroup getSnapshotExecutorGroup() {
        return this.snapshotExecutorGroup;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public EventExecutorGroup getCompactionExecutorGroup() {
        return this.compactionExecutorGroup;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public EventExecutorGroup getLruIntoMainCacheExecutorGroup() {
        return this.lruIntoMainCacheExecutorGroup;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public void discardPage(GRegionContext gRegionContext, List<PageAddress> list) {
        Iterator<PageAddress> it = list.iterator();
        while (it.hasNext()) {
            it.next().discard(this.fileCache, gRegionContext, null);
        }
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public Map<String, GTable> getAllTables() {
        return this.gContext.getGeminiDB().getGeminiTableMap();
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public FetchPolicy getFetchPolicy() {
        return this.fetchPolicy;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public GarbageReleaseManager getGarbageReleaseManager() {
        return this.garbageReleaseManager;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public LeakDetector getLeakDetector() {
        return this.leakDetector;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public PersistenceStrategy getPersistencyStrategy() {
        return this.persistenceStrategy;
    }

    private DataPageLRU.DataPageLRUFuction createLRUFunction() {
        return new DataPageLRU.DataPageLRUFuction<DataPageLRU.PageWithContext>() { // from class: org.apache.flink.runtime.state.gemini.engine.dbms.SupervisorImpl.1
            @Override // org.apache.flink.runtime.state.gemini.engine.vm.DataPageLRU.DataPageLRUFuction
            public int size(DataPageLRU.PageWithContext pageWithContext) {
                return pageWithContext.getFutureDataPage().getSize();
            }

            @Override // org.apache.flink.runtime.state.gemini.engine.vm.DataPageLRU.DataPageLRUFuction
            public void removed(DataPageLRU.PageWithContext pageWithContext) {
                pageWithContext.getFutureDataPage().removed();
            }

            @Override // org.apache.flink.runtime.state.gemini.engine.vm.DataPageLRU.DataPageLRUFuction
            public int getSlotIndex(DataPageLRU.PageWithContext pageWithContext, int i) {
                if (pageWithContext.getPageContext() == null) {
                    return 0;
                }
                return pageWithContext.getPageContext().getGRegionID().getId() - i;
            }

            @Override // org.apache.flink.runtime.state.gemini.engine.vm.DataPageLRU.DataPageLRUFuction
            public boolean canAddIntoMainCache(DataPageLRU.PageWithContext pageWithContext, PageIndex pageIndex, GRegionID gRegionID) {
                LogicalPageChain logicPage;
                PageContext pageContext = pageWithContext.getPageContext();
                if (pageContext == null || pageContext.getCacheStatus() == PageContext.CacheStatus.CACHING_TO_MAIN || pageContext.getLogicPageIndex() < 0 || !pageWithContext.getFutureDataPage().isDone() || pageWithContext.getFutureDataPage().isFail() || !gRegionID.equals(pageContext.getGRegionID())) {
                    return false;
                }
                int logicPageIndex = pageContext.getLogicPageIndex();
                int indexCapacity = pageIndex.getIndexCapacity();
                if (logicPageIndex >= indexCapacity) {
                    SupervisorImpl.LOG.debug("LogicPageChainIndex error, pageIndex {}, indexCapacity {}, expectedRegion {}, context region {}", new Object[]{Integer.valueOf(logicPageIndex), Integer.valueOf(indexCapacity), gRegionID, pageContext.getGRegionID()});
                    return false;
                }
                LogicalPageChain logicPage2 = pageIndex.getLogicPage(logicPageIndex);
                if (logicPage2 == null || pageContext.getLogicPageChainHashCode() != logicPage2.hashCode() || logicPage2.getPageStatus().equals(PageStatus.Compacting)) {
                    return false;
                }
                return logicPageIndex >= (indexCapacity >> 1) || (logicPage = pageIndex.getLogicPage(logicPageIndex + (indexCapacity >> 1))) == null || !logicPage.getPageStatus().equals(PageStatus.Init);
            }
        };
    }

    private void closeQueitly(Closeable closeable, String str) {
        try {
            closeable.close();
        } catch (Exception e) {
            LOG.error("Failed to close {}, {}", str, e);
        }
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor
    public BloomFilterManager getBloomFilterManager() {
        return this.bloomFilterManager;
    }
}
