Enabled Enumerable for deferred row processing
README now markdown (yay!)

Smoothed out test app
afair committed Jun 9, 2014
1 parent 0bf17ef commit db91012
Showing 6 changed files with 169 additions and 79 deletions.
160 changes: 103 additions & 57 deletions README.rdoc → README.md
@@ -1,45 +1,14 @@
= PostgreSQLCursor for handling large Result Sets
# PostgreSQLCursor for handling large Result Sets

{<img src="https://badge.fury.io/rb/postgresql_cursor.svg" alt="Gem Version" />}[http://badge.fury.io/rb/postgresql_cursor]

PostgreSQLCursor extends ActiveRecord to allow for efficient processing of queries
returning a large number of rows.
returning a large number of rows, and allows you to sort your result set.

== Why use this?
Version 0.5.0 has been refactored to install more smoothly into ActiveRecord.
It supports Rails and ActiveRecord 3.2.x and up.

ActiveRecord is designed and optimized for web performance. In a web transaction, only a "page" of
around 20 rows is returned to the user. When you do this

Model.find(:all, :conditions=>["id>0"])

The database returns all matching result set rows to ActiveRecord, which instantiates each row with
the data returned. This function returns an array of all these rows to the caller.

Asynchronous, Background, or Offline processing may require processing a large amount of data.
When there is a very large number of rows, this requires a lot more memory to hold the data. Ruby
does not return that memory after processing the array, which causes your process to "bloat". If you
don't have enough memory, your process will raise an exception.

== Enter find_each

To solve this problem, ActiveRecord gives us two alternative methods that work in "chunks" of your data:

Model.where("id>0").find_each { |model| model.process! }

Model.where("id>0").find_in_batches do |batch|
batch.each { |model| model.process! }
end

Optionally, you can specify a :batch_size option as the size of the "chunk"; it defaults to 1000.

There are drawbacks with these methods:

* You cannot specify the order; results are ordered by the primary key (usually id)
* The primary key must be numeric
* The query is rerun for each chunk (1000 rows), starting after the last id of the previous chunk.
* Overly complex queries are rerun for each chunk and incur extra overhead.

== PostgreSQLCursor FTW!
## Use Cursors

PostgreSQLCursor was developed to take advantage of PostgreSQL's cursors. Cursors allow the program
to declare a cursor to run a given query returning "chunks" of rows to the application program while
@@ -53,34 +22,51 @@ performance.

With PostgreSQL, you can work with cursors as follows:

Model.where("id>0").each_row { |hash| Model.process(hash) }
```ruby
Product.where("id>0").order("name").each_row { |hash| Product.process(hash) }

Product.where("id>0").each_instance { |product| product.process! }
Product.where("id>0").each_instance(block_size:100_000) { |product| product.process }

Product.each_row { |hash| Product.process(hash) }
Product.each_instance { |product| product.process }

Model.where("id>0").each_instance { |model| model.process! }
Model.where("id>0").each_instance(block_size:100000) { |model| model.process! }
Product.each_row_by_sql("select * from products") { |hash| Product.process(hash) }
Product.each_instance_by_sql("select * from products") { |product| product.process }
```

Model.each_row_by_sql("select * from models") { |hash| Model.process(hash) }
### PostgreSQLCursor is an Enumerable

Model.each_instance_by_sql("select * from models") { |model| model.process }
If you do not pass in a block, the cursor is returned, and it mixes in the Enumerable
library. With that, you can pass it around, or chain in Enumerable methods
like `map` and `reduce`. The cursor already acts lazily, but you can
also chain in `lazy` when you want to keep the memory footprint small for the rest of the chain.

```ruby
Product.each_row.map {|r| r["id"].to_i } #=> [1, 2, 3, ...]
Product.each_instance.map { |r| r.id } #=> [1, 2, 3, ...]
Product.each_instance.lazy.inject(0) {|sum,r| sum + r.quantity } #=> 499500
```
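For intuition, here is a plain-Ruby sketch of the idea behind the Enumerable support. This is a simplified, hypothetical model, not the gem's implementation: block-wise fetching is wrapped in an `Enumerator` so rows stream one at a time.

```ruby
# Simplified model (not the gem's code): rows are "fetched" from the
# source in blocks -- standing in for "FETCH <block_size>" on a real
# PostgreSQL cursor -- but yielded one at a time, so callers can chain
# map/reduce/lazy without materializing the whole result set.
def each_row_sketch(source, block_size: 3)
  Enumerator.new do |yielder|
    source.each_slice(block_size) do |block|  # one "FETCH block_size" round trip
      block.each { |row| yielder << row }
    end
  end
end

rows = (1..10).map { |i| { "id" => i.to_s } }   # cursor rows arrive as String hashes
each_row_sketch(rows).map { |r| r["id"].to_i }  # => [1, 2, 3, ..., 10]
each_row_sketch(rows).lazy.first(2)             # only the first block is consumed
```

Because the enumerator never holds more than one block of rows at a time, chaining `lazy` keeps memory flat even for very large sources.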

All these methods take an options hash to control things more:

block_size:n The number of rows to fetch from the database each time (default 1000)
while:value Continue looping as long as the block returns this value
until:value Continue looping until the block returns this value
connection:conn Use this connection instead of the current model connection
fraction:float A value to set for the cursor_tuple_fraction variable.
PostgreSQL uses 0.1 (optimize for 10% of result set)
This library uses 1.0 (Optimize for 100% of the result set)
Do not override this value unless you understand it.
block_size:n The number of rows to fetch from the database each time (default 1000)
while:value Continue looping as long as the block returns this value
until:value Continue looping until the block returns this value
connection:conn Use this connection instead of the current Product connection
fraction:float A value to set for the cursor_tuple_fraction variable.
PostgreSQL uses 0.1 (optimize for 10% of result set)
This library uses 1.0 (Optimize for 100% of the result set)
Do not override this value unless you understand it.
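The `while:` and `until:` options act as loop guards on the block's return value. A plain-Ruby sketch of the semantics, using a hypothetical helper name rather than the gem's internals:

```ruby
# Simplified model of the while:/until: loop guards (not the gem's code).
# Iteration stops as soon as the block's return value trips the guard,
# and the number of rows yielded so far is returned.
def each_with_guards(rows, options = {}, &block)
  count = 0
  rows.each do |row|
    count += 1
    rc = block.call(row)
    break if options.key?(:until) && rc == options[:until]  # stop on this value
    break if options.key?(:while) && rc != options[:while]  # stop on any other value
  end
  count
end

each_with_guards([1, 2, 3, 4, 5], until: :stop) { |id| id >= 3 ? :stop : :ok }  # => 3
each_with_guards([1, 2, 3, 4, 5], while: :ok)  { |id| id < 2  ? :ok : :done }   # => 2
```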

Notes:

* Use cursors only for large result sets. It has more overhead with the database
* Use cursors *only* for large result sets. They have more overhead with the database
than ActiveRecord selecting all matching records.
* Aliases each_hash and each_hash_by_sql are provided ifor each_row and each_row_by_sql
* Aliases each_hash and each_hash_by_sql are provided for each_row and each_row_by_sql
if you prefer to express what types are being returned.

== Hashes vs. Instances
### Hashes vs. Instances

The each_row method returns a Hash of String values for speed (this allows you to process a lot of rows).
Since every value arrives as a String, you must take care of any type conversion.
@@ -92,16 +78,76 @@ If you find you need the types cast for your attributes, consider using each_instance
instead. ActiveRecord's read casting algorithm will only cast the values you need and
has become more efficient over time.
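Since each_row hands back String values, casting is the caller's responsibility. An illustrative example with hypothetical row data, not tied to a database:

```ruby
require 'time'

# A cursor row arrives with every value as a String (hypothetical data):
row = { "id" => "42", "price" => "19.99", "created_at" => "2014-06-09 12:00:00" }

# With each_row, the caller casts each value as needed:
id    = row["id"].to_i                 # => 42
price = row["price"].to_f              # => 19.99
ts    = Time.parse(row["created_at"])  # cast timestamp string to a Time
```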

==Authors
Allen Fair, @allenfair, http://github.com/afair
### Select and Pluck

To limit the columns returned to just those you need, use the `.select(:id, :name)`
query method.

```ruby
Product.select(:id, :name).each_row { |hash| Product.process(hash) }
```

Pluck is a great alternative to using a cursor. It does not instantiate
the rows; it builds an array of result values translated into Ruby
types (numbers, Timestamps, etc.). Using a cursor still lets you lazily
load the values in batches.

```ruby
Product.newly_arrived.pluck(:id) #=> [1, 2, 3, 4, etc.]
Product.newly_arrived.each_row { |hash| }
Product.select(:id).each_row.map {|r| r["id"].to_i } # cursor instead of pluck
```

## Background: Why PostgreSQL Cursors?

ActiveRecord is designed and optimized for web performance. In a web transaction, only a "page" of
around 20 rows is returned to the user. When you do this:

```ruby
Product.where("id>0").to_a.each { |product| product.process }
```

The database returns all matching result set rows to ActiveRecord, which instantiates each row with
the data returned. This function returns an array of all these rows to the caller.

Asynchronous, Background, or Offline processing may require processing a large amount of data.
When there is a very large number of rows, this requires a lot more memory to hold the data. Ruby
does not return that memory after processing the array, which causes your process to "bloat". If you
don't have enough memory, your process will raise an exception.

### ActiveRecord.find_each and find_in_batches

To solve this problem, ActiveRecord gives us two alternative methods that work in "chunks" of your data:

```ruby
Product.where("id>0").find_each { |product| product.process }

Product.where("id>0").find_in_batches do |batch|
  batch.each { |product| product.process }
end
```

Optionally, you can specify a :batch_size option as the size of the "chunk"; it defaults to 1000.

There are drawbacks with these methods:

* You cannot specify the order; results are ordered by the primary key (usually id)
* The primary key must be numeric
* The query is rerun for each chunk (1000 rows), starting after the last id of the previous chunk.
* Overly complex queries are rerun for each chunk and incur extra overhead.
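For contrast, here is a rough sketch of the repeated-query pattern behind find_in_batches. The table name and the contiguous-id arithmetic are illustrative simplifications, not ActiveRecord's actual internals, but they show why each chunk costs another round trip:

```ruby
# Illustrative only: the shape of the repeated queries find_in_batches
# issues. Each chunk re-queries the table, keyed on the last primary key
# seen (here simplified to assume contiguous ids).
def batch_queries(max_id, batch_size)
  queries = []
  last_id = 0
  while last_id < max_id
    queries << "SELECT * FROM products WHERE id > #{last_id} " \
               "ORDER BY id LIMIT #{batch_size}"
    last_id += batch_size
  end
  queries
end

batch_queries(3000, 1000).size  # => 3 separate queries to cover 3000 rows
```

A cursor, by comparison, plans the query once and fetches successive blocks from the same open result set.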


## Meta
### Author
Allen Fair, [@allenfair](https://twitter.com/allenfair), http://github.com/afair

Thanks to:

* Iulian Dogariu, http://github.com/iulianu (Fixes)
* Julian Mehnle, julian@mehnle.net (Suggestions)
* ...And all the other contributors!

== Note on Patches/Pull Requests
### Note on Patches/Pull Requests

* Fork the project.
* Make your feature addition or bug fix.
@@ -111,6 +157,6 @@ Thanks to:
(if you want to have your own version, that is fine but bump version in a commit by itself I can ignore when I pull)
* Send me a pull request. Bonus points for topic branches.

== Copyright
### Copyright

Copyright (c) 2010-2014 Allen Fair. See LICENSE for details.
Copyright (c) 2010-2014 Allen Fair. See the MIT LICENSE for details.
20 changes: 16 additions & 4 deletions lib/postgresql_cursor/active_record/relation/cursor_iterators.rb
@@ -15,11 +15,16 @@ module CursorIterators
#
# Example:
# Post.where(user_id:123).each_row { |hash| Post.process(hash) }
# Post.each_row.map {|r| r["id"].to_i }
#
# Returns the number of rows yielded to the block
def each_row(options={}, &block)
options = {:connection => self.connection}.merge(options)
PostgreSQLCursor::Cursor.new(to_sql, options).each(&block)
if block_given?
PostgreSQLCursor::Cursor.new(to_sql, options).each(&block)
else
PostgreSQLCursor::Cursor.new(to_sql, options)
end
end
alias :each_hash :each_row

@@ -29,16 +34,23 @@ def each_row(options={}, &block)
#
# Example:
# Post.where(user_id:123).each_instance { |post| post.process }
# Post.where(user_id:123).each_instance.map { |post| post.process }
#
# Returns the number of rows yielded to the block
def each_instance(options={}, &block)
options = {:connection => self.connection}.merge(options)
options[:symbolize_keys] = false # Must be strings to instantiate
PostgreSQLCursor::Cursor.new(to_sql, options).each do |row, column_types|
model = ::ActiveRecord::VERSION::MAJOR < 4 ? instantiate(row) : instantiate(row, column_types)
yield model

if block_given?
PostgreSQLCursor::Cursor.new(to_sql, options).each do |row, column_types|
model = ::ActiveRecord::VERSION::MAJOR < 4 ? instantiate(row) : instantiate(row, column_types)
yield model
end
else
PostgreSQLCursor::Cursor.new(to_sql, options).instance_iterator(self)
end
end

end
end
end
23 changes: 19 additions & 4 deletions lib/postgresql_cursor/active_record/sql_cursor.rb
@@ -44,11 +44,16 @@ def each_instance(options={}, &block)
#
# Example:
# Post.each_row_by_sql("select * from posts") { |hash| Post.process(hash) }
# Post.each_row_by_sql("select * from posts").count
#
# Returns the number of rows yielded to the block
def each_row_by_sql(sql, options={}, &block)
options = {:connection => self.connection}.merge(options)
PostgreSQLCursor::Cursor.new(sql, options).each(&block)
if block_given?
PostgreSQLCursor::Cursor.new(sql, options).each(&block)
else
PostgreSQLCursor::Cursor.new(sql, options)
end
end
alias :each_hash_by_sql :each_row_by_sql

@@ -59,14 +64,24 @@ def each_row_by_sql(sql, options={}, &block)
#
# Example:
# Post.each_instance_by_sql("select * from posts") { |post| post.process }
# Post.each_instance_by_sql("select * from posts").count
#
# Returns the number of rows yielded to the block
def each_instance_by_sql(sql, options={}, &block)
options = {:connection => self.connection}.merge(options)
PostgreSQLCursor::Cursor.new(sql, options).each do |row|
model = instantiate(row)
yield model
if block_given?
PostgreSQLCursor::Cursor.new(sql, options).each do |row, column_types|
model = ::ActiveRecord::VERSION::MAJOR < 4 ? instantiate(row) : instantiate(row, column_types)
yield model
end
else
PostgreSQLCursor::Cursor.new(sql, options).instance_iterator(self)
end

end
end
end
22 changes: 19 additions & 3 deletions lib/postgresql_cursor/cursor.rb
@@ -41,6 +41,13 @@ def initialize(sql, options={})
@options = options
@connection = @options.fetch(:connection) { ::ActiveRecord::Base.connection }
@count = 0
@iterate = :rows
end

def instance_iterator(type)
@iterate = :instances
@type = type
self
end

# Public: Yields each row of the result set to the passed block
@@ -61,9 +68,18 @@ def each(&block)
while (row = fetch) do
break if row.size==0
@count += 1
row = cast_types(row, column_types) if options[:symbolize_keys]
row = row.symbolize_keys if options[:cast]
rc = yield(row, column_types)
if @iterate == :instances
model = if ::ActiveRecord::VERSION::MAJOR < 4
@type.send(:instantiate,row)
else
@type.send(:instantiate,row, column_types)
end
rc = yield(model)
else
row = cast_types(row, column_types) if options[:symbolize_keys]
row = row.symbolize_keys if options[:cast]
rc = yield(row, column_types)
end
break if has_do_until && rc == @options[:until]
break if has_do_while && rc != @options[:while]
end
21 changes: 12 additions & 9 deletions test-app/app.rb
@@ -1,15 +1,18 @@
#!/usr/bin/env ruby
################################################################################
# To run this "app", do a "rake setup" first
# To work with this app, load it from the root's "rake console" task,
# then do: require_relative 'test-app/app'
################################################################################
require 'rubygems'
require 'bundler/setup'
require 'pg'
require 'active_record'
require 'postgresql_cursor'

ActiveRecord::Base.establish_connection(
adapter: 'postgresql',
database:ENV['USER'],
user: ENV['USER'],
)
ActiveRecord::Base.establish_connection(
  adapter:  'postgresql',
  database: ENV['TEST_DATABASE'] || 'postgresql_cursor_test',
  username: ENV['TEST_USER'] || ENV['USER'] || 'postgresql_cursor')

# create table products (id serial primary key);

@@ -21,7 +24,7 @@ def self.generate(max=1_000)
end
end

Product.destroy_all
Product.generate
Product.where("id>0").each_row(block_size:100) { |r| p r["id"] } # Hash
Product.where("id>0").each_instance(block_size:100) { |r| p r.id } # Instance
#Product.destroy_all
#Product.generate
#Product.where("id>0").each_row(block_size:100) { |r| p r["id"] } # Hash
#Product.where("id>0").each_instance(block_size:100) { |r| p r.id } # Instance
2 changes: 0 additions & 2 deletions test/helper.rb
@@ -20,5 +20,3 @@ def self.generate(max=1_000)

Product.destroy_all
Product.generate(1000)
#class MiniTest::Unit::TestCase
#end
