/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.fs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor;
import org.apache.flink.runtime.state.gemini.engine.exceptions.GeminiRuntimeException;
import org.apache.flink.runtime.state.gemini.engine.fs.PersistenceStrategy;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.runtime.state.gemini.engine.vm.EvictPolicy;
import org.apache.flink.shaded.guava18.com.google.common.base.MoreObjects;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutorGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistHugePageToLocalSync
implements PersistenceStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(PersistHugePageToLocalSync.class);
    private final EventExecutorGroup flushEventExecutorGroup;
    private final GContext gContext;
    private final Supervisor supervisor;
    private final EvictPolicy evictPolicy;
    private final int forceFlushCacheSize;
    private final AtomicLong runningPersistTask = new AtomicLong(0L);
    private final AtomicLong runningPersistPageSize = new AtomicLong(0L);
    private final AtomicLong totalPersistPageSize = new AtomicLong(0L);

    public PersistHugePageToLocalSync(GContext gContext) {
        this.gContext = gContext;
        this.flushEventExecutorGroup = gContext.getSupervisor().getFlushExecutorGroup();
        this.supervisor = gContext.getSupervisor();
        this.forceFlushCacheSize = gContext.getGConfiguration().getForceSyncToCacheSize();
        this.evictPolicy = this.supervisor.getCacheManager().getEvictPolicy();
    }

    @Override
    public void persistPage(GRegion gRegion, PageAddress pageAddress, int compactedMemSize) {
        if (compactedMemSize < this.forceFlushCacheSize) {
            return;
        }
        if (this.evictPolicy.getMemoryUsedWaterMark(gRegion, compactedMemSize).getShortValue() < EvictPolicy.MemoryUsedWaterMark.Middle.getShortValue()) {
            return;
        }
        EventExecutor flushEventExecutor = this.flushEventExecutorGroup.next();
        Iterator<PageAddress> pageAddressIterator = pageAddress.pageIterator();
        ArrayList<PageAddress> pageAddressList = new ArrayList<PageAddress>();
        ArrayList<GRegionContext> gRegionContextList = new ArrayList<GRegionContext>();
        ArrayList<BiConsumer<Boolean, Throwable>> callBacks = new ArrayList<BiConsumer<Boolean, Throwable>>();
        while (pageAddressIterator.hasNext()) {
            PageAddress pageAddressSingle = pageAddressIterator.next();
            pageAddressList.add(pageAddressSingle);
            gRegionContextList.add(gRegion.getGRegionContext());
        }
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.runningPersistTask.incrementAndGet();
        this.runningPersistPageSize.addAndGet(compactedMemSize);
        this.totalPersistPageSize.addAndGet(compactedMemSize);
        BiConsumer<Boolean, Throwable> callBack = (success, throwable) -> {
            this.runningPersistTask.decrementAndGet();
            this.runningPersistPageSize.addAndGet(-compactedMemSize);
            if (!success.booleanValue()) {
                LOG.error("persistPage flush local failed", throwable);
                this.gContext.setDBInternalError(new GeminiRuntimeException("persistPage flush local failed, " + throwable));
            } else {
                for (PageAddress pageAddressSingle : pageAddressList) {
                    if (pageAddressSingle.isLocalValid()) {
                        pageAddressSingle.setDataPage(null);
                        continue;
                    }
                    LOG.error("persistPage flush local {}, but it's local invalid", (Object)pageAddressSingle);
                }
            }
            countDownLatch.countDown();
        };
        callBacks.add(callBack);
        this.supervisor.getFileCache().addBatchPages(pageAddressList, gRegionContextList, flushEventExecutor, callBacks);
        try {
            countDownLatch.await();
        }
        catch (InterruptedException e) {
            LOG.error("persistPage failed", (Throwable)e);
            this.gContext.setDBInternalError(new GeminiRuntimeException("persistPage local failed, " + e));
        }
    }

    @Override
    public void close() throws IOException {
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)this).add("runningPersistTask", this.runningPersistTask.get()).add("runningPersistPageSize", this.runningPersistPageSize.get()).add("totalPersistPageSize", this.totalPersistPageSize.get()).toString();
    }
}

