package org.apache.flink.table.runtime.functions;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.SortedMapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.SortedMapSerializer;
import org.apache.flink.runtime.state.keyed.KeyedListState;
import org.apache.flink.runtime.state.keyed.KeyedListStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedMapState;
import org.apache.flink.runtime.state.keyed.KeyedMapStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedSortedMapState;
import org.apache.flink.runtime.state.keyed.KeyedSortedMapStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedState;
import org.apache.flink.runtime.state.keyed.KeyedStateDescriptor;
import org.apache.flink.runtime.state.keyed.KeyedValueState;
import org.apache.flink.runtime.state.keyed.KeyedValueStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedListState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedListStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedMapState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedMapStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedSortedMapState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedSortedMapStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedStateDescriptor;
import org.apache.flink.runtime.state.subkeyed.SubKeyedValueState;
import org.apache.flink.runtime.state.subkeyed.SubKeyedValueStateDescriptor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataview.KeyedStateListView;
import org.apache.flink.table.dataview.KeyedStateMapView;
import org.apache.flink.table.dataview.KeyedStateSortedMapView;
import org.apache.flink.table.dataview.NullAwareKeyedStateMapView;
import org.apache.flink.table.dataview.NullAwareSubKeyedStateMapView;
import org.apache.flink.table.dataview.StateDataView;
import org.apache.flink.table.dataview.StateListView;
import org.apache.flink.table.dataview.StateMapView;
import org.apache.flink.table.dataview.StateSortedMapView;
import org.apache.flink.table.dataview.SubKeyedStateListView;
import org.apache.flink.table.dataview.SubKeyedStateMapView;
import org.apache.flink.table.typeutils.ListViewTypeInfo;
import org.apache.flink.table.typeutils.MapViewTypeInfo;
import org.apache.flink.table.typeutils.SortedMapViewTypeInfo;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/table/runtime/functions/ExecutionContextImpl.class */
public final class ExecutionContextImpl implements ExecutionContext {
    private static final String NULL_STATE_POSTFIX = "_null_state";
    private final AbstractStreamOperator<?> operator;
    private final RuntimeContext runtimeContext;
    private final TypeSerializer<?> namespaceSerializer;
    private final List<StateDataView<BaseRow>> registeredStateDataViews;

    public ExecutionContextImpl(AbstractStreamOperator<?> abstractStreamOperator, RuntimeContext runtimeContext) {
        this(abstractStreamOperator, runtimeContext, null);
    }

    public ExecutionContextImpl(AbstractStreamOperator<?> abstractStreamOperator, RuntimeContext runtimeContext, TypeSerializer<?> typeSerializer) {
        this.operator = abstractStreamOperator;
        this.runtimeContext = (RuntimeContext) Preconditions.checkNotNull(runtimeContext);
        this.namespaceSerializer = typeSerializer;
        this.registeredStateDataViews = new ArrayList();
    }

    @Override // org.apache.flink.table.runtime.functions.ExecutionContext
    public <K, V, S extends KeyedState<K, V>> S getKeyedState(KeyedStateDescriptor<K, V, S> keyedStateDescriptor) throws Exception {
        return (S) this.operator.getKeyedState(keyedStateDescriptor);
    }

    @Override // org.apache.flink.table.runtime.functions.ExecutionContext
    public <K, N, V, S extends SubKeyedState<K, N, V>> S getSubKeyedState(SubKeyedStateDescriptor<K, N, V, S> subKeyedStateDescriptor) throws Exception {
        return (S) this.operator.getSubKeyedState(subKeyedStateDescriptor);
    }

    @Override // org.apache.flink.table.runtime.functions.ExecutionContext
    public <K, V> KeyedValueState<K, V> getKeyedValueState(ValueStateDescriptor<V> valueStateDescriptor) throws Exception {
        valueStateDescriptor.initializeSerializerUnlessSet(this.operator.getExecutionConfig());
        return this.operator.getKeyedState(new KeyedValueStateDescriptor(valueStateDescriptor.getName(), this.operator.getKeySerializer(), valueStateDescriptor.getSerializer()));
    }

