启用Hbase
- 下载hbase,下载.tar.gz文件,不下载src.tar.gz文件
- 在安装hbase前需要安装java环境,hbase-env.sh文件,可以直接在头行加上
export JAVA_HOME=///
standalone
- conf/hbase-site.xml问hbase的主要配置文件,需要在文件内写明hbase的主目录,zookeeper的主目录。
<configuration>
<property>
<name>hbase.rootdir</name>
<value>file:///home/testuser/hbase</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/home/testuser/zookeeper</value>
</property>
<property>
<name>hbase.unsafe.stream.capability.enforce</name>
<value>false</value>
<description>
Controls whether HBase will check for stream capabilities (hflush/hsync).
Disable this if you intend to run on LocalFileSystem, denoted by a rootdir
with the 'file://' scheme, but be mindful of the NOTE below.
WARNING: Setting this to false blinds you to potential data loss and
inconsistent system state in the event of process and/or node failures. If
HBase is complaining of an inability to use hsync or hflush it's most
likely not a false positive.
</description>
</property>
</configuration>
standalone模式下hbase的主目录配置为本地的目录‘file://' 使用./start-hbase.sh启动hbase可以在jps看到一个HMaster,一个HReginServer,以及Zookeeper的线程。
Pseudo-distributed
Pseudo-Distributed 伪分布模式,在standalone模式下所有的进程(HMaster,....)运行在一个JVM里
代码语言:javascript复制<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
配置分布式,让每一个进程一个jvm,改变根目录位于hdfs
代码语言:javascript复制<property>
<name>hbase.rootdir</name>
<value>hdfs://localhost:8020/hbase</value>
</property>
同时移除 hbase.unsafe.stream.capability.enforce 属性或者设置为 true
HMaster控制着Hbase-cluster 启用./bin/local-master-backup.sh start 2 3 hbase Master的备用进程,2,3为偏移量,16012 16013为备用端口(源端口为16010)
代码语言:javascript复制local-regionservers.sh start 2 3 4 5
启动多个RegionServer
fully distribution
首先在完全分布下,要关闭防火墙 no firewall rule,出现no route to host问题,注意防火墙问题。同时要配置机器之间的免密访问。
- 在HMaster节点配置
<property>
<name>hbase.zookeeper.quorum</name>
<value>node-a.example.com,node-b.example.com,node-c.example.com</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/usr/local/zookeeper</value>
</property>
hbase 配置
Hbase 2.0 支持Jdk8,Hbase1.0支持Jdk7,8
- hbase-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hbase.zookeeper.quorum</name>
<value>example1,example2,example3</value>
<description>The directory shared by RegionServers.
</description>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/export/zookeeper</value>
<description>Property from ZooKeeper config zoo.cfg.
The directory where the snapshot is stored.
</description>
</property>
<property>
<name>hbase.rootdir</name>
<value>hdfs://example0:8020/hbase</value>
<description>The directory shared by RegionServers.
</description>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
<description>The mode the cluster will be in. Possible values are
false: standalone and pseudo-distributed setups with managed ZooKeeper
true: fully-distributed with unmanaged ZooKeeper Quorum (see hbase-env.sh)
</description>
</property>
</configuration>
配值zookeeper信息,配值hbase的根目录,集群是否开启
2.配值regionservers信息 配值regionserver节点
3.配值hbase-env.sh 配值java_home等前提条件信息
动态更改配置文件,不需要重启服务,直接在shell中 update_config and update_all_config即可更新配置文件。
代码语言:javascript复制hbase.ipc.server.fallback-to-simple-auth-allowed
hbase.cleaner.scan.dir.concurrent.size
hbase.regionserver.thread.compaction.large
hbase.regionserver.thread.compaction.small
hbase.regionserver.thread.split
hbase.regionserver.throughput.controller
hbase.regionserver.thread.hfilecleaner.throttle
hbase.regionserver.hfilecleaner.large.queue.size
...
...
hbase shell
可以使用linux命令调用hbase shell
代码语言:javascript复制echo "describe 'test'" | ./hbase shell -n > /dev/null 2>&1
HBase shell使用返回0成功命令的值的标准约定,以及失败命令的一些非零值。 Bash将命令的返回值存储在一个名为的特殊环境变量中$?。
代码语言:javascript复制#!/bin/bash
echo "describe 'test'" | ./hbase shell -n > /dev/null 2>&1
status=$?
echo "The status was " $status
if ($status == 0); then
echo "The command succeeded"
else
echo "The command may have failed."
fi
return $status
快速查看hbase的配置值
代码语言:javascript复制@shell.hbase.configuration.get("hbase.zookeeper.quorum")
也可以进行设置覆盖 同时可以编写简单代码,HBase Shell实际上是一个Ruby环境,因此您可以使用简单的Ruby脚本来算法计算拆分
代码语言:javascript复制hbase(main):021:0> import java.text.SimpleDateFormat
hbase(main):022:0> import java.text.ParsePosition
hbase(main):023:0> SimpleDateFormat.new(“yy / MM / dd HH:mm:ss”)。parse(“08/08/16 20:56:29”,ParsePosition.new(0))。 getTime()=> 1218920189000
走向另一个方向:
hbase(main):021:0> import java.util.Date
hbase(main):022:0> Date.new(1218920189000).toString()=>“Sat Aug 16 20:56:29 UTC 2008”
也可以开启debug开关
代码语言:javascript复制 ./bin/hbase shell -d
debug <RETURN>
加快统计方法
代码语言:javascript复制count '<tablename>', CACHE => 1000
数据模型
概念视图中空白cell在物理上是不存储的,因为根本没有必要存储。因此若一个请求为要获取t8时间的contents:html,他的结果就是空。相似的,若请求为获取t9时间的anchor:my.look.ca,结果也是空。但是,如果不指明时间,将会返回最新时间的行,每个最新的都会返回。
在物理上,一个的列族成员在文件系统上都是存储在一起。因为存储优化都是针对列族级别的,这就意味着,一个colimn family的所有成员的是用相同的方式访问的.
尽量在你的应用中使用一个列族。只有你的所有查询操作只访问一个列族的时候,可以引入第二个和第三个列族.例如,你有两个列族,但你查询的时候总是访问其中的一个,从来不会两个一起访问。
namespace是类比与传统数据库中的数据库的,hbase命名空间和default命名空间。
删除会写一个逻辑删除,只有在下一次主要压缩运行后才会消失。假设你删除了所有内容⇐T。在此之后你做了一个带有时间戳⇐T的新put。这个put,即使它发生在删除之后,也会被删除墓碑掩盖
所有数据模型操作HBase按排序顺序返回数据。首先是行,然后是ColumnFamily,后面是列限定符,最后是时间戳(反向排序,因此首先返回最新的记录)。
表的架构设计
一张表目标存储1050G的数据,超过的建议存储HDFS,一个表13个列族,50~100个连续区间,Java堆应该是32GB(20G区域,128M存储器,其余默认值)
如果ColumnFamilyA有100万行而ColumnFamilyB有10亿行,则ColumnFamilyA的数据可能会分布在许多区域(和RegionServers)中。这使得ColumnFamilyA的大规模扫描效率降低。
ColumnFamilies可以设置TTL长度(以秒为单位),HBase将在到达到期时间后自动删除行。
如果时间范围非常广(例如,长达一年的报告)并且数据量很大,则汇总表是一种常用方法。这些将通过MapReduce作业生成到另一个表中。
配置hbase
hbase.ipc.server.callqueue.read.ratio(hbase.ipc.server.callqueue.read.share在0.98中)将呼叫队列分成读写队列:
代码语言:javascript复制0.5 意味着将有相同数量的读写队列
< 0.5 更多阅读而非写作
> 0.5 更多写入而非阅读
hbase mapReduce 读写示例
代码语言:javascript复制Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummary");
job.setJarByClass(MySummaryJob.class); // class that contains mapper and reducer
Scan scan = new Scan();
scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false); // don't set to true for MR jobs
// set other scan attrs
TableMapReduceUtil.initTableMapperJob(
sourceTable, // input table
scan, // Scan instance to control CF and attribute selection
MyMapper.class, // mapper class
Text.class, // mapper output key
IntWritable.class, // mapper output value
job);
TableMapReduceUtil.initTableReducerJob(
targetTable, // output table
MyTableReducer.class, // reducer class
job);
job.setNumReduceTasks(1); // at least one, adjust as required
boolean b = job.waitForCompletion(true);
if (!b) {
throw new IOException("error with job!");
}
在此示例映射器中,将选择具有String-value的列作为要汇总的值。此值用作从映射器发出的键,而a IntWritable表示实例计数器。
public static class MyMapper extends TableMapper<Text, IntWritable> {
public static final byte[] CF = "cf".getBytes();
public static final byte[] ATTR1 = "attr1".getBytes();
private final IntWritable ONE = new IntWritable(1);
private Text text = new Text();
public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
String val = new String(value.getValue(CF, ATTR1));
text.set(val); // we can only emit Writables...
context.write(text, ONE);
}
}
在reducer中,计算“ones”(就像执行此操作的任何其他MR示例一样),然后发出一个Put。
public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
public static final byte[] CF = "cf".getBytes();
public static final byte[] COUNT = "count".getBytes();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int i = 0;
for (IntWritable val : values) {
i = val.get();
}
Put put = new Put(Bytes.toBytes(key.toString()));
put.add(CF, COUNT, Bytes.toBytes(i));
context.write(null, put);
}
}
hbase 安全
设置hbase.ssl.enabled请true,hbase将仅仅提供http服务
可以通过使用hbase-site.xml中的hbase.security.authentication.ui 属性配置SPNEGO来启用对HBase Web UI的Kerberos身份验证。启用此身份验证要求HBase还配置为对RPC使用Kerberos身份验证。 hbase.security.authentication kerberos
在运行Thrift网关的每个群集节点的hbase-site.xml中,将该属性设置hbase.thrift.security.qop为以下三个值之一:
代码语言:javascript复制privacy - 身份验证,完整性和机密性检查。
integrity - 身份验证和完整性检查
authentication - 仅验证身份验证
安全HBase需要安全的ZooKeeper和HDFS,以便用户无法访问和/或修改HBase下的元数据和数据。HBase使用HDFS(或配置文件系统)来保存其数据文件以及预写日志(WAL)和其他数据。HBase使用ZooKeeper存储一些操作元数据(主地址,表锁,恢复状态等)。
连接Hbase
Representational State Transfer (REST)
代码语言:javascript复制# Foreground
$ bin/hbase rest start -p <port>
# Background, logging to a file in $HBASE_LOGS_DIR
$ bin/hbase-daemon.sh start rest -p <port>
$ bin/hbase-daemon.sh stop rest
JAVA
代码语言:javascript复制public class HBaseExample {
public static void main(String[] args) throws Exception {
AbstractHBaseDBO dbo = new HBaseDBOImpl();
/*drop if table is already exist.*
if(dbo.isTableExist("user")){
dbo.deleteTable("user");
}
//*create table*
dbo.createTableIfNotExist("user",HBaseOrder.DESC,"account");
//dbo.createTableIfNotExist("user",HBaseOrder.ASC,"account");
//create index.
String[] cols={"id","name"};
dbo.addIndexExistingTable("user","account",cols);
//insert
InsertQuery insert = dbo.createInsertQuery("user");
UserBean bean = new UserBean();
bean.setFamily("account");
bean.setAge(20);
bean.setEmail("ncanis@gmail.com");
bean.setId("ncanis");
bean.setName("ncanis");
bean.setPassword("1111");
insert.insert(bean);
//select 1 row
SelectQuery select = dbo.createSelectQuery("user");
UserBean resultBean = (UserBean)select.select(bean.getRow(),UserBean.class);
// select column value.
String value = (String)select.selectColumn(bean.getRow(),"account","id",String.class);
// search with option (QSearch has EQUAL, NOT_EQUAL, LIKE)
// select id,password,name,email from account where id='ncanis' limit startRow,20
HBaseParam param = new HBaseParam();
param.setPage(bean.getRow(),20);
param.addColumn("id","password","name","email");
param.addSearchOption("id","ncanis",QSearch.EQUAL);
select.search("account", param, UserBean.class);
// search column value is existing.
boolean isExist = select.existColumnValue("account","id","ncanis".getBytes());
// update password.
UpdateQuery update = dbo.createUpdateQuery("user");
Hashtable<String, byte[]> colsTable = new Hashtable<String, byte[]>();
colsTable.put("password","2222".getBytes());
update.update(bean.getRow(),"account",colsTable);
//delete
DeleteQuery delete = dbo.createDeleteQuery("user");
delete.deleteRow(resultBean.getRow());
////////////////////////////////////
// etc
// HTable pool with apache commons pool
// borrow and release. HBasePoolManager(maxActive, minIdle etc..)
IndexedTable table = dbo.getPool().borrow("user");
dbo.getPool().release(table);
// upload bigFile by hadoop directly.
HBaseBigFile bigFile = new HBaseBigFile();
File file = new File("doc/movie.avi");
FileInputStream fis = new FileInputStream(file);
Path rootPath = new Path("/files/");
String filename = "movie.avi";
bigFile.uploadFile(rootPath,filename,fis,true);
// receive file stream from hadoop.
Path p = new Path(rootPath,filename);
InputStream is = bigFile.path2Stream(p,4096);
}
}
SCALA
代码语言:javascript复制import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection,ConnectionFactory,HBaseAdmin,HTable,Put,Get}
import org.apache.hadoop.hbase.util.Bytes
val conf = new HBaseConfiguration()
val connection = ConnectionFactory.createConnection(conf);
val admin = connection.getAdmin();
// list the tables
val listtables=admin.listTables()
listtables.foreach(println)
// let's insert some data in 'mytable' and get the row
val table = new HTable(conf, "mytable")
val theput= new Put(Bytes.toBytes("rowkey1"))
theput.add(Bytes.toBytes("ids"),Bytes.toBytes("id1"),Bytes.toBytes("one"))
table.put(theput)
val theget= new Get(Bytes.toBytes("rowkey1"))
val result=table.get(theget)
val value=result.value()
println(Bytes.toString(value))
PYTHON
代码语言:javascript复制import java.lang
from org.apache.hadoop.hbase import HBaseConfiguration, HTableDescriptor, HColumnDescriptor, TableName
from org.apache.hadoop.hbase.client import Admin, Connection, ConnectionFactory, Get, Put, Result, Table
from org.apache.hadoop.conf import Configuration
# First get a conf object. This will read in the configuration
# that is out in your hbase-*.xml files such as location of the
# hbase master node.
conf = HBaseConfiguration.create()
connection = ConnectionFactory.createConnection(conf)
admin = connection.getAdmin()
# Create a table named 'test' that has a column family
# named 'content'.
tableName = TableName.valueOf("test")
table = connection.getTable(tableName)
desc = HTableDescriptor(tableName)
desc.addFamily(HColumnDescriptor("content"))
# Drop and recreate if it exists
if admin.tableExists(tableName):
admin.disableTable(tableName)
admin.deleteTable(tableName)
admin.createTable(desc)
# Add content to 'column:' on a row named 'row_x'
row = 'row_x'
put = Put(row)
put.addColumn("content", "qual", "some content")
table.put(put)
# Now fetch the content just added, returns a byte[]
get = Get(row)
result = table.get(get)
data = java.lang.String(result.getValue("content", "qual"), "UTF8")
print "The fetched row contains the value '%s'" % data
hbase和spark
spark和hbase的基本集成
所有Spark和HBase集成的根源都是HBaseContext。HBaseContext接受HBase配置并将它们推送到Spark执行程序。这允许我们在静态位置为每个Spark Executor建立一个HBase连接。
scala
代码语言:javascript复制val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
...
val hbaseContext = new HBaseContext(sc, config)
rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
it.foreach((putRecord) => {
. val put = new Put(putRecord._1)
. putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
. bufferedMutator.mutate(put)
})
bufferedMutator.flush()
bufferedMutator.close()
})
java
代码语言:javascript复制JavaSparkContext jsc = new JavaSparkContext(sparkConf);
try {
List<byte[]> list = new ArrayList<>();
list.add(Bytes.toBytes("1"));
...
list.add(Bytes.toBytes("5"));
JavaRDD<byte[]> rdd = jsc.parallelize(list);
Configuration conf = HBaseConfiguration.create();
JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
hbaseContext.foreachPartition(rdd,
new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
public void call(Tuple2<Iterator<byte[]>, Connection> t)
throws Exception {
Table table = t._2().getTable(TableName.valueOf(tableName));
BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
while (t._1().hasNext()) {
byte[] b = t._1().next();
Result r = table.get(new Get(b));
if (r.getExists()) {
mutator.mutate(new Put(b));
}
}
mutator.flush();
mutator.close();
table.close();
}
});
} finally {
jsc.stop();
}
spark Streaming
代码语言:javascript复制val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
val hbaseContext = new HBaseContext(sc, config)
val ssc = new StreamingContext(sc, Milliseconds(200))
val rdd1 = ...
val rdd2 = ...
val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
Array[Byte], Array[Byte])])]]()
queue = rdd1
queue = rdd2
val dStream = ssc.queueStream(queue)
dStream.hbaseBulkPut(
hbaseContext,
TableName.valueOf(tableName),
(putRecord) => {
val put = new Put(putRecord._1)
putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
put
})
bulk load
代码语言:javascript复制val sc = new SparkContext("local", "test")
val config = new HBaseConfiguration()
val hbaseContext = new HBaseContext(sc, config)
val stagingFolder = ...
val rdd = sc.parallelize(Array(
(Bytes.toBytes("1"),
(Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
(Bytes.toBytes("3"),
(Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
rdd.hbaseBulkLoad(TableName.valueOf(tableName),
t => {
val rowKey = t._1
val family:Array[Byte] = t._2(0)._1
val qualifier = t._2(0)._2
val value = t._2(0)._3
val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
Seq((keyFamilyQualifier, value)).iterator
},
stagingFolder.getPath)
val load = new LoadIncrementalHFiles(config)
load.doBulkLoad(new Path(stagingFolder.getPath),
conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))
SparkSQL/DataFrames
代码语言:javascript复制def catalog = s"""{
|"table":{"namespace":"default", "name":"table1"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
|}
|}""".stripMargin
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog->cat))
.format("org.apache.hadoop.hbase.spark")
.load()
}
val df = withCatalog(catalog)