/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.archive;

import io.aeron.ExclusivePublication;
import io.aeron.archive.Archive;
import io.aeron.archive.ArchiveConductor;
import io.aeron.archive.Catalog;
import io.aeron.archive.ControlResponseProxy;
import io.aeron.archive.ControlSession;
import io.aeron.archive.RecordingFragmentReader;
import io.aeron.archive.Session;
import io.aeron.archive.SimplifiedControlledFragmentHandler;
import io.aeron.archive.codecs.ControlResponseCode;
import io.aeron.archive.codecs.RecordingDescriptorDecoder;
import io.aeron.logbuffer.ExclusiveBufferClaim;
import io.aeron.logbuffer.FrameDescriptor;
import java.io.File;
import java.nio.ByteOrder;
import org.agrona.CloseHelper;
import org.agrona.LangUtil;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;

class ReplaySession
implements Session,
SimplifiedControlledFragmentHandler {
    static final long CONNECT_TIMEOUT_MS = 5000L;
    private static final int REPLAY_FRAGMENT_LIMIT = Archive.Configuration.replayFragmentLimit();
    private long connectDeadlineMs;
    private final long replaySessionId;
    private final long correlationId;
    private final ExclusiveBufferClaim bufferClaim = new ExclusiveBufferClaim();
    private final ExclusivePublication replayPublication;
    private final RecordingFragmentReader cursor;
    private ControlResponseProxy threadLocalControlResponseProxy;
    private final ControlSession controlSession;
    private final EpochClock epochClock;
    private State state = State.INIT;

    ReplaySession(long replayPosition, long replayLength, ArchiveConductor.ReplayPublicationSupplier supplier, ControlSession controlSession, File archiveDir, ControlResponseProxy threadLocalControlResponseProxy, long replaySessionId, long correlationId, EpochClock epochClock, String replayChannel, int replayStreamId, UnsafeBuffer descriptorBuffer, AtomicCounter recordingPosition) {
        this.controlSession = controlSession;
        this.threadLocalControlResponseProxy = threadLocalControlResponseProxy;
        this.replaySessionId = replaySessionId;
        this.correlationId = correlationId;
        this.epochClock = epochClock;
        RecordingDescriptorDecoder descriptorDecoder = new RecordingDescriptorDecoder();
        Catalog.wrapDescriptorDecoder(descriptorDecoder, descriptorBuffer);
        long startPosition = descriptorDecoder.startPosition();
        int mtuLength = descriptorDecoder.mtuLength();
        int termBufferLength = descriptorDecoder.termBufferLength();
        int initialTermId = descriptorDecoder.initialTermId();
        if (replayPosition - startPosition < 0L) {
            String errorMessage = "requested replay start position(=" + replayPosition + ") is before recording start position(=" + startPosition + ")";
            this.closeOnError(new IllegalArgumentException(errorMessage), errorMessage);
            this.cursor = null;
            this.replayPublication = null;
            return;
        }
        long stopPosition = descriptorDecoder.stopPosition();
        if (stopPosition != -1L && replayPosition >= stopPosition) {
            String errorMessage = "requested replay start position(=" + replayPosition + ") must be before current highest recorded position(=" + stopPosition + ")";
            this.closeOnError(new IllegalArgumentException(errorMessage), errorMessage);
            this.cursor = null;
            this.replayPublication = null;
            return;
        }
        RecordingFragmentReader cursor = null;
        try {
            cursor = new RecordingFragmentReader(descriptorDecoder, archiveDir, replayPosition, replayLength, recordingPosition);
        }
        catch (Exception ex) {
            this.closeOnError(ex, "failed to open cursor on a recording because: " + ex.getMessage());
        }
        this.cursor = cursor;
        ExclusivePublication replayPublication = null;
        try {
            replayPublication = supplier.newReplayPublication(replayChannel, replayStreamId, cursor.fromPosition(), mtuLength, initialTermId, termBufferLength);
        }
        catch (Exception ex) {
            this.closeOnError(ex, "failed to create replay publication because: " + ex.getMessage());
        }
        this.replayPublication = replayPublication;
        controlSession.sendOkResponse(correlationId, threadLocalControlResponseProxy);
        this.connectDeadlineMs = epochClock.time() + 5000L;
    }

    @Override
    public void close() {
        this.state = State.CLOSED;
        CloseHelper.quietClose(this.cursor);
        CloseHelper.quietClose(this.replayPublication);
    }

    @Override
    public long sessionId() {
        return this.replaySessionId;
    }

    @Override
    public int doWork() {
        int workCount = 0;
        switch (this.state) {
            case INIT: {
                workCount += this.init();
                break;
            }
            case REPLAY: {
                workCount += this.replay();
            }
        }
        return workCount;
    }

    @Override
    public void abort() {
        this.state = State.INACTIVE;
    }

    @Override
    public boolean isDone() {
        return this.state == State.INACTIVE;
    }

    @Override
    public boolean onFragment(UnsafeBuffer termBuffer, int offset, int length) {
        long result;
        if (this.isDone()) {
            return false;
        }
        int frameOffset = offset - 32;
        int frameType = FrameDescriptor.frameType(termBuffer, frameOffset);
        long l = result = frameType == 0 ? this.replayPublication.appendPadding(length) : this.replayFrame(termBuffer, offset, length, frameOffset);
        if (result > 0L) {
            return true;
        }
        if (result == -4L || result == -1L) {
            this.closeOnError(null, "replay stream has been shutdown mid-replay");
        }
        return false;
    }

    State state() {
        return this.state;
    }

    void setThreadLocalControlResponseProxy(ControlResponseProxy proxy) {
        this.threadLocalControlResponseProxy = proxy;
    }

    private int replay() {
        try {
            int polled = this.cursor.controlledPoll(this, REPLAY_FRAGMENT_LIMIT);
            if (this.cursor.isDone()) {
                this.state = State.INACTIVE;
            }
            return polled;
        }
        catch (Exception ex) {
            return this.closeOnError(ex, "cursor read failed");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private long replayFrame(UnsafeBuffer termBuffer, int offset, int length, int frameOffset) {
        long result = this.replayPublication.tryClaim(length, this.bufferClaim);
        if (result > 0L) {
            try {
                this.bufferClaim.flags(FrameDescriptor.frameFlags(termBuffer, frameOffset)).reservedValue(termBuffer.getLong(frameOffset + 24, ByteOrder.LITTLE_ENDIAN)).buffer().putBytes(this.bufferClaim.offset(), termBuffer, offset, length);
            }
            finally {
                this.bufferClaim.commit();
            }
        }
        return result;
    }

    private int init() {
        if (!this.replayPublication.isConnected()) {
            if (this.epochClock.time() > this.connectDeadlineMs) {
                return this.closeOnError(null, "no connection established for replay");
            }
            return 0;
        }
        this.state = State.REPLAY;
        return 1;
    }

    private int closeOnError(Throwable ex, String errorMessage) {
        this.state = State.INACTIVE;
        CloseHelper.quietClose(this.cursor);
        if (!this.controlSession.isDone()) {
            this.controlSession.sendResponse(this.correlationId, ControlResponseCode.ERROR, errorMessage, this.threadLocalControlResponseProxy);
        }
        if (ex != null) {
            LangUtil.rethrowUnchecked(ex);
        }
        return 0;
    }

    static enum State {
        INIT,
        REPLAY,
        INACTIVE,
        CLOSED;

    }
}

