/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.filecache;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.filecache.FileCache;
import org.apache.flink.runtime.state.gemini.engine.filecache.FileCacheStat;
import org.apache.flink.runtime.state.gemini.engine.filecache.FlushBatchPageException;
import org.apache.flink.runtime.state.gemini.engine.filecompaction.FileCompactionPageTransfer;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.fs.FileReader;
import org.apache.flink.runtime.state.gemini.engine.fs.FileWriter;
import org.apache.flink.runtime.state.gemini.engine.metrics.FileCacheMetrics;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddressSingleImpl;
import org.apache.flink.runtime.state.gemini.engine.rm.GByteBuffer;
import org.apache.flink.runtime.state.gemini.engine.rm.ReferenceCountable;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NoFileCache
extends FileCache
implements FileCompactionPageTransfer {
    private static final Logger LOG = LoggerFactory.getLogger(NoFileCache.class);
    private final GContext gContext;
    private final long maxFileSize;
    private final boolean syncWhenBatchFlush;
    private final FileManager dfsFileManager;
    private volatile boolean closed;
    private Map<EventExecutor, FileWriter> dfsFileWriters;

    public NoFileCache(GContext gContext, FileManager dfsFileManager) {
        super(0L, new FileCacheStat());
        this.gContext = (GContext)Preconditions.checkNotNull((Object)gContext);
        this.maxFileSize = gContext.getGConfiguration().getMaxFileSize();
        this.syncWhenBatchFlush = gContext.getGConfiguration().isSnapshotSyncWhenBatchFlush();
        this.dfsFileManager = (FileManager)Preconditions.checkNotNull((Object)dfsFileManager);
        this.dfsFileWriters = new ConcurrentHashMap<EventExecutor, FileWriter>();
        FileCacheMetrics fileCacheMetrics = gContext.getFileCacheMetrics();
        if (fileCacheMetrics != null) {
            fileCacheMetrics.register(this.fileCacheStat);
        }
        this.closed = false;
        LOG.info("NoFileCache created, DestFileManager {}", (Object)dfsFileManager);
    }

    @VisibleForTesting
    public FileManager getDfsFileManager() {
        return this.dfsFileManager;
    }

    @Override
    public boolean isCached(PageAddress pageAddress) {
        return pageAddress.isDfsValid();
    }

    @Override
    public void addPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor, BiConsumer<Boolean, Throwable> callBack) {
        this.flushPage(pageAddress, gRegionContext, eventExecutor, false, callBack);
    }

    @Override
    public GByteBuffer getPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor) {
        Preconditions.checkState((boolean)pageAddress.isDfsValid(), (Object)"dfs address should be valid");
        try {
            GByteBuffer dataPage = this.getGByteBuffer(pageAddress);
            Preconditions.checkNotNull((Object)dataPage, (String)"no page exists on dfs");
            return dataPage;
        }
        catch (Exception e) {
            LOG.error("exception when get page, {}", (Throwable)e);
            throw new GeminiRuntimeException("exception when get page", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void discardPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor) {
        boolean pageValid;
        Preconditions.checkArgument((boolean)(pageAddress instanceof PageAddressSingleImpl));
        boolean dfsValid = false;
        long dfsAddress = -1L;
        PageAddress pageAddress2 = pageAddress;
        synchronized (pageAddress2) {
            pageValid = pageAddress.isPageValid();
            if (pageValid) {
                pageAddress.setPageStatus(false);
                dfsValid = pageAddress.isDfsValid();
                if (dfsValid) {
                    dfsAddress = pageAddress.getDfsAddress();
                }
            }
        }
        if (pageValid && dfsValid) {
            long accessNumber = this.gContext.getAccessNumber();
            long ts = System.currentTimeMillis();
            this.dfsFileManager.decDBReference(dfsAddress, accessNumber, ts, pageAddress.getDataLen());
        }
    }

    @Override
    public void flushPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor, boolean force, BiConsumer<Boolean, Throwable> callBack) {
        if (!force && pageAddress.isDfsValid()) {
            if (callBack != null) {
                callBack.accept(true, null);
            }
            return;
        }
        eventExecutor.execute(() -> {
            boolean success = false;
            Exception throwable = null;
            boolean pageIsNull = false;
            GByteBuffer gByteBuffer = pageAddress.getGByteBufferWithReference();
            try {
                if (force || !pageAddress.isDfsValid()) {
                    if (gByteBuffer == null) {
                        pageIsNull = true;
                        if (pageAddress.isDfsValid()) {
                            gByteBuffer = this.getGByteBuffer(pageAddress);
                        }
                    }
                    Preconditions.checkNotNull((Object)gByteBuffer, (String)"Data page is null");
                    FileWriter fileWriter = this.getOrCreateFileWriter(this.dfsFileWriters, this.dfsFileManager, eventExecutor);
                    this.internalAddPage(this.dfsFileManager, fileWriter, pageAddress, gByteBuffer, gRegionContext, false, true);
                }
                success = true;
            }
            catch (Exception e) {
                success = false;
                throwable = e;
                LOG.error("error when adding page to cache: pageIsNull={}, {}", new Object[]{pageIsNull, e.getMessage(), e});
            }
            finally {
                if (gByteBuffer != null) {
                    gByteBuffer.release();
                }
                if (callBack != null) {
                    callBack.accept(success, throwable);
                }
            }
        });
    }

    @Override
    public void addBatchPages(List<PageAddress> pages, List<GRegionContext> gRegionContext, EventExecutor eventExecutor, List<BiConsumer<Boolean, Throwable>> callBacks) {
        this.flushBatchPages(pages, gRegionContext, eventExecutor, false, false, callBacks);
    }

    @Override
    public void flushBatchPages(List<PageAddress> pages, List<GRegionContext> gRegionContexts, EventExecutor eventExecutor, boolean force, boolean flushLocal, List<BiConsumer<Boolean, Throwable>> callBacks) {
        if (pages.isEmpty()) {
            return;
        }
        eventExecutor.execute(() -> {
            boolean success = true;
            FlushBatchPageException throwable = null;
            try {
                int size = pages.size();
                ArrayList<Long> addressList = new ArrayList<Long>(pages.size());
                FileWriter fileWriter = this.getOrCreateFileWriter(this.dfsFileWriters, this.dfsFileManager, eventExecutor);
                for (int i = 0; i < size; ++i) {
                    PageAddress page = (PageAddress)pages.get(i);
                    if (force || !page.isDfsValid()) {
                        GByteBuffer buffer = page.getGByteBufferWithReference();
                        try {
                            if (buffer == null) {
                                buffer = this.getGByteBuffer(page);
                            }
                            if (buffer != null) {
                                long address = this.writePage(this.dfsFileManager, fileWriter, page, buffer, (GRegionContext)gRegionContexts.get(i), false);
                                addressList.add(address);
                                continue;
                            }
                            throw new GeminiRuntimeException("data page does not exist");
                        }
                        finally {
                            if (buffer != null) {
                                buffer.release();
                            }
                        }
                    }
                    addressList.add(null);
                }
                if (this.syncWhenBatchFlush) {
                    fileWriter.sync();
                } else {
                    fileWriter.flush();
                }
                long accessNumber = ((GRegionContext)gRegionContexts.get(0)).getGContext().getAccessNumber();
                for (int i = 0; i < size; ++i) {
                    Long address = (Long)addressList.get(i);
                    if (address == null) continue;
                    this.updatePageAddress(this.dfsFileManager, (PageAddress)pages.get(i), address, false, accessNumber);
                }
            }
            catch (Exception e) {
                success = false;
                throwable = new FlushBatchPageException(e);
            }
            finally {
                for (BiConsumer callBack : callBacks) {
                    if (callBack == null) continue;
                    callBack.accept(success, throwable);
                }
            }
        });
    }

    @Override
    public void sync() throws IOException {
        for (FileWriter fileWriter : this.dfsFileWriters.values()) {
            fileWriter.sync();
        }
    }

    @Override
    public FileCache.FileCacheType getFileCacheType() {
        return FileCache.FileCacheType.NONE;
    }

    @Override
    public FileManager getDbFileManager() {
        return this.dfsFileManager;
    }

    @Override
    public boolean hasDbFileAddress(PageAddress pageAddress) {
        return pageAddress.isPageValid() && pageAddress.isDfsValid();
    }

    @Override
    public int getDbFileId(PageAddress pageAddress) {
        return this.dfsFileManager.getSimpleFileID(pageAddress.getDfsAddress());
    }

    @Override
    public void transferPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor, @Nullable BiConsumer<Boolean, Throwable> callBack) {
        eventExecutor.execute(() -> {
            boolean success = false;
            Exception throwable = null;
            ReferenceCountable buffer = null;
            try {
                if (pageAddress.isPageValid() && pageAddress.isDfsValid()) {
                    buffer = pageAddress.getGByteBufferWithReference();
                    if (buffer == null) {
                        long dfsAddress = pageAddress.getDfsAddress();
                        FileReader fileReader = this.dfsFileManager.getFileReader(dfsAddress);
                        long offset = this.dfsFileManager.getFileOffset(dfsAddress);
                        buffer = this.dfsFileManager.getDataPageUtil().getDataPageFromReader(fileReader, (int)offset, pageAddress);
                    }
                    FileWriter fileWriter = this.getOrCreateFileWriter(this.dfsFileWriters, this.dfsFileManager, eventExecutor);
                    this.internalAddPage(this.dfsFileManager, fileWriter, pageAddress, (GByteBuffer)buffer, gRegionContext, false, true);
                    success = true;
                }
            }
            catch (Exception e) {
                throwable = e;
            }
            finally {
                if (buffer != null) {
                    buffer.release();
                }
                if (callBack != null) {
                    callBack.accept(success, throwable);
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        NoFileCache noFileCache = this;
        synchronized (noFileCache) {
            if (this.closed) {
                LOG.warn("NoFileCache has been closed");
                return;
            }
            this.closed = true;
        }
        for (FileWriter fileWriter : this.dfsFileWriters.values()) {
            this.dfsFileManager.closeFileWriter(fileWriter);
        }
        this.dfsFileWriters.clear();
        LOG.info("NoFileCache is closed");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private GByteBuffer getGByteBuffer(PageAddress pageAddress) throws Exception {
        Preconditions.checkArgument((boolean)(pageAddress instanceof PageAddressSingleImpl));
        int unexpectedTries = 0;
        int expectedTries = 0;
        ReferenceCountable gByteBuffer = null;
        long address = pageAddress.getDfsAddress();
        while (true) {
            try {
                FileReader fileReader = this.dfsFileManager.getFileReader(address);
                long offset = this.dfsFileManager.getFileOffset(address);
                long startTime = System.nanoTime();
                gByteBuffer = this.dfsFileManager.getDataPageUtil().getDataPageFromReader(fileReader, (int)offset, pageAddress);
                this.fileCacheStat.addDFSRead(pageAddress.getDataLen(), System.nanoTime() - startTime);
                ReferenceCountable referenceCountable = gByteBuffer;
                return referenceCountable;
            }
            catch (Exception e) {
                long oldAddress;
                if (gByteBuffer != null) {
                    gByteBuffer.release();
                }
                if ((oldAddress = address) == (address = pageAddress.getDfsAddress())) {
                    ++unexpectedTries;
                } else {
                    ++expectedTries;
                }
                if (unexpectedTries < 3 && expectedTries < 10) continue;
                LOG.error("get page failed, try " + unexpectedTries + " times unexpectedly, and try " + expectedTries + " times expected, last exception " + e);
                throw e;
            }
            finally {
                gByteBuffer = null;
                continue;
            }
            break;
        }
    }

    private FileWriter getOrCreateFileWriter(Map<EventExecutor, FileWriter> fileWriterMap, FileManager fileManager, EventExecutor eventExecutor) {
        if (this.closed) {
            throw new GeminiRuntimeException("NoFileCache has been closed");
        }
        FileWriter fileWriter = fileWriterMap.get(eventExecutor);
        if (!(fileWriter == null || fileWriter.isValid() && (long)fileWriter.size() < this.maxFileSize)) {
            fileManager.closeFileWriter(fileWriter);
            fileWriterMap.remove(eventExecutor);
            LOG.info("close file writer {}/{} in {}", new Object[]{fileWriter.getFileID(), fileWriter.isValid(), eventExecutor});
            fileWriter = null;
        }
        if (fileWriter == null) {
            fileWriter = fileManager.createNewFileWriter();
            fileWriterMap.put(eventExecutor, fileWriter);
            LOG.info("create new file writer {} in {}", (Object)fileWriter.getFileID(), (Object)eventExecutor);
        }
        return fileWriter;
    }
}

