Skip to content

Commit

Permalink
Create ReadWriteEventAdapter and set if event is bound to separate re… (
Browse files Browse the repository at this point in the history
#4568)

* Create ReadWriteEventAdapter and set if event is bound to separate read and write events

* Update ReadWriteEventAdapter to accept a single read event adapter, and use CombinedReadEventAdapter so logic isnt duplicated

* update unit test to match real world example

Co-authored-by: bsain <bsain@signifyhealth.com>
Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
3 people authored Dec 30, 2020
1 parent 7aee504 commit 58294a4
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
22 changes: 22 additions & 0 deletions src/core/Akka.Persistence.Tests/Journal/MemoryEventAdaptersSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public MemoryEventAdaptersSpec()
""" + typeof (ReadMeEvent).FullName + @", Akka.Persistence.Tests"" = reader
""" + typeof (WriteMeEvent).FullName + @", Akka.Persistence.Tests"" = writer
""" + typeof(ReadMeTwiceEvent).FullName + @", Akka.Persistence.Tests"" = [reader, another-reader]
""" + typeof(ReadWriteEvent).FullName + @", Akka.Persistence.Tests"" = [reader, another-reader, writer]
}
}
}").WithFallback(ConfigurationFactory.Default());
Expand Down Expand Up @@ -130,6 +131,19 @@ public void EventAdapters_should_allow_combining_only_the_readside_CombinedReadE
.Should()
.BeEquivalentTo("from-ReadMeTwiceEvent()", "again-ReadMeTwiceEvent()");
}

[Fact]
public void EventAdapters_should_allow_read_write_ReadWriteEventAdapter()
{
var adapters = EventAdapters.Create(_extendedActorSystem, _memoryConfig);

var readWriteAdapter = adapters.Get<ReadWriteEvent>();
var events = readWriteAdapter.FromJournal(readWriteAdapter.ToJournal(new ReadWriteEvent()), "").Events
.Select(c => c.ToString())
.Should()
.BeEquivalentTo("from-to-ReadWriteEvent()", "again-to-ReadWriteEvent()");
}

}

public abstract class BaseTestAdapter : IEventAdapter
Expand Down Expand Up @@ -170,6 +184,14 @@ public override string ToString()
}
}

public class ReadWriteEvent
{
public override string ToString()
{
return "ReadWriteEvent()";
}
}

public class ReaderAdapter : IReadEventAdapter
{
public IEventSequence FromJournal(object evt, string manifest)
Expand Down
40 changes: 39 additions & 1 deletion src/core/Akka.Persistence/Journal/EventAdapters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,34 @@ public IEventSequence FromJournal(object evt, string manifest)
}
}

[Serializable]
internal class ReadWriteEventAdapter : IEventAdapter
{
private readonly IWriteEventAdapter _writeEventAdapter;
private readonly IReadEventAdapter _readEventAdapter;

public ReadWriteEventAdapter(IReadEventAdapter readEventAdapter, IWriteEventAdapter writeEventAdapter)
{
_readEventAdapter = readEventAdapter;
_writeEventAdapter = writeEventAdapter;
}

public string Manifest(object evt)
{
return _writeEventAdapter.Manifest(evt);
}

public object ToJournal(object evt)
{
return _writeEventAdapter.ToJournal(evt);
}

public IEventSequence FromJournal(object evt, string manifest)
{
return _readEventAdapter.FromJournal(evt, manifest);
}
}

/// <summary>
/// TBD
/// </summary>
Expand Down Expand Up @@ -302,7 +330,7 @@ private static EventAdapters Create(ExtendedActorSystem system, IDictionary<stri
var type = Type.GetType(kv.Key);
var adapter = kv.Value.Length == 1
? handlers[kv.Value[0]]
: new NoopWriteEventAdapter(new CombinedReadEventAdapter(kv.Value.Select(h => handlers[h])));
: CombineAdapters(kv.Value.Select(h => handlers[h]));
return new KeyValuePair<Type, IEventAdapter>(type, adapter);
}).ToList());

Expand Down Expand Up @@ -332,6 +360,16 @@ private static List<KeyValuePair<Type, IEventAdapter>> Sort(List<KeyValuePair<Ty
});
}

private static IEventAdapter CombineAdapters(IEnumerable<IEventAdapter> adapters)
{
var writeAdapters = adapters.Where(a => a is NoopReadEventAdapter);
if (writeAdapters.Count() == 0)
return new NoopWriteEventAdapter(new CombinedReadEventAdapter(adapters));
else if (writeAdapters.Count() == 1)
return new ReadWriteEventAdapter(new CombinedReadEventAdapter(adapters.Where(a => a is NoopWriteEventAdapter)), writeAdapters.First());
throw new IllegalStateException("Cannot have multiple write adapters for a single adapter binding");
}

private static int IndexWhere<T>(IList<T> list, Predicate<T> predicate)
{
for (int i = 0; i < list.Count; i++)
Expand Down

0 comments on commit 58294a4

Please sign in to comment.