DBInputFormat pour transférer des données de SQL vers une base de données NoSQL

L'objectif de ce blog est d'apprendre comment transférer des données de bases de données SQL vers HDFS, comment transférer des données de bases de données SQL vers des bases de données NoSQL.

Dans ce blog, nous explorerons les capacités et les possibilités de l'un des composants les plus importants de la technologie Hadoop, à savoir MapReduce.



Aujourd'hui, les entreprises adoptent le cadre Hadoop comme premier choix pour le stockage de données en raison de ses capacités à gérer efficacement des données volumineuses. Mais nous savons également que les données sont polyvalentes et existent dans diverses structures et formats. Pour contrôler une si grande variété de données et ses différents formats, il devrait y avoir un mécanisme pour accueillir toutes les variétés tout en produisant un résultat efficace et cohérent.



Le composant le plus puissant du framework Hadoop est MapReduce qui peut fournir le contrôle sur les données et leur structure mieux que ses autres homologues. Bien que cela nécessite une surcharge de la courbe d'apprentissage et de la complexité de la programmation, si vous pouvez gérer ces complexités, vous pouvez sûrement gérer tout type de données avec Hadoop.

Le framework MapReduce divise toutes ses tâches de traitement en deux phases: mapper et réduire.



La préparation de vos données brutes pour ces phases nécessite la compréhension de certaines classes et interfaces de base. La super classe pour ces retraits est InputFormat.

navigateur db pour le didacticiel sqlite

La InputFormat class est l'une des classes principales de l'API Hadoop MapReduce. Cette classe est chargée de définir deux choses principales:

  • Division des données
  • Lecteur d'enregistrement

Répartition des données est un concept fondamental du framework Hadoop MapReduce qui définit à la fois la taille des tâches cartographiques individuelles et son serveur d'exécution potentiel. La Lecteur d'enregistrement est responsable de la lecture réelle des enregistrements du fichier d'entrée et de leur soumission (sous forme de paires clé / valeur) au mappeur.



Le nombre de mappeurs est décidé en fonction du nombre de divisions. C'est le travail d'InputFormat de créer les fractionnements. La plupart du temps, la taille de la division est équivalente à la taille du bloc, mais ce n’est pas toujours que les divisions seront créées en fonction de la taille du bloc HDFS. Cela dépend totalement de la façon dont la méthode getSplits () de votre InputFormat a été remplacée.

Il existe une différence fondamentale entre la division MR et le bloc HDFS. Un bloc est un bloc physique de données tandis qu'un fractionnement n'est qu'un bloc logique que lit un mappeur. Un fractionnement ne contient pas les données d'entrée, il contient simplement une référence ou une adresse des données. Une division a essentiellement deux choses: une longueur en octets et un ensemble d'emplacements de stockage, qui ne sont que des chaînes.

