package org.apache.flink.runtime.state.gemini.engine.fs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.vm.EvictPolicy;
import org.apache.flink.shaded.guava18.com.google.common.base.MoreObjects;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutorGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/gemini/engine/fs/PersistHugePageToLocalSync.class */
public class PersistHugePageToLocalSync implements PersistenceStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(PersistHugePageToLocalSync.class);
    private final EventExecutorGroup flushEventExecutorGroup;
    private final GContext gContext;
    private final Supervisor supervisor;
    private final EvictPolicy evictPolicy;
    private final int forceFlushCacheSize;
    private final AtomicLong runningPersistTask = new AtomicLong(0);
    private final AtomicLong runningPersistPageSize = new AtomicLong(0);
    private final AtomicLong totalPersistPageSize = new AtomicLong(0);

    public PersistHugePageToLocalSync(GContext gContext) {
        this.gContext = gContext;
        this.flushEventExecutorGroup = gContext.getSupervisor().getFlushExecutorGroup();
        this.supervisor = gContext.getSupervisor();
        this.forceFlushCacheSize = gContext.getGConfiguration().getForceSyncToCacheSize();
        this.evictPolicy = this.supervisor.getCacheManager().getEvictPolicy();
    }

    @Override // org.apache.flink.runtime.state.gemini.engine.fs.PersistenceStrategy
    public void persistPage(GRegion gRegion, PageAddress pageAddress, int i) {
        if (i >= this.forceFlushCacheSize && this.evictPolicy.getMemoryUsedWaterMark(gRegion, i).getShortValue() >= EvictPolicy.MemoryUsedWaterMark.Middle.getShortValue()) {
            EventExecutor next = this.flushEventExecutorGroup.next();
            Iterator<PageAddress> pageIterator = pageAddress.pageIterator();
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            ArrayList arrayList3 = new ArrayList();
            while (pageIterator.hasNext()) {
                arrayList.add(pageIterator.next());
                arrayList2.add(gRegion.getGRegionContext());
            }
            CountDownLatch countDownLatch = new CountDownLatch(1);
            this.runningPersistTask.incrementAndGet();
            this.runningPersistPageSize.addAndGet(i);
            this.totalPersistPageSize.addAndGet(i);
            arrayList3.add((bool, th) -> {
                this.runningPersistTask.decrementAndGet();
                this.runningPersistPageSize.addAndGet(-i);
                if (bool.booleanValue()) {
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        PageAddress pageAddress2 = (PageAddress) it.next();
                        if (pageAddress2.isLocalValid()) {
                            pageAddress2.setDataPage(null);
                        } else {
                            LOG.error("persistPage flush local {}, but it's local invalid", pageAddress2);
                        }
                    }
                } else {
                    LOG.error("persistPage flush local failed", th);
                    this.gContext.setDBInternalError(new GeminiRuntimeException("persistPage flush local failed, " + th));
                }
                countDownLatch.countDown();
            });
            this.supervisor.getFileCache().addBatchPages(arrayList, arrayList2, next, arrayList3);
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                LOG.error("persistPage failed", e);
                this.gContext.setDBInternalError(new GeminiRuntimeException("persistPage local failed, " + e));
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
    }

    public String toString() {
        return MoreObjects.toStringHelper(this).add("runningPersistTask", this.runningPersistTask.get()).add("runningPersistPageSize", this.runningPersistPageSize.get()).add("totalPersistPageSize", this.totalPersistPageSize.get()).toString();
    }
}
