要想搞明白Geotrellis的数据处理情况,首先要弄清楚数据的存放,Geotrellis将数据存放在Accumulo中。
Accumulo是一个分布式的Key Value型NOSQL数据库,官网为(https://accumulo.apache.org/),在使用Ambari安装hadoop集群一文中已经介绍了如何安装Hadoop集群以及Accumulo。
Accumulo以表来分区存放数据,结构为Key Value,其中Key又包含RowID和Column,Column又包含Family、Qualifier、Visibility。
闲话莫谈,首先介绍一下如何在accumulo shell中操作Accumulo。
1、进入accumulo shell控制台
accumulo shell -u [username]
username就是具有操作accumulo的用户
2、查看所有表
tables
3、创建表
createtable mytable
4、删除表
deletetable mytable
5、扫描表,查看数据
scan
6、插入数据 插入数据的时候要在当前表的工作域中
insert row1 colf colq value1
只要rowID family qualifier有一个不重复即可,如果重复会覆盖掉原来的value。
7、切换表
table mytable
下面介绍一下如何使用Scala语言操作Accumulo,也比较简单,先贴出全部代码
代码语言:javascript复制 1 object Main {
2
3 val token = new PasswordToken("pass")
4 val user = "root"
5 val instanceName = "hdp-accumulo-instance"
6 val zooServers = "zooserver"
7 val table = "table"
8
9 def main(args: Array[String]) {
10 // write
11 read
12 }
13
14 def read = {
15 val conn = getConn
16 val auths = Authorizations.EMPTY// new Authorizations("Valid")
17 val scanner = conn.createScanner(table, auths)
18
19 val range = new org.apache.accumulo.core.data.Range("row1", "row2") // start row --- end row 即row ID
20 scanner.setRange(range)
21 // scanner.fetchColumnFamily()
22 // println(scanner.iterator().next().getKey)
23 val iter = scanner.iterator()
24 while (iter.hasNext){
25 var item = iter.next()
26 //Accumulo中数据存放在table中,分为Key Value,其中Key又包含RowID和Column,Column包含Family Qualifier Visibility
27 println(s"key row:${item.getKey.getRow} fam:${item.getKey.getColumnFamily} qua:${item.getKey.getColumnQualifier} value:${item.getValue}")
28 }
29 // for(entry <- scanner) {
30 // println(entry.getKey " is " entry.getValue)
31 // }
32 }
33
34 def write {
35 val mutation = createMutation
36 val writer = getWriter
37 writer.addMutation(mutation)
38 // writer.flush()
39 writer.close
40 }
41
42 def createMutation = {
43 val rowID = new Text("row2")
44 val colFam = new Text("myColFam")
45 val colQual = new Text("myColQual")
46 // val colVis = new ColumnVisibility("public") //不需要加入可见性
47 var timstamp = System.currentTimeMillis
48 val value = new Value("myValue".getBytes)
49 val mutation = new Mutation(rowID)
50 mutation.put(colFam, colQual, timstamp, value)
51 mutation
52 }
53
54 def getConn = {
55 val inst = new ZooKeeperInstance(instanceName, zooServers)
56 val conn = inst.getConnector("root", token)
57 conn
58 }
59
60 def getWriter() = {
61 val conn = getConn
62 val config = new BatchWriterConfig
63 config.setMaxMemory(10000000L)
64 val writer: BatchWriter = conn.createBatchWriter(table, config)
65 writer
66 }
67 }
以上代码主要实现了Accumulo的读写操作,其中zooServers是安装的zookeeper的主节点地址。instanceName是accumulo的实例名称。read的Range实现了范围内查找,但是此处的范围需要输入的是RowID的起始值,由于Accumulo是自动排序的,所以此处输入范围会将该范围内的数据全部返回。其他代码均通俗易懂(自认为,哈哈),所以不在这里赘述。
本文简单介绍了Accumulo的操作,仅是为了方便理解Geotrellis的工作原理以及阅读Geotrellis的源代码做准备,若是有人恰好需要将数据存放到集群中,不妨可以试一下存入到Accumulo中。
参考链接 一、geotrellis使用初探 二、geotrellis使用(二)geotrellis-chatta-demo以及geotrellis框架数据读取方式初探 三、geotrellis使用(三)geotrellis数据处理过程分析 四、geotrellis使用(四)geotrellis数据处理部分细节
五、geotrellis使用(五)使用scala操作Accumulo