[Z Design Doc] DNS based service discovery and implementation notes
Both SkyDNS/etcd and Consul provide DNS support. QBit supports discovery via pushed JSON files (Chef, Consul templating, etc.) and by interacting directly with Consul.
To implement a ServiceDiscovery provider, one has to implement this interface:
package io.advantageous.qbit.service.discovery.spi;
import io.advantageous.qbit.service.discovery.EndpointDefinition;
import io.advantageous.qbit.service.discovery.impl.ServiceHealthCheckIn;
import io.advantageous.qbit.util.ConcurrentHashSet;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
/**
* Service Discovery Provider.
* created by rhightower on 3/24/15.
*/
public interface ServiceDiscoveryProvider {
default void registerServices(Queue<EndpointDefinition> registerQueue) {
}
default void checkIn(Queue<ServiceHealthCheckIn> checkInsQueue) {
}
default List<EndpointDefinition> loadServices(String serviceName) {
return Collections.emptyList();
}
default void unregisterServices(ConcurrentHashSet<EndpointDefinition> endpointDefinitions) {
}
}
In the file-system-based approach, checkIn, registerServices, and unregisterServices are not used because the services are read from JSON files. In the Consul implementation, checkIn, registerServices, and unregisterServices delegate to Consul; checkIn is the service checking in with Consul before its Consul TTL runs out.
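As a rough illustration of the TTL check-in idea, the sketch below schedules a periodic check-in at half the TTL. The class name TtlCheckIn and the choice of half the TTL are assumptions made for this example; neither QBit nor Consul prescribes them, and the Runnable stands in for whatever actually calls checkIn.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class TtlCheckIn {

    /** Picks a period well inside the TTL (half of it here) so one late check-in does not expire the service. */
    static long periodMillis(long ttlSeconds) {
        return Math.max(1L, ttlSeconds * 1000L / 2L);
    }

    /** Runs the check-in action immediately and then once per period until the returned future is cancelled. */
    static ScheduledFuture<?> start(ScheduledExecutorService scheduler, long ttlSeconds, Runnable checkIn) {
        return scheduler.scheduleAtFixedRate(checkIn, 0L, periodMillis(ttlSeconds), TimeUnit.MILLISECONDS);
    }
}
```

With the 20 second default TTL used by EndpointDefinition, this would check in every 10 seconds.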
The SkyDNS model could at first be a plain DNS model similar to the ServiceDiscoveryFileSystemProvider pushed-JSON-file approach, in that it only needs to provide loadServices. Later, integration with etcd can listen for changes and then trigger a reload via loadServices.
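A minimal sketch of what a loadServices-only DNS lookup might look like, using the JNDI DNS provider that ships with the JDK to query SRV records. The class name DnsSrvLookup, the Endpoint holder, and the example DNS URL are assumptions for illustration only; this is not the proposed QBit provider, and error handling and caching are left out.

```java
import javax.naming.NamingEnumeration;
import javax.naming.NamingException;
import javax.naming.directory.Attribute;
import javax.naming.directory.Attributes;
import javax.naming.directory.InitialDirContext;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;

final class DnsSrvLookup {

    /** Hypothetical holder for one resolved endpoint. */
    static final class Endpoint {
        final String host;
        final int port;
        Endpoint(String host, int port) { this.host = host; this.port = port; }
    }

    /** Parses SRV record data of the form "priority weight port target". */
    static Endpoint parseSrv(String record) {
        String[] parts = record.trim().split("\\s+");
        if (parts.length != 4) {
            throw new IllegalArgumentException("not an SRV record: " + record);
        }
        String target = parts[3];
        if (target.endsWith(".")) {
            // Drop the trailing dot of a fully qualified name.
            target = target.substring(0, target.length() - 1);
        }
        return new Endpoint(target, Integer.parseInt(parts[2]));
    }

    /** Queries SRV records for a name against a DNS server URL such as "dns://127.0.0.1:5354". */
    static List<Endpoint> loadServices(String dnsUrl, String name) throws NamingException {
        Hashtable<String, String> env = new Hashtable<>();
        env.put("java.naming.factory.initial", "com.sun.jndi.dns.DnsContextFactory");
        env.put("java.naming.provider.url", dnsUrl);
        InitialDirContext context = new InitialDirContext(env);
        try {
            Attributes attributes = context.getAttributes(name, new String[]{"SRV"});
            Attribute srv = attributes.get("SRV");
            List<Endpoint> endpoints = new ArrayList<>();
            if (srv == null) {
                return endpoints; // no SRV records for this name
            }
            NamingEnumeration<?> values = srv.getAll();
            while (values.hasMore()) {
                endpoints.add(parseSrv(values.next().toString()));
            }
            return endpoints;
        } finally {
            context.close();
        }
    }
}
```

A real provider would map each Endpoint to an EndpointDefinition; the priority and weight fields are ignored here for brevity.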
The ServiceDiscoveryProvider interface is used by the ServiceDiscovery implementation, of which there is only one at this point: ServiceDiscoveryImpl. ServiceDiscoveryImpl takes a primary and a secondary ServiceDiscoveryProvider, so if the primary has an outage, the secondary can be used. This is the main mechanism it supports for falling back to JSON files via the ServiceDiscoveryFileSystemProvider if, for example, Consul is down. In this way a Consul outage does not cause a complete outage. (Consul is reliable, but new infrastructure built on Consul is prone to non-optimal Consul setups. In my experience this is more a learning curve than any inherent problem with Consul.)
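The primary/secondary idea can be sketched independently of QBit. The Provider interface and FallbackLoader class below are hypothetical stand-ins, not the actual ServiceDiscoveryImpl logic, and treating any runtime exception from the primary as an outage is an assumption of this example.

```java
import java.util.Arrays;
import java.util.List;

final class FallbackLoader {

    /** Hypothetical stand-in for the loadServices part of a provider. */
    interface Provider {
        List<String> loadServices(String serviceName);
    }

    private final Provider primary;
    private final Provider secondary;

    FallbackLoader(Provider primary, Provider secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    /** Asks the primary first and falls back to the secondary if the primary fails. */
    List<String> loadServices(String serviceName) {
        try {
            return primary.loadServices(serviceName);
        } catch (RuntimeException primaryFailure) {
            // Assumption: any runtime failure of the primary counts as an outage.
            return secondary.loadServices(serviceName);
        }
    }

    public static void main(String[] args) {
        Provider consulDown = name -> { throw new IllegalStateException("Consul unreachable"); };
        Provider jsonFiles = name -> Arrays.asList("10.0.0.5:8080", "10.0.0.6:8080");
        FallbackLoader loader = new FallbackLoader(consulDown, jsonFiles);
        System.out.println(loader.loadServices("eventbus"));
    }
}
```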
I propose initially implementing a ServiceDiscoveryDNSProvider, which will expect DNS entries in the SkyDNS format.
The ServiceDiscovery interface is as follows:
/**
* Service Discovery
* created by rhightower on 3/23/15.
*/
public interface ServiceDiscovery extends Startable, Stoppable {
static String uniqueString(int port) {
try {
return port + "-" + InetAddress.getLocalHost().getHostName().replace('.', '-');
} catch (UnknownHostException e) {
return port + "-" + UUID.randomUUID().toString();
}
}
default EndpointDefinition register(
final String serviceName,
final int port) {
return new EndpointDefinition(HealthStatus.PASS,
serviceName + "." + uniqueString(port),
serviceName, null, port);
}
default EndpointDefinition registerWithTTL(
final String serviceName,
final int port,
final int timeToLiveSeconds) {
return new EndpointDefinition(HealthStatus.PASS,
serviceName + "." + uniqueString(port),
serviceName, null, port, timeToLiveSeconds);
}
@SuppressWarnings("UnusedReturnValue")
default EndpointDefinition registerWithIdAndTimeToLive(
final String serviceName, final String serviceId, final int port, final int timeToLiveSeconds) {
return new EndpointDefinition(HealthStatus.PASS,
serviceId,
serviceName, null, port, timeToLiveSeconds);
}
default EndpointDefinition registerWithId(final String serviceName, final String serviceId, final int port) {
return new EndpointDefinition(HealthStatus.PASS,
serviceId,
serviceName, null, port);
}
void watch(String serviceName);
default void checkIn(String serviceId, HealthStatus healthStatus) {
}
default void checkInOk(String serviceId) {
}
default List<EndpointDefinition> loadServices(final String serviceName) {
return Collections.emptyList();
}
default List<EndpointDefinition> loadServicesNow(final String serviceName) {
return Collections.emptyList();
}
default void start() {
}
default void stop() {
}
default Set<EndpointDefinition> localDefinitions() {
return Collections.emptySet();
}
}
OK, rather than explaining what each method does, I added comments:
package io.advantageous.qbit.service.discovery;
import io.advantageous.qbit.service.Startable;
import io.advantageous.qbit.service.Stoppable;
import io.advantageous.qbit.service.health.HealthStatus;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
/**
* Service Discovery
* created by rhightower on 3/23/15.
*/
public interface ServiceDiscovery extends Startable, Stoppable {
/**
* Generates a unique string, used for generating unique service ids
* @param port port
* @return unique id incorporating host name if possible.
*/
static String uniqueString(int port) {
try {
return port + "-" + InetAddress.getLocalHost().getHostName().replace('.', '-');
} catch (UnknownHostException e) {
return port + "-" + UUID.randomUUID().toString();
}
}
/**
* Register the service with the service discovery service if applicable.
* @param serviceName serviceName
* @param port port
* @return EndpointDefinition
*/
default EndpointDefinition register(
final String serviceName,
final int port) {
return new EndpointDefinition(HealthStatus.PASS,
serviceName + "." + uniqueString(port),
serviceName, null, port);
}
/**
* Register with the service discovery system and specify a TTL so that if
* the service does not send a checkIn that it is marked down.
* TTL is time to live.
* @param serviceName service name
* @param port port
* @param timeToLiveSeconds ttl
* @return EndpointDefinition
*/
default EndpointDefinition registerWithTTL(
final String serviceName,
final int port,
final int timeToLiveSeconds) {
return new EndpointDefinition(HealthStatus.PASS,
serviceName + "." + uniqueString(port),
serviceName, null, port, timeToLiveSeconds);
}
/**
* Register an end point given an id, and a TTL.
* This gets used if you want to be specific about what you call the service.
* @param serviceName service name
* @param serviceId service id
* @param port port
* @param timeToLiveSeconds ttl
* @return EndpointDefinition
*/
@SuppressWarnings("UnusedReturnValue")
default EndpointDefinition registerWithIdAndTimeToLive(
final String serviceName, final String serviceId, final int port, final int timeToLiveSeconds) {
return new EndpointDefinition(HealthStatus.PASS,
serviceId,
serviceName, null, port, timeToLiveSeconds);
}
/**
* Register with id. Specify a unique id that is not autogenerated
* @param serviceName service name
* @param serviceId service id
* @param port port
* @return EndpointDefinition
*/
default EndpointDefinition registerWithId(final String serviceName, final String serviceId, final int port) {
return new EndpointDefinition(HealthStatus.PASS,
serviceId,
serviceName, null, port);
}
/**
* Watch for changes in this service name and send change events if the service changes.
* @param serviceName
*/
void watch(String serviceName);
/**
* Check in with the service discovery mechanism. The service may be marked as down if it does
* not check in within the time specified by the TTL, if the service discovery provider supports
* TTL and check-in (Consul does).
* @param serviceId
* @param healthStatus
*/
default void checkIn(String serviceId, HealthStatus healthStatus) {
}
/** This is like `checkIn` but reports HealthStatus.PASS if applicable.
*
* @param serviceId serviceId
*/
default void checkInOk(String serviceId) {
}
/**
* Load the services.
*
* Depending on the underlying implementation the services are either periodically loaded
* or loaded whenever a change is detected.
*
* This version of the method is based on the last event change and / or the last poll of the
* services from the remote system (i.e., Consul) if applicable.
*
* This may also trigger a remote call, but it will always return right away.
* @param serviceName service name
* @return list of EndpointDefinition
*/
default List<EndpointDefinition> loadServices(final String serviceName) {
return Collections.emptyList();
}
/**
* See `loadServices`. This is like `loadServices` except that it forces a remote call.
* This is a blocking call.
* @param serviceName service name.
* @return list of EndpointDefinition
*/
default List<EndpointDefinition> loadServicesNow(final String serviceName) {
return Collections.emptyList();
}
/**
* Start the service discovery system if applicable.
*/
default void start() {
}
/**
* Stop the service discovery system if applicable.
*/
default void stop() {
}
/**
* This just loads the end points that were registered locally.
* These are the endpoints that this JVM and this ServiceDiscovery are managing.
* @return set of EndpointDefinitions
*/
default Set<EndpointDefinition> localDefinitions() {
return Collections.emptySet();
}
}
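To make the default id scheme concrete, this standalone sketch copies the uniqueString logic from the interface above and shows the id that register(serviceName, port) would build. The class name ServiceIdExample and the service name "eventbus" are illustrative; the actual output depends on the local host name.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.UUID;

final class ServiceIdExample {

    /** Same logic as ServiceDiscovery.uniqueString: the port plus the host name with dots replaced by dashes. */
    static String uniqueString(int port) {
        try {
            return port + "-" + InetAddress.getLocalHost().getHostName().replace('.', '-');
        } catch (UnknownHostException e) {
            // Falls back to a random UUID when the host name cannot be resolved.
            return port + "-" + UUID.randomUUID().toString();
        }
    }

    /** Mirrors how register(serviceName, port) builds the endpoint id. */
    static String defaultServiceId(String serviceName, int port) {
        return serviceName + "." + uniqueString(port);
    }

    public static void main(String[] args) {
        System.out.println(defaultServiceId("eventbus", 8080));
    }
}
```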
To take this discussion further, we have EndpointDefinition and HealthStatus.
package io.advantageous.qbit.service.discovery;
import io.advantageous.boon.core.Lists;
import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.service.health.HealthStatus;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import static io.advantageous.qbit.service.discovery.ServiceDiscovery.uniqueString;
/**
* Service Definition
* Contains a healthStatus, unique id, name, host, port and a timeToLive in seconds.
* This describes all parts of a service as far as something like a ServiceDiscovery system like
* [Consul](https://consul.io/) is concerned.
*
* The `timeToLive` field is for ttl checkins if the underlying system supports it.
*
* The `HealthStatus` represents the current state of this system as returned from the remote
* service discovery system.
*
* created by rhightower on 3/23/15.
*/
public class EndpointDefinition {
/**
* Current health status.
*/
private final HealthStatus healthStatus;
/**
* Unique id of the system.
*/
private final String id;
/**
* Name of the service, i.e., EventBus, StatsEngine, etc.
*/
private final String name;
/**
* Host name.
*/
private final String host;
/**
* Port of the service.
*/
private final int port;
/**
* Time to live: how long until this service has to check in with the remote service discovery
* system if applicable. Whether this is used or needed depends on the underlying service discovery system.
*/
private final long timeToLive;
/**
* Find host
* @return hostname
*/
static String findHostName() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
throw new IllegalStateException("unable to find host name");
}
}
/**
* Create a new one with default TTL of 20 seconds.
* @param healthStatus healthStatus
* @param id id
* @param name name
* @param host host
* @param port port
*/
public EndpointDefinition(
final HealthStatus healthStatus,
final String id,
final String name,
final String host,
final int port) {
this.healthStatus = healthStatus;
this.id = id;
this.name = name;
this.host = host;
this.port = port;
this.timeToLive = Sys.sysProp(EndpointDefinition.class.getName()+".timeToLive", 20L);
}
/**
* Create a new one with an explicit TTL.
* @param healthStatus healthStatus
* @param id id
* @param name name
* @param host host
* @param port port
* @param timeToLive time to live in seconds
*/
public EndpointDefinition(
final HealthStatus healthStatus,
final String id,
final String name,
final String host,
final int port,
final long timeToLive) {
this.healthStatus = healthStatus;
this.id = id;
this.name = name;
this.host = host;
this.port = port;
this.timeToLive = timeToLive;
}
/**
* Creates a list of service definitions.
* @param endpointDefinitions vararg array of service definitions
* @return list of service definitions
*/
public static List<EndpointDefinition> serviceDefinitions(final EndpointDefinition... endpointDefinitions) {
return Lists.list(endpointDefinitions);
}
/**
* Creates a EndpointDefinition for a service, i.e., a serviceDefinition.
* @param name name
* @return serviceDefinition
*/
public static EndpointDefinition serviceDefinition(final String name) {
return new EndpointDefinition(HealthStatus.PASS,
name + "-" + uniqueString(0), name, findHostName(), 0);
}
/**
* Creates a EndpointDefinition for a service, i.e., a serviceDefinition.
* @param name service name
* @param port port
* @return serviceDefinition
*/
public static EndpointDefinition serviceDefinition(final String name, int port) {
return new EndpointDefinition(HealthStatus.PASS,
name + "-" + uniqueString(port), name, findHostName(), port);
}
/**
* Creates a EndpointDefinition for a service, i.e., a serviceDefinition.
* @param id id
* @param name name
* @param host host
* @param port port
* @return EndpointDefinition
*/
public static EndpointDefinition serviceDefinition(
final String id,
final String name,
final String host,
final int port) {
return new EndpointDefinition(HealthStatus.PASS,
id, name, host, port);
}
/**
* Creates a EndpointDefinition for a service, i.e., a serviceDefinition.
* @param name name
* @param host host
* @param port port
* @return serviceDefinition
*/
public static EndpointDefinition serviceDefinition(
final String name,
final String host,
final int port) {
return new EndpointDefinition(HealthStatus.PASS,
name + "-" + uniqueString(port), name, host, port);
}
/**
* Creates a EndpointDefinition for a service, i.e., a serviceDefinition.
* @param name name
* @param host host
* @return serviceDefinition
*/
public static EndpointDefinition serviceDefinition(
final String name,
final String host
) {
return new EndpointDefinition(HealthStatus.PASS,
name + "-" + uniqueString(0), name, host, 0);
}
/**
* Creates a EndpointDefinition for a service, i.e., a serviceDefinition.
* @param name name
* @param host host
* @param id id
* @return EndpointDefinition
*/
public static EndpointDefinition serviceDefinitionWithId(
final String name,
final String host,
final String id) {
return new EndpointDefinition(HealthStatus.PASS,
id, name, host, 0);
}
public HealthStatus getHealthStatus() {
return healthStatus;
}
public String getId() {
return id;
}
public String getName() {
return name;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
@SuppressWarnings("SimplifiableIfStatement")
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof EndpointDefinition)) return false;
EndpointDefinition that = (EndpointDefinition) o;
if (port != that.port) return false;
if (healthStatus != that.healthStatus) return false;
if (host != null ? !host.equals(that.host) : that.host != null) return false;
if (id != null ? !id.equals(that.id) : that.id != null) return false;
return !(name != null ? !name.equals(that.name) : that.name != null);
}
@Override
public int hashCode() {
int result = healthStatus != null ? healthStatus.hashCode() : 0;
result = 31 * result + (id != null ? id.hashCode() : 0);
result = 31 * result + (name != null ? name.hashCode() : 0);
result = 31 * result + (host != null ? host.hashCode() : 0);
result = 31 * result + port;
return result;
}
@Override
public String toString() {
return "ServiceDefinition{" +
"status=" + healthStatus +
", id='" + id + '\'' +
", name='" + name + '\'' +
", host='" + host + '\'' +
", port=" + port +
'}';
}
public long getTimeToLive() {
return timeToLive;
}
}
We can reuse the concept of register with etcd, but as far as I know we do not yet have a use for TTL with etcd, so checkIn would be a no-op. The first part of this exercise, though, is to access the DNS-style entries of SkyDNS, which relies on etcd.
Follow instructions at https://github.com/skynetservices/skydns.
First we download and install etcd.
etcd is written in Go and uses the Raft consensus algorithm to manage a highly-available replicated log. See etcdctl for a simple command line client. --https://github.com/coreos/etcd
$ mkdir skydns
$ cd skydns
#Download and run etcd
$ curl -L https://github.com/coreos/etcd/releases/download/v2.2.0/etcd-v2.2.0-darwin-amd64.zip -o etcd-v2.2.0-darwin-amd64.zip
$ unzip etcd-v2.2.0-darwin-amd64.zip
$ cd etcd-v2.2.0-darwin-amd64
$ ./etcd
#Download and run skydns
$ export GOPATH=`pwd`
$ go get github.com/skynetservices/skydns
$ cd $GOPATH/src/github.com/skynetservices/skydns
$ go build -v
If you want SkyDNS to use different etcd servers, set ETCD_MACHINES:
export ETCD_MACHINES='http://192.168.0.1:4001,http://192.168.0.2:4001'
To run skydns
$ pwd
/.../skydns/src/github.com/skynetservices/skydns
$ ./skydns
By default SkyDNS connects to etcd on localhost, port 4001.
To configure SkyDNS, we PUT a key/value pair into etcd:
curl -XPUT http://127.0.0.1:4001/v2/keys/skydns/config \
-d value='{"dns_addr":"127.0.0.1:5354","ttl":3600, "nameservers": ["8.8.8.8:53","8.8.4.4:53"]}'
{"action":"set","node":{"key":"/skydns/config",
"value":"{\"dns_addr\":\"127.0.0.1:5354\",
\"ttl\":3600, \"nameservers\":
[\"8.8.8.8:53\",\"8.8.4.4:53\"]}",
"modifiedIndex":5,"createdIndex":5}}