Skip to content

Commit

Permalink
test concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
Aetherall committed Sep 3, 2024
1 parent 4663b62 commit b894a12
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ export abstract class FirestoreEsAggregateStore<
(isFirestoreConcurrencyError || isDDDTSConcurrencyError) && hasAttempts;

if (shouldRetry) {
await new Promise((resolve) => setTimeout(resolve, 50));
const pristines = await Promise.all(
toSave.map(async ({ aggregate, changes }) => {
const pristine = await this.loadForce(aggregate.id);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
process.env.FIRESTORE_EMULATOR_HOST = "localhost:8080";

// process.env.GOOGLE_APPLICATION_DEFAULT_CREDENTIALS =
// "/home/aetherall/.config/gcloud/sandbox-elies-sa.json";
import * as fb from "firebase-admin";

import {
Expand Down Expand Up @@ -33,6 +34,7 @@ describe("FirestoreEventStore", () => {
projectId: "demo-es",
});
const firestore = app.firestore();
// firestore.settings({ databaseId: "ddd-ts-us" });

const transaction = new FirestoreTransactionPerformer(firestore);
const eventStore = new FirestoreEventStore(firestore);
Expand Down Expand Up @@ -85,7 +87,7 @@ describe("FirestoreEventStore", () => {
this.balance += event.payload.amount;
}

deposit(amount: number) {
async deposit(amount: number) {
this.apply(Deposited.new({ id: this.id, amount }));
}

Expand All @@ -111,7 +113,7 @@ describe("FirestoreEventStore", () => {
),
);

it("should be fast", async () => {
it.skip("should be fast", async () => {
const accounts = [...Array(200).keys()].map(() => Account.open());

const expectMS = async (ms: number, fn: () => Promise<any>) => {
Expand All @@ -132,24 +134,30 @@ describe("FirestoreEventStore", () => {
await accountStore.load(account.id);
}
});

// pill config in room card -> rebuild room * room member on room display config change
//
});

it("should allow heavily concurrent writes", async () => {
const account = Account.open();

await accountStore.save(account);

let total = 20;
const balance = total;
await Promise.all(
[...Array(20).keys()].map(async (i) => {
[...Array(total).keys()].map(async (i) => {
const fresh = await accountStore.load(account.id);
fresh!.deposit(1);
await accountStore.save(fresh!);
await fresh!.deposit(1);
await accountStore.save(fresh!, undefined, total);
console.log(total--);
}),
);

const fresh = await accountStore.load(account.id);
expect(fresh!.balance).toBe(20);
});
expect(fresh!.balance).toBe(balance);
}, 20_000);
});

it("should support saveAll with transactions", async () => {
Expand Down
11 changes: 2 additions & 9 deletions packages/event-sourcing-firestore/src/firestore.event-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
FirestoreTransaction,
} from "@ddd-ts/store-firestore";
import * as fb from "firebase-admin";

const serverTimestamp = fb.firestore.FieldValue.serverTimestamp;
export class FirestoreEventStore {
constructor(
public readonly firestore: fb.firestore.Firestore,
Expand Down Expand Up @@ -38,13 +38,6 @@ export class FirestoreEventStore {
}[],
trx: FirestoreTransaction,
) {
await Promise.all(
toAppend.map(
async ({ streamId, expectedRevision }) =>
await this.lockDocument(streamId, expectedRevision, trx),
),
);

await Promise.all(
toAppend.map(async ({ streamId, changes, expectedRevision }) => {
await this.commitChanges(streamId, changes, expectedRevision, trx);
Expand Down Expand Up @@ -97,7 +90,7 @@ export class FirestoreEventStore {
revision: revision,
name: change.name,
payload: change.payload,
occurredAt: fb.firestore.FieldValue.serverTimestamp(),
occurredAt: serverTimestamp(),
}),
);
revision++;
Expand Down
23 changes: 22 additions & 1 deletion packages/event-sourcing-firestore/src/firestore.snapshotter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ import {
type IIdentifiable,
type ISerializer,
} from "@ddd-ts/core";
import { FirestoreStore } from "@ddd-ts/store-firestore";
import {
FirestoreStore,
type FirestoreTransaction,
} from "@ddd-ts/store-firestore";

export class FirestoreSnapshotter<
A extends IEventSourced & IIdentifiable,
Expand Down Expand Up @@ -39,4 +42,22 @@ export class FirestoreSnapshotter<
.collection("streams")
.withConverter(this.converter);
}

async save(snapshot: A, trx: FirestoreTransaction): Promise<void> {
await super.save(snapshot, trx);

const ref = this.firestore
.collection("test-snapshots")
.doc(this.aggregate)
.collection("snapshots")
.doc(snapshot.id.toString())
.collection("versions")
.doc(`${process.hrtime.bigint()}-${snapshot.acknowledgedRevision}`);

trx.transaction.set(ref, await this.serializer.serialize(snapshot));
}

async saveAll(models: A[], trx: FirestoreTransaction): Promise<void> {
await super.saveAll(models, trx);
}
}

0 comments on commit b894a12

Please sign in to comment.