@@ -115,3 +115,52 @@ def test_arrow_streamset_to_dataframe(conn, tmp_collection):
115
115
}
116
116
expected_df = pd .DataFrame (expected_dat , index = pd .DatetimeIndex (expected_times ))
117
117
assert values .equals (expected_df )
118
+
119
+
120
+ def test_timesnap_backward_extends_range (conn , tmp_collection ):
121
+ sec = 10 ** 9
122
+ tv1 = [
123
+ [int (0.5 * sec ), 0.5 ],
124
+ [2 * sec , 2.0 ],
125
+ ]
126
+ tv2 = [
127
+ [int (0.5 * sec ) - 1 , 0.5 ],
128
+ [2 * sec , 2.0 ],
129
+ ]
130
+ tv3 = [
131
+ [1 * sec , 1.0 ],
132
+ [2 * sec , 2.0 ],
133
+ ]
134
+ s1 = conn .create (new_uuid (), tmp_collection , tags = {"name" : "s1" })
135
+ s2 = conn .create (new_uuid (), tmp_collection , tags = {"name" : "s2" })
136
+ s3 = conn .create (new_uuid (), tmp_collection , tags = {"name" : "s3" })
137
+ s1 .insert (tv1 )
138
+ s2 .insert (tv2 )
139
+ s3 .insert (tv3 )
140
+ ss = btrdb .stream .StreamSet ([s1 , s2 , s3 ]).filter (
141
+ start = 1 * sec , end = 3 * sec , sampling_frequency = 1
142
+ )
143
+ values = ss .arrow_values ()
144
+ assert [1 * sec , 2 * sec ] == [t .value for t in values ["time" ]]
145
+ assert [0.5 , 2.0 ] == [v .as_py () for v in values [tmp_collection + "/s1" ]]
146
+ assert [None , 2.0 ] == [
147
+ None if isnan (v .as_py ()) else v .as_py () for v in values [tmp_collection + "/s2" ]
148
+ ]
149
+ assert [1.0 , 2.0 ] == [v .as_py () for v in values [tmp_collection + "/s3" ]]
150
+
151
+
152
+ def test_timesnap_forward_restricts_range (conn , tmp_collection ):
153
+ sec = 10 ** 9
154
+ tv = [
155
+ [1 * sec , 1.0 ],
156
+ [2 * sec , 2.0 ],
157
+ [int (2.75 * sec ), 2.75 ],
158
+ ]
159
+ s = conn .create (new_uuid (), tmp_collection , tags = {"name" : "s" })
160
+ s .insert (tv )
161
+ ss = btrdb .stream .StreamSet ([s ]).filter (start = 1 * sec , sampling_frequency = 1 )
162
+ values = ss .filter (end = int (3.0 * sec )).arrow_values ()
163
+ assert [1 * sec , 2 * sec ] == [t .value for t in values ["time" ]]
164
+ assert [1.0 , 2.0 ] == [v .as_py () for v in values [tmp_collection + "/s" ]]
165
+ # Same result if skipping past end instead of to end.
166
+ assert values == ss .filter (end = int (2.9 * sec )).arrow_values ()
0 commit comments