/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.driver;

import io.aeron.ChannelUri;
import io.aeron.ErrorCode;
import io.aeron.driver.AeronClient;
import io.aeron.driver.ClientProxy;
import io.aeron.driver.ClientRequestAdapter;
import io.aeron.driver.Configuration;
import io.aeron.driver.CongestionControl;
import io.aeron.driver.DataPacketDispatcher;
import io.aeron.driver.DriverManagedResource;
import io.aeron.driver.FlowControl;
import io.aeron.driver.IpcPublication;
import io.aeron.driver.IpcSubscriptionLink;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.NetworkPublication;
import io.aeron.driver.NetworkPublicationThreadLocals;
import io.aeron.driver.NetworkSubscriptionLink;
import io.aeron.driver.PublicationImage;
import io.aeron.driver.PublicationLink;
import io.aeron.driver.PublicationParams;
import io.aeron.driver.ReceiverProxy;
import io.aeron.driver.RetransmitHandler;
import io.aeron.driver.SenderProxy;
import io.aeron.driver.SpySubscriptionLink;
import io.aeron.driver.SubscriberPosition;
import io.aeron.driver.SubscriptionLink;
import io.aeron.driver.buffer.RawLog;
import io.aeron.driver.buffer.RawLogFactory;
import io.aeron.driver.cmd.DriverConductorCmd;
import io.aeron.driver.exceptions.ControlProtocolException;
import io.aeron.driver.media.ReceiveChannelEndpoint;
import io.aeron.driver.media.SendChannelEndpoint;
import io.aeron.driver.media.UdpChannel;
import io.aeron.driver.status.PublisherLimit;
import io.aeron.driver.status.ReceiveChannelStatus;
import io.aeron.driver.status.ReceiverHwm;
import io.aeron.driver.status.ReceiverPos;
import io.aeron.driver.status.SendChannelStatus;
import io.aeron.driver.status.SenderLimit;
import io.aeron.driver.status.SenderPos;
import io.aeron.driver.status.SubscriberPos;
import io.aeron.driver.status.SystemCounterDescriptor;
import io.aeron.logbuffer.LogBufferDescriptor;
import io.aeron.protocol.DataHeaderFlyweight;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;
import org.agrona.BitUtil;
import org.agrona.collections.ArrayListUtil;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue;
import org.agrona.concurrent.NanoClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersManager;
import org.agrona.concurrent.status.Position;
import org.agrona.concurrent.status.ReadablePosition;
import org.agrona.concurrent.status.UnsafeBufferPosition;

