Skip to content

Commit

Permalink
Merge main into release
Browse files Browse the repository at this point in the history
  • Loading branch information
google-oss-bot authored Aug 14, 2024
2 parents 62c4c98 + 6d6ce81 commit 926709f
Show file tree
Hide file tree
Showing 14 changed files with 164 additions and 110 deletions.
6 changes: 6 additions & 0 deletions .changeset/itchy-boxes-try.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'firebase': minor
'@firebase/storage': minor
---

Migrate from the Node to Web ReadableStream interface
5 changes: 5 additions & 0 deletions .changeset/large-games-dress.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@firebase/app': patch
---

Prevent heartbeats methods from throwing - warn instead.
2 changes: 1 addition & 1 deletion common/api-review/storage.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ export function getMetadata(ref: StorageReference): Promise<FullMetadata>;
export function getStorage(app?: FirebaseApp, bucketUrl?: string): FirebaseStorage;

// @public
export function getStream(ref: StorageReference, maxDownloadSizeBytes?: number): NodeJS.ReadableStream;
export function getStream(ref: StorageReference, maxDownloadSizeBytes?: number): ReadableStream;

// @internal (undocumented)
export function _invalidArgument(message: string): StorageError;
Expand Down
4 changes: 2 additions & 2 deletions docs-devsite/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ This API is only available in Node.
<b>Signature:</b>

```typescript
export declare function getStream(ref: StorageReference, maxDownloadSizeBytes?: number): NodeJS.ReadableStream;
export declare function getStream(ref: StorageReference, maxDownloadSizeBytes?: number): ReadableStream;
```

