Skip to content

Commit

Permalink
Merge pull request #1 from yugui/feature/record-field
Browse files Browse the repository at this point in the history
Support record field
  • Loading branch information
tagomoris committed Jan 20, 2014
2 parents 73edf36 + 281a4e9 commit a5fc480
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 28 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ With this configuration, flushing will be done in 0.25 seconds after record inpu

* support Load API
* with automatically configured flush/buffer options
* support RECORD field
* and support optional data fields
* support optional data fields
* support NULLABLE/REQUIRED/REPEATED field options
* OAuth installed application credentials support
* Google API discovery expiration
Expand Down
136 changes: 110 additions & 26 deletions lib/fluent/plugin/out_bigquery.rb
Original file line number Diff line number Diff line change
Expand Up @@ -126,25 +126,25 @@ def configure(conf)

@tablelist = @tables ? @tables.split(',') : [@table]

@fields = {}
@fields = RecordSchema.new
if @field_string
@field_string.split(',').each do |fieldname|
@fields[fieldname] = :string
@fields.register_field fieldname, :string
end
end
if @field_integer
@field_integer.split(',').each do |fieldname|
@fields[fieldname] = :integer
@fields.register_field fieldname, :integer
end
end
if @field_float
@field_float.split(',').each do |fieldname|
@fields[fieldname] = :float
@fields.register_field fieldname, :float
end
end
if @field_boolean
@field_boolean.split(',').each do |fieldname|
@fields[fieldname] = :boolean
@fields.register_field fieldname, :boolean
end
end

Expand Down Expand Up @@ -204,7 +204,7 @@ def insert(table_id, rows)
"rows" => rows
}
)
if res.status != 200
unless res.success?
# api_error? -> client cache clear
@cached_client = nil

Expand All @@ -227,32 +227,14 @@ def load
raise NotImplementedError # TODO
end

# Builds the row hash for one record, keeping only the configured fields.
#
# Each entry of @fields maps a field name to a type tag (:string, :integer,
# :float or :boolean); the matching value from +record+ is coerced
# accordingly. Fields whose value is nil (missing or explicitly null) are
# left out of the result.
#
# Raises RuntimeError when a field is configured with an unknown type tag.
def format_record(record)
  @fields.each_with_object({}) do |(name, kind), row|
    raw = record[name]
    next if raw.nil? # field does not exist, or null value

    row[name] =
      case kind
      when :string  then raw.to_s
      when :integer then raw.to_i
      when :float   then raw.to_f
      when :boolean then (raw ? true : false)
      # when :record
      else
        raise "BUG: unknown field type #{kind}"
      end
  end
end

def format_stream(tag, es)
super
buf = ''
es.each do |time, record|
row = if @time_field
format_record(record.merge({@time_field => @timef.format(time)}))
@fields.format(record.merge({@time_field => @timef.format(time)}))
else
format_record(record)
@fields.format(record)
end
buf << {"json" => row}.to_msgpack unless row.empty?
end
Expand Down Expand Up @@ -292,5 +274,107 @@ def write(chunk)
# client.authorization = flow.authorize # browser authentication !
# client
# end

# Abstract base for field schemas.
#
# A concrete schema exposes two methods: #type, returning its type tag as a
# Symbol, and #format, coercing a raw record value into the representation
# for that type. Both are abstract here and raise NotImplementedError until
# a subclass overrides them.
class FieldSchema
  # Returns the type tag (a Symbol) of this schema.
  #
  # Raises NotImplementedError unless overridden by a subclass.
  def type
    raise NotImplementedError, "Must implement in a subclass"
  end

  # Coerces +value+ into this schema's representation.
  #
  # Raises NotImplementedError unless overridden by a subclass.
  def format(value)
    raise NotImplementedError, "Must implement in a subclass"
  end
end

# Field schema tagged :string; values are converted with #to_s.
class StringFieldSchema < FieldSchema
  # Returns the string form of +value+.
  def format(value)
    value.to_s
  end

  # Type tag of this schema.
  def type
    :string
  end
end

# Field schema tagged :integer; values are converted with #to_i.
class IntegerFieldSchema < FieldSchema
  # Returns +value+ converted by its own #to_i, so the result follows that
  # method's rules (for a String, non-numeric text yields 0 rather than an
  # error).
  def format(value)
    value.to_i
  end

  # Type tag of this schema.
  def type
    :integer
  end
end

# Field schema tagged :float; values are converted with #to_f.
class FloatFieldSchema < FieldSchema
  # Returns +value+ converted by its own #to_f, so the result follows that
  # method's rules (for a String, non-numeric text yields 0.0 rather than an
  # error).
  def format(value)
    value.to_f
  end

  # Type tag of this schema.
  def type
    :float
  end
end

# Field schema tagged :boolean; values are reduced to true or false by
# Ruby truthiness.
class BooleanFieldSchema < FieldSchema
  # Returns false for nil/false and true for everything else. Note that
  # this includes values such as 0 and the string "false", which are truthy.
  def format(value)
    value ? true : false
  end

  # Type tag of this schema.
  def type
    :boolean
  end
end

# Schema for a record: a named collection of field schemas.
#
# Used both as the root schema of a row and, recursively, for nested record
# fields. A nested field is registered with a dotted name ("parent.child"),
# which creates the intermediate RecordSchema on demand.
class RecordSchema < FieldSchema
  # Maps a scalar type tag to the schema class instantiated for it.
  FIELD_TYPES = {
    :string => StringFieldSchema,
    :integer => IntegerFieldSchema,
    :float => FloatFieldSchema,
    :boolean => BooleanFieldSchema
  }.freeze

  def initialize
    @fields = {}
  end

  # Type tag of this schema.
  def type
    :record
  end

  # Returns the schema registered under +name+ at this level, or nil.
  def [](name)
    @fields[name]
  end

  # Registers a field called +name+ with scalar type tag +type+.
  #
  # When +name+ contains a dot, the part before the first dot names a nested
  # record (created if absent) and the remainder is registered inside it, so
  # "a.b.c" recurses through records "a" and "b".
  #
  # Raises ConfigError when the name is already registered at this level,
  # when the parent of a dotted name is already registered as a non-record
  # field, or when +type+ is not a key of FIELD_TYPES.
  def register_field(name, type)
    raise ConfigError, "field #{name} is registered twice" if @fields.key?(name)

    # Split on the first dot only; the tail may itself be a dotted path.
    dot = /\./.match(name)
    if dot
      recordname = dot.pre_match
      fieldname = dot.post_match
      register_record_field(recordname)
      @fields[recordname].register_field(fieldname, type)
    else
      schema = FIELD_TYPES[type]
      raise ConfigError, "[Bug] Invalid field type #{type}" unless schema
      @fields[name] = schema.new
    end
  end

  # Builds the output hash for +record+, containing only registered fields.
  #
  # Each present value is coerced by its field's schema; nested records are
  # formatted recursively. Fields whose value is nil are omitted.
  def format(record)
    out = {}
    @fields.each do |key, schema|
      value = record[key]
      next if value.nil? # field does not exist, or null value
      out[key] = schema.format(value)
    end
    out
  end

  private

  # Ensures +name+ is registered as a nested record, creating it if absent.
  #
  # Raises ConfigError when +name+ is already registered as a non-record
  # field.
  def register_record_field(name)
    unless @fields.key?(name)
      @fields[name] = RecordSchema.new
      return
    end

    existing = @fields[name]
    return if existing.kind_of?(RecordSchema)

    raise ConfigError, "field #{name} is required to be a record but already registered as #{existing.type}"
  end
end
end
end

0 comments on commit a5fc480

Please sign in to comment.