Skip to content

Commit

Permalink
Refactor deck and google drive client
Browse files Browse the repository at this point in the history
  • Loading branch information
tgrunnagle committed Dec 9, 2022
1 parent 34c9faf commit cbb6d07
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 125 deletions.
4 changes: 2 additions & 2 deletions example/example_local_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@

"output_image_layout": "sheet",
"output_sheet_max_width": 10,
"output_x_padding": 0,
"output_y_padding": 0,
"output_padding": [0, 0],
"output_scaling": [1, 1],
"output_padding_colorstring": "#E0FFFF"
}
100 changes: 58 additions & 42 deletions scripts/deck.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
#!/usr/bin/python
from math import ceil
import contextlib

import PIL.Image
from PIL import Image as PILImage
from PIL.Image import Image
from card import Card
from config_enums import ImageLayout
from typing import Tuple
from placement import Placement


class Deck:

Expand All @@ -14,12 +18,12 @@ def __init__(self, name: str, config: dict):
self._name = name
self._cards: list[Card] = list()
self._layout = config.get('output_image_layout') or ImageLayout.SHEET
self._padding = (
config.get('output_x_padding') or 0,
config.get('output_y_padding') or 0
)
self._padding_color = config.get('output_padding_colorstring') or '#000000'
self._sheet_max_width = config.get('output_sheet_max_width') or Deck.DEFAULT_MAX_WIDTH
self._padding = config.get('output_padding')
self._scaling = config.get('output_scaling')
self._padding_color = config.get(
'output_padding_colorstring') or '#000000'
self._sheet_max_width = config.get(
'output_sheet_max_width') or Deck.DEFAULT_MAX_WIDTH

def get_size(self):
return len(self._cards)
Expand All @@ -35,66 +39,78 @@ def get_dimensions(self) -> Tuple[int, int]:
def add_card(self, card: Card):
self._cards.append(card)

def render(self) -> list[PIL.Image.Image]:
def render(self) -> list[Image]:
return \
self._render_singletons() \
if self._layout == ImageLayout.SINGLETON \
else self._render_sheets()

def _render_sheets(self) -> list[PIL.Image.Image]:
def _render_sheets(self) -> list[Image]:
num_cards = len(self._cards)
if num_cards == 0:
return []

(num_w, num_h) = self.get_dimensions()

card_place = self._cards[0].get_placement()
card_place.w = card_place.w + 2 * self._padding[0]
card_place.h = card_place.h + 2 * self._padding[1]

deck_pix_w = card_place.w * num_w
deck_pix_h = card_place.h * num_h

deck_image = PIL.Image.new('RGBA', (int(deck_pix_w), int(deck_pix_h)))

card_index = 0
deck_image = None
for y in range(num_h):
for x in range(num_w):
card = self._cards[card_index]
card_index = card_index + 1

with self._render_with_padding(card) as card_image:
x_start = x * card_place.w
x_end = (x + 1) * card_place.w
y_start = y * card_place.h
y_end = (y + 1) * card_place.h
with self._render_card(card) as card_image:
if deck_image is None:
x_step = card_image.width
y_step = card_image.height
deck_pix_w = card_image.width * num_w
deck_pix_h = card_image.height * num_h
deck_image = PILImage.new('RGBA', (deck_pix_w, deck_pix_h))

deck_image.paste(im=card_image, box=(
x_start, y_start, x_end, y_end))
x * x_step, y * y_step, (x + 1) * x_step, (y + 1) * y_step))

return [deck_image]

def _render_singletons(self) -> list[PIL.Image.Image]:
def _render_singletons(self) -> list[Image]:
num_cards = len(self._cards)
if num_cards == 0:
return []
result = []
for card_index in range(num_cards):
card = self._cards[card_index]
result.append(self._render_with_padding(card))
result.append(self._render_card(card))
return result

