Preface
At GreptimeDB, we use OpenDAL as our unified data access layer. Recently, a colleague told me that a Copy From statement importing an 800 KiB Parquet file from S3 took 10 seconds to execute. I investigated, which included reading the OpenDAL documentation for Reader and its implementation (and realizing we hadn't RTFSC 🥲). This post documents and briefly summarizes our findings.
Relevant OpenDAL source code commit: 6980cd1
Understanding OpenDAL Source Code
Frankly, it was only recently that I fully grasped the intricacies of the OpenDAL source code and its call relationships. Before that, I had only a partial understanding.
Starting with the Operator
All our IO operations revolve around the Operator. Let's see how the Operator is constructed. In main.rs, we first create a file-system-based backend Builder. We then build it into an accessor (which implements the Accessor trait), pass this accessor into OperatorBuilder::new, and finally call finish.
OpenDAL unifies the behavior of different storage backends through the Accessor trait. This trait exposes a uniform IO interface to the upper layer, such as create_dir, read, write, etc.
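```rust
use opendal::services::Fs;
// `Builder` must be in scope for `.build()`; `Result` is opendal's alias.
use opendal::{Builder, Operator, OperatorBuilder, Result};

#[tokio::main]
async fn main() -> Result<()> {
    // Create fs backend builder.
    let mut builder = Fs::default();
    // Set the root for fs, all operations will happen under this root.
    //
    // NOTE: the root must be absolute path.
    builder.root("/tmp");
    // Build the backend into an accessor (an implementation of `Accessor`).
    let accessor = builder.build()?;
    // `new` is infallible: it attaches ErrorContextLayer and CompleteLayer;
    // `finish` adds TypeEraseLayer and returns the Operator.
    let _op: Operator = OperatorBuilder::new(accessor).finish();
    Ok(())
}
```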
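The following minimal synchronous sketch shows one trait fronting many backends. The names Storage, MapStore, ListStore, and roundtrip are hypothetical and exist only for illustration. OpenDAL's real Accessor trait is async and has many more operations.

```rust
use std::collections::HashMap;

/// Hypothetical, heavily simplified stand-in for a unified storage interface
/// (NOT OpenDAL's `Accessor`, which is async and far richer).
trait Storage {
    fn write(&mut self, path: &str, data: &[u8]);
    fn read(&self, path: &str) -> Option<Vec<u8>>;
}

/// Toy backend 1: objects kept in a hash map.
#[derive(Default)]
struct MapStore(HashMap<String, Vec<u8>>);

impl Storage for MapStore {
    fn write(&mut self, path: &str, data: &[u8]) {
        self.0.insert(path.to_string(), data.to_vec());
    }
    fn read(&self, path: &str) -> Option<Vec<u8>> {
        self.0.get(path).cloned()
    }
}

/// Toy backend 2: objects kept in a list of (path, bytes) pairs.
#[derive(Default)]
struct ListStore(Vec<(String, Vec<u8>)>);

impl Storage for ListStore {
    fn write(&mut self, path: &str, data: &[u8]) {
        // Drop any previous version of the object, then append the new one.
        self.0.retain(|(p, _)| p.as_str() != path);
        self.0.push((path.to_string(), data.to_vec()));
    }
    fn read(&self, path: &str) -> Option<Vec<u8>> {
        self.0
            .iter()
            .find(|(p, _)| p.as_str() == path)
            .map(|(_, d)| d.clone())
    }
}

/// Upper-layer logic written once against the trait, usable with any backend.
fn roundtrip(store: &mut dyn Storage) -> Option<Vec<u8>> {
    store.write("data/a.parquet", b"PAR1");
    store.read("data/a.parquet")
}

fn main() {
    println!("{:?}", roundtrip(&mut MapStore::default()));
    println!("{:?}", roundtrip(&mut ListStore::default()));
}
```

The roundtrip function is written once and works unchanged with either toy backend. Operator relies on this same property.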
What Happens in OperatorBuilder::new?
The accessor we pass in gets two layers attached when new is invoked. One more internal layer, TypeEraseLayer, is added when finish is invoked. With these layers in place, any call to an interface exposed by Operator starts at the outermost CompleteLayer (TypeEraseLayer only erases the concrete type) and eventually reaches the innermost FsAccessor.
```
FsAccessor
ErrorContextLayer
CompleteLayer
      ^
      |
      | Invoking (`read`, `reader_with`, `stat`...)
```
```rust
impl<A: Accessor> OperatorBuilder<A> {
    /// Create a new operator builder.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(accessor: A) -> OperatorBuilder<impl Accessor> {
        // Make sure error context layer has been attached.
        OperatorBuilder { accessor }
            .layer(ErrorContextLayer)
            .layer(CompleteLayer)
    }

    ...

    /// Finish the building to construct an Operator.
    pub fn finish(self) -> Operator {
        let ob = self.layer(TypeEraseLayer);
        Operator::from_inner(Arc::new(ob.accessor) as FusedAccessor)
    }
}
```
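A small, self-contained sketch can illustrate the nesting order by mimicking the shape of OperatorBuilder::layer. Everything here (Access, NamedLayer, Builder, call_order) is hypothetical and synchronous. It only shows that the layer attached last becomes the outermost one, so it sees a call first.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Shared log recording which component handled a call, in order.
type Log = Rc<RefCell<Vec<&'static str>>>;

/// Hypothetical minimal accessor trait (synchronous, one method).
trait Access {
    fn read(&self, path: &str) -> String;
}

/// Innermost "backend".
struct FsAccessor {
    log: Log,
}

impl Access for FsAccessor {
    fn read(&self, path: &str) -> String {
        self.log.borrow_mut().push("FsAccessor");
        format!("contents of {}", path)
    }
}

/// A layer that records its name, then delegates to the accessor it wraps.
struct NamedLayer<A> {
    name: &'static str,
    inner: A,
    log: Log,
}

impl<A: Access> Access for NamedLayer<A> {
    fn read(&self, path: &str) -> String {
        self.log.borrow_mut().push(self.name);
        self.inner.read(path)
    }
}

/// Mirrors the shape of `OperatorBuilder::layer`: each call wraps the current accessor.
struct Builder<A> {
    accessor: A,
    log: Log,
}

impl<A: Access> Builder<A> {
    fn layer(self, name: &'static str) -> Builder<NamedLayer<A>> {
        let log = self.log.clone();
        Builder {
            accessor: NamedLayer { name, inner: self.accessor, log: self.log },
            log,
        }
    }
}

/// Builds FsAccessor -> ErrorContextLayer -> CompleteLayer and returns the call order.
fn call_order() -> Vec<&'static str> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let builder = Builder { accessor: FsAccessor { log: log.clone() }, log: log.clone() }
        .layer("ErrorContextLayer")
        .layer("CompleteLayer");
    assert_eq!(builder.accessor.read("a.parquet"), "contents of a.parquet");
    let order = log.borrow().clone();
    order
}

fn main() {
    // The layer added last is the outermost one, so it sees the call first.
    println!("{:?}", call_order());
}
```

call_order() returns ["CompleteLayer", "ErrorContextLayer", "FsAccessor"], which matches the invocation direction in the diagram above.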
TL;DR: I just want to emphasize that we should read the source code of OpenDAL starting from CompleteLayer (an epiphany).
Background Information
Here is some context needed to understand what follows.
LruCacheLayer
Currently, in query scenarios, we add an LruCacheLayer when building the Operator, so our Operator looks like the diagram below:
```
S3Accessor               FsAccessor
ErrorContextLayer        ErrorContextLayer
CompleteLayer            CompleteLayer
     ^                     ^       |
     |                     |       |
     |`inner`      `cache` |       |
     |                     |       |
     +---- LruCacheLayer --+       |
               ^                   |
               |                   v
               |   FileReader::new(oio::TokioReader<tokio::fs::File>)
               |
    Invoking (`reader`, `reader_with`)
```
Take the read interface as an example. LruCacheLayer caches S3 files in the local file system and returns the cached, file-system-based Box<dyn oio::Read> (FileReader::new(oio::TokioReader<tokio::fs::File>)) to the upper layer. If the file to be read is not in the cache, it is first loaded in full from S3 into the local file system.
```rust
// Simplified from our code: `CacheIndex` and the cache-miss path are elided.
struct LruCacheLayer {
    inner: Operator, // S3Backend
    cache: Operator, // FsBackend
    index: CacheIndex,
}

impl LayeredAccessor for LruCacheLayer {
    ...
    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        if self.index.hit(path, &args) {
            // Cache hit: returns a file-system-based `Box<dyn oio::Read>`.
            self.cache.read(path, args).await
        } else {
            // Cache miss: fetches the whole object from S3, stores it in the cache...
        }
    }
    ...
}
```
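The read-through behavior described above can be modeled without OpenDAL. This is a hypothetical, synchronous sketch: CountingRemote, ReadThroughCache, and remote_requests_for are made-up names, and LRU eviction is left out. The first read of a path fetches the whole object once, and later reads never touch the remote store.

```rust
use std::collections::HashMap;

/// Hypothetical remote store that counts the GET requests it serves.
struct CountingRemote {
    objects: HashMap<String, Vec<u8>>,
    requests: usize,
}

impl CountingRemote {
    fn get(&mut self, path: &str) -> Vec<u8> {
        self.requests += 1;
        self.objects.get(path).cloned().unwrap_or_default()
    }
}

/// Read-through cache: a miss fetches the whole object once and stores it locally;
/// later reads are served from the local copy. LRU eviction is omitted.
struct ReadThroughCache {
    remote: CountingRemote,
    local: HashMap<String, Vec<u8>>,
}

impl ReadThroughCache {
    fn read(&mut self, path: &str) -> &[u8] {
        if !self.local.contains_key(path) {
            let data = self.remote.get(path);
            self.local.insert(path.to_string(), data);
        }
        &self.local[path]
    }
}

/// Reads the same object `n` times and returns how many remote requests were made.
fn remote_requests_for(n: usize) -> usize {
    let mut objects = HashMap::new();
    objects.insert("a.parquet".to_string(), b"PAR1...PAR1".to_vec());
    let mut cache = ReadThroughCache {
        remote: CountingRemote { objects, requests: 0 },
        local: HashMap::new(),
    };
    for _ in 0..n {
        assert_eq!(cache.read("a.parquet"), b"PAR1...PAR1");
    }
    cache.remote.requests
}

fn main() {
    println!("remote requests for 3 reads: {}", remote_requests_for(3));
}
```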
The Copy From Scenario
In the Copy From scenario, I didn't add the LruCacheLayer, so our Operator looks like the diagram below:
```
S3Accessor
ErrorContextLayer
CompleteLayer
    ▲       │
    │       │
    │       ▼
    │   RangeReader::new(IncomingAsyncBody)
    │
Invoking (`reader`, `reader_with`)
```
Issues Encountered with OpenDAL RangeReader
Starting with the Construction of ParquetRecordBatchStream
In Copy From, we first obtain the file information (i.e., the file's location on S3). We then invoke operator.reader to get a reader implementing AsyncRead + AsyncSeek and wrap it in a BufReader. Finally, this reader is passed into ParquetRecordBatchStreamBuilder.
Here, the BufReader is superfluous: it discards its internal buffer whenever seek is invoked, which negates any potential performance benefit.
```rust
...
let reader = operator
    .reader(path)
    .await
    .context(error::ReadObjectSnafu { path })?;
let buf_reader = BufReader::new(reader.compat());
let builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
    .await
    .context(error::ReadParquetSnafu)?;
let upstream = builder
    .build()
    .context(error::BuildParquetRecordBatchStreamSnafu)?;
...
```
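std::io::BufReader makes this behavior easy to observe, because its seek always discards the internal buffer. The sketch below uses a hypothetical CountingReader that counts the calls reaching the underlying source, which stand in for remote requests. It compares seek with seek_relative for the same 4-byte skip.

```rust
use std::io::{self, BufReader, Cursor, Read, Seek, SeekFrom};

/// Wraps an in-memory source and counts the read calls that reach it
/// (a stand-in for requests to a remote object).
struct CountingReader {
    inner: Cursor<Vec<u8>>,
    reads: usize,
}

impl Read for CountingReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.reads += 1;
        self.inner.read(buf)
    }
}

impl Seek for CountingReader {
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        self.inner.seek(pos)
    }
}

/// Reads 4 bytes, skips 4, reads 4 more; returns (underlying reads, last 4 bytes).
fn underlying_reads(use_seek_relative: bool) -> io::Result<(usize, [u8; 4])> {
    let data: Vec<u8> = (0u8..100).collect();
    let source = CountingReader { inner: Cursor::new(data), reads: 0 };
    let mut reader = BufReader::with_capacity(64, source);
    let mut buf = [0u8; 4];
    reader.read_exact(&mut buf)?; // one underlying read fills the 64-byte buffer
    if use_seek_relative {
        reader.seek_relative(4)?; // target is still buffered: buffer kept
    } else {
        reader.seek(SeekFrom::Current(4))?; // always discards the buffer
    }
    reader.read_exact(&mut buf)?;
    Ok((reader.get_ref().reads, buf))
}

fn main() -> io::Result<()> {
    println!("seek:          {:?}", underlying_reads(false)?);
    println!("seek_relative: {:?}", underlying_reads(true)?);
    Ok(())
}
```

With seek, the second read_exact has to refill the buffer, so the source sees two reads. With seek_relative, the bytes being skipped to are still buffered, so the source sees only one.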
Reading Metadata in ParquetRecordBatchStream::new
The metadata reading logic works in two steps. First, it invokes seek(SeekFrom::End(-FOOTER_SIZE_I64)), reads FOOTER_SIZE bytes, and parses metadata_len from them. Then it invokes seek again and reads metadata_len bytes to parse the metadata.
```rust
impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
        const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
        async move {
            self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;

            let mut buf = [0_u8; FOOTER_SIZE];
            self.read_exact(&mut buf).await?;

            let metadata_len = decode_footer(&buf)?;
            self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
                .await?;

            let mut buf = Vec::with_capacity(metadata_len);
            self.take(metadata_len as _).read_to_end(&mut buf).await?;

            Ok(Arc::new(decode_metadata(&buf)?))
        }
        .boxed()
    }
}
```
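The same two-step footer read can also be written synchronously with std::io. This sketch assumes the standard Parquet layout, where the file ends with a 4-byte little-endian metadata length followed by the magic bytes PAR1, so FOOTER_SIZE is 8. It returns the raw metadata bytes without decoding them. read_metadata_bytes and fake_parquet are hypothetical helpers. On a RangeReader, each seek followed by a read here means a separate remote request.

```rust
use std::io::{self, Cursor, Read, Seek, SeekFrom};

/// Parquet footer: 4-byte little-endian metadata length, then the magic "PAR1".
const FOOTER_SIZE: usize = 8;

fn decode_footer(buf: &[u8; FOOTER_SIZE]) -> io::Result<usize> {
    if &buf[4..] != b"PAR1" {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "missing PAR1 magic"));
    }
    Ok(u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize)
}

/// Same seek + read steps as `get_metadata`, but synchronous and returning
/// the raw (still Thrift-encoded) metadata bytes instead of decoding them.
fn read_metadata_bytes<R: Read + Seek>(r: &mut R) -> io::Result<Vec<u8>> {
    // Step 1: jump to the footer and read it.
    r.seek(SeekFrom::End(-(FOOTER_SIZE as i64)))?;
    let mut footer = [0u8; FOOTER_SIZE];
    r.read_exact(&mut footer)?;
    let metadata_len = decode_footer(&footer)?;
    // Step 2: jump back over the metadata and read exactly `metadata_len` bytes.
    r.seek(SeekFrom::End(-(FOOTER_SIZE as i64) - metadata_len as i64))?;
    let mut metadata = vec![0u8; metadata_len];
    r.read_exact(&mut metadata)?;
    Ok(metadata)
}

/// Wraps arbitrary bytes in Parquet-style framing (NOT a valid Parquet file).
fn fake_parquet(body: &[u8], metadata: &[u8]) -> Vec<u8> {
    let mut file = b"PAR1".to_vec();
    file.extend_from_slice(body);
    file.extend_from_slice(metadata);
    file.extend_from_slice(&(metadata.len() as u32).to_le_bytes());
    file.extend_from_slice(b"PAR1");
    file
}

fn main() -> io::Result<()> {
    let file = fake_parquet(b"column chunks...", b"fake-metadata");
    let metadata = read_metadata_bytes(&mut Cursor::new(file))?;
    println!("{}", String::from_utf8_lossy(&metadata));
    Ok(())
}
```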
The Real Problem
Up to this point, we've discussed some minor issues. The more challenging problem arises in the loop below, where the variable stream is the ParquetRecordBatchStream we built above. When we invoke next, ParquetRecordBatchStream invokes RangeReader's seek and read multiple times. However, each call to seek resets RangeReader's internal state and discards the previous byte stream. As a result, the subsequent read call initiates a new remote request (in the S3 backend scenario). You can find details in this issue and the discussion here.
When ParquetRecordBatchStream retrieves each column's data, it first invokes RangeReader's seek and then reads some bytes. Thus, the total number of remote calls is the number of RowGroups multiplied by the number of columns per RowGroup. Our 800 KiB file contains 50 RowGroups with 12 columns each, which results in 50 × 12 = 600 S3 GET requests!
```rust
pub async fn copy_table_from(
    ...
    while let Some(r) = stream.next().await {
        let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
        let vectors =
            Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;

        pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();

        let columns_values = fields
            .iter()
            .cloned()
            .zip(vectors)
            .collect::<HashMap<_, _>>();

        pending.push(self.inserter.handle_table_insert(
            InsertRequest {
                catalog_name: req.catalog_name.to_string(),
                schema_name: req.schema_name.to_string(),
                table_name: req.table_name.to_string(),
                columns_values,
            },
            query_ctx.clone(),
        ));

        if pending_mem_size as u64 >= pending_mem_threshold {
            rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?;
        }
    }
    ...
```
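The 50 × 12 arithmetic can be reproduced with a mock. MockRangeReader below is a hypothetical, synchronous stand-in that mimics only the behavior described above: seek drops the current byte stream, so the next read counts as a new remote request. It ignores Parquet's actual I/O pattern (for example, the footer reads) and assumes one seek plus one read per column chunk.

```rust
use std::io::{self, Read, Seek, SeekFrom};

/// Hypothetical mock of the RangeReader behavior: `seek` drops the current
/// byte stream, and the next `read` must open a new one (one remote request).
struct MockRangeReader {
    data: Vec<u8>,
    cur: u64,
    stream_open: bool,
    requests: usize,
}

impl Read for MockRangeReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if !self.stream_open {
            self.requests += 1; // a new ranged GET in the S3 case
            self.stream_open = true;
        }
        let start = (self.cur as usize).min(self.data.len());
        let n = buf.len().min(self.data.len() - start);
        buf[..n].copy_from_slice(&self.data[start..start + n]);
        self.cur += n as u64;
        Ok(n)
    }
}

impl Seek for MockRangeReader {
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        let new = match pos {
            SeekFrom::Start(n) => Some(n),
            SeekFrom::End(n) => (self.data.len() as u64).checked_add_signed(n),
            SeekFrom::Current(n) => self.cur.checked_add_signed(n),
        };
        self.cur = new.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "invalid seek"))?;
        self.stream_open = false; // the previous byte stream is discarded
        Ok(self.cur)
    }
}

/// Reads every column chunk of every row group with a seek followed by a read,
/// and returns how many remote requests that took.
fn remote_requests(row_groups: usize, columns: usize, chunk_len: usize) -> io::Result<usize> {
    let mut reader = MockRangeReader {
        data: vec![0u8; row_groups * columns * chunk_len],
        cur: 0,
        stream_open: false,
        requests: 0,
    };
    let mut chunk = vec![0u8; chunk_len];
    for rg in 0..row_groups {
        for col in 0..columns {
            let offset = ((rg * columns + col) * chunk_len) as u64;
            reader.seek(SeekFrom::Start(offset))?;
            reader.read_exact(&mut chunk)?;
        }
    }
    Ok(reader.requests)
}

fn main() -> io::Result<()> {
    println!("50 row groups x 12 columns: {} requests", remote_requests(50, 12, 16)?);
    println!(" 1 row group  x 12 columns: {} requests", remote_requests(1, 12, 16)?);
    Ok(())
}
```

The chunks in this mock are contiguous, yet every seek still forces a new request.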
Explore the RangeReader Source Code
Take a look at self.poll_read()
In RangeReader, self.state starts as State::Idle. Assuming self.offset is Some(0), self.state is set to State::SendRead(BoxFuture<'static, Result<(RpRead, R)>>) and self.poll_read(cx, buf) is invoked again.
```rust
impl<A, R> oio::Read for RangeReader<A, R>
where
    A: Accessor<Reader = R>,
    R: oio::Read,
{
    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
        ...
        match &mut self.state {
            State::Idle => {
                self.state = if self.offset.is_none() {
                    // When the offset is none, it means we are performing tailing reading.
                    // We should start by getting the correct offset through a stat operation.
                    State::SendStat(self.stat_future())
                } else {
                    State::SendRead(self.read_future())
                };

                self.poll_read(cx, buf)
            }
            ...
        }
    }
}
```
What happens in self.read_future()
Clearly, self.read_future() returns a BoxFuture. Within this BoxFuture, the underlying Accessor's read method (acc.read(&path, op).await) is invoked with the range computed by self.calculate_range(). The Accessor can be the implementation for any storage backend. In our context, it is the S3 storage backend. When its read interface is invoked, it sends a request to S3 (establishing a TCP connection if needed) to retrieve the file data. It then returns the byte stream of S3's response to the upper layer.
```rust
impl<A, R> RangeReader<A, R>
where
    A: Accessor<Reader = R>,
    R: oio::Read,
{
    fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, R)>> {
        let acc = self.acc.clone();
        let path = self.path.clone();

        let mut op = self.op.clone();
        if self.cur != 0 {
            op = op.into_deterministic();
        }
        op = op.with_range(self.calculate_range());

        Box::pin(async move { acc.read(&path, op).await })
    }
    ...
}
```
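In the S3 case, each ranged read that read_future sets up ultimately becomes an HTTP GET with a Range header. The helper below is a hypothetical illustration of that header format only. It is not OpenDAL's calculate_range. HTTP byte ranges are inclusive at both ends, and an omitted end means "until the end of the object".

```rust
/// Hypothetical helper (not OpenDAL's `calculate_range`): formats the HTTP
/// `Range` header value for a read starting at `offset` with an optional length.
fn range_header(offset: u64, len: Option<u64>) -> Option<String> {
    match len {
        Some(0) => None, // an empty range has no valid `bytes=` form
        Some(n) => Some(format!("bytes={}-{}", offset, offset + n - 1)), // inclusive end
        None => Some(format!("bytes={}-", offset)), // open-ended: to end of object
    }
}

fn main() {
    // Reading the 8-byte Parquet footer of a 1000-byte object:
    println!("{:?}", range_header(992, Some(8)));
    // Reading from the current offset to the end of the object:
    println!("{:?}", range_header(4, None));
}
```

Each time RangeReader leaves State::Idle, a new request of this shape is sent. Seeking therefore translates directly into extra round trips.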
Continuing from where we left off in self.poll_read()
At this point, poll_read has not yet returned. In the previous section, self.poll_read() was invoked again with self.state being State::SendRead(BoxFuture<'static, Result<(RpRead, R)>>). The value returned by ready!(Pin::new(fut).poll(cx)) is the result of acc.read(&path, op).await from the previous section. For the S3 storage backend, this is where the remote call happens.
Afterward, self.state is set to State::Read(r), and self.poll_read(cx, buf) is invoked once more. On this entry into self.poll_read(), the internal state of RangeReader is State::Read(r), where r is the byte stream of the read request's response. For the S3 storage backend, Pin::new(r).poll_read(cx, buf) copies bytes from the response body (read off the TCP connection) into the caller's buf.
```rust
impl<A, R> oio::Read for RangeReader<A, R>
where
    A: Accessor<Reader = R>,
    R: oio::Read,
{
    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
        // Sanity check for normal cases.
        if buf.is_empty() || self.cur >= self.size.unwrap_or(u64::MAX) {
            return Poll::Ready(Ok(0));
        }

        match &mut self.state {
            ...
            State::SendRead(fut) => {
                let (rp, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
                    // If the read future returns an error, reset the state to Idle to retry.
                    self.state = State::Idle;
                    err
                })?;

                // Set the size if the read returns a size hint.
                if let Some(size) = rp.size() {
                    if size != 0 && self.size.is_none() {
                        self.size = Some(size + self.cur);
                    }
                }
                self.state = State::Read(r);
                self.poll_read(cx, buf)
            }
            State::Read(r) => match ready!(Pin::new(r).poll_read(cx, buf)) {
                Ok(0) => {
                    // Reset the state to Idle after all data has been consumed.
                    self.state = State::Idle;
                    Poll::Ready(Ok(0))
                }
                Ok(n) => {
                    self.cur += n as u64;
                    Poll::Ready(Ok(n))
                }
                Err(e) => {
                    self.state = State::Idle;
                    Poll::Ready(Err(e))
                }
            },
        }
    }
}
```
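The state transitions above can be condensed into a synchronous sketch. SyncRangeReader is hypothetical. SendStat is omitted, SendRead is collapsed into the Idle branch because there are no futures, and fetch is a closure standing in for acc.read. The sketch keeps the behavior that matters here:

- Leaving Idle costs one request.
- Consecutive reads reuse the same stream.
- Reaching the end of the stream, or an error, returns the state to Idle.
- Seeking also returns the state to Idle.

```rust
use std::io::{self, Cursor, Read};

/// Simplified, synchronous stand-in for RangeReader's states.
enum State {
    Idle,
    Read(Cursor<Vec<u8>>),
}

struct SyncRangeReader<F> {
    fetch: F, // hypothetical remote read: returns the bytes from `offset` to EOF
    cur: u64,
    state: State,
    requests: usize,
}

impl<F: FnMut(u64) -> Vec<u8>> SyncRangeReader<F> {
    fn new(fetch: F) -> Self {
        Self { fetch, cur: 0, state: State::Idle, requests: 0 }
    }

    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        match &mut self.state {
            State::Idle => {
                // Idle -> Read: one remote request starting at the current offset.
                self.requests += 1;
                let body = (self.fetch)(self.cur);
                self.state = State::Read(Cursor::new(body));
                self.read(buf)
            }
            State::Read(r) => match r.read(buf) {
                Ok(0) => {
                    self.state = State::Idle; // stream exhausted
                    Ok(0)
                }
                Ok(n) => {
                    self.cur += n as u64;
                    Ok(n)
                }
                Err(e) => {
                    self.state = State::Idle; // retry with a fresh request next time
                    Err(e)
                }
            },
        }
    }

    /// Like `poll_seek`: move the cursor and drop the current stream.
    fn seek_to(&mut self, pos: u64) {
        self.cur = pos;
        self.state = State::Idle;
    }
}

fn main() {
    let data: Vec<u8> = (0u8..10).collect();
    let mut reader = SyncRangeReader::new(|off: u64| data[off as usize..].to_vec());
    let mut buf = [0u8; 3];
    reader.read(&mut buf).unwrap(); // request #1
    reader.read(&mut buf).unwrap(); // same stream, no new request
    reader.seek_to(2);
    reader.read(&mut buf).unwrap(); // request #2
    println!("bytes {:?}, requests {}", buf, reader.requests);
}
```

In main, the first two reads share one request. After seek_to(2), the next read issues a second request and returns [2, 3, 4].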
Final Look at self.poll_seek()
Remember the internal state of RangeReader we discussed earlier? Yes, it was State::Read(r). When we call seek after a read, the byte stream inside RangeReader is discarded and the state is reset to State::Idle. In other words, every read invoked after a seek makes RangeReader call the underlying Accessor's read method (acc.read(&path, op).await), which initiates a remote call. For the S3 storage backend, each such call incurs significant overhead: a full network round trip, commonly tens to hundreds of milliseconds.
There is one more performance-related point. When seeking with SeekFrom::End() while self.size is unknown, RangeReader performs an additional stat operation to learn the object's size. After self.poll_seek() is invoked, self.cur is set to base.checked_add(amt).
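The position arithmetic can be sketched as follows. SeekState is hypothetical and models only the bookkeeping. It assumes base and amt are signed 64-bit values and treats a negative or overflowing result as an invalid seek. It calls stat, a closure standing in for the remote stat, only when seeking from the end while the size is unknown. The real poll_seek also resets the state to State::Idle, which this sketch leaves out.

```rust
use std::io::SeekFrom;

/// Hypothetical sketch of the position bookkeeping in `poll_seek`
/// (no I/O, no async machinery, no reader state).
struct SeekState {
    cur: u64,
    size: Option<u64>, // unknown until a read or a stat reports it
    stat_calls: usize,
}

impl SeekState {
    /// `stat` stands in for a remote stat call returning the object size.
    fn seek(&mut self, pos: SeekFrom, stat: impl FnOnce() -> u64) -> Option<u64> {
        let (base, amt) = match pos {
            SeekFrom::Start(n) => (0i64, i64::try_from(n).ok()?),
            SeekFrom::Current(n) => (i64::try_from(self.cur).ok()?, n),
            SeekFrom::End(n) => {
                // Seeking from the end needs the size; fetch it once if unknown.
                let size = match self.size {
                    Some(size) => size,
                    None => {
                        self.stat_calls += 1;
                        let size = stat();
                        self.size = Some(size);
                        size
                    }
                };
                (i64::try_from(size).ok()?, n)
            }
        };
        // New position = base + amt, rejecting overflow and negative results.
        let new = base.checked_add(amt).filter(|n| *n >= 0)?;
        self.cur = new as u64;
        Some(self.cur)
    }
}

fn main() {
    let mut s = SeekState { cur: 0, size: None, stat_calls: 0 };
    println!("{:?}", s.seek(SeekFrom::End(-8), || 100)); // triggers a stat
    println!("{:?}", s.seek(SeekFrom::End(-8), || 100)); // size already known
    println!("stat calls: {}", s.stat_calls);
}
```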
Summary
We've implemented a quick fix that reduced the number of RowGroups in the imported file from 50 to just 1. However, this still requires 12 remote calls, one per column. Going forward, we plan to contribute a BufferReader to OpenDAL (details are in the RFC here). It is expected to significantly reduce the number of consecutive remote calls triggered by seek and read operations in RangeReader, and in certain cases it could eliminate them entirely.
When seek is invoked on a RangeReader, its internal state is reset. The subsequent read then results in a remote call in the underlying Accessor (when the backend is S3). For related information, please refer to the issue and discussion linked above.
Both std::io::BufReader and tokio::io::BufReader clear their internal buffers after seek. If you wish to keep reading from the buffer, use seek_relative (provided by std::io::BufReader) instead.
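The RFC's BufferReader design is not reproduced here. As a hypothetical illustration of the general idea only, the SeekBuffered reader below keeps its buffer across seeks. A seek only moves a logical position. A read fetches a new window of up to capacity bytes only when the position falls outside the buffered one. The sketch assumes the object size is known up front, for example from one stat call.

```rust
use std::io::{self, Cursor, Read, Seek, SeekFrom};

/// Hypothetical reader that keeps its buffer across seeks
/// (illustration only, not the BufferReader proposed in the RFC).
struct SeekBuffered<R> {
    inner: R,
    size: u64,      // assumed known up front, e.g. from a single stat call
    pos: u64,       // logical position seen by the caller
    buf: Vec<u8>,   // cached window of the underlying data
    buf_start: u64, // offset of buf[0] in the underlying data
    capacity: usize,
    fills: usize,   // number of underlying fetches (remote requests)
}

impl<R: Read + Seek> SeekBuffered<R> {
    fn new(inner: R, size: u64, capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { inner, size, pos: 0, buf: Vec::new(), buf_start: 0, capacity, fills: 0 }
    }
}

impl<R: Read + Seek> Read for SeekBuffered<R> {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        if out.is_empty() || self.pos >= self.size {
            return Ok(0);
        }
        let buf_end = self.buf_start + self.buf.len() as u64;
        if self.pos < self.buf_start || self.pos >= buf_end {
            // Miss: fetch one window of up to `capacity` bytes starting at `pos`.
            self.inner.seek(SeekFrom::Start(self.pos))?;
            let want = (self.capacity as u64).min(self.size - self.pos);
            self.buf.clear();
            (&mut self.inner).take(want).read_to_end(&mut self.buf)?;
            self.buf_start = self.pos;
            self.fills += 1;
        }
        // Serve the request from the buffered window.
        let off = (self.pos - self.buf_start) as usize;
        let n = out.len().min(self.buf.len() - off);
        out[..n].copy_from_slice(&self.buf[off..off + n]);
        self.pos += n as u64;
        Ok(n)
    }
}

impl<R> Seek for SeekBuffered<R> {
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        let target = match pos {
            SeekFrom::Start(n) => Some(n),
            SeekFrom::End(n) => self.size.checked_add_signed(n),
            SeekFrom::Current(n) => self.pos.checked_add_signed(n),
        };
        // Only the logical position moves; the buffer is kept and no I/O happens.
        self.pos = target.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "invalid seek"))?;
        Ok(self.pos)
    }
}

/// Three seek + read pairs inside the first 64 bytes; returns (fills, bytes read).
fn scattered_reads() -> io::Result<(usize, Vec<u8>)> {
    let data: Vec<u8> = (0u8..100).collect();
    let mut reader = SeekBuffered::new(Cursor::new(data), 100, 64);
    let mut got = Vec::new();
    for &offset in &[0u64, 10, 30] {
        reader.seek(SeekFrom::Start(offset))?;
        let mut chunk = [0u8; 4];
        reader.read_exact(&mut chunk)?;
        got.extend_from_slice(&chunk);
    }
    Ok((reader.fills, got))
}

fn main() -> io::Result<()> {
    println!("{:?}", scattered_reads()?);
    Ok(())
}
```

Here the three seek + read pairs need a single fill. Under the seek-resets-the-stream behavior described above, they would need three requests. The trade-off is that a larger window may download bytes that are never read.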