package org.apache.flink.runtime.state.gemini;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractInternalStateBackend;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorage;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.gemini.engine.GConfiguration;
import org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.http.multipart.HttpPostBodyUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/runtime/state/gemini/GeminiStateBackend.class */
public class GeminiStateBackend extends AbstractStateBackend implements ConfigurableStateBackend {
    private static final long serialVersionUID = -8191916350224044011L;
    private static final Logger LOG = LoggerFactory.getLogger(GeminiStateBackend.class);
    private final AbstractFileStateBackend checkpointStreamBackend;
    private Configuration configuration;

    @Nullable
    private File[] localGeminiDbDirectories;
    private transient File[] initializedDbBasePaths;
    private transient JobID jobId;
    private transient int nextDirectory;
    private transient boolean isInitialized;

    public GeminiStateBackend(String str) throws IOException {
        this(new Path(str).toUri());
    }

    public GeminiStateBackend(URI uri) throws IOException {
        this(new FsStateBackend(uri));
    }

    public GeminiStateBackend(AbstractFileStateBackend abstractFileStateBackend) {
        this.configuration = new Configuration();
        this.checkpointStreamBackend = (AbstractFileStateBackend) Preconditions.checkNotNull(abstractFileStateBackend);
    }

    private GeminiStateBackend(GeminiStateBackend geminiStateBackend, Configuration configuration) {
        this.configuration = new Configuration();
        this.configuration = configuration.clone();
        this.configuration.addAll(geminiStateBackend.getConfiguration());
        StateBackend stateBackend = geminiStateBackend.checkpointStreamBackend;
        this.checkpointStreamBackend = (AbstractFileStateBackend) (stateBackend instanceof ConfigurableStateBackend ? ((ConfigurableStateBackend) stateBackend).configure(this.configuration) : stateBackend);
        if (geminiStateBackend.localGeminiDbDirectories != null) {
            this.localGeminiDbDirectories = geminiStateBackend.localGeminiDbDirectories;
            return;
        }
        String string = this.configuration.getString(GeminiOptions.LOCAL_PATH);
        if (string != null) {
            try {
                setDbStoragePaths(string.split(",|" + File.pathSeparator));
            } catch (IllegalArgumentException e) {
                throw new IllegalConfigurationException("Invalid configuration for GeminiDB state backend's local storage directories: " + e.getMessage(), e);
            }
        }
    }

    @Override // org.apache.flink.runtime.state.ConfigurableStateBackend
    public GeminiStateBackend configure(Configuration configuration) {
        return new GeminiStateBackend(this, configuration);
    }

    @Override // org.apache.flink.runtime.state.StateBackend
    public CompletedCheckpointStorageLocation resolveCheckpoint(String str) throws IOException {
        return this.checkpointStreamBackend.resolveCheckpoint(str);
    }

    @Override // org.apache.flink.runtime.state.StateBackend
    public CheckpointStorage createCheckpointStorage(JobID jobID) throws IOException {
        return this.checkpointStreamBackend.createCheckpointStorage(jobID);
    }

    @Override // org.apache.flink.runtime.state.AbstractStateBackend, org.apache.flink.runtime.state.StateBackend
    public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(Environment environment, JobID jobID, String str, TypeSerializer<K> typeSerializer, int i, KeyGroupRange keyGroupRange, TaskKvStateRegistry taskKvStateRegistry) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.flink.runtime.state.AbstractStateBackend, org.apache.flink.runtime.state.StateBackend
    public OperatorStateBackend createOperatorStateBackend(Environment environment, String str) {
        return new DefaultOperatorStateBackend(environment.getUserClassLoader(), environment.getExecutionConfig(), true);
    }

    @Override // org.apache.flink.runtime.state.AbstractStateBackend, org.apache.flink.runtime.state.StateBackend
    public AbstractInternalStateBackend createInternalStateBackend(Environment environment, String str, int i, KeyGroupRange keyGroupRange, MetricGroup metricGroup) throws Exception {
        lazyInitializeForJob(environment);
        GConfiguration gConfiguration = getGConfiguration();
        String replaceAll = str.replaceAll("[^a-zA-Z0-9\\-]", "_");
        File file = new File(getNextStoragePath(), "job_" + this.jobId + "_op_" + replaceAll + "_uuid_" + UUID.randomUUID());
        gConfiguration.setLocalPath(file.getAbsolutePath());
        String string = this.configuration.getString(GeminiOptions.DFS_PATH);
        if (string == null) {
            string = new Path(((FsCheckpointStorage) this.checkpointStreamBackend.createCheckpointStorage(this.jobId)).getCheckpointsDirectory(), AbstractFsCheckpointStorage.CHECKPOINT_SHARED_STATE_DIR).toUri().toString();
        }
        gConfiguration.setDfsPath(new Path(string, replaceAll).toUri().toString());
        gConfiguration.setSubTaskIndex(environment.getTaskInfo().getIndexOfThisSubtask());
        gConfiguration.setNumParallelSubtasks(environment.getTaskInfo().getNumberOfParallelSubtasks());
        gConfiguration.setBackendUID(environment.getExecutionId().toString());
        gConfiguration.setNumberSlots(environment.getTaskManagerInfo().getNumberSlots());
        gConfiguration.setOperatorNameWithSubtask(getOperatorNameWithSubtask(str));
        LocalRecoveryConfig createLocalRecoveryConfig = environment.getTaskStateManager().createLocalRecoveryConfig();
        gConfiguration.setLocalSnapshot(createLocalRecoveryConfig.isLocalRecoveryEnabled());
        LOG.info("Create internal state backend, dfs path ({}), local path ({}) TM slots{}", new Object[]{string, file, Integer.valueOf(environment.getTaskManagerInfo().getNumberSlots())});
        return new GeminiInternalStateBackend(i, keyGroupRange, environment.getUserClassLoader(), createLocalRecoveryConfig, environment.getTaskKvStateRegistry(), replaceAll, environment.getExecutionConfig(), gConfiguration, metricGroup);
    }

