/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.odps.tunnel;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsDeprecatedLogger;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.commons.transport.Connection;
import com.aliyun.odps.commons.transport.Response;
import com.aliyun.odps.commons.util.IOUtils;
import com.aliyun.odps.data.ArrayRecord;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordPack;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.rest.RestClient;
import com.aliyun.odps.tunnel.Configuration;
import com.aliyun.odps.tunnel.StreamClient;
import com.aliyun.odps.tunnel.StreamUploadWriter;
import com.aliyun.odps.tunnel.TableTunnel$AjcClosure1;
import com.aliyun.odps.tunnel.TableTunnel$DownloadSession$AjcClosure1;
import com.aliyun.odps.tunnel.TableTunnel$DownloadSession$AjcClosure3;
import com.aliyun.odps.tunnel.TableTunnel$UploadSession$AjcClosure1;
import com.aliyun.odps.tunnel.TableTunnel$UploadSession$AjcClosure3;
import com.aliyun.odps.tunnel.TableTunnel$UploadSession$AjcClosure5;
import com.aliyun.odps.tunnel.TableTunnel$UploadSession$AjcClosure7;
import com.aliyun.odps.tunnel.TableTunnel$UploadSession$AjcClosure9;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.TunnelTableSchema;
import com.aliyun.odps.tunnel.io.Checksum;
import com.aliyun.odps.tunnel.io.CompressOption;
import com.aliyun.odps.tunnel.io.ProtobufRecordPack;
import com.aliyun.odps.tunnel.io.TunnelBufferedWriter;
import com.aliyun.odps.tunnel.io.TunnelRecordReader;
import com.aliyun.odps.tunnel.io.TunnelRecordWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.runtime.reflect.Factory;

public class TableTunnel {
    private Configuration config;
    private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_0;

    public TableTunnel(Odps odps) {
        this.config = new Configuration(odps);
    }

    public UploadSession createUploadSession(String projectName, String tableName) throws TunnelException {
        return new UploadSession(projectName, tableName, null, null);
    }

    public UploadSession createUploadSession(String projectName, String tableName, PartitionSpec partitionSpec) throws TunnelException {
        if (partitionSpec == null || partitionSpec.keys().size() == 0) {
            throw new IllegalArgumentException("Invalid arguments, partition spec required.");
        }
        return new UploadSession(projectName, tableName, partitionSpec.toString().replaceAll("'", ""), null);
    }

    public UploadSession getUploadSession(String projectName, String tableName, String id, long shares, long shareId) throws TunnelException {
        return this.getUploadSession(projectName, tableName, null, id, shares, shareId);
    }

    public UploadSession getUploadSession(String projectName, String tableName, PartitionSpec partitionSpec, String id, long shares, long shareId) throws TunnelException {
        if (shares < 1L) {
            throw new IllegalArgumentException("Invalid arguments, shares must >= 1");
        }
        if (shareId < 0L) {
            throw new IllegalArgumentException("Invalid arguments, shareId must >= 0");
        }
        if (shares <= shareId) {
            throw new IllegalArgumentException("Invalid arguments, shares must > shareId");
        }
        UploadSession session = partitionSpec != null ? this.getUploadSession(projectName, tableName, partitionSpec, id) : this.getUploadSession(projectName, tableName, id);
        session.shares = shares;
        session.curBlockId = shareId;
        return session;
    }

    public UploadSession getUploadSession(String projectName, String tableName, String id) throws TunnelException {
        return new UploadSession(projectName, tableName, null, id);
    }

    public UploadSession getUploadSession(String projectName, String tableName, PartitionSpec partitionSpec, String id) throws TunnelException {
        if (partitionSpec == null || partitionSpec.keys().size() == 0) {
            throw new IllegalArgumentException("Invalid arguments, partition spec required.");
        }
        return new UploadSession(projectName, tableName, partitionSpec.toString().replaceAll("'", ""), id);
    }

    public DownloadSession createDownloadSession(String projectName, String tableName) throws TunnelException {
        return new DownloadSession(projectName, tableName, null, null, null);
    }

    public DownloadSession createDownloadSession(String projectName, String tableName, PartitionSpec partitionSpec) throws TunnelException {
        if (partitionSpec == null || partitionSpec.keys().size() == 0) {
            throw new IllegalArgumentException("Invalid arguments, partition spec required.");
        }
        return new DownloadSession(projectName, tableName, partitionSpec.toString().replaceAll("'", ""), null, null);
    }

