package org.kurator.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.pattern.Patterns;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import joptsimple.internal.Strings;
import org.kurator.akka.data.WorkflowProduct;
import org.kurator.akka.messages.ControlMessage;
import org.kurator.akka.messages.ExceptionMessage;
import org.kurator.akka.messages.Failure;
import org.kurator.akka.messages.Initialize;
import org.kurator.akka.messages.ProductPublication;
import org.kurator.akka.messages.Start;
import org.kurator.akka.messages.Success;
import org.kurator.log.Log;
import org.kurator.log.Logger;
import org.kurator.log.SilentLogger;
import scala.concurrent.Future;
import scala.util.Try;

/* loaded from: input_file:org/kurator/akka/Workflow.class */
/**
 * Top-level actor that supervises the actors making up a single workflow run.
 *
 * <p>The workflow watches every actor registered through {@link #setActors(Set)},
 * relays {@link Initialize} and {@link Start} messages to them, collects the
 * {@link WorkflowProduct}s they publish, and, once the last watched actor has
 * terminated, stops itself, shuts down the {@link ActorSystem} and hands the
 * collected products to the {@link WorkflowRunner}.
 */
public class Workflow extends UntypedActor {
    private final ActorSystem actorSystem;
    private final String name;
    /** Optional actor that receives every message this workflow does not handle itself. */
    private ActorRef inputActor;
    private final InputStream inStream;
    private final PrintStream outStream;
    private final PrintStream errStream;
    private final WorkflowRunner runner;
    /** Actors currently being watched; an entry is removed when its Terminated message arrives. */
    private final Set<ActorRef> actors = new HashSet<ActorRef>();
    private Logger logger = new SilentLogger();
    /** Products published so far, in order of arrival. */
    private final List<WorkflowProduct> products = new LinkedList<WorkflowProduct>();
    /** Maps an actor to the set of actors recorded as connected to it via {@link #connection}. */
    final Map<ActorRef, Set<ActorRef>> actorConnections = new HashMap<ActorRef, Set<ActorRef>>();

    /**
     * Creates a workflow actor.
     *
     * @param actorSystem    the actor system that is shut down when the last watched actor stops
     * @param str            the workflow name, used in the initialization failure message
     * @param inputStream    input stream stored for the workflow
     * @param printStream    output stream stored for the workflow
     * @param printStream2   error stream; uncaught actor exceptions are printed here
     * @param workflowRunner the runner used to resolve actor names and to receive
     *                       the last exception and the final products
     */
    public Workflow(ActorSystem actorSystem, String str, InputStream inputStream, PrintStream printStream, PrintStream printStream2, WorkflowRunner workflowRunner) {
        this.actorSystem = actorSystem;
        this.name = str;
        this.inStream = inputStream;
        this.outStream = printStream;
        this.errStream = printStream2;
        this.runner = workflowRunner;
    }

    /**
     * Installs the logger used by this workflow and sets its source to {@code "WORKFLOW"}.
     *
     * @param logger the logger to use; must not be {@code null}
     */
    public void setLogger(Logger logger) {
        this.logger = logger;
        this.logger.setSource("WORKFLOW");
    }

    /**
     * Sets the actor that receives messages this workflow does not handle itself.
     *
     * @param actorRef the input actor, or {@code null} to drop such messages
     */
    public void setInput(ActorRef actorRef) {
        this.inputActor = actorRef;
    }

    /** Registers a single actor and starts watching it for termination. */
    private void actor(ActorRef actorRef) {
        this.actors.add(actorRef);
        this.logger.trace("Now watching " + Log.ACTOR(this.runner.name(actorRef)));
        getContext().watch(actorRef);
    }

    /**
     * Registers every actor in the given set and starts watching each of them.
     *
     * @param set the actors that make up the workflow
     */
    public void setActors(Set<ActorRef> set) {
        for (ActorRef actorRef : set) {
            actor(actorRef);
        }
    }

    /**
     * Records a connection from one actor to another in {@code actorConnections}.
     *
     * @param actorRef  the actor under which the connection is stored
     * @param actorRef2 the actor added to that actor's connection set
     */
    public void connection(ActorRef actorRef, ActorRef actorRef2) {
        Set<ActorRef> connected = this.actorConnections.get(actorRef);
        if (connected == null) {
            connected = new HashSet<ActorRef>();
            this.actorConnections.put(actorRef, connected);
        }
        connected.add(actorRef2);
    }

    /**
     * Sends an {@link Initialize} message to every watched actor, blocks until each
     * one has answered, and replies to the sender with a {@link Success} if no actor
     * answered with a {@link Failure}, or otherwise with a {@link Failure} wrapping
     * all reported failures.
     *
     * @throws Exception if waiting for a response fails or a response future
     *                   completed exceptionally
     */
    private void initialize() throws Exception {
        this.logger.debug("Initializing actors");

        // Ask every actor first so that they can initialize concurrently; the
        // two lists are kept index-aligned so each future can be named in logs.
        List<Future<Object>> responses = new ArrayList<Future<Object>>(this.actors.size());
        List<String> actorNames = new ArrayList<String>(this.actors.size());
        Initialize initialize = new Initialize();
        for (ActorRef actorRef : this.actors) {
            String actorName = this.runner.name(actorRef);
            this.logger.comm("Sending INITIALIZE message to " + Log.ACTOR(actorName));
            responses.add(Patterns.ask(actorRef, initialize, Constants.TIMEOUT));
            actorNames.add(actorName);
        }

        List<Failure> failures = new LinkedList<Failure>();
        for (int i = 0; i < responses.size(); i++) {
            Future<Object> response = responses.get(i);
            String actorName = actorNames.get(i);
            // Log before blocking so the log shows which actor is being waited on.
            this.logger.comm("Waiting for INITIALIZE response from " + Log.ACTOR(actorName));
            response.ready(Constants.TIMEOUT_DURATION, null);
            Try<Object> outcome = response.value().get();
            ControlMessage reply = (ControlMessage) outcome.get();
            this.logger.comm("Received INITIALIZE response from " + Log.ACTOR(actorName));
            if (reply instanceof Failure) {
                this.logger.error("Actor reports error during initialization: " + reply);
                failures.add((Failure) reply);
            }
        }

        Object result = failures.isEmpty()
                ? new Success()
                : new Failure("Error initializing workflow '" + this.name + "'", failures);
        this.logger.debug("Done initializing actors");
        this.logger.comm("Sending INITIALIZE response to RUNNER");
        getSender().tell(result, getSelf());
    }

