package org.apache.flink.runtime.state.gemini.engine.filecache;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.filecache.FileCache;
import org.apache.flink.runtime.state.gemini.engine.filecompaction.FileCompactionPageTransfer;
import org.apache.flink.runtime.state.gemini.engine.fs.FileManager;
import org.apache.flink.runtime.state.gemini.engine.fs.FileReader;
import org.apache.flink.runtime.state.gemini.engine.fs.FileWriter;
import org.apache.flink.runtime.state.gemini.engine.metrics.FileCacheMetrics;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddressSingleImpl;
import org.apache.flink.runtime.state.gemini.engine.rm.GByteBuffer;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/filecache/NoFileCache.class */
/**
 * A {@link FileCache} that performs no local caching: pages are written straight to, and read
 * straight from, the {@link FileManager} handed to the constructor (called the "dfs" file manager
 * throughout this class). It reports {@link FileCache.FileCacheType#NONE} as its type.
 *
 * <p>The class also implements {@link FileCompactionPageTransfer} so that pages can be rewritten
 * into new files of the same file manager.
 *
 * <p>All writes are submitted to the {@link EventExecutor} supplied by the caller, and one
 * {@link FileWriter} is kept per executor in a concurrent map.
 */
public class NoFileCache extends FileCache implements FileCompactionPageTransfer {
    private static final Logger LOG = LoggerFactory.getLogger(NoFileCache.class);

    /** Read failures tolerated while the page still points at the same dfs address. */
    private static final int MAX_SAME_ADDRESS_READ_FAILURES = 3;

    /** Read failures tolerated when the page's dfs address changed between attempts. */
    private static final int MAX_MOVED_ADDRESS_READ_FAILURES = 10;

    private final GContext gContext;
    private final long maxFileSize;
    private final boolean syncWhenBatchFlush;
    private final FileManager dfsFileManager;
    private volatile boolean closed;

    /** The currently open writer of each executor; entries are replaced when a writer is rolled. */
    private final Map<EventExecutor, FileWriter> dfsFileWriters;

    /**
     * Creates the cache and registers its statistics with the file cache metrics, if any.
     *
     * @param gContext engine context; supplies the configuration, metrics and access number
     * @param fileManager file manager that all pages are written to and read from
     * @throws NullPointerException if either argument is {@code null}
     */
    public NoFileCache(GContext gContext, FileManager fileManager) {
        super(0L, new FileCacheStat());
        this.gContext = Preconditions.checkNotNull(gContext);
        this.maxFileSize = gContext.getGConfiguration().getMaxFileSize();
        this.syncWhenBatchFlush = gContext.getGConfiguration().isSnapshotSyncWhenBatchFlush();
        this.dfsFileManager = Preconditions.checkNotNull(fileManager);
        this.dfsFileWriters = new ConcurrentHashMap<>();
        FileCacheMetrics fileCacheMetrics = gContext.getFileCacheMetrics();
        if (fileCacheMetrics != null) {
            fileCacheMetrics.register(this.fileCacheStat);
        }
        this.closed = false;
        LOG.info("NoFileCache created, DestFileManager {}", fileManager);
    }

    /** Returns the file manager this cache reads from and writes to. */
    @VisibleForTesting
    public FileManager getDfsFileManager() {
        return this.dfsFileManager;
    }

