/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.archive;

import io.aeron.Publication;
import io.aeron.archive.AbstractListRecordingsSession;
import io.aeron.archive.ArchiveConductor;
import io.aeron.archive.ControlResponseProxy;
import io.aeron.archive.ControlSessionDemuxer;
import io.aeron.archive.ListRecordingsForUriSession;
import io.aeron.archive.ListRecordingsSession;
import io.aeron.archive.Session;
import io.aeron.archive.codecs.ControlResponseCode;
import io.aeron.archive.codecs.SourceLocation;
import java.util.ArrayDeque;
import java.util.function.Supplier;
import org.agrona.CloseHelper;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.ManyToOneConcurrentLinkedQueue;
import org.agrona.concurrent.UnsafeBuffer;

class ControlSession
implements Session {
    static final long TIMEOUT_MS = 5000L;
    private static final int NO_ACTIVE_DEADLINE = -1;
    private final ArchiveConductor conductor;
    private final EpochClock epochClock;
    private final ArrayDeque<AbstractListRecordingsSession> listRecordingsSessions = new ArrayDeque();
    private final ManyToOneConcurrentLinkedQueue<Supplier<Boolean>> queuedResponses = new ManyToOneConcurrentLinkedQueue();
    private final ControlResponseProxy controlResponseProxy;
    private final long controlSessionId;
    private final long correlationId;
    private final ControlSessionDemuxer demuxer;
    private final Publication controlPublication;
    private State state = State.INIT;
    private long timeoutDeadlineMs = -1L;

    ControlSession(long controlSessionId, long correlationId, ControlSessionDemuxer demuxer, Publication controlPublication, ArchiveConductor conductor, EpochClock epochClock, ControlResponseProxy controlResponseProxy) {
        this.controlSessionId = controlSessionId;
        this.correlationId = correlationId;
        this.demuxer = demuxer;
        this.controlPublication = controlPublication;
        this.conductor = conductor;
        this.epochClock = epochClock;
        this.controlResponseProxy = controlResponseProxy;
    }

    @Override
    public long sessionId() {
        return this.controlSessionId;
    }

    @Override
    public void abort() {
        this.state = State.INACTIVE;
    }

    @Override
    public void close() {
        this.state = State.CLOSED;
        CloseHelper.quietClose(this.controlPublication);
        this.demuxer.notifyControlSessionClosed(this);
    }

    @Override
    public boolean isDone() {
        return this.state == State.INACTIVE;
    }

    @Override
    public int doWork() {
        int workCount = 0;
        if (this.state == State.INIT) {
            workCount += this.waitForConnection();
        }
        if (this.state == State.ACTIVE) {
            workCount = this.sendQueuedResponsesOrPollForRequests();
        }
        return workCount;
    }

    public void onStopRecording(long correlationId, int streamId, String channel) {
        this.conductor.stopRecording(correlationId, this, streamId, channel);
    }

    public void onStartRecording(long correlationId, String channel, int streamId, SourceLocation sourceLocation) {
        this.conductor.startRecordingSubscription(correlationId, this, streamId, channel, sourceLocation);
    }

    public void onListRecordingsForUri(long correlationId, long fromRecordingId, int recordCount, int streamId, String channel) {
        ListRecordingsForUriSession listRecordingsSession = this.conductor.newListRecordingsForUriSession(correlationId, fromRecordingId, recordCount, streamId, this.conductor.strippedChannelBuilder(channel).build(), this);
        this.listRecordingsSessions.add(listRecordingsSession);
        if (this.listRecordingsSessions.size() == 1) {
            this.conductor.addSession(listRecordingsSession);
        }
    }

    public void onListRecordings(long correlationId, long fromRecordingId, int recordCount) {
        ListRecordingsSession listRecordingsSession = this.conductor.newListRecordingsSession(correlationId, fromRecordingId, recordCount, this);
        this.listRecordingsSessions.add(listRecordingsSession);
        if (this.listRecordingsSessions.size() == 1) {
            this.conductor.addSession(listRecordingsSession);
        }
    }

    public void onStartReplay(long correlationId, long recordingId, long position, long length, int replayStreamId, String replayChannel) {
        this.conductor.startReplay(correlationId, this, recordingId, position, length, replayStreamId, replayChannel);
    }

    void onListRecordingSessionClosed(AbstractListRecordingsSession listRecordingsSession) {
        if (listRecordingsSession != this.listRecordingsSessions.poll()) {
            throw new IllegalStateException();
        }
        if (!this.isDone() && this.listRecordingsSessions.size() != 0) {
            this.conductor.addSession((Session)this.listRecordingsSessions.peek());
        }
    }

    void sendOkResponse(long correlationId, ControlResponseProxy proxy) {
        if (!proxy.sendResponse(this.controlSessionId, correlationId, 0L, ControlResponseCode.OK, null, this.controlPublication)) {
            this.queueResponse(correlationId, 0L, ControlResponseCode.OK, null);
        }
    }

    void sendRecordingUnknown(long correlationId, long recordingId, ControlResponseProxy proxy) {
        if (!proxy.sendResponse(this.controlSessionId, correlationId, recordingId, ControlResponseCode.RECORDING_UNKNOWN, null, this.controlPublication)) {
            this.queueResponse(correlationId, recordingId, ControlResponseCode.RECORDING_UNKNOWN, null);
        }
    }

    void sendResponse(long correlationId, ControlResponseCode code, String errorMessage, ControlResponseProxy proxy) {
        if (!proxy.sendResponse(this.controlSessionId, correlationId, 0L, code, errorMessage, this.controlPublication)) {
            this.queueResponse(correlationId, 0L, code, errorMessage);
        }
    }

    private void sendConnectResponse() {
        if (!this.controlResponseProxy.sendResponse(this.controlSessionId, this.correlationId, this.controlSessionId, ControlResponseCode.CONNECTED, null, this.controlPublication)) {
            this.queueResponse(this.correlationId, this.controlSessionId, ControlResponseCode.CONNECTED, null);
        }
    }

    int sendDescriptor(long correlationId, UnsafeBuffer descriptorBuffer, ControlResponseProxy proxy) {
        return proxy.sendDescriptor(this.controlSessionId, correlationId, descriptorBuffer, this.controlPublication);
    }

    int maxPayloadLength() {
        return this.controlPublication.maxPayloadLength();
    }

    private int sendQueuedResponsesOrPollForRequests() {
        int workCount = 0;
        if (!this.controlPublication.isConnected()) {
            this.state = State.INACTIVE;
        } else if (!this.queuedResponses.isEmpty()) {
            if (ControlSession.sendFirst(this.queuedResponses)) {
                this.queuedResponses.poll();
                this.timeoutDeadlineMs = -1L;
                ++workCount;
            } else if (this.timeoutDeadlineMs == -1L) {
                this.timeoutDeadlineMs = this.epochClock.time() + 5000L;
            } else if (this.hasGoneInactive()) {
                this.state = State.INACTIVE;
            }
        }
        return workCount;
    }

    private static boolean sendFirst(ManyToOneConcurrentLinkedQueue<Supplier<Boolean>> responseQueue) {
        return responseQueue.peek().get();
    }

    private int waitForConnection() {
        int workCount = 0;
        if (this.timeoutDeadlineMs == -1L) {
            this.timeoutDeadlineMs = this.epochClock.time() + 5000L;
        } else if (this.controlPublication.isConnected()) {
            this.timeoutDeadlineMs = -1L;
            this.state = State.ACTIVE;
            this.sendConnectResponse();
            ++workCount;
        } else if (this.hasGoneInactive()) {
            this.state = State.INACTIVE;
        }
        return workCount;
    }

    private boolean hasGoneInactive() {
        return this.timeoutDeadlineMs != -1L && this.epochClock.time() > this.timeoutDeadlineMs;
    }

    private void queueResponse(long correlationId, long relevantId, ControlResponseCode code, String message) {
        this.queuedResponses.offer(() -> this.controlResponseProxy.sendResponse(this.controlSessionId, correlationId, relevantId, code, message, this.controlPublication));
    }

    static enum State {
        INIT,
        ACTIVE,
        INACTIVE,
        CLOSED;

    }
}

