-
Notifications
You must be signed in to change notification settings - Fork 643
[realppl 3] Replace query and target with QueryOrPipeline and TargetOrPipeline #7351
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: wuandy/RealPpl_2
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -21,6 +21,7 @@ import com.google.firebase.firestore.core.Canonicalizable | |||||
import com.google.firebase.firestore.model.Document | ||||||
import com.google.firebase.firestore.model.DocumentKey | ||||||
import com.google.firebase.firestore.model.MutableDocument | ||||||
import com.google.firebase.firestore.model.ResourcePath | ||||||
import com.google.firebase.firestore.model.Values | ||||||
import com.google.firebase.firestore.pipeline.AddFieldsStage | ||||||
import com.google.firebase.firestore.pipeline.AggregateFunction | ||||||
|
@@ -43,7 +44,6 @@ import com.google.firebase.firestore.pipeline.InternalOptions | |||||
import com.google.firebase.firestore.pipeline.LimitStage | ||||||
import com.google.firebase.firestore.pipeline.OffsetStage | ||||||
import com.google.firebase.firestore.pipeline.Ordering | ||||||
import com.google.firebase.firestore.pipeline.PipelineOptions | ||||||
import com.google.firebase.firestore.pipeline.RawStage | ||||||
import com.google.firebase.firestore.pipeline.RemoveFieldsStage | ||||||
import com.google.firebase.firestore.pipeline.ReplaceStage | ||||||
|
@@ -55,17 +55,28 @@ import com.google.firebase.firestore.pipeline.Stage | |||||
import com.google.firebase.firestore.pipeline.UnionStage | ||||||
import com.google.firebase.firestore.pipeline.UnnestStage | ||||||
import com.google.firebase.firestore.pipeline.WhereStage | ||||||
import com.google.firebase.firestore.remote.RemoteSerializer | ||||||
import com.google.firebase.firestore.util.Assert.fail | ||||||
import com.google.firestore.v1.ExecutePipelineRequest | ||||||
import com.google.firestore.v1.StructuredPipeline | ||||||
import com.google.firestore.v1.Value | ||||||
|
||||||
open class AbstractPipeline | ||||||
class Pipeline | ||||||
internal constructor( | ||||||
internal val firestore: FirebaseFirestore, | ||||||
internal val userDataReader: UserDataReader, | ||||||
internal val stages: List<Stage<*>> | ||||||
private val firestore: FirebaseFirestore, | ||||||
private val userDataReader: UserDataReader, | ||||||
private val stages: List<Stage<*>> | ||||||
) { | ||||||
internal constructor( | ||||||
firestore: FirebaseFirestore, | ||||||
userDataReader: UserDataReader, | ||||||
stage: Stage<*> | ||||||
) : this(firestore, userDataReader, listOf(stage)) | ||||||
|
||||||
private fun append(stage: Stage<*>): Pipeline { | ||||||
return Pipeline(firestore, userDataReader, stages.plus(stage)) | ||||||
} | ||||||
|
||||||
private fun toStructuredPipelineProto(options: InternalOptions?): StructuredPipeline { | ||||||
val builder = StructuredPipeline.newBuilder() | ||||||
builder.pipeline = toPipelineProto() | ||||||
|
@@ -79,17 +90,17 @@ internal constructor( | |||||
.build() | ||||||
|
||||||
private fun toExecutePipelineRequest(options: InternalOptions?): ExecutePipelineRequest { | ||||||
val database = firestore.databaseId | ||||||
val database = firestore!!.databaseId | ||||||
val builder = ExecutePipelineRequest.newBuilder() | ||||||
builder.database = "projects/${database.projectId}/databases/${database.databaseId}" | ||||||
builder.structuredPipeline = toStructuredPipelineProto(options) | ||||||
return builder.build() | ||||||
} | ||||||
|
||||||
protected fun execute(options: InternalOptions?): Task<PipelineSnapshot> { | ||||||
fun execute(options: InternalOptions?): Task<PipelineSnapshot> { | ||||||
val request = toExecutePipelineRequest(options) | ||||||
val observerTask = ObserverSnapshotTask() | ||||||
firestore.callClient { call -> call!!.executePipeline(request, observerTask) } | ||||||
firestore?.callClient { call -> call!!.executePipeline(request, observerTask) } | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
return observerTask.task | ||||||
} | ||||||
|
||||||
|
@@ -106,7 +117,7 @@ internal constructor( | |||||
) { | ||||||
results.add( | ||||||
PipelineResult( | ||||||
firestore, | ||||||
firestore!!, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
userDataWriter, | ||||||
if (key == null) null else DocumentReference(key, firestore), | ||||||
data, | ||||||
|
@@ -127,28 +138,9 @@ internal constructor( | |||||
val task: Task<PipelineSnapshot> | ||||||
get() = taskCompletionSource.task | ||||||
} | ||||||
} | ||||||
|
||||||
class Pipeline | ||||||
private constructor( | ||||||
firestore: FirebaseFirestore, | ||||||
userDataReader: UserDataReader, | ||||||
stages: List<Stage<*>> | ||||||
) : AbstractPipeline(firestore, userDataReader, stages) { | ||||||
internal constructor( | ||||||
firestore: FirebaseFirestore, | ||||||
userDataReader: UserDataReader, | ||||||
stage: Stage<*> | ||||||
) : this(firestore, userDataReader, listOf(stage)) | ||||||
|
||||||
private fun append(stage: Stage<*>): Pipeline { | ||||||
return Pipeline(firestore, userDataReader, stages.plus(stage)) | ||||||
} | ||||||
|
||||||
fun execute(): Task<PipelineSnapshot> = execute(null) | ||||||
|
||||||
fun execute(options: PipelineOptions): Task<PipelineSnapshot> = execute(options.options) | ||||||
|
||||||
internal fun documentReference(key: DocumentKey): DocumentReference { | ||||||
return DocumentReference(key, firestore) | ||||||
} | ||||||
|
@@ -627,7 +619,7 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto | |||||
* @param path A path to a collection that will be the source of this pipeline. | ||||||
* @return A new [Pipeline] object with documents from target collection. | ||||||
*/ | ||||||
fun collection(path: String): Pipeline = collection(CollectionSource.of(path)) | ||||||
fun collection(path: String): Pipeline = collection(firestore.collection(path)) | ||||||
|
||||||
/** | ||||||
* Set the pipeline's source to the collection specified by the given [CollectionReference]. | ||||||
|
@@ -637,7 +629,8 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto | |||||
* @throws [IllegalArgumentException] Thrown if the [ref] provided targets a different project or | ||||||
* database than the pipeline. | ||||||
*/ | ||||||
fun collection(ref: CollectionReference): Pipeline = collection(CollectionSource.of(ref)) | ||||||
fun collection(ref: CollectionReference): Pipeline = | ||||||
collection(CollectionSource.of(ref, firestore.databaseId)) | ||||||
|
||||||
/** | ||||||
* Set the pipeline's source to the collection specified by CollectionSource. | ||||||
|
@@ -648,7 +641,7 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto | |||||
* or database than the pipeline. | ||||||
*/ | ||||||
fun collection(stage: CollectionSource): Pipeline { | ||||||
if (stage.firestore != null && stage.firestore.databaseId != firestore.databaseId) { | ||||||
if (stage.serializer.databaseId() != firestore.databaseId) { | ||||||
throw IllegalArgumentException("Provided collection is from a different Firestore instance.") | ||||||
} | ||||||
return Pipeline(firestore, firestore.userDataReader, stage) | ||||||
|
@@ -661,9 +654,9 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto | |||||
* @return A new [Pipeline] object with documents from target collection group. | ||||||
*/ | ||||||
fun collectionGroup(collectionId: String): Pipeline = | ||||||
pipeline(CollectionGroupSource.of((collectionId))) | ||||||
collectionGroup(CollectionGroupSource.of((collectionId))) | ||||||
|
||||||
fun pipeline(stage: CollectionGroupSource): Pipeline = | ||||||
internal fun collectionGroup(stage: CollectionGroupSource): Pipeline = | ||||||
Pipeline(firestore, firestore.userDataReader, stage) | ||||||
|
||||||
/** | ||||||
|
@@ -706,20 +699,34 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto | |||||
return Pipeline( | ||||||
firestore, | ||||||
firestore.userDataReader, | ||||||
DocumentsSource(documents.map { docRef -> "/" + docRef.path }.toTypedArray()) | ||||||
DocumentsSource(documents.map { ResourcePath.fromString(it.path) }.toTypedArray()) | ||||||
) | ||||||
} | ||||||
} | ||||||
|
||||||
class RealtimePipelineSource internal constructor(private val firestore: FirebaseFirestore) { | ||||||
/** | ||||||
* Convert the given Query into an equivalent Pipeline. | ||||||
* | ||||||
* @param query A Query to be converted into a Pipeline. | ||||||
* @return A new [Pipeline] object that is equivalent to [query] | ||||||
* @throws [IllegalArgumentException] Thrown if the [query] provided targets a different project | ||||||
* or database than the pipeline. | ||||||
*/ | ||||||
fun convertFrom(query: Query): RealtimePipeline { | ||||||
if (query.firestore.databaseId != firestore.databaseId) { | ||||||
throw IllegalArgumentException("Provided query is from a different Firestore instance.") | ||||||
} | ||||||
return query.query.toRealtimePipeline(firestore, firestore.userDataReader) | ||||||
} | ||||||
|
||||||
/** | ||||||
* Set the pipeline's source to the collection specified by the given path. | ||||||
* | ||||||
* @param path A path to a collection that will be the source of this pipeline. | ||||||
* @return A new [RealtimePipeline] object with documents from target collection. | ||||||
*/ | ||||||
fun collection(path: String): RealtimePipeline = collection(CollectionSource.of(path)) | ||||||
fun collection(path: String): RealtimePipeline = collection(firestore.collection(path)) | ||||||
|
||||||
/** | ||||||
* Set the pipeline's source to the collection specified by the given [CollectionReference]. | ||||||
|
@@ -729,7 +736,8 @@ class RealtimePipelineSource internal constructor(private val firestore: Firebas | |||||
* @throws [IllegalArgumentException] Thrown if the [ref] provided targets a different project or | ||||||
* database than the pipeline. | ||||||
*/ | ||||||
fun collection(ref: CollectionReference): RealtimePipeline = collection(CollectionSource.of(ref)) | ||||||
fun collection(ref: CollectionReference): RealtimePipeline = | ||||||
collection(CollectionSource.of(ref, firestore.databaseId)) | ||||||
|
||||||
/** | ||||||
* Set the pipeline's source to the collection specified by CollectionSource. | ||||||
|
@@ -740,10 +748,10 @@ class RealtimePipelineSource internal constructor(private val firestore: Firebas | |||||
* or database than the pipeline. | ||||||
*/ | ||||||
fun collection(stage: CollectionSource): RealtimePipeline { | ||||||
if (stage.firestore != null && stage.firestore.databaseId != firestore.databaseId) { | ||||||
if (stage.serializer.databaseId() != firestore.databaseId) { | ||||||
throw IllegalArgumentException("Provided collection is from a different Firestore instance.") | ||||||
} | ||||||
return RealtimePipeline(firestore, firestore.userDataReader, stage) | ||||||
return RealtimePipeline(RemoteSerializer(firestore.databaseId), firestore.userDataReader, stage) | ||||||
} | ||||||
|
||||||
/** | ||||||
|
@@ -753,26 +761,26 @@ class RealtimePipelineSource internal constructor(private val firestore: Firebas | |||||
* @return A new [RealtimePipeline] object with documents from target collection group. | ||||||
*/ | ||||||
fun collectionGroup(collectionId: String): RealtimePipeline = | ||||||
pipeline(CollectionGroupSource.of((collectionId))) | ||||||
collectionGroup(CollectionGroupSource.of((collectionId))) | ||||||
|
||||||
fun pipeline(stage: CollectionGroupSource): RealtimePipeline = | ||||||
RealtimePipeline(firestore, firestore.userDataReader, stage) | ||||||
fun collectionGroup(stage: CollectionGroupSource): RealtimePipeline = | ||||||
RealtimePipeline(RemoteSerializer(firestore.databaseId), firestore.userDataReader, stage) | ||||||
} | ||||||
|
||||||
class RealtimePipeline | ||||||
internal constructor( | ||||||
firestore: FirebaseFirestore, | ||||||
userDataReader: UserDataReader, | ||||||
stages: List<Stage<*>> | ||||||
) : AbstractPipeline(firestore, userDataReader, stages), Canonicalizable { | ||||||
internal val serializer: RemoteSerializer, | ||||||
internal val userDataReader: UserDataReader, | ||||||
internal val stages: List<Stage<*>> | ||||||
) : Canonicalizable { | ||||||
internal constructor( | ||||||
firestore: FirebaseFirestore, | ||||||
serializer: RemoteSerializer, | ||||||
userDataReader: UserDataReader, | ||||||
stage: Stage<*> | ||||||
) : this(firestore, userDataReader, listOf(stage)) | ||||||
) : this(serializer, userDataReader, listOf(stage)) | ||||||
|
||||||
private fun with(stages: List<Stage<*>>): RealtimePipeline = | ||||||
RealtimePipeline(firestore, userDataReader, stages) | ||||||
RealtimePipeline(serializer, userDataReader, stages) | ||||||
|
||||||
private fun append(stage: Stage<*>): RealtimePipeline = with(stages.plus(stage)) | ||||||
|
||||||
|
@@ -820,14 +828,17 @@ internal constructor( | |||||
return rewrittenStages.joinToString("|") { stage -> (stage as Canonicalizable).canonicalId() } | ||||||
} | ||||||
|
||||||
override fun toString(): String = canonicalId() | ||||||
|
||||||
override fun equals(other: Any?): Boolean { | ||||||
if (this === other) return true | ||||||
if (other !is RealtimePipeline) return false | ||||||
return stages == other.stages | ||||||
if (serializer.databaseId() != other.serializer.databaseId()) return false | ||||||
return rewrittenStages == other.rewrittenStages | ||||||
} | ||||||
|
||||||
override fun hashCode(): Int { | ||||||
return stages.hashCode() | ||||||
return serializer.databaseId().hashCode() * 31 + stages.hashCode() | ||||||
} | ||||||
|
||||||
internal fun evaluate(inputs: List<MutableDocument>): List<MutableDocument> { | ||||||
|
@@ -883,6 +894,15 @@ internal constructor( | |||||
internal fun comparator(): Comparator<Document> = | ||||||
getLastEffectiveSortStage().comparator(evaluateContext()) | ||||||
|
||||||
internal fun toStructurePipelineProto(): StructuredPipeline { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There seems to be a typo in the method name. It should probably be
Suggested change
|
||||||
val builder = StructuredPipeline.newBuilder() | ||||||
builder.pipeline = | ||||||
com.google.firestore.v1.Pipeline.newBuilder() | ||||||
.addAllStages(rewrittenStages.map { it.toProtoStage(userDataReader) }) | ||||||
.build() | ||||||
return builder.build() | ||||||
} | ||||||
|
||||||
private fun getLastEffectiveSortStage(): SortStage { | ||||||
for (stage in rewrittenStages.asReversed()) { | ||||||
if (stage is SortStage) { | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
firestore
property is non-nullable in thePipeline
class's primary constructor. The use of the non-null asserted (!!
) operator here is redundant and can be removed for clarity.