117x Filetype PDF File size 1.37 MB Source: assets.confluent.io
Consistency and Completeness: Rethinking Distributed Stream Processing in Apache Kafka GuozhangWang Lei Chen AyusmanDikshit guozhang@confluent.io lchen576@bloomberg.net adikshit@expediagroup.com Confluent Inc., USA Bloomberg, USA Expedia Group, India Jason Gustafson BoyangChen Matthias J. Sax jason@confluent.io boyang@confluent.io matthias@confluent.io Confluent Inc., USA Confluent Inc., USA Confluent Inc., USA John Roesler Sophie Blee-Goldman BrunoCadonna john@confluent.io sophie@confluent.io bruno@confluent.io Confluent Inc., USA Confluent Inc., USA Confluent Inc., USA ApurvaMehta Varun Madan Jun Rao apurva@confluent.io vmadan@confluent.io jun@confluent.io Confluent Inc., USA Confluent Inc., USA Confluent Inc., USA ABSTRACT KEYWORDS An increasingly important system requirement for distributed Stream Processing, Semantics stream processing applications is to provide strong correctness ACMReferenceFormat: guarantees under unexpected failures and out-of-order data so that Guozhang Wang, Lei Chen, Ayusman Dikshit, Jason Gustafson, Boyang its results can be authoritative (not needing complementary batch Chen, Matthias J. Sax, John Roesler, Sophie Blee-Goldman, Bruno Cadonna, results). Although existing systems have put a lot of effort into Apurva Mehta, Varun Madan, and Jun Rao. 2021. Consistency and Com- addressing some specific issues, such as consistency and complete- pleteness: Rethinking Distributed Stream Processing in Apache Kafka. In ness, how to enable users to make flexible and transparent trade-off Proceedings of the 2021 International Conference on Management of Data decisions among correctness, performance, and cost still remains (SIGMOD’21), June 20ś25, 2021, Virtual Event, China. ACM, New York, NY, a practical challenge. Specifically, similar mechanisms are usually USA,12pages.https://doi.org/10.1145/3448016.3457556 applied to tackle both consistency and completeness, which can 1 INTRODUCTION result in unnecessary performance penalties. WepresentApacheKafka’scoredesignforstreamprocessing, Stream processing is gaining tremendous attention in the indus- which relies on its persistent log architecture as the storage and try as a new programming paradigm to implement real-time data- inter-processor communication layers to achieve correctness guar- driven applications. This paradigm is more collaboration friendly antees. Kafka Streams, a scalable stream processing client library in to modern organizations that are composed of vertically separated ApacheKafka, defines the processing logic as read-process-write engineering teams responsible for loosely coupled sub-systems. cycles in which all processing state updates and result outputs Comparedwithtraditional data-driven applications that centralize are captured as log appends. Idempotent and transactional write their state in a shared database management system, the asyn- protocols are utilized to guarantee exactly-once semantics. Fur- chronous nature of stream processing allows teams to build their thermore, revision-based speculative processing is employed to sub-systems as event-driven applications that communicate with emit results as soon as possible while handling out-of-order data. each other via event-message passing. The sub-systems react to Wealsodemonstrate howKafkaStreamsbehavesinpractice with those messages with local application state updates without deeply large-scale deployments and performance insights exhibiting its coupled interactions that usually need to be coordinated and syn- flexible and low-overhead trade-offs. chronized. Oneofthebiggestchallengesforstreamingsystemsistoprovide CCSCONCEPTS correctness guarantees for data processing in a distributed envi- · Information systems → Stream management. ronment [6, 11, 20]. Although data stream management systems (DSMS) have been an active research topic in the database com- Permission to make digital or hard copies of part or all of this work for personal or munity for many years, early efforts were focused on producing classroom use is granted without fee provided that copies are not made or distributed real-time, perhaps approximate and lossy, results with high scala- for profit or commercial advantage and that copies bear this notice and the full citation bility and low latency. As a result, until recently stream processing onthefirstpage.Copyrightsforthird-partycomponentsofthisworkmustbehonored. This work is licensed under a Creative Commons Attribution-ShareAlike International For all other uses, contact the owner/author(s). has still been considered as an auxiliary architecture in addition to 4.0 License. SIGMOD’21,June20ś25,2021, Virtual Event, China the more reliable, periodic batch processing jobs [8]. SIGMOD ’21, June 20–25, 2021, Virtual Event, China. ©2021Copyrightheldbytheowner/author(s). Modernstreamingengines are designed to be authoritative and © 2021 Copyright held by the owner/author(s). ACMISBN978-1-4503-8343-1/21/06. ACM ISBN 978-1-4503-8343-1/21/06. https://doi.org/10.1145/3448016.3457556 are no longer treated as an approximation of the truth generated by https://doi.org/10.1145/3448016.3457556 batch processing systems. This is accomplished by offering strong correctness guarantees along with fault-tolerance. In these engines, however, conventional stream processing wisdom still holds that users will have to make trade-off decisions among performance, correctness, and cost since they are not all achievable at the same time [3]. For example, if users choose to optimize for correctness, they can specify the stream processing paradigm as batch process- ing (chunking continuous data streams into discrete finite data sets andprocessthemasmicro-batches)[19];iftheychoosetooptimize for performance,theymayuseanin-memorystreamingframework Figure 1: Examples for Streaming Consistency and Com- that generates approximate results and may lose data in the face of pleteness Challenges failures [15, 39]. How to achieve correct streaming results while not sacrificing too much on performance remains a common concern for stream processing developers. wedesigned Kafka to store all continuous data streams as repli- In this paper, we identify two primary properties that contribute cated append-only logs. This allows us to simplify the streaming to stream processing correctness guarantees, namely consistency consistency and completeness challenges by using ordered trans- in face of failures and completeness with out-of-order data. Con- actional log appends and replays. Apache Kafka contains a stream sistency is a guarantee that a stream processing application can processing library, Kafka Streams, introduced in version 0.10 (May recover from failures to a consistent state such that final results 2016) to help users build real-time applications with streaming data will not contain duplicates or lose any data. In other words, the stored in Kafka. It decouples these two challenges and tackles them streamprocessingapplication delivers results as if the records were with separate approaches: idempotent and transactional writes processed exactly once without any failures. Completeness is a for consistency, and speculative processing with revision for com- guarantee that a stream processing application does not generate pleteness. Kafka Streams maintains both processing state updates incomplete partial outputs as final results even when input stream andintermediate shuffled streams as persistent logs in Kafka, and records may arrive out of order. Completeness requires the applica- hence does not require state checkpointing and delayed emitting tion to be able to measure processing progress in time and estimate of output results. As shown in some large-scale deployments of howcompletetheemittedoutputresults are corresponding to its Kafka Streams in production, by configuring different values for input streams. transaction committing intervals and per-operator lateness toler- Figure 1 showcases these two correctness challenges in stream ance periods, developers using Kafka Streams have more flexible processing. It depicts a simple stateful processor with a single input trade-offs between performance and correctness. and a single output stream. Processing state is maintained in a To summarize, in this paper we present that by tackling the store and accessed by this processor for reads and writes. The consistency and completeness properties separately with a small input stream contains only three records with timestamps 11, 13, tributetocost,KafkaStreamscanhelpstreamprocessingdevelopers and12(Figure 1.a). Suppose that after processing the record with to achieve both correctness and performance in their applications. timestamp 11 and updating the state but before the processing is Its design is underpinned by the following technical contributions: acknowledged on the input stream (denoted by the dotted bar), a • We discuss the two correctness challenges in stream pro- failure occurs on the processor (Figure 1.b). Upon recovery, the cessing, namely consistency in the face of failures and com- processor would reprocess the record with timestamp 11 from the pleteness with out-of-order data. We survey state of the inputstreamandhencedoubleupdatethestate(Figure1.c),causing art solutions for these challenges and their implicit latency inconsistent results. Furthermore, suppose that after the first and trade-offs. second records are processed and the results for times 11 and 13 • We introduce the Kafka Streams client library of Apache are emitted respectively, an out-of-order record with an earlier Kafka, which leverages Kafka’s core architecture as a per- timestamp 12 is received, indicating that previously emitted results sistent, immutable commit log to support stream processing are actually not complete up to 12 (Figure 1.d). capabilities. It tackles streaming consistency by transform- Today, a common technique employed in production streaming ingallcomputationalresultsaslogappendswithidempotent systemstoachievethesetwopropertiesisstatecheckpointingalong andtransactional protocols, and handles out-of-order data withoutputdelays.Whileeffectiveinprovidingcorrectresults,this with a fine-grained speculative approach. mechanismwouldrequiresusers to make hard trade-off decisions • WedemonstratehowKafkaStreamshasbeenusedinlarge- onend-to-end streaming latencies. scale streaming application deployments in production en- ApacheKafka[1]is an open-source distributed event streaming vironments that rely heavily on the correctness guarantees, platform that addresses these correctness challenges in a different andwedescribetheir performance metrics and insights. mannerbyintegrating stream processing with persistent logging. Thekeyideacomesfromourexperiencedevelopingandoperating Theremainder of this paper is organized as follows: Section 2 streamprocessingapplicationssuchthat,bypayingamodestcostto identifies two key challenges to support distributed stream process- persiststreamingdata,wecanimplementmoreflexiblemechanisms ing correctness guarantees. Section 3 gives an overview of the core aiming for both correctness and performance. More specifically, design principles of Apache Kafka and its streaming library, Kafka Streams. Section 4 describes how Kafka Streams implements its exactly-once semantics via idempotent and transactional log write andprocessing the same data streams, producing duplicate outputs. protocols to support consistency with failure recovery. Section 5 Wecall this the problem of zombie instances. illustrates the revision-based speculative processing mechanism Theinter-processor RPC can fail: In a distributed environment, employedbyKafkaStreamstohandleout-of-orderdataandreason streamprocessorscommunicatewitheachotherandpropagatepro- about streaming completeness. Finally, Section 6 presents exist- cessing results through message passing. The durability of those ing large-scale deployments of Kafka Streams and discusses their sent messages usually depends on the sender receiving an acknowl- performance insights related to its consistency configurations. Sec- edgement (ack) from the receiver. The failure to receive that ack tion 7 summarizes related work, followed by our conclusions and does not necessarily mean that the message did not reach the other future work in Section 8. side: for example, it is possible that the receiving processor success- 2 STREAMINGCORRECTNESSCHALLENGES fully gets the message, processes its associated records and updates 2.1 Fault Tolerance and Consistency its states, but fails to send an ack back to the sender. It is also possi- ble that the receiving processor does not fail at all but a jitter in the Consistency has for long been an open research issue in stream network delays the ack back to the sender, exceeding the sender’s processing partially due to the lack of a formal specification of the ack timeout. Since the sender cannot know the exact reason for the problem itself. In this paper, we primarily focus on consistency lack of acknowledgements, in practice it is forced to assume the guarantees in the face of a failure. Most streaming systems should message was not received and processed successfully and hence beabletoproducecorrectresultsduringfailure-freeexecutions,but needs to retry. In this case, the same record may be sent multiple completely masking a failure is quite hard. In distributed systems, times between processors, causing the record to be processed more fault tolerance is the capability to continue system operations in than once in the streaming system. spiteoffailures.Additionally,thedeliveredserviceshouldbeasifno Today, most distributed streaming systems rely on checkpoint- failures ever happened. Whenitcomestostreamprocessing,people ing mechanisms to tolerate failures: during normal execution, the usually refer to this guarantee as exactly-once semantics. In recent system periodically checkpoints its processing state, which can years,thistermhaslentitselftoseveraldifferentinterpretationsand then act as snapshots to which the system falls back in case of a for clarity of presentation we define exactly-once as the following: failure. To align multiple state checkpoints throughout the process- foreachrecordfromtheinputdatastream,itsprocessingresultswill ing pipeline, these systems usually inject punctuations into data be reflected exactly once even under failures. Results are reflected streamsassynchronizationbarriers[31].Asaresult,thecompletion in two ways: result records in the output data streams, but also of the checkpoints is not only determined by the amount of data internal state updates in stateful stream processing operators. to checkpoint, but also on the speed at which punctuations flow Astreaming system must be tolerant to a number of failure through the application. If there is backpressure in data process- scenarios which may even occur at the same time in practice: ing, checkpoint efficiency would also be impacted. In addition, the Thestorageenginecanfail: If a stream processing system does checkpointing mechanism alone is not sufficient to accomplish the notguaranteethatallofitsrunningstateispersistedÐe.g.ifitstores desired semantics: since processing results may have already been all state in main memory or if it asynchronously flushes the state to emitted before a failure, after state is rolled back to a prior check- persistentstorageenginesÐthenuponfailuressomeorallofitsstate point, re-processing the input records may cause duplicated results maybelost. In this case, the stream processing system would have in the output streams. In the literature, this is usually termed as the to redo computations from the beginning, hoping the processed outputcommitproblem[34].Thereareseveralknownsolutionstore- inputdatastreamshavenotvanishedyet.Duringtherecomputation solvethisproblem,suchasassigninguniqueidsthatcontainlineage period, new streaming data would not be processed, reducing the information of streaming records, or keeping track of the received system’s availability. What is worse, if the recomputation cannot records’ unique logical timestamps for de-duplication [12, 27, 32]. generate exactly the same state as before the failure due to reasons However, because the inter-operator communication channels are suchasanapproximaterestartpointornondeterministiclogic,then usually memoryless, such de-duplication approaches have to de- queries before and after the failure may return inconsistent results. pendonthedownstreamoperatorsthemselvestodetectanddiscard The stream processor can fail: When a stream processor fails duplicated inputs based on locally maintained lineage information. over to a new host, it must be able to recover exactly the same state 2.2 Out-of-order Handling and Completeness andresumeprocessingattheexactpointwhereitleftoff.However, the processor may crash after successfully processing a record and Streaming systems receive and process input data streams continu- persisting a state update but before acknowledging the reception of ously in a certain order to provide semantically correct results. The that record. Hence, when it resumes from a failure event, the same order of the input records typically represents the logical prece- record could be received and processed again, causing not only dence when the input stream records are generated, as indicated duplicated outgoing records but also incorrect state with double by their timestamps. However, in practice, the order of data stream updates. Even worse, when a processor temporarily loses connec- records may be disturbed, where some records may appear in a tivity to others in a distributed environment, it may be deemed datastreamwithsmallertimestampsafterotherrecordswithlarger as failed permanently and a new instance may be started up to timestamps. The most common external factor that results in out- replace it while the disconnected processor continues to work on of-order data streams are clock skewness and network delays. More- its own. In this case, we can have duplicated processors fetching over, stream processors that read multiple input streams and merge 1 builder . stream (" pageview−events ") the log. Kafka records within a partition log are fetched in the 2 . f i l t e r ( ( key , view ) −> view. period >= 30000) sameordertheywereappended,calledtheoffset order. However, 3 . map(( key , view ) −> new KeyValue(view . category , view )) as mentioned in Section 2.2, in practice this offset order does not 4 . groupByKey () 5 . windowedBy(TimeWindows. of (5000)) necessarily reflect the logical order of the streaming events those 6 . count () records represent. To tackle this issue, each record also has an 7 . toStream ( ) . to (" pageview−windowed−counts ") embeddedtimestamp field. Producers can set the record timestamp Figure 2: Example in Kafka Streams DSL to represent event time. Processing systems may use the timestamp field to properly handle out-of-order data, in which records that are them into a single output stream might also shuffle the order in appended later actually have smaller timestamps than some others whichrecords are transmitted and introduce artificial disorder. that are appended earlier in the log [33]. If no out-of-order data exists in the input data streams, guaran- teeing complete final results is straightforward: whenever an input 3.2 StreamsDSLandOperatorTopology record with timestamp t + 1 is received, we can infer that all the KafkaStreamsisaJavalibraryincludedinApacheKafkathatallows events up to timestamp t have been completed and their results users to build real-time stateful stream processing applications. reflected in the emitted outputs. However, if out-of-order data does The library contains a high-level DSL for users to specify their exist, a streaming processor may emit partial results from incom- streaming logic that continuously reads data streams from source plete input streams. Reasoning about completeness and managing Kafka topics with consumer clients, transforms the input streams disorder are fundamental considerations for streaming systems. into new streams and evolving tables, and finally pipes the result Checkpoint-based approaches tend to block emitting results un- streams back to sink Kafka topics. It is also used as the underlying til the system is confident that all events up to a certain point in parallel runtime of ksqlDB [24], an event streaming database built time are complete. To make this determination, they either rely to work with streaming data in Apache Kafka. ksqlDB takes input on external signals or internal indicators injected into the input data streams stored in Kafka and applies continuous queries that streams. Similarly, micro-batching techniques [36, 40] break down derive new data streams or materialized views such as continuous unbounded streams into batches of bounded data in which each aggregates over windows. Those continuous queries submitted to batch represents a window of the unbounded stream. When the ksqlDB are compiled and executed as Kafka Streams applications batch of input records are considered as complete, its processing that run indefinitely until terminated. will then be triggered synchronously and the updated states be Figure 2 illustrates an example application written in the Kafka committed to external storage. Streams DSL. The application first reads in an event stream from 3 STREAMPROCESSINGINAPACHEKAFKA Kafka topic pageview-events, filters out those pageview events Apache Kafka has been used in many of the largest companies whoseperiod is less than 30 seconds, and then creates a windowed across industries as the backbone for data pipelines, streaming count of the number of pageviews per category every 5 seconds. analytics, data integration, and mission-critical applications. Data Thewindowedcountisconsideredanevolvingtable whose aggre- streams stored in Kafka are organized in topics, and each topic can gate results are continuously updated and new update records are be divided into one or more partitions. Each partition is maintained appended as a changelog stream. The changelog stream is written as an immutable sequence of records, i.e. a log. Partitions can be back to another Kafka topic pageview-windowed-counts. continuously appended by producer clients and continuously read Kafka Streams translates the application logic in Figure 2 to byconsumerclients [25]. a topology composed of connected data transformation operators. A topology is sub-divided into sub-topologies where each sub- 3.1 Partitions and Timestamped Records topology consists of consecutive operators between which no data shuffling through network is required. For example, the filter Thepartitioning mechanism allows for the horizontal scalability andthemapoperatorintheexampleapplicationbelongtothesame of Kafka where log partitions are hosted on different Kafka bro- sub-topology. No data shuffling is required between them since kers. The actual processing of records is done by the consumer the filter only removes records from the data stream and does not clients. Consumer clients can subscribe to one or more topic par- change the partitioning key of the records. In contrast, the map titions and read the records from those partition logs in append andthecountaggregationoperator do not belong to the same sub- order. In addition, multiple consumer clients with a common sub- topology. This is because map may change the partitioning key of scription can form a consumer group. Kafka assigns the subscribed the records and the key-based count requires all records with the topic partitions to the members of the consumer group such that samekeytobecontainedinonepartition, therefore data shuffling each topic partition is consumed by a single member at any time. based on the new key is required between the operators. Operators Kafka consumer groups handle task assignment, rebalancing due within a sub-topology are effectively fused together as upstream to membershipchanges,anddurableprogresstracking,providinga operators can directly pass data to down stream operators within solid distributed systems foundation upon which stream processing the sub-topology without incurring any network overhead [20]. applications may be built. Whentheprocessing logic requires reshuffling input streams Records stored in the topic partitions are key-value pairs, and for data localities, such as the key-based count operator in our each record has an assigned incremental offset when appended example, Kafka Streams routes the data through a repartition topic. to the partition log to uniquely identify the record’s position in Upstream sub-topologies produce to the repartition topic as a sink
no reviews yet
Please Login to review.