Question:
I have a Spark application running on a YARN cluster that needs to read files from multiple buckets on an S3-compatible object store, with each bucket having its own set of credentials.
According to the Hadoop documentation it should be possible to specify credentials for multiple buckets by setting configuration entries of the form spark.hadoop.fs.s3a.bucket.<bucket-name>.access.key=<access-key> on the active SparkSession, but that has not worked for me in practice.
An example that, according to the documentation, I believe should work:
import org.apache.spark.sql.{SaveMode, SparkSession}

case class BucketCredential(bucketName: String, accessKey: String, secretKey: String)

object TestMultiBucketReadWrite {

  val credentials: Seq[BucketCredential] = Seq(
    BucketCredential("bucket.1", "access.key.1", "secret.key.1"),
    BucketCredential("bucket.2", "access.key.2", "secret.key.2")
  )

  def addCredentials(sparkBuilder: SparkSession.Builder, credentials: Seq[BucketCredential]): SparkSession.Builder = {
    var sBuilder = sparkBuilder
    for (credential <- credentials) {
      sBuilder = sBuilder
        .config(s"spark.hadoop.fs.s3a.bucket.${credential.bucketName}.access.key", credential.accessKey)
        .config(s"spark.hadoop.fs.s3a.bucket.${credential.bucketName}.secret.key", credential.secretKey)
    }
    sBuilder
  }

  def main(args: Array[String]): Unit = {
    val spark = addCredentials(SparkSession.builder(), credentials)
      .appName("Test MultiBucket Credentials")
      .getOrCreate()

    import spark.implicits._
    val dummyDF = Seq(1, 2, 3, 4, 5).toDS()

    println("Testing multi write...")
    credentials.foreach(credential => {
      val bucket = credential.bucketName
      dummyDF.write.mode(SaveMode.Overwrite).json(s"s3a://$bucket/test.json")
    })

    println("Testing multi read...")
    credentials.foreach(credential => {
      val bucket = credential.bucketName
      val df = spark.read.json(s"s3a://$bucket/test.json").as[Long]
      println(df.collect())
    })
  }
}
However, when submitted, the job fails with the following error:
Testing multi write...
Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: null, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: null
	at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
	at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:976)
	at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:956)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:892)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:77)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:93)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at org.apache.spark.sql.DataFrameWriter.json(DataFrameWriter.scala:545)
The job does succeed when I instead set the fs.s3a.access.key and fs.s3a.secret.key settings per bucket before each operation, but that forces the reads/writes themselves to happen sequentially:
//...
println("Testing multi write...")
credentials.foreach(credential => {
  val bucket = credential.bucketName
  spark.conf.set("fs.s3a.access.key", credential.accessKey)
  spark.conf.set("fs.s3a.secret.key", credential.secretKey)
  dummyDF.write.mode(SaveMode.Overwrite).json(s"s3a://$bucket/test.json")
})

println("Testing multi read...")
credentials.foreach(credential => {
  val bucket = credential.bucketName
  spark.conf.set("fs.s3a.access.key", credential.accessKey)
  spark.conf.set("fs.s3a.secret.key", credential.secretKey)
  val df = spark.read.json(s"s3a://$bucket/test.json").as[Long]
  println(df.collect())
})
//...
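For reference, my understanding is that Spark copies any spark.hadoop.* entries into the underlying Hadoop configuration with the prefix stripped, so the documented per-bucket mechanism should be equivalent to something like the following sketch (same spark and credentials values as in the full example above; I have not verified this form fixes the issue):

// Sketch only: set the per-bucket S3A options directly on the Hadoop
// configuration, where the "spark.hadoop." prefix is dropped.
val hadoopConf = spark.sparkContext.hadoopConfiguration
credentials.foreach { credential =>
  hadoopConf.set(s"fs.s3a.bucket.${credential.bucketName}.access.key", credential.accessKey)
  hadoopConf.set(s"fs.s3a.bucket.${credential.bucketName}.secret.key", credential.secretKey)
}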
Answer:
Exception in thread "main"
com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403,
AWS Service: Amazon S3, AWS Request ID: null, AWS Error Code: null,
AWS Error Message: Forbidden, S3 Extended Request ID: null
403 Forbidden means the server understood the request but refuses to authorize it: the S3 account does not have access permission for one of your multiple buckets. Please check the credentials for each bucket again.

Another possible reason is a proxy issue. If your cluster reaches the S3 endpoint through an HTTP proxy, make sure those proxy settings are correct (see the sketch after the spark-submit example below for the equivalent in-code settings).
Define these sample variables in your shell script:
SPARK_DRIVER_JAVA_OPTS="
    -Dhttp.proxyHost=${PROXY_HOST}
    -Dhttp.proxyPort=${PROXY_PORT}
    -Dhttps.proxyHost=${PROXY_HOST}
    -Dhttps.proxyPort=${PROXY_PORT}
    $SPARK_DRIVER_JAVA_OPTS"

SPARK_EXECUTOR_JAVA_OPTS="
    -Dhttp.proxyHost=${PROXY_HOST}
    -Dhttp.proxyPort=${PROXY_PORT}
    -Dhttps.proxyHost=${PROXY_HOST}
    -Dhttps.proxyPort=${PROXY_PORT}
    $SPARK_EXECUTOR_JAVA_OPTS"

SPARK_OPTS="
    --conf spark.hadoop.fs.s3a.proxy.host=${PROXY_HOST}
    --conf spark.hadoop.fs.s3a.proxy.port=${PROXY_PORT}
    $SPARK_OPTS"
The spark-submit command would then look like:
spark-submit \
    --executor-cores $SPARK_EXECUTOR_CORES \
    --executor-memory $SPARK_EXECUTOR_MEMORY \
    --driver-memory $SPARK_DRIVER_MEMORY \
    --driver-java-options "$SPARK_DRIVER_JAVA_OPTS" \
    --conf spark.executor.extraJavaOptions="$SPARK_EXECUTOR_JAVA_OPTS" \
    --master $SPARK_MASTER \
    --class $APP_MAIN \
    $SPARK_OPTS \
    $APP_JAR "$@"
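Alternatively, the same S3A proxy options can be set from code when the session is built. A minimal sketch, where the proxy host and port values are placeholders for your environment:

import org.apache.spark.sql.SparkSession

// Sketch only: replace the placeholder host/port with your actual proxy.
val proxyHost = "proxy.example.com"
val proxyPort = "8080"

val spark = SparkSession.builder()
  .appName("S3A proxy example")
  .config("spark.hadoop.fs.s3a.proxy.host", proxyHost)
  .config("spark.hadoop.fs.s3a.proxy.port", proxyPort)
  .getOrCreate()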
Note: AFAIK, if you run on AWS EMR and the cluster already has S3 access, there is no need to set access keys at all, since the credentials are picked up implicitly.