package org.kurator.akka;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import joptsimple.internal.Strings;
import org.kurator.util.SystemClasspathManager;
import org.python.core.PyBoolean;
import org.python.core.PyException;
import org.python.core.PyInteger;
import org.python.core.PyObject;
import org.python.core.PySystemState;
import org.python.icu.text.PluralRules;
import org.python.util.PythonInterpreter;

/* loaded from: input_file:org/kurator/akka/PythonActor.class */
public class PythonActor extends KuratorActor {
    // Declared input type; not referenced elsewhere in this class.
    public Class<? extends Object> inputType = Object.class;
    // Java class that _KURATOR_OUTPUT_ is converted to when it is read back
    // from the interpreter (see broadcastOutputs).
    public Class<? extends Object> outputType = Object.class;
    // When true, a null handler result is still passed to broadcast().
    public boolean broadcastNulls = false;
    // When true, onData() resets outputType to the runtime class of each input.
    public boolean outputTypeIsInputType = false;
    // Set once a "module" entry is found in the configuration; selects the
    // module-function variant of handler lookup.
    protected boolean usingPythonModule = false;
    // Prefix ("<module>.") placed before handler names in generated wrappers;
    // empty when handlers are plain global functions.
    protected String functionQualifier = "";
    // Names of the resolved Python handler functions; null when the actor has
    // no handler for that event.
    protected String onInit = null;
    protected String onStart = null;
    protected String onData = null;
    protected String onEnd = null;
    // Jython interpreter owned by this actor; created in onInitialize() and
    // closed and nulled in onEnd().
    protected PythonInterpreter interpreter;
    // Cached result of evaluating the Python expression "None".
    protected PyObject none;
    // Handler function names looked up when the configuration does not name
    // a custom handler for the corresponding event.
    protected static String DEFAULT_ON_INIT = "on_init";
    protected static String DEFAULT_ON_START = "on_start";
    protected static String DEFAULT_ON_DATA = "on_data";
    protected static String DEFAULT_ON_END = "on_end";
    // Python prelude executed once per interpreter. It declares the
    // _KURATOR_* globals used to exchange data with Java and defines the
    // helpers _is_generator, _is_global_function, _function_arg_count and
    // _get_next_data (which advances a generator result by one item and clears
    // _KURATOR_MORE_DATA_ on StopIteration).
    private static final String commonScriptHeader = "_KURATOR_INPUT_=None" + EOL + "_KURATOR_RESULT_=None" + EOL + "_KURATOR_OUTPUT_=None" + EOL + "_KURATOR_MORE_DATA_=False" + EOL + "" + EOL + "import types" + EOL + "import inspect" + EOL + "" + EOL + "def _is_generator(g):" + EOL + "  return isinstance(g, types.GeneratorType)" + EOL + "" + EOL + "def _is_global_function(f):" + EOL + "  return f in globals() and inspect.isfunction(globals()[f])" + EOL + "" + EOL + "def _function_arg_count(f):" + EOL + "  return len(inspect.getargspec(f)[0])" + EOL + "" + EOL + "def _get_next_data():" + EOL + "  global _KURATOR_OUTPUT_" + EOL + "  global _KURATOR_RESULT_" + EOL + "  global _KURATOR_MORE_DATA_" + EOL + "  try:" + EOL + "    _KURATOR_OUTPUT_=_KURATOR_RESULT_.next()" + EOL + "    return" + EOL + "  except StopIteration:" + EOL + "    _KURATOR_OUTPUT_=None" + EOL + "    _KURATOR_MORE_DATA_=False" + EOL;
    // Wrapper templates. Each is passed through String.format with two
    // arguments, the function qualifier and the handler name, which fill the
    // adjacent "%s%s" placeholders. "Stateless" variants call the handler
    // without _KURATOR_STATE_; "stateful" variants pass it as an extra argument.

