package org.kurator.akka.actors;

import akka.actor.ActorRef;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import org.kurator.akka.KuratorActor;
import org.kurator.akka.messages.EndOfStream;

/* loaded from: input_file:org/kurator/akka/actors/IntegerStreamMerger.class */
public class IntegerStreamMerger extends KuratorActor {
    public int streamCount = 1;
    private Map<ActorRef, Queue<Object>> inputQueues = new HashMap();
    private Integer lastSent = Integer.MIN_VALUE;

    public IntegerStreamMerger() {
        this.endOnEos = false;
    }

    @Override // org.kurator.akka.KuratorActor
    public void onEndOfStream(EndOfStream endOfStream) throws Exception {
        addToInputQueue(getSender(), endOfStream);
        fire();
    }

    @Override // org.kurator.akka.KuratorActor
    public void onData(Object obj) throws Exception {
        ActorRef sender = getSender();
        if (obj instanceof Integer) {
            if (((Integer) obj).intValue() <= this.lastSent.intValue()) {
                return;
            } else {
                addToInputQueue(sender, obj);
            }
        }
        fire();
    }

    private void addToInputQueue(ActorRef actorRef, Object obj) {
        Queue<Object> queue = this.inputQueues.get(actorRef);
        if (queue == null) {
            queue = new LinkedList();
            this.inputQueues.put(actorRef, queue);
        }
        queue.add(obj);
    }

    private boolean isReadyToFire() {
        if (this.inputQueues.size() < this.streamCount) {
            return false;
        }
        Iterator<Queue<Object>> it = this.inputQueues.values().iterator();
        while (it.hasNext()) {
            if (it.next().size() < 1) {
                return false;
            }
        }
        return true;
    }

    private void fire() throws Exception {
        while (isReadyToFire()) {
            HashSet<Map.Entry> hashSet = new HashSet();
            for (Map.Entry<ActorRef, Queue<Object>> entry : this.inputQueues.entrySet()) {
                if (entry.getValue().peek() instanceof EndOfStream) {
                    hashSet.add(entry);
                }
            }
            for (Map.Entry entry2 : hashSet) {
                this.inputQueues.remove((ActorRef) entry2.getKey());
                int i = this.streamCount - 1;
                this.streamCount = i;
                if (i == 0) {
                    endStreamAndStop((EndOfStream) ((Queue) entry2.getValue()).peek());
                    return;
                }
            }
            Integer num = Integer.MAX_VALUE;
            Iterator<Queue<Object>> it = this.inputQueues.values().iterator();
            while (it.hasNext()) {
                Integer num2 = (Integer) it.next().peek();
                if (num2.intValue() < num.intValue()) {
                    num = num2;
                }
            }
            for (Queue<Object> queue : this.inputQueues.values()) {
                while (queue.peek() == num) {
                    queue.remove();
                }
            }
            if (num.intValue() > this.lastSent.intValue()) {
                this.lastSent = num;
                broadcast(num);
            }
        }
    }
}
