学习任何的spark技术之前,请先正确理解spark,可以参考:

以下示例使用 Spark RDD 的 Java API(JdbcRDD)从关系型数据库中读取数据。这里使用的是本地嵌入式数据库 Derby,也可以换成 MySQL 或 Oracle 等其他关系型数据库:

package com.twq.javaapi.java7;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * Example of reading rows from a relational database into a Spark RDD with {@link JdbcRDD}.
 *
 * <p>An embedded Derby database is used here so the example is self-contained; any JDBC
 * source (MySQL, Oracle, ...) can be substituted by changing the driver class and URL.
 */
public class JavaJdbcRDDSuite implements Serializable {

    /** JDBC URL of the Derby database used by this example. */
    private static final String DB_URL = "jdbc:derby:target/JavaJdbcRDDSuiteDb";

    /** Derby SQLState reported when the object being created already exists. */
    private static final String SQLSTATE_ALREADY_EXISTS = "X0Y32";

    /** Derby SQLState reported on a normal single-database shutdown. */
    private static final String SQLSTATE_DB_SHUTDOWN = "08006";

    /**
     * Creates table {@code FOO} and fills it with five rows (DATA = 2, 4, 6, 8, 10).
     *
     * <p>If the table already exists (for example from a previous run), both the creation
     * and the inserts are skipped so the data is not duplicated.
     *
     * @throws ClassNotFoundException if the Derby embedded driver is not on the classpath
     * @throws SQLException on any database error other than "table already exists"
     */
    public static void prepareData() throws ClassNotFoundException, SQLException {
        // Uses the local Derby database; MySQL or another relational database works too.
        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
        try (Connection connection = DriverManager.getConnection(DB_URL + ";create=true")) {
            try {
                // Table FOO: ID is an auto-incrementing identity column, DATA is an INTEGER column.
                try (Statement create = connection.createStatement()) {
                    create.execute(
                            "CREATE TABLE FOO("
                                    + "ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),"
                                    + "DATA INTEGER)");
                }
                // Insert the sample rows.
                try (PreparedStatement insert =
                             connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")) {
                    for (int i = 1; i <= 5; i++) {
                        insert.setInt(1, i * 2);
                        insert.executeUpdate();
                    }
                }
            } catch (SQLException e) {
                // The table already exists, so the data was prepared earlier; anything else is a real failure.
                // Constant-first equals avoids a NullPointerException when the SQLState is null.
                if (!SQLSTATE_ALREADY_EXISTS.equals(e.getSQLState())) {
                    throw e;
                }
            }
        }
    }

    /**
     * Shuts down the embedded Derby database.
     *
     * <p>Derby signals a shutdown by throwing an {@link SQLException}; the one carrying the
     * normal single-database shutdown state is swallowed on purpose.
     *
     * @throws SQLException if the exception raised is not the normal shutdown notification
     */
    public static void shutdownDB() throws SQLException {
        try {
            DriverManager.getConnection(DB_URL + ";shutdown=true");
        } catch (SQLException e) {
            // Throw if not normal single database shutdown
            // https://db.apache.org/derby/docs/10.2/ref/rrefexcept71493.html
            if (!SQLSTATE_DB_SHUTDOWN.equals(e.getSQLState())) {
                throw e;
            }
        }
    }

    /**
     * Prepares the sample data, reads it back through a {@link JdbcRDD} and prints the result.
     *
     * @param args unused
     * @throws Exception if preparing the data or running the job fails
     */
    public static void main(String[] args) throws Exception {
        JavaSparkContext sc = new JavaSparkContext("local", "JavaAPISuite");
        try {
            // Prepare the data.
            prepareData();
            try {
                // Build the JdbcRDD. The two '?' placeholders receive the partition bounds
                // derived from lowerBound = 1, upperBound = 5 and numPartitions = 1.
                JavaRDD<Integer> rdd = JdbcRDD.create(
                        sc,
                        new JdbcRDD.ConnectionFactory() {
                            @Override
                            public Connection getConnection() throws SQLException {
                                return DriverManager.getConnection(DB_URL);
                            }
                        },
                        "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
                        1, 5, 1,
                        new Function<ResultSet, Integer>() {
                            @Override
                            public Integer call(ResultSet r) throws Exception {
                                return r.getInt(1);
                            }
                        }
                );

                // Expected output: [2, 4, 6, 8, 10]
                System.out.println(rdd.collect());
            } finally {
                // Shut the database down even when the job fails.
                shutdownDB();
            }
        } finally {
            // Always release the Spark context.
            sc.stop();
        }
    }
}

详细了解RDD的api的话,可以参考: