/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.odps.datahub;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsDeprecatedLogger;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.commons.transport.Connection;
import com.aliyun.odps.commons.transport.Response;
import com.aliyun.odps.commons.util.IOUtils;
import com.aliyun.odps.datahub.DatahubClient$AjcClosure1;
import com.aliyun.odps.datahub.DatahubClient$AjcClosure11;
import com.aliyun.odps.datahub.DatahubClient$AjcClosure13;
import com.aliyun.odps.datahub.DatahubClient$AjcClosure3;
import com.aliyun.odps.datahub.DatahubClient$AjcClosure5;
import com.aliyun.odps.datahub.DatahubClient$AjcClosure7;
import com.aliyun.odps.datahub.DatahubClient$AjcClosure9;
import com.aliyun.odps.datahub.DatahubConfiguration;
import com.aliyun.odps.datahub.DatahubException;
import com.aliyun.odps.datahub.DatahubReader;
import com.aliyun.odps.datahub.DatahubTableSchema;
import com.aliyun.odps.datahub.DatahubWriter;
import com.aliyun.odps.datahub.PackReader;
import com.aliyun.odps.datahub.PackType;
import com.aliyun.odps.datahub.ReadPackResult;
import com.aliyun.odps.datahub.ReplicatorStatus;
import com.aliyun.odps.rest.RestClient;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.runtime.internal.Conversions;
import org.aspectj.runtime.reflect.Factory;

public class DatahubClient {
    private String projectName;
    private String tableName;
    private DatahubConfiguration conf;
    private RestClient datahubServiceClient;
    private HashMap<String, String> headers;
    private TableSchema schema = new TableSchema();
    private List<Long> shards = new ArrayList<Long>();
    private final Long MAX_WAITING_MILLISECOND = 120000L;
    private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_0;
    private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_1;
    private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_2;
    private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_3;
    private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_4;
    private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_5;
    private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_6;

    public DatahubClient(Odps odps, String projectName, String tableName, String datahubEndpoint) throws OdpsException {
        this.conf = new DatahubConfiguration(odps);
        this.projectName = projectName;
        this.tableName = tableName;
        this.headers = new HashMap();
        this.headers.put("Content-Length", String.valueOf(0));
        this.headers.put("x-odps-tunnel-stream-version", "1");
        this.setEndpoint(datahubEndpoint);
        this.initiate();
    }

    public String getProjectName() {
        return this.projectName;
    }

    public String getTableName() {
        return this.tableName;
    }

    public void loadShard(long shardNumber) throws OdpsException {
        if (shardNumber < 0L) {
            throw new DatahubException("invalid shard number");
        }
        HashMap<String, String> params = new HashMap<String, String>();
        HashMap<String, String> headers = new HashMap<String, String>(this.headers);
        headers.put("x-odps-tunnel-version", String.valueOf(4));
        String path = this.getResource() + "/shards";
        Connection conn = null;
        try {
            params.put("shardnumber", Long.toString(shardNumber));
            Connection connection = conn = this.datahubServiceClient.connect(path, "POST", params, headers);
            JoinPoint joinPoint = Factory.makeJP((JoinPoint.StaticPart)ajc$tjp_0, (Object)this, (Object)connection);
            Object[] objectArray = new Object[]{this, connection, joinPoint};
            Response resp = (Response)OdpsDeprecatedLogger.aspectOf().aroundOdpsImpl(new DatahubClient$AjcClosure1(objectArray).linkClosureAndJoinPoint(4112));
            if (!resp.isOK()) {
                DatahubException ex = new DatahubException(conn.getInputStream());
                ex.setRequestId(resp.getHeader("x-odps-request-id"));
                throw ex;
            }
        }
        catch (IOException e) {
            throw new DatahubException(e.getMessage(), e);
        }
        catch (DatahubException e) {
            throw e;
        }
        catch (OdpsException e) {
            throw new DatahubException(e.getMessage(), e);
        }
        finally {
            if (conn != null) {
                try {
                    conn.disconnect();
                }
                catch (IOException ignored) {}
            }
        }
    }

