package com.alibaba.blink.streaming.connectors.common;

import com.alibaba.blink.cache.AllCache;
import com.alibaba.blink.cache.Cache;
import com.alibaba.blink.cache.CacheFactory;
import com.alibaba.blink.cache.CacheStrategy;
import com.alibaba.blink.streaming.connectors.common.reload.CacheAllReloadConf;
import com.alibaba.blink.streaming.connectors.common.reload.SerializableRunnable;
import com.alibaba.blink.streaming.connectors.common.util.BaseRowGetter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.SlidingWindowReservoir;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.sources.IndexKey;
import org.apache.parquet.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/blink/streaming/connectors/common/DimJoinFetcher.class */
/**
 * Base rich function for "dimJoin" fetchers: it opens a connection, obtains a per-table cache
 * from {@link CacheFactory}, optionally schedules a periodic reload task when the cache strategy
 * is "all cache", and registers the {@code dimJoin.*} metrics.
 *
 * <p>Depending on {@link IndexKey#isUnique()}, either the one-to-one cache
 * ({@code key -> row}) or the one-to-many cache ({@code key -> rows}) is used; the other set of
 * cache fields stays {@code null}.
 *
 * <p>{@link #counter} is static, incremented in {@link #open(Configuration)} and decremented
 * exactly once in {@link #close()}; when it drops to zero the reload-timer flag of the cache is
 * released.
 */
public abstract class DimJoinFetcher extends AbstractRichFunction {
    private static final long serialVersionUID = -3591005897957294244L;
    private static final Logger LOG = LoggerFactory.getLogger(DimJoinFetcher.class);

    /** Zero-arity row constant shared with subclasses and callers. */
    public static final BaseRow NULL_BASEROW = new GenericRow(0);
    /** Empty row-list constant shared with subclasses and callers. */
    public static final List<BaseRow> EMPTY_ROW_LIST = new ArrayList<>(0);

    /** Number of instances that have been opened and not yet closed (static, so per class loader). */
    static AtomicInteger counter = new AtomicInteger(0);

    protected final String sqlTableName;
    protected final IndexKey index;
    protected final CacheStrategy cacheStrategy;
    /** Reload task; only assigned by {@link #setAllCacheReloadRunner} for the all-cache strategy. */
    protected SerializableRunnable cacheReloadRunner;
    /** Reload configuration; only assigned by {@link #setAllCacheReloadRunner}. */
    protected CacheAllReloadConf reloadConf;
    protected transient CacheFactory<Object, BaseRow> one2oneCacheFactory;
    protected transient Cache<Object, BaseRow> one2oneCache;
    protected transient CacheFactory<Object, List<BaseRow>> one2manyCacheFactory;
    protected transient Cache<Object, List<BaseRow>> one2manyCache;
    /** Non-null only on the instance that scheduled the reload task. */
    protected transient ScheduledExecutorService reloadExecutor;
    protected transient AllCache<Object, BaseRow> one2oneAllCacheRef;
    protected transient AllCache<Object, List<BaseRow>> one2manyAllCacheRef;
    protected DropwizardHistogramWrapper reloadLatency = null;
    protected DropwizardHistogramWrapper fetchLatency = null;
    protected DropwizardMeterWrapper fetchQPS = null;
    protected DropwizardMeterWrapper fetchHitQPS = null;
    protected DropwizardMeterWrapper cacheHitQPS = null;
    /** Exposed through the {@code dimJoin.nullKeyCount} gauge; not modified in this class. */
    protected AtomicLong emptyKeyCounter = new AtomicLong(0);

    /**
     * @param str           SQL table name, also used as the cache name; must not be null
     * @param indexKey      index describing the join key; must not be null
     * @param cacheStrategy cache strategy for this table; must not be null
     * @throws IllegalArgumentException if any argument is null
     */
    protected DimJoinFetcher(String str, IndexKey indexKey, CacheStrategy cacheStrategy) {
        Preconditions.checkArgument(null != str, "sqlTableName cannot be null!");
        Preconditions.checkArgument(null != indexKey, "index cannot be null!");
        Preconditions.checkArgument(null != cacheStrategy, "cacheStrategy cannot be null!");
        this.sqlTableName = str;
        this.index = indexKey;
        this.cacheStrategy = cacheStrategy;
    }

    /**
     * Stores the reload task and its configuration. Does nothing unless the cache strategy is
     * "all cache".
     *
     * @param serializableRunnable task that will be scheduled in {@link #open(Configuration)}
     * @param cacheAllReloadConf   reload settings; its {@code timeRangeBlackList} must be non-null
     * @throws NullPointerException if the strategy is "all cache" and any of the above is null
     */
    public void setAllCacheReloadRunner(SerializableRunnable serializableRunnable, CacheAllReloadConf cacheAllReloadConf) {
        if (this.cacheStrategy.isAllCache()) {
            Objects.requireNonNull(serializableRunnable);
            Objects.requireNonNull(cacheAllReloadConf);
            Objects.requireNonNull(cacheAllReloadConf.timeRangeBlackList);
            this.cacheReloadRunner = serializableRunnable;
            this.reloadConf = cacheAllReloadConf;
        }
    }