    @Override // org.apache.flink.table.runtime.functions.ExecutionContext
    public <K, V> KeyedListState<K, V> getKeyedListState(ListStateDescriptor<V> listStateDescriptor) throws Exception {
        listStateDescriptor.initializeSerializerUnlessSet(this.operator.getExecutionConfig());
        return this.operator.getKeyedState(new KeyedListStateDescriptor(listStateDescriptor.getName(), this.operator.getKeySerializer(), (ListSerializer) listStateDescriptor.getSerializer()));
    }

    @Override // org.apache.flink.table.runtime.functions.ExecutionContext
    public <K, UK, UV> KeyedMapState<K, UK, UV> getKeyedMapState(MapStateDescriptor<UK, UV> mapStateDescriptor) throws Exception {
        mapStateDescriptor.initializeSerializerUnlessSet(this.operator.getExecutionConfig());
        return this.operator.getKeyedState(new KeyedMapStateDescriptor(mapStateDescriptor.getName(), this.operator.getKeySerializer(), (MapSerializer) mapStateDescriptor.getSerializer()));
    }

    @Override // org.apache.flink.table.runtime.functions.ExecutionContext
    public <K, UK, UV> KeyedSortedMapState<K, UK, UV> getKeyedSortedMapState(SortedMapStateDescriptor<UK, UV> sortedMapStateDescriptor) throws Exception {
        sortedMapStateDescriptor.initializeSerializerUnlessSet(this.operator.getExecutionConfig());
        return this.operator.getKeyedState(new KeyedSortedMapStateDescriptor(sortedMapStateDescriptor.getName(), this.operator.getKeySerializer(), sortedMapStateDescriptor.getSerializer()));
    }

    @Override // org.apache.flink.table.runtime.functions.ExecutionContext
    public <K, N, V> SubKeyedValueState<K, N, V> getSubKeyedValueState(ValueStateDescriptor<V> valueStateDescriptor) throws Exception {
        if (this.namespaceSerializer == null) {
            throw new RuntimeException("The namespace serializer has not been initialized.");
        }
        valueStateDescriptor.initializeSerializerUnlessSet(this.operator.getExecutionConfig());
        return this.operator.getSubKeyedState(new SubKeyedValueStateDescriptor(valueStateDescriptor.getName(), this.operator.getKeySerializer(), this.namespaceSerializer, valueStateDescriptor.getSerializer()));
    }

    @Override // org.apache.flink.table.runtime.functions.ExecutionContext
    public <K, N, V> SubKeyedListState<K, N, V> getSubKeyedListState(ListStateDescriptor<V> listStateDescriptor) throws Exception {
        if (this.namespaceSerializer == null) {
            throw new RuntimeException("The namespace serializer has not been initialized.");
        }
        listStateDescriptor.initializeSerializerUnlessSet(this.operator.getExecutionConfig());
        return this.operator.getSubKeyedState(new SubKeyedListStateDescriptor(listStateDescriptor.getName(), this.operator.getKeySerializer(), this.namespaceSerializer, ((ListSerializer) listStateDescriptor.getSerializer()).getElementSerializer()));
    }

    @Override // org.apache.flink.table.runtime.functions.ExecutionContext
    public <K, N, UK, UV> SubKeyedMapState<K, N, UK, UV> getSubKeyedMapState(MapStateDescriptor<UK, UV> mapStateDescriptor) throws Exception {
        if (this.namespaceSerializer == null) {
            throw new RuntimeException("The namespace serializer has not been initialized.");
        }
        mapStateDescriptor.initializeSerializerUnlessSet(this.operator.getExecutionConfig());
        MapSerializer mapSerializer = (MapSerializer) mapStateDescriptor.getSerializer();
        return this.operator.getSubKeyedState(new SubKeyedMapStateDescriptor(mapStateDescriptor.getName(), this.operator.getKeySerializer(), this.namespaceSerializer, mapSerializer.getKeySerializer(), mapSerializer.getValueSerializer()));
    }