    public void waitForShardLoad() throws OdpsException {
        this.waitForShardLoad(this.MAX_WAITING_MILLISECOND);
    }

    public void waitForShardLoad(long timeout) throws OdpsException {
        if (timeout <= 0L) {
            throw new DatahubException("invalid waiting time");
        }
        long waitTime = timeout > this.MAX_WAITING_MILLISECOND ? this.MAX_WAITING_MILLISECOND : timeout;
        long now = System.currentTimeMillis();
        long end = now + waitTime;
        while (now < end) {
            try {
                if (this.isShardLoadCompleted()) {
                    return;
                }
                Thread.sleep(10000L);
                now = System.currentTimeMillis();
            }
            catch (Exception e) {
                throw new DatahubException(e.getMessage(), e);
            }
        }
        if (!this.isShardLoadCompleted()) {
            throw new DatahubException("load shard timeout");
        }
    }

    private boolean isShardLoadCompleted() {
        try {
            HashMap<Long, ShardState> shardStatusMap = this.getShardStatus();
            for (Map.Entry<Long, ShardState> entry : shardStatusMap.entrySet()) {
                ShardState status = entry.getValue();
                if (status == ShardState.LOADED) continue;
                return false;
            }
            return true;
        }
        catch (Exception exception) {
            return false;
        }
    }

    public HashMap<Long, ShardState> getShardStatus() throws OdpsException, IOException {
        HashMap<String, String> params = new HashMap<String, String>();
        HashMap<String, String> hdrs = new HashMap<String, String>(this.headers);
        try {
            Connection conn;
            String path = this.getResource() + "/shards";
            hdrs.put("x-odps-tunnel-version", String.valueOf(4));
            params.put("shardstatus", null);
            Connection connection = conn = this.datahubServiceClient.connect(path, "GET", params, hdrs);
            JoinPoint joinPoint = Factory.makeJP((JoinPoint.StaticPart)ajc$tjp_1, (Object)this, (Object)connection);
            Object[] objectArray = new Object[]{this, connection, joinPoint};
            Response resp = (Response)OdpsDeprecatedLogger.aspectOf().aroundOdpsImpl(new DatahubClient$AjcClosure3(objectArray).linkClosureAndJoinPoint(4112));
            if (!resp.isOK()) {
                DatahubException ex = new DatahubException(conn.getInputStream());
                ex.setRequestId(resp.getHeader("x-odps-request-id"));
                throw ex;
            }
            return this.loadShardStatusFromJson(conn.getInputStream());
        }
        catch (Exception e) {
            throw new DatahubException(e.getMessage(), e);
        }
    }

    @Deprecated
    public ReplicatorStatus QueryReplicatorStatus(long shardId, PartitionSpec partitionSpec) throws OdpsException {
        long l = shardId;
        PartitionSpec partitionSpec2 = partitionSpec;
        JoinPoint joinPoint = Factory.makeJP((JoinPoint.StaticPart)ajc$tjp_3, (Object)this, (Object)this, (Object)Conversions.longObject((long)l), (Object)partitionSpec2);
        Object[] objectArray = new Object[]{this, Conversions.longObject((long)l), partitionSpec2, joinPoint};
        return (ReplicatorStatus)OdpsDeprecatedLogger.aspectOf().around(new DatahubClient$AjcClosure7(objectArray).linkClosureAndJoinPoint(69648));
    }