    /** Opens the connection used by the concrete fetcher; called from {@link #open(Configuration)}. */
    public abstract void openConnection(Configuration configuration);

    /** Closes the connection used by the concrete fetcher; called from {@link #close()}. */
    public abstract void closeConnection();

    /**
     * Opens the connection, obtains the cache matching the index kind, schedules the reload task
     * for the all-cache strategy (only if the cache's timer flag is not already set) and
     * registers the metrics.
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        LOG.info("the {}th started worker, begin to prepare connection...", counter.incrementAndGet());
        openConnection(configuration);
        if (this.index.isUnique()) {
            LOG.info("table {} preparing one2oneCache...", this.sqlTableName);
            this.one2oneCacheFactory = CacheFactory.getInstance();
            this.one2oneCache = this.one2oneCacheFactory.getCache(this.sqlTableName, this.cacheStrategy);
            LOG.info("table {} one2oneCache prepared, strategy:{}", this.sqlTableName, this.cacheStrategy);
            if (this.cacheStrategy.isAllCache()) {
                // Explicit narrowing; only attempted when the strategy says this is an all-cache.
                this.one2oneAllCacheRef = (AllCache<Object, BaseRow>) this.one2oneCache;
                registerReloadLatencyMetric();
                startReloadTimer(this.one2oneAllCacheRef, "one2oneCache-reload");
            }
        } else {
            LOG.info("table {} preparing one2manyCache...", this.sqlTableName);
            this.one2manyCacheFactory = CacheFactory.getInstance();
            this.one2manyCache = this.one2manyCacheFactory.getCache(this.sqlTableName, this.cacheStrategy);
            LOG.info("table {} one2manyCache prepared, strategy:{}", this.sqlTableName, this.cacheStrategy);
            if (this.cacheStrategy.isAllCache()) {
                // Explicit narrowing; only attempted when the strategy says this is an all-cache.
                this.one2manyAllCacheRef = (AllCache<Object, List<BaseRow>>) this.one2manyCache;
                registerReloadLatencyMetric();
                startReloadTimer(this.one2manyAllCacheRef, "one2manyCache-reload");
            }
        }
        metricInit(getRuntimeContext());
        LOG.info("metric init done.");
    }

    /**
     * Cancels the reload task (all-cache strategy), closes the connection, decrements
     * {@link #counter} exactly once, removes the table's cache from its factory and finally
     * delegates to {@code super.close()}.
     *
     * <p>Exceptions raised while stopping the reload task or closing the connection are logged
     * and do not prevent the cache from being released.
     */
    @Override
    public void close() throws Exception {
        try {
            if (this.cacheStrategy.isAllCache()) {
                stopReloadTimer();
            }
            LOG.info("start to close connection...");
            closeConnection();
        } catch (Exception e) {
            LOG.error("Error happens when shutdown reload executor.", e);
        } finally {
            // Single decrement on every path (normal, Exception, Error).
            releaseTimerFlagIfLast();
        }
        LOG.info("start to release cache of table:{} ...", this.sqlTableName);
        if (this.index.isUnique()) {
            if (this.one2oneCacheFactory != null) {
                LOG.info("table {} one2oneCache removing...", this.sqlTableName);
                this.one2oneCacheFactory.removeCache(this.sqlTableName);
                LOG.info("table {} one2oneCache removed", this.sqlTableName);
            }
        } else if (this.one2manyCacheFactory != null) {
            LOG.info("table {} one2manyCache removing...", this.sqlTableName);
            this.one2manyCacheFactory.removeCache(this.sqlTableName);
            LOG.info("table {} one2manyCache removed", this.sqlTableName);
        }
        super.close();
    }

    /**
     * Registers the fetch latency histogram, the QPS meters and the derived gauges
     * ({@code fetchHit}, {@code cacheHit}, {@code cacheSize}, {@code nullKeyCount}) on the metric
     * group of the given runtime context.
     *
     * @param runtimeContext context whose metric group receives all metrics
     */
    public void metricInit(RuntimeContext runtimeContext) {
        MetricGroup metrics = runtimeContext.getMetricGroup();
        this.fetchLatency = metrics.histogram("dimJoin.fetchLatency", new DropwizardHistogramWrapper(new Histogram(new SlidingWindowReservoir(100))));
        this.fetchQPS = metrics.meter("dimJoin.fetchQPS", new DropwizardMeterWrapper(new Meter()));
        this.fetchHitQPS = metrics.meter("dimJoin.fetchHitQPS", new DropwizardMeterWrapper(new Meter()));
        this.cacheHitQPS = metrics.meter("dimJoin.cacheHitQPS", new DropwizardMeterWrapper(new Meter()));
        metrics.gauge("dimJoin.fetchHit", new Gauge<Double>() {
            @Override
            public Double getValue() {
                double rate = DimJoinFetcher.this.fetchQPS.getRate();
                // Report 0 below 0.01 fetches/s instead of dividing by a near-zero rate.
                return rate < 0.01d ? Double.valueOf(0.0d) : Double.valueOf((DimJoinFetcher.this.fetchHitQPS.getRate() + DimJoinFetcher.this.cacheHitQPS.getRate()) / rate);
            }
        });
        metrics.gauge("dimJoin.cacheHit", new Gauge<Double>() {
            @Override
            public Double getValue() {
                double rate = DimJoinFetcher.this.fetchQPS.getRate();
                // Report 0 below 0.01 fetches/s instead of dividing by a near-zero rate.
                return rate < 0.01d ? Double.valueOf(0.0d) : Double.valueOf(DimJoinFetcher.this.cacheHitQPS.getRate() / rate);
            }
        });
        // The method reference binds the cache instance that is current at registration time.
        if (this.index.isUnique()) {
            metrics.gauge("dimJoin.cacheSize", this.one2oneCache::size);
        } else {
            metrics.gauge("dimJoin.cacheSize", this.one2manyCache::size);
        }
        metrics.gauge("dimJoin.nullKeyCount", () -> {
            return Long.valueOf(this.emptyKeyCounter.get());
        });
    }

