package org.apache.flink.runtime.state.gemini.engine.memstore;

import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.GTable;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.metrics.HandlerMetrics;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotCompletableFuture;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotManager;
import org.apache.flink.runtime.state.gemini.engine.snapshot.SnapshotOperation;
import org.apache.flink.shaded.guava18.com.google.common.base.MoreObjects;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/memstore/WriteBufferManagerImpl.class */
/**
 * {@link WriteBufferManager} implementation that keeps aggregate write-buffer counters
 * (flush blocks, buffered records, flushing records, flushing segments, registered tables)
 * and derives two limits from the configuration reachable through the {@link GContext}:
 * the maximum total write-buffer size and the per-table limit of concurrently flushing
 * segments.
 *
 * <p>All counters are {@link AtomicLong}/{@link AtomicInteger} instances; every other field
 * is assigned once in the constructor.
 */
public class WriteBufferManagerImpl implements WriteBufferManager {
    private static final Logger LOG = LoggerFactory.getLogger(WriteBufferManagerImpl.class);

    /** Lower bound (64 MiB) applied to the computed total write-buffer size. */
    private static final long MIN_WRITE_BUFFER_TOTAL_SIZE = 64L * 1024 * 1024;

    private final GContext gContext;

    /** Total write-buffer budget in bytes, never below {@link #MIN_WRITE_BUFFER_TOTAL_SIZE}. */
    private final long maxTotalWriteBufferSize;

    /** Number of region ids in the inclusive range {@code [startRegionId, endRegionId]}. */
    private final int regionCount;

    /** Flushing-segment limit contributed by a single table. */
    private final int totalFlushingLimitPerTable;

    private final AtomicLong totalWriteBufferFlushBlock = new AtomicLong(0);
    private final AtomicLong totalWriteBufferRecordCount = new AtomicLong(0);
    private final AtomicLong totalWriteBufferFlushingRecordCount = new AtomicLong(0);
    private final AtomicLong totalWriteBufferFlushingSegmentCount = new AtomicLong(0);
    private final AtomicInteger tableCount = new AtomicInteger(0);

    /**
     * Computes the write-buffer limits from the configuration exposed by {@code gContext}.
     *
     * @param gContext engine context; must not be {@code null}
     * @throws NullPointerException if {@code gContext} is {@code null}
     */
    public WriteBufferManagerImpl(GContext gContext) {
        this.gContext = Preconditions.checkNotNull(gContext);

        // The product is a floating-point value; Java has no implicit float -> long
        // conversion, so the narrowing cast must be explicit.
        long configuredSize = (long) (((float) gContext.getGConfiguration().getTotalHeapMemSize())
                * gContext.getGConfiguration().getTotalWriteBufferRate());
        this.maxTotalWriteBufferSize = Math.max(configuredSize, MIN_WRITE_BUFFER_TOTAL_SIZE);

        this.regionCount = (gContext.getEndRegionId() - gContext.getStartRegionId()) + 1;
        this.totalFlushingLimitPerTable = (int) (gContext.getGConfiguration().getNumFlushingSegment()
                * this.regionCount
                * gContext.getGConfiguration().getTotalNumFlushingSegmentRatio());

        LOG.info("WriteBufferManager maxTotalWriteBufferSize={}, regionCount={}, totalFlushingLimitPerTable={}",
                this.maxTotalWriteBufferSize, this.regionCount, this.totalFlushingLimitPerTable);
    }

    /** Increments the flush-block counter by one. */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManager
    public void increaseWriteBufferFlushBlock() {
        this.totalWriteBufferFlushBlock.incrementAndGet();
    }

