/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.window.aligned;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.table.api.window.TimeWindow;
import org.apache.flink.table.codegen.GeneratedSubKeyedAggsHandleFunction;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.JoinedRow;
import org.apache.flink.table.dataformat.util.BaseRowUtil;
import org.apache.flink.table.runtime.functions.ExecutionContext;
import org.apache.flink.table.runtime.functions.SubKeyedAggsHandleFunction;
import org.apache.flink.table.runtime.window.aligned.AlignedWindowAggregator;
import org.apache.flink.util.Collector;

public class LocalAlignedWindowAggregator
implements AlignedWindowAggregator<BaseRow, TimeWindow, BaseRow> {
    private static final long serialVersionUID = 1L;
    private final TypeInformation<BaseRow> keyTypeInfo;
    private final TypeInformation<BaseRow> accTypeInfo;
    private final GeneratedSubKeyedAggsHandleFunction<TimeWindow> generatedWindowAggregator;
    private final long minibatchSize;
    private transient SubKeyedAggsHandleFunction<TimeWindow> windowAggregator;
    private transient Map<TimeWindow, Map<BaseRow, BaseRow>> buffer;
    private transient ListState<Tuple3<TimeWindow, BaseRow, BaseRow>> bufferState;
    private transient int numOfElements;
    private transient JoinedRow outRow;
    private transient Collector<BaseRow> out;

    public LocalAlignedWindowAggregator(TypeInformation<BaseRow> keyTypeInfo, TypeInformation<BaseRow> accTypeInfo, GeneratedSubKeyedAggsHandleFunction<TimeWindow> generatedWindowAggregator, long minibatchSize) {
        this.keyTypeInfo = keyTypeInfo;
        this.accTypeInfo = accTypeInfo;
        this.generatedWindowAggregator = generatedWindowAggregator;
        this.minibatchSize = minibatchSize;
    }

    @Override
    public void open(AlignedWindowAggregator.Context context) throws Exception {
        ExecutionContext ctx = context.getExecutionContext();
        this.buffer = new HashMap<TimeWindow, Map<BaseRow, BaseRow>>();
        this.numOfElements = 0;
        this.outRow = new JoinedRow();
        this.out = context.getCollector();
        BaseRowUtil.setAccumulate(this.outRow);
        this.windowAggregator = (SubKeyedAggsHandleFunction)this.generatedWindowAggregator.newInstance(ctx.getRuntimeContext().getUserCodeClassLoader());
        this.windowAggregator.open(ctx);
        TupleTypeInfo tupleType = new TupleTypeInfo(new GenericTypeInfo<TimeWindow>(TimeWindow.class), this.keyTypeInfo, this.accTypeInfo);
        this.bufferState = context.getOpStateStore().getListState(new ListStateDescriptor("_local_window_buffer_", tupleType));
        if (this.bufferState != null) {
            for (Tuple3 tuple : (Iterable)this.bufferState.get()) {
                TimeWindow window = (TimeWindow)tuple.f0;
                BaseRow key = (BaseRow)tuple.f1;
                BaseRow acc = (BaseRow)tuple.f2;
                Map keyAccs = this.buffer.computeIfAbsent(window, k -> new HashMap());
                keyAccs.put(key, acc);
                ++this.numOfElements;
            }
            this.bufferState.clear();
        }
        ctx.getRuntimeContext().getMetricGroup().gauge("bundleSize", () -> this.numOfElements);
        ctx.getRuntimeContext().getMetricGroup().gauge("bundleRatio", () -> {
            int numOfKeys = this.buffer.size();
            if (numOfKeys == 0) {
                return 0.0;
            }
            return 1.0 * (double)this.numOfElements / (double)numOfKeys;
        });
    }

    @Override
    public void addElement(BaseRow key, TimeWindow window, BaseRow input) throws Exception {
        ++this.numOfElements;
        Map keyAccs = this.buffer.computeIfAbsent(window, k -> new HashMap());
        BaseRow acc = (BaseRow)keyAccs.get(key);
        if (acc == null) {
            acc = this.windowAggregator.createAccumulators();
        }
        this.windowAggregator.setAccumulators(null, acc);
        if (BaseRowUtil.isAccumulateMsg(input)) {
            this.windowAggregator.accumulate(input);
        } else {
            this.windowAggregator.retract(input);
        }
        acc = this.windowAggregator.getAccumulators();
        keyAccs.put(key, acc);
        if (this.minibatchSize > 0L && (long)this.numOfElements > this.minibatchSize) {
            for (TimeWindow w : this.windows()) {
                this.fireWindow(w);
            }
            this.expireAllWindows();
        }
    }

    @Override
    public void fireWindow(TimeWindow window) throws Exception {
        Map<BaseRow, BaseRow> fired = this.buffer.get(window);
        if (fired == null) {
            return;
        }
        for (Map.Entry<BaseRow, BaseRow> entry : fired.entrySet()) {
            BaseRow key = entry.getKey();
            BaseRow acc = entry.getValue();
            GenericRow windowTime = new GenericRow(1);
            windowTime.setLong(0, window.getStart());
            this.outRow.replace(new JoinedRow(key, windowTime), acc);
            this.out.collect(this.outRow);
        }
    }

    @Override
    public void snapshot() throws Exception {
        this.bufferState.clear();
        ArrayList<Tuple3<TimeWindow, BaseRow, BaseRow>> stateToPut = new ArrayList<Tuple3<TimeWindow, BaseRow, BaseRow>>();
        for (TimeWindow window : this.buffer.keySet()) {
            for (Map.Entry<BaseRow, BaseRow> entry : this.buffer.get(window).entrySet()) {
                BaseRow key = entry.getKey();
                BaseRow acc = entry.getValue();
                stateToPut.add(Tuple3.of(window, key, acc));
            }
        }
        this.bufferState.addAll(stateToPut);
    }

    @Override
    public Iterable<TimeWindow> windows() throws Exception {
        return this.buffer.keySet();
    }

    @Override
    public void close() throws Exception {
    }

    @Override
    public void expireWindow(TimeWindow window) throws Exception {
        this.buffer.remove(window);
    }

    @Override
    public void expireAllWindows() throws Exception {
        this.numOfElements = 0;
        this.buffer.clear();
    }

    @Override
    public TimeWindow lowestWindow() throws Exception {
        throw new UnsupportedOperationException("This method should not be called for LocalAlignedWindowAggregator.");
    }

    @Override
    public Iterable<TimeWindow> ascendingWindows() throws Exception {
        throw new UnsupportedOperationException("This method should not be called for LocalAlignedWindowAggregator.");
    }

    @Override
    public Iterable<TimeWindow> ascendingWindows(TimeWindow fromWindow) throws Exception {
        throw new UnsupportedOperationException("This method should not be called for LocalAlignedWindowAggregator.");
    }
}

