INFO
Documentation for aggregates has not been written yet but you can read the README of the aggregate
package below.
Aggregates
Package aggregate
provides a framework for building event-sourced aggregates. It builds on top of the event system, so make sure to read the event documentation before reading further.
Introduction
An aggregate is any type that implements the Aggregate
interface:
package aggregate
// Aggregate is the interface that every event-sourced aggregate implements.
type Aggregate interface {
// Aggregate returns the id, name and version of the aggregate.
Aggregate() (uuid.UUID, string, int)
// AggregateChanges returns the uncommitted events of the aggregate.
AggregateChanges() []event.Event
// ApplyEvent applies an event to the aggregate.
ApplyEvent(event.Event)
}
You can either implement this interface by yourself or embed the *Base
type. Use the New
function to initialize *Base
:
package example
// Foo is the "foo" aggregate. It embeds *aggregate.Base to implement the
// Aggregate interface.
type Foo struct {
*aggregate.Base
}
// NewFoo creates the "foo" aggregate that is identified by id.
func NewFoo(id uuid.UUID) *Foo {
	base := aggregate.New("foo", id)
	return &Foo{Base: base}
}
Additional APIs
Aggregates can make use of additional, optional APIs provided by goes. An aggregate that embeds *Base
implements all of these APIs automatically:
Read the documentation for each of these interfaces for more details.
Aggregate events
An event-sourced aggregate transitions its state by applying events on itself. Events are applied by the ApplyEvent(event.Event)
method of the aggregate. Here is a minimal "todo list" example:
package todo
// List is a todo list aggregate.
type List struct {
*aggregate.Base
// Tasks holds the tasks that are currently on the list.
Tasks []string
}
// NewList returns the todo list with the given id.
func NewList(id uuid.UUID) *List {
	return &List{Base: aggregate.New("list", id)}
}
// ApplyEvent updates the tasks of the list according to the given event.
// Events that are neither "task_added" nor "task_removed" are ignored.
func (l *List) ApplyEvent(evt event.Event) {
	name := evt.Name()

	if name == "task_added" {
		l.Tasks = append(l.Tasks, evt.Data().(string))
		return
	}

	if name != "task_removed" {
		return
	}

	removed := evt.Data().(string)
	for idx := range l.Tasks {
		if l.Tasks[idx] != removed {
			continue
		}
		// Cut out the first matching task only.
		l.Tasks = append(l.Tasks[:idx], l.Tasks[idx+1:]...)
		return
	}
}
The todo list now knows how to apply "task_added"
and "task_removed"
events. What's missing are the commands to actually create the events and call the ApplyEvent
method with the created event:
// ... previous code ...
// AddTask records a "task_added" event for the given task. It returns an
// error if the list already contains the task.
func (l *List) AddTask(task string) error {
	if exists := l.Contains(task); exists {
		return fmt.Errorf("list already contains %q", task)
	}

	// aggregate.Next() creates the event and applies it using l.ApplyEvent()
	aggregate.Next(l, "task_added", task)

	return nil
}
// RemoveTask records a "task_removed" event for the given task. It returns an
// error if the list does not contain the task.
func (l *List) RemoveTask(task string) error {
	if exists := l.Contains(task); !exists {
		return fmt.Errorf("list does not contain %q", task)
	}

	// aggregate.Next() creates the event and applies it to the list.
	aggregate.Next(l, "task_removed", task)

	return nil
}
// Contains reports whether the list contains the given task. Task names are
// compared case-insensitively.
func (l *List) Contains(task string) bool {
	for _, t := range l.Tasks {
		// EqualFold compares case-insensitively without allocating lowercased
		// copies of both strings on every iteration.
		if strings.EqualFold(t, task) {
			return true
		}
	}
	return false
}
That's it. Now you can create todo lists, add tasks, and remove them again:
// ... previous code ...
// example adds a task to a new todo list and removes it again.
func example() {
	const task = "do this and that"

	list := NewList(uuid.New())

	err := list.AddTask(task)
	if err != nil {
		panic(fmt.Errorf("add task: %w", err))
	}

	err = list.RemoveTask(task)
	if err != nil {
		panic(fmt.Errorf("remove task: %w", err))
	}

	// list.AggregateVersion() == 2
	// list.AggregateChanges() returns two events – one "task_added" event
	// and one "task_removed" event.
}
Generic helpers
Applying events within the ApplyEvent
function is the most straightforward way to implement an aggregate but can become quite messy if an aggregate consists of many events.
goes provides type-safe, generic helpers that allow you to set up an event applier function for each individual event. This is what the todo list example looks like using generics:
package todo
import (
"github.com/google/uuid"
"github.com/modernice/goes/aggregate"
"github.com/modernice/goes/event"
)
// List is a todo list aggregate.
type List struct {
*aggregate.Base
// Tasks holds the tasks that are currently on the list.
Tasks []string
}
// NewList returns the todo list with the given id and registers an event
// applier for each of its events.
func NewList(id uuid.UUID) *List {
	list := &List{Base: aggregate.New("list", id)}

	// Route each event name to its type-safe applier.
	event.ApplyWith(list, list.addTask, "task_added")
	event.ApplyWith(list, list.removeTask, "task_removed")

	return list
}
func (l *List) AddTask(task string) error { ... }
func (l *List) RemoveTask(task string) error { ... }
// addTask applies a "task_added" event by appending its task to the list.
func (l *List) addTask(evt event.Of[string]) {
	task := evt.Data()
	l.Tasks = append(l.Tasks, task)
}
// removeTask applies a "task_removed" event by removing the first task that
// equals the event data from the list.
func (l *List) removeTask(evt event.Of[string]) {
	removed := evt.Data()
	for idx := range l.Tasks {
		if l.Tasks[idx] != removed {
			continue
		}
		l.Tasks = append(l.Tasks[:idx], l.Tasks[idx+1:]...)
		return
	}
}
Testing
TL;DR
Use the test.Change()
and test.NoChange()
testing helpers to ensure correct implementation of aggregate methods.
package todo_test
import (
"github.com/modernice/goes/test"
)
// TestNewList checks the constructor of the todo list aggregate.
func TestNewList(t *testing.T) {
// Test that todo.NewList() returns a valid aggregate.
test.NewAggregate(t, todo.NewList, "list")
}
// TestXXX is a template that lists the available test.Change() and
// test.NoChange() assertions. Replace foo, "<event-name>" and <event-data>
// with the aggregate, event name and event data under test.
func TestXXX(t *testing.T) {
// Aggregate should have applied and recorded the given event.
test.Change(t, foo, "<event-name>")
// Aggregate should have applied and recorded the given event with
// the given event data.
test.Change(t, foo, "<event-name>", test.EventData(<event-data>))
// Aggregate should have applied and recorded the given event with
// the given event data exactly 3 times.
test.Change(
t, foo, "<event-name>",
test.EventData(<event-data>),
test.Exactly(3),
)
// Aggregate should NOT have applied and recorded the given event.
test.NoChange(t, foo, "<event-name>")
// Aggregate should NOT have applied and recorded the given event with
// the given event data.
test.NoChange(t, foo, "<event-name>", test.EventData(<event-data>))
}
Testing of aggregates can become error-prone if one forgets to consider that aggregates are event-sourced. Take a look at this example:
package todo_test
// TestList_AddTask checks the state of the list before and after adding a
// task. It only inspects the resulting state, not the recorded events.
func TestList_AddTask(t *testing.T) {
	l := todo.NewList(uuid.New())

	if l.Contains("foo") {
		t.Fatalf("list should not contain %q until added", "foo")
	}

	if err := l.AddTask("foo"); err != nil {
		// Include the error so that a failure can be diagnosed.
		t.Fatalf("failed to add task %q: %v", "foo", err)
	}

	if !l.Contains("foo") {
		t.Fatalf("list should contain %q after adding", "foo")
	}
}
Even if the above test succeeds, it does not guarantee that the aggregate was implemented correctly. The following AddTask
implementation bypasses the indirection through the ApplyEvent
method and updates the state directly, resulting in a passing test even though the aggregate would behave incorrectly when used in goes' components.
package todo
// AddTask is a deliberately incorrect implementation: it updates the state
// directly instead of creating a "task_added" event and applying it through
// ApplyEvent, so no change is recorded on the aggregate.
func (l *List) AddTask(task string) error {
	l.Tasks = append(l.Tasks, task)
	return nil
}
To circumvent this issue, goes provides helpers to test aggregate changes. The above test would be rewritten as:
package todo_test
import "github.com/modernice/goes/test"
// TestList_AddTask checks the state of the list before and after adding a
// task, and that the aggregate recorded the "task_added" change.
func TestList_AddTask(t *testing.T) {
	l := todo.NewList(uuid.New())

	if l.Contains("foo") {
		t.Fatalf("list should not contain %q until added", "foo")
	}

	if err := l.AddTask("foo"); err != nil {
		// Include the error so that a failure can be diagnosed.
		t.Fatalf("failed to add task %q: %v", "foo", err)
	}

	if !l.Contains("foo") {
		t.Fatalf("list should contain %q after adding", "foo")
	}

	// The state check alone passes even if AddTask bypasses ApplyEvent;
	// this asserts that the change was recorded as an event.
	test.Change(t, l, "task_added", test.EventData("foo"))
}
The test.Change()
helper checks if the aggregate has recorded a "task_added"
change with "foo"
as the event data.
Persistence
The Repository
type defines an aggregate repository that allows you to save and fetch aggregates to and from an underlying event store:
package aggregate
// Repository is an aggregate repository that saves and fetches aggregates
// to and from an underlying event store.
type Repository interface {
// Save saves the given aggregate to the event store.
Save(ctx context.Context, a Aggregate) error
// Fetch fetches and applies the event stream of the given aggregate to
// reconstruct its current state.
Fetch(ctx context.Context, a Aggregate) error
// FetchVersion fetches the given aggregate at version v, ignoring all
// events with a version higher than v.
FetchVersion(ctx context.Context, a Aggregate, v int) error
// Query queries aggregates from the event store. It returns a History
// channel and an error channel.
Query(ctx context.Context, q Query) (<-chan History, <-chan error, error)
// Use fetches the given aggregate, calls fn to "use" it, and then inserts
// the new changes into the event store.
Use(ctx context.Context, a Aggregate, fn func() error) error
// Delete deletes the event stream of the given aggregate from the event
// store.
Delete(ctx context.Context, a Aggregate) error
}
The implementation of this repository can be found in the repository
package. Use repository.New
to create a repository from an event store:
package example
import (
"github.com/modernice/goes/aggregate/repository"
"github.com/modernice/goes/event"
)
// example creates an aggregate repository from the given event store.
func example(store event.Store) {
repo := repository.New(store)
}
Save an aggregate
package example
// example adds three tasks to a new todo list and saves the list through the
// given repository.
func example(repo aggregate.Repository) {
	l := todo.NewList(uuid.New())

	// AddTask returns an error that must not be dropped silently.
	for _, task := range []string{"foo", "bar", "baz"} {
		if err := l.AddTask(task); err != nil {
			panic(fmt.Errorf("add task %q: %w", task, err))
		}
	}

	if err := repo.Save(context.TODO(), l); err != nil {
		panic(fmt.Errorf("save todo list: %w", err))
	}
}
Fetch an aggregate
In order to fetch an aggregate, it must be passed to Repository.Fetch()
. The repository fetches and applies the event stream of the aggregate to reconstruct its current state.
An aggregate does not need to have an event stream to be fetched; if an aggregate has no events, Repository.Fetch()
is a no-op.
Fetching an aggregate multiple times is also not a problem because the repository will only fetch and apply events that haven't been applied yet. This also means that Repository.Fetch()
can be used to "refresh" an aggregate – to get to its most current state without fetching unnecessary events.
package example
// example fetches the current state of a todo list from the repository.
func example(repo aggregate.Repository) {
	list := todo.NewList(uuid.New())

	err := repo.Fetch(context.TODO(), list)
	if err == nil {
		return
	}

	panic(fmt.Errorf("fetch todo list: %w [id=%s]", err, list.AggregateID()))
}
You can also fetch a specific version of an aggregate, ignoring all events with a version higher than the provided version:
package example
// example fetches a todo list at a specific version from the repository.
func example(repo aggregate.Repository) {
	list := todo.NewList(uuid.New())

	err := repo.FetchVersion(context.TODO(), list, 5)
	if err == nil {
		return
	}

	panic(fmt.Errorf(
		"fetch todo list at version %d: %w [id=%s]",
		5, err, list.AggregateID(),
	))
}
"Use" an aggregate
Repository.Use()
is a convenience method to fetch an aggregate, "use" it, and then insert new changes into the event store:
package example
// example adds a task to a todo list through Repository.Use().
func example(repo aggregate.Repository) {
	list := todo.NewList(uuid.New())

	// The callback runs against the list that is passed to Use.
	addFoo := func() error {
		return list.AddTask("foo")
	}

	err := repo.Use(context.TODO(), list, addFoo)
	if err != nil {
		panic(err)
	}
}
Delete an aggregate
Hard-deleting aggregates should be avoided because that can lead to esoteric issues that are hard to debug. Consider using soft-deletes instead.
To delete an aggregate, the repository deletes its event stream from the event store:
package example
// example hard-deletes a todo list through the repository.
func example(repo aggregate.Repository) {
	list := todo.NewList(uuid.New())

	err := repo.Delete(context.TODO(), list)
	if err == nil {
		return
	}

	panic(fmt.Errorf("delete todo list: %w [id=%s]", err, list.AggregateID()))
}
Soft-delete an aggregate
Soft-deleted aggregates cannot be fetched and are excluded from query results of aggregate repositories. In order to soft-delete an aggregate, a specific event that flags the aggregate as soft-deleted must be inserted into the event store. The event must have event data that implements the SoftDeleter
interface:
package example
// DeletedData is the event data of an event that soft-deletes an aggregate.
type DeletedData struct {}
// SoftDelete implements the SoftDeleter interface; returning true flags the
// aggregate as soft-deleted.
func (DeletedData) SoftDelete() bool { return true }
func example() {
evt := event.New("deleted", DeletedData{}, event.Aggregate(...))
}
If the event stream of an aggregate contains such an event, the aggregate is considered to be soft-deleted and will be excluded from query results of the aggregate repository. Additionally, the Repository.Fetch()
method will return repository.ErrDeleted
for the aggregate.
Soft-deleted aggregates can also be restored by inserting an event with event data that implements SoftRestorer
:
package example
// RestoredData is the event data of an event that restores a soft-deleted
// aggregate.
type RestoredData struct {}
// SoftRestore implements the SoftRestorer interface; returning true restores
// the soft-deleted aggregate.
func (RestoredData) SoftRestore() bool { return true }
func example() {
evt := event.New("restored", RestoredData{}, event.Aggregate(...))
}
Query aggregates
Aggregates can be queried from the event store. When queried, the repository returns a History
channel (lol) and an error
channel. A History
can be applied to an aggregate to reconstruct its current state.
package example
import (
"github.com/modernice/goes/aggregate"
"github.com/modernice/goes/aggregate/query"
"github.com/modernice/goes/helper/streams"
)
// example queries aggregates by name and id and walks the returned
// histories, applying each history to an aggregate.
func example(repo aggregate.Repository) {
	res, errs, err := repo.Query(context.TODO(), query.New(
		// Query "foo", "bar", and "baz" aggregates.
		query.Name("foo", "bar", "baz"),
		// Query aggregates that have one of the provided ids.
		query.ID(uuid.UUID{...}, uuid.UUID{...}),
	))
	if err != nil {
		// Bail out before walking the result if the query itself failed.
		panic(err)
	}

	if err := streams.Walk(
		context.TODO(),
		func(his aggregate.History) error {
			log.Printf(
				"Name: %s ID: %s",
				his.AggregateName(),
				his.AggregateID(),
			)

			var foo aggregate.Aggregate // fetch the aggregate
			his.Apply(foo)              // apply the history

			return nil
		},
		res,
		errs,
	); err != nil {
		panic(err)
	}
}
Typed repositories
The Repository
interface defines a generic aggregate repository for all kinds of aggregates. The TypedRepository
can be used to define a type-safe repository for a specific aggregate. The TypedRepository
removes the need for passing the aggregate instance to repository methods.
To create a type-safe repository for an aggregate, pass the Repository
and the constructor of the aggregate to repository.Typed()
:
package todo
import (
"github.com/modernice/goes/aggregate"
"github.com/modernice/goes/aggregate/repository"
"github.com/modernice/goes/event"
)
// List is the "todo list" aggregate.
type List struct { *aggregate.Base }
// ListRepository is the "todo list" repository.
type ListRepository = aggregate.TypedRepository[*List]
// NewList creates the "todo list" that is identified by id.
func NewList(id uuid.UUID) *List {
	base := aggregate.New("list", id)
	return &List{Base: base}
}
// NewListRepository wraps the given repository in a type-safe "todo list"
// repository that uses NewList to construct its aggregates.
func NewListRepository(repo aggregate.Repository) ListRepository {
	typed := repository.Typed(repo, NewList)
	return typed
}
// example shows how to fetch, use and query todo lists through the type-safe
// ListRepository.
func example(store event.Store) {
	repo := repository.New(store)
	lists := NewListRepository(repo)

	// Fetch a todo list by id.
	l, err := lists.Fetch(context.TODO(), uuid.New())
	if err != nil {
		panic(fmt.Errorf("fetch list: %w", err))
	}
	// l is a *List

	// "Use" a list by id.
	if err := lists.Use(context.TODO(), uuid.New(), func(l *List) error {
		return l.AddTask("foo")
	}); err != nil {
		panic(fmt.Errorf("use list: %w", err))
	}

	// The TypedRepository will only ever return *List aggregates.
	// All other aggregates that would be returned by the passed query,
	// are simply discarded from the result.
	res, errs, err := lists.Query(context.TODO(), query.New(...))
}