RDD met Spark: het bouwblok van Apache Spark



Deze blog over RDD met behulp van Spark geeft je een gedetailleerde en uitgebreide kennis van RDD, de fundamentele eenheid van Spark en hoe nuttig het is.

, Het woord zelf is voldoende om een ​​vonk in de geest van elke Hadoop-ingenieur te genereren. NAAR n in het geheugen verwerkingstool wat razendsnel is in clustercomputing. In vergelijking met MapReduce maakt het delen van gegevens in het geheugen RDD's 10-100x sneller dan netwerk- en schijfdeling en dit alles is mogelijk dankzij RDD's (Resilient Distributed Data sets). De belangrijkste punten waar we ons vandaag in dit RDD-gebruik van Spark-artikel op richten zijn:

RDD's nodig?

Waarom hebben we RDD nodig? -RDD met Spark





De wereld evolueert mee en Data Science vanwege de vooruitgang in . Algoritmen gebaseerd op Regressie , , en die doorloopt Verdeeld Iteratieve Comput ation mode die het hergebruiken en delen van gegevens tussen meerdere computereenheden omvat.

De traditionele technieken hadden een stabiele tussenliggende en gedistribueerde opslag nodig, zoals HDFS bestaande uit repetitieve berekeningen met datareplicaties en dataserialisatie, waardoor het proces een stuk langzamer ging. Een oplossing vinden was nooit gemakkelijk.



Dit is waar RDD's (Resilient Distributed Datasets) komt naar het grote plaatje.

RDD s zijn gemakkelijk te gebruiken en moeiteloos te maken omdat gegevens worden geïmporteerd uit gegevensbronnen en in RDD's worden neergezet. Verder worden de bewerkingen toegepast om ze te verwerken. Ze zijn een gedistribueerde geheugenverzameling met machtigingen als Alleen lezen en vooral: ze zijn Fouttolerant .



Eventueel gegevenspartitie van de RDD is verloren , kan het worden geregenereerd door hetzelfde toe te passen transformatie bewerking op die verloren partitie in afstamming , in plaats van alle gegevens helemaal opnieuw te verwerken. Dit soort benadering in realtime scenario's kan wonderen doen gebeuren in situaties van gegevensverlies of wanneer een systeem uitvalt.

Wat zijn RDD's?

RDD of ( Veerkrachtige gedistribueerde dataset ) is een fundamentele data structuur in Spark. De voorwaarde Veerkrachtig definieert de mogelijkheid die de gegevens automatisch of gegevens genereert terugrollen naar de originele staat wanneer zich een onverwachte calamiteit voordoet met kans op gegevensverlies.

De gegevens die in RDD's worden geschreven, zijn gepartitioneerd en opgeslagen in meerdere uitvoerbare knooppunten . Als een uitvoerend knooppunt mislukt in de looptijd, dan krijgt het onmiddellijk de back-up van de volgende uitvoerbare knoop . Dit is de reden waarom RDD's worden beschouwd als een geavanceerd type datastructuren in vergelijking met andere traditionele datastructuren. RDD's kunnen gestructureerde, ongestructureerde en semi-gestructureerde gegevens opslaan.

Laten we verder gaan met onze RDD met behulp van Spark-blog en meer te weten komen over de unieke kenmerken van RDD's die het een voorsprong geven op andere soorten gegevensstructuren.

Kenmerken van RDD

  • In het geheugen (RAM) Berekeningen : Het concept van in-Memory-berekening brengt de gegevensverwerking naar een snellere en efficiëntere fase waar de algehele prestatie van het systeem is opgewaardeerd.
  • L. zijn evaluatie : De term luie evaluatie zegt het transformaties worden toegepast op de gegevens in RDD, maar de uitvoer wordt niet gegenereerd. In plaats daarvan zijn de toegepaste transformaties aangemeld.
  • Persistentie : De resulterende RDD's zijn altijd herbruikbaar.
  • Grofkorrelige bewerkingen : De gebruiker kan transformaties toepassen op alle elementen in datasets via kaart, filter of groeperen op operaties.
  • Fouttolerant : Als er gegevens verloren gaan, kan het systeem terugrollen naar zijn originele staat door het gelogde transformaties .
  • Onveranderlijkheid : Gegevens die zijn gedefinieerd, opgehaald of gemaakt kunnen niet veranderd zodra het is ingelogd op het systeem. Als u de bestaande RDD moet openen en wijzigen, moet u een nieuwe RDD maken door een set Transformatie functioneert op de huidige of voorgaande RDD.
  • Verdeling : Het is de cruciale eenheid van parallellisme in Spark RDD. Het aantal gemaakte partities is standaard gebaseerd op uw gegevensbron. U kunt zelfs beslissen hoeveel partities u wilt gebruiken aangepaste partitie functies.

