This repository was archived by the owner on Feb 2, 2021. It is now read-only.

Kiril/initial #1

Merged
merged 4 commits into from Nov 28, 2018
3 changes: 3 additions & 0 deletions Gemfile
@@ -0,0 +1,3 @@
source "https://rubygems.org"

gemspec
126 changes: 126 additions & 0 deletions README.md
@@ -1,3 +1,129 @@
# fluent-plugin-azure-storage-append-blob

[Fluentd](https://fluentd.org/) output plugin for Azure Storage Append Blobs.

The Azure Storage Append Blob output plugin buffers logs in local files and periodically uploads them to Azure Storage Append Blobs.

## Installation

### RubyGems

```
$ gem install fluent-plugin-azure-storage-append-blob
```

### Bundler

Add the following line to your Gemfile:

```ruby
gem "fluent-plugin-azure-storage-append-blob"
```

And then execute:

```
$ bundle
```

## Configuration

```
<match pattern>
  type azure-storage-append-blob

  azure_storage_account    <your azure storage account>
  azure_storage_access_key <your azure storage access key>
  azure_container          <your azure storage container>
  auto_create_container    true
  path                     logs/
  azure_object_key_format  %{path}%{time_slice}_%{index}.log
  time_slice_format        %Y%m%d-%H
  # If you want to use %{tag} or strftime syntax like %Y/%m/%d in path or
  # azure_object_key_format, you need to add tag and/or time to the
  # <buffer> chunk keys.
  <buffer tag,time>
    @type file
    path /var/log/fluent/azurestorageappendblob
    timekey 120 # 2 minutes
    timekey_wait 60
    timekey_use_utc true # use utc
  </buffer>
</match>
```

### azure_storage_account (Required)

Your Azure Storage account name. It can be obtained from the Azure portal.
This parameter is required unless the environment variable 'AZURE_STORAGE_ACCOUNT' is set.

### azure_storage_access_key (Required)

Your Azure Storage access key (primary or secondary). It can also be obtained from the Azure portal.
This parameter is required unless the environment variable 'AZURE_STORAGE_ACCESS_KEY' is set.

### azure_container (Required)

The name of the Azure Storage container.

### auto_create_container

The plugin creates the container if it does not exist when 'auto_create_container' is set to true.
Default: true

### azure_object_key_format

The format of Azure Storage object keys. You can use several built-in variables:

- %{path}
- %{time_slice}
- %{index}

to decide keys dynamically.

%{path} is exactly the value of *path* configured in the configuration file, e.g. "logs/" in the example configuration above.
%{time_slice} is the time slice formatted according to *time_slice_format*.
%{index} is used only when a blob exceeds Azure's limit of 50,000 blocks per blob; the plugin then rolls over to a new blob to prevent data loss. Using this placeholder is optional.

The default format is "%{path}%{time_slice}-%{index}.log".

For instance, using the example configuration above, actual object keys on Azure Storage will be something like:

```
"logs/20130111-22-0.log"
"logs/20130111-23-0.log"
"logs/20130112-00-0.log"
```
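As an illustration, the placeholder substitution can be sketched in plain Ruby. Note that `expand_key_format` is a hypothetical helper written for this example, not part of the plugin's API; the plugin performs an equivalent `gsub`-based substitution internally:

```ruby
# Sketch of azure_object_key_format expansion: each %{...} placeholder is
# replaced by its value; unknown placeholders are left untouched here.
def expand_key_format(format, path, time_slice, index)
  values = {
    "%{path}"       => path,
    "%{time_slice}" => time_slice,
    "%{index}"      => index.to_s
  }
  format.gsub(/%\{[^}]+\}/) { |m| values.fetch(m, m) }
end

puts expand_key_format("%{path}%{time_slice}-%{index}.log", "logs/", "20130111-22", 0)
# => logs/20130111-22-0.log
```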

With the configuration:

```
azure_object_key_format %{path}/events/ts=%{time_slice}/events.log
path log
time_slice_format %Y%m%d-%H
```

You get:

```
"log/events/ts=20130111-22/events.log"
"log/events/ts=20130111-23/events.log"
"log/events/ts=20130112-00/events.log"
```

The [fluent-mixin-config-placeholders](https://github.com/tagomoris/fluent-mixin-config-placeholders) mixin is also incorporated, so additional variables such as %{hostname} can be used in azure_object_key_format. This can help prevent filename conflicts when writing from multiple servers.

```
azure_object_key_format %{path}/events/ts=%{time_slice}/events-%{hostname}.log
```

### time_slice_format

Format of the time used as the file name. Default is '%Y%m%d'. Use '%Y%m%d%H' to split files hourly.
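To illustrate (using a fixed example timestamp; the plugin formats the buffer chunk's timekey in UTC with the same `strftime` patterns):

```ruby
# How time_slice_format renders a timestamp into the file-name time slice.
t = Time.utc(2013, 1, 11, 22, 30)
puts t.strftime('%Y%m%d')    # => 20130111   (default, daily files)
puts t.strftime('%Y%m%d%H')  # => 2013011122 (hourly files)
```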

### Run tests

```
$ gem install bundler
$ bundle install
$ bundle exec rake test
```

## Contributing

13 changes: 13 additions & 0 deletions Rakefile
@@ -0,0 +1,13 @@
require "bundler"
Bundler::GemHelper.install_tasks

require "rake/testtask"

Rake::TestTask.new(:test) do |t|
  t.libs.push("lib", "test")
  t.test_files = FileList["test/**/test_*.rb"]
  t.verbose = true
  t.warning = true
end

task default: [:test]
28 changes: 28 additions & 0 deletions fluent-plugin-azure-storage-append-blob.gemspec
@@ -0,0 +1,28 @@
lib = File.expand_path("../lib", __FILE__)
$LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)

Gem::Specification.new do |spec|
  spec.name    = "fluent-plugin-azure-storage-append-blob"
  spec.version = "0.1.0"
  spec.authors = ["Microsoft"]
  spec.email   = [""]

  spec.summary     = "Azure Storage Append Blob output plugin for Fluentd event collector"
  spec.description = spec.summary
  # spec.homepage = "TODO: Put your gem's website or public repo URL here."
  spec.license = "MIT"

  test_files, files = `git ls-files -z`.split("\x0").partition do |f|
    f.match(%r{^(test|spec|features)/})
  end
  spec.files         = files
  spec.executables   = files.grep(%r{^bin/}) { |f| File.basename(f) }
  spec.test_files    = test_files
  spec.require_paths = ["lib"]

  spec.add_development_dependency "bundler", "~> 1.14"
  spec.add_development_dependency "rake", "~> 12.0"
  spec.add_development_dependency "test-unit", "~> 3.0"
  spec.add_runtime_dependency "fluentd", [">= 0.14.10", "< 2"]
  spec.add_runtime_dependency "azure-storage-blob", "~> 1.0.0"
end
175 changes: 175 additions & 0 deletions lib/fluent/plugin/out_azure-storage-append-blob.rb
@@ -0,0 +1,175 @@
#---------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
#---------------------------------------------------------------------------------------------

require 'fluent/plugin/output'
require 'azure/storage/blob'
require 'time'
require 'tempfile'

module Fluent
  module Plugin
    class AzureStorageAppendBlobOut < Fluent::Plugin::Output
      Fluent::Plugin.register_output("azure-storage-append-blob", self)

      helpers :formatter, :inject

      DEFAULT_FORMAT_TYPE = "out_file"
      AZURE_BLOCK_SIZE_LIMIT = 4 * 1024 * 1024 - 1

      config_param :path, :string, :default => ""
      config_param :azure_storage_account, :string, :default => nil
      config_param :azure_storage_access_key, :string, :default => nil, :secret => true
      config_param :azure_container, :string, :default => nil
      config_param :azure_object_key_format, :string, :default => "%{path}%{time_slice}-%{index}.log"
      config_param :auto_create_container, :bool, :default => true
      config_param :format, :string, :default => DEFAULT_FORMAT_TYPE
      config_param :time_slice_format, :string, :default => '%Y%m%d'

      config_section :format do
        config_set_default :@type, DEFAULT_FORMAT_TYPE
      end

      config_section :buffer do
        config_set_default :chunk_keys, ['time']
        config_set_default :timekey, (60 * 60 * 24)
      end

      attr_reader :bs

      def configure(conf)
        super

        @formatter = formatter_create

        if @localtime
          @path_slicer = Proc.new { |path|
            Time.now.strftime(path)
          }
        else
          @path_slicer = Proc.new { |path|
            Time.now.utc.strftime(path)
          }
        end

        if @azure_container.nil?
          raise ConfigError, 'azure_container is needed'
        end
      end

      def multi_workers_ready?
        true
      end

      def start
        super

        @bs = Azure::Storage::Blob::BlobService.create(storage_account_name: @azure_storage_account, storage_access_key: @azure_storage_access_key)

        ensure_container

        @azure_storage_path = ''
        @last_azure_storage_path = ''
        @current_index = 0
      end

      def format(tag, time, record)
        r = inject_values_to_record(tag, time, record)
        @formatter.format(tag, time, r)
      end

      def write(chunk)
        metadata = chunk.metadata
        tmp = Tempfile.new("azure-")
        begin
          chunk.write_to(tmp)
          tmp.close

          generate_log_name(metadata, @current_index)
          if @last_azure_storage_path != @azure_storage_path
            @current_index = 0
            generate_log_name(metadata, @current_index)
          end

          content = File.open(tmp.path, 'rb') { |file| file.read }

          append_blob(content, metadata)
          @last_azure_storage_path = @azure_storage_path
        ensure
          tmp.unlink
        end
      end

      private

      def ensure_container
        unless @bs.list_containers.find { |c| c.name == @azure_container }
          if @auto_create_container
            @bs.create_container(@azure_container)
          else
            raise "The specified container does not exist: container = #{@azure_container}"
          end
        end
      end

      def generate_log_name(metadata, index)
        time_slice = if metadata.timekey.nil?
                       ''.freeze
                     else
                       Time.at(metadata.timekey).utc.strftime(@time_slice_format)
                     end

        path = @path_slicer.call(@path)
        values_for_object_key = {
          "%{path}" => path,
          "%{time_slice}" => time_slice,
          "%{index}" => index
        }
        storage_path = @azure_object_key_format.gsub(%r(%{[^}]+}), values_for_object_key)
        @azure_storage_path = extract_placeholders(storage_path, metadata)
      end

      def append_blob(content, metadata)
        position = 0
        log.debug "azure_storage_append_blob: append_blob.start: Content size: #{content.length}"
        loop do
          begin
            size = [content.length - position, AZURE_BLOCK_SIZE_LIMIT].min
            log.debug "azure_storage_append_blob: append_blob.chunk: content[#{position}, #{size}]"
            # content[position, size] takes exactly `size` bytes starting at
            # `position`; an inclusive range would re-upload one byte per block.
            @bs.append_blob_block(@azure_container, @azure_storage_path, content[position, size])
            position += size
            break if position >= content.length
          rescue Azure::Core::Http::HTTPError => ex
            status_code = ex.status_code

            if status_code == 409 # exceeds azure block limit
              @current_index += 1
              old_azure_storage_path = @azure_storage_path
              generate_log_name(metadata, @current_index)

              # If index is not a part of the format, rethrow the exception.
              if old_azure_storage_path == @azure_storage_path
                log.warn "azure_storage_append_blob: append_blob: blocks limit reached, you need to use %{index} in the format."
                raise
              end

              log.debug "azure_storage_append_blob: append_blob: blocks limit reached, creating new blob #{@azure_storage_path}."
              @bs.create_append_blob(@azure_container, @azure_storage_path)
            elsif status_code == 404 # blob not found
              log.debug "azure_storage_append_blob: append_blob: #{@azure_storage_path} blob doesn't exist, creating new blob."
              @bs.create_append_blob(@azure_container, @azure_storage_path)
            else
              raise
            end
          end
        end
        log.debug "azure_storage_append_blob: append_blob.complete"
      end
    end
  end
end
8 changes: 8 additions & 0 deletions test/helper.rb
@@ -0,0 +1,8 @@
$LOAD_PATH.unshift(File.expand_path("../../", __FILE__))
require "test-unit"
require "fluent/test"
require "fluent/test/driver/output"
require "fluent/test/helpers"

Test::Unit::TestCase.include(Fluent::Test::Helpers)
Test::Unit::TestCase.extend(Fluent::Test::Helpers)