/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.dbms;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.gemini.engine.GConfiguration;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.GRegionID;
import org.apache.flink.runtime.state.gemini.engine.GTable;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor;
import org.apache.flink.runtime.state.gemini.engine.filecache.FileCache;
import org.apache.flink.runtime.state.gemini.engine.filecompaction.FileCompaction;
import org.apache.flink.runtime.state.gemini.engine.filecompaction.FileCompactionImpl;
import org.apache.flink.runtime.state.gemini.engine.filecompaction.FileCompactionPageTransfer;
import org.apache.flink.runtime.state.gemini.engine.filecompaction.NoFileCompaction;
import org.apache.flink.runtime.state.gemini.engine.fs.FileCleaner;
import org.apache.flink.runtime.state.gemini.engine.fs.FileCleanerImpl;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManagerImpl;
import org.apache.flink.runtime.state.gemini.engine.fs.PersistenceStrategy;
import org.apache.flink.runtime.state.gemini.engine.fs.PersistenceStrategyFactory;
import org.apache.flink.runtime.state.gemini.engine.handler.GeminiEventExecutorGroup;
import org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManager;
import org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManagerImpl;
import org.apache.flink.runtime.state.gemini.engine.page.DfsDataPageUtil;
import org.apache.flink.runtime.state.gemini.engine.page.LocalDataPageUtil;
import org.apache.flink.runtime.state.gemini.engine.page.LogicalPageChain;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.PageContext;
import org.apache.flink.runtime.state.gemini.engine.page.PageIndex;
import org.apache.flink.runtime.state.gemini.engine.page.PageStatus;
import org.apache.flink.runtime.state.gemini.engine.rm.Allocator;
import org.apache.flink.runtime.state.gemini.engine.rm.GByteBuffer;
import org.apache.flink.runtime.state.gemini.engine.rm.GarbageReleaseManager;
import org.apache.flink.runtime.state.gemini.engine.rm.GarbageReleaseManagerImpl;
import org.apache.flink.runtime.state.gemini.engine.rm.LeakDetector;
import org.apache.flink.runtime.state.gemini.engine.rm.LeakDetectorImpl;
import org.apache.flink.runtime.state.gemini.engine.rm.PoolAllocatorNettyImpl;
import org.apache.flink.runtime.state.gemini.engine.rm.UnpoolAllocatorImpl;
import org.apache.flink.runtime.state.gemini.engine.snapshot.BackendSnapshotMeta;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManagerImpl;
import org.apache.flink.runtime.state.gemini.engine.vm.BloomFilterManager;
import org.apache.flink.runtime.state.gemini.engine.vm.BloomFilterManagerImpl;
import org.apache.flink.runtime.state.gemini.engine.vm.CacheManager;
import org.apache.flink.runtime.state.gemini.engine.vm.CacheManagerImpl;
import org.apache.flink.runtime.state.gemini.engine.vm.DataPageLRU;
import org.apache.flink.runtime.state.gemini.engine.vm.FetchPolicy;
import org.apache.flink.runtime.state.gemini.engine.vm.FetchPolicyImpl;
import org.apache.flink.runtime.state.gemini.engine.vm.NoBloomFilterManagerImpl;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutorGroup;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link Supervisor} implementation.
 *
 * <p>The constructor creates every engine component (allocators, write buffer manager, cache
 * manager, file managers, file cache, file compaction, snapshot manager, fetch policy, executor
 * groups, persistence strategy and bloom filter manager) and registers this instance on the
 * supplied {@link GContext}. {@link #start()} starts the components that have a start phase,
 * {@link #stop()} shuts down the executor groups and closes the components, and {@link #close()}
 * closes the garbage release manager and leak detector when they exist.
 */
public class SupervisorImpl
implements Supervisor {
    private static final Logger LOG = LoggerFactory.getLogger(SupervisorImpl.class);

    /** Quiet period passed to {@code shutdownGracefully} for every executor group, in ms. */
    private static final long SHUTDOWN_QUIET_PERIOD_MS = 100L;
    /** Timeout passed to {@code shutdownGracefully} for every executor group, in ms. */
    private static final long SHUTDOWN_TIMEOUT_MS = 500L;
    /** Maximum time {@link #stop()} waits for each executor group to terminate, in minutes. */
    private static final long TERMINATION_WAIT_MINUTES = 1L;

    private final GContext gContext;
    private final Allocator allocator;
    private final Allocator defaultAllocator = new UnpoolAllocatorImpl();
    private final Allocator forReadAllocator;
    private final WriteBufferManager writeBufferManager;
    private final CacheManager cacheManager;
    private final SnapshotManager snapshotManager;
    private final FileManager localFileManager;
    private final FileManager dfsFileManager;
    private final FileCache fileCache;
    private final FileCleaner fileCleaner;
    private final FileCompaction fileCompaction;
    private final EventExecutorGroup regionExecutorGroup;
    private final EventExecutorGroup flusherExecutorGroup;
    private final EventExecutorGroup snapshotExecutorGroup;
    private final EventExecutorGroup compactionExecutorGroup;
    private final EventExecutorGroup lruIntoMainCacheExecutorGroup;
    private final FetchPolicy fetchPolicy;
    /** {@code null} when neither off-heap mode is enabled in the memory info. */
    private final GarbageReleaseManager<GByteBuffer> garbageReleaseManager;
    /** {@code null} when neither off-heap mode is enabled in the memory info. */
    private final LeakDetector leakDetector;
    private final PersistenceStrategy persistenceStrategy;
    private final BloomFilterManager bloomFilterManager;

    /**
     * Creates the supervisor and all components it owns.
     *
     * @param gContext engine context; must not be {@code null}. This instance is registered on it
     *     via {@code setSupervisor} before any component is constructed.
     * @throws NullPointerException if {@code gContext} is {@code null}
     */
    public SupervisorImpl(GContext gContext) {
        // Validate before the first dereference; the original check ran too late to ever fire.
        this.gContext = Preconditions.checkNotNull(gContext);
        // Registration stays ahead of component construction, as in the original ordering.
        gContext.setSupervisor(this);
        this.writeBufferManager = new WriteBufferManagerImpl(gContext);
        GConfiguration gConfiguration = gContext.getGConfiguration();
        this.cacheManager = new CacheManagerImpl(this.gContext);

        if (gContext.getMemoryInfo().isUseOffHeap()) {
            // Off-heap for everything: one pooled allocator serves both general and read use.
            this.garbageReleaseManager = new GarbageReleaseManagerImpl<GByteBuffer>(this.gContext);
            this.leakDetector = new LeakDetectorImpl();
            Allocator pooledAllocator = new PoolAllocatorNettyImpl(gContext, this.garbageReleaseManager, this.leakDetector);
            this.forReadAllocator = pooledAllocator;
            this.allocator = pooledAllocator;
        } else {
            this.allocator = new UnpoolAllocatorImpl();
            if (gContext.getMemoryInfo().isUseOffheapForRead()) {
                // Off-heap only for reads: pooled allocator for reads, unpooled for the rest.
                this.garbageReleaseManager = new GarbageReleaseManagerImpl<GByteBuffer>(this.gContext);
                this.leakDetector = new LeakDetectorImpl();
                this.forReadAllocator = new PoolAllocatorNettyImpl(gContext, this.garbageReleaseManager, this.leakDetector);
            } else {
                this.garbageReleaseManager = null;
                this.leakDetector = null;
                this.forReadAllocator = this.allocator;
            }
        }

        this.fileCleaner = new FileCleanerImpl(gContext);
        this.localFileManager = new FileManagerImpl(gContext, "local", new Path(gConfiguration.getLocalPath()), false, new LocalDataPageUtil(this.forReadAllocator, gConfiguration.isChecksumEnable()));
        this.fileCleaner.registerFileManager(this.localFileManager);
        this.dfsFileManager = new FileManagerImpl(gContext, "dfs", new Path(gConfiguration.getDfsPath()), true, new DfsDataPageUtil(gConfiguration.isChecksumEnable()));
        this.fileCleaner.registerFileManager(this.dfsFileManager);
        this.fileCache = FileCache.createFileCache(gContext, this.localFileManager, this.dfsFileManager);
        this.fileCompaction = gConfiguration.isFileCompactionEnabled() ? new FileCompactionImpl(gContext, (FileCompactionPageTransfer)((Object)this.fileCache)) : new NoFileCompaction();
        this.snapshotManager = new SnapshotManagerImpl(this.gContext, this.writeBufferManager, this.localFileManager, this.dfsFileManager);
        this.fetchPolicy = new FetchPolicyImpl(gContext, this.cacheManager.getCacheStats(), new DataPageLRU<PageAddress, DataPageLRU.PageWithContext>(gContext.getStartRegionId(), gContext.getEndRegionId(), this.cacheManager.getReadPageCacheLRUSize(), this.createLRUFunction(), gConfiguration.isEnableLruAccessMode(), gConfiguration.isEnableEvictRegionEven()));

        String prefix = gConfiguration.getExecutorPrefixName();
        this.regionExecutorGroup = this.createExecutorGroup(gConfiguration.getRegionThreadNum(), prefix + "geminiRegion-%d");
        this.flusherExecutorGroup = this.createExecutorGroup(gConfiguration.getFlushThreadNum(), prefix + "geminiFlush-%d");
        this.snapshotExecutorGroup = this.createExecutorGroup(gConfiguration.getSnapshotThreadNum(), prefix + "geminiSnapshot-%d");
        this.compactionExecutorGroup = this.createExecutorGroup(gConfiguration.getCompactionThreadNum(), prefix + "geminiCompaction-%d");
        this.persistenceStrategy = PersistenceStrategyFactory.INSTANCE.create(gContext);
        this.lruIntoMainCacheExecutorGroup = this.createExecutorGroup(gConfiguration.getLruIntoMainCacheThreadNum(), prefix + "lruIntoMain-%d");

        this.bloomFilterManager = gConfiguration.isEnableBloomFilter() ? new BloomFilterManagerImpl((long)((float)gContext.getMemoryInfo().getTotalHeapSize() * gConfiguration.getBloomFilterMemRate()), gConfiguration.getRegionThreadNum() + 1) : new NoBloomFilterManagerImpl();
        LOG.info("Supervisor is created");
    }

    /**
     * Builds one executor group whose threads are named with {@code threadNameFormat}.
     *
     * @param threadNum number of threads passed to the group
     * @param threadNameFormat name format handed to {@link ThreadFactoryBuilder#setNameFormat}
     * @return a new {@link GeminiEventExecutorGroup}
     */
    private EventExecutorGroup createExecutorGroup(int threadNum, String threadNameFormat) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build();
        return new GeminiEventExecutorGroup(threadNum, threadFactory, this.gContext.getGConfiguration().getCommonThreadSleepTimeNs(), this.gContext);
    }

    /**
     * Starts the cache manager, file cleaner, both file managers, file compaction and, when
     * present, the garbage release manager.
     */
    @Override
    public void start() {
        this.cacheManager.start();
        this.fileCleaner.start();
        this.localFileManager.start();
        this.dfsFileManager.start();
        this.fileCompaction.start();
        if (this.garbageReleaseManager != null) {
            this.garbageReleaseManager.start();
        }
        LOG.info("Supervisor is started");
    }

    /**
     * Initiates graceful shutdown of every executor group, closes the owned components, then
     * waits for the executor groups to terminate.
     *
     * <p>Failures while closing a component are logged and do not prevent the remaining
     * components from being closed. The garbage release manager and the leak detector are not
     * touched here; they are closed by {@link #close()}.
     */
    @Override
    public void stop() {
        List<Future<?>> terminationFutures = new ArrayList<Future<?>>();
        terminationFutures.add(this.regionExecutorGroup.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS));
        terminationFutures.add(this.flusherExecutorGroup.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS));
        terminationFutures.add(this.compactionExecutorGroup.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS));
        terminationFutures.add(this.snapshotExecutorGroup.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS));
        // This group was previously never shut down, leaving its threads alive after stop().
        terminationFutures.add(this.lruIntoMainCacheExecutorGroup.shutdownGracefully(SHUTDOWN_QUIET_PERIOD_MS, SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS));

        this.closeQuietly(this.fetchPolicy, "FetchPolicy");
        this.closeQuietly(this.cacheManager, "CacheManager");
        this.closeQuietly(this.snapshotManager, "SnapshotManager");
        this.closeQuietly(this.fileCompaction, "FileCompaction");
        this.closeQuietly(this.fileCache, "FileCache");
        this.closeQuietly(this.localFileManager, "LocalFileManager");
        this.closeQuietly(this.dfsFileManager, "DFSFileManager");
        this.closeQuietly(this.fileCleaner, "FileCleaner");
        this.closeQuietly(this.bloomFilterManager, "BloomFilterManager");

        for (Future<?> terminationFuture : terminationFutures) {
            if (!terminationFuture.awaitUninterruptibly(TERMINATION_WAIT_MINUTES, TimeUnit.MINUTES)) {
                LOG.warn("An executor group did not terminate within {} minute(s)", TERMINATION_WAIT_MINUTES);
            }
        }
    }

    /** Closes the garbage release manager and the leak detector if they were created. */
    @Override
    public void close() {
        if (this.garbageReleaseManager != null) {
            this.closeQuietly(this.garbageReleaseManager, "GarbageReleaseManager");
        }
        if (this.leakDetector != null) {
            this.closeQuietly(this.leakDetector, "LeakDetector");
        }
    }

    /**
     * Delegates to {@link SnapshotManager#startSnapshot}.
     *
     * @param backendSnapshotMeta metadata handed to the snapshot manager
     * @throws IOException if the snapshot manager throws it
     */
    @Override
    public void startSnapshot(BackendSnapshotMeta backendSnapshotMeta) throws IOException {
        this.snapshotManager.startSnapshot(backendSnapshotMeta);
    }

    /**
     * Delegates to {@link SnapshotManager#getPendingSnapshot}.
     *
     * @param checkpointId checkpoint id to look up
     * @return whatever the snapshot manager returns for that id
     */
    @Override
    public SnapshotManager.PendingSnapshot getPendingSnapshot(long checkpointId) {
        return this.snapshotManager.getPendingSnapshot(checkpointId);
    }

    /** @return the pooled allocator when off-heap is enabled, otherwise an unpooled allocator */
    @Override
    public Allocator getAllocator() {
        return this.allocator;
    }

    /** @return the unpooled allocator that is always created regardless of memory settings */
    @Override
    public Allocator getDefaultAllocator() {
        return this.defaultAllocator;
    }

    /**
     * @return the allocator used for reads: pooled when off-heap or off-heap-for-read is enabled,
     *     otherwise the same instance as {@link #getAllocator()}
     */
    @Override
    public Allocator getForReadAllocator() {
        return this.forReadAllocator;
    }

    /** @return the write buffer manager created by the constructor */
    @Override
    public WriteBufferManager getWriteBufferManager() {
        return this.writeBufferManager;
    }

    /** @return the cache manager created by the constructor */
    @Override
    public CacheManager getCacheManager() {
        return this.cacheManager;
    }

    /** @return the snapshot manager created by the constructor */
    @Override
    public SnapshotManager getSnapshotManager() {
        return this.snapshotManager;
    }

    /** @return the file manager created for the configured local path */
    @Override
    public FileManager getLocalFileManager() {
        return this.localFileManager;
    }

    /** @return the file manager created for the configured DFS path */
    @Override
    public FileManager getDfsFileManager() {
        return this.dfsFileManager;
    }

    /** @return the file cache built on top of the local and DFS file managers */
    @Override
    public FileCache getFileCache() {
        return this.fileCache;
    }

    /** @return the file cleaner with which both file managers are registered */
    @Override
    public FileCleaner getFileCleaner() {
        return this.fileCleaner;
    }

    /**
     * @return a {@link FileCompactionImpl} when file compaction is enabled in the configuration,
     *     otherwise a {@link NoFileCompaction}
     */
    @Override
    public FileCompaction getFileCompaction() {
        return this.fileCompaction;
    }

    /** @return the executor group whose threads are named {@code geminiRegion-N} */
    @Override
    public EventExecutorGroup getRegionExecutorGroup() {
        return this.regionExecutorGroup;
    }

    /** @return the executor group whose threads are named {@code geminiFlush-N} */
    @Override
    public EventExecutorGroup getFlushExecutorGroup() {
        return this.flusherExecutorGroup;
    }

    /** @return the executor group whose threads are named {@code geminiSnapshot-N} */
    @Override
    public EventExecutorGroup getSnapshotExecutorGroup() {
        return this.snapshotExecutorGroup;
    }

    /** @return the executor group whose threads are named {@code geminiCompaction-N} */
    @Override
    public EventExecutorGroup getCompactionExecutorGroup() {
        return this.compactionExecutorGroup;
    }

    /** @return the executor group whose threads are named {@code lruIntoMain-N} */
    @Override
    public EventExecutorGroup getLruIntoMainCacheExecutorGroup() {
        return this.lruIntoMainCacheExecutorGroup;
    }

    /**
     * Calls {@code discard} on every address in the list, passing this supervisor's file cache.
     *
     * @param gRegionContext region context forwarded to each {@code discard} call
     * @param pageAddressList addresses to discard; must not be {@code null}
     */
    @Override
    public void discardPage(GRegionContext gRegionContext, List<PageAddress> pageAddressList) {
        for (PageAddress pageAddress : pageAddressList) {
            pageAddress.discard(this.fileCache, gRegionContext, null);
        }
    }

    /** @return the table map exposed by the Gemini DB held in the context */
    @Override
    public Map<String, GTable> getAllTables() {
        return this.gContext.getGeminiDB().getGeminiTableMap();
    }

    /** @return the fetch policy created by the constructor */
    @Override
    public FetchPolicy getFetchPolicy() {
        return this.fetchPolicy;
    }

    /** @return the garbage release manager, or {@code null} when no off-heap mode is enabled */
    @Override
    public GarbageReleaseManager getGarbageReleaseManager() {
        return this.garbageReleaseManager;
    }

    /** @return the leak detector, or {@code null} when no off-heap mode is enabled */
    @Override
    public LeakDetector getLeakDetector() {
        return this.leakDetector;
    }

    /** @return the persistence strategy created by {@link PersistenceStrategyFactory} */
    @Override
    public PersistenceStrategy getPersistencyStrategy() {
        return this.persistenceStrategy;
    }

    /**
     * Creates the callback object handed to the read-page {@link DataPageLRU}.
     *
     * @return callbacks for sizing, removal notification, slot selection and the
     *     "may this page be moved into the main cache" decision
     */
    private DataPageLRU.DataPageLRUFuction createLRUFunction() {
        return new DataPageLRU.DataPageLRUFuction<DataPageLRU.PageWithContext>(){

            /** Size of an entry is the size reported by its future data page. */
            @Override
            public int size(DataPageLRU.PageWithContext value) {
                return value.getFutureDataPage().getSize();
            }

            /** Forwards the removal notification to the entry's future data page. */
            @Override
            public void removed(DataPageLRU.PageWithContext value) {
                value.getFutureDataPage().removed();
            }

            /** Slot is the region id minus {@code offset}; entries without a context use slot 0. */
            @Override
            public int getSlotIndex(DataPageLRU.PageWithContext value, int offset) {
                PageContext context = value.getPageContext();
                if (context == null) {
                    return 0;
                }
                return context.getGRegionID().getId() - offset;
            }

            /**
             * Decides whether {@code value} may be added into the main cache.
             *
             * <p>Returns {@code false} as soon as any of the checks below fails; the checks are
             * evaluated in the order written.
             */
            @Override
            public boolean canAddIntoMainCache(DataPageLRU.PageWithContext value, PageIndex pageIndex, GRegionID expectedRegionId) {
                PageContext context = value.getPageContext();
                if (context == null) {
                    return false;
                }
                if (context.getCacheStatus() == PageContext.CacheStatus.CACHING_TO_MAIN) {
                    return false;
                }
                if (context.getLogicPageIndex() < 0) {
                    return false;
                }
                // The page must have finished loading and must not have failed.
                if (!value.getFutureDataPage().isDone() || value.getFutureDataPage().isFail()) {
                    return false;
                }
                if (!expectedRegionId.equals(context.getGRegionID())) {
                    return false;
                }
                int logicPageIndex = context.getLogicPageIndex();
                int indexCapacity = pageIndex.getIndexCapacity();
                if (logicPageIndex >= indexCapacity) {
                    LOG.debug("LogicPageChainIndex error, pageIndex {}, indexCapacity {}, expectedRegion {}, context region {}", new Object[]{logicPageIndex, indexCapacity, expectedRegionId, context.getGRegionID()});
                    return false;
                }
                LogicalPageChain pageChain = pageIndex.getLogicPage(logicPageIndex);
                if (pageChain == null) {
                    return false;
                }
                // The chain currently at this index must be the one recorded in the context.
                if (context.getLogicPageChainHashCode() != pageChain.hashCode()) {
                    return false;
                }
                if (pageChain.getPageStatus().equals(PageStatus.Compacting)) {
                    return false;
                }
                int halfCapacity = indexCapacity >> 1;
                if (logicPageIndex >= halfCapacity) {
                    return true;
                }
                // For an index in the lower half, reject only when the chain at
                // (index + capacity / 2) exists and is still in the Init status.
                LogicalPageChain buddyPageChain = pageIndex.getLogicPage(logicPageIndex + halfCapacity);
                return buddyPageChain == null || !buddyPageChain.getPageStatus().equals(PageStatus.Init);
            }
        };
    }

    /**
     * Closes {@code closeable}, logging instead of propagating any exception.
     *
     * @param closeable resource to close
     * @param closeableName name used in the error message if closing fails
     */
    private void closeQuietly(Closeable closeable, String closeableName) {
        try {
            closeable.close();
        }
        catch (Exception e) {
            // Pass the exception as the trailing argument so SLF4J logs its stack trace.
            LOG.error("Failed to close {}", closeableName, e);
        }
    }

    /**
     * @return a {@link BloomFilterManagerImpl} when bloom filters are enabled in the
     *     configuration, otherwise a {@link NoBloomFilterManagerImpl}
     */
    @Override
    public BloomFilterManager getBloomFilterManager() {
        return this.bloomFilterManager;
    }
}

