Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store metadata in an SQLite database (gh-68) #443

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Regenerate db working
  • Loading branch information
jmathai committed May 7, 2023
commit b3377b1ab0b597433ebf017f932ea9bd903bc310
2 changes: 1 addition & 1 deletion elodie/plugins/sqlite/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ CREATE TABLE "metadata" (
"longitude" REAL,
"original_name" TEXT,
"title" TEXT,
"_modified" INTEGER,
"_modified" TEXT,
PRIMARY KEY("id" AUTOINCREMENT)
);
59 changes: 36 additions & 23 deletions elodie/plugins/sqlite/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
#from google.oauth2.credentials import Credentials

from elodie.geolocation import place_name
from elodie.media.photo import Photo
from elodie.media.video import Video
from elodie.localstorage import Db
from elodie.media.base import Base, get_all_subclasses
from elodie.plugins.plugins import PluginBase

class SQLite(PluginBase):
Expand Down Expand Up @@ -60,27 +60,27 @@ def __init__(self):
self._create_schema()

def after(self, file_path, destination_folder, final_file_path, metadata):

# We check if the source path exists in the database already.
# If it does then we assume that this is an update operation.
full_destination_path = '{}{}'.format(destination_folder, final_file_path)
self.cursor.execute("SELECT `path` FROM `metadata` WHERE `path`=:path", {'path': file_path})

if(self.cursor.fetchone() is None):
self.log(u'SQLite plugin inserting {}'.format(file_path))
sql_statement, sql_values = self._insert_row_sql(full_destination_path, metadata)
else:
self.log(u'SQLite plugin updating {}'.format(file_path))
sql_statement, sql_values = self._update_row_sql(file_path, full_destination_path, metadata)

self.cursor.execute(sql_statement, sql_values)
self._upsert(file_path, destination_folder, final_file_path, metadata)

def batch(self):
pass

def before(self, file_path, destination_folder):
pass

def generate_db(self):
db = Db()
for checksum, file_path in db.all():
subclasses = get_all_subclasses()
media = Base.get_class_by_file(file_path, get_all_subclasses())
media.set_checksum(
db.checksum(file_path)
)
metadata = media.get_metadata()
destination_folder = os.path.dirname(file_path)
final_file_path = '{}{}'.format(os.path.sep, os.path.basename(file_path))
self._upsert(file_path, destination_folder, final_file_path, metadata)