def _render_with_padding(self, card: Card) -> PIL.Image.Image:
if self._padding == (0, 0):
return card.render()
with card.render() as card_image:
padded_image = PIL.Image.new(
'RGBA',
(
card_image.width + 2 * self._padding[0],
card_image.height + 2 * self._padding[1],
),
color=self._padding_color
)
padded_image.paste(
card_image, (self._padding[0], self._padding[1]))
return padded_image
def _render_card(self, card: Card) -> Image:
image = card.render()
for fn in [self._pad, self._scale]:
image = self._then_close(fn, image)

return image

def _then_close(self, fn, closable):
with contextlib.closing(closable):
return fn(closable)

def _pad(self, card_image: Image) -> Image:
if self._padding is None:
return card_image.copy()

padded_image = PILImage.new(
'RGBA',
(
int(card_image.width + 2 * self._padding[0]),
int(card_image.height + 2 * self._padding[1]),
),
color=self._padding_color
)
padded_image.paste(
card_image, (int(self._padding[0]), int(self._padding[1])))
return padded_image

def _scale(self, card_image: Image) -> Image:
if self._scaling is None:
return card_image.copy()

return card_image.resize(
(int(card_image.width * self._scaling[0]), int(card_image.height * self._scaling[1])))
169 changes: 98 additions & 71 deletions scripts/google_drive_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,44 +24,56 @@ def __init__(self, secrets_file: str):
self._secrets_file = secrets_file
self._cached_creds: Optional[Credentials] = None

def copy_file(self, id_or_name: str, source_folder: Optional[str], target_folder: str) -> str:
creds = self._get_creds()

lookup_id = id_or_name
if source_folder is not None:
lookup_ids = self.get_ids(id_or_name, source_folder)
if len(lookup_ids) != 1:
raise Exception('Expected a single source file.')
lookup_id = lookup_ids[0]

service = build('drive', 'v3', credentials=creds)

copy = service.files().copy(
fileId=lookup_id,
fields='id',
body={
'parents': [target_folder]
}
).execute()
return copy.get('id')

def create_json(self, source: str, target_name: str, target_folder_id: str) -> str:
return self._create_file('application/json', source, target_name, target_folder_id)

def create_png(self, source: str, target_name: str, target_folder_id: str) -> str:
return self._create_file('image/png', source, target_name, target_folder_id)

def create_csv(self, name: str, target_folder_id: str) -> str:
creds = self._get_creds()
service = build('sheets', 'v4', credentials=creds)
metadata = {
'properties': {
'title': name,
},
}
spreadsheet = service.spreadsheets().create(
body=metadata,
fields='spreadsheetId').execute()
id = spreadsheet.get('spreadsheetId')

driveService = build('drive', 'v3', credentials=creds)
file = driveService.files().get(
fileId=id,
fields='parents').execute()
oldParents = ",".join(file.get('parents'))
file = driveService.files().update(
fileId=id,
addParents=target_folder_id,
removeParents=oldParents).execute()
return id

def _create_file(self, mime_type: str, source: str, target_name: str, target_folder_id: str) -> str:
creds = self._get_creds()
service = build('drive', 'v3', credentials=creds)
media = MediaFileUpload(source, mimetype=mime_type)
metadata = {
'name': target_name,
'parents': [target_folder_id]
}
file = service.files().create(
body=metadata,
media_body=media).execute()
body={
'name': target_name,
'parents': [target_folder_id]
},
media_body=media
).execute()

service.permissions().create(
fileId=file.get('id'),
body={
'role': 'writer',
'type': 'anyone',
},
).execute()

return file.get('id')

