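I. Introduction to HBase
1. Basic concepts
HBase is a Hadoop database, often described as a sparse, distributed, persistent, multidimensional sorted map. It is indexed by row key, column key, and timestamp, and it serves as a platform for storing and retrieving data with random access. HBase does not restrict the kinds of data it stores, allows a dynamic and flexible data model, does not use SQL, and does not emphasize relationships between data. It is designed to run on a cluster of servers and scales out horizontally.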
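2. HBase use cases and success stories
- Internet search: crawlers collect web pages and store them in BigTable, MapReduce jobs scan the whole table to build the search index, and search results are then queried from BigTable and shown to users.
- Capturing incremental data: for example, monitoring metrics, user interaction data, telemetry, targeted advertising, and so on.
- Content serving
- Information exchange
That was a brief introduction to HBase. I will write up its internals and architecture in a later post once I have organized my notes. For now I only know how to use HBase, so let's start with usage.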
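II. Using HBase
HBase is a database, and a database exists to store data, which means CRUD. Like MySQL, it offers both a shell client and language SDKs.
2.1 HBase shell
The hbase shell is similar to the MySQL command-line client.
Run help to see help for all commands.
The command groups are listed below:
COMMAND GROUPS: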
Group name: general
Commands: processlist, status, table_help, version, whoami
Group name: ddl
Commands: alter, alter_async, alter_status, create, create_layered, describe, disable, disable_all, drop, drop_all, enable, enable_all, exists, get_table, is_disabled, is_enabled, list, list_regions, locate_region, show_filters
Group name: namespace
Commands: alter_namespace, create_namespace, describe_namespace, drop_namespace, list_namespace, list_namespace_tables
Group name: dml
Commands: append, count, delete, deleteall, get, get_counter, get_splits, incr, put, scan, truncate, truncate_preserve
Group name: tools
Commands: assign, balance_switch, balancer, balancer_enabled, catalogjanitor_enabled, catalogjanitor_run, catalogjanitor_switch, cleaner_chore_enabled, cleaner_chore_run, cleaner_chore_switch, clear_block_cache, clear_compaction_queues, clear_deadservers, close_region, compact, compact_rs, compaction_state, flush, is_in_maintenance_mode, list_deadservers, major_compact, merge_region, move, normalize, normalizer_enabled, normalizer_switch, split, splitormerge_enabled, splitormerge_switch, trace, unassign, wal_roll, zk_dump
Group name: replication
Commands: add_peer, append_peer_namespaces, append_peer_tableCFs, disable_peer, disable_table_replication, enable_peer, enable_table_replication, get_peer_config, list_peer_configs, list_peers, list_replicated_tables, remove_peer, remove_peer_namespaces, remove_peer_tableCFs, set_peer_bandwidth, set_peer_exclude_namespaces, set_peer_exclude_tableCFs, set_peer_namespaces, set_peer_replicate_all, set_peer_tableCFs, show_peer_tableCFs, update_peer_config
Group name: snapshots
Commands: clone_snapshot, delete_all_snapshot, delete_snapshot, delete_table_snapshots, list_snapshots, list_table_snapshots, restore_snapshot, snapshot
Group name: configuration
Commands: update_all_config, update_config
Group name: quotas
Commands: list_quota_snapshots, list_quota_table_sizes, list_quotas, list_snapshot_sizes, set_quota
Group name: security
Commands: grant, list_security_capabilities, revoke, user_permission
Group name: procedures
Commands: abort_procedure, list_locks, list_procedures
Group name: visibility labels
Commands: add_labels, clear_auths, get_auths, list_labels, set_auths, set_visibility
Group name: rsgroup
Commands: add_rsgroup, balance_rsgroup, get_rsgroup, get_server_rsgroup, get_table_rsgroup, list_rsgroups, move_namespaces_rsgroup, move_servers_namespaces_rsgroup, move_servers_rsgroup, move_servers_tables_rsgroup, move_tables_rsgroup, remove_rsgroup, remove_servers_rsgroup
1. General commands:
- Cluster status: status
hbase(main):005:0> status
1 active master, 0 backup masters, 1 servers, 0 dead, 793.0000 average load
Took 0.9453 seconds
- Version: version
hbase(main):006:0> version
2.0.2, rc6f16dff66b5d7c4fb66d3bf7eda4f56515c63f3, Fri Jan 25 19:23:41 CST 2019
Took 0.0004 seconds
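2. DDL commands
Command | Meaning | Example |
---|---|---|
alter | Modify the attributes of a table's column family | alter 't1', NAME => 'f1', VERSIONS => 5 |
alter_async | Modify column family attributes asynchronously, without waiting for every region to finish. Usage is the same as alter | alter_async 't1', NAME => 'f1', VERSIONS => 5 |
alter_status | Report the status of an alter command, showing how many regions have picked up the new schema. Takes the table name as its argument | alter_status 't1' |
create | Create a table | create 't1', {NAME => 'f1', VERSIONS => 5}; create 't1', 'f1', 'f2', 'f3' |
describe | Show a table's metadata and whether it is enabled | describe 't1' |
disable | Disable a table | disable 't1' |
disable_all | Disable all tables matching a regular expression | disable_all 't1.*' |
drop | Drop a table | drop 't1' |
enable | Enable a table | enable 't1' |
enable_all | Enable all tables matching a regular expression | enable_all 't1.*' |
exists | Check whether a table exists | exists 't1' |
is_disabled | Check whether a table is disabled | is_disabled 't1' |
is_enabled | Check whether a table is enabled | is_enabled 't1' |
show_filters | List the names of all supported filters | show_filters |
list | List the names of all tables | list |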
3. DML commands
- count: count the rows in a table
count 't1'
count 't1', INTERVAL => 1000
count 't1', CACHE => 1000
count 't1', INTERVAL => 10, CACHE => 1000
- delete: delete a single cell
delete 't1', 'r1', 'c1', ts1
- deleteall: delete an entire row or a column
deleteall 't1','r1'
deleteall 't1','r1','c1'
deleteall 't1', 'r1','c1', ts1
- get: read a single row
hbase> get 'ns1:t1', 'r1'
hbase> get 't1', 'r1'
hbase> get 't1', 'r1', {TIMERANGE => [ts1, ts2]}
hbase> get 't1', 'r1', {COLUMN => 'c1'}
hbase> get 't1', 'r1', {COLUMN => ['c1', 'c2', 'c3']}
hbase> get 't1', 'r1', {COLUMN => 'c1', TIMESTAMP => ts1}
hbase> get 't1', 'r1', {COLUMN => 'c1', TIMERANGE => [ts1, ts2], VERSIONS => 4}
hbase> get 't1', 'r1', {COLUMN => 'c1', TIMESTAMP => ts1, VERSIONS => 4}
hbase> get 't1', 'r1', {FILTER => "ValueFilter(=, 'binary:abc')"}
hbase> get 't1', 'r1', 'c1'
hbase> get 't1', 'r1', 'c1', 'c2'
hbase> get 't1', 'r1', ['c1', 'c2']
hbase> get 't1', 'r1', {COLUMN => 'c1', ATTRIBUTES => {'mykey'=>'myvalue'}}
hbase> get 't1', 'r1', {COLUMN => 'c1', AUTHORIZATIONS => ['PRIVATE','SECRET']}
hbase> get 't1', 'r1', {CONSISTENCY => 'TIMELINE'}
hbase> get 't1', 'r1', {CONSISTENCY => 'TIMELINE', REGION_REPLICA_ID => 1}
- get_counter: read a counter value
hbase> get_counter 'ns1:t1', 'r1', 'c1'
hbase> get_counter 't1', 'r1', 'c1'
- incr: increment a counter cell
hbase> incr 'ns1:t1', 'r1', 'c1'
hbase> incr 't1', 'r1', 'c1'
hbase> incr 't1', 'r1', 'c1', 1
hbase> incr 't1', 'r1', 'c1', 10
hbase> incr 't1', 'r1', 'c1', 10, {ATTRIBUTES=>{'mykey'=>'myvalue'}}
hbase> incr 't1', 'r1', 'c1', {ATTRIBUTES=>{'mykey'=>'myvalue'}}
hbase> incr 't1', 'r1', 'c1', 10, {VISIBILITY=>'PRIVATE|SECRET'}
- put: write data
hbase> put 'ns1:t1', 'r1', 'c1', 'value'
hbase> put 't1', 'r1', 'c1', 'value'
hbase> put 't1', 'r1', 'c1', 'value', ts1
hbase> put 't1', 'r1', 'c1', 'value', {ATTRIBUTES=>{'mykey'=>'myvalue'}}
hbase> put 't1', 'r1', 'c1', 'value', ts1, {ATTRIBUTES=>{'mykey'=>'myvalue'}}
hbase> put 't1', 'r1', 'c1', 'value', ts1, {VISIBILITY=>'PRIVATE|SECRET'}
- scan: scan a table
hbase> scan 'hbase:meta'
# return only the specified columns
hbase> scan 'hbase:meta', {COLUMNS => 'info:regioninfo'}
# limit the number of rows and set the start row
hbase> scan 'ns1:t1', {COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'}
hbase> scan 't1', {COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'}
# restrict to a time range
hbase> scan 't1', {COLUMNS => 'c1', TIMERANGE => [1303668804000, 1303668904000]}
hbase> scan 't1', {REVERSED => true}
hbase> scan 't1', {ALL_METRICS => true}
hbase> scan 't1', {METRICS => ['RPC_RETRIES', 'ROWS_FILTERED']}
# use filters; run show_filters to list all available filters
hbase> scan 't1', {ROWPREFIXFILTER => 'row2', FILTER => "
(QualifierFilter (>=, 'binary:xyz')) AND (TimestampsFilter ( 123, 456))"}
hbase> scan 't1', {FILTER =>
org.apache.hadoop.hbase.filter.ColumnPaginationFilter.new(1, 0)}
hbase> scan 't1', {CONSISTENCY => 'TIMELINE'}
# set operation attributes
hbase> scan 't1', { COLUMNS => ['c1', 'c2'], ATTRIBUTES => {'mykey' => 'myvalue'}}
hbase> scan 't1', { COLUMNS => ['c1', 'c2'], AUTHORIZATIONS => ['PRIVATE','SECRET']}
- truncate: remove all data from a table
truncate 't1'
There are other commands that I won't cover here; use help to explore them.
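The STARTROW and LIMIT options of scan make sense because rows are kept sorted by row key: a scan seeks to the first key at or after the start row and reads forward from there. Here is a rough stdlib-only sketch of that idea. The scanKeys helper is hypothetical and is not HBase or gohbase code.

```go
package main

import (
	"fmt"
	"sort"
)

// scanKeys returns up to limit row keys that sort at or after startRow.
// sortedKeys must already be in ascending order. A limit <= 0 means no limit.
// Hypothetical helper for illustration only.
func scanKeys(sortedKeys []string, startRow string, limit int) []string {
	// Binary search for the first key >= startRow.
	i := sort.SearchStrings(sortedKeys, startRow)
	out := sortedKeys[i:]
	if limit > 0 && len(out) > limit {
		out = out[:limit]
	}
	return out
}

func main() {
	keys := []string{"abc", "row1", "row2", "xyz", "xyz1", "zzz"}
	// Roughly what scan 't1', {LIMIT => 2, STARTROW => 'xyz'} selects.
	fmt.Println(scanKeys(keys, "xyz", 2))
}
```

Because the keys are compared byte by byte, the choice of row key format decides which rows end up next to each other in a scan.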
2.2 Operating HBase from Go
This section shows how to work with HBase from Go using the gohbase library.
Install
go get github.com/tsuna/gohbase
Create a client
client := gohbase.NewClient("localhost")
Insert a cell
// Values maps a ColumnFamily -> Qualifiers -> Values.
values := map[string]map[string][]byte{"cf": map[string][]byte{"a": []byte{0}}}
putRequest, err := hrpc.NewPutStr(context.Background(), "table", "key", values)
rsp, err := client.Put(putRequest)
Get an entire row
getRequest, err := hrpc.NewGetStr(context.Background(), "table", "row")
getRsp, err := client.Get(getRequest)
Get a specific cell
// Perform a get for the cell with key "15", column family "cf" and qualifier "a"
family := map[string][]string{"cf": []string{"a"}}
getRequest, err := hrpc.NewGetStr(context.Background(), "table", "15",
hrpc.Families(family))
getRsp, err := client.Get(getRequest)
Get a specific cell with a filter
pFilter := filter.NewKeyOnlyFilter(true)
family := map[string][]string{"cf": []string{"a"}}
getRequest, err := hrpc.NewGetStr(context.Background(), "table", "15",
hrpc.Families(family), hrpc.Filters(pFilter))
getRsp, err := client.Get(getRequest)
Scan with a filter
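pFilter := filter.NewPrefixFilter([]byte("7"))
scanRequest, err := hrpc.NewScanStr(context.Background(), "table",
	hrpc.Filters(pFilter))
// Scan returns an hrpc.Scanner rather than a (result, error) pair;
// call scanner.Next() repeatedly until it returns io.EOF.
scanner := client.Scan(scanRequest)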
Let's look at how the code is organized:
├── AUTHORS
├── COPYING
├── Makefile
├── README.md
├── admin_client.go
├── caches.go
├── check_line_len.awk
├── client.go
├── discovery_test.go
├── filter
├── hrpc
├── install_ci.sh
├── integration_test.go
├── metacache_test.go
├── pb
├── region
├── rpc.go
├── rpc_test.go
├── scanner.go
├── scanner_test.go
├── table_test.go
├── test
└── zk
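The code is well organized. hrpc holds the RPC request types and methods, filter holds the filters used by get and scan, region holds the region-related interfaces, caches.go implements caching (HBase relies on caches in many places to improve performance), and zk holds the ZooKeeper-related code.
Now let's read some of the source. The main entry points of gohbase are client and admin_client, so we will focus on those two.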
// AdminClient to perform administrative operations with HMaster
type AdminClient interface {
	CreateTable(t *hrpc.CreateTable) error
	DeleteTable(t *hrpc.DeleteTable) error
	EnableTable(t *hrpc.EnableTable) error
	DisableTable(t *hrpc.DisableTable) error
	ClusterStatus() (*pb.ClusterStatus, error)
}

// CreateTable represents a CreateTable HBase call
type CreateTable struct {
	base
	families  map[string]map[string]string
	splitKeys [][]byte
}

// NewCreateTable creates a new CreateTable request that will create the given
// table in HBase. 'families' is a map of column family name to its attributes.
// For use by the admin client.
func NewCreateTable(ctx context.Context, table []byte,
	families map[string]map[string]string,
	options ...func(*CreateTable)) *CreateTable {
	ct := &CreateTable{
		base: base{
			table:    table,
			ctx:      ctx,
			resultch: make(chan RPCResult, 1),
		},
		families: make(map[string]map[string]string, len(families)),
	}
	for _, option := range options {
		option(ct)
	}
	for family, attrs := range families {
		ct.families[family] = make(map[string]string, len(defaultAttributes))
		for k, dv := range defaultAttributes {
			if v, ok := attrs[k]; ok {
				ct.families[family][k] = v
			} else {
				ct.families[family][k] = dv
			}
		}
	}
	return ct
}
These are mainly DDL operations.
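The loop at the end of NewCreateTable walks the keys of defaultAttributes, so every family ends up with a full attribute set, and a caller-supplied value only takes effect for a key that has a default. Here is a stdlib-only sketch of that merge. The default keys and values below are placeholders chosen for the example, not gohbase's actual defaults, and mergeAttributes is a hypothetical helper.

```go
package main

import "fmt"

// defaultAttributes stands in for the package-level defaults used by
// NewCreateTable. These keys and values are assumptions for illustration.
var defaultAttributes = map[string]string{
	"VERSIONS": "1",
	"TTL":      "2147483647",
}

// mergeAttributes mirrors the inner loop of NewCreateTable: it iterates over
// the defaults and takes the caller's value when one is present.
func mergeAttributes(attrs map[string]string) map[string]string {
	merged := make(map[string]string, len(defaultAttributes))
	for k, dv := range defaultAttributes {
		if v, ok := attrs[k]; ok {
			merged[k] = v
		} else {
			merged[k] = dv
		}
	}
	return merged
}

func main() {
	// "UNKNOWN" has no default, so this loop never copies it.
	got := mergeAttributes(map[string]string{"VERSIONS": "5", "UNKNOWN": "x"})
	fmt.Println(got["VERSIONS"], got["TTL"], len(got))
}
```

One consequence of iterating over the defaults rather than over the caller's map is that a misspelled attribute name is silently ignored instead of being reported.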
Now let's look at client:
// Client a regular HBase client
type Client interface {
	Scan(s *hrpc.Scan) hrpc.Scanner
	Get(g *hrpc.Get) (*hrpc.Result, error)
	Put(p *hrpc.Mutate) (*hrpc.Result, error)
	Delete(d *hrpc.Mutate) (*hrpc.Result, error)
	Append(a *hrpc.Mutate) (*hrpc.Result, error)
	Increment(i *hrpc.Mutate) (int64, error)
	CheckAndPut(p *hrpc.Mutate, family string, qualifier string,
		expectedValue []byte) (bool, error)
	Close()
}
These are mainly DML operations.
Let's take a look at put:
// NewPut creates a new Mutation request to insert the given
// family-column-values in the given row key of the given table.
func NewPut(ctx context.Context, table, key []byte,
	values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
	m, err := baseMutate(ctx, table, key, values, options...)
	if err != nil {
		return nil, err
	}
	m.mutationType = pb.MutationProto_PUT
	return m, nil
}

// NewPutStr is just like NewPut but takes table and key as strings.
func NewPutStr(ctx context.Context, table, key string,
	values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
	return NewPut(ctx, []byte(table), []byte(key), values, options...)
}
where baseMutate is:
// baseMutate returns a Mutate struct without the mutationType filled in.
func baseMutate(ctx context.Context, table, key []byte, values map[string]map[string][]byte,
	options ...func(Call) error) (*Mutate, error) {
	m := &Mutate{
		base: base{
			table:    table,
			key:      key,
			ctx:      ctx,
			resultch: make(chan RPCResult, 1),
		},
		values:    values,
		timestamp: MaxTimestamp,
	}
	err := applyOptions(m, options...)
	if err != nil {
		return nil, err
	}
	return m, nil
}

// Note: applyOptions records the options on the call, then applies them in
// order and stops at the first one that returns an error.
func applyOptions(call Call, options ...func(Call) error) error {
	call.(withOptions).setOptions(options)
	for _, option := range options {
		err := option(call)
		if err != nil {
			return err
		}
	}
	return nil
}
Options are used like this:
client := gohbase.NewClient("localhost")
pFilter := filter.NewKeyOnlyFilter(true)
family := map[string][]string{"cf": []string{"a"}}
getRequest, err := hrpc.NewGetStr(context.Background(), "table", "15",
	hrpc.Families(family), hrpc.Filters(pFilter), hrpc.MaxVersions(2))
if err != nil {
	log.Fatal(err)
}
if _, err = client.Get(getRequest); err != nil {
	log.Fatal(err)
}
values := map[string]map[string][]byte{"cf": map[string][]byte{"a": []byte{0}}}
// Timestamp sets the cell timestamp; MaxVersions is a query option (Get/Scan), so it is left off the put.
putRequest, err := hrpc.NewPutStr(context.Background(), "table", "key", values, hrpc.Timestamp(time.Now()))
if err != nil {
	log.Fatal(err)
}
if _, err = client.Put(putRequest); err != nil {
	log.Fatal(err)
}
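The options ...func(Call) error parameter is an instance of Go's functional options pattern: each option is a function that mutates the request and may reject it. Below is a self-contained sketch of the pattern that does not depend on gohbase. The names request, option, withTimestamp, withMaxVersions, and newRequest are invented for this example, and the rule that max versions applies only to reads is an assumption of the sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// request is a stand-in for a call such as a Get or a Put.
type request struct {
	kind        string // "get" or "put"
	timestamp   int64
	maxVersions uint32
}

// option configures a request and may reject it, like func(Call) error.
type option func(*request) error

// withTimestamp sets an explicit timestamp on any request.
func withTimestamp(ts int64) option {
	return func(r *request) error {
		r.timestamp = ts
		return nil
	}
}

// withMaxVersions only makes sense for reads in this sketch,
// so it rejects every other kind of request.
func withMaxVersions(n uint32) option {
	return func(r *request) error {
		if r.kind != "get" {
			return errors.New("max versions applies to get requests only")
		}
		r.maxVersions = n
		return nil
	}
}

// newRequest applies the options in order and stops at the first error,
// following the same shape as applyOptions.
func newRequest(kind string, opts ...option) (*request, error) {
	r := &request{kind: kind}
	for _, opt := range opts {
		if err := opt(r); err != nil {
			return nil, err
		}
	}
	return r, nil
}

func main() {
	get, err := newRequest("get", withTimestamp(42), withMaxVersions(2))
	fmt.Println(get.maxVersions, err)

	_, err = newRequest("put", withMaxVersions(1))
	fmt.Println(err)
}
```

The appeal of this design is that the constructor signature stays stable as new options appear, and an option that does not fit the request type can fail at construction time rather than at the server.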