Rxjava2_Flowable_Sqlite_Android数据库访问实例
一、使用Rxjava访问数据库的优点:
1.随意的线程控制,数据库操作在一个线程,返回数据处理在ui线程
2.随时订阅和取消订阅,而不必再使用回调函数
3.对读取的数据用rxjava进行过滤,流式处理
4.使用sqlbrite可以原生返回rxjava的格式,同时是响应式数据库框架
(有数据添加和更新时自动调用之前订阅了的读取函数,达到有数据添加自动更新ui的效果,
同时这个特性没有禁止的方法,只能通过取消订阅停止这个功能,对于有的框架这反而是一种累赘)
二、接下来只关注实现过程:
本次实现用rxjava2的Flowable,有背压(backpressure)支持(在不需要背压支持的情况下建议使用Observable)
要实现一个稳健的、可灵活切换其他数据库的结构,当然是先定义数据库访问接口,然后根据不同的数据库实现接口的方法
定义接口:(对于update,delete,insert,可以选择void类型,来简化调用代码,但缺少了执行结果判断)
publicinterfaceDbSource{
//Stringsql="insertintotable_task(tid,startts)values(tid,startts)";
FlowableinsertNewTask(inttid,intstartts);
//Stringsql="select*fromtable_task";
Flowable>getAllTask();
//Stringsql="select*fromtable_taskwhereendts=0";
Flowable>getRunningTask();
//Stringsql="updatetable_tasksetisuploadend=isuploadendwheretid=tid";
FlowablemarkUploadEnd(inttid,booleanisuploadend);
//Stringsql="deletefromtable_taskwheretid=tidandendts>0";
FlowabledeleteTask(inttid);
}
三、用Android原生的Sqlite实现数据库操作
publicclassSimpleDbimplementsDbSource{
privatestaticSimpleDbsqlite;
privateSqliteHelpersqliteHelper;
privateSimpleDb(Contextcontext){
this.sqliteHelper=newSqliteHelper(context);
}
publicstaticsynchronizedSimpleDbgetInstance(Contextcontext){
if(sqlite==null)
sqlite=newSimpleDb(context);
returnsqlite;
}
FlowableinsertNewTask(inttid,intstartts){
returnFlowable.create(newFlowableOnSubscribe(){
@Override
publicvoidsubscribe(FlowableEmittere)throwsException{
//这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法
ContentValuesvalues=newContentValues();
values.put(“tid”,1);
values.put(“startts”,13233);
if(sqliteHelper.getWriteableDatabase().insert(TABLE_NAME,null,values)!=-1)
e.onNext(true);
else
e.onNext(false);
e.onComplete();
}
},BackpressureStrategy.BUFFER);
}
Flowable>getAllTask(){
returnFlowable.create(newFlowableOnSubscribe>(){
@Override
publicvoidsubscribe(FlowableEmitter>e)throwsException{
ListtaskList=newArrayList<>();
StringBuildersql=newStringBuilder(100);
sql.append("select*from");
sql.append(SqliteHelper.TABLE_NAME_TASK);
SQLiteDatabasesqLiteDatabase=sqliteHelper.getReadableDatabase();
Cursorcursor=sqLiteDatabase.rawQuery(sql.toString(),null);
if(cursor.moveToFirst()){
intcount=cursor.getCount();
for(inta=0;a>getRunningTask(){
returnFlowable.create(newFlowableOnSubscribe>(){
@Override
publicvoidsubscribe(FlowableEmitter>e)throwsException{
TaskItemitem=null;
StringBuildersql=newStringBuilder(100);
sql.append("select*from");
sql.append(SqliteHelper.TABLE_NAME_TASK);
sql.append("whereendts=0limit1");
SQLiteDatabasesqLiteDatabase=sqliteHelper.getReadableDatabase();
Cursorcursor=sqLiteDatabase.rawQuery(sql.toString(),null);
if(cursor.moveToFirst()){
intcount=cursor.getCount();
if(count==1){
item=newTaskItem();
item.setId(cursor.getInt(0));
item.setTid(cursor.getInt(1));
item.setStartts(cursor.getInt(2));
item.setEndts(cursor.getInt(3));
}
}
cursor.close();
sqLiteDatabase.close();
e.onNext(Optional.fromNullable(item));//importcom.google.common.base.Optional;//安全检查,待会看调用的代码,配合rxjava很好
e.onComplete();
}
},BackpressureStrategy.BUFFER);
}
FlowablemarkUploadEnd(inttid,booleanisuploadend){
returnFlowable.create(newFlowableOnSubscribe(){
@Override
publicvoidsubscribe(FlowableEmittere)throwsException{
//这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法
//数据库操作代码
e.onNext(false);//返回结果
e.onComplete();//返回结束
}
},BackpressureStrategy.BUFFER);
}
FlowabledeleteTask(inttid){
returnFlowable.create(newFlowableOnSubscribe(){
@Override
publicvoidsubscribe(FlowableEmittere)throwsException{
//这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法
//数据库操作代码
e.onNext(false);//返回结果
e.onComplete();//返回结束
}
},BackpressureStrategy.BUFFER);
}
}
四、同一个接口使用sqlbrite的实现方式
publicclassBriteDbimplementsDbSource{
@NonNull
protectedfinalBriteDatabasemDatabaseHelper;
@NonNull
privateFunctionmTaskMapperFunction;
@NonNull
privateFunctionmPoiMapperFunction;
@NonNull
privateFunctionmInterestPoiMapperFunction;
//Preventdirectinstantiation.
privateBriteDb(@NonNullContextcontext){
DbHelperdbHelper=newDbHelper(context);
SqlBritesqlBrite=newSqlBrite.Builder().build();
mDatabaseHelper=sqlBrite.wrapDatabaseHelper(dbHelper,Schedulers.io();
mTaskMapperFunction=this::getTask;
mPoiMapperFunction=this::getPoi;
mInterestPoiMapperFunction=this::getInterestPoi;
}
@Nullable
privatestaticBriteDbINSTANCE;
publicstaticBriteDbgetInstance(@NonNullContextcontext){
if(INSTANCE==null){
INSTANCE=newBriteDb(context);
}
returnINSTANCE;
}
@NonNull
privateTaskItemgetTask(@NonNullCursorc){
TaskItemitem=newTaskItem();
item.setId(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_ID)));
item.setTid(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_TID)));
item.setStartts(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_STARTTS)));
item.setEndts(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS)));
returnitem;
}
@Override
publicvoidinsertNewTask(inttid,intstartts){
ContentValuesvalues=newContentValues();
values.put(PersistenceContract.TaskEntry.COLUMN_TASK_TID,tid);
values.put(PersistenceContract.TaskEntry.COLUMN_TASK_STARTTS,startts);
mDatabaseHelper.insert(PersistenceContract.TaskEntry.TABLE_NAME_TASK,values,SQLiteDatabase.CONFLICT_REPLACE);
}
@Override
publicFlowable>getAllTask(){
Stringsql=String.format("SELECT*FROM%s",PersistenceContract.TaskEntry.TABLE_NAME_TASK);//TABLE_NAME_TASK表的名字字符串
returnmDatabaseHelper.createQuery(PersistenceContract.TaskEntry.TABLE_NAME_TASK,sql)
.mapToList(mTaskMapperFunction)
.toFlowable(BackpressureStrategy.BUFFER);
}
@Override
publicFlowable>getRunningTask(){
Stringsql=String.format("SELECT*FROM%sWHERE%s=?limit1",
PersistenceContract.TaskEntry.TABLE_NAME_TASK,PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS);
returnmDatabaseHelper.createQuery(PersistenceContract.TaskEntry.TABLE_NAME_TASK,sql,"0")
.mapToOne(cursor->Optional.fromNullable(mTaskMapperFunction.apply(cursor)))
.toFlowable(BackpressureStrategy.BUFFER);
}
@Override
publicFlowablemarkUploadEnd(inttid,booleanisuploadend){
returnFlowable.create(newFlowableOnSubscribe(){
@Override
publicvoidsubscribe(FlowableEmittere)throwsException{
ContentValuesvalues=newContentValues();
if(isuploadend){
values.put(PersistenceContract.TaskEntry.COLUMN_TASK_ISUPLOADEND,1);
}else{
values.put(PersistenceContract.TaskEntry.COLUMN_TASK_ISUPLOADEND,0);
}
Stringselection=PersistenceContract.TaskEntry.COLUMN_TASK_TID+"=?";
//String[]selectionArgs={String.valueOf(tid)};
StringselectionArgs=String.valueOf(tid);
intres=mDatabaseHelper.update(PersistenceContract.TaskEntry.TABLE_NAME_TASK,values,selection,selectionArgs);
if(res>0){
e.onNext(true);//返回结果
}else{
e.onNext(false);//返回结果
}
e.onComplete();//返回结束
}
},BackpressureStrategy.BUFFER);
}
@Override
publicFlowabledeleteTask(inttid){
returnFlowable.create(newFlowableOnSubscribe(){
@Override
publicvoidsubscribe(FlowableEmittere)throwsException{
Stringselection=PersistenceContract.TaskEntry.COLUMN_TASK_TID+"=?AND"+
PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS+">0";
String[]selectionArgs=newString[1];
selectionArgs[0]=String.valueOf(tid);
intres=mDatabaseHelper.delete(PersistenceContract.TaskEntry.TABLE_NAME_TASK,selection,selectionArgs);
if(res>0){
e.onNext(true);//返回结果
}else{
e.onNext(false);//返回结果
}
e.onComplete();//返回结束
}
},BackpressureStrategy.BUFFER);
}
}
五、数据库调用使用方法
使用了lambda简化了表达式进一步简化代码:
简化方法:在/app/build.gradle里面加入如下内容:(defaultConfig的外面)
// Compile with Java 8 source/target levels so lambda expressions can be used.
compileOptions {
    sourceCompatibility JavaVersion.VERSION_1_8
    targetCompatibility JavaVersion.VERSION_1_8
}
接口调用(获得数据库实例):
//全局定义的实例获取类,以后想要换数据库,只需在这个类里切换即可
publicclassInjection{
publicstaticDbSourcegetDbSource(Contextcontext){
//chooseoneofthem
//returnBriteDb.getInstance(context);
returnSimpleDb.getInstance(context);
}
}
DbSourcedb=Injection.getInstance(mContext);
disposable1=db.getAllTask()
.flatMap(Flowable::fromIterable)
.filter(task->{//自定义过滤
if(!task.getIsuploadend()){
returntrue;
}else{
returnfalse;
}
})
.subscribe(taskItems->//这里是使用了lambda简化了表达式
doTaskProcess(taskItems)
,throwable->{
throwable.printStackTrace();
},//onCompleted
()->{
if(disposable1!=null&&!disposable1.isDisposed()){
disposable1.dispose();
}
});
disposable1=db.getRunningTask()
.filter(Optional::isPresent)//判断是否为空,为空的就跳过
.map(Optional::get)//获取到真的参数
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(taskItem->{//onNext()
//hasrunningtask
mTid=taskItem.getTid();
},throwable->throwable.printStackTrace()//onError()
,()->disposable1.dispose());//onComplete()
disposable1=db.markUploadEnd(tid,isuploadend)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(status->{//onNext()
if(status){
//dosomething
}
},throwable->throwable.printStackTrace()//onError()
,()->disposable1.dispose());//onComplete()
disposable1=db.deleteTask(tid)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(status->{//onNext()
if(status){
//dosomething
}
},throwable->throwable.printStackTrace()//onError()
,()->disposable1.dispose());//onComplete()
以上这篇Rxjava2_Flowable_Sqlite_Android数据库访问实例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。