package org.apache.flink.table.runtime.window.aligned;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.table.api.window.TimeWindow;
import org.apache.flink.table.codegen.GeneratedSubKeyedAggsHandleFunction;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.functions.ExecutionContext;
import org.apache.flink.table.runtime.functions.SubKeyedAggsHandleFunction;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator;
import org.apache.flink.util.Collector;

/* loaded from: input_file:org/apache/flink/table/runtime/window/aligned/LocalAlignedWindowAggregator.class */
public class LocalAlignedWindowAggregator implements AlignedWindowAggregator<BaseRow, TimeWindow, BaseRow> {
    private static final long serialVersionUID = 1;
    private final TypeInformation<BaseRow> keyTypeInfo;
    private final TypeInformation<BaseRow> accTypeInfo;
    private final GeneratedSubKeyedAggsHandleFunction<TimeWindow> generatedWindowAggregator;
    private final long minibatchSize;
    private transient SubKeyedAggsHandleFunction<TimeWindow> windowAggregator;
    private transient Map<TimeWindow, Map<BaseRow, BaseRow>> buffer;
    private transient ListState<Tuple3<TimeWindow, BaseRow, BaseRow>> bufferState;
    private transient int numOfElements;
    private transient JoinedRow outRow;
    private transient Collector<BaseRow> out;

    public LocalAlignedWindowAggregator(TypeInformation<BaseRow> typeInformation, TypeInformation<BaseRow> typeInformation2, GeneratedSubKeyedAggsHandleFunction<TimeWindow> generatedSubKeyedAggsHandleFunction, long j) {
        this.keyTypeInfo = typeInformation;
        this.accTypeInfo = typeInformation2;
        this.generatedWindowAggregator = generatedSubKeyedAggsHandleFunction;
        this.minibatchSize = j;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public void open(AlignedWindowAggregator.Context context) throws Exception {
        ExecutionContext executionContext = context.getExecutionContext();
        this.buffer = new HashMap();
        this.numOfElements = 0;
        this.outRow = new JoinedRow();
        this.out = context.getCollector();
        BaseRowUtil.setAccumulate(this.outRow);
        this.windowAggregator = (SubKeyedAggsHandleFunction) this.generatedWindowAggregator.newInstance(executionContext.getRuntimeContext().getUserCodeClassLoader());
        this.windowAggregator.open(executionContext);
        this.bufferState = context.getOpStateStore().getListState(new ListStateDescriptor("_local_window_buffer_", new TupleTypeInfo(new GenericTypeInfo(TimeWindow.class), this.keyTypeInfo, this.accTypeInfo)));
        if (this.bufferState != null) {
            for (Tuple3 tuple3 : this.bufferState.get()) {
                this.buffer.computeIfAbsent((TimeWindow) tuple3.f0, timeWindow -> {
                    return new HashMap();
                }).put((BaseRow) tuple3.f1, (BaseRow) tuple3.f2);
                this.numOfElements++;
            }
            this.bufferState.clear();
        }
        executionContext.getRuntimeContext().getMetricGroup().gauge("bundleSize", (String) () -> {
            return Integer.valueOf(this.numOfElements);
        });
        executionContext.getRuntimeContext().getMetricGroup().gauge("bundleRatio", (String) () -> {
            int size = this.buffer.size();
            return size == 0 ? Double.valueOf(0.0d) : Double.valueOf((1.0d * this.numOfElements) / size);
        });
    }

    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public void addElement(BaseRow baseRow, TimeWindow timeWindow, BaseRow baseRow2) throws Exception {
        this.numOfElements++;
        Map<BaseRow, BaseRow> computeIfAbsent = this.buffer.computeIfAbsent(timeWindow, timeWindow2 -> {
            return new HashMap();
        });
        BaseRow baseRow3 = computeIfAbsent.get(baseRow);
        if (baseRow3 == null) {
            baseRow3 = this.windowAggregator.createAccumulators();
        }
        this.windowAggregator.setAccumulators(null, baseRow3);
        if (BaseRowUtil.isAccumulateMsg(baseRow2)) {
            this.windowAggregator.accumulate(baseRow2);
        } else {
            this.windowAggregator.retract(baseRow2);
        }
        computeIfAbsent.put(baseRow, this.windowAggregator.getAccumulators());
        if (this.minibatchSize <= 0 || this.numOfElements <= this.minibatchSize) {
            return;
        }
        Iterator<TimeWindow> it = windows().iterator();
        while (it.hasNext()) {
            fireWindow(it.next());
        }
        expireAllWindows();
    }

    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public void fireWindow(TimeWindow timeWindow) throws Exception {
        Map<BaseRow, BaseRow> map = this.buffer.get(timeWindow);
        if (map == null) {
            return;
        }
        for (Map.Entry<BaseRow, BaseRow> entry : map.entrySet()) {
            BaseRow key = entry.getKey();
            BaseRow value = entry.getValue();
            GenericRow genericRow = new GenericRow(1);
            genericRow.setLong(0, timeWindow.getStart());
            this.outRow.replace(new JoinedRow(key, genericRow), value);
            this.out.collect(this.outRow);
        }
    }

    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public void snapshot() throws Exception {
        this.bufferState.clear();
        ArrayList arrayList = new ArrayList();
        for (TimeWindow timeWindow : this.buffer.keySet()) {
            for (Map.Entry<BaseRow, BaseRow> entry : this.buffer.get(timeWindow).entrySet()) {
                arrayList.add(Tuple3.of(timeWindow, entry.getKey(), entry.getValue()));
            }
        }
        this.bufferState.addAll(arrayList);
    }

    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public Iterable<TimeWindow> windows() throws Exception {
        return this.buffer.keySet();
    }

    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public void close() throws Exception {
    }

    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public void expireWindow(TimeWindow timeWindow) throws Exception {
        this.buffer.remove(timeWindow);
    }

    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public void expireAllWindows() throws Exception {
        this.numOfElements = 0;
        this.buffer.clear();
    }

    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public TimeWindow lowestWindow() throws Exception {
        throw new UnsupportedOperationException("This method should not be called for LocalAlignedWindowAggregator.");
    }

    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public Iterable<TimeWindow> ascendingWindows() throws Exception {
        throw new UnsupportedOperationException("This method should not be called for LocalAlignedWindowAggregator.");
    }

    @Override // org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator
    public Iterable<TimeWindow> ascendingWindows(TimeWindow timeWindow) throws Exception {
        throw new UnsupportedOperationException("This method should not be called for LocalAlignedWindowAggregator.");
    }
}
