/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.filecache;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.filecache.AddBatchPageException;
import org.apache.flink.runtime.state.gemini.engine.filecache.FileCache;
import org.apache.flink.runtime.state.gemini.engine.filecache.FileCacheStat;
import org.apache.flink.runtime.state.gemini.engine.filecache.FlushBatchPageException;
import org.apache.flink.runtime.state.gemini.engine.filecompaction.FileCompactionPageTransfer;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.fs.FileReader;
import org.apache.flink.runtime.state.gemini.engine.fs.FileWriter;
import org.apache.flink.runtime.state.gemini.engine.metrics.FileCacheMetrics;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddressSingleImpl;
import org.apache.flink.runtime.state.gemini.engine.rm.GByteBuffer;
import org.apache.flink.runtime.state.gemini.engine.rm.ReferenceCountable;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link FileCache} created with a capacity of {@code Long.MAX_VALUE}.
 *
 * <p>Pages are written to files of a local {@link FileManager} ("adding" a page) and to files of
 * a DFS {@link FileManager} ("flushing" a page). For each of the two file managers one
 * {@link FileWriter} is kept per {@link EventExecutor}; a writer is replaced by a new one once it
 * is no longer valid or its size reaches the configured max file size.
 *
 * <p>All write operations are submitted to the {@link EventExecutor} supplied by the caller and
 * report their outcome through optional callbacks.
 */