    /**
     * Extracts the cache/lookup key from a row.
     *
     * @param baseRow row to read from
     * @param list    positions of the key fields in {@code baseRow}
     * @param list2   type information of each key field, parallel to {@code list}
     * @param list3   serializer of each key field, parallel to {@code list}
     * @return for a single key field, the field value itself (possibly {@code null}); otherwise a
     *         {@link GenericRow} holding all key values, or {@code null} as soon as one of them
     *         is {@code null}
     */
    public Object getKey(BaseRow baseRow, List<Integer> list, List<TypeInformation<?>> list2, List<TypeSerializer<?>> list3) {
        int keyCount = list.size();
        if (keyCount == 1) {
            return BaseRowGetter.safeGet(baseRow, list.get(0).intValue(), list2.get(0), list3.get(0));
        }
        GenericRow compositeKey = new GenericRow(keyCount);
        for (int i = 0; i < keyCount; i++) {
            Object fieldValue = BaseRowGetter.safeGet(baseRow, list.get(i).intValue(), list2.get(i), list3.get(i));
            if (fieldValue == null) {
                return null;
            }
            compositeKey.update(i, fieldValue);
        }
        return compositeKey;
    }

    /** Registers the reload latency histogram used by the all-cache strategy. */
    private void registerReloadLatencyMetric() {
        this.reloadLatency = getRuntimeContext().getMetricGroup().histogram("dimJoin.allCache.reloadLatency", new DropwizardHistogramWrapper(new Histogram(new SlidingWindowReservoir(100))));
    }

    /**
     * Schedules the reload task on a single daemon thread if this instance wins the cache's
     * {@code isRegisteredTimer} flag; otherwise does nothing. The flag is rolled back when
     * scheduling fails so that it is never left set without a timer behind it.
     */
    private <V> void startReloadTimer(AllCache<Object, V> allCache, String threadName) {
        if (!allCache.isRegisteredTimer.compareAndSet(false, true)) {
            // Flag already set: a reload task is assumed to be registered for this cache already.
            return;
        }
        try {
            Objects.requireNonNull(this.cacheReloadRunner, "cacheReloadRunner is not set, call setAllCacheReloadRunner before open");
            Objects.requireNonNull(this.reloadConf, "reloadConf is not set, call setAllCacheReloadRunner before open");
            this.reloadExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat(threadName).setDaemon(true).build());
            // The runner is scheduled directly; wrapping it in a never-started Thread adds nothing.
            allCache.setScheduledFuture(this.reloadExecutor.scheduleWithFixedDelay(this.cacheReloadRunner, 0L, this.reloadConf.ttlMs, TimeUnit.MILLISECONDS));
        } catch (RuntimeException e) {
            allCache.isRegisteredTimer.compareAndSet(true, false);
            throw e;
        }
    }

    /** Cancels the scheduled reload future (if any) and shuts down this instance's executor. */
    private void stopReloadTimer() {
        LOG.info("start to cancel reloading thread...");
        AllCache<Object, ?> allCache = currentAllCache();
        if (allCache != null) {
            ScheduledFuture<?> scheduledFuture = allCache.getScheduledFuture();
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
            }
        }
        if (null != this.reloadExecutor && !this.reloadExecutor.isShutdown()) {
            this.reloadExecutor.shutdownNow();
            this.reloadExecutor = null;
        }
    }

    /**
     * Decrements {@link #counter}; when it reaches zero under the all-cache strategy, clears the
     * cache's {@code isRegisteredTimer} flag so a later {@code open} can register a timer again.
     */
    private void releaseTimerFlagIfLast() {
        if (counter.decrementAndGet() == 0 && this.cacheStrategy.isAllCache()) {
            AllCache<Object, ?> allCache = currentAllCache();
            if (allCache != null) {
                allCache.isRegisteredTimer.compareAndSet(true, false);
            }
        }
    }

    /**
     * Returns the all-cache reference matching the index kind; {@code null} when
     * {@link #open(Configuration)} did not get far enough to assign it.
     */
    private AllCache<Object, ?> currentAllCache() {
        if (this.index.isUnique()) {
            return this.one2oneAllCacheRef;
        }
        return this.one2manyAllCacheRef;
    }
}
