Cumulatieve stateful transformatie in Apache Spark-streaming



Deze blogpost bespreekt stateful transformaties in Spark Streaming. Leer alles over cumulatieve tracking en bijscholing voor een Hadoop Spark-carrière.

Bijgedragen door Prithviraj Bose

In mijn vorige blog heb ik stateful transformaties besproken met behulp van het vensterconcept van Apache Spark Streaming. U kunt het lezen hier .





In dit bericht ga ik cumulatieve stateful bewerkingen in Apache Spark Streaming bespreken. Als je nieuw bent bij Spark Streaming, raad ik je ten zeerste aan om mijn vorige blog te lezen om te begrijpen hoe windowing werkt.

Typen stateful transformatie in Spark Streaming (vervolg ...)

> Cumulatieve tracking

We hadden de reduceByKeyAndWindow (…) API om de status van sleutels bij te houden, maar vensters vormen beperkingen voor bepaalde gebruiksscenario's. Wat als we de toestanden van de sleutels overal willen verzamelen in plaats van deze te beperken tot een tijdvenster? In dat geval zouden we moeten gebruiken updateStateByKey (…) BRAND.



Deze API is geïntroduceerd in Spark 1.3.0 en is erg populair. Deze API heeft echter enige prestatieoverhead, de prestaties nemen af ​​naarmate de grootte van de toestanden in de loop van de tijd toeneemt. Ik heb een voorbeeld geschreven om het gebruik van deze API te laten zien. Je kunt de code vinden hier .

Spark 1.6.0 heeft een nieuwe API geïntroduceerd mapWithState (…) waarmee de overheadkosten van de prestaties worden opgelost updateStateByKey (…) . In deze blog ga ik deze specifieke API bespreken met behulp van een voorbeeldprogramma dat ik heb geschreven. Je kunt de code vinden hier .

Laten we, voordat ik in een codewandeling duik, een paar woorden over checkpointing sparen. Voor elke stateful transformatie is checkpointing verplicht. Checkpointing is een mechanisme om de staat van de sleutels te herstellen voor het geval het stuurprogramma mislukt. Wanneer de driver opnieuw opstart, wordt de staat van de sleutels hersteld vanuit de checkpointing-bestanden. Checkpoint-locaties zijn meestal HDFS of Amazon S3 of een betrouwbare opslag. Tijdens het testen van de code kan men deze ook opslaan in het lokale bestandssysteem.



In het voorbeeldprogramma luisteren we naar socket-tekststroom op host = localhost en poort = 9999. Het tokeniseert de inkomende stroom in (woorden, aantal exemplaren) en volgt het aantal woorden met behulp van de 1.6.0 API mapWithState (…) . Bovendien worden sleutels zonder updates verwijderd met StateSpec.time-out API. We checken in HDFS en de controlefrequentie is elke 20 seconden.

Laten we eerst een Spark Streaming-sessie maken,

Spark-streaming-session

We creëren een checkpointDir in de HDFS en roep vervolgens de objectmethode aan getOrCreate (...) . De getOrCreate API controleert het checkpointDir om te zien of er eerdere statussen zijn om te herstellen (als die bestaan), wordt de Spark Streaming-sessie opnieuw gemaakt en worden de statussen van de sleutels bijgewerkt op basis van de gegevens die zijn opgeslagen in de bestanden voordat verder wordt gegaan met nieuwe gegevens. Anders wordt er een nieuwe Spark Streaming-sessie gemaakt.

De getOrCreate neemt de naam van de checkpoint-directory en een functie (die we createFunc ) wiens handtekening zou moeten zijn () => StreamingContext .

kun je een reeks objecten maken in java

Laten we de code erin bekijken createFunc .

Regel 2: We creëren een streamingcontext met taaknaam naar 'TestMapWithStateJob' en batch-interval = 5 seconden.

Regel # 5: Stel de checkpoint-directory in.

Regel 8: Stel de staatsspecificatie in met behulp van de klasse org.apache.streaming.StateSpec voorwerp. We stellen eerst de functie in die de status zal volgen, daarna stellen we het aantal partities in voor de resulterende DStreams die moeten worden gegenereerd tijdens daaropvolgende transformaties. Ten slotte stellen we de time-out in (op 30 seconden) waarbij als een update voor een sleutel niet binnen 30 seconden wordt ontvangen, de sleutelstatus wordt verwijderd.

Regel 12 #: Stel de socketstroom in, maak de inkomende batchgegevens plat, maak een sleutel / waarde-paar, bel mapWithState , stel het interval voor controlepunten in op 20 seconden en druk ten slotte de resultaten af.

Het Spark-framework noemt th e createFunc voor elke sleutel met de vorige waarde en de huidige status. We berekenen de som en werken de staat bij met de cumulatieve som en uiteindelijk retourneren we de som voor de sleutel.

dubbel converteren naar int java

Github-bronnen -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Heeft u een vraag voor ons? Vermeld het in het opmerkingengedeelte en we nemen contact met u op.

Gerelateerde berichten:

Ga aan de slag met Apache Spark & ​​Scala

Stateful Transformations met Windowing in Spark Streaming