    public DownloadSession createDownloadSession(String projectName, String tableName, long shardId) throws TunnelException {
        if (shardId < 0L) {
            throw new IllegalArgumentException("Invalid arguments, shard id required.");
        }
        return new DownloadSession(projectName, tableName, null, shardId, null);
    }

    public DownloadSession createDownloadSession(String projectName, String tableName, PartitionSpec partitionSpec, long shardId) throws TunnelException {
        if (partitionSpec == null || partitionSpec.keys().size() == 0) {
            throw new IllegalArgumentException("Invalid arguments, partition spec required.");
        }
        if (shardId < 0L) {
            throw new IllegalArgumentException("Invalid arguments, shard id required.");
        }
        return new DownloadSession(projectName, tableName, partitionSpec.toString().replaceAll("'", ""), shardId, null);
    }

    public DownloadSession getDownloadSession(String projectName, String tableName, String id) throws TunnelException {
        return new DownloadSession(projectName, tableName, null, null, id);
    }

    public DownloadSession getDownloadSession(String projectName, String tableName, long shardId, String id) throws TunnelException {
        return new DownloadSession(projectName, tableName, null, shardId, id);
    }

    public DownloadSession getDownloadSession(String projectName, String tableName, PartitionSpec partitionSpec, String id) throws TunnelException {
        if (partitionSpec == null || partitionSpec.keys().size() == 0) {
            throw new IllegalArgumentException("Invalid arguments, partition spec required.");
        }
        return new DownloadSession(projectName, tableName, partitionSpec.toString().replaceAll("'", ""), null, id);
    }

    public DownloadSession getDownloadSession(String projectName, String tableName, PartitionSpec partitionSpec, long shardId, String id) throws TunnelException {
        if (partitionSpec == null || partitionSpec.keys().size() == 0) {
            throw new IllegalArgumentException("Invalid arguments, partition spec required.");
        }
        if (shardId < 0L) {
            throw new IllegalArgumentException("Invalid arguments, shard id required.");
        }
        return new DownloadSession(projectName, tableName, partitionSpec.toString().replaceAll("'", ""), shardId, id);
    }

    @Deprecated
    public StreamClient createStreamClient(String projectName, String tableName) throws TunnelException {
        String string = projectName;
        String string2 = tableName;
        JoinPoint joinPoint = Factory.makeJP((JoinPoint.StaticPart)ajc$tjp_0, (Object)this, (Object)this, (Object)string, (Object)string2);
        Object[] objectArray = new Object[]{this, string, string2, joinPoint};
        return (StreamClient)OdpsDeprecatedLogger.aspectOf().around(new TableTunnel$AjcClosure1(objectArray).linkClosureAndJoinPoint(69648));
    }

    public StreamUploadWriter createStreamUploadWriter(String projectName, String tableName) throws TunnelException, IOException {
        RestClient tunnelServiceClient = this.config.newRestClient(projectName);
        return new StreamUploadWriter(tunnelServiceClient, this.getResource(projectName, tableName));
    }

    private String getResource(String projectName, String tableName) {
        return this.config.getResource(projectName, tableName);
    }

