/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;

/**
 * Base stream operator that can optionally count the records it emits.
 *
 * <p>Metric collection is switched on when the global job parameters contain the key
 * {@link #METRICS_CONF_KEY} with the value {@link #METRICS_CONF_VALUE} (compared
 * case-insensitively). When enabled, {@link #open()} wraps the operator output so that every
 * emitted record increments a {@code rowCount} counter, and {@link #close()} publishes the final
 * count as an accumulator named {@link #ACCUMULATOR_PREFIX} followed by the vertex id.
 *
 * @param <OUT> type of the records emitted by the operator
 */
public class AbstractStreamOperatorWithMetrics<OUT>
        extends AbstractStreamOperator<OUT> {
    /** Prefix of the accumulator name under which the emitted row count is published. */
    public static final String ACCUMULATOR_PREFIX = "OperatorMetric_";
    /** Name of the counter registered on the operator metric group. */
    public static final String ROW_COUNT_METRICS = "rowCount";
    /** Global job parameter key that toggles metric collection. */
    public static final String METRICS_CONF_KEY = "operatorMetricCollect";
    /** Value of {@link #METRICS_CONF_KEY} that enables metric collection. */
    public static final String METRICS_CONF_VALUE = "true";
    /** Row counter; stays {@code null} unless {@link #open()} ran with metrics enabled. */
    private Counter counter;
    /** Set once {@link #close()} has been attempted, so {@link #dispose()} does not repeat it. */
    protected boolean closed = false;
    /** Lazily built configuration, cached by {@link #getSqlConf()}. */
    private transient Configuration sqlConf;
    private int relID;

    /** Creates the operator with chaining strategy {@link ChainingStrategy#ALWAYS}. */
    public AbstractStreamOperatorWithMetrics() {
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    /**
     * Opens the operator and, when metric collection is enabled, replaces the output with a
     * wrapper that counts every collected record (main output and side outputs alike) before
     * forwarding it to the original output. Watermarks, latency markers and {@code close()} are
     * forwarded unchanged and are not counted.
     *
     * @throws Exception if the base operator fails to open
     */
    public void open() throws Exception {
        super.open();
        if (!this.isCollectMetricEnabled()) {
            return;
        }
        final Output<StreamRecord<OUT>> rawOutput = this.output;
        this.counter = this.getMetricGroup().counter(ROW_COUNT_METRICS);
        this.output = new Output<StreamRecord<OUT>>(){

            public void collect(StreamRecord<OUT> record) {
                AbstractStreamOperatorWithMetrics.this.counter.inc();
                rawOutput.collect(record);
            }

            public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
                AbstractStreamOperatorWithMetrics.this.counter.inc();
                rawOutput.collect(outputTag, record);
            }

            public void close() {
                rawOutput.close();
            }

            public void emitWatermark(Watermark mark) {
                rawOutput.emitWatermark(mark);
            }

            public void emitLatencyMarker(LatencyMarker latencyMarker) {
                rawOutput.emitLatencyMarker(latencyMarker);
            }
        };
    }

    /**
     * Publishes the row count as an accumulator (if a counter was created by {@link #open()})
     * and then closes the base operator.
     *
     * <p>The operator is marked as closed even when this method fails, so that
     * {@link #dispose()} does not run it a second time.
     *
     * @throws Exception if publishing the accumulator or closing the base operator fails
     */
    public void close() throws Exception {
        try {
            // Guard on the counter itself rather than on the configuration flag: the counter only
            // exists if open() completed with metrics enabled, so close() stays safe when it is
            // reached without a successful open() (e.g. via dispose()).
            // The !closed check keeps a repeated close() from re-adding the same accumulator name.
            if (!this.closed && this.counter != null) {
                LongCounter rowCountAcc = new LongCounter(this.counter.getCount());
                String rowCountAccName = ACCUMULATOR_PREFIX + this.getOperatorConfig().getVertexID();
                this.getRuntimeContext().addAccumulator(rowCountAccName, rowCountAcc);
            }
            super.close();
        } finally {
            this.closed = true;
        }
    }

    /**
     * Closes the operator if that has not happened yet and then disposes the base operator.
     *
     * <p>The base operator is always disposed, even when the implicit {@link #close()} fails. In
     * that case the close failure is rethrown, with any failure from disposing attached as a
     * suppressed exception.
     *
     * @throws Exception if closing or disposing fails
     */
    public void dispose() throws Exception {
        try {
            if (!this.closed) {
                this.close();
            }
        } catch (Exception closeFailure) {
            try {
                super.dispose();
            } catch (Exception disposeFailure) {
                closeFailure.addSuppressed(disposeFailure);
            }
            throw closeFailure;
        }
        super.dispose();
    }

    /**
     * Tells whether the global job parameters request metric collection.
     *
     * @return {@code true} if {@link #METRICS_CONF_KEY} maps to {@link #METRICS_CONF_VALUE}
     *     (ignoring case); {@code false} if the key is absent or no global parameters are set
     */
    private boolean isCollectMetricEnabled() {
        ExecutionConfig.GlobalJobParameters parameters = this.getExecutionConfig().getGlobalJobParameters();
        if (parameters == null) {
            return false;
        }
        // equalsIgnoreCase(null) is false, so a missing key needs no separate null check.
        return METRICS_CONF_VALUE.equalsIgnoreCase(parameters.toMap().get(METRICS_CONF_KEY));
    }

    /**
     * Returns the job configuration with the global job parameters merged in. The result is
     * computed on first use and cached afterwards.
     *
     * <p>NOTE(review): the parameters are added to the object returned by
     * {@code getContainingTask().getJobConfiguration()} itself, not to a copy — confirm that
     * modifying that configuration is intended.
     *
     * @return the merged configuration
     */
    public Configuration getSqlConf() {
        if (this.sqlConf == null) {
            Configuration conf = this.getContainingTask().getJobConfiguration();
            ExecutionConfig.GlobalJobParameters paras = this.getExecutionConfig().getGlobalJobParameters();
            if (paras != null) {
                conf.addAll(paras.toMap());
            }
            this.sqlConf = conf;
        }
        return this.sqlConf;
    }
}

