Skip to content

Commit 79c3cb4

Browse files
committed
Pass Spans to OutputGate lockWhile
1 parent 3909af1 commit 79c3cb4

File tree

11 files changed

+335
-177
lines changed

11 files changed

+335
-177
lines changed

src/workerd/api/actor-state.c++

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -500,8 +500,9 @@ jsg::Promise<void> DurableObjectStorageOperations::setAlarm(
500500
// they want immediate execution.
501501
kj::Date dateNowKjDate = static_cast<int64_t>(dateNow()) * kj::MILLISECONDS + kj::UNIX_EPOCH;
502502

503-
auto maybeBackpressure = transformMaybeBackpressure(
504-
js, options, getCache(OP_PUT_ALARM).setAlarm(kj::max(scheduledTime, dateNowKjDate), options));
503+
auto maybeBackpressure = transformMaybeBackpressure(js, options,
504+
getCache(OP_PUT_ALARM)
505+
.setAlarm(kj::max(scheduledTime, dateNowKjDate), options, context.getCurrentTraceSpan()));
505506

506507
// setAlarm() is billed as a single write unit.
507508
context.addTask(updateStorageWriteUnit(context, currentActorMetrics(), 1));
@@ -516,10 +517,11 @@ jsg::Promise<void> DurableObjectStorageOperations::putOne(
516517

517518
auto units = billingUnits(key.size() + buffer.size());
518519

519-
jsg::Promise<void> maybeBackpressure = transformMaybeBackpressure(
520-
js, options, getCache(OP_PUT).put(kj::mv(key), kj::mv(buffer), options));
521-
522520
auto& context = IoContext::current();
521+
522+
jsg::Promise<void> maybeBackpressure = transformMaybeBackpressure(js, options,
523+
getCache(OP_PUT).put(kj::mv(key), kj::mv(buffer), options, context.getCurrentTraceSpan()));
524+
523525
context.addTask(updateStorageWriteUnit(context, currentActorMetrics(), units));
524526
return maybeBackpressure;
525527
}
@@ -556,8 +558,8 @@ jsg::Promise<void> DurableObjectStorageOperations::deleteAlarm(
556558
}).orDefault(PutOptions{}));
557559

558560
return context.attachSpans(js,
559-
transformMaybeBackpressure(
560-
js, options, getCache(OP_DELETE_ALARM).setAlarm(kj::none, options)),
561+
transformMaybeBackpressure(js, options,
562+
getCache(OP_DELETE_ALARM).setAlarm(kj::none, options, context.getCurrentTraceSpan())),
561563
kj::mv(userSpan));
562564
}
563565

