/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.util.resource;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.table.util.resource.AbstractJsonSerializable;

@JsonInclude(value=JsonInclude.Include.NON_DEFAULT)
@JsonIgnoreProperties(ignoreUnknown=true)
public class StreamEdgeProperty
extends AbstractJsonSerializable
implements Comparable<StreamEdgeProperty> {
    private String source;
    private String target;
    private int index = 0;
    public static final Set<String> STRATEGY_UPDATABLE = new HashSet<String>();
    public static final String FORWARD_STRATEGY = "FORWARD";
    public static final String REBALANCE_STRATEGY = "REBALANCE";
    public static final String RESCALE_STRATEGY = "RESCALE";
    @JsonProperty(value="ship_strategy")
    private String shipStrategy = "FORWARD";

    public StreamEdgeProperty() {
    }

    public StreamEdgeProperty(String source, String target, int index) {
        this.source = source;
        this.target = target;
        this.index = index;
    }

    public String getSource() {
        return this.source;
    }

    public void setSource(String source) {
        this.source = source;
    }

    public String getTarget() {
        return this.target;
    }

    public void setTarget(String target) {
        this.target = target;
    }

    public int getIndex() {
        return this.index;
    }

    public void setIndex(int index) {
        this.index = index;
    }

    public String getShipStrategy() {
        return this.shipStrategy;
    }

    public void setShipStrategy(String shipStrategy) {
        this.shipStrategy = shipStrategy;
    }

    @Override
    public int compareTo(StreamEdgeProperty streamEdgeProperty) {
        int r = this.source.compareTo(streamEdgeProperty.source);
        if (r == 0) {
            r = this.target.compareTo(streamEdgeProperty.target);
        }
        if (r == 0) {
            r = this.index - streamEdgeProperty.index;
        }
        return r;
    }

    public void apply(StreamEdge edge, StreamGraph graph) {
        if (this.shipStrategy.equalsIgnoreCase(FORWARD_STRATEGY)) {
            if (graph.getStreamNode(Integer.valueOf(edge.getTargetId())).getParallelism() != graph.getStreamNode(Integer.valueOf(edge.getSourceId())).getParallelism()) {
                edge.setPartitioner((StreamPartitioner)new RebalancePartitioner());
            } else {
                edge.setPartitioner((StreamPartitioner)new ForwardPartitioner());
            }
        } else if (this.shipStrategy.equalsIgnoreCase(RESCALE_STRATEGY)) {
            edge.setPartitioner((StreamPartitioner)new RescalePartitioner());
        } else if (this.shipStrategy.equalsIgnoreCase(REBALANCE_STRATEGY)) {
            edge.setPartitioner((StreamPartitioner)new RebalancePartitioner());
        }
    }

    public void update(StreamEdgeProperty property) {
        if (STRATEGY_UPDATABLE.contains(this.shipStrategy.toUpperCase()) && STRATEGY_UPDATABLE.contains(property.shipStrategy.toUpperCase())) {
            this.shipStrategy = property.getShipStrategy();
        } else if (!this.shipStrategy.equalsIgnoreCase(property.getShipStrategy())) {
            throw new RuntimeException("Fail to apply resource configuration file to edge " + this + ".");
        }
    }

    static {
        STRATEGY_UPDATABLE.add(FORWARD_STRATEGY);
        STRATEGY_UPDATABLE.add(REBALANCE_STRATEGY);
        STRATEGY_UPDATABLE.add(RESCALE_STRATEGY);
    }
}

