/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.turbine.handler;

import com.netflix.turbine.data.TurbineData;
import com.netflix.turbine.discovery.Instance;
import com.netflix.turbine.handler.HandlerQueueTuple;
import com.netflix.turbine.handler.PerformanceCriteria;
import com.netflix.turbine.handler.TurbineDataHandler;
import com.netflix.turbine.monitor.TurbineDataMonitor;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dispatches {@link TurbineData} coming from hosts to the {@link TurbineDataHandler}s
 * registered for those hosts.
 *
 * <p>Two indexes are maintained:
 * <ul>
 *   <li>{@code handlerTuples}: handler name -> the single {@link HandlerQueueTuple} wrapping
 *       that handler, shared across every host the handler is registered for;</li>
 *   <li>{@code eventHandlersForHosts}: host -> (handler name -> tuple) for the handlers that
 *       want data from that host.</li>
 * </ul>
 *
 * <p>All shared state lives in {@link ConcurrentHashMap}s and a {@code volatile} flag; compound
 * operations use {@code putIfAbsent} rather than locks.
 *
 * @param <K> the concrete type of data being dispatched
 */
public class TurbineDataDispatcher<K extends TurbineData> {
    private static final Logger logger = LoggerFactory.getLogger(TurbineDataDispatcher.class);

    /**
     * Number of consecutive pushes for a host with an empty handler set that are still
     * reported as successful; the next push after that returns {@code false}.
     */
    private static final int MAX_ITERATIONS_WITHOUT_HANDLERS = 5;

    /** host -> (handler name -> tuple) for the handlers registered against that host. */
    private final ConcurrentHashMap<Instance, ConcurrentHashMap<String, HandlerQueueTuple<K>>> eventHandlersForHosts;
    /** host -> count of consecutive pushes that found no handler for that host. */
    private final ConcurrentHashMap<Instance, AtomicInteger> iterationsWithoutHandlers;
    /** handler name -> tuple; one tuple per handler regardless of how many hosts use it. */
    private final ConcurrentHashMap<String, HandlerQueueTuple<K>> handlerTuples;
    /** Set once by {@link #stopDispatcher()}; every pushData overload checks it first. */
    private volatile boolean stopped = false;
    private final String name;

    /**
     * Creates an empty dispatcher.
     *
     * @param name the name reported by {@link #getName()}
     */
    public TurbineDataDispatcher(String name) {
        this.name = name;
        this.eventHandlersForHosts = new ConcurrentHashMap<Instance, ConcurrentHashMap<String, HandlerQueueTuple<K>>>();
        this.iterationsWithoutHandlers = new ConcurrentHashMap<Instance, AtomicInteger>();
        this.handlerTuples = new ConcurrentHashMap<String, HandlerQueueTuple<K>>();
    }

    /**
     * @return the name given at construction time
     */
    public String getName() {
        return this.name;
    }

    /**
     * Drops all state kept for {@code host} and tells every handler that was registered
     * for it that the host is gone.
     *
     * @param host the host that was lost
     */
    public void handleHostLost(Instance host) {
        Map<String, HandlerQueueTuple<K>> eventHandlers = this.eventHandlersForHosts.remove(host);
        if (eventHandlers != null) {
            for (HandlerQueueTuple<K> tuple : eventHandlers.values()) {
                tuple.getHandler().handleHostLost(host);
            }
            eventHandlers.clear();
        }
        this.iterationsWithoutHandlers.remove(host);
    }

