Wednesday, February 27, 2019

Any TPL Dataflow fans willing to do a code review?

I'm trying to add a filter to a basic buffer block. The idea is that if a message doesn't pass the filter, it should be dropped on the floor (or, alternatively, declined).

Do you see any mistakes or potential problems?

/// <summary>
/// A FilterBlock is a BufferBlock that will discard or decline messages that do not pass the filter.
/// </summary>
/// <remarks>
/// All buffering, linking and receiving is delegated to an inner <see cref="BufferBlock{T}"/>;
/// this type only intercepts offered messages to apply the filter first.
/// </remarks>
/// <typeparam name="T">The type of the messages that flow through the block.</typeparam>
/// <seealso cref="System.Threading.Tasks.Dataflow.IPropagatorBlock{T, T}" />
public class FilterBlock<T> : IPropagatorBlock<T, T>, IReceivableSourceBlock<T>
{
    static readonly DataflowBlockOptions Default = new DataflowBlockOptions();

    readonly BufferBlock<T> m_Buffer;
    readonly Predicate<T> m_Filter;
    readonly object m_IncomingLock = new object();

    // True: rejected messages are declined. False: rejected messages are accepted and dropped.
    readonly bool m_DeclineMessages;

    // Set once Complete() or Fault() has been called; guarded by m_IncomingLock.
    bool m_TargetDecliningPermanently;

    /// <summary>
    /// Initializes a new instance of the <see cref="FilterBlock{T}"/> class.
    /// </summary>
    /// <param name="filter">The filter to apply to incoming messages.</param>
    /// <param name="dataflowBlockOptions">The dataflow block options.</param>
    /// <remarks>Messages that do not pass the filter will be marked as accepted and dropped.</remarks>
    /// <exception cref="ArgumentNullException"><paramref name="filter"/> is null.</exception>
    public FilterBlock(Predicate<T> filter, DataflowBlockOptions dataflowBlockOptions)
        : this(filter, false, dataflowBlockOptions)
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="FilterBlock{T}"/> class.
    /// </summary>
    /// <param name="filter">The filter to apply to incoming messages.</param>
    /// <remarks>Messages that do not pass the filter will be marked as accepted and dropped.</remarks>
    /// <exception cref="ArgumentNullException"><paramref name="filter"/> is null.</exception>
    public FilterBlock(Predicate<T> filter)
        : this(filter, false, Default)
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="FilterBlock{T}"/> class.
    /// </summary>
    /// <param name="filter">The filter to apply to incoming messages.</param>
    /// <param name="declineMessages">If true, messages that don't pass the filter will be declined.
    /// If false, messages that don't pass the filter will be accepted and dropped.</param>
    /// <param name="dataflowBlockOptions">The dataflow block options.</param>
    /// <exception cref="ArgumentNullException"><paramref name="filter"/> is null.</exception>
    public FilterBlock(Predicate<T> filter, bool declineMessages, DataflowBlockOptions dataflowBlockOptions)
    {
        // Fail fast: a null filter would otherwise only surface as a NullReferenceException
        // on the first offered message, inside the incoming lock.
        if (filter == null)
        {
            throw new ArgumentNullException(nameof(filter));
        }

        m_Filter = filter;
        m_DeclineMessages = declineMessages;
        m_Buffer = new BufferBlock<T>(dataflowBlockOptions);
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="FilterBlock{T}"/> class.
    /// </summary>
    /// <param name="filter">The filter to apply to incoming messages.</param>
    /// <param name="declineMessages">If true, messages that don't pass the filter will be declined.
    /// If false, messages that don't pass the filter will be accepted and dropped.</param>
    /// <exception cref="ArgumentNullException"><paramref name="filter"/> is null.</exception>
    public FilterBlock(Predicate<T> filter, bool declineMessages)
        : this(filter, declineMessages, Default)
    {
    }

    /// <summary>
    /// Gets a <see cref="T:System.Threading.Tasks.Task"></see> that represents the asynchronous operation and completion of the dataflow block.
    /// </summary>
    /// <value>The completion task of the inner buffer.</value>
    public Task Completion => m_Buffer.Completion;

    /// <summary>
    /// Gets the number of items currently stored in the buffer.
    /// </summary>
    /// <value>The number of items.</value>
    public int Count => m_Buffer.Count;

    /// <summary>
    /// Signals to the <see cref="T:System.Threading.Tasks.Dataflow.IDataflowBlock"></see> that it should not accept nor produce any more messages nor consume any more postponed messages.
    /// </summary>
    public void Complete()
    {
        // Flip the flag under the same lock OfferMessage uses, so that no filtered-out
        // message is reported as accepted once completion has been requested.
        lock (m_IncomingLock)
        {
            m_TargetDecliningPermanently = true;
        }

        m_Buffer.Complete();
    }

    /// <summary>
    /// Forwards a consume request from a linked target to the inner buffer.
    /// </summary>
    T ISourceBlock<T>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
    {
        return ((ISourceBlock<T>)m_Buffer).ConsumeMessage(messageHeader, target, out messageConsumed);
    }

    /// <summary>
    /// Stops this block from accepting further messages and faults the inner buffer.
    /// </summary>
    /// <param name="exception">The exception that caused the fault.</param>
    /// <exception cref="ArgumentNullException"><paramref name="exception"/> is null.</exception>
    void IDataflowBlock.Fault(Exception exception)
    {
        // Validate before mutating state so a bad call leaves the block usable.
        if (exception == null)
        {
            throw new ArgumentNullException(nameof(exception));
        }

        // Mirror Complete(): without this, messages rejected by the filter would still be
        // reported as Accepted after the block has been faulted.
        lock (m_IncomingLock)
        {
            m_TargetDecliningPermanently = true;
        }

        ((IDataflowBlock)m_Buffer).Fault(exception);
    }

    /// <summary>
    /// Links the System.Threading.Tasks.Dataflow.ISourceBlock`1 to the specified System.Threading.Tasks.Dataflow.ITargetBlock`1.
    /// </summary>
    /// <param name="target">The System.Threading.Tasks.Dataflow.ITargetBlock`1 to which to connect this source.</param>
    /// <param name="linkOptions">
    /// A System.Threading.Tasks.Dataflow.DataflowLinkOptions instance that configures
    /// the link.
    /// </param>
    /// <returns>An IDisposable that, upon calling Dispose, will unlink the source from the target.</returns>
    public IDisposable LinkTo(ITargetBlock<T> target, DataflowLinkOptions linkOptions)
    {
        return m_Buffer.LinkTo(target, linkOptions);
    }

    /// <summary>
    /// Offers a message to this block. Messages that pass the filter are offered to the inner
    /// buffer; messages that do not are either declined or accepted and dropped, depending on
    /// how the block was constructed.
    /// </summary>
    /// <exception cref="ArgumentException">
    /// The header is invalid, or <paramref name="consumeToAccept"/> is true while
    /// <paramref name="source"/> is null.
    /// </exception>
    DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
    {
        // Validate arguments
        if (!messageHeader.IsValid)
        {
            throw new ArgumentException("Invalid message header", nameof(messageHeader));
        }

        if (source == null && consumeToAccept)
        {
            throw new ArgumentException("Cannot consume from a null source", nameof(consumeToAccept));
        }

        lock (m_IncomingLock)
        {
            // The second check covers the inner buffer finishing without Complete()/Fault()
            // having been called on this wrapper (presumably e.g. through a cancellation
            // token supplied in the block options).
            if (m_TargetDecliningPermanently || m_Buffer.Completion.IsCompleted)
            {
                return DataflowMessageStatus.DecliningPermanently;
            }

            if (m_Filter(messageValue))
            {
                return ((ITargetBlock<T>)m_Buffer).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
            }

            // The message did not pass the filter.
            if (m_DeclineMessages)
            {
                // Decline WITHOUT consuming: a declined message still belongs to the source,
                // so taking it out of the source first would silently lose it.
                return DataflowMessageStatus.Declined;
            }

            // Accept and drop. Consume the message from the source if it asked us to.
            if (consumeToAccept)
            {
                source.ConsumeMessage(messageHeader, this, out bool consumed);
                if (!consumed)
                {
                    return DataflowMessageStatus.NotAvailable;
                }
            }

            return DataflowMessageStatus.Accepted;
        }
    }

    /// <summary>
    /// Forwards a reservation release from a linked target to the inner buffer.
    /// </summary>
    void ISourceBlock<T>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        ((ISourceBlock<T>)m_Buffer).ReleaseReservation(messageHeader, target);
    }

