[HN Gopher] Amazon's exabyte-scale migration from Apache Spark t...
___________________________________________________________________
Amazon's exabyte-scale migration from Apache Spark to Ray on EC2
Author : nojito
Score : 112 points
Date : 2024-07-29 22:14 UTC (2 days ago)
(HTM) web link (aws.amazon.com)
(TXT) w3m dump (aws.amazon.com)
| Narhem wrote:
| Absolutely insane work. With so much data, you'd think they would
| come up with a custom solution instead of using the "newest
| available toolkit", but I understand how much of a mess dealing
| with that much data is.
| thedood wrote:
| Hi Narhem - one of the devs that worked on the migration here.
| The data volume, and subsequent compute power required to
| process it, is actually one of the things that led us to Ray
| (or Ray Core specifically) since it had the distributed
| computing primitives (tasks and actors) that we needed to build
| out our envisioned solution with very few compromises. One
| thing we DIDN'T want to do was just throw another one-liner SQL
| statement running on a new data processing framework at the
| problem, since that leads us back to the problems we had with
| Spark - not enough low-level control for such an important
| problem.
|
| In short, after evaluating our options, Ray seemed to strike
| the best balance between the one efficiency extreme of, say,
| building out custom "compaction-optimized" hardware/clusters,
| and the other maintainability extreme of just letting the
| latest managed cloud service run a 1-liner SQL statement for us
| without ever looking under the hood.
|
| Regardless, I expect both our existing solution and the
| distributed compute frameworks leveraged to deliver it to
| continue to evolve over time.
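|
| For anyone unfamiliar with those primitives, here is a minimal
| illustrative sketch of a Ray task and a Ray actor (this is not
| our production compactor, and the names are made up):
|
|       import ray
|
|       ray.init()
|
|       @ray.remote
|       def compact_partition(partition_id):
|           # a task: a stateless function that Ray schedules on
|           # any free worker in the cluster
|           return f"compacted partition {partition_id}"
|
|       @ray.remote
|       class ProgressTracker:
|           # an actor: a long-lived, stateful worker process whose
|           # methods can be invoked remotely
|           def __init__(self):
|               self.completed = 0
|
|           def record(self, result):
|               self.completed += 1
|               return self.completed
|
|       tracker = ProgressTracker.remote()
|       refs = [compact_partition.remote(i) for i in range(4)]
|       for ref in refs:
|           tracker.record.remote(ray.get(ref))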
| mannyv wrote:
| Crazy that the project took almost 4 years end-to-end, and it's
| still ongoing.
|
| I had no idea anything at AWS had that long of an attention span.
|
| It's funny and telling that in the end, it's all backed by CSVs
| in S3. Long live CSV!
| jerrygenser wrote:
| They reference Parquet files; I'm not sure it's only CSV, or
| whether CSV even figures in that heavily beyond the first
| iteration before the migration to Spark.
| thedood wrote:
| Hi mannyv - one of the devs that worked on the migration here.
| It has been a pretty long project - approached with caution due
| to the criticality of keeping our BI datasets healthy - but the
| preliminary results produced year-over-year kept looking
| promising enough to keep after it. =)
|
| Also, we mostly have Parquet data cataloged in S3 today, but
| delimited text is indeed ubiquitous and surprisingly sticky, so
| we continue to maintain some very large datasets natively in
| this format. However, while a table's data producer may prefer to
| write delimited text, those files are almost always converted to
| Parquet during the compaction process to produce a read-optimized
| table variant downstream.
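|
| For reference, a single-file version of that text-to-Parquet
| conversion looks roughly like this with PyArrow (file names are
| hypothetical; the real process is distributed and catalog-aware):
|
|       import pyarrow.csv as pv
|       import pyarrow.parquet as pq
|
|       # read the delimited text the producer prefers to write...
|       table = pv.read_csv("orders-2024-07-29.csv")
|       # ...and emit the read-optimized columnar copy downstream
|       pq.write_table(table, "orders-2024-07-29.parquet")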
| gregw2 wrote:
| Are you all shifting over to storing as Iceberg-enriched Parquet
| yet and letting it (within, say, Athena) manage compaction, or at
| least thinking about it? Or is it not worth it since this new
| Ray+Parquet approach is working for you?
| thedood wrote:
| As alluded to in the blog post, Ray+Parquet+Iceberg is the next
| frontier: we'd like to make our compactor and similar procedures
| available in open source so that the community can start bringing
| similar benefits to their Iceberg workloads. Stay tuned. =)
| wenc wrote:
| Most people are moving away from CSV for big datasets, except in
| exceptional cases involving linear reads (append-only ETL). CSV
| has one big upside, which is human readability, but many
| downsides: poor random access, no typing, no compression, and a
| complex parser that needs to handle exceptions.
| 100pctremote wrote:
| Most people don't directly query or otherwise operate on raw
| CSV, though. Large source datasets in CSV format still reign
| in many enterprises, but these are typically read into a
| dataframe, manipulated and stored as Parquet and the like,
| then operated upon by DuckDB, Polars, etc., or modeled (e.g.,
| with dbt) and pushed to an OLAP target.
| wenc wrote:
| There are folks who still directly query CSV formats in a
| data lake using a query engine like Athena or Spark or
| Redshift Spectrum -- which ends up being much slower and
| consuming more resources than is necessary due to full
| table scans.
|
| CSV is only good for append-only workloads.
|
| But so is Parquet, and if you can write Parquet from the get-go,
| you save on storage as well as have a directly queryable column
| store from the start.
|
| CSV still exists because of legacy data-generating processes and
| a dearth of Parquet familiarity among many software engineers.
| CSV is simple to generate and easy to troubleshoot without
| specialized tools (compared to Parquet, which requires tools like
| VisiData). But you pay for it elsewhere.
| cmollis wrote:
| Exactly. Parquet is good for append-only: stream mods to Parquet
| in new partitions, compact, repeat.
| whoevercares wrote:
| Is it really AWS? I don't recall any service called BDT
| quadrature wrote:
| Anyone know enough about Ray to comment on what the exact
| performance unlock was? They mention that it gave them enough
| control over the distribution of work so that they could avoid
| unnecessary reads/writes. That seems like a good win, but I would
| assume that doing compaction in Python would be quite slow.
| thedood wrote:
| Some of the initial differentiators are described at the bottom
| of our design doc at
| https://github.com/ray-project/deltacat/blob/main/deltacat/c....
| But yes, controlling
| file I/O was also an important part of this since it allowed us
| to (1) run more targeted downloads/reads of only the Parquet
| row groups and columns participating in compaction and (2)
| track dirty/clean files to skip unnecessary re-writes of
| "clean" files that weren't altered by compaction. Also, just
| better leveraging catalog metadata (e.g., primary key indexes
| if available) to filter out more files in the initial scan, and
| to copy clean files into the compacted variant by reference
| (when supported by the underlying catalog format).
|
| The trick with doing compaction in Python was to ensure that
| the most performance-sensitive code was delegated to more
| optimal C++ (e.g., Ray and Arrow) and Rust (e.g., Daft) code
| paths. If we did all of our per-record processing ops in pure
| Python, compaction would indeed be much slower.
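|
| To illustrate the targeted-read part, PyArrow lets you pull back
| only selected row groups and columns of a Parquet file rather
| than the whole thing (file and column names here are made up):
|
|       import pyarrow.parquet as pq
|
|       pf = pq.ParquetFile("partition-00042.parquet")
|       # read only the row groups and columns that actually
|       # participate in compaction
|       needed = pf.read_row_groups(
|           row_groups=[0, 3],
|           columns=["primary_key", "sort_key"],
|       )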
| quadrature wrote:
| Thanks a lot for the explanation. Sounds a lot like how
| Pyspark allows for declarative definitions of computation
| that is actually executed in Java.
| ZeroCool2u wrote:
| This is one of the first times I've heard of people using
| Daft in the wild. Would you be able to elaborate on where
| Daft came in handy?
|
| Edit: Nvm, I kept reading! Thanks for the interesting post!
| whoiscroberts wrote:
| Ray user here: what language actors are they using? Ray supports
| Python, Java, and C++ actors...
| thedood wrote:
| We wrote the compactor in Python but, as noted in my previous
| response to quadrature, most of the performance sensitive code
| is written in C++ and Rust (but still invoked from Python).
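|
| As a toy example of that delegation, Arrow keeps the per-record
| work inside its compiled C++ kernels instead of a Python loop
| (the data here is made up):
|
|       import pyarrow as pa
|       import pyarrow.compute as pc
|
|       t = pa.table({"pk": [3, 1, 2, 1], "value": [30, 10, 20, 11]})
|       # sorting runs in Arrow's C++ kernels, not record by record
|       # in the Python interpreter
|       idx = pc.sort_indices(t, sort_keys=[("pk", "ascending")])
|       t_sorted = t.take(idx)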
| esafak wrote:
| Are we talking about big data ETL here? I did not know Ray was
| suited for it.
| thedood wrote:
| This is a specialized ETL use-case - similar to taking a single
| SQL query and creating a dedicated distributed application
| tailored to run only that query. The lower-level primitives in
| Ray Core (tasks and actors) are general purpose enough to make
| building this type of application possible, but you'll be hard
| pressed to quickly (i.e., with less than 1 day of effort) make
| any arbitrary SQL query or dataframe operation run with better
| efficiency or scale on Ray than on dedicated data processing
| frameworks like Spark. IMO, the main value add of frameworks
| like Spark lies more in unlocking "good enough" efficiency and
| scale for almost any ETL job relatively quickly and easily,
| even if it may not run your ETL job optimally.
| dekhn wrote:
| Speaking as a distributed computing nerd, Ray is definitely
| one of the more interesting and exciting frameworks I've seen
| in a while. It's one of those systems where reading the
| manual, I can see that I'm not going to have to learn
| anything new, because the mental model resembles so many
| distributed systems I've worked with before (I dunno about
| anybody else, but tensorflow is an example of a distributed
| system that forced me to forget basically everything I knew
| before I could be even remotely productive in it).
|
| Unclear if it's in the best interests of Anyscale to promote Ray
| as a general-purpose cluster productivity tool, even if it's good
| at that more general use case.
| jiripospisil wrote:
| > From the typical Amazon EC2 customer's perspective, this
| translates to saving over $120MM/year on Amazon EC2 on-demand R5
| instance charges.
|
| Does the sales team know about this? /jk
| taeric wrote:
| This assumes they aren't reinvesting the money in other tangible
| improvements to the services, though?
| andrewxdiamond wrote:
| Amazon sales and business teams are constantly focused on
| reducing costs for customers and they celebrate this internally
| too.
|
| I have seen dozens of big-ticket "we saved this customer
| $xxxK/year" posts on Slack and other internal venues; the
| customer obsession is real.
| immibis wrote:
| Even the reduced cost is still ten times what it costs on a
| traditional provider, though, right?
| 100pctremote wrote:
| Rather nuts. New challenge: build datacenters quickly enough to
| support the new platform.
| whalesalad wrote:
| Video from the author deep diving this.
| https://www.youtube.com/watch?v=h7svj_oAY14
| zitterbewegung wrote:
| I was in a workshop that taught me Ray. It was interesting to
| know that the people who started Spark were also involved in
| making Ray.
|
| This is not badmouthing either project, just an observation: if
| you architected one such system, you'd be well positioned to
| attack the same problem better the second time.
| fs111 wrote:
| Well, Spark was really a showcase project for Mesos when it was
| created. Now everyone knows a lot more.
| igmor wrote:
| Can you share any data on how big of a cluster is running these
| Ray jobs?
| thedood wrote:
| From the blog post, the largest individual Ray cluster that was
| observed running a production compaction job in Q1 had 26,846
| vCPUs and ~210TiB of RAM. This is roughly equivalent to a Ray
| cluster composed of 839 r5.8xlarge EC2 nodes (w/ 32 vCPUs and
| 256GiB RAM per node).
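|
| Back-of-the-envelope: 839 nodes x 32 vCPUs = 26,848 vCPUs, and
| 839 x 256 GiB ~= 210 TiB of RAM, which lines up with the 26,846
| vCPUs / ~210 TiB figures above.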
| robertnishihara wrote:
| I'm one of the creators of Ray. A few thoughts :)
|
| 1. This is truly impressive work from AWS. Patrick Ames began
| speaking about this a couple years ago, though at this point the
| blog post is probably the best reference.
| https://www.youtube.com/watch?v=h7svj_oAY14
|
| 2. This is not a "typical" Ray use case. I'm not aware of any
| other exabyte scale data processing workloads. Our bread and
| butter is ML workloads: training, inference, and unstructured
| data processing.
|
| 3. We have a data processing library called Ray Data for
| ingesting and processing data, often done in conjunction with
| training and inference. However, I believe in this particular use
| case, the heavy lifting is largely done with Ray's core APIs
| (tasks & actors), which are lower level and more flexible; that
| makes sense for highly custom use cases. Most Ray users use the
| Ray libraries (train, data, serve), but power users often use the
| Ray core APIs.
|
| 4. Since people often ask about data processing with Ray and
| Spark, Spark use cases tend to be more geared toward structured
| data and CPU processing. If you are joining a bunch of tables
| together or running SQL queries, Spark is going to be way better.
| If you're working with unstructured data (images, text, video,
| audio, etc), need mixed CPU & GPU compute, are doing deep
| learning and running inference, etc, then Ray is going to be much
| better.
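|
| For a sense of what the Ray Data side looks like, a minimal
| sketch of the read/transform/write flow (the S3 paths are
| placeholders, and the transform is where a model call would go):
|
|       import ray
|
|       ds = ray.data.read_parquet("s3://my-bucket/input/")
|       # transform the dataset batch by batch across the cluster
|       ds = ds.map_batches(lambda batch: batch)
|       ds.write_parquet("s3://my-bucket/output/")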
| justsocrateasin wrote:
| I'm just learning about this tool now and had a brief question
| if you have the time:
|
| The paper mentions support for zero-copy intranode object
| sharing which links to serialization in the Ray docs -
| https://docs.ray.io/en/latest/ray-core/objects/serialization...
|
| I'm really curious how this is performant - I recently tried
| building a pipeline that leveraged substantial multiprocessing
| in Python, and found that my process was bottlenecked by the
| serialization/deserialization that occurs during Python
| multiprocessing. Would love any reading or explanation you can
| provide as to how this doesn't also bottleneck a process in Ray,
| since it seems that data transferred between workers and nodes
| will need to be serialized and deserialized.
|
| Thanks in advance! Really cool tool, hopefully I'll be able to
| use it sooner rather than later.
| robertnishihara wrote:
| You're right that the serialization / deserialization overhead
| can quickly exceed the compute time. To avoid this you have
| to get a lot of small things right. And given our focus on ML
| workloads, this is particularly important when sharing large
| numerical arrays between processes (especially processes
| running on the same node).
|
| One of the key things is to make sure the serialized object
| is stored in a data format where the serialized object does
| not need to be "transformed" in order to access it. For
| example, a numpy array can be created in O(1) time from a
| serialized blob by initializing a Python object with the
| right shape and dtype and a pointer to the right offset in
| the serialized blob. We also use projects like Apache Arrow
| that put a lot of care into this.
|
| Example in more detail:
|
| Imagine the object you are passing from process A to process
| B is a 1GB numpy array of floats. In the serialization step,
| process A produces a serialized blob of bytes that is
| basically just the 1GB numpy array plus a little bit of
| metadata. Process A writes that serialized blob into shared
| memory. This step of "writing into shared memory" still
| involves O(N) work, where N is the size of the array (though
| you can have multiple threads do the memcpy in parallel and
| be limited just by memory bandwidth).
|
| In the deserialization step, process B accesses the same
| shared memory blob (process A and B are on the same machine).
| It reads a tiny bit of metadata to figure out the type of the
| serialized object and shape and so on. Then it constructs a
| numpy array with the correct shape and type and with a
| _pointer_ to the actual data in shared memory at the right
| offset. Therefore it doesn't need to touch all of the bytes of
| data; it just does O(1) work instead of O(N).
|
| That's the basic idea. You can imagine generalizing this
| beyond numpy arrays, but it's most effective for objects that
| include large numerical data (e.g., objects that include
| numpy arrays).
|
| There are a bunch of little details to get right, e.g.,
| serializing directly into shared memory instead of creating a
| serialized copy in process A and then copying it into shared
| memory. Doing the write into shared memory in parallel with a
| bunch of threads. Getting the deserialization right. You also
| have to make sure that the starting addresses of the numpy
| arrays are 64-byte aligned (if memory serves) so that you
| don't accidentally trigger a copy later on.
|
| EDIT: I edited the above to add more detail.
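|
| Concretely, the pattern looks like this in Ray (the array size is
| illustrative); on the same node, the worker deserializes a
| read-only numpy view backed by shared memory rather than making a
| second copy:
|
|       import numpy as np
|       import ray
|
|       ray.init()
|
|       arr = np.random.rand(100_000_000)   # ~800 MB of float64
|       ref = ray.put(arr)   # one O(N) write into shared memory
|
|       @ray.remote
|       def column_mean(a):
|           # deserialization is O(1): `a` is a numpy array whose
|           # data pointer targets the shared-memory blob
|           return a.mean()
|
|       print(ray.get(column_mean.remote(ref)))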
| Xophmeister wrote:
| This is probably a naive question, but how do two processes
| share address space? mmap?
| parhamn wrote:
| I'm curious how data scientists use these massive datasets,
| especially the old stuff. Is it more of a compliance and
| need/should-save type thing, or is the data actually useful? I'm
| baffled by these numbers, having never used a large BI tool, and
| am genuinely curious how the data is actually used operationally.
|
| As a layman, I imagine lots of it loses relevancy very quickly;
| e.g., Amazon sales data from 5 years ago is only marginally useful
| for determining future trends and analyzing new consumer behavior
| regimes?
| gregw2 wrote:
| If you have seasonal demand patterns, you generally need three
| years history to do good predictive analytics.
|
| I do tend to agree that data from five years ago is rarely
| relevant, BUT our business is still using data from the fiscal
| year before COVID for some BI purposes, as a comparison baseline
| for certain analytics/business processes which have been slow to
| reach pre-COVID levels of performance. So that means we are now
| using data 6 years old, comparing this year to that pre-COVID
| year for certain analytics!
| smfjaw wrote:
| I work in finance, and it's great having big historical datasets.
| Even if the figures are far lower in previous years, it's good to
| see system 'shocks', and these can be scaled to a different
| magnitude for future forecasting.
| OutOfHere wrote:
| Can you help us understand how others can use and derive value
| from Ray DeltaCAT? What would be the specific use cases?
| thedood wrote:
| Some of DeltaCAT's goals and use cases have been discussed in
| this 2022 talk: https://youtu.be/M3pZDp1zock?t=4676
|
| Today, our immediate next goal for DeltaCAT is to ensure that
| the compactor, and similar procedures for Ray, can run on
| Apache Iceberg. So, if you're an Iceberg user relying on
| procedures like Spark's "rewrite_data_files" and/or
| "rewrite_positional_delete_files" to compact your datasets
| today, then DeltaCAT will let you easily run similar compaction
| procedures on Ray to realize similar efficiency/scale
| improvements (even if it winds up delegating some of the work
| to other projects like PyIceberg, Daft, etc. along the way).
|
| Going forward, we'd like DeltaCAT to also provide better
| general-purpose abstractions (e.g., reading/writing/altering
| large datasets) to simplify writing Ray apps in Python that
| work across (1) different catalog formats like Iceberg, Hudi,
| and Delta and (2) different distributed data processing
| frameworks like Ray Data, Daft, Modin, etc.
|
| From the perspective of an internal DeltaCAT developer, another
| goal is to just reduce the maintainability burden and dev hours
| required to write something like a compactor that works across
| multiple catalogs (i.e., by ensuring that all interfaces used
| by such a procedure can be readily implemented for multiple
| catalog formats like Iceberg, Hudi, Delta, etc.).
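|
| For context, the existing Spark-side procedure mentioned above is
| invoked roughly like this from PySpark (the catalog and table
| names are hypothetical, and `spark` is an existing SparkSession);
| the idea is that DeltaCAT would offer an equivalent that runs on
| Ray:
|
|       spark.sql("""
|           CALL my_catalog.system.rewrite_data_files(
|               table => 'db.events',
|               strategy => 'binpack'
|           )
|       """)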
| jgalt212 wrote:
| Slightly flip, but it's interesting that no one believes in or
| brags about cost savings via statistical sampling techniques
| these days.
| dekhn wrote:
| well, I can save money by eating only lentils, but I prefer a
| richer diet. As do BI folks in a highly profitable company.
| whoevercares wrote:
| I wonder if similar performance could be achieved with a Spark
| accelerator like https://github.com/apache/datafusion-comet. Of
| course it didn't exist 4 years ago, but would it be cheaper to
| build?
___________________________________________________________________
(page generated 2024-07-31 23:00 UTC)