-
Notifications
You must be signed in to change notification settings - Fork 290
Add New Source
Here is how the new plugin system works. Each data source need to implement one function:
type Sourcer interface {
Generate(*Flow) *Dataset
}
Simple enough?
Here are the an example implementation for Cassandra data source.
func (s *CassandraSource) Generate(f *flow.Flow) *flow.Dataset {
return s.genShardInfos(f).RoundRobin(s.Concurrency).Mapper(MapperReadShard)
}
genShardInfos(f)
will generate an initial dataset with a list of CassandraShardInfo
objects.
The CassandraShardInfo
objects are distributed to a few number of executors. Each executor takes one CassandraShardInfo
object, connect to the source, and read the corresponding Cassandra shard.
It should be obvious that since CassandraShardInfo
objects are sent to remote executors, the objects should be serializable and deserializable.
With this simple Sourcer
interface, an actual data source can be implemented in pure Go in any way you want.
-
The
MapperReadShard
is a pure Go function. -
CassandraShardInfo
object should be serializable and deserializable. -
Data partitioning is determined by the number of
CassandraShardInfo
objects generated fromgenShardInfos(f)
. OneCassandraShardInfo
object corresponds to one data partition.
See cassandra_source.go for the example.