    /**
     * Reports whether the page is available through this cache, which here simply means that
     * the page has a valid dfs address.
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.filecache.FileCache
    public boolean isCached(PageAddress pageAddress) {
        return pageAddress.isDfsValid();
    }

    /**
     * Writes the page unless it already has a valid dfs address; equivalent to
     * {@link #flushPage} with {@code z == false}.
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.filecache.FileCache
    public void addPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor, BiConsumer<Boolean, Throwable> biConsumer) {
        flushPage(pageAddress, gRegionContext, eventExecutor, false, biConsumer);
    }

    /**
     * Reads the page from its dfs address.
     *
     * @return the page data; never {@code null}
     * @throws IllegalStateException if the page has no valid dfs address
     * @throws GeminiRuntimeException if the read fails or yields no data
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.filecache.FileCache
    public GByteBuffer getPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor) {
        Preconditions.checkState(pageAddress.isDfsValid(), "dfs address should be valid");
        try {
            GByteBuffer page = getGByteBuffer(pageAddress);
            Preconditions.checkNotNull(page, "no page exists on dfs");
            return page;
        } catch (Exception e) {
            // The page address fills the placeholder; the trailing exception is logged with its stack trace.
            LOG.error("exception when get page, {}", pageAddress, e);
            throw new GeminiRuntimeException("exception when get page", e);
        }
    }

    /**
     * Marks a valid page as invalid and, if it had a valid dfs address, drops its reference on
     * the file manager. A page that is already invalid is left untouched.
     *
     * @throws IllegalArgumentException if the address is not a {@link PageAddressSingleImpl}
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.filecache.FileCache
    public void discardPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor) {
        Preconditions.checkArgument(pageAddress instanceof PageAddressSingleImpl);
        boolean hadDfsAddress = false;
        long dfsAddress = -1;
        // The status flip and the address read happen under the page's own monitor.
        synchronized (pageAddress) {
            if (pageAddress.isPageValid()) {
                pageAddress.setPageStatus(false);
                hadDfsAddress = pageAddress.isDfsValid();
                if (hadDfsAddress) {
                    dfsAddress = pageAddress.getDfsAddress();
                }
            }
        }
        // hadDfsAddress can only be true when the page was valid on entry.
        if (hadDfsAddress) {
            this.dfsFileManager.decDBReference(dfsAddress, this.gContext.getAccessNumber(), System.currentTimeMillis(), pageAddress.getDataLen());
        }
    }

    /**
     * Writes a single page on the given executor.
     *
     * <p>If {@code z} is {@code false} and the page already has a valid dfs address, nothing is
     * written and the callback is completed successfully on the calling thread. Otherwise the
     * write is submitted to {@code eventExecutor}; the page buffer is always released and the
     * callback is invoked exactly once with the outcome.
     *
     * @param z when {@code true}, write the page even if it already has a valid dfs address
     * @param biConsumer optional callback receiving {@code (success, cause)}; {@code cause} is
     *     {@code null} on success
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.filecache.FileCache
    public void flushPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor, boolean z, BiConsumer<Boolean, Throwable> biConsumer) {
        if (!z && pageAddress.isDfsValid()) {
            complete(biConsumer, true, null);
            return;
        }
        eventExecutor.execute(() -> {
            boolean pageIsNull = false;
            boolean success = false;
            Throwable failure = null;
            GByteBuffer buffer = null;
            try {
                buffer = pageAddress.getGByteBufferWithReference();
                if (buffer == null) {
                    // No in-memory data: fall back to what is already stored, if anything.
                    pageIsNull = true;
                    if (pageAddress.isDfsValid()) {
                        buffer = getGByteBuffer(pageAddress);
                    }
                }
                Preconditions.checkNotNull(buffer, "Data page is null");
                FileWriter writer = getOrCreateFileWriter(this.dfsFileWriters, this.dfsFileManager, eventExecutor);
                internalAddPage(this.dfsFileManager, writer, pageAddress, buffer, gRegionContext, false, true);
                success = true;
            } catch (Exception e) {
                failure = e;
                LOG.error("error when adding page to cache: pageIsNull={}, {}", pageIsNull, e.getMessage(), e);
            } finally {
                // Runs for Errors as well; those propagate after the callback saw (false, null).
                if (buffer != null) {
                    buffer.release();
                }
                complete(biConsumer, success, failure);
            }
        });
    }

    /**
     * Writes every page that has no valid dfs address yet; equivalent to
     * {@link #flushBatchPages} with both flags {@code false}.
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.filecache.FileCache
    public void addBatchPages(List<PageAddress> list, List<GRegionContext> list2, EventExecutor eventExecutor, List<BiConsumer<Boolean, Throwable>> list3) {
        flushBatchPages(list, list2, eventExecutor, false, false, list3);
    }

    /**
     * Writes a batch of pages with one writer on the given executor, then syncs or flushes that
     * writer (depending on configuration) and finally publishes the new addresses.
     *
     * <p>Pages that already have a valid dfs address are skipped unless {@code z} is set. Every
     * non-null callback is invoked exactly once: with {@code (true, null)} if the whole batch
     * succeeded, otherwise with {@code false} and a {@code FlushBatchPageException} wrapping the
     * cause. An empty page list returns immediately without invoking any callback.
     *
     * @param list pages to write
     * @param list2 region context of each page, index-aligned with {@code list}
     * @param z when {@code true}, write pages even if they already have a valid dfs address
     * @param z2 not used by this implementation
     * @param list3 completion callbacks; entries may be {@code null}
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.filecache.FileCache
    public void flushBatchPages(List<PageAddress> list, List<GRegionContext> list2, EventExecutor eventExecutor, boolean z, boolean z2, List<BiConsumer<Boolean, Throwable>> list3) {
        if (list.isEmpty()) {
            return;
        }
        final List<PageAddress> pages = list;
        final List<GRegionContext> regionContexts = list2;
        final List<BiConsumer<Boolean, Throwable>> callbacks = list3;
        eventExecutor.execute(() -> {
            boolean success = false;
            Throwable failure = null;
            try {
                int size = pages.size();
                // One slot per page; null marks a page that was skipped.
                List<Long> newAddresses = new ArrayList<>(size);
                FileWriter writer = getOrCreateFileWriter(this.dfsFileWriters, this.dfsFileManager, eventExecutor);
                for (int i = 0; i < size; i++) {
                    PageAddress page = pages.get(i);
                    if (!z && page.isDfsValid()) {
                        newAddresses.add(null);
                        continue;
                    }
                    GByteBuffer buffer = null;
                    try {
                        buffer = page.getGByteBufferWithReference();
                        if (buffer == null) {
                            buffer = getGByteBuffer(page);
                        }
                        if (buffer == null) {
                            throw new GeminiRuntimeException("data page does not exist");
                        }
                        newAddresses.add(writePage(this.dfsFileManager, writer, page, buffer, regionContexts.get(i), false));
                    } finally {
                        // Release on success and on failure alike so the reference never leaks.
                        if (buffer != null) {
                            buffer.release();
                        }
                    }
                }
                if (this.syncWhenBatchFlush) {
                    writer.sync();
                } else {
                    writer.flush();
                }
                // Addresses are published only after the data has been synced/flushed.
                long accessNumber = regionContexts.get(0).getGContext().getAccessNumber();
                for (int i = 0; i < size; i++) {
                    Long newAddress = newAddresses.get(i);
                    if (newAddress != null) {
                        updatePageAddress(this.dfsFileManager, pages.get(i), newAddress, false, accessNumber);
                    }
                }
                success = true;
            } catch (Exception e) {
                failure = new FlushBatchPageException(e);
            } finally {
                if (callbacks != null) {
                    for (BiConsumer<Boolean, Throwable> callback : callbacks) {
                        complete(callback, success, failure);
                    }
                }
            }
        });
    }

    /**
     * Syncs every file writer currently held by this cache.
     *
     * @throws IOException if syncing a writer fails
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.filecache.FileCache
    public void sync() throws IOException {
        for (FileWriter writer : this.dfsFileWriters.values()) {
            writer.sync();
        }
    }

    /** Always {@link FileCache.FileCacheType#NONE}. */
    @Override // org.apache.flink.runtime.state.gemini.engine.filecache.FileCache
    public FileCache.FileCacheType getFileCacheType() {
        return FileCache.FileCacheType.NONE;
    }