Pour mieux comprendre cela, prenons un exemple: le traitement des données stockées dans votre MySQL à l’aide de MR. Puisqu'il n'y a pas de concept de blocs dans ce cas, la théorie: 'les divisions sont toujours créées sur la base du bloc HDFS',échoue. Une possibilité est de créer des fractionnements basés sur des plages de lignes dans votre table MySQL (et c'est ce que fait DBInputFormat, un format d'entrée pour lire des données à partir d'une base de données relationnelle). Nous pouvons avoir k nombre de divisions constituées de n lignes.

Ce n'est que pour les InputFormats basés sur FileInputFormat (un InputFormat pour gérer les données stockées dans des fichiers) que les fractionnements sont créés en fonction de la taille totale, en octets, des fichiers d'entrée. Cependant, la taille de bloc FileSystem des fichiers d'entrée est traitée comme une limite supérieure pour les fractionnements d'entrée. Si votre fichier est plus petit que la taille de bloc HDFS, vous n’obtiendrez qu’un seul mappeur pour ce fichier. Si vous souhaitez avoir un comportement différent, vous pouvez utiliser mapred.min.split.size. Mais encore une fois, cela dépend uniquement du getSplits () de votre InputFormat.

Nous avons tellement de formats d'entrée préexistants disponibles sous le package org.apache.hadoop.mapreduce.lib.input.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

La valeur par défaut est TextInputFormat.

De même, nous avons tellement de formats de sortie qui lisent les données des réducteurs et les stockent dans HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

surcharge de fonction c ++

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

La valeur par défaut est TextOutputFormat.

Au moment où vous aurez fini de lire ce blog, vous auriez appris:

  • Comment écrire un programme de réduction de carte
  • À propos des différents types de InputFormats disponibles dans Mapreduce
  • Quel est le besoin de InputFormats
  • Comment écrire des InputFormats personnalisés
  • Comment transférer des données de bases de données SQL vers HDFS
  • Comment transférer des données de bases de données SQL (ici MySQL) vers des bases de données NoSQL (ici Hbase)
  • Comment transférer des données d'une base de données SQL vers une autre table dans des bases de données SQL (Peut-être que cela n'est peut-être pas très important si nous faisons cela dans la même base de données SQL. Cependant, il n'y a rien de mal à en avoir une connaissance. On ne sait jamais comment il peut être utilisé)

Prérequis:

  • Hadoop pré-installé
  • SQL pré-installé
  • Hbase préinstallée
  • Compréhension de base de Java
  • MapReduce connaissance
  • Connaissances de base du framework Hadoop

Comprenons l’énoncé du problème que nous allons résoudre ici:

Nous avons une table d'employés dans MySQL DB dans notre base de données relationnelle Edureka. Maintenant, conformément aux exigences de l'entreprise, nous devons transférer toutes les données disponibles dans la base de données relationnelle vers le système de fichiers Hadoop, c'est-à-dire HDFS, NoSQL DB connue sous le nom de Hbase.

Nous avons de nombreuses options pour effectuer cette tâche:

  • Sqoop
  • Buse
  • MapReduce

Maintenant, vous ne souhaitez pas installer et configurer un autre outil pour cette opération. Il ne vous reste qu’une seule option qui est le cadre de traitement de Hadoop MapReduce. Le framework MapReduce vous donnerait un contrôle total sur les données lors du transfert. Vous pouvez manipuler les colonnes et les placer directement à l'un des deux emplacements cibles.

Remarque:

  • Nous devons télécharger et placer le connecteur MySQL dans le chemin de classe de Hadoop pour récupérer les tables de la table MySQL. Pour ce faire, téléchargez le connecteur com.mysql.jdbc_5.1.5.jar et conservez-le sous le répertoire Hadoop_home / share / Hadoop / MaPreduce / lib.
Téléchargements cp / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • De plus, placez tous les pots Hbase sous le chemin de classe Hadoop afin de permettre à votre programme MR d'accéder à Hbase. Pour ce faire, exécutez la commande suivante :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

Les versions de logiciel que j'ai utilisées dans l'exécution de cette tâche sont:

  • Hadooop-2.3.0
  • HBase 0.98.9-Hadoop2
  • Lune éclipse

Afin d'éviter le programme dans tout problème de compatibilité, je prescrit à mes lecteurs d'exécuter la commande avec un environnement similaire.

DBInputWritable personnalisé:

package com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable classe publique DBInputWritable implements Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) throws IOException {} public void readFields (ResultSet rs) throws SQLException // L'objet Resultset représente les données renvoyées par une instruction SQL {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) jette IOException { } public void write (PreparedStatement ps) jette SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, dept)} public int getId () {return id} public String getName () {return name} public String getDept () {return dept}}

DBOutputWritable personnalisé:

package com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable implements Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = name this.id = id this.dept = dept} public void readFields (DataInput in) jette IOException {} public void readFields (ResultSet rs) jette SQLException {} public void write (DataOutput out) jette IOException {} public void write (PreparedStatement) ps) lance SQLException {ps.setString (1, nom) ps.setInt (2, id) ps.setString (3, dept)}}

Tableau d'entrée:

créer une base de données edureka
créer une table emp (empid int non nul, nom varchar (30), dept varchar (20), clé primaire (empid))
insérer dans les valeurs emp (1, 'abhay', 'developement'), (2, 'brundesh', 'test')
sélectionnez * de emp

Cas 1: transfert de MySQL vers HDFS