    // _call_oninit(): invokes the init handler; its result is discarded.
    private static final String statelessOnInitWrapperTemplate = "def _call_oninit():" + EOL + "  %s%s()" + EOL;
    private static final String statefulOnInitWrapperTemplate = "def _call_oninit():" + EOL + "  global _KURATOR_STATE_" + EOL + "  %s%s(_KURATOR_STATE_)" + EOL;
    // _call_onstart(): stores the handler result in _KURATOR_RESULT_; a
    // generator result sets _KURATOR_MORE_DATA_, any other result is copied to
    // _KURATOR_OUTPUT_.
    private static final String statelessOnStartWrapperTemplate = "def _call_onstart():" + EOL + "  global _KURATOR_OUTPUT_" + EOL + "  global _KURATOR_RESULT_" + EOL + "  global _KURATOR_MORE_DATA_" + EOL + "  _KURATOR_RESULT_ = %s%s()" + EOL + "  if _is_generator(_KURATOR_RESULT_):" + EOL + "    _KURATOR_MORE_DATA_=True" + EOL + "  else:" + EOL + "    _KURATOR_MORE_DATA_=False" + EOL + "    _KURATOR_OUTPUT_=_KURATOR_RESULT_" + EOL;
    private static final String statefulOnStartWrapperTemplate = "def _call_onstart():" + EOL + "  global _KURATOR_STATE_" + EOL + "  global _KURATOR_OUTPUT_" + EOL + "  global _KURATOR_RESULT_" + EOL + "  global _KURATOR_MORE_DATA_" + EOL + "  _KURATOR_RESULT_ = %s%s(_KURATOR_STATE_)" + EOL + "  if _is_generator(_KURATOR_RESULT_):" + EOL + "    _KURATOR_MORE_DATA_=True" + EOL + "  else:" + EOL + "    _KURATOR_MORE_DATA_=False" + EOL + "    _KURATOR_OUTPUT_=_KURATOR_RESULT_" + EOL;
    // _call_ondata(): same result handling as _call_onstart(), but the handler
    // receives _KURATOR_INPUT_ as its first argument.
    private static final String statelessOnDataWrapperTemplate = "def _call_ondata():" + EOL + "  global _KURATOR_INPUT_" + EOL + "  global _KURATOR_OUTPUT_" + EOL + "  global _KURATOR_RESULT_" + EOL + "  global _KURATOR_MORE_DATA_" + EOL + "  _KURATOR_RESULT_ = %s%s(_KURATOR_INPUT_)" + EOL + "  if _is_generator(_KURATOR_RESULT_):" + EOL + "    _KURATOR_MORE_DATA_=True" + EOL + "  else:" + EOL + "    _KURATOR_MORE_DATA_=False" + EOL + "    _KURATOR_OUTPUT_=_KURATOR_RESULT_" + EOL;
    private static final String statefulOnDataWrapperTemplate = "def _call_ondata():" + EOL + "  global _KURATOR_STATE_" + EOL + "  global _KURATOR_INPUT_" + EOL + "  global _KURATOR_OUTPUT_" + EOL + "  global _KURATOR_RESULT_" + EOL + "  global _KURATOR_MORE_DATA_" + EOL + "  _KURATOR_RESULT_ = %s%s(_KURATOR_INPUT_," + EOL + "                          _KURATOR_STATE_)" + EOL + "  if _is_generator(_KURATOR_RESULT_):" + EOL + "    _KURATOR_MORE_DATA_=True" + EOL + "  else:" + EOL + "    _KURATOR_MORE_DATA_=False" + EOL + "    _KURATOR_OUTPUT_=_KURATOR_RESULT_" + EOL;
    // _call_onend(): invokes the end handler; its result is discarded.
    private static final String onEndWrapperTemplate = "def _call_onend():" + EOL + "  %s%s()" + EOL;
    private static final String statefulOnEndWrapperTemplate = "def _call_onend():" + EOL + "  global _KURATOR_STATE_" + EOL + "  %s%s(_KURATOR_STATE_)" + EOL;

    /**
     * Builds the Jython environment for this actor, resolves the four event
     * handlers and, when an init handler exists, invokes it.
     *
     * <p>Order matters: the interpreter must exist before helper functions are
     * loaded, and user code must be loaded before handlers can be looked up.
     *
     * @throws Exception if custom code cannot be loaded or a configured
     *                   handler cannot be resolved
     */
    @Override // org.kurator.akka.KuratorActor
    protected synchronized void onInitialize() throws Exception {
        // 1. Interpreter, built-in helpers, then the user's own code.
        initializeJythonInterpreter();
        loadCommonHelperFunctions();
        loadCustomCode();
        configureCustomCode();

        // 2. Resolve handlers; the integer is the handler's stateless arity.
        this.onInit = loadEventHandler(
                "onInit", DEFAULT_ON_INIT, 0,
                statelessOnInitWrapperTemplate, statefulOnInitWrapperTemplate);
        this.onStart = loadEventHandler(
                "onStart", DEFAULT_ON_START, 0,
                statelessOnStartWrapperTemplate, statefulOnStartWrapperTemplate);
        this.onData = loadEventHandler(
                "onData", DEFAULT_ON_DATA, 1,
                statelessOnDataWrapperTemplate, statefulOnDataWrapperTemplate);
        this.onEnd = loadEventHandler(
                "onEnd", DEFAULT_ON_END, 0,
                onEndWrapperTemplate, statefulOnEndWrapperTemplate);

        // 3. Snapshot the settings, push them into the module, run the init handler.
        final Map<String, Object> state = initializeState();
        applySettings();
        if (this.onInit == null) {
            return;
        }
        this.interpreter.set("_KURATOR_STATE_", state);
        this.interpreter.eval("_call_oninit()");
    }

