Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reduce mode of SQLite storage file to 0o600 #1057

Merged
merged 12 commits into from
Nov 15, 2023
19 changes: 19 additions & 0 deletions ops/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import pickle
import shutil
import sqlite3
import stat
import subprocess
from datetime import timedelta
from pathlib import Path
Expand Down Expand Up @@ -59,11 +60,29 @@ def __init__(self, filename: Union['Path', str]):
# sqlite3.connect creates the file silently if it does not exist
logger.debug(f"Initializing SQLite local storage: {filename}.")

if filename != ":memory:":
self._ensure_db_permissions(str(filename))
self._db = sqlite3.connect(str(filename),
isolation_level=None,
timeout=self.DB_LOCK_TIMEOUT.total_seconds())
self._setup()

def _ensure_db_permissions(self, filename: str):
"""Make sure that the DB file has appropriately secure permissions."""
mode = stat.S_IRUSR | stat.S_IWUSR
if os.path.exists(filename):
try:
os.chmod(filename, mode)
except OSError as e:
raise RuntimeError(f"Unable to adjust access permission of {filename!r}") from e
return
tonyandrewmeyer marked this conversation as resolved.
Show resolved Hide resolved

try:
fd = os.open(filename, os.O_CREAT | os.O_EXCL, mode=mode)
except OSError as e:
raise RuntimeError(f"Unable to adjust access permission of {filename!r}") from e
os.close(fd)

def _setup(self):
"""Make the database ready to be used as storage."""
# Make sure that the database is locked until the connection is closed,
Expand Down
37 changes: 37 additions & 0 deletions test/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import io
import os
import pathlib
import stat
import sys
import tempfile
import typing
import unittest
import unittest.mock
from test.test_helpers import BaseTestCase, fake_script, fake_script_calls
from textwrap import dedent

Expand Down Expand Up @@ -218,6 +220,41 @@ class TestSQLiteStorage(StoragePermutations, BaseTestCase):
def create_storage(self):
return ops.storage.SQLiteStorage(':memory:')

def test_permissions_new(self):
with tempfile.TemporaryDirectory() as temp_dir:
filename = os.path.join(temp_dir, ".unit-state.db")
storage = ops.storage.SQLiteStorage(filename)
self.assertEqual(stat.S_IMODE(os.stat(filename).st_mode), stat.S_IRUSR | stat.S_IWUSR)
storage.close()

def test_permissions_existing(self):
with tempfile.TemporaryDirectory() as temp_dir:
filename = os.path.join(temp_dir, ".unit-state.db")
ops.storage.SQLiteStorage(filename).close()
# Set the file to access that will need fixing for user, group, and other.
os.chmod(filename, 0o744)
storage = ops.storage.SQLiteStorage(filename)
self.assertEqual(stat.S_IMODE(os.stat(filename).st_mode), stat.S_IRUSR | stat.S_IWUSR)
storage.close()

@unittest.mock.patch("os.path.exists")
def test_permissions_race(self, exists: unittest.mock.MagicMock):
exists.return_value = False
with tempfile.TemporaryDirectory() as temp_dir:
filename = os.path.join(temp_dir, ".unit-state.db")
# Create an existing file, but the mock will simulate a race condition saying that it
# does not exist.
open(filename, "w").close()
self.assertRaises(RuntimeError, ops.storage.SQLiteStorage, filename)

@unittest.mock.patch("os.chmod")
def test_permissions_failure(self, chmod: unittest.mock.MagicMock):
chmod.side_effect = OSError
with tempfile.TemporaryDirectory() as temp_dir:
filename = os.path.join(temp_dir, ".unit-state.db")
open(filename, "w").close()
self.assertRaises(RuntimeError, ops.storage.SQLiteStorage, filename)


def setup_juju_backend(test_case: unittest.TestCase, state_file: pathlib.Path):
"""Create fake scripts for pretending to be state-set and state-get."""
Expand Down
Loading