
[Z Design Doc] DNS based service discovery and implementation notes

Richard Hightower edited this page Sep 29, 2015 · 13 revisions

Both SkyDNS/etcd and Consul provide DNS support. QBit supports discovery via pushed JSON files (Chef, Consul templating, etc.) and by interacting directly with Consul.

To implement a service discovery provider, one has to implement the ServiceDiscoveryProvider interface:

ServiceDiscoveryProvider

package io.advantageous.qbit.service.discovery.spi;

import io.advantageous.qbit.service.discovery.EndpointDefinition;
import io.advantageous.qbit.service.discovery.impl.ServiceHealthCheckIn;
import io.advantageous.qbit.util.ConcurrentHashSet;

import java.util.Collections;
import java.util.List;
import java.util.Queue;

/**
 * Service Discovery Provider.
 * created by rhightower on 3/24/15.
 */
public interface ServiceDiscoveryProvider {

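    /**
     * Register the queued endpoints with the remote system (for example, Consul).
     * No-op by default; the file system provider does not use it.
     */
    default void registerServices(Queue<EndpointDefinition> registerQueue) {
    }

    /**
     * Send queued health check-ins so the remote system (for example, Consul)
     * does not mark a service down when its TTL runs out. No-op by default.
     */
    default void checkIn(Queue<ServiceHealthCheckIn> checkInsQueue) {
    }

    /**
     * Load the endpoints currently known for the given service name.
     * Returns an empty list by default.
     */
    default List<EndpointDefinition> loadServices(String serviceName) {
        return Collections.emptyList();
    }

    /**
     * Unregister the given endpoints from the remote system. No-op by default.
     */
    default void unregisterServices(ConcurrentHashSet<EndpointDefinition> endpointDefinitions) {
    }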
}

In the file-system-based approach, checkIn, registerServices, and unregisterServices are not used, because the services are read from JSON files. In the Consul implementation, checkIn, registerServices, and unregisterServices delegate to Consul; checkIn is how the service checks in with Consul before its Consul TTL runs out.
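
To make the split concrete, here is a minimal sketch of a load-only provider in the spirit of the file system approach: it inherits the no-op defaults for registration and check-in and only implements `loadServices`. It uses simplified, hypothetical stand-ins (`Endpoint`, `SimpleDiscoveryProvider`, `StaticFileLikeProvider`) rather than QBit's real `EndpointDefinition` and `ServiceDiscoveryProvider`, and an in-memory map takes the place of the pushed JSON files.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical, simplified stand-in for QBit's EndpointDefinition.
final class Endpoint {
    final String id;
    final String name;
    final String host;
    final int port;

    Endpoint(String id, String name, String host, int port) {
        this.id = id;
        this.name = name;
        this.host = host;
        this.port = port;
    }
}

// Hypothetical, simplified mirror of ServiceDiscoveryProvider: every method has a default.
interface SimpleDiscoveryProvider {
    default void registerServices(Queue<Endpoint> registerQueue) {
    }

    // Simplified: check-ins are just service ids here.
    default void checkIn(Queue<String> serviceIdsCheckingIn) {
    }

    default void unregisterServices(Iterable<Endpoint> endpoints) {
    }

    default List<Endpoint> loadServices(String serviceName) {
        return Collections.emptyList();
    }
}

// Load-only provider: register/checkIn/unregister stay no-ops (inherited defaults);
// lookups come from a pre-populated map standing in for the pushed JSON files.
final class StaticFileLikeProvider implements SimpleDiscoveryProvider {
    private final Map<String, List<Endpoint>> endpointsByService;

    StaticFileLikeProvider(Map<String, List<Endpoint>> endpointsByService) {
        this.endpointsByService = endpointsByService;
    }

    @Override
    public List<Endpoint> loadServices(String serviceName) {
        return endpointsByService.getOrDefault(serviceName, Collections.emptyList());
    }
}
```

A Consul-backed provider would instead override all four methods and forward each one to Consul.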

The SkyDNS provider could at first be a plain DNS model, similar to the ServiceDiscoveryFileSystemProvider push-JSON-file approach in that it only needs to provide loadServices. Later, it can be integrated with etcd to listen for changes and then trigger a reload via loadServices.
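
A sketch of that later etcd step, assuming some watcher calls a hypothetical `onChange(serviceName)` hook whenever etcd reports a change under a service's keys: lookups are cached, and the cache re-runs the lookup (the role `loadServices` plays) only when notified, instead of polling. `ReloadOnChangeCache` is a hypothetical name, and endpoints are plain strings to keep the sketch self-contained.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Caches lookups and refreshes an entry only when told that it changed.
final class ReloadOnChangeCache {
    // The lookup to run, e.g. a DNS query against SkyDNS (hypothetical).
    private final Function<String, List<String>> loader;
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();

    ReloadOnChangeCache(Function<String, List<String>> loader) {
        this.loader = loader;
    }

    // Served from the cache; the loader runs only on the first request for a name.
    List<String> loadServices(String serviceName) {
        return cache.computeIfAbsent(serviceName, loader);
    }

    // Called by a (hypothetical) etcd watcher when keys for this service change.
    void onChange(String serviceName) {
        cache.put(serviceName, loader.apply(serviceName));
    }
}
```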

The ServiceDiscoveryProvider interface is used by the ServiceDiscovery implementation, of which there is currently only one: ServiceDiscoveryImpl. ServiceDiscoveryImpl takes a primary and a secondary ServiceDiscoveryProvider, so if the primary has an outage, the secondary can be used. This is the main mechanism it supports for falling back to JSON files via the ServiceDiscoveryFileSystemProvider if, for example, Consul is down. In this way, a Consul outage does not cause a complete outage. (Consul is reliable, but new infrastructure that uses Consul is prone to suboptimal Consul setups. In my experience this is more a learning curve than any inherent problem with Consul.)
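
The primary/secondary idea can be sketched as a lookup that wraps two others. Treating both an exception and an empty result from the primary as an outage is an assumption made for this sketch; the actual ServiceDiscoveryImpl may decide differently. `Lookup` and `FallbackLookup` are hypothetical stand-ins for the `loadServices` part of ServiceDiscoveryProvider.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the loadServices part of ServiceDiscoveryProvider.
interface Lookup {
    List<String> loadServices(String serviceName);
}

// Tries the primary (e.g. Consul) first and falls back to the secondary
// (e.g. JSON files) if the primary throws or returns nothing.
final class FallbackLookup implements Lookup {
    private final Lookup primary;
    private final Lookup secondary;

    FallbackLookup(Lookup primary, Lookup secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    @Override
    public List<String> loadServices(String serviceName) {
        try {
            List<String> found = primary.loadServices(serviceName);
            if (found != null && !found.isEmpty()) {
                return found;
            }
        } catch (RuntimeException primaryDown) {
            // Primary unavailable: fall through to the secondary.
        }
        List<String> fallback = secondary.loadServices(serviceName);
        return fallback == null ? Collections.emptyList() : fallback;
    }
}
```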

I propose initially implementing a ServiceDiscoveryDNSProvider that expects DNS entries in the SkyDNS format.
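
One way such a provider could read SkyDNS entries is with SRV queries through the JDK's JNDI DNS provider, pointed at the `dns_addr` configured for SkyDNS in the setup section (127.0.0.1:5354). This is a sketch, not the planned implementation: `SrvTarget` and `SkyDnsSrvLookup` are hypothetical names, and the service name `rails.production.east.skydns.local` follows the naming used in the SkyDNS README. JNDI returns each SRV value as the string `priority weight port target`.

```java
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import javax.naming.Context;
import javax.naming.NamingEnumeration;
import javax.naming.NamingException;
import javax.naming.directory.Attribute;
import javax.naming.directory.Attributes;
import javax.naming.directory.DirContext;
import javax.naming.directory.InitialDirContext;

// Host/port pair parsed from one SRV record (hypothetical type).
final class SrvTarget {
    final String host;
    final int port;

    SrvTarget(String host, int port) {
        this.host = host;
        this.port = port;
    }
}

final class SkyDnsSrvLookup {
    // Parses "priority weight port target", e.g. "10 100 8080 host1.example.com."
    static SrvTarget parseSrv(String record) {
        String[] parts = record.trim().split("\\s+");
        if (parts.length != 4) {
            throw new IllegalArgumentException("not an SRV record: " + record);
        }
        String host = parts[3].endsWith(".")
                ? parts[3].substring(0, parts[3].length() - 1) // drop the trailing root dot
                : parts[3];
        return new SrvTarget(host, Integer.parseInt(parts[2]));
    }

    // Queries SRV records for serviceName from the DNS server at dnsHostPort (e.g. "127.0.0.1:5354").
    static List<SrvTarget> lookup(String dnsHostPort, String serviceName) throws NamingException {
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.dns.DnsContextFactory");
        env.put(Context.PROVIDER_URL, "dns://" + dnsHostPort);
        DirContext ctx = new InitialDirContext(env);
        List<SrvTarget> targets = new ArrayList<>();
        try {
            Attributes attrs = ctx.getAttributes(serviceName, new String[] {"SRV"});
            Attribute srv = attrs.get("SRV");
            if (srv != null) {
                NamingEnumeration<?> values = srv.getAll();
                while (values.hasMore()) {
                    targets.add(parseSrv(values.next().toString()));
                }
            }
        } finally {
            ctx.close();
        }
        return targets;
    }
}
```

For example, `SkyDnsSrvLookup.lookup("127.0.0.1:5354", "rails.production.east.skydns.local")` would return one `SrvTarget` per registered instance; turning each into an EndpointDefinition (host, port, and an id) would be the provider's job inside `loadServices`.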

The ServiceDiscovery interface is as follows:

Rather than explaining what each method does in prose, I added comments to the interface.

ServiceDiscovery with comments

package io.advantageous.qbit.service.discovery;

import io.advantageous.qbit.service.Startable;
import io.advantageous.qbit.service.Stoppable;
import io.advantageous.qbit.service.health.HealthStatus;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;

/**
 * Service Discovery
 * created by rhightower on 3/23/15.
 */
public interface ServiceDiscovery extends Startable, Stoppable {


    /**
     * Generates a unique string, used for generating unique service ids
     * @param port port
     * @return unique id incorporating host name if possible.
     */
    static String uniqueString(int port) {
        try {
            return port + "-" + InetAddress.getLocalHost().getHostName().replace('.', '-');
        } catch (UnknownHostException e) {
            return port + "-" + UUID.randomUUID().toString();
        }
    }

    /**
     * Register the service with the service discovery service if applicable.
     * @param serviceName serviceName
     * @param port port
     * @return EndpointDefinition
     */
    default EndpointDefinition register(
            final String serviceName,
            final int port) {

        return new EndpointDefinition(HealthStatus.PASS,
                serviceName + "." + uniqueString(port),
                serviceName, null, port);
    }

    /**
     * Register with the service discovery system and specify a TTL (time to live) so that
     * the service is marked down if it does not check in within that time.
     * @param serviceName service name
     * @param port port
     * @param timeToLiveSeconds ttl
     * @return EndpointDefinition
     */
    default EndpointDefinition registerWithTTL(
            final String serviceName,
            final int port,
            final int timeToLiveSeconds) {

        return new EndpointDefinition(HealthStatus.PASS,
                serviceName + "." + uniqueString(port),
                serviceName, null, port, timeToLiveSeconds);
    }

    /**
     * Register an end point given an id, and a TTL.
     * This gets used if you want to be specific about what you call the service.
     * @param serviceName service name
     * @param serviceId service id
     * @param port port
     * @param timeToLiveSeconds ttl
     * @return EndpointDefinition
     */
    @SuppressWarnings("UnusedReturnValue")
    default EndpointDefinition registerWithIdAndTimeToLive(
            final String serviceName, final String serviceId, final int port, final int timeToLiveSeconds) {

        return new EndpointDefinition(HealthStatus.PASS,
                serviceId,
                serviceName, null, port, timeToLiveSeconds);
    }

    /**
     * Register with an explicit unique id instead of an autogenerated one.
     * @param serviceName service name
     * @param serviceId service id
     * @param port port
     * @return EndpointDefinition
     */
    default EndpointDefinition registerWithId(final String serviceName, final String serviceId, final int port) {

        return new EndpointDefinition(HealthStatus.PASS,
                serviceId,
                serviceName, null, port);
    }


    /**
     * Watch for changes in this service name and send change events if the service changes.
     * @param serviceName service name to watch
     */
    void watch(String serviceName);

    /**
     * CheckIn with the service discovery mechanism. The service may be marked as down if it does
     * not check in within the time specified by the TTL, if the service discovery provider supports
     * TTL and check-ins (Consul does).
     * @param serviceId service id
     * @param healthStatus health status to report
     */
    default void checkIn(String serviceId, HealthStatus healthStatus) {

    }


    /** This is like `checkIn` but reports HealthStatus.PASS if applicable.
     *
     * @param serviceId serviceId
     */
    default void checkInOk(String serviceId) {

    }

    /**
     * Load the services.
     *
     * Depending on the underlying implementation the services are either periodically loaded
     * or loaded whenever a change is detected.
     *
     * This version of the method is based on the last event change and / or the last poll of the
     * services from the remote system (i.e., Consul) if applicable.
     *
     * This may also trigger a remote call, but it always returns right away.
     * @param serviceName service name
     * @return list of EndpointDefinition
     */
    default List<EndpointDefinition> loadServices(final String serviceName) {

        return Collections.emptyList();
    }

    /**
     * This is like `loadServices` except that it forces a remote call.
     * It is a blocking version of `loadServices`.
     * @param serviceName service name.
     * @return list of EndpointDefinition
     */
    default List<EndpointDefinition> loadServicesNow(final String serviceName) {

        return Collections.emptyList();
    }

    /**
     * Start the service discovery system if applicable.
     */
    default void start() {
    }

    /**
     * Stop the service discovery system if applicable.
     */
    default void stop() {
    }


    /**
     * This just loads the endpoints that were registered locally.
     * These are the endpoints that this JVM and this ServiceDiscovery are managing.
     * @return set of EndpointDefinitions
     */
    default Set<EndpointDefinition> localDefinitions() {
        return Collections.emptySet();
    }
}
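
Services registered with `registerWithTTL` must keep calling `checkInOk` before the TTL expires. A minimal sketch schedules that call with a `ScheduledExecutorService`, assuming a check-in every half TTL leaves enough margin (an arbitrary choice, not a QBit rule). `CheckIn` and `TtlHeartbeat` are hypothetical names; `CheckIn` stands in for `ServiceDiscovery.checkInOk`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ServiceDiscovery.checkInOk(serviceId).
interface CheckIn {
    void checkInOk(String serviceId);
}

final class TtlHeartbeat implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Check in at half the TTL to leave a margin before the TTL runs out (minimum 1 ms).
    static long intervalMillis(int timeToLiveSeconds) {
        return Math.max(1L, TimeUnit.SECONDS.toMillis(timeToLiveSeconds) / 2);
    }

    TtlHeartbeat(CheckIn discovery, String serviceId, int timeToLiveSeconds) {
        long interval = intervalMillis(timeToLiveSeconds);
        scheduler.scheduleAtFixedRate(() -> {
            try {
                discovery.checkInOk(serviceId);
            } catch (RuntimeException e) {
                // Swallow so the schedule keeps running: an escaping exception cancels future runs.
            }
        }, 0, interval, TimeUnit.MILLISECONDS);
    }

    // Stop checking in, e.g. on service shutdown.
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```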

To take this discussion further, we also need EndpointDefinition and HealthStatus.

EndpointDefinition

package io.advantageous.qbit.service.discovery;

import io.advantageous.boon.core.Lists;
import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.service.health.HealthStatus;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

import static io.advantageous.qbit.service.discovery.ServiceDiscovery.uniqueString;

/**
 * Service Definition
 * Contains a healthStatus, unique id, name, host, port and a timeToLive in seconds.
 * This describes all parts of a service as far as a service discovery system such as
 * [Consul](https://consul.io/) is concerned.
 *
 * The `timeToLive` field is for ttl checkins if the underlying system supports it.
 *
 * The `HealthStatus` represents the current state of this service as returned from the remote
 * service discovery system.
 *
 * created by rhightower on 3/23/15.
 */
public class EndpointDefinition {


    /**
     * Current health status.
     */
    private final HealthStatus healthStatus;

    /**
     * Unique id of the system.
     */
    private final String id;

    /**
     * Name of the service, i.e., EventBus, StatsEngine, etc.
     */
    private final String name;

    /**
     * Host name.
     */
    private final String host;

    /**
     * Port of the service.
     */
    private final int port;

    /**
     * Time to live: how long until this service has to check in with the remote service discovery
     * system if applicable. Whether this is used or needed depends on the underlying service discovery system.
     */
    private final long timeToLive;


    /**
     * Find host
     * @return hostname
     */
    static String findHostName() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            throw new IllegalStateException("unable to find host name", e);
        }
    }


    /**
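     * Create a new one with the default TTL of 20 seconds, which can be overridden with the
     * io.advantageous.qbit.service.discovery.EndpointDefinition.timeToLive property.
     * @param healthStatus healthStatus
     * @param id id
     * @param name name
     * @param host host
     * @param port port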
     */
    public EndpointDefinition(
            final HealthStatus healthStatus,
            final String id,
            final String name,
            final String host,
            final int port) {
        this.healthStatus = healthStatus;
        this.id = id;
        this.name = name;
        this.host = host;
        this.port = port;
        this.timeToLive = Sys.sysProp(EndpointDefinition.class.getName()+".timeToLive", 20L);
    }


    /**
     * Create a new one with an explicit TTL.
     * @param healthStatus healthStatus
     * @param id id
     * @param name name
     * @param host host
     * @param port port
     * @param timeToLive time to live in seconds
     */
    public EndpointDefinition(
            final HealthStatus healthStatus,
            final String id,
            final String name,
            final String host,
            final int port,
            final long timeToLive) {
        this.healthStatus = healthStatus;
        this.id = id;
        this.name = name;
        this.host = host;
        this.port = port;
        this.timeToLive = timeToLive;
    }

    /**
     * Creates a list of service definitions.
     * @param endpointDefinitions vararg array of service definitions
     * @return list of service definitions
     */
    public static List<EndpointDefinition> serviceDefinitions(final EndpointDefinition... endpointDefinitions) {
        return Lists.list(endpointDefinitions);
    }

    /**
     * Creates an EndpointDefinition for a service, i.e., a serviceDefinition.
     * @param name name
     * @return serviceDefinition
     */
    public static EndpointDefinition serviceDefinition(final String name) {

        return new EndpointDefinition(HealthStatus.PASS,
                name + "-" + uniqueString(0), name, findHostName(), 0);
    }

    /**
     * Creates an EndpointDefinition for a service, i.e., a serviceDefinition.
     * @param name service name
     * @param port port
     * @return serviceDefinition
     */
    public static EndpointDefinition serviceDefinition(final String name, final int port) {

        // Pass the requested port through (previously 0 was passed by mistake).
        return new EndpointDefinition(HealthStatus.PASS,
                name + "-" + uniqueString(port), name, findHostName(), port);
    }

    /**
     * Creates an EndpointDefinition for a service, i.e., a serviceDefinition.
     * @param id id
     * @param name name
     * @param host host
     * @param port port
     * @return EndpointDefinition
     */
    public static EndpointDefinition serviceDefinition(
            final String id,
            final String name,
            final String host,
            final int port) {

        return new EndpointDefinition(HealthStatus.PASS,
                id, name, host, port);
    }

    /**
     * Creates an EndpointDefinition for a service, i.e., a serviceDefinition.
     * @param name name
     * @param host host
     * @param port port
     * @return serviceDefinition
     */
    public static EndpointDefinition serviceDefinition(
            final String name,
            final String host,
            final int port) {

        return new EndpointDefinition(HealthStatus.PASS,
                name + "-" + uniqueString(port), name, host, port);
    }


    /**
     * Creates an EndpointDefinition for a service, i.e., a serviceDefinition.
     * @param name name
     * @param host host
     * @return serviceDefinition
     */
    public static EndpointDefinition serviceDefinition(
            final String name,
            final String host
    ) {

        return new EndpointDefinition(HealthStatus.PASS,
                name + "-" + uniqueString(0), name, host, 0);
    }



    /**
     * Creates an EndpointDefinition for a service with an explicit id and port 0.
     * @param name name
     * @param host host
     * @param id id
     * @return EndpointDefinition
     */
    public static EndpointDefinition serviceDefinitionWithId(
            final String name,
            final String host,
            final String id) {

        return new EndpointDefinition(HealthStatus.PASS,
                id, name, host, 0);
    }

    public HealthStatus getHealthStatus() {
        return healthStatus;
    }

    public String getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public String getHost() {
        return host;
    }

    public int getPort() {
        return port;
    }

    @SuppressWarnings("SimplifiableIfStatement")
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof EndpointDefinition)) return false;

        EndpointDefinition that = (EndpointDefinition) o;

        if (port != that.port) return false;
        if (healthStatus != that.healthStatus) return false;
        if (host != null ? !host.equals(that.host) : that.host != null) return false;
        if (id != null ? !id.equals(that.id) : that.id != null) return false;
        return !(name != null ? !name.equals(that.name) : that.name != null);

    }

    @Override
    public int hashCode() {
        int result = healthStatus != null ? healthStatus.hashCode() : 0;
        result = 31 * result + (id != null ? id.hashCode() : 0);
        result = 31 * result + (name != null ? name.hashCode() : 0);
        result = 31 * result + (host != null ? host.hashCode() : 0);
        result = 31 * result + port;
        return result;
    }

    @Override
    public String toString() {
        return "ServiceDefinition{" +
                "status=" + healthStatus +
                ", id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", host='" + host + '\'' +
                ", port=" + port +
                '}';
    }

    public long getTimeToLive() {

        return timeToLive;
    }
}
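
The five-argument constructor reads its default TTL from a property named after the class (`io.advantageous.qbit.service.discovery.EndpointDefinition.timeToLive`) via Boon's `Sys.sysProp`, falling back to 20 seconds. The sketch below approximates that lookup with plain JDK system properties (`Long.getLong`); Boon's helper may consult additional sources, and `TtlDefaults` is a hypothetical name.

```java
final class TtlDefaults {
    // Same property name the EndpointDefinition constructor builds from its class name.
    static final String TTL_PROPERTY =
            "io.advantageous.qbit.service.discovery.EndpointDefinition.timeToLive";

    // Returns the overridden TTL in seconds, or 20 when the property is unset or not a number.
    static long defaultTimeToLiveSeconds() {
        return Long.getLong(TTL_PROPERTY, 20L);
    }
}
```

With this approximation, starting the JVM with `-Dio.advantageous.qbit.service.discovery.EndpointDefinition.timeToLive=30` would give endpoints a 30 second TTL.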

etcd and DNS support.

We can reuse the concept of register with etcd, but I don't yet know of a use for the TTL with etcd, so checkIn would be a no-op. The first part of this exercise, though, is to read the DNS-style entries of SkyDNS, which relies on etcd.
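
If register is later reused with etcd, a SkyDNS entry is just an etcd key whose path is the DNS name with its labels reversed, as in the SkyDNS README example where `1.rails.production.east.skydns.local` lives at `/skydns/local/skydns/east/production/rails/1`. The sketch below builds that key and a minimal JSON value with `host` and `port`, then PUTs it to etcd's v2 keys API (like the curl command in the setup section) using `java.net.http` (Java 11+). `SkyDnsEtcdRegistration` and its methods are hypothetical names, and no TTL is set, matching the no-op check-in described above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class SkyDnsEtcdRegistration {
    // "1.rails.production.east.skydns.local" -> "/skydns/local/skydns/east/production/rails/1"
    static String etcdKeyFor(String dnsName) {
        String trimmed = dnsName.endsWith(".") ? dnsName.substring(0, dnsName.length() - 1) : dnsName;
        List<String> labels = new ArrayList<>(Arrays.asList(trimmed.split("\\.")));
        Collections.reverse(labels);
        return "/skydns/" + String.join("/", labels);
    }

    // Minimal SkyDNS service value; assumes host needs no JSON escaping.
    static String serviceJson(String host, int port) {
        return "{\"host\":\"" + host + "\",\"port\":" + port + "}";
    }

    // PUTs the entry to etcd's v2 keys API, e.g. etcdBase = "http://127.0.0.1:4001".
    // Returns the HTTP status code.
    static int register(String etcdBase, String dnsName, String host, int port)
            throws IOException, InterruptedException {
        String form = "value=" + URLEncoder.encode(serviceJson(host, port), StandardCharsets.UTF_8);
        HttpRequest request = HttpRequest.newBuilder(URI.create(etcdBase + "/v2/keys" + etcdKeyFor(dnsName)))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .PUT(HttpRequest.BodyPublishers.ofString(form))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```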

Setting up etcd and SkyDNS for dev

Follow instructions at https://github.com/skynetservices/skydns.

First we download and install etcd.

etcd is written in Go and uses the Raft consensus algorithm to manage a highly-available replicated log. See etcdctl for a simple command line client. --https://github.com/coreos/etcd

$ mkdir skydns
$ cd skydns

# Download and run etcd (this is the macOS build; use the release archive for your platform)
$ curl -L  https://github.com/coreos/etcd/releases/download/v2.2.0/etcd-v2.2.0-darwin-amd64.zip -o etcd-v2.2.0-darwin-amd64.zip
$ unzip etcd-v2.2.0-darwin-amd64.zip
$ cd etcd-v2.2.0-darwin-amd64
$ ./etcd

# In a second terminal (etcd keeps running in the first), from the skydns directory,
# download and build skydns
$ export GOPATH=`pwd`
$ go get github.com/skynetservices/skydns
$ cd $GOPATH/src/github.com/skynetservices/skydns
$ go build

If you want skydns to use different etcd servers, set ETCD_MACHINES:

export ETCD_MACHINES='http://192.168.0.1:4001,http://192.168.0.2:4001'

To run skydns:

$ pwd
/.../skydns/src/github.com/skynetservices/skydns
$ ./skydns

By default, skydns connects to etcd on localhost port 4001.

To configure SkyDNS, we PUT a key/value pair into etcd:

curl -XPUT http://127.0.0.1:4001/v2/keys/skydns/config \
    -d value='{"dns_addr":"127.0.0.1:5354","ttl":3600, "nameservers": ["8.8.8.8:53","8.8.4.4:53"]}'

Output from configuring SkyDNS:

{"action":"set","node":{"key":"/skydns/config",
"value":"{\"dns_addr\":\"127.0.0.1:5354\",
\"ttl\":3600, \"nameservers\": 
[\"8.8.8.8:53\",\"8.8.4.4:53\"]}",
"modifiedIndex":5,"createdIndex":5}}
