Skip to content
Open
605 changes: 605 additions & 0 deletions Test/DurableTask.Core.Tests/ContinueAsNewTraceBehaviorTests.cs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,12 @@ public class OrchestrationCompleteOrchestratorAction : OrchestratorAction
/// Gets a collection of tags associated with the completion action.
/// </summary>
public IDictionary<string, string> Tags { get; } = new Dictionary<string, string>();

/// <summary>
/// Gets or sets how distributed tracing should behave for the next <c>ContinueAsNew</c> generation.
/// Defaults to <see cref="ContinueAsNewTraceBehavior.PreserveTraceContext"/>.
/// </summary>
public ContinueAsNewTraceBehavior ContinueAsNewTraceBehavior { get; set; } =
ContinueAsNewTraceBehavior.PreserveTraceContext;
}
}
46 changes: 46 additions & 0 deletions src/DurableTask.Core/ContinueAsNewOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// ----------------------------------------------------------------------------------
// Copyright Microsoft Corporation
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------
#nullable enable
namespace DurableTask.Core
{
/// <summary>
/// Configures how an orchestration continues as new.
/// </summary>
public sealed class ContinueAsNewOptions
{
/// <summary>
/// Gets or sets how distributed tracing should behave for the next generation.
/// The default is <see cref="ContinueAsNewTraceBehavior.PreserveTraceContext"/>,
/// which keeps the next generation in the same distributed trace.
/// </summary>
public ContinueAsNewTraceBehavior TraceBehavior { get; set; } =
ContinueAsNewTraceBehavior.PreserveTraceContext;
}

/// <summary>
/// Describes how distributed tracing should behave for the next <c>ContinueAsNew</c> generation.
/// </summary>
public enum ContinueAsNewTraceBehavior
{
/// <summary>
/// Preserve the current trace lineage across generations. This is the default.
/// </summary>
PreserveTraceContext = 0,

/// <summary>
/// Start the next generation in a fresh distributed trace. Useful for long-running
/// periodic orchestrations where each cycle should be independently observable.
/// </summary>
StartNewTrace = 1,
}
}
10 changes: 10 additions & 0 deletions src/DurableTask.Core/History/ExecutionStartedEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ internal ExecutionStartedEvent(ExecutionStartedEvent other)
Correlation = other.Correlation;
ScheduledStartTime = other.ScheduledStartTime;
Generation = other.Generation;
GenerateNewTrace = other.GenerateNewTrace;
}

/// <summary>
Expand Down Expand Up @@ -133,6 +134,15 @@ internal ExecutionStartedEvent(ExecutionStartedEvent other)
[DataMember]
public int? Generation { get; set; }

/// <summary>
/// When true, indicates that this execution should start a fresh distributed trace
/// rather than inheriting the trace context from the previous generation.
/// This flag is consumed once by the trace infrastructure and reset to false after
/// the new trace is created, so that subsequent replays use the persisted trace identity.
/// </summary>
[DataMember]
public bool GenerateNewTrace { get; set; }

