package org.apache.hadoop.hdds.scm.storage;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.ChunkBuffer;
import org.apache.hadoop.ozone.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.ozone.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hdds/scm/storage/BlockOutputStream.class */
public class BlockOutputStream extends OutputStream {
    public static final Logger LOG = LoggerFactory.getLogger(BlockOutputStream.class);
    public static final String EXCEPTION_MSG = "Unexpected Storage Container Exception: ";
    private AtomicReference<BlockID> blockID;
    private final ContainerProtos.BlockData.Builder containerBlockData;
    private XceiverClientFactory xceiverClientFactory;
    private XceiverClientSpi xceiverClient;
    private OzoneClientConfig config;
    private int chunkIndex;
    private final AtomicLong chunkOffset = new AtomicLong();
    private final BufferPool bufferPool;
    private final AtomicReference<IOException> ioException;
    private final ExecutorService responseExecutor;
    private long totalDataFlushedLength;
    private long writtenDataLength;
    private List<ChunkBuffer> bufferList;
    private final CommitWatcher commitWatcher;
    private final List<DatanodeDetails> failedServers;
    private final Checksum checksum;
    private int flushPeriod;
    private int currentBufferRemaining;
    private ChunkBuffer currentBuffer;
    private final Token<? extends TokenIdentifier> token;

    public BlockOutputStream(BlockID blockID, XceiverClientFactory xceiverClientFactory, Pipeline pipeline, BufferPool bufferPool, OzoneClientConfig ozoneClientConfig, Token<? extends TokenIdentifier> token) throws IOException {
        this.xceiverClientFactory = xceiverClientFactory;
        this.config = ozoneClientConfig;
        this.blockID = new AtomicReference<>(blockID);
        this.containerBlockData = ContainerProtos.BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf()).addMetadata(ContainerProtos.KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build());
        this.xceiverClient = xceiverClientFactory.acquireClient(pipeline);
        this.bufferPool = bufferPool;
        this.token = token;
        refreshCurrentBuffer(bufferPool);
        this.flushPeriod = (int) (ozoneClientConfig.getStreamBufferFlushSize() / ozoneClientConfig.getStreamBufferSize());
        Preconditions.checkArgument(((long) this.flushPeriod) * ((long) ozoneClientConfig.getStreamBufferSize()) == ozoneClientConfig.getStreamBufferFlushSize());
        this.responseExecutor = Executors.newSingleThreadExecutor();
        this.commitWatcher = new CommitWatcher(bufferPool, this.xceiverClient);
        this.bufferList = null;
        this.totalDataFlushedLength = 0L;
        this.writtenDataLength = 0L;
        this.failedServers = new ArrayList(0);
        this.ioException = new AtomicReference<>(null);
        this.checksum = new Checksum(ozoneClientConfig.getChecksumType(), ozoneClientConfig.getBytesPerChecksum());
    }

    private void refreshCurrentBuffer(BufferPool bufferPool) {
        this.currentBuffer = bufferPool.getCurrentBuffer();
        this.currentBufferRemaining = this.currentBuffer != null ? this.currentBuffer.remaining() : 0;
    }

    public BlockID getBlockID() {
        return this.blockID.get();
    }

    public long getTotalAckDataLength() {
        return this.commitWatcher.getTotalAckDataLength();
    }

    public long getWrittenDataLength() {
        return this.writtenDataLength;
    }

    public List<DatanodeDetails> getFailedServers() {
        return this.failedServers;
    }

    @VisibleForTesting
    public XceiverClientSpi getXceiverClient() {
        return this.xceiverClient;
    }

    @VisibleForTesting
    public long getTotalDataFlushedLength() {
        return this.totalDataFlushedLength;
    }

    @VisibleForTesting
    public BufferPool getBufferPool() {
        return this.bufferPool;
    }

    public IOException getIoException() {
        return this.ioException.get();
    }

    @VisibleForTesting
    public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
        return this.commitWatcher.getCommitIndex2flushedDataMap();
    }

    @Override // java.io.OutputStream
    public void write(int i) throws IOException {
        checkOpen();
        allocateNewBufferIfNeeded();
        this.currentBuffer.put((byte) i);
        this.currentBufferRemaining--;
        writeChunkIfNeeded();
        this.writtenDataLength++;
        doFlushOrWatchIfNeeded();
    }

    private void writeChunkIfNeeded() throws IOException {
        if (this.currentBufferRemaining == 0) {
            writeChunk(this.currentBuffer);
        }
    }

    @Override // java.io.OutputStream
    public void write(byte[] bArr, int i, int i2) throws IOException {
        checkOpen();
        if (bArr == null) {
            throw new NullPointerException();
        }
        if (i < 0 || i > bArr.length || i2 < 0 || i + i2 > bArr.length || i + i2 < 0) {
            throw new IndexOutOfBoundsException();
        }
        if (i2 == 0) {
            return;
        }
        while (i2 > 0) {
            allocateNewBufferIfNeeded();
            int min = Math.min(this.currentBufferRemaining, i2);
            this.currentBuffer.put(bArr, i, min);
            this.currentBufferRemaining -= min;
            writeChunkIfNeeded();
            i += min;
            i2 -= min;
            this.writtenDataLength += min;
            doFlushOrWatchIfNeeded();
        }
    }

    private void doFlushOrWatchIfNeeded() throws IOException {
        if (this.currentBufferRemaining == 0) {
            if (this.bufferPool.getNumberOfUsedBuffers() % this.flushPeriod == 0) {
                updateFlushLength();
                executePutBlock(false, false);
            }
            if (this.bufferPool.getNumberOfUsedBuffers() == this.bufferPool.getCapacity()) {
                handleFullBuffer();
            }
        }
    }

    private void allocateNewBufferIfNeeded() {
        if (this.currentBufferRemaining == 0) {
            this.currentBuffer = this.bufferPool.allocateBuffer(this.config.getBytesPerChecksum());
            this.currentBufferRemaining = this.currentBuffer.remaining();
        }
    }

    private void updateFlushLength() {
        this.totalDataFlushedLength = this.writtenDataLength;
    }

    private boolean isBufferPoolFull() {
        return this.bufferPool.computeBufferData() == this.config.getStreamBufferMaxSize();
    }

    public void writeOnRetry(long j) throws IOException {
        if (j == 0) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Retrying write length {} for blockID {}", Long.valueOf(j), this.blockID);
        }
        Preconditions.checkArgument(j <= this.config.getStreamBufferMaxSize());
        int i = 0;
        while (j > 0) {
            ChunkBuffer buffer = this.bufferPool.getBuffer(i);
            long min = Math.min(buffer.position(), j);
            if (!buffer.hasRemaining()) {
                writeChunk(buffer);
            }
            j -= min;
            i++;
            this.writtenDataLength += min;
            if (this.writtenDataLength % this.config.getStreamBufferFlushSize() == 0) {
                updateFlushLength();
                executePutBlock(false, false);
            }
            if (this.writtenDataLength == this.config.getStreamBufferMaxSize()) {
                handleFullBuffer();
            }
        }
    }

    private void handleFullBuffer() throws IOException {
        try {
            checkOpen();
            if (!this.commitWatcher.getFutureMap().isEmpty()) {
                waitOnFlushFutures();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            handleInterruptedException(e, true);
        } catch (ExecutionException e2) {
            handleExecutionException(e2);
        }
        watchForCommit(true);
    }

    private void adjustBuffersOnException() {
        this.commitWatcher.releaseBuffersOnException();
        refreshCurrentBuffer(this.bufferPool);
    }

    private void watchForCommit(boolean z) throws IOException {
        checkOpen();
        try {
            XceiverClientReply watchOnFirstIndex = z ? this.commitWatcher.watchOnFirstIndex() : this.commitWatcher.watchOnLastIndex();
            if (watchOnFirstIndex != null) {
                List<DatanodeDetails> datanodes = watchOnFirstIndex.getDatanodes();
                if (!datanodes.isEmpty()) {
                    LOG.warn("Failed to commit BlockId {} on {}. Failed nodes: {}", new Object[]{this.blockID, this.xceiverClient.getPipeline(), datanodes});
                    this.failedServers.addAll(datanodes);
                }
            }
            refreshCurrentBuffer(this.bufferPool);
        } catch (IOException e) {
            setIoException(e);
            throw getIoException();
        }
    }

    private CompletableFuture<ContainerProtos.ContainerCommandResponseProto> executePutBlock(boolean z, boolean z2) throws IOException {
        List<ChunkBuffer> list;
        checkOpen();
        long j = this.totalDataFlushedLength;
        if (z2) {
            list = null;
        } else {
            Preconditions.checkNotNull(this.bufferList);
            list = this.bufferList;
            this.bufferList = null;
            Preconditions.checkNotNull(list);
        }
        CompletableFuture<ContainerProtos.ContainerCommandResponseProto> completableFuture = null;
        try {
            XceiverClientReply putBlockAsync = ContainerProtocolCalls.putBlockAsync(this.xceiverClient, this.containerBlockData.build(), z, this.token);
            List<ChunkBuffer> list2 = list;
            completableFuture = putBlockAsync.getResponse().thenApplyAsync(containerCommandResponseProto -> {
                try {
                    validateResponse(containerCommandResponseProto);
                    if (getIoException() == null && !z2) {
                        BlockID fromProtobuf = BlockID.getFromProtobuf(containerCommandResponseProto.getPutBlock().getCommittedBlockLength().getBlockID());
                        Preconditions.checkState(this.blockID.get().getContainerBlockID().equals(fromProtobuf.getContainerBlockID()));
                        this.blockID.set(fromProtobuf);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Adding index " + putBlockAsync.getLogIndex() + " commitMap size " + this.commitWatcher.getCommitInfoMapSize() + " flushLength " + j + " numBuffers " + list2.size() + " blockID " + this.blockID + " bufferPool size" + this.bufferPool.getSize() + " currentBufferIndex " + this.bufferPool.getCurrentBufferIndex());
                        }
                        this.commitWatcher.updateCommitInfoMap(putBlockAsync.getLogIndex(), list2);
                    }
                    return containerCommandResponseProto;
                } catch (IOException e) {
                    throw new CompletionException(e);
                }
            }, (Executor) this.responseExecutor).exceptionally((Function<Throwable, ? extends U>) th -> {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("putBlock failed for blockID {} with exception {}", this.blockID, th.getLocalizedMessage());
                }
                CompletionException completionException = new CompletionException(th);
                setIoException(completionException);
                throw completionException;
            });
        } catch (IOException | ExecutionException e) {
            throw new IOException(EXCEPTION_MSG + e.toString(), e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            handleInterruptedException(e2, false);
        }
        this.commitWatcher.getFutureMap().put(Long.valueOf(j), completableFuture);
        return completableFuture;
    }

    @Override // java.io.OutputStream, java.io.Flushable
    public void flush() throws IOException {
        if (this.xceiverClientFactory == null || this.xceiverClient == null || this.bufferPool == null || this.bufferPool.getSize() <= 0) {
            return;
        }
        if (!this.config.isStreamBufferFlushDelay() || this.writtenDataLength - this.totalDataFlushedLength >= this.config.getStreamBufferSize()) {
            try {
                handleFlush(false);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                handleInterruptedException(e, true);
            } catch (ExecutionException e2) {
                handleExecutionException(e2);
            }
        }
    }

    private void writeChunk(ChunkBuffer chunkBuffer) throws IOException {
        if (this.bufferList == null) {
            this.bufferList = new ArrayList();
        }
        this.bufferList.add(chunkBuffer);
        writeChunkToContainer(chunkBuffer.duplicate(0, chunkBuffer.position()));
    }

    private void handleFlush(boolean z) throws IOException, InterruptedException, ExecutionException {
        checkOpen();
        if (this.totalDataFlushedLength < this.writtenDataLength) {
            refreshCurrentBuffer(this.bufferPool);
            Preconditions.checkArgument(this.currentBuffer.position() > 0);
            if (this.currentBuffer.hasRemaining()) {
                writeChunk(this.currentBuffer);
            }
            updateFlushLength();
            executePutBlock(z, false);
        } else if (z) {
            executePutBlock(true, true);
        }
        waitOnFlushFutures();
        watchForCommit(false);
        checkOpen();
    }

    @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.xceiverClientFactory == null || this.xceiverClient == null || this.bufferPool == null) {
            return;
        }
        try {
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            handleInterruptedException(e, true);
        } catch (ExecutionException e2) {
            handleExecutionException(e2);
        } finally {
            cleanup(false);
        }
        if (this.bufferPool.getSize() > 0) {
            handleFlush(true);
        }
    }

    private void waitOnFlushFutures() throws InterruptedException, ExecutionException {
        CompletableFuture.allOf((CompletableFuture[]) this.commitWatcher.getFutureMap().values().toArray(new CompletableFuture[this.commitWatcher.getFutureMap().size()])).get();
    }

    private void validateResponse(ContainerProtos.ContainerCommandResponseProto containerCommandResponseProto) throws IOException {
        try {
            IOException ioException = getIoException();
            if (ioException != null) {
                throw ioException;
            }
            ContainerProtocolCalls.validateContainerResponse(containerCommandResponseProto);
        } catch (StorageContainerException e) {
            setIoException(e);
            throw e;
        }
    }

    private void setIoException(Exception exc) {
        IOException ioException = getIoException();
        if (ioException != null) {
            LOG.debug("Previous request had already failed with " + ioException.toString() + " so subsequent request also encounters Storage Container Exception ", exc);
        } else {
            this.ioException.compareAndSet(null, new IOException(EXCEPTION_MSG + exc.toString(), exc));
        }
    }

    public void cleanup(boolean z) {
        if (this.xceiverClientFactory != null) {
            this.xceiverClientFactory.releaseClient(this.xceiverClient, z);
        }
        this.xceiverClientFactory = null;
        this.xceiverClient = null;
        this.commitWatcher.cleanup();
        if (this.bufferList != null) {
            this.bufferList.clear();
        }
        this.bufferList = null;
        this.responseExecutor.shutdown();
    }

    private void checkOpen() throws IOException {
        if (isClosed()) {
            throw new IOException("BlockOutputStream has been closed.");
        }
        if (getIoException() != null) {
            adjustBuffersOnException();
            throw getIoException();
        }
    }

    public boolean isClosed() {
        return this.xceiverClient == null;
    }

    private void writeChunkToContainer(ChunkBuffer chunkBuffer) throws IOException {
        int remaining = chunkBuffer.remaining();
        long andAdd = this.chunkOffset.getAndAdd(remaining);
        ByteString byteString = chunkBuffer.toByteString(this.bufferPool.byteStringConversion());
        ChecksumData computeChecksum = this.checksum.computeChecksum(chunkBuffer);
        ContainerProtos.ChunkInfo.Builder newBuilder = ContainerProtos.ChunkInfo.newBuilder();
        StringBuilder append = new StringBuilder().append(this.blockID.get().getLocalID()).append("_chunk_");
        int i = this.chunkIndex + 1;
        this.chunkIndex = i;
        ContainerProtos.ChunkInfo build = newBuilder.setChunkName(append.append(i).toString()).setOffset(andAdd).setLen(remaining).setChecksumData(computeChecksum.getProtoBufMessage()).build();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Writing chunk {} length {} at offset {}", new Object[]{build.getChunkName(), Integer.valueOf(remaining), Long.valueOf(andAdd)});
        }
        try {
            CompletableFuture<ContainerProtos.ContainerCommandResponseProto> response = ContainerProtocolCalls.writeChunkAsync(this.xceiverClient, build, this.blockID.get(), byteString, this.token).getResponse();
            response.thenApplyAsync(containerCommandResponseProto -> {
                try {
                    validateResponse(containerCommandResponseProto);
                } catch (IOException e) {
                    response.completeExceptionally(e);
                }
                return containerCommandResponseProto;
            }, (Executor) this.responseExecutor).exceptionally((Function<Throwable, ? extends U>) th -> {
                LOG.error("writing chunk failed " + build.getChunkName() + " blockID " + this.blockID + " with exception " + th.getLocalizedMessage());
                CompletionException completionException = new CompletionException(th);
                setIoException(completionException);
                throw completionException;
            });
        } catch (IOException | ExecutionException e) {
            throw new IOException(EXCEPTION_MSG + e.toString(), e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            handleInterruptedException(e2, false);
        }
        this.containerBlockData.addChunks(build);
    }

    @VisibleForTesting
    public void setXceiverClient(XceiverClientSpi xceiverClientSpi) {
        this.xceiverClient = xceiverClientSpi;
    }

    private void handleInterruptedException(Exception exc, boolean z) throws IOException {
        LOG.error("Command execution was interrupted.");
        if (!z) {
            throw new IOException(EXCEPTION_MSG + exc.toString(), exc);
        }
        handleExecutionException(exc);
    }

    private void handleExecutionException(Exception exc) throws IOException {
        setIoException(exc);
        adjustBuffersOnException();
        throw getIoException();
    }
}