    /** Returns the same file manager as {@link #getDfsFileManager()}. */
    @Override // org.apache.flink.runtime.state.gemini.engine.filecompaction.FileCompactionPageTransfer
    public FileManager getDbFileManager() {
        return this.dfsFileManager;
    }

    /** Returns {@code true} if the page is valid and has a valid dfs address. */
    @Override // org.apache.flink.runtime.state.gemini.engine.filecompaction.FileCompactionPageTransfer
    public boolean hasDbFileAddress(PageAddress pageAddress) {
        return pageAddress.isPageValid() && pageAddress.isDfsValid();
    }

    /** Returns the simple file id that the file manager derives from the page's dfs address. */
    @Override // org.apache.flink.runtime.state.gemini.engine.filecompaction.FileCompactionPageTransfer
    public int getDbFileId(PageAddress pageAddress) {
        return this.dfsFileManager.getSimpleFileID(pageAddress.getDfsAddress());
    }

    /**
     * Rewrites a page through the current file writer of the given executor.
     *
     * <p>Only pages that are valid and have a valid dfs address are transferred; for any other
     * page the callback receives {@code (false, null)}. The page buffer is always released and
     * the callback is invoked exactly once.
     *
     * @param biConsumer optional callback receiving {@code (transferred, cause)}
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.filecompaction.FileCompactionPageTransfer
    public void transferPage(PageAddress pageAddress, GRegionContext gRegionContext, EventExecutor eventExecutor, @Nullable BiConsumer<Boolean, Throwable> biConsumer) {
        eventExecutor.execute(() -> {
            boolean transferred = false;
            Throwable failure = null;
            GByteBuffer buffer = null;
            try {
                if (pageAddress.isPageValid() && pageAddress.isDfsValid()) {
                    buffer = pageAddress.getGByteBufferWithReference();
                    if (buffer == null) {
                        // Single direct read; unlike getGByteBuffer this path neither retries nor records stats.
                        long dfsAddress = pageAddress.getDfsAddress();
                        FileReader reader = this.dfsFileManager.getFileReader(dfsAddress);
                        int offset = (int) this.dfsFileManager.getFileOffset(dfsAddress);
                        buffer = this.dfsFileManager.getDataPageUtil().getDataPageFromReader(reader, offset, pageAddress);
                    }
                    FileWriter writer = getOrCreateFileWriter(this.dfsFileWriters, this.dfsFileManager, eventExecutor);
                    internalAddPage(this.dfsFileManager, writer, pageAddress, buffer, gRegionContext, false, true);
                    transferred = true;
                }
            } catch (Exception e) {
                failure = e;
            } finally {
                // Runs for Errors as well; those propagate after the callback saw (false, null).
                if (buffer != null) {
                    buffer.release();
                }
                complete(biConsumer, transferred, failure);
            }
        });
    }

    /**
     * Closes all file writers held by this cache. Calling it again only logs a warning.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        synchronized (this) {
            if (this.closed) {
                LOG.warn("NoFileCache has been closed");
                return;
            }
            // Set the flag first so that getOrCreateFileWriter starts rejecting new requests.
            this.closed = true;
            for (FileWriter writer : this.dfsFileWriters.values()) {
                this.dfsFileManager.closeFileWriter(writer);
            }
            this.dfsFileWriters.clear();
            LOG.info("NoFileCache is closed");
        }
    }

    /**
     * Reads the page from its dfs address, retrying on failure.
     *
     * <p>After each failed attempt the page's dfs address is read again. Failures are counted
     * separately depending on whether the address stayed the same or changed in between; the
     * last exception is rethrown once either counter reaches its limit. Successful reads are
     * recorded in the file cache statistics.
     *
     * @return the page data as returned by the data page util
     * @throws IllegalArgumentException if the address is not a {@link PageAddressSingleImpl}
     * @throws Exception the last read failure once the retry budget is exhausted
     */
    private GByteBuffer getGByteBuffer(PageAddress pageAddress) throws Exception {
        Preconditions.checkArgument(pageAddress instanceof PageAddressSingleImpl);
        int sameAddressFailures = 0;
        int movedAddressFailures = 0;
        long dfsAddress = pageAddress.getDfsAddress();
        while (true) {
            GByteBuffer buffer = null;
            try {
                FileReader reader = this.dfsFileManager.getFileReader(dfsAddress);
                long fileOffset = this.dfsFileManager.getFileOffset(dfsAddress);
                long startNanos = System.nanoTime();
                buffer = this.dfsFileManager.getDataPageUtil().getDataPageFromReader(reader, (int) fileOffset, pageAddress);
                this.fileCacheStat.addDFSRead(pageAddress.getDataLen(), System.nanoTime() - startNanos);
                return buffer;
            } catch (Exception e) {
                // The buffer may have been obtained before the statistics call failed.
                if (buffer != null) {
                    buffer.release();
                }
                long failedAddress = dfsAddress;
                dfsAddress = pageAddress.getDfsAddress();
                if (failedAddress == dfsAddress) {
                    sameAddressFailures++;
                } else {
                    movedAddressFailures++;
                }
                if (sameAddressFailures >= MAX_SAME_ADDRESS_READ_FAILURES || movedAddressFailures >= MAX_MOVED_ADDRESS_READ_FAILURES) {
                    LOG.error("get page failed, try {} times unexpectedly, and try {} times expected, last exception {}",
                        sameAddressFailures, movedAddressFailures, e.toString(), e);
                    throw e;
                }
            }
        }
    }