    public void setEndpoint(String endpoint) throws OdpsException {
        try {
            URI u = new URI(endpoint);
            this.conf.setEndpoint(u);
            this.datahubServiceClient = this.conf.newRestClient(this.projectName);
        }
        catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid endpoint.");
        }
        catch (DatahubException e) {
            throw e;
        }
        catch (OdpsException e) {
            throw new DatahubException(e.getMessage(), e);
        }
    }

    public ReplicatorStatus QueryReplicatorStatus(long shardId) throws OdpsException {
        Object var3_2 = null;
        long l = shardId;
        DatahubClient datahubClient = this;
        JoinPoint joinPoint = Factory.makeJP((JoinPoint.StaticPart)ajc$tjp_4, (Object)this, (Object)datahubClient, (Object)Conversions.longObject((long)l), var3_2);
        Object[] objectArray = new Object[]{this, datahubClient, Conversions.longObject((long)l), var3_2, joinPoint};
        return (ReplicatorStatus)OdpsDeprecatedLogger.aspectOf().around(new DatahubClient$AjcClosure9(objectArray).linkClosureAndJoinPoint(4112));
    }

    public Date getTableReplicatedTimeStamp() throws IOException, OdpsException {
        HashMap<String, String> params = new HashMap<String, String>();
        HashMap<String, String> headers = new HashMap<String, String>(this.headers);
        headers.put("x-odps-tunnel-version", String.valueOf(4));
        params.put("query", "replicatedtimestamp");
        String path = this.getStreamResource();
        Connection conn = null;
        Connection connection = conn = this.datahubServiceClient.connect(path, "GET", params, headers);
        JoinPoint joinPoint = Factory.makeJP((JoinPoint.StaticPart)ajc$tjp_5, (Object)this, (Object)connection);
        Object[] objectArray = new Object[]{this, connection, joinPoint};
        Response resp = (Response)OdpsDeprecatedLogger.aspectOf().aroundOdpsImpl(new DatahubClient$AjcClosure11(objectArray).linkClosureAndJoinPoint(4112));
        if (!resp.isOK()) {
            DatahubException ex = new DatahubException(conn.getInputStream());
            ex.setRequestId(resp.getHeader("x-odps-request-id"));
            throw ex;
        }
        String json = IOUtils.readStreamAsString(conn.getInputStream());
        JSONObject tree = JSON.parseObject((String)json);
        Long node = tree.getLong("table_replicated_timestamp");
        if (node != null) {
            return new Date(node);
        }
        throw new DatahubException("get table replicated timestamp fail");
    }

    public Date getTableTimestamp() throws IOException, OdpsException {
        long currentTimestamp;
        HashMap<Long, ShardState> shardStatus = this.getShardStatus();
        long timestamp = currentTimestamp = System.currentTimeMillis();
        for (Map.Entry<Long, ShardState> entry : shardStatus.entrySet()) {
            String brokerLastPackid;
            String loaderReplicatedPackid;
            long shardId = entry.getKey();
            PackReader reader = null;
            if (entry.getValue() != ShardState.LOADED || (loaderReplicatedPackid = this.QueryReplicatorStatus(shardId).GetLastReplicatedPackId()).equals(PackType.FIRST_PACK_ID) || (brokerLastPackid = (reader = this.openPackReader(shardId)).seek(currentTimestamp).getPackId()).equals(PackType.LAST_PACK_ID) || brokerLastPackid.equals(loaderReplicatedPackid)) continue;
            reader = this.openPackReader(shardId, loaderReplicatedPackid);
            ReadPackResult readPackResult = reader.readPackMeta();
            timestamp = Math.min(timestamp, reader.readPackMeta().getTimeStamp());
        }
        return new Date(timestamp);
    }

    private void initiate() throws OdpsException {
        block13: {
            HashMap<String, String> params = new HashMap<String, String>();
            params.put("query", "meta");
            params.put("type", "stream");
            Connection conn = null;
            try {
                this.datahubServiceClient = this.conf.newRestClient(this.projectName);
                Connection connection = conn = this.datahubServiceClient.connect(this.getResource(), "GET", params, this.headers);
                JoinPoint joinPoint = Factory.makeJP((JoinPoint.StaticPart)ajc$tjp_6, (Object)this, (Object)connection);
                Object[] objectArray = new Object[]{this, connection, joinPoint};
                Response resp = (Response)OdpsDeprecatedLogger.aspectOf().aroundOdpsImpl(new DatahubClient$AjcClosure13(objectArray).linkClosureAndJoinPoint(4112));
                if (resp.isOK()) {
                    this.loadFromJson(conn.getInputStream());
                    break block13;
                }
                DatahubException e = new DatahubException(conn.getInputStream());
                e.setRequestId(resp.getHeader("x-odps-request-id"));
                throw e;
            }
            catch (IOException e) {
                throw new DatahubException(e.getMessage(), e);
            }
            catch (DatahubException e) {
                throw e;
            }
            catch (OdpsException e) {
                throw new DatahubException(e.getMessage(), e);
            }
            finally {
                if (conn != null) {
                    try {
                        conn.disconnect();
                    }
                    catch (IOException ignored) {}
                }
            }
        }
    }

    public TableSchema getStreamSchema() {
        return this.schema;
    }

    public TableSchema getStreamSchemaFromServer() throws OdpsException {
        this.initiate();
        return this.schema;
    }

    public List<Long> getShardList() {
        return this.shards;
    }

    public DatahubWriter openDatahubWriter(long shardId) throws OdpsException, IOException {
        HashMap<String, String> params = new HashMap<String, String>();
        HashMap<String, String> headers = new HashMap<String, String>(this.headers);
        headers.put("Content-Type", "application/octet-stream");
        headers.put("x-odps-tunnel-version", String.valueOf(4));
        return new DatahubWriter(this.datahubServiceClient, this.getStreamResource(shardId), params, headers);
    }

    public DatahubWriter openDatahubWriter() throws OdpsException, IOException {
        HashMap<String, String> params = new HashMap<String, String>();
        HashMap<String, String> headers = new HashMap<String, String>(this.headers);
        headers.put("Content-Type", "application/octet-stream");
        headers.put("x-odps-tunnel-version", String.valueOf(4));
        return new DatahubWriter(this.datahubServiceClient, this.getStreamResource(), params, headers);
    }

    public DatahubReader openDatahubReader(long shardId) throws OdpsException, IOException {
        HashMap<String, String> params = new HashMap<String, String>();
        HashMap<String, String> headers = new HashMap<String, String>(this.headers);
        headers.put("x-odps-tunnel-version", String.valueOf(4));
        return new DatahubReader(this.datahubServiceClient, this.schema, this.getStreamResource(shardId), params, headers);
    }

    public DatahubReader openDatahubReader(long shardId, String packId) throws OdpsException, IOException {
        if (packId == null || packId.equals("")) {
            throw new IllegalArgumentException("Invalid pack id.");
        }
        HashMap<String, String> params = new HashMap<String, String>();
        HashMap<String, String> headers = new HashMap<String, String>(this.headers);
        headers.put("x-odps-tunnel-version", String.valueOf(4));
        return new DatahubReader(this.datahubServiceClient, this.schema, this.getStreamResource(shardId), params, headers, packId);
    }

    public PackReader openPackReader(long shardId) throws OdpsException, IOException {
        HashMap<String, String> params = new HashMap<String, String>();
        HashMap<String, String> headers = new HashMap<String, String>(this.headers);
        headers.put("x-odps-tunnel-version", String.valueOf(4));
        return new PackReader(this.datahubServiceClient, this.schema, this.getStreamResource(shardId), params, headers);
    }

    public PackReader openPackReader(long shardId, String packId) throws OdpsException, IOException {
        if (packId == null || packId.equals("")) {
            throw new IllegalArgumentException("Invalid pack id.");
        }
        HashMap<String, String> params = new HashMap<String, String>();
        HashMap<String, String> headers = new HashMap<String, String>(this.headers);
        headers.put("x-odps-tunnel-version", String.valueOf(4));
        return new PackReader(this.datahubServiceClient, this.schema, this.getStreamResource(shardId), params, headers, packId);
    }

    private String getResource() {
        return this.conf.getResource(this.projectName, this.tableName);
    }

    private String getStreamResource(long shardId) {
        return this.conf.getStreamUploadResource(this.projectName, this.tableName, shardId);
    }

    private String getStreamResource() {
        return this.conf.getStreamUploadResource(this.projectName, this.tableName);
    }

    private void loadFromJson(InputStream is) throws OdpsException {
        block5: {
            try {
                String json = IOUtils.readStreamAsString(is);
                JSONObject tree = JSON.parseObject((String)json);
                JSONObject schemaNode = tree.getJSONObject("Schema");
                if (schemaNode == null) {
                    throw new DatahubException("get table schema fail");
                }
                this.schema = new DatahubTableSchema(schemaNode);
                JSONArray node = tree.getJSONArray("Shards");
                if (node != null) {
                    for (int i = 0; i < node.size(); ++i) {
                        long shardId = node.getLongValue(i);
                        this.shards.add(shardId);
                    }
                    break block5;
                }
                throw new DatahubException("get shard fail");
            }
            catch (Exception e) {
                throw new DatahubException("Invalid json content.", e);
            }
        }
    }

    private HashMap<Long, ShardState> loadShardStatusFromJson(InputStream is) throws OdpsException {
        try {
            HashMap<Long, ShardState> shardStatus = new HashMap<Long, ShardState>();
            String json = IOUtils.readStreamAsString(is);
            JSONObject tree = JSON.parseObject((String)json);
            JSONArray node = tree.getJSONArray("ShardStatus");
            if (node != null) {
                for (int i = 0; i < node.size(); ++i) {
                    JSONObject status = node.getJSONObject(i);
                    ShardState state = ShardState.valueOf(status.getString("State").toUpperCase());
                    shardStatus.put(Long.parseLong(status.getString("ShardId")), state);
                }
            }
            return shardStatus;
        }
        catch (Exception e) {
            throw new DatahubException("Invalid json content.", e);
        }
    }

    static {
        DatahubClient.ajc$preClinit();
    }

    static /* synthetic */ Response getResponse_aroundBody0(DatahubClient datahubClient, Connection connection, JoinPoint joinPoint) {
        return connection.getResponse();
    }

    static /* synthetic */ Response getResponse_aroundBody2(DatahubClient datahubClient, Connection connection, JoinPoint joinPoint) {
        return connection.getResponse();
    }

    static /* synthetic */ Response getResponse_aroundBody4(DatahubClient datahubClient, Connection connection, JoinPoint joinPoint) {
        return connection.getResponse();
    }

    static /* synthetic */ ReplicatorStatus QueryReplicatorStatus_aroundBody6(DatahubClient ajc$this, long shardId, PartitionSpec partitionSpec, JoinPoint joinPoint) {
        HashMap<String, String> params = new HashMap<String, String>();
        HashMap<String, String> headers = new HashMap<String, String>(ajc$this.headers);
        headers.put("x-odps-tunnel-version", String.valueOf(4));
        params.put("query", "replicator");
        if (partitionSpec != null && partitionSpec.toString().length() > 0) {
            params.put("partition", partitionSpec.toString().replaceAll("'", ""));
        }
        String path = ajc$this.getStreamResource(shardId);
        Connection conn = null;
        try {
            Connection connection = conn = ajc$this.datahubServiceClient.connect(path, "GET", params, headers);
            JoinPoint joinPoint2 = Factory.makeJP((JoinPoint.StaticPart)ajc$tjp_2, (Object)ajc$this, (Object)connection);
            Object[] objectArray = new Object[]{ajc$this, connection, joinPoint2};
            Response resp = (Response)OdpsDeprecatedLogger.aspectOf().aroundOdpsImpl(new DatahubClient$AjcClosure5(objectArray).linkClosureAndJoinPoint(4112));
            if (!resp.isOK()) {
                DatahubException ex = new DatahubException(conn.getInputStream());
                ex.setRequestId(resp.getHeader("x-odps-request-id"));
                throw ex;
            }
            ReplicatorStatus replicatorStatus = new ReplicatorStatus(conn.getInputStream());
            return replicatorStatus;
        }
        catch (IOException e) {
            throw new DatahubException(e.getMessage(), e);
        }
        catch (DatahubException e) {
            throw e;
        }
        catch (OdpsException e) {
            throw new DatahubException(e.getMessage(), e);
        }
        finally {
            if (conn != null) {
                try {
                    conn.disconnect();
                }
                catch (IOException ignored) {}
            }
        }
    }

    static /* synthetic */ ReplicatorStatus QueryReplicatorStatus_aroundBody8(DatahubClient datahubClient, DatahubClient datahubClient2, long l, PartitionSpec partitionSpec, JoinPoint joinPoint) {
        return datahubClient2.QueryReplicatorStatus(l, partitionSpec);
    }

    static /* synthetic */ Response getResponse_aroundBody10(DatahubClient datahubClient, Connection connection, JoinPoint joinPoint) {
        return connection.getResponse();
    }

    static /* synthetic */ Response getResponse_aroundBody12(DatahubClient datahubClient, Connection connection, JoinPoint joinPoint) {
        return connection.getResponse();
    }

    private static /* synthetic */ void ajc$preClinit() {
        Factory factory = new Factory("DatahubClient.java", DatahubClient.class);
        ajc$tjp_0 = factory.makeSJP("method-call", (Signature)factory.makeMethodSig("601", "getResponse", "com.aliyun.odps.commons.transport.Connection", "", "", "java.io.IOException", "com.aliyun.odps.commons.transport.Response"), 123);
        ajc$tjp_1 = factory.makeSJP("method-call", (Signature)factory.makeMethodSig("601", "getResponse", "com.aliyun.odps.commons.transport.Connection", "", "", "java.io.IOException", "com.aliyun.odps.commons.transport.Response"), 246);
        ajc$tjp_2 = factory.makeSJP("method-call", (Signature)factory.makeMethodSig("601", "getResponse", "com.aliyun.odps.commons.transport.Connection", "", "", "java.io.IOException", "com.aliyun.odps.commons.transport.Response"), 287);
        ajc$tjp_3 = factory.makeSJP("method-execution", (Signature)factory.makeMethodSig("1", "QueryReplicatorStatus", "com.aliyun.odps.datahub.DatahubClient", "long:com.aliyun.odps.PartitionSpec", "shardId:partitionSpec", "com.aliyun.odps.OdpsException", "com.aliyun.odps.datahub.ReplicatorStatus"), 275);
        ajc$tjp_4 = factory.makeSJP("method-call", (Signature)factory.makeMethodSig("1", "QueryReplicatorStatus", "com.aliyun.odps.datahub.DatahubClient", "long:com.aliyun.odps.PartitionSpec", "shardId:partitionSpec", "com.aliyun.odps.OdpsException", "com.aliyun.odps.datahub.ReplicatorStatus"), 345);
        ajc$tjp_5 = factory.makeSJP("method-call", (Signature)factory.makeMethodSig("601", "getResponse", "com.aliyun.odps.commons.transport.Connection", "", "", "java.io.IOException", "com.aliyun.odps.commons.transport.Response"), 366);
        ajc$tjp_6 = factory.makeSJP("method-call", (Signature)factory.makeMethodSig("601", "getResponse", "com.aliyun.odps.commons.transport.Connection", "", "", "java.io.IOException", "com.aliyun.odps.commons.transport.Response"), 426);
    }

    public static enum ShardState {
        UNLOADED,
        LOADED,
        LOADING;

    }
}

