The Cameo Pattern

Sometimes an actor needs to get data from several sources to complete a request. Let's take an actor that returns all notifications from different social networks for a given user as an example. A first approach could be to use futures the following way:

public class NotificationsActor extends AbstractActor {

    private static final long TIMEOUT = 5000;
    private ActorRef twitterNotificationsActor;
    private ActorRef facebookNotificationsActor;

    public NotificationsActor(ActorRef twitterNotificationsActor,
                              ActorRef facebookNotificationsActor) {
        this.twitterNotificationsActor = twitterNotificationsActor;
        this.facebookNotificationsActor = facebookNotificationsActor;

        receive(ReceiveBuilder
                        .match(GetNotifications.class, this::getNotifications)
                        .build()
        );
    }

    public static Props props(ActorRef twitterNotificationsActor,
                              ActorRef facebookNotificationsActor){
        return Props.create(NotificationsActor.class,
                            twitterNotificationsActor,
                            facebookNotificationsActor);
    }

    private void getNotifications(GetNotifications getNotifications) {
        Set<Notification> notifications = new HashSet<>();
        ActorRef originalSender = sender();
        ActorRef self = self();

        Future<Object> twitterNotificationsFuture = Patterns.ask(twitterNotificationsActor,
                   new GetNotifications(), TIMEOUT);
        Future<Object> facebookNotificationsFuture = Patterns.ask(facebookNotificationsActor,
                   new GetNotifications(), TIMEOUT);

        Iterable<Future<Object>> futures = Lists.newArrayList(twitterNotificationsFuture, 
                   facebookNotificationsFuture);

        Future<Set<Notification>> unionFuture = fold(notifications, futures , 
              new Function2<Set<Notification>, Object, Set<Notification>>() {
            @Override
            public Set<Notification> apply(Set<Notification> previousResult,
                          Object currentResponse) {
                previousResult.addAll(((NotificationsResponse)
                                       currentResponse).getNotifications());
                return previousResult;
            }
        }, context().dispatcher());

        unionFuture.onSuccess(new OnSuccess<Set<Notification>>() {
            @Override
            public void onSuccess(Set<Notification> notifications) throws Throwable {
                originalSender.tell(new NotificationsResponse(notifications),
                                    self);
            }
        }, context().dispatcher());
    }

}

In the previous code we used the Futures.fold method from Akka to merge the results of both calls into a new Set. The fold method returns a new future that will be completed when the last of the input futures gets completed. We use this third future to send the response to the original sender. In this example, we are handling only the happy path and blissfully ignoring the possibility of failures or timeouts.
But this approach makes the code harder to test and reason about (even in this simple example). A better way is to use the Cameo pattern. The idea behind this pattern is to start a new actor responsible for waiting for all the data to be available, process it and send the response.

public class NotificationsActor extends AbstractActor {

    private ActorRef twitterNotificationsActor;
    private ActorRef facebookNotificationsActor;

    public NotificationsActor(ActorRef twitterNotificationsActor,
                              ActorRef facebookNotificationsActor) {
        this.twitterNotificationsActor = twitterNotificationsActor;
        this.facebookNotificationsActor = facebookNotificationsActor;

        receive(ReceiveBuilder
                  .match(GetNotifications.class, this::getNotifications)
                  .build()
        );
    }

    public static Props props(ActorRef twitterNotificationsActor,
                              ActorRef facebookNotificationsActor){
        return Props.create(NotificationsActor.class,
                            twitterNotificationsActor,
                            facebookNotificationsActor);
    }

    private void getNotifications(GetNotifications getNotifications) {
        ActorRef originalSender = sender();

        ActorRef notificationsResponseHandler = context()
             .actorOf(NotificationsResponseHandlerActor.props(originalSender));

        twitterNotificationsActor.tell(new GetNotifications(),
                                       notificationsResponseHandler);
        facebookNotificationsActor.tell(new GetNotifications(),
                                       notificationsResponseHandler);
    }

}
public class NotificationsResponseHandlerActor extends AbstractActor {

    private ActorRef originalSender;
    private FacebookNotificationsResponse facebookNotificationsResponse = null;
    private TwitterNotificationsResponse twitterNotificationsResponse = null;

    public NotificationsResponseHandlerActor(ActorRef originalSender) {
        this.originalSender = originalSender;
        receive(ReceiveBuilder
          .match(FacebookNotificationsResponse.class, this::handleFacebookNotificationsResponse)
          .match(TwitterNotificationsResponse.class, this::handleTwitterNotificationsResponse)
          .build()
        );
    }

    private void handleTwitterNotificationsResponse(TwitterNotificationsResponse twitterNotificationsResponse) {
        this.twitterNotificationsResponse = twitterNotificationsResponse;
        if(allResponsesArrived()){
            sendResponseToOriginalSenderAndShutdown();
        }
    }

    private void handleFacebookNotificationsResponse(FacebookNotificationsResponse facebookNotificationsResponse) {
        this.facebookNotificationsResponse = facebookNotificationsResponse;
        if(allResponsesArrived()){
            sendResponseToOriginalSenderAndShutdown();
        }
    }

    public static Props props(ActorRef originalSender){
        return Props.create(NotificationsResponseHandlerActor.class,
                            originalSender);
    }

    private boolean allResponsesArrived(){
        return facebookNotificationsResponse != null &&
               twitterNotificationsResponse != null;
    }

    private void sendResponseToOriginalSenderAndShutdown(){
        Set<Notification> allNotifications = new HashSet<>();
        allNotifications.addAll(twitterNotificationsResponse.getNotifications());
        allNotifications.addAll(facebookNotificationsResponse.getNotifications());
        originalSender.tell(new NotificationsResponse(allNotifications), self());
        context().stop(self());
    }

}

The code in NotificationsActor gets reduced to just starting the response handler actor and sending the messages to get the notifications from Facebook and Twitter. The NotificationsResponseHandlerActor waits until all the needed data is available and, after that, just sends the response to the original sender and terminates itself. We are again ignoring the case where the response to one or both of the messages never arrives. To handle this case, we can simply use a scheduled message to send an appropriate response to the original sender and shutdown the NotificationsResponseHandlerActor.
This pattern can be used whenever we need to synchronize the result of running several parallel processes.