    /**
     * Registers {@code handler} to receive data pushed for {@code host}.
     *
     * <p>The handler's tuple is created and started on first registration and reused for
     * subsequent hosts. If the tuple reports {@code previouslyStopped()}, the handler is not
     * associated with the host and only an info message is logged.
     *
     * @param host    the host whose data the handler wants
     * @param handler the handler to register; identified by {@code handler.getName()}
     * @throws RuntimeException if a handler with the same name is already registered for
     *                          {@code host}, or if starting a newly created tuple fails
     */
    public void registerEventHandler(Instance host, TurbineDataHandler<K> handler) {
        String handlerName = handler.getName();

        ConcurrentHashMap<String, HandlerQueueTuple<K>> eventHandlers = this.eventHandlersForHosts.get(host);
        if (eventHandlers == null) {
            eventHandlers = new ConcurrentHashMap<String, HandlerQueueTuple<K>>();
            ConcurrentHashMap<String, HandlerQueueTuple<K>> prevHandlers =
                    this.eventHandlersForHosts.putIfAbsent(host, eventHandlers);
            if (prevHandlers != null) {
                // Another thread created the per-host map first; use theirs.
                eventHandlers = prevHandlers;
            }
        }

        // Cheap pre-check so a duplicate registration does not create/start a tuple.
        if (eventHandlers.get(handlerName) != null) {
            throw alreadyRegistered(handlerName, eventHandlers.keySet());
        }

        HandlerQueueTuple<K> tuple = this.getHandlerQueueTuple(handler);
        if (tuple.previouslyStopped()) {
            logger.info("Found handler tuple to be stopped: {} will not associate with host : {}",
                    handlerName, host.getHostname());
            return;
        }

        // putIfAbsent closes the race between the pre-check above and this insertion.
        if (eventHandlers.putIfAbsent(handlerName, tuple) != null) {
            throw alreadyRegistered(handlerName, eventHandlers.keySet());
        }
    }

    /** Builds the exception thrown when a handler name is registered twice for one host. */
    private static RuntimeException alreadyRegistered(String handlerName, Set<String> existingHandlers) {
        return new RuntimeException("Handler has already been registered: " + handlerName
                + ", existing handlers: " + existingHandlers);
    }

    /**
     * Returns the tuple for {@code handler}, creating and starting it if this is the first
     * time the handler name is seen.
     *
     * @throws RuntimeException wrapping any exception thrown by {@code tuple.start()}; the
     *                          handler is deregistered before the exception propagates
     */
    private HandlerQueueTuple<K> getHandlerQueueTuple(TurbineDataHandler<K> handler) {
        String handlerName = handler.getName();

        HandlerQueueTuple<K> existing = this.handlerTuples.get(handlerName);
        if (existing != null) {
            return existing;
        }

        HandlerQueueTuple<K> tuple = new HandlerQueueTuple<K>(handler);
        existing = this.handlerTuples.putIfAbsent(handlerName, tuple);
        if (existing != null) {
            // Lost the race; the winner is responsible for starting its tuple.
            return existing;
        }

        logger.info("\n\nJust added and starting handler tuple: {}", handlerName);
        try {
            tuple.start();
            return tuple;
        } catch (Exception e) {
            // Pass the exception to the logger so the stack trace is not lost in the log.
            logger.error("Caught failure when registering handler", e);
            this.deregisterEventHandler(handler);
            throw new RuntimeException(e);
        }
    }

    /**
     * Stops and removes the tuple for {@code handlerName}, then detaches the handler from
     * every host, invoking the handler's {@code handleHostLost} for each host it was
     * attached to. Does nothing beyond logging if the name is unknown.
     *
     * @param handlerName name of the handler to remove
     */
    public void deregisterEventHandler(String handlerName) {
        logger.info("Removing event handler: {}", handlerName);
        HandlerQueueTuple<K> tuple = this.handlerTuples.get(handlerName);
        if (tuple != null) {
            tuple.stop();
            HandlerQueueTuple<K> removed = this.handlerTuples.remove(handlerName);
            logger.info(" tuples : {}", this.handlerTuples.keySet());
            logger.info("Removed handler queue tuple for handler: {}", removed);
        }
        // Iterate entries rather than keySet()+get(): a host removed concurrently by
        // handleHostLost would otherwise make get() return null and cause an NPE here.
        for (Map.Entry<Instance, ConcurrentHashMap<String, HandlerQueueTuple<K>>> entry
                : this.eventHandlersForHosts.entrySet()) {
            HandlerQueueTuple<K> handlerTupleForThisHost = entry.getValue().remove(handlerName);
            if (handlerTupleForThisHost != null) {
                handlerTupleForThisHost.getHandler().handleHostLost(entry.getKey());
            }
        }
    }

    /**
     * Convenience overload of {@link #deregisterEventHandler(String)} using
     * {@code handler.getName()}.
     *
     * @param handler the handler to remove
     */
    public void deregisterEventHandler(TurbineDataHandler<K> handler) {
        this.deregisterEventHandler(handler.getName());
    }

