[HN Gopher] Amazon's exabyte-scale migration from Apache Spark t...
       ___________________________________________________________________
        
       Amazon's exabyte-scale migration from Apache Spark to Ray on EC2
        
       Author : nojito
       Score  : 112 points
       Date   : 2024-07-29 22:14 UTC (2 days ago)
        
 (HTM) web link (aws.amazon.com)
 (TXT) w3m dump (aws.amazon.com)
        
       | Narhem wrote:
        | Absolutely insane work. With so much data, you'd think they would
        | come up with a custom solution instead of using the "newest
        | available toolkit", but I understand how much of a mess dealing
        | with that much data is.
        
         | thedood wrote:
         | Hi Narhem - one of the devs that worked on the migration here.
          | The data volume, and the consequent compute power required to
          | process it, is actually one of the things that led us to Ray
         | (or Ray Core specifically) since it had the distributed
         | computing primitives (tasks and actors) that we needed to build
         | out our envisioned solution with very few compromises. One
         | thing we DIDN'T want to do was just throw another one-liner SQL
         | statement running on a new data processing framework at the
         | problem, since that leads us back to the problems we had with
         | Spark - not enough low-level control for such an important
         | problem.
         | 
         | In short, after evaluating our options, Ray seemed to strike
         | the best balance between the one efficiency extreme of, say,
         | building out custom "compaction-optimized" hardware/clusters,
         | and the other maintainability extreme of just letting the
         | latest managed cloud service run a 1-liner SQL statement for us
         | without ever looking under the hood.
         | 
         | Regardless, I expect both our existing solution and the
         | distributed compute frameworks leveraged to deliver it to
         | continue to evolve over time.
        
       | mannyv wrote:
       | Crazy that the project took almost 4 years end-to-end, and it's
       | still ongoing.
       | 
       | I had no idea anything at AWS had that long of an attention span.
       | 
        | It's funny and telling that in the end, it's all backed by CSVs
        | in S3. Long live CSV!
        
         | jerrygenser wrote:
          | They reference Parquet files; I'm not sure CSV even figures in
          | that heavily, other than in the first iteration before migrating
          | to Spark.
        
         | thedood wrote:
         | Hi mannyv - one of the devs that worked on the migration here.
         | It has been a pretty long project - approached with caution due
         | to the criticality of keeping our BI datasets healthy - but the
         | preliminary results produced year-over-year kept looking
         | promising enough to keep after it. =)
         | 
         | Also, we mostly have Parquet data cataloged in S3 today, but
         | delimited text is indeed ubiquitous and surprisingly sticky, so
         | we continue to maintain some very large datasets natively in
          | this format. However, even when a table's data producer prefers
          | to write delimited text, the data is almost always converted to
          | Parquet during the compaction process to produce a read-optimized
          | table variant downstream.
        
           | gregw2 wrote:
            | Are you all shifting over to storing as Iceberg-enriched
            | Parquet yet and letting it (within, say, Athena) manage
            | compaction, or at least thinking about it? Or is it not worth
            | it since this new Ray+Parquet thing is working for you?
        
             | thedood wrote:
              | As alluded to in the blog post, Ray+Parquet+Iceberg is the
              | next frontier: we'd like to make our compactor and similar
              | procedures available on it in open source so that the
              | community can start bringing similar benefits to their
              | Iceberg workloads. Stay tuned. =)
        
         | wenc wrote:
          | Most people are moving away from CSV for big datasets, except
          | in exceptional cases involving linear reads (append-only ETL).
          | CSV has one big upside, which is human readability, but so many
          | downsides: poor random access, no typing, no compression, and a
          | complex parser that needs to handle edge cases.
        
           | 100pctremote wrote:
           | Most people don't directly query or otherwise operate on raw
           | CSV, though. Large source datasets in CSV format still reign
           | in many enterprises, but these are typically read into a
           | dataframe, manipulated and stored as Parquet and the like,
           | then operated upon by DuckDB, Polars, etc., or modeled (E.g.
           | DBT) and pushed to an OLAP target.
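            | 
            | As a rough sketch of that workflow (the file, column names,
            | and query are made up for illustration), using Polars and
            | DuckDB:
            | 
            |     import duckdb
            |     import polars as pl
            | 
            |     # One-time linear read of the raw CSV export.
            |     df = pl.read_csv("raw_export.csv")
            | 
            |     # Persist it typed, compressed, and columnar.
            |     df.write_parquet("raw_export.parquet")
            | 
            |     # Downstream queries hit the Parquet file, not the CSV.
            |     duckdb.sql("""
            |         SELECT customer_id, SUM(amount) AS total
            |         FROM 'raw_export.parquet'
            |         GROUP BY customer_id
            |     """).show()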
        
             | wenc wrote:
             | There are folks who still directly query CSV formats in a
             | data lake using a query engine like Athena or Spark or
             | Redshift Spectrum -- which ends up being much slower and
             | consuming more resources than is necessary due to full
             | table scans.
             | 
             | CSV is only good for append only.
             | 
              | But so is Parquet, and if you can write Parquet from the
              | get-go, you save on storage as well as having a directly
              | queryable column store from the start.
             | 
              | CSV still exists because of legacy data-generating
              | processes and a dearth of Parquet familiarity among many
             | software engineers. CSV is simple to generate and easy to
             | troubleshoot without specialized tools (compared to Parquet
             | which requires tools like Visidata). But you pay for it
             | elsewhere.
        
               | cmollis wrote:
               | exactly.. parquet is good for append only.. stream mods
               | to parquet in new partitions.. compact, repeat.
        
         | whoevercares wrote:
          | Is it really AWS? I don't recall any service called BDT.
        
       | quadrature wrote:
        | Anyone know enough about Ray to comment on what the exact
        | performance unlock was? They mention that it gave them enough
        | control over the distribution of work so that they could avoid
        | unnecessary reads/writes. That seems like a good win, but I would
        | assume that doing compaction in Python would be quite slow.
        
         | thedood wrote:
         | Some of the initial differentiators are described at the bottom
         | of our design doc at https://github.com/ray-
         | project/deltacat/blob/main/deltacat/c.... But yes, controlling
         | file I/O was also an important part of this since it allowed us
         | to (1) run more targeted downloads/reads of only the Parquet
         | row groups and columns participating in compaction and (2)
         | track dirty/clean files to skip unnecessary re-writes of
         | "clean" files that weren't altered by compaction. Also, just
         | better leveraging catalog metadata (e.g., primary key indexes
         | if available) to filter out more files in the initial scan, and
         | to copy clean files into the compacted variant by reference
         | (when supported by the underlying catalog format).
         | 
         | The trick with doing compaction in Python was to ensure that
         | the most performance-sensitive code was delegated to more
          | optimal C++ (e.g., Ray and Arrow) and Rust (e.g., Daft) code
         | paths. If we did all of our per-record processing ops in pure
         | Python, compaction would indeed be much slower.
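          | 
          | To give a rough idea of (1), here is a minimal PyArrow sketch
          | (not our actual compactor code; the file name, column names,
          | and pruning predicate are hypothetical) of reading only the
          | row groups and columns we care about:
          | 
          |     import pyarrow.parquet as pq
          | 
          |     # Opening the file only reads the footer metadata up front.
          |     pf = pq.ParquetFile("part-00000.parquet")
          | 
          |     def overlaps_compaction_range(rg_meta):
          |         # Hypothetical predicate over row-group statistics
          |         # (e.g., min/max of the primary key column).
          |         return True
          | 
          |     # Read only the row groups and columns participating in
          |     # compaction instead of the whole file.
          |     tables = [
          |         pf.read_row_group(i, columns=["pk", "event_time"])
          |         for i in range(pf.metadata.num_row_groups)
          |         if overlaps_compaction_range(pf.metadata.row_group(i))
          |     ]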
        
           | quadrature wrote:
           | Thanks a lot for the explanation. Sounds a lot like how
            | PySpark allows for declarative definitions of computation
           | that is actually executed in Java.
        
           | ZeroCool2u wrote:
           | This is one of the first times I've heard of people using
           | Daft in the wild. Would you be able to elaborate on where
           | Daft came in handy?
           | 
           | Edit: Nvm, I kept reading! Thanks for the interesting post!
        
       | whoiscroberts wrote:
        | Ray user here: what language actors are they using? Ray supports
        | Python, Java, and C++ actors...
        
         | thedood wrote:
         | We wrote the compactor in Python but, as noted in my previous
          | response to quadrature, most of the performance-sensitive code
         | is written in C++ and Rust (but still invoked from Python).
        
       | esafak wrote:
       | Are we talking about big data ETL here? I did not know Ray was
       | suited for it.
        
         | thedood wrote:
         | This is a specialized ETL use-case - similar to taking a single
         | SQL query and creating a dedicated distributed application
         | tailored to run only that query. The lower-level primitives in
         | Ray Core (tasks and actors) are general purpose enough to make
         | building this type of application possible, but you'll be hard
         | pressed to quickly (i.e., with less than 1 day of effort) make
         | any arbitrary SQL query or dataframe operation run with better
         | efficiency or scale on Ray than on dedicated data processing
         | frameworks like Spark. IMO, the main value add of frameworks
         | like Spark lies more in unlocking "good enough" efficiency and
         | scale for almost any ETL job relatively quickly and easily,
         | even if it may not run your ETL job optimally.
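          | 
          | For anyone unfamiliar with Ray Core, the task/actor primitives
          | involved are roughly this small (a toy sketch, not our actual
          | compactor):
          | 
          |     import ray
          | 
          |     ray.init()
          | 
          |     @ray.remote
          |     def compact_partition(partition_id):
          |         # Stand-in for per-partition compaction work.
          |         return f"compacted partition {partition_id}"
          | 
          |     @ray.remote
          |     class JobTracker:
          |         # Actor holding mutable state across tasks.
          |         def __init__(self):
          |             self.completed = 0
          | 
          |         def record(self):
          |             self.completed += 1
          |             return self.completed
          | 
          |     tracker = JobTracker.remote()
          |     results = ray.get([compact_partition.remote(i)
          |                        for i in range(8)])
          |     done = ray.get([tracker.record.remote() for _ in results])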
        
           | dekhn wrote:
           | Speaking as a distributed computing nerd, Ray is definitely
           | one of the more interesting and exciting frameworks I've seen
           | in a while. It's one of those systems where reading the
           | manual, I can see that I'm not going to have to learn
           | anything new, because the mental model resembles so many
           | distributed systems I've worked with before (I dunno about
            | anybody else, but TensorFlow is an example of a distributed
           | system that forced me to forget basically everything I knew
           | before I could be even remotely productive in it).
           | 
            | Unclear if it's in the best interests of Anyscale to promote
           | Ray as a general purpose cluster productivity tool, even if
           | it's good at that more general use case.
        
       | jiripospisil wrote:
       | > From the typical Amazon EC2 customer's perspective, this
       | translates to saving over $120MM/year on Amazon EC2 on-demand R5
       | instance charges.
       | 
       | Does the sales team know about this? /jk
        
         | taeric wrote:
          | This assumes they aren't reinvesting the money back into other
          | tangible improvements in the services, though?
        
         | andrewxdiamond wrote:
         | Amazon sales and business teams are constantly focused on
         | reducing costs for customers and they celebrate this internally
         | too.
         | 
          | I have seen dozens of big-ticket "we saved this customer
          | $xxxK/year" posts on Slack and other internal venues; the
          | customer obsession is real.
        
           | immibis wrote:
           | Even the reduced cost is still ten times what it costs on a
           | traditional provider, though, right?
        
       | 100pctremote wrote:
       | Rather nuts. New challenge: build datacenters quickly enough to
       | support the new platform.
        
       | whalesalad wrote:
        | Video from the author deep-diving this:
        | https://www.youtube.com/watch?v=h7svj_oAY14
        
       | zitterbewegung wrote:
        | I was in a workshop that taught me Ray. It was interesting to
        | learn that the people who started Spark were also involved in
        | making Ray.
        | 
        | This is not badmouthing either project, just an observation: if
        | you architected one system to attack a problem, you would be
        | better at attacking the same problem the second time around.
        
         | fs111 wrote:
          | Well, Spark was really a showcase project for Mesos when it was
          | created. Now everyone knows a lot more.
        
       | igmor wrote:
       | Can you share any data on how big of a cluster is running Ray
       | jobs?
        
         | thedood wrote:
         | From the blog post, the largest individual Ray cluster that was
         | observed running a production compaction job in Q1 had 26,846
         | vCPUs and ~210TiB of RAM. This is roughly equivalent to a Ray
         | cluster composed of 839 r5.8xlarge EC2 nodes (w/ 32 vCPUs and
         | 256GiB RAM per node).
        
       | robertnishihara wrote:
       | I'm one of the creators of Ray. A few thoughts :)
       | 
       | 1. This is truly impressive work from AWS. Patrick Ames began
       | speaking about this a couple years ago, though at this point the
       | blog post is probably the best reference.
       | https://www.youtube.com/watch?v=h7svj_oAY14
       | 
       | 2. This is not a "typical" Ray use case. I'm not aware of any
       | other exabyte scale data processing workloads. Our bread and
       | butter is ML workloads: training, inference, and unstructured
       | data processing.
       | 
       | 3. We have a data processing library called Ray Data for
       | ingesting and processing data, often done in conjunction with
       | training and inference. However, I believe in this particular use
       | case, the heavy lifting is largely done with Ray's core APIs
        | (tasks & actors), which are lower level and more flexible; that
        | makes sense for highly custom use cases. Most Ray users use the
       | Ray libraries (train, data, serve), but power users often use the
       | Ray core APIs.
       | 
       | 4. Since people often ask about data processing with Ray and
       | Spark, Spark use cases tend to be more geared toward structured
       | data and CPU processing. If you are joining a bunch of tables
       | together or running SQL queries, Spark is going to be way better.
       | If you're working with unstructured data (images, text, video,
       | audio, etc), need mixed CPU & GPU compute, are doing deep
       | learning and running inference, etc, then Ray is going to be much
       | better.
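        | 
        | For context on point 3, a typical Ray Data pipeline looks roughly
        | like this (a toy example; the S3 paths and column name are
        | hypothetical):
        | 
        |     import ray
        | 
        |     def double_amount(batch):
        |         # Batches arrive as dicts of NumPy arrays by default.
        |         batch["amount_x2"] = batch["amount"] * 2
        |         return batch
        | 
        |     ds = ray.data.read_parquet("s3://bucket/raw/")
        |     ds = ds.map_batches(double_amount)
        |     ds.write_parquet("s3://bucket/out/")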
        
         | justsocrateasin wrote:
         | I'm just learning about this tool now and had a brief question
         | if you have the time:
         | 
         | The paper mentions support for zero-copy intranode object
         | sharing which links to serialization in the Ray docs -
         | https://docs.ray.io/en/latest/ray-core/objects/serialization...
         | 
         | I'm really curious how this is performant - I recently tried
         | building a pipeline that leveraged substantial multiprocessing
         | in Python, and found that my process was bottlenecked by the
         | serialization/deserialization that occurs during Python
         | multiprocessing. Would love any reading or explanation you can
         | provide as to how this doesn't also bottleneck a process in
         | Ray, since it seems that data transferred between workers and
          | nodes will need to be serialized and deserialized.
         | 
         | Thanks in advance! Really cool tool, hopefully I'll be able to
         | use it sooner rather than later.
        
           | robertnishihara wrote:
            | You're right that the serialization/deserialization overhead
           | can quickly exceed the compute time. To avoid this you have
           | to get a lot of small things right. And given our focus on ML
           | workloads, this is particularly important when sharing large
           | numerical arrays between processes (especially processes
           | running on the same node).
           | 
           | One of the key things is to make sure the serialized object
           | is stored in a data format where the serialized object does
           | not need to be "transformed" in order to access it. For
           | example, a numpy array can be created in O(1) time from a
           | serialized blob by initializing a Python object with the
           | right shape and dtype and a pointer to the right offset in
           | the serialized blob. We also use projects like Apache Arrow
           | that put a lot of care into this.
           | 
           | Example in more detail:
           | 
           | Imagine the object you are passing from process A to process
           | B is a 1GB numpy array of floats. In the serialization step,
           | process A produces a serialized blob of bytes that is
           | basically just the 1GB numpy array plus a little bit of
           | metadata. Process A writes that serialized blob into shared
           | memory. This step of "writing into shared memory" still
           | involves O(N) work, where N is the size of the array (though
           | you can have multiple threads do the memcpy in parallel and
           | be limited just by memory bandwidth).
           | 
           | In the deserialization step, process B accesses the same
           | shared memory blob (process A and B are on the same machine).
           | It reads a tiny bit of metadata to figure out the type of the
           | serialized object and shape and so on. Then it constructs a
           | numpy array with the correct shape and type and with a
           | _pointer_ to the actual data in shared memory at the right
            | offset. Therefore it doesn't need to touch all of the bytes
           | of data, it just does O(1) work instead of O(N).
           | 
           | That's the basic idea. You can imagine generalizing this
           | beyond numpy arrays, but it's most effective for objects that
           | include large numerical data (e.g., objects that include
           | numpy arrays).
           | 
           | There are a bunch of little details to get right, e.g.,
           | serializing directly into shared memory instead of creating a
           | serialized copy in process A and then copying it into shared
           | memory. Doing the write into shared memory in parallel with a
           | bunch of threads. Getting the deserialization right. You also
           | have to make sure that the starting addresses of the numpy
           | arrays are 64-byte aligned (if memory serves) so that you
           | don't accidentally trigger a copy later on.
           | 
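            | In Ray terms, the zero-copy path described above looks roughly
            | like this (the array size is just illustrative):
            | 
            |     import numpy as np
            |     import ray
            | 
            |     ray.init()
            | 
            |     # ~800 MB array, serialized once into shared memory.
            |     arr = np.zeros(100_000_000, dtype=np.float64)
            |     ref = ray.put(arr)
            | 
            |     @ray.remote
            |     def mean(a):
            |         # On the same node, 'a' is a read-only numpy view
            |         # backed by the shared-memory blob: O(1) to construct,
            |         # with no extra copy of the 800 MB payload.
            |         return float(a.mean())
            | 
            |     print(ray.get(mean.remote(ref)))
            | 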
           | EDIT: I edited the above to add more detail.
        
             | Xophmeister wrote:
             | This is probably a naive question, but how do two processes
             | share address space? mmap?
        
       | parhamn wrote:
        | I'm curious: how do data scientists use these massive datasets,
        | especially the old stuff? Is it more of a compliance and
        | need/should-save type thing, or is the data actually useful? I'm
        | baffled by these numbers, having never used a large BI tool, and
        | am genuinely curious how the data is actually used operationally.
        | 
        | As a layman, I imagine lots of it loses relevancy very quickly,
        | e.g., Amazon sales data from 5 years ago is marginally useful for
        | determining future trends and analyzing new consumer behavior
        | regimes?
        
         | gregw2 wrote:
          | If you have seasonal demand patterns, you generally need three
          | years of history to do good predictive analytics.
          | 
          | I do tend to agree that data from five years ago is rarely
          | relevant, BUT our business is still using data from the fiscal
          | year before COVID for some BI purposes, as a comparison baseline
          | for certain analytics/business processes that have been slow to
          | reach pre-COVID levels of performance. So that means we are now
          | using data six years old, comparing this year to that pre-COVID
          | year for certain analytics!
        
         | smfjaw wrote:
          | I work in finance, and it's great having big historical
          | datasets. Even if the figures are far lower in previous years,
          | it's good to see system 'shocks', and these can be rescaled to
          | a different magnitude for future forecasting.
        
       | OutOfHere wrote:
       | Can you help us understand how others can use and derive value
       | from Ray DeltaCAT? What would be the specific use cases?
        
         | thedood wrote:
         | Some of DeltaCAT's goals and use cases have been discussed in
         | this 2022 talk: https://youtu.be/M3pZDp1zock?t=4676
         | 
         | Today, our immediate next goal for DeltaCAT is to ensure that
         | the compactor, and similar procedures for Ray, can run on
         | Apache Iceberg. So, if you're an Iceberg user relying on
         | procedures like Spark's "rewrite_data_files" and/or
         | "rewrite_positional_delete_files" to compact your datasets
         | today, then DeltaCAT will let you easily run similar compaction
         | procedures on Ray to realize similar efficiency/scale
         | improvements (even if it winds up delegating some of the work
         | to other projects like PyIceberg, Daft, etc. along the way).
         | 
         | Going forward, we'd like DeltaCAT to also provide better
         | general-purpose abstractions (e.g., reading/writing/altering
         | large datasets) to simplify writing Ray apps in Python that
         | work across (1) different catalog formats like Iceberg, Hudi,
         | and Delta and (2) different distributed data processing
         | frameworks like Ray Data, Daft, Modin, etc.
         | 
         | From the perspective of an internal DeltaCAT developer, another
         | goal is to just reduce the maintainability burden and dev hours
         | required to write something like a compactor that works across
         | multiple catalogs (i.e., by ensuring that all interfaces used
         | by such a procedure can be readily implemented for multiple
         | catalog formats like Iceberg, Hudi, Delta, etc.).
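          | 
          | For reference, this is the kind of Spark-side call we'd like to
          | offer a Ray-based alternative to (Iceberg's built-in
          | rewrite_data_files procedure; the catalog and table names are
          | hypothetical, and an Iceberg-enabled Spark session is assumed):
          | 
          |     from pyspark.sql import SparkSession
          | 
          |     spark = SparkSession.builder.getOrCreate()
          |     spark.sql("""
          |         CALL my_catalog.system.rewrite_data_files(
          |             table => 'db.events',
          |             strategy => 'binpack'
          |         )
          |     """)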
        
       | jgalt212 wrote:
       | Slightly flip, but it's interesting that no one believes in or
       | brags about cost savings via statistical sampling techniques
       | these days.
        
         | dekhn wrote:
         | well, I can save money by eating only lentils, but I prefer a
         | richer diet. As do BI folks in a highly profitable company.
        
       | whoevercares wrote:
        | I wonder if similar performance could be achieved with a Spark
        | accelerator like https://github.com/apache/datafusion-comet. Of
        | course it didn't exist 4 years ago, but would it be cheaper to
        | build?
        
       ___________________________________________________________________
       (page generated 2024-07-31 23:00 UTC)