package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.InternalBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyGroupsStateSnapshot;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.RegisteredStateMetaInfo;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.StateSerializerUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.types.Pair;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.function.SupplierWithException;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBFullSnapshotOperation.class */
public class RocksDBFullSnapshotOperation {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBFullSnapshotOperation.class);
    private final RocksDBInternalStateBackend stateBackend;
    private final long checkpointId;
    private final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier;
    private final CloseableRegistry snapshotCloseableRegistry;
    private final ResourceGuard.Lease dbLease;
    private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider;
    private DataOutputView outputView;
    private List<ColumnFamilyDescriptor> descriptors;
    private List<ColumnFamilyHandle> columnFamilyHandles;
    private Map<Integer, Tuple2<Long, Integer>> metaInfo;
    private SnapshotDirectory snapshotDirectory;
    private final Map<String, Integer> stateName2Id = new HashMap();
    private List<StateMetaInfoSnapshot> keyedStateMetaInfos = new ArrayList();
    private List<StateMetaInfoSnapshot> subKeyedStateMetaInfos = new ArrayList();

    /* JADX INFO: Access modifiers changed from: package-private */
    public RocksDBFullSnapshotOperation(RocksDBInternalStateBackend rocksDBInternalStateBackend, long j, SupplierWithException<CheckpointStreamWithResultProvider, Exception> supplierWithException, CloseableRegistry closeableRegistry) throws IOException {
        this.stateBackend = rocksDBInternalStateBackend;
        this.checkpointId = j;
        this.checkpointStreamSupplier = supplierWithException;
        this.snapshotCloseableRegistry = closeableRegistry;
        this.dbLease = rocksDBInternalStateBackend.rocksDBResourceGuard.acquireResource();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void takeDBSnapShot() throws IOException, RocksDBException {
        Preconditions.checkArgument(this.snapshotDirectory == null, "Only one ongoing snapshot allowed!");
        Iterator it = this.stateBackend.getRegisteredStateMetaInfos().entrySet().iterator();
        while (it.hasNext()) {
            RegisteredStateMetaInfo registeredStateMetaInfo = (RegisteredStateMetaInfo) this.stateBackend.getRegisteredStateMetaInfos().get((String) ((Map.Entry) it.next()).getKey());
            if (registeredStateMetaInfo.getStateType().isKeyedState()) {
                this.keyedStateMetaInfos.add(registeredStateMetaInfo.snapshot());
            } else {
                this.subKeyedStateMetaInfos.add(registeredStateMetaInfo.snapshot());
            }
        }
        Map<String, Tuple2<ColumnFamilyHandle, ColumnFamilyDescriptor>> columnFamilyHandles = this.stateBackend.getColumnFamilyHandles();
        this.descriptors = new ArrayList(columnFamilyHandles.size() + 1);
        this.descriptors.add(this.stateBackend.getDefaultColumnFamilyDescriptor());
        this.columnFamilyHandles = new ArrayList(columnFamilyHandles.size() + 1);
        int i = 1;
        Iterator<StateMetaInfoSnapshot> it2 = this.keyedStateMetaInfos.iterator();
        while (it2.hasNext()) {
            String name = it2.next().getName();
            int i2 = i;
            i++;
            this.stateName2Id.put(name, Integer.valueOf(i2));
            this.descriptors.add(columnFamilyHandles.get(name).f1);
        }
        Iterator<StateMetaInfoSnapshot> it3 = this.subKeyedStateMetaInfos.iterator();
        while (it3.hasNext()) {
            String name2 = it3.next().getName();
            int i3 = i;
            i++;
            this.stateName2Id.put(name2, Integer.valueOf(i3));
            this.descriptors.add(columnFamilyHandles.get(name2).f1);
        }
        this.snapshotDirectory = SnapshotDirectory.temporary(new Path(this.stateBackend.getInstanceBasePath().getAbsolutePath(), "chk-" + this.checkpointId));
        LOG.info("Taking snapshot for RocksDB instance at {}.", this.snapshotDirectory.toString());
        this.stateBackend.takeDbSnapshot(this.snapshotDirectory.getDirectory().getPath());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void openCheckpointStream() throws Exception {
        Preconditions.checkArgument(this.checkpointStreamWithResultProvider == null, "Output stream for snapshot is already set.");
        this.checkpointStreamWithResultProvider = this.checkpointStreamSupplier.get();
        this.snapshotCloseableRegistry.registerCloseable(this.checkpointStreamWithResultProvider);
        this.outputView = new DataOutputViewStreamWrapper(this.checkpointStreamWithResultProvider.getCheckpointOutputStream());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void writeDBSnapshot() throws Exception {
        if (null == this.snapshotDirectory) {
            throw new IOException("No snapshot available. Might be released due to cancellation.");
        }
        Preconditions.checkNotNull(this.checkpointStreamWithResultProvider, "No output stream to write snapshot.");
        materializeMetaData();
        materializeKVStateData();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nonnull
    public SnapshotResult<KeyedStateHandle> getKeyGroupStateSnapshot() throws IOException {
        Preconditions.checkNotNull(this.metaInfo);
        SnapshotResult closeAndFinalizeCheckpointStreamResult = this.checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
        LOG.info("Successfully complete the snapshot of the states");
        KeyGroupsStateSnapshot keyGroupsStateSnapshot = new KeyGroupsStateSnapshot(this.stateBackend.getKeyGroupRange(), this.metaInfo, closeAndFinalizeCheckpointStreamResult.getJobManagerOwnedSnapshot());
        StreamStateHandle taskLocalSnapshot = closeAndFinalizeCheckpointStreamResult.getTaskLocalSnapshot();
        return taskLocalSnapshot != null ? SnapshotResult.withLocalState(keyGroupsStateSnapshot, new KeyGroupsStateSnapshot(this.stateBackend.getKeyGroupRange(), this.metaInfo, taskLocalSnapshot)) : SnapshotResult.of(keyGroupsStateSnapshot);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void releaseSnapshotResources() {
        this.checkpointStreamWithResultProvider = null;
        try {
            this.snapshotDirectory.cleanup();
        } catch (IOException e) {
            LOG.warn("Fail to clean up the snapshot directory {}.", this.snapshotDirectory.getDirectory());
        }
        this.snapshotDirectory = null;
        this.dbLease.close();
    }

    private void materializeMetaData() throws Exception {
        new InternalBackendSerializationProxy(this.keyedStateMetaInfos, this.subKeyedStateMetaInfos, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, this.stateBackend.getKeyGroupCompressionDecorator())).write(this.outputView);
        this.outputView.writeInt(this.stateName2Id.size());
        for (Map.Entry<String, Integer> entry : this.stateName2Id.entrySet()) {
            InstantiationUtil.serializeObject(this.checkpointStreamWithResultProvider.getCheckpointOutputStream(), entry.getKey());
            InstantiationUtil.serializeObject(this.checkpointStreamWithResultProvider.getCheckpointOutputStream(), entry.getValue());
        }
    }

    private void materializeKVStateData() throws IOException, RocksDBException {
        CheckpointStreamFactory.CheckpointStateOutputStream checkpointOutputStream = this.checkpointStreamWithResultProvider.getCheckpointOutputStream();
        this.metaInfo = new HashMap();
        RocksDB open = RocksDB.open(this.snapshotDirectory.getDirectory().getPath(), this.descriptors, this.columnFamilyHandles);
        WriteOptions disableWAL = new WriteOptions().setDisableWAL(true);
        try {
            Iterator it = this.stateBackend.getKeyGroupRange().iterator();
            while (it.hasNext()) {
                int intValue = ((Integer) it.next()).intValue();
                long pos = checkpointOutputStream.getPos();
                int i = 0;
                byte[] groupPrefix = getGroupPrefix(intValue);
                OutputStream decorateWithCompression = this.stateBackend.getKeyGroupCompressionDecorator().decorateWithCompression(checkpointOutputStream);
                Throwable th = null;
                try {
                    try {
                        DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper(decorateWithCompression);
                        for (int i2 = 1; i2 < this.columnFamilyHandles.size(); i2++) {
                            RocksDBStoragePrefixIterator rocksDBStoragePrefixIterator = new RocksDBStoragePrefixIterator(new RocksDBStorageInstance(open, this.columnFamilyHandles.get(i2), disableWAL), groupPrefix);
                            while (rocksDBStoragePrefixIterator.hasNext()) {
                                Pair<byte[], byte[]> m3831next = rocksDBStoragePrefixIterator.m3831next();
                                IntSerializer.INSTANCE.serialize(Integer.valueOf(i2), (DataOutputView) dataOutputViewStreamWrapper);
                                BytePrimitiveArraySerializer.INSTANCE.serialize(m3831next.getKey(), (DataOutputView) dataOutputViewStreamWrapper);
                                BytePrimitiveArraySerializer.INSTANCE.serialize(m3831next.getValue(), (DataOutputView) dataOutputViewStreamWrapper);
                                i++;
                            }
                        }
                        if (decorateWithCompression != null) {
                            if (0 != 0) {
                                try {
                                    decorateWithCompression.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                decorateWithCompression.close();
                            }
                        }
                        if (i != 0) {
                            this.metaInfo.put(Integer.valueOf(intValue), new Tuple2<>(Long.valueOf(pos), Integer.valueOf(i)));
                        }
                    } finally {
                    }
                } finally {
                }
            }
        } finally {
            IOUtils.closeQuietly(disableWAL);
            Iterator<ColumnFamilyHandle> it2 = this.columnFamilyHandles.iterator();
            while (it2.hasNext()) {
                IOUtils.closeQuietly(it2.next());
            }
            IOUtils.closeQuietly(open);
        }
    }

    private byte[] getGroupPrefix(int i) throws IOException {
        ByteArrayOutputStreamWithPos byteArrayOutputStreamWithPos = new ByteArrayOutputStreamWithPos(3);
        Throwable th = null;
        try {
            try {
                StateSerializerUtil.writeGroup(byteArrayOutputStreamWithPos, i);
                byte[] byteArray = byteArrayOutputStreamWithPos.toByteArray();
                if (byteArrayOutputStreamWithPos != null) {
                    if (0 != 0) {
                        try {
                            byteArrayOutputStreamWithPos.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        byteArrayOutputStreamWithPos.close();
                    }
                }
                return byteArray;
            } finally {
            }
        } catch (Throwable th3) {
            if (byteArrayOutputStreamWithPos != null) {
                if (th != null) {
                    try {
                        byteArrayOutputStreamWithPos.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    byteArrayOutputStreamWithPos.close();
                }
            }
            throw th3;
        }
    }
}