package com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable Classe publique MainDbtohdfs {public static void main (String [] args) lève l'exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // classe de pilote' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // nom d'utilisateur' root ') // mot de passe Job job = new Job (conf) job .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInormputFormat.classFichier) new Path (args [0])) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // nom de la table d'entrée null, null, new String [] {'empid', 'name', 'dept'} / / table colonnes) Chemin p = nouveau chemin (args [0]) FileSystem fs = FileSystem.get (nouvel URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Ce morceau de code nous permet de préparer ou de configurer le format d'entrée pour accéder à notre base de données SQL source. Le paramètre inclut la classe du pilote, l'URL a l'adresse de la base de données SQL, son nom d'utilisateur et le mot de passe.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // classe de pilote 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // nom d'utilisateur 'root') //mot de passe

Ce morceau de code nous permet de transmettre les détails des tables de la base de données et de les définir dans l'objet de travail. Les paramètres incluent bien sûr l'instance de travail, la classe inscriptible personnalisée qui doit implémenter l'interface DBWritable, le nom de la table source, la condition s'il y en a d'autre null, les paramètres de tri sinon nul, la liste des colonnes de la table respectivement.

DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // nom de la table d'entrée null, null, new String [] {'empid', 'name', 'dept'} // colonnes de la table)

Cartographe

package com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io Classe publique .IntWritable Map étend Mapper {
protected void map (clé LongWritable, valeur DBInputWritable, contexte ctx) {try {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (nouveau texte (nom + '' + id + '' + dept), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Réducteur: Réducteur d'identité utilisé

Commande à exécuter:

pot hadoop dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Sortie: Table MySQL transférée vers HDFS

hadoop dfs -ls / dbtohdfs / *

Cas 2: transfert d'une table dans MySQL à une autre dans MySQL

création d'une table de sortie dans MySQL

créer la table employee1 (nom varchar (20), id int, dept varchar (20))

package com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable classe publique Mainonetable_to_other_table {public static void main (String [] args) jette une exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // classe de pilote 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // nom d'utilisateur' root ') // mot de passe Job job = new Job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) job .setReducerClass (Réduire.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // nom de la table d'entrée null, null, new String [] {'empid ',' name ',' dept '} // colonnes de la table) DBOutputFormat.setOutput (job,' employee1 ', // sortie du nom de la table new String [] {' name ',' id ',' dept '} // table colonnes) System.exit (job.waitForCompletion (true)? 0: 1)}}

Ce morceau de code nous permet de configurer le nom de la table de sortie dans SQL DB. Les paramètres sont respectivement l'instance de travail, le nom de la table de sortie et les noms de colonne de sortie.

DBOutputFormat.setOutput (job, 'employee1', // nom de la table de sortie new String [] {'name', 'id', 'dept'} // colonnes de la table)

Mapper: identique au cas 1

Réducteur:

package com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io Classe publique .NullWritable Réduire étend le réducteur {protected void reduction (Text key, Iterable values, Context ctx) {int sum = 0 String line [] = key.toString (). Split ('') try {ctx.write (new DBOutputWritable (ligne [0] .toString (), Integer.parseInt (ligne [1] .toString ()), ligne [2] .toString ()), NullWritable.get ())} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Commande à exécuter:

pot hadoop dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Sortie: données transférées de la table EMP dans MySQL vers une autre table Employee1 dans MySQL

Cas 3: transfert d'une table de MySQL vers une table NoSQL (Hbase)

Création de la table Hbase pour accueillir la sortie de la table SQL:

créer 'employee', 'official_info'

Classe de conducteur:

package Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] args) throws Exception {Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // classe de pilote 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // nom d'utilisateur 'root') // mot de passe Job job = new Job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('employé', Réduire.classe.travail) job.ormatOutputFormatClass ('employé', Réduire.classe.travail) job.ormatutputFormatFput (classe.travail) job.ormatutputFormatFput (classe.travail) job. class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // nom de la table d'entrée null, null, new String [] {'empid', 'name', 'dept'} // colonnes de la table) System.exit (job.waitForCompletion (vrai)? 0: 1)}}

Ce morceau de code vous permet de configurer la classe de clé de sortie qui, dans le cas de hbase, est ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Ici, nous passons le nom de la table hbase et le réducteur pour agir sur la table.

TableMapReduceUtil.initTableReducerJob ('employé', Réduire.classe, travail)

Mappeur:

package Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable Classe publique Map extend Mapper {private IntWritable one = new IntWritable (1) protected void map (LongWritable id, DBInputWritable value, Context context) {try {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + ' '+ dept))} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Dans ce morceau de code, nous prenons des valeurs des getters de la classe DBinputwritable, puis nous les transmettons
ImmutableBytesWritable afin qu'ils atteignent le réducteur sous forme d'octets que Hbase comprend.

comment déclarer une variable d'instance en java
String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + '' + dept ))

Réducteur:

package Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text classe publique Réduire étend TableReducer {public void reduction (ImmutableBytesWritable key, Iterable values, Context context) throws IOException, InterruptedException {String [] cause = null // Valeurs de boucle for (Text val: values) {cause = val.toString (). split ('')} // Mettre dans HBase Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info') ), Bytes.toBytes ('name'), Bytes.toBytes (cause [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (cause [1 ])) context.write (clé, mettre)}}

Ce morceau de code nous permet de décider de la ligne exacte et de la colonne dans laquelle nous stockerions les valeurs du réducteur. Ici, nous stockons chaque empid dans une ligne séparée car nous avons créé l'empid comme clé de ligne qui serait unique. Dans chaque ligne, nous stockons les informations officielles des employés sous la famille de colonnes «official_info» sous les colonnes «name» et «department» respectivement.

Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('name'), Bytes.toBytes (cause [0])) put.add (Bytes. toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (cause [1])) context.write (key, put)

Données transférées dans Hbase:

scanner employé

Comme nous le voyons, nous avons réussi à mener à bien la migration de nos données d'entreprise d'une base de données SQL relationnelle vers une base de données NoSQL.

Dans le prochain blog, nous allons apprendre à écrire et exécuter des codes pour d'autres formats d'entrée et de sortie.

Continuez à publier vos commentaires, questions ou commentaires. J'aimerais avoir de vos nouvelles.

Vous avez une question pour nous? Veuillez le mentionner dans la section commentaires et nous vous recontacterons.

Articles Similaires: