DBInputFormat om gegevens van SQL naar NoSQL-database over te brengen



Het doel van deze blog is om te leren hoe u gegevens van SQL-databases naar HDFS kunt overbrengen, hoe u gegevens van SQL-databases naar NoSQL-databases kunt overbrengen.

In deze blog verkennen we de mogelijkheden en mogelijkheden van een van de belangrijkste componenten van Hadoop-technologie, namelijk MapReduce.

Tegenwoordig gebruiken bedrijven het Hadoop-framework als hun eerste keuze voor gegevensopslag vanwege de mogelijkheden om grote gegevens effectief te verwerken. Maar we weten ook dat de gegevens veelzijdig zijn en in verschillende structuren en formaten bestaan. Om zo'n enorme verscheidenheid aan gegevens en de verschillende formaten ervan te beheersen, moet er een mechanisme zijn om alle variëteiten te accommoderen en toch een effectief en consistent resultaat te produceren.





Het krachtigste onderdeel in het Hadoop-framework is MapReduce, dat de controle over de gegevens en de structuur ervan beter kan bieden dan zijn andere tegenhangers. Hoewel het overhead van leercurve en de programmeercomplexiteit vereist, als je deze complexiteit aankan, kun je met Hadoop zeker omgaan met alle soorten gegevens.

Het MapReduce-framework verdeelt al zijn verwerkingstaken in twee fasen: Map en Reduce.



Het voorbereiden van uw onbewerkte gegevens voor deze fasen vereist kennis van enkele basisklassen en interfaces. De superklasse voor deze herverwerking is Invoer formaat.

De Invoer formaat class is een van de kernklassen in de Hadoop MapReduce API. Deze klasse is verantwoordelijk voor het definiëren van twee belangrijke dingen:

  • Gegevens worden gesplitst
  • Record lezer

Gegevens splitsen is een fundamenteel concept in het Hadoop MapReduce-framework dat zowel de grootte van individuele kaarttaken als de potentiële uitvoerserver definieert. De Record Reader is verantwoordelijk voor het daadwerkelijk lezen van records uit het invoerbestand en het indienen ervan (als sleutel / waarde-paren) naar de mapper.



Het aantal mappers wordt bepaald op basis van het aantal splitsingen. Het is de taak van InputFormat om de splitsingen te maken. Meestal is de splitsingsgrootte gelijk aan de blokgrootte, maar het is niet altijd dat splitsingen worden gemaakt op basis van de HDFS-blokgrootte. Het hangt er volledig van af hoe de methode getSplits () van uw InputFormat is overschreven.

Er is een fundamenteel verschil tussen MR-split en HDFS-blok. Een blok is een fysiek stuk gegevens, terwijl een splitsing slechts een logisch stuk is dat een mapper leest. Een splitsing bevat niet de invoergegevens, het bevat alleen een referentie of adres van de gegevens. Een splitsing heeft in feite twee dingen: een lengte in bytes en een reeks opslaglocaties, die slechts strings zijn.

Laten we een voorbeeld nemen om dit beter te begrijpen: gegevens verwerken die zijn opgeslagen in uw MySQL met MR. Omdat er in dit geval geen concept van blokken is, is de theorie: 'splitsingen worden altijd gemaakt op basis van het HDFS-blok',mislukt. Een mogelijkheid is om splitsingen te maken op basis van rijen rijen in uw MySQL-tabel (en dit is wat DBInputFormat doet, een invoerformaat voor het lezen van gegevens uit relationele databases). Mogelijk hebben we een k aantal splitsingen bestaande uit n rijen.

Alleen voor de InputFormats op basis van FileInputFormat (een InputFormat voor het verwerken van gegevens die in bestanden zijn opgeslagen) worden de splitsingen gemaakt op basis van de totale grootte, in bytes, van de invoerbestanden. De blokkering van de invoerbestanden door het FileSystem wordt echter behandeld als een bovengrens voor invoersplitsingen. Als u een bestand heeft dat kleiner is dan de HDFS-blokgrootte, krijgt u slechts één mapper voor dat bestand. Als u zich anders wilt gedragen, kunt u mapred.min.split.size gebruiken. Maar het hangt weer uitsluitend af van de getSplits () van uw InputFormat.