    /// <summary>
    /// Forwards a reservation request from a linked target to the inner buffer.
    /// </summary>
    bool ISourceBlock<T>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T> target)
    {
        return ((ISourceBlock<T>)m_Buffer).ReserveMessage(messageHeader, target);
    }

    /// <summary>
    /// Attempts to synchronously receive an available output item from the <see cref="T:System.Threading.Tasks.Dataflow.IReceivableSourceBlock`1"></see>.
    /// </summary>
    /// <param name="filter">The predicate value must successfully pass in order for it to be received. filter may be null, in which case all items will pass.</param>
    /// <param name="item">The item received from the source.</param>
    /// <returns>true if an item could be received; otherwise, false.</returns>
    public bool TryReceive(Predicate<T> filter, out T item)
    {
        return m_Buffer.TryReceive(filter, out item);
    }

    /// <summary>
    /// Attempts to synchronously receive all available items from the <see cref="T:System.Threading.Tasks.Dataflow.IReceivableSourceBlock`1"></see>.
    /// </summary>
    /// <param name="items">The items received from the source.</param>
    /// <returns>true if one or more items could be received; otherwise, false.</returns>
    public bool TryReceiveAll(out IList<T> items)
    {
        return m_Buffer.TryReceiveAll(out items);
    }
}
Any TPL Dataflow fans willing to do a code review? Click here
  • Blogger Comment
  • Facebook Comment

0 comments:

Post a Comment

The webdev Team