/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.fs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.state.gemini.engine.GRegion;
import org.apache.flink.runtime.state.gemini.engine.GRegionContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.dbms.Supervisor;
import org.apache.flink.runtime.state.gemini.engine.fs.PersistenceStrategy;
import org.apache.flink.runtime.state.gemini.engine.page.DataPage;
import org.apache.flink.runtime.state.gemini.engine.page.PageAddress;
import org.apache.flink.shaded.guava18.com.google.common.base.MoreObjects;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutorGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersistenceGroupPageToDfs
implements PersistenceStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(PersistenceGroupPageToDfs.class);
    private final EventExecutorGroup snapshotExecutorGroup;
    private final Supervisor supervisor;
    private Map<PageAddress, GRegion> batchMap = new ConcurrentHashMap<PageAddress, GRegion>();
    private AtomicInteger batchSize = new AtomicInteger(0);
    private final AtomicLong runningPersistTask = new AtomicLong(0L);
    private final AtomicLong runningPersistPageSize = new AtomicLong(0L);
    private final AtomicLong totalPersistPageSize = new AtomicLong(0L);
    private final int batchPersistenceSize;
    private final int maxPersistenceRunningTask;
    final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public PersistenceGroupPageToDfs(GContext gContext) {
        this.snapshotExecutorGroup = gContext.getSupervisor().getSnapshotExecutorGroup();
        this.supervisor = gContext.getSupervisor();
        this.batchPersistenceSize = gContext.getGConfiguration().getBatchPersistenceSize();
        this.maxPersistenceRunningTask = gContext.getGConfiguration().getMaxPersistenceRunningTask() * gContext.getGConfiguration().getSnapshotThreadNum();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void persistPage(GRegion gRegion, PageAddress pageAddress, int compactedMemSize) {
        int flushingSize;
        Map<PageAddress, GRegion> copyBatchMap;
        if (this.runningPersistTask.incrementAndGet() > (long)this.maxPersistenceRunningTask) {
            this.runningPersistTask.decrementAndGet();
            return;
        }
        Iterator<PageAddress> pageAddressIterator = pageAddress.pageIterator();
        this.lock.readLock().lock();
        try {
            while (pageAddressIterator.hasNext()) {
                PageAddress realPageAddress = pageAddressIterator.next();
                DataPage dataPage = realPageAddress.getDataPageNoReference();
                if (dataPage == null) continue;
                this.batchMap.computeIfAbsent(realPageAddress, nothing -> {
                    this.batchSize.addAndGet(realPageAddress.getDataLen());
                    return gRegion;
                });
            }
        }
        finally {
            this.lock.readLock().unlock();
        }
        if (this.batchSize.get() < this.batchPersistenceSize) {
            this.runningPersistTask.decrementAndGet();
            return;
        }
        this.lock.writeLock().lock();
        try {
            if (this.batchSize.get() < this.batchPersistenceSize) {
                this.runningPersistTask.decrementAndGet();
                return;
            }
            copyBatchMap = this.batchMap;
            this.batchMap = new ConcurrentHashMap<PageAddress, GRegion>(copyBatchMap.size());
            flushingSize = this.batchSize.getAndSet(0);
        }
        finally {
            this.lock.writeLock().unlock();
        }
        if (copyBatchMap.size() == 0) {
            this.runningPersistTask.decrementAndGet();
            return;
        }
        this.runningPersistPageSize.addAndGet(flushingSize);
        this.totalPersistPageSize.addAndGet(flushingSize);
        EventExecutor snapshotEventExecutor = this.snapshotExecutorGroup.next();
        ArrayList<PageAddress> pageAddressList = new ArrayList<PageAddress>();
        ArrayList<GRegionContext> regionContextList = new ArrayList<GRegionContext>();
        ArrayList<BiConsumer<Boolean, Throwable>> callBacks = new ArrayList<BiConsumer<Boolean, Throwable>>();
        for (Map.Entry<PageAddress, GRegion> entry : copyBatchMap.entrySet()) {
            pageAddressList.add(entry.getKey());
            regionContextList.add(entry.getValue().getGRegionContext());
        }
        BiConsumer<Boolean, Throwable> callBack = (success, throwable) -> {
            this.runningPersistPageSize.addAndGet(-flushingSize);
            this.runningPersistTask.decrementAndGet();
            if (!success.booleanValue()) {
                LOG.error("persistPage flush failed", throwable);
            }
        };
        callBacks.add(callBack);
        this.supervisor.getFileCache().flushBatchPages(pageAddressList, regionContextList, snapshotEventExecutor, false, false, callBacks);
    }

    @Override
    public void close() throws IOException {
    }

    public String toString() {
        return MoreObjects.toStringHelper((Object)this).add("batchSize", (Object)this.batchSize).add("runningPersistTask", this.runningPersistTask.get()).add("runningPersistPageSize", this.runningPersistPageSize.get()).add("totalPersistPageSize", this.totalPersistPageSize.get()).toString();
    }

    @VisibleForTesting
    public int getBatchSize() {
        return this.batchSize.get();
    }

    @VisibleForTesting
    public int getBatchMapSize() {
        return this.batchMap.size();
    }

    @VisibleForTesting
    public long getRunningPersistTask() {
        return this.runningPersistTask.get();
    }
}

