1+ use futures:: StreamExt ;
2+ use serde_json;
3+ use azure_data_cosmos:: { CosmosClient , CosmosClientOptions , PartitionKey } ;
4+ use azure_identity:: DefaultAzureCredential ;
5+ use crate :: item:: Item ;
6+
7+ pub async fn run < F > (
8+ endpoint : String ,
9+ database_name : String ,
10+ container_name : String ,
11+ callback : F ,
12+ )
13+ where
14+ F : Fn ( String ) ,
15+ {
16+ callback ( "Current Status:\t Starting..." . to_string ( ) ) ;
17+
18+ let credential = DefaultAzureCredential :: new ( ) . unwrap ( ) ;
19+
20+ let client_options = CosmosClientOptions :: default ( ) ;
21+ let client_options = Some ( client_options) ;
22+
23+ let service_client = match CosmosClient :: new ( & endpoint, credential, client_options) {
24+ Ok ( client) => client,
25+ Err ( e) => {
26+ eprintln ! ( "Error creating CosmosClient: {}" , e) ;
27+ return ;
28+ }
29+ } ;
30+ callback ( "Client created" . to_string ( ) ) ;
31+
32+ let database_client = service_client. database_client ( & database_name) ;
33+ callback ( format ! ( "Get database:\t {}" , database_name) ) ;
34+
35+ let container_client = database_client. container_client ( & container_name) ;
36+ callback ( format ! ( "Get container:\t {}" , container_name) ) ;
37+
38+ {
39+ let item = Item {
40+ id : "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb" . to_string ( ) ,
41+ category : "gear-surf-surfboards" . to_string ( ) ,
42+ name : "Yamba Surfboard" . to_string ( ) ,
43+ quantity : 12 ,
44+ price : 850.00 ,
45+ clearance : false ,
46+ } ;
47+
48+ let partition_key = PartitionKey :: from ( item. category . clone ( ) ) ;
49+
50+ let upsert_response = container_client. upsert_item ( partition_key, item, None ) . await ;
51+
52+ match upsert_response {
53+ Ok ( r) => {
54+ let deserialize_response = r. deserialize_body ( ) . await ;
55+ match deserialize_response {
56+ Ok ( i) => {
57+ let upserted_item = i. unwrap ( ) ;
58+ callback ( format ! ( "Upserted item:\t {}" , upserted_item. id) ) ;
59+ } ,
60+ Err ( e) => {
61+ eprintln ! ( "Error deserializing response: {}" , e) ;
62+ } ,
63+ }
64+ } ,
65+ Err ( e) => {
66+ eprintln ! ( "Error upserting item: {}" , e) ;
67+ } ,
68+ }
69+ }
70+
71+ {
72+ let item = Item {
73+ id : "bbbbbbbb-1111-2222-3333-cccccccccccc" . to_string ( ) ,
74+ category : "gear-surf-surfboards" . to_string ( ) ,
75+ name : "Kiama Classic Surfboard" . to_string ( ) ,
76+ quantity : 25 ,
77+ price : 790.00 ,
78+ clearance : true ,
79+ } ;
80+
81+ let partition_key = PartitionKey :: from ( item. category . clone ( ) ) ;
82+
83+ let upsert_response = container_client. upsert_item ( partition_key, item, None ) . await ;
84+
85+ match upsert_response {
86+ Ok ( r) => {
87+ let deserialize_response = r. deserialize_body ( ) . await ;
88+ match deserialize_response {
89+ Ok ( i) => {
90+ let upserted_item = i. unwrap ( ) ;
91+ callback ( format ! ( "Upserted item:\t {}" , upserted_item. id) ) ;
92+ } ,
93+ Err ( e) => {
94+ eprintln ! ( "Error deserializing response: {}" , e) ;
95+ } ,
96+ }
97+ } ,
98+ Err ( e) => {
99+ eprintln ! ( "Error upserting item: {}" , e) ;
100+ } ,
101+ }
102+ }
103+
104+ {
105+ let item_id = "aaaaaaaa-0000-1111-2222-bbbbbbbbbbbb" ;
106+ let item_partition_key = "gear-surf-surfboards" ;
107+
108+ let read_response = container_client. read_item :: < Item > ( item_partition_key, item_id, None ) . await ;
109+
110+ match read_response {
111+ Ok ( r) => {
112+ let deserialize_response = r. deserialize_body ( ) . await ;
113+ match deserialize_response {
114+ Ok ( i) => {
115+ let read_item = i. unwrap ( ) ;
116+ callback ( format ! ( "Read item:\t {}\t {}" , read_item. id, read_item. category) ) ;
117+ } ,
118+ Err ( e) => {
119+ eprintln ! ( "Error deserializing response: {}" , e) ;
120+ } ,
121+ }
122+ } ,
123+ Err ( e) => {
124+ eprintln ! ( "Error reading item: {}" , e) ;
125+ } ,
126+ }
127+ }
128+
129+ {
130+ let item_partition_key = "gear-surf-surfboards" ;
131+
132+ let partition_key = PartitionKey :: from ( item_partition_key) ;
133+
134+ let query = format ! ( "SELECT * FROM c WHERE c.category = '{}'" , item_partition_key) ;
135+
136+ let page_response = container_client. query_items :: < Item > ( & query, partition_key, None ) ;
137+
138+ callback ( "Run query:" . to_string ( ) ) ;
139+ match page_response {
140+ Ok ( mut page) => {
141+ while let Some ( item) = page. next ( ) . await {
142+ match item {
143+ Ok ( i) => {
144+ let deserialize_response = i. deserialize_body ( ) . await ;
145+ match deserialize_response {
146+ Ok ( page) => {
147+ for item in page. items {
148+ callback ( serde_json:: to_string_pretty ( & item) . unwrap ( ) ) ;
149+ }
150+ } ,
151+ Err ( e) => {
152+ eprintln ! ( "Error deserializing item: {}" , e) ;
153+ } ,
154+ }
155+ } ,
156+ Err ( e) => {
157+ eprintln ! ( "Error querying item: {}" , e) ;
158+ } ,
159+ }
160+ }
161+ } ,
162+ Err ( e) => {
163+ eprintln ! ( "Error querying items: {}" , e) ;
164+ } ,
165+ }
166+ }
167+
168+ callback ( "Current Status:\t Stopping..." . to_string ( ) ) ;
169+ }
0 commit comments