/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.memstore;

import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiShutDownException;
import org.apache.flink.runtime.state.gemini.engine.filecache.FileCache;
import org.apache.flink.runtime.state.gemini.engine.handler.PageHandler;
import org.apache.flink.runtime.state.gemini.engine.memstore.Segment;
import org.apache.flink.runtime.state.gemini.engine.memstore.WriteBuffer;
import org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManager;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.page.PageIndex;
import org.apache.flink.runtime.state.gemini.engine.page.PageStore;
import org.apache.flink.runtime.state.gemini.engine.rm.ReferenceCount;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotCompletableFuture;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotOperation;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotStat;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutorGroup;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Skeleton {@link WriteBuffer} that decides when the active {@link Segment} has to be
 * turned into a flushing segment, runs the resulting {@link PageHandler} on the
 * region's {@link EventExecutor}, and drives the write-buffer part of a snapshot.
 *
 * <p>Subclasses supply the segment bookkeeping ({@link #getActiveSegment()},
 * {@link #addFlushingSegment()}, {@link #pollFlushingSegment()}) and the
 * {@link PageHandler} factory.
 *
 * @param <K> key type of the backing {@link PageStore}
 * @param <V> value type of the backing {@link PageStore}
 */
public abstract class AbstractWriteBuffer<K, V>
implements WriteBuffer<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractWriteBuffer.class);

    /** Minimum interval between two debug dumps of the buffer / page-store stats. */
    private static final long STATS_LOG_INTERVAL_MS = 60000L;
    /** Record count above which {@link #checkResource()} briefly waits on {@code lastFuture}. */
    private static final int MIN_RECORDS_BEFORE_BLOCKING = 1000;
    /** Upper bound of a single wait on {@code lastFuture}. */
    private static final long LAST_FUTURE_WAIT_MS = 10L;
    /** Elapsed time in {@link #checkResource()} above which an info line is logged. */
    private static final long SLOW_WAIT_LOG_THRESHOLD_MS = 10L;

    protected final EventExecutor eventExecutor;
    protected final GRegionContext gRegionContext;
    protected long segmentID = 0L;
    protected final GRegion gRegion;
    protected final PageStore<K, V> pageStore;
    private final WriteBufferManager writeBufferManager;
    /** Timestamp (ms) of the last periodic stats dump. */
    private long printTS = System.currentTimeMillis();
    /** Completed once the task scheduled for an average key length of -2 has run its handler. */
    private CompletableFuture<Void> lastFuture;

    /**
     * @param gRegion       region this buffer belongs to; its context also provides the
     *                      shared {@link WriteBufferManager}
     * @param eventExecutor executor on which page handlers are run
     * @param pageStore     page store whose page index is copied during snapshots
     */
    public AbstractWriteBuffer(GRegion gRegion, EventExecutor eventExecutor, PageStore<K, V> pageStore) {
        this.gRegionContext = gRegion.getGRegionContext();
        this.gRegion = gRegion;
        this.eventExecutor = eventExecutor;
        this.pageStore = pageStore;
        this.writeBufferManager = this.gRegionContext.getGContext().getSupervisor().getWriteBufferManager();
    }

    @Override
    public EventExecutor getExecutor() {
        return this.eventExecutor;
    }

    /**
     * Checks whether the active segment must be flushed and, if so, schedules the flush.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>At most once per {@link #STATS_LOG_INTERVAL_MS}, dump the stats at debug level.</li>
     *   <li>While the average key length is negative: on the value -2, set it to -1 and run
     *       a page handler over a copy of the active segment; then return early unless the
     *       active segment holds more than {@link #MIN_RECORDS_BEFORE_BLOCKING} records and
     *       that handler finishes within {@link #LAST_FUTURE_WAIT_MS}.</li>
     *   <li>Return unless this buffer reaches its own water mark, or the total estimate
     *       reaches the manager's total memory size and the manager picks this buffer.</li>
     *   <li>Wait while the configured number of flushing segments is reached, then move the
     *       active segment to the flushing state and schedule its flush.</li>
     * </ol>
     *
     * @throws GeminiRuntimeException if the thread is interrupted while waiting for a
     *                                flushing slot (the interrupt flag is restored first)
     */
    void checkResource() {
        long startTime = System.currentTimeMillis();
        if (this.printTS + STATS_LOG_INTERVAL_MS < startTime) {
            this.printTS = startTime;
            if (LOG.isDebugEnabled()) {
                LOG.debug("writeBufferStats: {}", this.gRegionContext.getWriteBufferStats());
                LOG.debug("pageStoreStats: {}", this.gRegionContext.getPageStoreStats());
            }
        }

        if (this.gRegionContext.getWriteBufferStats().getAverageKeyLen() < 0) {
            // NOTE(review): -2 / -1 look like "not measured yet" / "measurement running"
            // sentinels; confirm against the stats class.
            if (this.gRegionContext.getWriteBufferStats().getAverageKeyLen() == -2) {
                // The task completes the instance it captured, not whatever the field
                // happens to reference when it eventually runs.
                CompletableFuture<Void> handlerDone = new CompletableFuture<>();
                this.lastFuture = handlerDone;
                this.gRegionContext.getWriteBufferStats().setAverageKeyLen(-1);
                Segment segmentCopy = this.getActiveSegment().copySegment();
                this.eventExecutor.execute(() -> {
                    // NOTE(review): if handle() throws, the future is never completed and
                    // callers keep timing out below; behaviour kept as in the original.
                    this.createPageHandler(segmentCopy, true).handle();
                    handlerDone.complete(null);
                });
            }
            if (this.getActiveSegment().getRecordCount() <= MIN_RECORDS_BEFORE_BLOCKING) {
                return;
            }
            CompletableFuture<Void> pending = this.lastFuture;
            if (!pending.isDone()) {
                try {
                    pending.get(LAST_FUTURE_WAIT_MS, TimeUnit.MILLISECONDS);
                }
                catch (Exception e) {
                    // Timeout or failure: give up for this call. An interrupt must stay
                    // visible to the caller, so the flag is restored.
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    return;
                }
            }
        }

        int writeBufferEstimatedSize = this.getEstimatedSize(this.getActiveSegment().getRecordCount());
        // Summed as long: two int estimates can exceed Integer.MAX_VALUE and wrap negative,
        // which would make the total-memory comparison below always fail.
        long totalWriteBufferEstimatedSize =
                (long) writeBufferEstimatedSize + this.getEstimatedSize(this.writeBufferManager.getTotalRecordCount());
        boolean mustFlush = (long) writeBufferEstimatedSize >= this.gRegionContext.getWriteBufferWaterMark()
                || (totalWriteBufferEstimatedSize >= this.writeBufferManager.getTotalMemSize()
                        && this.writeBufferManager.isBestChoiceWriteBufferFlushing(this));
        if (!mustFlush) {
            return;
        }

        GContext gContext = this.gRegionContext.getGContext();
        while (this.gRegionContext.getWriteBufferStats().getFlushingSegmentCount() >= gContext.getGConfiguration().getNumFlushingSegment()) {
            gContext.checkDBStatus();
            if (this.writeBufferManager.canFlushWriteBuffer(this)) {
                break;
            }
            synchronized (this) {
                try {
                    // endSegmentFlush() notifies on this monitor; the 1 ms bound makes the
                    // loop re-check its conditions even without a notification.
                    this.wait(1L);
                    this.writeBufferManager.increaseWriteBufferFlushBlock();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new GeminiRuntimeException(e);
                }
            }
        }
        // Measured after the loop so that the time spent blocked is actually included.
        long waitTime = System.currentTimeMillis() - startTime;
        if (waitTime > SLOW_WAIT_LOG_THRESHOLD_MS) {
            LOG.info("too much flushing segment or evict too long, wait time ={} ...", waitTime);
        }

        Segment rs = this.addFlushingSegment();
        this.gRegionContext.getWriteBufferStats().addTotalFlushingRecordCount(rs.getRecordCount());
        this.gRegionContext.getWriteBufferStats().addTotalRecordCount(-rs.getRecordCount());
        this.doSegmentFlush(rs, writeBufferEstimatedSize);
    }

    /**
     * Schedules the flush of {@code segment} on the event executor.
     *
     * <p>A {@link GeminiShutDownException} is logged at debug level; any other exception is
     * logged as an error. NOTE(review): in both cases {@code endSegmentFlush} is skipped, so
     * the flushing-segment counters are not rolled back.
     *
     * @param segment       segment already registered as flushing
     * @param estimatedSize estimated size of the segment; not used by this implementation
     */
    void doSegmentFlush(Segment segment, int estimatedSize) {
        this.eventExecutor.execute(() -> {
            try {
                PageHandler pageHandler = this.createPageHandler(segment, false);
                pageHandler.handle();
                this.endSegmentFlush(segment.getSegmentID());
            }
            catch (GeminiShutDownException ignore) {
                LOG.debug("gemini has shutdown", ignore);
            }
            catch (Exception e) {
                LOG.error("Flush segment failed", e);
            }
        });
    }

    /**
     * Finishes a flush: polls the next flushing segment, verifies it is the expected one,
     * updates the flushing counters and wakes a thread waiting in {@link #checkResource()}.
     *
     * @param segmentID id of the segment whose flush just finished
     * @throws IllegalArgumentException if no flushing segment is queued or the polled
     *                                  segment has a different id
     */
    private void endSegmentFlush(long segmentID) {
        Segment segment = this.pollFlushingSegment();
        Preconditions.checkArgument(segment != null, (Object) "error segment!");
        Preconditions.checkArgument(segment.getSegmentID() == segmentID, (Object) "error segment!");
        this.gRegionContext.getWriteBufferStats().addFlushingSegmentCount(-1);
        this.gRegionContext.getWriteBufferStats().addTotalFlushingRecordCount(-segment.getRecordCount());
        synchronized (this) {
            this.notify();
        }
    }

    /** Returns the segment currently receiving writes. */
    public abstract Segment getActiveSegment();

    /** Registers a segment as flushing and returns it. */
    abstract Segment addFlushingSegment();

    /** Removes and returns the next flushing segment; {@code endSegmentFlush} rejects {@code null}. */
    abstract Segment pollFlushingSegment();

    /**
     * Creates the handler that processes {@code var1}.
     *
     * @param var1 segment to process
     * @param var2 {@code true} only for the handler run over a segment copy in
     *             {@link #checkResource()}; {@code false} for flushes and snapshots
     */
    abstract PageHandler createPageHandler(Segment var1, boolean var2);

    /**
     * Estimates the size of {@code elementSize} records as
     * {@code pageSizeRate * (averageKeyLen + averageValueLen) * elementSize}.
     *
     * @param elementSize number of records
     * @return the estimate, or 0 (with an error log) when the average key length is not positive
     */
    private int getEstimatedSize(long elementSize) {
        if (this.gRegionContext.getWriteBufferStats().getAverageKeyLen() <= 0) {
            LOG.error("Let's see whether it will happen!");
            return 0;
        }
        float averageRecordLen = (float) (this.gRegionContext.getWriteBufferStats().getAverageKeyLen()
                + this.gRegionContext.getWriteBufferStats().getAverageValueLen());
        return (int) (this.gRegionContext.getPageStoreStats().getPageSizeRate() * averageRecordLen * (float) elementSize);
    }

    /**
     * Snapshots this write buffer for the checkpoint carried by {@code snapshotOperation}.
     *
     * <p>The calling thread registers the active segment as flushing. The event executor
     * then runs the page handler, deep-copies the page index and records the region
     * snapshot meta. Finally the snapshot executor submits every page of the copied index
     * to the {@link FileCache} (plus a local copy when local snapshots are enabled) and
     * adds the page / size counters to the {@link SnapshotStat}.
     *
     * <p>Every scheduled stage increments the running-task count of the snapshot future and
     * decrements it when done; a failure in any stage ends the snapshot and completes the
     * future exceptionally.
     *
     * @param snapshotOperation operation providing the pending snapshot
     */
    @Override
    public void doSnapshot(SnapshotOperation snapshotOperation) {
        SnapshotManager.PendingSnapshot pendingSnapshot = snapshotOperation.getPendingSnapshot();
        long checkpointId = pendingSnapshot.getCheckpointId();
        SnapshotCompletableFuture snapshotCompletableFuture = pendingSnapshot.getResultFuture();
        if (snapshotCompletableFuture.isEndSnapshot()) {
            return;
        }
        boolean isLocalSnapshotEnabled = this.gRegionContext.getGContext().getGConfiguration().isLocalSnapshotEnabled();
        snapshotCompletableFuture.incRunningTask();
        try {
            Segment rs = this.addFlushingSegment();
            PageHandler pageHandler = this.createPageHandler(rs, false);
            this.gRegionContext.getWriteBufferStats().addTotalFlushingRecordCount(rs.getRecordCount());
            this.gRegionContext.getWriteBufferStats().addTotalRecordCount(-rs.getRecordCount());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Start to snapshot write buffer for {}.", checkpointId);
            }
            this.eventExecutor.execute(() -> {
                // Pages whose reference count was raised by deepCopy(); each one must be
                // released exactly once, either after it is submitted or on failure.
                HashMap<PageAddress, DataPage> allAddReferenceDataPage = new HashMap<PageAddress, DataPage>();
                try {
                    pageHandler.handle();
                    this.endSegmentFlush(rs.getSegmentID());
                    PageIndex<K> copyPageIndex = this.pageStore.getPageIndex().deepCopy(allAddReferenceDataPage);
                    pendingSnapshot.addGRegionSnapshotMeta(this.gRegionContext.getTableName(), this.gRegionContext.getRegionId(), copyPageIndex, this.gRegionContext.getLastSeqID(), this.gRegionContext.getRemoveAllSeqID());
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Segment flush and pageIndex copy done for {}, will start to flush.", checkpointId);
                    }
                    snapshotCompletableFuture.incRunningTask();
                    this.gRegionContext.getGContext().getSupervisor().getSnapshotManager().getSnapshotExecutor().execute(() -> {
                        try {
                            Iterator<PageAddress> pageIterator = copyPageIndex.pageIterator();
                            EventExecutorGroup snapshotEventExecutorGroup = this.gRegionContext.getGContext().getSupervisor().getSnapshotExecutorGroup();
                            FileCache fileCache = this.gRegionContext.getGContext().getSupervisor().getFileCache();
                            int totalPage = 0;
                            long totalSize = 0L;
                            int incrementalPages = 0;
                            long incrementalSize = 0L;
                            int totalLocalPage = 0;
                            long totalLocalSize = 0L;
                            int localIncrementalPages = 0;
                            long localIncrementalSize = 0L;
                            while (this.gRegionContext.getGContext().isDBNormal() && pageIterator.hasNext() && !snapshotCompletableFuture.isEndSnapshot()) {
                                PageAddress pageAddress = pageIterator.next();
                                try {
                                    EventExecutor snapshotEventExecutor = snapshotEventExecutorGroup.next();
                                    // Balanced by the decRunningTask() in the flush callback.
                                    snapshotCompletableFuture.incRunningTask();
                                    if (!pageAddress.isDfsValid()) {
                                        ++incrementalPages;
                                        incrementalSize += (long) pageAddress.getDataLen();
                                    }
                                    fileCache.flushPage(pageAddress, this.gRegionContext, snapshotEventExecutor, false, (success, throwable) -> {
                                        if (!success.booleanValue()) {
                                            LOG.error("Write error when snapshot dfs.");
                                            AbstractWriteBuffer.failSnapshot(snapshotCompletableFuture, (Throwable) throwable);
                                        }
                                        snapshotCompletableFuture.decRunningTask();
                                    });
                                    ++totalPage;
                                    totalSize += (long) pageAddress.getDataLen();
                                    if (isLocalSnapshotEnabled) {
                                        // Balanced by the decRunningTask() in the local callback.
                                        snapshotCompletableFuture.incRunningTask();
                                        if (!pageAddress.isLocalValid()) {
                                            ++localIncrementalPages;
                                            localIncrementalSize += (long) pageAddress.getDataLen();
                                        }
                                        fileCache.addPage(pageAddress, this.gRegionContext, snapshotEventExecutor, (success, throwable) -> {
                                            if (!success.booleanValue()) {
                                                LOG.error("Write error when snapshot local.");
                                                AbstractWriteBuffer.failSnapshot(snapshotCompletableFuture, (Throwable) throwable);
                                            }
                                            snapshotCompletableFuture.decRunningTask();
                                        });
                                        ++totalLocalPage;
                                        totalLocalSize += (long) pageAddress.getDataLen();
                                    }
                                    pageIterator.remove();
                                }
                                finally {
                                    // Plain null check: leaving a finally block via
                                    // 'continue' would discard a pending exception.
                                    DataPage dataPage = allAddReferenceDataPage.remove(pageAddress);
                                    if (dataPage != null) {
                                        dataPage.delReferenceCount(ReferenceCount.ReleaseType.Normal);
                                    }
                                }
                            }
                            SnapshotStat snapshotStat = pendingSnapshot.getSnapshotStat();
                            snapshotStat.addAndGetTotalPages(totalPage);
                            snapshotStat.addAndGetTotalSize(totalSize);
                            snapshotStat.addAndGetIncrementalPages(incrementalPages);
                            snapshotStat.addAndGetIncrementalSize(incrementalSize);
                            snapshotStat.addAndGetTotalLocalPages(totalLocalPage);
                            snapshotStat.addAndGetTotalLocalSize(totalLocalSize);
                            snapshotStat.addAndGetLocalIncrementalPages(localIncrementalPages);
                            snapshotStat.addAndGetLocalIncrementalSize(localIncrementalSize);
                        }
                        catch (Exception e) {
                            // Without this the running-task count taken before execute()
                            // would never be returned and the snapshot future never failed.
                            AbstractWriteBuffer.failSnapshot(snapshotCompletableFuture, e);
                            LOG.error("Flush pages error for {} with exception {}.", checkpointId, e.getMessage(), e);
                        }
                        finally {
                            try {
                                snapshotCompletableFuture.decRunningTask();
                            }
                            finally {
                                // Release the pages that were never reached by the loop.
                                allAddReferenceDataPage.values().forEach(datapage -> datapage.delReferenceCount(ReferenceCount.ReleaseType.Normal));
                            }
                        }
                    });
                }
                catch (Exception e) {
                    AbstractWriteBuffer.failSnapshot(snapshotCompletableFuture, e);
                    allAddReferenceDataPage.values().forEach(datapage -> datapage.delReferenceCount(ReferenceCount.ReleaseType.Normal));
                    LOG.error("Page handle error for {} with exception {}.", checkpointId, e.getMessage(), e);
                }
                finally {
                    snapshotCompletableFuture.decRunningTask();
                }
            });
        }
        catch (Exception e) {
            snapshotCompletableFuture.decRunningTask();
            AbstractWriteBuffer.failSnapshot(snapshotCompletableFuture, e);
            // The message is passed as the format argument; the throwable goes last so
            // that the stack trace is logged as well.
            LOG.error("add flushing segment failed with exception {}", e.getMessage(), e);
            throw e;
        }
    }

    /** Marks the snapshot as ended and completes its future with {@code cause}. */
    private static void failSnapshot(SnapshotCompletableFuture future, Throwable cause) {
        future.setEndSnapshot();
        future.completeExceptionally(cause);
    }
}

