|
| 1 | +import pyarrow as pa |
| 2 | +from random import randbytes |
| 3 | +from http.server import BaseHTTPRequestHandler, HTTPServer |
| 4 | +import io |
| 5 | + |
# Four int64 columns named 'a' through 'd'; the record batch built in this
# file and the IPC stream writer both use this schema.
schema = pa.schema(
    [(column_name, pa.int64()) for column_name in ('a', 'b', 'c', 'd')]
)
| 12 | + |
def GetPutData(total_records=100000000, length=4096):
    """Build the list of record batches that the server streams.

    One batch of ``length`` rows is created, with one array of random bytes
    per field of the module-level ``schema``. That same batch object is then
    repeated until ``total_records`` rows are covered; if ``total_records`` is
    not a multiple of ``length``, a shorter slice of the batch supplies the
    remainder.

    Every field of ``schema`` is filled as int64 data, so the schema is
    assumed to contain only int64 fields.

    Args:
        total_records: Total number of rows across all returned batches.
        length: Number of rows in each full batch.

    Returns:
        A list of record batches whose row counts sum to ``total_records``.

    Raises:
        ValueError: If ``length`` is not positive or ``total_records`` is
            negative.
    """
    if length <= 0:
        raise ValueError('length must be a positive number of rows')
    if total_records < 0:
        raise ValueError('total_records must not be negative')

    # One array per schema field, so the column count can never drift from
    # the schema. Each int64 value occupies 8 bytes of the random buffer.
    arrays = [
        pa.Int64Array.from_buffers(
            pa.int64(),
            length,
            [None, pa.py_buffer(randbytes(length * 8))],
            null_count=0,
        )
        for _ in schema.names
    ]
    batch = pa.record_batch(arrays, schema)

    # The list holds repeated references to the single batch, not copies.
    full_batches, remainder = divmod(total_records, length)
    batches = [batch] * full_batches
    if remainder:
        batches.append(batch.slice(0, remainder))

    return batches
| 38 | + |
def make_reader(schema, batches):
    """Return a ``pa.RecordBatchReader`` built from ``schema`` and ``batches``."""
    reader = pa.RecordBatchReader.from_batches(schema, batches)
    return reader
| 41 | + |
def generate_batches(schema, reader):
    """Yield the Arrow IPC stream for ``reader`` as successive byte chunks.

    A single in-memory sink is reused for the whole stream. It is emptied
    before each write, so every yielded value contains only the bytes
    produced since the previous yield: first whatever the writer has put in
    the sink right after it is created, then one chunk per batch read from
    ``reader``, and finally the bytes written when the writer is closed.
    """
    with io.BytesIO() as buf, pa.ipc.new_stream(buf, schema) as stream_writer:

        def reset_sink():
            # Rewind and drop the previous chunk so the buffer never grows
            # beyond a single chunk.
            buf.seek(0)
            buf.truncate(0)

        yield buf.getvalue()

        for record_batch in reader:
            reset_sink()
            stream_writer.write_batch(record_batch)
            yield buf.getvalue()

        reset_sink()
        stream_writer.close()
        yield buf.getvalue()
| 56 | + |
class MyServer(BaseHTTPRequestHandler):
    """Request handler that answers GET with an Arrow IPC stream."""

    def do_GET(self):
        """Send a 200 response and stream the module-level ``batches``."""
        self.send_response(200)

        response_headers = (
            ('Content-Type', 'application/vnd.apache.arrow.stream'),
            # The three Access-Control-* headers are included to enable
            # testing a JavaScript client in a local browser.
            ('Access-Control-Allow-Origin', 'http://localhost:8080'),
            ('Access-Control-Allow-Methods', 'GET'),
            ('Access-Control-Allow-Headers', 'Content-Type'),
        )
        for header_name, header_value in response_headers:
            self.send_header(header_name, header_value)
        self.end_headers()

        # Write and flush each chunk as soon as it is produced.
        stream_reader = make_reader(schema, batches)
        for chunk in generate_batches(schema, stream_reader):
            self.wfile.write(chunk)
            self.wfile.flush()
| 74 | + |
# Build the batches once at startup; MyServer.do_GET reads this module-level
# list for every request.
batches = GetPutData()

# Host and port the HTTP server binds to.
server_address = ('localhost', 8000)
httpd = HTTPServer(server_address, MyServer)

print(f'Serving on {server_address[0]}:{server_address[1]}...')
# Handle requests until the process is interrupted.
httpd.serve_forever()
0 commit comments