    /**
     * Creates a fresh, mutable copy of the actor settings to hand to Python
     * as {@code _KURATOR_STATE_}.
     *
     * <p>A copy is returned so that handlers mutating the state map cannot
     * alter the actor's own settings.
     *
     * @return a new map holding every settings entry; empty (never null) when
     *         the actor has no settings
     */
    private synchronized Map<String, Object> initializeState() {
        // applySettings() treats a null settings map as legal, so do the same
        // here instead of failing with a NullPointerException.
        if (this.settings == null) {
            return new HashMap<String, Object>();
        }
        return new HashMap<String, Object>(this.settings);
    }

    /**
     * Hook invoked by {@code onInitialize()} after custom code has been loaded
     * and before event handlers are resolved. This implementation does
     * nothing.
     *
     * @throws Exception declared so that overriding implementations may fail
     */
    protected void configureCustomCode() throws Exception {
    }

    /**
     * Executes the shared Python prelude ({@code commonScriptHeader}) in this
     * actor's interpreter, defining the {@code _KURATOR_*} globals and the
     * helper functions used by the generated handler wrappers.
     */
    protected void loadCommonHelperFunctions() {
        this.interpreter.exec(commonScriptHeader);
    }

    /**
     * Loads the user-supplied Python code named in the actor configuration.
     *
     * <p>Up to three sources are processed, in this order: a {@code "script"}
     * file path (run with {@code execfile}), an inline {@code "code"} string
     * (run with {@code exec}) and a {@code "module"} name (imported). When a
     * module is configured, handler lookups are switched to module mode and
     * {@code functionQualifier} becomes {@code "<module>."}.
     *
     * @throws Exception if the configured module cannot be imported; the
     *                   underlying {@link PyException} is attached as the cause
     */
    protected synchronized void loadCustomCode() throws Exception {
        this.interpreter.set("__name__", "__kurator_actor__");

        String scriptPath = (String) this.configuration.get("script");
        if (scriptPath != null) {
            this.interpreter.execfile(scriptPath);
        }

        String inlineCode = (String) this.configuration.get("code");
        if (inlineCode != null) {
            this.interpreter.exec(inlineCode);
        }

        String moduleName = (String) this.configuration.get("module");
        if (moduleName == null) {
            return;
        }

        // Flag module mode before importing, as the original did, so the flag
        // is set even when the import below fails.
        this.usingPythonModule = true;
        try {
            this.interpreter.exec("import " + moduleName);
            this.logger.info("Actor " + this.name + " imported module " + moduleName + " from " + this.interpreter.eval(moduleName + ".__file__").toString());
            this.functionQualifier = moduleName + ".";
        } catch (PyException e) {
            this.logger.error("Error importing Python module " + moduleName + ": " + e.getMessage());
            System.out.println("Error: " + e);
            // Keep the Python exception as the cause so the traceback is not lost.
            throw new Exception("Error importing Python module '" + moduleName + "': " + e.value, e);
        }
    }

