package org.kurator.akka;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import joptsimple.internal.Strings;
import org.kurator.akka.data.WorkflowArtifact;
import org.kurator.akka.data.WorkflowProduct;
import org.kurator.akka.messages.ControlMessage;
import org.kurator.akka.messages.EndOfStream;
import org.kurator.akka.messages.ExceptionMessage;
import org.kurator.akka.messages.Failure;
import org.kurator.akka.messages.FutureComplete;
import org.kurator.akka.messages.Initialize;
import org.kurator.akka.messages.ProductPublication;
import org.kurator.akka.messages.Start;
import org.kurator.akka.messages.Success;
import org.kurator.akka.messages.WrappedMessage;
import org.kurator.akka.metadata.BroadcastEventCountChecker;
import org.kurator.akka.metadata.BroadcastEventCounter;
import org.kurator.akka.metadata.MetadataReader;
import org.kurator.akka.metadata.MetadataWriter;
import org.kurator.log.Log;
import org.kurator.log.Logger;
import org.kurator.log.SilentLogger;

/* loaded from: input_file:org/kurator/akka/KuratorActor.class */
public abstract class KuratorActor extends UntypedActor {
    public static final String EOL = System.getProperty("line.separator");
    private static Integer nextActorId = 1;
    public final int id;
    private WorkflowRunner runner;
    protected String name;
    protected Map<String, Object> settings;
    protected Map<String, Object> configuration;
    private List<MetadataWriter> metadataWriters;
    private List<MetadataReader> metadataReaders;
    protected ActorRef workflowRef;
    private WrappedMessage receivedWrappedMessage;
    public boolean endOnEos = true;
    public boolean sendEosOnEnd = true;
    public boolean needsTrigger = false;
    protected volatile InputStream inStream = System.in;
    protected volatile PrintStream outStream = System.out;
    protected volatile PrintStream errStream = System.err;
    private List<ActorConfig> listenerConfigs = new LinkedList();
    private Set<ActorRef> listeners = new HashSet();
    protected Map<String, String> inputs = new HashMap();
    protected ActorFSM state = ActorFSM.CONSTRUCTED;
    protected Logger logger = new SilentLogger();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/kurator/akka/KuratorActor$ActorFSM.class */
    public enum ActorFSM {
        CONSTRUCTED,
        BUILT,
        INITIALIZED,
        STARTED,
        ENDED
    }

    public KuratorActor() {
        this.metadataWriters = null;
        this.metadataReaders = null;
        synchronized (nextActorId) {
            Integer num = nextActorId;
            nextActorId = Integer.valueOf(nextActorId.intValue() + 1);
            this.id = num.intValue();
        }
        this.metadataReaders = new LinkedList();
        this.metadataReaders.add(new BroadcastEventCountChecker());
        this.metadataWriters = new LinkedList();
        this.metadataWriters.add(new BroadcastEventCounter());
    }

    public synchronized KuratorActor errorStream(PrintStream printStream) {
        this.errStream = printStream;
        return this;
    }

    public synchronized KuratorActor inputStream(InputStream inputStream) {
        this.inStream = inputStream;
        return this;
    }

    public synchronized KuratorActor outputStream(PrintStream printStream) {
        this.outStream = printStream;
        return this;
    }

    public synchronized KuratorActor listeners(List<ActorConfig> list) {
        Contract.requires(this.state, ActorFSM.CONSTRUCTED);
        if (list != null) {
            this.listenerConfigs = list;
        }
        return this;
    }

    public synchronized KuratorActor runner(WorkflowRunner workflowRunner) {
        Contract.requires(this.state, ActorFSM.CONSTRUCTED);
        this.runner = workflowRunner;
        return this;
    }

    public synchronized void settings(Map<String, Object> map) {
        Contract.requires(this.state, ActorFSM.CONSTRUCTED);
        this.settings = map;
    }

    public synchronized KuratorActor inputs(Map<String, String> map) {
        this.inputs = map;
        return this;
    }

    public synchronized KuratorActor setNeedsTrigger(boolean z) {
        Contract.requires(this.state, ActorFSM.CONSTRUCTED);
        this.needsTrigger = z;
        return this;
    }

    public synchronized KuratorActor metadataWriters(List<MetadataWriter> list) {
        if (this.metadataWriters == null) {
            this.metadataWriters = list;
        } else {
            this.metadataWriters.addAll(list);
        }
        return this;
    }