We hebben zoveel reeds bestaande invoerformaten beschikbaar onder pakket org.apache.hadoop.mapreduce.lib.input.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

De standaardinstelling is TextInputFormat.

Evenzo hebben we zoveel uitvoerformaten die de gegevens van verloopstukken lezen en opslaan in HDFS:

wat is een javaboon

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

Standaard is TextOutputFormat.

Tegen de tijd dat je deze blog hebt gelezen, zou je hebben geleerd:

wat is big data en hadoop
  • Hoe een kaartreductieprogramma te schrijven
  • Over verschillende soorten InputFormats die beschikbaar zijn in Mapreduce
  • Wat is de behoefte aan InputFormats
  • Hoe aangepaste invoerformaten te schrijven
  • Gegevens overbrengen van SQL-databases naar HDFS
  • Gegevens overbrengen van SQL (hier MySQL) -databases naar NoSQL-databases (hier Hbase)
  • Hoe u gegevens van de ene SQL-database naar een andere tabel in SQL-databases kunt overbrengen (misschien is dit niet zo belangrijk als we dit in dezelfde SQL-database doen. Er is echter niets mis mee om hiervan kennis te hebben. U weet maar nooit hoe het in gebruik kan komen)

Voorwaarde:

  • Hadoop vooraf geïnstalleerd
  • SQL vooraf geïnstalleerd
  • Hbase vooraf geïnstalleerd
  • Basiskennis van Java
  • MapReduce kennis
  • Basiskennis Hadoop-framework

Laten we de probleemstelling begrijpen die we hier gaan oplossen:

We hebben een werknemerstabel in MySQL DB in onze relationele database Edureka. Nu moeten we volgens de zakelijke vereisten alle beschikbare gegevens in relationele DB naar het Hadoop-bestandssysteem verplaatsen, d.w.z. HDFS, NoSQL DB bekend als Hbase.

We hebben veel opties om deze taak uit te voeren:

  • Sqoop
  • Fluim
  • MapReduce

Nu wilt u voor deze bewerking geen andere tool installeren en configureren. Je hebt maar één optie over, namelijk Hadoop's verwerkingsraamwerk MapReduce. Het MapReduce-framework geeft u volledige controle over de gegevens tijdens het overbrengen. U kunt de kolommen manipuleren en direct op een van de twee doellocaties plaatsen.

Opmerking:

  • We moeten de MySQL-connector downloaden en in het klassenpad van Hadoop plaatsen om tabellen op te halen uit de MySQL-tabel. Om dit te doen, downloadt u de connector com.mysql.jdbc_5.1.5.jar en houdt u deze onder de map Hadoop_home / share / Hadoop / MaPreduce / lib.
cp Downloads / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • Plaats ook alle Hbase-jars onder Hadoop classpath om uw MR-programma toegang te geven tot Hbase. Voer hiervoor de volgende opdracht uit :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

De softwareversies die ik heb gebruikt bij het uitvoeren van deze taak zijn:

  • Hadooop-2.3.0
  • HBase 0.98.9-Hadoop2
  • Eclipse Maan

Om het programma in elk compatibiliteitsprobleem te vermijden, schrijf ik mijn lezers voor om het commando in een vergelijkbare omgeving uit te voeren.

Aangepaste DBInputWritable:

