[HN Gopher] Internal Consistency in Streaming Systems
       ___________________________________________________________________
        
       Internal Consistency in Streaming Systems
        
       Author : barrkel
       Score  : 71 points
       Date   : 2021-04-18 12:20 UTC (10 hours ago)
        
 (HTM) web link (scattered-thoughts.net)
 (TXT) w3m dump (scattered-thoughts.net)
        
       | thoughtlede wrote:
       | Interesting article. Two comments:
       | 
        | 1. I have always thought of (eventual) consistency as meaning
        | consistency between replicas: how in-sync the replicas are in
        | what they "store". Whereas internal consistency as defined here
        | seems to mean how "multiple reads" can lock into the same storage
       | state. I believe the two concepts are orthogonal, so comparing
       | the two concepts didn't feel natural to me.
       | 
       | 2. If transaction ids are monotonically increasing (an if), isn't
       | it possible for subsequent reads to lock into the maximum
       | transaction id of the first read? For example:
        |     select credits, max(txnid) from table;
        |     select debits from table where txnid <= max_txnid;
        
         | jamii wrote:
         | > internal consistency as defined here seems to mean how
         | "multiple reads" can lock into the same storage state
         | 
         | Sort of. When you make a single read of a single key at the end
          | of a streaming topology, that's equivalent to reading all the
          | inputs and doing the computation yourself. Internal consistency
          | means that every path through the graph back to the inputs
         | should be reading the same set of inputs. Things go wrong when
         | one path has processed more inputs than another, or when a
         | single input produces multiple internal updates and those
         | updates aren't processed atomically.
         | 
         | > isn't it possible for subsequent reads to lock into the
         | maximum transaction id of the first read
         | 
         | Your example would still allow credits to have read more inputs
         | than debits, but it's on the right track.
         | 
         | In differential dataflow aggregates wait until they see a
         | watermark before emitting output, so the sum in `balance` will
         | wait until the watermark for eg time 7 before emitting the sum
         | for time 7. That allows the output from the join to settle.
         | 
         | Of course doing that efficiently is non-trivial...
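          | 
          | To make that concrete, a toy sketch in Java (my own made-up
          | names, not the differential dataflow API): buffer updates per
          | timestamp and only fold them into the running sum once the
          | watermark has passed that timestamp.
          | 
          |     import java.util.Iterator;
          |     import java.util.Map;
          |     import java.util.SortedMap;
          |     import java.util.TreeMap;
          | 
          |     class WatermarkGatedSum {
          |         private final SortedMap<Long, Long> pending = new TreeMap<>();
          |         private long sum = 0;
          | 
          |         // An insert is a positive delta, a retraction a negative one.
          |         void update(long time, long delta) {
          |             pending.merge(time, delta, Long::sum);
          |         }
          | 
          |         // On a watermark, emit one settled sum per time <= watermark.
          |         void onWatermark(long watermark) {
          |             Iterator<Map.Entry<Long, Long>> it = pending.entrySet().iterator();
          |             while (it.hasNext()) {
          |                 Map.Entry<Long, Long> entry = it.next();
          |                 if (entry.getKey() > watermark) break;
          |                 sum += entry.getValue();
          |                 System.out.println("time " + entry.getKey() + ": sum = " + sum);
          |                 it.remove();
          |             }
          |         }
          |     }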
        
       | KptMarchewa wrote:
        | I don't really understand why the author treats the Flink
        | datastream API as "high temporal locality" and "can't express our
        | running example". Windowing operators provided by Flink are just
        | one way of using the datastream API, and you can easily express any
       | operations you want using something like KeyedProcessFunction and
       | manually manipulating the state.
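        | 
        | For example, a rough sketch of a per-account running sum with a
        | KeyedProcessFunction and keyed state (just to show the shape, not
        | the article's full example):
        | 
        |     import org.apache.flink.api.common.state.ValueState;
        |     import org.apache.flink.api.common.state.ValueStateDescriptor;
        |     import org.apache.flink.api.java.tuple.Tuple2;
        |     import org.apache.flink.configuration.Configuration;
        |     import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
        |     import org.apache.flink.util.Collector;
        | 
        |     // Input: (account, amount); output: (account, running balance).
        |     class RunningBalance extends
        |             KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>> {
        |         private transient ValueState<Long> balance;
        | 
        |         @Override
        |         public void open(Configuration parameters) {
        |             balance = getRuntimeContext().getState(
        |                 new ValueStateDescriptor<>("balance", Long.class));
        |         }
        | 
        |         @Override
        |         public void processElement(Tuple2<String, Long> tx, Context ctx,
        |                 Collector<Tuple2<String, Long>> out) throws Exception {
        |             long current = balance.value() == null ? 0L : balance.value();
        |             current += tx.f1;
        |             balance.update(current);
        |             out.collect(Tuple2.of(tx.f0, current));
        |         }
        |     }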
        
         | jamii wrote:
         | > you can easily express any operations you want
         | 
         | That's true in that the table api itself is built on top of the
         | datastream api. But to run the example you'd have to implement
         | your own joins and retraction-aware aggregates and then we'd be
         | testing the consistency properties of that implementation, not
         | of the datastream api itself, and we might as well test a
         | mature implementation like the table api instead.
        
       | BenoitP wrote:
       | Very well written article. I'll add that there are tricks to
       | manage that behavior.
       | 
        | The dataflow is a diamond:
        | 
        |     transactions ----> credits
        |                \              \
        |                 ----> debits ----JOIN----> balance ---> total
       | 
       | In Flink the SQL query planner may instantiate two different
        | consumer groups on transactions; that topic might be read twice.
       | These can progress at different rates, which is partly why some
       | inconsistencies could appear.
       | 
       | Now, the trick: converting transactions to datastream then back
       | to SQL will introduce a materializing barrier, and you will
       | benefit from high temporal locality. There might be some
       | inconsistencies left though, as the JOIN requires some shuffling
       | and that can introduce some processing skew. For example the
       | Netflix account will be the target of many individual accounts;
       | the instance responsible for Netflix might be a hot spot and
       | process things differently (probably by backpressuring a little,
       | making the data arrive in larger micro batches).
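        | 
        | Roughly something like this (a sketch against the Flink 1.12
        | table/datastream bridge; the registered `transactions` table and
        | the view name are assumptions):
        | 
        |     import org.apache.flink.streaming.api.datastream.DataStream;
        |     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        |     import org.apache.flink.table.api.Table;
        |     import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
        |     import org.apache.flink.types.Row;
        | 
        |     public class MaterializeBarrier {
        |         public static void main(String[] args) {
        |             StreamExecutionEnvironment env =
        |                 StreamExecutionEnvironment.getExecutionEnvironment();
        |             StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        |             // assumes an append-only `transactions` table is already registered
        | 
        |             // Round-trip the table through a DataStream so both branches of
        |             // the diamond read one materialized stream instead of scanning
        |             // the source twice.
        |             Table transactions = tEnv.from("transactions");
        |             DataStream<Row> materialized = tEnv.toAppendStream(transactions, Row.class);
        |             tEnv.createTemporaryView("transactions_m", materialized);
        |             // credits/debits/balance/total are then defined over transactions_m
        |         }
        |     }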
       | 
        | Anyway, when processing financial data you might want to have the
        | transaction IDs tag along with the processing; maybe pair them back
        | together in tumbling event-time windows at the total (sketched
        | below). Like he said: 'One thing to note is that this query does
        | not window any of its inputs, putting it firmly on the low temporal
        | locality side of the map where consistency is more difficult to
        | maintain.' Also, windowing would have introduced some end-to-end
        | latency.
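        | 
        | In the DataStream API, pairing the two sides in tumbling event-time
        | windows would look roughly like this (a sketch to show the shape;
        | whether a window can actually be relied on to connect the matching
        | rows is questioned below):
        | 
        |     import org.apache.flink.api.common.functions.JoinFunction;
        |     import org.apache.flink.api.java.functions.KeySelector;
        |     import org.apache.flink.api.java.tuple.Tuple3;
        |     import org.apache.flink.streaming.api.datastream.DataStream;
        |     import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
        |     import org.apache.flink.streaming.api.windowing.time.Time;
        | 
        |     // credits and debits are (account, amount, txnId) streams with
        |     // event-time timestamps already assigned; the max txn id seen on
        |     // either side tags along with the joined result.
        |     class PairInWindows {
        |         static DataStream<Tuple3<String, Long, Long>> balance(
        |                 DataStream<Tuple3<String, Long, Long>> credits,
        |                 DataStream<Tuple3<String, Long, Long>> debits) {
        |             KeySelector<Tuple3<String, Long, Long>, String> byAccount =
        |                 new KeySelector<Tuple3<String, Long, Long>, String>() {
        |                     @Override
        |                     public String getKey(Tuple3<String, Long, Long> row) {
        |                         return row.f0;
        |                     }
        |                 };
        |             return credits
        |                 .join(debits)
        |                 .where(byAccount)
        |                 .equalTo(byAccount)
        |                 .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        |                 .apply(new JoinFunction<Tuple3<String, Long, Long>,
        |                                         Tuple3<String, Long, Long>,
        |                                         Tuple3<String, Long, Long>>() {
        |                     @Override
        |                     public Tuple3<String, Long, Long> join(
        |                             Tuple3<String, Long, Long> c,
        |                             Tuple3<String, Long, Long> d) {
        |                         return Tuple3.of(c.f0, c.f1 - d.f1, Math.max(c.f2, d.f2));
        |                     }
        |                 });
        |         }
        |     }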
       | 
        | This makes me think: do streaming systems introduce a new letter
        | in the CAP trade-off?
       | 
       | I humbly propose: The CLAP trade-off, with L for Latency.
       | 
       | [1] https://issues.apache.org/jira/browse/FLINK-15775?filter=-2
        
         | kcolford wrote:
         | Latency has always been part of the CAP theorem. There is a
         | strictly stronger generalization of it that says in a
         | consistent system the latency of a transaction is bounded by
         | the latency of the links in the network. In the case of a
         | partition the network latency is infinite, so too is the
         | transaction latency, thus no availability.
        
         | jamii wrote:
         | > converting transactions to datastream then back to SQL will
         | introduce a materializing barrier
         | 
          | It seems that this would still leave the problem where each
          | transaction causes two deletes and two inserts to `balance`
          | and then `total` sums those one by one? These errors are very
          | transient but they still cause at least 3/4 of the outputs to be
          | incorrect.
         | 
         | > tumbling event-time windows
         | 
         | Will this work for the join? For a given account, the last
         | update to each of credits and debits may have arbitrarily
          | disparate event times and transaction ids. It doesn't seem like
          | there is a window you could set that would be guaranteed to
          | connect them?
        
           | BenoitP wrote:
            | > this would still leave the problem where each transaction
            | causes two deletes and two inserts
           | 
           | Indeed! You might be interested in the '6.5 Materialization
           | Controls' section of this paper:
           | 
           | https://arxiv.org/pdf/1905.12133.pdf
           | 
            | They intend to add these extensions:
            | 
            |     SELECT ... EMIT STREAM AFTER DELAY INTERVAL '6' MINUTES;
            |         -- to fix the output multiplication you mention
            |     SELECT ... EMIT STREAM AFTER WATERMARK;
            |         -- this should fix the two deletes and two inserts
           | 
           | I don't know if this is still in active development, though.
           | There has been no news since the paper got published 2 years
           | ago.
           | 
           | > It doesn't seem like there is window you could set that
           | would be guaranteed to connect them?
           | 
            | No, there isn't. But I'm quite confident a transaction-centric
            | approach can be obtained in SQL. Still, your use case is
            | perfect for illustrating the problem.
           | 
           | I'd try something like:
           | 
            |     CREATE VIEW credits AS
            |     SELECT to_account AS account,
            |            sum(amount) AS credits,
            |            last_value(id) AS updating_tx
            |     FROM transactions
            |     GROUP BY to_account;
           | 
           | And then try to join credits and debits together by
           | updating_tx.
           | 
           | No idea on how to check that accounts don't go negative,
           | though.
        
             | jamii wrote:
             | > And then try to join credits and debits together by
             | updating_tx.
             | 
             | You can't join on updating_tx because the credits and
             | debits per account are disjoint sets of transactions - that
             | join will never produce output.
             | 
             | I did try something similar with timestamps -
             | https://github.com/jamii/streaming-
             | consistency/blob/main/fli.... This is also wrong (because
             | the timestamps don't have to match between credits and
             | debits) but it at least produces output. It had a very
             | similar error distribution to the original.
             | 
             | Plus the join is only one of the problems here - the sum in
             | `total` also needs to at minimum process all the balance
             | updates from a single transaction atomically.
        
               | jamii wrote:
               | You could instead put the global max seen id into every
               | row, but then you would have to update all the rows on
                | every transaction. That's not great performance-wise,
                | and it would also massively exacerbate the non-atomic sum
                | problem downstream in `total`.
        
         | rubiquity wrote:
         | > I humbly propose: The CLAP trade-off, with L for Latency.
         | 
         | Daniel Abadi beat you to the punch with PACELC
         | 
         | https://en.wikipedia.org/wiki/PACELC_theorem
        
       | macintux wrote:
       | This is a really interesting look at an aspect of consistency I
       | hadn't previously given a lot of thought to, although it's
       | implicit in a lot of discussions around data modeling with
       | eventual consistency, and I hope it starts a longer conversation.
       | 
       | Eventual consistency in a distributed database is hard enough for
       | application developers to reason about without adding streaming
       | computation concerns.
       | 
       | With Riak 2.0, at Basho we were trying to address that complexity
       | with two different approaches: offering a strong consistency
       | model based on Multi-Paxos, and building CRDTs into Riak to give
       | applications more robust/easier to use primitives.
       | 
       | Unfortunately the company folded before either effort yielded
       | much fruit.
        
         | jamii wrote:
         | Did basho have any internal discussion about how to ensure that
         | operations reading and writing from multiple CRDTs were still
         | confluent? The only work I've seen on this is
         | http://www.neilconway.org/docs/socc2012_bloom_lattices.pdf but
         | that seems like a better fit for a streaming system than for a
         | general database.
        
           | macintux wrote:
           | I honestly don't recall; I was more a kibitzer in that realm.
           | 
           | Chris Meiklejohn with the Lasp project would be a good person
           | to reach out to.
           | 
           | https://github.com/lasp-lang/lasp
        
             | jamii wrote:
             | Oh, I have seen some of the lasp papers. And I guess
             | https://dl.acm.org/doi/abs/10.1145/2578855.2535842 is
             | related too. Some of the stuff coming out of the RISE lab
             | too eg https://rise.cs.berkeley.edu/projects/anna/.
             | 
             | > The only work I've seen on this is
             | 
             | Honestly, my brain in the morning...
        
       | yamrzou wrote:
       | Interesting article. However, I find some claims regarding Flink
       | to be inaccurate:
       | 
       | > Flink is split into two apis - the datastream api is strongly
       | focused on high-temporal-locality problems and so can't express
       | our running example
       | 
       | While the streaming API doesn't have the relational SQL syntax,
       | it should have all the necessary building blocks to do anything
       | that can be done with the table API.
       | 
       | > The file source appears to load the current contents of the
       | file in a single batch and then ignore any future appends, so
       | it's not usable for testing streaming behavior
       | 
        | The file source can be used for testing streaming behavior, for
        | example by using a directory source with a PROCESS_CONTINUOUSLY
       | watch type. See: https://ci.apache.org/projects/flink/flink-docs-
       | stable/dev/d...
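        | 
        | In the DataStream API that would be something like (a sketch; the
        | path and monitoring interval are placeholders):
        | 
        |     import org.apache.flink.api.java.io.TextInputFormat;
        |     import org.apache.flink.core.fs.Path;
        |     import org.apache.flink.streaming.api.datastream.DataStream;
        |     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        |     import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
        | 
        |     public class WatchedFileSource {
        |         public static void main(String[] args) throws Exception {
        |             StreamExecutionEnvironment env =
        |                 StreamExecutionEnvironment.getExecutionEnvironment();
        | 
        |             // Re-scan the directory every second instead of reading it once.
        |             String dir = "/tmp/transactions";
        |             DataStream<String> lines = env.readFile(
        |                 new TextInputFormat(new Path(dir)),
        |                 dir,
        |                 FileProcessingMode.PROCESS_CONTINUOUSLY,
        |                 1000L);
        | 
        |             lines.print();
        |             env.execute("watched-file-source");
        |         }
        |     }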
       | 
       | > Flink also has a 5s trailing watermark, but it doesn't reject
       | any inputs in non-windowed computations.
       | 
        | This is expected, that's how Flink deals with late data by
        | default, so that it doesn't lose any. But dropping late data can be
        | done using the DataStream API, afaik. In windowed computations,
        | allowedLateness(...) can be used to drop late records:
       | https://ci.apache.org/projects/flink/flink-docs-release-1.12....
        | If the computation graph has no windows, as in the article's
        | example, the low-level ProcessFunction can be used to access the
        | watermark via timerService().currentWatermark() and to drop late
        | events. Keep in mind, though, that this comes at the cost of
        | parallelization, as the data stream should be keyed in order to be
        | able to use timers (see for example:
        | https://stackoverflow.com/a/47071833/3398493).
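        | 
        | A rough sketch of such a late-event filter (the class name is mine):
        | 
        |     import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
        |     import org.apache.flink.util.Collector;
        | 
        |     // Forward every element whose event-time timestamp is still at or
        |     // ahead of the current watermark; silently drop the ones behind it.
        |     class DropLateEvents<T> extends KeyedProcessFunction<String, T, T> {
        |         @Override
        |         public void processElement(T value, Context ctx, Collector<T> out) {
        |             Long eventTime = ctx.timestamp();
        |             if (eventTime != null
        |                     && eventTime < ctx.timerService().currentWatermark()) {
        |                 return; // late: the watermark has already passed this timestamp
        |             }
        |             out.collect(value);
        |         }
        |     }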
       | 
       | Also, it seems very odd to me that a watermark of 5s is used
       | while the expected out-of-orderness is 10s. The watermark is
        | usually set in accordance with the expected out-of-orderness,
       | precisely to avoid consistency issues caused by late data. Why
       | did the author choose to do that?
        
         | jamii wrote:
         | > it should have all the necessary building blocks to do
         | anything that can be done with the table API
         | 
         | That's true in that the table api itself is built on top of the
         | datastream api. But to run the example you'd have to implement
         | your own joins and retraction-aware aggregates and then we'd be
         | testing the consistency properties of that implementation, not
         | of the datastream api itself, and we might as well test a
         | mature implementation like the table api instead.
         | 
         | > The file source can be used for testing streaming behavior,
          | for example by using a directory source with a
          | PROCESS_CONTINUOUSLY watch type.
         | 
         | Ah, I was only looking at the table api version
         | (https://ci.apache.org/projects/flink/flink-docs-
         | release-1.12...) which doesn't mention that option. I guess I
          | could make a stream and turn it into a table?
         | 
         | > This is expected, that's how Flink deals with late data...
         | 
         | I should clarify in the article that I was surprised not
         | because it's wrong, but because it's very different from the
         | behavior that I'm used to.
         | 
         | > Also, it seems very odd to me that a watermark of 5s is used
         | while the expected out-of-orderness is 10s. Why did the author
         | choose to do that?
         | 
         | To explore how late data is treated.
         | 
          | There are some plausible sources of inconsistency from late
          | data handling that I didn't get to demonstrate in the article.
          | For example, it sounds like even without allowed_lateness, if
          | two different windowed operations had different window sizes
          | they might drop different subsets of late data, which could
          | cause inconsistency between their outputs.
        
           | jamii wrote:
           | Oh, wait, this doesn't sound promising.
           | 
           | > If the watchType is set to
           | FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is
           | modified, its contents are re-processed entirely. This can
           | break the "exactly-once" semantics, as appending data at the
           | end of a file will lead to all its contents being re-
           | processed.
        
       ___________________________________________________________________
       (page generated 2021-04-18 23:01 UTC)