# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import time

import pyarrow as pa


class HighLatencyReader(object):
    """Readable file-like wrapper that sleeps before every ``read`` call.

    Used to simulate a slow raw source so the effect of a buffering layer
    stacked on top of it can be measured.

    Parameters
    ----------
    raw : file-like
        Underlying readable object; ``read``, ``close`` and ``closed`` are
        forwarded to it.
    latency : float
        Seconds to sleep before each ``read`` call.
    """

    def __init__(self, raw, latency):
        self.raw = raw
        self.latency = latency

    def close(self):
        self.raw.close()

    @property
    def closed(self):
        return self.raw.closed

    def read(self, nbytes=None):
        # The delay is paid once per call regardless of ``nbytes``, so
        # issuing fewer, larger reads (i.e. buffering) amortizes it.
        time.sleep(self.latency)
        return self.raw.read(nbytes)


class HighLatencyWriter(object):
    """Writable file-like wrapper that sleeps before every ``write`` call.

    Used to simulate a slow raw sink so the effect of a buffering layer
    stacked on top of it can be measured.

    Parameters
    ----------
    raw : file-like
        Underlying writable object; ``write``, ``close`` and ``closed`` are
        forwarded to it.
    latency : float
        Seconds to sleep before each ``write`` call.
    """

    def __init__(self, raw, latency):
        self.raw = raw
        self.latency = latency

    def close(self):
        self.raw.close()

    @property
    def closed(self):
        return self.raw.closed

    def write(self, data):
        # The delay is paid once per call regardless of ``len(data)``.
        time.sleep(self.latency)
        # Propagate the raw stream's result (the byte count for file-like
        # objects) instead of silently returning None.
        return self.raw.write(data)


class BufferedIOHighLatency(object):
    """Benchmark buffered pyarrow streams layered over high-latency streams.

    Each benchmark moves ``total_size`` bytes in ``increment``-sized chunks
    through a pyarrow stream with a ``buffer_size`` buffer. That stream wraps
    a raw stream which sleeps ``latency`` seconds per call.
    """

    increment = 1024
    total_size = 16 * (1 << 20)  # 16 MB
    buffer_size = 1 << 20  # 1 MB
    # NOTE: not read by the timed methods below; they receive ``latency``
    # from ``params``. Kept for backward compatibility.
    latency = 0.1  # 100ms

    param_names = ('latency',)
    params = [0, 0.01, 0.1]

    def time_buffered_writes(self, latency):
        """Time buffered writes onto a writer sleeping ``latency`` s/call."""
        test_data = b'x' * self.increment
        bytes_written = 0
        out = pa.BufferOutputStream()
        slow_out = HighLatencyWriter(out, latency)
        # ``with`` ensures the stream is closed when the run finishes
        # rather than being left open.
        with pa.output_stream(slow_out,
                              buffer_size=self.buffer_size) as buffered_out:
            while bytes_written < self.total_size:
                buffered_out.write(test_data)
                bytes_written += self.increment
            # Push whatever is still held in the buffer through the slow
            # writer so it is included in the measurement.
            buffered_out.flush()

    def time_buffered_reads(self, latency):
        """Time buffered reads from a reader sleeping ``latency`` s/call."""
        bytes_read = 0
        reader = pa.input_stream(pa.py_buffer(b'x' * self.total_size))
        slow_reader = HighLatencyReader(reader, latency)
        # ``with`` ensures the stream is closed when the run finishes
        # rather than being left open.
        with pa.input_stream(slow_reader,
                             buffer_size=self.buffer_size) as buffered_reader:
            while bytes_read < self.total_size:
                buffered_reader.read(self.increment)
                bytes_read += self.increment