package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.InternalBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyGroupsStateSnapshot;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.RegisteredStateMetaInfo;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.StateAccessException;
import org.apache.flink.runtime.state.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBFullRestoreOperation.class */
/**
 * Restores a {@link RocksDBInternalStateBackend} from a collection of full snapshots.
 *
 * <p>For every {@link KeyGroupsStateSnapshot} handle the operation reads, in this order and from
 * the same stream: the backend serialization proxy (state meta information and the compression
 * flag), the table mapping numeric state ids to state names, and finally the key/value entries
 * of each key group, which are written into the backend's RocksDB instance.
 *
 * <p>The instance keeps mutable state ({@code id2StateName} and the compression decorator chosen
 * by the most recently read snapshot); nothing in this class synchronizes access to it.
 */
public class RocksDBFullRestoreOperation {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBFullRestoreOperation.class);

    /** Backend whose DB, column families and meta information are populated by the restore. */
    private final RocksDBInternalStateBackend stateBackend;

    /** State id -> state name, as read from the snapshot header(s); never cleared between handles. */
    private final Map<Integer, String> id2StateName = new HashMap<>();

    /** Decorator matching the compression setting of the snapshot that was read last. */
    private StreamCompressionDecorator keyGroupStreamCompressionDecorator;

    /**
     * Creates a restore operation bound to the given backend.
     *
     * @param rocksDBInternalStateBackend the backend to restore into
     */
    public RocksDBFullRestoreOperation(RocksDBInternalStateBackend rocksDBInternalStateBackend) {
        this.stateBackend = rocksDBInternalStateBackend;
    }

    /**
     * Restores the backend from the given state handles.
     *
     * <p>Does nothing (the DB is not even created) when {@code collection} is {@code null} or
     * empty. {@code null} elements and snapshots without a snapshot handle are skipped.
     *
     * @param collection the handles to restore from; each non-null element must be a
     *     {@link KeyGroupsStateSnapshot}
     * @throws IllegalStateException presumably, via {@code Preconditions.checkState}, if a handle
     *     is not a {@link KeyGroupsStateSnapshot}
     * @throws Exception if reading a snapshot or writing to RocksDB fails
     */
    public void restore(Collection<KeyedStateHandle> collection) throws Exception {
        if (collection == null || collection.isEmpty()) {
            return;
        }

        long startMillis = System.currentTimeMillis();
        this.stateBackend.createDB();

        for (KeyedStateHandle handle : collection) {
            if (handle == null) {
                continue;
            }
            // Validate the runtime type before the downcast so a wrong handle type fails with
            // the precondition error rather than a ClassCastException.
            Preconditions.checkState(handle instanceof KeyGroupsStateSnapshot);
            KeyGroupsStateSnapshot snapshot = (KeyGroupsStateSnapshot) handle;

            StreamStateHandle snapshotHandle = snapshot.getSnapshotHandle();
            if (snapshotHandle == null) {
                continue;
            }
            restoreSnapshot(snapshot, snapshotHandle);
        }

        LOG.info("Full Restored with RocksDB state backend using {} ms.", System.currentTimeMillis() - startMillis);
    }

    /** Opens the snapshot stream, restores header and data from it, and always closes it. */
    private void restoreSnapshot(KeyGroupsStateSnapshot snapshot, StreamStateHandle snapshotHandle) throws Exception {
        FSDataInputStream inputStream = snapshotHandle.openInputStream();
        try {
            DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream);

            readStateMetaInfos(inputView);
            readStateIdMapping(inputView, inputStream);

            this.stateBackend.registerAllStates();
            restoreData(snapshot.getMetaInfos(), inputStream);
        } finally {
            // A failing close is only logged so that it can never hide a restore failure.
            closeQuietly(inputStream);
        }
    }

    /** Reads the serialization proxy and publishes its state meta information to the backend. */
    private void readStateMetaInfos(DataInputViewStreamWrapper inputView) throws IOException {
        InternalBackendSerializationProxy serializationProxy =
                new InternalBackendSerializationProxy(this.stateBackend.getUserClassLoader(), false);
        serializationProxy.read(inputView);

        if (serializationProxy.isUsingKeyGroupCompression()) {
            this.keyGroupStreamCompressionDecorator = SnappyStreamCompressionDecorator.INSTANCE;
        } else {
            this.keyGroupStreamCompressionDecorator = UncompressedStreamCompressionDecorator.INSTANCE;
        }

        for (StateMetaInfoSnapshot keyedMeta : serializationProxy.getKeyedStateMetaSnapshots()) {
            String stateName = keyedMeta.getName();
            this.stateBackend.getRestoredKvStateMetaInfos().put(stateName, keyedMeta);
            this.stateBackend.getRegisteredStateMetaInfos()
                    .put(stateName, RegisteredStateMetaInfo.createKeyedStateMetaInfo(keyedMeta));
        }
        for (StateMetaInfoSnapshot subKeyedMeta : serializationProxy.getSubKeyedStateMetaSnapshots()) {
            String stateName = subKeyedMeta.getName();
            this.stateBackend.getRestoredKvStateMetaInfos().put(stateName, subKeyedMeta);
            this.stateBackend.getRegisteredStateMetaInfos()
                    .put(stateName, RegisteredStateMetaInfo.createSubKeyedStateMetaInfo(subKeyedMeta));
        }
    }

    /** Reads the entry count followed by that many (id, name) pairs into {@code id2StateName}. */
    private void readStateIdMapping(DataInputViewStreamWrapper inputView, FSDataInputStream inputStream)
            throws IOException, ClassNotFoundException {
        int mappingCount = inputView.readInt();
        for (int i = 0; i < mappingCount; i++) {
            // The id is written before the name, so it has to be read first.
            Integer stateId = InstantiationUtil.deserializeObject(inputStream, this.stateBackend.getUserClassLoader());
            String stateName = InstantiationUtil.deserializeObject(inputStream, this.stateBackend.getUserClassLoader());
            this.id2StateName.put(stateId, stateName);
        }
    }

    /**
     * Restores the entries of every key group of the backend's key-group range that has an entry
     * in {@code map}.
     *
     * @param map key group -> (offset in the stream, number of entries)
     * @param fSDataInputStream the snapshot stream; it is repositioned for each key group
     * @throws IOException if seeking or reading fails
     */
    private void restoreData(Map<Integer, Tuple2<Long, Integer>> map, FSDataInputStream fSDataInputStream) throws IOException {
        for (Integer keyGroup : this.stateBackend.getKeyGroupRange()) {
            Tuple2<Long, Integer> offsetAndCount = map.get(keyGroup);
            if (offsetAndCount == null) {
                continue;
            }

            long offset = offsetAndCount.f0.longValue();
            int entryCount = offsetAndCount.f1.intValue();

            // The seek happens even for empty key groups, as before.
            fSDataInputStream.seek(offset);
            if (entryCount != 0) {
                restoreKeyGroup(fSDataInputStream, entryCount);
            }
        }
    }

    /** Reads {@code entryCount} (state id, key, value) records at the current position into RocksDB. */
    private void restoreKeyGroup(FSDataInputStream fSDataInputStream, int entryCount) throws IOException {
        // try-with-resources keeps the primary failure and attaches a close failure as suppressed.
        try (InputStream keyGroupStream = this.keyGroupStreamCompressionDecorator.decorateWithCompression(fSDataInputStream)) {
            DataInputView keyGroupView = new DataInputViewStreamWrapper(keyGroupStream);
            for (int i = 0; i < entryCount; i++) {
                Integer stateId = IntSerializer.INSTANCE.deserialize(keyGroupView);
                String stateName = this.id2StateName.get(stateId);
                Preconditions.checkNotNull(stateName, "Unexpected state name for the id: " + stateId);
                try {
                    // Argument order matters: the key bytes precede the value bytes in the stream.
                    this.stateBackend.getDbInstance().put(
                            this.stateBackend.getOrCreateColumnFamily(stateName),
                            BytePrimitiveArraySerializer.INSTANCE.deserialize(keyGroupView),
                            BytePrimitiveArraySerializer.INSTANCE.deserialize(keyGroupView));
                } catch (RocksDBException e) {
                    throw new StateAccessException(e);
                }
            }
        }
    }

    /** Closes the stream if it is non-null, logging instead of propagating any failure. */
    private static void closeQuietly(FSDataInputStream inputStream) {
        if (inputStream == null) {
            return;
        }
        try {
            inputStream.close();
        } catch (Exception e) {
            LOG.warn("Could not properly close the input stream.", e);
        }
    }
}
