/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.contrib.streaming.state;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBInternalBackendSerializationProxy;
import org.apache.flink.contrib.streaming.state.RocksDBInternalStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBStorageInstance;
import org.apache.flink.contrib.streaming.state.RocksDBStoragePrefixIterator;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateSnapshot;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityQueueStateMetaInfoSnapshot;
import org.apache.flink.runtime.state.RegisteredStateMetaInfo;
import org.apache.flink.runtime.state.SnapshotDirectory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateSerializerUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.function.SupplierWithException;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBFullSnapshotOperation {
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBFullSnapshotOperation.class);
    private final RocksDBInternalStateBackend stateBackend;
    private final long checkpointId;
    private final SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier;
    private final CloseableRegistry snapshotCloseableRegistry;
    private final ResourceGuard.Lease dbLease;
    private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider;
    private DataOutputView outputView;
    private List<StateMetaInfoSnapshot> keyedStateMetaInfos;
    private List<StateMetaInfoSnapshot> subKeyedStateMetaInfos;
    private List<PriorityQueueStateMetaInfoSnapshot> priorityQueueMetaInfos;
    private List<ColumnFamilyDescriptor> descriptors;
    private List<ColumnFamilyHandle> columnFamilyHandles;
    private Map<Integer, Tuple2<Long, Integer>> metaInfo;
    private final Map<String, Integer> stateName2Id;
    private SnapshotDirectory snapshotDirectory;

    RocksDBFullSnapshotOperation(RocksDBInternalStateBackend stateBackend, long checkpointId, SupplierWithException<CheckpointStreamWithResultProvider, Exception> checkpointStreamSupplier, CloseableRegistry registry) throws IOException {
        this.stateBackend = stateBackend;
        this.checkpointId = checkpointId;
        this.checkpointStreamSupplier = checkpointStreamSupplier;
        this.snapshotCloseableRegistry = registry;
        this.dbLease = stateBackend.rocksDBResourceGuard.acquireResource();
        this.stateName2Id = new HashMap<String, Integer>();
        this.keyedStateMetaInfos = new ArrayList<StateMetaInfoSnapshot>();
        this.subKeyedStateMetaInfos = new ArrayList<StateMetaInfoSnapshot>();
        this.priorityQueueMetaInfos = new ArrayList<PriorityQueueStateMetaInfoSnapshot>();
    }

    /*
     * WARNING - void declaration
     */
    void takeDBSnapShot() throws IOException, RocksDBException {
        String stateName;
        RegisteredStateMetaInfo registeredStateMetaInfo;
        Preconditions.checkArgument(this.snapshotDirectory == null, "Only one ongoing snapshot allowed!");
        for (Map.Entry entry : this.stateBackend.getRegisteredStateMetaInfos().entrySet()) {
            String stateName2 = (String)entry.getKey();
            registeredStateMetaInfo = (RegisteredStateMetaInfo)this.stateBackend.getRegisteredStateMetaInfos().get(stateName2);
            if (registeredStateMetaInfo.getStateType().isKeyedState()) {
                this.keyedStateMetaInfos.add(registeredStateMetaInfo.snapshot());
                continue;
            }
            this.subKeyedStateMetaInfos.add(registeredStateMetaInfo.snapshot());
        }
        for (Map.Entry<Object, Object> entry : this.stateBackend.getPriorityQueueInformations().entrySet()) {
            Iterator<StateMetaInfoSnapshot> stateName2 = (String)entry.getKey();
            registeredStateMetaInfo = this.stateBackend.getPriorityQueueInformations().get(stateName2);
            this.priorityQueueMetaInfos.add(registeredStateMetaInfo.snapshot());
        }
        Map<String, Tuple2<ColumnFamilyHandle, ColumnFamilyDescriptor>> columnFamilyHandles = this.stateBackend.getColumnFamilyHandles();
        this.descriptors = new ArrayList<ColumnFamilyDescriptor>(columnFamilyHandles.size() + 1);
        this.descriptors.add(this.stateBackend.getDefaultColumnFamilyDescriptor());
        this.columnFamilyHandles = new ArrayList<ColumnFamilyHandle>(columnFamilyHandles.size() + 1);
        boolean bl = true;
        for (StateMetaInfoSnapshot keyedStateMetaInfo : this.keyedStateMetaInfos) {
            void var2_6;
            stateName = keyedStateMetaInfo.getName();
            this.stateName2Id.put(stateName, (int)(++var2_6));
            this.descriptors.add((ColumnFamilyDescriptor)columnFamilyHandles.get((Object)stateName).f1);
        }
        for (StateMetaInfoSnapshot subKeyedStateMetaInfo : this.subKeyedStateMetaInfos) {
            void var2_7;
            stateName = subKeyedStateMetaInfo.getName();
            this.stateName2Id.put(stateName, (int)(++var2_7));
            this.descriptors.add((ColumnFamilyDescriptor)columnFamilyHandles.get((Object)stateName).f1);
        }
        for (PriorityQueueStateMetaInfoSnapshot pq : this.priorityQueueMetaInfos) {
            void var2_8;
            stateName = pq.getName();
            this.stateName2Id.put(stateName, (int)(++var2_8));
            this.descriptors.add((ColumnFamilyDescriptor)columnFamilyHandles.get((Object)stateName).f1);
        }
        Path path = new Path(this.stateBackend.getInstanceBasePath().getAbsolutePath(), "chk-" + this.checkpointId);
        this.snapshotDirectory = SnapshotDirectory.temporary((Path)path);
        LOG.info("Taking snapshot for RocksDB instance at {}.", (Object)this.snapshotDirectory.toString());
        this.stateBackend.takeDbSnapshot(this.snapshotDirectory.getDirectory().getPath());
    }

    void openCheckpointStream() throws Exception {
        Preconditions.checkArgument(this.checkpointStreamWithResultProvider == null, "Output stream for snapshot is already set.");
        this.checkpointStreamWithResultProvider = this.checkpointStreamSupplier.get();
        this.snapshotCloseableRegistry.registerCloseable(this.checkpointStreamWithResultProvider);
        this.outputView = new DataOutputViewStreamWrapper((OutputStream)this.checkpointStreamWithResultProvider.getCheckpointOutputStream());
    }

    void writeDBSnapshot() throws Exception {
        if (null == this.snapshotDirectory) {
            throw new IOException("No snapshot available. Might be released due to cancellation.");
        }
        Preconditions.checkNotNull(this.checkpointStreamWithResultProvider, "No output stream to write snapshot.");
        this.materializeMetaData();
        this.materializeKVStateData();
    }

    @Nonnull
    SnapshotResult<KeyedStateHandle> getKeyGroupStateSnapshot() throws IOException {
        Preconditions.checkNotNull(this.metaInfo);
        SnapshotResult snapshotResult = this.checkpointStreamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
        LOG.info("Successfully complete the snapshot of the states");
        StreamStateHandle snapshotHandle = (StreamStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
        KeyGroupsStateSnapshot snapshot = new KeyGroupsStateSnapshot(this.stateBackend.getKeyGroupRange(), this.metaInfo, snapshotHandle);
        StreamStateHandle localSnapshotHandle = (StreamStateHandle)snapshotResult.getTaskLocalSnapshot();
        if (localSnapshotHandle != null) {
            KeyGroupsStateSnapshot localSnapshot = new KeyGroupsStateSnapshot(this.stateBackend.getKeyGroupRange(), this.metaInfo, localSnapshotHandle);
            return SnapshotResult.withLocalState((StateObject)snapshot, (StateObject)localSnapshot);
        }
        return SnapshotResult.of((StateObject)snapshot);
    }

    void releaseSnapshotResources() {
        this.checkpointStreamWithResultProvider = null;
        try {
            this.snapshotDirectory.cleanup();
        }
        catch (IOException e2) {
            LOG.warn("Fail to clean up the snapshot directory {}.", (Object)this.snapshotDirectory.getDirectory());
        }
        this.snapshotDirectory = null;
        this.dbLease.close();
    }

    private void materializeMetaData() throws Exception {
        RocksDBInternalBackendSerializationProxy backendSerializationProxy = new RocksDBInternalBackendSerializationProxy(this.keyedStateMetaInfos, this.subKeyedStateMetaInfos, this.priorityQueueMetaInfos, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, this.stateBackend.getKeyGroupCompressionDecorator()));
        backendSerializationProxy.write(this.outputView);
        this.outputView.writeInt(this.stateName2Id.size());
        for (Map.Entry<String, Integer> entry : this.stateName2Id.entrySet()) {
            InstantiationUtil.serializeObject((OutputStream)this.checkpointStreamWithResultProvider.getCheckpointOutputStream(), entry.getKey());
            InstantiationUtil.serializeObject((OutputStream)this.checkpointStreamWithResultProvider.getCheckpointOutputStream(), entry.getValue());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void materializeKVStateData() throws IOException, RocksDBException {
        CheckpointStreamFactory.CheckpointStateOutputStream outputStream = this.checkpointStreamWithResultProvider.getCheckpointOutputStream();
        this.metaInfo = new HashMap<Integer, Tuple2<Long, Integer>>();
        RocksDB db = RocksDB.open((String)this.snapshotDirectory.getDirectory().getPath(), this.descriptors, this.columnFamilyHandles);
        WriteOptions writeOptions = new WriteOptions().setDisableWAL(true);
        KeyGroupRange groups = this.stateBackend.getKeyGroupRange();
        try {
            Iterator<ColumnFamilyHandle> iterator = groups.iterator();
            while (iterator.hasNext()) {
                int group = (Integer)iterator.next();
                long offset = outputStream.getPos();
                int numEntries = 0;
                byte[] groupPrefix = this.getGroupPrefix(group);
                try (OutputStream kgOutStream = this.stateBackend.getKeyGroupCompressionDecorator().decorateWithCompression((OutputStream)outputStream);){
                    DataOutputViewStreamWrapper kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
                    for (int i = 1; i < this.columnFamilyHandles.size(); ++i) {
                        RocksDBStorageInstance storageInstance = new RocksDBStorageInstance(db, this.columnFamilyHandles.get(i), writeOptions);
                        RocksDBStoragePrefixIterator iterator2 = new RocksDBStoragePrefixIterator(storageInstance, groupPrefix);
                        while (iterator2.hasNext()) {
                            Object pair = iterator2.next();
                            IntSerializer.INSTANCE.serialize(i, (DataOutputView)kgOutView);
                            BytePrimitiveArraySerializer.INSTANCE.serialize((byte[])pair.getKey(), (DataOutputView)kgOutView);
                            BytePrimitiveArraySerializer.INSTANCE.serialize((byte[])pair.getValue(), (DataOutputView)kgOutView);
                            ++numEntries;
                        }
                    }
                }
                if (numEntries == 0) continue;
                this.metaInfo.put(group, new Tuple2<Long, Integer>(offset, numEntries));
            }
        }
        finally {
            IOUtils.closeQuietly((AutoCloseable)writeOptions);
            for (ColumnFamilyHandle handle : this.columnFamilyHandles) {
                IOUtils.closeQuietly((AutoCloseable)handle);
            }
            IOUtils.closeQuietly((AutoCloseable)db);
        }
    }

    private byte[] getGroupPrefix(int group) throws IOException {
        try (ByteArrayOutputStreamWithPos innerStream = new ByteArrayOutputStreamWithPos(3);){
            StateSerializerUtil.writeGroup((ByteArrayOutputStreamWithPos)innerStream, (int)group);
            byte[] byArray = innerStream.toByteArray();
            return byArray;
        }
    }
}