    /**
     * Creates this actor's private Jython interpreter, wires its output and
     * error streams to the actor's streams, imports the modules the helper
     * code relies on, and extends {@code sys.path} from the configuration.
     *
     * <p>Two optional configuration entries are honoured: {@code "jython_path"}
     * is appended to {@code sys.path} as-is, and {@code "jython_home"} is
     * appended with {@code "/Lib/site-packages"} added.
     *
     * @throws Exception declared for callers; Jython failures surface as
     *                   unchecked {@code PyException}s
     */
    private synchronized void initializeJythonInterpreter() throws Exception {
        // A dedicated PySystemState gives each actor its own sys module.
        this.interpreter = new PythonInterpreter(null, new PySystemState());
        this.interpreter.setOut(this.outStream);
        this.interpreter.setErr(this.errStream);
        this.interpreter.exec("from org import python");
        this.interpreter.exec("import sys");
        this.interpreter.exec("import types");
        this.interpreter.exec("import inspect");
        this.none = this.interpreter.eval("None");

        Object jythonPath = this.configuration.get("jython_path");
        if (jythonPath != null) {
            this.interpreter.exec("sys.path.append(" + quoteAsPythonString(jythonPath.toString()) + ")");
        }

        Object jythonHome = this.configuration.get("jython_home");
        if (jythonHome != null) {
            this.interpreter.exec("sys.path.append(" + quoteAsPythonString(jythonHome.toString() + "/Lib/site-packages") + ")");
        }
    }

    /**
     * Renders text as a double-quoted Python string literal.
     *
     * <p>Backslashes, double quotes and line breaks are escaped so that paths
     * such as {@code C:\new\tools} are not reinterpreted as escape sequences
     * and cannot terminate the literal early.
     */
    private static String quoteAsPythonString(String text) {
        String escaped = text
                .replace("\\", "\\\\")   // must run first so later escapes are not doubled
                .replace("\"", "\\\"")
                .replace("\n", "\\n")
                .replace("\r", "\\r");
        return "\"" + escaped + "\"";
    }

