[HN Gopher] Internal Consistency in Streaming Systems
___________________________________________________________________
Internal Consistency in Streaming Systems
Author : barrkel
Score : 71 points
Date : 2021-04-18 12:20 UTC (10 hours ago)
(HTM) web link (scattered-thoughts.net)
(TXT) w3m dump (scattered-thoughts.net)
| thoughtlede wrote:
| Interesting article. Two comments:
|
| 1. I have always thought of (eventual) consistency to mean
| consistency between replicas: how in-sync the replicas are in
| what they "store". Whereas internal consistency as defined here
| seems to mean how "multiple reads" can lock into the same storage
| state. I believe the two concepts are orthogonal, so comparing
| the two concepts didn't feel natural to me.
|
| 2. If transaction ids are monotonically increasing (an if), isn't
| it possible for subsequent reads to lock into the maximum
| transaction id of the first read? For example:
|       select credits, max(txnid) from table;
|       select debits from table where txnid <= max_txnid;
| jamii wrote:
| > internal consistency as defined here seems to mean how
| "multiple reads" can lock into the same storage state
|
| Sort of. When you make a single read of a single key at the end
| of a streaming topology, that's equivalent to reading all the
| inputs and doing the computation yourself. Internal consistency
| means that every path through the graph back to the inputs
| should be reading the same set of inputs. Things go wrong when
| one path has processed more inputs than another, or when a
| single input produces multiple internal updates and those
| updates aren't processed atomically.
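|
| Concretely, the failure mode looks like two reads over
| different prefixes of the input (a made-up trace, reusing the
| txnid column from your example):
|
|       -- the credits path has processed inputs up to txnid 7:
|       select sum(amount) from transactions
|       where to_account = 1 and txnid <= 7;
|       -- but the debits path has only reached txnid 5:
|       select sum(amount) from transactions
|       where from_account = 1 and txnid <= 5;
|       -- any balance derived from these two sums reflects a
|       -- state of `transactions` that never actually existed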
|
| > isn't it possible for subsequent reads to lock into the
| maximum transaction id of the first read
|
| Your example would still allow credits to have read more inputs
| than debits, but it's on the right track.
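|
| A version that pins both reads to a single cutoff might look
| something like this (a sketch against the transactions table,
| reusing your txnid column):
|
|       with cutoff as (select max(txnid) as t from transactions)
|       select
|         (select sum(amount) from transactions, cutoff
|          where to_account = 1 and txnid <= t) as credits,
|         (select sum(amount) from transactions, cutoff
|          where from_account = 1 and txnid <= t) as debits;
|
| which is roughly the single-query analogue of what watermarks
| coordinate across a whole dataflow graph.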
|
| In differential dataflow, aggregates wait until they see a
| watermark before emitting output, so the sum in `balance` will
| wait until the watermark for e.g. time 7 before emitting the sum
| for time 7. That allows the output from the join to settle.
|
| Of course doing that efficiently is non-trivial...
| KptMarchewa wrote:
| I don't really understand why the author treats the Flink
| datastream API as "high temporal locality" and "can't express
| our running example". Windowing operators provided by Flink are
| just one way of using datastream, and you can easily express any
| operations you want using something like KeyedProcessFunction
| and manually manipulating the state.
| jamii wrote:
| > you can easily express any operations you want
|
| That's true in that the table api itself is built on top of the
| datastream api. But to run the example you'd have to implement
| your own joins and retraction-aware aggregates and then we'd be
| testing the consistency properties of that implementation, not
| of the datastream api itself, and we might as well test a
| mature implementation like the table api instead.
| BenoitP wrote:
| Very well written article. I'll add that there are tricks to
| manage that behavior.
|
| The dataflow is a diamond:
|
|     transactions ----> credits ----\
|               \                     JOIN ----> balance ---> total
|                ----> debits -------/
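|
| Where the views are, roughly (reconstructed from the article's
| running example; exact names may differ, and the real thing
| likely needs outer joins for accounts with only credits or
| only debits):
|
|       create view credits as
|         select to_account as account, sum(amount) as credits
|         from transactions group by to_account;
|       create view debits as
|         select from_account as account, sum(amount) as debits
|         from transactions group by from_account;
|       create view balance as
|         select account, credits - debits as balance
|         from credits join debits using (account);
|       create view total as
|         select sum(balance) as total from balance;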
|
| In Flink the SQL query planner may instantiate two different
| consumer groups on transactions; that topic might be read twice.
| These can progress at different rates, which is partly why some
| inconsistencies could appear.
|
| Now, the trick: converting transactions to a datastream and then
| back to SQL will introduce a materializing barrier, and you will
| benefit from high temporal locality. There might be some
| inconsistencies left though, as the JOIN requires some shuffling
| and that can introduce some processing skew. For example, the
| Netflix account will be the target of many individual accounts;
| the instance responsible for Netflix might be a hot spot and
| process things differently (probably by backpressuring a little,
| making the data arrive in larger micro-batches).
|
| Anyway, when processing financial data you might want to have
| the transaction IDs tag along through the processing, and maybe
| pair them back together in tumbling event-time windows at the
| total. Like he said: 'One thing to note is that this query does
| not window any of its inputs, putting it firmly on the low
| temporal locality side of the map where consistency is more
| difficult to maintain.' Also, windowing would have introduced
| some end-to-end latency.
|
| This makes me think: do streaming systems introduce a new letter
| into the CAP trade-off?
|
| I humbly propose: the CLAP trade-off, with L for Latency.
|
| [1] https://issues.apache.org/jira/browse/FLINK-15775?filter=-2
| kcolford wrote:
| Latency has always been part of the CAP theorem. There is a
| strictly stronger generalization of it that says in a
| consistent system the latency of a transaction is bounded by
| the latency of the links in the network. In the case of a
| partition the network latency is infinite, so too is the
| transaction latency, thus no availability.
| jamii wrote:
| > converting transactions to datastream then back to SQL will
| introduce a materializing barrier
|
| It seems that this would still leave the problem where each
| transaction causes two deletes and two inserts to `balance`
| and then `total` sums those one by one? For a single transaction
| that means four successive outputs from `total`, of which only
| the last is correct. These errors are very transient but they
| still cause at least 3/4 of the outputs to be incorrect.
|
| > tumbling event-time windows
|
| Will this work for the join? For a given account, the last
| update to each of credits and debits may have arbitrarily
| disparate event times and transaction ids. It doesn't seem like
| there is a window you could set that would be guaranteed to
| connect them?
| BenoitP wrote:
| > this would still leave the problem where each transactions
| causes two deletes and two inserts
|
| Indeed! You might be interested in the '6.5 Materialization
| Controls' section of this paper:
|
| https://arxiv.org/pdf/1905.12133.pdf
|
| They intend to add these extensions:
|
|       SELECT ... EMIT STREAM AFTER DELAY INTERVAL '6' MINUTES;
|       -- to fix the output multiplication you mention
|
|       SELECT ... EMIT STREAM AFTER WATERMARK;
|       -- this should fix the two deletes and two inserts
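|
| Applied to the `total` in this thread, it might look something
| like this (syntax from the paper, not implemented in Flink;
| that it extends to unwindowed aggregates is my assumption):
|
|       SELECT sum(balance) AS total
|       FROM balance
|       EMIT STREAM AFTER WATERMARK;
|       -- emit a new total only once the watermark passes, so
|       -- all four balance changes from one transaction are
|       -- folded in before anything is output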
|
| I don't know if this is still in active development, though.
| There has been no news since the paper got published 2 years
| ago.
|
| > It doesn't seem like there is window you could set that
| would be guaranteed to connect them?
|
| No there isn't. But I'm quite confident a transaction-centric
| approach can be obtained in SQL. Your use case is perfect for
| illustrating the problem, still.
|
| I'd try something like:
|
|       CREATE VIEW credits AS
|       SELECT to_account AS account,
|              sum(amount) AS credits,
|              last_value(id) AS updating_tx
|       FROM transactions
|       GROUP BY to_account;
|
| And then try to join credits and debits together by
| updating_tx.
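|
| With a symmetric debits view, something like (a sketch):
|
|       CREATE VIEW debits AS
|       SELECT from_account AS account,
|              sum(amount) AS debits,
|              last_value(id) AS updating_tx
|       FROM transactions
|       GROUP BY from_account;
|
|       SELECT account, credits - debits AS balance
|       FROM credits JOIN debits
|       USING (account, updating_tx);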
|
| No idea on how to check that accounts don't go negative,
| though.
| jamii wrote:
| > And then try to join credits and debits together by
| updating_tx.
|
| You can't join on updating_tx because the credits and
| debits per account are disjoint sets of transactions - that
| join will never produce output.
|
| I did try something similar with timestamps -
| https://github.com/jamii/streaming-consistency/blob/main/fli...
| This is also wrong (because the timestamps don't have to match
| between credits and debits) but it at least produces output. It
| had a very similar error distribution to the original.
|
| Plus the join is only one of the problems here - the sum in
| `total` also needs to at minimum process all the balance
| updates from a single transaction atomically.
| jamii wrote:
| You could instead put the global max seen id into every
| row, but then you would have to update all the rows on
| every transaction. Which is not great performance-wise,
| and would also massively exacerbate the non-atomic sum
| problem downstream in total.
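|
| i.e. something like (a sketch):
|
|       CREATE VIEW credits AS
|       SELECT to_account AS account,
|              sum(amount) AS credits,
|              (SELECT max(id) FROM transactions) AS as_of
|       FROM transactions
|       GROUP BY to_account;
|       -- every new transaction changes max(id), so every row
|       -- of credits is retracted and re-emitted on each input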
| rubiquity wrote:
| > I humbly propose: The CLAP trade-off, with L for Latency.
|
| Daniel Abadi beat you to the punch with PACELC
|
| https://en.wikipedia.org/wiki/PACELC_theorem
| macintux wrote:
| This is a really interesting look at an aspect of consistency I
| hadn't previously given a lot of thought to, although it's
| implicit in a lot of discussions around data modeling with
| eventual consistency, and I hope it starts a longer conversation.
|
| Eventual consistency in a distributed database is hard enough for
| application developers to reason about without adding streaming
| computation concerns.
|
| With Riak 2.0, at Basho we were trying to address that complexity
| with two different approaches: offering a strong consistency
| model based on Multi-Paxos, and building CRDTs into Riak to give
| applications more robust, easier-to-use primitives.
|
| Unfortunately the company folded before either effort yielded
| much fruit.
| jamii wrote:
| Did basho have any internal discussion about how to ensure that
| operations reading and writing from multiple CRDTs were still
| confluent? The only work I've seen on this is
| http://www.neilconway.org/docs/socc2012_bloom_lattices.pdf but
| that seems like a better fit for a streaming system than for a
| general database.
| macintux wrote:
| I honestly don't recall; I was more a kibitzer in that realm.
|
| Chris Meiklejohn with the Lasp project would be a good person
| to reach out to.
|
| https://github.com/lasp-lang/lasp
| jamii wrote:
| Oh, I have seen some of the lasp papers. And I guess
| https://dl.acm.org/doi/abs/10.1145/2578855.2535842 is
| related too. Some of the stuff coming out of the RISE lab
| too eg https://rise.cs.berkeley.edu/projects/anna/.
|
| > The only work I've seen on this is
|
| Honestly, my brain in the morning...
| yamrzou wrote:
| Interesting article. However, I find some claims regarding Flink
| to be inaccurate:
|
| > Flink is split into two apis - the datastream api is strongly
| focused on high-temporal-locality problems and so can't express
| our running example
|
| While the streaming API doesn't have the relational SQL syntax,
| it should have all the necessary building blocks to do anything
| that can be done with the table API.
|
| > The file source appears to load the current contents of the
| file in a single batch and then ignore any future appends, so
| it's not usable for testing streaming behavior
|
| The file source can be used for testing streaming behavior, for
| example by watching a directory with the PROCESS_CONTINUOUSLY
| watch type. See:
| https://ci.apache.org/projects/flink/flink-docs-stable/dev/d...
|
| > Flink also has a 5s trailing watermark, but it doesn't reject
| any inputs in non-windowed computations.
|
| This is expected; that's how Flink deals with late data by
| default, so that it doesn't lose any. But dropping can be done
| using the Datastream API, afaik. In windowed computations,
| allowedLateness(...) controls how long late records are still
| accepted before being dropped:
| https://ci.apache.org/projects/flink/flink-docs-release-1.12...
| If the computation graph has no windows, such as in the article's
| example, the low-level ProcessFunction can be used to access the
| watermark via timerService.currentWatermark() and drop late
| events. Keep in mind, though, that this comes at the cost of
| parallelization, as the data stream has to be keyed in order to
| use timers (see for example:
| https://stackoverflow.com/a/47071833/3398493).
|
| Also, it seems very odd to me that a watermark of 5s is used
| while the expected out-of-orderness is 10s. The watermark is
| usually set in accordance with the expected out-of-orderness,
| precisely to avoid consistency issues caused by late data. Why
| did the author choose to do that?
| jamii wrote:
| > it should have all the necessary building blocks to do
| anything that can be done with the table API
|
| That's true in that the table api itself is built on top of the
| datastream api. But to run the example you'd have to implement
| your own joins and retraction-aware aggregates and then we'd be
| testing the consistency properties of that implementation, not
| of the datastream api itself, and we might as well test a
| mature implementation like the table api instead.
|
| > The file source can be used for testing streaming behavior,
| for example by watching a directory with the
| PROCESS_CONTINUOUSLY watch type.
|
| Ah, I was only looking at the table api version
| (https://ci.apache.org/projects/flink/flink-docs-release-1.12...)
| which doesn't mention that option. I guess I could make a stream
| and turn it into a table?
|
| > This is expected, that's how Flink deals with late data...
|
| I should clarify in the article that I was surprised not
| because it's wrong, but because it's very different from the
| behavior that I'm used to.
|
| > Also, it seems very odd to me that a watermark of 5s is used
| while the expected out-of-orderness is 10s. Why did the author
| choose to do that?
|
| To explore how late data is treated.
|
| There are some plausible sources of inconsistency from late
| data handling that I didn't get to demonstrate in the article.
| For example, it sounds like even without allowed_lateness, if
| two different windowed operations had different window sizes
| they might drop different subsets of late data, which could
| cause inconsistency between their outputs.
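|
| e.g. (Flink SQL tumbling-window syntax; ts is assumed to be an
| event-time attribute, and the queries are made up for
| illustration):
|
|       SELECT account, sum(amount)
|       FROM transactions
|       GROUP BY account, TUMBLE(ts, INTERVAL '5' SECOND);
|
|       SELECT account, sum(amount)
|       FROM transactions
|       GROUP BY account, TUMBLE(ts, INTERVAL '10' SECOND);
|
|       -- an event with ts = 3s arriving when the watermark is
|       -- at 8s is late for the already-closed [0s,5s) window
|       -- but still lands in the open [0s,10s) window, so the
|       -- two outputs disagree about that event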
| jamii wrote:
| Oh, wait, this doesn't sound promising.
|
| > If the watchType is set to
| FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is
| modified, its contents are re-processed entirely. This can
| break the "exactly-once" semantics, as appending data at the
| end of a file will lead to all its contents being re-
| processed.
___________________________________________________________________
(page generated 2021-04-18 23:01 UTC)