    /**
     * Looks up a handler registered for a specific host.
     *
     * @param host        the host to look under
     * @param handlerName the handler name to find
     * @return the handler, or {@code null} if the host is unknown or has no such handler
     */
    public TurbineDataHandler<K> findHandlerForHost(Instance host, String handlerName) {
        Map<String, HandlerQueueTuple<K>> eventHandlers = this.eventHandlersForHosts.get(host);
        if (eventHandlers == null) {
            return null;
        }
        HandlerQueueTuple<K> tuple = eventHandlers.get(handlerName);
        return tuple == null ? null : tuple.getHandler();
    }

    /**
     * Pushes a batch of data from {@code host} to every handler registered for it.
     *
     * @param host      the host the data came from
     * @param statsData the batch to hand to each handler tuple
     * @return {@code false} if the dispatcher is stopped, the host is unknown, or the host
     *         has had an empty handler set for more than
     *         {@value #MAX_ITERATIONS_WITHOUT_HANDLERS} consecutive pushes; {@code true}
     *         otherwise
     */
    public boolean pushData(Instance host, Collection<K> statsData) {
        if (this.stopped) {
            return false;
        }
        Map<String, HandlerQueueTuple<K>> eventHandlers = this.eventHandlersForHosts.get(host);
        if (eventHandlers == null) {
            return false;
        }
        for (HandlerQueueTuple<K> tuple : eventHandlers.values()) {
            tuple.pushData(statsData);
        }
        return this.recordDispatch(host, !eventHandlers.isEmpty());
    }

    /**
     * Pushes a single datum from {@code host} to every handler registered for it.
     *
     * @param host      the host the data came from
     * @param statsData the datum to hand to each handler tuple
     * @return {@code false} if the dispatcher is stopped, the host is unknown, or the host
     *         has had an empty handler set for more than
     *         {@value #MAX_ITERATIONS_WITHOUT_HANDLERS} consecutive pushes; {@code true}
     *         otherwise
     */
    public boolean pushData(Instance host, K statsData) {
        if (this.stopped) {
            return false;
        }
        Map<String, HandlerQueueTuple<K>> eventHandlers = this.eventHandlersForHosts.get(host);
        if (eventHandlers == null) {
            return false;
        }
        for (HandlerQueueTuple<K> tuple : eventHandlers.values()) {
            tuple.pushData(statsData);
        }
        return this.recordDispatch(host, !eventHandlers.isEmpty());
    }

    /**
     * Updates the per-host "no handlers" counter after a push and decides the push result.
     *
     * @param host        the host that was just dispatched for
     * @param hadHandlers whether the host's handler set was non-empty
     * @return {@code false} once the counter exceeds
     *         {@value #MAX_ITERATIONS_WITHOUT_HANDLERS}; {@code true} otherwise
     */
    private boolean recordDispatch(Instance host, boolean hadHandlers) {
        AtomicInteger count = this.getIterationWithoutHandlerCount(host);
        if (hadHandlers) {
            count.set(0);
            return true;
        }
        // Use the value returned by the atomic increment instead of a separate get(),
        // so a concurrent update cannot slip in between the two operations.
        if (count.incrementAndGet() > MAX_ITERATIONS_WITHOUT_HANDLERS) {
            logger.info("We no longer have handlers to dispatch to");
            return false;
        }
        return true;
    }

    /**
     * Pushes a batch of data directly to the named handlers, regardless of host.
     * Unknown handler names are skipped; nothing happens if the dispatcher is stopped.
     *
     * @param handlers  names of the handlers to deliver to
     * @param statsData the batch to deliver
     */
    public void pushData(Collection<String> handlers, Collection<K> statsData) {
        if (this.stopped) {
            return;
        }
        for (String handlerName : handlers) {
            HandlerQueueTuple<K> tuple = this.handlerTuples.get(handlerName);
            if (tuple != null) {
                tuple.pushData(statsData);
            }
        }
    }

    /**
     * Pushes a single datum directly to the named handlers, regardless of host.
     * Unknown handler names are skipped; if the dispatcher is stopped this only logs.
     *
     * @param handlers  names of the handlers to deliver to
     * @param statsData the datum to deliver
     */
    public void pushData(Collection<String> handlers, K statsData) {
        if (this.stopped) {
            logger.info("Dispatcher has been stopped, will not deliver data");
            return;
        }
        for (String handlerName : handlers) {
            HandlerQueueTuple<K> tuple = this.handlerTuples.get(handlerName);
            if (tuple != null) {
                tuple.pushData(statsData);
            }
        }
    }

    /**
     * Marks the dispatcher as stopped, stops and forgets every handler tuple, and runs
     * {@link #handleHostLost(Instance)} for every known host.
     */
    public void stopDispatcher() {
        this.stopped = true;
        // ConcurrentHashMap iterators are weakly consistent, so removing while iterating is safe.
        for (HandlerQueueTuple<K> tuple : this.handlerTuples.values()) {
            tuple.stop();
            HandlerQueueTuple<K> removed = this.handlerTuples.remove(tuple.getHandler().getName());
            logger.info(" tuples : {}", this.handlerTuples.keySet());
            logger.info("Key: {}", removed);
        }
        this.handlerTuples.clear();
        // handleHostLost removes the host's entry and clears its handler map itself.
        for (Instance host : this.eventHandlersForHosts.keySet()) {
            this.handleHostLost(host);
        }
        this.eventHandlersForHosts.clear();
    }

    /** Returns the "no handlers" counter for {@code host}, creating it at zero if absent. */
    private AtomicInteger getIterationWithoutHandlerCount(Instance host) {
        AtomicInteger count = this.iterationsWithoutHandlers.get(host);
        if (count == null) {
            count = new AtomicInteger(0);
            AtomicInteger prevCount = this.iterationsWithoutHandlers.putIfAbsent(host, count);
            if (prevCount != null) {
                count = prevCount;
            }
        }
        return count;
    }

    /**
     * @return {@code true} if at least one handler tuple reports {@code running()}
     */
    public boolean running() {
        for (HandlerQueueTuple<K> tuple : this.handlerTuples.values()) {
            if (tuple.running()) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return the key set of the internal handler-tuple map; this is a live view of that
     *         map, not a copy
     */
    public Set<String> getAllHandlerNames() {
        return this.handlerTuples.keySet();
    }

    /** JUnit/Mockito tests for the dispatcher, using mocked handlers. */
    public static class UnitTest {
        Instance instance = new Instance("test", "cluster", true);

        /** Criteria handed to every mocked handler: non-critical, queue size 1, one thread. */
        private static PerformanceCriteria perfCriteria = new PerformanceCriteria() {

            @Override
            public boolean isCritical() {
                return false;
            }

            @Override
            public int getMaxQueueSize() {
                return 1;
            }

            @Override
            public int numThreads() {
                return 1;
            }
        };

        /** Registering the same handler twice for one host must throw. */
        @Test(expected = RuntimeException.class)
        @SuppressWarnings("unchecked") // Mockito.mock returns the raw handler type
        public void testRegisterDuplicateHandler() throws Exception {
            TurbineDataDispatcher<TestData> dispatcher = new TurbineDataDispatcher<TestData>("TEST");
            TurbineDataHandler<TestData> h1 = Mockito.mock(TurbineDataHandler.class);
            Mockito.when(h1.getName()).thenReturn("h1");
            Mockito.when(h1.getCriteria()).thenReturn(perfCriteria);
            dispatcher.registerEventHandler(this.instance, h1);
            dispatcher.registerEventHandler(this.instance, h1);
        }

        /**
         * Data reaches a registered handler, stops reaching it after deregistration, and
         * pushes start returning false after more than five pushes with no handlers.
         */
        @Test
        @SuppressWarnings("unchecked") // Mockito.mock returns the raw handler type
        public void testRegsiterAndDeregister() throws Exception {
            List<TestData> dataSet = Collections.singletonList(new TestData(null, null));
            TurbineDataMonitor monitor = Mockito.mock(TurbineDataMonitor.class);
            Mockito.when(monitor.getName()).thenReturn("publisher");
            TurbineDataDispatcher<TestData> dispatcher = new TurbineDataDispatcher<TestData>("TEST");
            TurbineDataHandler<TestData> h1 = Mockito.mock(TurbineDataHandler.class);
            TurbineDataHandler<TestData> h2 = Mockito.mock(TurbineDataHandler.class);
            Mockito.when(h1.getName()).thenReturn("h1");
            Mockito.when(h2.getName()).thenReturn("h2");
            Mockito.when(h1.getCriteria()).thenReturn(perfCriteria);
            Mockito.when(h2.getCriteria()).thenReturn(perfCriteria);
            dispatcher.registerEventHandler(this.instance, h1);
            dispatcher.registerEventHandler(this.instance, h2);
            dispatcher.pushData(this.instance, dataSet);
            Thread.sleep(1000L);
            Mockito.verify(h1, Mockito.times(1)).handleData(Matchers.anyList());
            dispatcher.deregisterEventHandler(h1);
            dispatcher.pushData(this.instance, dataSet);
            Mockito.verify(h1, Mockito.times(1)).handleData(Matchers.anyList());
            dispatcher.deregisterEventHandler(h1);
            dispatcher.deregisterEventHandler(h2);
            // The first five handler-less pushes are still reported as successful...
            for (int i = 0; i < 5; ++i) {
                Assert.assertTrue(dispatcher.pushData(this.instance, dataSet));
            }
            Mockito.verify(h1, Mockito.times(1)).handleData(Matchers.anyList());
            // ...after which every further push is rejected.
            for (int i = 0; i < 4; ++i) {
                Assert.assertFalse(dispatcher.pushData(this.instance, dataSet));
            }
            dispatcher.stopDispatcher();
        }

        /**
         * Hammers the dispatcher from 100 threads for three seconds while one handler is
         * slow; the test passes if nothing throws on the test thread.
         */
        @Test
        @SuppressWarnings("unchecked") // Mockito.mock returns the raw handler type
        public void testEventRejection() throws Exception {
            TestData data = new TestData(null, null);
            final List<TestData> dataSet = Collections.singletonList(data);
            TurbineDataMonitor monitor = Mockito.mock(TurbineDataMonitor.class);
            Mockito.when(monitor.getName()).thenReturn("publisher");
            final TurbineDataDispatcher<TestData> dispatcher = new TurbineDataDispatcher<TestData>("TEST");
            TurbineDataHandler<TestData> h1 = Mockito.mock(TurbineDataHandler.class);
            TurbineDataHandler<TestData> h2 = Mockito.mock(TurbineDataHandler.class);
            Mockito.when(h1.getName()).thenReturn("h1");
            Mockito.when(h2.getName()).thenReturn("h2");
            Mockito.when(h1.getCriteria()).thenReturn(perfCriteria);
            Mockito.when(h2.getCriteria()).thenReturn(perfCriteria);
            dispatcher.registerEventHandler(this.instance, h1);
            dispatcher.registerEventHandler(this.instance, h2);
            // Make h1 slow so pushes arrive faster than it handles them.
            Answer<Void> answer = new Answer<Void>() {

                @Override
                public Void answer(InvocationOnMock invocation) throws Throwable {
                    Thread.sleep(10L);
                    return null;
                }
            };
            Mockito.doAnswer(answer).when(h1).handleData(dataSet);
            final AtomicLong totalCounter = new AtomicLong(0L);
            ExecutorService executor = Executors.newFixedThreadPool(100);
            for (int i = 0; i < 100; ++i) {
                executor.submit(new Callable<Void>() {

                    @Override
                    public Void call() throws Exception {
                        // Runs until shutdownNow() interrupts the sleep below.
                        while (true) {
                            dispatcher.pushData(UnitTest.this.instance, dataSet);
                            totalCounter.incrementAndGet();
                            Thread.sleep(10L);
                        }
                    }
                });
            }
            Thread.sleep(3000L);
            executor.shutdownNow();
            dispatcher.stopDispatcher();
        }

        /** Minimal {@link TurbineData} stub whose attribute getters all return null. */
        public class TestData
        extends TurbineData {
            public TestData(String type, String name) {
                super(null, type, name);
            }

            @Override
            public HashMap<String, Long> getNumericAttributes() {
                return null;
            }

            @Override
            public HashMap<String, String> getStringAttributes() {
                return null;
            }

            @Override
            public HashMap<String, Map<String, ? extends Number>> getNestedMapAttributes() {
                return null;
            }
        }
    }
}