    public void setConfiguration(Configuration configuration) {
        this.configuration = configuration;
    }

    public Configuration getConfiguration() {
        return this.configuration;
    }

    public void setDbStoragePaths(String... strArr) {
        String str;
        if (strArr == null) {
            this.localGeminiDbDirectories = null;
            return;
        }
        if (strArr.length == 0) {
            throw new IllegalArgumentException("empty paths");
        }
        File[] fileArr = new File[strArr.length];
        for (int i = 0; i < strArr.length; i++) {
            String str2 = strArr[i];
            if (str2 == null) {
                throw new IllegalArgumentException("null path");
            }
            URI uri = null;
            try {
                uri = new Path(str2).toUri();
            } catch (Exception e) {
            }
            if (uri == null || uri.getScheme() == null) {
                str = str2;
            } else {
                if (!HttpPostBodyUtil.FILE.equalsIgnoreCase(uri.getScheme())) {
                    throw new IllegalArgumentException("Path " + str2 + " has a non-local scheme");
                }
                str = uri.getPath();
            }
            fileArr[i] = new File(str);
            if (!fileArr[i].isAbsolute()) {
                throw new IllegalArgumentException("Relative paths are not supported");
            }
        }
        this.localGeminiDbDirectories = fileArr;
    }

    private void lazyInitializeForJob(Environment environment) throws IOException {
        if (this.isInitialized) {
            return;
        }
        this.jobId = environment.getJobID();
        if (this.localGeminiDbDirectories == null) {
            String[] parseWorkingDirectories = ConfigurationUtils.parseWorkingDirectories(environment.getTaskManagerInfo().getConfiguration());
            if (parseWorkingDirectories.length == 0) {
                this.initializedDbBasePaths = environment.getIOManager().getSpillingDirectories();
                this.nextDirectory = ThreadLocalRandom.current().nextInt(this.initializedDbBasePaths.length);
                this.isInitialized = true;
                return;
            }
            setDbStoragePaths(parseWorkingDirectories);
        }
        ArrayList arrayList = new ArrayList(this.localGeminiDbDirectories.length);
        StringBuilder sb = new StringBuilder();
        for (File file : this.localGeminiDbDirectories) {
            File file2 = new File(file, UUID.randomUUID().toString());
            if (file2.mkdirs()) {
                arrayList.add(file);
            } else {
                String str = "Local DB files directory '" + file + "' does not exist and cannot be created. ";
                LOG.error(str);
                sb.append(str);
            }
            file2.delete();
        }
        if (arrayList.isEmpty()) {
            throw new IOException("No local storage directories available. " + ((Object) sb));
        }
        this.initializedDbBasePaths = (File[]) arrayList.toArray(new File[arrayList.size()]);
        this.nextDirectory = ThreadLocalRandom.current().nextInt(this.initializedDbBasePaths.length);
        this.isInitialized = true;
    }

    private File getNextStoragePath() {
        int i = this.nextDirectory + 1;
        int i2 = i >= this.initializedDbBasePaths.length ? 0 : i;
        this.nextDirectory = i2;
        return this.initializedDbBasePaths[i2];
    }

    private GConfiguration getGConfiguration() {
        return new GConfiguration(this.configuration);
    }

    private String getOperatorNameWithSubtask(String str) {
        String[] split = str.split("_");
        int length = split.length;
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < length - 2; i++) {
            sb.append(split[i]);
            sb.append("_");
        }
        sb.append(split[length - 1]);
        return sb.toString();
    }

    public String toString() {
        return "GeminiStateBackend{checkpointStreamBackend=" + this.checkpointStreamBackend + ", localGeminiDbDirectories=" + Arrays.toString(this.localGeminiDbDirectories);
    }

    @Override // org.apache.flink.runtime.state.StateBackend
    public boolean requiresSubsumedCheckpointNotification() {
        return true;
    }
}
