/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBInternalStateBackend;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.InternalBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyGroupsStateSnapshot;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.RegisteredStateMetaInfo;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StateAccessException;
import org.apache.flink.runtime.state.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBFullRestoreOperation {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBFullRestoreOperation.class);
    private final RocksDBInternalStateBackend stateBackend;
    private final Map<Integer, String> id2StateName = new HashMap<Integer, String>();
    private StreamCompressionDecorator keyGroupStreamCompressionDecorator;

    RocksDBFullRestoreOperation(RocksDBInternalStateBackend stateBackend) {
        this.stateBackend = stateBackend;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void restore(Collection<KeyedStateHandle> restoredSnapshots) throws Exception {
        if (restoredSnapshots == null || restoredSnapshots.isEmpty()) {
            return;
        }
        long startMills = System.currentTimeMillis();
        this.stateBackend.createDB();
        for (KeyedStateHandle rawSnapshot : restoredSnapshots) {
            if (rawSnapshot == null) continue;
            Preconditions.checkState(rawSnapshot instanceof KeyGroupsStateSnapshot);
            KeyGroupsStateSnapshot snapshot = (KeyGroupsStateSnapshot)rawSnapshot;
            StreamStateHandle snapshotHandle = snapshot.getSnapshotHandle();
            if (snapshotHandle == null) continue;
            FSDataInputStream inputStream = snapshotHandle.openInputStream();
            try {
                String stateName;
                Object keyedStateMetaSnapshot2;
                DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream);
                InternalBackendSerializationProxy serializationProxy = new InternalBackendSerializationProxy(this.stateBackend.getUserClassLoader(), false);
                serializationProxy.read((DataInputView)inputView);
                this.keyGroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ? SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
                List keyedStateMetaInfos = serializationProxy.getKeyedStateMetaSnapshots();
                for (Object keyedStateMetaSnapshot2 : keyedStateMetaInfos) {
                    String stateName2 = keyedStateMetaSnapshot2.getName();
                    this.stateBackend.getRestoredKvStateMetaInfos().put(stateName2, keyedStateMetaSnapshot2);
                    RegisteredStateMetaInfo keyedStateMetaInfo = RegisteredStateMetaInfo.createKeyedStateMetaInfo((StateMetaInfoSnapshot)keyedStateMetaSnapshot2);
                    this.stateBackend.getRegisteredStateMetaInfos().put(stateName2, keyedStateMetaInfo);
                }
                List subKeyedStateMetaInfos = serializationProxy.getSubKeyedStateMetaSnapshots();
                keyedStateMetaSnapshot2 = subKeyedStateMetaInfos.iterator();
                while (keyedStateMetaSnapshot2.hasNext()) {
                    StateMetaInfoSnapshot subKeyedStateMetaSnapshot = (StateMetaInfoSnapshot)keyedStateMetaSnapshot2.next();
                    stateName = subKeyedStateMetaSnapshot.getName();
                    this.stateBackend.getRestoredKvStateMetaInfos().put(stateName, subKeyedStateMetaSnapshot);
                    RegisteredStateMetaInfo subKeyedStateMetaInfo = RegisteredStateMetaInfo.createSubKeyedStateMetaInfo((StateMetaInfoSnapshot)subKeyedStateMetaSnapshot);
                    this.stateBackend.getRegisteredStateMetaInfos().put(stateName, subKeyedStateMetaInfo);
                }
                int numStates = inputView.readInt();
                for (int i = 0; i < numStates; ++i) {
                    stateName = (String)InstantiationUtil.deserializeObject(inputStream, this.stateBackend.getUserClassLoader());
                    Integer id = (Integer)InstantiationUtil.deserializeObject(inputStream, this.stateBackend.getUserClassLoader());
                    this.id2StateName.put(id, stateName);
                }
                this.stateBackend.registerAllStates();
                Map metaInfos = snapshot.getMetaInfos();
                this.restoreData(metaInfos, inputStream);
            }
            finally {
                if (inputStream == null) continue;
                try {
                    inputStream.close();
                }
                catch (Exception e2) {
                    LOG.warn("Could not properly close the input stream.", (Throwable)e2);
                }
            }
        }
        long endMills = System.currentTimeMillis();
        LOG.info("Full Restored with RocksDB state backend using {} ms.", (Object)(endMills - startMills));
    }

    private void restoreData(Map<Integer, Tuple2<Long, Integer>> metaInfos, FSDataInputStream inputStream) throws IOException {
        Iterator iterator = this.stateBackend.getKeyGroupRange().iterator();
        while (iterator.hasNext()) {
            int group = (Integer)iterator.next();
            Tuple2<Long, Integer> metaInfo = metaInfos.get(group);
            if (metaInfo == null) continue;
            long offset = (Long)metaInfo.f0;
            int numEntries = (Integer)metaInfo.f1;
            inputStream.seek(offset);
            if (numEntries == 0) continue;
            InputStream compressedKgIn = this.keyGroupStreamCompressionDecorator.decorateWithCompression((InputStream)inputStream);
            Throwable throwable = null;
            try {
                DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
                for (int i = 0; i < numEntries; ++i) {
                    Integer id = IntSerializer.INSTANCE.deserialize(compressedKgInputView);
                    String cfNameStr = this.id2StateName.get(id);
                    Preconditions.checkNotNull(cfNameStr, "Unexpected state name for the id: " + id);
                    ColumnFamilyHandle columnFamilyHandle = this.stateBackend.getOrCreateColumnFamily(cfNameStr);
                    byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
                    byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
                    try {
                        this.stateBackend.getDbInstance().put(columnFamilyHandle, key, value);
                        continue;
                    }
                    catch (RocksDBException e2) {
                        throw new StateAccessException((Throwable)e2);
                    }
                }
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (compressedKgIn == null) continue;
                if (throwable != null) {
                    try {
                        compressedKgIn.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                compressedKgIn.close();
            }
        }
    }
}