Aanmaken van RDD met Spark

RDD's kunnen worden gemaakt in drie manieren:

  1. Gegevens lezen van parallelle verzamelingen
val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. Toepassen transformatie op eerdere RDD's
val words = spark.sparkContext.parallelize (Seq ('Spark', 'is', 'a', 'very', 'krachtig', 'taal')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Gegevens lezen van externe opslag of bestandspaden zoals HDFS of HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Bewerkingen uitgevoerd op RDD's:

Er zijn hoofdzakelijk twee soorten bewerkingen die worden uitgevoerd op RDD's, namelijk:

  • Transformaties
  • Acties

Transformaties : De operaties we passen RDD's toe op filter, toegang en aanpassen de gegevens in de bovenliggende RDD om een opeenvolgende RDD wordt genoemd transformatie . De nieuwe RDD retourneert een pointer naar de vorige RDD om de onderlinge afhankelijkheid te verzekeren.

Transformaties zijn Luie evaluaties, met andere woorden, de bewerkingen die worden toegepast op de RDD waarmee u werkt, worden gelogd, maar niet uitgevoerd. Het systeem genereert een resultaat of uitzondering na het activeren van de Actie .

We kunnen transformaties in twee typen verdelen, zoals hieronder:

  • Smalle transformaties
  • Brede transformaties

Smalle transformaties We passen smalle transformaties toe op een enkele partitie van de bovenliggende RDD om een ​​nieuwe RDD te genereren, aangezien de gegevens die nodig zijn om de RDD te verwerken beschikbaar zijn op een enkele partitie van het ouder ASD . De voorbeelden voor smalle transformaties zijn:

  • kaart()
  • filter()
  • flatMap ()
  • partitie ()
  • mapPartitions ()

Brede transformaties: We passen de brede transformatie toe op meerdere partities om een ​​nieuwe RDD te genereren. De gegevens die nodig zijn om de RDD te verwerken, zijn beschikbaar op de meerdere partities van het ouder ASD . De voorbeelden voor brede transformaties zijn:

  • reduceBy ()
  • unie()

Acties : Acties instrueren Apache Spark om toe te passen berekening en geef het resultaat of een uitzondering terug aan de RDD van de bestuurder. Enkele van de acties zijn:

  • verzamelen()
  • tellen ()
  • nemen()
  • eerste()

Laten we de bewerkingen praktisch toepassen op RDD's:

IPL (Indiase Premier League) is een crickettoernooi met zijn hoogtepunt op het hoogste niveau. Dus laten we vandaag de IPL-dataset in handen krijgen en onze RDD uitvoeren met Spark.

  • Ten eerste, laten we een CSV-overeenkomstgegevens van IPL downloaden. Na het downloaden begint het eruit te zien als een EXCEL-bestand met rijen en kolommen.

In de volgende stap starten we de vonk en laden we het bestand matches.csv vanaf de locatie, in mijn geval mijncsvbestandslocatie is '/User/edureka_566977/test/matches.csv'

Laten we nu beginnen met de Transformatie deel eerst:

zout vs chef vs pop
  • kaart():

We gebruiken Kaarttransformatie om een ​​specifieke transformatiebewerking toe te passen op elk element van een RDD. Hier maken we een RDD met de naam CKfile waar onzecsvhet dossier. We zullen nog een RDD creëren met de naam Staten Sla de stadsgegevens op .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println) val states = CKfile.map (_. split (',') (2)) states.collect (). foreach (println)

  • filter():

Filtertransformatie, de naam zelf beschrijft het gebruik ervan. We gebruiken deze transformatiebewerking om de selectieve gegevens uit een verzameling gegeven gegevens te filteren. We zijn van toepassing filter werking hier om de records van de IPL-wedstrijden van het jaar te krijgen 2017 en bewaar het in het bestand RDD.

val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

