/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.sleuth.instrument.messaging;

import java.lang.reflect.Type;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent;
import org.springframework.cloud.function.context.catalog.FunctionAroundWrapper;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.instrument.messaging.FunctionMessageSpanCustomizer;
import org.springframework.cloud.sleuth.instrument.messaging.MessageAndSpan;
import org.springframework.cloud.sleuth.instrument.messaging.MessageAndSpans;
import org.springframework.cloud.sleuth.instrument.messaging.SleuthMessagingSpan;
import org.springframework.cloud.sleuth.instrument.messaging.TraceMessageHandler;
import org.springframework.cloud.sleuth.instrument.reactor.ReactorSleuth;
import org.springframework.cloud.sleuth.propagation.Propagator;
import org.springframework.context.ApplicationListener;
import org.springframework.core.env.Environment;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link FunctionAroundWrapper} that surrounds function invocations with tracing.
 *
 * <p>Depending on the shape of the target function the invocation is routed to one of:
 * <ul>
 * <li>a pass-through (collection-of-message output, or payloads that are neither a
 * {@link Message} nor a {@link Publisher});</li>
 * <li>the reactive path ({@link Mono}/{@link Flux} input or output);</li>
 * <li>the imperative path (plain {@link Message} input, or a supplier).</li>
 * </ul>
 *
 * <p>Span creation, header extraction/injection and span completion are delegated to
 * {@link TraceMessageHandler}. Destination names used when wrapping messages are resolved
 * from the {@link Environment} and cached; the caches are dropped when a
 * {@link RefreshScopeRefreshedEvent} is received.
 */
public class TraceFunctionAroundWrapper
extends FunctionAroundWrapper
implements ApplicationListener<RefreshScopeRefreshedEvent> {
    private static final Log log = LogFactory.getLog(TraceFunctionAroundWrapper.class);
    /** Suffix of the default input binding name of a function. */
    private static final String INPUT_BINDING_SUFFIX = "-in-0";
    /** Suffix of the default output binding name of a function. */
    private static final String OUTPUT_BINDING_SUFFIX = "-out-0";
    private final Environment environment;
    private final Tracer tracer;
    private final Propagator propagator;
    private final Propagator.Setter<MessageHeaderAccessor> injector;
    private final Propagator.Getter<MessageHeaderAccessor> extractor;
    private final TraceMessageHandler traceMessageHandler;
    private final List<FunctionMessageSpanCustomizer> customizers;
    /**
     * Cache of function definition to resolved <em>input</em> destination. The field name
     * is kept unchanged for package-level callers; output destinations live in
     * {@link #functionToOutputDestinationCache}.
     */
    final Map<String, String> functionToDestinationCache = new ConcurrentHashMap<String, String>();
    /**
     * Cache of function definition to resolved <em>output</em> destination. Kept separate
     * from the input cache because both are keyed by the same function definition; sharing
     * one map made whichever side was resolved first win for both directions.
     */
    final Map<String, String> functionToOutputDestinationCache = new ConcurrentHashMap<String, String>();

    /**
     * Creates a wrapper without any span customizers.
     *
     * @param environment source of the binding and destination properties
     * @param tracer tracer used to create spans and put them in scope
     * @param propagator propagator handed to the {@link TraceMessageHandler}
     * @param injector setter handed to the {@link TraceMessageHandler}
     * @param extractor getter handed to the {@link TraceMessageHandler}
     */
    public TraceFunctionAroundWrapper(Environment environment, Tracer tracer, Propagator propagator, Propagator.Setter<MessageHeaderAccessor> injector, Propagator.Getter<MessageHeaderAccessor> extractor) {
        this(environment, tracer, propagator, injector, extractor, Collections.emptyList());
    }

    /**
     * Creates a wrapper.
     *
     * @param environment source of the binding and destination properties
     * @param tracer tracer used to create spans and put them in scope
     * @param propagator propagator handed to the {@link TraceMessageHandler}
     * @param injector setter handed to the {@link TraceMessageHandler}
     * @param extractor getter handed to the {@link TraceMessageHandler}
     * @param customizers customizers applied to input and output message spans
     */
    public TraceFunctionAroundWrapper(Environment environment, Tracer tracer, Propagator propagator, Propagator.Setter<MessageHeaderAccessor> injector, Propagator.Getter<MessageHeaderAccessor> extractor, List<FunctionMessageSpanCustomizer> customizers) {
        this.environment = environment;
        this.tracer = tracer;
        this.propagator = propagator;
        this.injector = injector;
        this.extractor = extractor;
        this.customizers = customizers;
        this.traceMessageHandler = TraceMessageHandler.forNonSpringIntegration(this.tracer, this.propagator, this.injector, this.extractor, this.customizers);
    }

    /**
     * Invokes the target function, adding tracing when the input is a {@link Message}
     * (imperative functions) or a {@link Publisher} (reactive functions).
     *
     * @param message the function input; may be {@code null} (e.g. for suppliers)
     * @param targetFunction the function being invoked
     * @return whatever the (possibly instrumented) function invocation produced
     */
    protected Object doApply(Object message, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
        // Functions returning a collection of messages are invoked without instrumentation.
        if (FunctionTypeUtils.isCollectionOfMessage((Type)targetFunction.getOutputType())) {
            return targetFunction.apply(message);
        }
        if (targetFunction.isInputTypePublisher() || targetFunction.isOutputTypePublisher()) {
            if (message != null && !(message instanceof Publisher)) {
                this.logDebugAboutMessageTypes(message);
                return targetFunction.apply(message);
            }
            return this.reactorStream((Publisher)message, targetFunction);
        }
        if (message != null && !(message instanceof Message)) {
            this.logDebugAboutMessageTypes(message);
            return targetFunction.apply(message);
        }
        return this.nonReactorStream((Message<byte[]>)((Message)message), targetFunction);
    }

    /**
     * Logs, at debug level, that the given input is of a type that is not traced.
     *
     * @param message the non-null function input that is not going to be instrumented
     */
    private void logDebugAboutMessageTypes(Object message) {
        if (log.isDebugEnabled()) {
            String messageClass = message.getClass().getName();
            log.debug("We only support tracing for Message types. You need to wrap your function type [" + messageClass + "] into [Message<" + messageClass + ">]");
        }
    }

    /**
     * Handles functions whose input or output is a {@link Publisher}.
     *
     * @param messageStream the input publisher; {@code null} when invoking a supplier
     * @param targetFunction the function being invoked
     * @return the function result, instrumented when the stream carries {@link Message}s
     */
    private Object reactorStream(Publisher messageStream, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
        if (messageStream == null && targetFunction.isSupplier()) {
            return this.reactorStreamSupplier(targetFunction);
        }
        Type itemType = FunctionTypeUtils.getGenericType((Type)targetFunction.getInputType());
        Class<?> itemTypeClass = FunctionTypeUtils.getRawType((Type)itemType);
        // Only streams whose items are declared as Message are instrumented.
        if (!itemTypeClass.equals(Message.class)) {
            if (log.isDebugEnabled()) {
                log.debug("Target function [" + targetFunction.getFunctionDefinition() + "] has raw input type [" + itemType + "] and should be [" + Message.class + "]. Will not wrap it.");
            }
            return targetFunction.apply((Object)messageStream);
        }
        if (FunctionTypeUtils.isMono((Type)targetFunction.getInputType())) {
            return this.reactorMonoStream(targetFunction, (Publisher<Message>)messageStream);
        }
        return this.reactorFluxStream(targetFunction, (Publisher<Message>)messageStream);
    }

    /**
     * Instruments a function whose declared input type is a {@link Mono} of messages.
     *
     * @param targetFunction the function being invoked
     * @param messagePublisher the input publisher
     * @return the instrumented function result
     */
    private Object reactorMonoStream(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction, Publisher<Message> messagePublisher) {
        if (log.isDebugEnabled()) {
            log.debug("Will instrument a stream Mono function");
        }
        Mono mono = Mono.from(messagePublisher)
                // NOTE(review): presumably resets the span in scope before the input is wrapped - confirm against Tracer#withSpan.
                .doOnNext(m -> this.tracer.withSpan(null))
                .map(msg -> this.traceMessageHandler.wrapInputMessage((Message<?>)msg, this.inputDestination(targetFunction.getFunctionDefinition())))
                .flatMap(wrapped -> Mono.deferContextual(ctx -> this.activateChildSpan((MessageAndSpansAndScope)ctx.get(MessageAndSpansAndScope.class), wrapped, targetFunction)));
        return this.applyToInstrumentedStream(targetFunction, mono);
    }

    /**
     * Instruments a function whose declared input type is a (non-Mono) publisher of messages.
     *
     * @param targetFunction the function being invoked
     * @param messagePublisher the input publisher
     * @return the instrumented function result
     */
    private Object reactorFluxStream(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction, Publisher<Message> messagePublisher) {
        if (log.isDebugEnabled()) {
            log.debug("Will instrument a stream Flux function");
        }
        Flux flux = Flux.from(messagePublisher)
                // NOTE(review): presumably resets the span in scope before the input is wrapped - confirm against Tracer#withSpan.
                .doOnNext(m -> this.tracer.withSpan(null))
                .map(msg -> this.traceMessageHandler.wrapInputMessage((Message<?>)msg, this.inputDestination(targetFunction.getFunctionDefinition())))
                .flatMap(wrapped -> Flux.deferContextual(ctx -> this.activateChildSpan((MessageAndSpansAndScope)ctx.get(MessageAndSpansAndScope.class), wrapped, targetFunction)));
        return this.applyToInstrumentedStream(targetFunction, flux);
    }

    /**
     * Records the wrapped input message in the per-subscription holder, names and tags
     * its child span, puts that span in scope and emits the message to hand to the function.
     *
     * @param holder the holder read from the Reactor context
     * @param wrapped the wrapped input message and its spans
     * @param targetFunction the function being invoked
     * @return a {@link Mono} emitting the message contained in {@code wrapped}
     */
    private Mono<Object> activateChildSpan(MessageAndSpansAndScope holder, MessageAndSpans wrapped, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
        holder.messageAndSpans = wrapped;
        holder.span = wrapped.childSpan;
        this.setNameAndTag(targetFunction, wrapped.childSpan);
        holder.scope = this.tracer.withSpan(wrapped.childSpan);
        return Mono.just((Object)wrapped.msg);
    }

    /**
     * Applies the target function to the instrumented input stream and instruments the
     * result: consumers get span-closing callbacks on their input, other functions get
     * their output messages wrapped.
     *
     * @param targetFunction the function being invoked
     * @param instrumentedInput the instrumented {@link Mono} or {@link Flux} input
     * @return the function result
     */
    private Object applyToInstrumentedStream(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction, Object instrumentedInput) {
        if (targetFunction.isConsumer()) {
            return targetFunction.apply(this.reactorStreamConsumer(instrumentedInput));
        }
        Publisher output = (Publisher)targetFunction.apply(instrumentedInput);
        if (output instanceof Mono) {
            return this.messageMono(targetFunction, (Mono<Message>)((Mono)output));
        }
        return this.messageFlux(targetFunction, (Flux<Message>)((Flux)output));
    }

    /**
     * Wraps every message emitted by a {@link Mono}-returning function as a traced output
     * message and makes sure the input span is ended.
     *
     * @param targetFunction the function that produced {@code function}
     * @param function the publisher returned by the function
     * @return the instrumented publisher, carrying a fresh {@link MessageAndSpansAndScope} in its context
     */
    private Mono<Message> messageMono(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction, Mono<Message> function) {
        return Mono.deferContextual(contextView -> {
            MessageAndSpansAndScope holder = (MessageAndSpansAndScope)contextView.get(MessageAndSpansAndScope.class);
            return function.doOnNext(message -> {
                holder.end();
                holder.handle();
            }).map(result -> this.traceOutputMessage(targetFunction, holder, result)).doOnError(holder::error).doFinally(signalType -> {
                // Nothing was emitted, so the span has not been ended by doOnNext.
                if (!holder.isHandled()) {
                    holder.end();
                }
            });
        }).contextWrite(context -> context.put(MessageAndSpansAndScope.class, (Object)new MessageAndSpansAndScope()));
    }

    /**
     * Wraps every message emitted by a {@link Flux}-returning function as a traced output
     * message and makes sure the input span is ended.
     *
     * @param targetFunction the function that produced {@code function}
     * @param function the publisher returned by the function
     * @return the instrumented publisher, carrying a fresh {@link MessageAndSpansAndScope} in its context
     */
    private Flux<Message> messageFlux(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction, Flux<Message> function) {
        return Flux.deferContextual(contextView -> {
            MessageAndSpansAndScope holder = (MessageAndSpansAndScope)contextView.get(MessageAndSpansAndScope.class);
            return function.doOnNext(message -> {
                holder.end();
                holder.handle();
            }).map(result -> this.traceOutputMessage(targetFunction, holder, result)).doOnError(holder::error).doFinally(signalType -> {
                // Nothing was emitted, so the span has not been ended by doOnNext.
                if (!holder.isHandled()) {
                    holder.end();
                }
            });
        }).contextWrite(context -> context.put(MessageAndSpansAndScope.class, (Object)new MessageAndSpansAndScope()));
    }

    /**
     * Wraps a function result as an output message using the parent span of the input
     * recorded in the holder, then reports the output span as handled.
     *
     * @param targetFunction the function that produced {@code result}
     * @param holder the per-subscription holder with the recorded input spans
     * @param result the message emitted by the function
     * @return the wrapped output message
     */
    private Message traceOutputMessage(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction, MessageAndSpansAndScope holder, Message<?> result) {
        MessageAndSpan wrappedOutput = this.traceMessageHandler.wrapOutputMessage(result, holder.messageAndSpans.parentSpan, this.outputDestination(targetFunction.getFunctionDefinition()));
        this.traceMessageHandler.afterMessageHandled(wrappedOutput.span, null);
        return wrappedOutput.msg;
    }

    /**
     * Decorates the input handed to a reactive consumer so that errors are recorded on the
     * span and the span and scope are ended when the stream terminates.
     *
     * @param result the instrumented {@link Mono} or {@link Flux} input
     * @return the decorated publisher, carrying a fresh {@link MessageAndSpansAndScope} in its context
     */
    private Object reactorStreamConsumer(Object result) {
        if (result instanceof Mono) {
            return Mono.deferContextual(contextView -> {
                MessageAndSpansAndScope holder = (MessageAndSpansAndScope)contextView.get(MessageAndSpansAndScope.class);
                return ((Mono)result).doOnError(holder::error).doFinally(signalType -> holder.end());
            }).contextWrite(context -> context.put(MessageAndSpansAndScope.class, (Object)new MessageAndSpansAndScope()));
        }
        return Flux.deferContextual(contextView -> {
            MessageAndSpansAndScope holder = (MessageAndSpansAndScope)contextView.get(MessageAndSpansAndScope.class);
            return ((Flux)result).doOnError(holder::error).doFinally(signalType -> holder.end());
        }).contextWrite(context -> context.put(MessageAndSpansAndScope.class, (Object)new MessageAndSpansAndScope()));
    }

    /**
     * Instruments a reactive supplier: the supplied publisher is traced via
     * {@link ReactorSleuth} and every emitted item is converted to a message, wrapped as an
     * output message, customized and reported as handled.
     *
     * @param targetFunction the supplier being invoked
     * @return the instrumented publisher
     */
    private Object reactorStreamSupplier(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
        Publisher publisher = (Publisher)targetFunction.get();
        if (publisher instanceof Mono) {
            if (log.isDebugEnabled()) {
                log.debug("Will instrument a stream Mono supplier");
            }
            Mono mono = (Mono)publisher;
            return ReactorSleuth.tracedMono(this.tracer, this.tracer.currentTraceContext(), targetFunction.getFunctionDefinition(), () -> mono, (msg, s) -> this.customizedInputMessageSpan((Span)s, (Message<?>)(msg instanceof Message ? (Message)msg : null)))
                    .map(object -> this.toMessage(object))
                    .map(object -> this.getMessageAndSpans((Message)object, targetFunction.getFunctionDefinition(), this.setNameAndTag(targetFunction, this.tracer.currentSpan())))
                    .doOnNext(wrappedOutputMessage -> this.customizedOutputMessageSpan(((MessageAndSpan)wrappedOutputMessage).span, ((MessageAndSpan)wrappedOutputMessage).msg))
                    .doOnNext(wrappedOutputMessage -> this.traceMessageHandler.afterMessageHandled(((MessageAndSpan)wrappedOutputMessage).span, null))
                    .map(wrappedOutputMessage -> ((MessageAndSpan)wrappedOutputMessage).msg);
        }
        if (log.isDebugEnabled()) {
            log.debug("Will instrument a stream Flux supplier");
        }
        Flux flux = (Flux)publisher;
        return ReactorSleuth.tracedFlux(this.tracer, this.tracer.currentTraceContext(), targetFunction.getFunctionDefinition(), () -> flux, (msg, s) -> this.customizedInputMessageSpan((Span)s, (Message<?>)(msg instanceof Message ? (Message)msg : null)))
                .map(object -> this.toMessage(object))
                .map(object -> this.getMessageAndSpans((Message)object, targetFunction.getFunctionDefinition(), this.setNameAndTag(targetFunction, this.tracer.currentSpan())))
                .doOnNext(wrappedOutputMessage -> this.customizedOutputMessageSpan(((MessageAndSpan)wrappedOutputMessage).span, ((MessageAndSpan)wrappedOutputMessage).msg))
                .doOnNext(wrappedOutputMessage -> this.traceMessageHandler.afterMessageHandled(((MessageAndSpan)wrappedOutputMessage).span, null))
                .map(wrappedOutputMessage -> ((MessageAndSpan)wrappedOutputMessage).msg);
    }

    /**
     * Names the span after the function definition and tags it with the function name.
     *
     * @param targetFunction the function being invoked
     * @param span the span to update
     * @return the span returned by the tag call
     */
    private Span setNameAndTag(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction, Span span) {
        return span.name(targetFunction.getFunctionDefinition()).tag(SleuthMessagingSpan.Tags.FUNCTION_NAME.getKey(), targetFunction.getFunctionDefinition());
    }

    /**
     * Handles imperative functions: starts a span (a new one for suppliers, otherwise the
     * child span obtained from wrapping the input message), invokes the function within
     * that span's scope and, when there is a result, wraps it as a traced output message.
     *
     * @param message the input message; {@code null} when invoking a supplier
     * @param targetFunction the function being invoked
     * @return the wrapped output message, or {@code null} when the function returned {@code null}
     */
    private Object nonReactorStream(Message<byte[]> message, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
        Object result;
        Span span;
        MessageAndSpans invocationMessage = null;
        if (message == null && targetFunction.isSupplier()) {
            if (log.isDebugEnabled()) {
                log.debug("Creating a span for a supplier");
            }
            span = this.setNameAndTag(targetFunction, this.tracer.nextSpan());
            this.customizedInputMessageSpan(span, null);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("Will retrieve the tracing headers from the message");
            }
            invocationMessage = this.traceMessageHandler.wrapInputMessage(message, this.inputDestination(targetFunction.getFunctionDefinition()));
            if (log.isDebugEnabled()) {
                log.debug("Wrapped input msg " + invocationMessage);
            }
            span = this.setNameAndTag(targetFunction, invocationMessage.childSpan);
        }
        Exception throwable = null;
        try (Tracer.SpanInScope ws = this.tracer.withSpan(span.start());){
            // No wrapped input means the supplier branch above was taken.
            result = invocationMessage == null ? targetFunction.get() : targetFunction.apply((Object)invocationMessage.msg);
        }
        catch (Exception e) {
            throwable = e;
            throw e;
        }
        finally {
            // Always report the invocation span, passing the failure when there was one.
            this.traceMessageHandler.afterMessageHandled(span, throwable);
        }
        if (result == null) {
            if (log.isDebugEnabled()) {
                log.debug("Returned message is null - we have a consumer");
            }
            return null;
        }
        Message<?> msgResult = this.toMessage(result);
        if (log.isDebugEnabled()) {
            log.debug("Will instrument the output message");
        }
        MessageAndSpan wrappedOutputMessage = invocationMessage != null ? this.traceMessageHandler.wrapOutputMessage(msgResult, invocationMessage.parentSpan, this.outputDestination(targetFunction.getFunctionDefinition())) : this.getMessageAndSpans(msgResult, targetFunction.getFunctionDefinition(), span);
        if (log.isDebugEnabled()) {
            log.debug("Wrapped output msg " + wrappedOutputMessage);
        }
        this.traceMessageHandler.afterMessageHandled(wrappedOutputMessage.span, null);
        return wrappedOutputMessage.msg;
    }

    /**
     * Wraps a result message as an output message of the named function.
     *
     * @param resultMessage the message produced by the function
     * @param name the function definition, used to resolve the output destination
     * @param spanFromMessage the span passed to the handler as the parent of the output
     * @return the wrapped output message and its span
     */
    MessageAndSpan getMessageAndSpans(Message<?> resultMessage, String name, Span spanFromMessage) {
        return this.traceMessageHandler.wrapOutputMessage(resultMessage, spanFromMessage, this.outputDestination(name));
    }

    /**
     * Applies every registered customizer to an input message span.
     *
     * @param spanToCustomize the span to customize
     * @param msg the input message; may be {@code null}
     */
    private void customizedInputMessageSpan(Span spanToCustomize, Message<?> msg) {
        this.customizers.forEach(cust -> cust.customizeInputMessageSpan(spanToCustomize, msg));
    }

    /**
     * Applies every registered customizer to an output message span.
     *
     * @param spanToCustomize the span to customize
     * @param msg the output message
     */
    private void customizedOutputMessageSpan(Span spanToCustomize, Message<?> msg) {
        this.customizers.forEach(cust -> cust.customizeOutputMessageSpan(spanToCustomize, msg));
    }

    /**
     * Returns the result as a {@link Message}, building one around it when it is a bare payload.
     *
     * @param result the function result
     * @return {@code result} itself when it already is a message, otherwise a new message with it as payload
     */
    private Message<?> toMessage(Object result) {
        if (result instanceof Message) {
            return (Message)result;
        }
        return MessageBuilder.withPayload((Object)result).build();
    }

    /**
     * Resolves (and caches) the input destination of a function.
     *
     * @param functionDefinition the function definition
     * @return the configured destination of the function's input binding, or the function
     * definition itself when none is configured
     */
    String inputDestination(String functionDefinition) {
        return this.functionToDestinationCache.computeIfAbsent(functionDefinition, name -> this.resolveDestination(name, INPUT_BINDING_SUFFIX));
    }

    /**
     * Resolves (and caches) the output destination of a function.
     *
     * @param functionDefinition the function definition
     * @return the configured destination of the function's output binding, or the function
     * definition itself when none is configured
     */
    String outputDestination(String functionDefinition) {
        return this.functionToOutputDestinationCache.computeIfAbsent(functionDefinition, name -> this.resolveDestination(name, OUTPUT_BINDING_SUFFIX));
    }

    /**
     * Looks up the destination of one of a function's bindings in the environment.
     *
     * <p>The binding name is the value of
     * {@code spring.cloud.stream.function.bindings.<function><suffix>} when that property
     * exists, otherwise {@code <function><suffix>}. The destination is read from
     * {@code spring.cloud.stream.bindings.<binding>.destination}.
     *
     * @param functionDefinition the function definition
     * @param bindingSuffix the binding suffix ({@code -in-0} or {@code -out-0})
     * @return the destination, defaulting to the function definition
     */
    private String resolveDestination(String functionDefinition, String bindingSuffix) {
        String defaultBindingName = functionDefinition + bindingSuffix;
        String bindingMappingProperty = "spring.cloud.stream.function.bindings." + defaultBindingName;
        String bindingName = this.environment.containsProperty(bindingMappingProperty) ? this.environment.getProperty(bindingMappingProperty) : defaultBindingName;
        return this.environment.getProperty("spring.cloud.stream.bindings." + bindingName + ".destination", functionDefinition);
    }

    /**
     * Drops all cached destinations so they are resolved again from the environment.
     *
     * @param event the refresh event
     */
    public void onApplicationEvent(RefreshScopeRefreshedEvent event) {
        if (log.isDebugEnabled()) {
            log.debug("Context refreshed, will reset the cache");
        }
        this.functionToDestinationCache.clear();
        this.functionToOutputDestinationCache.clear();
    }

    /**
     * Mutable holder stored in the Reactor context of an instrumented stream. It carries
     * the most recently wrapped input message, its span and the scope opened for it.
     */
    static class MessageAndSpansAndScope {
        /** The wrapped input message together with its spans. */
        MessageAndSpans messageAndSpans;
        /** The span ended by {@link #end()} and annotated by {@link #error(Throwable)}. */
        Span span;
        /** The scope closed by {@link #end()}. */
        Tracer.SpanInScope scope;
        /** Whether {@link #handle()} has been called. */
        boolean handled;

        MessageAndSpansAndScope() {
        }

        /**
         * Records the error on the span, if a span has been set.
         *
         * @param throwable the error to record
         */
        void error(Throwable throwable) {
            if (this.span != null) {
                this.span.error(throwable);
            }
        }

        /** Marks the holder as handled. */
        void handle() {
            this.handled = true;
        }

        /**
         * @return {@code true} once {@link #handle()} has been called
         */
        boolean isHandled() {
            return this.handled;
        }

        /** Ends the span and closes the scope, skipping whichever has not been set. */
        void end() {
            if (this.span != null) {
                this.span.end();
            }
            if (this.scope != null) {
                this.scope.close();
            }
        }
    }
}