    public void setEndpoint(String endpoint) {
        try {
            URI u = new URI(endpoint);
            this.config.setEndpoint(u);
        }
        catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid endpoint.");
        }
    }

    static {
        TableTunnel.ajc$preClinit();
    }

    static /* synthetic */ StreamClient createStreamClient_aroundBody0(TableTunnel ajc$this, String projectName, String tableName, JoinPoint joinPoint) {
        return new StreamClient(ajc$this.config, projectName, tableName);
    }

    private static /* synthetic */ void ajc$preClinit() {
        Factory factory = new Factory("TableTunnel.java", TableTunnel.class);
        ajc$tjp_0 = factory.makeSJP("method-execution", (Signature)factory.makeMethodSig("1", "createStreamClient", "com.aliyun.odps.tunnel.TableTunnel", "java.lang.String:java.lang.String", "projectName:tableName", "com.aliyun.odps.tunnel.TunnelException", "com.aliyun.odps.tunnel.StreamClient"), 523);
    }

    public class DownloadSession {
        private String id;
        private String projectName;
        private String tableName;
        private String partitionSpec;
        private Long shardId;
        private long count;
        private TableSchema schema = new TableSchema();
        private DownloadStatus status = DownloadStatus.UNKNOWN;
        private Configuration conf;
        private RestClient tunnelServiceClient;
        private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_0;
        private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_1;

        DownloadSession(String projectName, String tableName, String partitionSpec, Long shardId, String downloadId) throws TunnelException {
            this.conf = TableTunnel.this.config;
            this.projectName = projectName;
            this.tableName = tableName;
            this.partitionSpec = partitionSpec;
            this.shardId = shardId;
            this.id = downloadId;
            this.tunnelServiceClient = this.conf.newRestClient(projectName);
            if (this.id == null) {
                this.initiate();
            } else {
                this.reload();
            }
        }

        public TunnelRecordReader openRecordReader(long start, long count) throws TunnelException, IOException {
            return this.openRecordReader(start, count, false);
        }

        public TunnelRecordReader openRecordReader(long start, long count, boolean compress) throws TunnelException, IOException {
            return this.openRecordReader(start, count, compress, null);
        }

        public TunnelRecordReader openRecordReader(long start, long count, CompressOption compress) throws TunnelException, IOException {
            return this.openRecordReader(start, count, compress, null);
        }

        public TunnelRecordReader openRecordReader(long start, long count, boolean compress, List<Column> columns) throws TunnelException, IOException {
            CompressOption option = compress ? new CompressOption() : new CompressOption(CompressOption.CompressAlgorithm.ODPS_RAW, 0, 0);
            return this.openRecordReader(start, count, option, columns);
        }

        public TunnelRecordReader openRecordReader(long start, long count, CompressOption compress, List<Column> columns) throws TunnelException, IOException {
            return new TunnelRecordReader(start, count, columns, compress, this.tunnelServiceClient, this);
        }

        private void initiate() throws TunnelException {
            block15: {
                HashMap<String, String> params = new HashMap<String, String>();
                HashMap<String, String> headers = new HashMap<String, String>();
                headers.put("Content-Length", String.valueOf(0));
                params.put("downloads", null);
                if (this.partitionSpec != null && this.partitionSpec.length() > 0) {
                    params.put("partition", this.partitionSpec);
                }
                if (this.shardId != null) {
                    params.put("shard", String.valueOf(this.shardId));
                }
                Connection conn = null;
                try {
                    Connection connection = conn = this.tunnelServiceClient.connect(this.getResource(), "POST", params, headers);
                    JoinPoint joinPoint = Factory.makeJP((JoinPoint.StaticPart)ajc$tjp_0, (Object)this, (Object)connection);
                    Object[] objectArray = new Object[]{this, connection, joinPoint};
                    Response resp = (Response)OdpsDeprecatedLogger.aspectOf().aroundOdpsImpl(new TableTunnel$DownloadSession$AjcClosure1(objectArray).linkClosureAndJoinPoint(4112));
                    if (resp.isOK()) {
                        this.loadFromJson(conn.getInputStream());
                        break block15;
                    }
                    TunnelException e = new TunnelException(conn.getInputStream());
                    e.setRequestId(resp.getHeader("x-odps-request-id"));
                    throw e;
                }
                catch (IOException e) {
                    throw new TunnelException("Failed to create download session with tunnel endpoint " + this.tunnelServiceClient.getEndpoint(), e);
                }
                catch (TunnelException e) {
                    throw e;
                }
                catch (OdpsException e) {
                    throw new TunnelException(e.getMessage(), e);
                }
                finally {
                    if (conn != null) {
                        try {
                            conn.disconnect();
                        }
                        catch (IOException e) {}
                    }
                }
            }
        }

        private void reload() throws TunnelException {
            block15: {
                HashMap<String, String> params = new HashMap<String, String>();
                HashMap<String, String> headers = new HashMap<String, String>();
                headers.put("Content-Length", String.valueOf(0));
                params.put("downloadid", this.id);
                if (this.partitionSpec != null && this.partitionSpec.length() > 0) {
                    params.put("partition", this.partitionSpec);
                }
                if (this.shardId != null) {
                    params.put("shard", String.valueOf(this.shardId));
                }
                Connection conn = null;
                try {
                    Connection connection = conn = this.tunnelServiceClient.connect(this.getResource(), "GET", params, headers);
                    JoinPoint joinPoint = Factory.makeJP((JoinPoint.StaticPart)ajc$tjp_1, (Object)this, (Object)connection);
                    Object[] objectArray = new Object[]{this, connection, joinPoint};
                    Response resp = (Response)OdpsDeprecatedLogger.aspectOf().aroundOdpsImpl(new TableTunnel$DownloadSession$AjcClosure3(objectArray).linkClosureAndJoinPoint(4112));
                    if (resp.isOK()) {
                        this.loadFromJson(conn.getInputStream());
                        break block15;
                    }
                    TunnelException e = new TunnelException(conn.getInputStream());
                    e.setRequestId(resp.getHeader("x-odps-request-id"));
                    throw e;
                }
                catch (IOException e) {
                    throw new TunnelException(e.getMessage(), e);
                }
                catch (TunnelException e) {
                    throw e;
                }
                catch (OdpsException e) {
                    throw new TunnelException(e.getMessage(), e);
                }
                finally {
                    if (conn != null) {
                        try {
                            conn.disconnect();
                        }
                        catch (IOException e) {}
                    }
                }
            }
        }

        public TableSchema getSchema() {
            return this.schema;
        }

        public long getRecordCount() {
            return this.count;
        }

        public String getId() {
            return this.id;
        }

        public DownloadStatus getStatus() throws TunnelException, IOException {
            this.reload();
            return this.status;
        }

        public String getPartitionSpec() {
            return this.partitionSpec;
        }

        public String getProjectName() {
            return this.projectName;
        }

        public String getTableName() {
            return this.tableName;
        }

        private String getResource() {
            return this.conf.getResource(this.projectName, this.tableName);
        }

        private void loadFromJson(InputStream is) throws TunnelException {
            try {
                JSONObject node3;
                Long node2;
                String json = IOUtils.readStreamAsString(is);
                JSONObject tree = JSONObject.parseObject((String)json);
                String node = tree.getString("DownloadID");
                if (node != null) {
                    this.id = node;
                }
                if ((node = tree.getString("Status")) != null) {
                    this.status = DownloadStatus.valueOf(node.toUpperCase());
                }
                if ((node2 = tree.getLong("RecordCount")) != null) {
                    this.count = node2;
                }
                if ((node3 = tree.getJSONObject("Schema")) != null) {
                    this.schema = new TunnelTableSchema(node3);
                }
            }
            catch (Exception e) {
                throw new TunnelException("Invalid json content.", e);
            }
        }

        static {
            DownloadSession.ajc$preClinit();
        }

        static /* synthetic */ Response getResponse_aroundBody0(DownloadSession downloadSession, Connection connection, JoinPoint joinPoint) {
            return connection.getResponse();
        }

        static /* synthetic */ Response getResponse_aroundBody2(DownloadSession downloadSession, Connection connection, JoinPoint joinPoint) {
            return connection.getResponse();
        }

        private static /* synthetic */ void ajc$preClinit() {
            Factory factory = new Factory("TableTunnel.java", DownloadSession.class);
            ajc$tjp_0 = factory.makeSJP("method-call", (Signature)factory.makeMethodSig("601", "getResponse", "com.aliyun.odps.commons.transport.Connection", "", "", "java.io.IOException", "com.aliyun.odps.commons.transport.Response"), 1332);
            ajc$tjp_1 = factory.makeSJP("method-call", (Signature)factory.makeMethodSig("601", "getResponse", "com.aliyun.odps.commons.transport.Connection", "", "", "java.io.IOException", "com.aliyun.odps.commons.transport.Response"), 1379);
        }
    }

    public static enum DownloadStatus {
        UNKNOWN,
        NORMAL,
        CLOSED,
        EXPIRED;

    }

    public class UploadSession {
        private String id;
        private TableSchema schema = new TableSchema();
        private String projectName;
        private String tableName;
        private String partitionSpec;
        private List<Long> blocks = new ArrayList<Long>();
        private UploadStatus status = UploadStatus.UNKNOWN;
        private Configuration conf;
        private RestClient tunnelServiceClient;
        private final Long totalBLocks = 20000L;
        private Long shares = 1L;
        private Long curBlockId = 0L;
        private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_0;
        private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_1;
        private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_2;
        private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_3;
        private static final /* synthetic */ JoinPoint.StaticPart ajc$tjp_4;

        UploadSession(String projectName, String tableName, String partitionSpec, String uploadId) throws TunnelException {
            this.conf = TableTunnel.this.config;
            this.projectName = projectName;
            this.tableName = tableName;
            this.partitionSpec = partitionSpec;
            this.id = uploadId;
            this.tunnelServiceClient = this.conf.newRestClient(projectName);
            if (this.id == null) {
                this.initiate();
            } else {
                this.reload();
            }
        }

        private void initiate() throws TunnelException {
            block14: {
                HashMap<String, String> params = new HashMap<String, String>();
                params.put("uploads", null);
                if (this.partitionSpec != null && this.partitionSpec.length() > 0) {
                    params.put("partition", this.partitionSpec);
                }
                HashMap<String, String> headers = new HashMap<String, String>();
                headers.put("Content-Length", String.valueOf(0));
                Connection conn = null;
                try {
                    Connection connection = conn = this.tunnelServiceClient.connect(this.getResource(), "POST", params, headers);
                    JoinPoint joinPoint = Factory.makeJP((JoinPoint.StaticPart)ajc$tjp_0, (Object)this, (Object)connection);
                    Object[] objectArray = new Object[]{this, connection, joinPoint};
                    Response resp = (Response)OdpsDeprecatedLogger.aspectOf().aroundOdpsImpl(new TableTunnel$UploadSession$AjcClosure1(objectArray).linkClosureAndJoinPoint(4112));
                    if (resp.isOK()) {
                        this.loadFromJson(conn.getInputStream());
                        break block14;
                    }
                    TunnelException e = new TunnelException(conn.getInputStream());
                    e.setRequestId(resp.getHeader("x-odps-request-id"));
                    throw e;
                }
                catch (IOException e) {
                    throw new TunnelException("Failed to create upload session with tunnel endpoint " + this.tunnelServiceClient.getEndpoint(), e);
                }
                catch (TunnelException e) {
                    throw e;
                }
                catch (OdpsException e) {
                    throw new TunnelException(e.getMessage(), e);
                }
                finally {
                    if (conn != null) {
                        try {
                            conn.disconnect();
                        }
                        catch (IOException e) {}
                    }
                }
            }
        }

        public synchronized Long getAvailBlockId() {
            if (this.curBlockId >= this.totalBLocks) {
                throw new RuntimeException("No more available blockId, already " + this.curBlockId);
            }
            Long old = this.curBlockId;
            this.curBlockId = this.curBlockId + this.shares;
            return old;
        }

        public void commit() throws TunnelException, IOException {
            this.completeUpload();
        }

        public void writeBlock(long blockId, RecordPack pack) throws IOException {
            Connection conn = null;
            try {
                if (pack instanceof ProtobufRecordPack) {
                    ProtobufRecordPack protoPack = (ProtobufRecordPack)pack;
                    conn = this.getConnection(blockId, protoPack.getCompressOption());
                    this.sendBlock(protoPack, conn);
                } else {
                    Record record;
                    RecordWriter writer = this.openRecordWriter(blockId);
                    RecordReader reader = pack.getRecordReader();
                    while ((record = reader.read()) != null) {
                        writer.write(record);
                    }
                    writer.close();
                }
            }
            catch (IOException e) {
                if (null != conn && !(e.getCause() instanceof TunnelException)) {
                    Connection connection = conn;
                    JoinPoint joinPoint = Factory.makeJP((JoinPoint.StaticPart)ajc$tjp_1, (Object)this, (Object)connection);
                    Object[] objectArray = new Object[]{this, connection, joinPoint};
                    Response response = (Response)OdpsDeprecatedLogger.aspectOf().aroundOdpsImpl(new TableTunnel$UploadSession$AjcClosure3(objectArray).linkClosureAndJoinPoint(4112));
                    if (!response.isOK()) {
                        TunnelException err = new TunnelException(conn.getInputStream());
                        throw new IOException(err.getErrorMsg(), err.getCause());
                    }
                }
                throw e;
            }
            catch (TunnelException e) {
                throw new IOException(e.getMessage(), e.getCause());
            }
            catch (OdpsException e) {
                throw new IOException(e.getMessage(), e.getCause());
            }
            finally {
                if (null != conn) {
                    conn.disconnect();
                }
            }
        }

        private void sendBlock(ProtobufRecordPack pack, Connection conn) throws IOException {
            if (null == conn) {
                throw new IOException("Invalid connection");
            }
            pack.complete();
            ByteArrayOutputStream baos = pack.getProtobufStream();
            baos.writeTo(conn.getOutputStream());
            conn.getOutputStream().close();
            baos.close();
            Connection connection = conn;
            JoinPoint joinPoint = Factory.makeJP((JoinPoint.StaticPart)ajc$tjp_2, (Object)this, (Object)connection);
            Object[] objectArray = new Object[]{this, connection, joinPoint};
            Response response = (Response)OdpsDeprecatedLogger.aspectOf().aroundOdpsImpl(new TableTunnel$UploadSession$AjcClosure5(objectArray).linkClosureAndJoinPoint(4112));
            if (!response.isOK()) {
                TunnelException err = new TunnelException(conn.getInputStream());
                err.setRequestId(response.getHeader("x-odps-request-id"));
                throw new IOException(err.getMessage(), err);
            }
        }

        public RecordWriter openRecordWriter(long blockId) throws TunnelException, IOException {
            return this.openRecordWriter(blockId, false);
        }

        public RecordWriter openRecordWriter(long blockId, boolean compress) throws TunnelException, IOException {
            CompressOption option = compress ? new CompressOption() : new CompressOption(CompressOption.CompressAlgorithm.ODPS_RAW, 0, 0);
            return this.openRecordWriter(blockId, option);
        }

        public RecordWriter openRecordWriter(long blockId, CompressOption compress) throws TunnelException, IOException {
            TunnelRecordWriter writer = null;
            Connection conn = null;
            try {
                conn = this.getConnection(blockId, compress);
                writer = new TunnelRecordWriter(this.schema, conn, compress);
            }
            catch (IOException e) {
                if (conn != null) {
                    conn.disconnect();
                }
                throw new TunnelException(e.getMessage(), e.getCause());
            }
            catch (TunnelException e) {
                throw e;
            }
            catch (OdpsException e) {
                throw new TunnelException(e.getMessage(), e);
            }
            return writer;
        }

        public RecordWriter openBufferedWriter() throws TunnelException {
            return this.openBufferedWriter(false);
        }

        public RecordWriter openBufferedWriter(boolean compress) throws TunnelException {
            CompressOption compressOption = compress ? this.conf.getCompressOption() : new CompressOption(CompressOption.CompressAlgorithm.ODPS_RAW, 0, 0);
            return this.openBufferedWriter(compressOption);
        }

        public RecordWriter openBufferedWriter(CompressOption compressOption) throws TunnelException {
            try {
                return new TunnelBufferedWriter(this, compressOption);
            }
            catch (IOException e) {
                throw new TunnelException(e.getMessage(), e.getCause());
            }
        }

        private Connection getConnection(long blockId, CompressOption compress) throws OdpsException, IOException {
            HashMap<String, String> params = new HashMap<String, String>();
            HashMap<String, String> headers = new HashMap<String, String>();
            headers.put("Transfer-Encoding", "chunked");
            headers.put("Content-Type", "application/octet-stream");
            headers.put("x-odps-tunnel-version", String.valueOf(4));
            switch (compress.algorithm) {
                case ODPS_RAW: {
                    break;
                }
                case ODPS_ZLIB: {
                    headers.put("Content-Encoding", "deflate");
                    break;
                }
                case ODPS_SNAPPY: {
                    headers.put("Content-Encoding", "x-snappy-framed");
                    break;
                }
                default: {
                    throw new TunnelException("invalid compression option.");
                }
            }
            params.put("uploadid", this.id);
            params.put("blockid", Long.toString(blockId));
            if (this.partitionSpec != null && this.partitionSpec.length() > 0) {
                params.put("partition", this.partitionSpec);
            }
            return this.tunnelServiceClient.connect(this.getResource(), "PUT", params, headers);
        }

        private void reload() throws TunnelException {
            block14: {
                HashMap<String, String> params = new HashMap<String, String>();
                HashMap<String, String> headers = new HashMap<String, String>();
                headers.put("Content-Length", String.valueOf(0));
                params.put("uploadid", this.id);
                if (this.partitionSpec != null && this.partitionSpec.length() > 0) {
                    params.put("partition", this.partitionSpec);
                }
                Connection conn = null;
                try {
                    Connection connection = conn = this.tunnelServiceClient.connect(this.getResource(), "GET", params, headers);
                    JoinPoint joinPoint = Factory.makeJP((JoinPoint.StaticPart)ajc$tjp_3, (Object)this, (Object)connection);
                    Object[] objectArray = new Object[]{this, connection, joinPoint};
                    Response resp = (Response)OdpsDeprecatedLogger.aspectOf().aroundOdpsImpl(new TableTunnel$UploadSession$AjcClosure7(objectArray).linkClosureAndJoinPoint(4112));
                    if (resp.isOK()) {
                        this.loadFromJson(conn.getInputStream());
                        break block14;
                    }
                    TunnelException e = new TunnelException(conn.getInputStream());
                    throw e;
                }
                catch (IOException e) {
                    throw new TunnelException(e.getMessage(), e);
                }
                catch (TunnelException e) {
                    throw e;
                }
                catch (OdpsException e) {
                    throw new TunnelException(e.getMessage(), e);
                }
                finally {
                    if (conn != null) {
                        try {
                            conn.disconnect();
                        }
                        catch (IOException e) {}
                    }
                }
            }
        }

        public void commit(Long[] blocks) throws TunnelException, IOException {
            if (blocks == null) {
                throw new IllegalArgumentException("Invalid argument: blocks.");
            }
            HashMap<Long, Boolean> clientBlockMap = new HashMap<Long, Boolean>();
            for (Long blockId : blocks) {
                clientBlockMap.put(blockId, true);
            }
            Long[] serverBlocks = this.getBlockList();
            HashMap<Long, Boolean> serverBlockMap = new HashMap<Long, Boolean>();
            for (Long blockId : serverBlocks) {
                serverBlockMap.put(blockId, true);
            }
            if (serverBlockMap.size() != clientBlockMap.size()) {
                throw new TunnelException("Blocks not match, server: " + serverBlockMap.size() + ", tunnelServiceClient: " + clientBlockMap.size());
            }
            for (Long blockId : blocks) {
                if (serverBlockMap.containsKey(blockId)) continue;
                throw new TunnelException("Block not exsits on server, block id is " + blockId);
            }
            this.completeUpload();
        }

        private void completeUpload() throws TunnelException, IOException {
            HashMap<String, String> params = new HashMap<String, String>();
            HashMap<String, String> headers = new HashMap<String, String>();
            headers.put("Content-Length", String.valueOf(0));
            params.put("uploadid", this.id);
            if (this.partitionSpec != null && this.partitionSpec.length() > 0) {
                params.put("partition", this.partitionSpec);
            }
            int count = 0;
            while (true) {
                ++count;
                Connection conn = null;
                try {
                    Connection connection = conn = this.tunnelServiceClient.connect(this.getResource(), "POST", params, headers);
                    JoinPoint joinPoint = Factory.makeJP((JoinPoint.StaticPart)ajc$tjp_4, (Object)this, (Object)connection);
                    Object[] objectArray = new Object[]{this, connection, joinPoint};
                    Response resp = (Response)OdpsDeprecatedLogger.aspectOf().aroundOdpsImpl(new TableTunnel$UploadSession$AjcClosure9(objectArray).linkClosureAndJoinPoint(4112));
                    if (resp.isOK()) {
                        this.loadFromJson(conn.getInputStream());
                        break;
                    }
                    if (resp.getStatus() == 500 && count < 3) {
                        try {
                            Thread.sleep(2 * count * 1000);
                            continue;
                        }
                        catch (InterruptedException e) {
                            throw new TunnelException(e.getMessage(), e);
                        }
                    }
                    try {
                        throw new TunnelException(conn.getInputStream());
                    }
                    catch (IOException e) {
                        throw new TunnelException(e.getMessage(), e);
                    }
                    catch (TunnelException e) {
                        throw e;
                    }
                    catch (OdpsException e) {
                        throw new TunnelException(e.getMessage(), e);
                    }
                }
                finally {
                    if (conn == null) continue;
                    conn.disconnect();
                    continue;
                }
                break;
            }
        }

        public String getId() {
            return this.id;
        }

        public TableSchema getSchema() {
            return this.schema;
        }

        public UploadStatus getStatus() throws TunnelException, IOException {
            this.reload();
            return this.status;
        }

        public Record newRecord() {
            return new ArrayRecord(this.getSchema().getColumns().toArray(new Column[0]));
        }

        public RecordPack newRecordPack() throws IOException {
            return new ProtobufRecordPack(this.schema);
        }

        public RecordPack newRecordPack(CompressOption option) throws IOException {
            return new ProtobufRecordPack(this.schema, new Checksum(), option);
        }

        public Long[] getBlockList() throws TunnelException, IOException {
            this.reload();
            return this.blocks.toArray(new Long[0]);
        }

        private String getResource() {
            return this.conf.getResource(this.projectName, this.tableName);
        }

        private void loadFromJson(InputStream is) throws TunnelException {
            try {
                String json = IOUtils.readStreamAsString(is);
                JSONObject tree = JSONObject.parseObject((String)json);
                String node = tree.getString("UploadID");
                if (node != null) {
                    this.id = node;
                }
                if ((node = tree.getString("Status")) != null) {
                    this.status = UploadStatus.valueOf(node.toUpperCase());
                }
                this.blocks.clear();
                JSONArray node2 = tree.getJSONArray("UploadedBlockList");
                if (node2 != null) {
                    for (int i = 0; i < node2.size(); ++i) {
                        this.blocks.add(node2.getJSONObject(i).getLong("BlockID"));
                    }
                }
                JSONObject node3 = tree.getJSONObject("Schema");
                if (node != null) {
                    this.schema = new TunnelTableSchema(node3);
                }
            }
            catch (Exception e) {
                throw new TunnelException("Invalid json content.", e);
            }
        }

        static {
            UploadSession.ajc$preClinit();
        }

        static /* synthetic */ Response getResponse_aroundBody0(UploadSession uploadSession, Connection connection, JoinPoint joinPoint) {
            return connection.getResponse();
        }

        static /* synthetic */ Response getResponse_aroundBody2(UploadSession uploadSession, Connection connection, JoinPoint joinPoint) {
            return connection.getResponse();
        }

        static /* synthetic */ Response getResponse_aroundBody4(UploadSession uploadSession, Connection connection, JoinPoint joinPoint) {
            return connection.getResponse();
        }

        static /* synthetic */ Response getResponse_aroundBody6(UploadSession uploadSession, Connection connection, JoinPoint joinPoint) {
            return connection.getResponse();
        }

        static /* synthetic */ Response getResponse_aroundBody8(UploadSession uploadSession, Connection connection, JoinPoint joinPoint) {
            return connection.getResponse();
        }

        private static /* synthetic */ void ajc$preClinit() {
            Factory factory = new Factory("TableTunnel.java", UploadSession.class);
            ajc$tjp_0 = factory.makeSJP("method-call", (Signature)factory.makeMethodSig("601", "getResponse", "com.aliyun.odps.commons.transport.Connection", "", "", "java.io.IOException", "com.aliyun.odps.commons.transport.Response"), 681);
            ajc$tjp_1 = factory.makeSJP("method-call", (Signature)factory.makeMethodSig("601", "getResponse", "com.aliyun.odps.commons.transport.Connection", "", "", "java.io.IOException", "com.aliyun.odps.commons.transport.Response"), 761);
            ajc$tjp_2 = factory.makeSJP("method-call", (Signature)factory.makeMethodSig("601", "getResponse", "com.aliyun.odps.commons.transport.Connection", "", "", "java.io.IOException", "com.aliyun.odps.commons.transport.Response"), 788);
            ajc$tjp_3 = factory.makeSJP("method-call", (Signature)factory.makeMethodSig("601", "getResponse", "com.aliyun.odps.commons.transport.Connection", "", "", "java.io.IOException", "com.aliyun.odps.commons.transport.Response"), 942);
            ajc$tjp_4 = factory.makeSJP("method-call", (Signature)factory.makeMethodSig("601", "getResponse", "com.aliyun.odps.commons.transport.Connection", "", "", "java.io.IOException", "com.aliyun.odps.commons.transport.Response"), 1028);
        }
    }

    public static enum UploadStatus {
        UNKNOWN,
        NORMAL,
        CLOSING,
        CLOSED,
        CANCELED,
        EXPIRED,
        CRITICAL;

    }
}

