/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.gemini;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractInternalStateBackend;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorage;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.gemini.GeminiInternalStateBackend;
import org.apache.flink.runtime.state.gemini.GeminiOptions;
import org.apache.flink.runtime.state.gemini.engine.GConfiguration;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public class GeminiStateBackend
extends AbstractStateBackend
implements ConfigurableStateBackend {
    private static final long serialVersionUID = -8191916350224044011L;
    private static final Logger LOG = LoggerFactory.getLogger(GeminiStateBackend.class);
    private final AbstractFileStateBackend checkpointStreamBackend;
    private Configuration configuration = new Configuration();
    @Nullable
    private File[] localGeminiDbDirectories;
    private transient File[] initializedDbBasePaths;
    private transient JobID jobId;
    private transient int nextDirectory;
    private transient boolean isInitialized;

    public GeminiStateBackend(String checkpointDataUri) throws IOException {
        this(new Path(checkpointDataUri).toUri());
    }

    public GeminiStateBackend(URI checkpointDataUri) throws IOException {
        this(new FsStateBackend(checkpointDataUri));
    }

    public GeminiStateBackend(AbstractFileStateBackend checkpointStreamBackend) {
        this.checkpointStreamBackend = (AbstractFileStateBackend)Preconditions.checkNotNull((Object)checkpointStreamBackend);
    }

    private GeminiStateBackend(GeminiStateBackend original, Configuration config) {
        this.configuration = config.clone();
        this.configuration.addAll(original.getConfiguration());
        AbstractFileStateBackend originalStreamBackend = original.checkpointStreamBackend;
        StateBackend stateBackend = this.checkpointStreamBackend = originalStreamBackend instanceof ConfigurableStateBackend ? ((ConfigurableStateBackend)((Object)originalStreamBackend)).configure(this.configuration) : originalStreamBackend;
        if (original.localGeminiDbDirectories != null) {
            this.localGeminiDbDirectories = original.localGeminiDbDirectories;
        } else {
            String geminiLocalPaths = this.configuration.getString(GeminiOptions.LOCAL_PATH);
            if (geminiLocalPaths != null) {
                String[] directories = geminiLocalPaths.split(",|" + File.pathSeparator);
                try {
                    this.setDbStoragePaths(directories);
                }
                catch (IllegalArgumentException e) {
                    throw new IllegalConfigurationException("Invalid configuration for GeminiDB state backend's local storage directories: " + e.getMessage(), (Throwable)e);
                }
            }
        }
    }

    @Override
    public GeminiStateBackend configure(Configuration config) {
        return new GeminiStateBackend(this, config);
    }

    @Override
    public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
        return this.checkpointStreamBackend.resolveCheckpoint(pointer);
    }

    @Override
    public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
        return this.checkpointStreamBackend.createCheckpointStorage(jobId);
    }

    @Override
    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry) {
        throw new UnsupportedOperationException();
    }

    @Override
    public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) {
        boolean asyncSnapshots = true;
        return new DefaultOperatorStateBackend(env.getUserClassLoader(), env.getExecutionConfig(), true);
    }

    @Override
    public AbstractInternalStateBackend createInternalStateBackend(Environment env, String operatorIdentifier, int numberOfGroups, KeyGroupRange keyGroupRange, MetricGroup metricGroup) throws Exception {
        this.lazyInitializeForJob(env);
        GConfiguration gConfiguration = this.getGConfiguration();
        String fileCompatibleIdentifier = operatorIdentifier.replaceAll("[^a-zA-Z0-9\\-]", "_");
        File instanceBasePath = new File(this.getNextStoragePath(), "job_" + this.jobId + "_op_" + fileCompatibleIdentifier + "_uuid_" + UUID.randomUUID());
        gConfiguration.setLocalPath(instanceBasePath.getAbsolutePath());
        String dfsPath = this.configuration.getString(GeminiOptions.DFS_PATH);
        if (dfsPath == null) {
            Path basePath = ((FsCheckpointStorage)this.checkpointStreamBackend.createCheckpointStorage(this.jobId)).getCheckpointsDirectory();
            dfsPath = new Path(basePath, "shared").toUri().toString();
        }
        gConfiguration.setDfsPath(new Path(dfsPath, fileCompatibleIdentifier).toUri().toString());
        gConfiguration.setSubTaskIndex(env.getTaskInfo().getIndexOfThisSubtask());
        gConfiguration.setNumParallelSubtasks(env.getTaskInfo().getNumberOfParallelSubtasks());
        gConfiguration.setBackendUID(env.getExecutionId().toString());
        gConfiguration.setNumberSlots(env.getTaskManagerInfo().getNumberSlots());
        gConfiguration.setOperatorNameWithSubtask(this.getOperatorNameWithSubtask(operatorIdentifier));
        LocalRecoveryConfig localRecoveryConfig = env.getTaskStateManager().createLocalRecoveryConfig();
        gConfiguration.setLocalSnapshot(localRecoveryConfig.isLocalRecoveryEnabled());
        LOG.info("Create internal state backend, dfs path ({}), local path ({}) TM slots{}", new Object[]{dfsPath, instanceBasePath, env.getTaskManagerInfo().getNumberSlots()});
        return new GeminiInternalStateBackend(numberOfGroups, keyGroupRange, env.getUserClassLoader(), localRecoveryConfig, env.getTaskKvStateRegistry(), fileCompatibleIdentifier, env.getExecutionConfig(), gConfiguration, metricGroup);
    }

    public void setConfiguration(Configuration configuration) {
        this.configuration = configuration;
    }

    public Configuration getConfiguration() {
        return this.configuration;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    public void setDbStoragePaths(String ... paths) {
        if (paths == null) {
            this.localGeminiDbDirectories = null;
            return;
        }
        if (paths.length == 0) {
            throw new IllegalArgumentException("empty paths");
        }
        File[] pp = new File[paths.length];
        for (int i = 0; i < paths.length; ++i) {
            String path;
            String rawPath = paths[i];
            if (rawPath == null) {
                throw new IllegalArgumentException("null path");
            }
            URI uri = null;
            try {
                uri = new Path(rawPath).toUri();
            }
            catch (Exception exception) {
                // empty catch block
            }
            if (uri != null && uri.getScheme() != null) {
                if (!"file".equalsIgnoreCase(uri.getScheme())) throw new IllegalArgumentException("Path " + rawPath + " has a non-local scheme");
                path = uri.getPath();
            } else {
                path = rawPath;
            }
            pp[i] = new File(path);
            if (pp[i].isAbsolute()) continue;
            throw new IllegalArgumentException("Relative paths are not supported");
        }
        this.localGeminiDbDirectories = pp;
    }

    private void lazyInitializeForJob(Environment env) throws IOException {
        if (this.isInitialized) {
            return;
        }
        this.jobId = env.getJobID();
        if (this.localGeminiDbDirectories == null) {
            String[] workingDirs = ConfigurationUtils.parseWorkingDirectories((Configuration)env.getTaskManagerInfo().getConfiguration());
            if (workingDirs.length == 0) {
                this.initializedDbBasePaths = env.getIOManager().getSpillingDirectories();
                this.nextDirectory = ThreadLocalRandom.current().nextInt(this.initializedDbBasePaths.length);
                this.isInitialized = true;
                return;
            }
            this.setDbStoragePaths(workingDirs);
        }
        ArrayList<File> dirs = new ArrayList<File>(this.localGeminiDbDirectories.length);
        StringBuilder errorMessage = new StringBuilder();
        for (File f : this.localGeminiDbDirectories) {
            File testDir = new File(f, UUID.randomUUID().toString());
            if (!testDir.mkdirs()) {
                String msg = "Local DB files directory '" + f + "' does not exist and cannot be created. ";
                LOG.error(msg);
                errorMessage.append(msg);
            } else {
                dirs.add(f);
            }
            testDir.delete();
        }
        if (dirs.isEmpty()) {
            throw new IOException("No local storage directories available. " + errorMessage);
        }
        this.initializedDbBasePaths = dirs.toArray(new File[dirs.size()]);
        this.nextDirectory = ThreadLocalRandom.current().nextInt(this.initializedDbBasePaths.length);
        this.isInitialized = true;
    }

    private File getNextStoragePath() {
        int ni = this.nextDirectory + 1;
        this.nextDirectory = ni = ni >= this.initializedDbBasePaths.length ? 0 : ni;
        return this.initializedDbBasePaths[ni];
    }

    private GConfiguration getGConfiguration() {
        return new GConfiguration(this.configuration);
    }

    private String getOperatorNameWithSubtask(String operatorIdentifier) {
        String[] splits = operatorIdentifier.split("_");
        int len = splits.length;
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < len - 2; ++i) {
            sb.append(splits[i]);
            sb.append("_");
        }
        sb.append(splits[len - 1]);
        return sb.toString();
    }

    public String toString() {
        return "GeminiStateBackend{checkpointStreamBackend=" + this.checkpointStreamBackend + ", localGeminiDbDirectories=" + Arrays.toString(this.localGeminiDbDirectories);
    }

    @Override
    public boolean requiresSubsumedCheckpointNotification() {
        return true;
    }
}

