/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini.engine.rm;

import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.state.gemini.engine.dbms.GContext;
import org.apache.flink.runtime.state.gemini.engine.rm.Finalizable;
import org.apache.flink.runtime.state.gemini.engine.rm.GarbageReleaseManager;
import org.apache.flink.runtime.state.gemini.engine.rm.ReferenceCountable;
import org.apache.flink.shaded.guava18.com.google.common.collect.Queues;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * {@link GarbageReleaseManager} implementation that frees enqueued objects either
 * synchronously at enqueue time or later from a single background thread.
 *
 * <p>Objects that cannot be freed immediately are stamped with the context's current
 * access number and parked in a concurrent queue. The background loop periodically
 * re-examines the queue and frees every object whose reference count is not positive
 * and whose stamped sequence id differs from the context's current access number.
 *
 * <p>NOTE(review): the access-number comparison presumably guards against freeing an
 * object that an in-flight access may still touch — confirm against {@code GContext}.
 *
 * @param <T> type of managed object; must expose a reference count and be finalizable
 */
public class GarbageReleaseManagerImpl<T extends ReferenceCountable & Finalizable>
        implements GarbageReleaseManager<T> {
    /** Sleep time in milliseconds between scans while objects are still pending. */
    private static final int LOOP_INTERVAL = 10;
    /** Sleep time in milliseconds between scans while the queue is empty. */
    private static final int LOOP_INTERVAL_WHEN_EMPTY = 100;

    private final GContext gContext;
    /**
     * Written by the thread calling {@link #start()} / {@link #close()} and read by the
     * worker thread in {@link #run()}; volatile so the stop request becomes visible.
     */
    private volatile boolean threadStarted = false;
    private ExecutorService executor;
    /** Objects waiting to become releasable. */
    private final Queue<T> monitorQueue = Queues.newConcurrentLinkedQueue();

    /**
     * @param gContext context supplying the access number and the executor name prefix
     */
    public GarbageReleaseManagerImpl(GContext gContext) {
        this.gContext = gContext;
    }

    /**
     * Hands an object over for release.
     *
     * <p>An already freed object is ignored. An object with a zero reference count that
     * reports it can be freed synchronously is freed right away. Any other object is
     * stamped with the current access number and queued for the background loop.
     *
     * @param object the object to release
     */
    @Override
    public void enqueue(T object) {
        if (object.isFreed()) {
            return;
        }
        if (object.refCnt() == 0 && object.canSyncFree()) {
            object.doFree();
            return;
        }
        object.setSeqID(this.gContext.getAccessNumber());
        this.monitorQueue.add(object);
    }

    /**
     * Starts the background release loop on a dedicated single-thread executor.
     * Does nothing when the loop has already been started.
     */
    @Override
    public void start() {
        if (this.threadStarted) {
            return;
        }
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat(this.gContext.getGConfiguration().getExecutorPrefixName() + "GeminiReleaseManager-%d")
                .build();
        this.executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(Short.MAX_VALUE), namedThreadFactory);
        // Raise the flag before submitting so the loop in run() does not exit immediately.
        this.threadStarted = true;
        this.executor.submit(this);
    }

    /**
     * Background loop: repeatedly scans the queue, frees releasable objects and
     * re-queues the rest, until the manager is stopped or the thread is interrupted.
     */
    @Override
    public void run() {
        while (this.threadStarted) {
            // Snapshot the size so that re-queued objects are not revisited in the same pass.
            int pending = this.monitorQueue.size();
            while (pending-- > 0) {
                T object = this.monitorQueue.poll();
                if (object == null || object.isFreed()) {
                    continue;
                }
                if (object.refCnt() <= 0 && object.getSeqID() != this.gContext.getAccessNumber()) {
                    object.doFree();
                    continue;
                }
                // Not releasable yet; look at it again on a later pass.
                this.monitorQueue.add(object);
            }
            try {
                Thread.sleep(this.monitorQueue.isEmpty() ? LOOP_INTERVAL_WHEN_EMPTY : LOOP_INTERVAL);
            } catch (InterruptedException e) {
                // close() interrupts the worker through shutdownNow(); keep the interrupt
                // status and stop instead of spinning on an already interrupted thread.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Stops the background loop if it is running, waits for the executor to terminate,
     * and then drains the queue, freeing every remaining object that is not already
     * freed and whose reference count is not positive. Remaining objects with a positive
     * reference count are removed from the queue without being freed.
     */
    @Override
    public void close() {
        if (this.threadStarted) {
            this.threadStarted = false;
            this.executor.shutdownNow();
            try {
                this.executor.awaitTermination(1000L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Preserve the caller's interrupt status; still drain the queue below.
                Thread.currentThread().interrupt();
            }
        }
        T object;
        while ((object = this.monitorQueue.poll()) != null) {
            if (object.isFreed() || object.refCnt() > 0) {
                continue;
            }
            object.doFree();
        }
    }
}

