package org.kurator.akka;

import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import java.util.concurrent.Callable;
import org.kurator.akka.messages.EndOfStream;
import org.kurator.akka.messages.FutureComplete;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;

/* loaded from: input_file:org/kurator/akka/FutureActor.class */
/**
 * Actor base class that runs work in futures and re-emits each future's result.
 *
 * <p>Every future started through one of the {@code future(...)} methods is counted.
 * When a future completes, its outcome is sent back to this actor wrapped in a
 * {@link FutureComplete} message and then passed to {@code broadcast}. Once an
 * {@link EndOfStream} has been received and the number of completed futures equals
 * the number started, {@code endStreamAndStop()} is called.
 *
 * <p>The counters are plain (non-atomic) fields: they are only touched from the message
 * handlers and from {@code future(...)}. NOTE(review): this assumes {@code future(...)}
 * is only ever called from this actor's own message-handling code — confirm in subclasses.
 */
public class FutureActor extends KuratorActor {
    /** Number of {@link FutureComplete} messages handled so far. */
    private int futuresCompleted = 0;

    /** Number of futures started through the {@code future(...)} methods. */
    private int futuresInitialized = 0;

    /** Set once {@link EndOfStream} has been received; enables the stop check. */
    private boolean waitingForFutures = false;

    /**
     * Creates the actor with {@code endOnEos} switched off, so that this class decides
     * when to stop. NOTE(review): presumably {@code endOnEos} controls the base class's
     * automatic shutdown on end-of-stream — confirm in {@code KuratorActor}.
     */
    public FutureActor() {
        this.endOnEos = false;
    }

    /**
     * Broadcasts the value carried by a completed future and, if end-of-stream has
     * already been seen and this was the last outstanding future, ends the stream and
     * stops the actor.
     *
     * @param futureComplete message carrying the future's result
     * @throws Exception propagated from {@code broadcast} or {@code endStreamAndStop}
     */
    @Override // org.kurator.akka.KuratorActor
    protected void onFutureComplete(FutureComplete futureComplete) throws Exception {
        broadcast(futureComplete.getValue());
        this.futuresCompleted++;
        if (this.waitingForFutures && allFuturesCompleted()) {
            endStreamAndStop();
        }
    }

    /**
     * Records that the input stream has ended. If no futures are outstanding at that
     * moment the actor ends its stream and stops immediately; otherwise it stops when
     * the last outstanding future reports completion.
     *
     * @param endOfStream the end-of-stream marker (not inspected)
     * @throws Exception propagated from {@code endStreamAndStop}
     */
    @Override // org.kurator.akka.KuratorActor
    protected void onEndOfStream(EndOfStream endOfStream) throws Exception {
        this.waitingForFutures = true;
        // Without this check the actor would wait forever when end-of-stream arrives
        // after every future has already completed (or when none was ever started),
        // because no further FutureComplete message will trigger the stop.
        if (allFuturesCompleted()) {
            endStreamAndStop();
        }
    }

    /**
     * Starts {@code callable} as a future on {@code executionContext} and arranges for
     * its outcome to be delivered back to this actor as a {@link FutureComplete}.
     *
     * @param callable         the work to run
     * @param executionContext context used both to run the work and to run the
     *                         completion callback
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw Future kept to match the Akka Java API usage in this file
    protected void future(Callable<?> callable, ExecutionContext executionContext) {
        Future started = Futures.future(callable, executionContext);
        trackAndReport(started, executionContext);
    }

    /**
     * Starts {@code callable} as a future, transforms its result with {@code mapper},
     * and arranges for the mapped outcome to be delivered back to this actor as a
     * {@link FutureComplete}.
     *
     * <p>Visibility is {@code public} here and is kept that way so existing callers
     * continue to compile.
     *
     * @param callable         the work to run
     * @param executionContext context used to run the work, the mapping, and the
     *                         completion callback
     * @param mapper           transformation applied to the callable's result
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw types are part of this method's existing signature
    public void future(Callable callable, ExecutionContext executionContext, Mapper mapper) {
        Future mapped = Futures.future(callable, executionContext).map(mapper, executionContext);
        trackAndReport(mapped, executionContext);
    }

    /** Returns whether every future started so far has reported completion. */
    private boolean allFuturesCompleted() {
        return this.futuresCompleted == this.futuresInitialized;
    }

    /** Counts {@code started} as outstanding and forwards its outcome to this actor on completion. */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw Future/OnComplete: result type is opaque to this class
    private void trackAndReport(Future started, ExecutionContext executionContext) {
        this.futuresInitialized++;
        started.onComplete(new OnComplete() {
            @Override // akka.dispatch.OnComplete
            public void onComplete(Throwable failure, Object result) throws Throwable {
                // The failure is intentionally not inspected: the future is still counted
                // as completed so the actor can terminate.
                // NOTE(review): on failure `result` is presumably null, so a
                // FutureComplete(null) is sent and later broadcast — confirm this is
                // the intended handling of failed futures.
                FutureActor.this.getSelf().tell(new FutureComplete(result), FutureActor.this.getSelf());
            }
        }, executionContext);
    }
}