def _create_schema(self):
self.database_schema = '{}{}{}'.format(
os.path.dirname(os.path.realpath(__file__)),
Expand All @@ -90,7 +90,6 @@ def _create_schema(self):

with open(self.database_schema, 'r') as fp_schema:
sql_statement = fp_schema.read()
print(sql_statement)
self.cursor.executescript(sql_statement)

def _insert_row_sql(self, final_path, metadata):
Expand All @@ -103,11 +102,11 @@ def _insert_row_sql(self, final_path, metadata):
"""INSERT INTO `metadata` (
`hash`, `path`, `album`, `camera_make`, `camera_model`,
`date_taken`, `latitude`, `location_name`, `longitude`,
`original_name`, `title`)
`original_name`, `title`, `_modified`)
VALUES (
:hash, :path, :album, :camera_make, :camera_model,
:date_taken, :latitude, :location_name, :longitude,
:original_name, :title)""",
:original_name, :title, datetime('now'))""",
self._sql_values(final_path, metadata)
)

Expand All @@ -123,26 +122,40 @@ def _sql_values(self, final_path, metadata, current_path=None):
'album': metadata['album'],
'camera_make': metadata['camera_make'],
'camera_model': metadata['camera_model'],
'date_taken': metadata['date_taken'],
'date_taken': time.strftime('%Y-%m-%d %H:%M:%S', metadata['date_taken']),
'latitude': metadata['latitude'],
'location_name': place_name(metadata['latitude'], metadata['longitude'])['default'],
'longitude': metadata['longitude'],
'original_name': metadata['original_name'],
'title': metadata['title'],
'current_path': current_path,
'_modified': timestamp
'current_path': current_path
}

def _update_row_sql(self, current_path, final_path, metadata):
timestamp = int(time.time())
return (
"""UPDATE `metadata` SET `hash`=:hash, `path`=:path, `album`=:album, `camera_make`=:camera_make,
`camera_model`=:camera_model, `date_taken`=:date_taken, `latitude`=:latitude,
`longitude`=:longitude, `original_name`=:original_name, `title`=:title
`longitude`=:longitude, `original_name`=:original_name, `title`=:title, `_modified`=datetime('now')
WHERE `path`=:current_path""",
self._sql_values(final_path, metadata, current_path)
)

def _upsert(self, file_path, destination_folder, final_file_path, metadata):
# We check if the source path exists in the database already.
# If it does then we assume that this is an update operation.
full_destination_path = '{}{}'.format(destination_folder, final_file_path)
self.cursor.execute("SELECT `path` FROM `metadata` WHERE `path`=:path", {'path': file_path})

if(self.cursor.fetchone() is None):
self.log(u'SQLite plugin inserting {}'.format(file_path))
sql_statement, sql_values = self._insert_row_sql(full_destination_path, metadata)
else:
self.log(u'SQLite plugin updating {}'.format(file_path))
sql_statement, sql_values = self._update_row_sql(file_path, full_destination_path, metadata)

self.cursor.execute(sql_statement, sql_values)

def _validate_schema(self):
try:
self.cursor.execute('SELECT * FROM `metadata` LIMIT 1');
Expand Down
34 changes: 31 additions & 3 deletions elodie/tests/plugins/sqlite/sqlite_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
import mock
import os
import sys
import time
from tempfile import gettempdir

sys.path.insert(0, os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))))
sys.path.insert(0, os.path.abspath(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))))

import helper
from elodie.config import load_config
from elodie.localstorage import Db
from elodie.media.text import Text
from elodie.plugins.sqlite.sqlite import SQLite
from elodie.media.audio import Audio
from elodie.media.photo import Photo

# Globals to simplify mocking configs
db_schema = helper.get_file('plugins/sqlite/schema.sql')
Expand All @@ -29,7 +30,7 @@

mock_metadata = {
'checksum': 'checksum-val',
'date_taken': 1234567890,
'date_taken': time.localtime(),
'camera_make': 'camera_make-val',
'camera_model': 'camera_model-val',
'latitude': 0.1,
Expand Down Expand Up @@ -143,3 +144,30 @@ def test_sqlite_update_multiple():
assert results[0]['title'] == 'title-val-upd', results[0]
assert results[1]['path'] == '/new-folder/path/new-file/path2.jpg', results[1]
assert results[1]['title'] == 'title-val-2-upd', results[1]

@mock.patch('elodie.config.config_file', '%s/config.ini-sqlite-regenerate-db' % gettempdir())
def test_sqlite_regenerate_db():
with open('%s/config.ini-sqlite-regenerate-db' % gettempdir(), 'w') as f:
f.write(config_string_fmt)
if hasattr(load_config, 'config'):
del load_config.config

sqlite_plugin = SQLite()
db = Db()
file_path_1 = helper.get_file('with-original-name.txt')
file_path_2 = helper.get_file('valid.txt')
db.add_hash('1', file_path_1, True)
db.add_hash('2', file_path_2, True)

sqlite_plugin.generate_db()

results = sqlite_plugin._run_query(
'SELECT * FROM `metadata`',
{}
);

assert len(results) == 2, results
assert results[0]['hash'] == 'e2275f3d95c4b55e35bd279bec3f86fcf76b3f3cc0abbf4183725c89a72f94c4', results[0]['hash']
assert results[0]['path'] == file_path_1, results[0]['path']
assert results[1]['hash'] == '3c19a5d751cf19e093b7447297731124d9cc987d3f91a9d1872c3b1c1b15639a', results[1]['hash']
assert results[1]['path'] == file_path_2, results[1]['path']