    /**
     * Returns the writer of the given executor, closing and replacing it first if it is no
     * longer valid or has reached the configured maximum file size.
     *
     * @throws GeminiRuntimeException if this cache has been closed
     */
    private FileWriter getOrCreateFileWriter(Map<EventExecutor, FileWriter> map, FileManager fileManager, EventExecutor eventExecutor) {
        if (this.closed) {
            throw new GeminiRuntimeException("NoFileCache has been closed");
        }
        FileWriter writer = map.get(eventExecutor);
        if (writer != null && (!writer.isValid() || writer.size() >= this.maxFileSize)) {
            fileManager.closeFileWriter(writer);
            map.remove(eventExecutor);
            LOG.info("close file writer {}/{} in {}", writer.getFileID(), writer.isValid(), eventExecutor);
            writer = null;
        }
        if (writer == null) {
            writer = fileManager.createNewFileWriter();
            map.put(eventExecutor, writer);
            LOG.info("create new file writer {} in {}", writer.getFileID(), eventExecutor);
        }
        return writer;
    }

    /** Invokes the callback with the given outcome; does nothing if the callback is {@code null}. */
    private static void complete(@Nullable BiConsumer<Boolean, Throwable> callback, boolean success, @Nullable Throwable failure) {
        if (callback != null) {
            callback.accept(success, failure);
        }
    }
}
