Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import static com.google.firebase.firestore.testutil.IntegrationTestUtil.testCollectionWithDocs;
import static com.google.firebase.firestore.testutil.IntegrationTestUtil.testFirestore;
import static com.google.firebase.firestore.testutil.IntegrationTestUtil.waitFor;
import static com.google.firebase.firestore.testutil.TestUtil.expectError;
import static com.google.firebase.firestore.testutil.TestUtil.map;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.firebase.firestore.core.AsyncEventListener;
import com.google.firebase.firestore.core.EventManager.ListenOptions;
import com.google.firebase.firestore.core.QueryListener;
import com.google.firebase.firestore.core.QueryOrPipeline;
import com.google.firebase.firestore.core.UserData.ParsedSetData;
import com.google.firebase.firestore.core.UserData.ParsedUpdateData;
import com.google.firebase.firestore.core.ViewSnapshot;
Expand Down Expand Up @@ -542,7 +543,8 @@ private ListenerRegistration addSnapshotListenerInternal(

return firestore.callClient(
client -> {
QueryListener queryListener = client.listen(query, options, asyncListener);
QueryListener queryListener =
client.listen(new QueryOrPipeline.QueryWrapper(query), options, asyncListener);
return ActivityScope.bind(
activity,
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.google.firebase.firestore.core.Canonicalizable
import com.google.firebase.firestore.model.Document
import com.google.firebase.firestore.model.DocumentKey
import com.google.firebase.firestore.model.MutableDocument
import com.google.firebase.firestore.model.ResourcePath
import com.google.firebase.firestore.model.Values
import com.google.firebase.firestore.pipeline.AddFieldsStage
import com.google.firebase.firestore.pipeline.AggregateFunction
Expand All @@ -43,7 +44,6 @@ import com.google.firebase.firestore.pipeline.InternalOptions
import com.google.firebase.firestore.pipeline.LimitStage
import com.google.firebase.firestore.pipeline.OffsetStage
import com.google.firebase.firestore.pipeline.Ordering
import com.google.firebase.firestore.pipeline.PipelineOptions
import com.google.firebase.firestore.pipeline.RawStage
import com.google.firebase.firestore.pipeline.RemoveFieldsStage
import com.google.firebase.firestore.pipeline.ReplaceStage
Expand All @@ -55,17 +55,28 @@ import com.google.firebase.firestore.pipeline.Stage
import com.google.firebase.firestore.pipeline.UnionStage
import com.google.firebase.firestore.pipeline.UnnestStage
import com.google.firebase.firestore.pipeline.WhereStage
import com.google.firebase.firestore.remote.RemoteSerializer
import com.google.firebase.firestore.util.Assert.fail
import com.google.firestore.v1.ExecutePipelineRequest
import com.google.firestore.v1.StructuredPipeline
import com.google.firestore.v1.Value

open class AbstractPipeline
class Pipeline
internal constructor(
internal val firestore: FirebaseFirestore,
internal val userDataReader: UserDataReader,
internal val stages: List<Stage<*>>
private val firestore: FirebaseFirestore,
private val userDataReader: UserDataReader,
private val stages: List<Stage<*>>
) {
internal constructor(
firestore: FirebaseFirestore,
userDataReader: UserDataReader,
stage: Stage<*>
) : this(firestore, userDataReader, listOf(stage))

private fun append(stage: Stage<*>): Pipeline {
return Pipeline(firestore, userDataReader, stages.plus(stage))
}

private fun toStructuredPipelineProto(options: InternalOptions?): StructuredPipeline {
val builder = StructuredPipeline.newBuilder()
builder.pipeline = toPipelineProto()
Expand All @@ -79,17 +90,17 @@ internal constructor(
.build()

private fun toExecutePipelineRequest(options: InternalOptions?): ExecutePipelineRequest {
val database = firestore.databaseId
val database = firestore!!.databaseId
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The firestore property is non-nullable in the Pipeline class's primary constructor. The use of the non-null asserted (!!) operator here is redundant and can be removed for clarity.

Suggested change
val database = firestore!!.databaseId
val database = firestore.databaseId

val builder = ExecutePipelineRequest.newBuilder()
builder.database = "projects/${database.projectId}/databases/${database.databaseId}"
builder.structuredPipeline = toStructuredPipelineProto(options)
return builder.build()
}

protected fun execute(options: InternalOptions?): Task<PipelineSnapshot> {
fun execute(options: InternalOptions?): Task<PipelineSnapshot> {
val request = toExecutePipelineRequest(options)
val observerTask = ObserverSnapshotTask()
firestore.callClient { call -> call!!.executePipeline(request, observerTask) }
firestore?.callClient { call -> call!!.executePipeline(request, observerTask) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The firestore property is non-nullable, so the safe call (?.) is redundant here and can be removed.

Suggested change
firestore?.callClient { call -> call!!.executePipeline(request, observerTask) }
firestore.callClient { call -> call!!.executePipeline(request, observerTask) }

return observerTask.task
}

Expand All @@ -106,7 +117,7 @@ internal constructor(
) {
results.add(
PipelineResult(
firestore,
firestore!!,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The firestore property is non-nullable, so the non-null asserted call (!!) is redundant here and can be removed.

Suggested change
firestore!!,
firestore,

userDataWriter,
if (key == null) null else DocumentReference(key, firestore),
data,
Expand All @@ -127,28 +138,9 @@ internal constructor(
val task: Task<PipelineSnapshot>
get() = taskCompletionSource.task
}
}

class Pipeline
private constructor(
firestore: FirebaseFirestore,
userDataReader: UserDataReader,
stages: List<Stage<*>>
) : AbstractPipeline(firestore, userDataReader, stages) {
internal constructor(
firestore: FirebaseFirestore,
userDataReader: UserDataReader,
stage: Stage<*>
) : this(firestore, userDataReader, listOf(stage))

private fun append(stage: Stage<*>): Pipeline {
return Pipeline(firestore, userDataReader, stages.plus(stage))
}

fun execute(): Task<PipelineSnapshot> = execute(null)

fun execute(options: PipelineOptions): Task<PipelineSnapshot> = execute(options.options)

internal fun documentReference(key: DocumentKey): DocumentReference {
return DocumentReference(key, firestore)
}
Expand Down Expand Up @@ -627,7 +619,7 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto
* @param path A path to a collection that will be the source of this pipeline.
* @return A new [Pipeline] object with documents from target collection.
*/
fun collection(path: String): Pipeline = collection(CollectionSource.of(path))
fun collection(path: String): Pipeline = collection(firestore.collection(path))

/**
* Set the pipeline's source to the collection specified by the given [CollectionReference].
Expand All @@ -637,7 +629,8 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto
* @throws [IllegalArgumentException] Thrown if the [ref] provided targets a different project or
* database than the pipeline.
*/
fun collection(ref: CollectionReference): Pipeline = collection(CollectionSource.of(ref))
fun collection(ref: CollectionReference): Pipeline =
collection(CollectionSource.of(ref, firestore.databaseId))

/**
* Set the pipeline's source to the collection specified by CollectionSource.
Expand All @@ -648,7 +641,7 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto
* or database than the pipeline.
*/
fun collection(stage: CollectionSource): Pipeline {
if (stage.firestore != null && stage.firestore.databaseId != firestore.databaseId) {
if (stage.serializer.databaseId() != firestore.databaseId) {
throw IllegalArgumentException("Provided collection is from a different Firestore instance.")
}
return Pipeline(firestore, firestore.userDataReader, stage)
Expand All @@ -661,9 +654,9 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto
* @return A new [Pipeline] object with documents from target collection group.
*/
fun collectionGroup(collectionId: String): Pipeline =
pipeline(CollectionGroupSource.of((collectionId)))
collectionGroup(CollectionGroupSource.of((collectionId)))

fun pipeline(stage: CollectionGroupSource): Pipeline =
internal fun collectionGroup(stage: CollectionGroupSource): Pipeline =
Pipeline(firestore, firestore.userDataReader, stage)

/**
Expand Down Expand Up @@ -706,20 +699,34 @@ class PipelineSource internal constructor(private val firestore: FirebaseFiresto
return Pipeline(
firestore,
firestore.userDataReader,
DocumentsSource(documents.map { docRef -> "/" + docRef.path }.toTypedArray())
DocumentsSource(documents.map { ResourcePath.fromString(it.path) }.toTypedArray())
)
}
}

class RealtimePipelineSource internal constructor(private val firestore: FirebaseFirestore) {
/**
* Convert the given Query into an equivalent Pipeline.
*
* @param query A Query to be converted into a Pipeline.
* @return A new [Pipeline] object that is equivalent to [query]
* @throws [IllegalArgumentException] Thrown if the [query] provided targets a different project
* or database than the pipeline.
*/
fun convertFrom(query: Query): RealtimePipeline {
if (query.firestore.databaseId != firestore.databaseId) {
throw IllegalArgumentException("Provided query is from a different Firestore instance.")
}
return query.query.toRealtimePipeline(firestore, firestore.userDataReader)
}

/**
* Set the pipeline's source to the collection specified by the given path.
*
* @param path A path to a collection that will be the source of this pipeline.
* @return A new [RealtimePipeline] object with documents from target collection.
*/
fun collection(path: String): RealtimePipeline = collection(CollectionSource.of(path))
fun collection(path: String): RealtimePipeline = collection(firestore.collection(path))

/**
* Set the pipeline's source to the collection specified by the given [CollectionReference].
Expand All @@ -729,7 +736,8 @@ class RealtimePipelineSource internal constructor(private val firestore: Firebas
* @throws [IllegalArgumentException] Thrown if the [ref] provided targets a different project or
* database than the pipeline.
*/
fun collection(ref: CollectionReference): RealtimePipeline = collection(CollectionSource.of(ref))
fun collection(ref: CollectionReference): RealtimePipeline =
collection(CollectionSource.of(ref, firestore.databaseId))

/**
* Set the pipeline's source to the collection specified by CollectionSource.
Expand All @@ -740,10 +748,10 @@ class RealtimePipelineSource internal constructor(private val firestore: Firebas
* or database than the pipeline.
*/
fun collection(stage: CollectionSource): RealtimePipeline {
if (stage.firestore != null && stage.firestore.databaseId != firestore.databaseId) {
if (stage.serializer.databaseId() != firestore.databaseId) {
throw IllegalArgumentException("Provided collection is from a different Firestore instance.")
}
return RealtimePipeline(firestore, firestore.userDataReader, stage)
return RealtimePipeline(RemoteSerializer(firestore.databaseId), firestore.userDataReader, stage)
}

/**
Expand All @@ -753,26 +761,26 @@ class RealtimePipelineSource internal constructor(private val firestore: Firebas
* @return A new [RealtimePipeline] object with documents from target collection group.
*/
fun collectionGroup(collectionId: String): RealtimePipeline =
pipeline(CollectionGroupSource.of((collectionId)))
collectionGroup(CollectionGroupSource.of((collectionId)))

fun pipeline(stage: CollectionGroupSource): RealtimePipeline =
RealtimePipeline(firestore, firestore.userDataReader, stage)
fun collectionGroup(stage: CollectionGroupSource): RealtimePipeline =
RealtimePipeline(RemoteSerializer(firestore.databaseId), firestore.userDataReader, stage)
}

class RealtimePipeline
internal constructor(
firestore: FirebaseFirestore,
userDataReader: UserDataReader,
stages: List<Stage<*>>
) : AbstractPipeline(firestore, userDataReader, stages), Canonicalizable {
internal val serializer: RemoteSerializer,
internal val userDataReader: UserDataReader,
internal val stages: List<Stage<*>>
) : Canonicalizable {
internal constructor(
firestore: FirebaseFirestore,
serializer: RemoteSerializer,
userDataReader: UserDataReader,
stage: Stage<*>
) : this(firestore, userDataReader, listOf(stage))
) : this(serializer, userDataReader, listOf(stage))

private fun with(stages: List<Stage<*>>): RealtimePipeline =
RealtimePipeline(firestore, userDataReader, stages)
RealtimePipeline(serializer, userDataReader, stages)

private fun append(stage: Stage<*>): RealtimePipeline = with(stages.plus(stage))

Expand Down Expand Up @@ -820,14 +828,17 @@ internal constructor(
return rewrittenStages.joinToString("|") { stage -> (stage as Canonicalizable).canonicalId() }
}

override fun toString(): String = canonicalId()

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is RealtimePipeline) return false
return stages == other.stages
if (serializer.databaseId() != other.serializer.databaseId()) return false
return rewrittenStages == other.rewrittenStages
}

override fun hashCode(): Int {
return stages.hashCode()
return serializer.databaseId().hashCode() * 31 + stages.hashCode()
}

internal fun evaluate(inputs: List<MutableDocument>): List<MutableDocument> {
Expand Down Expand Up @@ -883,6 +894,15 @@ internal constructor(
internal fun comparator(): Comparator<Document> =
getLastEffectiveSortStage().comparator(evaluateContext())

internal fun toStructurePipelineProto(): StructuredPipeline {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There seems to be a typo in the method name. It should probably be toStructuredPipelineProto to match the class it returns (StructuredPipeline) and for consistency with other similar methods.

Suggested change
internal fun toStructurePipelineProto(): StructuredPipeline {
internal fun toStructuredPipelineProto(): StructuredPipeline {

val builder = StructuredPipeline.newBuilder()
builder.pipeline =
com.google.firestore.v1.Pipeline.newBuilder()
.addAllStages(rewrittenStages.map { it.toProtoStage(userDataReader) })
.build()
return builder.build()
}

private fun getLastEffectiveSortStage(): SortStage {
for (stage in rewrittenStages.asReversed()) {
if (stage is SortStage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.firebase.firestore.core.FieldFilter.Operator;
import com.google.firebase.firestore.core.OrderBy;
import com.google.firebase.firestore.core.QueryListener;
import com.google.firebase.firestore.core.QueryOrPipeline;
import com.google.firebase.firestore.core.ViewSnapshot;
import com.google.firebase.firestore.model.Document;
import com.google.firebase.firestore.model.DocumentKey;
Expand Down Expand Up @@ -964,7 +965,8 @@ public Task<QuerySnapshot> get(@NonNull Source source) {
validateHasExplicitOrderByForLimitToLast();
if (source == Source.CACHE) {
return firestore
.callClient(client -> client.getDocumentsFromLocalCache(query))
.callClient(
client -> client.getDocumentsFromLocalCache(new QueryOrPipeline.QueryWrapper(query)))
.continueWith(
Executors.DIRECT_EXECUTOR,
(Task<ViewSnapshot> viewSnap) ->
Expand Down Expand Up @@ -1182,7 +1184,8 @@ private ListenerRegistration addSnapshotListenerInternal(

return firestore.callClient(
client -> {
QueryListener queryListener = client.listen(query, options, asyncListener);
QueryListener queryListener =
client.listen(new QueryOrPipeline.QueryWrapper(query), options, asyncListener);
return ActivityScope.bind(
activity,
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ public UserDataReader(DatabaseId databaseId) {
this.databaseId = databaseId;
}

public DatabaseId getDatabaseId() {
return databaseId;
}

/**
* Parse document data from a non-merge {@code set()} call.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static class ListenOptions {

private final SyncEngine syncEngine;

private final Map<Query, QueryListenersInfo> queries;
private final Map<QueryOrPipeline, QueryListenersInfo> queries;

private final Set<EventListener<Void>> snapshotsInSyncListeners = new HashSet<>();

Expand Down Expand Up @@ -105,7 +105,7 @@ private enum ListenerRemovalAction {
* @return the targetId of the listen call in the SyncEngine.
*/
public int addQueryListener(QueryListener queryListener) {
Query query = queryListener.getQuery();
QueryOrPipeline query = queryListener.getQuery();
ListenerSetupAction listenerAction = ListenerSetupAction.NO_ACTION_REQUIRED;

QueryListenersInfo queryInfo = queries.get(query);
Expand Down Expand Up @@ -163,7 +163,7 @@ public int addQueryListener(QueryListener queryListener) {

/** Removes a previously added listener. It's a no-op if the listener is not found. */
public void removeQueryListener(QueryListener listener) {
Query query = listener.getQuery();
QueryOrPipeline query = listener.getQuery();
QueryListenersInfo queryInfo = queries.get(query);
ListenerRemovalAction listenerAction = ListenerRemovalAction.NO_ACTION_REQUIRED;
if (queryInfo == null) return;
Expand Down Expand Up @@ -223,7 +223,7 @@ private void raiseSnapshotsInSyncEvent() {
public void onViewSnapshots(List<ViewSnapshot> snapshotList) {
boolean raisedEvent = false;
for (ViewSnapshot viewSnapshot : snapshotList) {
Query query = viewSnapshot.getQuery();
QueryOrPipeline query = viewSnapshot.getQuery();
QueryListenersInfo info = queries.get(query);
if (info != null) {
for (QueryListener listener : info.listeners) {
Expand All @@ -240,7 +240,7 @@ public void onViewSnapshots(List<ViewSnapshot> snapshotList) {
}

@Override
public void onError(Query query, Status error) {
public void onError(QueryOrPipeline query, Status error) {
QueryListenersInfo info = queries.get(query);
if (info != null) {
for (QueryListener listener : info.listeners) {
Expand Down
Loading
Loading