/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.dbms;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadFactory;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.gemini.engine.GConfiguration;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.GTable;
import org.apache.flink.runtime.state.gemini.engine.dbms.DiscardOrEvictPageReleaseManager;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.filecache.FileCache;
import org.apache.flink.runtime.state.gemini.engine.filecache.InfiniteCapacityFileCache;
import org.apache.flink.runtime.state.gemini.engine.filecache.NoCapacityFileCache;
import org.apache.flink.runtime.state.gemini.engine.fs.FileCleaner;
import org.apache.flink.runtime.state.gemini.engine.fs.FileCleanerImpl;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManagerImpl;
import org.apache.flink.runtime.state.gemini.engine.handler.GeminiEventExecutorGroup;
import org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManager;
import org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManagerImpl;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.DfsDataPageUtil;
import org.apache.flink.runtime.state.gemini.engine.page.LocalDataPageUtil;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.rm.Allocator;
import org.apache.flink.runtime.state.gemini.engine.rm.PoolAllocatorNettyImpl;
import org.apache.flink.runtime.state.gemini.engine.rm.ReferenceCount;
import org.apache.flink.runtime.state.gemini.engine.rm.UnpoolAllocatorImpl;
import org.apache.flink.runtime.state.gemini.engine.snapshot.BackendSnapshotMeta;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManagerImpl;
import org.apache.flink.runtime.state.gemini.engine.vm.CacheManager;
import org.apache.flink.runtime.state.gemini.engine.vm.CacheManagerImpl;
import org.apache.flink.runtime.state.gemini.engine.vm.DataPageLRU;
import org.apache.flink.runtime.state.gemini.engine.vm.FetchPolicy;
import org.apache.flink.runtime.state.gemini.engine.vm.FetchPolicyImpl;
import org.apache.flink.runtime.state.gemini.engine.vm.FutureDataPage;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutorGroup;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link Supervisor} implementation.
 *
 * <p>The constructor builds every engine component that the getters of this class expose
 * (allocators, write buffer manager, cache manager, file managers, file cache, file cleaner,
 * snapshot manager, fetch policy and the four event executor groups). {@link #start()} starts the
 * background components and {@link #close()} shuts the executor groups down and closes the
 * closeable components on a best-effort basis.
 */
public class SupervisorImpl
implements Supervisor {
    private static final Logger LOG = LoggerFactory.getLogger(SupervisorImpl.class);
    private final GContext gContext;
    private final Allocator allocator;
    private final Allocator defaultAllocator = new UnpoolAllocatorImpl();
    private final Allocator forReadAllocator;
    private final WriteBufferManager writeBufferManager;
    private final CacheManager cacheManager;
    private final SnapshotManager snapshotManager;
    private final FileManager localFileManager;
    private final FileManager dfsFileManager;
    private final FileCache fileCache;
    private final FileCleaner fileCleaner;
    private final EventExecutorGroup regionExecutorGroup;
    private final EventExecutorGroup flusherExecutorGroup;
    private final EventExecutorGroup snapshotExecutorGroup;
    private final EventExecutorGroup compactionExecutorGroup;
    private final FetchPolicy fetchPolicy;
    // Kept public for backward compatibility; prefer getDiscardOrEvictPageReleaseManager().
    public final DiscardOrEvictPageReleaseManager discardOrEvictPageReleaseManager;

    /**
     * Creates the supervisor and all components it manages, and registers itself on the context.
     *
     * @param gContext engine context; must not be {@code null}
     * @throws NullPointerException if {@code gContext} is {@code null}
     * @throws GeminiRuntimeException if the configured file cache capacity is a positive value
     *     other than {@code FileCache.INFINITE_CAPACITY}
     */
    public SupervisorImpl(GContext gContext) {
        // Validate before the first dereference so a null context fails with the intended check.
        this.gContext = Preconditions.checkNotNull(gContext);
        // Registered before any component is built, as in the original ordering.
        // NOTE(review): presumably component constructors look the supervisor up through the
        // context - keep this ahead of the component construction below.
        gContext.setSupervisor(this);

        GConfiguration gConfiguration = gContext.getGConfiguration();
        this.writeBufferManager = new WriteBufferManagerImpl(gContext);
        this.cacheManager = new CacheManagerImpl(gContext);
        this.discardOrEvictPageReleaseManager = new DiscardOrEvictPageReleaseManager(gContext);

        if (gConfiguration.getUseOffheap()) {
            // Off-heap mode: one pooled allocator serves both general and read allocations.
            this.allocator = this.forReadAllocator = new PoolAllocatorNettyImpl(gConfiguration, this.discardOrEvictPageReleaseManager);
        } else {
            this.allocator = new UnpoolAllocatorImpl();
            // Reads may still be forced onto a pooled allocator; otherwise they share the unpooled one.
            this.forReadAllocator = gConfiguration.getForceReadUseOffheap()
                ? new PoolAllocatorNettyImpl(gConfiguration, this.discardOrEvictPageReleaseManager)
                : this.allocator;
        }

        this.fileCleaner = new FileCleanerImpl(gContext);
        this.localFileManager = new FileManagerImpl(
            gContext,
            "local",
            new Path(gConfiguration.getLocalPath()),
            false,
            new LocalDataPageUtil(this.forReadAllocator, gConfiguration.isChecksumEnable()));
        this.fileCleaner.registerFileManager(this.localFileManager);
        this.dfsFileManager = new FileManagerImpl(
            gContext,
            "dfs",
            new Path(gConfiguration.getDfsPath()),
            true,
            new DfsDataPageUtil(gConfiguration.isChecksumEnable()));
        this.fileCleaner.registerFileManager(this.dfsFileManager);

        // Only "infinite" and "none" (<= 0) capacities are supported; any other value is rejected.
        long fileCacheCapacity = gConfiguration.getFileCacheCapacity();
        if (fileCacheCapacity == FileCache.INFINITE_CAPACITY) {
            this.fileCache = new InfiniteCapacityFileCache(gContext, this.localFileManager, this.dfsFileManager);
        } else if (fileCacheCapacity <= 0L) {
            this.fileCache = new NoCapacityFileCache(gContext, this.dfsFileManager);
        } else {
            throw new GeminiRuntimeException("unsupported file cache capacity " + fileCacheCapacity);
        }

        this.snapshotManager = new SnapshotManagerImpl(gContext, this.writeBufferManager, this.localFileManager, this.dfsFileManager);
        this.fetchPolicy = new FetchPolicyImpl(
            gContext,
            this.cacheManager.getCacheStats(),
            new DataPageLRU<PageAddress, FutureDataPage>((long)this.cacheManager.getReadPageCacheLRUSize(), new DataPageLRU.DataPageLRUFuction<FutureDataPage>(){

                @Override
                public int size(FutureDataPage value) {
                    return value.getSize();
                }

                @Override
                public void removed(FutureDataPage value) {
                    value.removed();
                }
            }));

        // One executor group per kind of work; thread names share the configured prefix.
        String prefix = gConfiguration.getExcetorPrefixName();
        ThreadFactory regionThreadFactory = new ThreadFactoryBuilder().setNameFormat(prefix + "geminiRegion-%d").build();
        this.regionExecutorGroup = new GeminiEventExecutorGroup(gConfiguration.getRegionThreadNum(), regionThreadFactory, gConfiguration.getCommonThreadSleepTimeNs(), gContext);
        ThreadFactory flushThreadFactory = new ThreadFactoryBuilder().setNameFormat(prefix + "geminiFlush-%d").build();
        this.flusherExecutorGroup = new GeminiEventExecutorGroup(gConfiguration.getFlushThreadNum(), flushThreadFactory, gConfiguration.getCommonThreadSleepTimeNs(), gContext);
        ThreadFactory snapshotThreadFactory = new ThreadFactoryBuilder().setNameFormat(prefix + "geminiSnapshot-%d").build();
        this.snapshotExecutorGroup = new GeminiEventExecutorGroup(gConfiguration.getSnapshotThreadNum(), snapshotThreadFactory, gConfiguration.getCommonThreadSleepTimeNs(), gContext);
        ThreadFactory compactionThreadFactory = new ThreadFactoryBuilder().setNameFormat(prefix + "geminiCompaction-%d").build();
        this.compactionExecutorGroup = new GeminiEventExecutorGroup(gConfiguration.getCompactionThreadNum(), compactionThreadFactory, gConfiguration.getCommonThreadSleepTimeNs(), gContext);
        LOG.info("Supervisor is created");
    }

    /** Starts the cache manager, the file cleaner and both file managers, in that order. */
    @Override
    public void start() {
        this.cacheManager.start();
        this.fileCleaner.start();
        this.localFileManager.start();
        this.dfsFileManager.start();
        LOG.info("Supervisor is started");
    }

    /**
     * Requests a graceful shutdown of all executor groups, then closes the closeable components.
     * A failure to close one component is logged and does not prevent closing the remaining ones.
     */
    @Override
    public void close() {
        this.regionExecutorGroup.shutdownGracefully();
        this.flusherExecutorGroup.shutdownGracefully();
        this.compactionExecutorGroup.shutdownGracefully();
        this.snapshotExecutorGroup.shutdownGracefully();
        this.closeQuietly(this.fetchPolicy, "FetchPolicy");
        this.closeQuietly(this.cacheManager, "CacheManager");
        this.closeQuietly(this.snapshotManager, "SnapshotManager");
        this.closeQuietly(this.fileCache, "FileCache");
        this.closeQuietly(this.localFileManager, "LocalFileManager");
        this.closeQuietly(this.dfsFileManager, "DFSFileManager");
        this.closeQuietly(this.fileCleaner, "FileCleaner");
    }

    /**
     * Delegates to the snapshot manager to start a snapshot.
     *
     * @param backendSnapshotMeta snapshot metadata handed through to the snapshot manager
     * @throws IOException if the snapshot manager fails to start the snapshot
     */
    @Override
    public void startSnapshot(BackendSnapshotMeta backendSnapshotMeta) throws IOException {
        this.snapshotManager.startSnapshot(backendSnapshotMeta);
    }

    /**
     * Looks up the pending snapshot for a checkpoint via the snapshot manager.
     *
     * @param checkpointId checkpoint identifier
     * @return whatever the snapshot manager returns for this checkpoint id
     */
    @Override
    public SnapshotManager.PendingSnapshot getPendingSnapshot(long checkpointId) {
        return this.snapshotManager.getPendingSnapshot(checkpointId);
    }

    /** Returns the main allocator: pooled when off-heap is enabled, unpooled otherwise. */
    @Override
    public Allocator getAllocator() {
        return this.allocator;
    }

    /** Returns the always-unpooled default allocator. */
    @Override
    public Allocator getDefaultAllocator() {
        return this.defaultAllocator;
    }

    /** Returns the allocator handed to the local data page util for reads. */
    @Override
    public Allocator getForReadAllocator() {
        return this.forReadAllocator;
    }

    /** Returns the write buffer manager. */
    @Override
    public WriteBufferManager getWriteBufferManager() {
        return this.writeBufferManager;
    }

    /** Returns the cache manager. */
    @Override
    public CacheManager getCacheManager() {
        return this.cacheManager;
    }

    /** Returns the snapshot manager. */
    @Override
    public SnapshotManager getSnapshotManager() {
        return this.snapshotManager;
    }

    /** Returns the file manager rooted at the configured local path. */
    @Override
    public FileManager getLocalFileManager() {
        return this.localFileManager;
    }

    /** Returns the file manager rooted at the configured DFS path. */
    @Override
    public FileManager getDfsFileManager() {
        return this.dfsFileManager;
    }

    /** Returns the file cache selected from the configured capacity. */
    @Override
    public FileCache getFileCache() {
        return this.fileCache;
    }

    /** Returns the file cleaner that both file managers are registered with. */
    @Override
    public FileCleaner getFileCleaner() {
        return this.fileCleaner;
    }

    /** Returns the executor group whose threads are named {@code <prefix>geminiRegion-N}. */
    @Override
    public EventExecutorGroup getRegionExecutorGroup() {
        return this.regionExecutorGroup;
    }

    /** Returns the executor group whose threads are named {@code <prefix>geminiFlush-N}. */
    @Override
    public EventExecutorGroup getFlushExecutorGroup() {
        return this.flusherExecutorGroup;
    }

    /** Returns the executor group whose threads are named {@code <prefix>geminiSnapshot-N}. */
    @Override
    public EventExecutorGroup getSnapshotExecutorGroup() {
        return this.snapshotExecutorGroup;
    }

    /** Returns the executor group whose threads are named {@code <prefix>geminiCompaction-N}. */
    @Override
    public EventExecutorGroup getCompactionExecutorGroup() {
        return this.compactionExecutorGroup;
    }

    /**
     * Discards the given pages from the file cache and then drops one reference on each in-memory
     * data page that is still attached to its address.
     *
     * @param gRegionContext region context passed through to the file cache
     * @param pageAddressList addresses of the pages to discard
     */
    @Override
    public void discardPage(GRegionContext gRegionContext, List<PageAddress> pageAddressList) {
        // First pass: discard every page from the file cache.
        for (PageAddress pageAddress : pageAddressList) {
            this.fileCache.discardPage(pageAddress, gRegionContext, null);
        }
        // Second pass: release the in-memory pages.
        for (PageAddress pageAddress : pageAddressList) {
            DataPage dataPage = pageAddress.getDataPageNoReference();
            if (dataPage == null) {
                continue;
            }
            // A count other than 1 means the release below will not be the last one, so the
            // buffer is handed to the release manager for monitoring.
            if (dataPage.getGBinaryHashMap().getGByteBuffer().getCnt() != 1) {
                this.discardOrEvictPageReleaseManager.addMonitorPageStillHaveReference(dataPage.getGBinaryHashMap().getGByteBuffer(), ReferenceCount.ReleaseType.Discard, pageAddress);
            }
            dataPage.delReferenceCount(ReferenceCount.ReleaseType.Discard);
        }
    }

    /** Returns the table map of the database attached to the context. */
    @Override
    public Map<String, GTable> getAllTables() {
        return this.gContext.getGeminiDB().getGeminiTableMap();
    }

    /** Returns the fetch policy. */
    @Override
    public FetchPolicy getFetchPolicy() {
        return this.fetchPolicy;
    }

    /** Returns the manager that monitors discarded or evicted pages which are still referenced. */
    @Override
    public DiscardOrEvictPageReleaseManager getDiscardOrEvictPageReleaseManager() {
        return this.discardOrEvictPageReleaseManager;
    }

    /**
     * Closes a component and logs, rather than propagates, any failure so that the remaining
     * components can still be closed.
     *
     * @param closeable component to close
     * @param closeableName human-readable component name used in the log message
     */
    private void closeQuietly(Closeable closeable, String closeableName) {
        try {
            closeable.close();
        }
        catch (Exception e) {
            // The exception is the trailing argument without its own placeholder, so SLF4J
            // logs it as a throwable with its stack trace.
            LOG.error("Failed to close {}", closeableName, e);
        }
    }
}

