Handling external dependencies with Akka

Introduction

We commonly use Akka actors to handle external dependencies that may fail for reasons beyond our control, such as connectivity issues or temporary outages, and it is not always clear how to handle these failures safely. This guide shows how to handle them, how to fail fast, and how to recover from extreme failure scenarios.

How do we connect and reconnect whenever it is needed?

The best way to deal with these scenarios is to have a Connection Actor that handles connections. This actor is responsible for connecting to the dependency and sending the connection to the actors that will use it. From now on, we'll call the latter “workers”. This is where the error kernel pattern comes in. It is a simple guideline you should always try to follow: if an actor carries important internal state (in this case, the connection), it should delegate dangerous tasks to child actors, so that the state-carrying actor does not crash (the Akka documentation covers this pattern in more detail). The connection actor will have a State, either “connected” or “disconnected,” and its behavior will be different in each state.

How should the Connection actor behave?

As we said before, the main responsibility of the connection actor is to connect to the dependency. So, how do we do it?
  • Keeping the constructor as simple as possible

The first rule is to never put connection logic in the constructor. Akka uses “supervision strategies” to handle failures, and the default one, a “one for one strategy,” restarts an actor that fails while processing a message. If the failure happens in the constructor, however, Akka does not restart the actor: it stops it with an “ActorInitializationException”, so no reconnection is possible once all the actors are dead.

  • Using messages to connect

We should set the State to “disconnected” in the constructor and have the actor send a message to itself. When it receives that message, its behavior depends on the State: if it is “connected,” it ignores the message; if not, it runs our connection logic. It is safe to send this message from the constructor, because actors only process messages after initialization has finished, so a failure in the connection logic causes a restart of the actor instead of killing it. To establish the connection, we wrap the connection logic in a try/catch block that catches one specific exception (we need to find out which exception our dependency throws when it fails to connect). If we don't catch this exception, the actor will restart immediately, over and over, while the dependency is down, and we won't be able to wait between attempts. If the connection fails with that specific exception, we schedule a reconnection after a certain amount of time. If it fails with any other exception, we let the actor restart. Once we are able to connect, we change the State to “connected.”
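The decision logic of that handler can be sketched in plain Java, without Akka, to make the two failure paths explicit. This is only an illustration under assumptions: `ConnectSketch`, `Connector`, and `Outcome` are hypothetical names, `IOException` stands in for whatever exception your dependency throws when it cannot connect, and in a real actor the `RETRY_LATER` outcome would correspond to scheduling a new connect message to itself.

```java
import java.io.IOException;

/** Plain-Java sketch (no Akka) of the connect handler's decision logic. All names are hypothetical. */
final class ConnectSketch {

    enum State { DISCONNECTED, CONNECTED }

    enum Outcome { ALREADY_CONNECTED, CONNECTED_NOW, RETRY_LATER }

    /** Stand-in for the real connection logic of the dependency. */
    interface Connector {
        void connect() throws IOException;
    }

    private State state = State.DISCONNECTED;

    State state() {
        return state;
    }

    /**
     * Handles one "connect" message.
     * Only IOException (assumed here to be the "cannot connect" failure) is caught.
     * Any other exception propagates, which in an actor would trigger a restart.
     */
    Outcome handleConnect(Connector connector) {
        if (state == State.CONNECTED) {
            return Outcome.ALREADY_CONNECTED; // ignore the message
        }
        try {
            connector.connect();
            state = State.CONNECTED;
            return Outcome.CONNECTED_NOW;
        } catch (IOException e) {
            // The caller is expected to schedule another connect message after a delay.
            return Outcome.RETRY_LATER;
        }
    }
}
```

Keeping the catch clause narrow is the point of the sketch: the expected outage is turned into a delayed retry, while unexpected failures are left to the supervisor.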

  • Creating Workers

The connection actor will be the parent of the workers. Why? Because we can then use the Akka supervision strategy to restart a worker every time it dies, whatever the failure, and this helps us reconnect. The connection actor can send itself another message from the constructor to create the workers. When it handles that message, we set the “one for one strategy” as the supervision strategy with infinite retries (by passing a negative number) and tell it to restart the worker when it fails. Depending on your dependency, there are cases where you will need to restart the connection actor when a worker dies. In those cases, you can use other supervision strategies, such as AllForOneStrategy.

  • Sending connections to the workers

To send connections to the workers, we first wait until they ask for one, which they do by sending a message. When the connection actor receives this message, its behavior depends on the State. If it is “connected,” it sends back the requested connection; if it is “disconnected,” it ignores the request completely (it is up to the workers to ask for a connection again).
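The state-dependent reply can be modeled in a few lines of plain Java. This is a minimal sketch, not Akka code: `DeliverySketch` and its methods are hypothetical names, and an empty `Optional` stands for "send nothing back".

```java
import java.util.Optional;

/** Plain-Java sketch (no Akka) of how connection requests are answered. All names are hypothetical. */
final class DeliverySketch<C> {

    // null means the State is "disconnected"
    private C connection;

    void onConnected(C newConnection) {
        this.connection = newConnection;
    }

    void onDisconnected() {
        this.connection = null;
    }

    /** Returns the connection to send to the requester, or empty when the request is ignored. */
    Optional<C> handleRequest() {
        return Optional.ofNullable(connection);
    }
}
```

Because an ignored request produces no reply at all, the requester cannot tell "disconnected" from "message lost", which is why the workers keep asking on a schedule.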

How should the Workers behave?

Workers will also have a State, which is also either “connected” or “disconnected.” The main responsibility of the workers is to use the connection they are given. Interactions between the workers and the connection should not be wrapped in try/catch blocks that handle failures locally. Let the workers fail instead: when that happens, the connection actor restarts the worker, which then reconnects.

  • Asking for a connection

In the constructor, we should set the State to “disconnected” and then schedule a task that asks for a new connection every x amount of time. As we have seen, the connection actor ignores our message while its own State is “disconnected,” so we keep asking until we get a connection.

  • Getting a connection

When a worker gets a new connection, its behavior depends on the State. If it is “connected,” it ignores the message; otherwise, it starts using that connection and sets the State to “connected.”
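The worker's side of this exchange can be sketched as a small state machine in plain Java. The names here (`WorkerSketch`, `shouldRequestConnection`, `onConnectionReceived`) are hypothetical, and the periodic tick is assumed to be driven by a scheduler. In an actor you could cancel the scheduled task once connected, rather than checking the State on every tick as this sketch does.

```java
/** Plain-Java sketch (no Akka) of a worker asking for and accepting a connection. Names are hypothetical. */
final class WorkerSketch<C> {

    enum State { DISCONNECTED, CONNECTED }

    private State state = State.DISCONNECTED;
    private C connection;

    /** Called on every scheduled tick: returns true while a connection request should still be sent. */
    boolean shouldRequestConnection() {
        return state == State.DISCONNECTED;
    }

    /** Called when a connection arrives: returns true if it was accepted, false if it was ignored. */
    boolean onConnectionReceived(C incoming) {
        if (state == State.CONNECTED) {
            return false; // we already have a connection, ignore the new one
        }
        connection = incoming;
        state = State.CONNECTED;
        return true;
    }

    C connection() {
        return connection;
    }
}
```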

  • Health checks

This step depends on how you use the dependency. If you are not constantly using the connection (or you don’t know whether you are) and you want to make sure it works when you need it, you can schedule a health check. The health check is just a scheduled Akka message that the actor receives every x amount of time, let’s say 5 seconds. When it receives the message, the actor should perform an action that tells you whether the dependency is working. If it is not, you should kill the actor with an exception and let it reconnect.
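The "fail instead of handling it locally" idea behind the health check can be illustrated in plain Java. This is a sketch under assumptions: `HealthCheckSketch`, `ConnectionLostException`, and `countRestarts` are hypothetical names, the `BooleanSupplier` stands in for whatever cheap probe your dependency offers, and the loop only imitates what a supervisor does when it restarts a failed worker.

```java
import java.util.function.BooleanSupplier;

/** Plain-Java sketch (no Akka) of a health check that fails loudly. All names are hypothetical. */
final class HealthCheckSketch {

    /** Hypothetical exception used to signal a dead connection. */
    static final class ConnectionLostException extends RuntimeException {
        ConnectionLostException(String message) {
            super(message);
        }
    }

    /** Runs one health check: does nothing when the probe succeeds, throws when it fails. */
    static void healthCheck(BooleanSupplier probe) {
        if (!probe.getAsBoolean()) {
            throw new ConnectionLostException("health check failed, connection is not working");
        }
    }

    /** Imitates a supervisor: runs the check on each tick and counts how many restarts it would cause. */
    static int countRestarts(BooleanSupplier probe, int ticks) {
        int restarts = 0;
        for (int i = 0; i < ticks; i++) {
            try {
                healthCheck(probe);
            } catch (ConnectionLostException e) {
                restarts++; // a real supervisor would restart the worker here
            }
        }
        return restarts;
    }
}
```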

Code sample

  • Connection actor
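import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.routing.RoundRobinPool;
import scala.concurrent.duration.Duration;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import static akka.actor.SupervisorStrategy.restart;

//Connection, Connect, ConnectionRequest and CreateWorkers are application-specific classes not shown here
public class ConnectionActor extends AbstractActor {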

    private State state;
    protected Connection connection;

    public ConnectionActor() {
        receive(ReceiveBuilder
                .match(Connect.class, this::handleConnect)
                .match(ConnectionRequest.class, this::handleDeliverConnection)
                .match(CreateWorkers.class, this::handleCreateWorkers)
                .build());

        setState(State.DISCONNECTED);
        self().tell(new Connect(), ActorRef.noSender());
        self().tell(new CreateWorkers(), ActorRef.noSender());
    }

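    private void handleConnect(Connect connect) {
        //If we already have a working connection, ignore the message
        if (connection != null && connection.isWorking()) {
            setState(State.CONNECTED);
            return;
        }

        setState(State.DISCONNECTED);
        //Retry interval in seconds
        int retryInterval = 5;

        try {
            //Create the connection
            connection = new Connection();

            if (connection.isWorking()) {
                setState(State.CONNECTED);
            } else {
                //The connection was created but is not usable yet, so try again later
                reconnectAfterBackoffTime(retryInterval);
            }
        } catch (IOException e) {
            //The dependency is unreachable: retry later instead of restarting immediately
            reconnectAfterBackoffTime(retryInterval);
        }
    }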

    private void reconnectAfterBackoffTime(int retryIntervalSeconds) {
        context().system().scheduler().scheduleOnce(
                Duration.create(retryIntervalSeconds, TimeUnit.SECONDS),
                self(),
                new Connect(),
                context().dispatcher(),
                ActorRef.noSender()
        );
    }

    private void handleDeliverConnection(ConnectionRequest connectionRequest) {
        if (getState() == State.CONNECTED && connection.isWorking()) {
            sender().tell(connection, self());
        }
    }

    private void handleCreateWorkers(CreateWorkers createWorkers) {
        //Create your workers here, you should use this supervision strategy:
        context().actorOf(Props.create(WorkerActor.class)
                        .withRouter(new RoundRobinPool(10)
                                .withSupervisorStrategy(new OneForOneStrategy(-1, Duration.Inf(), t -> restart(), true))),
                WorkerActor.class.getSimpleName());
    }

    private State getState() {
        return state;
    }

    private void setState(State state) {
        this.state = state;
    }

    private Connection getConnection() {
        return connection;
    }

    private void setConnection(Connection connection) {
        this.connection = connection;
    }

    private enum State {
        DISCONNECTED,
        CONNECTED
    }

}

  • Worker
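import akka.actor.AbstractActor;
import akka.actor.Cancellable;
import akka.japi.pf.ReceiveBuilder;
import scala.concurrent.duration.FiniteDuration;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

//Connection, ConnectionRequest, HealthCheck and ConnectionClosedException are application-specific classes not shown here
public class WorkerActor extends AbstractActor {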

    protected Cancellable requestConnectionScheduledTask;
    protected Cancellable healthCheckScheduledTask;
    private Connection connection;
    private State state;

    public WorkerActor() {
        receive(ReceiveBuilder
                .match(Connection.class, this::handleConnectionReceived)
                .match(HealthCheck.class, this::handleHealthCheck)
                .match(ConnectionRequest.class, this::requestConnection)
                .build());

        setState(State.DISCONNECTED);
        //request connection every 2 seconds
        requestConnectionScheduledTask = context().system().scheduler().schedule(FiniteDuration.Zero(), FiniteDuration.apply(2, TimeUnit.SECONDS), self(), new ConnectionRequest(), context().dispatcher(), self());
    }

    protected void handleHealthCheck(HealthCheck healthCheck) throws ConnectionClosedException {
        if (getState() == State.CONNECTED && !connection.isWorking()) {
            healthCheckScheduledTask.cancel();
            //Throw an exception to restart the actor
            throw new ConnectionClosedException("HealthCheck failed, connection is closed");
        }
    }

    protected void requestConnection(ConnectionRequest connectionRequest) {
        context().system().actorSelection("path/to/connectionActor")
                .tell(connectionRequest, self());
    }

    protected void handleConnectionReceived(Connection connection) throws IOException {
        //if state is connected, it means that we have a connection and it is working, so we ignore the new one
        if (getState() == State.DISCONNECTED) {
            this.connection = connection;
            setState(State.CONNECTED);
            //cancel the task to request connection
            requestConnectionScheduledTask.cancel();
            healthCheckScheduledTask = context().system().scheduler().schedule(FiniteDuration.Zero(), FiniteDuration.apply(5, TimeUnit.SECONDS), self(), new HealthCheck(), context().dispatcher(), self());
        }

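        //From here, you can start using your connection

    }

    @Override
    public void postStop() {
        //Cancel pending scheduled messages so they don't pile up across restarts
        if (requestConnectionScheduledTask != null) {
            requestConnectionScheduledTask.cancel();
        }
        if (healthCheckScheduledTask != null) {
            healthCheckScheduledTask.cancel();
        }
    }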

    public State getState() {
        return state;
    }

    public void setState(State state) {
        this.state = state;
    }

    public enum State {
        DISCONNECTED,
        CONNECTED
    }

}

Conclusion

Don’t worry about actors restarting multiple times. When we work with Akka for the first time, we may think that letting actors die and start again is a bad idea, but it is not. Akka is designed to handle that, and it ensures that the connection logic runs every time it is needed.