#### Parameters
Expand All @@ -291,7 +291,7 @@ export declare function getStream(ref: StorageReference, maxDownloadSizeBytes?:

<b>Returns:</b>

NodeJS.ReadableStream
ReadableStream

A stream with the object's data as bytes

Expand Down
20 changes: 20 additions & 0 deletions packages/app/src/heartbeatService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,26 @@ describe('HeartbeatServiceImpl', () => {
const emptyHeaders = await heartbeatService.getHeartbeatsHeader();
expect(emptyHeaders).to.equal('');
});
it(`triggerHeartbeat() doesn't throw even if code errors`, async () => {
//@ts-expect-error Ensure this doesn't match
heartbeatService._heartbeatsCache?.lastSentHeartbeatDate = 50;
//@ts-expect-error Ensure you can't .push() to this
heartbeatService._heartbeatsCache.heartbeats = 50;
const warnStub = stub(console, 'warn');
await heartbeatService.triggerHeartbeat();
expect(warnStub).to.be.called;
expect(warnStub.args[0][1].message).to.include('heartbeats');
warnStub.restore();
});
it(`getHeartbeatsHeader() doesn't throw even if code errors`, async () => {
//@ts-expect-error Ensure you can't .push() to this
heartbeatService._heartbeatsCache.heartbeats = 50;
const warnStub = stub(console, 'warn');
await heartbeatService.getHeartbeatsHeader();
expect(warnStub).to.be.called;
expect(warnStub.args[0][1].message).to.include('heartbeats');
warnStub.restore();
});
});
describe('If IndexedDB has entries', () => {
let heartbeatService: HeartbeatServiceImpl;
Expand Down
136 changes: 73 additions & 63 deletions packages/app/src/heartbeatService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
HeartbeatStorage,
SingleDateHeartbeat
} from './types';
import { logger } from './logger';

const MAX_HEADER_BYTES = 1024;
// 30 days
Expand Down Expand Up @@ -80,43 +81,47 @@ export class HeartbeatServiceImpl implements HeartbeatService {
* already logged, subsequent calls to this function in the same day will be ignored.
*/
async triggerHeartbeat(): Promise<void> {
const platformLogger = this.container
.getProvider('platform-logger')
.getImmediate();
try {
const platformLogger = this.container
.getProvider('platform-logger')
.getImmediate();

// This is the "Firebase user agent" string from the platform logger
// service, not the browser user agent.
const agent = platformLogger.getPlatformInfoString();
const date = getUTCDateString();
if (this._heartbeatsCache?.heartbeats == null) {
this._heartbeatsCache = await this._heartbeatsCachePromise;
// If we failed to construct a heartbeats cache, then return immediately.
// This is the "Firebase user agent" string from the platform logger
// service, not the browser user agent.
const agent = platformLogger.getPlatformInfoString();
const date = getUTCDateString();
console.log('heartbeats', this._heartbeatsCache?.heartbeats);
if (this._heartbeatsCache?.heartbeats == null) {
this._heartbeatsCache = await this._heartbeatsCachePromise;
// If we failed to construct a heartbeats cache, then return immediately.
if (this._heartbeatsCache?.heartbeats == null) {
return;
}
}
// Do not store a heartbeat if one is already stored for this day
// or if a header has already been sent today.
if (
this._heartbeatsCache.lastSentHeartbeatDate === date ||
this._heartbeatsCache.heartbeats.some(
singleDateHeartbeat => singleDateHeartbeat.date === date
)
) {
return;
} else {
// There is no entry for this date. Create one.
this._heartbeatsCache.heartbeats.push({ date, agent });
}
// Remove entries older than 30 days.
this._heartbeatsCache.heartbeats =
this._heartbeatsCache.heartbeats.filter(singleDateHeartbeat => {
const hbTimestamp = new Date(singleDateHeartbeat.date).valueOf();
const now = Date.now();
return now - hbTimestamp <= STORED_HEARTBEAT_RETENTION_MAX_MILLIS;
});
return this._storage.overwrite(this._heartbeatsCache);
} catch (e) {
logger.warn(e);
}
// Do not store a heartbeat if one is already stored for this day
// or if a header has already been sent today.
if (
this._heartbeatsCache.lastSentHeartbeatDate === date ||
this._heartbeatsCache.heartbeats.some(
singleDateHeartbeat => singleDateHeartbeat.date === date
)
) {
return;
} else {
// There is no entry for this date. Create one.
this._heartbeatsCache.heartbeats.push({ date, agent });
}
// Remove entries older than 30 days.
this._heartbeatsCache.heartbeats = this._heartbeatsCache.heartbeats.filter(
singleDateHeartbeat => {
const hbTimestamp = new Date(singleDateHeartbeat.date).valueOf();
const now = Date.now();
return now - hbTimestamp <= STORED_HEARTBEAT_RETENTION_MAX_MILLIS;
}
);
return this._storage.overwrite(this._heartbeatsCache);
}

/**
Expand All @@ -127,39 +132,44 @@ export class HeartbeatServiceImpl implements HeartbeatService {
* returns an empty string.
*/
async getHeartbeatsHeader(): Promise<string> {
if (this._heartbeatsCache === null) {
await this._heartbeatsCachePromise;
}
// If it's still null or the array is empty, there is no data to send.
if (
this._heartbeatsCache?.heartbeats == null ||
this._heartbeatsCache.heartbeats.length === 0
) {
try {
if (this._heartbeatsCache === null) {
await this._heartbeatsCachePromise;
}
// If it's still null or the array is empty, there is no data to send.
if (
this._heartbeatsCache?.heartbeats == null ||
this._heartbeatsCache.heartbeats.length === 0
) {
return '';
}
const date = getUTCDateString();
// Extract as many heartbeats from the cache as will fit under the size limit.
const { heartbeatsToSend, unsentEntries } = extractHeartbeatsForHeader(
this._heartbeatsCache.heartbeats
);
const headerString = base64urlEncodeWithoutPadding(
JSON.stringify({ version: 2, heartbeats: heartbeatsToSend })
);
// Store last sent date to prevent another being logged/sent for the same day.
this._heartbeatsCache.lastSentHeartbeatDate = date;
if (unsentEntries.length > 0) {
// Store any unsent entries if they exist.
this._heartbeatsCache.heartbeats = unsentEntries;
// This seems more likely than emptying the array (below) to lead to some odd state
// since the cache isn't empty and this will be called again on the next request,
// and is probably safest if we await it.
await this._storage.overwrite(this._heartbeatsCache);
} else {
this._heartbeatsCache.heartbeats = [];
// Do not wait for this, to reduce latency.
void this._storage.overwrite(this._heartbeatsCache);
}
return headerString;
} catch (e) {
logger.warn(e);
return '';
}
const date = getUTCDateString();
// Extract as many heartbeats from the cache as will fit under the size limit.
const { heartbeatsToSend, unsentEntries } = extractHeartbeatsForHeader(
this._heartbeatsCache.heartbeats
);
const headerString = base64urlEncodeWithoutPadding(
JSON.stringify({ version: 2, heartbeats: heartbeatsToSend })
);
// Store last sent date to prevent another being logged/sent for the same day.
this._heartbeatsCache.lastSentHeartbeatDate = date;
if (unsentEntries.length > 0) {
// Store any unsent entries if they exist.
this._heartbeatsCache.heartbeats = unsentEntries;
// This seems more likely than emptying the array (below) to lead to some odd state
// since the cache isn't empty and this will be called again on the next request,
// and is probably safest if we await it.
await this._storage.overwrite(this._heartbeatsCache);
} else {
this._heartbeatsCache.heartbeats = [];
// Do not wait for this, to reduce latency.
void this._storage.overwrite(this._heartbeatsCache);
}
return headerString;
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/storage/src/api.browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ export function getBlob(
export function getStream(
ref: StorageReference,
maxDownloadSizeBytes?: number
): NodeJS.ReadableStream {
): ReadableStream {
throw new Error('getStream() is only supported by NodeJS builds');
}
2 changes: 1 addition & 1 deletion packages/storage/src/api.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export function getBlob(
export function getStream(
ref: StorageReference,
maxDownloadSizeBytes?: number
): NodeJS.ReadableStream {
): ReadableStream {
ref = getModularInstance(ref);
return getStreamInternal(ref as Reference, maxDownloadSizeBytes);
}
3 changes: 1 addition & 2 deletions packages/storage/src/implementation/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Network headers */
export type Headers = Record<string, string>;

Expand All @@ -23,7 +22,7 @@ export type ConnectionType =
| string
| ArrayBuffer
| Blob
| NodeJS.ReadableStream;
| ReadableStream<Uint8Array>;

/**
* A lightweight wrapper around XMLHttpRequest with a
Expand Down
2 changes: 1 addition & 1 deletion packages/storage/src/platform/browser/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ export function newBlobConnection(): Connection<Blob> {
return new XhrBlobConnection();
}

export function newStreamConnection(): Connection<NodeJS.ReadableStream> {
export function newStreamConnection(): Connection<ReadableStream> {
throw new Error('Streams are only supported on Node');
}

Expand Down
2 changes: 1 addition & 1 deletion packages/storage/src/platform/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export function newBlobConnection(): Connection<Blob> {
return nodeNewBlobConnection();
}

export function newStreamConnection(): Connection<NodeJS.ReadableStream> {
export function newStreamConnection(): Connection<ReadableStream<Uint8Array>> {
// This file is only used in Node.js tests using ts-node.
return nodeNewStreamConnection();
}
20 changes: 11 additions & 9 deletions packages/storage/src/platform/node/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ abstract class FetchConnection<T extends ConnectionType>
async send(
url: string,
method: string,
body?: ArrayBufferView | Blob | string,
body?: NodeJS.ArrayBufferView | Blob | string,
headers?: Record<string, string>
): Promise<void> {
if (this.sent_) {
Expand All @@ -62,7 +62,7 @@ abstract class FetchConnection<T extends ConnectionType>
const response = await this.fetch_(url, {
method,
headers: headers || {},
body: body as ArrayBufferView | string
body: body as NodeJS.ArrayBufferView | string
});
this.headers_ = response.headers;
this.statusCode_ = response.status;
Expand Down Expand Up @@ -146,13 +146,15 @@ export function newBytesConnection(): Connection<ArrayBuffer> {
return new FetchBytesConnection();
}

export class FetchStreamConnection extends FetchConnection<NodeJS.ReadableStream> {
private stream_: NodeJS.ReadableStream | null = null;
export class FetchStreamConnection extends FetchConnection<
ReadableStream<Uint8Array>
> {
private stream_: ReadableStream<Uint8Array> | null = null;

async send(
url: string,
method: string,
body?: ArrayBufferView | Blob | string,
body?: NodeJS.ArrayBufferView | Blob | string,
headers?: Record<string, string>
): Promise<void> {
if (this.sent_) {
Expand All @@ -164,12 +166,12 @@ export class FetchStreamConnection extends FetchConnection<NodeJS.ReadableStream
const response = await this.fetch_(url, {
method,
headers: headers || {},
body: body as ArrayBufferView | string
body: body as NodeJS.ArrayBufferView | string
});
this.headers_ = response.headers;
this.statusCode_ = response.status;
this.errorCode_ = ErrorCode.NO_ERROR;
this.stream_ = response.body;
this.stream_ = response.body as ReadableStream<Uint8Array>;
} catch (e) {
this.errorText_ = (e as Error)?.message;
// emulate XHR which sets status to 0 when encountering a network error
Expand All @@ -178,15 +180,15 @@ export class FetchStreamConnection extends FetchConnection<NodeJS.ReadableStream
}
}

getResponse(): NodeJS.ReadableStream {
getResponse(): ReadableStream {
if (!this.stream_) {
throw internalError('cannot .getResponse() before sending');
}
return this.stream_;
}
}

export function newStreamConnection(): Connection<NodeJS.ReadableStream> {
export function newStreamConnection(): Connection<ReadableStream<Uint8Array>> {
return new FetchStreamConnection();
}

Expand Down
Loading

0 comments on commit 926709f

Please sign in to comment.