def update_json(self, source: str, target_id: str):
Expand Down Expand Up @@ -101,30 +113,6 @@ def download_file(self, id_or_name: str, output_file_name: str, folder_id: Optio
with open(output_file_name, 'wb') as f:
f.write(stream.getbuffer())

def create_csv(self, name: str, target_folder_id: str) -> str:
creds = self._get_creds()
service = build('sheets', 'v4', credentials=creds)
metadata = {
'properties': {
'title': name,
},
}
spreadsheet = service.spreadsheets().create(
body=metadata,
fields='spreadsheetId').execute()
id = spreadsheet.get('spreadsheetId')

driveService = build('drive', 'v3', credentials=creds)
file = driveService.files().get(
fileId=id,
fields='parents').execute()
oldParents = ",".join(file.get('parents'))
file = driveService.files().update(
fileId=id,
addParents=target_folder_id,
removeParents=oldParents).execute()
return id

def download_csv(self, id_or_name: str, folder_id: Optional[str]) -> csv.DictReader:
creds = self._get_creds()
service = build('sheets', 'v4', credentials=creds)
Expand Down Expand Up @@ -195,29 +183,52 @@ def download_folder(self, folder_id: str, output_folder: str):
if page_token is None:
break

file_ids = list(
map(lambda f: (f.get('id') or '', f.get('name') or '', f.get('mimeType') or ''), search_results))
file_ids = map(lambda f: (f.get('id') or '', f.get('name') or '', f.get('mimeType') or ''), search_results)
for (id, name, mtype) in file_ids:

if name.endswith('.png'):
out_name = os.path.join(output_folder, name)
self.download_png(id, out_name, None)

elif mtype == 'application/vnd.google-apps.spreadsheet':
out_name = os.path.join(
output_folder, name.split('.')[0] + '.csv')
reader: csv.DictReader = self.download_csv(id, None)
with open(out_name, 'w+') as out_file:
writer = csv.DictWriter(
out_file, fieldnames=reader.fieldnames)
writer.writeheader()
for row in reader:
writer.writerow(row)
try:
if mtype == 'image/png':
out_name = os.path.join(output_folder, name)
self.download_file(id, out_name, None)

elif mtype == 'application/vnd.google-apps.spreadsheet':
out_name = os.path.join(
output_folder, name.split('.')[0] + '.csv')
reader: csv.DictReader = self.download_csv(id, None)
with open(out_name, 'w+') as out_file:
writer = csv.DictWriter(
out_file, fieldnames=reader.fieldnames)
writer.writeheader()
for row in reader:
writer.writerow(row)

else:
print('Warning: unsupported file "' + name + '"')
except Exception as e:
print('Warning: Failed to download ' + name + ', id ' + id)

else:
print('Warning: unsupported file "' + name + '"')
def copy_file(self, id_or_name: str, source_folder: Optional[str], target_folder: str) -> str:
creds = self._get_creds()

lookup_id = id_or_name
if source_folder is not None:
lookup_ids = self.get_ids(id_or_name, source_folder)
if len(lookup_ids) != 1:
raise Exception('Expected a single source file.')
lookup_id = lookup_ids[0]

service = build('drive', 'v3', credentials=creds)

copy = service.files().copy(
fileId=lookup_id,
fields='id',
body={
'parents': [target_folder]
}
).execute()
return copy.get('id')

def get_ids(self, name: str, folder_id: str) -> list[str]:
def get_ids(self, name: Optional[str], folder_id: str) -> list[str]:
creds = self._get_creds()
service = build('drive', 'v3', credentials=creds)

Expand All @@ -226,7 +237,9 @@ def get_ids(self, name: str, folder_id: str) -> list[str]:
while True:
# pylint: disable=maybe-no-member
response = service.files().list(
q=F"name='{name}' and '{folder_id}' in parents",
q=F"name='{name}' and '{folder_id}' in parents"
if name is not None
else F"'{folder_id}' in parents",
spaces='drive',
fields='nextPageToken, files(id)',
pageToken=page_token,
Expand All @@ -238,6 +251,20 @@ def get_ids(self, name: str, folder_id: str) -> list[str]:
break
return list(map(lambda f: f.get('id'), files))

def delete_folder_contents(self, folder_id: str) -> int:
ids = self.get_ids(None, folder_id)
if len(ids) == 0:
return 0

return len(list(map(self.delete_file, ids)))

def delete_file(self, file_id: str):
creds = self._get_creds()
service = build('drive', 'v3', credentials=creds)
service.files().delete(
fileId=file_id
).execute()

def _get_creds(self) -> Credentials:
if self._cached_creds is not None and self._cached_creds.valid:
return self._cached_creds
Expand Down
Loading

0 comments on commit cbb6d07

Please sign in to comment.