// Used for Continue-as-New scenarios
internal void SetParentTraceContext(ExecutionStartedEvent parent)
{
Expand Down
27 changes: 27 additions & 0 deletions src/DurableTask.Core/OrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,33 @@ public abstract Task<T> CreateSubOrchestrationInstance<T>(string name, string ve
/// </param>
public abstract void ContinueAsNew(string newVersion, object input);

/// <summary>
/// Checkpoint the orchestration instance by completing the current execution in the ContinueAsNew
/// state and creating a new execution of this instance with the specified input parameter.
/// This overload allows the caller to customize how the next generation behaves, such as
/// starting a fresh distributed trace.
/// </summary>
/// <param name="newVersion">
/// New version of the orchestration to start. Pass <c>null</c> to keep the current version.
/// </param>
/// <param name="input">
/// Input to the new execution of this instance. This is the same type as the one used to start
/// the first execution of this orchestration instance.
/// </param>
/// <param name="options">
/// Options that customize the next generation.
/// </param>
/// <exception cref="NotSupportedException">
/// Thrown if the current <see cref="OrchestrationContext"/> implementation does not support
/// <see cref="ContinueAsNewOptions"/>. Override this method in a derived class to add support.
/// </exception>
public virtual void ContinueAsNew(string newVersion, object input, ContinueAsNewOptions options)
{
throw new NotSupportedException(
$"This {GetType().Name} implementation does not support ContinueAsNewOptions. " +
"Override this method in a derived class to add support.");
}

/// <summary>
/// Create a proxy client class to schedule remote TaskActivities via a strongly typed interface.
/// </summary>
Expand Down
22 changes: 17 additions & 5 deletions src/DurableTask.Core/TaskOrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ internal class TaskOrchestrationContext : OrchestrationContext
private readonly IDictionary<int, OpenTaskInfo> openTasks;
private readonly IDictionary<int, OrchestratorAction> orchestratorActionsMap;
private OrchestrationCompleteOrchestratorAction continueAsNew;
private static readonly ContinueAsNewOptions DefaultContinueAsNewOptions = new ContinueAsNewOptions();
private bool executionCompletedOrTerminated;
private int idCounter;
private readonly Queue<HistoryEvent> eventsWhileSuspended;
Expand Down Expand Up @@ -249,23 +250,34 @@ public override void SendEvent(OrchestrationInstance orchestrationInstance, stri

public override void ContinueAsNew(object input)
{
ContinueAsNew(null, input);
this.ContinueAsNew(null, input, DefaultContinueAsNewOptions);
}

public override void ContinueAsNew(string newVersion, object input)
{
ContinueAsNewCore(newVersion, input);
this.ContinueAsNew(newVersion, input, DefaultContinueAsNewOptions);
}
Comment thread
chandramouleswaran marked this conversation as resolved.

void ContinueAsNewCore(string newVersion, object input)
public override void ContinueAsNew(string newVersion, object input, ContinueAsNewOptions options)
{
if (options == null)
{
throw new ArgumentNullException(nameof(options));
}

ContinueAsNewCore(newVersion, input, options);
}

void ContinueAsNewCore(string newVersion, object input, ContinueAsNewOptions options)
{
string serializedInput = this.MessageDataConverter.SerializeInternal(input);

this.continueAsNew = new OrchestrationCompleteOrchestratorAction
{
Result = serializedInput,
OrchestrationStatus = OrchestrationStatus.ContinuedAsNew,
NewVersion = newVersion
NewVersion = newVersion,
ContinueAsNewTraceBehavior = options.TraceBehavior,
};
}

Expand Down Expand Up @@ -742,4 +754,4 @@ class OpenTaskInfo
public TaskCompletionSource<string> Result { get; set; }
}
}
}
}
29 changes: 25 additions & 4 deletions src/DurableTask.Core/TaskOrchestrationDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace DurableTask.Core
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -641,8 +642,20 @@ protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem work
continueAsNewExecutionStarted!.Correlation = CorrelationTraceContext.Current.SerializableTraceContext;
});

// Copy the distributed trace context, if any
continueAsNewExecutionStarted!.SetParentTraceContext(runtimeState.ExecutionStartedEvent);
// Copy the distributed trace context to preserve lineage, unless
// the next generation was explicitly requested to start a fresh trace.
if (!continueAsNewExecutionStarted!.GenerateNewTrace)
{
continueAsNewExecutionStarted.SetParentTraceContext(runtimeState.ExecutionStartedEvent);
Comment on lines +645 to +649
Copy link

Copilot AI Apr 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OnProcessWorkItemAsync can execute the next ContinueAsNew generation immediately in the same do { ... } while (continuedAsNew) loop after rebuilding runtimeState. However, the orchestration traceActivity/DistributedTraceActivity.Current is created once before the loop and is never reset/restarted for continueAsNewExecutionStarted, so ContinueAsNewTraceBehavior.StartNewTrace won’t actually produce a fresh trace for the next generation when it begins executing in this same loop. Consider stopping/clearing the current activity and starting a new one from continueAsNewExecutionStarted before the next loop iteration (and update the traceActivity variable used for child spans accordingly).

Copilot uses AI. Check for mistakes.
}
else
{
// Stamp the dispatcher processing time for this continuation request
// so the producer span created by TraceHelper uses this timestamp.
continueAsNewExecutionStarted.Tags ??= new Dictionary<string, string>();
continueAsNewExecutionStarted.Tags[OrchestrationTags.RequestTime] =
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this path, the RequestTime will be override everytime, so the value should be always accurate, but this might have a minor tag leak to the user. user will always see one extra tag set by the library

DateTimeOffset.UtcNow.ToString("O", CultureInfo.InvariantCulture);
}

