Commit 8eb4f92

Rajiv Roopan committed Jun 16, 2015
2 parents a01f7f3 + a4d74a0

Showing 3 changed files with 149 additions and 99 deletions.
123 changes: 123 additions & 0 deletions core/management/commands/index_taxi_data.py
@@ -0,0 +1,123 @@
__author__ = 'rajiv'
import json
import csv
from elasticsearch import Elasticsearch
from datetime import datetime

from django.conf import settings

# create instance of elasticsearch
es = Elasticsearch(["http://alpha.geogekko.com:9200/"])

INDEX_NAME = 'nyc_taxi_data'
TYPE_NAME = 'taxi'

FILENAME = '/mnt/data/taxi/trip_data_1.csv'

def create_taxi_index():
    print "creating taxi index..."
    es.indices.create(index=INDEX_NAME, body={
        "mappings": {
            "taxi": {
                "properties": {
                    "pickup_datetime": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss"
                    },
                    "dropoff_datetime": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss"
                    },
                    "weekday": {
                        "type": "integer"
                    },
                    "distance": {
                        "type": "float"
                    },
                    "pickup_location": {
                        "type": "geo_point"
                    },
                    "dropoff_location": {
                        "type": "geo_point"
                    },
                    "subtotal_fare": {
                        "type": "float"
                    },
                    "total_fare": {
                        "type": "float"
                    }
                }
            }
        }
    })

def delete_taxi_index():
    print "Deleting taxi index..."
    es.indices.delete(index=INDEX_NAME)


def do_bulk_index(bulk_data):
    # bulk index the accumulated action/document pairs
    print("bulk indexing...")
    es.bulk(index=INDEX_NAME, body=bulk_data, refresh=True)

def valid(data):
    # CSV fields arrive as strings; reject empty fields and bare-zero
    # coordinates (compare against the string '0', not the integer 0).
    if data and data != '0':
        return True
    return False

def index_data():
    delete_taxi_index()
    create_taxi_index()

    # 'rU' opens with universal newlines (Python 2)
    reader = csv.reader(open(FILENAME, 'rU'), delimiter=',')

    # skip the header row
    header = reader.next()

    count = 0
    bulk_data = []

    for row in reader:
        # row[5]/row[6] hold the pickup lon/lat, row[7]/row[8] the dropoff
        if valid(row[5]) and valid(row[6]) and valid(row[7]) and valid(row[8]):
            count = count + 1

            pickup_time = datetime.strptime(row[1], "%Y-%m-%d %H:%M:%S")

            data_dict = {
                "pickup_datetime": row[1],
                "dropoff_datetime": row[2],
                "weekday": pickup_time.weekday(),
                "distance": row[4],
                "pickup_location": {
                    "lat": row[6],
                    "lon": row[5]
                },
                "dropoff_location": {
                    "lat": row[8],
                    "lon": row[7]
                },
                "subtotal_fare": row[11],
                "total_fare": row[16]
            }

            # bulk format: an action line followed by the document itself
            op_dict = {
                "index": {
                    "_index": INDEX_NAME,
                    "_type": TYPE_NAME
                }
            }
            bulk_data.append(op_dict)
            bulk_data.append(data_dict)

            # flush every 200k rows to keep the bulk request bounded
            if count % 200000 == 0:
                do_bulk_index(bulk_data)
                del bulk_data[:]

    # flush the final partial batch (skip if it is empty)
    if bulk_data:
        do_bulk_index(bulk_data)
    print count

if __name__ == '__main__':
    index_data()
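
With the mapping above in place, indexed trips can be filtered by weekday and by distance from a point. The following is a minimal sketch of such a query, assuming the same cluster URL as the script and the filtered-query DSL of the Elasticsearch 1.x series this code appears to target; the Times Square coordinates and 500m radius are illustrative, not from the commit:

from elasticsearch import Elasticsearch

es = Elasticsearch(["http://alpha.geogekko.com:9200/"])

# count Friday pickups within 500m of Times Square
# (datetime.weekday() is 0-based, so Friday == 4)
res = es.search(index='nyc_taxi_data', body={
    "query": {
        "filtered": {
            "filter": {
                "bool": {
                    "must": [
                        {"term": {"weekday": 4}},
                        {"geo_distance": {
                            "distance": "500m",
                            "pickup_location": {"lat": 40.758, "lon": -73.985}
                        }}
                    ]
                }
            }
        }
    }
})
print res['hits']['total']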

50 changes: 26 additions & 24 deletions core/management/commands/stream.py
@@ -5,6 +5,7 @@
 from tweepy import Stream
 from elasticsearch import Elasticsearch
 import csv
+import sys
 
 from datetime import date
 
@@ -20,31 +21,35 @@
 
 # Bounding boxes for geolocations
 # Online-Tool to create boxes (c+p as raw CSV): http://boundingbox.klokantech.com/
 
+# southwest corner first, then northeast corner
 GEOBOX_WORLD = [-180,-90,180,90]
 GEOBOX_NY = [-78.57, 36.54, -68.74, 43.78]
+GEOBOX_CA = [-123.16, 39.52, -115.76, 32.6]
+GEOBOX_SF = [-122.5777, 37.1803, -121.7216, 38.0164]
+
+FILE_DIR = '/mnt/data/twitter/'
 
 FILE_OUT = None
 TODAY_DATE_STR = ''
 CSV_WRITER = None
+CITY = sys.argv[1]
 
-def get_csv_writer():
+def get_file_writer():
     global TODAY_DATE_STR
     global FILE_OUT
     global CSV_WRITER
 
     current_date_str = date.today().strftime("%m-%d-%Y")
 
     if TODAY_DATE_STR != current_date_str:
         TODAY_DATE_STR = current_date_str
-        filename = 'twitter_ny_' + current_date_str + '.csv'
+        filename = FILE_DIR + 'twitter_' + CITY + '_' + current_date_str + '.txt'
 
         if FILE_OUT:
             FILE_OUT.close()
 
         FILE_OUT = open(filename, 'w')
-        CSV_WRITER = csv.writer(FILE_OUT, delimiter=',', dialect=csv.excel_tab)
 
-    return CSV_WRITER
+    return FILE_OUT
 
 class TweetStreamListener(StreamListener):
 
@@ -57,23 +62,9 @@ def on_data(self, data):
 
         # add text and sentiment info to elasticsearch
         if (tweet['text'] is not None and tweet['id'] is not None and tweet['created_at'] is not None and tweet['user']['id'] is not None and tweet['user']['name'] is not None and tweet['user']['followers_count'] is not None and tweet['user']['statuses_count'] is not None and tweet['user']['description'] is not None and tweet['coordinates'] is not None and tweet['coordinates'] is not 'null' ):
-            to_write = []
-            to_write.append(tweet["timestamp_ms"])
-            to_write.append(tweet["created_at"])
-            to_write.append(tweet["user"]["id"])
-            to_write.append(tweet["user"]["screen_name"])
-            to_write.append(tweet["user"]["name"])
-            to_write.append(tweet["source"])
-            to_write.append(tweet["user"]["location"])
-            to_write.append(tweet["user"]["followers_count"])
-            to_write.append(tweet["user"]["friends_count"])
-            to_write.append(tweet["coordinates"]["coordinates"][1])
-            to_write.append(tweet["coordinates"]["coordinates"][0])
-            to_write.append(tweet["text"])
-
-            writer = get_csv_writer()
-            print to_write
-            writer.writerow(to_write)
+            writer = get_file_writer()
+            print data
+            FILE_OUT.write(data)
 
         return True
 

@@ -120,6 +111,15 @@
 # })
 
 if __name__ == '__main__':
+    geo = None
+    if CITY == 'ca':
+        geo = GEOBOX_CA
+    elif CITY == 'sf':
+        geo = GEOBOX_SF
+    elif CITY == 'ny':
+        geo = GEOBOX_NY
+    else:
+        geo = GEOBOX_NY
 
     # create instance of the tweepy tweet stream listener
     listener = TweetStreamListener()
@@ -131,5 +131,7 @@
     # create instance of the tweepy stream
     stream = Stream(auth, listener)
 
+
+
     # filter the stream to tweets geotagged inside the chosen bounding box
-    stream.filter(locations=GEOBOX_NY )
+    stream.filter(locations=geo)
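
The if/elif chain above dispatches the command-line city code to a bounding box, falling back to New York for anything unrecognized. Since the script reads sys.argv[1], it is presumably launched directly, e.g. "python stream.py sf". A dict lookup expresses the same dispatch more tightly; a minimal sketch, reusing the boxes defined in stream.py and guarding against a missing argument (which the committed code does not):

import sys

# [west, south, east, north]: southwest corner first, then northeast
GEOBOX_NY = [-78.57, 36.54, -68.74, 43.78]
GEOBOX_CA = [-123.16, 39.52, -115.76, 32.6]
GEOBOX_SF = [-122.5777, 37.1803, -121.7216, 38.0164]

GEOBOXES = {'ny': GEOBOX_NY, 'ca': GEOBOX_CA, 'sf': GEOBOX_SF}

# default to NY, matching the else branch in stream.py
CITY = sys.argv[1] if len(sys.argv) > 1 else 'ny'
geo = GEOBOXES.get(CITY, GEOBOX_NY)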
75 changes: 0 additions & 75 deletions core/management/commands/stream_ca.py

This file was deleted.
