/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.savepoint;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV3;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.IncrementalKeyedStateSnapshot;
import org.apache.flink.runtime.state.IncrementalSegmentStateSnapshot;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateSnapshot;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileSegmentStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.Preconditions;

class SavepointV3Serializer
implements SavepointSerializer<SavepointV3> {
    private static final int MASTER_STATE_MAGIC_NUMBER = -915728746;
    private static final byte NULL_HANDLE = 0;
    private static final byte BYTE_STREAM_STATE_HANDLE = 1;
    private static final byte FILE_STREAM_STATE_HANDLE = 2;
    private static final byte KEY_GROUPS_HANDLE = 3;
    private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
    private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5;
    private static final byte KEY_GROUP_STATE_SNAPSHOT = 6;
    private static final byte INCREMENTAL_KEYE_STATE_SNAPSHOT = 7;
    private static final byte FILE_SEGMENT_STREAM_STATE_HANDLE = 8;
    private static final byte INCREMENTAL_FILE_SEGMENT_STATE_SNAPSHOT = 9;
    public static final SavepointV3Serializer INSTANCE = new SavepointV3Serializer();

    private SavepointV3Serializer() {
    }

    @Override
    public void serialize(SavepointV3 checkpointMetadata, DataOutputStream dos) throws IOException {
        dos.writeLong(checkpointMetadata.getCheckpointId());
        Collection<MasterState> masterStates = checkpointMetadata.getMasterStates();
        dos.writeInt(masterStates.size());
        for (MasterState ms : masterStates) {
            this.serializeMasterState(ms, dos);
        }
        Collection<OperatorState> operatorStates = checkpointMetadata.getOperatorStates();
        dos.writeInt(operatorStates.size());
        for (OperatorState operatorState : operatorStates) {
            dos.writeLong(operatorState.getOperatorID().getLowerPart());
            dos.writeLong(operatorState.getOperatorID().getUpperPart());
            int parallelism = operatorState.getParallelism();
            dos.writeInt(parallelism);
            dos.writeInt(operatorState.getMaxParallelism());
            dos.writeInt(1);
            Map<Integer, OperatorSubtaskState> subtaskStateMap = operatorState.getSubtaskStates();
            dos.writeInt(subtaskStateMap.size());
            for (Map.Entry<Integer, OperatorSubtaskState> entry : subtaskStateMap.entrySet()) {
                dos.writeInt(entry.getKey());
                SavepointV3Serializer.serializeSubtaskState(entry.getValue(), dos);
            }
        }
    }

    @Override
    public SavepointV3 deserialize(DataInputStream dis, ClassLoader cl) throws IOException {
        List<MasterState> masterStates;
        long checkpointId = dis.readLong();
        if (checkpointId < 0L) {
            throw new IOException("invalid checkpoint ID: " + checkpointId);
        }
        int numMasterStates = dis.readInt();
        if (numMasterStates == 0) {
            masterStates = Collections.emptyList();
        } else if (numMasterStates > 0) {
            masterStates = new ArrayList(numMasterStates);
            for (int i = 0; i < numMasterStates; ++i) {
                masterStates.add(this.deserializeMasterState(dis));
            }
        } else {
            throw new IOException("invalid number of master states: " + numMasterStates);
        }
        int numTaskStates = dis.readInt();
        ArrayList<OperatorState> operatorStates = new ArrayList<OperatorState>(numTaskStates);
        for (int i = 0; i < numTaskStates; ++i) {
            OperatorID jobVertexId = new OperatorID(dis.readLong(), dis.readLong());
            int parallelism = dis.readInt();
            int maxParallelism = dis.readInt();
            int chainLength = dis.readInt();
            OperatorState taskState = new OperatorState(jobVertexId, parallelism, maxParallelism);
            operatorStates.add(taskState);
            int numSubTaskStates = dis.readInt();
            for (int j = 0; j < numSubTaskStates; ++j) {
                int subtaskIndex = dis.readInt();
                OperatorSubtaskState subtaskState = SavepointV3Serializer.deserializeSubtaskState(dis);
                taskState.putState(subtaskIndex, subtaskState);
            }
        }
        return new SavepointV3(checkpointId, operatorStates, masterStates);
    }

    private void serializeMasterState(MasterState state, DataOutputStream dos) throws IOException {
        dos.writeInt(-915728746);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(state.version());
        out.writeUTF(state.name());
        byte[] bytes = state.bytes();
        out.writeInt(bytes.length);
        out.write(bytes, 0, bytes.length);
        out.close();
        byte[] data = baos.toByteArray();
        dos.writeInt(data.length);
        dos.write(data, 0, data.length);
    }

    private MasterState deserializeMasterState(DataInputStream dis) throws IOException {
        int magicNumber = dis.readInt();
        if (magicNumber != -915728746) {
            throw new IOException("incorrect magic number in master styte byte sequence");
        }
        int numBytes = dis.readInt();
        if (numBytes <= 0) {
            throw new IOException("found zero or negative length for master state bytes");
        }
        byte[] data = new byte[numBytes];
        dis.readFully(data);
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int version = in.readInt();
        String name = in.readUTF();
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        if (in.read() != -1) {
            throw new IOException("found trailing bytes in master state");
        }
        return new MasterState(name, bytes, version);
    }

    private static <T> T extractSingleton(Collection<T> collection) {
        if (collection == null || collection.isEmpty()) {
            return null;
        }
        if (collection.size() == 1) {
            return collection.iterator().next();
        }
        throw new IllegalStateException("Expected singleton collection, but found size: " + collection.size());
    }

    private static void serializeSubtaskState(OperatorSubtaskState subtaskState, DataOutputStream dos) throws IOException {
        OperatorStateHandle operatorStateFromStream;
        dos.writeLong(-1L);
        int len = 0;
        dos.writeInt(len);
        OperatorStateHandle operatorStateBackend = SavepointV3Serializer.extractSingleton(subtaskState.getManagedOperatorState());
        len = operatorStateBackend != null ? 1 : 0;
        dos.writeInt(len);
        if (len == 1) {
            SavepointV3Serializer.serializeOperatorStateHandle(operatorStateBackend, dos);
        }
        len = (operatorStateFromStream = SavepointV3Serializer.extractSingleton(subtaskState.getRawOperatorState())) != null ? 1 : 0;
        dos.writeInt(len);
        if (len == 1) {
            SavepointV3Serializer.serializeOperatorStateHandle(operatorStateFromStream, dos);
        }
        KeyedStateHandle keyedStateBackend = SavepointV3Serializer.extractSingleton(subtaskState.getManagedKeyedState());
        SavepointV3Serializer.serializeKeyedStateBackend(keyedStateBackend, dos);
        KeyedStateHandle keyedStateStream = SavepointV3Serializer.extractSingleton(subtaskState.getRawKeyedState());
        SavepointV3Serializer.serializeRawKeyedStateHandle(keyedStateStream, dos);
    }

    private static OperatorSubtaskState deserializeSubtaskState(DataInputStream dis) throws IOException {
        long ignoredDuration = dis.readLong();
        int len = dis.readInt();
        if (SavepointSerializers.FAIL_WHEN_LEGACY_STATE_DETECTED) {
            Preconditions.checkState((len == 0 ? 1 : 0) != 0, (Object)"Legacy state (from Flink <= 1.1, created through the 'Checkpointed' interface) is no longer supported starting from Flink 1.4. Please rewrite your job to use 'CheckpointedFunction' instead!");
        } else {
            for (int i = 0; i < len; ++i) {
                SavepointV3Serializer.deserializeStreamStateHandle(dis);
            }
        }
        len = dis.readInt();
        OperatorStateHandle operatorStateBackend = len == 0 ? null : SavepointV3Serializer.deserializeOperatorStateHandle(dis);
        len = dis.readInt();
        OperatorStateHandle operatorStateStream = len == 0 ? null : SavepointV3Serializer.deserializeOperatorStateHandle(dis);
        KeyedStateHandle keyedStateBackend = SavepointV3Serializer.deserializeKeyedStateBackend(dis);
        KeyedStateHandle keyedStateStream = SavepointV3Serializer.deserializeRawKeyedStateHandle(dis);
        return new OperatorSubtaskState(operatorStateBackend, operatorStateStream, keyedStateBackend, keyedStateStream);
    }

    private static void serializeRawKeyedStateHandle(KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
        if (stateHandle == null) {
            dos.writeByte(0);
        } else if (stateHandle instanceof KeyGroupsStateHandle) {
            KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle)stateHandle;
            dos.writeByte(3);
            dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup());
            dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
            for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) {
                dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
            }
            SavepointV3Serializer.serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
        } else {
            throw new IllegalStateException("Unknown RawKeyedStateHandle type: " + stateHandle.getClass());
        }
    }

    private static void serializeKeyedStateBackend(KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
        if (stateHandle == null) {
            dos.writeByte(0);
        } else if (stateHandle instanceof KeyGroupsStateSnapshot) {
            KeyGroupsStateSnapshot keyGroupsStateSnapshot = (KeyGroupsStateSnapshot)stateHandle;
            dos.writeByte(6);
            dos.writeInt(keyGroupsStateSnapshot.getKeyGroupRange().getStartKeyGroup());
            dos.writeInt(keyGroupsStateSnapshot.getKeyGroupRange().getEndKeyGroup());
            Map<Integer, Tuple2<Long, Integer>> metaInfos = keyGroupsStateSnapshot.getMetaInfos();
            dos.writeInt(metaInfos.size());
            for (Map.Entry<Integer, Tuple2<Long, Integer>> entry : metaInfos.entrySet()) {
                dos.writeInt(entry.getKey());
                dos.writeLong((Long)entry.getValue().f0);
                dos.writeInt((Integer)entry.getValue().f1);
            }
            SavepointV3Serializer.serializeStreamStateHandle(keyGroupsStateSnapshot.getSnapshotHandle(), dos);
        } else if (stateHandle instanceof IncrementalKeyedStateSnapshot) {
            IncrementalKeyedStateSnapshot incrementSnapshot = (IncrementalKeyedStateSnapshot)stateHandle;
            if (stateHandle instanceof IncrementalSegmentStateSnapshot) {
                dos.writeByte(9);
            } else {
                dos.writeByte(7);
            }
            dos.writeInt(incrementSnapshot.getKeyGroupRange().getStartKeyGroup());
            dos.writeInt(incrementSnapshot.getKeyGroupRange().getEndKeyGroup());
            dos.writeLong(incrementSnapshot.getCheckpointId());
            SavepointV3Serializer.serializeStreamStateHandle(incrementSnapshot.getMetaStateHandle(), dos);
            dos.writeInt(incrementSnapshot.getSharedState().size());
            for (Map.Entry<StateHandleID, Tuple2<String, StreamStateHandle>> entry : incrementSnapshot.getSharedState().entrySet()) {
                dos.writeUTF(entry.getKey().toString());
                dos.writeUTF((String)entry.getValue().f0);
                SavepointV3Serializer.serializeStreamStateHandle((StreamStateHandle)entry.getValue().f1, dos);
            }
            SavepointV3Serializer.serializeStreamStateHandleMap(incrementSnapshot.getPrivateState(), dos);
        } else {
            throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass());
        }
    }

    private static void serializeStreamStateHandleMap(Map<StateHandleID, StreamStateHandle> map, DataOutputStream dos) throws IOException {
        dos.writeInt(map.size());
        for (Map.Entry<StateHandleID, StreamStateHandle> entry : map.entrySet()) {
            dos.writeUTF(entry.getKey().toString());
            SavepointV3Serializer.serializeStreamStateHandle(entry.getValue(), dos);
        }
    }

    private static Map<StateHandleID, StreamStateHandle> deserializeStreamStateHandleMap(DataInputStream dis) throws IOException {
        int size = dis.readInt();
        HashMap<StateHandleID, StreamStateHandle> result = new HashMap<StateHandleID, StreamStateHandle>(size);
        for (int i = 0; i < size; ++i) {
            StateHandleID stateHandleID = new StateHandleID(dis.readUTF());
            StreamStateHandle stateHandle = SavepointV3Serializer.deserializeStreamStateHandle(dis);
            result.put(stateHandleID, stateHandle);
        }
        return result;
    }

    private static KeyedStateHandle deserializeRawKeyedStateHandle(DataInputStream dis) throws IOException {
        byte type = dis.readByte();
        if (0 == type) {
            return null;
        }
        if (3 == type) {
            int startKeyGroup = dis.readInt();
            int numKeyGroups = dis.readInt();
            KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
            long[] offsets = new long[numKeyGroups];
            for (int i = 0; i < numKeyGroups; ++i) {
                offsets[i] = dis.readLong();
            }
            KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, offsets);
            StreamStateHandle stateHandle = SavepointV3Serializer.deserializeStreamStateHandle(dis);
            return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
        }
        throw new IllegalStateException("Reading invalid RawKeyedStateHandle, type: " + type);
    }

    private static KeyedStateHandle deserializeKeyedStateBackend(DataInputStream dis) throws IOException {
        byte type = dis.readByte();
        if (0 == type) {
            return null;
        }
        if (6 == type) {
            int startKeyGroup = dis.readInt();
            int endKeyGroups = dis.readInt();
            KeyGroupRange groupRange = new KeyGroupRange(startKeyGroup, endKeyGroups);
            int metaInfoSize = dis.readInt();
            HashMap<Integer, Tuple2<Long, Integer>> metaInfos = new HashMap<Integer, Tuple2<Long, Integer>>(metaInfoSize);
            for (int i = 0; i < metaInfoSize; ++i) {
                Integer group = dis.readInt();
                Long offset = dis.readLong();
                Integer numEntries = dis.readInt();
                metaInfos.put(group, (Tuple2<Long, Integer>)new Tuple2((Object)offset, (Object)numEntries));
            }
            StreamStateHandle stateHandle = SavepointV3Serializer.deserializeStreamStateHandle(dis);
            if (stateHandle == null) {
                return new KeyGroupsStateSnapshot(groupRange);
            }
            return new KeyGroupsStateSnapshot(groupRange, metaInfos, stateHandle);
        }
        if (7 == type || 9 == type) {
            int start2 = dis.readInt();
            int end = dis.readInt();
            KeyGroupRange range = new KeyGroupRange(start2, end);
            Long checkpointId = dis.readLong();
            StreamStateHandle metaStateHandle = SavepointV3Serializer.deserializeStreamStateHandle(dis);
            int size = dis.readInt();
            HashMap<StateHandleID, Tuple2<String, StreamStateHandle>> shared = new HashMap<StateHandleID, Tuple2<String, StreamStateHandle>>(size);
            for (int i = 0; i < size; ++i) {
                StateHandleID stateHandleID = new StateHandleID(dis.readUTF());
                String uniqueId = dis.readUTF();
                StreamStateHandle stateHandle = SavepointV3Serializer.deserializeStreamStateHandle(dis);
                shared.put(stateHandleID, (Tuple2<String, StreamStateHandle>)new Tuple2((Object)uniqueId, (Object)stateHandle));
            }
            Map<StateHandleID, StreamStateHandle> privateStates = SavepointV3Serializer.deserializeStreamStateHandleMap(dis);
            if (7 == type) {
                return new IncrementalKeyedStateSnapshot(range, checkpointId, shared, privateStates, metaStateHandle);
            }
            return new IncrementalSegmentStateSnapshot(range, checkpointId, shared, privateStates, metaStateHandle);
        }
        throw new IllegalStateException("Reading invalid KeyedStateHandle for SavepointV3, type: " + type);
    }

    private static void serializeOperatorStateHandle(OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
        if (stateHandle != null) {
            dos.writeByte(4);
            Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap = stateHandle.getStateNameToPartitionOffsets();
            dos.writeInt(partitionOffsetsMap.size());
            for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : partitionOffsetsMap.entrySet()) {
                dos.writeUTF(entry.getKey());
                OperatorStateHandle.StateMetaInfo stateMetaInfo = entry.getValue();
                int mode = stateMetaInfo.getDistributionMode().ordinal();
                dos.writeByte(mode);
                long[] offsets = stateMetaInfo.getOffsets();
                dos.writeInt(offsets.length);
                for (long offset : offsets) {
                    dos.writeLong(offset);
                }
            }
            SavepointV3Serializer.serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
        } else {
            dos.writeByte(0);
        }
    }

    private static OperatorStateHandle deserializeOperatorStateHandle(DataInputStream dis) throws IOException {
        byte type = dis.readByte();
        if (0 == type) {
            return null;
        }
        if (4 == type) {
            int mapSize = dis.readInt();
            HashMap<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<String, OperatorStateHandle.StateMetaInfo>(mapSize);
            for (int i = 0; i < mapSize; ++i) {
                String key = dis.readUTF();
                byte modeOrdinal = dis.readByte();
                OperatorStateHandle.Mode mode = OperatorStateHandle.Mode.values()[modeOrdinal];
                long[] offsets = new long[dis.readInt()];
                for (int j = 0; j < offsets.length; ++j) {
                    offsets[j] = dis.readLong();
                }
                OperatorStateHandle.StateMetaInfo metaInfo = new OperatorStateHandle.StateMetaInfo(offsets, mode);
                offsetsMap.put(key, metaInfo);
            }
            StreamStateHandle stateHandle = SavepointV3Serializer.deserializeStreamStateHandle(dis);
            return new OperatorStreamStateHandle(offsetsMap, stateHandle);
        }
        throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type);
    }

    private static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
        if (stateHandle == null) {
            dos.writeByte(0);
        } else if (stateHandle instanceof FileSegmentStateHandle) {
            dos.writeByte(8);
            FileSegmentStateHandle fileSegmentStateHandle = (FileSegmentStateHandle)stateHandle;
            dos.writeLong(fileSegmentStateHandle.getStartPosition());
            dos.writeLong(fileSegmentStateHandle.getEndPosition());
            dos.writeBoolean(fileSegmentStateHandle.isFileClosed());
            dos.writeUTF(fileSegmentStateHandle.getFilePath().toString());
        } else if (stateHandle instanceof FileStateHandle) {
            dos.writeByte(2);
            FileStateHandle fileStateHandle = (FileStateHandle)stateHandle;
            dos.writeLong(stateHandle.getStateSize());
            dos.writeUTF(fileStateHandle.getFilePath().toString());
        } else if (stateHandle instanceof ByteStreamStateHandle) {
            dos.writeByte(1);
            ByteStreamStateHandle byteStreamStateHandle = (ByteStreamStateHandle)stateHandle;
            dos.writeUTF(byteStreamStateHandle.getHandleName());
            byte[] internalData = byteStreamStateHandle.getData();
            dos.writeInt(internalData.length);
            dos.write(byteStreamStateHandle.getData());
        } else {
            throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
        }
        dos.flush();
    }

    private static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
        int type = dis.read();
        if (0 == type) {
            return null;
        }
        if (2 == type) {
            long size = dis.readLong();
            String pathString = dis.readUTF();
            return new FileStateHandle(new Path(pathString), size);
        }
        if (1 == type) {
            String handleName = dis.readUTF();
            int numBytes = dis.readInt();
            byte[] data = new byte[numBytes];
            dis.readFully(data);
            return new ByteStreamStateHandle(handleName, data);
        }
        if (8 == type) {
            long startPosition = dis.readLong();
            long endPosition = dis.readLong();
            boolean fileClosed = dis.readBoolean();
            String pathString = dis.readUTF();
            return new FileSegmentStateHandle(new Path(pathString), startPosition, endPosition, fileClosed);
        }
        throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
    }
}

