/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.odps.tunnel.io;

import com.aliyun.odps.commons.util.RetryExceedLimitException;
import com.aliyun.odps.commons.util.RetryStrategy;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.io.Checksum;
import com.aliyun.odps.tunnel.io.CompressOption;
import com.aliyun.odps.tunnel.io.ProtobufRecordPack;
import java.io.IOException;

/**
 * A {@link RecordWriter} that accumulates records in an in-memory
 * {@link ProtobufRecordPack} and uploads the pack as one block of the
 * given upload session whenever the buffered size exceeds the configured
 * buffer size, or when the writer is closed.
 *
 * <p>Failed block uploads are retried according to the configured
 * {@link RetryStrategy}. This class performs no synchronization of its own.
 */
public class TunnelBufferedWriter
implements RecordWriter {
    /** Default buffer size in bytes (10 MiB). */
    private static final long BUFFER_SIZE_DEFAULT = 0xA00000L;
    /** Smallest buffer size accepted by {@link #setBufferSize(long)} (1 MiB). */
    private static final long BUFFER_SIZE_MIN = 0x100000L;
    /** Largest buffer size accepted by {@link #setBufferSize(long)} (1000 MiB). */
    private static final long BUFFER_SIZE_MAX = 1048576000L;

    private final ProtobufRecordPack bufferedPack;
    private final TableTunnel.UploadSession session;
    private RetryStrategy retry;
    private long bufferSize;
    /** Sum of the serialized sizes of all packs that were uploaded successfully. */
    private long bytesWritten;

    /**
     * Creates a writer that buffers records for the given upload session.
     *
     * @param session upload session whose schema is used for the record pack
     *                and to which blocks are written
     * @param option  compression option handed to the record pack
     * @throws IOException if the record pack cannot be created
     */
    public TunnelBufferedWriter(TableTunnel.UploadSession session, CompressOption option) throws IOException {
        this.bufferedPack = new ProtobufRecordPack(session.getSchema(), new Checksum(), option);
        this.session = session;
        this.bufferSize = BUFFER_SIZE_DEFAULT;
        // NOTE(review): 6 and 4 are presumably the retry limit and the initial
        // backoff interval -- confirm against RetryStrategy's constructor.
        this.retry = new RetryStrategy(6, 4, RetryStrategy.BackoffStrategy.EXPONENTIAL_BACKOFF);
        this.bytesWritten = 0L;
    }

    /**
     * Sets the threshold, in bytes, above which buffered records are uploaded.
     *
     * @param bufferSize new threshold; must lie within
     *                   [{@value #BUFFER_SIZE_MIN}, {@value #BUFFER_SIZE_MAX}]
     * @throws IllegalArgumentException if {@code bufferSize} is out of range
     */
    public void setBufferSize(long bufferSize) {
        if (bufferSize < BUFFER_SIZE_MIN) {
            throw new IllegalArgumentException("buffer size must >= " + BUFFER_SIZE_MIN + ", now: " + bufferSize);
        }
        if (bufferSize > BUFFER_SIZE_MAX) {
            throw new IllegalArgumentException("buffer size must <= " + BUFFER_SIZE_MAX + ", now: " + bufferSize);
        }
        this.bufferSize = bufferSize;
    }

    /**
     * Replaces the strategy consulted after a failed block upload.
     *
     * @param strategy the new retry strategy; must not be {@code null}
     * @throws IllegalArgumentException if {@code strategy} is {@code null}
     */
    public void setRetryStrategy(RetryStrategy strategy) {
        // Reject null up front: otherwise the first upload failure would raise a
        // NullPointerException that hides the IOException which caused it.
        if (strategy == null) {
            throw new IllegalArgumentException("retry strategy must not be null");
        }
        this.retry = strategy;
    }

    /**
     * Appends a record to the buffer and uploads the buffer once its size
     * exceeds the configured buffer size.
     *
     * @param r the record to append
     * @throws IOException if appending fails or the upload fails after the
     *                     retry strategy gives up
     */
    @Override
    public void write(Record r) throws IOException {
        this.bufferedPack.append(r);
        if (this.bufferedPack.getTotalBytes() > this.bufferSize) {
            this.flush();
        }
    }

    /**
     * Uploads any records that are still buffered.
     *
     * @throws IOException if the upload fails after the retry strategy gives up
     */
    @Override
    public void close() throws IOException {
        this.flush();
    }

    /**
     * Uploads any buffered records and returns the number of serialized bytes
     * uploaded by this writer so far.
     *
     * @return total serialized bytes of all successfully uploaded packs
     * @throws IOException if the upload fails after the retry strategy gives up
     */
    public long getTotalBytes() throws IOException {
        this.flush();
        return this.bytesWritten;
    }

    /**
     * Uploads the buffered pack as a new block, retrying on {@link IOException}
     * until the retry strategy reports that its limit is exceeded. Does nothing
     * when the pack reports no serialized bytes.
     *
     * @throws IOException the last upload failure, once retries are exhausted
     */
    private void flush() throws IOException {
        long delta = this.bufferedPack.getTotalBytesWritten();
        if (delta <= 0L) {
            // Nothing has been serialized into the pack; there is no block to upload.
            return;
        }
        // The same block id is reused for every attempt of this pack.
        Long blockId = this.session.getAvailBlockId();
        // NOTE(review): the strategy is never reset after a success, so failures
        // seem to accumulate across flushes -- check whether RetryStrategy offers
        // a reset and whether it should be invoked here.
        while (true) {
            try {
                this.session.writeBlock(blockId, this.bufferedPack);
                this.bufferedPack.reset();
                // Count the bytes only after the block was uploaded and the pack
                // cleared; a failed flush must not inflate the total, and a later
                // flush of the same pack must not count these bytes twice.
                this.bytesWritten += delta;
                return;
            }
            catch (IOException e) {
                try {
                    this.retry.onFailure(e);
                }
                catch (RetryExceedLimitException ignore) {
                    // Retries exhausted: surface the underlying I/O failure.
                    throw e;
                }
            }
        }
    }
}

