@@ -81,13 +81,14 @@ class _PubSubImpl implements PubSub {
8181 return _api.projects.subscriptions.modifyPushConfig (request, subscription);
8282 }
8383
84- Future _publish (
85- String topic, List < int > message, Map <String , String > attributes) {
84+ Future _publish (String topic, List < int > message,
85+ Map <String , String > attributes, String ? orderingKey ) {
8686 var request = pubsub.PublishRequest ()
8787 ..messages = [
8888 (pubsub.PubsubMessage ()
8989 ..dataAsBytes = message
90- ..attributes = attributes.isEmpty ? null : attributes)
90+ ..attributes = attributes.isEmpty ? null : attributes
91+ ..orderingKey = orderingKey)
9192 ];
9293 // TODO(sgjesse): Handle PublishResponse containing message ids.
9394 return _api.projects.topics.publish (request, topic).then ((_) => null );
@@ -227,13 +228,18 @@ class _MessageImpl implements Message {
227228 @override
228229 final Map <String , String > attributes;
229230
231+ @override
232+ final String ? orderingKey;
233+
230234 _MessageImpl .withString (
231235 this ._stringMessage, {
232236 Map <String , String >? attributes,
237+ this .orderingKey,
233238 }) : _bytesMessage = null ,
234239 attributes = attributes ?? < String , String > {};
235240
236- _MessageImpl .withBytes (this ._bytesMessage, {Map <String , String >? attributes})
241+ _MessageImpl .withBytes (this ._bytesMessage,
242+ {Map <String , String >? attributes, this .orderingKey})
237243 : _stringMessage = null ,
238244 attributes = attributes ?? < String , String > {};
239245
@@ -251,11 +257,14 @@ class _MessageImpl implements Message {
251257///
252258/// The labels map is lazily created when first accessed.
253259class _PullMessage implements Message {
260+ _PullMessage (this ._message, this .orderingKey);
261+
254262 final pubsub.PubsubMessage _message;
255263 List <int >? _bytes;
256264 String ? _string;
257265
258- _PullMessage (this ._message);
266+ @override
267+ String ? orderingKey;
259268
260269 @override
261270 List <int > get asBytes {
@@ -281,11 +290,15 @@ class _PullMessage implements Message {
281290///
282291/// The labels have been decoded into a Map.
283292class _PushMessage implements Message {
284- final String _base64Message;
293+ _PushMessage (this ._base64Message, this .attributes, this .orderingKey);
294+
285295 @override
286296 final Map <String , String > attributes;
287297
288- _PushMessage (this ._base64Message, this .attributes);
298+ @override
299+ final String ? orderingKey;
300+
301+ final String _base64Message;
289302
290303 @override
291304 List <int > get asBytes => base64.decode (_base64Message);
@@ -306,13 +319,15 @@ class _PullEventImpl implements PullEvent {
306319
307320 /// Low level response received from Pub/Sub.
308321 final pubsub.PullResponse _response;
322+
309323 @override
310324 final Message message;
311325
312326 _PullEventImpl (
313327 this ._api, this ._subscriptionName, pubsub.PullResponse response)
314328 : _response = response,
315- message = _PullMessage (response.receivedMessages! [0 ].message! );
329+ message = _PullMessage (response.receivedMessages! [0 ].message! ,
330+ response.receivedMessages! [0 ].message? .orderingKey);
316331
317332 @override
318333 Future acknowledge () {
@@ -346,13 +361,15 @@ class _PushEventImpl implements PushEvent {
346361 var value = l['strValue' ] ?? l['numValue' ];
347362 labels[key] = value.toString ();
348363 }
364+ var orderingKey = body['orderingKey' ] as String ? ;
349365 var subscription = body['subscription' ] as String ;
350366 // TODO(#1): Remove this when the push event subscription name is prefixed
351367 // with '/subscriptions/'.
352368 if (! subscription.startsWith (_prefix)) {
353369 subscription = _prefix + subscription;
354370 }
355- return _PushEventImpl (_PushMessage (data, labels), subscription);
371+ return _PushEventImpl (
372+ _PushMessage (data, labels, orderingKey), subscription);
356373 }
357374}
358375
@@ -379,22 +396,26 @@ class _TopicImpl implements Topic {
379396
380397 @override
381398 Future publish (Message message) {
382- return _api._publish (_topic.name! , message.asBytes, message.attributes);
399+ return _api._publish (
400+ _topic.name! , message.asBytes, message.attributes, message.orderingKey);
383401 }
384402
385403 @override
386404 Future delete () => _api._deleteTopic (_topic.name! );
387405
388406 @override
389- Future publishString (String message, {Map <String , String >? attributes}) {
407+ Future publishString (String message,
408+ {Map <String , String >? attributes, String ? orderingKey}) {
390409 attributes ?? = < String , String > {};
391- return _api._publish (_topic.name! , utf8.encode (message), attributes);
410+ return _api._publish (
411+ _topic.name! , utf8.encode (message), attributes, orderingKey);
392412 }
393413
394414 @override
395- Future publishBytes (List <int > message, {Map <String , String >? attributes}) {
415+ Future publishBytes (List <int > message,
416+ {Map <String , String >? attributes, String ? orderingKey}) {
396417 attributes ?? = < String , String > {};
397- return _api._publish (_topic.name! , message, attributes);
418+ return _api._publish (_topic.name! , message, attributes, orderingKey );
398419 }
399420}
400421
0 commit comments