    @Override // org.apache.flink.table.runtime.functions.ExecutionContext
    public <K, N, UK, UV> SubKeyedSortedMapState<K, N, UK, UV> getSubKeyedSortedMapState(SortedMapStateDescriptor<UK, UV> sortedMapStateDescriptor) throws Exception {
        if (this.namespaceSerializer == null) {
            throw new RuntimeException("The namespace serializer has not been initialized.");
        }
        sortedMapStateDescriptor.initializeSerializerUnlessSet(this.operator.getExecutionConfig());
        SortedMapSerializer<UK, UV> serializer = sortedMapStateDescriptor.getSerializer();
        return this.operator.getSubKeyedState(new SubKeyedSortedMapStateDescriptor(sortedMapStateDescriptor.getName(), this.operator.getKeySerializer(), this.namespaceSerializer, serializer.getComparator(), serializer.getKeySerializer(), serializer.getValueSerializer()));
    }

    @Override // org.apache.flink.table.runtime.functions.ExecutionContext
    public <K, UK, UV> StateMapView<K, UK, UV> getStateMapView(String str, MapViewTypeInfo<UK, UV> mapViewTypeInfo, boolean z) throws Exception {
        MapStateDescriptor<UK, UV> mapStateDescriptor = new MapStateDescriptor<>(str, mapViewTypeInfo.keyType(), mapViewTypeInfo.valueType());
        ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor(str + NULL_STATE_POSTFIX, mapViewTypeInfo.valueType());
        if (z) {
            SubKeyedMapState subKeyedMapState = getSubKeyedMapState(mapStateDescriptor);
            return mapViewTypeInfo.nullAware() ? new NullAwareSubKeyedStateMapView(subKeyedMapState, getSubKeyedValueState(valueStateDescriptor)) : new SubKeyedStateMapView(subKeyedMapState);
        }
        KeyedMapState<K, UK, UV> keyedMapState = getKeyedMapState(mapStateDescriptor);
        return mapViewTypeInfo.nullAware() ? new NullAwareKeyedStateMapView(keyedMapState, getKeyedValueState(valueStateDescriptor)) : new KeyedStateMapView(keyedMapState);
    }

    @Override // org.apache.flink.table.runtime.functions.ExecutionContext
    public <K, UK, UV> StateSortedMapView<K, UK, UV> getStateSortedMapView(String str, SortedMapViewTypeInfo<UK, UV> sortedMapViewTypeInfo, boolean z) throws Exception {
        SortedMapStateDescriptor<UK, UV> sortedMapStateDescriptor = new SortedMapStateDescriptor<>(str, sortedMapViewTypeInfo.comparator, sortedMapViewTypeInfo.keyType, sortedMapViewTypeInfo.valueType);
        if (z) {
            throw new UnsupportedOperationException("SubKeyedState SortedMapView is not supported currently");
        }
        return new KeyedStateSortedMapView(getKeyedSortedMapState(sortedMapStateDescriptor));
    }

    @Override // org.apache.flink.table.runtime.functions.ExecutionContext
    public <K, V> StateListView<K, V> getStateListView(String str, ListViewTypeInfo<V> listViewTypeInfo, boolean z) throws Exception {
        ListStateDescriptor<V> listStateDescriptor = new ListStateDescriptor<>(str, listViewTypeInfo.elementType());
        return z ? new SubKeyedStateListView(getSubKeyedListState(listStateDescriptor)) : new KeyedStateListView(getKeyedListState(listStateDescriptor));
    }

    @Override // org.apache.flink.table.runtime.functions.ExecutionContext
    public void registerStateDataView(StateDataView<BaseRow> stateDataView) {
        this.registeredStateDataViews.add(stateDataView);
    }

    @Override // org.apache.flink.table.runtime.functions.ExecutionContext
    public <K> TypeSerializer<K> getKeySerializer() {
        return this.operator.getKeySerializer();
    }

    @Override // org.apache.flink.table.runtime.functions.ExecutionContext
    public BaseRow currentKey() {
        return (BaseRow) this.operator.getCurrentKey();
    }

    @Override // org.apache.flink.table.runtime.functions.ExecutionContext
    public void setCurrentKey(BaseRow baseRow) {
        this.operator.setCurrentKey(baseRow);
        Iterator<StateDataView<BaseRow>> it = this.registeredStateDataViews.iterator();
        while (it.hasNext()) {
            it.next().setCurrentKey(baseRow);
        }
    }

    @Override // org.apache.flink.table.runtime.functions.ExecutionContext
    public RuntimeContext getRuntimeContext() {
        return this.runtimeContext;
    }
}
