/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap.internal;

import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.heap.internal.AbstractStateTableSnapshot;
import org.apache.flink.runtime.state.heap.internal.CopyOnWriteStateTable;

@Internal
public class CopyOnWriteStateTableSnapshot<K, N, S>
extends AbstractStateTableSnapshot<K, N, S, CopyOnWriteStateTable<K, N, S>> {
    private final int snapshotVersion;
    private final int stateTableSize;
    private final CopyOnWriteStateTable.StateTableEntry<K, N, S>[] snapshotData;
    private int[] keyGroupOffsets;
    private final TypeSerializer<K> localKeySerializer;
    private final TypeSerializer<N> localNamespaceSerializer;
    private final TypeSerializer<S> localStateSerializer;

    CopyOnWriteStateTableSnapshot(CopyOnWriteStateTable<K, N, S> owningStateTable) {
        super(owningStateTable);
        this.snapshotData = owningStateTable.snapshotTableArrays();
        this.snapshotVersion = owningStateTable.getStateTableVersion();
        this.stateTableSize = owningStateTable.size();
        this.localKeySerializer = owningStateTable.getKeySerializer().duplicate();
        this.localNamespaceSerializer = owningStateTable.getNamespaceSerializer().duplicate();
        this.localStateSerializer = owningStateTable.getStateSerializer().duplicate();
        this.keyGroupOffsets = null;
    }

    int getSnapshotVersion() {
        return this.snapshotVersion;
    }

    private void partitionEntriesByKeyGroup() {
        int effectiveKgIdx;
        if (null != this.keyGroupOffsets) {
            return;
        }
        KeyGroupRange keyGroupRange = ((CopyOnWriteStateTable)this.owningStateTable).getStateBackend().getKeyGroupRange();
        int totalKeyGroups = ((CopyOnWriteStateTable)this.owningStateTable).getStateBackend().getNumGroups();
        int baseKgIdx = keyGroupRange.getStartKeyGroup();
        int[] histogram = new int[keyGroupRange.getNumberOfKeyGroups() + 1];
        CopyOnWriteStateTable.StateTableEntry[] unfold = new CopyOnWriteStateTable.StateTableEntry[this.stateTableSize];
        int unfoldIndex = 0;
        for (CopyOnWriteStateTable.StateTableEntry<K, N, S> entry : this.snapshotData) {
            while (null != entry) {
                int n = effectiveKgIdx = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(entry.key.hashCode(), totalKeyGroups) - baseKgIdx + 1;
                histogram[n] = histogram[n] + 1;
                unfold[unfoldIndex++] = entry;
                entry = entry.next;
            }
        }
        for (int i = 1; i < histogram.length; ++i) {
            int n = i;
            histogram[n] = histogram[n] + histogram[i - 1];
        }
        for (CopyOnWriteStateTable.StateTableEntry t : unfold) {
            int n = effectiveKgIdx = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(t.key.hashCode(), totalKeyGroups) - baseKgIdx;
            int n2 = histogram[n];
            histogram[n] = n2 + 1;
            this.snapshotData[n2] = t;
        }
        this.keyGroupOffsets = histogram;
    }

    @Override
    public void release() {
        ((CopyOnWriteStateTable)this.owningStateTable).releaseSnapshot(this);
    }

    @Override
    public int writeMappingsInKeyGroup(DataOutputView dov, int keyGroupId) throws IOException {
        if (null == this.keyGroupOffsets) {
            this.partitionEntriesByKeyGroup();
        }
        CopyOnWriteStateTable.StateTableEntry<K, N, S>[] groupedOut = this.snapshotData;
        KeyGroupRange keyGroupRange = ((CopyOnWriteStateTable)this.owningStateTable).getStateBackend().getKeyGroupRange();
        int keyGroupOffsetIdx = keyGroupId - keyGroupRange.getStartKeyGroup() - 1;
        int startOffset = keyGroupOffsetIdx < 0 ? 0 : this.keyGroupOffsets[keyGroupOffsetIdx];
        int endOffset = this.keyGroupOffsets[keyGroupOffsetIdx + 1];
        boolean hasNamespace = ((CopyOnWriteStateTable)this.owningStateTable).isUsingNamespace();
        int numMappings = endOffset - startOffset;
        dov.writeInt(numMappings);
        for (int i = startOffset; i < endOffset; ++i) {
            CopyOnWriteStateTable.StateTableEntry<K, N, S> toWrite = groupedOut[i];
            groupedOut[i] = null;
            this.localKeySerializer.serialize(toWrite.key, dov);
            if (hasNamespace) {
                this.localNamespaceSerializer.serialize(toWrite.namespace, dov);
            }
            this.localStateSerializer.serialize(toWrite.state, dov);
        }
        return numMappings;
    }

    boolean isOwner(CopyOnWriteStateTable<K, N, S> stateTable) {
        return stateTable == this.owningStateTable;
    }
}

