Skip to content

Commit

Permalink
Update Propulsion.EventStore to 4.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Sep 15, 2022
1 parent 9d762f7 commit b46eb59
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 30 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ clients to current ones by means of adjusting package references while retaining

(Reading and position metrics are exposed via `Propulsion.CosmosStore.Prometheus`)

- `Propulsion.EventStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.EventStore.svg)](https://www.nuget.org/packages/Propulsion.EventStore/). Provides bindings to [EventStore](https://www.eventstore.org), writing via `Propulsion.EventStore.EventStoreSink` [Depends](https://www.fuget.org/packages/Propulsion.EventStore) on `Equinox.EventStore` v `3.0.7`, `Serilog`
- `Propulsion.EventStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.EventStore.svg)](https://www.nuget.org/packages/Propulsion.EventStore/). Provides bindings to [EventStore](https://www.eventstore.org), writing via `Propulsion.EventStore.EventStoreSink` [Depends](https://www.fuget.org/packages/Propulsion.EventStore) on `Equinox.EventStore` v `4.0.0`, `Serilog`

- **Deprecated as reading (and writing) relies on the legacy EventStoreDB TCP interface**
- Contains ultra-high throughput striped reader implementation
- Presently Used by [`proSync` template](https://github.com/jet/dotnet-templates/tree/master/propulsion-sync)

(Reading and position metrics are emitted to Console / Serilog; no Prometheus support)

Expand Down
8 changes: 4 additions & 4 deletions src/Propulsion.EventStore/Checkpoint.fs
Original file line number Diff line number Diff line change
Expand Up @@ -93,25 +93,25 @@ type Service internal (resolve : CheckpointSeriesId -> Equinox.Decider<Events.Ev
/// Determines the present state of the CheckpointSequence
member _.Read(series) =
let stream = resolve series
stream.Query id
stream.Query(id, load = Equinox.AllowStale)

/// Start a checkpointing series with the supplied parameters
/// NB will fail if already existing; caller should select to `Start` or `Override` based on whether Read indicates state is Running Or NotStarted
member _.Start(series, freq : TimeSpan, pos : int64) =
let stream = resolve series
stream.Transact(interpret (Command.Start(DateTimeOffset.UtcNow, freq, pos)))
stream.Transact(interpret (Command.Start(DateTimeOffset.UtcNow, freq, pos)), load = Equinox.AllowStale)

/// Override a checkpointing series with the supplied parameters
/// NB fails if not already initialized; caller should select to `Start` or `Override` based on whether Read indicates state is Running Or NotStarted
member _.Override(series, freq : TimeSpan, pos : int64) =
let stream = resolve series
stream.Transact(interpret (Command.Override(DateTimeOffset.UtcNow, freq, pos)))
stream.Transact(interpret (Command.Override(DateTimeOffset.UtcNow, freq, pos)), load = Equinox.AllowStale)

/// Ingest a position update
/// NB fails if not already initialized; caller should ensure correct initialization has taken place via Read -> Start
member _.Commit(series, pos : int64) =
let stream = resolve series
stream.Transact(interpret (Command.Update(DateTimeOffset.UtcNow, pos)))
stream.Transact(interpret (Command.Update(DateTimeOffset.UtcNow, pos)), load = Equinox.AllowStale)

let create resolve = Service(streamName >> resolve)

Expand Down
23 changes: 1 addition & 22 deletions src/Propulsion.EventStore/EventStoreSink.fs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,6 @@ open System.Collections.Generic
open System
open System.Threading

module private StreamSpan =

#if EVENTSTORE_LEGACY
let private nativeToDefault_ x = FsCodec.Core.TimelineEvent.Map<ReadOnlyMemory<byte>> (fun (xs : byte array) -> ReadOnlyMemory xs) x
let inline nativeToDefault span = Array.map nativeToDefault_ span
let defaultToNative_ = FsCodec.Core.TimelineEvent.Map<byte array> (fun (xs : ReadOnlyMemory<byte>) -> xs.ToArray())
let inline defaultToNative span = Array.map defaultToNative_ span
#else
let nativeToDefault = id
let defaultToNative_ = id
let defaultToNative = id
#endif

module Internal =

[<AutoOpen>]
Expand All @@ -55,21 +42,13 @@ module Internal =

let write (log : ILogger) (context : EventStoreContext) stream (span : Default.StreamSpan) = async {
log.Debug("Writing {s}@{i}x{n}", stream, span[0].Index, span.Length)
let! res = context.Sync(log, stream, span[0].Index - 1L, (span |> Array.map (fun span -> StreamSpan.defaultToNative_ span :> _)))
let! res = context.Sync(log, stream, span[0].Index - 1L, span |> Array.map (fun span -> span :> _))
let ress =
match res with
| GatewaySyncResult.Written (Token.Unpack pos') ->
#if EVENTSTORE_LEGACY
Ok (pos'.pos.streamVersion + 1L)
#else
Ok (pos'.streamVersion + 1L)
#endif
| GatewaySyncResult.ConflictUnknown (Token.Unpack pos) ->
#if EVENTSTORE_LEGACY
match pos.pos.streamVersion + 1L with
#else
match pos.streamVersion + 1L with
#endif
| actual when actual < span[0].Index -> PrefixMissing (span, actual)
| actual when actual >= span[0].Index + span.LongLength -> Duplicate actual
| actual -> PartialDuplicate (span |> Array.skip (actual - span[0].Index |> int))
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.EventStore/EventStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ module Mapping =
member x.Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(x.CreatedEpoch)

let (|PropulsionTimelineEvent|) (x : RecordedEvent) : FsCodec.ITimelineEvent<_> =
let inline len0ToNull (x : _[]) = match x with null -> null | x when x.Length = 0 -> null | x -> x
let inline len0ToNull (x : _[]) = match x with null -> ReadOnlyMemory.Empty | x when x.Length = 0 -> ReadOnlyMemory.Empty | x -> ReadOnlyMemory x
FsCodec.Core.TimelineEvent.Create(x.EventNumber, x.EventType, len0ToNull x.Data, len0ToNull x.Metadata, timestamp = x.Timestamp) :> _

let (|PropulsionStreamEvent|) (x : RecordedEvent) : StreamEvent<_> =
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.EventStore/Propulsion.EventStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="Equinox.EventStore" Version="[3.0.7, 3.99.0]" />
<PackageReference Include="Equinox.EventStore" Version="4.0.0-rc.1" />
<PackageReference Include="FsCodec.Box" Version="3.0.0-rc.7.1" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.Kafka/Propulsion.Kafka.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<ItemGroup>
<PackageReference Include="MinVer" Version="4.2.0" PrivateAssets="All" />

<PackageReference Include="FsCodec.NewtonsoftJson" Version="3.0.0-rc.2.2" />
<PackageReference Include="FsCodec.NewtonsoftJson" Version="3.0.0-rc.7.1" />
<PackageReference Include="FsKafka" Version="[1.7.0, 1.9.99)" />
</ItemGroup>

Expand Down

0 comments on commit b46eb59

Please sign in to comment.