@@ -567,7 +569,7 @@ jsg::Promise<void> DurableObjectStorage::deleteAll(
567569
auto userSpan = context.makeUserTraceSpan("durable_object_storage_deleteAll"_kjc);
568570
auto options = configureOptions(kj::mv(maybeOptions).orDefault(PutOptions{}));
569571

570-
auto deleteAll = cache->deleteAll(options);
572+
auto deleteAll = cache->deleteAll(options, context.getCurrentTraceSpan());
571573

572574
context.addTask(updateStorageDeletes(context, currentActorMetrics(), kj::mv(deleteAll.count)));
573575

@@ -581,8 +583,11 @@ void DurableObjectTransaction::deleteAll() {
581583

582584
jsg::Promise<bool> DurableObjectStorageOperations::deleteOne(
583585
jsg::Lock& js, kj::String key, const PutOptions& options) {
584-
return transformCacheResult(
585-
js, getCache(OP_DELETE).delete_(kj::mv(key), options), options, [](jsg::Lock&, bool value) {
586+
auto& context = IoContext::current();
587+
588+
return transformCacheResult(js,
589+
getCache(OP_DELETE).delete_(kj::mv(key), options, context.getCurrentTraceSpan()), options,
590+
[](jsg::Lock&, bool value) {
586591
currentActorMetrics().addStorageDeletes(1);
587592
return value;
588593
});
@@ -614,10 +619,11 @@ jsg::Promise<void> DurableObjectStorageOperations::putMultiple(
614619
kvs.add(ActorCacheOps::KeyValuePair{kj::mv(field.name), kj::mv(buffer)});
615620
}
616621

617-
jsg::Promise<void> maybeBackpressure =
618-
transformMaybeBackpressure(js, options, getCache(OP_PUT).put(kvs.releaseAsArray(), options));
619-
620622
auto& context = IoContext::current();
623+
624+
jsg::Promise<void> maybeBackpressure = transformMaybeBackpressure(js, options,
625+
getCache(OP_PUT).put(kvs.releaseAsArray(), options, context.getCurrentTraceSpan()));
626+
621627
context.addTask(updateStorageWriteUnit(context, currentActorMetrics(), units));
622628

623629
return maybeBackpressure;
@@ -627,7 +633,10 @@ jsg::Promise<int> DurableObjectStorageOperations::deleteMultiple(
627633
jsg::Lock& js, kj::Array<kj::String> keys, const PutOptions& options) {
628634
auto numKeys = keys.size();
629635

630-
return transformCacheResult(js, getCache(OP_DELETE).delete_(kj::mv(keys), options), options,
636+
auto& context = IoContext::current();
637+
638+
return transformCacheResult(js,
639+
getCache(OP_DELETE).delete_(kj::mv(keys), options, context.getCurrentTraceSpan()), options,
631640
[numKeys](jsg::Lock&, uint count) -> int {
632641
currentActorMetrics().addStorageDeletes(numKeys);
633642
return count;

src/workerd/api/global-scope.c++

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,8 @@ kj::Promise<WorkerInterface::AlarmResult> ServiceWorkerGlobalScope::runAlarm(kj:
438438
}
439439
}
440440

441-
KJ_SWITCH_ONEOF(persistent.armAlarmHandler(scheduledTime, false, actorId)) {
441+
KJ_SWITCH_ONEOF(persistent.armAlarmHandler(
442+
scheduledTime, false, actorId, context.getCurrentTraceSpan())) {
442443
KJ_CASE_ONEOF(armResult, ActorCacheInterface::RunAlarmHandler) {
443444
auto& handler = KJ_REQUIRE_NONNULL(exportedHandler);
444445
if (handler.alarm == kj::none) {

src/workerd/io/actor-cache-test.c++

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -179,22 +179,22 @@ struct ActorCacheConvenienceWrappers {
179179
}
180180

181181
auto put(kj::StringPtr key, kj::StringPtr value, ActorCache::WriteOptions options = {}) {
182-
return target.put(kj::str(key), kj::heapArray(value.asBytes()), options);
182+
return target.put(kj::str(key), kj::heapArray(value.asBytes()), options, nullptr);
183183
}
184184
auto put(kj::ArrayPtr<const KeyValuePtr> kvs, ActorCache::WriteOptions options = {}) {
185185
return target.put(KJ_MAP(kv, kvs) {
186186
return ActorCache::KeyValuePair{kj::str(kv.key), kj::heapArray(kv.value.asBytes())};
187-
}, options);
187+
}, options, nullptr);
188188
}
189189
auto setAlarm(kj::Maybe<kj::Date> newTime, ActorCache::WriteOptions options = {}) {
190-
return target.setAlarm(newTime, options);
190+
return target.setAlarm(newTime, options, nullptr);
191191
}
192192

193193
auto delete_(kj::StringPtr key, ActorCache::WriteOptions options = {}) {
194-
return target.delete_(kj::str(key), options);
194+
return target.delete_(kj::str(key), options, nullptr);
195195
}
196196
auto delete_(kj::ArrayPtr<const kj::StringPtr> keys, ActorCache::WriteOptions options = {}) {
197-
return target.delete_(KJ_MAP(k, keys) { return kj::str(k); }, options);
197+
return target.delete_(KJ_MAP(k, keys) { return kj::str(k); }, options, nullptr);
198198
}
199199

200200
private:
@@ -700,7 +700,7 @@ KJ_TEST("ActorCache batching due to max storage RPC words") {
700700
for (int i = 0; i < 128; ++i) {
701701
test.cache.put(kj::str(i),
702702
kj::Array<const byte>(bigVal.begin(), bigVal.size(), kj::NullArrayDisposer::instance),
703-
ActorCache::WriteOptions());
703+
ActorCache::WriteOptions(), nullptr);
704704
}
705705

706706
auto mockTxn = mockStorage->expectCall("txn", ws).returnMock("transaction");
@@ -736,7 +736,7 @@ KJ_TEST("ActorCache deleteAll()") {
736736
test.put("baz", "789"); // overwrites a counted delete
737737
test.delete_("garply"); // uncounted delete
738738

739-
auto deleteAll = test.cache.deleteAll({});
739+
auto deleteAll = test.cache.deleteAll({}, nullptr);
740740

741741
// Post-deleteAll writes.
742742
test.put("grault", "12345");
@@ -819,7 +819,7 @@ KJ_TEST("ActorCache deleteAll() during transaction commit") {
819819

820820
// Issue a put and a deleteAll() here!
821821
test.put("bar", "456");
822-
test.cache.deleteAll({});
822+
test.cache.deleteAll({}, nullptr);
823823

824824
kj::mv(inProgressFlush).thenReturn(CAPNP());
825825
}
@@ -861,7 +861,7 @@ KJ_TEST("ActorCache deleteAll() again when previous one isn't done yet") {
861861
test.put("baz", "789"); // overwrites a counted delete
862862
test.delete_("garply"); // uncounted delete
863863

864-
auto deleteAllA = test.cache.deleteAll({});
864+
auto deleteAllA = test.cache.deleteAll({}, nullptr);
865865

866866
// Post-deleteAll writes.
867867
test.put("grault", "12345");
@@ -884,7 +884,7 @@ KJ_TEST("ActorCache deleteAll() again when previous one isn't done yet") {
884884
}
885885

886886
// Do another deleteAll() before the first one is done.
887-
auto deleteAllB = test.cache.deleteAll({});
887+
auto deleteAllB = test.cache.deleteAll({}, nullptr);
888888

889889
// And a write after that.
890890
test.put("fred", "2323");
@@ -4979,7 +4979,8 @@ KJ_TEST("ActorCache alarm get/put") {
49794979

49804980
{
49814981
// we have a cached time == nullptr, so we should not attempt to run an alarm
4982-
auto armResult = test.cache.armAlarmHandler(10 * kj::SECONDS + kj::UNIX_EPOCH, false);
4982+
auto armResult =
4983+
test.cache.armAlarmHandler(10 * kj::SECONDS + kj::UNIX_EPOCH, false, "", nullptr);
49834984
KJ_ASSERT(armResult.is<ActorCache::CancelAlarmHandler>());
49844985
auto cancelResult = kj::mv(armResult.get<ActorCache::CancelAlarmHandler>());
49854986
KJ_ASSERT(cancelResult.waitBeforeCancel.poll(ws));
@@ -4997,7 +4998,7 @@ KJ_TEST("ActorCache alarm get/put") {
49974998
{
49984999
// Test that alarm handler handle clears alarm when dropped with no writes
49995000
{
5000-
auto armResult = test.cache.armAlarmHandler(oneMs, false);
5001+
auto armResult = test.cache.armAlarmHandler(oneMs, false, "", nullptr);
50015002
KJ_ASSERT(armResult.is<ActorCache::RunAlarmHandler>());
50025003
}
50035004
mockStorage->expectCall("deleteAlarm", ws)
@@ -5010,7 +5011,7 @@ KJ_TEST("ActorCache alarm get/put") {
50105011

50115012
// Test that alarm handler handle does not clear alarm when dropped with writes
50125013
{
5013-
auto armResult = test.cache.armAlarmHandler(oneMs, false);
5014+
auto armResult = test.cache.armAlarmHandler(oneMs, false, "", nullptr);
50145015
KJ_ASSERT(armResult.is<ActorCache::RunAlarmHandler>());
50155016
test.setAlarm(twoMs);
50165017
}
@@ -5024,7 +5025,7 @@ KJ_TEST("ActorCache alarm get/put") {
50245025

50255026
// Test that alarm handler handle does not cache delete when it fails
50265027
{
5027-
auto armResult = test.cache.armAlarmHandler(oneMs, false);
5028+
auto armResult = test.cache.armAlarmHandler(oneMs, false, "", nullptr);
50285029
KJ_ASSERT(armResult.is<ActorCache::RunAlarmHandler>());
50295030
}
50305031
mockStorage->expectCall("deleteAlarm", ws)
@@ -5036,7 +5037,7 @@ KJ_TEST("ActorCache alarm get/put") {
50365037
{
50375038
// Test that alarm handler handle does not cache alarm delete when noCache == true
50385039
{
5039-
auto armResult = test.cache.armAlarmHandler(twoMs, true);
5040+
auto armResult = test.cache.armAlarmHandler(twoMs, true, "", nullptr);
50405041
KJ_ASSERT(armResult.is<ActorCache::RunAlarmHandler>());
50415042
}
50425043
mockStorage->expectCall("deleteAlarm", ws)
@@ -5090,7 +5091,7 @@ KJ_TEST("ActorCache alarm delete when flush fails") {
50905091
// we want to test that even if a flush is retried
50915092
// that the post-delete actions for a checked delete happen.
50925093
{
5093-
auto handle = test.cache.armAlarmHandler(oneMs, false);
5094+
auto handle = test.cache.armAlarmHandler(oneMs, false, "", nullptr);
50945095

50955096
auto time = expectCached(test.getAlarm());
50965097
KJ_ASSERT(time == kj::none);
@@ -5283,7 +5284,7 @@ KJ_TEST("ActorCache can wait for flush") {
52835284

52845285
{
52855286
// Join in on a scheduled deleteAll.
5286-
test.cache.deleteAll(ActorCacheWriteOptions{.allowUnconfirmed = false});
5287+
test.cache.deleteAll(ActorCacheWriteOptions{.allowUnconfirmed = false}, nullptr);
52875288

52885289
verify(
52895290
[&]() {
@@ -5301,7 +5302,7 @@ KJ_TEST("ActorCache can wait for flush") {
53015302

53025303
{
53035304
// Join in on a scheduled deleteAll with allowUnconfirmed.
5304-
test.cache.deleteAll(ActorCacheWriteOptions{.allowUnconfirmed = true});
5305+
test.cache.deleteAll(ActorCacheWriteOptions{.allowUnconfirmed = true}, nullptr);
53055306

53065307
verify(
53075308
[&]() {

0 commit comments

Comments
 (0)