RDD utilisant Spark: le bloc de construction d'Apache Spark

Ce blog sur RDD utilisant Spark vous fournira une connaissance détaillée et complète de RDD, qui est l'unité fondamentale de Spark et de son utilité.

, Le mot lui-même suffit à générer une étincelle dans l'esprit de chaque ingénieur Hadoop. À n en mémoire outil de traitement ce qui est ultra-rapide dans l'informatique en cluster. Par rapport à MapReduce, le partage de données en mémoire rend les RDD 10 à 100x Plus vite que le partage de réseau et de disque et tout cela est possible grâce aux RDD (Resilient Distributed Data sets). Les points clés sur lesquels nous nous concentrons aujourd'hui dans cet article RDD utilisant Spark sont:



Besoin de RDD?

Pourquoi avons-nous besoin de RDD? -RDD avec Spark



Le monde évolue avec et Science des données en raison de l'avancement de . Algorithmes basé sur Régression , , et qui fonctionne sur Distribué Calcul itératif ation mode qui inclut la réutilisation et le partage de données entre plusieurs unités informatiques.

Le traditionnel techniques nécessitaient un stockage intermédiaire stable et distribué comme HDFS comprenant des calculs répétitifs avec des réplications de données et une sérialisation des données, ce qui a rendu le processus beaucoup plus lent. Trouver une solution n'a jamais été facile.



C'est ici que RDD (Ensembles de données distribués résilients) vient à la grande image.

RDD Les s sont faciles à utiliser et faciles à créer car les données sont importées à partir de sources de données et déposées dans les RDD. En outre, les opérations sont appliquées pour les traiter. Ils sont un collection distribuée de mémoire avec des autorisations comme Lecture seulement et surtout, ils sont Tolérance de panne .



Si seulement partition de données de le RDD est perdu , il peut être régénéré en appliquant le même transformation opération sur cette partition perdue dans lignée , plutôt que de traiter toutes les données à partir de zéro. Ce type d'approche dans des scénarios en temps réel peut faire des miracles dans des situations de perte de données ou lorsqu'un système est en panne.

Que sont les RDD?

RDD ou ( Ensemble de données distribuées résilientes ) est un élément fondamental Structure de données dans Spark. Le terme Résilient définit la capacité qui génère les données automatiquement ou les données reculer à la l'état original lorsqu'une calamité inattendue se produit avec une probabilité de perte de données.

Les données écrites dans les RDD sont partitionné et stocké dans plusieurs nœuds exécutables . Si un nœud en cours d'exécution échoue dans le temps d'exécution, puis il récupère instantanément la sauvegarde du prochain nœud exécutable . C'est pourquoi les RDD sont considérés comme un type avancé de structures de données par rapport aux autres structures de données traditionnelles. Les RDD peuvent stocker des données structurées, non structurées et semi-structurées.

Allons de l'avant avec notre RDD en utilisant le blog Spark et découvrons les fonctionnalités uniques des RDD qui lui donnent un avantage sur d'autres types de structures de données.

Caractéristiques de RDD

  • En mémoire (RAM) Calculs : Le concept de calcul en mémoire amène le traitement des données à une étape plus rapide et efficace où l'ensemble performance du système est mis à jour.
  • L son évaluation : Le terme évaluation paresseuse dit que transformations sont appliqués aux données dans RDD, mais la sortie n'est pas générée. Au lieu de cela, les transformations appliquées sont connecté.
  • Persistance : Les RDD résultants sont toujours réutilisable.
  • Opérations à gros grains : L'utilisateur peut appliquer des transformations à tous les éléments des ensembles de données via carte, filtre ou par groupe opérations.
  • Tolérance de panne : En cas de perte de données, le système peut retour en arriere à son l'état original en utilisant le journal transformations .
  • Immutabilité : Les données définies, récupérées ou créées ne peuvent pas être modifié une fois qu'il est connecté au système. Dans le cas où vous devez accéder et modifier le RDD existant, vous devez créer un nouveau RDD en appliquant un ensemble de Transformation fonctionne sur le RDD actuel ou précédent.
  • Partitionnement : C'est le unité cruciale du parallélisme dans Spark RDD. Par défaut, le nombre de partitions créées est basé sur votre source de données. Vous pouvez même décider du nombre de partitions que vous souhaitez créer en utilisant partition personnalisée les fonctions.

Création de RDD avec Spark

