package org.apache.flink.runtime.state.gemini.engine.memstore;

import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiShutDownException;
import org.apache.flink.runtime.state.gemini.engine.filecache.FileCache;
import org.apache.flink.runtime.state.gemini.engine.filecache.PageBatchFlusher;
import org.apache.flink.runtime.state.gemini.engine.handler.PageHandler;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.PageIndex;
import org.apache.flink.runtime.state.gemini.engine.page.PageStore;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotCompletableFuture;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotOperation;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotStat;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutorGroup;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/memstore/AbstractWriteBuffer.class */
public abstract class AbstractWriteBuffer<K, V> implements WriteBuffer<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractWriteBuffer.class);
    protected final EventExecutor eventExecutor;
    protected final GRegionContext gRegionContext;
    protected final GRegion gRegion;
    protected final PageStore<K, V> pageStore;
    private final WriteBufferManager writeBufferManager;
    private CompletableFuture lastFuture;
    protected long segmentID = 0;
    private long printTS = System.currentTimeMillis();

    public AbstractWriteBuffer(GRegion gRegion, EventExecutor eventExecutor, PageStore<K, V> pageStore) {
        this.gRegionContext = gRegion.getGRegionContext();
        this.gRegion = gRegion;
        this.eventExecutor = eventExecutor;
        this.pageStore = pageStore;
        this.writeBufferManager = this.gRegionContext.getGContext().getSupervisor().getWriteBufferManager();
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.dbms.Executor
    public EventExecutor getExecutor() {
        return this.eventExecutor;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void checkResource() {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.printTS + 60000 < currentTimeMillis) {
            this.printTS = currentTimeMillis;
            if (LOG.isDebugEnabled()) {
                LOG.debug("writeBufferStats: {}", this.gRegionContext.getWriteBufferStats());
                LOG.debug("pageStoreStats: {}", this.gRegionContext.getPageStoreStats());
            }
        }
        if (this.gRegionContext.getWriteBufferStats().getAverageKeyLen() < 0) {
            if (this.gRegionContext.getWriteBufferStats().getAverageKeyLen() == -2) {
                this.lastFuture = new CompletableFuture();
                this.gRegionContext.getWriteBufferStats().setAverageKeyLen(-1);
                Segment<K, V> copySegment = getActiveSegment().copySegment();
                this.eventExecutor.execute(() -> {
                    createPageHandler(copySegment, true).handle();
                    this.lastFuture.complete(null);
                });
            }
            if (getActiveSegment().getRecordCount() <= 1000) {
                return;
            }
            if (!this.lastFuture.isDone()) {
                try {
                    this.lastFuture.get(10L, TimeUnit.MILLISECONDS);
                } catch (Exception e) {
                    return;
                }
            }
        }
        int estimatedSize = getEstimatedSize(getActiveSegment().getRecordCount());
        int estimatedSize2 = estimatedSize + getEstimatedSize(this.writeBufferManager.getTotalRecordCount());
        if (estimatedSize >= this.gRegionContext.getWriteBufferWaterMark() || (estimatedSize2 >= this.writeBufferManager.getTotalMemSize() && this.writeBufferManager.isBestChoiceWriteBufferFlushing(this))) {
            GContext gContext = this.gRegionContext.getGContext();
            while (this.gRegionContext.getWriteBufferStats().getFlushingSegmentCount() >= gContext.getGConfiguration().getNumFlushingSegment()) {
                gContext.checkDBStatus();
                if (this.writeBufferManager.canFlushWriteBuffer(this)) {
                    break;
                }
                synchronized (this) {
                    try {
                        wait(1L);
                        this.writeBufferManager.increaseWriteBufferFlushBlock();
                    } catch (InterruptedException e2) {
                        throw new GeminiRuntimeException(e2);
                    }
                }
            }
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            if (currentTimeMillis2 > 10) {
                LOG.info("too much flushing segment or evict too long, wait time ={} ...", Long.valueOf(currentTimeMillis2));
            }
            Segment addFlushingSegment = addFlushingSegment();
            this.gRegionContext.getWriteBufferStats().addTotalFlushingRecordCount(addFlushingSegment.getRecordCount());
            this.gRegionContext.getWriteBufferStats().addTotalRecordCount(-addFlushingSegment.getRecordCount());
            doSegmentFlush(addFlushingSegment, estimatedSize);
        }
    }

    void doSegmentFlush(Segment segment, int i) {
        this.eventExecutor.execute(() -> {
            try {
                createPageHandler(segment, false).handle();
                endSegmentFlush(segment.getSegmentID());
            } catch (GeminiShutDownException e) {
                LOG.debug("gemini has shutdown", e);
            } catch (Exception e2) {
                LOG.error("Internal Bug. Flush segment failed", e2);
                this.gRegionContext.getGContext().setDBInternalError(e2);
            }
        });
    }

    private void endSegmentFlush(long j) {
        Segment pollFlushingSegment = pollFlushingSegment();
        Preconditions.checkArgument(pollFlushingSegment != null, "error segment!");
        Preconditions.checkArgument(pollFlushingSegment.getSegmentID() == j, "error segment!");
        this.gRegionContext.getWriteBufferStats().addFlushingSegmentCount(-1);
        this.gRegionContext.getWriteBufferStats().addTotalFlushingRecordCount(-pollFlushingSegment.getRecordCount());
        synchronized (this) {
            notify();
        }
    }

    public abstract Segment getActiveSegment();

    abstract Segment addFlushingSegment();

    abstract Segment pollFlushingSegment();

    abstract PageHandler createPageHandler(Segment segment, boolean z);

    private int getEstimatedSize(long j) {
        if (this.gRegionContext.getWriteBufferStats().getAverageKeyLen() > 0) {
            return (int) (this.gRegionContext.getPageStoreStats().getPageSizeRate() * (this.gRegionContext.getWriteBufferStats().getAverageKeyLen() + this.gRegionContext.getWriteBufferStats().getAverageValueLen()) * ((float) j));
        }
        LOG.error("Let's see whether it will happen!");
        return 0;
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.WriteBuffer
    public void doSnapshot(SnapshotOperation snapshotOperation) {
        SnapshotManager.PendingSnapshot pendingSnapshot = snapshotOperation.getPendingSnapshot();
        long checkpointId = pendingSnapshot.getCheckpointId();
        SnapshotCompletableFuture resultFuture = pendingSnapshot.getResultFuture();
        if (resultFuture.isEndSnapshot()) {
            return;
        }
        boolean isLocalSnapshotEnabled = this.gRegionContext.getGContext().getGConfiguration().isLocalSnapshotEnabled();
        resultFuture.incRunningTask();
        try {
            Segment addFlushingSegment = addFlushingSegment();
            PageHandler createPageHandler = createPageHandler(addFlushingSegment, false);
            this.gRegionContext.getWriteBufferStats().addTotalFlushingRecordCount(addFlushingSegment.getRecordCount());
            this.gRegionContext.getWriteBufferStats().addTotalRecordCount(-addFlushingSegment.getRecordCount());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Start to snapshot write buffer for {}.", Long.valueOf(checkpointId));
            }
            this.eventExecutor.execute(() -> {
                HashMap hashMap = new HashMap();
                try {
                    try {
                        createPageHandler.handle();
                        endSegmentFlush(addFlushingSegment.getSegmentID());
                        PageIndex<K> copy = this.pageStore.getPageIndex().copy(hashMap);
                        pendingSnapshot.addGRegionSnapshotMeta(this.gRegionContext, copy);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Segment flush and pageIndex copy done for {}, will start to flush.", Long.valueOf(checkpointId));
                        }
                        resultFuture.incRunningTask();
                        this.gRegionContext.getGContext().getSupervisor().getSnapshotManager().getSnapshotExecutor().execute(() -> {
                            PageBatchFlusher dfsPageBatchFlusher = pendingSnapshot.getDfsPageBatchFlusher();
                            EventExecutorGroup flushExecutorGroup = this.gRegionContext.getGContext().getSupervisor().getFlushExecutorGroup();
                            FileCache fileCache = this.gRegionContext.getGContext().getSupervisor().getFileCache();
                            try {
                                Iterator<PageAddress> pageIterator = copy.pageIterator();
                                int i = 0;
                                int i2 = 0;
                                long j = 0;
                                long j2 = 0;
                                int i3 = 0;
                                int i4 = 0;
                                long j3 = 0;
                                long j4 = 0;
                                while (this.gRegionContext.getGContext().isDBNormal() && pageIterator.hasNext() && !resultFuture.isEndSnapshot()) {
                                    PageAddress next = pageIterator.next();
                                    try {
                                        EventExecutor next2 = flushExecutorGroup.next();
                                        resultFuture.incRunningTask();
                                        if (isLocalSnapshotEnabled) {
                                            resultFuture.incRunningTask();
                                            if (!next.isLocalValid()) {
                                                i4++;
                                                j4 += next.getDataLen();
                                            }
                                            fileCache.addPage(next, this.gRegionContext, next2, (bool, th) -> {
                                                if (!bool.booleanValue()) {
                                                    LOG.error("Write error when snapshot local.");
                                                    resultFuture.setEndSnapshot();
                                                    resultFuture.completeExceptionally(th);
                                                }
                                                resultFuture.decRunningTask();
                                            });
                                            i2++;
                                            j2 += next.getDataLen();
                                        }
                                        if (snapshotOperation.isForceFlushPage() || !next.isDfsValid()) {
                                            i3++;
                                            j3 += next.getDataLen();
                                        } else {
                                            pendingSnapshot.getSnapshotCompaction().recordSharedPage(next);
                                        }
                                        dfsPageBatchFlusher.addPage(next, this.gRegionContext, (bool2, th2) -> {
                                            if (!bool2.booleanValue()) {
                                                LOG.error("Write error when snapshot dfs.");
                                                resultFuture.setEndSnapshot();
                                                resultFuture.completeExceptionally(th2);
                                            }
                                            resultFuture.decRunningTask();
                                        });
                                        i++;
                                        j += next.getDataLen();
                                        DataPage dataPage = (DataPage) hashMap.remove(next);
                                        if (dataPage != null) {
                                            dataPage.release();
                                        }
                                    } catch (Throwable th3) {
                                        DataPage dataPage2 = (DataPage) hashMap.remove(next);
                                        if (dataPage2 != null) {
                                            dataPage2.release();
                                        }
                                        throw th3;
                                    }
                                }
                                SnapshotStat snapshotStat = pendingSnapshot.getSnapshotStat();
                                snapshotStat.addAndGetTotalPages(i);
                                snapshotStat.addAndGetTotalSize(j);
                                snapshotStat.addAndGetIncrementalPages(i3);
                                snapshotStat.addAndGetIncrementalSize(j3);
                                snapshotStat.addAndGetTotalLocalPages(i2);
                                snapshotStat.addAndGetTotalLocalSize(j2);
                                snapshotStat.addAndGetLocalIncrementalPages(i4);
                                snapshotStat.addAndGetLocalIncrementalSize(j4);
                                dfsPageBatchFlusher.flush();
                                resultFuture.decRunningTask();
                                hashMap.values().forEach(dataPage3 -> {
                                    dataPage3.release();
                                });
                            } catch (Throwable th4) {
                                dfsPageBatchFlusher.flush();
                                resultFuture.decRunningTask();
                                hashMap.values().forEach(dataPage32 -> {
                                    dataPage32.release();
                                });
                                throw th4;
                            }
                        });
                        resultFuture.decRunningTask();
                    } catch (Exception e) {
                        resultFuture.setEndSnapshot();
                        resultFuture.completeExceptionally(e);
                        hashMap.values().forEach(dataPage -> {
                            dataPage.release();
                        });
                        LOG.error("Page handle error for {} with exception.", Long.valueOf(checkpointId), e);
                        resultFuture.decRunningTask();
                    }
                } catch (Throwable th) {
                    resultFuture.decRunningTask();
                    throw th;
                }
            });
        } catch (Exception e) {
            resultFuture.decRunningTask();
            resultFuture.setEndSnapshot();
            resultFuture.completeExceptionally(e);
            LOG.error("add flushing segment failed with exception {}", e);
            throw e;
        }
    }
}