public class DriverConductor
implements Agent {
    /** Liveness timeout in nanoseconds handed to every {@link PublicationImage} created by this conductor. */
    private final long imageLivenessTimeoutNs;
    /** Client liveness timeout in nanoseconds; also the wait applied before unblocking the to-driver command buffer. */
    private final long clientLivenessTimeoutNs;
    /** Unblock timeout in nanoseconds handed to every {@link NetworkPublication} created by this conductor. */
    private final long publicationUnblockTimeoutNs;
    /** Status message timeout passed to {@code PublicationImage.trackRebuild} on each duty cycle. */
    private final long statusMessageTimeoutNs;
    /** Nano time at which the to-driver consumer position was last seen to change. */
    private long timeOfLastToDriverPositionChangeNs;
    /** Seeded with the construction time; NOTE(review): presumably consumed by the timer processing, which is not visible here. */
    private long timeOfLastTimeoutCheckNs;
    /** Consumer position of the to-driver command buffer observed at the last blocked-command check. */
    private long lastConsumerCommandPosition;
    /** Next session id to assign to a new network publication; starts from a randomised value. */
    private int nextSessionId = BitUtil.generateRandomisedId();
    /** Single instance shared with every {@link NetworkPublication} created by this conductor. */
    private final NetworkPublicationThreadLocals networkPublicationThreadLocals = new NetworkPublicationThreadLocals();
    private final MediaDriver.Context context;
    private final RawLogFactory rawLogFactory;
    private final ReceiverProxy receiverProxy;
    private final SenderProxy senderProxy;
    private final ClientProxy clientProxy;
    private final RingBuffer toDriverCommands;
    private final ClientRequestAdapter clientRequestAdapter;
    private final ManyToOneConcurrentArrayQueue<DriverConductorCmd> driverCmdQueue;
    /** Send channel endpoints keyed by the canonical form of their {@link UdpChannel}. */
    private final HashMap<String, SendChannelEndpoint> sendChannelEndpointByChannelMap = new HashMap<>();
    /** Receive channel endpoints keyed by the canonical form of their {@link UdpChannel}. */
    private final HashMap<String, ReceiveChannelEndpoint> receiveChannelEndpointByChannelMap = new HashMap<>();
    private final ArrayList<PublicationLink> publicationLinks = new ArrayList<>();
    private final ArrayList<NetworkPublication> networkPublications = new ArrayList<>();
    private final ArrayList<SubscriptionLink> subscriptionLinks = new ArrayList<>();
    private final ArrayList<PublicationImage> publicationImages = new ArrayList<>();
    private final ArrayList<AeronClient> clients = new ArrayList<>();
    private final ArrayList<IpcPublication> ipcPublications = new ArrayList<>();
    private final EpochClock epochClock;
    private final NanoClock nanoClock;
    /** Pre-bound handler so draining the command queue does not need a fresh method reference each cycle. */
    private final Consumer<DriverConductorCmd> onDriverConductorCmdFunc = this::onDriverConductorCmd;
    private final CountersManager countersManager;
    /** System counter incremented for every client keepalive received. */
    private final AtomicCounter clientKeepAlives;

    public DriverConductor(MediaDriver.Context ctx) {
        long nowNs;
        this.context = ctx;
        this.imageLivenessTimeoutNs = ctx.imageLivenessTimeoutNs();
        this.clientLivenessTimeoutNs = ctx.clientLivenessTimeoutNs();
        this.publicationUnblockTimeoutNs = ctx.publicationUnblockTimeoutNs();
        this.statusMessageTimeoutNs = ctx.statusMessageTimeout();
        this.driverCmdQueue = ctx.driverCommandQueue();
        this.receiverProxy = ctx.receiverProxy();
        this.senderProxy = ctx.senderProxy();
        this.rawLogFactory = ctx.rawLogBuffersFactory();
        this.epochClock = ctx.epochClock();
        this.nanoClock = ctx.nanoClock();
        this.toDriverCommands = ctx.toDriverCommands();
        this.clientProxy = ctx.clientProxy();
        this.countersManager = this.context.countersManager();
        this.clientKeepAlives = this.context.systemCounters().get(SystemCounterDescriptor.CLIENT_KEEP_ALIVES);
        this.clientRequestAdapter = new ClientRequestAdapter(this.context.systemCounters().get(SystemCounterDescriptor.ERRORS), ctx.errorHandler(), this.toDriverCommands, this.clientProxy, this);
        this.timeOfLastTimeoutCheckNs = nowNs = this.nanoClock.nanoTime();
        this.timeOfLastToDriverPositionChangeNs = nowNs;
        this.lastConsumerCommandPosition = this.toDriverCommands.consumerPosition();
    }

    /**
     * Close every network publication, publication image and IPC publication currently
     * tracked by the conductor, in that order.
     */
    @Override
    public void onClose() {
        for (final NetworkPublication publication : networkPublications) {
            publication.close();
        }

        for (final PublicationImage image : publicationImages) {
            image.close();
        }

        for (final IpcPublication publication : ipcPublications) {
            publication.close();
        }
    }

    /**
     * Name of the role this agent plays.
     *
     * @return the fixed string {@code "driver-conductor"}.
     */
    @Override
    public String roleName() {
        return "driver-conductor";
    }

    /**
     * Run one duty cycle of the conductor.
     * <p>
     * In order: receive client requests, drain up to 10 queued driver commands, process timers,
     * let each publication image track its rebuild, then update the publisher limit of every
     * network and IPC publication.
     *
     * @return the accumulated work count reported by the steps above (image rebuild tracking
     *         does not contribute).
     * @throws Exception declared by the {@link Agent} contract.
     */
    @Override
    public int doWork() throws Exception {
        int workCount = clientRequestAdapter.receive();
        workCount += driverCmdQueue.drain(onDriverConductorCmdFunc, 10);

        final long nowNs = nanoClock.nanoTime();
        workCount += processTimers(nowNs);

        // Indexed loops: sizes are sampled after timers have run and before each pass.
        final ArrayList<PublicationImage> images = publicationImages;
        for (int i = 0, imageCount = images.size(); i < imageCount; i++) {
            images.get(i).trackRebuild(nowNs, statusMessageTimeoutNs);
        }

        final ArrayList<NetworkPublication> netPublications = networkPublications;
        for (int i = 0, netCount = netPublications.size(); i < netCount; i++) {
            workCount += netPublications.get(i).updatePublisherLimit();
        }

        final ArrayList<IpcPublication> localPublications = ipcPublications;
        for (int i = 0, ipcCount = localPublications.size(); i < ipcCount; i++) {
            workCount += localPublications.get(i).updatePublishersLimit();
        }

        return workCount;
    }

    /**
     * Create a {@link PublicationImage} for a newly observed stream, provided at least one
     * subscription matches the endpoint and stream.
     * <p>
     * The sender MTU is validated first. A subscriber position is then allocated for each matching
     * subscription at the computed join position. When there are none, nothing further happens.
     * Otherwise the image log, congestion control and image are created, the image is registered
     * with the receiver, and each matching subscription is linked and its client notified.
     *
     * @param sessionId         session of the stream.
     * @param streamId          stream identity.
     * @param initialTermId     initial term id of the stream.
     * @param activeTermId      term id that is currently active.
     * @param initialTermOffset offset within the active term at which the image joins.
     * @param termBufferLength  length of each term buffer; its trailing zero count is used as the position shift.
     * @param senderMtuLength   MTU advertised by the sender.
     * @param controlAddress    address passed to the image as its control address.
     * @param sourceAddress     address of the source, also used to build the source identity for clients.
     * @param channelEndpoint   endpoint on which the stream was received.
     */
    public void onCreatePublicationImage(int sessionId, int streamId, int initialTermId, int activeTermId, int initialTermOffset, int termBufferLength, int senderMtuLength, InetSocketAddress controlAddress, InetSocketAddress sourceAddress, ReceiveChannelEndpoint channelEndpoint) {
        channelEndpoint.validateSenderMtuLength(senderMtuLength, context.initialWindowLength());

        final UdpChannel udpChannel = channelEndpoint.udpChannel();
        final String channel = udpChannel.originalUriString();
        final long registrationId = toDriverCommands.nextCorrelationId();
        final int positionBitsToShift = Integer.numberOfTrailingZeros(termBufferLength);
        final long joinPosition = LogBufferDescriptor.computePosition(
            activeTermId, initialTermOffset, positionBitsToShift, initialTermId);

        final List<SubscriberPosition> subscriberPositions =
            createSubscriberPositions(sessionId, streamId, channelEndpoint, joinPosition);
        if (subscriberPositions.isEmpty()) {
            // Nobody is interested in this stream, so no image is created.
            return;
        }

        final RawLog rawLog = newPublicationImageLog(
            sessionId, streamId, initialTermId, termBufferLength, senderMtuLength, udpChannel, registrationId);

        final CongestionControl congestionControl = context.congestionControlSupplier().newInstance(
            registrationId,
            udpChannel,
            streamId,
            sessionId,
            termBufferLength,
            senderMtuLength,
            nanoClock,
            context,
            countersManager);

        final PublicationImage image = new PublicationImage(
            registrationId,
            imageLivenessTimeoutNs,
            channelEndpoint,
            controlAddress,
            sessionId,
            streamId,
            initialTermId,
            activeTermId,
            initialTermOffset,
            rawLog,
            udpChannel.isMulticast() ? Configuration.NAK_MULTICAST_DELAY_GENERATOR : Configuration.NAK_UNICAST_DELAY_GENERATOR,
            DriverConductor.positionArray(subscriberPositions),
            ReceiverHwm.allocate(countersManager, registrationId, sessionId, streamId, channel),
            ReceiverPos.allocate(countersManager, registrationId, sessionId, streamId, channel),
            nanoClock,
            context.epochClock(),
            context.systemCounters(),
            sourceAddress,
            congestionControl,
            context.lossReport(),
            subscriberPositions.get(0).subscription().isReliable());

        publicationImages.add(image);
        receiverProxy.newPublicationImage(channelEndpoint, image);

        for (int i = 0, count = subscriberPositions.size(); i < count; i++) {
            final SubscriberPosition subscriberPosition = subscriberPositions.get(i);
            subscriberPosition.addLink(image);
            clientProxy.onAvailableImage(
                registrationId,
                streamId,
                sessionId,
                subscriberPosition.subscription().registrationId(),
                subscriberPosition.positionCounterId(),
                rawLog.fileName(),
                DriverConductor.generateSourceIdentity(sourceAddress));
        }
    }

    /**
     * Look up the send channel endpoint registered for a channel.
     *
     * @param channel channel whose canonical form is used as the lookup key.
     * @return the registered endpoint, or {@code null} if none is mapped.
     */
    SendChannelEndpoint senderChannelEndpoint(UdpChannel channel) {
        final String canonicalForm = channel.canonicalForm();

        return sendChannelEndpointByChannelMap.get(canonicalForm);
    }

    /**
     * Look up the receive channel endpoint registered for a channel.
     *
     * @param channel channel whose canonical form is used as the lookup key.
     * @return the registered endpoint, or {@code null} if none is mapped.
     */
    ReceiveChannelEndpoint receiverChannelEndpoint(UdpChannel channel) {
        final String canonicalForm = channel.canonicalForm();

        return receiveChannelEndpointByChannelMap.get(canonicalForm);
    }

    /**
     * Find the shared IPC publication for a stream among the publications tracked here.
     *
     * @param streamId stream to search for.
     * @return whatever {@code findSharedIpcPublication} yields for the tracked IPC publications.
     */
    IpcPublication getSharedIpcPublication(long streamId) {
        final ArrayList<IpcPublication> candidates = ipcPublications;

        return findSharedIpcPublication(candidates, streamId);
    }

    /**
     * Find a tracked IPC publication by its registration id.
     *
     * @param registrationId registration id to match.
     * @return the first publication with that registration id, or {@code null} if there is none.
     */
    IpcPublication getIpcPublication(long registrationId) {
        final ArrayList<IpcPublication> candidates = ipcPublications;
        IpcPublication match = null;

        for (int i = 0, count = candidates.size(); i < count && null == match; i++) {
            final IpcPublication candidate = candidates.get(i);
            if (candidate.registrationId() == registrationId) {
                match = candidate;
            }
        }

        return match;
    }

    /**
     * Handle a client request to add a publication on a UDP channel.
     * <p>
     * A non-exclusive request reuses a matching publication on the same endpoint and stream when
     * one is found, after confirming the requested parameters match its log. Otherwise a new
     * publication is created. The client is then linked to the publication and told it is ready.
     *
     * @param channel       channel URI as supplied by the client.
     * @param streamId      stream identity.
     * @param correlationId correlation id of the request; used as the registration id of a new publication.
     * @param clientId      id of the requesting client.
     * @param isExclusive   {@code true} to always create a new publication rather than share one.
     */
    void onAddNetworkPublication(String channel, int streamId, long correlationId, long clientId, boolean isExclusive) {
        final UdpChannel udpChannel = UdpChannel.parse(channel);
        final ChannelUri channelUri = udpChannel.channelUri();
        final PublicationParams params =
            PublicationParams.getPublicationParams(context, channelUri, isExclusive, false);
        PublicationParams.validateMtuForMaxMessage(params, isExclusive);

        final SendChannelEndpoint channelEndpoint = getOrCreateSendChannelEndpoint(udpChannel);

        NetworkPublication publication = isExclusive ?
            null : DriverConductor.findPublication(networkPublications, streamId, channelEndpoint);

        if (null != publication) {
            PublicationParams.confirmMatch(channelUri, params, publication.rawLog());
        } else {
            publication = newNetworkPublication(
                correlationId, streamId, channel, udpChannel, channelEndpoint, params, isExclusive);
        }

        final AeronClient client = getOrAddClient(clientId);
        publicationLinks.add(new PublicationLink(correlationId, publication, client));

        clientProxy.onPublicationReady(
            correlationId,
            publication.registrationId(),
            streamId,
            publication.sessionId(),
            publication.rawLog().fileName(),
            publication.publisherLimitId(),
            isExclusive);
    }

    /**
     * Notify clients that the publication's image is unavailable, then unlink the publication
     * from every subscription link.
     *
     * @param publication network publication being cleaned up.
     */
    void cleanupSpies(NetworkPublication publication) {
        clientProxy.onUnavailableImage(
            publication.registrationId(),
            publication.streamId(),
            publication.channelEndpoint().originalUriString());

        final ArrayList<SubscriptionLink> links = subscriptionLinks;
        for (int i = 0, count = links.size(); i < count; i++) {
            links.get(i).unlink(publication);
        }
    }

    /**
     * Remove a network publication from the sender and, if its endpoint reports that it should
     * be closed, close the endpoint's status indicator, unmap it and ask the sender to close it.
     *
     * @param publication network publication being cleaned up.
     */
    void cleanupPublication(NetworkPublication publication) {
        senderProxy.removeNetworkPublication(publication);

        final SendChannelEndpoint endpoint = publication.channelEndpoint();
        if (!endpoint.shouldBeClosed()) {
            return;
        }

        endpoint.closeStatusIndicator();
        sendChannelEndpointByChannelMap.remove(endpoint.udpChannel().canonicalForm());
        senderProxy.closeSendChannelEndpoint(endpoint);
    }

    /**
     * Release the receive endpoint resources held on behalf of a subscription.
     * <p>
     * Does nothing when the subscription has no receive channel endpoint. Otherwise the endpoint's
     * reference count for the stream is decremented, and the receiver is told to remove the
     * stream when the count reaches zero. If the endpoint then reports it should be closed, its
     * status indicator is closed, it is unmapped and the receiver is asked to close it.
     *
     * @param subscription subscription link being cleaned up.
     */
    void cleanupSubscriptionLink(SubscriptionLink subscription) {
        final ReceiveChannelEndpoint endpoint = subscription.channelEndpoint();
        if (null == endpoint) {
            return;
        }

        final int streamId = subscription.streamId();
        final int remainingRefs = endpoint.decRefToStream(streamId);
        if (0 == remainingRefs) {
            receiverProxy.removeSubscription(endpoint, streamId);
        }

        if (endpoint.shouldBeClosed()) {
            endpoint.closeStatusIndicator();
            receiveChannelEndpointByChannelMap.remove(endpoint.udpChannel().canonicalForm());
            receiverProxy.closeReceiveChannelEndpoint(endpoint);
        }
    }

    /**
     * Tell clients the image is unavailable and ask the receiver to remove the cool-down for
     * the image's endpoint, session and stream.
     *
     * @param image publication image entering linger.
     */
    void transitionToLinger(PublicationImage image) {
        clientProxy.onUnavailableImage(
            image.correlationId(),
            image.streamId(),
            image.channelUriString());

        receiverProxy.removeCoolDown(
            image.channelEndpoint(),
            image.sessionId(),
            image.streamId());
    }

    /**
     * Tell clients that the image of an IPC publication is unavailable. The channel reported is
     * always {@code "aeron:ipc"}.
     *
     * @param publication IPC publication entering linger.
     */
    void transitionToLinger(IpcPublication publication) {
        final String ipcChannel = "aeron:ipc";

        clientProxy.onUnavailableImage(publication.registrationId(), publication.streamId(), ipcChannel);
    }

    /**
     * Unlink a publication image from every subscription link.
     *
     * @param image image being cleaned up.
     */
    void cleanupImage(PublicationImage image) {
        final ArrayList<SubscriptionLink> links = subscriptionLinks;
        for (int i = 0, count = links.size(); i < count; i++) {
            links.get(i).unlink(image);
        }
    }

    /**
     * Unlink an IPC publication from every subscription link.
     *
     * @param publication IPC publication being cleaned up.
     */
    void cleanupIpcPublication(IpcPublication publication) {
        final ArrayList<SubscriptionLink> links = subscriptionLinks;
        for (int i = 0, count = links.size(); i < count; i++) {
            links.get(i).unlink(publication);
        }
    }

    /**
     * Handle a client request to add an IPC publication.
     * <p>
     * The publication is obtained or created, linked to the client, reported as ready and then
     * linked to matching IPC subscriptions.
     *
     * @param channel       channel URI as supplied by the client.
     * @param streamId      stream identity.
     * @param correlationId correlation id of the request.
     * @param clientId      id of the requesting client.
     * @param isExclusive   whether an exclusive publication was requested.
     */
    void onAddIpcPublication(String channel, int streamId, long correlationId, long clientId, boolean isExclusive) {
        final IpcPublication publication = getOrAddIpcPublication(correlationId, streamId, channel, isExclusive);

        final AeronClient client = getOrAddClient(clientId);
        publicationLinks.add(new PublicationLink(correlationId, publication, client));

        clientProxy.onPublicationReady(
            correlationId,
            publication.registrationId(),
            streamId,
            publication.sessionId(),
            publication.rawLog().fileName(),
            publication.publisherLimitId(),
            isExclusive);

        linkIpcSubscriptions(publication);
    }

    /**
     * Handle a client request to remove a publication.
     * <p>
     * The first publication link with the given registration id is removed from the list
     * (order of the remaining links is not preserved) and closed, then success is reported.
     *
     * @param registrationId registration id of the publication link to remove.
     * @param correlationId  correlation id of the request, echoed in the success response.
     * @throws ControlProtocolException with {@link ErrorCode#UNKNOWN_PUBLICATION} if no link matches.
     */
    void onRemovePublication(long registrationId, long correlationId) {
        final ArrayList<PublicationLink> links = publicationLinks;
        PublicationLink removed = null;

        for (int i = 0, count = links.size(); i < count; i++) {
            final PublicationLink candidate = links.get(i);
            if (candidate.registrationId() == registrationId) {
                removed = candidate;
                ArrayListUtil.fastUnorderedRemove(links, i, count - 1);
                break;
            }
        }

        if (null == removed) {
            throw new ControlProtocolException(ErrorCode.UNKNOWN_PUBLICATION, "Unknown publication: " + registrationId);
        }

        removed.close();
        clientProxy.operationSucceeded(correlationId);
    }

    /**
     * Handle a client request to add a destination to a publication's send channel endpoint.
     *
     * @param registrationId     registration id of the network publication.
     * @param destinationChannel channel URI from which the destination address is resolved.
     * @param correlationId      correlation id of the request, echoed in the success response.
     * @throws ControlProtocolException with {@link ErrorCode#UNKNOWN_PUBLICATION} if no network
     *                                  publication has the given registration id.
     */
    void onAddDestination(long registrationId, String destinationChannel, long correlationId) {
        final ArrayList<NetworkPublication> publications = networkPublications;
        SendChannelEndpoint endpoint = null;

        for (int i = 0, count = publications.size(); i < count; i++) {
            final NetworkPublication candidate = publications.get(i);
            if (candidate.registrationId() == registrationId) {
                endpoint = candidate.channelEndpoint();
                break;
            }
        }

        if (null == endpoint) {
            throw new ControlProtocolException(ErrorCode.UNKNOWN_PUBLICATION, "Unknown publication: " + registrationId);
        }

        endpoint.validateAllowsManualControl();

        final InetSocketAddress dstAddress = UdpChannel.destinationAddress(ChannelUri.parse(destinationChannel));
        senderProxy.addDestination(endpoint, dstAddress);
        clientProxy.operationSucceeded(correlationId);
    }

    /**
     * Handle a client request to remove a destination from a publication's send channel endpoint.
     *
     * @param registrationId     registration id of the network publication.
     * @param destinationChannel channel URI from which the destination address is resolved.
     * @param correlationId      correlation id of the request, echoed in the success response.
     * @throws ControlProtocolException with {@link ErrorCode#UNKNOWN_PUBLICATION} if no network
     *                                  publication has the given registration id.
     */
    void onRemoveDestination(long registrationId, String destinationChannel, long correlationId) {
        final ArrayList<NetworkPublication> publications = networkPublications;
        SendChannelEndpoint endpoint = null;

        for (int i = 0, count = publications.size(); i < count; i++) {
            final NetworkPublication candidate = publications.get(i);
            if (candidate.registrationId() == registrationId) {
                endpoint = candidate.channelEndpoint();
                break;
            }
        }

        if (null == endpoint) {
            throw new ControlProtocolException(ErrorCode.UNKNOWN_PUBLICATION, "Unknown publication: " + registrationId);
        }

        endpoint.validateAllowsManualControl();

        final InetSocketAddress dstAddress = UdpChannel.destinationAddress(ChannelUri.parse(destinationChannel));
        senderProxy.removeDestination(endpoint, dstAddress);
        clientProxy.operationSucceeded(correlationId);
    }

    /**
     * Handle a client request to add a subscription on a UDP channel.
     * <p>
     * The subscription is reliable unless the channel URI carries {@code reliable=false}. A clash
     * with existing subscriptions on the same endpoint and stream is rejected before anything is
     * created. The receiver is told about the stream the first time it is referenced on the endpoint.
     *
     * @param channel        channel URI as supplied by the client.
     * @param streamId       stream identity.
     * @param registrationId registration id of the new subscription; also echoed in the success response.
     * @param clientId       id of the requesting client.
     */
    void onAddNetworkSubscription(String channel, int streamId, long registrationId, long clientId) {
        final UdpChannel udpChannel = UdpChannel.parse(channel);
        final boolean isReliable = !"false".equals(udpChannel.channelUri().get("reliable", "true"));
        checkForClashingSubscription(isReliable, udpChannel, streamId);

        final ReceiveChannelEndpoint channelEndpoint = getOrCreateReceiveChannelEndpoint(udpChannel);
        if (1 == channelEndpoint.incRefToStream(streamId)) {
            // First reference to this stream on the endpoint.
            receiverProxy.addSubscription(channelEndpoint, streamId);
        }

        final AeronClient client = getOrAddClient(clientId);
        final NetworkSubscriptionLink link = new NetworkSubscriptionLink(
            registrationId,
            channelEndpoint,
            streamId,
            channel,
            client,
            context.clientLivenessTimeoutNs(),
            isReliable);

        subscriptionLinks.add(link);
        clientProxy.operationSucceeded(registrationId);
        linkMatchingImages(channelEndpoint, link);
    }

    /**
     * Handle a client request to add an IPC subscription and link it to every active IPC
     * publication on the same stream.
     *
     * @param channel        channel URI as supplied by the client.
     * @param streamId       stream identity.
     * @param registrationId registration id of the new subscription; also echoed in the success response.
     * @param clientId       id of the requesting client.
     */
    void onAddIpcSubscription(String channel, int streamId, long registrationId, long clientId) {
        final AeronClient client = getOrAddClient(clientId);
        final long livenessTimeoutNs = context.clientLivenessTimeoutNs();
        final IpcSubscriptionLink link =
            new IpcSubscriptionLink(registrationId, streamId, channel, client, livenessTimeoutNs);

        subscriptionLinks.add(link);
        clientProxy.operationSucceeded(registrationId);

        final ArrayList<IpcPublication> publications = ipcPublications;
        for (int i = 0, count = publications.size(); i < count; i++) {
            final IpcPublication candidate = publications.get(i);
            if (candidate.streamId() == streamId && IpcPublication.Status.ACTIVE == candidate.status()) {
                linkIpcSubscription(link, candidate);
            }
        }
    }

    /**
     * Handle a client request to add a spy subscription and link it to every active network
     * publication on the same stream whose endpoint is the one mapped for the channel.
     *
     * @param channel        channel URI as supplied by the client.
     * @param streamId       stream identity.
     * @param registrationId registration id of the new subscription; also echoed in the success response.
     * @param clientId       id of the requesting client.
     */
    void onAddSpySubscription(String channel, int streamId, long registrationId, long clientId) {
        final UdpChannel udpChannel = UdpChannel.parse(channel);
        final AeronClient client = getOrAddClient(clientId);
        final SpySubscriptionLink spyLink = new SpySubscriptionLink(
            registrationId, udpChannel, streamId, client, context.clientLivenessTimeoutNs());

        subscriptionLinks.add(spyLink);
        clientProxy.operationSucceeded(registrationId);

        // May be null when no send endpoint exists for the channel; then nothing below matches.
        final SendChannelEndpoint endpoint = sendChannelEndpointByChannelMap.get(udpChannel.canonicalForm());

        final ArrayList<NetworkPublication> publications = networkPublications;
        for (int i = 0, count = publications.size(); i < count; i++) {
            final NetworkPublication candidate = publications.get(i);
            if (streamId == candidate.streamId() &&
                endpoint == candidate.channelEndpoint() &&
                NetworkPublication.Status.ACTIVE == candidate.status()) {
                linkSpy(candidate, spyLink);
            }
        }
    }

    /**
     * Handle a client request to remove a subscription.
     * <p>
     * The subscription link is removed from the tracked list and closed, the receive endpoint
     * resources it held are released, and success is reported to the client.
     *
     * @param registrationId registration id of the subscription to remove.
     * @param correlationId  correlation id of the request, echoed in the success response.
     * @throws ControlProtocolException with {@link ErrorCode#UNKNOWN_SUBSCRIPTION} if no
     *                                  subscription has the given registration id.
     */
    void onRemoveSubscription(long registrationId, long correlationId) {
        final SubscriptionLink subscription =
            DriverConductor.removeSubscriptionLink(this.subscriptionLinks, registrationId);
        if (null == subscription) {
            throw new ControlProtocolException(ErrorCode.UNKNOWN_SUBSCRIPTION, "Unknown Subscription: " + registrationId);
        }

        subscription.close();

        // Same stream ref-count and endpoint teardown as the timeout path, so the helper is
        // used rather than keeping a second copy of that logic here.
        this.cleanupSubscriptionLink(subscription);

        this.clientProxy.operationSucceeded(correlationId);
    }

    /**
     * Record a keepalive from a client.
     * <p>
     * The keepalive counter is always incremented; the client's last-keepalive time is updated
     * only when the client is known.
     *
     * @param clientId id of the client that sent the keepalive.
     */
    void onClientKeepalive(long clientId) {
        clientKeepAlives.addOrdered(1L);

        final AeronClient client = DriverConductor.findClient(clients, clientId);
        if (null == client) {
            return;
        }

        client.timeOfLastKeepalive(nanoClock.nanoTime());
    }

    /**
     * Publish the consumer heartbeat on the to-driver command buffer and run the managed-resource
     * check over every tracked collection: clients, publication links, network publications,
     * subscription links, publication images and IPC publications, in that order.
     *
     * @param nowNs current nano time supplied by the caller.
     */
    private void onHeartbeatCheckTimeouts(long nowNs) {
        final long nowMs = epochClock.time();
        toDriverCommands.consumerHeartbeatTime(nowMs);

        onCheckManagedResources(clients, nowNs, nowMs);
        onCheckManagedResources(publicationLinks, nowNs, nowMs);
        onCheckManagedResources(networkPublications, nowNs, nowMs);
        onCheckManagedResources(subscriptionLinks, nowNs, nowMs);
        onCheckManagedResources(publicationImages, nowNs, nowMs);
        onCheckManagedResources(ipcPublications, nowNs, nowMs);
    }

    /**
     * Detect a stalled to-driver command buffer and try to unblock it.
     * <p>
     * If the consumer position has moved since the previous check, the new position and time are
     * recorded. If it has not moved, producers are ahead of the consumer and more than
     * {@code clientLivenessTimeoutNs} has elapsed since the last movement, the buffer is asked to
     * unblock; a successful unblock increments the {@code UNBLOCKED_COMMANDS} system counter.
     *
     * @param nowNs current nano time supplied by the caller.
     */
    private void onCheckForBlockedToDriverCommands(long nowNs) {
        final long consumerPosition = this.toDriverCommands.consumerPosition();

        if (consumerPosition != this.lastConsumerCommandPosition) {
            this.timeOfLastToDriverPositionChangeNs = nowNs;
            this.lastConsumerCommandPosition = consumerPosition;
            return;
        }

        // Compare elapsed time by subtraction: nano-time values may wrap and "last + timeout"
        // can overflow, whereas the difference of two nano times stays correct.
        final long elapsedNs = nowNs - this.timeOfLastToDriverPositionChangeNs;

        if (this.toDriverCommands.producerPosition() > consumerPosition &&
            elapsedNs > this.clientLivenessTimeoutNs &&
            this.toDriverCommands.unblock()) {
            this.context.systemCounters().get(SystemCounterDescriptor.UNBLOCKED_COMMANDS).orderedIncrement();
        }
    }

    /**
     * Allocate a subscriber position counter, set to the join position, for every subscription
     * link that matches the given endpoint and stream.
     *
     * @param sessionId       session of the stream.
     * @param streamId        stream identity.
     * @param channelEndpoint endpoint on which the stream was received.
     * @param joinPosition    position at which subscribers join the stream.
     * @return the subscription/position pairs created; empty when nothing matches.
     */
    private List<SubscriberPosition> createSubscriberPositions(int sessionId, int streamId, ReceiveChannelEndpoint channelEndpoint, long joinPosition) {
        final ArrayList<SubscriberPosition> result = new ArrayList<SubscriberPosition>();
        final ArrayList<SubscriptionLink> links = subscriptionLinks;

        for (int i = 0, count = links.size(); i < count; i++) {
            final SubscriptionLink link = links.get(i);
            if (link.matches(channelEndpoint, streamId)) {
                final UnsafeBufferPosition position = SubscriberPos.allocate(
                    countersManager, link.registrationId(), sessionId, streamId, link.uri(), joinPosition);
                position.setOrdered(joinPosition);
                result.add(new SubscriberPosition(link, position));
            }
        }

        return result;
    }

    /**
     * Find a shareable network publication: same stream, same endpoint instance, active status
     * and not exclusive.
     *
     * @param publications    publications to search.
     * @param streamId        stream identity to match.
     * @param channelEndpoint endpoint to match by reference.
     * @return the first matching publication, or {@code null} if there is none.
     */
    private static NetworkPublication findPublication(ArrayList<NetworkPublication> publications, int streamId, SendChannelEndpoint channelEndpoint) {
        for (int i = 0, count = publications.size(); i < count; i++) {
            final NetworkPublication candidate = publications.get(i);
            final boolean isShareable =
                streamId == candidate.streamId() &&
                channelEndpoint == candidate.channelEndpoint() &&
                NetworkPublication.Status.ACTIVE == candidate.status() &&
                !candidate.isExclusive();

            if (isShareable) {
                return candidate;
            }
        }

        return null;
    }

    /**
     * Create a new network publication, register it with the sender and link any matching spies.
     * <p>
     * A fresh session id is taken from {@code nextSessionId}. For a replay the supplied initial
     * term id is used and the sender position and limit counters start at the position computed
     * from the replay term id and offset; otherwise a randomised initial term id is generated.
     * Flow control is taken from the multicast supplier when the channel is multicast or has
     * explicit control, and from the unicast supplier otherwise.
     *
     * @param registrationId  registration id for the publication and its counters.
     * @param streamId        stream identity.
     * @param channel         channel URI as supplied by the client; used to label counters.
     * @param udpChannel      parsed form of {@code channel}.
     * @param channelEndpoint endpoint the publication sends on; its reference count is incremented.
     * @param params          publication parameters (term length, MTU, replay settings).
     * @param isExclusive     whether the publication is exclusive.
     * @return the newly created publication.
     */
    private NetworkPublication newNetworkPublication(long registrationId, int streamId, String channel, UdpChannel udpChannel, SendChannelEndpoint channelEndpoint, PublicationParams params, boolean isExclusive) {
        final int sessionId = this.nextSessionId++;
        final UnsafeBufferPosition senderPosition = SenderPos.allocate(this.countersManager, registrationId, sessionId, streamId, channel);
        final UnsafeBufferPosition senderLimit = SenderLimit.allocate(this.countersManager, registrationId, sessionId, streamId, channel);
        final int initialTermId = params.isReplay ? params.initialTermId : BitUtil.generateRandomisedId();

        if (params.isReplay) {
            // Start both sender counters at the replay position.
            final int bits = Integer.numberOfTrailingZeros(params.termLength);
            final long position = LogBufferDescriptor.computePosition(params.termId, params.termOffset, bits, initialTermId);
            senderLimit.setOrdered(position);
            senderPosition.setOrdered(position);
        }

        final RetransmitHandler retransmitHandler = new RetransmitHandler(
            this.nanoClock,
            this.context.systemCounters(),
            Configuration.RETRANSMIT_UNICAST_DELAY_GENERATOR,
            Configuration.RETRANSMIT_UNICAST_LINGER_GENERATOR);

        final FlowControl flowControl = udpChannel.isMulticast() || udpChannel.hasExplicitControl() ?
            this.context.multicastFlowControlSupplier().newInstance(udpChannel, streamId, registrationId) :
            this.context.unicastFlowControlSupplier().newInstance(udpChannel, streamId, registrationId);

        final NetworkPublication publication = new NetworkPublication(
            registrationId,
            channelEndpoint,
            this.nanoClock,
            this.epochClock,
            this.newNetworkPublicationLog(sessionId, streamId, initialTermId, udpChannel, registrationId, params),
            PublisherLimit.allocate(this.countersManager, registrationId, sessionId, streamId, channel),
            senderPosition,
            senderLimit,
            sessionId,
            streamId,
            initialTermId,
            params.mtuLength,
            this.context.systemCounters(),
            flowControl,
            retransmitHandler,
            this.networkPublicationThreadLocals,
            this.publicationUnblockTimeoutNs,
            isExclusive);

        channelEndpoint.incRef();
        this.networkPublications.add(publication);
        this.senderProxy.newNetworkPublication(publication);
        this.linkSpies(this.subscriptionLinks, publication);

        return publication;
    }

    /**
     * Create the raw log for a network publication and initialise its metadata: default frame
     * header, initial term id, MTU length, correlation id and the tail.
     * <p>
     * For a replay the tail and active partition are set from the replay term id and offset;
     * otherwise the tail of partition 0 is initialised with the initial term id.
     *
     * @param sessionId      session of the publication.
     * @param streamId       stream identity.
     * @param initialTermId  initial term id of the publication.
     * @param udpChannel     channel whose canonical form is passed to the log factory.
     * @param registrationId registration id stored as the log's correlation id.
     * @param params         publication parameters (term length, MTU, replay settings).
     * @return the initialised raw log.
     */
    private RawLog newNetworkPublicationLog(int sessionId, int streamId, int initialTermId, UdpChannel udpChannel, long registrationId, PublicationParams params) {
        final RawLog log = rawLogFactory.newNetworkPublication(
            udpChannel.canonicalForm(), sessionId, streamId, registrationId, params.termLength);
        final UnsafeBuffer metaData = log.metaData();

        LogBufferDescriptor.storeDefaultFrameHeader(
            metaData, DataHeaderFlyweight.createDefaultHeader(sessionId, streamId, initialTermId));
        LogBufferDescriptor.initialTermId(metaData, initialTermId);
        LogBufferDescriptor.mtuLength(metaData, params.mtuLength);
        LogBufferDescriptor.correlationId(metaData, registrationId);

        if (!params.isReplay) {
            LogBufferDescriptor.initialiseTailWithTermId(metaData, 0, initialTermId);
        } else {
            final int partitionIndex = LogBufferDescriptor.indexByTerm(initialTermId, params.termId);
            final long rawTail = LogBufferDescriptor.packTail(params.termId, params.termOffset);
            LogBufferDescriptor.rawTail(metaData, partitionIndex, rawTail);
            LogBufferDescriptor.activePartitionIndex(metaData, partitionIndex);
        }

        return log;
    }

    /**
     * Create the raw log for an IPC publication and initialise its metadata: default frame
     * header, initial term id, MTU length, correlation id, an end-of-stream position of
     * {@link Long#MAX_VALUE} and the tail.
     * <p>
     * For a replay the tail and active partition are set from the replay term id and offset;
     * otherwise the tail of partition 0 is initialised with the initial term id.
     *
     * @param sessionId      session of the publication.
     * @param streamId       stream identity.
     * @param initialTermId  initial term id of the publication.
     * @param registrationId registration id stored as the log's correlation id.
     * @param params         publication parameters (term length, MTU, replay settings).
     * @return the initialised raw log.
     */
    private RawLog newIpcPublicationLog(int sessionId, int streamId, int initialTermId, long registrationId, PublicationParams params) {
        final RawLog log = rawLogFactory.newIpcPublication(sessionId, streamId, registrationId, params.termLength);
        final UnsafeBuffer metaData = log.metaData();

        LogBufferDescriptor.storeDefaultFrameHeader(
            metaData, DataHeaderFlyweight.createDefaultHeader(sessionId, streamId, initialTermId));
        LogBufferDescriptor.initialTermId(metaData, initialTermId);
        LogBufferDescriptor.mtuLength(metaData, params.mtuLength);
        LogBufferDescriptor.correlationId(metaData, registrationId);
        LogBufferDescriptor.endOfStreamPosition(metaData, Long.MAX_VALUE);

        if (!params.isReplay) {
            LogBufferDescriptor.initialiseTailWithTermId(metaData, 0, initialTermId);
        } else {
            final int partitionIndex = LogBufferDescriptor.indexByTerm(initialTermId, params.termId);
            final long rawTail = LogBufferDescriptor.packTail(params.termId, params.termOffset);
            LogBufferDescriptor.rawTail(metaData, partitionIndex, rawTail);
            LogBufferDescriptor.activePartitionIndex(metaData, partitionIndex);
        }

        return log;
    }

    /**
     * Create the raw log for a publication image and initialise its metadata: default frame
     * header, initial term id, MTU length, correlation id and an end-of-stream position of
     * {@link Long#MAX_VALUE}.
     *
     * @param sessionId        session of the stream.
     * @param streamId         stream identity.
     * @param initialTermId    initial term id of the stream.
     * @param termBufferLength term length passed to the log factory.
     * @param senderMtuLength  MTU length advertised by the sender.
     * @param udpChannel       channel whose canonical form is passed to the log factory.
     * @param correlationId    correlation id stored in the log metadata.
     * @return the initialised raw log.
     */
    private RawLog newPublicationImageLog(int sessionId, int streamId, int initialTermId, int termBufferLength, int senderMtuLength, UdpChannel udpChannel, long correlationId) {
        final String canonicalForm = udpChannel.canonicalForm();
        final RawLog log = rawLogFactory.newNetworkedImage(
            canonicalForm, sessionId, streamId, correlationId, termBufferLength);
        final UnsafeBuffer metaData = log.metaData();

        LogBufferDescriptor.storeDefaultFrameHeader(
            metaData, DataHeaderFlyweight.createDefaultHeader(sessionId, streamId, initialTermId));
        LogBufferDescriptor.initialTermId(metaData, initialTermId);
        LogBufferDescriptor.mtuLength(metaData, senderMtuLength);
        LogBufferDescriptor.correlationId(metaData, correlationId);
        LogBufferDescriptor.endOfStreamPosition(metaData, Long.MAX_VALUE);

        return log;
    }

    /**
     * Return the send channel endpoint mapped for the channel's canonical form, creating,
     * mapping and registering a new one with the sender when none exists yet.
     *
     * @param udpChannel channel for which an endpoint is required.
     * @return the existing or newly created endpoint.
     */
    private SendChannelEndpoint getOrCreateSendChannelEndpoint(UdpChannel udpChannel) {
        final SendChannelEndpoint existing = sendChannelEndpointByChannelMap.get(udpChannel.canonicalForm());
        if (null != existing) {
            return existing;
        }

        final SendChannelEndpoint created = context.sendChannelEndpointSupplier().newInstance(
            udpChannel,
            SendChannelStatus.allocate(countersManager, udpChannel.originalUriString()),
            context);

        sendChannelEndpointByChannelMap.put(udpChannel.canonicalForm(), created);
        senderProxy.registerSendChannelEndpoint(created);

        return created;
    }

    /**
     * Reject a new subscription whose reliability setting differs from an existing subscription
     * on the same endpoint and stream. No check is made when no receive endpoint is mapped for
     * the channel.
     *
     * @param isReliable reliability requested for the new subscription.
     * @param udpChannel channel of the new subscription.
     * @param streamId   stream identity of the new subscription.
     * @throws IllegalStateException if a matching subscription has the opposite reliability setting.
     */
    private void checkForClashingSubscription(boolean isReliable, UdpChannel udpChannel, int streamId) {
        final ReceiveChannelEndpoint endpoint = receiveChannelEndpointByChannelMap.get(udpChannel.canonicalForm());
        if (null == endpoint) {
            return;
        }

        final ArrayList<SubscriptionLink> links = subscriptionLinks;
        for (int i = 0, count = links.size(); i < count; i++) {
            final SubscriptionLink existing = links.get(i);
            if (existing.matches(endpoint, streamId) && isReliable != existing.isReliable()) {
                throw new IllegalStateException("Option conflicts with existing subscriptions: reliable=" + isReliable);
            }
        }
    }

    /**
     * Links a subscription to every known publication image that matches the given endpoint
     * and the subscription's stream, and that is currently accepting subscriptions.
     * For each such image a subscriber position is allocated at the image's rebuild position,
     * attached to both the image and the subscription, and the client is told the image is available.
     *
     * @param channelEndpoint receive endpoint the images must match.
     * @param subscription    subscription to link to the matching images.
     */
    private void linkMatchingImages(ReceiveChannelEndpoint channelEndpoint, SubscriptionLink subscription) {
        long registrationId = subscription.registrationId();
        int streamId = subscription.streamId();
        String channel = subscription.uri();

        for (int i = 0, count = this.publicationImages.size(); i < count; ++i) {
            PublicationImage image = this.publicationImages.get(i);
            if (image.matches(channelEndpoint, streamId) && image.isAcceptingSubscriptions()) {
                long rebuildPosition = image.rebuildPosition();
                int sessionId = image.sessionId();

                Position subscriberPosition = SubscriberPos.allocate(
                    this.countersManager, registrationId, sessionId, streamId, channel, rebuildPosition);
                subscriberPosition.setOrdered(rebuildPosition);

                image.addSubscriber(subscriberPosition);
                subscription.link(image, subscriberPosition);

                this.clientProxy.onAvailableImage(
                    image.correlationId(),
                    streamId,
                    sessionId,
                    registrationId,
                    subscriberPosition.id(),
                    image.rawLog().fileName(),
                    DriverConductor.generateSourceIdentity(image.sourceAddress()));
            }
        }
    }

    /**
     * Links the given IPC publication to every subscription that matches its stream id
     * and is not already linked to it.
     *
     * @param publication IPC publication to offer to the existing subscriptions.
     */
    private void linkIpcSubscriptions(IpcPublication publication) {
        int streamId = publication.streamId();
        ArrayList<SubscriptionLink> links = this.subscriptionLinks;

        for (int index = 0, count = links.size(); index < count; ++index) {
            SubscriptionLink link = links.get(index);
            if (link.matches(streamId) && !link.isLinked(publication)) {
                this.linkIpcSubscription((IpcSubscriptionLink)link, publication);
            }
        }
    }

    /**
     * Copies the position of each subscriber into a new array, preserving list order.
     *
     * @param subscriberPositions subscriber positions to extract from.
     * @return a new array with one {@code ReadablePosition} per list element.
     */
    private static ReadablePosition[] positionArray(List<SubscriberPosition> subscriberPositions) {
        int count = subscriberPositions.size();
        ReadablePosition[] result = new ReadablePosition[count];

        for (int index = 0; index < count; ++index) {
            SubscriberPosition subscriberPosition = subscriberPositions.get(index);
            result[index] = subscriberPosition.position();
        }

        return result;
    }

    /**
     * Links a single IPC subscription to an IPC publication: allocates a subscriber position
     * at the publication's join position, attaches it to both sides, and notifies the client
     * that the image is available.
     *
     * @param subscription IPC subscription being linked.
     * @param publication  IPC publication to link it to.
     */
    private void linkIpcSubscription(IpcSubscriptionLink subscription, IpcPublication publication) {
        long joinPosition = publication.joinPosition();
        long subscriptionRegistrationId = subscription.registrationId();
        int sessionId = publication.sessionId();
        int streamId = subscription.streamId();
        String channel = subscription.uri();

        Position subscriberPosition = SubscriberPos.allocate(
            this.countersManager, subscriptionRegistrationId, sessionId, streamId, channel, joinPosition);
        subscriberPosition.setOrdered(joinPosition);

        publication.addSubscriber(subscriberPosition);
        subscription.link(publication, subscriberPosition);

        this.clientProxy.onAvailableImage(
            publication.registrationId(),
            streamId,
            sessionId,
            subscriptionRegistrationId,
            subscriberPosition.id(),
            publication.rawLog().fileName(),
            channel);
    }

    /**
     * Links a spy subscription to a network publication: allocates a subscriber position at
     * the publication's consumer position, attaches it to both sides, and notifies the client
     * that the image is available. The correlation id reported to the client is read from the
     * publication's log metadata.
     *
     * @param publication  network publication being spied on.
     * @param subscription subscription to link to the publication.
     */
    private void linkSpy(NetworkPublication publication, SubscriptionLink subscription) {
        long joinPosition = publication.consumerPosition();
        long subscriptionRegistrationId = subscription.registrationId();
        int streamId = publication.streamId();
        int sessionId = publication.sessionId();
        String channel = subscription.uri();

        Position spyPosition = SubscriberPos.allocate(
            this.countersManager, subscriptionRegistrationId, sessionId, streamId, channel, joinPosition);
        spyPosition.setOrdered(joinPosition);

        publication.addSubscriber(spyPosition);
        subscription.link(publication, spyPosition);

        this.clientProxy.onAvailableImage(
            LogBufferDescriptor.correlationId(publication.rawLog().metaData()),
            streamId,
            sessionId,
            subscriptionRegistrationId,
            spyPosition.id(),
            publication.rawLog().fileName(),
            channel);
    }

    /**
     * Returns the receive channel endpoint registered under the channel's canonical form,
     * creating, caching and registering a new one with the receiver proxy when none exists.
     *
     * @param udpChannel channel for which an endpoint is required.
     * @return the existing or newly created endpoint.
     */
    private ReceiveChannelEndpoint getOrCreateReceiveChannelEndpoint(UdpChannel udpChannel) {
        String canonicalForm = udpChannel.canonicalForm();
        ReceiveChannelEndpoint existing = this.receiveChannelEndpointByChannelMap.get(canonicalForm);
        if (null != existing) {
            return existing;
        }

        ReceiveChannelEndpoint created = this.context.receiveChannelEndpointSupplier().newInstance(
            udpChannel,
            new DataPacketDispatcher(this.context.driverConductorProxy(), this.receiverProxy.receiver()),
            ReceiveChannelStatus.allocate(this.countersManager, udpChannel.originalUriString()),
            this.context);
        this.receiveChannelEndpointByChannelMap.put(canonicalForm, created);
        this.receiverProxy.registerReceiveChannelEndpoint(created);
        return created;
    }

    /**
     * Runs a queued conductor command by handing this conductor to the command's
     * {@code execute} method.
     *
     * @param cmd command to execute against this conductor.
     */
    private void onDriverConductorCmd(DriverConductorCmd cmd) {
        cmd.execute(this);
    }

    /**
     * Looks up the client with the given id, registering a new one when it is not yet known.
     * A new client is created with the configured liveness timeout and the current nano clock time.
     *
     * @param clientId id of the client to find or add.
     * @return the existing or newly added client.
     */
    private AeronClient getOrAddClient(long clientId) {
        AeronClient existing = DriverConductor.findClient(this.clients, clientId);
        if (null != existing) {
            return existing;
        }

        AeronClient added = new AeronClient(clientId, this.clientLivenessTimeoutNs, this.nanoClock.nanoTime());
        this.clients.add(added);
        return added;
    }

    /**
     * Returns an IPC publication for the stream. A non-exclusive request reuses a shared
     * publication found for the stream after confirming the requested channel parameters match
     * its log; otherwise (or for an exclusive request) the parameters are validated and a new
     * publication is added.
     *
     * @param correlationId correlation id used as the registration id of a newly added publication.
     * @param streamId      stream id of the publication.
     * @param channel       channel URI string, parsed to derive the publication parameters.
     * @param isExclusive   whether an exclusive publication is requested.
     * @return the shared or newly added IPC publication.
     */
    private IpcPublication getOrAddIpcPublication(long correlationId, int streamId, String channel, boolean isExclusive) {
        IpcPublication shared = isExclusive
            ? null
            : DriverConductor.findSharedIpcPublication(this.ipcPublications, streamId);

        ChannelUri channelUri = ChannelUri.parse(channel);
        PublicationParams params = PublicationParams.getPublicationParams(this.context, channelUri, isExclusive, true);

        if (null != shared) {
            PublicationParams.confirmMatch(channelUri, params, shared.rawLog());
            return shared;
        }

        PublicationParams.validateMtuForMaxMessage(params, isExclusive);
        return this.addIpcPublication(correlationId, streamId, channel, isExclusive, params);
    }

    /**
     * Creates a new IPC publication with the next session id, backs it with a new IPC
     * publication log, and records it in the list of IPC publications.
     * The initial term id comes from the parameters for a replay, otherwise it is randomised.
     *
     * @param registrationId registration id of the new publication.
     * @param streamId       stream id of the new publication.
     * @param channel        channel string used when allocating the publisher limit counter.
     * @param isExclusive    whether the publication is exclusive.
     * @param params         publication parameters derived from the channel URI.
     * @return the newly created publication.
     */
    private IpcPublication addIpcPublication(long registrationId, int streamId, String channel, boolean isExclusive, PublicationParams params) {
        int sessionId = this.nextSessionId++;

        int initialTermId;
        if (params.isReplay) {
            initialTermId = params.initialTermId;
        } else {
            initialTermId = BitUtil.generateRandomisedId();
        }

        RawLog publicationLog = this.newIpcPublicationLog(sessionId, streamId, initialTermId, registrationId, params);
        IpcPublication created = new IpcPublication(
            registrationId,
            sessionId,
            streamId,
            PublisherLimit.allocate(this.countersManager, registrationId, sessionId, streamId, channel),
            publicationLog,
            this.publicationUnblockTimeoutNs,
            this.nanoClock.nanoTime(),
            this.context.systemCounters(),
            isExclusive);

        this.ipcPublications.add(created);
        return created;
    }

    /**
     * Finds the first client in the list whose id equals {@code clientId}.
     *
     * @param clients  clients to search.
     * @param clientId id to look for.
     * @return the matching client, or {@code null} if none matches.
     */
    private static AeronClient findClient(ArrayList<AeronClient> clients, long clientId) {
        // Indexed loop kept as in the original, rather than an iterator-based for-each.
        for (int index = 0, count = clients.size(); index < count; ++index) {
            AeronClient candidate = clients.get(index);
            if (candidate.clientId() == clientId) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Removes and returns the first subscription link with the given registration id.
     * Removal goes through {@code ArrayListUtil.fastUnorderedRemove}, so list order is
     * presumably not preserved (confirm against the Agrona documentation).
     *
     * @param subscriptionLinks list to search and remove from.
     * @param registrationId    registration id of the link to remove.
     * @return the removed link, or {@code null} if no link has that registration id.
     */
    private static SubscriptionLink removeSubscriptionLink(ArrayList<SubscriptionLink> subscriptionLinks, long registrationId) {
        for (int index = 0, lastIndex = subscriptionLinks.size() - 1; index <= lastIndex; ++index) {
            SubscriptionLink candidate = subscriptionLinks.get(index);
            if (candidate.registrationId() == registrationId) {
                ArrayListUtil.fastUnorderedRemove(subscriptionLinks, index, lastIndex);
                return candidate;
            }
        }
        return null;
    }

    /**
     * Finds the first IPC publication on the given stream that is not exclusive and whose
     * status is {@code ACTIVE}.
     *
     * @param ipcPublications publications to search.
     * @param streamId        stream id to match.
     * @return the matching publication, or {@code null} if there is none.
     */
    private static IpcPublication findSharedIpcPublication(ArrayList<IpcPublication> ipcPublications, long streamId) {
        for (int index = 0, count = ipcPublications.size(); index < count; ++index) {
            IpcPublication candidate = ipcPublications.get(index);
            boolean isShareable = (long)candidate.streamId() == streamId
                && !candidate.isExclusive()
                && IpcPublication.Status.ACTIVE == candidate.status();
            if (isShareable) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Delivers a time event to every managed resource in the list and removes and deletes
     * those that report having reached end of life.
     * The list is walked from the last element to the first; {@code lastIndex} is decremented
     * after each removal so it keeps pointing at the current last element.
     *
     * @param list  managed resources to check.
     * @param nowNs current time in nanoseconds, passed to each resource.
     * @param nowMs current time in milliseconds, passed to each resource.
     * @param <T>   type of managed resource held by the list.
     */
    private <T extends DriverManagedResource> void onCheckManagedResources(ArrayList<T> list, long nowNs, long nowMs) {
        int lastIndex = list.size() - 1;

        for (int index = lastIndex; index >= 0; --index) {
            T managed = list.get(index);
            managed.onTimeEvent(nowNs, nowMs, this);

            if (managed.hasReachedEndOfLife()) {
                ArrayListUtil.fastUnorderedRemove(list, index, lastIndex);
                --lastIndex;
                managed.delete();
            }
        }
    }

    /**
     * Links the network publication to every subscription in the list that matches it
     * and is not already linked to it.
     *
     * @param links       subscription links to consider.
     * @param publication network publication to be spied on.
     */
    private void linkSpies(ArrayList<SubscriptionLink> links, NetworkPublication publication) {
        for (int index = 0, count = links.size(); index < count; ++index) {
            SubscriptionLink link = links.get(index);
            if (link.matches(publication) && !link.isLinked(publication)) {
                this.linkSpy(publication, link);
            }
        }
    }

    /**
     * Runs the periodic timeout checks once more than {@code Configuration.HEARTBEAT_TIMEOUT_NS}
     * has elapsed since the last run, then records {@code nowNs} as the time of the last check.
     *
     * @param nowNs current time in nanoseconds.
     * @return 1 if the timeout checks ran, otherwise 0.
     */
    private int processTimers(long nowNs) {
        int workCount = 0;
        // Compare the elapsed difference rather than "nowNs > last + timeout": nanosecond clock
        // values may be negative or wrap, and only differences between readings are overflow-safe.
        if (nowNs - this.timeOfLastTimeoutCheckNs > Configuration.HEARTBEAT_TIMEOUT_NS) {
            this.onHeartbeatCheckTimeouts(nowNs);
            this.onCheckForBlockedToDriverCommands(nowNs);
            this.timeOfLastTimeoutCheckNs = nowNs;
            workCount = 1;
        }
        return workCount;
    }

    /**
     * Builds the source identity string for an address in the form {@code host:port},
     * using the address's host string and port.
     *
     * @param address socket address of the source.
     * @return the host string and port joined by a colon.
     */
    private static String generateSourceIdentity(InetSocketAddress address) {
        String host = address.getHostString();
        int port = address.getPort();
        return host + ':' + port;
    }
}

