一、使用Rxjava访问数据库的优点:
1.随意的线程控制,数据库操作在一个线程,返回数据处理在ui线程
2.随时订阅和取消订阅,而不必再使用回调函数
3.对读取的数据用rxjava进行过滤,流式处理
4.使用sqlbrite可以原生返回rxjava的格式,同时是响应式数据库框架
(有数据添加和更新时自动调用之前订阅了的读取函数,达到有数据添加自动更新ui的效果,
同时这个特性没有禁止的方法,只能通过取消订阅停止这个功能,对于有的框架这反而是一种累赘)
二、接下来只关注实现过程:
本次实现用rxjava2的Flowable,有背压(backpressure)支持(在不需要背压支持的情况下建议使用Observable)
要实现一个稳健的、可灵活切换其他数据库的结构,当然是先定义数据库访问接口,然后根据不同的数据库实现接口的方法。
定义接口:(对于update,delete,insert,可以选择void类型,来简化调用代码,但缺少了执行结果判断)
代码语言:javascript复制public interface DbSource {
//String sql = "insert into table_task (tid,startts) values(tid,startts)";
Flowable<Boolean insertNewTask(int tid, int startts);
//String sql = "select * from table_task";
Flowable<List<TaskItem getAllTask();
//String sql = "select * from table_task where endts = 0";
Flowable<Optional<TaskItem getRunningTask();
//String sql = "update table_task set isuploadend=isuploadend where tid=tid";
Flowable<Boolean markUploadEnd(int tid, boolean isuploadend);
//String sql = "delete from table_task where tid=tid and endts 0";
Flowable<Boolean deleteTask(int tid);
}
三、用Android原生的Sqlite实现数据库操作
代码语言:javascript复制public class SimpleDb implements DbSource {
private static SimpleDb sqlite;
private SqliteHelper sqliteHelper;
private SimpleDb(Context context) {
this.sqliteHelper = new SqliteHelper(context);
}
public static synchronized SimpleDb getInstance(Context context) {
if (sqlite == null )
sqlite = new SimpleDb(context);
return sqlite;
}
Flowable<Boolean insertNewTask(int tid, int startts) {
return Flowable.create(new FlowableOnSubscribe<Boolean () {
@Override
public void subscribe(FlowableEmitter<Boolean e) throws Exception {
//这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法
ContentValues values = new ContentValues();
values.put(“tid”, 1);
values.put(“startts”,13233);
if(sqliteHelper.getWriteableDatabase().insert(TABLE_NAME, null, values) != -1)
e.onNext(true);
else
e.onNext(false);
e.onComplete();
}
}, BackpressureStrategy.BUFFER);
}
Flowable<List<TaskItem getAllTask() {
return Flowable.create(new FlowableOnSubscribe<List<TaskItem () {
@Override
public void subscribe(FlowableEmitter<List<TaskItem e) throws Exception {
List<TaskItem taskList = new ArrayList< ();
StringBuilder sql = new StringBuilder(100);
sql.append("select * from ");
sql.append(SqliteHelper.TABLE_NAME_TASK);
SQLiteDatabase sqLiteDatabase = sqliteHelper.getReadableDatabase();
Cursor cursor = sqLiteDatabase.rawQuery(sql.toString(), null);
if (cursor.moveToFirst()) {
int count = cursor.getCount();
for (int a = 0; a < count; a ) {
TaskItem item = new TaskItem();
item.setTid(cursor.getInt(1));
item.setStartts(cursor.getInt(2));
item.setEndts(cursor.getInt(3));
taskList.add(item);
cursor.move(1);
}
}
cursor.close();
sqLiteDatabase.close();
e.onNext(taskList);
e.onComplete();
}
}, BackpressureStrategy.BUFFER);
}
Flowable<Optional<TaskItem getRunningTask() {
return Flowable.create(new FlowableOnSubscribe<Optional<TaskItem () {
@Override
public void subscribe(FlowableEmitter<Optional<TaskItem e) throws Exception {
TaskItem item = null;
StringBuilder sql = new StringBuilder(100);
sql.append("select * from ");
sql.append(SqliteHelper.TABLE_NAME_TASK);
sql.append(" where endts=0 limit 1");
SQLiteDatabase sqLiteDatabase = sqliteHelper.getReadableDatabase();
Cursor cursor = sqLiteDatabase.rawQuery(sql.toString(), null);
if (cursor.moveToFirst()) {
int count = cursor.getCount();
if (count == 1) {
item = new TaskItem();
item.setId(cursor.getInt(0));
item.setTid(cursor.getInt(1));
item.setStartts(cursor.getInt(2));
item.setEndts(cursor.getInt(3));
}
}
cursor.close();
sqLiteDatabase.close();
e.onNext(Optional.fromNullable(item)); //import com.google.common.base.Optional;//安全检查,待会看调用的代码,配合rxjava很好
e.onComplete();
}
}, BackpressureStrategy.BUFFER);
}
Flowable<Boolean markUploadEnd(int tid, boolean isuploadend) {
return Flowable.create(new FlowableOnSubscribe<Boolean () {
@Override
public void subscribe(FlowableEmitter<Boolean e) throws Exception {
//这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法
//数据库操作代码
e.onNext(false);//返回结果
e.onComplete();//返回结束
}
}, BackpressureStrategy.BUFFER);
}
Flowable<Boolean deleteTask(int tid) {
return Flowable.create(new FlowableOnSubscribe<Boolean () {
@Override
public void subscribe(FlowableEmitter<Boolean e) throws Exception {
//这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法
//数据库操作代码
e.onNext(false);//返回结果
e.onComplete();//返回结束
}
}, BackpressureStrategy.BUFFER);
}
}
四、同一个接口使用sqlbrite的实现方式
代码语言:javascript复制public class BriteDb implements DbSource {
@NonNull
protected final BriteDatabase mDatabaseHelper;
@NonNull
private Function<Cursor, TaskItem mTaskMapperFunction;
@NonNull
private Function<Cursor, PoiItem mPoiMapperFunction;
@NonNull
private Function<Cursor, InterestPoiItem mInterestPoiMapperFunction;
// Prevent direct instantiation.
private BriteDb(@NonNull Context context) {
DbHelper dbHelper = new DbHelper(context);
SqlBrite sqlBrite = new SqlBrite.Builder().build();
mDatabaseHelper = sqlBrite.wrapDatabaseHelper(dbHelper, Schedulers.io();
mTaskMapperFunction = this::getTask;
mPoiMapperFunction = this::getPoi;
mInterestPoiMapperFunction = this::getInterestPoi;
}
@Nullable
private static BriteDb INSTANCE;
public static BriteDb getInstance(@NonNull Context context) {
if (INSTANCE == null) {
INSTANCE = new BriteDb(context);
}
return INSTANCE;
}
@NonNull
private TaskItem getTask(@NonNull Cursor c) {
TaskItem item = new TaskItem();
item.setId(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_ID)));
item.setTid(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_TID)));
item.setStartts(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_STARTTS)));
item.setEndts(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS)));
return item;
}
@Override
public void insertNewTask(int tid, int startts) {
ContentValues values = new ContentValues();
values.put(PersistenceContract.TaskEntry.COLUMN_TASK_TID, tid);
values.put(PersistenceContract.TaskEntry.COLUMN_TASK_STARTTS, startts);
mDatabaseHelper.insert(PersistenceContract.TaskEntry.TABLE_NAME_TASK, values, SQLiteDatabase.CONFLICT_REPLACE);
}
@Override
public Flowable<List<TaskItem getAllTask() {
String sql = String.format("SELECT * FROM %s", PersistenceContract.TaskEntry.TABLE_NAME_TASK);//TABLE_NAME_TASK表的名字字符串
return mDatabaseHelper.createQuery(PersistenceContract.TaskEntry.TABLE_NAME_TASK, sql)
.mapToList(mTaskMapperFunction)
.toFlowable(BackpressureStrategy.BUFFER);
}
@Override
public Flowable<Optional<TaskItem getRunningTask() {
String sql = String.format("SELECT * FROM %s WHERE %s = ? limit 1",
PersistenceContract.TaskEntry.TABLE_NAME_TASK, PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS);
return mDatabaseHelper.createQuery(PersistenceContract.TaskEntry.TABLE_NAME_TASK, sql, "0")
.mapToOne(cursor - Optional.fromNullable(mTaskMapperFunction.apply(cursor)))
.toFlowable(BackpressureStrategy.BUFFER);
}
@Override
public Flowable<Boolean markUploadEnd(int tid, boolean isuploadend) {
return Flowable.create(new FlowableOnSubscribe<Boolean () {
@Override
public void subscribe(FlowableEmitter<Boolean e) throws Exception {
ContentValues values = new ContentValues();
if(isuploadend) {
values.put(PersistenceContract.TaskEntry.COLUMN_TASK_ISUPLOADEND, 1);
} else {
values.put(PersistenceContract.TaskEntry.COLUMN_TASK_ISUPLOADEND, 0);
}
String selection = PersistenceContract.TaskEntry.COLUMN_TASK_TID " = ?";
//String[] selectionArgs = {String.valueOf(tid)};
String selectionArgs = String.valueOf(tid);
int res = mDatabaseHelper.update(PersistenceContract.TaskEntry.TABLE_NAME_TASK, values, selection, selectionArgs);
if (res 0) {
e.onNext(true);//返回结果
} else {
e.onNext(false);//返回结果
}
e.onComplete();//返回结束
}
}, BackpressureStrategy.BUFFER);
}
@Override
public Flowable<Boolean deleteTask(int tid) {
return Flowable.create(new FlowableOnSubscribe<Boolean () {
@Override
public void subscribe(FlowableEmitter<Boolean e) throws Exception {
String selection = PersistenceContract.TaskEntry.COLUMN_TASK_TID " = ? AND "
PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS " 0";
String[] selectionArgs = new String[1];
selectionArgs[0] = String.valueOf(tid);
int res = mDatabaseHelper.delete(PersistenceContract.TaskEntry.TABLE_NAME_TASK, selection, selectionArgs);
if (res 0) {
e.onNext(true);//返回结果
} else {
e.onNext(false);//返回结果
}
e.onComplete();//返回结束
}
}, BackpressureStrategy.BUFFER);
}
}
五、数据库调用使用方法
使用了lambda简化了表达式进一步简化代码:
简化方法:在/app/build.gradle里面加入如下内容:(defaultConfig的外面)
// Compile this module with Java 8 source/target levels so lambdas can be used.
compileOptions {
    sourceCompatibility JavaVersion.VERSION_1_8
    targetCompatibility JavaVersion.VERSION_1_8
}
接口调用(获得数据库实例):
//全局定义的实例获取类,以后想要换数据库,只需在这个类里切换即可
/**
 * Single place that decides which {@link DbSource} implementation the app uses;
 * switching databases means changing only this class.
 */
public class Injection {

    /**
     * Supplies the database access object.
     *
     * @param context context forwarded to the chosen implementation's {@code getInstance}
     * @return the selected {@link DbSource}
     */
    public static DbSource getDbSource(Context context) {
        // Pick exactly one implementation:
        //   BriteDb.getInstance(context)  - SqlBrite-backed
        //   SimpleDb.getInstance(context) - plain Android SQLite (current choice)
        final DbSource source = SimpleDb.getInstance(context);
        return source;
    }
}
// Obtain the database through the Injection switch point.
DbSource db = Injection.getDbSource(mContext);

// The four subscriptions below are independent examples that share one handle.
// No onComplete callback is passed: disposing from inside onComplete can run
// before disposable1 has been assigned, and a terminated subscription needs no
// dispose anyway. Dispose disposable1 from the owner's teardown code to stop a
// subscription that is still running.

// Example 1: flatten the task list and process every task not yet uploaded.
disposable1 = db.getAllTask()
        .flatMap(Flowable::fromIterable)
        .filter(task -> !task.getIsuploadend())      // custom filter
        .subscribeOn(Schedulers.io())                // database work off the UI thread
        .observeOn(AndroidSchedulers.mainThread())   // results delivered on the UI thread
        .subscribe(
                taskItem -> doTaskProcess(taskItem), // onNext
                Throwable::printStackTrace);         // onError

// Example 2: react only when a running task exists.
disposable1 = db.getRunningTask()
        .filter(Optional::isPresent)                 // skip the "absent" case
        .map(Optional::get)                          // unwrap the real value
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
                taskItem -> mTid = taskItem.getTid(), // onNext: has running task
                Throwable::printStackTrace);          // onError

// Example 3: flag a task as uploaded and inspect the result.
disposable1 = db.markUploadEnd(tid, isuploadend)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
                status -> {                          // onNext
                    if (status) {
                        // do something
                    }
                },
                Throwable::printStackTrace);         // onError

// Example 4: delete a task and inspect the result.
disposable1 = db.deleteTask(tid)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
                status -> {                          // onNext
                    if (status) {
                        // do something
                    }
                },
                Throwable::printStackTrace);         // onError
以上这篇Rxjava2_Flowable_Sqlite_Android数据库访问实例就是小编分享给大家的全部内容了,希望能给大家一个参考。