    /**
     * Asks the interpreter whether a global Python function with the given
     * name exists, by calling the {@code _is_global_function} helper.
     *
     * @param str function name; inserted between single quotes in the
     *            evaluated expression
     * @return {@code Boolean.TRUE} when the helper reports a function of that
     *         name, otherwise {@code Boolean.FALSE}
     */
    private Boolean isGlobalFunction(String str) {
        final String expression = "_is_global_function('" + str + "')";
        final PyBoolean answer = (PyBoolean) this.interpreter.eval(expression);
        return Boolean.valueOf(answer.getBooleanValue());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the number of named positional parameters of a Python function,
     * as reported by the {@code _function_arg_count} helper.
     *
     * @param str Python expression that evaluates to the function (for example
     *            a bare or module-qualified function name); it is inserted
     *            unquoted into the evaluated call
     * @return the parameter count
     */
    public Integer getArgCount(String str) {
        final String expression = "_function_arg_count(" + str + ")";
        final PyInteger count = (PyInteger) this.interpreter.eval(expression);
        return Integer.valueOf(count.asInt());
    }

    /**
     * Resolves the Python handler for one lifecycle event and defines its
     * wrapper function, delegating to the module or local-function strategy
     * depending on whether a Python module was configured.
     *
     * @param str  configuration key naming a custom handler (e.g. "onData")
     * @param str2 default handler function name used when no custom one is set
     * @param i    parameter count of the stateless form of the handler
     * @param str3 wrapper template for the stateless form
     * @param str4 wrapper template for the stateful form
     * @return the resolved handler name, or null when the event has no handler
     * @throws Exception if a configured custom handler cannot be resolved
     */
    protected synchronized String loadEventHandler(String str, String str2, int i, String str3, String str4) throws Exception {
        if (this.usingPythonModule) {
            return loadEventHandlerModuleFunction(str, str2, i, str3, str4);
        }
        return loadEventHandlerLocalFunction(str, str2, i, str3, str4);
    }

    /**
     * Resolves an event handler that is a global function in the interpreter
     * and defines the matching {@code _call_on...} wrapper.
     *
     * <p>A handler named in the configuration must exist; the default handler
     * name is used only when a global function of that name is present. The
     * handler's parameter count selects the wrapper: {@code i} parameters for
     * the stateless form, {@code i + 1} for the stateful form.
     *
     * @param str  configuration key naming a custom handler (e.g. "onData")
     * @param str2 default handler function name
     * @param i    parameter count of the stateless form of the handler
     * @param str3 wrapper template for the stateless form
     * @param str4 wrapper template for the stateful form
     * @return the resolved handler name, or null when neither a custom nor a
     *         default handler is available
     * @throws Exception if the configured custom handler is not a global
     *                   function, or if the resolved handler takes neither
     *                   {@code i} nor {@code i + 1} parameters
     */
    private synchronized String loadEventHandlerLocalFunction(String str, String str2, int i, String str3, String str4) throws Exception {
        final String customName = (String) this.configuration.get(str);
        final String handlerName;
        if (customName != null) {
            if (!isGlobalFunction(customName).booleanValue()) {
                throw new Exception("Custom " + str + " handler '" + customName + "' not defined for actor '" + this.name + "'");
            }
            handlerName = customName;
        } else if (isGlobalFunction(str2).booleanValue()) {
            handlerName = str2;
        } else {
            // No handler for this event.
            return null;
        }

        final int argCount = getArgCount(handlerName).intValue();
        final String template;
        if (argCount == i) {
            template = str3;
        } else if (argCount == i + 1) {
            template = str4;
        } else {
            // Without a wrapper the later _call_on...() invocation would fail
            // with an unrelated NameError, so report the real problem now.
            throw new Exception("Handler '" + handlerName + "' for " + str + " of actor '" + this.name
                    + "' takes " + argCount + " argument(s); expected " + i + " or " + (i + 1));
        }
        this.interpreter.exec(String.format(template, this.functionQualifier, handlerName));
        return handlerName;
    }

    /**
     * Resolves an event handler that lives in the configured Python module and
     * defines the matching {@code _call_on...} wrapper.
     *
     * <p>The handler name comes from the configuration, or falls back to the
     * default name when a global function of that name exists. The handler is
     * then inspected as {@code <functionQualifier><name>}; its parameter count
     * selects the stateless ({@code i}) or stateful ({@code i + 1}) wrapper.
     *
     * @param str  configuration key naming a custom handler (e.g. "onData")
     * @param str2 default handler function name
     * @param i    parameter count of the stateless form of the handler
     * @param str3 wrapper template for the stateless form
     * @param str4 wrapper template for the stateful form
     * @return the resolved handler name, or null when no handler is available
     * @throws Exception if the configured custom handler cannot be inspected
     *                   (the underlying failure is attached as the cause), or
     *                   if the resolved handler takes neither {@code i} nor
     *                   {@code i + 1} parameters
     */
    private synchronized String loadEventHandlerModuleFunction(String str, String str2, int i, String str3, String str4) throws Exception {
        final String customName = (String) this.configuration.get(str);
        String handlerName = customName;
        if (handlerName == null && isGlobalFunction(str2).booleanValue()) {
            handlerName = str2;
        }
        if (handlerName == null) {
            // Previously this case evaluated "<module>.null" and relied on the
            // resulting Python error to reach the same "no handler" outcome.
            return null;
        }

        int argCount = -1;
        boolean wrapperDefined = false;
        try {
            argCount = getArgCount(this.functionQualifier + handlerName).intValue();
            if (argCount == i) {
                this.interpreter.exec(String.format(str3, this.functionQualifier, handlerName));
                wrapperDefined = true;
            } else if (argCount == i + 1) {
                this.interpreter.exec(String.format(str4, this.functionQualifier, handlerName));
                wrapperDefined = true;
            }
        } catch (Exception e) {
            if (customName != null) {
                // Keep the original failure attached for diagnosis.
                throw new Exception("Custom " + str + " handler '" + customName + "' not defined for actor '" + this.name + "'", e);
            }
            // A missing default handler is not an error.
            return null;
        }

        if (!wrapperDefined) {
            // Without a wrapper the later _call_on...() invocation would fail
            // with an unrelated NameError, so report the real problem now.
            throw new Exception("Handler '" + this.functionQualifier + handlerName + "' for " + str + " of actor '" + this.name
                    + "' takes " + argCount + " argument(s); expected " + i + " or " + (i + 1));
        }
        this.logger.trace("actualMethodName=" + handlerName);
        return handlerName;
    }

    /**
     * Copies the actor settings into the configured Python module as module
     * attributes, by executing one assignment statement per setting.
     *
     * <p>Nothing happens when there are no settings or no module qualifier.
     * Settings whose value is a {@link Collection} are skipped.
     */
    private synchronized void applySettings() {
        if (this.settings == null || this.functionQualifier.isEmpty()) {
            return;
        }
        for (Map.Entry<String, Object> entry : this.settings.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof Collection) {
                continue;
            }
            String target = this.functionQualifier + entry.getKey();
            this.interpreter.exec(target + "=" + settingAsPythonLiteral(value));
        }
    }