    /**
     * Adds {@code i} to the total buffered record count.
     *
     * @param i delta to apply; may be negative
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManager
    public void addTotalRecordCount(int i) {
        this.totalWriteBufferRecordCount.addAndGet(i);
    }

    /**
     * @return the current total buffered record count
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManager
    public long getTotalRecordCount() {
        return this.totalWriteBufferRecordCount.get();
    }

    /**
     * Adds {@code i} to the total count of records currently being flushed.
     *
     * @param i delta to apply; may be negative
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManager
    public void addTotalFlushingRecordCount(int i) {
        this.totalWriteBufferFlushingRecordCount.addAndGet(i);
    }

    /**
     * Adds {@code i} to the total count of segments currently being flushed.
     *
     * @param i delta to apply; may be negative
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManager
    public void addTotalFlushingSegmentCount(int i) {
        this.totalWriteBufferFlushingSegmentCount.addAndGet(i);
    }

    /**
     * @return the maximum total write-buffer size computed in the constructor
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManager
    public long getTotalMemSize() {
        return this.maxTotalWriteBufferSize;
    }

    /**
     * Always returns {@code true}; this implementation does not rank write buffers.
     *
     * @param writeBuffer ignored
     * @return {@code true}
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManager
    public boolean isBestChoiceWriteBufferFlushing(WriteBuffer writeBuffer) {
        return true;
    }

    /**
     * Tells whether the number of flushing segments is still below the global limit, i.e.
     * the per-table limit multiplied by the number of registered tables (treated as one
     * while no table has been registered).
     *
     * @param writeBuffer ignored; the decision depends only on the aggregate counters
     * @return {@code true} if the flushing-segment count is strictly below the limit
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManager
    public boolean canFlushWriteBuffer(WriteBuffer writeBuffer) {
        int tables = this.tableCount.get();
        int effectiveTables = tables == 0 ? 1 : tables;
        // Widen before multiplying so the product cannot wrap around in 32-bit arithmetic.
        long flushingLimit = (long) this.totalFlushingLimitPerTable * effectiveTables;
        return flushingLimit > this.totalWriteBufferFlushingSegmentCount.get();
    }

    /**
     * Registers one more table and logs the resulting table count.
     *
     * @param str name of the table, used for logging only
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManager
    public void addTableNum(String str) {
        // Log the value produced by this very increment instead of re-reading the counter,
        // which could already include increments made by other threads.
        int totalTables = this.tableCount.incrementAndGet();
        LOG.info("WriteBufferManager add tableName={}, now totalTables={}", str, totalTables);
    }

    /**
     * Forwards the snapshot operation to the write buffer of every region and every index
     * region of every table known to the Gemini DB, bracketed by
     * {@code incRunningTask()}/{@code decRunningTask()} on the pending snapshot's result
     * future.
     *
     * @param snapshotOperation the snapshot to take part in
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.memstore.WriteBufferManager
    public void doSnapshot(SnapshotOperation snapshotOperation) {
        SnapshotManager.PendingSnapshot pendingSnapshot = snapshotOperation.getPendingSnapshot();
        long checkpointId = pendingSnapshot.getCheckpointId();
        SnapshotCompletableFuture resultFuture = pendingSnapshot.getResultFuture();

        // NOTE(review): if a write buffer throws, decRunningTask() is never reached. Whether
        // a try/finally is appropriate depends on how SnapshotCompletableFuture treats a
        // count that drops to zero after a failure - confirm before changing.
        resultFuture.incRunningTask();
        LOG.info("WriteBufferManagerImpl start to snapshot for {}", checkpointId);
        for (GTable gTable : this.gContext.getGeminiDB().getGeminiTableMap().values()) {
            snapshotRegions(gTable.regionIterator(), snapshotOperation);
            snapshotRegions(gTable.indexRegionIterator(), snapshotOperation);
        }
        resultFuture.decRunningTask();
        LOG.info("WriteBufferManagerImpl end snapshot for {}", checkpointId);
    }

    /** Runs {@code snapshotOperation} against the write buffer of each region, in iteration order. */
    private static void snapshotRegions(Iterator<GRegion> regions, SnapshotOperation snapshotOperation) {
        while (regions.hasNext()) {
            regions.next().getWriteBuffer().doSnapshot(snapshotOperation);
        }
    }

    /**
     * Registers one gauge per aggregate counter on the given metric group.
     *
     * @param metricGroup group that receives the gauges
     */
    @Override // org.apache.flink.runtime.state.gemini.engine.metrics.MetricsRegisterAble
    public void registerMetrics(MetricGroup metricGroup) {
        metricGroup.gauge(HandlerMetrics.TOTAL_WR_BUFFER_FLUSH_BLOCK,
                () -> this.totalWriteBufferFlushBlock.get());
        metricGroup.gauge(HandlerMetrics.TOTAL_WR_BUFFER_RECORD_COUNT,
                () -> this.totalWriteBufferRecordCount.get());
        metricGroup.gauge(HandlerMetrics.TOTAL_WR_BUFFER_FLUSH_RECORD_COUNT,
                () -> this.totalWriteBufferFlushingRecordCount.get());
        metricGroup.gauge(HandlerMetrics.TOTAL_WR_BUFFER_FLUSH_SEGMENT_COUNT,
                () -> this.totalWriteBufferFlushingSegmentCount.get());
    }

    /**
     * @return a string listing the current counter values and the computed limits
     */
    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("totalWriteBufferFlushBlock", this.totalWriteBufferFlushBlock)
                .add(HandlerMetrics.TOTAL_WR_BUFFER_RECORD_COUNT, this.totalWriteBufferRecordCount)
                .add(HandlerMetrics.TOTAL_WR_BUFFER_FLUSH_RECORD_COUNT, this.totalWriteBufferFlushingRecordCount)
                .add(HandlerMetrics.TOTAL_WR_BUFFER_FLUSH_SEGMENT_COUNT, this.totalWriteBufferFlushingSegmentCount)
                .add("maxTotalWriteBufferSize", this.maxTotalWriteBufferSize)
                .add("tableCount", this.tableCount)
                .add("regionCount", this.regionCount)
                .toString();
    }
}
