[关闭]
@BrandonLin 2016-07-29T16:29:55.000000Z 字数 7243 阅读 5016

Sqoop

Hadoop Sqoop


简介

Sqoop是一个用于在外部结构化数据与Hadoop之间导入导出数据的工具。

Apache Sqoop is a tool designed for efficiently transferring bulk data between Apache Hadoop and structured datastores such as relational databases.

Data-Loading-into-Hadoop-Flume-3.jpg-23.5kB

sqoop_import_export1.png-101.9kB

Sqoop:

下载安装配置

根据Hadoop版本选择对应的Sqoop,我这里是Hadoop,选择Sqoop 1.99.6:

  1. wget http://mirrors.hust.edu.cn/apache/sqoop/1.99.6/sqoop-1.99.6-bin-hadoop200.tar.gz
  2. tar zxvf sqoop-1.99.6-bin-hadoop200.tar.gz
  3. mv sqoop-1.99.6-bin-hadoop200 /home/sqoop-1.99.6

配置PATH:

  1. vim /etc/profile
  2. export SQOOP_HOME=/home/sqoop-1.99.6
  3. export PATH=$PATH:$SQOOP_HOME/bin
  4. source /etc/profile

配置server/conf/catalina.properties

  1. common.loader=${catalina.base}/lib,${catalina.base}/lib/*.jar,${catalina.home}/lib,${catalina.home}/lib/*.jar,${catalina.home}/../lib/*.jar,/home/hadoop-2.6.0/share/hadoop/common/*.jar,/home/hadoop-2.6.0/share/hadoop/common/lib/*.jar,/home/hadoop-2.6.0/share/hadoop/hdfs/*.jar,/home/hadoop-2.6.0/share/hadoop/hdfs/lib/*.jar,/home/hadoop-2.6.0/share/hadoop/mapreduce/*.jar,/home/hadoop-2.6.0/share/hadoop/mapreduce/lib/*.jar,/home/hadoop-2.6.0/share/hadoop/yarn/*.jar,/home/hadoop-2.6.0/share/hadoop/yarn/lib/*.jar,/home/hadoop-2.6.0/share/hadoop/tools/*.jar,/home/hadoop-2.6.0/share/hadoop/tools/lib/*.jar,/home/hadoop-2.6.0/share/hadoop/httpfs/tomcat/lib/*.jar

配置server/conf/sqoop.properties:

  1. org.apache.sqoop.submission.engine.mapreduce.configuration.directory=/home/hadoop-2.6.0/etc/hadoop

验证是否配置正确:

  1. sqoop2-tool veriify

image_1aoric1io78r8li1onb4nj1l6n9.png-11kB

启动sqoop server:

  1. % sqoop2-server start

image_1aorie288577jqt4qfke1l8nm.png-25kB

启动的的tomcat运行在12000端口。

启动客户端shell:

  1. sqoop2-shell

image_1aorip1vk19tnqk42ms1rul157f13.png-11.6kB

Sqoop实例

启动Hadoop集群:

  1. start-dfs.sh
  2. start-yarn.sh

启动sqoop2-shell:

  1. sqoop2-shell

查看版本和connector:

  1. show version -all
  2. show connector

image_1aorivuvu5ae1r2tii01a47q3e1g.png-46.7kB

创建JDBC link:

image_1aork6fqojqq1n141itp18jr1saj1t.png-22.6kB

创建HDFS link:

image_1aork9i2cldf1i6mbfk1bgo1cb22a.png-35.5kB

查看可用link

image_1aorkb4a01k50qun181ksjhfl12n.png-31.7kB

创建从JDBC到HDFS的作业:

  1. create jon -f 6 -t 7

f和t分别指定from和to的link。然后主要是配置数据源中的数据库和表,分区字段,文件格式,压缩格式等:
image_1aorkgoh1ttl1st82cc199gote34.png-37.3kB

image_1aorkio5c99ove51sn214h317ur3h.png-29.6kB

启动刚才创建的作业:

  1. start jon -j 1

image_1aorkklvl1nav15gdh3t1h7tavr3u.png-21.2kB

查看进度:
image_1aorkle7bjjgqpcajshm32v84b.png-19.1kB

YARN管理界面中的MapReduce作业:
image_1aorkns641f8icng1dp315rr1qpr4o.png-69.7kB

image_1aorkoirjp0jap1mc71fdn1ijp55.png-17.2kB

查看HDFS上是否有文件:

  1. hadoop fs -ls /root/...

image_1aorktk677o16j712ma1pgf6l05i.png-45.6kB

image_1aorm5q3p1uttm3gaug1itk4ej9.png-28.5kB
最后运行结果:

  1. sqoop: status job -j 1

image_1aorl7tfd1eot18vm1qpd1jmu7k55v.png-62.6kB
如果发现无法查看作业状态,出现如下异常:

image_1aorlk1ek263f8j1e811l9q1t1u6p.png-24.1kB

从server的tomcat日志查看到如下异常:
image_1aorlm2fbqnk75l1s9i17ee1aqc76.png-100kB
根据经验判断,是没有启动历史作业的服务器,使用如下命令启动:

  1. mr-jobhistory-daemon.sh start historyserver

从YARN管理器上看:
image_1aorlaoclhl8rhd1vpj1vl6mvm6c.png-83.6kB

可见Sqoop启动了一个只有map没有reduce的MapReduce作业,使用了10个map。耗时一分钟54秒。

核心概念:Sqoop Connector

Sqoop的整个架构是基于Connector的,外部的结构化数据与Hadoop之间通过Connector连接器完成数据传输。例如针对RDBMS的MySQL连接器,Oracle连接器等,还有一个通用的JDBC连接器。

另外一些Connector针对特定的数据库做了优化,例如使用MySQL的mysqldump可以提高导入效率,这称为Direct-Mode。除了内置的Sqoop Connector外,还有许多第三方的Connector,包括像Couchbase这样的NoSQL数据库。

可以通过sqoop2-shell的命令查看可用的connector:

  1. sqoop: show connector

image_1aormg7gn13gsb3219l1hnrqdlm.png-17.5kB

工作原理

Sqoop内部将数据集划分为不同的分区(Partition),然后使用只有map的MapReduce作业来完成数据传输,每个mapper负责一个分区。Sqoop使用数据库元数据来确保类型安全。

image_1aop14p9613tm1j9r1fqg14q34uq9.png-329.1kB

代码生成:

  1. % sqoop codegen --connect jdbc:mysql://localhost/hadoopguide \
  2. --table widgets --class-name Widget

可以把生成的代码看做是指定表的数据访问对象(DAO)。生成的代码保存表中提取出来的一条数据,可以在MapReduce中操作数据或者存入到SequenceFile中,每条记录作为value保存在文件中。

基于文本的文件可以不生成代码。

Sqoop启动MapReduce作业用于传输数据。InputFormat可以通过JDBC读取表中的部分数据。Hadoop自带的DataDrivenDBInputFormat用于对表数据进行分区,每个map传输其中的一部分数据。

数据分区一般根据表中特定的列,例如主键。在运行时可以通过指定--split-by参数指定分区使用的列。

可以通过--where参数对导入的数据进行过滤,更复杂的控制可以通过--query参数完成,这在增量任务中很有用。

由于导入进程是并发执行的,进行事务控制很困难。因此通过使用关闭数据写入的方式来保证数据的一致性。

增量导入

通过使用--check-column--last-value参数,可以增量导入新增的数据,Sqoop只导入指定列中大于last-value的记录。

对于写入后不更新的数据,并且ID是严格递增的,可以使用--incremental参数增量导入。

会更新的数据,可以使用--incremental lastmodified来增量。增量导入结束后,Sqoop会打印出--last-value,用于下次使用,但是更好的方式是使用Sqoop自动保存这个值,下次的时候也自动使用这个值。

Direct-Mode导入

大部分导入都通过DataDrivenDBInputFormat来实现,但是有些数据提供了专门的工具用来快速导入,例如MySQL的mysqldump,这个工具的吞吐量远大于JDBC的能力。使用这些外部工具来完成导入,sqoop中成为Direct-Mode,MapReduce作用启动myqldump,然后读取其输出文件,导入数据。这种模式通过--direct参数来启用,同时会受到一些限制,例如使用mysqldump无法读取CLOB,BLOB这样的数据类型。数据库元数据的获取依然是通过JDBC来完成。

导入后的数据处理

通过Sqoop导入的数据,如果是文本,那么在处理的时候可能需要完成类型的转换,sqoop的生成代码可以帮助完成这个工具,使得我们可以专注于MapReduce的业务逻辑。运行这些作业时,需确保Sqoop的库可以被读取到。驱动程序中可以通过HADOOP_CLASSPATH环境变量来配置:

  1. HADOOP_CLASSPATH=$SQOOP_HOME/sqoop-version.jar

Map作业节点上可以通过libjars参数设置:

  1. hadoop jar className example.jar -libjars $SQOOP_HOME/sqoop-version.jar

如果导入的文件是Avro格式,可以通过Avro MapReduce来处理,通过使用Generic Avro mapping来完成,可以不使用Schema生成的代码。

导入数据与Hive

Sqoop与Hive经常作为工具组合来处理来自关系型数据库中的数据。使用Sqoop提供的工具可以从关系型的元数据中生成Hive表:

  1. sqoop create-hive-table --connect jdbc:mysql://localhost/hadoopguide --table widgets --fields-terminated-by ','

由于Hive的数据类型跟大部分关系型数据库相比较都比较简陋,因此Sqoop在生成Hive表的时候,会尽可能使用接近的数据类型。但是这还是可以导致丢失精度的情况,这时候Sqoop会给出警告信息。例如:

  1. WARN hive.TableDefWriter: Colum design_date had to be cast to a less precise type in Hive

假设widgets表的数据已经通过Sqoop导入到HDFS中的widgets目录,使用下面的命令可以将这些数据导入到创建的Hive表:

  1. LOAD DATA PATH "widgets" INTO TABLE widgets;

导入数据到HDFS、创建Hive表、导入HDFS的数据到Hive表这三个步骤可以被简化成一个步骤:

  1. sqoop import --connect jdbc:mysql://localhost/hadoopguide --table widgets -m 1 --hive-import

使用--hive-import参数导入数据的时候,Sqoop直接把数据从源数据库导入到Hive表,Hive的Schema通过元数据推断而来。

导入到Hive后,就可以结合其他数据,例如LOG日志进行数据分析了。

  1. hive> CREATE TABLE sales(widget_id INT ,qty INT, street STRING,city STRING , state STRING,zip INT , sale_date STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
  2. hive> LOAD DATA LOCAL INPUT "/path/to/logs/sales.log" INTO TABLE slaes;
  3. hive> CREATE TABLE zip_profits AS SELECT SUM(w.prices * s.qty) AS sales_vol , s.zip FROM SALES s JOIN widgets w ON (s.widget_id = w.id) GROUP BY s.zip;
  4. hive> SELECT * FROM zip_profits ORDER BY sales_vol DESC;
  5. ...
  6. OK
  7. 403.71 90210
  8. 28.0 10005

导入大对象

大多数数据库都允许我们将大量的数据存在一个字段,通常有存储文本的CLOB和存储二进制的BLOB。在存储方式上,为了避免扫描时读取大对象字段到内存中,通常不在记录中内联存储大对象,而是独立到外部。

image_1aor03kn41bkc1frdfmq664jc39.png-144.6kB

在MapReduce中,数据在送往mapper之前,通常需要物化到内存中,如果把大对象跟其他普通字段存在同一个文件中,将会导致MapReduce性能非常低下。所以Sqoop导入的数据在存储格式上跟数据库非常相似,如果大对象的大小超过16MB(通过sqoop.inline.lob.length.max设置),会被存储在另外的文件中,这种文件的格式为LobFile,LobFile中的每条记录存储一个大对象。这样在客户端(如MapReduce)读入数据的时候,无需读入整个大文件,而只是存储一个引用,当真正需要访问时,则通过该引用进而通过InputStream访问具体的大对象数据。

二进制大对象的引用是一个BlobRef,其格式如下:

  1. externalLob(lf,lobfile0,100,5011714)

分别代表格式为LobFile,存储在lobfile0中,偏移量为100,数据长度为5011714字节。通过BlobRef的getDataStream方法,可以获取到指向大对象的输入流(InputStream)。

CBLOB类似的使用ClobRef引用。

在MapReduce中,访问大对象的次数经常很少,通过这种外部文件加引用的方式,可以大大提高IO效率。

数据导出

Sqoop也可以用于将数据从Hadoop导出到外部数据库。在导出之前,需要先手动创建表(Sqoop可以从SQL类型推断Java类型,但是不能够从Java类型正确推断SQL类型,例如Java的String可以存为CAHR,VARCHAR或者其他类型,因此必须手动创建对应的表。

导出命令如下:

  1. sqoop export --connect jdbc:mysql://localhost -m 1 --table sales_by_zip --export-dir /user/hive/warehouse/zip_profits --input-fields-terminated-by '\0001'

上述命令导出Hive的zip_profites表到mysql的sale_by_zip表,其中指定了Hive文件中默认的字段分隔符CTRL+A(Unicode字符为0x0001)。

在导出数据之前,Sqoop会根据connect string选定一种策略,例如使用mysqlimport或者JDBC。然后基于目标表的元数据生成一个类,这个类能够从文本文件中解析出记录,并将记录插入到对应的表。然后启动一个MapReduce作业,从HDFS中读取文件,用生成的类解析成记录,并用选定的策略执行导出。

image_1aor4hmf117n71eg51fiaub7124r9.png-410.9kB

对于mysql的direct-mode,每个map任务启动一个mysqlimport进程,并使用FIFO文件进行通信,导入数据到mysql。map的数量可以通过-m选项指定。基于JDBC的策略会使用批量的模式导出数据。

导出数据是不具备事务性的,并行的导入map可能在不同的时间结束,即使在任务中使用事务,前一个任务的输出也可能在后续的task完成之前可见。另外数据库经常使用固定大小的缓冲来保存事物,这个缓冲很有太小容纳不下一个任务重的所有数据。因此通常在导出完成之前,其他使用数据的应用最好不用访问数据,避免只看到部分结果。

有一种方式可以解决这个问题,Sqoop允许我们先将数据导出到一个临时表,然后在导出结束之后,使用单个事务将中间表导入最终的目标表。临时表通过--staging-table参数指定,这个临时表必须是已经存在的表并且是空的,除非指定了--clear-staging-table参数。使用这种方式会导致性能下降,因为数据需要写两次,并且在最后的数据移动阶段,其实是有2份数据的,占用了更多的空间。

导出SequenceFile

Sqoop可以导出非Hive表的数据,例如SequenceFile类型的文件,但是有较多的限制。

  1. % sqoop import --connect jdbc:mysql://localhost/hadoopguide --table widgets -m 1 --class-name WidgetHolder --as-sequencefile --target-dir widget_sequence_file --bindir .

as-sequencefile指定导入味SequenceFile,bindir指定jar包放在当前目录下,供下一步使用。

  1. %mysql hadoopguide
  2. mysql> create table widgets2 (id int ,widget_name varchar(100),price double,desigened date , version int,notes vatchar(200);
  1. % sqoop export --connect jdbc:mysql://localhost/hadoopguide --table widgets -m 1 --class-name WidgetHolder --jar-file WidgetHolder.jar --export-dir widget_sequence_files

jar-file指定上一步中生成的jar包。

参考

官网安装指南:
http://sqoop.apache.org/docs/1.99.6/Installation.html

官网5分钟Demo:
http://sqoop.apache.org/docs/1.99.6/Sqoop5MinutesDemo.html

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注