    public synchronized KuratorActor metadataReaders(List<MetadataReader> list) {
        if (this.metadataReaders == null) {
            this.metadataReaders = list;
        } else {
            this.metadataReaders.addAll(list);
        }
        return this;
    }

    public synchronized KuratorActor configuration(Map<String, Object> map) {
        Contract.requires(this.state, ActorFSM.CONSTRUCTED);
        this.configuration = map;
        return this;
    }

    @Override // akka.actor.UntypedActor
    public final synchronized void onReceive(Object obj) throws Exception {
        Contract.disallows(this.state, ActorFSM.ENDED);
        this.logger.trace("Received message of type " + obj.getClass() + " from " + getSender());
        this.logger.value("Received message: ", obj.toString());
        try {
            if (obj instanceof WrappedMessage) {
                this.receivedWrappedMessage = (WrappedMessage) obj;
                obj = unwrapMessage(this.receivedWrappedMessage);
            } else {
                this.receivedWrappedMessage = null;
            }
            if (!(obj instanceof ControlMessage)) {
                this.logger.comm("Received DATA from " + Log.ACTOR(this.runner.name(getSender())));
                this.logger.value("Received DATA", obj);
                Contract.requires(this.state, ActorFSM.STARTED, ActorFSM.INITIALIZED);
                if (this.state == ActorFSM.INITIALIZED) {
                    this.logger.trace("Invoking ON_START_EVENT handler");
                    this.state = ActorFSM.STARTED;
                    handleOnStart();
                }
                this.logger.trace("Invoking ON_DATA_EVENT handler");
                onData(obj);
            } else if (obj instanceof Initialize) {
                this.name = (String) this.configuration.get("name");
                this.logger.setSource(Log.ACTOR(this.name));
                this.logger.comm("Received INITIALIZE message from WORKFLOW");
                Contract.requires(this.state, ActorFSM.CONSTRUCTED);
                Iterator<ActorConfig> it = this.listenerConfigs.iterator();
                while (it.hasNext()) {
                    this.listeners.add(this.runner.getActorForConfig(it.next()));
                }
                try {
                    this.logger.trace("Invoking ON_INITIALIZE_EVENT handler");
                    onInitialize();
                    this.state = ActorFSM.INITIALIZED;
                    this.logger.comm("Sending INITIALIZE response to WORKFLOW");
                    getSender().tell(new Success(), getSelf());
                } catch (Exception e) {
                    String message = e.getMessage();
                    if (message == null || message.isEmpty()) {
                        message = e.toString();
                    }
                    LinkedList linkedList = new LinkedList();
                    linkedList.add(new Failure("Error initializing actor '" + this.name + Strings.SINGLE_QUOTE));
                    linkedList.add(new Failure(message.toString()));
                    getSender().tell(new Failure(linkedList), getSelf());
                    getContext().stop(getSelf());
                }
            } else if (obj instanceof Start) {
                this.logger.comm("Received START message from WORKFLOW");
                this.workflowRef = getSender();
                Contract.requires(this.state, ActorFSM.INITIALIZED, ActorFSM.STARTED);
                if (this.state == ActorFSM.INITIALIZED) {
                    this.state = ActorFSM.STARTED;
                    this.logger.trace("Invoking ON_START_EVENT handler");
                    handleOnStart();
                }
            } else if (obj instanceof EndOfStream) {
                this.logger.comm("Received END_OF_STREAM message from " + Log.ACTOR(this.runner.name(getSender())));
                Contract.requires(this.state, ActorFSM.INITIALIZED, ActorFSM.STARTED);
                this.logger.trace("Invoking ON_END_OF_STREAM_EVENT handler");
                onEndOfStream((EndOfStream) obj);
            } else if (obj instanceof FutureComplete) {
                Contract.requires(this.state, ActorFSM.INITIALIZED, ActorFSM.STARTED);
                onFutureComplete((FutureComplete) obj);
            }
        } catch (Exception e2) {
            reportException(e2);
            this.errStream.println(e2.getMessage());
            endStreamAndStop();
        }
    }

    private void handleOnStart() throws Exception {
        onStart();
        if (this.needsTrigger) {
            this.logger.trace("Invoking ON_TRIGGER_EVENT handler");
            onTrigger();
        }
    }