    /**
     * Renders a setting value as Python source text.
     *
     * <p>Java's {@code toString()} forms of null and booleans ("null", "true",
     * "false") are not Python literals, and raw strings may contain quotes,
     * backslashes or line breaks, so those cases are translated explicitly.
     * All other values keep their {@code toString()} form.
     */
    private static String settingAsPythonLiteral(Object value) {
        if (value == null) {
            return "None";
        }
        if (value instanceof Boolean) {
            return ((Boolean) value).booleanValue() ? "True" : "False";
        }
        if (value instanceof String) {
            String escaped = ((String) value)
                    .replace("\\", "\\\\")   // must run first so later escapes are not doubled
                    .replace("'", "\\'")
                    .replace("\n", "\\n")
                    .replace("\r", "\\r");
            return "'" + escaped + "'";
        }
        return String.valueOf(value);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the Python start handler, if any, and broadcasts whatever it
     * produced. An actor without a data handler ends its stream and stops
     * right after start.
     *
     * @throws Exception propagated from the handler or from stream shutdown
     */
    @Override // org.kurator.akka.KuratorActor
    public synchronized void onStart() throws Exception {
        final Map<String, Object> state = initializeState();

        final boolean hasStartHandler = this.onStart != null;
        if (hasStartHandler) {
            this.interpreter.set("_KURATOR_STATE_", state);
            this.interpreter.eval("_call_onstart()");
            broadcastOutputs();
        }

        if (this.onData != null) {
            return;
        }
        endStreamAndStop();
    }

    /**
     * Passes one incoming message to the Python data handler and broadcasts
     * the handler's output.
     *
     * <p>When input mappings are configured, the handler receives a new map
     * built from the mapped input fields plus every settings entry; otherwise
     * it receives the message object itself.
     *
     * @param obj the incoming message
     * @throws Exception if the actor has no data handler, or if the handler fails
     */
    @Override // org.kurator.akka.KuratorActor
    public synchronized void onData(Object obj) throws Exception {
        this.logger.comm("onData: " + obj.toString());
        if (this.onData == null) {
            throw new Exception("No onData handler for actor " + this);
        }
        if (this.outputTypeIsInputType) {
            this.outputType = obj.getClass();
        }

        final Map<String, Object> state = initializeState();
        final Object handlerInput;
        if (!this.inputs.isEmpty()) {
            // Mapped fields first; settings are then layered on top of them.
            final Map<String, Object> mapped = mapInputs(obj);
            mapped.putAll(state);
            handlerInput = mapped;
        } else {
            handlerInput = obj;
        }

        this.interpreter.set("_KURATOR_STATE_", state);
        this.interpreter.set("_KURATOR_INPUT_", handlerInput);
        this.interpreter.eval("_call_ondata()");
        broadcastOutputs();
    }

    /**
     * Builds the handler input map by renaming fields of an incoming map
     * according to the actor's input mappings: for each mapping entry, the
     * value stored under the entry's key in the message is placed under the
     * entry's value in the result.
     *
     * @param obj the incoming message
     * @return a new mutable map; empty when {@code obj} is not a {@link Map}
     */
    private synchronized Map<String, Object> mapInputs(Object obj) {
        final Map<String, Object> renamed = new HashMap<String, Object>();
        if (!(obj instanceof Map)) {
            return renamed;
        }
        final Map<?, ?> message = (Map<?, ?>) obj;
        for (Map.Entry<String, String> mapping : this.inputs.entrySet()) {
            final Object fieldValue = message.get(mapping.getKey());
            renamed.put(mapping.getValue(), fieldValue);
        }
        return renamed;
    }

    /**
     * Runs the Python end handler, if any, then closes and discards the
     * interpreter.
     *
     * <p>The interpreter is released even when the handler raises, and a call
     * made when no interpreter exists (for example a repeated call) is a no-op.
     */
    @Override // org.kurator.akka.KuratorActor
    protected synchronized void onEnd() {
        if (this.interpreter == null) {
            // Already shut down, or never initialized: nothing to release.
            return;
        }
        try {
            if (this.onEnd != null) {
                this.interpreter.set("_KURATOR_STATE_", initializeState());
                this.interpreter.eval("_call_onend()");
            }
        } finally {
            // Release the interpreter even if the handler threw.
            this.interpreter.close();
            this.interpreter = null;
        }
    }

    /**
     * Broadcasts the result of the most recent handler call.
     *
     * <p>If the handler returned a generator ({@code _KURATOR_MORE_DATA_} is
     * true), items are pulled one at a time with {@code _get_next_data()} and
     * every non-null item is broadcast, until the flag turns false. Otherwise
     * the single value in {@code _KURATOR_OUTPUT_} is broadcast.
     */
    private void broadcastOutputs() {
        final boolean generatorResult =
                ((Boolean) this.interpreter.get("_KURATOR_MORE_DATA_", Boolean.class)).booleanValue();

        if (generatorResult) {
            boolean moreData;
            do {
                this.interpreter.eval("_get_next_data()");
                final Object item = this.interpreter.get("_KURATOR_OUTPUT_", this.outputType);
                if (item != null) {
                    broadcastOutput(item);
                }
                moreData = ((Boolean) this.interpreter.get("_KURATOR_MORE_DATA_", Boolean.class)).booleanValue();
            } while (moreData);
        } else {
            final Object single = this.interpreter.get("_KURATOR_OUTPUT_", this.outputType);
            broadcastOutput(single);
        }
    }

    /**
     * Publishes any products/artifacts carried by the output and then
     * broadcasts it. A null output is broadcast only when
     * {@code broadcastNulls} is set.
     *
     * @param obj the handler output; may be null
     */
    private void broadcastOutput(Object obj) {
        publishProducts(obj);

        final boolean suppressed = obj == null && !this.broadcastNulls;
        if (suppressed) {
            return;
        }
        broadcast(obj);
    }

    /**
     * Looks for {@code "products"} and {@code "artifacts"} entries in a map
     * output and forwards each non-null one for publication. Outputs that are
     * not maps (including null) are ignored.
     *
     * @param obj the handler output; may be null
     */
    private void publishProducts(Object obj) {
        // instanceof is false for null, so this also covers the null case.
        if (!(obj instanceof Map)) {
            return;
        }
        Map output = (Map) obj;

        Map<String, Object> products = (Map) output.get("products");
        if (products != null) {
            publishProducts(products);
        }

        Map<String, String> artifacts = (Map) output.get("artifacts");
        if (artifacts != null) {
            publishArtifacts(artifacts);
        }
    }

    /**
     * Extends the system classpath with Jython-related locations taken from
     * the environment.
     *
     * <p>Added in order: the values of {@code JYTHONPATH} and
     * {@code JYTHON_PATH}; {@code <home>/Lib/site-packages} for each of
     * {@code JYTHONHOME} and {@code JYTHON_HOME} that is set; and finally the
     * relative directory {@code packages}.
     */
    public static void updateClasspath() {
        final String[] pathVariables = {"JYTHONPATH", "JYTHON_PATH"};
        final String[] homeVariables = {"JYTHONHOME", "JYTHON_HOME"};

        for (int k = 0; k < pathVariables.length; k++) {
            // Unset variables yield null, which addToClasspath ignores.
            addToClasspath(System.getenv(pathVariables[k]));
        }

        for (int k = 0; k < homeVariables.length; k++) {
            final String home = System.getenv(homeVariables[k]);
            if (home == null) {
                continue;
            }
            addToClasspath(home + "/Lib/site-packages");
        }

        addToClasspath("packages");
    }

    /**
     * Adds every entry of a path list to the system classpath.
     *
     * <p>The list is split on the platform's {@code path.separator}. If any
     * entry cannot be added, the stack trace is printed and the JVM exits
     * with status -1.
     *
     * @param str a path list; null is ignored
     */
    private static void addToClasspath(String str) {
        if (str == null) {
            return;
        }
        final String separator = System.getProperty("path.separator");
        final String[] entries = str.split(separator);
        for (int k = 0; k < entries.length; k++) {
            try {
                SystemClasspathManager.addPath(entries[k]);
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(-1);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Maps the path of a Jython-compiled class file back to a Python source
     * path: everything from the last occurrence of {@code "$py.class"} onward
     * is replaced by {@code ".py"}.
     *
     * @param str a file path
     * @return the rewritten path, or {@code str} unchanged when it does not
     *         contain {@code "$py.class"}
     */
    public String reconstructPythonSourcePath(String str) {
        final String compiledSuffix = "$py.class";
        final int suffixStart = str.lastIndexOf(compiledSuffix);
        if (suffixStart == -1) {
            return str;
        }
        return str.substring(0, suffixStart) + ".py";
    }
}
