Skip to content

Feature/7582 #7595

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: dev
Choose a base branch
from
10 changes: 10 additions & 0 deletions docs/articles/persistence/custom-persistence-provider.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ akka.persistence.journal-plugin-fallback {
# Dispatcher for message replay.
replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"

# journal supervisor strategy used.
# It needs to be a subclass of Akka.Actor.SupervisorStrategyConfigurator. And have a parameterless constructor
# by default it restarts the journal on crash
supervisor-strategy = "Akka.Actor.DefaultSupervisorStrategy"

# Default serializer used as manifest serializer when applicable
# and payload serializer when no specific binding overrides are specified
serializer = "json"
Expand Down Expand Up @@ -406,6 +411,11 @@ akka.persistence.snapshot-store-plugin-fallback {
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"

# snapshot-store supervisor strategy used.
# It needs to be a subclass of Akka.Actor.SupervisorStrategyConfigurator. And have a parameterless constructor
# by default it restarts the snapshot-store on crash
supervisor-strategy = "Akka.Actor.DefaultSupervisorStrategy"

# Default serializer used as manifest serializer when applicable
# and payload serializer when no specific binding overrides are specified
serializer = "json"
Expand Down
47 changes: 47 additions & 0 deletions docs/articles/persistence/storage-plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,50 @@ akka {
}
}
```


### Controlling journal or snapshot crash behavior.

By default the base implementations upon which all journal or snapshot-store implementations are build upon provides out of the box behavior for dealing with errors that occur during the writing or reading of data from the underlying store. Errors that occur will be communicated with the persistentactor that is using them at that time.
So in general once started succesfully the journal or snapshot-store will be ready and available for the duration of your application, and wont crash. However in the case they do crash, due to unforseen circumstances the default behavior is to immediatly restart them. This is generally the behavior you want.
But in case you do want to customize how the system handles the crashing of the journal or snapshot-store. You can specify your own supervision strategy using the `supervisor-strategy` property.
This class needs to inherit from `Akka.Actor.SupervisorStrategyConfigurator` and have a parameterless constructor.
Configuration example:

```hocon
akka {
persistence {
journal {
plugin = "akka.persistence.journal.sqlite"
auto-start-journals = ["akka.persistence.journal.sqlite"]
supervisor-strategy = "My.full.namespace.CustomSupervisorStrategyConfigurator"
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.sqlite"
auto-start-snapshot-stores = ["akka.persistence.snapshot-store.sqlite"]
supervisor-strategy = "My.full.namespace.CustomSupervisorStrategyConfigurator"
}
}
}
```

One such case could be to detect and handle misconfigured application settings during startup. For example if your using a SQL based journal and you misconfigured the connectionstring you might opt to return a supervisionstrategy that detects certain network connection errors, and after a few retries signals your application to shutdown instead of continue running with a journal or snapshot-store that in all likelyhood will never be able to recover, forever stuck in a restart loop while your application is running.

An example of what this could look like is this:

```csharp

public class MyCustomSupervisorConfigurator : SupervisorStrategyConfigurator
{
public override SupervisorStrategy Create()
{
//optionally only stop if the error occurs more then x times in y period
//this will be highly likely if its an unrecoverable error during start/init of the journal/snapshot store
return new OneForOneStrategy(10,TimeSpan.FromSeconds(5),ex =>
{
//detect unrecoverable exception here
return Directive.Stop;
});
}
}
```
81 changes: 81 additions & 0 deletions src/core/Akka.Persistence.Tests/PersistenceConfigSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Configuration;
using Akka.Persistence.Journal;
using Akka.Persistence.Snapshot;
using Akka.TestKit;
using Akka.Util;
using Xunit;
using Xunit.Abstractions;

Expand Down Expand Up @@ -68,6 +72,32 @@ protected internal override bool AroundReceive(Receive receive, object message)
}
}

public class TestSupervisorConfigurator : SupervisorStrategyConfigurator
{
public override SupervisorStrategy Create()
{
return new CustomStrategy(10,TimeSpan.FromSeconds(5),ex =>
{
//detect unrecoverable exception here
return Directive.Stop;
});
}
}

public class CustomStrategy : OneForOneStrategy
{
public CustomStrategy(int? maxNrOfRetries, TimeSpan? withinTimeRange, Func<Exception, Directive> localOnlyDecider) : base(maxNrOfRetries, withinTimeRange, localOnlyDecider)
{
}

public override void HandleChildTerminated(IActorContext actorContext, IActorRef child, IEnumerable<IInternalActorRef> children)
{
//because the journal does not has child actors, the ref is always the actor itself. So optionally do something special here
//to indicate to the system that the journal crashed in an unrecoverable way.
}
}


#endregion

private static readonly string SpecConfig = @"
Expand All @@ -82,6 +112,12 @@ class = ""Akka.Persistence.Tests.PersistenceConfigSpec+TestJournal, Akka.Persist
plugin-dispatcher = ""akka.actor.default-dispatcher""
test-value = ""B""
}
test3 {
class = ""Akka.Persistence.Tests.PersistenceConfigSpec+TestJournal, Akka.Persistence.Tests""
plugin-dispatcher = ""akka.actor.default-dispatcher""
test-value = ""B""
supervisor-strategy = ""Akka.Persistence.Tests.PersistenceConfigSpec+TestSupervisorConfigurator, Akka.Persistence.Tests""
}
}
akka.persistence.snapshot-store {
test1 {
Expand All @@ -100,6 +136,51 @@ public PersistenceConfigSpec(ITestOutputHelper output) : base(SpecConfig, output
{
}

/// <summary>
/// Verify that the journal config contains the expected default from our fallback configs
/// No spec for when the user overrides that because its not the goal to test the hocon config system.
/// Merely that the plugin system here properly applies the fallback config for this config value.
/// </summary>
[Fact]
public void Journal_has_supervision_strategy_configured()
{
var persistence = Persistence.Instance.Apply(Sys);

var config = persistence.JournalConfigFor("akka.persistence.journal.test2");
var defaultstrategy = config.GetString("supervisor-strategy");
defaultstrategy.ShouldBe(typeof(Akka.Actor.DefaultSupervisorStrategy).FullName);
}

/// <summary>
/// Verify that the snapshot config contains the expected default from our fallback configs
/// No spec for when the user overrides that because its not the goal to test the hocon config system.
/// Merely that the plugin system here properly applies the fallback config for this config value.
/// </summary>
[Fact]
public void Snapshot_has_supervision_strategy_configured()
{
var persistence = Persistence.Instance.Apply(Sys);

var config = persistence.JournalConfigFor("akka.persistence.snapshot-store.test1");
var defaultstrategy = config.GetString("supervisor-strategy");
defaultstrategy.ShouldBe(typeof(Akka.Actor.DefaultSupervisorStrategy).FullName);
}

[Fact]
public void Journal_has_custom_supervision_strategy_applied()
{
var persistence = Persistence.Instance.Apply(Sys);
var journal = persistence.JournalFor("akka.persistence.journal.test3"); //get our journal with the custom configuration

//waves magic wand
var magicref = journal as ActorRefWithCell;
var appliedStrat = magicref.Underlying.Props.SupervisorStrategy;
//because the configured value for our supervisor strategy is our CustomStrategy
//we verify that the strat returned is the same as currently applied
var customstrategy = new TestSupervisorConfigurator().Create();
appliedStrat.GetType().ShouldBe(customstrategy.GetType());
}

[Fact]
public void Persistence_should_use_inmem_journal_by_default()
{
Expand Down
7 changes: 1 addition & 6 deletions src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,7 @@ protected sealed override bool Receive(object message)
{
return ReceiveWriteJournal(message) || ReceivePluginInternal(message);
}

/// <summary>
/// TBD
/// </summary>
/// <param name="message">TBD</param>
/// <returns>TBD</returns>

protected bool ReceiveWriteJournal(object message)
{
switch (message)
Expand Down
13 changes: 7 additions & 6 deletions src/core/Akka.Persistence/Journal/WriteJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
namespace Akka.Persistence.Journal
{
/// <summary>
/// TBD
/// Base class for the journal persistence
/// </summary>
public abstract class WriteJournalBase : ActorBase
{
Expand All @@ -30,10 +30,11 @@ protected WriteJournalBase()
}

/// <summary>
/// TBD
/// Creates a sequence of write actions to be executed based on the given messages.
/// Applies any registered EventAdapters to the payloads.
/// </summary>
/// <param name="resequenceables">TBD</param>
/// <returns>TBD</returns>
/// <param name="resequenceables">list of messages to write</param>
/// <returns></returns>
protected IEnumerable<AtomicWrite> PreparePersistentBatch(IEnumerable<IPersistentEnvelope> resequenceables)
{
foreach (var resequenceable in resequenceables)
Expand All @@ -53,7 +54,7 @@ protected IEnumerable<AtomicWrite> PreparePersistentBatch(IEnumerable<IPersisten
}

/// <summary>
/// INTERNAL API
/// Apply registered eventadapter to the data payload
/// </summary>
[InternalApi]
protected IEnumerable<IPersistentRepresentation> AdaptFromJournal(IPersistentRepresentation representation)
Expand All @@ -65,7 +66,7 @@ protected IEnumerable<IPersistentRepresentation> AdaptFromJournal(IPersistentRep
}

/// <summary>
/// INTERNAL API
/// Apply any registered eventadapter to the data payload
/// </summary>
protected IPersistentRepresentation AdaptToJournal(IPersistentRepresentation representation)
{
Expand Down
12 changes: 9 additions & 3 deletions src/core/Akka.Persistence/Persistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,17 @@ private static IActorRef CreatePlugin(ExtendedActorSystem system, string configP
var pluginType = Type.GetType(pluginTypeName, true);
var pluginDispatcherId = pluginConfig.GetString("plugin-dispatcher", null);
object[] pluginActorArgs = pluginType.GetConstructor(new[] { typeof(Config) }) != null ? new object[] { pluginConfig } : null;
var pluginActorProps = new Props(pluginType, pluginActorArgs).WithDispatcher(pluginDispatcherId);


//todo wrap in backoffsupervisor ?

//supervisor-strategy is defined by default in the fallback configs. So we always expect to get a value here even if the user has not explicitly defined anything
var configurator = SupervisorStrategyConfigurator.CreateConfigurator(pluginConfig.GetString("supervisor-strategy"));

var pluginActorProps = new Props(pluginType, pluginActorArgs).WithDispatcher(pluginDispatcherId).WithSupervisorStrategy(configurator.Create());

return system.SystemActorOf(pluginActorProps, pluginActorName);
}

private static EventAdapters CreateAdapters(ExtendedActorSystem system, string configPath)
{
var pluginConfig = system.Settings.Config.GetConfig(configPath);
Expand Down
13 changes: 11 additions & 2 deletions src/core/Akka.Persistence/persistence.conf
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ akka.persistence {

# Dispatcher for message replay.
replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"

# journal supervisor strategy used.
# It needs to be a subclass of Akka.Actor.SupervisorStrategyConfigurator. And have a parameterless constructor
# by default it restarts the journal on crash
supervisor-strategy = "Akka.Actor.DefaultSupervisorStrategy"

# Default serializer used as manifest serializer when applicable and payload serializer when no specific binding overrides are specified
serializer = "json"

Expand Down Expand Up @@ -173,7 +177,12 @@ akka.persistence {

# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher"


# snapshot supervisor strategy used.
# It needs to be a subclass of Akka.Actor.SupervisorStrategyConfigurator. And have a parameterless constructor
# by default it restarts the snapshot on crash
supervisor-strategy = "Akka.Actor.DefaultSupervisorStrategy"

# Default serializer used as manifest serializer when applicable and payload serializer when no specific binding overrides are specified
serializer = "json"
circuit-breaker {
Expand Down
41 changes: 7 additions & 34 deletions src/core/Akka/Actor/SupervisorStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -615,15 +615,7 @@ protected override Directive Handle(IActorRef child, Exception exception)
return Decider.Decide(exception);
}

/// <summary>
/// TBD
/// </summary>
/// <param name="context">TBD</param>
/// <param name="restart">TBD</param>
/// <param name="child">TBD</param>
/// <param name="cause">TBD</param>
/// <param name="stats">TBD</param>
/// <param name="children">TBD</param>
/// <inheritdoc/>
public override void ProcessFailure(IActorContext context, bool restart, IActorRef child, Exception cause, ChildRestartStats stats, IReadOnlyCollection<ChildRestartStats> children)
{
if (children.Count > 0)
Expand All @@ -645,12 +637,7 @@ public override void ProcessFailure(IActorContext context, bool restart, IActorR
}
}

/// <summary>
/// TBD
/// </summary>
/// <param name="actorContext">TBD</param>
/// <param name="child">TBD</param>
/// <param name="children">TBD</param>
/// <inheritdoc/>
public override void HandleChildTerminated(IActorContext actorContext, IActorRef child, IEnumerable<IInternalActorRef> children)
{
//Intentionally left blank
Expand Down Expand Up @@ -998,14 +985,10 @@ public override int GetHashCode()
}

/// <summary>
/// TBD
/// Base configurator class used for configuring the guardian-supervisor-strategy
/// </summary>
public abstract class SupervisorStrategyConfigurator
{
/// <summary>
/// TBD
/// </summary>
/// <returns>TBD</returns>
public abstract SupervisorStrategy Create();

/// <summary>
Expand Down Expand Up @@ -1037,30 +1020,20 @@ public static SupervisorStrategyConfigurator CreateConfigurator(string typeName)
}
}

/// <summary>
/// TBD
/// </summary>

public class DefaultSupervisorStrategy : SupervisorStrategyConfigurator
{
/// <summary>
/// TBD
/// </summary>
/// <returns>TBD</returns>

public override SupervisorStrategy Create()
{
return SupervisorStrategy.DefaultStrategy;
}
}

/// <summary>
/// TBD
/// </summary>

public class StoppingSupervisorStrategy : SupervisorStrategyConfigurator
{
/// <summary>
/// TBD
/// </summary>
/// <returns>TBD</returns>

public override SupervisorStrategy Create()
{
return SupervisorStrategy.StoppingStrategy;
Expand Down
Loading