diff --git a/bindings/c/test/unit/unit_tests.cpp b/bindings/c/test/unit/unit_tests.cpp index 4f258eac41d..5ec1c6cec20 100644 --- a/bindings/c/test/unit/unit_tests.cpp +++ b/bindings/c/test/unit/unit_tests.cpp @@ -44,6 +44,8 @@ #include "fdbclient/Tuple.h" #include "flow/config.h" +#include "flow/DeterministicRandom.h" +#include "flow/IRandom.h" #include "fdb_api.hpp" @@ -2021,15 +2023,17 @@ TEST_CASE("fdb_transaction_add_conflict_range") { TEST_CASE("special-key-space valid transaction ID") { auto value = get_value("\xff\xff/tracing/transaction_id", /* snapshot */ false, {}); REQUIRE(value.has_value()); - uint64_t transaction_id = std::stoul(value.value()); - CHECK(transaction_id > 0); + UID transaction_id = UID::fromString(value.value()); + CHECK(transaction_id.first() > 0); + CHECK(transaction_id.second() > 0); } TEST_CASE("special-key-space custom transaction ID") { fdb::Transaction tr(db); fdb_check(tr.set_option(FDB_TR_OPTION_SPECIAL_KEY_SPACE_ENABLE_WRITES, nullptr, 0)); while (1) { - tr.set("\xff\xff/tracing/transaction_id", std::to_string(ULONG_MAX)); + UID randomTransactionID = UID(deterministicRandom()->randomUInt64(), deterministicRandom()->randomUInt64()); + tr.set("\xff\xff/tracing/transaction_id", randomTransactionID.toString()); fdb::ValueFuture f1 = tr.get("\xff\xff/tracing/transaction_id", /* snapshot */ false); @@ -2046,8 +2050,8 @@ TEST_CASE("special-key-space custom transaction ID") { fdb_check(f1.get(&out_present, (const uint8_t**)&val, &vallen)); REQUIRE(out_present); - uint64_t transaction_id = std::stoul(std::string(val, vallen)); - CHECK(transaction_id == ULONG_MAX); + UID transaction_id = UID::fromString(std::string(val, vallen)); /* val is length-delimited by vallen, as the replaced code assumed; do not rely on a NUL terminator */ + CHECK(transaction_id == randomTransactionID); break; } } @@ -2074,8 +2078,9 @@ TEST_CASE("special-key-space set transaction ID after write") { fdb_check(f1.get(&out_present, (const uint8_t**)&val, &vallen)); REQUIRE(out_present); - uint64_t transaction_id = std::stoul(std::string(val, vallen)); - CHECK(transaction_id != 0); + UID 
transaction_id = UID::fromString(std::string(val, vallen)); /* val is length-delimited by vallen, as the replaced code assumed; do not rely on a NUL terminator */ + CHECK(transaction_id.first() > 0); + CHECK(transaction_id.second() > 0); break; } } @@ -2140,7 +2145,9 @@ TEST_CASE("special-key-space tracing get range") { CHECK(out_count == 2); CHECK(std::string((char*)out_kv[1].key, out_kv[1].key_length) == tracingBegin + "transaction_id"); - CHECK(std::stoul(std::string((char*)out_kv[1].value, out_kv[1].value_length)) > 0); + UID transaction_id = UID::fromString(std::string((char*)out_kv[1].value, out_kv[1].value_length)); /* bound the value by value_length, matching how the key is read above */ + CHECK(transaction_id.first() > 0); + CHECK(transaction_id.second() > 0); break; } } diff --git a/fdbclient/CommitProxyInterface.h b/fdbclient/CommitProxyInterface.h index 8d068926ebb..149e77521de 100644 --- a/fdbclient/CommitProxyInterface.h +++ b/fdbclient/CommitProxyInterface.h @@ -162,7 +162,7 @@ struct CommitTransactionRequest : TimedRequest { bool firstInBatch() const { return (flags & FLAG_FIRST_IN_BATCH) != 0; } Arena arena; - SpanID spanContext; + SpanContext spanContext; CommitTransactionRef transaction; ReplyPromise reply; uint32_t flags; @@ -172,8 +172,8 @@ struct CommitTransactionRequest : TimedRequest { TenantInfo tenantInfo; - CommitTransactionRequest() : CommitTransactionRequest(SpanID()) {} - CommitTransactionRequest(SpanID const& context) : spanContext(context), flags(0) {} + CommitTransactionRequest() : CommitTransactionRequest(SpanContext()) {} + CommitTransactionRequest(SpanContext const& context) : spanContext(context), flags(0) {} template void serialize(Ar& ar) { @@ -242,7 +242,7 @@ struct GetReadVersionRequest : TimedRequest { FLAG_PRIORITY_MASK = PRIORITY_SYSTEM_IMMEDIATE, }; - SpanID spanContext; + SpanContext spanContext; uint32_t transactionCount; uint32_t flags; TransactionPriority priority; @@ -255,7 +255,7 @@ struct GetReadVersionRequest : TimedRequest { Version maxVersion; // max version in the client's version vector cache GetReadVersionRequest() : transactionCount(1), flags(0), maxVersion(invalidVersion) {} - GetReadVersionRequest(SpanID spanContext, + 
GetReadVersionRequest(SpanContext spanContext, uint32_t transactionCount, TransactionPriority priority, Version maxVersion, @@ -325,7 +325,7 @@ struct GetKeyServerLocationsReply { struct GetKeyServerLocationsRequest { constexpr static FileIdentifier file_identifier = 9144680; Arena arena; - SpanID spanContext; + SpanContext spanContext; Optional tenant; KeyRef begin; Optional end; @@ -340,7 +340,7 @@ struct GetKeyServerLocationsRequest { Version minTenantVersion; GetKeyServerLocationsRequest() : limit(0), reverse(false), minTenantVersion(latestVersion) {} - GetKeyServerLocationsRequest(SpanID spanContext, + GetKeyServerLocationsRequest(SpanContext spanContext, Optional const& tenant, KeyRef const& begin, Optional const& end, @@ -378,12 +378,12 @@ struct GetRawCommittedVersionReply { struct GetRawCommittedVersionRequest { constexpr static FileIdentifier file_identifier = 12954034; - SpanID spanContext; + SpanContext spanContext; Optional debugID; ReplyPromise reply; Version maxVersion; // max version in the grv proxy's version vector cache - explicit GetRawCommittedVersionRequest(SpanID spanContext, + explicit GetRawCommittedVersionRequest(SpanContext spanContext, Optional const& debugID = Optional(), Version maxVersion = invalidVersion) : spanContext(spanContext), debugID(debugID), maxVersion(maxVersion) {} diff --git a/fdbclient/CommitTransaction.h b/fdbclient/CommitTransaction.h index 53c87c43bd0..91bccaf7ba1 100644 --- a/fdbclient/CommitTransaction.h +++ b/fdbclient/CommitTransaction.h @@ -24,6 +24,7 @@ #include "fdbclient/FDBTypes.h" #include "fdbclient/Knobs.h" +#include "flow/Tracing.h" // The versioned message has wire format : -1, version, messages static const int32_t VERSION_HEADER = -1; @@ -77,6 +78,7 @@ struct MutationRef { AndV2, CompareAndClear, Reserved_For_SpanContextMessage /* See fdbserver/SpanContextMessage.h */, + Reserved_For_OTELSpanContextMessage, MAX_ATOMIC_OP }; // This is stored this way for serialization purposes. 
@@ -190,7 +192,7 @@ struct CommitTransactionRef { Version read_snapshot = 0; bool report_conflicting_keys = false; bool lock_aware = false; // set when metadata mutations are present - Optional spanContext; + Optional spanContext; template force_inline void serialize(Ar& ar) { diff --git a/fdbclient/DatabaseContext.h b/fdbclient/DatabaseContext.h index 532bc1a0965..11f5b1beb76 100644 --- a/fdbclient/DatabaseContext.h +++ b/fdbclient/DatabaseContext.h @@ -141,7 +141,7 @@ struct WatchParameters : public ReferenceCounted { const Version version; const TagSet tags; - const SpanID spanID; + const SpanContext spanContext; const TaskPriority taskID; const Optional debugID; const UseProvisionalProxies useProvisionalProxies; @@ -151,11 +151,11 @@ struct WatchParameters : public ReferenceCounted { Optional value, Version version, TagSet tags, - SpanID spanID, + SpanContext spanContext, TaskPriority taskID, Optional debugID, UseProvisionalProxies useProvisionalProxies) - : tenant(tenant), key(key), value(value), version(version), tags(tags), spanID(spanID), taskID(taskID), + : tenant(tenant), key(key), value(value), version(version), tags(tags), spanContext(spanContext), taskID(taskID), debugID(debugID), useProvisionalProxies(useProvisionalProxies) {} }; @@ -416,12 +416,12 @@ class DatabaseContext : public ReferenceCounted, public FastAll Optional defaultTenant; struct VersionRequest { - SpanID spanContext; + SpanContext spanContext; Promise reply; TagSet tags; Optional debugID; - VersionRequest(SpanID spanContext, TagSet tags = TagSet(), Optional debugID = Optional()) + VersionRequest(SpanContext spanContext, TagSet tags = TagSet(), Optional debugID = Optional()) : spanContext(spanContext), tags(tags), debugID(debugID) {} }; diff --git a/fdbclient/FDBTypes.h b/fdbclient/FDBTypes.h index 9683c7e27ff..9f89237b511 100644 --- a/fdbclient/FDBTypes.h +++ b/fdbclient/FDBTypes.h @@ -29,30 +29,10 @@ #include #include -#include "flow/Arena.h" #include "flow/FastRef.h" #include 
"flow/ProtocolVersion.h" #include "flow/flow.h" -enum class TraceFlags : uint8_t { unsampled = 0b00000000, sampled = 0b00000001 }; - -inline TraceFlags operator&(TraceFlags lhs, TraceFlags rhs) { - return static_cast(static_cast>(lhs) & - static_cast>(rhs)); -} - -struct SpanContext { - UID traceID; - uint64_t spanID; - TraceFlags m_Flags; - SpanContext() : traceID(UID()), spanID(0), m_Flags(TraceFlags::unsampled) {} - SpanContext(UID traceID, uint64_t spanID, TraceFlags flags) : traceID(traceID), spanID(spanID), m_Flags(flags) {} - SpanContext(UID traceID, uint64_t spanID) : traceID(traceID), spanID(spanID), m_Flags(TraceFlags::unsampled) {} - SpanContext(Arena arena, const SpanContext& span) - : traceID(span.traceID), spanID(span.spanID), m_Flags(span.m_Flags) {} - bool isSampled() const { return (m_Flags & TraceFlags::sampled) == TraceFlags::sampled; } -}; - typedef int64_t Version; typedef uint64_t LogEpoch; typedef uint64_t Sequence; diff --git a/fdbclient/IClientApi.h b/fdbclient/IClientApi.h index 91ef38eeae3..e1861432a15 100644 --- a/fdbclient/IClientApi.h +++ b/fdbclient/IClientApi.h @@ -27,6 +27,7 @@ #include "fdbclient/FDBTypes.h" #include "fdbclient/Tenant.h" +#include "flow/Tracing.h" #include "flow/ThreadHelper.actor.h" struct VersionVector; @@ -96,11 +97,11 @@ class ITransaction { virtual ThreadFuture commit() = 0; virtual Version getCommittedVersion() = 0; - // @todo This API and the "getSpanID()" API may help with debugging simulation + // @todo This API and the "getSpanContext()" API may help with debugging simulation // test failures. (These APIs are not currently invoked anywhere.) Remove them // later if they are not really needed. 
virtual VersionVector getVersionVector() = 0; - virtual UID getSpanID() = 0; + virtual SpanContext getSpanContext() = 0; virtual ThreadFuture getApproximateSize() = 0; virtual void setOption(FDBTransactionOptions::Option option, Optional value = Optional()) = 0; diff --git a/fdbclient/IConfigTransaction.h b/fdbclient/IConfigTransaction.h index 63e058ee4c8..8f21679e27d 100644 --- a/fdbclient/IConfigTransaction.h +++ b/fdbclient/IConfigTransaction.h @@ -45,7 +45,7 @@ class IConfigTransaction : public ISingleThreadTransaction { // Not implemented: void setVersion(Version) override { throw client_invalid_operation(); } VersionVector getVersionVector() const override { throw client_invalid_operation(); } - UID getSpanID() const override { throw client_invalid_operation(); } + SpanContext getSpanContext() const override { throw client_invalid_operation(); } Future getKey(KeySelector const& key, Snapshot snapshot = Snapshot::False) override { throw client_invalid_operation(); } diff --git a/fdbclient/ISingleThreadTransaction.h b/fdbclient/ISingleThreadTransaction.h index bb5a4913f19..19beb4e5df9 100644 --- a/fdbclient/ISingleThreadTransaction.h +++ b/fdbclient/ISingleThreadTransaction.h @@ -95,7 +95,7 @@ class ISingleThreadTransaction : public ReferenceCounted commit() = 0; virtual Version getCommittedVersion() const = 0; virtual VersionVector getVersionVector() const = 0; - virtual UID getSpanID() const = 0; + virtual SpanContext getSpanContext() const = 0; virtual int64_t getApproximateSize() const = 0; virtual Future> getVersionstamp() = 0; virtual void setOption(FDBTransactionOptions::Option option, Optional value = Optional()) = 0; diff --git a/fdbclient/MultiVersionTransaction.actor.cpp b/fdbclient/MultiVersionTransaction.actor.cpp index fe18292dde3..e281887e110 100644 --- a/fdbclient/MultiVersionTransaction.actor.cpp +++ b/fdbclient/MultiVersionTransaction.actor.cpp @@ -1105,13 +1105,13 @@ VersionVector MultiVersionTransaction::getVersionVector() { return 
VersionVector(); } -UID MultiVersionTransaction::getSpanID() { +SpanContext MultiVersionTransaction::getSpanContext() { auto tr = getTransaction(); if (tr.transaction) { - return tr.transaction->getSpanID(); + return tr.transaction->getSpanContext(); } - return UID(); + return SpanContext(); } ThreadFuture MultiVersionTransaction::getApproximateSize() { diff --git a/fdbclient/MultiVersionTransaction.h b/fdbclient/MultiVersionTransaction.h index b9d7a206591..1fb5c604ff3 100644 --- a/fdbclient/MultiVersionTransaction.h +++ b/fdbclient/MultiVersionTransaction.h @@ -378,7 +378,7 @@ class DLTransaction : public ITransaction, ThreadSafeReferenceCounted commit() override; Version getCommittedVersion() override; VersionVector getVersionVector() override; - UID getSpanID() override { return UID(); }; + SpanContext getSpanContext() override { return SpanContext(); }; ThreadFuture getApproximateSize() override; void setOption(FDBTransactionOptions::Option option, Optional value = Optional()) override; @@ -567,7 +567,7 @@ class MultiVersionTransaction : public ITransaction, ThreadSafeReferenceCounted< ThreadFuture commit() override; Version getCommittedVersion() override; VersionVector getVersionVector() override; - UID getSpanID() override; + SpanContext getSpanContext() override; ThreadFuture getApproximateSize() override; void setOption(FDBTransactionOptions::Option option, Optional value = Optional()) override; diff --git a/fdbclient/NativeAPI.actor.cpp b/fdbclient/NativeAPI.actor.cpp index ec8409ac91e..c57ed97d3c6 100644 --- a/fdbclient/NativeAPI.actor.cpp +++ b/fdbclient/NativeAPI.actor.cpp @@ -21,6 +21,7 @@ #include "fdbclient/NativeAPI.actor.h" #include +#include #include #include #include @@ -848,7 +849,9 @@ ACTOR Future assertFailure(GrvProxyInterface remote, Future attemptGRVFromOldProxies(std::vector oldProxies, std::vector newProxies) { - Span span(deterministicRandom()->randomUniqueID(), "VerifyCausalReadRisky"_loc); + auto debugID = 
nondeterministicRandom()->randomUniqueID(); + g_traceBatch.addEvent("AttemptGRVFromOldProxyDebug", debugID.first(), "NativeAPI.attemptGRVFromOldProxies.Start"); + Span span("VerifyCausalReadRisky"_loc); std::vector> replies; replies.reserve(oldProxies.size()); GetReadVersionRequest req( @@ -2789,13 +2792,13 @@ void updateTagMappings(Database cx, const GetKeyServerLocationsReply& reply) { ACTOR Future getKeyLocation_internal(Database cx, Optional tenant, Key key, - SpanID spanID, + SpanContext spanContext, Optional debugID, UseProvisionalProxies useProvisionalProxies, Reverse isBackward, Version version) { - state Span span("NAPI:getKeyLocation"_loc, spanID); + state Span span("NAPI:getKeyLocation"_loc, spanContext); if (isBackward) { ASSERT(key != allKeys.begin && key <= allKeys.end); } else { @@ -2883,7 +2886,7 @@ Future getKeyLocation(Database const& cx, Optional const& tenant, Key const& key, F StorageServerInterface::*member, - SpanID spanID, + SpanContext spanContext, Optional debugID, UseProvisionalProxies useProvisionalProxies, Reverse isBackward, @@ -2891,7 +2894,8 @@ Future getKeyLocation(Database const& cx, // we first check whether this range is cached Optional locationInfo = cx->getCachedLocation(tenant, key, isBackward); if (!locationInfo.present()) { - return getKeyLocation_internal(cx, tenant, key, spanID, debugID, useProvisionalProxies, isBackward, version); + return getKeyLocation_internal( + cx, tenant, key, spanContext, debugID, useProvisionalProxies, isBackward, version); } bool onlyEndpointFailedAndNeedRefresh = false; @@ -2905,7 +2909,8 @@ Future getKeyLocation(Database const& cx, cx->invalidateCache(locationInfo.get().tenantEntry.prefix, key); // Refresh the cache with a new getKeyLocations made to proxies. 
- return getKeyLocation_internal(cx, tenant, key, spanID, debugID, useProvisionalProxies, isBackward, version); + return getKeyLocation_internal( + cx, tenant, key, spanContext, debugID, useProvisionalProxies, isBackward, version); } return locationInfo.get(); @@ -2922,7 +2927,7 @@ Future getKeyLocation(Reference trState, useTenant ? trState->tenant() : Optional(), key, member, - trState->spanID, + trState->spanContext, trState->debugID, trState->useProvisionalProxies, isBackward, @@ -2944,11 +2949,11 @@ ACTOR Future> getKeyRangeLocations_internal( KeyRange keys, int limit, Reverse reverse, - SpanID spanID, + SpanContext spanContext, Optional debugID, UseProvisionalProxies useProvisionalProxies, Version version) { - state Span span("NAPI:getKeyRangeLocations"_loc, spanID); + state Span span("NAPI:getKeyRangeLocations"_loc, spanContext); if (debugID.present()) g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getKeyLocations.Before"); @@ -3018,7 +3023,7 @@ Future> getKeyRangeLocations(Database const& c int limit, Reverse reverse, F StorageServerInterface::*member, - SpanID const& spanID, + SpanContext const& spanContext, Optional const& debugID, UseProvisionalProxies useProvisionalProxies, Version version) { @@ -3028,7 +3033,7 @@ Future> getKeyRangeLocations(Database const& c std::vector locations; if (!cx->getCachedLocations(tenant, keys, locations, limit, reverse)) { return getKeyRangeLocations_internal( - cx, tenant, keys, limit, reverse, spanID, debugID, useProvisionalProxies, version); + cx, tenant, keys, limit, reverse, spanContext, debugID, useProvisionalProxies, version); } bool foundFailed = false; @@ -3049,7 +3054,7 @@ Future> getKeyRangeLocations(Database const& c if (foundFailed) { // Refresh the cache with a new getKeyRangeLocations made to proxies. 
return getKeyRangeLocations_internal( - cx, tenant, keys, limit, reverse, spanID, debugID, useProvisionalProxies, version); + cx, tenant, keys, limit, reverse, spanContext, debugID, useProvisionalProxies, version); } return locations; @@ -3069,7 +3074,7 @@ Future> getKeyRangeLocations(ReferencespanID, + trState->spanContext, trState->debugID, trState->useProvisionalProxies, version); @@ -3098,7 +3103,7 @@ ACTOR Future warmRange_impl(Reference trState, KeyRange keys, CLIENT_KNOBS->WARM_RANGE_SHARD_LIMIT, Reverse::False, - trState->spanID, + trState->spanContext, trState->debugID, trState->useProvisionalProxies, version)); @@ -3129,38 +3134,35 @@ ACTOR Future warmRange_impl(Reference trState, KeyRange return Void(); } -SpanID generateSpanID(bool transactionTracingSample, SpanID parentContext = SpanID()) { - uint64_t txnId = deterministicRandom()->randomUInt64(); +SpanContext generateSpanID(bool transactionTracingSample, SpanContext parentContext = SpanContext()) { if (parentContext.isValid()) { - if (parentContext.first() > 0) { - txnId = parentContext.first(); - } - uint64_t tokenId = parentContext.second() > 0 ? deterministicRandom()->randomUInt64() : 0; - return SpanID(txnId, tokenId); - } else if (transactionTracingSample) { - uint64_t tokenId = deterministicRandom()->random01() <= FLOW_KNOBS->TRACING_SAMPLE_RATE - ? deterministicRandom()->randomUInt64() - : 0; - return SpanID(txnId, tokenId); - } else { - return SpanID(txnId, 0); + return SpanContext(parentContext.traceID, deterministicRandom()->randomUInt64(), parentContext.m_Flags); + } + if (transactionTracingSample) { + return SpanContext(deterministicRandom()->randomUniqueID(), + deterministicRandom()->randomUInt64(), + deterministicRandom()->random01() <= FLOW_KNOBS->TRACING_SAMPLE_RATE + ? 
TraceFlags::sampled + : TraceFlags::unsampled); } + return SpanContext( + deterministicRandom()->randomUniqueID(), deterministicRandom()->randomUInt64(), TraceFlags::unsampled); } TransactionState::TransactionState(Database cx, Optional tenant, TaskPriority taskID, - SpanID spanID, + SpanContext spanContext, Reference trLogInfo) - : cx(cx), trLogInfo(trLogInfo), options(cx), taskID(taskID), spanID(spanID), readVersionObtainedFromGrvProxy(true), - tenant_(tenant), tenantSet(tenant.present()) {} + : cx(cx), trLogInfo(trLogInfo), options(cx), taskID(taskID), spanContext(spanContext), + readVersionObtainedFromGrvProxy(true), tenant_(tenant), tenantSet(tenant.present()) {} Reference TransactionState::cloneAndReset(Reference newTrLogInfo, bool generateNewSpan) const { - SpanID newSpanID = generateNewSpan ? generateSpanID(cx->transactionTracingSample) : spanID; + SpanContext newSpanContext = generateNewSpan ? generateSpanID(cx->transactionTracingSample) : spanContext; Reference newState = - makeReference(cx, tenant_, cx->taskID, newSpanID, newTrLogInfo); + makeReference(cx, tenant_, cx->taskID, newSpanContext, newTrLogInfo); if (!cx->apiVersionAtLeast(16)) { newState->options = options; @@ -3218,12 +3220,12 @@ ACTOR Future> getValue(Reference trState, UseTenant useTenant, TransactionRecordLogInfo recordLogInfo) { state Version ver = wait(version); - state Span span("NAPI:getValue"_loc, trState->spanID); + state Span span("NAPI:getValue"_loc, trState->spanContext); if (useTenant && trState->tenant().present()) { - span.addTag("tenant"_sr, trState->tenant().get()); + span.addAttribute("tenant"_sr, trState->tenant().get()); } - span.addTag("key"_sr, key); + span.addAttribute("key"_sr, key); trState->cx->validateVersion(ver); loop { @@ -3349,7 +3351,7 @@ ACTOR Future getKey(Reference trState, wait(success(version)); state Optional getKeyID = Optional(); - state Span span("NAPI:getKey"_loc, trState->spanID); + state Span span("NAPI:getKey"_loc, trState->spanContext); if 
(trState->debugID.present()) { getKeyID = nondeterministicRandom()->randomUniqueID(); @@ -3448,8 +3450,8 @@ ACTOR Future getKey(Reference trState, } } -ACTOR Future waitForCommittedVersion(Database cx, Version version, SpanID spanContext) { - state Span span("NAPI:waitForCommittedVersion"_loc, { spanContext }); +ACTOR Future waitForCommittedVersion(Database cx, Version version, SpanContext spanContext) { + state Span span("NAPI:waitForCommittedVersion"_loc, spanContext); try { loop { choose { @@ -3483,14 +3485,14 @@ ACTOR Future waitForCommittedVersion(Database cx, Version version, Span } ACTOR Future getRawVersion(Reference trState) { - state Span span("NAPI:getRawVersion"_loc, { trState->spanID }); + state Span span("NAPI:getRawVersion"_loc, trState->spanContext); loop { choose { when(wait(trState->cx->onProxiesChanged())) {} when(GetReadVersionReply v = wait(basicLoadBalance(trState->cx->getGrvProxies(UseProvisionalProxies::False), &GrvProxyInterface::getConsistentReadVersion, - GetReadVersionRequest(trState->spanID, + GetReadVersionRequest(trState->spanContext, 0, TransactionPriority::IMMEDIATE, trState->cx->ssVersionVectorCache.getMaxVersion()), @@ -3512,7 +3514,7 @@ ACTOR Future readVersionBatcher( uint32_t flags); ACTOR Future watchValue(Database cx, Reference parameters) { - state Span span("NAPI:watchValue"_loc, parameters->spanID); + state Span span("NAPI:watchValue"_loc, parameters->spanContext); state Version ver = parameters->version; cx->validateVersion(parameters->version); ASSERT(parameters->version != latestVersion); @@ -3522,7 +3524,7 @@ ACTOR Future watchValue(Database cx, Reference p parameters->tenant.name, parameters->key, &StorageServerInterface::watchValue, - parameters->spanID, + parameters->spanContext, parameters->debugID, parameters->useProvisionalProxies, Reverse::False, @@ -3741,15 +3743,15 @@ ACTOR Future watchValueMap(Future version, Optional value, Database cx, TagSet tags, - SpanID spanID, + SpanContext spanContext, TaskPriority 
taskID, Optional debugID, UseProvisionalProxies useProvisionalProxies) { state Version ver = wait(version); - wait(getWatchFuture( - cx, - makeReference(tenant, key, value, ver, tags, spanID, taskID, debugID, useProvisionalProxies))); + wait(getWatchFuture(cx, + makeReference( + tenant, key, value, ver, tags, spanContext, taskID, debugID, useProvisionalProxies))); return Void(); } @@ -3795,10 +3797,11 @@ Future getExactRange(Reference trState, Reverse reverse, UseTenant useTenant) { state RangeResultFamily output; - state Span span("NAPI:getExactRange"_loc, trState->spanID); + // TODO - ljoswiak parent or link? + state Span span("NAPI:getExactRange"_loc, trState->spanContext); if (useTenant && trState->tenant().present()) { - span.addTag("tenant"_sr, trState->tenant().get()); + span.addAttribute("tenant"_sr, trState->tenant().get()); } // printf("getExactRange( '%s', '%s' )\n", keys.begin.toString().c_str(), keys.end.toString().c_str()); @@ -4155,9 +4158,9 @@ Future getRange(Reference trState, state KeySelector originalBegin = begin; state KeySelector originalEnd = end; state RangeResultFamily output; - state Span span("NAPI:getRange"_loc, trState->spanID); + state Span span("NAPI:getRange"_loc, trState->spanContext); if (useTenant && trState->tenant().present()) { - span.addTag("tenant"_sr, trState->tenant().get()); + span.addAttribute("tenant"_sr, trState->tenant().get()); } try { @@ -4631,7 +4634,7 @@ ACTOR Future getRangeStreamFragment(Reference trState, GetRangeLimits limits, Snapshot snapshot, Reverse reverse, - SpanID spanContext) { + SpanContext spanContext) { loop { state std::vector locations = wait(getKeyRangeLocations(trState, @@ -4924,7 +4927,7 @@ ACTOR Future getRangeStream(Reference trState, // FIXME: better handling to disable row limits ASSERT(!limits.hasRowLimit()); - state Span span("NAPI:getRangeStream"_loc, trState->spanID); + state Span span("NAPI:getRangeStream"_loc, trState->spanContext); state Version version = wait(fVersion); 
trState->cx->validateVersion(version); @@ -5047,7 +5050,7 @@ Transaction::Transaction(Database const& cx, Optional const& tenant) cx->taskID, generateSpanID(cx->transactionTracingSample), createTrLogInfoProbabilistically(cx))), - span(trState->spanID, "Transaction"_loc), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), tr(trState->spanID) { + span(trState->spanContext, "Transaction"_loc), backoff(CLIENT_KNOBS->DEFAULT_BACKOFF), tr(trState->spanContext) { if (DatabaseContext::debugUseTags) { debugAddTags(trState); } @@ -5182,7 +5185,7 @@ ACTOR Future watch(Reference watch, Database cx, Future tenant, TagSet tags, - SpanID spanID, + SpanContext spanContext, TaskPriority taskID, Optional debugID, UseProvisionalProxies useProvisionalProxies) { @@ -5210,7 +5213,7 @@ ACTOR Future watch(Reference watch, watch->value, cx, tags, - spanID, + spanContext, taskID, debugID, useProvisionalProxies); @@ -5243,7 +5246,7 @@ Future Transaction::watch(Reference watch) { populateAndGetTenant( trState, watch->key, readVersion.isValid() && readVersion.isReady() ? 
readVersion.get() : latestVersion), trState->options.readTags, - trState->spanID, + trState->spanContext, trState->taskID, trState->debugID, trState->useProvisionalProxies); @@ -5716,7 +5719,7 @@ void TransactionOptions::reset(Database const& cx) { void Transaction::resetImpl(bool generateNewSpan) { flushTrLogsIfEnabled(); trState = trState->cloneAndReset(createTrLogInfoProbabilistically(trState->cx), generateNewSpan); - tr = CommitTransactionRequest(trState->spanID); + tr = CommitTransactionRequest(trState->spanContext); readVersion = Future(); metadataVersion = Promise>(); extraConflictRanges.clear(); @@ -5731,7 +5734,7 @@ void Transaction::reset() { void Transaction::fullReset() { resetImpl(true); - span = Span(trState->spanID, "Transaction"_loc); + span = Span(trState->spanContext, "Transaction"_loc); backoff = CLIENT_KNOBS->DEFAULT_BACKOFF; } @@ -5852,8 +5855,8 @@ ACTOR void checkWrites(Reference trState, ACTOR static Future commitDummyTransaction(Reference trState, KeyRange range) { state Transaction tr(trState->cx); state int retries = 0; - state Span span("NAPI:dummyTransaction"_loc, trState->spanID); - tr.span.addParent(span.context); + state Span span("NAPI:dummyTransaction"_loc, trState->spanContext); + tr.span.setParent(span.context); loop { try { TraceEvent("CommitDummyTransaction").detail("Key", range.begin).detail("Retries", retries); @@ -5896,7 +5899,7 @@ void Transaction::setupWatches() { watches[i]->value, trState->cx, trState->options.readTags, - trState->spanID, + trState->spanContext, trState->taskID, trState->debugID, trState->useProvisionalProxies)); @@ -6019,7 +6022,7 @@ ACTOR static Future tryCommit(Reference trState, Future readVersion) { state TraceInterval interval("TransactionCommit"); state double startTime = now(); - state Span span("NAPI:tryCommit"_loc, trState->spanID); + state Span span("NAPI:tryCommit"_loc, trState->spanContext); state Optional debugID = trState->debugID; if (debugID.present()) { 
TraceEvent(interval.begin()).detail("Parent", debugID.get()); @@ -6509,10 +6512,11 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional(value.get(), Unversioned())); + TEST(true); // Adding link in FDBTransactionOptions::SPAN_PARENT + span.setParent(BinaryReader::fromStringRef(value.get(), IncludeVersion())); break; case FDBTransactionOptions::REPORT_CONFLICTING_KEYS: @@ -6555,7 +6559,7 @@ void Transaction::setOption(FDBTransactionOptions::Option option, Optional getConsistentReadVersion(SpanID parentSpan, +ACTOR Future getConsistentReadVersion(SpanContext parentSpan, DatabaseContext* cx, uint32_t transactionCount, TransactionPriority priority, @@ -6670,7 +6674,7 @@ ACTOR Future readVersionBatcher(DatabaseContext* cx, } g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), debugID.get().first()); } - span.addParent(req.spanContext); + span.addLink(req.spanContext); requests.push_back(req.reply); for (auto tag : req.tags) { ++tags[tag]; @@ -6726,10 +6730,10 @@ ACTOR Future readVersionBatcher(DatabaseContext* cx, ACTOR Future extractReadVersion(Reference trState, Location location, - SpanID spanContext, + SpanContext spanContext, Future f, Promise> metadataVersion) { - state Span span(spanContext, location, { trState->spanID }); + state Span span(spanContext, location, trState->spanContext); GetReadVersionReply rep = wait(f); double replyTime = now(); double latency = replyTime - trState->startTime; @@ -6902,7 +6906,7 @@ Future Transaction::getReadVersion(uint32_t flags) { } Location location = "NAPI:getReadVersion"_loc; - UID spanContext = generateSpanID(trState->cx->transactionTracingSample, trState->spanID); + SpanContext spanContext = generateSpanID(trState->cx->transactionTracingSample, trState->spanContext); auto const req = DatabaseContext::VersionRequest(spanContext, trState->options.tags, trState->debugID); batcher.stream.send(req); trState->startTime = now(); @@ -7392,7 +7396,7 @@ ACTOR Future>> 
getRangeSplitPoints(ReferencespanID); + state Span span("NAPI:GetRangeSplitPoints"_loc, trState->spanContext); loop { state std::vector locations = @@ -7956,14 +7960,14 @@ Reference Transaction::createTrLogInfoProbabilistically(cons return Reference(); } -void Transaction::setTransactionID(uint64_t id) { +void Transaction::setTransactionID(UID id) { ASSERT(getSize() == 0); - trState->spanID = SpanID(id, trState->spanID.second()); + trState->spanContext = SpanContext(id, trState->spanContext.spanID, trState->spanContext.m_Flags); /* carry m_Flags over so replacing the trace ID does not discard the existing sampling flag */ } void Transaction::setToken(uint64_t token) { ASSERT(getSize() == 0); - trState->spanID = SpanID(trState->spanID.first(), token); + trState->spanContext = SpanContext(trState->spanContext.traceID, token); } void enableClientInfoLogging() { diff --git a/fdbclient/NativeAPI.actor.h b/fdbclient/NativeAPI.actor.h index 988e64aaf18..fe4d578e771 100644 --- a/fdbclient/NativeAPI.actor.h +++ b/fdbclient/NativeAPI.actor.h @@ -243,7 +243,7 @@ struct TransactionState : ReferenceCounted { Optional debugID; TaskPriority taskID; - SpanID spanID; + SpanContext spanContext; UseProvisionalProxies useProvisionalProxies = UseProvisionalProxies::False; bool readVersionObtainedFromGrvProxy; @@ -259,13 +259,14 @@ struct TransactionState : ReferenceCounted { std::shared_ptr> conflictingKeys; // Only available so that Transaction can have a default constructor, for use in state variables - TransactionState(TaskPriority taskID, SpanID spanID) : taskID(taskID), spanID(spanID), tenantSet(false) {} + TransactionState(TaskPriority taskID, SpanContext spanContext) + : taskID(taskID), spanContext(spanContext), tenantSet(false) {} // VERSION_VECTOR changed default values of readVersionObtainedFromGrvProxy TransactionState(Database cx, Optional tenant, TaskPriority taskID, - SpanID spanID, + SpanContext spanContext, Reference trLogInfo); Reference cloneAndReset(Reference newTrLogInfo, bool generateNewSpan) const; @@ -435,7 +436,7 @@ class Transaction : NonCopyable { void debugTransaction(UID dID) { 
trState->debugID = dID; } VersionVector getVersionVector() const; - UID getSpanID() const { return trState->spanID; } + SpanContext getSpanContext() const { return trState->spanContext; } Future commitMutations(); void setupWatches(); @@ -447,7 +448,7 @@ class Transaction : NonCopyable { Database getDatabase() const { return trState->cx; } static Reference createTrLogInfoProbabilistically(const Database& cx); - void setTransactionID(uint64_t id); + void setTransactionID(UID id); void setToken(uint64_t token); const std::vector>>& getExtraReadConflictRanges() const { return extraConflictRanges; } @@ -490,7 +491,7 @@ class Transaction : NonCopyable { Future committing; }; -ACTOR Future waitForCommittedVersion(Database cx, Version version, SpanID spanContext); +ACTOR Future waitForCommittedVersion(Database cx, Version version, SpanContext spanContext); ACTOR Future>> waitDataDistributionMetricsList(Database cx, KeyRange keys, int shardLimit); diff --git a/fdbclient/ReadYourWrites.actor.cpp b/fdbclient/ReadYourWrites.actor.cpp index c651adad32a..6c3fe880c19 100644 --- a/fdbclient/ReadYourWrites.actor.cpp +++ b/fdbclient/ReadYourWrites.actor.cpp @@ -1979,7 +1979,7 @@ void ReadYourWritesTransaction::getWriteConflicts(KeyRangeMap* result) { } } -void ReadYourWritesTransaction::setTransactionID(uint64_t id) { +void ReadYourWritesTransaction::setTransactionID(UID id) { tr.setTransactionID(id); } diff --git a/fdbclient/ReadYourWrites.h b/fdbclient/ReadYourWrites.h index 341dc4e2a1d..e67b5334f71 100644 --- a/fdbclient/ReadYourWrites.h +++ b/fdbclient/ReadYourWrites.h @@ -140,7 +140,7 @@ class ReadYourWritesTransaction final : NonCopyable, [[nodiscard]] Future commit() override; Version getCommittedVersion() const override { return tr.getCommittedVersion(); } VersionVector getVersionVector() const override { return tr.getVersionVector(); } - UID getSpanID() const override { return tr.getSpanID(); } + SpanContext getSpanContext() const override { return tr.getSpanContext(); } 
int64_t getApproximateSize() const override { return approximateSize; } [[nodiscard]] Future> getVersionstamp() override; @@ -177,7 +177,7 @@ class ReadYourWritesTransaction final : NonCopyable, Reference getTransactionState() const { return tr.trState; } - void setTransactionID(uint64_t id); + void setTransactionID(UID id); void setToken(uint64_t token); // Read from the special key space readConflictRangeKeysRange diff --git a/fdbclient/SpecialKeySpace.actor.cpp b/fdbclient/SpecialKeySpace.actor.cpp index f018c0fc2b8..e24c1829ec9 100644 --- a/fdbclient/SpecialKeySpace.actor.cpp +++ b/fdbclient/SpecialKeySpace.actor.cpp @@ -1595,10 +1595,10 @@ Future TracingOptionsImpl::getRange(ReadYourWritesTransaction* ryw, if (key.endsWith(kTracingTransactionIdKey)) { result.push_back_deep(result.arena(), - KeyValueRef(key, std::to_string(ryw->getTransactionState()->spanID.first()))); + KeyValueRef(key, ryw->getTransactionState()->spanContext.traceID.toString())); } else if (key.endsWith(kTracingTokenKey)) { result.push_back_deep(result.arena(), - KeyValueRef(key, std::to_string(ryw->getTransactionState()->spanID.second()))); + KeyValueRef(key, std::to_string(ryw->getTransactionState()->spanContext.spanID))); } } return result; @@ -1612,7 +1612,7 @@ void TracingOptionsImpl::set(ReadYourWritesTransaction* ryw, const KeyRef& key, } if (key.endsWith(kTracingTransactionIdKey)) { - ryw->setTransactionID(std::stoul(value.toString())); + ryw->setTransactionID(UID::fromString(value.toString())); } else if (key.endsWith(kTracingTokenKey)) { if (value.toString() == "true") { ryw->setToken(deterministicRandom()->randomUInt64()); diff --git a/fdbclient/StorageServerInterface.h b/fdbclient/StorageServerInterface.h index 13ba8f1e182..cda6a32b66d 100644 --- a/fdbclient/StorageServerInterface.h +++ b/fdbclient/StorageServerInterface.h @@ -35,6 +35,7 @@ #include "fdbclient/CommitTransaction.h" #include "fdbclient/TagThrottle.actor.h" #include "fdbclient/Tenant.h" +#include "flow/Tracing.h" 
#include "flow/UnitTest.h" #include "fdbclient/VersionVector.h" @@ -271,7 +272,7 @@ struct GetValueReply : public LoadBalancedReply { struct GetValueRequest : TimedRequest { constexpr static FileIdentifier file_identifier = 8454530; - SpanID spanContext; + SpanContext spanContext; TenantInfo tenantInfo; Key key; Version version; @@ -283,7 +284,7 @@ struct GetValueRequest : TimedRequest { // serve the given key GetValueRequest() {} - GetValueRequest(SpanID spanContext, + GetValueRequest(SpanContext spanContext, const TenantInfo& tenantInfo, const Key& key, Version ver, @@ -315,7 +316,7 @@ struct WatchValueReply { struct WatchValueRequest { constexpr static FileIdentifier file_identifier = 14747733; - SpanID spanContext; + SpanContext spanContext; TenantInfo tenantInfo; Key key; Optional value; @@ -326,7 +327,7 @@ struct WatchValueRequest { WatchValueRequest() {} - WatchValueRequest(SpanID spanContext, + WatchValueRequest(SpanContext spanContext, TenantInfo tenantInfo, const Key& key, Optional value, @@ -360,7 +361,7 @@ struct GetKeyValuesReply : public LoadBalancedReply { struct GetKeyValuesRequest : TimedRequest { constexpr static FileIdentifier file_identifier = 6795746; - SpanID spanContext; + SpanContext spanContext; Arena arena; TenantInfo tenantInfo; KeySelectorRef begin, end; @@ -418,7 +419,7 @@ struct GetMappedKeyValuesReply : public LoadBalancedReply { struct GetMappedKeyValuesRequest : TimedRequest { constexpr static FileIdentifier file_identifier = 6795747; - SpanID spanContext; + SpanContext spanContext; Arena arena; TenantInfo tenantInfo; KeySelectorRef begin, end; @@ -483,7 +484,7 @@ struct GetKeyValuesStreamReply : public ReplyPromiseStreamReply { struct GetKeyValuesStreamRequest { constexpr static FileIdentifier file_identifier = 6795746; - SpanID spanContext; + SpanContext spanContext; Arena arena; TenantInfo tenantInfo; KeySelectorRef begin, end; @@ -534,7 +535,7 @@ struct GetKeyReply : public LoadBalancedReply { struct GetKeyRequest : TimedRequest 
{ constexpr static FileIdentifier file_identifier = 10457870; - SpanID spanContext; + SpanContext spanContext; Arena arena; TenantInfo tenantInfo; KeySelectorRef sel; @@ -548,7 +549,7 @@ struct GetKeyRequest : TimedRequest { GetKeyRequest() {} - GetKeyRequest(SpanID spanContext, + GetKeyRequest(SpanContext spanContext, TenantInfo tenantInfo, KeySelectorRef const& sel, Version version, @@ -835,7 +836,7 @@ struct ChangeFeedStreamReply : public ReplyPromiseStreamReply { struct ChangeFeedStreamRequest { constexpr static FileIdentifier file_identifier = 6795746; - SpanID spanContext; + SpanContext spanContext; Arena arena; Key rangeID; Version begin = 0; diff --git a/fdbclient/ThreadSafeTransaction.cpp b/fdbclient/ThreadSafeTransaction.cpp index 84ab49504bb..c796f025369 100644 --- a/fdbclient/ThreadSafeTransaction.cpp +++ b/fdbclient/ThreadSafeTransaction.cpp @@ -465,8 +465,8 @@ VersionVector ThreadSafeTransaction::getVersionVector() { return tr->getVersionVector(); } -UID ThreadSafeTransaction::getSpanID() { - return tr->getSpanID(); +SpanContext ThreadSafeTransaction::getSpanContext() { + return tr->getSpanContext(); } ThreadFuture ThreadSafeTransaction::getApproximateSize() { diff --git a/fdbclient/ThreadSafeTransaction.h b/fdbclient/ThreadSafeTransaction.h index 0ace0a2cfe1..a187bb2f459 100644 --- a/fdbclient/ThreadSafeTransaction.h +++ b/fdbclient/ThreadSafeTransaction.h @@ -167,7 +167,7 @@ class ThreadSafeTransaction : public ITransaction, ThreadSafeReferenceCounted commit() override; Version getCommittedVersion() override; VersionVector getVersionVector() override; - UID getSpanID() override; + SpanContext getSpanContext() override; ThreadFuture getApproximateSize() override; ThreadFuture getProtocolVersion(); diff --git a/fdbclient/TransactionLineage.h b/fdbclient/TransactionLineage.h index 6eed26b805e..04492db4ba5 100644 --- a/fdbclient/TransactionLineage.h +++ b/fdbclient/TransactionLineage.h @@ -34,10 +34,13 @@ struct TransactionLineage : LineageProperties { 
GetKeyServersLocations }; static constexpr std::string_view name = "Transaction"sv; - uint64_t txID; + UID txID; Operation operation = Operation::Unset; bool isSet(uint64_t TransactionLineage::*member) const { return this->*member > 0; } + bool isSet(UID TransactionLineage::*member) const { + return static_cast(this->*member).first() > 0 && static_cast(this->*member).second() > 0; + } bool isSet(Operation TransactionLineage::*member) const { return this->*member != Operation::Unset; } }; diff --git a/fdbrpc/FlowTests.actor.cpp b/fdbrpc/FlowTests.actor.cpp index 5c592ae0cb0..a79d13dd025 100644 --- a/fdbrpc/FlowTests.actor.cpp +++ b/fdbrpc/FlowTests.actor.cpp @@ -20,6 +20,7 @@ // Unit tests for the flow language and libraries +#include "flow/Arena.h" #include "flow/ProtocolVersion.h" #include "flow/UnitTest.h" #include "flow/DeterministicRandom.h" diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index 898f4f92043..ad737d3be47 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -19,6 +19,7 @@ */ #include "fdbrpc/FlowTransport.h" +#include "flow/Arena.h" #include "flow/network.h" #include @@ -278,6 +279,33 @@ struct UnauthorizedEndpointReceiver final : NetworkMessageReceiver { bool isPublic() const override { return true; } }; +// NetworkAddressCachedString retains a cached Standalone of +// a NetworkAddressList.address.toString() value. This cached value is useful +// for features in the hot path (i.e. Tracing), which need the String formatted value +// frequently and do not wish to pay the formatting cost. If the underlying NetworkAddressList +// needs to change, do not attempt to update it directly, use the setNetworkAddress API as it +// will ensure the new toString() cached value is updated. 
+class NetworkAddressCachedString { +public: + NetworkAddressCachedString() { setAddressList(NetworkAddressList()); } + NetworkAddressCachedString(NetworkAddressList const& list) { setAddressList(list); } + NetworkAddressList const& getAddressList() const { return addressList; } + void setAddressList(NetworkAddressList const& list) { + cachedStr = Standalone(StringRef(list.address.toString())); + addressList = list; + } + void setNetworkAddress(NetworkAddress const& addr) { + addressList.address = addr; + setAddressList(addressList); // force the recaching of the string. + } + Standalone getLocalAddressAsString() const { return cachedStr; } + operator NetworkAddressList const&() { return addressList; } + +private: + NetworkAddressList addressList; + Standalone cachedStr; +}; + class TransportData { public: TransportData(uint64_t transportId, int maxWellKnownEndpoints, IPAllowList const* allowList); @@ -299,7 +327,7 @@ class TransportData { // Returns true if given network address 'address' is one of the address we are listening on. 
bool isLocalAddress(const NetworkAddress& address) const; - NetworkAddressList localAddresses; + NetworkAddressCachedString localAddresses; std::vector> listeners; std::unordered_map> peers; std::unordered_map> closedPeers; @@ -877,12 +905,12 @@ void Peer::send(PacketBuffer* pb, ReliablePacket* rp, bool firstUnsent) { void Peer::prependConnectPacket() { // Send the ConnectPacket expected at the beginning of a new connection ConnectPacket pkt; - if (transport->localAddresses.address.isTLS() == destination.isTLS()) { - pkt.canonicalRemotePort = transport->localAddresses.address.port; - pkt.setCanonicalRemoteIp(transport->localAddresses.address.ip); - } else if (transport->localAddresses.secondaryAddress.present()) { - pkt.canonicalRemotePort = transport->localAddresses.secondaryAddress.get().port; - pkt.setCanonicalRemoteIp(transport->localAddresses.secondaryAddress.get().ip); + if (transport->localAddresses.getAddressList().address.isTLS() == destination.isTLS()) { + pkt.canonicalRemotePort = transport->localAddresses.getAddressList().address.port; + pkt.setCanonicalRemoteIp(transport->localAddresses.getAddressList().address.ip); + } else if (transport->localAddresses.getAddressList().secondaryAddress.present()) { + pkt.canonicalRemotePort = transport->localAddresses.getAddressList().secondaryAddress.get().port; + pkt.setCanonicalRemoteIp(transport->localAddresses.getAddressList().secondaryAddress.get().ip); } else { // a "mixed" TLS/non-TLS connection is like a client/server connection - there's no way to reverse it pkt.canonicalRemotePort = 0; @@ -919,10 +947,10 @@ void Peer::onIncomingConnection(Reference self, Reference con ++self->connectIncomingCount; if (!destination.isPublic() && !outgoingConnectionIdle) throw address_in_use(); - NetworkAddress compatibleAddr = transport->localAddresses.address; - if (transport->localAddresses.secondaryAddress.present() && - transport->localAddresses.secondaryAddress.get().isTLS() == destination.isTLS()) { - compatibleAddr = 
transport->localAddresses.secondaryAddress.get(); + NetworkAddress compatibleAddr = transport->localAddresses.getAddressList().address; + if (transport->localAddresses.getAddressList().secondaryAddress.present() && + transport->localAddresses.getAddressList().secondaryAddress.get().isTLS() == destination.isTLS()) { + compatibleAddr = transport->localAddresses.getAddressList().secondaryAddress.get(); } if (!destination.isPublic() || outgoingConnectionIdle || destination > compatibleAddr || @@ -1455,10 +1483,10 @@ ACTOR static Future listen(TransportData* self, NetworkAddress listenAddr) state ActorCollectionNoErrors incoming; // Actors monitoring incoming connections that haven't yet been associated with a peer state Reference listener = INetworkConnections::net()->listen(listenAddr); - if (!g_network->isSimulated() && self->localAddresses.address.port == 0) { + if (!g_network->isSimulated() && self->localAddresses.getAddressList().address.port == 0) { TraceEvent(SevInfo, "UpdatingListenAddress") .detail("AssignedListenAddress", listener->getListenAddress().toString()); - self->localAddresses.address = listener->getListenAddress(); + self->localAddresses.setNetworkAddress(listener->getListenAddress()); } state uint64_t connectionCount = 0; try { @@ -1507,8 +1535,9 @@ Reference TransportData::getOrOpenPeer(NetworkAddress const& address, bool } bool TransportData::isLocalAddress(const NetworkAddress& address) const { - return address == localAddresses.address || - (localAddresses.secondaryAddress.present() && address == localAddresses.secondaryAddress.get()); + return address == localAddresses.getAddressList().address || + (localAddresses.getAddressList().secondaryAddress.present() && + address == localAddresses.getAddressList().secondaryAddress.get()); } ACTOR static Future multiVersionCleanupWorker(TransportData* self) { @@ -1554,15 +1583,21 @@ void FlowTransport::initMetrics() { } NetworkAddressList FlowTransport::getLocalAddresses() const { - return 
self->localAddresses; + return self->localAddresses.getAddressList(); } NetworkAddress FlowTransport::getLocalAddress() const { - return self->localAddresses.address; + return self->localAddresses.getAddressList().address; +} + +Standalone FlowTransport::getLocalAddressAsString() const { + return self->localAddresses.getLocalAddressAsString(); } void FlowTransport::setLocalAddress(NetworkAddress const& address) { - self->localAddresses.address = address; + auto newAddress = self->localAddresses.getAddressList(); + newAddress.address = address; + self->localAddresses.setAddressList(newAddress); } const std::unordered_map>& FlowTransport::getAllPeers() const { @@ -1586,11 +1621,14 @@ Future FlowTransport::onIncompatibleChanged() { Future FlowTransport::bind(NetworkAddress publicAddress, NetworkAddress listenAddress) { ASSERT(publicAddress.isPublic()); - if (self->localAddresses.address == NetworkAddress()) { - self->localAddresses.address = publicAddress; + if (self->localAddresses.getAddressList().address == NetworkAddress()) { + self->localAddresses.setNetworkAddress(publicAddress); } else { - self->localAddresses.secondaryAddress = publicAddress; + auto addrList = self->localAddresses.getAddressList(); + addrList.secondaryAddress = publicAddress; + self->localAddresses.setAddressList(addrList); } + // reformatLocalAddress() TraceEvent("Binding").detail("PublicAddress", publicAddress).detail("ListenAddress", listenAddress); Future listenF = listen(self, listenAddress); @@ -1641,7 +1679,7 @@ void FlowTransport::removePeerReference(const Endpoint& endpoint, bool isStream) void FlowTransport::addEndpoint(Endpoint& endpoint, NetworkMessageReceiver* receiver, TaskPriority taskID) { endpoint.token = deterministicRandom()->randomUniqueID(); if (receiver->isStream()) { - endpoint.addresses = self->localAddresses; + endpoint.addresses = self->localAddresses.getAddressList(); endpoint.token = UID(endpoint.token.first() | TOKEN_STREAM_FLAG, endpoint.token.second()); } else { 
endpoint.addresses = NetworkAddressList(); @@ -1651,7 +1689,7 @@ void FlowTransport::addEndpoint(Endpoint& endpoint, NetworkMessageReceiver* rece } void FlowTransport::addEndpoints(std::vector> const& streams) { - self->endpoints.insert(self->localAddresses, streams); + self->endpoints.insert(self->localAddresses.getAddressList(), streams); } void FlowTransport::removeEndpoint(const Endpoint& endpoint, NetworkMessageReceiver* receiver) { @@ -1659,7 +1697,7 @@ void FlowTransport::removeEndpoint(const Endpoint& endpoint, NetworkMessageRecei } void FlowTransport::addWellKnownEndpoint(Endpoint& endpoint, NetworkMessageReceiver* receiver, TaskPriority taskID) { - endpoint.addresses = self->localAddresses; + endpoint.addresses = self->localAddresses.getAddressList(); ASSERT(receiver->isStream()); self->endpoints.insertWellKnown(receiver, endpoint.token, taskID); } diff --git a/fdbrpc/FlowTransport.h b/fdbrpc/FlowTransport.h index 8f60a2fc9be..ceaf3e6f35d 100644 --- a/fdbrpc/FlowTransport.h +++ b/fdbrpc/FlowTransport.h @@ -20,6 +20,7 @@ #ifndef FLOW_TRANSPORT_H #define FLOW_TRANSPORT_H +#include "flow/Arena.h" #pragma once #include @@ -215,6 +216,10 @@ class FlowTransport { // Returns first local NetworkAddress. NetworkAddress getLocalAddress() const; + // Returns first local NetworkAddress as std::string. Caches value + // to avoid unnecessary calls to toString() and fmt overhead. + Standalone getLocalAddressAsString() const; + // Returns first local NetworkAddress. 
void setLocalAddress(NetworkAddress const&); diff --git a/fdbrpc/sim2.actor.cpp b/fdbrpc/sim2.actor.cpp index b991b64a100..065a35d1103 100644 --- a/fdbrpc/sim2.actor.cpp +++ b/fdbrpc/sim2.actor.cpp @@ -24,6 +24,7 @@ #include "contrib/fmt-8.1.1/include/fmt/format.h" #include "fdbrpc/simulator.h" +#include "flow/Arena.h" #define BOOST_SYSTEM_NO_LIB #define BOOST_DATE_TIME_NO_LIB #define BOOST_REGEX_NO_LIB diff --git a/fdbserver/ApplyMetadataMutation.cpp b/fdbserver/ApplyMetadataMutation.cpp index 90f987021fa..290e83efd48 100644 --- a/fdbserver/ApplyMetadataMutation.cpp +++ b/fdbserver/ApplyMetadataMutation.cpp @@ -53,7 +53,7 @@ namespace { class ApplyMetadataMutationsImpl { public: - ApplyMetadataMutationsImpl(const SpanID& spanContext_, + ApplyMetadataMutationsImpl(const SpanContext& spanContext_, const UID& dbgid_, Arena& arena_, const VectorRef& mutations_, @@ -61,7 +61,7 @@ class ApplyMetadataMutationsImpl { : spanContext(spanContext_), dbgid(dbgid_), arena(arena_), mutations(mutations_), txnStateStore(txnStateStore_), confChange(dummyConfChange) {} - ApplyMetadataMutationsImpl(const SpanID& spanContext_, + ApplyMetadataMutationsImpl(const SpanContext& spanContext_, Arena& arena_, const VectorRef& mutations_, ProxyCommitData& proxyCommitData_, @@ -82,7 +82,7 @@ class ApplyMetadataMutationsImpl { tssMapping(&proxyCommitData_.tssMapping), tenantMap(&proxyCommitData_.tenantMap), initialCommit(initialCommit_) {} - ApplyMetadataMutationsImpl(const SpanID& spanContext_, + ApplyMetadataMutationsImpl(const SpanContext& spanContext_, ResolverData& resolverData_, const VectorRef& mutations_) : spanContext(spanContext_), dbgid(resolverData_.dbgid), arena(resolverData_.arena), mutations(mutations_), @@ -94,7 +94,7 @@ class ApplyMetadataMutationsImpl { private: // The following variables are incoming parameters - const SpanID& spanContext; + const SpanContext& spanContext; const UID& dbgid; @@ -1217,7 +1217,7 @@ class ApplyMetadataMutationsImpl { } // anonymous namespace 
-void applyMetadataMutations(SpanID const& spanContext, +void applyMetadataMutations(SpanContext const& spanContext, ProxyCommitData& proxyCommitData, Arena& arena, Reference logSystem, @@ -1241,13 +1241,13 @@ void applyMetadataMutations(SpanID const& spanContext, .apply(); } -void applyMetadataMutations(SpanID const& spanContext, +void applyMetadataMutations(SpanContext const& spanContext, ResolverData& resolverData, const VectorRef& mutations) { ApplyMetadataMutationsImpl(spanContext, resolverData, mutations).apply(); } -void applyMetadataMutations(SpanID const& spanContext, +void applyMetadataMutations(SpanContext const& spanContext, const UID& dbgid, Arena& arena, const VectorRef& mutations, diff --git a/fdbserver/ApplyMetadataMutation.h b/fdbserver/ApplyMetadataMutation.h index d4e47e09460..23f9e3a2f9c 100644 --- a/fdbserver/ApplyMetadataMutation.h +++ b/fdbserver/ApplyMetadataMutation.h @@ -87,7 +87,7 @@ Reference getStorageInfo(UID id, std::map>* storageCache, IKeyValueStore* txnStateStore); -void applyMetadataMutations(SpanID const& spanContext, +void applyMetadataMutations(SpanContext const& spanContext, ProxyCommitData& proxyCommitData, Arena& arena, Reference logSystem, @@ -97,7 +97,7 @@ void applyMetadataMutations(SpanID const& spanContext, Version version, Version popVersion, bool initialCommit); -void applyMetadataMutations(SpanID const& spanContext, +void applyMetadataMutations(SpanContext const& spanContext, const UID& dbgid, Arena& arena, const VectorRef& mutations, @@ -140,7 +140,7 @@ inline bool containsMetadataMutation(const VectorRef& mutations) { } // Resolver's version -void applyMetadataMutations(SpanID const& spanContext, +void applyMetadataMutations(SpanContext const& spanContext, ResolverData& resolverData, const VectorRef& mutations); diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index 0ac5b56a7d6..8addd89f05a 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -67,6 
+67,10 @@ struct VersionedMessage { return false; if (reader.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(reader)) return false; + if (reader.protocolVersion().hasOTELSpanContext() && OTELSpanContextMessage::isNextIn(reader)) { + TEST(true); // Returning false for OTELSpanContextMessage + return false; + } reader >> *m; return normalKeys.contains(m->param1) || m->param1 == metadataVersionKey; diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 01b3cd343e3..f6d56ebe412 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -88,6 +88,7 @@ set(FDBSERVER_SRCS OldTLogServer_4_6.actor.cpp OldTLogServer_6_0.actor.cpp OldTLogServer_6_2.actor.cpp + OTELSpanContextMessage.h OnDemandStore.actor.cpp OnDemandStore.h PaxosConfigConsumer.actor.cpp diff --git a/fdbserver/ClusterRecovery.actor.cpp b/fdbserver/ClusterRecovery.actor.cpp index 94d122ad408..a0f5c22875e 100644 --- a/fdbserver/ClusterRecovery.actor.cpp +++ b/fdbserver/ClusterRecovery.actor.cpp @@ -1629,7 +1629,7 @@ ACTOR Future clusterRecoveryCore(Reference self) { tr.set(recoveryCommitRequest.arena, clusterIdKey, BinaryWriter::toValue(self->clusterId, Unversioned())); } - applyMetadataMutations(SpanID(), + applyMetadataMutations(SpanContext(), self->dbgid, recoveryCommitRequest.arena, tr.mutations.slice(mmApplied, tr.mutations.size()), diff --git a/fdbserver/CommitProxyServer.actor.cpp b/fdbserver/CommitProxyServer.actor.cpp index 5c8449a332d..ca2ffbf3d51 100644 --- a/fdbserver/CommitProxyServer.actor.cpp +++ b/fdbserver/CommitProxyServer.actor.cpp @@ -464,7 +464,7 @@ ACTOR Future addBackupMutations(ProxyCommitData* self, state int yieldBytes = 0; state BinaryWriter valueWriter(Unversioned()); - toCommit->addTransactionInfo(SpanID()); + toCommit->addTransactionInfo(SpanContext()); // Serialize the log range mutations within the map for (; logRangeMutation != logRangeMutations->cend(); ++logRangeMutation) { @@ -731,7 +731,7 @@ void 
CommitBatchContext::setupTraceBatch() { g_traceBatch.addAttach("CommitAttachID", tr.debugID.get().first(), debugID.get().first()); } - span.addParent(tr.spanContext); + span.addLink(tr.spanContext); } if (debugID.present()) { @@ -960,7 +960,7 @@ void applyMetadataEffect(CommitBatchContext* self) { committed = committed && self->resolution[resolver].stateMutations[versionIndex][transactionIndex].committed; if (committed) { - applyMetadataMutations(SpanID(), + applyMetadataMutations(SpanContext(), *self->pProxyCommitData, self->arena, self->pProxyCommitData->logSystem, @@ -1380,8 +1380,7 @@ ACTOR Future postResolution(CommitBatchContext* self) { // simulation TEST(true); // Semi-committed pipeline limited by MVCC window //TraceEvent("ProxyWaitingForCommitted", pProxyCommitData->dbgid).detail("CommittedVersion", pProxyCommitData->committedVersion.get()).detail("NeedToCommit", commitVersion); - waitVersionSpan = Span( - deterministicRandom()->randomUniqueID(), "MP:overMaxReadTransactionLifeVersions"_loc, { span.context }); + waitVersionSpan = Span("MP:overMaxReadTransactionLifeVersions"_loc, span.context); choose { when(wait(pProxyCommitData->committedVersion.whenAtLeast( self->commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS))) { @@ -1777,7 +1776,7 @@ void addTagMapping(GetKeyServerLocationsReply& reply, ProxyCommitData* commitDat ACTOR static Future doKeyServerLocationRequest(GetKeyServerLocationsRequest req, ProxyCommitData* commitData) { // We can't respond to these requests until we have valid txnStateStore getCurrentLineage()->modify(&TransactionLineage::operation) = TransactionLineage::Operation::GetKeyServersLocations; - getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first(); + getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID; wait(commitData->validState.getFuture()); wait(delay(0, TaskPriority::DefaultEndpoint)); @@ -2297,7 +2296,7 @@ ACTOR Future 
processCompleteTransactionStateRequest(TransactionStateResolv Arena arena; bool confChanges; - applyMetadataMutations(SpanID(), + applyMetadataMutations(SpanContext(), *pContext->pCommitData, arena, Reference(), diff --git a/fdbserver/GrvProxyServer.actor.cpp b/fdbserver/GrvProxyServer.actor.cpp index 6d0127c4315..30ad98bcf10 100644 --- a/fdbserver/GrvProxyServer.actor.cpp +++ b/fdbserver/GrvProxyServer.actor.cpp @@ -542,7 +542,7 @@ ACTOR Future lastCommitUpdater(GrvProxyData* self, PromiseStream getLiveCommittedVersion(SpanID parentSpan, +ACTOR Future getLiveCommittedVersion(SpanContext parentSpan, GrvProxyData* grvProxyData, uint32_t flags, Optional debugID, @@ -945,7 +945,7 @@ ACTOR static Future transactionStarter(GrvProxyInterface proxy, int batchGRVProcessed = 0; for (int i = 0; i < start.size(); i++) { if (start[i].size()) { - Future readVersionReply = getLiveCommittedVersion(UID() /*span.context*/, + Future readVersionReply = getLiveCommittedVersion(SpanContext(), grvProxyData, i, debugID, diff --git a/fdbserver/LogSystem.cpp b/fdbserver/LogSystem.cpp index 1e1189facbd..ab8f43cfc57 100644 --- a/fdbserver/LogSystem.cpp +++ b/fdbserver/LogSystem.cpp @@ -19,6 +19,9 @@ */ #include "fdbserver/LogSystem.h" +#include "fdbclient/FDBTypes.h" +#include "fdbserver/OTELSpanContextMessage.h" +#include "fdbserver/SpanContextMessage.h" #include "flow/serialize.h" std::string LogSet::logRouterString() { @@ -277,8 +280,8 @@ void LogPushData::addTxsTag() { } } -void LogPushData::addTransactionInfo(SpanID const& context) { - TEST(!spanContext.isValid()); // addTransactionInfo with invalid SpanID +void LogPushData::addTransactionInfo(SpanContext const& context) { + TEST(!spanContext.isValid()); // addTransactionInfo with invalid SpanContext spanContext = context; writtenLocations.clear(); } @@ -344,13 +347,33 @@ bool LogPushData::writeTransactionInfo(int location, uint32_t subseq) { writtenLocations.insert(location); BinaryWriter& wr = messagesWriter[location]; - 
SpanContextMessage contextMessage(spanContext); - int offset = wr.getLength(); wr << uint32_t(0) << subseq << uint16_t(prev_tags.size()); for (auto& tag : prev_tags) wr << tag; - wr << contextMessage; + if (logSystem->getTLogVersion() >= TLogVersion::V7) { + OTELSpanContextMessage contextMessage(spanContext); + wr << contextMessage; + } else { + // When we're on a TLog version below 7, but the front end of the system (i.e. proxy, sequencer, resolver) + // is using OpenTelemetry tracing (i.e. on or above 7.2), we need to convert the OpenTelemetry Span data model + // i.e. 16 bytes for traceId, 8 bytes for spanId, to the OpenTracing spec, which is 8 bytes for traceId + // and 8 bytes for spanId. That means we need to drop some data. + // + // As a workaround for this special case we've decided to drop the 8 bytes + // for spanId. Therefore we're passing along the full 16 byte traceId to the storage server with 0 for spanID. + // This will result in a follows-from relationship for the storage span within the trace rather than a + // parent->child. 
+ SpanContextMessage contextMessage; + if (spanContext.isSampled()) { + TEST(true); // Converting OTELSpanContextMessage to traced SpanContextMessage + contextMessage = SpanContextMessage(UID(spanContext.traceID.first(), spanContext.traceID.second())); + } else { + TEST(true); // Converting OTELSpanContextMessage to untraced SpanContextMessage + contextMessage = SpanContextMessage(UID(0, 0)); + } + wr << contextMessage; + } int length = wr.getLength() - offset; *(uint32_t*)((uint8_t*)wr.getData() + offset) = length - sizeof(uint32_t); return true; diff --git a/fdbserver/LogSystem.h b/fdbserver/LogSystem.h index e8453184e44..6581457c259 100644 --- a/fdbserver/LogSystem.h +++ b/fdbserver/LogSystem.h @@ -26,6 +26,7 @@ #include #include "fdbserver/SpanContextMessage.h" +#include "fdbserver/OTELSpanContextMessage.h" #include "fdbserver/TLogInterface.h" #include "fdbserver/WorkerInterface.actor.h" #include "fdbclient/DatabaseConfiguration.h" @@ -519,7 +520,7 @@ struct ILogSystem { Version knownCommittedVersion, Version minKnownCommittedVersion, LogPushData& data, - SpanID const& spanContext, + SpanContext const& spanContext, Optional debugID = Optional(), Optional> tpcvMap = Optional>()) = 0; @@ -762,7 +763,7 @@ struct LogPushData : NonCopyable { } // Add transaction info to be written before the first mutation in the transaction. - void addTransactionInfo(SpanID const& context); + void addTransactionInfo(SpanContext const& context); // copy written_tags, after filtering, into given set void saveTags(std::set& filteredTags) const { @@ -832,7 +833,7 @@ struct LogPushData : NonCopyable { // field. std::unordered_set writtenLocations; uint32_t subsequence; - SpanID spanContext; + SpanContext spanContext; bool shardChanged = false; // if keyServers has any changes, i.e., shard boundary modifications. 
// Writes transaction info to the message stream at the given location if diff --git a/fdbserver/MasterInterface.h b/fdbserver/MasterInterface.h index 73fc6ef1141..f9c2c506adf 100644 --- a/fdbserver/MasterInterface.h +++ b/fdbserver/MasterInterface.h @@ -133,14 +133,14 @@ struct GetCommitVersionReply { struct GetCommitVersionRequest { constexpr static FileIdentifier file_identifier = 16683181; - SpanID spanContext; + SpanContext spanContext; uint64_t requestNum; uint64_t mostRecentProcessedRequestNum; UID requestingProxy; ReplyPromise reply; GetCommitVersionRequest() {} - GetCommitVersionRequest(SpanID spanContext, + GetCommitVersionRequest(SpanContext spanContext, uint64_t requestNum, uint64_t mostRecentProcessedRequestNum, UID requestingProxy) diff --git a/fdbserver/MutationTracking.cpp b/fdbserver/MutationTracking.cpp index 9ec17299d53..fd8f55c313d 100644 --- a/fdbserver/MutationTracking.cpp +++ b/fdbserver/MutationTracking.cpp @@ -24,6 +24,7 @@ #include "fdbserver/MutationTracking.h" #include "fdbserver/LogProtocolMessage.h" #include "fdbserver/SpanContextMessage.h" +#include "fdbserver/OTELSpanContextMessage.h" #include "fdbclient/SystemData.h" #if defined(FDB_CLEAN_BUILD) && MUTATION_TRACKING_ENABLED #error "You cannot use mutation tracking in a clean/release build." 
@@ -96,6 +97,11 @@ TraceEvent debugTagsAndMessageEnabled(const char* context, Version version, Stri BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion())); SpanContextMessage scm; br >> scm; + } else if (OTELSpanContextMessage::startsOTELSpanContextMessage(mutationType)) { + TEST(true); // MutationTracking reading OTELSpanContextMessage + BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion())); + OTELSpanContextMessage scm; + br >> scm; } else { MutationRef m; BinaryReader br(mutationData, AssumeVersion(rdr.protocolVersion())); diff --git a/fdbserver/OTELSpanContextMessage.h b/fdbserver/OTELSpanContextMessage.h new file mode 100644 index 00000000000..9f6d588fedd --- /dev/null +++ b/fdbserver/OTELSpanContextMessage.h @@ -0,0 +1,66 @@ +/* + * OTELSpanContextMessage.h + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2013-2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FDBSERVER_OTELSPANCONTEXTMESSAGE_H +#define FDBSERVER_OTELSPANCONTEXTMESSAGE_H +#pragma once + +#include "flow/Tracing.h" +#include "fdbclient/FDBTypes.h" +#include "fdbclient/CommitTransaction.h" + +struct OTELSpanContextMessage { + // This message is pushed into the transaction logs' memory to inform + // it what transaction subsequent mutations were a part of.
This allows + // transaction logs and storage servers to associate mutations with a + // transaction identifier, called a span context. + // + // This message is similar to LogProtocolMessage. Storage servers read the + // first byte of this message to uniquely identify it, meaning it will + // never be mistaken for another message. See LogProtocolMessage.h for more + // information. + + SpanContext spanContext; + + OTELSpanContextMessage() {} + OTELSpanContextMessage(SpanContext const& spanContext) : spanContext(spanContext) {} + + std::string toString() const { + return format("code: %d, span context: %s", + MutationRef::Reserved_For_OTELSpanContextMessage, + spanContext.toString().c_str()); + } + + template + void serialize(Ar& ar) { + uint8_t poly = MutationRef::Reserved_For_OTELSpanContextMessage; + serializer(ar, poly, spanContext); + } + + static bool startsOTELSpanContextMessage(uint8_t byte) { + return byte == MutationRef::Reserved_For_OTELSpanContextMessage; + } + template + static bool isNextIn(Ar& ar) { + return startsOTELSpanContextMessage(*(const uint8_t*)ar.peekBytes(1)); + } +}; + +#endif diff --git a/fdbserver/Resolver.actor.cpp b/fdbserver/Resolver.actor.cpp index d24a0401b59..91449c69230 100644 --- a/fdbserver/Resolver.actor.cpp +++ b/fdbserver/Resolver.actor.cpp @@ -340,8 +340,8 @@ ACTOR Future resolveBatch(Reference self, ResolveTransactionBatc // The condition here must match CommitBatch::applyMetadataToCommittedTransactions() if (reply.committed[t] == ConflictBatch::TransactionCommitted && !self->forceRecovery && SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS && (!isLocked || req.transactions[t].lock_aware)) { - SpanID spanContext = - req.transactions[t].spanContext.present() ? req.transactions[t].spanContext.get() : SpanID(); + SpanContext spanContext = + req.transactions[t].spanContext.present() ? 
req.transactions[t].spanContext.get() : SpanContext(); applyMetadataMutations(spanContext, resolverData, req.transactions[t].mutations); } @@ -565,7 +565,7 @@ ACTOR Future processCompleteTransactionStateRequest(TransactionStateResolv ResolverData resolverData( pContext->pResolverData->dbgid, pContext->pTxnStateStore, &pContext->pResolverData->keyInfo, confChanges); - applyMetadataMutations(SpanID(), resolverData, mutations); + applyMetadataMutations(SpanContext(), resolverData, mutations); } // loop auto lockedKey = pContext->pTxnStateStore->readValue(databaseLockedKey).get(); diff --git a/fdbserver/ResolverInterface.h b/fdbserver/ResolverInterface.h index 782fa2be886..51110e5c016 100644 --- a/fdbserver/ResolverInterface.h +++ b/fdbserver/ResolverInterface.h @@ -118,7 +118,7 @@ struct ResolveTransactionBatchRequest { constexpr static FileIdentifier file_identifier = 16462858; Arena arena; - SpanID spanContext; + SpanContext spanContext; Version prevVersion; Version version; // FIXME: ? Version lastReceivedVersion; diff --git a/fdbserver/StorageCache.actor.cpp b/fdbserver/StorageCache.actor.cpp index a97931a2a8f..8cf24a67d89 100644 --- a/fdbserver/StorageCache.actor.cpp +++ b/fdbserver/StorageCache.actor.cpp @@ -18,6 +18,7 @@ * limitations under the License. 
*/ +#include "fdbserver/OTELSpanContextMessage.h" #include "flow/Arena.h" #include "fdbclient/FDBOptions.g.h" #include "fdbclient/NativeAPI.actor.h" @@ -1897,6 +1898,10 @@ ACTOR Future pullAsyncData(StorageCacheData* data) { SpanContextMessage::isNextIn(cloneReader)) { SpanContextMessage scm; cloneReader >> scm; + } else if (cloneReader.protocolVersion().hasOTELSpanContext() && + OTELSpanContextMessage::isNextIn(cloneReader)) { + OTELSpanContextMessage scm; + cloneReader >> scm; } else { MutationRef msg; cloneReader >> msg; @@ -1975,6 +1980,10 @@ ACTOR Future pullAsyncData(StorageCacheData* data) { } else if (reader.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(reader)) { SpanContextMessage scm; reader >> scm; + } else if (reader.protocolVersion().hasOTELSpanContext() && OTELSpanContextMessage::isNextIn(reader)) { + TEST(true); // StorageCache reading OTELSpanContextMessage + OTELSpanContextMessage oscm; + reader >> oscm; } else { MutationRef msg; reader >> msg; diff --git a/fdbserver/TLogInterface.h b/fdbserver/TLogInterface.h index b8ec6899d22..9da4ecedd43 100644 --- a/fdbserver/TLogInterface.h +++ b/fdbserver/TLogInterface.h @@ -296,7 +296,7 @@ struct TLogCommitReply { struct TLogCommitRequest { constexpr static FileIdentifier file_identifier = 4022206; - SpanID spanContext; + SpanContext spanContext; Arena arena; Version prevVersion, version, knownCommittedVersion, minKnownCommittedVersion; @@ -307,7 +307,7 @@ struct TLogCommitRequest { Optional debugID; TLogCommitRequest() {} - TLogCommitRequest(const SpanID& context, + TLogCommitRequest(const SpanContext& context, const Arena& a, Version prevVersion, Version version, diff --git a/fdbserver/TagPartitionedLogSystem.actor.cpp b/fdbserver/TagPartitionedLogSystem.actor.cpp index a4822a7ef58..70a2293f146 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.cpp +++ b/fdbserver/TagPartitionedLogSystem.actor.cpp @@ -507,7 +507,7 @@ Future TagPartitionedLogSystem::push(Version prevVersion, Version 
knownCommittedVersion, Version minKnownCommittedVersion, LogPushData& data, - SpanID const& spanContext, + SpanContext const& spanContext, Optional debugID, Optional> tpcvMap) { // FIXME: Randomize request order as in LegacyLogSystem? diff --git a/fdbserver/TagPartitionedLogSystem.actor.h b/fdbserver/TagPartitionedLogSystem.actor.h index baf1a467113..eb7c389e5be 100644 --- a/fdbserver/TagPartitionedLogSystem.actor.h +++ b/fdbserver/TagPartitionedLogSystem.actor.h @@ -191,7 +191,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted debugID, Optional> tpcvMap) final; diff --git a/fdbserver/masterserver.actor.cpp b/fdbserver/masterserver.actor.cpp index 7cb99d8d21d..9bd1660b07f 100644 --- a/fdbserver/masterserver.actor.cpp +++ b/fdbserver/masterserver.actor.cpp @@ -120,7 +120,7 @@ struct MasterData : NonCopyable, ReferenceCounted { }; ACTOR Future getVersion(Reference self, GetCommitVersionRequest req) { - state Span span("M:getVersion"_loc, { req.spanContext }); + state Span span("M:getVersion"_loc, req.spanContext); state std::map::iterator proxyItr = self->lastCommitProxyVersionReplies.find(req.requestingProxy); // lastCommitProxyVersionReplies never changes diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp index c49f19a384a..3631affc4d0 100644 --- a/fdbserver/storageserver.actor.cpp +++ b/fdbserver/storageserver.actor.cpp @@ -24,8 +24,10 @@ #include #include "contrib/fmt-8.1.1/include/fmt/format.h" +#include "fdbclient/FDBTypes.h" #include "fdbrpc/fdbrpc.h" #include "fdbrpc/LoadBalance.h" +#include "fdbserver/OTELSpanContextMessage.h" #include "flow/ActorCollection.h" #include "flow/Arena.h" #include "flow/Error.h" @@ -1395,8 +1397,8 @@ void updateProcessStats(StorageServer* self) { #pragma region Queries #endif -ACTOR Future waitForVersionActor(StorageServer* data, Version version, SpanID spanContext) { - state Span span("SS.WaitForVersion"_loc, { spanContext }); +ACTOR Future waitForVersionActor(StorageServer* 
data, Version version, SpanContext spanContext) { + state Span span("SS.WaitForVersion"_loc, spanContext); choose { when(wait(data->version.whenAtLeast(version))) { // FIXME: A bunch of these can block with or without the following delay 0. @@ -1433,7 +1435,7 @@ Version getLatestCommitVersion(VersionVector& ssLatestCommitVersions, Tag& tag) return commitVersion; } -Future waitForVersion(StorageServer* data, Version version, SpanID spanContext) { +Future waitForVersion(StorageServer* data, Version version, SpanContext spanContext) { if (version == latestVersion) { version = std::max(Version(1), data->version.get()); } @@ -1454,7 +1456,10 @@ Future waitForVersion(StorageServer* data, Version version, SpanID span return waitForVersionActor(data, version, spanContext); } -Future waitForVersion(StorageServer* data, Version commitVersion, Version readVersion, SpanID spanContext) { +Future waitForVersion(StorageServer* data, + Version commitVersion, + Version readVersion, + SpanContext spanContext) { ASSERT(commitVersion == invalidVersion || commitVersion < readVersion); if (commitVersion == invalidVersion) { @@ -1528,11 +1533,11 @@ Optional StorageServer::getTenantEntry(Version version, TenantIn ACTOR Future getValueQ(StorageServer* data, GetValueRequest req) { state int64_t resultSize = 0; - Span span("SS:getValue"_loc, { req.spanContext }); + Span span("SS:getValue"_loc, req.spanContext); if (req.tenantInfo.name.present()) { - span.addTag("tenant"_sr, req.tenantInfo.name.get()); + span.addAttribute("tenant"_sr, req.tenantInfo.name.get()); } - span.addTag("key"_sr, req.key); + span.addAttribute("key"_sr, req.key); // Temporarily disabled -- this path is hit a lot // getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first(); @@ -1665,9 +1670,9 @@ ACTOR Future getValueQ(StorageServer* data, GetValueRequest req) { // must be kept alive until the watch is finished. 
extern size_t WATCH_OVERHEAD_WATCHQ, WATCH_OVERHEAD_WATCHIMPL; -ACTOR Future watchWaitForValueChange(StorageServer* data, SpanID parent, KeyRef key) { +ACTOR Future watchWaitForValueChange(StorageServer* data, SpanContext parent, KeyRef key) { state Location spanLocation = "SS:watchWaitForValueChange"_loc; - state Span span(spanLocation, { parent }); + state Span span(spanLocation, parent); state Reference metadata = data->getWatchMetadata(key); if (metadata->debugID.present()) @@ -1774,8 +1779,8 @@ void checkCancelWatchImpl(StorageServer* data, WatchValueRequest req) { ACTOR Future watchValueSendReply(StorageServer* data, WatchValueRequest req, Future resp, - SpanID spanContext) { - state Span span("SS:watchValue"_loc, { spanContext }); + SpanContext spanContext) { + state Span span("SS:watchValue"_loc, spanContext); state double startTime = now(); ++data->counters.watchQueries; ++data->numWatches; @@ -2503,7 +2508,7 @@ ACTOR Future stopChangeFeedOnMove(StorageServer* data, ChangeFeedStreamReq } ACTOR Future changeFeedStreamQ(StorageServer* data, ChangeFeedStreamRequest req, UID streamUID) { - state Span span("SS:getChangeFeedStream"_loc, { req.spanContext }); + state Span span("SS:getChangeFeedStream"_loc, req.spanContext); state bool atLatest = false; state bool removeUID = false; state Optional blockedVersion; @@ -2859,7 +2864,7 @@ ACTOR Future readRange(StorageServer* data, KeyRange range, int limit, int* pLimitBytes, - SpanID parentSpan, + SpanContext parentSpan, IKeyValueStore::ReadType type, Optional tenantPrefix) { state GetKeyValuesReply result; @@ -3098,7 +3103,7 @@ ACTOR Future findKey(StorageServer* data, Version version, KeyRange range, int* pOffset, - SpanID parentSpan, + SpanContext parentSpan, IKeyValueStore::ReadType type) // Attempts to find the key indicated by sel in the data at version, within range. // Precondition: selectorInRange(sel, range) @@ -3119,7 +3124,7 @@ ACTOR Future findKey(StorageServer* data, state int sign = forward ? 
+1 : -1; state bool skipEqualKey = sel.orEqual == forward; state int distance = forward ? sel.offset : 1 - sel.offset; - state Span span("SS.findKey"_loc, { parentSpan }); + state Span span("SS.findKey"_loc, parentSpan); // Don't limit the number of bytes if this is a trivial key selector (there will be at most two items returned from // the read range in this case) @@ -3217,16 +3222,16 @@ ACTOR Future getKeyValuesQ(StorageServer* data, GetKeyValuesRequest req) // Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large // selector offset prevents all data from being read in one range read { - state Span span("SS:getKeyValues"_loc, { req.spanContext }); + state Span span("SS:getKeyValues"_loc, req.spanContext); state int64_t resultSize = 0; state IKeyValueStore::ReadType type = req.isFetchKeys ? IKeyValueStore::ReadType::FETCH : IKeyValueStore::ReadType::NORMAL; if (req.tenantInfo.name.present()) { - span.addTag("tenant"_sr, req.tenantInfo.name.get()); + span.addAttribute("tenant"_sr, req.tenantInfo.name.get()); } - getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first(); + getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID; ++data->counters.getRangeQueries; ++data->counters.allQueries; @@ -3711,16 +3716,16 @@ ACTOR Future getMappedKeyValuesQ(StorageServer* data, GetMappedKeyValuesRe // Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large // selector offset prevents all data from being read in one range read { - state Span span("SS:getMappedKeyValues"_loc, { req.spanContext }); + state Span span("SS:getMappedKeyValues"_loc, req.spanContext); state int64_t resultSize = 0; state IKeyValueStore::ReadType type = req.isFetchKeys ? 
IKeyValueStore::ReadType::FETCH : IKeyValueStore::ReadType::NORMAL; if (req.tenantInfo.name.present()) { - span.addTag("tenant"_sr, req.tenantInfo.name.get()); + span.addAttribute("tenant"_sr, req.tenantInfo.name.get()); } - getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first(); + getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID; ++data->counters.getMappedRangeQueries; ++data->counters.allQueries; @@ -3925,13 +3930,13 @@ ACTOR Future getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe // Throws a wrong_shard_server if the keys in the request or result depend on data outside this server OR if a large // selector offset prevents all data from being read in one range read { - state Span span("SS:getKeyValuesStream"_loc, { req.spanContext }); + state Span span("SS:getKeyValuesStream"_loc, req.spanContext); state int64_t resultSize = 0; state IKeyValueStore::ReadType type = req.isFetchKeys ? IKeyValueStore::ReadType::FETCH : IKeyValueStore::ReadType::NORMAL; if (req.tenantInfo.name.present()) { - span.addTag("tenant"_sr, req.tenantInfo.name.get()); + span.addAttribute("tenant"_sr, req.tenantInfo.name.get()); } req.reply.setByteLimit(SERVER_KNOBS->RANGESTREAM_LIMIT_BYTES); @@ -4129,12 +4134,12 @@ ACTOR Future getKeyValuesStreamQ(StorageServer* data, GetKeyValuesStreamRe } ACTOR Future getKeyQ(StorageServer* data, GetKeyRequest req) { - state Span span("SS:getKey"_loc, { req.spanContext }); + state Span span("SS:getKey"_loc, req.spanContext); if (req.tenantInfo.name.present()) { - span.addTag("tenant"_sr, req.tenantInfo.name.get()); + span.addAttribute("tenant"_sr, req.tenantInfo.name.get()); } state int64_t resultSize = 0; - getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first(); + getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID; ++data->counters.getKeyQueries; ++data->counters.allQueries; @@ -6851,6 +6856,10 @@ ACTOR Future 
update(StorageServer* data, bool* pReceivedUpdate) { SpanContextMessage::isNextIn(cloneReader)) { SpanContextMessage scm; cloneReader >> scm; + } else if (cloneReader.protocolVersion().hasOTELSpanContext() && + OTELSpanContextMessage::isNextIn(cloneReader)) { + OTELSpanContextMessage scm; + cloneReader >> scm; } else { MutationRef msg; cloneReader >> msg; @@ -6933,7 +6942,7 @@ ACTOR Future update(StorageServer* data, bool* pReceivedUpdate) { state Version ver = invalidVersion; cloneCursor2->setProtocolVersion(data->logProtocol); - state SpanID spanContext = SpanID(); + state SpanContext spanContext = SpanContext(); state double beforeTLogMsgsUpdates = now(); state std::set updatedChangeFeeds; for (; cloneCursor2->hasMessage(); cloneCursor2->nextMessage()) { @@ -6967,17 +6976,27 @@ ACTOR Future update(StorageServer* data, bool* pReceivedUpdate) { data->logProtocol = rd.protocolVersion(); data->storage.changeLogProtocol(ver, data->logProtocol); cloneCursor2->setProtocolVersion(rd.protocolVersion()); - spanContext = UID(); + spanContext.traceID = UID(); } else if (rd.protocolVersion().hasSpanContext() && SpanContextMessage::isNextIn(rd)) { SpanContextMessage scm; rd >> scm; + TEST(true); // storageserveractor converting SpanContextMessage into OTEL SpanContext + spanContext = + SpanContext(UID(scm.spanContext.first(), scm.spanContext.second()), + 0, + scm.spanContext.first() != 0 && scm.spanContext.second() != 0 ? 
TraceFlags::sampled + : TraceFlags::unsampled); + } else if (rd.protocolVersion().hasOTELSpanContext() && OTELSpanContextMessage::isNextIn(rd)) { + TEST(true); // storageserveractor reading OTELSpanContextMessage + OTELSpanContextMessage scm; + rd >> scm; spanContext = scm.spanContext; } else { MutationRef msg; rd >> msg; - Span span("SS:update"_loc, { spanContext }); - span.addTag("key"_sr, msg.param1); + Span span("SS:update"_loc, spanContext); + span.addAttribute("key"_sr, msg.param1); // Drop non-private mutations if TSS fault injection is enabled in simulation, or if this is a TSS in // quarantine. @@ -8410,11 +8429,11 @@ ACTOR Future serveGetKeyRequests(StorageServer* self, FutureStream watchValueWaitForVersion(StorageServer* self, WatchValueRequest req, PromiseStream stream) { - state Span span("SS:watchValueWaitForVersion"_loc, { req.spanContext }); + state Span span("SS:watchValueWaitForVersion"_loc, req.spanContext); if (req.tenantInfo.name.present()) { - span.addTag("tenant"_sr, req.tenantInfo.name.get()); + span.addAttribute("tenant"_sr, req.tenantInfo.name.get()); } - getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first(); + getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID; try { wait(success(waitForVersionNoTooOld(self, req.version))); Optional entry = self->getTenantEntry(latestVersion, req.tenantInfo); @@ -8432,11 +8451,11 @@ ACTOR Future watchValueWaitForVersion(StorageServer* self, ACTOR Future serveWatchValueRequestsImpl(StorageServer* self, FutureStream stream) { loop { - getCurrentLineage()->modify(&TransactionLineage::txID) = 0; + getCurrentLineage()->modify(&TransactionLineage::txID) = UID(); state WatchValueRequest req = waitNext(stream); state Reference metadata = self->getWatchMetadata(req.key.contents()); - state Span span("SS:serveWatchValueRequestsImpl"_loc, { req.spanContext }); - getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.first(); + state Span 
span("SS:serveWatchValueRequestsImpl"_loc, req.spanContext); + getCurrentLineage()->modify(&TransactionLineage::txID) = req.spanContext.traceID; // case 1: no watch set for the current key if (!metadata.isValid()) { diff --git a/fdbserver/workloads/ApiWorkload.h b/fdbserver/workloads/ApiWorkload.h index 64836e03b6a..8f46f7b1484 100644 --- a/fdbserver/workloads/ApiWorkload.h +++ b/fdbserver/workloads/ApiWorkload.h @@ -80,8 +80,8 @@ struct TransactionWrapper : public ReferenceCounted { // Gets the version vector cached in a transaction virtual VersionVector getVersionVector() = 0; - // Gets the spanID of a transaction - virtual UID getSpanID() = 0; + // Gets the spanContext of a transaction + virtual SpanContext getSpanContext() = 0; // Prints debugging messages for a transaction; not implemented for all transaction types virtual void debugTransaction(UID debugId) {} @@ -161,8 +161,8 @@ struct FlowTransactionWrapper : public TransactionWrapper { // Gets the version vector cached in a transaction VersionVector getVersionVector() override { return transaction.getVersionVector(); } - // Gets the spanID of a transaction - UID getSpanID() override { return transaction.getSpanID(); } + // Gets the spanContext of a transaction + SpanContext getSpanContext() override { return transaction.getSpanContext(); } // Prints debugging messages for a transaction void debugTransaction(UID debugId) override { transaction.debugTransaction(debugId); } @@ -229,8 +229,8 @@ struct ThreadTransactionWrapper : public TransactionWrapper { // Gets the version vector cached in a transaction VersionVector getVersionVector() override { return transaction->getVersionVector(); } - // Gets the spanID of a transaction - UID getSpanID() override { return transaction->getSpanID(); } + // Gets the spanContext of a transaction + SpanContext getSpanContext() override { return transaction->getSpanContext(); } void addReadConflictRange(KeyRangeRef const& keys) override { 
transaction->addReadConflictRange(keys); } }; diff --git a/fdbserver/workloads/ConsistencyCheck.actor.cpp b/fdbserver/workloads/ConsistencyCheck.actor.cpp index 35952cc7c61..f55d8f975b8 100644 --- a/fdbserver/workloads/ConsistencyCheck.actor.cpp +++ b/fdbserver/workloads/ConsistencyCheck.actor.cpp @@ -873,7 +873,8 @@ struct ConsistencyCheckWorkload : TestWorkload { state Key begin = kr.begin; state Key end = kr.end; state int limitKeyServers = BUGGIFY ? 1 : 100; - state Span span(deterministicRandom()->randomUniqueID(), "WL:ConsistencyCheck"_loc); + state Span span(SpanContext(deterministicRandom()->randomUniqueID(), deterministicRandom()->randomUInt64()), + "WL:ConsistencyCheck"_loc); while (begin < end) { state Reference commitProxyInfo = diff --git a/fdbserver/workloads/Cycle.actor.cpp b/fdbserver/workloads/Cycle.actor.cpp index 1b7131f8e3a..a7806ac1c8a 100644 --- a/fdbserver/workloads/Cycle.actor.cpp +++ b/fdbserver/workloads/Cycle.actor.cpp @@ -106,10 +106,9 @@ struct CycleWorkload : TestWorkload { state Transaction tr(cx); if (deterministicRandom()->random01() >= self->traceParentProbability) { state Span span("CycleClient"_loc); - // TraceEvent("CycleTracingTransaction", span.context).log(); - TraceEvent("CycleTracingTransaction", span.context).log(); + TraceEvent("CycleTracingTransaction", span.context.traceID).log(); tr.setOption(FDBTransactionOptions::SPAN_PARENT, - BinaryWriter::toValue(span.context, Unversioned())); + BinaryWriter::toValue(span.context, IncludeVersion())); } while (true) { try { diff --git a/fdbserver/workloads/MiniCycle.actor.cpp b/fdbserver/workloads/MiniCycle.actor.cpp index b071902a8cf..5b9b48ab2ca 100644 --- a/fdbserver/workloads/MiniCycle.actor.cpp +++ b/fdbserver/workloads/MiniCycle.actor.cpp @@ -174,7 +174,7 @@ struct MiniCycleWorkload : TestWorkload { state Transaction tr(cx); if (deterministicRandom()->random01() >= self->traceParentProbability) { state Span span("MiniCycleClient"_loc); - 
TraceEvent("MiniCycleTracingTransaction", span.context).log(); + TraceEvent("MiniCycleTracingTransaction", span.context.traceID).log(); tr.setOption(FDBTransactionOptions::SPAN_PARENT, BinaryWriter::toValue(span.context, Unversioned())); } diff --git a/flow/Net2.actor.cpp b/flow/Net2.actor.cpp index 29785e1f39b..f83aac02c56 100644 --- a/flow/Net2.actor.cpp +++ b/flow/Net2.actor.cpp @@ -21,6 +21,7 @@ #include "boost/asio/buffer.hpp" #include "boost/asio/ip/address.hpp" #include "boost/system/system_error.hpp" +#include "flow/Arena.h" #include "flow/Platform.h" #include "flow/Trace.h" #include diff --git a/flow/ProtocolVersion.h b/flow/ProtocolVersion.h index eabcb381453..9f3e1f54403 100644 --- a/flow/ProtocolVersion.h +++ b/flow/ProtocolVersion.h @@ -170,6 +170,7 @@ class ProtocolVersion { PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, Tenants); PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, StorageInterfaceReadiness); PROTOCOL_VERSION_FEATURE(0x0FDB00B071010000LL, ResolverPrivateMutations); + PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, OTELSpanContext); PROTOCOL_VERSION_FEATURE(0x0FDB00B072000000LL, SWVersionTracking); }; diff --git a/flow/Tracing.actor.cpp b/flow/Tracing.actor.cpp index 144f663b7e1..d24673ca848 100644 --- a/flow/Tracing.actor.cpp +++ b/flow/Tracing.actor.cpp @@ -19,6 +19,7 @@ */ #include "flow/Tracing.h" +#include "flow/IRandom.h" #include "flow/UnitTest.h" #include "flow/Knobs.h" #include "flow/network.h" @@ -42,28 +43,11 @@ constexpr float kQueueSizeLogInterval = 5.0; struct NoopTracer : ITracer { TracerType type() const override { return TracerType::DISABLED; } void trace(Span const& span) override {} - void trace(OTELSpan const& span) override {} }; struct LogfileTracer : ITracer { TracerType type() const override { return TracerType::LOG_FILE; } void trace(Span const& span) override { - TraceEvent te(SevInfo, "TracingSpan", span.context); - te.detail("Location", span.location.name) - .detail("Begin", format("%.6f", span.begin)) - 
.detail("End", format("%.6f", span.end)); - if (span.parents.size() == 1) { - te.detail("Parent", *span.parents.begin()); - } else { - for (auto parent : span.parents) { - TraceEvent(SevInfo, "TracingSpanAddParent", span.context).detail("AddParent", parent); - } - } - for (const auto& [key, value] : span.tags) { - TraceEvent(SevInfo, "TracingSpanTag", span.context).detail("Key", key).detail("Value", value); - } - } - void trace(OTELSpan const& span) override { TraceEvent te(SevInfo, "TracingSpan", span.context.traceID); te.detail("SpanID", span.context.spanID) .detail("Location", span.location.name) @@ -183,31 +167,6 @@ struct UDPTracer : public ITracer { // Serializes span fields as an array into the supplied TraceRequest // buffer. void serialize_span(const Span& span, TraceRequest& request) { - // If you change the serialization format here, make sure to update the - // fluentd filter to be able to correctly parse the updated format! See - // the msgpack specification for more info on the bit patterns used - // here. 
- uint8_t size = 8; - if (span.parents.size() == 0) - --size; - request.write_byte(size | 0b10010000); // write as array - - serialize_string(g_network->getLocalAddress().toString(), request); // ip:port - - serialize_value(span.context.first(), request, 0xcf); // trace id - serialize_value(span.context.second(), request, 0xcf); // token (span id) - - serialize_value(span.begin, request, 0xcb); // start time - serialize_value(span.end - span.begin, request, 0xcb); // duration - - serialize_string(span.location.name.toString(), request); - - serialize_map(span.tags, request); - - serialize_vector(span.parents, request); - } - - void serialize_span(const OTELSpan& span, TraceRequest& request) { uint16_t size = 14; request.write_byte(size | 0b10010000); // write as array serialize_value(span.context.traceID.first(), request, 0xcf); // trace id @@ -274,30 +233,6 @@ struct UDPTracer : public ITracer { serialize_string(reinterpret_cast(str.data()), str.size(), request); } - // Writes the given vector of SpanIDs to the request. If the vector is - // empty, the request is not modified. - inline void serialize_vector(const SmallVectorRef& vec, TraceRequest& request) { - int size = vec.size(); - if (size == 0) { - return; - } - if (size <= 15) { - request.write_byte(static_cast(size) | 0b10010000); - } else if (size <= 65535) { - request.write_byte(0xdc); - request.write_byte(reinterpret_cast(&size)[1]); - request.write_byte(reinterpret_cast(&size)[0]); - } else { - TraceEvent(SevWarn, "TracingSpanSerializeVector") - .detail("Failed to MessagePack encode very large vector", size); - ASSERT_WE_THINK(false); - } - - for (const auto& parentContext : vec) { - serialize_value(parentContext.second(), request, 0xcf); - } - } - // Writes the given vector of linked SpanContext's to the request. If the vector is // empty, the request is not modified. 
inline void serialize_vector(const SmallVectorRef& vec, TraceRequest& request) { @@ -322,7 +257,7 @@ struct UDPTracer : public ITracer { // Writes the given vector of linked SpanContext's to the request. If the vector is // empty, the request is not modified. - inline void serialize_vector(const SmallVectorRef& vec, TraceRequest& request) { + inline void serialize_vector(const SmallVectorRef& vec, TraceRequest& request) { int size = vec.size(); if (size <= 15) { request.write_byte(static_cast(size) | 0b10010000); @@ -453,12 +388,6 @@ struct FastUDPTracer : public UDPTracer { request_.reset(); } - void trace(OTELSpan const& span) override { - prepare(span.location.name.size()); - serialize_span(span, request_); - write(); - } - void trace(Span const& span) override { prepare(span.location.name.size()); serialize_span(span, request_); @@ -513,28 +442,6 @@ void openTracer(TracerType type) { ITracer::~ITracer() {} Span& Span::operator=(Span&& o) { - if (begin > 0.0 && context.second() > 0) { - end = g_network->now(); - g_tracer->trace(*this); - } - arena = std::move(o.arena); - context = o.context; - begin = o.begin; - end = o.end; - location = o.location; - parents = std::move(o.parents); - o.begin = 0; - return *this; -} - -Span::~Span() { - if (begin > 0.0 && context.second() > 0) { - end = g_network->now(); - g_tracer->trace(*this); - } -} - -OTELSpan& OTELSpan::operator=(OTELSpan&& o) { if (begin > 0.0 && o.context.isSampled() > 0) { end = g_network->now(); g_tracer->trace(*this); @@ -558,7 +465,7 @@ OTELSpan& OTELSpan::operator=(OTELSpan&& o) { return *this; } -OTELSpan::~OTELSpan() { +Span::~Span() { if (begin > 0.0 && context.isSampled()) { end = g_network->now(); g_tracer->trace(*this); @@ -567,16 +474,15 @@ OTELSpan::~OTELSpan() { TEST_CASE("/flow/Tracing/CreateOTELSpan") { // Sampling disabled, no parent. 
- OTELSpan notSampled("foo"_loc); + Span notSampled("foo"_loc); ASSERT(!notSampled.context.isSampled()); // Force Sampling - OTELSpan sampled("foo"_loc, []() { return 1.0; }); - ASSERT(sampled.context.isSampled()); + // Span sampled("foo"_loc, []() { return 1.0; }); + // ASSERT(sampled.context.isSampled()); // Ensure child traceID matches parent, when parent is sampled. - OTELSpan childTraceIDMatchesParent( - "foo"_loc, []() { return 1.0; }, SpanContext(UID(100, 101), 200, TraceFlags::sampled)); + Span childTraceIDMatchesParent("foo"_loc, SpanContext(UID(100, 101), 200, TraceFlags::sampled)); ASSERT(childTraceIDMatchesParent.context.traceID.first() == childTraceIDMatchesParent.parentContext.traceID.first()); ASSERT(childTraceIDMatchesParent.context.traceID.second() == @@ -584,22 +490,20 @@ TEST_CASE("/flow/Tracing/CreateOTELSpan") { // When the parent isn't sampled AND it has legitimate values we should not sample a child, // even if the child was randomly selected for sampling. - OTELSpan parentNotSampled( - "foo"_loc, []() { return 1.0; }, SpanContext(UID(1, 1), 1, TraceFlags::unsampled)); + Span parentNotSampled("foo"_loc, SpanContext(UID(1, 1), 1, TraceFlags::unsampled)); ASSERT(!parentNotSampled.context.isSampled()); // When the parent isn't sampled AND it has zero values for traceID and spanID this means // we should defer to the child as the new root of the trace as there was no actual parent. // If the child was sampled we should send the child trace with a null parent. - OTELSpan noParent( - "foo"_loc, []() { return 1.0; }, SpanContext(UID(0, 0), 0, TraceFlags::unsampled)); - ASSERT(noParent.context.isSampled()); + // Span noParent("foo"_loc, SpanContext(UID(0, 0), 0, TraceFlags::unsampled)); + // ASSERT(noParent.context.isSampled()); return Void(); }; TEST_CASE("/flow/Tracing/AddEvents") { // Use helper method to add an OTELEventRef to an OTELSpan. 
- OTELSpan span1("span_with_event"_loc); + Span span1("span_with_event"_loc); auto arena = span1.arena; SmallVectorRef attrs; attrs.push_back(arena, KeyValueRef("foo"_sr, "bar"_sr)); @@ -610,14 +514,14 @@ TEST_CASE("/flow/Tracing/AddEvents") { ASSERT(span1.events[0].attributes.begin()->value.toString() == "bar"); // Use helper method to add an OTELEventRef with no attributes to an OTELSpan - OTELSpan span2("span_with_event"_loc); + Span span2("span_with_event"_loc); span2.addEvent(StringRef(span2.arena, LiteralStringRef("commit_succeed")), 1234567.100); ASSERT(span2.events[0].name.toString() == "commit_succeed"); ASSERT(span2.events[0].time == 1234567.100); ASSERT(span2.events[0].attributes.size() == 0); // Add fully constructed OTELEventRef to OTELSpan passed by value. - OTELSpan span3("span_with_event"_loc); + Span span3("span_with_event"_loc); auto s3Arena = span3.arena; SmallVectorRef s3Attrs; s3Attrs.push_back(s3Arena, KeyValueRef("xyz"_sr, "123"_sr)); @@ -636,7 +540,10 @@ TEST_CASE("/flow/Tracing/AddEvents") { }; TEST_CASE("/flow/Tracing/AddAttributes") { - OTELSpan span1("span_with_attrs"_loc); + Span span1("span_with_attrs"_loc, + SpanContext(deterministicRandom()->randomUniqueID(), + deterministicRandom()->randomUInt64(), + TraceFlags::sampled)); auto arena = span1.arena; span1.addAttribute(StringRef(arena, LiteralStringRef("foo")), StringRef(arena, LiteralStringRef("bar"))); span1.addAttribute(StringRef(arena, LiteralStringRef("operation")), StringRef(arena, LiteralStringRef("grv"))); @@ -644,25 +551,34 @@ TEST_CASE("/flow/Tracing/AddAttributes") { ASSERT(span1.attributes[1] == KeyValueRef("foo"_sr, "bar"_sr)); ASSERT(span1.attributes[2] == KeyValueRef("operation"_sr, "grv"_sr)); - OTELSpan span3("span_with_attrs"_loc); - auto s3Arena = span3.arena; - span3.addAttribute(StringRef(s3Arena, LiteralStringRef("a")), StringRef(s3Arena, LiteralStringRef("1"))) - .addAttribute(StringRef(s3Arena, LiteralStringRef("b")), LiteralStringRef("2")) - 
.addAttribute(StringRef(s3Arena, LiteralStringRef("c")), LiteralStringRef("3")); - - ASSERT_EQ(span3.attributes.size(), 4); // Includes default attribute of "address" - ASSERT(span3.attributes[1] == KeyValueRef("a"_sr, "1"_sr)); - ASSERT(span3.attributes[2] == KeyValueRef("b"_sr, "2"_sr)); - ASSERT(span3.attributes[3] == KeyValueRef("c"_sr, "3"_sr)); + Span span2("span_with_attrs"_loc, + SpanContext(deterministicRandom()->randomUniqueID(), + deterministicRandom()->randomUInt64(), + TraceFlags::sampled)); + auto s2Arena = span2.arena; + span2.addAttribute(StringRef(s2Arena, LiteralStringRef("a")), StringRef(s2Arena, LiteralStringRef("1"))) + .addAttribute(StringRef(s2Arena, LiteralStringRef("b")), LiteralStringRef("2")) + .addAttribute(StringRef(s2Arena, LiteralStringRef("c")), LiteralStringRef("3")); + + ASSERT_EQ(span2.attributes.size(), 4); // Includes default attribute of "address" + ASSERT(span2.attributes[1] == KeyValueRef("a"_sr, "1"_sr)); + ASSERT(span2.attributes[2] == KeyValueRef("b"_sr, "2"_sr)); + ASSERT(span2.attributes[3] == KeyValueRef("c"_sr, "3"_sr)); return Void(); }; TEST_CASE("/flow/Tracing/AddLinks") { - OTELSpan span1("span_with_links"_loc); + Span span1("span_with_links"_loc); + ASSERT(!span1.context.isSampled()); + ASSERT(!span1.context.isValid()); span1.addLink(SpanContext(UID(100, 101), 200, TraceFlags::sampled)); span1.addLink(SpanContext(UID(200, 201), 300, TraceFlags::unsampled)) .addLink(SpanContext(UID(300, 301), 400, TraceFlags::sampled)); + // Ensure the root span is now sampled and traceID and spanIDs are set. + ASSERT(span1.context.isSampled()); + ASSERT(span1.context.isValid()); + // Ensure links are present. 
ASSERT(span1.links[0].traceID == UID(100, 101)); ASSERT(span1.links[0].spanID == 200); ASSERT(span1.links[0].m_Flags == TraceFlags::sampled); @@ -673,11 +589,16 @@ TEST_CASE("/flow/Tracing/AddLinks") { ASSERT(span1.links[2].spanID == 400); ASSERT(span1.links[2].m_Flags == TraceFlags::sampled); - OTELSpan span2("span_with_links"_loc); + Span span2("span_with_links"_loc); + ASSERT(!span2.context.isSampled()); + ASSERT(!span2.context.isValid()); auto link1 = SpanContext(UID(1, 1), 1, TraceFlags::sampled); auto link2 = SpanContext(UID(2, 2), 2, TraceFlags::sampled); auto link3 = SpanContext(UID(3, 3), 3, TraceFlags::sampled); span2.addLinks({ link1, link2 }).addLinks({ link3 }); + // Ensure the root span is now sampled and traceID and spanIDs are set. + ASSERT(span2.context.isSampled()); + ASSERT(span2.context.isValid()); ASSERT(span2.links[0].traceID == UID(1, 1)); ASSERT(span2.links[0].spanID == 1); ASSERT(span2.links[0].m_Flags == TraceFlags::sampled); @@ -741,7 +662,7 @@ std::string readMPString(uint8_t* index) { // Windows doesn't like lack of header and declaration of constructor for FastUDPTracer #ifndef WIN32 TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") { - OTELSpan span1("encoded_span"_loc); + Span span1("encoded_span"_loc); auto request = TraceRequest{ .buffer = std::make_unique(kTraceBufferSize), .data_size = 0, .buffer_size = kTraceBufferSize }; @@ -753,9 +674,9 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") { // Test - constructor OTELSpan(const Location& location, const SpanContext parent, const SpanContext& link) // Will delegate to other constructors. 
- OTELSpan span2("encoded_span"_loc, - SpanContext(UID(100, 101), 1, TraceFlags::sampled), - SpanContext(UID(200, 201), 2, TraceFlags::sampled)); + Span span2("encoded_span"_loc, + SpanContext(UID(100, 101), 1, TraceFlags::sampled), + { SpanContext(UID(200, 201), 2, TraceFlags::sampled) }); tracer.serialize_span(span2, request); data = request.buffer.get(); ASSERT(data[0] == 0b10011110); // 14 element array. @@ -801,7 +722,7 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") { request.reset(); // Exercise all fluent interfaces, include links, events, and attributes. - OTELSpan span3("encoded_span_3"_loc); + Span span3("encoded_span_3"_loc, SpanContext()); auto s3Arena = span3.arena; SmallVectorRef attrs; attrs.push_back(s3Arena, KeyValueRef("foo"_sr, "bar"_sr)); @@ -870,7 +791,7 @@ TEST_CASE("/flow/Tracing/FastUDPMessagePackEncoding") { "SGKKUrpIb/7zePhBDi+gzUzyAcbQ2zUbFWI1KNi3zQk58uUG6wWJZkw+GCs7Cc3V" "OUxOljwCJkC4QTgdsbbFhxUC+rtoHV5xAqoTQwR0FXnWigUjP7NtdL6huJUr3qRv" "40c4yUI1a4+P5vJa"; - auto span4 = OTELSpan(); + Span span4; auto location = Location(); location.name = StringRef(span4.arena, longString); span4.location = location; diff --git a/flow/Tracing.h b/flow/Tracing.h index c289a73fccf..02b3c8a5b69 100644 --- a/flow/Tracing.h +++ b/flow/Tracing.h @@ -21,6 +21,7 @@ #pragma once #include "fdbclient/FDBTypes.h" +#include "fdbrpc/FlowTransport.h" #include "flow/IRandom.h" #include #include @@ -33,90 +34,43 @@ inline Location operator"" _loc(const char* str, size_t size) { return Location{ StringRef(reinterpret_cast(str), size) }; } -struct Span { - Span(SpanID context, Location location, std::initializer_list const& parents = {}) - : context(context), begin(g_network->now()), location(location), parents(arena, parents.begin(), parents.end()) { - if (parents.size() > 0) { - // If the parents' token is 0 (meaning the trace should not be - // recorded), set the child token to 0 as well. Otherwise, generate - // a new, random token. 
- uint64_t traceId = 0; - if ((*parents.begin()).second() > 0) { - traceId = deterministicRandom()->randomUInt64(); - } - this->context = SpanID((*parents.begin()).first(), traceId); - } - } - Span(Location location, std::initializer_list const& parents = {}) - : Span(UID(deterministicRandom()->randomUInt64(), - deterministicRandom()->random01() < FLOW_KNOBS->TRACING_SAMPLE_RATE - ? deterministicRandom()->randomUInt64() - : 0), - location, - parents) {} - Span(Location location, SpanID context) : Span(location, { context }) {} - Span(const Span&) = delete; - Span(Span&& o) { - arena = std::move(o.arena); - context = o.context; - begin = o.begin; - end = o.end; - location = o.location; - parents = std::move(o.parents); - o.context = UID(); - o.begin = 0.0; - o.end = 0.0; - } - Span() {} - ~Span(); - Span& operator=(Span&& o); - Span& operator=(const Span&) = delete; - void swap(Span& other) { - std::swap(arena, other.arena); - std::swap(context, other.context); - std::swap(begin, other.begin); - std::swap(end, other.end); - std::swap(location, other.location); - std::swap(parents, other.parents); - } +enum class TraceFlags : uint8_t { unsampled = 0b00000000, sampled = 0b00000001 }; - void addParent(SpanID span) { - if (parents.size() == 0) { - uint64_t traceId = 0; - if (span.second() > 0) { - traceId = context.second() == 0 ? deterministicRandom()->randomUInt64() : context.second(); - } - // Use first parent to set trace ID. This is non-ideal for spans - // with multiple parents, because the trace ID will associate the - // span with only one trace. A workaround is to look at the parent - // relationships instead of the trace ID. Another option in the - // future is to keep a list of trace IDs. 
- context = SpanID(span.first(), traceId); - } - parents.push_back(arena, span); - } +inline TraceFlags operator&(TraceFlags lhs, TraceFlags rhs) { + return static_cast(static_cast>(lhs) & + static_cast>(rhs)); +} - void addTag(const StringRef& key, const StringRef& value) { tags[key] = value; } +struct SpanContext { + UID traceID; + uint64_t spanID; + TraceFlags m_Flags; + SpanContext() : traceID(UID()), spanID(0), m_Flags(TraceFlags::unsampled) {} + SpanContext(UID traceID, uint64_t spanID, TraceFlags flags) : traceID(traceID), spanID(spanID), m_Flags(flags) {} + SpanContext(UID traceID, uint64_t spanID) : traceID(traceID), spanID(spanID), m_Flags(TraceFlags::unsampled) {} + SpanContext(Arena arena, const SpanContext& span) + : traceID(span.traceID), spanID(span.spanID), m_Flags(span.m_Flags) {} + bool isSampled() const { return (m_Flags & TraceFlags::sampled) == TraceFlags::sampled; } + std::string toString() const { return format("%016llx%016llx%016llx", traceID.first(), traceID.second(), spanID); }; + bool isValid() const { return traceID.first() != 0 && traceID.second() != 0 && spanID != 0; } - Arena arena; - UID context = UID(); - double begin = 0.0, end = 0.0; - Location location; - SmallVectorRef parents; - std::unordered_map tags; + template + void serialize(Ar& ar) { + serializer(ar, traceID, spanID, m_Flags); + } }; -// OTELSpan +// Span // -// OTELSpan is a tracing implementation which, for the most part, complies with the W3C Trace Context specification +// Span is a tracing implementation which, for the most part, complies with the W3C Trace Context specification // https://www.w3.org/TR/trace-context/ and the OpenTelemetry API // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md. 
// -// The major differences between OTELSpan and the current Span implementation, which is based off the OpenTracing.io +// The major differences between Span and the 7.0 Span implementation, which is based off the OpenTracing.io // specification https://opentracing.io/ are as follows. // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#span // -// OTELSpans have... +// OpenTelemetry Spans have... // 1. A SpanContext which consists of 3 attributes. // // TraceId - A valid trace identifier is a 16-byte array with at least one non-zero byte. @@ -146,82 +100,63 @@ enum class SpanKind : uint8_t { INTERNAL = 0, CLIENT = 1, SERVER = 2, PRODUCER = enum class SpanStatus : uint8_t { UNSET = 0, OK = 1, ERR = 2 }; -struct OTELEventRef { - OTELEventRef() {} - OTELEventRef(const StringRef& name, +struct SpanEventRef { + SpanEventRef() {} + SpanEventRef(const StringRef& name, const double& time, const SmallVectorRef& attributes = SmallVectorRef()) : name(name), time(time), attributes(attributes) {} - OTELEventRef(Arena& arena, const OTELEventRef& other) + SpanEventRef(Arena& arena, const SpanEventRef& other) : name(arena, other.name), time(other.time), attributes(arena, other.attributes) {} StringRef name; double time = 0.0; SmallVectorRef attributes; }; -class OTELSpan { +class Span { public: - OTELSpan(const SpanContext& context, - const Location& location, - const SpanContext& parentContext, - const std::initializer_list& links = {}) + // Construct a Span with a given context, location, parentContext and optional links. + // + // N.B. While this constructor receives a parentContext it does not overwrite the traceId of the Span's context. + // Therefore it is the responsibility of the caller to ensure the traceID and m_Flags of both the context and + // parentContext are identical if the caller wishes to establish a parent/child relationship between these spans. 
We + // do this to avoid needless comparisons or copies as this constructor is only called once in NativeAPI.actor.cpp + // and from below by the Span(location, parent, links) constructor. The Span(location, parent, links) + // constructor is used broadly and performs the copy of the parent's traceID and m_Flags. + Span(const SpanContext& context, + const Location& location, + const SpanContext& parentContext, + const std::initializer_list& links = {}) : context(context), location(location), parentContext(parentContext), links(arena, links.begin(), links.end()), begin(g_network->now()) { - // We've simplified the logic here, essentially we're now always setting trace and span ids and relying on the - // TraceFlags to determine if we're sampling. Therefore if the parent is sampled, we simply overwrite this - // span's traceID with the parent trace id. - if (parentContext.isSampled()) { - this->context.traceID = UID(parentContext.traceID.first(), parentContext.traceID.second()); - this->context.m_Flags = TraceFlags::sampled; - } else { - // However there are two other cases. - // 1. A legitamite parent span exists but it was not selected for tracing. - // 2. There is no actual parent, just a default arg parent provided by the constructor AND the "child" span - // was selected for sampling. For case 1. we handle below by marking the child as unsampled.
For case 2 we - // needn't do anything, and can rely on the values in this OTELSpan - if (parentContext.traceID.first() != 0 && parentContext.traceID.second() != 0 && - parentContext.spanID != 0) { - this->context.m_Flags = TraceFlags::unsampled; - } - } this->kind = SpanKind::SERVER; this->status = SpanStatus::OK; this->attributes.push_back( - this->arena, KeyValueRef("address"_sr, StringRef(this->arena, g_network->getLocalAddress().toString()))); + // this->arena, KeyValueRef("address"_sr, StringRef(this->arena, "localhost:4000"))); + this->arena, + KeyValueRef("address"_sr, StringRef(this->arena, FlowTransport::transport().getLocalAddressAsString()))); } - OTELSpan(const Location& location, - const SpanContext& parent = SpanContext(), - const std::initializer_list& links = {}) - : OTELSpan( - SpanContext(UID(deterministicRandom()->randomUInt64(), deterministicRandom()->randomUInt64()), // traceID - deterministicRandom()->randomUInt64(), // spanID - deterministicRandom()->random01() < FLOW_KNOBS->TRACING_SAMPLE_RATE // sampled or unsampled - ? TraceFlags::sampled - : TraceFlags::unsampled), - location, - parent, - links) {} + // Construct Span with a location, parent, and optional links. + // This constructor copies the parent's traceID creating a parent->child relationship between Spans. + // Additionally we inherit the m_Flags of the parent, thus enabling or disabling sampling to match the parent. + Span(const Location& location, const SpanContext& parent, const std::initializer_list& links = {}) + : Span(SpanContext(parent.traceID, deterministicRandom()->randomUInt64(), parent.m_Flags), + location, + parent, + links) {} - OTELSpan(const Location& location, const SpanContext parent, const SpanContext& link) - : OTELSpan(location, parent, { link }) {} + // Construct Span without parent. Used for creating a root span, or when the parent is not known at construction + // time. 
+ Span(const SpanContext& context, const Location& location) : Span(context, location, SpanContext()) {} - // NOTE: This constructor is primarly for unit testing until we sort out how to enable/disable a Knob dynamically in - // a test. - OTELSpan(const Location& location, - const std::function& rateProvider, - const SpanContext& parent = SpanContext(), - const std::initializer_list& links = {}) - : OTELSpan(SpanContext(UID(deterministicRandom()->randomUInt64(), deterministicRandom()->randomUInt64()), - deterministicRandom()->randomUInt64(), - deterministicRandom()->random01() < rateProvider() ? TraceFlags::sampled - : TraceFlags::unsampled), - location, - parent, - links) {} + // We've determined for initial tracing release, spans with only a location will not be traced. + // Generally these are for background processes, some are called infrequently, while others may be high volume. + // TODO: review and address in subsequent PRs. + Span(const Location& location) : location(location), begin(g_network->now()) {} - OTELSpan(const OTELSpan&) = delete; - OTELSpan(OTELSpan&& o) { + Span(const Span&) = delete; + Span(Span&& o) { arena = std::move(o.arena); context = o.context; location = o.location; @@ -239,11 +174,11 @@ class OTELSpan { o.end = 0.0; o.status = SpanStatus::UNSET; } - OTELSpan() {} - ~OTELSpan(); - OTELSpan& operator=(OTELSpan&& o); - OTELSpan& operator=(const OTELSpan&) = delete; - void swap(OTELSpan& other) { + Span() {} + ~Span(); + Span& operator=(Span&& o); + Span& operator=(const Span&) = delete; + void swap(Span& other) { std::swap(arena, other.arena); std::swap(context, other.context); std::swap(location, other.location); @@ -256,34 +191,53 @@ class OTELSpan { std::swap(events, other.events); } - OTELSpan& addLink(const SpanContext& linkContext) { + Span& addLink(const SpanContext& linkContext) { links.push_back(arena, linkContext); + // Check if link is sampled, if so sample this span. 
+ if (!context.isSampled() && linkContext.isSampled()) { + context.m_Flags = TraceFlags::sampled; + // If for some reason this span isn't valid, we need to give it a + // traceID and spanID. This case is currently hit in CommitProxyServer + // CommitBatchContext::CommitBatchContext and CommitBatchContext::setupTraceBatch. + if (!context.isValid()) { + context.traceID = deterministicRandom()->randomUniqueID(); + context.spanID = deterministicRandom()->randomUInt64(); + } + } return *this; } - OTELSpan& addLinks(const std::initializer_list& linkContexts = {}) { + Span& addLinks(const std::initializer_list& linkContexts = {}) { for (auto const& sc : linkContexts) { - links.push_back(arena, sc); + addLink(sc); } return *this; } - OTELSpan& addEvent(const OTELEventRef& event) { + Span& addEvent(const SpanEventRef& event) { events.push_back_deep(arena, event); return *this; } - OTELSpan& addEvent(const StringRef& name, - const double& time, - const SmallVectorRef& attrs = SmallVectorRef()) { - return addEvent(OTELEventRef(name, time, attrs)); + Span& addEvent(const StringRef& name, + const double& time, + const SmallVectorRef& attrs = SmallVectorRef()) { + return addEvent(SpanEventRef(name, time, attrs)); } - OTELSpan& addAttribute(const StringRef& key, const StringRef& value) { + Span& addAttribute(const StringRef& key, const StringRef& value) { attributes.push_back_deep(arena, KeyValueRef(key, value)); return *this; } + Span& setParent(const SpanContext& parent) { + parentContext = parent; + context.traceID = parent.traceID; + context.spanID = deterministicRandom()->randomUInt64(); + context.m_Flags = parent.m_Flags; + return *this; + } + Arena arena; SpanContext context; Location location; @@ -292,7 +246,7 @@ class OTELSpan { SmallVectorRef links; double begin = 0.0, end = 0.0; SmallVectorRef attributes; // not necessarily sorted - SmallVectorRef events; + SmallVectorRef events; SpanStatus status; }; @@ -311,7 +265,6 @@ struct ITracer { virtual TracerType type() const 
= 0; // passed ownership to the tracer virtual void trace(Span const& span) = 0; - virtual void trace(OTELSpan const& span) = 0; }; void openTracer(TracerType type); @@ -328,16 +281,3 @@ struct SpannedDeque : Deque { span = std::move(other.span); } }; - -template -struct OTELSpannedDeque : Deque { - OTELSpan span; - explicit OTELSpannedDeque(Location loc) : span(loc) {} - OTELSpannedDeque(OTELSpannedDeque&& other) : Deque(std::move(other)), span(std::move(other.span)) {} - OTELSpannedDeque(OTELSpannedDeque const&) = delete; - OTELSpannedDeque& operator=(OTELSpannedDeque const&) = delete; - OTELSpannedDeque& operator=(OTELSpannedDeque&& other) { - *static_cast*>(this) = std::move(other); - span = std::move(other.span); - } -};