/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.internals.AutoOffsetResetInternal;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorWrapper;
import org.apache.kafka.streams.processor.api.WrappedFixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.WrappedProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalResourcesNaming;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.NoOpProcessorWrapper;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.QuickUnion;
import org.apache.kafka.streams.processor.internals.RepartitionTopicConfig;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.UnwindowedUnversionedChangelogTopicConfig;
import org.apache.kafka.streams.processor.internals.VersionedChangelogTopicConfig;
import org.apache.kafka.streams.processor.internals.WindowedChangelogTopicConfig;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.apache.kafka.streams.state.StoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InternalTopologyBuilder {
    private static final Logger log = LoggerFactory.getLogger(InternalTopologyBuilder.class);
    private static final String[] NO_PREDECESSORS = new String[0];
    private final Map<String, NodeFactory<?, ?, ?, ?>> nodeFactories = new LinkedHashMap();
    private final Map<String, StoreFactory> stateFactories = new HashMap<String, StoreFactory>();
    private final Map<String, StoreFactory> globalStateBuilders = new LinkedHashMap<String, StoreFactory>();
    private final Map<String, StateStore> globalStateStores = new LinkedHashMap<String, StateStore>();
    private final Set<String> rawSourceTopicNames = new HashSet<String>();
    private List<String> fullSourceTopicNames = null;
    private String sourceTopicPatternString = null;
    private final Map<String, InternalTopicProperties> internalTopicNamesWithProperties = new HashMap<String, InternalTopicProperties>();
    private final List<Set<String>> copartitionSourceGroups = new ArrayList<Set<String>>();
    private final Map<String, List<String>> nodeToSourceTopics = new HashMap<String, List<String>>();
    private final Map<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<String, Pattern>();
    private final Map<String, String> nodeToSinkTopic = new HashMap<String, String>();
    private final Map<String, Set<String>> stateStoreNameToRawSourceTopicNames = new HashMap<String, Set<String>>();
    private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new HashMap<String, Set<Pattern>>();
    private final Map<String, String> storeToChangelogTopic = new HashMap<String, String>();
    private final Map<String, String> changelogTopicToStore = new HashMap<String, String>();
    private final Map<String, Optional<ReprocessFactory<?, ?, ?, ?>>> storeNameToReprocessOnRestore = new HashMap();
    private final Set<String> globalTopics = new HashSet<String>();
    private final Set<String> noneResetTopics = new HashSet<String>();
    private final Set<String> earliestResetTopics = new HashSet<String>();
    private final Set<String> latestResetTopics = new HashSet<String>();
    private final Map<String, Duration> durationResetTopics = new HashMap<String, Duration>();
    private final Set<Pattern> noneResetPatterns = new HashSet<Pattern>();
    private final Set<Pattern> earliestResetPatterns = new HashSet<Pattern>();
    private final Set<Pattern> latestResetPatterns = new HashSet<Pattern>();
    private final Map<Pattern, Duration> durationResetPatterns = new HashMap<Pattern, Duration>();
    private final QuickUnion<String> nodeGrouper = new QuickUnion();
    private final Set<String> subscriptionUpdates = new HashSet<String>();
    private final ProcessorWrapper processorWrapper;
    private String applicationId = null;
    private Map<Integer, Set<String>> nodeGroups = null;
    private Map<Integer, Set<String>> subtopologyIdToStateStoreNames = null;
    private final String topologyName;
    private NamedTopology namedTopology;
    private TopologyConfig topologyConfigs;
    private boolean hasPersistentStores = false;
    private final boolean ensureExplicitInternalResourceNaming;
    private final Set<InternalResourcesNaming> implicitInternalNames = new LinkedHashSet<InternalResourcesNaming>();
    private final Map<String, Pattern> topicToPatterns = new HashMap<String, Pattern>();
    private static final NodeComparator NODE_COMPARATOR = new NodeComparator();
    private static final GlobalStoreComparator GLOBALSTORE_COMPARATOR = new GlobalStoreComparator();
    private static final SubtopologyComparator SUBTOPOLOGY_COMPARATOR = new SubtopologyComparator();

    public InternalTopologyBuilder() {
        this.topologyName = null;
        this.ensureExplicitInternalResourceNaming = false;
        this.processorWrapper = new NoOpProcessorWrapper();
    }

    public InternalTopologyBuilder(TopologyConfig topologyConfigs) {
        Objects.requireNonNull(topologyConfigs, "topologyConfigs cannot be null");
        this.topologyConfigs = topologyConfigs;
        this.topologyName = topologyConfigs.topologyName;
        this.ensureExplicitInternalResourceNaming = topologyConfigs.ensureExplicitInternalResourceNaming;
        try {
            this.processorWrapper = (ProcessorWrapper)topologyConfigs.getConfiguredInstance("processor.wrapper.class", ProcessorWrapper.class, topologyConfigs.originals());
        }
        catch (Exception e) {
            String errorMessage = String.format("Unable to instantiate ProcessorWrapper from value of config %s. Please provide a valid class that implements the ProcessorWrapper interface.", "processor.wrapper.class");
            log.error(errorMessage, (Throwable)e);
            throw new ConfigException(errorMessage, (Object)e);
        }
    }

    public final InternalTopologyBuilder setApplicationId(String applicationId) {
        Objects.requireNonNull(applicationId, "applicationId can't be null");
        this.applicationId = applicationId;
        return this;
    }

    public final synchronized void setStreamsConfig(StreamsConfig applicationConfig) {
        Objects.requireNonNull(applicationConfig, "config can't be null");
        Properties topologyOverrides = this.topologyConfigs == null ? new Properties() : this.topologyConfigs.topologyOverrides;
        this.topologyConfigs = new TopologyConfig(this.topologyName, applicationConfig, topologyOverrides);
    }

    public final synchronized void setNamedTopology(NamedTopology namedTopology) {
        this.namedTopology = namedTopology;
    }

    public synchronized TopologyConfig topologyConfigs() {
        return this.topologyConfigs;
    }

    public String topologyName() {
        return this.topologyName;
    }

    public NamedTopology namedTopology() {
        return this.namedTopology;
    }

    public final synchronized InternalTopologyBuilder rewriteTopology(StreamsConfig config) {
        Objects.requireNonNull(config, "config can't be null");
        this.setApplicationId(config.getString("application.id"));
        this.setStreamsConfig(config);
        if (this.topologyConfigs.cacheSize == 0L) {
            for (StoreFactory storeFactory : this.stateFactories.values()) {
                storeFactory.withCachingDisabled();
            }
            for (StoreFactory storeFactory : this.globalStateBuilders.values()) {
                storeFactory.withCachingDisabled();
            }
        }
        for (StoreFactory storeFactory : this.globalStateBuilders.values()) {
            storeFactory.configure(config);
            this.globalStateStores.put(storeFactory.storeName(), (StateStore)storeFactory.builder().build());
        }
        return this;
    }

    private void verifyName(String name) {
        Objects.requireNonNull(name, "name cannot be null");
        if (this.nodeFactories.containsKey(name)) {
            throw new TopologyException("Processor " + name + " is already added.");
        }
    }

    public final void addSource(AutoOffsetResetInternal offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valDeserializer, String ... topics) {
        this.verifyName(name);
        Objects.requireNonNull(topics, "topics cannot be a null array");
        if (topics.length == 0) {
            throw new TopologyException("topics cannot be empty");
        }
        for (String topic : topics) {
            Objects.requireNonNull(topic, "topic names cannot be null");
            this.validateTopicNotAlreadyRegistered(topic);
            this.maybeAddToResetList(this.noneResetTopics, this.earliestResetTopics, this.latestResetTopics, this.durationResetTopics, offsetReset, topic);
            this.rawSourceTopicNames.add(topic);
        }
        this.nodeFactories.put(name, new SourceNodeFactory(name, topics, null, timestampExtractor, keyDeserializer, valDeserializer));
        this.nodeToSourceTopics.put(name, Arrays.asList(topics));
        this.nodeGrouper.add(name);
        this.nodeGroups = null;
    }

    public final void addSource(AutoOffsetResetInternal offsetReset, String name, TimestampExtractor timestampExtractor, Deserializer<?> keyDeserializer, Deserializer<?> valDeserializer, Pattern topicPattern) {
        this.verifyName(name);
        Objects.requireNonNull(topicPattern, "topicPattern cannot be null");
        for (String sourceTopicName : this.rawSourceTopicNames) {
            if (!topicPattern.matcher(sourceTopicName).matches()) continue;
            throw new TopologyException("Pattern " + String.valueOf(topicPattern) + " will match a topic that has already been registered by another source.");
        }
        this.maybeAddToResetList(this.noneResetPatterns, this.earliestResetPatterns, this.latestResetPatterns, this.durationResetPatterns, offsetReset, topicPattern);
        this.nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, timestampExtractor, keyDeserializer, valDeserializer));
        this.nodeToSourcePatterns.put(name, topicPattern);
        this.nodeGrouper.add(name);
        this.nodeGroups = null;
    }

    public final <K, V> void addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner, String ... predecessorNames) {
        this.verifyName(name);
        Objects.requireNonNull(topic, "topic cannot be null");
        this.verifyParents(name, predecessorNames);
        this.addSink(name, new StaticTopicNameExtractor(topic), keySerializer, valSerializer, partitioner, predecessorNames);
        this.nodeToSinkTopic.put(name, topic);
    }

    public final <K, V> void addSink(String name, TopicNameExtractor<? super K, ? super V> topicExtractor, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner, String ... predecessorNames) {
        this.verifyName(name);
        Objects.requireNonNull(topicExtractor, "topicExtractor cannot be null");
        this.verifyParents(name, predecessorNames);
        this.nodeFactories.put(name, new SinkNodeFactory<K, V>(name, predecessorNames, topicExtractor, keySerializer, valSerializer, partitioner));
        this.nodeGrouper.add(name);
        this.nodeGrouper.unite(name, (String[])predecessorNames);
        this.nodeGroups = null;
    }

    public final void addProcessor(String name, ProcessorSupplier<?, ?, ?, ?> processorSupplier, String ... predecessorNames) {
        this.verifyName(name);
        ApiUtils.checkSupplier(processorSupplier);
        this.verifyParents(name, predecessorNames);
        this.nodeFactories.put(name, new ProcessorNodeFactory(name, predecessorNames, processorSupplier));
        this.nodeGrouper.add(name);
        this.nodeGrouper.unite(name, (String[])predecessorNames);
        this.nodeGroups = null;
    }

    public final <KIn, VIn, VOut> void addProcessor(String name, FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier, String ... predecessorNames) {
        this.verifyName(name);
        ApiUtils.checkSupplier(processorSupplier);
        this.verifyParents(name, predecessorNames);
        this.nodeFactories.put(name, new FixedKeyProcessorNodeFactory<KIn, VIn, VOut>(name, predecessorNames, processorSupplier));
        this.nodeGrouper.add(name);
        this.nodeGrouper.unite(name, (String[])predecessorNames);
        this.nodeGroups = null;
    }

    private void verifyParents(String processorName, String ... predecessorNames) {
        Objects.requireNonNull(predecessorNames, "predecessorNames cannot be a null array");
        if (predecessorNames.length == 0) {
            throw new TopologyException("predecessorNames cannot be empty");
        }
        for (String predecessor : predecessorNames) {
            Objects.requireNonNull(predecessor, "parent name cannot be null");
            if (!this.nodeFactories.containsKey(predecessor)) {
                if (predecessor.equals(processorName)) {
                    throw new TopologyException("Parent node " + predecessor + " is unknown (self-reference).");
                }
                throw new TopologyException("Parent node " + predecessor + " is unknown.");
            }
            if (!this.nodeToSinkTopic.containsKey(predecessor)) continue;
            throw new TopologyException("Sink " + predecessor + " cannot be used a parent.");
        }
    }

    public final void addStateStore(StoreBuilder<?> storeBuilder, String ... processorNames) {
        Objects.requireNonNull(storeBuilder, "storeBuilder cannot be null");
        this.addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder), false, processorNames);
    }

    public final void addStateStore(StoreFactory storeFactory, String ... processorNames) {
        this.addStateStore(storeFactory, false, processorNames);
    }

    public final void addStateStore(StoreFactory storeFactory, boolean allowOverride, String ... processorNames) {
        Objects.requireNonNull(storeFactory, "stateStoreFactory cannot be null");
        String storeName = storeFactory.storeName();
        Objects.requireNonNull(storeName, "state store name cannot be null");
        StoreFactory stateFactory = this.stateFactories.get(storeName);
        if (!allowOverride && stateFactory != null && !stateFactory.isCompatibleWith(storeFactory)) {
            throw new TopologyException("A different StateStore has already been added with the name " + storeName);
        }
        if (this.globalStateBuilders.containsKey(storeName)) {
            throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeName);
        }
        this.stateFactories.put(storeName, storeFactory);
        if (processorNames != null) {
            for (String processorName : processorNames) {
                Objects.requireNonNull(processorName, "processor cannot not be null");
                this.connectProcessorAndStateStore(processorName, storeName);
            }
        }
        this.nodeGroups = null;
    }

    public final <KIn, VIn> void addGlobalStore(String sourceName, TimestampExtractor timestampExtractor, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valueDeserializer, String topic, String processorName, ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier, boolean reprocessOnRestore) {
        this.verifyName(sourceName);
        Objects.requireNonNull(topic, "topic cannot be null");
        this.validateTopicNotAlreadyRegistered(topic);
        this.verifyName(processorName);
        if (sourceName.equals(processorName)) {
            throw new TopologyException("sourceName and processorName must be different.");
        }
        ApiUtils.checkSupplier(stateUpdateSupplier);
        Set<StoreBuilder<?>> stores = stateUpdateSupplier.stores();
        if (stores == null || stores.size() != 1) {
            throw new IllegalArgumentException("Global stores must pass in suppliers with exactly one store but got " + (stores != null ? stores.size() : 0));
        }
        StoreFactory storeFactory = StoreBuilderWrapper.wrapStoreBuilder(stores.iterator().next());
        String storeName = storeFactory.storeName();
        if (this.stateFactories.containsKey(storeName)) {
            throw new TopologyException("A different StateStore has already been added with the name " + storeName);
        }
        if (this.globalStateBuilders.containsKey(storeName)) {
            throw new TopologyException("A different GlobalStateStore has already been added with the name " + storeName);
        }
        String[] topics = new String[]{topic};
        String[] predecessors = new String[]{sourceName};
        ProcessorNodeFactory<KIn, VIn, Void, Void> nodeFactory = new ProcessorNodeFactory<KIn, VIn, Void, Void>(processorName, predecessors, stateUpdateSupplier);
        this.globalTopics.add(topic);
        this.nodeFactories.put(sourceName, new SourceNodeFactory<KIn, VIn>(sourceName, topics, null, timestampExtractor, keyDeserializer, valueDeserializer));
        this.storeNameToReprocessOnRestore.put(storeFactory.storeName(), reprocessOnRestore ? Optional.of(new ReprocessFactory<KIn, VIn, Void, Void>(stateUpdateSupplier, keyDeserializer, valueDeserializer)) : Optional.empty());
        this.nodeToSourceTopics.put(sourceName, Arrays.asList(topics));
        this.nodeGrouper.add(sourceName);
        nodeFactory.addStateStore(storeFactory.storeName());
        this.nodeFactories.put(processorName, nodeFactory);
        this.nodeGrouper.add(processorName);
        this.nodeGrouper.unite(processorName, (String[])predecessors);
        this.globalStateBuilders.put(storeFactory.storeName(), storeFactory);
        storeFactory.withLoggingDisabled();
        this.connectSourceStoreAndTopic(storeFactory.storeName(), topic);
        this.nodeGroups = null;
    }

    private void validateTopicNotAlreadyRegistered(String topic) {
        if (this.rawSourceTopicNames.contains(topic) || this.globalTopics.contains(topic)) {
            throw new TopologyException("Topic " + topic + " has already been registered by another source.");
        }
        for (Pattern pattern : this.nodeToSourcePatterns.values()) {
            if (!pattern.matcher(topic).matches()) continue;
            throw new TopologyException("Topic " + topic + " matches a Pattern already registered by another source.");
        }
    }

    public Long historyRetention(String storeName) {
        return this.stateFactories.get(storeName).historyRetention();
    }

    public boolean isStoreVersioned(String storeName) {
        return this.stateFactories.get(storeName).isVersionedStore();
    }

    public final void connectProcessorAndStateStores(String processorName, String ... stateStoreNames) {
        Objects.requireNonNull(processorName, "processorName cannot be null");
        Objects.requireNonNull(stateStoreNames, "stateStoreNames cannot be a null array");
        if (stateStoreNames.length == 0) {
            throw new TopologyException("stateStoreNames cannot be empty");
        }
        if (this.nodeToSourceTopics.containsKey(processorName) || this.nodeToSourcePatterns.containsKey(processorName) || this.nodeToSinkTopic.containsKey(processorName)) {
            throw new TopologyException("State stores cannot be connect to sources or sinks.");
        }
        for (String stateStoreName : stateStoreNames) {
            Objects.requireNonNull(stateStoreName, "state store name cannot be null");
            this.connectProcessorAndStateStore(processorName, stateStoreName);
        }
        this.nodeGroups = null;
    }

    public String storeForChangelogTopic(String topicName) {
        return this.changelogTopicToStore.get(topicName);
    }

    public void connectSourceStoreAndTopic(String sourceStoreName, String topic) {
        if (this.storeToChangelogTopic.containsKey(sourceStoreName)) {
            throw new TopologyException("Source store " + sourceStoreName + " is already added.");
        }
        this.storeToChangelogTopic.put(sourceStoreName, topic);
        this.changelogTopicToStore.put(topic, sourceStoreName);
    }

    public final void addInternalTopic(String topicName, InternalTopicProperties internalTopicProperties) {
        Objects.requireNonNull(topicName, "topicName can't be null");
        Objects.requireNonNull(internalTopicProperties, "internalTopicProperties can't be null");
        this.internalTopicNamesWithProperties.put(topicName, internalTopicProperties);
    }

    public final void copartitionSources(Collection<String> sourceNodes) {
        this.copartitionSourceGroups.add(new HashSet<String>(sourceNodes));
    }

    public final void maybeUpdateCopartitionSourceGroups(String replacedNodeName, String optimizedNodeName) {
        for (Set<String> copartitionSourceGroup : this.copartitionSourceGroups) {
            if (!copartitionSourceGroup.contains(replacedNodeName)) continue;
            copartitionSourceGroup.remove(replacedNodeName);
            copartitionSourceGroup.add(optimizedNodeName);
        }
    }

    public void validateCopartition() {
        List allCopartitionedSourceTopics = this.copartitionSourceGroups.stream().map(sourceGroup -> sourceGroup.stream().flatMap(sourceNodeName -> this.nodeToSourceTopics.getOrDefault(sourceNodeName, Collections.emptyList()).stream()).collect(Collectors.toSet())).collect(Collectors.toList());
        for (Set copartition : allCopartitionedSourceTopics) {
            HashMap numberOfPartitionsPerTopic = new HashMap();
            copartition.forEach(topic -> {
                InternalTopicProperties prop = this.internalTopicNamesWithProperties.get(topic);
                if (prop != null && prop.numberOfPartitions().isPresent()) {
                    numberOfPartitionsPerTopic.put(topic, prop.numberOfPartitions().get());
                }
            });
            if (numberOfPartitionsPerTopic.isEmpty() || !copartition.equals(numberOfPartitionsPerTopic.keySet())) continue;
            Collection partitionNumbers = numberOfPartitionsPerTopic.values();
            Integer first = (Integer)partitionNumbers.iterator().next();
            for (Integer partitionNumber : partitionNumbers) {
                if (partitionNumber.equals(first)) continue;
                String msg = String.format("Following topics do not have the same number of partitions: [%s]", new TreeMap(numberOfPartitionsPerTopic));
                throw new TopologyException(msg);
            }
        }
    }

    private void connectProcessorAndStateStore(String processorName, String stateStoreName) {
        if (this.globalStateBuilders.containsKey(stateStoreName)) {
            throw new TopologyException("Global StateStore " + stateStoreName + " can be used by a Processor without being specified; it should not be explicitly passed.");
        }
        if (!this.stateFactories.containsKey(stateStoreName)) {
            throw new TopologyException("StateStore " + stateStoreName + " is not added yet.");
        }
        if (!this.nodeFactories.containsKey(processorName)) {
            throw new TopologyException("Processor " + processorName + " is not added yet.");
        }
        StoreFactory storeFactory = this.stateFactories.get(stateStoreName);
        Iterator<String> iter = storeFactory.connectedProcessorNames().iterator();
        if (iter.hasNext()) {
            String user = iter.next();
            this.nodeGrouper.unite(user, (String[])new String[]{processorName});
        }
        storeFactory.connectedProcessorNames().add(processorName);
        NodeFactory<?, ?, ?, ?> nodeFactory = this.nodeFactories.get(processorName);
        if (!(nodeFactory instanceof ProcessorNodeFactory)) {
            throw new TopologyException("cannot connect a state store " + stateStoreName + " to a source node or a sink node.");
        }
        ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory)nodeFactory;
        processorNodeFactory.addStateStore(stateStoreName);
        this.connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory);
    }

    private Set<SourceNodeFactory<?, ?>> findSourcesForProcessorPredecessors(String[] predecessors) {
        HashSet sourceNodes = new HashSet();
        for (String predecessor : predecessors) {
            NodeFactory<?, ?, ?, ?> nodeFactory = this.nodeFactories.get(predecessor);
            if (nodeFactory instanceof SourceNodeFactory) {
                sourceNodes.add((SourceNodeFactory)nodeFactory);
                continue;
            }
            if (!(nodeFactory instanceof ProcessorNodeFactory)) continue;
            sourceNodes.addAll(this.findSourcesForProcessorPredecessors(nodeFactory.predecessors));
        }
        return sourceNodes;
    }

    private <KIn, VIn, KOut, VOut> void connectStateStoreNameToSourceTopicsOrPattern(String stateStoreName, ProcessorNodeFactory<KIn, VIn, KOut, VOut> processorNodeFactory) {
        if (this.stateStoreNameToRawSourceTopicNames.containsKey(stateStoreName) || this.stateStoreNameToSourceRegex.containsKey(stateStoreName)) {
            return;
        }
        HashSet<String> sourceTopics = new HashSet<String>();
        HashSet<Pattern> sourcePatterns = new HashSet<Pattern>();
        Set<SourceNodeFactory<?, ?>> sourceNodesForPredecessor = this.findSourcesForProcessorPredecessors(processorNodeFactory.predecessors);
        for (SourceNodeFactory<?, ?> sourceNodeFactory : sourceNodesForPredecessor) {
            if (sourceNodeFactory.pattern != null) {
                sourcePatterns.add(sourceNodeFactory.pattern);
                continue;
            }
            sourceTopics.addAll(sourceNodeFactory.topics);
        }
        if (!sourceTopics.isEmpty()) {
            this.stateStoreNameToRawSourceTopicNames.put(stateStoreName, Collections.unmodifiableSet(sourceTopics));
        }
        if (!sourcePatterns.isEmpty()) {
            this.stateStoreNameToSourceRegex.put(stateStoreName, Collections.unmodifiableSet(sourcePatterns));
        }
    }

    private <T> void maybeAddToResetList(Collection<T> noneResets, Collection<T> earliestResets, Collection<T> latestResets, Map<T, Duration> durationReset, AutoOffsetResetInternal offsetReset, T item) {
        if (offsetReset != null) {
            switch (offsetReset.offsetResetStrategy()) {
                case NONE: {
                    noneResets.add(item);
                    break;
                }
                case EARLIEST: {
                    earliestResets.add(item);
                    break;
                }
                case LATEST: {
                    latestResets.add(item);
                    break;
                }
                case BY_DURATION: {
                    durationReset.put(item, offsetReset.duration());
                    break;
                }
                default: {
                    throw new TopologyException(String.format("Unrecognized reset format %s", offsetReset));
                }
            }
        }
    }

    public synchronized Map<Integer, Set<String>> nodeGroups() {
        if (this.nodeGroups == null) {
            this.nodeGroups = this.makeNodeGroups();
        }
        return this.nodeGroups;
    }

    private Map<Integer, Set<String>> makeNodeGroups() {
        LinkedHashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<Integer, Set<String>>();
        HashMap<String, Set<String>> rootToNodeGroup = new HashMap<String, Set<String>>();
        int nodeGroupId = 0;
        for (String nodeName : this.nodeFactories.keySet()) {
            nodeGroupId = this.putNodeGroupName(nodeName, nodeGroupId, nodeGroups, rootToNodeGroup);
        }
        return nodeGroups;
    }

    private int putNodeGroupName(String nodeName, int nodeGroupId, Map<Integer, Set<String>> nodeGroups, Map<String, Set<String>> rootToNodeGroup) {
        int newNodeGroupId = nodeGroupId;
        String root = this.nodeGrouper.root(nodeName);
        Set<String> nodeGroup = rootToNodeGroup.get(root);
        if (nodeGroup == null) {
            nodeGroup = new HashSet<String>();
            rootToNodeGroup.put(root, nodeGroup);
            nodeGroups.put(newNodeGroupId++, nodeGroup);
        }
        nodeGroup.add(nodeName);
        return newNodeGroupId;
    }

    public synchronized ProcessorTopology buildTopology() {
        HashSet<String> allNodes = new HashSet<String>();
        for (Set<String> value : this.nodeGroups().values()) {
            allNodes.addAll(value);
        }
        allNodes.removeAll(this.globalNodeGroups());
        this.initializeSubscription();
        this.initializeSubtopologyIdToStateStoreNamesMap();
        return this.build(allNodes);
    }

    public synchronized ProcessorTopology buildSubtopology(int topicGroupId) {
        Set<String> nodeGroup = this.nodeGroups().get(topicGroupId);
        return this.build(nodeGroup);
    }

    public synchronized ProcessorTopology buildGlobalStateTopology() {
        Objects.requireNonNull(this.applicationId, "topology has not completed optimization");
        Set<String> globalGroups = this.globalNodeGroups();
        if (globalGroups.isEmpty()) {
            return null;
        }
        return this.build(globalGroups);
    }

    private Set<String> globalNodeGroups() {
        HashSet<String> globalGroups = new HashSet<String>();
        for (Map.Entry<Integer, Set<String>> nodeGroup : this.nodeGroups().entrySet()) {
            Set<String> nodes = nodeGroup.getValue();
            for (String node : nodes) {
                if (!this.isGlobalSource(node)) continue;
                globalGroups.addAll(nodes);
            }
        }
        return globalGroups;
    }

    private ProcessorTopology build(Set<String> nodeGroup) {
        Objects.requireNonNull(this.applicationId, "topology has not completed optimization");
        LinkedHashMap processorMap = new LinkedHashMap();
        HashMap topicSourceMap = new HashMap();
        HashMap topicSinkMap = new HashMap();
        LinkedHashMap<String, StateStore> stateStoreMap = new LinkedHashMap<String, StateStore>();
        HashSet<String> repartitionTopics = new HashSet<String>();
        for (NodeFactory<?, ?, ?, ?> factory : this.nodeFactories.values()) {
            if (nodeGroup != null && !nodeGroup.contains(factory.name)) continue;
            ProcessorNode<Object, Object, Object, Object> node = factory.build();
            processorMap.put(node.name(), node);
            if (factory instanceof ProcessorNodeFactory) {
                this.buildProcessorNode(processorMap, stateStoreMap, (ProcessorNodeFactory)factory, node);
                continue;
            }
            if (factory instanceof SourceNodeFactory) {
                this.buildSourceNode(topicSourceMap, repartitionTopics, (SourceNodeFactory)factory, (SourceNode)node);
                continue;
            }
            if (factory instanceof SinkNodeFactory) {
                this.buildSinkNode(processorMap, topicSinkMap, repartitionTopics, (SinkNodeFactory)factory, (SinkNode)node);
                continue;
            }
            throw new TopologyException("Unknown definition class: " + factory.getClass().getName());
        }
        return new ProcessorTopology(new ArrayList(processorMap.values()), topicSourceMap, topicSinkMap, new ArrayList<StateStore>(stateStoreMap.values()), new ArrayList<StateStore>(this.globalStateStores.values()), this.storeToChangelogTopic, repartitionTopics, this.storeNameToReprocessOnRestore);
    }

    private void buildSinkNode(Map<String, ProcessorNode<?, ?, ?, ?>> processorMap, Map<String, SinkNode<?, ?>> topicSinkMap, Set<String> repartitionTopics, SinkNodeFactory<?, ?> sinkNodeFactory, SinkNode<?, ?> node) {
        SinkNode<?, ?> sinkNode = node;
        for (String predecessorName : sinkNodeFactory.predecessors) {
            ProcessorNode processor = InternalTopologyBuilder.getProcessor(processorMap, predecessorName);
            processor.addChild(sinkNode);
            if (!(sinkNodeFactory.topicExtractor instanceof StaticTopicNameExtractor)) continue;
            String topic = ((StaticTopicNameExtractor)sinkNodeFactory.topicExtractor).topicName;
            if (this.internalTopicNamesWithProperties.containsKey(topic)) {
                String decoratedTopic = this.decorateTopic(topic);
                topicSinkMap.put(decoratedTopic, node);
                repartitionTopics.add(decoratedTopic);
                continue;
            }
            topicSinkMap.put(topic, node);
        }
    }

    private static <KIn, VIn, KOut, VOut> ProcessorNode<KIn, VIn, KOut, VOut> getProcessor(Map<String, ProcessorNode<?, ?, ?, ?>> processorMap, String predecessor) {
        return processorMap.get(predecessor);
    }

    private void buildSourceNode(Map<String, SourceNode<?, ?>> topicSourceMap, Set<String> repartitionTopics, SourceNodeFactory<?, ?> sourceNodeFactory, SourceNode<?, ?> node) {
        List<String> topics = sourceNodeFactory.pattern != null ? sourceNodeFactory.topics(this.subscriptionUpdates()) : sourceNodeFactory.topics;
        for (String topic : topics) {
            if (this.internalTopicNamesWithProperties.containsKey(topic)) {
                String decoratedTopic = this.decorateTopic(topic);
                topicSourceMap.put(decoratedTopic, node);
                repartitionTopics.add(decoratedTopic);
                continue;
            }
            topicSourceMap.put(topic, node);
        }
    }

    private void buildProcessorNode(Map<String, ProcessorNode<?, ?, ?, ?>> processorMap, Map<String, StateStore> stateStoreMap, ProcessorNodeFactory<?, ?, ?, ?> factory, ProcessorNode<Object, Object, Object, Object> node) {
        for (String predecessor : factory.predecessors) {
            ProcessorNode predecessorNode = InternalTopologyBuilder.getProcessor(processorMap, predecessor);
            predecessorNode.addChild(node);
        }
        for (String stateStoreName : factory.stateStoreNames) {
            StateStore store;
            if (stateStoreMap.containsKey(stateStoreName)) continue;
            if (this.stateFactories.containsKey(stateStoreName)) {
                StoreFactory storeFactory = this.stateFactories.get(stateStoreName);
                if (storeFactory.loggingEnabled() && !this.storeToChangelogTopic.containsKey(stateStoreName)) {
                    String prefix = this.topologyConfigs == null ? this.applicationId : ProcessorContextUtils.topicNamePrefix(this.topologyConfigs.applicationConfigs.originals(), this.applicationId);
                    String changelogTopic = ProcessorStateManager.storeChangelogTopic(prefix, stateStoreName, this.topologyName);
                    this.storeToChangelogTopic.put(stateStoreName, changelogTopic);
                    this.changelogTopicToStore.put(changelogTopic, stateStoreName);
                }
                if (this.topologyConfigs != null) {
                    storeFactory.configure(this.topologyConfigs.applicationConfigs);
                }
                store = storeFactory.builder().build();
                stateStoreMap.put(stateStoreName, store);
            } else {
                store = this.globalStateStores.get(stateStoreName);
                stateStoreMap.put(stateStoreName, store);
            }
            if (!store.persistent()) continue;
            this.hasPersistentStores = true;
        }
    }

    public Map<String, StateStore> globalStateStores() {
        Objects.requireNonNull(this.applicationId, "topology has not completed optimization");
        return Collections.unmodifiableMap(this.globalStateStores);
    }

    public Set<String> allStateStoreNames() {
        Objects.requireNonNull(this.applicationId, "topology has not completed optimization");
        HashSet<String> allNames = new HashSet<String>(this.stateFactories.keySet());
        allNames.addAll(this.globalStateStores.keySet());
        return Collections.unmodifiableSet(allNames);
    }

    public boolean hasStore(String name) {
        return this.stateFactories.containsKey(name) || this.globalStateStores.containsKey(name);
    }

    public boolean hasPersistentStores() {
        return this.hasPersistentStores;
    }

    public synchronized Map<TopologyMetadata.Subtopology, TopicsInfo> subtopologyToTopicsInfo() {
        LinkedHashMap<TopologyMetadata.Subtopology, TopicsInfo> topicGroups = new LinkedHashMap<TopologyMetadata.Subtopology, TopicsInfo>();
        if (this.nodeGroups == null) {
            this.nodeGroups = this.makeNodeGroups();
        }
        for (Map.Entry<Integer, Set<String>> entry : this.nodeGroups.entrySet()) {
            HashSet<String> sinkTopics = new HashSet<String>();
            HashSet<String> sourceTopics = new HashSet<String>();
            HashMap<String, RepartitionTopicConfig> repartitionTopics = new HashMap<String, RepartitionTopicConfig>();
            HashMap<String, InternalTopicConfig> stateChangelogTopics = new HashMap<String, InternalTopicConfig>();
            for (String node : entry.getValue()) {
                String topic;
                List<String> topics = this.nodeToSourceTopics.get(node);
                if (topics != null) {
                    for (String topic2 : topics) {
                        if (this.globalTopics.contains(topic2)) continue;
                        if (this.internalTopicNamesWithProperties.containsKey(topic2)) {
                            String internalTopic = this.decorateTopic(topic2);
                            RepartitionTopicConfig repartitionTopicConfig = this.buildRepartitionTopicConfig(internalTopic, this.internalTopicNamesWithProperties.get(topic2).numberOfPartitions());
                            repartitionTopics.put(repartitionTopicConfig.name(), repartitionTopicConfig);
                            sourceTopics.add(repartitionTopicConfig.name());
                            continue;
                        }
                        sourceTopics.add(topic2);
                    }
                }
                if ((topic = this.nodeToSinkTopic.get(node)) != null) {
                    if (this.internalTopicNamesWithProperties.containsKey(topic)) {
                        sinkTopics.add(this.decorateTopic(topic));
                    } else {
                        sinkTopics.add(topic);
                    }
                }
                for (StoreFactory stateFactory : this.stateFactories.values()) {
                    String topicName;
                    if (!stateFactory.connectedProcessorNames().contains(node) || !this.storeToChangelogTopic.containsKey(stateFactory.storeName()) || stateChangelogTopics.containsKey(topicName = this.storeToChangelogTopic.get(stateFactory.storeName()))) continue;
                    InternalTopicConfig internalTopicConfig = this.createChangelogTopicConfig(stateFactory, topicName);
                    stateChangelogTopics.put(topicName, internalTopicConfig);
                }
            }
            if (sourceTopics.isEmpty()) continue;
            topicGroups.put(new TopologyMetadata.Subtopology(entry.getKey(), this.topologyName), new TopicsInfo(Collections.unmodifiableSet(sinkTopics), Collections.unmodifiableSet(sourceTopics), Collections.unmodifiableMap(repartitionTopics), Collections.unmodifiableMap(stateChangelogTopics)));
        }
        return Collections.unmodifiableMap(topicGroups);
    }

    public Map<String, List<String>> nodeToSourceTopics() {
        return Collections.unmodifiableMap(this.nodeToSourceTopics);
    }

    private RepartitionTopicConfig buildRepartitionTopicConfig(String internalTopic, Optional<Integer> numberOfPartitions) {
        return numberOfPartitions.map(partitions -> new RepartitionTopicConfig(internalTopic, Collections.emptyMap(), (int)partitions, true)).orElse(new RepartitionTopicConfig(internalTopic, Collections.emptyMap()));
    }

    private void setRegexMatchedTopicsToSourceNodes() {
        if (this.hasSubscriptionUpdates()) {
            for (String nodeName : this.nodeToSourcePatterns.keySet()) {
                SourceNodeFactory sourceNode = (SourceNodeFactory)this.nodeFactories.get(nodeName);
                List<String> sourceTopics = sourceNode.topics(this.subscriptionUpdates);
                this.nodeToSourceTopics.put(nodeName, sourceTopics);
                this.rawSourceTopicNames.addAll(sourceTopics);
            }
            log.debug("Updated nodeToSourceTopics: {}", this.nodeToSourceTopics);
        }
    }

    private void setRegexMatchedTopicToStateStore() {
        if (this.hasSubscriptionUpdates()) {
            for (Map.Entry<String, Set<Pattern>> storePattern : this.stateStoreNameToSourceRegex.entrySet()) {
                HashSet<String> updatedTopicsForStateStore = new HashSet<String>();
                for (String subscriptionUpdateTopic : this.subscriptionUpdates()) {
                    for (Pattern pattern : storePattern.getValue()) {
                        if (!pattern.matcher(subscriptionUpdateTopic).matches()) continue;
                        updatedTopicsForStateStore.add(subscriptionUpdateTopic);
                    }
                }
                if (updatedTopicsForStateStore.isEmpty()) continue;
                Collection storeTopics = this.stateStoreNameToRawSourceTopicNames.get(storePattern.getKey());
                if (storeTopics != null) {
                    updatedTopicsForStateStore.addAll(storeTopics);
                }
                this.stateStoreNameToRawSourceTopicNames.put(storePattern.getKey(), Collections.unmodifiableSet(updatedTopicsForStateStore));
            }
        }
    }

    private InternalTopicConfig createChangelogTopicConfig(StoreFactory factory, String name) {
        if (factory.isVersionedStore()) {
            return new VersionedChangelogTopicConfig(name, factory.logConfig(), factory.historyRetention());
        }
        if (factory.isWindowStore()) {
            return new WindowedChangelogTopicConfig(name, factory.logConfig(), factory.retentionPeriod());
        }
        return new UnwindowedUnversionedChangelogTopicConfig(name, factory.logConfig());
    }

    public boolean hasOffsetResetOverrides() {
        return this.noneResetTopics.size() + this.noneResetPatterns.size() + this.earliestResetTopics.size() + this.earliestResetPatterns.size() + this.latestResetTopics.size() + this.latestResetPatterns.size() + this.durationResetTopics.size() + this.durationResetPatterns.size() > 0;
    }

    public AutoOffsetResetStrategy offsetResetStrategy(String topic) {
        if (this.maybeDecorateInternalSourceTopics(this.noneResetTopics).contains(topic) || this.noneResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) {
            return AutoOffsetResetStrategy.NONE;
        }
        if (this.maybeDecorateInternalSourceTopics(this.earliestResetTopics).contains(topic) || this.earliestResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) {
            return AutoOffsetResetStrategy.EARLIEST;
        }
        if (this.maybeDecorateInternalSourceTopics(this.latestResetTopics).contains(topic) || this.latestResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) {
            return AutoOffsetResetStrategy.LATEST;
        }
        if (this.maybeDecorateInternalSourceTopics(this.durationResetTopics.keySet()).contains(topic)) {
            return AutoOffsetResetStrategy.fromString((String)("by_duration:" + this.durationResetTopics.get(topic).toString()));
        }
        Optional<Duration> resetDuration = this.findDuration(topic);
        if (resetDuration.isPresent()) {
            return AutoOffsetResetStrategy.fromString((String)("by_duration:" + String.valueOf(resetDuration.get())));
        }
        if (this.containsTopic(topic)) {
            return null;
        }
        throw new IllegalStateException(String.format("Unable to lookup offset reset strategy for the following topic as it does not exist in the topology%s: %s", this.hasNamedTopology() ? this.topologyName : "", topic));
    }

    private Optional<Duration> findDuration(String topic) {
        List resetDuration = this.durationResetPatterns.entrySet().stream().filter(e -> ((Pattern)e.getKey()).matcher(topic).matches()).map(Map.Entry::getValue).collect(Collectors.toList());
        if (resetDuration.size() > 1) {
            throw new IllegalStateException("Found more than one reset duration for topic: " + topic);
        }
        return resetDuration.isEmpty() ? Optional.empty() : resetDuration.stream().findAny();
    }

    public Map<String, List<String>> stateStoreNameToFullSourceTopicNames() {
        HashMap<String, List<String>> results = new HashMap<String, List<String>>();
        for (Map.Entry<String, Set<String>> entry : this.stateStoreNameToRawSourceTopicNames.entrySet()) {
            results.put(entry.getKey(), this.maybeDecorateInternalSourceTopics((Collection<String>)entry.getValue()));
        }
        return results;
    }

    public Collection<String> sourceTopicsForStore(String storeName) {
        return this.maybeDecorateInternalSourceTopics((Collection<String>)this.stateStoreNameToRawSourceTopicNames.get(storeName));
    }

    public synchronized Collection<Set<String>> copartitionGroups() {
        List copartitionSourceTopics = this.copartitionSourceGroups.stream().map(sourceGroup -> sourceGroup.stream().flatMap(node -> this.maybeDecorateInternalSourceTopics((Collection<String>)this.nodeToSourceTopics.get(node)).stream()).collect(Collectors.toSet())).collect(Collectors.toList());
        LinkedHashMap topicsToCopartitionGroup = new LinkedHashMap();
        for (Set topics : copartitionSourceTopics) {
            String topic2;
            if (topics == null) continue;
            Set<String> coparititonGroup = null;
            Iterator iterator = topics.iterator();
            while (iterator.hasNext() && (coparititonGroup = (Set)topicsToCopartitionGroup.get(topic2 = (String)iterator.next())) == null) {
            }
            if (coparititonGroup == null) {
                coparititonGroup = new HashSet<String>();
            }
            coparititonGroup.addAll(this.maybeDecorateInternalSourceTopics(topics));
            for (String topic2 : topics) {
                topicsToCopartitionGroup.put(topic2, coparititonGroup);
            }
        }
        HashSet uniqueCopartitionGroups = new HashSet(topicsToCopartitionGroup.values());
        return List.copyOf(uniqueCopartitionGroups);
    }

    private List<String> maybeDecorateInternalSourceTopics(Collection<String> sourceTopics) {
        if (sourceTopics == null) {
            return Collections.emptyList();
        }
        ArrayList<String> decoratedTopics = new ArrayList<String>();
        for (String topic : sourceTopics) {
            if (this.internalTopicNamesWithProperties.containsKey(topic)) {
                decoratedTopics.add(this.decorateTopic(topic));
                continue;
            }
            decoratedTopics.add(topic);
        }
        return decoratedTopics;
    }

    public String decoratePseudoTopic(String topic) {
        return this.decorateTopic(topic);
    }

    private String decorateTopic(String topic) {
        String prefix;
        if (this.applicationId == null) {
            throw new TopologyException("there are internal topics and applicationId hasn't been set. Call setApplicationId first");
        }
        String string = prefix = this.topologyConfigs == null ? this.applicationId : ProcessorContextUtils.topicNamePrefix(this.topologyConfigs.applicationConfigs.originals(), this.applicationId);
        if (this.hasNamedTopology()) {
            return prefix + "-" + this.topologyName + "-" + topic;
        }
        return prefix + "-" + topic;
    }

    void initializeSubscription() {
        if (this.usesPatternSubscription()) {
            log.debug("Found pattern subscribed source topics, initializing consumer's subscription pattern.");
            this.sourceTopicPatternString = this.buildSourceTopicsPatternString();
        } else {
            log.debug("No source topics using pattern subscription found, initializing consumer's subscription collection.");
            this.fullSourceTopicNames = this.maybeDecorateInternalSourceTopics(this.rawSourceTopicNames);
            Collections.sort(this.fullSourceTopicNames);
        }
    }

    private String buildSourceTopicsPatternString() {
        List<String> allSourceTopics = this.maybeDecorateInternalSourceTopics(this.rawSourceTopicNames);
        Collections.sort(allSourceTopics);
        StringBuilder builder = new StringBuilder();
        for (String topic : allSourceTopics) {
            builder.append(topic).append("|");
        }
        for (Pattern sourcePattern : this.nodeToSourcePatterns.values()) {
            builder.append(sourcePattern.pattern()).append("|");
        }
        if (builder.length() > 0) {
            builder.setLength(builder.length() - 1);
        }
        return builder.toString();
    }

    boolean usesPatternSubscription() {
        return !this.nodeToSourcePatterns.isEmpty();
    }

    public synchronized List<String> fullSourceTopicNames() {
        if (this.fullSourceTopicNames == null) {
            this.fullSourceTopicNames = this.maybeDecorateInternalSourceTopics(this.rawSourceTopicNames);
            Collections.sort(this.fullSourceTopicNames);
        }
        return this.fullSourceTopicNames;
    }

    synchronized String sourceTopicPatternString() {
        if (this.sourceTopicPatternString == null) {
            this.sourceTopicPatternString = this.buildSourceTopicsPatternString();
        }
        return this.sourceTopicPatternString;
    }

    public boolean containsTopic(String topic) {
        return this.fullSourceTopicNames().contains(topic) || this.usesPatternSubscription() && Pattern.compile(this.sourceTopicPatternString()).matcher(topic).matches() || this.changelogTopicToStore.containsKey(topic);
    }

    public boolean hasNoLocalTopology() {
        return this.nodeToSourcePatterns.isEmpty() && this.rawSourceTopicNames.isEmpty();
    }

    public boolean hasGlobalStores() {
        return !this.globalStateStores.isEmpty();
    }

    private boolean isGlobalSource(String nodeName) {
        NodeFactory<?, ?, ?, ?> nodeFactory = this.nodeFactories.get(nodeName);
        if (nodeFactory instanceof SourceNodeFactory) {
            List<String> topics = ((SourceNodeFactory)nodeFactory).topics;
            return topics != null && topics.size() == 1 && this.globalTopics.contains(topics.get(0));
        }
        return false;
    }

    public Set<String> stateStoreNamesForSubtopology(int subtopologyId) {
        return this.subtopologyIdToStateStoreNames.get(subtopologyId);
    }

    private void initializeSubtopologyIdToStateStoreNamesMap() {
        HashMap<Integer, Set<String>> storeNames = new HashMap<Integer, Set<String>>();
        for (Map.Entry<Integer, Set<String>> nodeGroup : this.makeNodeGroups().entrySet()) {
            Set<String> subtopologyNodes = nodeGroup.getValue();
            boolean isNodeGroupOfGlobalStores = this.nodeGroupContainsGlobalSourceNode(subtopologyNodes);
            if (isNodeGroupOfGlobalStores) continue;
            int subtopologyId = nodeGroup.getKey();
            HashSet<String> subtopologyStoreNames = new HashSet<String>();
            for (String nodeName : subtopologyNodes) {
                AbstractNode node = this.nodeFactories.get(nodeName).describe();
                if (!(node instanceof Processor)) continue;
                subtopologyStoreNames.addAll(((Processor)node).stores());
            }
            storeNames.put(subtopologyId, subtopologyStoreNames);
        }
        this.subtopologyIdToStateStoreNames = storeNames;
    }

    public TopologyDescription describe() {
        TopologyDescription description = new TopologyDescription(this.topologyName);
        for (Map.Entry<Integer, Set<String>> nodeGroup : this.makeNodeGroups().entrySet()) {
            Set<String> allNodesOfGroups = nodeGroup.getValue();
            boolean isNodeGroupOfGlobalStores = this.nodeGroupContainsGlobalSourceNode(allNodesOfGroups);
            if (!isNodeGroupOfGlobalStores) {
                this.describeSubtopology(description, nodeGroup.getKey(), allNodesOfGroups);
                continue;
            }
            this.describeGlobalStore(description, allNodesOfGroups, nodeGroup.getKey());
        }
        return description;
    }

    private void describeGlobalStore(TopologyDescription description, Set<String> nodes, int id) {
        Iterator<String> it = nodes.iterator();
        while (it.hasNext()) {
            String node = it.next();
            if (!this.isGlobalSource(node)) continue;
            it.remove();
            String processorNode = nodes.iterator().next();
            description.addGlobalStore(new GlobalStore(node, processorNode, ((ProcessorNodeFactory)this.nodeFactories.get((Object)processorNode)).stateStoreNames.iterator().next(), this.nodeToSourceTopics.get(node).get(0), id));
            break;
        }
    }

    private boolean nodeGroupContainsGlobalSourceNode(Set<String> allNodesOfGroups) {
        for (String node : allNodesOfGroups) {
            if (!this.isGlobalSource(node)) continue;
            return true;
        }
        return false;
    }

    private static void updateSize(AbstractNode node, int delta) {
        node.size += delta;
        for (TopologyDescription.Node predecessor : node.predecessors()) {
            InternalTopologyBuilder.updateSize((AbstractNode)predecessor, delta);
        }
    }

    private void describeSubtopology(TopologyDescription description, Integer subtopologyId, Set<String> nodeNames) {
        HashMap<String, AbstractNode> nodesByName = new HashMap<String, AbstractNode>();
        for (String nodeName : nodeNames) {
            nodesByName.put(nodeName, this.nodeFactories.get(nodeName).describe());
        }
        for (AbstractNode node : nodesByName.values()) {
            for (String predecessorName : this.nodeFactories.get((Object)node.name()).predecessors) {
                AbstractNode predecessor = (AbstractNode)nodesByName.get(predecessorName);
                node.addPredecessor(predecessor);
                predecessor.addSuccessor(node);
                InternalTopologyBuilder.updateSize(predecessor, node.size);
            }
        }
        description.addSubtopology(new SubtopologyDescription(subtopologyId, new HashSet<TopologyDescription.Node>(nodesByName.values())));
    }

    private static String nodeNames(Set<TopologyDescription.Node> nodes) {
        StringBuilder sb = new StringBuilder();
        if (!nodes.isEmpty()) {
            for (TopologyDescription.Node n : nodes) {
                sb.append(n.name());
                sb.append(", ");
            }
        } else {
            return "none";
        }
        sb.deleteCharAt(sb.length() - 1);
        sb.deleteCharAt(sb.length() - 1);
        return sb.toString();
    }

    private Set<String> subscriptionUpdates() {
        return Collections.unmodifiableSet(this.subscriptionUpdates);
    }

    private boolean hasSubscriptionUpdates() {
        return !this.subscriptionUpdates.isEmpty();
    }

    synchronized void addSubscribedTopicsFromAssignment(Set<TopicPartition> partitions, String logPrefix) {
        if (this.usesPatternSubscription()) {
            HashSet<String> assignedTopics = new HashSet<String>();
            for (TopicPartition topicPartition : partitions) {
                assignedTopics.add(topicPartition.topic());
            }
            Set<String> existingTopics = this.subscriptionUpdates();
            if (!existingTopics.equals(assignedTopics)) {
                assignedTopics.addAll(existingTopics);
                this.updateSubscribedTopics(assignedTopics, logPrefix);
            }
        }
    }

    synchronized void addSubscribedTopicsFromMetadata(Set<String> topics, String logPrefix) {
        if (this.usesPatternSubscription() && !this.subscriptionUpdates().equals(topics)) {
            this.updateSubscribedTopics(topics, logPrefix);
        }
    }

    private void updateSubscribedTopics(Set<String> topics, String logPrefix) {
        this.subscriptionUpdates.clear();
        this.subscriptionUpdates.addAll(topics);
        log.debug("{}found {} topics possibly matching subscription", (Object)logPrefix, (Object)topics.size());
        this.setRegexMatchedTopicsToSourceNodes();
        this.setRegexMatchedTopicToStateStore();
    }

    public synchronized List<String> allSourcePatternStrings() {
        return this.nodeToSourcePatterns.values().stream().map(Pattern::pattern).collect(Collectors.toList());
    }

    public boolean hasNamedTopology() {
        return this.topologyName != null;
    }

    public synchronized Map<String, StoreFactory> stateStores() {
        return this.stateFactories;
    }

    public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(String name, FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier) {
        return ProcessorWrapper.asWrappedFixedKey(this.processorWrapper.wrapFixedKeyProcessorSupplier(name, processorSupplier));
    }

    public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier(String name, ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier) {
        return ProcessorWrapper.asWrapped(this.processorWrapper.wrapProcessorSupplier(name, processorSupplier));
    }

    public void addImplicitInternalNames(InternalResourcesNaming internalResourcesNaming) {
        this.implicitInternalNames.add(internalResourcesNaming);
    }

    public void checkUnprovidedNames() {
        if (!this.implicitInternalNames.isEmpty()) {
            StringBuilder result = new StringBuilder();
            ArrayList<String> changelogTopics = new ArrayList<String>();
            ArrayList<String> stateStores = new ArrayList<String>();
            ArrayList<String> repartitionTopics = new ArrayList<String>();
            for (InternalResourcesNaming internalResourcesNaming : this.implicitInternalNames) {
                if (!Utils.isBlank((String)internalResourcesNaming.changelogTopic())) {
                    changelogTopics.add(internalResourcesNaming.changelogTopic());
                }
                if (!Utils.isBlank((String)internalResourcesNaming.stateStore())) {
                    stateStores.add(internalResourcesNaming.stateStore());
                }
                if (Utils.isBlank((String)internalResourcesNaming.repartitionTopic())) continue;
                repartitionTopics.add(internalResourcesNaming.repartitionTopic());
            }
            if (!changelogTopics.isEmpty()) {
                result.append(String.format("Following changelog topic(s) has not been named: %s%n", String.join((CharSequence)", ", changelogTopics)));
            }
            if (!stateStores.isEmpty()) {
                result.append(String.format("Following state store(s) has not been named: %s%n", String.join((CharSequence)", ", stateStores)));
            }
            if (!repartitionTopics.isEmpty()) {
                result.append(String.format("Following repartition topic(s) has not been named: %s%n", String.join((CharSequence)", ", repartitionTopics)));
            }
            if (this.ensureExplicitInternalResourceNaming) {
                throw new TopologyException(result.toString());
            }
            log.warn("Explicit naming for internal resources is currently disabled. If you want to enforce user-defined names for all internal resources, set ensure.explicit.internal.resource.naming to true. Note: Changing internal resource names may require a full streams application reset for an already deployed application. Consult the documentation on naming operators for more details. {}", (Object)result);
        }
    }

    private class SourceNodeFactory<KIn, VIn>
    extends NodeFactory<KIn, VIn, KIn, VIn> {
        private final List<String> topics;
        private final Pattern pattern;
        private final Deserializer<KIn> keyDeserializer;
        private final Deserializer<VIn> valDeserializer;
        private final TimestampExtractor timestampExtractor;

        private SourceNodeFactory(String name, String[] topics, Pattern pattern, TimestampExtractor timestampExtractor, Deserializer<KIn> keyDeserializer, Deserializer<VIn> valDeserializer) {
            super(name, NO_PREDECESSORS);
            this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>();
            this.pattern = pattern;
            this.keyDeserializer = keyDeserializer;
            this.valDeserializer = valDeserializer;
            this.timestampExtractor = timestampExtractor;
        }

        List<String> topics(Collection<String> subscribedTopics) {
            if (subscribedTopics.isEmpty()) {
                return Collections.singletonList(String.valueOf(this.pattern));
            }
            ArrayList<String> matchedTopics = new ArrayList<String>();
            for (String update : subscribedTopics) {
                if (this.pattern == InternalTopologyBuilder.this.topicToPatterns.get(update)) {
                    matchedTopics.add(update);
                    continue;
                }
                if (InternalTopologyBuilder.this.topicToPatterns.containsKey(update) && this.isMatch(update)) {
                    throw new TopologyException("Topic " + update + " is already matched for another regex pattern " + String.valueOf(InternalTopologyBuilder.this.topicToPatterns.get(update)) + " and hence cannot be matched to this regex pattern " + String.valueOf(this.pattern) + " any more.");
                }
                if (!this.isMatch(update)) continue;
                InternalTopologyBuilder.this.topicToPatterns.put(update, this.pattern);
                matchedTopics.add(update);
            }
            return matchedTopics;
        }

        @Override
        public ProcessorNode<KIn, VIn, KIn, VIn> build() {
            return new SourceNode<KIn, VIn>(this.name, this.timestampExtractor, this.keyDeserializer, this.valDeserializer);
        }

        private boolean isMatch(String topic) {
            return this.pattern.matcher(topic).matches();
        }

        @Override
        Source describe() {
            return new Source(this.name, (Set<String>)(this.topics.isEmpty() ? null : new HashSet<String>(this.topics)), this.pattern);
        }
    }

    private class SinkNodeFactory<KIn, VIn>
    extends NodeFactory<KIn, VIn, Void, Void> {
        private final Serializer<KIn> keySerializer;
        private final Serializer<VIn> valSerializer;
        private final StreamPartitioner<? super KIn, ? super VIn> partitioner;
        private final TopicNameExtractor<? super KIn, ? super VIn> topicExtractor;

        private SinkNodeFactory(String name, String[] predecessors, TopicNameExtractor<? super KIn, ? super VIn> topicExtractor, Serializer<KIn> keySerializer, Serializer<VIn> valSerializer, StreamPartitioner<? super KIn, ? super VIn> partitioner) {
            super(name, (String[])predecessors.clone());
            this.topicExtractor = topicExtractor;
            this.keySerializer = keySerializer;
            this.valSerializer = valSerializer;
            this.partitioner = partitioner;
        }

        @Override
        public ProcessorNode<KIn, VIn, Void, Void> build() {
            if (this.topicExtractor instanceof StaticTopicNameExtractor) {
                String topic = ((StaticTopicNameExtractor)this.topicExtractor).topicName;
                if (InternalTopologyBuilder.this.internalTopicNamesWithProperties.containsKey(topic)) {
                    return new SinkNode<KIn, VIn>(this.name, new StaticTopicNameExtractor(InternalTopologyBuilder.this.decorateTopic(topic)), this.keySerializer, this.valSerializer, this.partitioner);
                }
                return new SinkNode<KIn, VIn>(this.name, this.topicExtractor, this.keySerializer, this.valSerializer, this.partitioner);
            }
            return new SinkNode<KIn, VIn>(this.name, this.topicExtractor, this.keySerializer, this.valSerializer, this.partitioner);
        }

        @Override
        Sink<KIn, VIn> describe() {
            return new Sink<KIn, VIn>(this.name, this.topicExtractor);
        }
    }

    private static class ProcessorNodeFactory<KIn, VIn, KOut, VOut>
    extends NodeFactory<KIn, VIn, KOut, VOut> {
        private final ProcessorSupplier<KIn, VIn, KOut, VOut> supplier;
        final Set<String> stateStoreNames = new HashSet<String>();

        ProcessorNodeFactory(String name, String[] predecessors, ProcessorSupplier<KIn, VIn, KOut, VOut> supplier) {
            super(name, (String[])predecessors.clone());
            this.supplier = supplier;
        }

        public void addStateStore(String stateStoreName) {
            this.stateStoreNames.add(stateStoreName);
        }

        @Override
        public ProcessorNode<KIn, VIn, KOut, VOut> build() {
            return new ProcessorNode(this.name, this.supplier.get(), this.stateStoreNames);
        }

        @Override
        Processor describe() {
            return new Processor(this.name, new HashSet<String>(this.stateStoreNames));
        }
    }

    private static class FixedKeyProcessorNodeFactory<KIn, VIn, VOut>
    extends ProcessorNodeFactory<KIn, VIn, KIn, VOut> {
        private final FixedKeyProcessorSupplier<KIn, VIn, VOut> supplier;

        FixedKeyProcessorNodeFactory(String name, String[] predecessors, FixedKeyProcessorSupplier<KIn, VIn, VOut> supplier) {
            super(name, (String[])predecessors.clone(), null);
            this.supplier = supplier;
        }

        @Override
        public ProcessorNode<KIn, VIn, KIn, VOut> build() {
            return new ProcessorNode(this.name, this.supplier.get(), (Set<String>)this.stateStoreNames);
        }

        @Override
        Processor describe() {
            return new Processor(this.name, new HashSet<String>(this.stateStoreNames));
        }
    }

    public static class ReprocessFactory<KIn, VIn, KOut, VOut> {
        private final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier;
        private final Deserializer<KIn> keyDeserializer;
        private final Deserializer<VIn> valueDeserializer;

        private ReprocessFactory(ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier, Deserializer<KIn> key, Deserializer<VIn> value) {
            this.processorSupplier = processorSupplier;
            this.keyDeserializer = key;
            this.valueDeserializer = value;
        }

        public ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier() {
            return this.processorSupplier;
        }

        public Deserializer<KIn> keyDeserializer() {
            return this.keyDeserializer;
        }

        public Deserializer<VIn> valueDeserializer() {
            return this.valueDeserializer;
        }
    }

    private static abstract class NodeFactory<KIn, VIn, KOut, VOut> {
        final String name;
        final String[] predecessors;

        NodeFactory(String name, String[] predecessors) {
            this.name = name;
            this.predecessors = predecessors;
        }

        public abstract ProcessorNode<KIn, VIn, KOut, VOut> build();

        abstract AbstractNode describe();
    }

    public static class TopicsInfo {
        public final Set<String> sinkTopics;
        public final Set<String> sourceTopics;
        public final Map<String, InternalTopicConfig> stateChangelogTopics;
        public final Map<String, InternalTopicConfig> repartitionSourceTopics;

        TopicsInfo(Set<String> sinkTopics, Set<String> sourceTopics, Map<String, InternalTopicConfig> repartitionSourceTopics, Map<String, InternalTopicConfig> stateChangelogTopics) {
            this.sinkTopics = sinkTopics;
            this.sourceTopics = sourceTopics;
            this.stateChangelogTopics = stateChangelogTopics;
            this.repartitionSourceTopics = repartitionSourceTopics;
        }

        public Set<InternalTopicConfig> nonSourceChangelogTopics() {
            HashSet<InternalTopicConfig> topicConfigs = new HashSet<InternalTopicConfig>();
            for (Map.Entry<String, InternalTopicConfig> changelogTopicEntry : this.stateChangelogTopics.entrySet()) {
                if (this.sourceTopics.contains(changelogTopicEntry.getKey())) continue;
                topicConfigs.add(changelogTopicEntry.getValue());
            }
            return topicConfigs;
        }

        public Set<String> changelogTopics() {
            return Collections.unmodifiableSet(this.stateChangelogTopics.keySet());
        }

        public Set<String> sourceTopicChangelogs() {
            return this.sourceTopics.stream().filter(this.stateChangelogTopics::containsKey).collect(Collectors.toSet());
        }

        public boolean equals(Object o) {
            if (o instanceof TopicsInfo) {
                TopicsInfo other = (TopicsInfo)o;
                return other.sourceTopics.equals(this.sourceTopics) && other.stateChangelogTopics.equals(this.stateChangelogTopics);
            }
            return false;
        }

        public int hashCode() {
            long n = (long)this.sourceTopics.hashCode() << 32 | (long)this.stateChangelogTopics.hashCode();
            return (int)(n % 0xFFFFFFFFL);
        }

        public String toString() {
            return "TopicsInfo{sinkTopics=" + String.valueOf(this.sinkTopics) + ", sourceTopics=" + String.valueOf(this.sourceTopics) + ", repartitionSourceTopics=" + String.valueOf(this.repartitionSourceTopics) + ", stateChangelogTopics=" + String.valueOf(this.stateChangelogTopics) + "}";
        }
    }

    public static abstract class AbstractNode
    implements TopologyDescription.Node {
        final String name;
        final Set<TopologyDescription.Node> predecessors = new TreeSet<TopologyDescription.Node>(NODE_COMPARATOR);
        final Set<TopologyDescription.Node> successors = new TreeSet<TopologyDescription.Node>(NODE_COMPARATOR);
        int size;

        AbstractNode(String name) {
            Objects.requireNonNull(name, "name cannot be null");
            this.name = name;
            this.size = 1;
        }

        @Override
        public String name() {
            return this.name;
        }

        @Override
        public Set<TopologyDescription.Node> predecessors() {
            return Collections.unmodifiableSet(this.predecessors);
        }

        @Override
        public Set<TopologyDescription.Node> successors() {
            return Collections.unmodifiableSet(this.successors);
        }

        public void addPredecessor(TopologyDescription.Node predecessor) {
            this.predecessors.add(predecessor);
        }

        public void addSuccessor(TopologyDescription.Node successor) {
            this.successors.add(successor);
        }
    }

    public static final class Processor
    extends AbstractNode
    implements TopologyDescription.Processor {
        private final Set<String> stores;

        public Processor(String name, Set<String> stores) {
            super(name);
            this.stores = stores;
        }

        @Override
        public Set<String> stores() {
            return Collections.unmodifiableSet(this.stores);
        }

        public String toString() {
            return "Processor: " + this.name + " (stores: " + String.valueOf(this.stores) + ")\n      --> " + InternalTopologyBuilder.nodeNames(this.successors) + "\n      <-- " + InternalTopologyBuilder.nodeNames(this.predecessors);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Processor processor = (Processor)o;
            return this.name.equals(processor.name) && this.stores.equals(processor.stores) && this.predecessors.equals(processor.predecessors);
        }

        public int hashCode() {
            return Objects.hash(this.name, this.stores);
        }
    }

    public static final class TopologyDescription
    implements org.apache.kafka.streams.TopologyDescription {
        private final TreeSet<TopologyDescription.Subtopology> subtopologies = new TreeSet<TopologyDescription.Subtopology>(SUBTOPOLOGY_COMPARATOR);
        private final TreeSet<TopologyDescription.GlobalStore> globalStores = new TreeSet<TopologyDescription.GlobalStore>(GLOBALSTORE_COMPARATOR);
        private final String namedTopology;

        public TopologyDescription() {
            this(null);
        }

        public TopologyDescription(String namedTopology) {
            this.namedTopology = namedTopology;
        }

        public void addSubtopology(TopologyDescription.Subtopology subtopology) {
            this.subtopologies.add(subtopology);
        }

        public void addGlobalStore(TopologyDescription.GlobalStore globalStore) {
            this.globalStores.add(globalStore);
        }

        @Override
        public Set<TopologyDescription.Subtopology> subtopologies() {
            return Collections.unmodifiableSet(this.subtopologies);
        }

        @Override
        public Set<TopologyDescription.GlobalStore> globalStores() {
            return Collections.unmodifiableSet(this.globalStores);
        }

        public String toString() {
            TopologyDescription.Subtopology subtopology;
            StringBuilder sb = new StringBuilder();
            if (this.namedTopology == null) {
                sb.append("Topologies:\n ");
            } else {
                sb.append("Topology: ").append(this.namedTopology).append(":\n ");
            }
            TopologyDescription.Subtopology[] sortedSubtopologies = this.subtopologies.descendingSet().toArray(new TopologyDescription.Subtopology[0]);
            TopologyDescription.GlobalStore[] sortedGlobalStores = this.globalStores.descendingSet().toArray(new TopologyDescription.GlobalStore[0]);
            int expectedId = 0;
            int subtopologiesIndex = sortedSubtopologies.length - 1;
            int globalStoresIndex = sortedGlobalStores.length - 1;
            while (subtopologiesIndex != -1 && globalStoresIndex != -1) {
                sb.append("  ");
                subtopology = sortedSubtopologies[subtopologiesIndex];
                TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex];
                if (subtopology.id() == expectedId) {
                    sb.append(subtopology);
                    --subtopologiesIndex;
                } else {
                    sb.append(globalStore);
                    --globalStoresIndex;
                }
                ++expectedId;
            }
            while (subtopologiesIndex != -1) {
                subtopology = sortedSubtopologies[subtopologiesIndex];
                sb.append("  ");
                sb.append(subtopology);
                --subtopologiesIndex;
            }
            while (globalStoresIndex != -1) {
                TopologyDescription.GlobalStore globalStore = sortedGlobalStores[globalStoresIndex];
                sb.append("  ");
                sb.append(globalStore);
                --globalStoresIndex;
            }
            return sb.toString();
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            TopologyDescription that = (TopologyDescription)o;
            return this.subtopologies.equals(that.subtopologies) && this.globalStores.equals(that.globalStores);
        }

        public int hashCode() {
            return Objects.hash(this.subtopologies, this.globalStores);
        }
    }

    public static final class GlobalStore
    implements TopologyDescription.GlobalStore {
        private final Source source;
        private final Processor processor;
        private final int id;

        public GlobalStore(String sourceName, String processorName, String storeName, String topicName, int id) {
            this.source = new Source(sourceName, Collections.singleton(topicName), null);
            this.processor = new Processor(processorName, Collections.singleton(storeName));
            this.source.successors.add(this.processor);
            this.processor.predecessors.add(this.source);
            this.id = id;
        }

        @Override
        public int id() {
            return this.id;
        }

        @Override
        public TopologyDescription.Source source() {
            return this.source;
        }

        @Override
        public TopologyDescription.Processor processor() {
            return this.processor;
        }

        public String toString() {
            return "Sub-topology: " + this.id + " for global store (will not generate tasks)\n    " + this.source.toString() + "\n    " + this.processor.toString() + "\n";
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            GlobalStore that = (GlobalStore)o;
            return this.source.equals(that.source) && this.processor.equals(that.processor);
        }

        public int hashCode() {
            return Objects.hash(this.source, this.processor);
        }
    }

    public static final class SubtopologyDescription
    implements TopologyDescription.Subtopology {
        private final int id;
        private final Set<TopologyDescription.Node> nodes;

        public SubtopologyDescription(int id, Set<TopologyDescription.Node> nodes) {
            this.id = id;
            this.nodes = new TreeSet<TopologyDescription.Node>(NODE_COMPARATOR);
            this.nodes.addAll(nodes);
        }

        @Override
        public int id() {
            return this.id;
        }

        @Override
        public Set<TopologyDescription.Node> nodes() {
            return Collections.unmodifiableSet(this.nodes);
        }

        Iterator<TopologyDescription.Node> nodesInOrder() {
            return this.nodes.iterator();
        }

        public String toString() {
            return "Sub-topology: " + this.id + "\n" + this.nodesAsString() + "\n";
        }

        private String nodesAsString() {
            StringBuilder sb = new StringBuilder();
            for (TopologyDescription.Node node : this.nodes) {
                sb.append("    ");
                sb.append(node);
                sb.append('\n');
            }
            return sb.toString();
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            SubtopologyDescription that = (SubtopologyDescription)o;
            return this.id == that.id && this.nodes.equals(that.nodes);
        }

        public int hashCode() {
            return Objects.hash(this.id, this.nodes);
        }
    }

    private static class NodeComparator
    implements Comparator<TopologyDescription.Node>,
    Serializable {
        private NodeComparator() {
        }

        @Override
        public int compare(TopologyDescription.Node node1, TopologyDescription.Node node2) {
            if (node1.equals(node2)) {
                return 0;
            }
            int size1 = ((AbstractNode)node1).size;
            int size2 = ((AbstractNode)node2).size;
            if (size1 != size2) {
                return size2 - size1;
            }
            return node1.name().compareTo(node2.name());
        }
    }

    private static class GlobalStoreComparator
    implements Comparator<TopologyDescription.GlobalStore>,
    Serializable {
        private GlobalStoreComparator() {
        }

        @Override
        public int compare(TopologyDescription.GlobalStore globalStore1, TopologyDescription.GlobalStore globalStore2) {
            if (globalStore1.equals(globalStore2)) {
                return 0;
            }
            return globalStore1.id() - globalStore2.id();
        }
    }

    private static class SubtopologyComparator
    implements Comparator<TopologyDescription.Subtopology>,
    Serializable {
        private SubtopologyComparator() {
        }

        @Override
        public int compare(TopologyDescription.Subtopology subtopology1, TopologyDescription.Subtopology subtopology2) {
            if (subtopology1.equals(subtopology2)) {
                return 0;
            }
            return subtopology1.id() - subtopology2.id();
        }
    }

    public static final class Sink<K, V>
    extends AbstractNode
    implements TopologyDescription.Sink {
        private final TopicNameExtractor<? super K, ? super V> topicNameExtractor;

        public Sink(String name, TopicNameExtractor<? super K, ? super V> topicNameExtractor) {
            super(name);
            this.topicNameExtractor = topicNameExtractor;
        }

        public Sink(String name, String topic) {
            super(name);
            this.topicNameExtractor = new StaticTopicNameExtractor<K, V>(topic);
        }

        @Override
        public String topic() {
            if (this.topicNameExtractor instanceof StaticTopicNameExtractor) {
                return ((StaticTopicNameExtractor)this.topicNameExtractor).topicName;
            }
            return null;
        }

        public TopicNameExtractor<? super K, ? super V> topicNameExtractor() {
            if (this.topicNameExtractor instanceof StaticTopicNameExtractor) {
                return null;
            }
            return this.topicNameExtractor;
        }

        @Override
        public void addSuccessor(TopologyDescription.Node successor) {
            throw new UnsupportedOperationException("Sinks don't have successors.");
        }

        public String toString() {
            if (this.topicNameExtractor instanceof StaticTopicNameExtractor) {
                return "Sink: " + this.name + " (topic: " + this.topic() + ")\n      <-- " + InternalTopologyBuilder.nodeNames(this.predecessors);
            }
            return "Sink: " + this.name + " (extractor class: " + String.valueOf(this.topicNameExtractor) + ")\n      <-- " + InternalTopologyBuilder.nodeNames(this.predecessors);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Sink sink = (Sink)o;
            return this.name.equals(sink.name) && this.topicNameExtractor.equals(sink.topicNameExtractor) && this.predecessors.equals(sink.predecessors);
        }

        public int hashCode() {
            return Objects.hash(this.name, this.topicNameExtractor);
        }
    }

    public static final class Source
    extends AbstractNode
    implements TopologyDescription.Source {
        private final Set<String> topics;
        private final Pattern topicPattern;

        public Source(String name, Set<String> topics, Pattern pattern) {
            super(name);
            if (topics == null && pattern == null) {
                throw new IllegalArgumentException("Either topics or pattern must be not-null, but both are null.");
            }
            if (topics != null && pattern != null) {
                throw new IllegalArgumentException("Either topics or pattern must be null, but both are not null.");
            }
            this.topics = topics;
            this.topicPattern = pattern;
        }

        @Override
        public Set<String> topicSet() {
            return this.topics;
        }

        @Override
        public Pattern topicPattern() {
            return this.topicPattern;
        }

        @Override
        public void addPredecessor(TopologyDescription.Node predecessor) {
            throw new UnsupportedOperationException("Sources don't have predecessors.");
        }

        public String toString() {
            String topicsString = this.topics == null ? this.topicPattern.toString() : this.topics.toString();
            return "Source: " + this.name + " (topics: " + topicsString + ")\n      --> " + InternalTopologyBuilder.nodeNames(this.successors);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Source source = (Source)o;
            return this.name.equals(source.name) && Objects.equals(this.topics, source.topics) && (this.topicPattern == null ? source.topicPattern == null : this.topicPattern.pattern().equals(source.topicPattern.pattern()));
        }

        public int hashCode() {
            return Objects.hash(this.name, this.topics, this.topicPattern);
        }
    }
}

