chore(ext/kv): add limits #18415

Merged · 1 commit · Mar 25, 2023
cli/tests/unit/kv_test.ts (133 additions, 0 deletions)

@@ -1004,3 +1004,136 @@ dbTest("key ordering", async (db) => {
[true],
]);
});

dbTest("key size limit", async (db) => {
  // 1 byte prefix + 1 byte suffix + 2046 bytes of key data = 2048 bytes serialized
const lastValidKey = new Uint8Array(2046).fill(1);
const firstInvalidKey = new Uint8Array(2047).fill(1);

await db.set([lastValidKey], 1);

assertEquals(await db.get([lastValidKey]), {
key: [lastValidKey],
value: 1,
versionstamp: "00000000000000010000",
});

await assertRejects(
async () => await db.set([firstInvalidKey], 1),
TypeError,
"key too large for write (max 2048 bytes)",
);

await assertRejects(
async () => await db.get([firstInvalidKey]),
TypeError,
"key too large for read (max 2049 bytes)",
);
});
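Note: the arithmetic behind these boundaries, as a minimal sketch. It assumes an in-memory database via `Deno.openKv(":memory:")` and that a `Uint8Array` key part is framed by a 1-byte type tag and a 1-byte terminator, per the comment at the top of the test.

```ts
const kv = await Deno.openKv(":memory:");

const atLimit = new Uint8Array(2046).fill(1); // 1 + 2046 + 1 = 2048 bytes serialized
await kv.set([atLimit], "fits"); // exactly at the write limit

const overLimit = new Uint8Array(2047).fill(1); // serializes to 2049 bytes
try {
  await kv.set([overLimit], "too big");
} catch (e) {
  console.log(e instanceof TypeError); // true: "key too large for write (max 2048 bytes)"
}

kv.close();
```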

dbTest("value size limit", async (db) => {
const lastValidValue = new Uint8Array(65536);
const firstInvalidValue = new Uint8Array(65537);

await db.set(["a"], lastValidValue);
assertEquals(await db.get(["a"]), {
key: ["a"],
value: lastValidValue,
versionstamp: "00000000000000010000",
});

await assertRejects(
async () => await db.set(["b"], firstInvalidValue),
TypeError,
"value too large (max 65536 bytes)",
);
});
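Note: a `Uint8Array` value maps onto the 65536-byte cap directly, while other values are measured after V8 serialization (see `check_value_size` in `ext/kv/lib.rs` below). A minimal sketch of the boundary, assuming an in-memory database:

```ts
const kv = await Deno.openKv(":memory:");

await kv.set(["blob"], new Uint8Array(65536)); // exactly 64 KiB: accepted

try {
  await kv.set(["blob"], new Uint8Array(65537)); // one byte over
} catch (e) {
  console.log((e as TypeError).message); // "value too large (max 65536 bytes)"
}

kv.close();
```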

dbTest("operation size limit", async (db) => {
const lastValidKeys: Deno.KvKey[] = new Array(10).fill(0).map((
_,
i,
) => ["a", i]);
const firstInvalidKeys: Deno.KvKey[] = new Array(11).fill(0).map((
_,
i,
) => ["a", i]);

assertEquals((await db.getMany(lastValidKeys)).length, 10);

await assertRejects(
async () => await db.getMany(firstInvalidKeys),
TypeError,
"too many ranges (max 10)",
);

assertEquals(
(await collect(db.list({
prefix: ["a"],
}, {
batchSize: 1000,
}))).length,
0,
);

  await assertRejects(
async () =>
await collect(db.list({
prefix: ["a"],
}, {
batchSize: 1001,
})),
TypeError,
"too many entries (max 1000)",
);

  // when batchSize is not specified, limit is used as the batch size, clamped to 500
assertEquals(
(await collect(db.list({
prefix: ["a"],
}, {
limit: 1001,
}))).length,
0,
);

assertEquals(
await db.atomic().check(...lastValidKeys.map((key) => ({
key,
versionstamp: null,
}))).mutate(...lastValidKeys.map((key) => ({
key,
type: "set",
value: 1,
} satisfies Deno.KvMutation))).commit(),
true,
);

await assertRejects(
async () =>
await db.atomic().check(...firstInvalidKeys.map((key) => ({
key,
versionstamp: null,
}))).mutate(...lastValidKeys.map((key) => ({
key,
type: "set",
value: 1,
} satisfies Deno.KvMutation))).commit(),
TypeError,
"too many checks (max 10)",
);

await assertRejects(
async () =>
await db.atomic().check(...lastValidKeys.map((key) => ({
key,
versionstamp: null,
}))).mutate(...firstInvalidKeys.map((key) => ({
key,
type: "set",
value: 1,
} satisfies Deno.KvMutation))).commit(),
TypeError,
"too many mutations (max 10)",
);
});
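Note: a practical consequence of the 10-mutation cap is that bulk writes have to be split across commits. A hedged sketch of one way to do that (`chunk` and `setMany` are illustrative helpers, not part of the Deno KV API):

```ts
function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size));
  }
  return out;
}

async function setMany(
  kv: Deno.Kv,
  entries: [Deno.KvKey, unknown][],
): Promise<void> {
  // Each atomic commit carries at most 10 mutations, staying under
  // the limit exercised by the test above. Note the batches are no
  // longer atomic with respect to each other.
  for (const batch of chunk(entries, 10)) {
    let op = kv.atomic();
    for (const [key, value] of batch) {
      op = op.set(key, value);
    }
    await op.commit();
  }
}
```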
cli/tsc/dts/lib.deno.unstable.d.ts (1 addition, 1 deletion)

@@ -1862,7 +1862,7 @@ declare namespace Deno {
* matches an expected versionstamp.
*
* Keys have a maximum length of 2048 bytes after serialization. Values have a
- * maximum length of 16 KiB after serialization. Serialization of both keys
+ * maximum length of 64 KiB after serialization. Serialization of both keys
* and values is somewhat opaque, but one can usually assume that the
* serialization of any value is about the same length as the resulting string
* of a JSON serialization of that same value.
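The rule of thumb in this doc comment (serialized size is roughly the JSON length) suggests a cheap client-side pre-check. A hypothetical sketch, not part of the API, and only an approximation of the real serializer:

```ts
function roughlyFitsValueLimit(value: unknown, limit = 65536): boolean {
  // Approximates the serialized size with JSON length, per the doc
  // comment above; the real serialization is V8's, not JSON.
  return JSON.stringify(value).length <= limit;
}

roughlyFitsValueLimit({ ok: true }); // true
roughlyFitsValueLimit("x".repeat(70_000)); // false
```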
ext/kv/01_db.ts (1 addition, 1 deletion)

@@ -155,7 +155,7 @@ class Kv {

let batchSize = options.batchSize ?? (options.limit ?? 100);
if (batchSize <= 0) throw new Error("batchSize must be positive");
-  if (batchSize > 500) batchSize = 500;
+  if (options.batchSize === undefined && batchSize > 500) batchSize = 500;

return new KvListIterator({
limit: options.limit,
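Spelled out, the one-line change gives this behavior (a standalone restatement for clarity, not the actual implementation):

```ts
function effectiveBatchSize(
  options: { batchSize?: number; limit?: number },
): number {
  let batchSize = options.batchSize ?? options.limit ?? 100;
  if (batchSize <= 0) throw new Error("batchSize must be positive");
  // Only an inferred batch size is clamped to 500; an explicit
  // batchSize is passed through (and validated against its own
  // maximum elsewhere).
  if (options.batchSize === undefined && batchSize > 500) batchSize = 500;
  return batchSize;
}

effectiveBatchSize({ limit: 1001 }); // 500 (clamped)
effectiveBatchSize({ batchSize: 1000 }); // 1000 (explicit value kept)
```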
ext/kv/interface.rs (12 additions, 0 deletions)

@@ -292,3 +292,15 @@ pub enum MutationKind {
Min(Value),
Max(Value),
}

impl MutationKind {
pub fn value(&self) -> Option<&Value> {
match self {
MutationKind::Set(value) => Some(value),
MutationKind::Sum(value) => Some(value),
MutationKind::Min(value) => Some(value),
MutationKind::Max(value) => Some(value),
MutationKind::Delete => None,
}
}
}
ext/kv/lib.rs (113 additions, 11 deletions)

@@ -27,6 +27,15 @@ use serde::Serialize;

pub use crate::interface::*;

const MAX_WRITE_KEY_SIZE_BYTES: usize = 2048;
// range selectors can contain 0x00 or 0xff suffixes
const MAX_READ_KEY_SIZE_BYTES: usize = MAX_WRITE_KEY_SIZE_BYTES + 1;
const MAX_VALUE_SIZE_BYTES: usize = 65536;
const MAX_READ_RANGES: usize = 10;
const MAX_READ_ENTRIES: usize = 1000;
const MAX_CHECKS: usize = 10;
const MAX_MUTATIONS: usize = 10;

struct UnstableChecker {
pub unstable: bool,
}
@@ -218,13 +227,27 @@
state.resource_table.get::<DatabaseResource<DBH::DB>>(rid)?;
resource.db.clone()
};

if ranges.len() > MAX_READ_RANGES {
return Err(type_error(format!(
"too many ranges (max {})",
MAX_READ_RANGES
)));
}

let mut total_entries = 0usize;

let read_ranges = ranges
.into_iter()
.map(|(prefix, start, end, limit, reverse, cursor)| {
let selector = RawSelector::from_tuple(prefix, start, end)?;

let (start, end) =
decode_selector_and_cursor(&selector, reverse, cursor.as_ref())?;
check_read_key_size(&start)?;
check_read_key_size(&end)?;

total_entries += limit as usize;
Ok(ReadRange {
start,
end,
@@ -234,6 +257,14 @@
})
})
.collect::<Result<Vec<_>, AnyError>>()?;

if total_entries > MAX_READ_ENTRIES {
return Err(type_error(format!(
"too many entries (max {})",
MAX_READ_ENTRIES
)));
}

let opts = SnapshotReadOptions {
consistency: consistency.into(),
};
@@ -499,32 +530,53 @@
resource.db.clone()
};

-  for key in checks
-    .iter()
-    .map(|c| &c.0)
-    .chain(mutations.iter().map(|m| &m.0))
-  {
-    if key.is_empty() {
-      return Err(type_error("key cannot be empty"));
-    }
-  }
  if checks.len() > MAX_CHECKS {
    return Err(type_error(format!("too many checks (max {})", MAX_CHECKS)));
  }

if mutations.len() + enqueues.len() > MAX_MUTATIONS {
return Err(type_error(format!(
"too many mutations (max {})",
MAX_MUTATIONS
)));
}

let checks = checks
.into_iter()
.map(TryInto::try_into)
-    .collect::<Result<_, AnyError>>()
+    .collect::<Result<Vec<KvCheck>, AnyError>>()
.with_context(|| "invalid check")?;
let mutations = mutations
.into_iter()
.map(TryInto::try_into)
-    .collect::<Result<_, AnyError>>()
+    .collect::<Result<Vec<KvMutation>, AnyError>>()
.with_context(|| "invalid mutation")?;
let enqueues = enqueues
.into_iter()
.map(TryInto::try_into)
-    .collect::<Result<_, AnyError>>()
+    .collect::<Result<Vec<Enqueue>, AnyError>>()
.with_context(|| "invalid enqueue")?;

for key in checks
.iter()
.map(|c| &c.key)
.chain(mutations.iter().map(|m| &m.key))
{
if key.is_empty() {
return Err(type_error("key cannot be empty"));
}

check_write_key_size(key)?;
}

for value in mutations.iter().flat_map(|m| m.kind.value()) {
check_value_size(value)?;
}

for enqueue in &enqueues {
check_enqueue_payload_size(&enqueue.payload)?;
}

let atomic_write = AtomicWrite {
checks,
mutations,
@@ -549,3 +601,53 @@ fn op_kv_encode_cursor(
let cursor = encode_cursor(&selector, &boundary_key)?;
Ok(cursor)
}

fn check_read_key_size(key: &[u8]) -> Result<(), AnyError> {
if key.len() > MAX_READ_KEY_SIZE_BYTES {
Err(type_error(format!(
"key too large for read (max {} bytes)",
MAX_READ_KEY_SIZE_BYTES
)))
} else {
Ok(())
}
}

fn check_write_key_size(key: &[u8]) -> Result<(), AnyError> {
if key.len() > MAX_WRITE_KEY_SIZE_BYTES {
Err(type_error(format!(
"key too large for write (max {} bytes)",
MAX_WRITE_KEY_SIZE_BYTES
)))
} else {
Ok(())
}
}

fn check_value_size(value: &Value) -> Result<(), AnyError> {
let payload = match value {
Value::Bytes(x) => x,
Value::V8(x) => x,
Value::U64(_) => return Ok(()),
};

if payload.len() > MAX_VALUE_SIZE_BYTES {
Err(type_error(format!(
"value too large (max {} bytes)",
MAX_VALUE_SIZE_BYTES
)))
} else {
Ok(())
}
}

fn check_enqueue_payload_size(payload: &[u8]) -> Result<(), AnyError> {
if payload.len() > MAX_VALUE_SIZE_BYTES {
Err(type_error(format!(
"enqueue payload too large (max {} bytes)",
MAX_VALUE_SIZE_BYTES
)))
} else {
Ok(())
}
}
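Note on the read/write asymmetry: `MAX_READ_KEY_SIZE_BYTES` is one byte larger than `MAX_WRITE_KEY_SIZE_BYTES` because range selectors can append a `0x00` or `0xff` suffix to a serialized key, per the comment next to the constants. A hedged sketch of the consequence from the JS side, assuming an in-memory database:

```ts
const kv = await Deno.openKv(":memory:");
const maxPart = new Uint8Array(2046).fill(1); // serializes to 2048 bytes

await kv.set([maxPart], "written at the write limit");

// The same full-size part is still usable as a list prefix: the
// selector's 0x00/0xff suffix pushes the range boundaries to 2049
// bytes, which the read path allows.
for await (const entry of kv.list({ prefix: [maxPart] })) {
  // Prefix selectors match keys that extend the prefix, so the exact
  // key written above is not yielded here; the point is that the
  // 2049-byte boundaries are accepted rather than rejected.
  console.log(entry.key);
}

kv.close();
```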