Les RDD peuvent être créés dans trois façons:

  1. Lire les données depuis collections parallélisées
val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. Postuler transformation sur les RDD précédents
val words = spark.sparkContext.parallelize (Seq ('Spark', 'is', 'a', 'very', 'puissant', 'language')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Lire les données depuis stockage externe ou des chemins de fichiers comme HDFS ou HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Opérations effectuées sur les RDD:

Il existe principalement deux types d'opérations qui sont effectuées sur les RDD, à savoir:

  • Transformations
  • Actions

Transformations : La les opérations nous postulons sur les RDD pour filtre, accès et modifier les données du RDD parent pour générer un RDD successif est appelé transformation . Le nouveau RDD renvoie un pointeur vers le RDD précédent assurant la dépendance entre eux.

Les transformations sont Évaluations paresseuses, en d'autres termes, les opérations appliquées sur le RDD sur lequel vous travaillez seront consignées mais pas réalisé. Le système renvoie un résultat ou une exception après avoir déclenché le action .

Nous pouvons diviser les transformations en deux types comme ci-dessous:

  • Transformations étroites
  • Transformations larges

Transformations étroites Nous appliquons des transformations étroites sur un partition unique du RDD parent pour générer un nouveau RDD car les données nécessaires au traitement du RDD sont disponibles sur une seule partition du parent TSA . Les exemples de transformations étroites sont:

logique floue en intelligence artificielle
  • carte()
  • filtre()
  • flatMap ()
  • cloison()
  • mapPartitions ()

Transformations larges: Nous appliquons la large transformation sur plusieurs partitions pour générer un nouveau RDD. Les données nécessaires au traitement du RDD sont disponibles sur les multiples partitions du parent TSA . Les exemples de transformations larges sont:

  • réduire par()
  • syndicat()

Actions : Les actions demandent à Apache Spark de s'appliquer calcul et renvoyez le résultat ou une exception au pilote RDD. Peu d'actions comprennent:

  • collecte()
  • compter()
  • prendre()
  • premier()

Appliquons pratiquement les opérations sur les RDD:

IPL (Premier League indienne) est un tournoi de cricket avec son hipe à un niveau maximal. Alors, mettons aujourd'hui la main sur l'ensemble de données IPL et exécutons notre RDD à l'aide de Spark.

  • tout d'abord, téléchargeons les données de correspondance CSV d'IPL. Après l'avoir téléchargé, il commence à ressembler à un fichier EXCEL avec des lignes et des colonnes.

À l'étape suivante, nous déclenchons l'étincelle et chargeons le fichier matches.csv à partir de son emplacement, dans mon cas, moncsvl'emplacement du fichier est '/User/edureka_566977/test/matches.csv'

attendre et notifier en java

Commençons maintenant par le Transformation partie en premier:

  • carte():

Nous utilisons Transformation de la carte pour appliquer une opération de transformation spécifique sur chaque élément d'un RDD. Ici, nous créons un RDD par le nom CKfile où stocker notrecsvfichier. Nous allons créer un autre RDD appelé States à stocker les détails de la ville .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println) val states = CKfile.map (_. split (',') (2)) states.collect (). foreach (println)

  • filtre():

Filtrer la transformation, le nom lui-même décrit son utilisation. Nous utilisons cette opération de transformation pour filtrer les données sélectives d'une collection de données données. Nous appliquons fonctionnement du filtre ici pour obtenir les records des matchs IPL de l'année 2017 et stockez-le dans le fichier RDD.

val fil = CKfile.filter (ligne => ligne.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Nous appliquons flatMap est une opération de transformation à chacun des éléments d'un RDD pour créer un newRDD. Elle est similaire à la transformation de carte. ici nous appliquonsFlatmapà crache les matchs de la ville d'Hyderabad et stocker les données dansfilRDDRDD.

val filRDD = fil.flatMap (ligne => line.split ('Hyderabad')). collect ()

  • cloison():

Toutes les données que nous écrivons dans un RDD sont divisées en un certain nombre de partitions. Nous utilisons cette transformation pour trouver le nombre de partitions les données sont en fait divisées en.

val fil = CKfile.filter(line => line.contains('2017')) fil.partitions.size

  • mapPartitions ():

Nous considérons MapPatitions comme une alternative à Map () etpour chaque() ensemble. Nous utilisons mapPartitions ici pour trouver les Nombre de rangées nous avons dans notre fichier RDD.

val fil = CKfile.filter (ligne => ligne.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • réduire par():

Nous utilisonsRéduire par() sur Paires clé-valeur . Nous avons utilisé cette transformation sur notrecsvfichier pour trouver le lecteur avec le Homme le plus élevé des matchs .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • syndicat():

Le nom explique tout, nous utilisons la transformation syndicale pour regrouper deux RDD ensemble . Ici, nous créons deux RDD, à savoir fil et fil2. fil RDD contient les enregistrements des correspondances IPL 2017 et fil2 RDD contient l'enregistrement des correspondances IPL 2016.

val fil = CKfile.filter (ligne => ligne.contains ('2017')) val fil2 = CKfile.filter (ligne => ligne.contains ('2016')) val uninRDD = fil.union (fil2)

Commençons par le action partie où nous montrons la sortie réelle:

  • collecte():

Collecter est l'action que nous utilisons pour afficher le contenu dans le RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println)

  • compter():

Compterest une action que nous utilisons pour compter les nombre d'enregistrements présent dans le RDD.Icinous utilisons cette opération pour compter le nombre total d'enregistrements dans notre fichier matches.csv.

val CKfile = sc.textFile ('/ utilisateur / edureka_566977 / test / matches.csv') CKfile.count ()

  • prendre():

Take est une opération Action similaire à collecter mais la seule différence est qu'elle peut imprimer n'importe quel nombre sélectif de lignes selon la demande de l'utilisateur. Ici, nous appliquons le code suivant pour imprimer le les dix principaux rapports.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. prendre (10) .pourchaque (println)

  • premier():

First () est une opération d'action similaire à collect () et take ()ilutilisé pour imprimer le rapport le plus haut s la sortie Ici, nous utilisons la première opération () pour trouver le nombre maximum de matchs joués dans une ville donnée et nous obtenons Mumbai comme sortie.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') val states = CKfile.map (_. split (',') (2)) val Scount = states.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Pour rendre notre processus d'apprentissage RDD à l'aide de Spark encore plus intéressant, j'ai proposé un cas d'utilisation intéressant.

RDD utilisant Spark: cas d'utilisation de Pokemon

  • tout d'abord, Téléchargez un fichier Pokemon.csv et chargez-le dans le spark-shell comme nous l'avons fait pour le fichier Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Les Pokémons sont en fait disponibles dans une grande variété, trouvons quelques variétés.

  • Suppression du schéma du fichier Pokemon.csv

Nous n'avons peut-être pas besoin du Schéma du fichier Pokemon.csv. Par conséquent, nous le supprimons.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (ligne =>! line.equals (Head))

  • Trouver le nombre de partitions notre pokemon.csv est distribué dans.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Pokémon d'eau

Trouver le nombre de Pokémon Eau

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Pokémon de feu

Trouver le nombre de pokémon de feu

programme java pour la série fibonacci
val FireRDD = PokemonDataRDD1.filter (ligne => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Nous pouvons également détecter le population d'un autre type de pokémon en utilisant la fonction de comptage
WaterRDD.count () FireRDD.count ()

  • Depuis que j'aime le jeu de stratégie défensive trouvons le pokémon avec défense maximale.
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Higher_Defence:' + defenceList.max ())

  • Nous connaissons le maximum valeur de la force de défense mais nous ne savons pas de quel Pokémon il s’agit. alors, trouvons qui est-ce pokemon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Commande [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Maintenant laissez-nous trier le pokemon avec moins de défense
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Voyons maintenant le Pokémon avec un stratégie moins défensive.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) valer2WithPokeadName .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Ordering [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Donc, avec cela, nous arrivons à la fin de ce RDD en utilisant l'article Spark. J'espère que nous avons éclairé un peu vos connaissances sur les RDD, leurs fonctionnalités et les différents types d'opérations qui peuvent être effectuées sur eux.

Cet article basé sur est conçu pour vous préparer à l'examen de certification Cloudera Hadoop et Spark Developer Certification (CCA175). Vous obtiendrez une connaissance approfondie d'Apache Spark et de l'écosystème Spark, qui comprend Spark RDD, Spark SQL, Spark MLlib et Spark Streaming. Vous obtiendrez des connaissances approfondies sur le langage de programmation Scala, HDFS, Sqoop, Flume, Spark GraphX ​​et le système de messagerie tel que Kafka.