pakket com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBDBWritable publieke klasse DBInputWritable implementeert Writable, DBWritable {private int id private String naam, afdeling public void readFields (DataInput in) gooit IOException {} public void readFields (ResultSet rs) throws SQLException // Resultset-object vertegenwoordigt de gegevens die zijn geretourneerd door een SQL-instructie {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) gooit IOException { } public void write (PreparedStatement ps) gooit SQLException {ps.setInt (1, id) ps.setString (2, naam) ps.setString (3, afdeling)} public int getId () {return id} public String getName () {return name} public String getDept () {return department}}

Aangepaste DBOutputWritable:

pakket com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable implementeert Writable, DBWritable {private String naam private int id private String afdeling public DBOutputWritable (String naam, int id, String afdeling) {this.name = name this.id = id this.dept = dept} public void readFields (DataInput in) gooit IOException {} public void readFields (ResultSet rs) gooit SQLException {} public void write (DataOutput out) gooit IOException {} public void write (PreparedStatement ps) gooit SQLException {ps.setString (1, naam) ps.setInt (2, id) ps.setString (3, afdeling)}}

Invoertabel:

maak database edureka
maak tabel emp (empid int niet null, naam varchar (30), afd varchar (20), primaire sleutel (empid))
invoegen in emp-waarden (1, 'abhay', 'development'), (2, 'brundesh', 'test')
selecteer * uit emp

Geval 1: Transfer van MySQL naar HDFS

pakket com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job importeren org.apache.hadoop.mapreduce.lib.db.DBConfiguration importeren org.apache.hadoop.mapreduce.lib.db.DBInputFormat importeren org.apache.hadoop.mapreduce.lib.output.FileOutputFormat importeren org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable publieke klasse MainDbtohdfs {public static void main (String [] args) gooit Uitzondering {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // driver class' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // gebruikersnaam' root ') // wachtwoord Job job = nieuwe job (conf) job .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormatClass) FileOutput new Path (args [0])) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // invoertabelnaam null, null, nieuwe String [] {'empid', 'name', 'dept'} / / tabelkolommen) Pad p = nieuw pad (args [0]) FileSystem fs = FileSystem.get (nieuwe URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Met dit stukje code kunnen we het invoerformaat voorbereiden of configureren om toegang te krijgen tot onze bron-SQL DB. De parameter bevat de driver-klasse, de URL heeft het adres van de SQL-database, de gebruikersnaam en het wachtwoord.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // gebruikersnaam 'root') //wachtwoord

Met dit stukje code kunnen we de details van de tabellen in de database doorgeven en in het job-object instellen. De parameters omvatten natuurlijk de job-instantie, de aangepaste beschrijfbare klasse die DBWritable-interface moet implementeren, de naam van de brontabel, de eventuele andere voorwaarde null, eventuele sorteerparameters null, respectievelijk de lijst met tabelkolommen.

DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // invoertabelnaam null, null, nieuwe String [] {'empid', 'name', 'afdeling'} // tabelkolommen)

Mapper

pakket com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable public class Map breidt Mapper uit {
beschermde ongeldige kaart (LongWritable-sleutel, DBInputWritable-waarde, Context ctx) {try {String name = value.getName () IntWritable id = nieuwe IntWritable (value.getId ()) String afdeling = value.getDept ()
ctx.write (nieuwe tekst (naam + '' + id + '' + afdeling), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Reducer: Identity Reducer gebruikt

Commando om uit te voeren:

hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Uitvoer: MySQL-tabel overgedragen naar HDFS

hadoop dfs -ls / dbtohdfs / *

Case 2: Transfer van de ene tabel in MySQL naar de andere in MySQL

het maken van een uitvoertabel in MySQL

tabel aanmaken medewerker1 (naam varchar (20), id int, afdeling varchar (20))

pakket com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable openbare klasse Mainonetable_to_other_table {public static void main (String [] args) gooit uitzondering {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // gebruikersnaam' root ') // wachtwoord Job job = nieuwe job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) job .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // invoertabelnaam null, null, nieuwe String [] {'empid ',' naam ',' afdeling '} // tabelkolommen) DBOutputFormat.setOutput (job,' werknemer1 ', // naam uitvoertabel nieuwe String [] {' naam ',' id ',' afdeling '} // tabel kolommen) System.exit (job.waitForCompletion (true)? 0: 1)}}

Met dit stukje code kunnen we de uitvoertabelnaam in SQL DB configureren. De parameters zijn respectievelijk taakinstantie, uitvoertabelnaam en de uitvoerkolomnamen.

DBOutputFormat.setOutput (job, 'employee1', // naam uitvoertabel new String [] {'naam', 'id', 'afdeling'} // tabelkolommen)

Mapper: hetzelfde als Case 1

Verloopstuk:

pakket com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable public class Reduce extends Reducer {protected void reduce (Text key, Iterable values, Context ctx) {int sum = 0 String line [] = key.toString (). Split ('') probeer {ctx.write (nieuwe DBOutputWritable (regel [0] .toString (), Integer.parseInt (regel [1] .toString ())), regel [2] .toString ()), NullWritable.get ())} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Commando om uit te voeren:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Uitvoer: overgedragen gegevens van EMP-tabel in MySQL naar een andere tabelmedewerker1 in MySQL

Geval 3: Overzetten van tabel in MySQL naar NoSQL (Hbase) -tabel

Hbase-tabel maken om de uitvoer van de SQL-tabel te accommoderen:

maak 'medewerker', 'official_info' aan

Bestuurdersklasse:

pakket Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] args) gooit Uitzondering {Configuratie conf = HBaseConfiguration.create () HTableInterface mytable = nieuwe HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // gebruikersnaam 'root') // wachtwoord Job job = nieuwe job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('werknemer', Reduce.class, job) job.setOutputClass (DB) job.setInputClass.InputClass. class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // invoertabelnaam null, null, nieuwe String [] {'empid', 'name', 'afdeling'} // tabelkolommen) System.exit (job.waitForCompletion (true)? 0: 1)}}

Met dit stuk code kunt u de uitvoersleutelklasse configureren die in het geval van hbase ImmutableBytesWritable is

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Hier geven we de hbase-tabelnaam en het verloopstuk door om op de tafel te handelen.

TableMapReduceUtil.initTableReducerJob ('werknemer', Reduce.class, job)

Mapper:

pakket Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Map breidt Mapper {private IntWritable one = new IntWritable (1) protected void map (LongWritable id, DBInputWritable value, Context context) {try {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), nieuwe tekst (regel + ' '+ dept))} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

In dit stuk code nemen we waarden van de getters van de DBinputwritable-klasse en geven ze vervolgens door
ImmutableBytesWritable zodat ze het reductiemiddel bereiken in een bytewriatbare vorm die Hbase begrijpt.

String line = value.getName () String cd = value.getId () + '' String afdeling = waarde.getDept () context.write (nieuwe ImmutableBytesWritable (Bytes.toBytes (cd)), nieuwe tekst (regel + '' + ))

Verloopstuk:

pakket Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text public class Reduce breidt TableReducer uit {public void reduce (ImmutableBytesWritable key, Iterable values, Context context) gooit IOException, InterruptedException {String [] cause = null // Loop-waarden for (Tekstval: waarden) {oorzaak = val.toString (). split ('')} // Zet in HBase Put put = nieuwe Put (key.get ()) put.add (Bytes.toBytes ('official_info' ), Bytes.toBytes ('naam'), Bytes.toBytes (oorzaak [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('afdeling'), Bytes.toBytes (oorzaak [1 ])) context.write (key, put)}}

Met dit stuk code kunnen we de exacte rij en de kolom bepalen waarin we waarden van het verloopstuk zouden opslaan. Hier slaan we elk empid op in een aparte rij, omdat we empid hebben gemaakt als een rijtoets die uniek zou zijn. In elke rij slaan we de officiële informatie van de werknemers op onder respectievelijk de kolomfamilie “official_info” onder de kolommen “naam” en “afdeling”.

Put put = nieuw Put (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('naam'), Bytes.toBytes (oorzaak [0])) put.add (Bytes. toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (oorzaak [1])) context.write (sleutel, put)

Overgedragen gegevens in Hbase:

scan medewerker

Zoals we zien, hebben we de taak van het migreren van onze bedrijfsgegevens van een relationele SQL-database naar een NoSQL-database met succes voltooid.

In de volgende blog zullen we leren hoe we codes kunnen schrijven en uitvoeren voor andere invoer- en uitvoerformaten.

Blijf uw opmerkingen, vragen of feedback posten. Ik zou graag van u horen.

Heeft u een vraag voor ons? Vermeld het in het opmerkingengedeelte en we nemen contact met u op.

Gerelateerde berichten: