In short: a small Spark SQL job that takes a query from the command line, runs it, and writes the result to HDFS as CSV, plus a shell script that submits it with spark-submit.

Project code:
package test;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.SparkSession;

public class SparkSQLJob {

    private static final String WRITE_FORMAT = "csv";
    private static final String HDFS = "/data/sparksearch/";
    private static final Logger LOG = Logger.getLogger(SparkSQLJob.class);

    public static void main(String[] args) {
        LOG.setLevel(Level.INFO);
        // Expect four arguments, in the order the shell script passes them:
        // AppName, querySQL, savePath, LogPath.
        if (args == null || args.length < 4) {
            LOG.error("Please pass four parameters: AppName, querySQL, savePath, LogPath!");
            return;
        }
        SparkSQLJob searchObj = new SparkSQLJob();
        String querySQL = args[1];
        LOG.info("querySQL = " + querySQL);
        searchObj.run(querySQL);
    }

    public void run(String querySQL) {
        // Lowercasing simplifies the limit check below, but note that it
        // also lowercases string literals inside the query.
        querySQL = querySQL.toLowerCase();
        SparkSession spark = SparkSession
                .builder()
                .enableHiveSupport()
                .getOrCreate();
        String applicationId = spark.sparkContext().applicationId();
        // Cap the result size: append a LIMIT clause if the query has none.
        if (querySQL.contains("limit")) {
            LOG.info("query already contains a limit clause");
        } else {
            LOG.info("query does not contain a limit clause");
            // The leading space matters; without it the clause would be
            // glued onto the last token of the query.
            querySQL += " limit 100000";
        }
        LOG.info("applicationId = " + applicationId);
        LOG.info("spark sql = " + querySQL);
        LOG.info("Start Spark Job ----------->");
        // save() fails if the target path already exists; set a SaveMode
        // explicitly if overwrite/append semantics are wanted.
        spark.sql(querySQL).write().format(WRITE_FORMAT).save(HDFS);
        LOG.info("Finished Spark Job, result stored in HDFS");
        spark.stop();
    }
}
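A caveat on the check above: contains("limit") also matches identifiers such as a column named rate_limit, in which case the cap is silently skipped. A minimal sketch of a stricter, word-boundary check (the class and helper name here are ours, not part of the job above):

import java.util.regex.Pattern;

public final class SqlLimitCheck {

    // Matches "limit <n>" as standalone words (case-insensitive), so an
    // identifier such as "rate_limit" no longer counts as a LIMIT clause.
    // It can still be fooled by "limit 5" inside a string literal.
    private static final Pattern LIMIT_CLAUSE =
            Pattern.compile("\\blimit\\s+\\d+\\b", Pattern.CASE_INSENSITIVE);

    public static boolean hasLimitClause(String sql) {
        return LIMIT_CLAUSE.matcher(sql).find();
    }

    public static void main(String[] args) {
        System.out.println(hasLimitClause("select * from t limit 10"));  // true
        System.out.println(hasLimitClause("select rate_limit from t"));  // false
    }
}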
Shell script:
The jar below is the fat jar (with dependencies) built from the project code above.
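Judging by the -jar-with-dependencies suffix, this jar is presumably produced by the Maven Assembly plugin. Assuming assembly:single is bound to the package phase in the project's pom (an assumption, not shown in the post), building it is simply:

mvn clean package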
#!/bin/bash
source ~/.bash_profile
export JAVA_HOME=/usr/local/webserver/jdk

jar=/usr/local/webserver/jars/SparkSQLJob-1.0-SNAPSHOT-jar-with-dependencies.jar
mainClass=test.SparkSQLJob

# $1 = AppName, $2 = querySQL, $3 = savePath, $4 = LogPath
function run(){
    /usr/local/webserver/spark/bin/spark-submit \
        --master yarn \
        --name "$1" \
        --num-executors 4 \
        --executor-memory 4G \
        --queue sparksql \
        --driver-memory 4G \
        --conf spark.ui.port=5022 \
        --class $mainClass $jar "$1" "$2" "$3" "$4" &> "$4"
    # All four parameters are forwarded to the main class, matching the
    # four-argument check in SparkSQLJob; stdout/stderr go to the log path.
    # Arguments are quoted since querySQL contains spaces.
}

echo "start SparkSQL Job Shell ->"
run "$1" "$2" "$3" "$4"
echo "Finished SparkSQL Job Shell -->|||"