    protected void onInitialize() throws Exception {
        this.logger.trace("Executing default ON_INITIALIZE_EVENT handler");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onStart() throws Exception {
        this.logger.trace("Executing default ON_START_EVENT handler");
    }

    protected void onTrigger() throws Exception {
        this.logger.trace("Executing default ON_TRIGGER_EVENT handler");
    }

    protected void onEndOfStream(EndOfStream endOfStream) throws Exception {
        this.logger.trace("Executing default ON_END_OF_STREAM_EVENT handler");
        if (this.endOnEos) {
            endStreamAndStop(endOfStream);
        }
    }

    protected void onEnd() throws Exception {
        this.logger.trace("Executing default ON_END_EVENT handler");
    }

    protected void onFutureComplete(FutureComplete futureComplete) throws Exception {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onData(Object obj) throws Exception {
        this.logger.trace("Executing default ON_DATA_EVENT handler");
    }

    private Object wrapMessage(Object obj) {
        if (this.metadataWriters == null) {
            return obj;
        }
        WrappedMessage wrappedMessage = new WrappedMessage(obj);
        Iterator<MetadataWriter> it = this.metadataWriters.iterator();
        while (it.hasNext()) {
            it.next().writeMetadata(this, wrappedMessage);
        }
        return wrappedMessage;
    }

    private Object unwrapMessage(WrappedMessage wrappedMessage) throws Exception {
        if (this.metadataReaders != null) {
            Iterator<MetadataReader> it = this.metadataReaders.iterator();
            while (it.hasNext()) {
                it.next().readMetadata(this, wrappedMessage);
            }
        }
        return wrappedMessage.unwrap();
    }

    public synchronized WrappedMessage getReceivedWrappedMessage() {
        return this.receivedWrappedMessage;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final synchronized void broadcast(Object obj) {
        Contract.requires(this.state, ActorFSM.INITIALIZED, ActorFSM.STARTED);
        Object wrapMessage = wrapMessage(obj);
        for (ActorRef actorRef : this.listeners) {
            if (actorRef != null) {
                this.logger.comm("Sending DATA to " + Log.ACTOR(this.runner.name(actorRef)));
                actorRef.tell(wrapMessage, getSelf());
                this.logger.value("Sent DATA", obj);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void endStreamAndStop(EndOfStream endOfStream) throws Exception {
        if (this.sendEosOnEnd) {
            broadcast(endOfStream != null ? endOfStream : new EndOfStream());
        }
        onEnd();
        getContext().stop(getSelf());
        this.state = ActorFSM.ENDED;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void endStreamAndStop() throws Exception {
        endStreamAndStop(null);
    }

    protected final void reportException(Exception exc) {
        this.logger.error(exc.getMessage());
        this.runner.getWorkflowRef().tell(new ExceptionMessage(exc), getSelf());
    }

    public KuratorActor logger(Logger logger) {
        this.logger = logger;
        return this;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void publishProducts(Map<String, Object> map) {
        if (map != null) {
            this.logger.debug("Publishing " + map.size() + " products.");
            for (Map.Entry<String, Object> entry : map.entrySet()) {
                publishProduct(entry.getKey(), entry.getValue());
            }
            this.logger.trace("Done publishing products");
        }
    }

    protected void publishProduct(String str, Object obj) {
        publishProduct(str, obj, obj.getClass().getName());
    }

    protected void publishProduct(String str, Object obj, String str2) {
        WorkflowProduct workflowProduct = new WorkflowProduct(this.name, str2, str, obj);
        this.logger.value("Published product:", str, obj);
        ProductPublication productPublication = new ProductPublication(workflowProduct);
        this.logger.trace("Sending product to " + this.workflowRef);
        this.logger.comm("Sending value PUBLICATION_REQUEST message to WORKFLOW");
        this.workflowRef.tell(productPublication, getSelf());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void publishArtifacts(Map<String, String> map) {
        if (map != null) {
            this.logger.debug("Publishing " + map.size() + " artifacts.");
            for (Map.Entry<String, String> entry : map.entrySet()) {
                publishArtifact(entry.getKey(), entry.getValue());
            }
            this.logger.trace("Done publishing artifacts");
        }
    }

    protected void publishArtifact(String str, String str2) {
        publishProduct(str, str2, "File");
    }

    protected void publishArtifact(String str, String str2, String str3) {
        WorkflowArtifact workflowArtifact = new WorkflowArtifact(this.name, str3, str, str2);
        this.logger.value("Published artifact:", str, str2);
        ProductPublication productPublication = new ProductPublication(workflowArtifact);
        this.logger.comm("Sending artifact PUBLICATION_REQUEST message to WORKFLOW");
        this.workflowRef.tell(productPublication, getSelf());
    }
}
