# Building a Robust Data Synchronization Framework with Rails

Philippe Creux · 07 Oct 2024 · https://pcreux.com/2024/10/07/rails-data-sync-service.html

At Zipline, we offer a one-stop-shop application for retail employees that handles everything from clocking in to task management and team communication. To achieve this, we need to integrate with numerous third-party applications, each with unique requirements. We decided to develop a satellite service called ZipIO for data synchronization. In this post, I'll review our journey of building this robust and scalable system.

## Key Design Principles

1. Language and Framework: We chose Ruby on Rails for its simplicity and productivity. However, we structured our core logic as POROs (Plain Old Ruby Objects) to decouple it from the engine that runs it.
2. Modular Architecture: We broke the workflow down into discrete steps: Signals, Commands, Outcomes, and Tasks. This modular approach allows for easy reuse of logic across different third-party integrations.
3. Asynchronous Processing: While we support synchronous operations for specific use cases, 99% of our workflows run asynchronously for optimal performance and scalability.
4. Robust Error Handling: Each step in the workflow is designed to handle errors gracefully, ensuring smooth operation even when issues arise.
5. Full Visibility: We prioritized comprehensive logging and monitoring to make troubleshooting and maintenance as painless as possible.

## Defining workflows

### Steps

Our workflows consist of four main kinds of steps:

1. Signals: Triggers, often from API calls or webhooks, that initiate a workflow.
2. Commands: Actions performed in response to signals, such as synchronizing data with another app.
3. Outcomes: The results of commands, which can be success, failure, or skipped.
4. Tasks: Scheduled operations that poll third-party services for new data to sync. They create Signals in turn.

Each step in the workflow is designed as a standalone Ruby object that takes a data input, performs an action, and returns the next step(s) to be executed. Since each step defines its downstream step(s), it can perform complex routing depending on input data or user configuration. For example, a Signal could return several Commands to sync an event to multiple third-party services.

```mermaid
flowchart LR
    A[Signal: New Order]
    B[Command: Sync to service A]
    C[Command: Sync to service B]
    BB[Outcome: Success]
    CB[Outcome: Success]
    A --> B
    A --> C
    B --> BB
    C --> CB
```
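For illustration, here is a minimal sketch of what such a base class might look like. This is a simplified stand-in rather than our production code; the reader methods `data_struct_types` and `emitted_steps` are illustrative names.

```ruby
module ZipIO
  class Step
    class << self
      attr_reader :data_struct_types, :emitted_steps

      # Declare the Dry::Struct type(s) this step accepts as input.
      def data_struct(types)
        @data_struct_types = Array(types)
      end

      # Declare the step class(es) this step may return, for type
      # safety and auto-generated documentation.
      def emits(*step_classes)
        @emitted_steps = step_classes
      end
    end

    attr_reader :data

    def initialize(data)
      @data = data
    end

    # Subclasses implement #call and return the next step(s), or a
    # Symbol such as :success or :skip for terminal steps.
    # Persistence helpers (e.g. #attributes, used further below) omitted.
    def call
      raise NotImplementedError
    end
  end
end
```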
The data inputs are defined as Dry::Struct. Each step declares the type(s) of step it returns, adding an extra layer of type safety and allowing us to generate documentation automatically. Here is an example:

```ruby
# This signal is created when we receive a new order via a webhook.
class Stripe::Signals::NewOrder < ZipIO::Step
  data_struct [Stripe::Structs::Order]

  emits MyStore::Commands::CreateOrder

  # Convert the Stripe Order to a Generic Struct and return a Command to create the order.
  def call
    order = Transformer.call(data, to: Generic::Structs::Order)

    MyStore::Commands::CreateOrder.new(order)
  end
end

class MyStore::Commands::CreateOrder < ZipIO::Step
  # Create an order via an API call and return an Outcome according to the response.
  def call
    response = api_client.orders.post(data)

    if response.success?
      MyStore::Outcome::OrderCreated.new(response.parsed_body)
    else
      MyStore::Outcome::OrderCreateFailure.new(response.parsed_body)
    end
  end

  private

  def api_client
    # ...
  end
end

# Two outcomes. They are the final steps of the workflow, so they don't do anything.
MyStore::Outcome::OrderCreated = Class.new(ZipIO::Outcomes::Success)
MyStore::Outcome::OrderCreateFailure = Class.new(ZipIO::Outcomes::Failure)

# Order received from Stripe.
class Stripe::Structs::Order < DryStruct
  attribute :amount, Types::Decimal
  attribute :stripe_order_id, Types::String
  attribute :user_id, Types::String
  attribute :email, Types::String
  attribute :payment_type, Types::String
end

# Generic order used throughout the system.
class Generic::Structs::Order < DryStruct
  attribute :amount, Types::Decimal
  attribute :order_id, Types::String
  attribute :integration_key, Types::String
  attribute :email, Types::String
  attribute :payment_source, Types::String.enum('credit_card', 'debit_card')
end
```

## Transforming data

The core logic is to transform data from one schema (ex: Stripe Order) to another (ex: Generic Order). We created a simple library inspired by csv-importer to define data transformations using a DSL.

```ruby
class StripeOrderToGenericOrder < ZipIO::Transformer
  # map `amount` to `amount`
  attribute :amount

  # map `stripe_order_id` to `order_id`
  attribute :order_id, source: :stripe_order_id

  # build a custom integration key
  attribute :integration_key, source: ->(data) {
    [data.fetch(:user_id), data.fetch(:stripe_order_id)].join("-")
  }

  # downcase email
  attribute :email, transform: -> { _1&.downcase }

  # define a mapping
  attribute :payment_source, source: :payment_type, transform: Mapper.new(
    "cc" => "credit_card",
    "dc" => "debit_card",
    # ...
  )
end
```

Using a proper naming convention, we can look up the transformers on the fly:

```ruby
Transformer.call(stripe_order, to: Generic::Structs::Order)
# => #<Generic::Structs::Order ...>
```

## Live documentation

Since Steps, Structs, and Transformers are defined using DSLs, we can introspect them to generate documentation. This documentation is available in our Admin UI so that non-technical folks (Product Owners, Account Managers, ...) understand how the data flows and how it's transformed.

We traverse the graph of emitted steps to render a flow chart:

```mermaid
flowchart LR
    A[Stripe::Signals::Webhook] --> B[Stripe::Signals::PaymentSucceeded]
    A --> C[Stripe::Signals::NewOrder]
    A --> D[Stripe::Outcome::IgnoredWebhookEvent]
    C --> E[MyStore::Commands::CreateOrder]
    E --> F[MyStore::Outcome::OrderCreated]
    E --> G[MyStore::Outcome::OrderCreateFailure]
    L[Shopify::Signals::Webhook] --> M[Shopify::Signals::NewOrder]
    M --> E
```

We look up all Structs and Transformers to display them in a human-friendly way. Here is, for example, the documentation for StripeOrderToGenericOrder:

| Source | Target | Transform |
| --- | --- | --- |
| `amount` | `amount` | |
| `stripe_order_id` | `order_id` | |
| `Proc { [data.fetch(:user_id), data.fetch(:stripe_order_id)].join("-") }` | `integration_key` | |
| `email` | `email` | `Proc -> { _1&.downcase }` |
| `payment_type` | `payment_source` | `"cc"` → `"credit_card"`, `"dc"` → `"debit_card"` |

So far, we've defined the steps, structs, and transformations. Let's see how we make data flow through them.

## Processing a Workflow

Processing a workflow synchronously is straightforward:

```ruby
# Run a workflow synchronously and return a collection of child steps.
# Ex: [Stripe::Signals::NewOrder, [MyStore::Commands::CreateOrder, [MyStore::Outcome::OrderCreated]]]
def run(step)
  return step if step.is_a?(Symbol) # e.g. :success, :skip, ...

  next_steps = Array(step.call)

  next_steps.map do |next_step|
    run(next_step)
  end
end
```

We use this approach to perform integration tests where we want to exercise an entire workflow.
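For instance, a test could build a signal from a recorded payload and run the whole chain in-process. This is a sketch: `stripe_order` is a hypothetical fixture, the third-party API is assumed to be stubbed (e.g. with WebMock), `run` is assumed to be available as a spec helper, and we assume a success Outcome ultimately returns `:success`.

```ruby
RSpec.describe "Stripe new order workflow" do
  it "syncs the order to MyStore" do
    # stripe_order: a hypothetical fixture built from a recorded webhook payload.
    signal = Stripe::Signals::NewOrder.new(stripe_order)

    # `run` recurses through Signal -> Command -> Outcome; flattening the
    # nested result yields the terminal symbols.
    expect(run(signal).flatten).to eq([:success])
  end
end
```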
In a production environment, we persist Pipelines and Steps to the database and process them asynchronously.

## Database Structure

We use a minimal set of Rails models:

* Pipeline: A specific instance of a pipeline
* PipelineStep: Individual steps within a pipeline

## Asynchronous Processing

When a signal is received:

1. A new pipeline is created and persisted in the database.
2. A job is enqueued to process the first step.
3. Each step, when processed, can trigger subsequent steps, which are then enqueued as separate jobs.

This approach allows for excellent scalability, fault tolerance, and visibility.

## Error Handling and Retry Mechanism

By persisting each step in the database, we can retry failed steps without restarting the entire workflow. This is particularly useful for long sequences of API calls where only one step might fail.

## Scalability

Our service runs on Heroku, using Judoscale to auto-scale background workers. This setup allows us to handle peak loads efficiently by spinning up additional workers as needed.

## Example

In the example below, we handle Stripe webhooks. Each webhook can contain multiple events, so we start a pipeline where the first step breaks each event out into its own Signal.

```ruby
class StripeWebhooksController < ApplicationController
  # POST /webhooks/stripe
  #
  # The webhooks can contain multiple events.
  # Build a Webhook signal from the raw payload and start a Pipeline.
  def create
    signal = Stripe::Signals::Webhook.new(params)

    Pipeline.start!(signal)
  end
end

class Stripe::Signals::Webhook < ZipIO::Signal
  # Return an array of individual signals.
  # These signals will be processed in parallel.
  def call
    data.events.filter_map do |event|
      case event.type
      when 'payment_intent.succeeded'
        Stripe::Signals::PaymentSucceeded.new(event)
      when 'orders.succeeded'
        Stripe::Signals::NewOrder.new(event) # See definition above.
      else
        Stripe::Outcome::IgnoredWebhookEvent.new(event)
      end
    end
  end
end
```

At a high level, the Pipeline implementation looks like this:

```ruby
class Pipeline < ApplicationRecord
  # Persist the signal as the first step of the pipeline and process it asynchronously.
  def self.start!(signal)
    PipelineStep.enqueue(signal)
  end
end

class PipelineStep < ApplicationRecord
  # Persist the step and process it asynchronously.
  def self.enqueue(step, pipeline: nil)
    pipeline ||= Pipeline.create!

    step_model = create!(data: step.attributes, step_class: step.class.name, pipeline: pipeline)
    step_model.process_async!
  end

  # Enqueue a job that calls `process!`
  def process_async!
    ProcessPipelineStepJob.perform_async(id)
  end

  # Constantize the persisted step, trigger `call`, and enqueue the resulting steps.
  def process!
    step = step_class.constantize.new(data) # ex: Stripe::Signals::Webhook

    # Trigger the step
    resulting_step = step.call

    return if resulting_step.is_a?(Symbol) # :success, :fail, :skip

    Array(resulting_step).each do |next_step|
      PipelineStep.enqueue(next_step, pipeline: pipeline)
    end
  end
end
```
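ProcessPipelineStepJob itself isn't shown above. Assuming Sidekiq (which matches the `perform_async` call), a minimal version could look like this; the retry policy is illustrative:

```ruby
# A minimal sketch of the job, assuming Sidekiq. Because each step is
# persisted, a failed step is retried in place without re-running the
# rest of the pipeline.
class ProcessPipelineStepJob
  include Sidekiq::Job

  sidekiq_options retry: 5 # illustrative retry policy

  def perform(pipeline_step_id)
    PipelineStep.find(pipeline_step_id).process!
  end
end
```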
## Instrumentation, visibility, and monitoring

Since every single step is persisted in the database, writing an admin interface that gives complete visibility into each pipeline and its steps is straightforward. Thanks to this admin interface, most debugging sessions are painless. There's no need to dig into the logs or open a Rails console; everything is nicely laid out in a web UI.

Every step run is sent to Datadog to help us monitor the system's health and alert us when failure rates are high.

## Conclusion

I'm very happy with how ZipIO turned out. We succeeded in building a framework that allows us to define the core logic by writing simple and elegant code. After three years of operation, the codebase defines 125 steps and 50 transformers. We have processed 1 billion steps, and our service continues to be a pleasure to work with and expand.

Questions? Comments? pcreux@ruby.social.

---

You might also be interested in:

* Organize your Rails codebase with aaa engines
* Handling webhooks in Ruby on Rails
* Snapshot testing with dbt