public class InfiniteFileCache
extends FileCache
implements FileCompactionPageTransfer {
    private static final Logger LOG = LoggerFactory.getLogger(InfiniteFileCache.class);

    /** Retry limit for reads that fail although the page address did not change. */
    private static final int MAX_UNEXPECTED_TRIES = 3;

    /** Retry limit for reads that fail after the page address changed. */
    private static final int MAX_EXPECTED_TRIES = 10;

    private final GContext gContext;
    /** Size threshold at which a file writer is closed and replaced. */
    private final long maxFileSize;
    /** Whether a batch flush ends with {@code sync()} instead of {@code flush()}. */
    private final boolean syncWhenBatchFlush;
    private final FileManager localFileManager;
    /** Current local file writer of each executor. */
    private final Map<EventExecutor, FileWriter> localFileWriters;
    private final FileManager dfsFileManager;
    /** Current DFS file writer of each executor. */
    private final Map<EventExecutor, FileWriter> dfsFileWriters;
    private volatile boolean closed;

    /**
     * Creates the cache and, if the context provides file cache metrics, registers the cache
     * statistics with them.
     *
     * @param gContext context providing the configuration, metrics and access number
     * @param localFileManager manager of the local files
     * @param dfsFileManager manager of the DFS files
     * @throws NullPointerException if any argument is null
     * @throws IllegalArgumentException if the configured max file size is not positive
     */
    public InfiniteFileCache(GContext gContext, FileManager localFileManager, FileManager dfsFileManager) {
        super(Long.MAX_VALUE, new FileCacheStat());
        this.gContext = Preconditions.checkNotNull(gContext);
        this.maxFileSize = gContext.getGConfiguration().getMaxFileSize();
        Preconditions.checkArgument(this.maxFileSize > 0L, "Max file size should be positive");
        this.syncWhenBatchFlush = gContext.getGConfiguration().isSnapshotSyncWhenBatchFlush();
        this.localFileManager = Preconditions.checkNotNull(localFileManager);
        this.dfsFileManager = Preconditions.checkNotNull(dfsFileManager);
        this.localFileWriters = new ConcurrentHashMap<EventExecutor, FileWriter>();
        this.dfsFileWriters = new ConcurrentHashMap<EventExecutor, FileWriter>();
        FileCacheMetrics fileCacheMetrics = gContext.getFileCacheMetrics();
        if (fileCacheMetrics != null) {
            fileCacheMetrics.register(this.fileCacheStat);
        }
        this.closed = false;
        LOG.info("InfiniteFileCache created, LocalFileManager {}, DfsFileManager {}", localFileManager, dfsFileManager);
    }

    /** Returns the manager of the local files. */
    @VisibleForTesting
    public FileManager getLocalFileManager() {
        return this.localFileManager;
    }

    /** Returns the manager of the DFS files. */
    @VisibleForTesting
    public FileManager getDfsFileManager() {
        return this.dfsFileManager;
    }

    /** A page counts as cached when its local address is valid. */
    @Override
    public boolean isCached(PageAddress pageAddress) {
        return pageAddress.isLocalValid();
    }

    /**
     * Writes a page to a local file unless its local address is already valid.
     *
     * <p>If the local address is already valid the callback is invoked immediately with
     * {@code (true, null)}. Otherwise the work runs on {@code flushEventExecutor}: the page data is
     * taken from the in-memory buffer, or read from DFS when there is no buffer and the DFS
     * address is valid.
     *
     * @param pageAddress page to add; must be a {@link PageAddressSingleImpl}
     * @param gRegionContext region context passed through to the write
     * @param flushEventExecutor executor that performs the write and selects the file writer
     * @param callBack optional; receives {@code (success, failure)} once the attempt is finished
     */
    @Override
    public void addPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor flushEventExecutor, BiConsumer<Boolean, Throwable> callBack) {
        Preconditions.checkArgument(pageAddress instanceof PageAddressSingleImpl);
        if (pageAddress.isLocalValid()) {
            if (callBack != null) {
                callBack.accept(true, null);
            }
            return;
        }
        flushEventExecutor.execute(() -> {
            boolean success = false;
            Exception throwable = null;
            GByteBuffer gByteBuffer = pageAddress.getGByteBufferWithReference();
            try {
                // Re-check on the executor: the page may have been added in the meantime.
                if (!pageAddress.isLocalValid()) {
                    if (gByteBuffer == null && pageAddress.isDfsValid()) {
                        gByteBuffer = this.getGByteBuffer(this.dfsFileManager, pageAddress::getDfsAddress, pageAddress, false);
                    }
                    if (gByteBuffer == null) {
                        throw new GeminiRuntimeException("data page does not exist");
                    }
                    FileWriter fileWriter = this.getOrCreateFileWriter(this.localFileWriters, this.localFileManager, flushEventExecutor);
                    this.internalAddPage(this.localFileManager, fileWriter, pageAddress, gByteBuffer, gRegionContext, true, true);
                }
                success = true;
            }
            catch (Exception e) {
                success = false;
                throwable = e;
                LOG.error("error when adding page to cache", e);
            }
            finally {
                if (gByteBuffer != null) {
                    gByteBuffer.release();
                }
                if (callBack != null) {
                    callBack.accept(success, throwable);
                }
            }
        });
    }

    /**
     * Reads a page, preferring the local file over the DFS file.
     *
     * <p>A local read is recorded as a hit. A DFS read is recorded as a miss and additionally
     * schedules a write of the page to a local file on {@code flushEventExecutor}.
     *
     * @param pageAddress page to read; must be a {@link PageAddressSingleImpl}
     * @param gRegionContext region context used when the page is cached locally
     * @param flushEventExecutor executor used to cache a page that was read from DFS
     * @return the page data; never null
     * @throws GeminiRuntimeException if the page is neither local nor DFS valid, or reading fails
     */
    @Override
    public GByteBuffer getPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor flushEventExecutor) {
        GByteBuffer gByteBuffer = null;
        try {
            Preconditions.checkArgument(pageAddress instanceof PageAddressSingleImpl);
            if (pageAddress.isLocalValid()) {
                gByteBuffer = this.getGByteBuffer(this.localFileManager, pageAddress::getLocalAddress, pageAddress, true);
                if (gByteBuffer != null) {
                    this.fileCacheStat.addHitSize(pageAddress.getDataLen());
                }
            } else if (pageAddress.isDfsValid()) {
                gByteBuffer = this.getGByteBuffer(this.dfsFileManager, pageAddress::getDfsAddress, pageAddress, false);
                if (gByteBuffer != null) {
                    this.cachePage(pageAddress, gByteBuffer, flushEventExecutor, gRegionContext);
                    this.fileCacheStat.addMissSize(pageAddress.getDataLen());
                }
            }
            Preconditions.checkNotNull(gByteBuffer, "no page exists on local and dfs");
            return gByteBuffer;
        }
        catch (Exception e) {
            // The buffer is not handed to the caller on failure, so drop the reference held here.
            if (gByteBuffer != null) {
                gByteBuffer.release();
            }
            LOG.error("exception when get page", e);
            throw new GeminiRuntimeException("exception when get page: " + e.getMessage(), e);
        }
    }

    /**
     * Marks a valid page as invalid and decrements the DB reference of the local and/or DFS file
     * it is stored in. Does nothing if the page is already invalid.
     *
     * @param pageAddress page to discard; must be a {@link PageAddressSingleImpl}
     * @param gRegionContext not used by this implementation
     * @param eventExecutor not used by this implementation
     */
    @Override
    public void discardPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor) {
        Preconditions.checkArgument(pageAddress instanceof PageAddressSingleImpl);
        boolean pageValid;
        boolean localValid = false;
        long localAddress = -1L;
        boolean dfsValid = false;
        long dfsAddress = -1L;
        // Flip the status and capture the addresses under the page's monitor; the file managers
        // are updated afterwards, outside the lock.
        synchronized (pageAddress) {
            pageValid = pageAddress.isPageValid();
            if (pageValid) {
                pageAddress.setPageStatus(false);
                localValid = pageAddress.isLocalValid();
                dfsValid = pageAddress.isDfsValid();
                if (localValid) {
                    localAddress = pageAddress.getLocalAddress();
                }
                if (dfsValid) {
                    dfsAddress = pageAddress.getDfsAddress();
                }
            }
        }
        if (!pageValid) {
            return;
        }
        long accessNumber = this.gContext.getAccessNumber();
        long ts = System.currentTimeMillis();
        if (localValid) {
            this.localFileManager.decDBReference(localAddress, accessNumber, ts, pageAddress.getDataLen());
        }
        if (dfsValid) {
            this.dfsFileManager.decDBReference(dfsAddress, accessNumber, ts, pageAddress.getDataLen());
        }
    }

    /**
     * Writes a page to a DFS file.
     *
     * <p>Without {@code force}, a page whose DFS address is already valid is skipped and the
     * callback is invoked immediately with {@code (true, null)}. Otherwise the work runs on
     * {@code eventExecutor}; when the page has no in-memory buffer its data is read from the local
     * file, or from the DFS file if the local address is not valid.
     *
     * @param pageAddress page to flush; must be a {@link PageAddressSingleImpl}
     * @param gRegionContext region context passed through to the write
     * @param eventExecutor executor that performs the write and selects the file writer
     * @param force write the page even if its DFS address is already valid
     * @param callBack optional; receives {@code (success, failure)} once the attempt is finished
     */
    @Override
    public void flushPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor, boolean force, BiConsumer<Boolean, Throwable> callBack) {
        Preconditions.checkArgument(pageAddress instanceof PageAddressSingleImpl);
        if (!force && pageAddress.isDfsValid()) {
            if (callBack != null) {
                callBack.accept(true, null);
            }
            return;
        }
        eventExecutor.execute(() -> {
            boolean success = false;
            Exception throwable = null;
            boolean pageIsNull = false;
            GByteBuffer gByteBuffer = pageAddress.getGByteBufferWithReference();
            try {
                if (force || !pageAddress.isDfsValid()) {
                    if (gByteBuffer == null) {
                        pageIsNull = true;
                        if (pageAddress.isLocalValid()) {
                            gByteBuffer = this.getGByteBuffer(this.localFileManager, pageAddress::getLocalAddress, pageAddress, true);
                        } else if (pageAddress.isDfsValid()) {
                            gByteBuffer = this.getGByteBuffer(this.dfsFileManager, pageAddress::getDfsAddress, pageAddress, false);
                        }
                    }
                    Preconditions.checkNotNull(gByteBuffer, "Data page is null");
                    FileWriter fileWriter = this.getOrCreateFileWriter(this.dfsFileWriters, this.dfsFileManager, eventExecutor);
                    this.internalAddPage(this.dfsFileManager, fileWriter, pageAddress, gByteBuffer, gRegionContext, false, false);
                }
                success = true;
            }
            catch (Exception e) {
                success = false;
                throwable = e;
                LOG.error("error when flushing page: pageIsNull={}, {}", pageIsNull, e.getMessage(), e);
            }
            finally {
                if (gByteBuffer != null) {
                    gByteBuffer.release();
                }
                if (callBack != null) {
                    callBack.accept(success, throwable);
                }
            }
        });
    }

    /**
     * Writes all pages that are not yet local valid to one local file writer, flushes the writer
     * and only then updates the page addresses.
     *
     * <p>Returns without scheduling anything (and without invoking callbacks) when {@code pages}
     * is empty. On any failure every non-null callback receives {@code false} together with an
     * {@link AddBatchPageException} wrapping the cause.
     *
     * @param pages pages to add
     * @param gRegionContexts region context of each page, accessed by the page's index
     * @param eventExecutor executor that performs the writes and selects the file writer
     * @param callBacks callbacks invoked with {@code (success, failure)}; null entries are skipped
     */
    @Override
    public void addBatchPages(List<PageAddress> pages, List<GRegionContext> gRegionContexts, EventExecutor eventExecutor, List<BiConsumer<Boolean, Throwable>> callBacks) {
        if (pages.isEmpty()) {
            return;
        }
        eventExecutor.execute(() -> {
            boolean success = true;
            AddBatchPageException throwable = null;
            try {
                int size = pages.size();
                // One entry per page; null marks a page that did not need to be written.
                List<Long> addressList = new ArrayList<Long>(size);
                FileWriter fileWriter = this.getOrCreateFileWriter(this.localFileWriters, this.localFileManager, eventExecutor);
                for (int i = 0; i < size; ++i) {
                    PageAddress page = pages.get(i);
                    if (page.isLocalValid()) {
                        addressList.add(null);
                        continue;
                    }
                    GRegionContext gRegionContext = gRegionContexts.get(i);
                    GByteBuffer buffer = page.getGByteBufferWithReference();
                    try {
                        if (buffer == null && page.isDfsValid()) {
                            buffer = this.getGByteBuffer(this.dfsFileManager, page::getDfsAddress, page, false);
                        }
                        if (buffer == null) {
                            throw new GeminiRuntimeException("data page does not exist");
                        }
                        long address = this.writePage(this.localFileManager, fileWriter, page, buffer, gRegionContext, true);
                        addressList.add(address);
                    }
                    finally {
                        if (buffer != null) {
                            buffer.release();
                        }
                    }
                }
                fileWriter.flush();
                // Addresses are published only after the flush succeeded.
                long accessNumber = gRegionContexts.get(0).getGContext().getAccessNumber();
                for (int i = 0; i < size; ++i) {
                    Long address = addressList.get(i);
                    if (address == null) {
                        continue;
                    }
                    this.updatePageAddress(this.localFileManager, pages.get(i), address, true, accessNumber);
                }
            }
            catch (Exception e) {
                success = false;
                throwable = new AddBatchPageException(e);
                // Callbacks may all be null, so make sure the failure is visible somewhere.
                LOG.error("error when adding batch pages to cache", e);
            }
            finally {
                for (BiConsumer<Boolean, Throwable> callBack : callBacks) {
                    if (callBack == null) {
                        continue;
                    }
                    callBack.accept(success, throwable);
                }
            }
        });
    }

    /**
     * Writes pages to one DFS file writer, syncs or flushes the writer (depending on the
     * configuration) and only then updates the DFS page addresses.
     *
     * <p>A page is written when {@code force} is set or its DFS address is not valid. A page that
     * had no in-memory buffer is additionally written to a local file when {@code flushLocal} is
     * set and its local address is not valid. Returns without scheduling anything (and without
     * invoking callbacks) when {@code pages} is empty. On any failure every non-null callback
     * receives {@code false} together with a {@link FlushBatchPageException} wrapping the cause.
     *
     * @param pages pages to flush
     * @param gRegionContexts region context of each page, accessed by the page's index
     * @param eventExecutor executor that performs the writes and selects the file writers
     * @param force write pages even if their DFS address is already valid
     * @param flushLocal also add pages without in-memory buffer to the local file
     * @param callBacks callbacks invoked with {@code (success, failure)}; null entries are skipped
     */
    @Override
    public void flushBatchPages(List<PageAddress> pages, List<GRegionContext> gRegionContexts, EventExecutor eventExecutor, boolean force, boolean flushLocal, List<BiConsumer<Boolean, Throwable>> callBacks) {
        if (pages.isEmpty()) {
            return;
        }
        eventExecutor.execute(() -> {
            boolean success = true;
            FlushBatchPageException throwable = null;
            try {
                int size = pages.size();
                // One entry per page; null marks a page that did not need to be written.
                List<Long> addressList = new ArrayList<Long>(size);
                FileWriter fileWriter = this.getOrCreateFileWriter(this.dfsFileWriters, this.dfsFileManager, eventExecutor);
                for (int i = 0; i < size; ++i) {
                    PageAddress page = pages.get(i);
                    if (!force && page.isDfsValid()) {
                        addressList.add(null);
                        continue;
                    }
                    GByteBuffer buffer = page.getGByteBufferWithReference();
                    boolean inMemory = buffer != null;
                    try {
                        if (!inMemory) {
                            if (page.isLocalValid()) {
                                buffer = this.getGByteBuffer(this.localFileManager, page::getLocalAddress, page, true);
                            } else if (page.isDfsValid()) {
                                buffer = this.getGByteBuffer(this.dfsFileManager, page::getDfsAddress, page, false);
                            }
                        }
                        if (buffer == null) {
                            throw new GeminiRuntimeException("data page does not exist");
                        }
                        long address = this.writePage(this.dfsFileManager, fileWriter, page, buffer, gRegionContexts.get(i), false);
                        addressList.add(address);
                        if (!inMemory && flushLocal && !page.isLocalValid()) {
                            FileWriter localFileWriter = this.getOrCreateFileWriter(this.localFileWriters, this.localFileManager, eventExecutor);
                            this.internalAddPage(this.localFileManager, localFileWriter, page, buffer, gRegionContexts.get(i), true, true);
                        }
                    }
                    finally {
                        if (buffer != null) {
                            buffer.release();
                        }
                    }
                }
                if (this.syncWhenBatchFlush) {
                    fileWriter.sync();
                } else {
                    fileWriter.flush();
                }
                // Addresses are published only after the sync/flush succeeded.
                long accessNumber = gRegionContexts.get(0).getGContext().getAccessNumber();
                for (int i = 0; i < size; ++i) {
                    Long address = addressList.get(i);
                    if (address == null) {
                        continue;
                    }
                    this.updatePageAddress(this.dfsFileManager, pages.get(i), address, false, accessNumber);
                }
            }
            catch (Exception e) {
                success = false;
                throwable = new FlushBatchPageException(e);
                // Callbacks may all be null, so make sure the failure is visible somewhere.
                LOG.error("error when flushing batch pages", e);
            }
            finally {
                for (BiConsumer<Boolean, Throwable> callBack : callBacks) {
                    if (callBack == null) {
                        continue;
                    }
                    callBack.accept(success, throwable);
                }
            }
        });
    }

    /**
     * Syncs every current DFS file writer, then every current local file writer.
     *
     * @throws IOException if a writer fails to sync
     */
    @Override
    public void sync() throws IOException {
        for (FileWriter fileWriter : this.dfsFileWriters.values()) {
            fileWriter.sync();
        }
        for (FileWriter fileWriter : this.localFileWriters.values()) {
            fileWriter.sync();
        }
    }

    @Override
    public FileCache.FileCacheType getFileCacheType() {
        return FileCache.FileCacheType.INFINITE;
    }

    /** The DB file manager of this cache is the local file manager. */
    @Override
    public FileManager getDbFileManager() {
        return this.localFileManager;
    }

    /** A page has a DB file address when it is valid and its local address is valid. */
    @Override
    public boolean hasDbFileAddress(PageAddress pageAddress) {
        return pageAddress.isPageValid() && pageAddress.isLocalValid();
    }

    /** Returns the simple file id the local file manager derives from the page's local address. */
    @Override
    public int getDbFileId(PageAddress pageAddress) {
        return this.localFileManager.getSimpleFileID(pageAddress.getLocalAddress());
    }

    /**
     * Rewrites a valid, local valid page through the executor's current local file writer.
     *
     * <p>The page data is taken from the in-memory buffer, or read from its current local address
     * when there is no buffer. If the page is not valid or not local valid nothing is written and
     * the callback receives {@code (false, null)}.
     *
     * @param pageAddress page to transfer
     * @param gRegionContext region context passed through to the write
     * @param eventExecutor executor that performs the transfer and selects the file writer
     * @param callBack optional; receives {@code (success, failure)} once the attempt is finished
     */
    @Override
    public void transferPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor, @Nullable BiConsumer<Boolean, Throwable> callBack) {
        eventExecutor.execute(() -> {
            boolean success = false;
            Exception throwable = null;
            GByteBuffer buffer = pageAddress.getGByteBufferWithReference();
            try {
                if (pageAddress.isPageValid() && pageAddress.isLocalValid()) {
                    if (buffer == null) {
                        long localAddress = pageAddress.getLocalAddress();
                        FileReader fileReader = this.localFileManager.getFileReader(localAddress);
                        long offset = this.localFileManager.getFileOffset(localAddress);
                        buffer = this.localFileManager.getDataPageUtil().getDataPageFromReader(fileReader, (int) offset, pageAddress);
                    }
                    FileWriter fileWriter = this.getOrCreateFileWriter(this.localFileWriters, this.localFileManager, eventExecutor);
                    this.internalAddPage(this.localFileManager, fileWriter, pageAddress, buffer, gRegionContext, true, true);
                    success = true;
                }
            }
            catch (Exception e) {
                throwable = e;
                // The callback is optional, so make sure the failure is visible somewhere.
                LOG.error("error when transferring page", e);
            }
            finally {
                if (buffer != null) {
                    buffer.release();
                }
                if (callBack != null) {
                    callBack.accept(success, throwable);
                }
            }
        });
    }

    /**
     * Closes all local and DFS file writers. Only the first call has an effect; later calls log a
     * warning and return.
     *
     * @throws IOException declared by the overridden method
     */
    @Override
    public void close() throws IOException {
        // The monitor makes the check-and-set of the closed flag atomic.
        synchronized (this) {
            if (this.closed) {
                LOG.warn("InfiniteFileCache has been closed");
                return;
            }
            this.closed = true;
        }
        for (FileWriter fileWriter : this.localFileWriters.values()) {
            this.localFileManager.closeFileWriter(fileWriter);
        }
        this.localFileWriters.clear();
        for (FileWriter fileWriter : this.dfsFileWriters.values()) {
            this.dfsFileManager.closeFileWriter(fileWriter);
        }
        this.dfsFileWriters.clear();
        LOG.info("InfiniteFileCache is closed");
    }

    /**
     * Reads the data of a page from the file its address points to, retrying on failure.
     *
     * <p>After every failed read the address is fetched again from {@code addressCallable}. A
     * failure with an unchanged address counts as an unexpected try, a failure with a changed
     * address as an expected try. The last exception is rethrown once
     * {@link #MAX_UNEXPECTED_TRIES} unexpected or {@link #MAX_EXPECTED_TRIES} expected tries have
     * been used up.
     *
     * @param fileManager file manager the address belongs to
     * @param addressCallable supplies the current address of the page in {@code fileManager}
     * @param pageAddress page to read; must be a {@link PageAddressSingleImpl}
     * @param isLocal whether the read is accounted as a local read or as a DFS read
     * @return the page data as returned by the file manager's data page util
     * @throws Exception the last read failure, or a failure of {@code addressCallable}
     */
    private GByteBuffer getGByteBuffer(FileManager fileManager, Callable<Long> addressCallable, PageAddress pageAddress, boolean isLocal) throws Exception {
        Preconditions.checkArgument(pageAddress instanceof PageAddressSingleImpl);
        int unexpectedTries = 0;
        int expectedTries = 0;
        long address = addressCallable.call();
        while (true) {
            GByteBuffer gByteBuffer = null;
            try {
                FileReader fileReader = fileManager.getFileReader(address);
                long offset = fileManager.getFileOffset(address);
                long startTime = System.nanoTime();
                gByteBuffer = fileManager.getDataPageUtil().getDataPageFromReader(fileReader, (int) offset, pageAddress);
                this.updateReadStat(pageAddress.getDataLen(), System.nanoTime() - startTime, isLocal);
                return gByteBuffer;
            }
            catch (Exception e) {
                // The buffer is not returned on this path, so drop the reference held here.
                if (gByteBuffer != null) {
                    gByteBuffer.release();
                }
                long oldAddress = address;
                address = addressCallable.call();
                if (oldAddress == address) {
                    ++unexpectedTries;
                } else {
                    ++expectedTries;
                }
                // Both counters are checked on every failure; otherwise a read that keeps failing
                // at an unchanged address would be retried forever.
                if (unexpectedTries >= MAX_UNEXPECTED_TRIES || expectedTries >= MAX_EXPECTED_TRIES) {
                    LOG.error("get page failed, try " + unexpectedTries + " times unexpectedly, and try " + expectedTries + " times as expected, last exception", e);
                    throw e;
                }
            }
        }
    }

    /**
     * Schedules a write of the given page data to a local file on {@code eventExecutor}.
     *
     * <p>The buffer is retained for the scheduled task and released when the task finishes.
     * Failures of the write itself are only logged.
     *
     * @throws java.util.concurrent.RejectedExecutionException if the executor rejects the task
     */
    private void cachePage(PageAddress page, GByteBuffer buffer, EventExecutor eventExecutor, GRegionContext gRegionContext) {
        buffer.retain();
        try {
            eventExecutor.execute(() -> {
                try {
                    FileWriter fileWriter = this.getOrCreateFileWriter(this.localFileWriters, this.localFileManager, eventExecutor);
                    this.internalAddPage(this.localFileManager, fileWriter, page, buffer, gRegionContext, true, true);
                }
                catch (Exception e) {
                    LOG.error("cache data failed", e);
                }
                finally {
                    buffer.release();
                }
            });
        }
        catch (java.util.concurrent.RejectedExecutionException e) {
            // The task will never run, so its release() will never happen: undo the retain here.
            buffer.release();
            throw e;
        }
    }

    /**
     * Returns the executor's current file writer from {@code fileWriterMap}, replacing it with a
     * new writer from {@code fileManager} if there is none yet, or if the current one is invalid
     * or has reached the max file size.
     *
     * @throws GeminiRuntimeException if this cache has been closed
     */
    private FileWriter getOrCreateFileWriter(Map<EventExecutor, FileWriter> fileWriterMap, FileManager fileManager, EventExecutor eventExecutor) {
        if (this.closed) {
            throw new GeminiRuntimeException("InfiniteFileCache has been closed.");
        }
        FileWriter fileWriter = fileWriterMap.get(eventExecutor);
        boolean needsRollover = fileWriter != null
                && (!fileWriter.isValid() || (long) fileWriter.size() >= this.maxFileSize);
        if (needsRollover) {
            fileManager.closeFileWriter(fileWriter);
            fileWriterMap.remove(eventExecutor);
            LOG.debug("close file writer {}/{} in {}", fileWriter.getFileID(), fileWriter.isValid(), eventExecutor);
            fileWriter = null;
        }
        if (fileWriter == null) {
            fileWriter = fileManager.createNewFileWriter();
            fileWriterMap.put(eventExecutor, fileWriter);
            LOG.debug("create new file writer {} in {}", fileWriter.getFileID(), eventExecutor);
        }
        return fileWriter;
    }

    /**
     * Records a read in the local or DFS read statistics.
     *
     * <p>NOTE(review): the only caller passes {@code (dataLen, elapsedNanos)}, i.e. the data
     * length arrives in the parameter named {@code time} and the elapsed time in the parameter
     * named {@code size}. Whether the statistics end up swapped depends on the parameter order of
     * {@code FileCacheStat#addLocalRead} / {@code addDFSRead}, which is not visible here. Behaviour
     * is left unchanged - TODO confirm against FileCacheStat.
     */
    private void updateReadStat(long time, long size, boolean isLocal) {
        if (isLocal) {
            this.fileCacheStat.addLocalRead(size, time);
        } else {
            this.fileCacheStat.addDFSRead(size, time);
        }
    }
}

