/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap.internal;

import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.TreeSet;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.state.AbstractInternalStateBackend;
import org.apache.flink.runtime.state.RegisteredStateMetaInfo;
import org.apache.flink.runtime.state.StateTransformationFunction;
import org.apache.flink.runtime.state.heap.internal.CopyOnWriteStateTableSnapshot;
import org.apache.flink.runtime.state.heap.internal.StateEntry;
import org.apache.flink.runtime.state.heap.internal.StateTable;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CopyOnWriteStateTable<K, N, S>
extends StateTable<K, N, S>
implements Iterable<StateEntry<K, N, S>> {
    private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteStateTable.class);
    private static final int MINIMUM_CAPACITY = 4;
    private static final int MAXIMUM_CAPACITY = 0x40000000;
    private static final int MIN_TRANSFERRED_PER_INCREMENTAL_REHASH = 4;
    private static final StateTableEntry<?, ?, ?>[] EMPTY_TABLE = new StateTableEntry[2];
    private static final StateTableEntry<?, ?, ?> ITERATOR_BOOTSTRAP_ENTRY = new StateTableEntry();
    private final TreeSet<Integer> snapshotVersions;
    private StateTableEntry<K, N, S>[] primaryTable = EMPTY_TABLE;
    private StateTableEntry<K, N, S>[] incrementalRehashTable = EMPTY_TABLE;
    private int primaryTableSize = 0;
    private int incrementalRehashTableSize = 0;
    private int rehashIndex = 0;
    private int stateTableVersion = 0;
    private int highestRequiredSnapshotVersion = 0;
    private N lastNamespace;
    private int threshold;
    private int modCount;

    public CopyOnWriteStateTable(AbstractInternalStateBackend internalStateBackend, RegisteredStateMetaInfo stateMetaInfo, boolean usingNamespace) {
        this(internalStateBackend, stateMetaInfo, usingNamespace, 1024);
    }

    private CopyOnWriteStateTable(AbstractInternalStateBackend internalStateBackend, RegisteredStateMetaInfo stateMetaInfo, boolean usingNamespace, int capacity) {
        super(internalStateBackend, stateMetaInfo, usingNamespace);
        this.snapshotVersions = new TreeSet();
        if (capacity < 0) {
            throw new IllegalArgumentException("Capacity: " + capacity);
        }
        if (capacity == 0) {
            this.threshold = -1;
            return;
        }
        capacity = capacity < 4 ? 4 : (capacity > 0x40000000 ? 0x40000000 : MathUtils.roundUpToPowerOfTwo((int)capacity));
        this.primaryTable = this.makeTable(capacity);
    }

    @Override
    public int size() {
        return this.primaryTableSize + this.incrementalRehashTableSize;
    }

    @Override
    public S get(K key, N namespace) {
        int hash = this.computeHashForOperationAndDoIncrementalRehash(key, namespace);
        int requiredVersion = this.highestRequiredSnapshotVersion;
        StateTableEntry<K, N, S>[] tab = this.selectActiveTable(hash);
        int index = hash & tab.length - 1;
        StateTableEntry<K, N, S> e = tab[index];
        while (e != null) {
            Object eKey = e.key;
            Object eNamespace = e.namespace;
            if (e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace)) {
                if (e.stateVersion < requiredVersion) {
                    if (e.entryVersion < requiredVersion) {
                        e = this.handleChainedEntryCopyOnWrite(tab, hash & tab.length - 1, e);
                    }
                    e.stateVersion = this.stateTableVersion;
                    e.state = this.getStateSerializer().copy(e.state);
                }
                return e.state;
            }
            e = e.next;
        }
        return null;
    }

    @Override
    public Stream<K> getKeys(N namespace) {
        return StreamSupport.stream(this.spliterator(), false).filter(entry -> entry.getNamespace().equals(namespace)).map(StateEntry::getKey);
    }

    @Override
    public void put(K key, N namespace, S value) {
        StateTableEntry<K, N, S> e = this.putEntry(key, namespace);
        e.state = value;
        e.stateVersion = this.stateTableVersion;
    }

    @Override
    public S putAndGetOld(K key, N namespace, S state) {
        StateTableEntry<K, N, S> e = this.putEntry(key, namespace);
        Object oldState = e.stateVersion < this.highestRequiredSnapshotVersion ? this.getStateSerializer().copy(e.state) : e.state;
        e.state = state;
        e.stateVersion = this.stateTableVersion;
        return oldState;
    }

    @Override
    public boolean containsKey(K key, N namespace) {
        int hash = this.computeHashForOperationAndDoIncrementalRehash(key, namespace);
        StateTableEntry<K, N, S>[] tab = this.selectActiveTable(hash);
        int index = hash & tab.length - 1;
        StateTableEntry<K, N, S> e = tab[index];
        while (e != null) {
            Object eKey = e.key;
            Object eNamespace = e.namespace;
            if (e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace)) {
                return true;
            }
            e = e.next;
        }
        return false;
    }

    @Override
    public boolean remove(K key, N namespace) {
        return this.removeEntry(key, namespace) != null;
    }

    @Override
    public S removeAndGetOld(K key, N namespace) {
        StateTableEntry<K, N, S> e = this.removeEntry(key, namespace);
        return (S)(e != null ? (e.stateVersion < this.highestRequiredSnapshotVersion ? this.getStateSerializer().copy(e.state) : (Object)e.state) : null);
    }

    @Override
    public Map<N, S> getAll(K key) {
        if (!this.usingNamespace) {
            throw new UnsupportedOperationException("This method should be called with namespace supported");
        }
        HashMap results = new HashMap();
        StateEntryIterator iterator = new StateEntryIterator();
        while (iterator.hasNext()) {
            Object stateTableEntry = iterator.next();
            if (!((StateTableEntry)stateTableEntry).key.equals(key)) continue;
            results.put(((StateTableEntry)stateTableEntry).namespace, ((StateTableEntry)stateTableEntry).state);
        }
        return results;
    }

    @Override
    public void removeAll(K key) {
        if (!this.usingNamespace) {
            throw new UnsupportedOperationException("This method should be called with namespace supported");
        }
        HashMap results = new HashMap();
        StateEntryIterator iterator = new StateEntryIterator();
        while (iterator.hasNext()) {
            Object stateTableEntry = iterator.next();
            if (!((StateTableEntry)stateTableEntry).key.equals(key)) continue;
            results.put(((StateTableEntry)stateTableEntry).namespace, ((StateTableEntry)stateTableEntry).state);
        }
        for (Map.Entry entry : results.entrySet()) {
            this.removeEntry(key, entry.getKey());
        }
    }

    @Override
    public void removeAll() {
        if (this.isRehashing()) {
            this.primaryTable = this.incrementalRehashTable;
            this.primaryTableSize = this.incrementalRehashTableSize;
            this.incrementalRehashTableSize = 0;
            this.incrementalRehashTable = EMPTY_TABLE;
            this.rehashIndex = 0;
        }
        int count = 0;
        int totalNum = this.primaryTableSize;
        for (int i = 0; i < this.primaryTable.length && count < totalNum; ++i) {
            if (this.primaryTable[i] == null) continue;
            ++count;
            this.primaryTable[i] = null;
        }
        this.primaryTableSize = 0;
        ++this.modCount;
    }

    @Override
    public <T> void transform(K key, N namespace, T value, StateTransformationFunction<S, T> transformation) throws Exception {
        StateTableEntry<K, N, S> entry = this.putEntry(key, namespace);
        entry.state = transformation.apply(entry.stateVersion < this.highestRequiredSnapshotVersion ? this.getStateSerializer().copy(entry.state) : entry.state, value);
        entry.stateVersion = this.stateTableVersion;
    }

    private StateTableEntry<K, N, S> putEntry(K key, N namespace) {
        int hash = this.computeHashForOperationAndDoIncrementalRehash(key, namespace);
        StateTableEntry<K, N, S>[] tab = this.selectActiveTable(hash);
        int index = hash & tab.length - 1;
        StateTableEntry<K, N, S> e = tab[index];
        while (e != null) {
            if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
                if (e.entryVersion < this.highestRequiredSnapshotVersion) {
                    e = this.handleChainedEntryCopyOnWrite(tab, index, e);
                }
                return e;
            }
            e = e.next;
        }
        ++this.modCount;
        if (this.size() > this.threshold) {
            this.doubleCapacity();
        }
        return this.addNewStateTableEntry(tab, key, namespace, hash);
    }

    private StateTableEntry<K, N, S> removeEntry(K key, N namespace) {
        int hash = this.computeHashForOperationAndDoIncrementalRehash(key, namespace);
        StateTableEntry<K, N, S>[] tab = this.selectActiveTable(hash);
        int index = hash & tab.length - 1;
        StateTableEntry<K, N, S> e = tab[index];
        StateTableEntry<K, N, S> prev = null;
        while (e != null) {
            if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
                if (prev == null) {
                    tab[index] = e.next;
                } else {
                    if (prev.entryVersion < this.highestRequiredSnapshotVersion) {
                        prev = this.handleChainedEntryCopyOnWrite(tab, index, prev);
                    }
                    prev.next = e.next;
                }
                ++this.modCount;
                if (tab == this.primaryTable) {
                    --this.primaryTableSize;
                } else {
                    --this.incrementalRehashTableSize;
                }
                return e;
            }
            prev = e;
            e = e.next;
        }
        return null;
    }

    private void checkKeyNamespacePreconditions(K key, N namespace) {
        Preconditions.checkNotNull(key, (String)"No key set. This method should not be called outside of a keyed context.");
        if (this.usingNamespace) {
            Preconditions.checkNotNull(namespace, (String)"Provided namespace is null.");
        }
    }

    @Override
    public Iterator<Map.Entry<K, S>> entryIterator() {
        return new Iterator<Map.Entry<K, S>>(){
            private final StateEntryIterator iterator;
            {
                this.iterator = new StateEntryIterator();
            }

            @Override
            public boolean hasNext() {
                return this.iterator.hasNext();
            }

            @Override
            public Map.Entry<K, S> next() {
                Object stateEntry = this.iterator.next();
                return new Map.Entry<K, S>((StateTableEntry)stateEntry){
                    final /* synthetic */ StateTableEntry val$stateEntry;
                    {
                        this.val$stateEntry = stateTableEntry;
                    }

                    @Override
                    public K getKey() {
                        return this.val$stateEntry.getKey();
                    }

                    @Override
                    public S getValue() {
                        return this.val$stateEntry.getState();
                    }

                    @Override
                    public S setValue(S value) {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        };
    }

    @Override
    public Iterator<N> namespaceIterator(final K key) {
        if (!this.usingNamespace) {
            throw new UnsupportedOperationException("This method should be called with namespace supported");
        }
        final HashMap results = new HashMap();
        StateEntryIterator iterator = new StateEntryIterator();
        while (iterator.hasNext()) {
            Object stateTableEntry = iterator.next();
            if (!((StateTableEntry)stateTableEntry).key.equals(key)) continue;
            results.put(((StateTableEntry)stateTableEntry).namespace, ((StateTableEntry)stateTableEntry).state);
        }
        return new Iterator<N>(){
            private final K k;
            private final Iterator<N> iterator;
            private N namespace;
            {
                this.k = key;
                this.iterator = results.keySet().iterator();
            }

            @Override
            public boolean hasNext() {
                return this.iterator.hasNext();
            }

            @Override
            public N next() {
                this.namespace = this.iterator.next();
                return this.namespace;
            }

            @Override
            public void remove() {
                this.iterator.remove();
                CopyOnWriteStateTable.this.remove(this.k, this.namespace);
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    void releaseSnapshot(int snapshotVersion) {
        TreeSet<Integer> treeSet = this.snapshotVersions;
        synchronized (treeSet) {
            Preconditions.checkState((boolean)this.snapshotVersions.remove(snapshotVersion), (Object)"Attempt to release unknown snapshot version");
            this.highestRequiredSnapshotVersion = this.snapshotVersions.isEmpty() ? 0 : this.snapshotVersions.last();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    StateTableEntry<K, N, S>[] snapshotTableArrays() {
        TreeSet<Integer> treeSet = this.snapshotVersions;
        synchronized (treeSet) {
            if (++this.stateTableVersion < 0) {
                throw new IllegalStateException("Version count overflow in CopyOnWriteStateTable. Enforcing restart.");
            }
            this.highestRequiredSnapshotVersion = this.stateTableVersion;
            this.snapshotVersions.add(this.highestRequiredSnapshotVersion);
        }
        StateTableEntry<K, N, S>[] table = this.primaryTable;
        if (this.isRehashing()) {
            int localRehashIndex = this.rehashIndex;
            int localCopyLength = table.length - localRehashIndex;
            StateTableEntry[] copy = new StateTableEntry[localRehashIndex + table.length];
            System.arraycopy(table, localRehashIndex, copy, 0, localCopyLength);
            table = this.incrementalRehashTable;
            System.arraycopy(table, 0, copy, localCopyLength, localRehashIndex);
            System.arraycopy(table, table.length >>> 1, copy, localCopyLength + localRehashIndex, localRehashIndex);
            return copy;
        }
        return Arrays.copyOf(table, table.length);
    }

    private StateTableEntry<K, N, S>[] makeTable(int newCapacity) {
        if (0x40000000 == newCapacity) {
            LOG.warn("Maximum capacity of 2^30 in StateTable reached. Cannot increase hash table size. This can lead to more collisions and lower performance. Please consider scaling-out your job or using a different keyed state backend implementation!");
        }
        this.threshold = (newCapacity >> 1) + (newCapacity >> 2);
        StateTableEntry[] newTable = new StateTableEntry[newCapacity];
        return newTable;
    }

    private StateTableEntry<K, N, S> addNewStateTableEntry(StateTableEntry<K, N, S>[] table, K key, N namespace, int hash) {
        if (namespace.equals(this.lastNamespace)) {
            namespace = this.lastNamespace;
        } else {
            this.lastNamespace = namespace;
        }
        int index = hash & table.length - 1;
        StateTableEntry<K, N, Object> newEntry = new StateTableEntry<K, N, Object>(key, namespace, null, hash, table[index], this.stateTableVersion, this.stateTableVersion);
        table[index] = newEntry;
        if (table == this.primaryTable) {
            ++this.primaryTableSize;
        } else {
            ++this.incrementalRehashTableSize;
        }
        return newEntry;
    }

    private StateTableEntry<K, N, S>[] selectActiveTable(int hashCode2) {
        return (hashCode2 & this.primaryTable.length - 1) >= this.rehashIndex ? this.primaryTable : this.incrementalRehashTable;
    }

    private void doubleCapacity() {
        Preconditions.checkState((!this.isRehashing() ? 1 : 0) != 0, (Object)"There is already a rehash in progress.");
        StateTableEntry<K, N, S>[] oldTable = this.primaryTable;
        int oldCapacity = oldTable.length;
        if (oldCapacity == 0x40000000) {
            return;
        }
        this.incrementalRehashTable = this.makeTable(oldCapacity * 2);
    }

    @VisibleForTesting
    boolean isRehashing() {
        return EMPTY_TABLE != this.incrementalRehashTable;
    }

    private int computeHashForOperationAndDoIncrementalRehash(K key, N namespace) {
        this.checkKeyNamespacePreconditions(key, namespace);
        if (this.isRehashing()) {
            this.incrementalRehash();
        }
        return CopyOnWriteStateTable.compositeHash(key, namespace);
    }

    private void incrementalRehash() {
        StateTableEntry<K, N, S>[] oldTable = this.primaryTable;
        StateTableEntry<K, N, S>[] newTable = this.incrementalRehashTable;
        int oldCapacity = oldTable.length;
        int newMask = newTable.length - 1;
        int requiredVersion = this.highestRequiredSnapshotVersion;
        int rhIdx = this.rehashIndex;
        int transferred = 0;
        while (transferred < 4) {
            StateTableEntry<K, N, S> e = oldTable[rhIdx];
            while (e != null) {
                if (e.entryVersion < requiredVersion) {
                    e = new StateTableEntry<K, N, S>(e, this.stateTableVersion);
                }
                StateTableEntry n = e.next;
                int pos = e.hash & newMask;
                e.next = newTable[pos];
                newTable[pos] = e;
                e = n;
                ++transferred;
            }
            oldTable[rhIdx] = null;
            if (++rhIdx != oldCapacity) continue;
            this.primaryTable = newTable;
            this.incrementalRehashTable = EMPTY_TABLE;
            this.primaryTableSize += this.incrementalRehashTableSize;
            this.incrementalRehashTableSize = 0;
            this.rehashIndex = 0;
            return;
        }
        this.primaryTableSize -= transferred;
        this.incrementalRehashTableSize += transferred;
        this.rehashIndex = rhIdx;
    }

    private StateTableEntry<K, N, S> handleChainedEntryCopyOnWrite(StateTableEntry<K, N, S>[] tab, int tableIdx, StateTableEntry<K, N, S> untilEntry) {
        StateTableEntry<K, N, S> copy;
        int required = this.highestRequiredSnapshotVersion;
        StateTableEntry<K, N, S> current = tab[tableIdx];
        if (current.entryVersion < required) {
            tab[tableIdx] = copy = new StateTableEntry<K, N, S>(current, this.stateTableVersion);
        } else {
            copy = current;
        }
        while (current != untilEntry) {
            current = current.next;
            if (current.entryVersion < required) {
                copy.next = new StateTableEntry<K, N, S>(current, this.stateTableVersion);
                copy = copy.next;
                continue;
            }
            copy = current;
        }
        return copy;
    }

    private static <K, N, S> StateTableEntry<K, N, S> getBootstrapEntry() {
        return ITERATOR_BOOTSTRAP_ENTRY;
    }

    private static int compositeHash(Object key, Object namespace) {
        return MathUtils.bitMix((int)(key.hashCode() ^ namespace.hashCode()));
    }

    int getStateTableVersion() {
        return this.stateTableVersion;
    }

    @Override
    public CopyOnWriteStateTableSnapshot<K, N, S> createSnapshot() {
        return new CopyOnWriteStateTableSnapshot(this);
    }

    @Override
    public int sizeOfNamespace(Object namespace) {
        Preconditions.checkState((boolean)this.isUsingNamespace());
        Preconditions.checkNotNull((Object)namespace);
        int count = 0;
        for (StateEntry<K, N, S> entry : this) {
            if (null == entry || !Objects.equals(namespace, entry.getNamespace())) continue;
            ++count;
        }
        return count;
    }

    void releaseSnapshot(CopyOnWriteStateTableSnapshot<K, N, S> snapshotToRelease) {
        Preconditions.checkArgument((boolean)snapshotToRelease.isOwner(this), (Object)"Cannot release snapshot which is owned by a different state table.");
        this.releaseSnapshot(snapshotToRelease.getSnapshotVersion());
    }

    @Override
    public Iterator<StateEntry<K, N, S>> iterator() {
        return new StateEntryIterator();
    }

    class StateEntryIterator
    implements Iterator<StateEntry<K, N, S>> {
        private StateTableEntry<K, N, S>[] activeTable;
        private int nextTablePosition;
        private StateTableEntry<K, N, S> nextEntry;
        private int expectedModCount;

        StateEntryIterator() {
            this.activeTable = CopyOnWriteStateTable.this.primaryTable;
            this.nextTablePosition = 0;
            this.expectedModCount = CopyOnWriteStateTable.this.modCount;
            this.nextEntry = CopyOnWriteStateTable.getBootstrapEntry();
            this.advanceIterator();
        }

        private StateTableEntry<K, N, S> advanceIterator() {
            StateTableEntry entryToReturn = this.nextEntry;
            StateTableEntry next = entryToReturn.next;
            while (next == null) {
                StateTableEntry<K, N, S>[] tab = this.activeTable;
                while (this.nextTablePosition < tab.length) {
                    if ((next = tab[this.nextTablePosition++]) == null) continue;
                    this.nextEntry = next;
                    return entryToReturn;
                }
                if (this.activeTable == CopyOnWriteStateTable.this.incrementalRehashTable) break;
                this.activeTable = CopyOnWriteStateTable.this.incrementalRehashTable;
                this.nextTablePosition = 0;
            }
            this.nextEntry = next;
            return entryToReturn;
        }

        @Override
        public boolean hasNext() {
            return this.nextEntry != null;
        }

        @Override
        public StateTableEntry<K, N, S> next() {
            if (CopyOnWriteStateTable.this.modCount != this.expectedModCount) {
                throw new ConcurrentModificationException();
            }
            if (this.nextEntry == null) {
                throw new NoSuchElementException();
            }
            return this.advanceIterator();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("Read-only iterator");
        }
    }

    static class StateTableEntry<K, N, S>
    implements StateEntry<K, N, S> {
        final K key;
        final N namespace;
        S state;
        StateTableEntry<K, N, S> next;
        int entryVersion;
        int stateVersion;
        final int hash;

        StateTableEntry() {
            this(null, null, null, 0, null, 0, 0);
        }

        StateTableEntry(StateTableEntry<K, N, S> other, int entryVersion) {
            this(other.key, other.namespace, other.state, other.hash, other.next, entryVersion, other.stateVersion);
        }

        StateTableEntry(K key, N namespace, S state, int hash, StateTableEntry<K, N, S> next, int entryVersion, int stateVersion) {
            this.key = key;
            this.namespace = namespace;
            this.hash = hash;
            this.next = next;
            this.entryVersion = entryVersion;
            this.state = state;
            this.stateVersion = stateVersion;
        }

        public final void setState(S value, int mapVersion) {
            if (value != this.state) {
                this.state = value;
                this.stateVersion = mapVersion;
            }
        }

        @Override
        public K getKey() {
            return this.key;
        }

        @Override
        public N getNamespace() {
            return this.namespace;
        }

        @Override
        public S getState() {
            return this.state;
        }

        public final boolean equals(Object o) {
            if (!(o instanceof StateTableEntry)) {
                return false;
            }
            StateEntry e = (StateEntry)o;
            return e.getKey().equals(this.key) && e.getNamespace().equals(this.namespace) && Objects.equals(e.getState(), this.state);
        }

        public final int hashCode() {
            return this.key.hashCode() ^ this.namespace.hashCode() ^ Objects.hashCode(this.state);
        }

        public final String toString() {
            return "(" + this.key + "|" + this.namespace + ")=" + this.state;
        }
    }
}