We passen flatMap toe, een transformatiebewerking op elk van de elementen van een RDD om een ​​nieuwe RDD te creëren. Het is vergelijkbaar met kaarttransformatie. hier zijn we van toepassingFlatmapnaar spuug de lucifers van de stad Hyderabad uit en sla de gegevens op infilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). collect ()

  • partitie ():

Alle gegevens die we naar een RDD schrijven, worden opgesplitst in een bepaald aantal partities. We gebruiken deze transformatie om de aantal partities de gegevens zijn feitelijk opgesplitst in.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

We beschouwen MapPatitions als een alternatief voor Map () envoor elk() samen. We gebruiken hier mapPartitions om het aantal rijen we hebben in ons bestand RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • reduceBy ():

We gebruikenReduceBy() Aan Sleutel-waarde-paren . We gebruikten deze transformatie op onzecsvbestand om de speler met de hoogste Man van de wedstrijden .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • unie():

De naam verklaart het allemaal, we gebruiken vakbondstransformatie om club twee RDD's samen . Hier maken we twee RDD's, namelijk fil en fil2. fil RDD bevat de records van 2017 IPL-overeenkomsten en fil2 RDD bevat 2016 IPL-matchrecord.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Laten we beginnen met de Actie deel waar we de werkelijke output laten zien:

  • verzamelen():

Verzamelen is de actie die we gebruiken de inhoud weergeven in de RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println)

  • tellen ():

Tellenis een actie die we gebruiken om de aantal records aanwezig in de RDD.Hierwe gebruiken deze bewerking om het totale aantal records in ons matches.csv-bestand te tellen.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.count ()

  • nemen():

Take is een actie die vergelijkbaar is met verzamelen, maar het enige verschil is dat er een kan worden afgedrukt selectief aantal rijen vanaf gebruikersverzoek. Hier passen we de volgende code toe om het top tien toonaangevende rapporten.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. take (10) .foreach (println)

  • eerste():

First () is een actie-operatie vergelijkbaar met collect () en take ()hetgebruikt om het bovenste rapport af te drukken de uitvoer Hier gebruiken we de eerste () bewerking om de maximum aantal wedstrijden gespeeld in een bepaalde stad en we krijgen Mumbai als output.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') val states = CKfile.map (_. split (',') (2)) val Scount = states.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Om ons proces, ons leren RDD met behulp van Spark, nog interessanter te maken, heb ik een interessante use case bedacht.

RDD met Spark: Pokemon Use Case

  • Ten eerste, Laten we een Pokemon.csv-bestand downloaden en het in de spark-shell laden zoals we deden in het Matches.csv-bestand.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Pokemons zijn eigenlijk beschikbaar in een grote variëteit, laten we een paar soorten vinden.

  • Schema verwijderen uit het Pokemon.csv-bestand

We hebben de Schema van Pokemon.csv-bestand. Daarom verwijderen we het.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Het aantal partities onze pokemon.csv wordt gedistribueerd naar.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Water Pokemon

Het vinden van het aantal Water Pokemon

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Vuur Pokemon

Het vinden van het aantal Fire Pokemon

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • We kunnen ook de bevolking van een ander type Pokemon met behulp van de count-functie
WaterRDD.count () FireRDD.count ()

  • Omdat ik het spel leuk vind defensieve strategie laten we de pokemon zoeken met maximale verdediging.
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • We kennen het maximum waarde van de verdedigingskracht maar we weten niet welke Pokemon het is. dus, laten we eens kijken wat dat is Pokemon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. kaart {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Bestellen [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Laten we nu de Pokemon uitzoeken met minste verdediging
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

hoe classpath in java in windows 10 in te stellen
  • Laten we nu de Pokemon bekijken met een minder defensieve strategie.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val def2WithPokemon2 .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Ordering [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Dus hiermee komen we aan het einde van deze RDD met behulp van het Spark-artikel. Ik hoop dat we een beetje licht hebben geworpen op uw kennis over RDD's, hun functies en de verschillende soorten bewerkingen die erop kunnen worden uitgevoerd.

Dit artikel is gebaseerd op is ontworpen om u voor te bereiden op het Cloudera Hadoop en Spark Developer Certification Exam (CCA175). Je krijgt een grondige kennis van Apache Spark en het Spark-ecosysteem, waaronder Spark RDD, Spark SQL, Spark MLlib en Spark Streaming. Je krijgt uitgebreide kennis van Scala-programmeertaal, HDFS, Sqoop, Flume, Spark GraphX ​​en Messaging-systemen zoals Kafka.