    /**
     * Dispatches an incoming message to the matching handler. Messages of any
     * type other than {@link Initialize}, {@link Start}, {@link ProductPublication},
     * {@link ExceptionMessage} and {@link Terminated} are forwarded to the input
     * actor when one is set and are otherwise dropped.
     *
     * @param obj the received message
     * @throws Exception if actor initialization fails
     */
    @Override // akka.actor.UntypedActor
    public void onReceive(Object obj) throws Exception {
        this.logger.trace("Received message of type " + obj.getClass() + " from " + getSender());
        this.logger.value("Received message: ", obj.toString());
        if (obj instanceof Initialize) {
            this.logger.comm("Handling INITIALIZE message from RUNNER");
            initialize();
            this.logger.comm("Done handling INITIALIZE message");
        } else if (obj instanceof Start) {
            handleStart(obj);
        } else if (obj instanceof ProductPublication) {
            handleProductPublication((ProductPublication) obj);
        } else if (obj instanceof ExceptionMessage) {
            handleException((ExceptionMessage) obj);
        } else if (obj instanceof Terminated) {
            handleTerminated((Terminated) obj);
        } else if (this.inputActor != null) {
            this.logger.comm("Forwarding unhandled message to input actor " + Log.ACTOR(this.runner.name(this.inputActor)));
            this.inputActor.tell(obj, getSelf());
        }
    }

    /** Relays the received start message to every watched actor. */
    private void handleStart(Object startMessage) {
        this.logger.comm("Handling START message from RUNNER");
        this.logger.debug("Starting actors");
        for (ActorRef actorRef : this.actors) {
            this.logger.trace("Sending START message to " + Log.ACTOR(this.runner.name(actorRef)));
            actorRef.tell(startMessage, getSelf());
        }
        this.logger.debug("Run starting with " + this.actors.size() + " active actors");
        this.logger.comm("Done handling START message");
    }

    /** Appends the published product to the list of workflow products. */
    private void handleProductPublication(ProductPublication publication) {
        this.logger.comm("Received PUBLISH_PRODUCT message");
        this.logger.value("Adding to list of workflow products: " + publication.product);
        this.products.add(publication.product);
        this.logger.trace("Workflow has yielded " + this.products.size() + " products so far.");
        this.logger.comm("Done handling PUBLISH_PRODUCT message");
    }

    /** Prints the reported exception to the error stream and records it on the runner. */
    private void handleException(ExceptionMessage message) {
        this.errStream.println(getSender() + " threw an uncaught exception:");
        Exception exception = message.getException();
        exception.printStackTrace(this.errStream);
        this.runner.setLastException(exception);
    }

    /**
     * Removes the terminated actor from the watched set; when no watched actors
     * remain, stops this actor, shuts down the actor system and passes the
     * collected products to the runner.
     */
    private void handleTerminated(Terminated terminated) {
        ActorRef stopped = terminated.actor();
        this.logger.trace("Handling TERMINATED message from " + Log.ACTOR(this.runner.name(stopped)));
        this.actors.remove(stopped);
        this.logger.debug(Log.ACTOR(this.runner.name(stopped)) + " has stopped");
        this.logger.debug("Number of active actors is now " + this.actors.size());
        if (!this.actors.isEmpty()) {
            this.logger.debug("Currently active actors include " + activeActors(3));
            return;
        }
        this.logger.trace("Stopping because all actors have stopped");
        this.logger.debug("Stopping WORKFLOW");
        getContext().stop(getSelf());
        this.logger.debug("Shutting down ActorSystem");
        this.actorSystem.shutdown();
        this.logger.comm("Sending " + this.products.size() + " workflow PRODUCTS to RUNNER");
        this.runner.setWorkflowProducts(this.products);
    }

    /**
     * Builds a comma-separated list of at most {@code limit} watched actors,
     * followed by {@code " ..."} when more actors remain.
     *
     * @param limit the maximum number of actors to list
     * @return the summary string; empty when no actors are being watched
     */
    private String activeActors(int limit) {
        StringBuilder summary = new StringBuilder();
        int listed = 0;
        for (ActorRef actorRef : this.actors) {
            if (listed >= limit) {
                summary.append(" ...");
                break;
            }
            // Separate by entry count rather than buffer length so that very
            // short actor names still get a separator after them.
            if (listed > 0) {
                summary.append(", ");
            }
            summary.append(Log.ACTOR(this.runner.name(actorRef)));
            listed++;
        }
        return summary.toString();
    }
}
