Skip to content

Commit f2e6b99

Browse files
Merge pull request #2527 from wanghaoshuang/order_xmap
Add an order switch to xmap_readers
2 parents 7bce40d + 30eca3a commit f2e6b99

File tree

2 files changed

+55
-4
lines changed

2 files changed

+55
-4
lines changed

python/paddle/v2/reader/decorator.py

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ class XmapEndSignal():
230230
pass
231231

232232

233-
def xmap_readers(mapper, reader, process_num, buffer_size):
233+
def xmap_readers(mapper, reader, process_num, buffer_size, order=False):
234234
"""
235235
Use multiprocess to map samples from reader by a mapper defined by user.
236236
And this function contains a buffered decorator.
@@ -242,21 +242,33 @@ def xmap_readers(mapper, reader, process_num, buffer_size):
242242
:type process_num: int
243243
:param buffer_size: max buffer size
244244
:type buffer_size: int
245+
:param order: whether to emit mapped samples in the same order as the input reader
246+
:type order: bool
245247
:return: the decorated reader
246248
:rtype: callable
247249
"""
248250
end = XmapEndSignal()
249251
in_queue = Queue(buffer_size)
250252
out_queue = Queue(buffer_size)
253+
out_order = [0]
251254

252255
# define a worker to read samples from reader to in_queue
253256
def read_worker(reader, in_queue):
    """Push every sample produced by ``reader()`` into ``in_queue``.

    Once the reader is exhausted, the shared ``end`` signal object is
    enqueued to mark the end of the stream.
    """
    for sample in reader():
        in_queue.put(sample)
    # Mark the end of the input stream for whoever consumes in_queue.
    in_queue.put(end)
257260

261+
# define a worker to read samples from reader to in_queue with order flag
262+
def order_read_worker(reader, in_queue):
    """Push ``(index, sample)`` pairs from ``reader()`` into ``in_queue``.

    The index starts at 0 and increases by one per sample, recording the
    position of each sample in the reader's output. Once the reader is
    exhausted, the shared ``end`` signal object is enqueued.
    """
    # enumerate() yields the same (position, sample) tuples as a manual
    # counter incremented after each put.
    for tagged_sample in enumerate(reader()):
        in_queue.put(tagged_sample)
    # Mark the end of the input stream for whoever consumes in_queue.
    in_queue.put(end)
268+
258269
# start a read worker in a thread
259-
t = Thread(target=read_worker, args=(reader, in_queue))
270+
target = order_read_worker if order else read_worker
271+
t = Thread(target=target, args=(reader, in_queue))
260272
t.daemon = True
261273
t.start()
262274

@@ -271,11 +283,28 @@ def handle_worker(in_queue, out_queue, mapper):
271283
in_queue.put(end)
272284
out_queue.put(end)
273285

286+
# define a worker to handle samples from in_queue by mapper
287+
# and put mapped samples into out_queue by order
288+
def order_handle_worker(in_queue, out_queue, mapper, out_order):
    """Map ``(index, sample)`` pairs from ``in_queue`` and emit results in order.

    Each item taken from ``in_queue`` is unpacked into its index and sample,
    the sample is passed through ``mapper``, and the result is put on
    ``out_queue`` only once ``out_order[0]`` equals the item's index. After
    emitting, ``out_order[0]`` is incremented so the next index may proceed.

    :param in_queue: queue of ``(index, sample)`` tuples, terminated by an
        ``XmapEndSignal`` instance
    :param out_queue: queue receiving the mapped samples
    :param mapper: callable applied to each sample
    :param out_order: one-element list holding the index of the next sample
        allowed to be written to ``out_queue``
    """
    # Imported locally so the block stays self-contained.
    import time

    ins = in_queue.get()
    while not isinstance(ins, XmapEndSignal):
        order, sample = ins
        r = mapper(sample)
        # Wait for our turn. Yield the interpreter on every check instead of
        # spinning on ``pass``: a hot spin burns CPU and competes with the
        # one worker that is able to advance out_order.
        while order != out_order[0]:
            time.sleep(0)
        out_queue.put(r)
        # Only the worker whose index matched reaches this line, so a single
        # thread at a time advances the counter.
        out_order[0] += 1
        ins = in_queue.get()
    # Put the end signal back on in_queue (presumably so other workers
    # reading the same queue also see it), then signal the end downstream.
    in_queue.put(end)
    out_queue.put(end)
300+
274301
# start several handle_workers
302+
target = order_handle_worker if order else handle_worker
303+
args = (in_queue, out_queue, mapper, out_order) if order else (
304+
in_queue, out_queue, mapper)
275305
workers = []
276306
for i in xrange(process_num):
277-
worker = Thread(
278-
target=handle_worker, args=(in_queue, out_queue, mapper))
307+
worker = Thread(target=target, args=args)
279308
worker.daemon = True
280309
workers.append(worker)
281310
for w in workers:

python/paddle/v2/reader/tests/decorator_test.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,5 +121,27 @@ def test_shuffle(self):
121121
self.assertEqual(total, 10)
122122

123123

124+
class TestXmap(unittest.TestCase):
    """Check xmap_readers over ordered/unordered modes, thread counts and buffer sizes."""

    def test_xmap(self):
        def add_one(value):
            return (value + 1)

        # The same set of values is used for thread counts and buffer sizes.
        sizes = (1, 2, 4, 8, 16)
        for keep_order in (True, False):
            for worker_count in sizes:
                for buffer_size in sizes:
                    mapped_reader = paddle.v2.reader.xmap_readers(
                        add_one,
                        reader_creator_10(0), worker_count, buffer_size,
                        keep_order)
                    samples = list(mapped_reader())
                    # Without the order switch, arrival order is not checked;
                    # sort before comparing.
                    if not keep_order:
                        samples.sort()
                    for position, sample in enumerate(samples):
                        self.assertEqual(sample, add_one(position))
145+
124146
if __name__ == '__main__':
125147
unittest.main()

0 commit comments

Comments
 (0)