Creating a weak subscription to an IObservable
You can subscribe a proxy observer to the observable that holds a weak reference to the actual observer and disposes the subscription when the actual observer is no longer alive:
static IDisposable WeakSubscribe<T>(
this IObservable<T> observable, IObserver<T> observer)
{
return new WeakSubscription<T>(observable, observer);
}
class WeakSubscription<T> : IDisposable, IObserver<T>
{
private readonly WeakReference reference;
private readonly IDisposable subscription;
private bool disposed;
public WeakSubscription(IObservable<T> observable, IObserver<T> observer)
{
this.reference = new WeakReference(observer);
this.subscription = observable.Subscribe(this);
}
void IObserver<T>.OnCompleted()
{
var observer = (IObserver<T>)this.reference.Target;
if (observer != null) observer.OnCompleted();
else this.Dispose();
}
void IObserver<T>.OnError(Exception error)
{
var observer = (IObserver<T>)this.reference.Target;
if (observer != null) observer.OnError(error);
else this.Dispose();
}
void IObserver<T>.OnNext(T value)
{
var observer = (IObserver<T>)this.reference.Target;
if (observer != null) observer.OnNext(value);
else this.Dispose();
}
public void Dispose()
{
if (!this.disposed)
{
this.disposed = true;
this.subscription.Dispose();
}
}
}
There is another option using the weak-event-patterns
Essentially System.Windows.WeakEventManager
has you covered.
Using MVVM when your ViewModel relies on services with events you can weakly subscribe to those services allowing your ViewModel to be collected with the view without the event subscription keeping it alive.
using System;
using System.Windows;
class LongLivingSubject
{
public event EventHandler<EventArgs> Notifications = delegate { };
}
class ShortLivingObserver
{
public ShortLivingObserver(LongLivingSubject subject)
{
WeakEventManager<LongLivingSubject, EventArgs>
.AddHandler(subject, nameof(subject.Notifications), Subject_Notifications);
}
private void Subject_Notifications(object sender, EventArgs e)
{
}
}
this is my implementation (quit simple one)
public class WeakObservable<T>: IObservable<T>
{
private IObservable<T> _source;
public WeakObservable(IObservable<T> source)
{
#region Validation
if (source == null)
throw new ArgumentNullException("source");
#endregion Validation
_source = source;
}
public IDisposable Subscribe(IObserver<T> observer)
{
IObservable<T> source = _source;
if(source == null)
return Disposable.Empty;
var weakObserver = new WaekObserver<T>(observer);
IDisposable disp = source.Subscribe(weakObserver);
return disp;
}
}
public class WaekObserver<T>: IObserver<T>
{
private WeakReference<IObserver<T>> _target;
public WaekObserver(IObserver<T> target)
{
#region Validation
if (target == null)
throw new ArgumentNullException("target");
#endregion Validation
_target = new WeakReference<IObserver<T>>(target);
}
private IObserver<T> Target
{
get
{
IObserver<T> target;
if(_target.TryGetTarget(out target))
return target;
return null;
}
}
#region IObserver<T> Members
/// <summary>
/// Notifies the observer that the provider has finished sending push-based notifications.
/// </summary>
public void OnCompleted()
{
IObserver<T> target = Target;
if (target == null)
return;
target.OnCompleted();
}
/// <summary>
/// Notifies the observer that the provider has experienced an error condition.
/// </summary>
/// <param name="error">An object that provides additional information about the error.</param>
public void OnError(Exception error)
{
IObserver<T> target = Target;
if (target == null)
return;
target.OnError(error);
}
/// <summary>
/// Provides the observer with new data.
/// </summary>
/// <param name="value">The current notification information.</param>
public void OnNext(T value)
{
IObserver<T> target = Target;
if (target == null)
return;
target.OnNext(value);
}
#endregion IObserver<T> Members
}
public static class RxExtensions
{
public static IObservable<T> ToWeakObservable<T>(this IObservable<T> source)
{
return new WeakObservable<T>(source);
}
}
static void Main(string[] args)
{
Console.WriteLine("Start");
var xs = Observable.Interval(TimeSpan.FromSeconds(1));
Sbscribe(xs);
Thread.Sleep(2020);
Console.WriteLine("Collect");
GC.Collect();
GC.WaitForPendingFinalizers();
GC.Collect();
Console.WriteLine("Done");
Console.ReadKey();
}
private static void Sbscribe<T>(IObservable<T> source)
{
source.ToWeakObservable().Subscribe(v => Console.WriteLine(v));
}
Ran across this thread a couple years later...just wanted to point forward to the solution identified on Samuel Jack's blog which adds an extension method to IObservable called WeaklySubscribe. It uses an approach of adding a shim between the subject and observer that tracks the target with a WeakReference. That is similar to solutions offered by others for the problem of strong references in event subscriptions, such as in this article or this solution by Paul Stovell. Having for awhile used something based on Paul's approach I like Samuel's solution to weak IObservable Subscribes.