3030using System . Text . Json ;
3131using System . Text . RegularExpressions ;
3232using System . Threading . Tasks ;
33+ using Apache . Arrow ;
34+ using Apache . Arrow . Adbc ;
3335using Apache . Arrow . Adbc . Extensions ;
36+ using Apache . Arrow . Adbc . Telemetry . Traces . Listeners ;
37+ using Apache . Arrow . Adbc . Telemetry . Traces . Listeners . FileListener ;
3438using Apache . Arrow . Adbc . Tracing ;
3539using Apache . Arrow . Ipc ;
3640using Apache . Arrow . Types ;
3943using Google . Apis . Bigquery . v2 . Data ;
4044using Google . Cloud . BigQuery . V2 ;
4145
42- namespace Apache . Arrow . Adbc . Drivers . BigQuery
46+ namespace AdbcDrivers . BigQuery
4347{
4448 /// <summary>
4549 /// BigQuery-specific implementation of <see cref="AdbcConnection"/>
@@ -51,6 +55,9 @@ public class BigQueryConnection : TracingConnection, ITokenProtectedResource
5155 bool includePublicProjectIds = false ;
5256 const string infoDriverName = "ADBC BigQuery Driver" ;
5357 const string infoVendorName = "BigQuery" ;
58+ // Note: this needs to be set before the constructor runs
59+ private readonly string _traceInstanceId = Guid . NewGuid ( ) . ToString ( "N" ) ;
60+ private readonly FileActivityListener ? _fileActivityListener ;
5461
5562 private readonly string infoDriverArrowVersion = BigQueryUtils . GetAssemblyVersion ( typeof ( IArrowArray ) ) ;
5663
@@ -72,10 +79,20 @@ public BigQueryConnection(IReadOnlyDictionary<string, string> properties) : base
7279 this . properties = properties . ToDictionary ( k => k . Key , v => v . Value ) ;
7380 }
7481
75- // add the default value for now and set to true until C# has a BigDecimal
76- this . properties [ BigQueryParameters . LargeDecimalsAsString ] = BigQueryConstants . TreatLargeDecimalAsString ;
82+ TryInitTracerProvider ( out _fileActivityListener ) ;
83+
7784 this . httpClient = new HttpClient ( ) ;
7885
86+ if ( this . properties . TryGetValue ( BigQueryParameters . LargeDecimalsAsString , out string ? sLargeDecimalsAsString ) &&
87+ bool . TryParse ( sLargeDecimalsAsString , out bool largeDecimalsAsString ) )
88+ {
89+ this . properties [ BigQueryParameters . LargeDecimalsAsString ] = largeDecimalsAsString . ToString ( ) ;
90+ }
91+ else
92+ {
93+ this . properties [ BigQueryParameters . LargeDecimalsAsString ] = BigQueryConstants . TreatLargeDecimalAsString ;
94+ }
95+
7996 if ( this . properties . TryGetValue ( BigQueryParameters . MaximumRetryAttempts , out string ? sRetryAttempts ) &&
8097 int . TryParse ( sRetryAttempts , out int retries ) &&
8198 retries >= 0 )
@@ -89,8 +106,40 @@ public BigQueryConnection(IReadOnlyDictionary<string, string> properties) : base
89106 {
90107 RetryDelayMs = delay ;
91108 }
109+
110+ if ( this . properties . TryGetValue ( BigQueryParameters . DefaultClientLocation , out string ? location ) &&
111+ ! string . IsNullOrEmpty ( location ) &&
112+ BigQueryConstants . ValidLocations . Any ( l => l . Equals ( location , StringComparison . OrdinalIgnoreCase ) ) )
113+ {
114+ DefaultClientLocation = location ;
115+ }
92116 }
93117
118+ private bool TryInitTracerProvider ( out FileActivityListener ? fileActivityListener )
119+ {
120+ properties . TryGetValue ( ListenersOptions . Exporter , out string ? exporterOption ) ;
121+ // This listener will only listen for activity from this specific connection instance.
122+ bool shouldListenTo ( ActivitySource source ) => source . Tags ? . Any ( t => ReferenceEquals ( t . Key , _traceInstanceId ) ) == true ;
123+ return FileActivityListener . TryActivateFileListener ( AssemblyName , exporterOption , out fileActivityListener , shouldListenTo : shouldListenTo ) ;
124+ }
125+
126+ public override IEnumerable < KeyValuePair < string , object ? > > ? GetActivitySourceTags ( IReadOnlyDictionary < string , string > properties )
127+ {
128+ IEnumerable < KeyValuePair < string , object ? > > ? tags = base . GetActivitySourceTags ( properties ) ;
129+ tags ??= [ ] ;
130+ tags = tags . Concat ( [ new ( _traceInstanceId , null ) ] ) ;
131+ return tags ;
132+ }
133+
134+ /// <summary>
135+ /// Conditional used to determines if it is safe to trace
136+ /// </summary>
137+ /// <remarks>
138+ /// It is safe to write to some output types (ie, files) but not others (ie, a shared resource).
139+ /// </remarks>
140+ /// <returns></returns>
141+ internal bool IsSafeToTrace => _fileActivityListener != null ;
142+
94143 /// <summary>
95144 /// The function to call when updating the token.
96145 /// </summary>
@@ -106,6 +155,9 @@ public BigQueryConnection(IReadOnlyDictionary<string, string> properties) : base
106155
107156 internal int RetryDelayMs { get ; private set ; } = 200 ;
108157
158+ // if this value is null, the BigQuery API chooses the location (typically the `US` multi-region)
159+ internal string ? DefaultClientLocation { get ; private set ; }
160+
109161 public override string AssemblyVersion => BigQueryUtils . BigQueryAssemblyVersion ;
110162
111163 public override string AssemblyName => BigQueryUtils . BigQueryAssemblyName ;
@@ -177,6 +229,24 @@ internal BigQueryClient Open(string? projectId = null)
177229 GoogleCredential = Credential
178230 } ;
179231
232+ if ( ! string . IsNullOrEmpty ( DefaultClientLocation ) )
233+ {
234+ // If the user selects a public dataset (from a multi-region) but sets this
235+ // value to a specific location like us-east4, then there is an error produced
236+ // that the caller doesn't have permission to call to the public dataset.
237+ // Example:
238+ // Access Denied: Table bigquery-public-data:blockchain_analytics_ethereum_mainnet_us.accounts:
239+ // User does not have permission to query table bigquery-public-data:blockchain_analytics_ethereum_mainnet_us.accounts,
240+ // or perhaps it does not exist.'
241+
242+ bigQueryClientBuilder . DefaultLocation = DefaultClientLocation ;
243+ activity ? . AddBigQueryParameterTag ( BigQueryParameters . DefaultClientLocation , DefaultClientLocation ) ;
244+ }
245+ else
246+ {
247+ activity ? . AddBigQueryTag ( "client.default_location" , null ) ;
248+ }
249+
180250 BigQueryClient client = bigQueryClientBuilder . Build ( ) ;
181251
182252 if ( clientTimeout . HasValue )
@@ -476,7 +546,7 @@ internal void UpdateClientToken()
476546
477547 return this . TraceActivity ( activity =>
478548 {
479- activity ? . AddConditionalTag ( SemanticConventions . Db . Query . Text , sql , BigQueryUtils . IsSafeToTrace ( ) ) ;
549+ activity ? . AddConditionalTag ( SemanticConventions . Db . Query . Text , sql , IsSafeToTrace ) ;
480550
481551 Func < Task < BigQueryResults ? > > func = ( ) => Client . ExecuteQueryAsync ( sql , parameters ?? Enumerable . Empty < BigQueryParameter > ( ) , queryOptions , resultsOptions ) ;
482552 BigQueryResults ? result = ExecuteWithRetriesAsync < BigQueryResults ? > ( func , activity ) . GetAwaiter ( ) . GetResult ( ) ;
@@ -1279,6 +1349,7 @@ public override void Dispose()
12791349 Client ? . Dispose ( ) ;
12801350 Client = null ;
12811351 this . httpClient ? . Dispose ( ) ;
1352+ this . _fileActivityListener ? . Dispose ( ) ;
12821353 }
12831354
12841355 private static Regex sanitizedInputRegex = new Regex ( "^[a-zA-Z0-9_-]+" ) ;
0 commit comments