runtimeState = new OrchestrationRuntimeState();
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
Expand Down Expand Up @@ -1055,10 +1068,18 @@ internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workIt
InstanceId = runtimeState.OrchestrationInstance!.InstanceId,
ExecutionId = Guid.NewGuid().ToString("N")
},
Tags = runtimeState.Tags,
// Clone tags to avoid mutating the current generation's tag dictionary.
// Preserve the comparer if the underlying type is Dictionary<string, string>.
Tags = runtimeState.Tags == null
? null
: runtimeState.Tags is Dictionary<string, string> dictionaryTags
? new Dictionary<string, string>(dictionaryTags, dictionaryTags.Comparer)
: new Dictionary<string, string>(runtimeState.Tags),
ParentInstance = runtimeState.ParentInstance,
Name = runtimeState.Name,
Version = completeOrchestratorAction.NewVersion ?? runtimeState.Version
Version = completeOrchestratorAction.NewVersion ?? runtimeState.Version,
// Signal that the next generation should start a fresh distributed trace
GenerateNewTrace = completeOrchestratorAction.ContinueAsNewTraceBehavior == ContinueAsNewTraceBehavior.StartNewTrace,
};

taskMessage.OrchestrationInstance = startedEvent.OrchestrationInstance;
Expand Down
38 changes: 29 additions & 9 deletions src/DurableTask.Core/Tracing/TraceHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,42 @@ public class TraceHelper
return null;
}

if (startEvent.Tags != null && startEvent.Tags.ContainsKey(OrchestrationTags.CreateTraceForNewOrchestration))
Comment thread
chandramouleswaran marked this conversation as resolved.
// Check both the typed GenerateNewTrace property (used by ContinueAsNew)
// and the legacy CreateTraceForNewOrchestration tag (used by external clients
// like durabletask-dotnet's ShimDurableTaskClient) for backward compatibility.
bool shouldGenerateNewTrace = startEvent.GenerateNewTrace;
if (!shouldGenerateNewTrace &&
startEvent.Tags != null &&
startEvent.Tags.ContainsKey(OrchestrationTags.CreateTraceForNewOrchestration))
{
shouldGenerateNewTrace = true;
}

if (shouldGenerateNewTrace)
{
startEvent.Tags.Remove(OrchestrationTags.CreateTraceForNewOrchestration);
// Note that if we create the trace activity for starting a new orchestration here, then its duration will be longer since its end time will be set to once we
// start processing the orchestration rather than when the request for a new orchestration is committed to storage.
using var activityForNewOrchestration = StartActivityForNewOrchestration(startEvent);

// Consume the signals only after the fresh trace identity is established.
// ActivitySource.StartActivity may return null when sampling suppresses the
// producer span, in which case the request must remain for replay.
if (activityForNewOrchestration != null || startEvent.ParentTraceContext != null)
{
startEvent.GenerateNewTrace = false;
startEvent.Tags?.Remove(OrchestrationTags.CreateTraceForNewOrchestration);
}
}

if (!startEvent.TryGetParentTraceContext(out ActivityContext activityContext))
{
return null;
}

DistributedTraceContext parentTraceContext = startEvent.ParentTraceContext!;
string activityName = CreateSpanName(TraceActivityConstants.Orchestration, startEvent.Name, startEvent.Version);
ActivityKind activityKind = ActivityKind.Server;
DateTimeOffset startTime = startEvent.ParentTraceContext.ActivityStartTime ?? default;
DateTimeOffset startTime = parentTraceContext.ActivityStartTime ?? default;

Activity? activity = ActivityTraceSource.StartActivity(
activityName,
Expand All @@ -132,16 +152,16 @@ public class TraceHelper
activity.SetTag(Schema.Task.Version, startEvent.Version);
}

if (startEvent.ParentTraceContext.Id != null && startEvent.ParentTraceContext.SpanId != null)
if (parentTraceContext.Id != null && parentTraceContext.SpanId != null)
{
activity.SetId(startEvent.ParentTraceContext.Id!);
activity.SetSpanId(startEvent.ParentTraceContext.SpanId!);
activity.SetId(parentTraceContext.Id!);
activity.SetSpanId(parentTraceContext.SpanId!);
}
else
{
startEvent.ParentTraceContext.Id = activity.Id;
startEvent.ParentTraceContext.SpanId = activity.SpanId.ToString();
startEvent.ParentTraceContext.ActivityStartTime = activity.StartTimeUtc;
parentTraceContext.Id = activity.Id;
parentTraceContext.SpanId = activity.SpanId.ToString();
parentTraceContext.ActivityStartTime = activity.StartTimeUtc;
}

DistributedTraceActivity.Current = activity;
Expand Down
Loading