Apache Spark un Amazon S3 - Gotchas un labākā prakse

S3 ir objektu krātuve, nevis failu sistēma, tāpēc jautājumi, kas rodas no iespējamās konsekvences, ar atomu nesaistītiem nosaukumiem, ir jārisina lietojumprogrammas kodā. Direktoriju serveris failu sistēmā ir aizstāts ar faila nosaukuma hash algoritmu. Tas ir slikti lietu uzskaitīšanai, direktoriju darbībām, dzēšanai un pārdēvēšanai (kopēšanai un izdzēšanai, jo tehniski objektu veikalos nav pārdēvēšanas)

Sāciet lietot S3A (URI shēma: s3a: //) - Hadoop 2.7+. S3a ir ieteicamais S3 klients Hadoop 2.7 un jaunākām versijām. S3a ir veiksmīgāka un atbalsta lielākus failus (līdz 5 TB), kā arī atbalsta daudzdaļīgu augšupielādi. Visiem objektiem, kuriem var piekļūt no s3n: // URL, vajadzētu būt pieejamiem arī no s3a, vienkārši aizstājot URL shēmu. Lielākā daļa kļūdu ziņojumu pret S3N tiks slēgti, jo WONTFIX

Panākt, lai Spark 2.0.1 darbotos ar S3a. Spark 2.0.1 savā klases takā izmantojiet hadoop-aws-2.7.3.jar, aws-java-sdk-1.7.4.jar, joda-time-2.9.3.jar; neaizmirstiet atjaunināt spark-default.conf ar AWS taustiņiem un S3A FileSystemClass

Spark.hadoop.fs.s3a.access.key XXXXXXX
spark.hadoop.fs.s3a.secret.key XXXXXXX
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem

Noteikti izmantojiet Dataframes, jo vaicājumu pārkārtošana un predikatīvā nolaišana ir pieejama ārpus lodziņa, tāpēc tiek ienests mazāk datu, tādējādi paātrinot vaicājumus.

Ja lasāt tos pašus datus vairākas reizes, mēģiniet izmantot .cache vai s3distcp, lai pārsūtītu failus uz vietējo EMR klasteru, lai iegūtu labāku no reālas failu sistēmas failu lasīšanas veiktspējas. S3distcp opcija groupBy ir lieliska iespēja mazo failu problēmas risināšanai, apvienojot lielu skaitu mazu failu.

Tas man liek pievērst uzmanību daudzu mazu failu lasīšanai. Ja failu apvienošana, izmantojot rīku, nav iespējama, izmēģiniet šo kodu, kas efektīvi darbojas lēnajā S3 direktoriju saraksta sašaurinājumā

importēt com.amazonaws.services.s3._, model._
    importēt com.amazonaws.auth.BasicAWSCredentials

    val pieprasījums = jauns ListObjectsRequest ()
    request.setBucketName (spainis)
    request.setPrefix (prefikss)
    request.setMaxKeys (pageLength)
    def s3 = jauns AmazonS3Client (jauni BasicAWSCredentials (atslēga, slepenais))

    val objs = s3.listObjects (pieprasījums) // Ņemiet vērā, ka šī metode atgriež saīsinātus datus, ja tie ir garāki par iepriekš norādīto "pageLength". Jums, iespējams, vajadzēs to risināt.
    sc.parallelize (objs.getObjectSummaries.map (_. getKey) .toList)
        .flatMap {key => Source.fromInputStream (s3.getObject (spainis, atslēga) .getObjectContent: InputStream) .getLines}

Pārliecinieties, vai spark.sql.parquet.filterPushdown ir taisnība un spark.sql.parquet.mergeSchema ir nepatiesa (lai izvairītos no shēmu apvienošanas rakstīšanas laikā, kas patiešām palēnina rakstīšanas posmu). Par laimi Spark 2.0 ir pareizais noklusējums

Vai esat prātojies, kāpēc tieši tajā laikā, kad darbs tiek pabeigts, žurnālos nekas netiek rakstīts un visas dzirksteļošanas darbības, šķiet, ir apstājušās, bet rezultāti vēl nav atrodami S3 izvades direktorijā ... kas notiek? Katru reizi, kad izpildītāji raksta darba rezultātu, katrs no viņiem ieraksta pagaidu direktorijā ārpus galvenā direktorija, kur bija jāraksta faili, un pēc visu izpildītāju darbības tiek pārdēvēts, lai iegūtu atomu ekskluzivitāti. Tas viss ir kārtībā standarta failu sistēmā, piemēram, hdfs, kur pārdēvēšana notiek uzreiz, bet objekta veikalā, piemēram, S3, tas neveicina pārdēvēšanu uz S3 ar ātrumu 6 MB / s.

Ja iespējams, ierakstiet darbu izvadi EMR hdfs (lai izmantotu gandrīz momentānus pārdēvējumus un labāku vietējo hdfs failu IO) un pievienojiet dstcp soli, lai failus pārvietotu uz S3, lai saglabātu visas nepatikšanas, kas saistītas ar iekšējo objektu krātuve, kas mēģina būt par failu sistēmu. Rakstīšana vietējiem hdfs ļaus jums ļaut spekulācijām kontrolēt bēguļojošus uzdevumus, neiekrītot strupceļa slazdos, kas saistīti ar DirectOutputCommiter.

Ja kā izvades direktorijs ir jāizmanto S3, pārliecinieties, vai ir iestatītas šādas dzirksteles konfigurācijas

2. dzirkstele.hadoop.mapreduce.fileoutputcommitter.algorithm.version
dzirkstele.spekulācija nepatiesa

Piezīme: DirectParquetOutputCommitter tiek noņemts no Spark 2.0 datu zaudēšanas iespējas dēļ. Diemžēl, kamēr mēs neesam uzlabojuši konsekvenci no S3a, mums ir jāstrādā ar apkārtējiem risinājumiem. Lietas uzlabojas ar Hadoop 2.8

Izvairieties no taustiņvārdiem leksikogrāfiskā secībā. Lai pārvietotos, varētu izmantot sajaukšanas / nejaušus prefiksus vai mainīt datuma un laika apgriezienus. Triks ir nosaukt atslēgas hierarhiski, atslēgas kreisajā pusē ievietojot visbiežāk filtrētās lietas. Un DNS problēmu dēļ nekad nevajag pasvītrot kausu nosaukumos.

Iespējojot fs.s3a.fast.upload, vienlaikus augšupielādējiet viena faila daļas Amazon S3

Tas bija smadzeņu izgāšanās no jautājumiem ražošanā, kurus es nesen risinu, lai Spark darbotos ar S3. Uzziniet vairāk par šo, padziļinoties nākamajā ziņojumā ...