Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ObjectDisposedException in provided example #2

Closed
mikhailshilkov opened this issue Jul 30, 2016 · 3 comments
Closed

ObjectDisposedException in provided example #2

mikhailshilkov opened this issue Jul 30, 2016 · 3 comments

Comments

@mikhailshilkov
Copy link

If I put more messages (several dozens) into the queue and then try to run the provided example, ObjectDisposedException gets thrown at n-th attempt to get BrokeredMessage's body.

at Microsoft.ServiceBus.Messaging.BufferedInputStream.ThrowIfDisposed()
at Microsoft.ServiceBus.Messaging.BufferedInputStream.Read(Byte[] buffer, Int32 offset, Int32 count)
at System.IO.Stream.ReadByte()
at System.Xml.XmlBufferReader.TryEnsureByte()
at System.Xml.XmlBufferReader.GetByteHard()
at System.Xml.XmlBufferReader.GetByte()
at System.Xml.XmlBinaryReader.ReadElementContentAsString()
at System.Runtime.Serialization.XmlReaderDelegator.ReadElementContentAsString()
at System.Runtime.Serialization.StringDataContract.ReadXmlValue(XmlReaderDelegator reader, XmlObjectSerializerReadContext context)
at System.Runtime.Serialization.DataContractSerializer.InternalReadObject(XmlReaderDelegator xmlReader, Boolean verifyObjectName, DataContractResolver dataContractResolver)
at System.Runtime.Serialization.XmlObjectSerializer.ReadObjectHandleExceptions(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)
at System.Runtime.Serialization.DataContractSerializer.ReadObject(XmlDictionaryReader reader, Boolean verifyObjectName)
at Microsoft.ServiceBus.Messaging.DataContractBinarySerializer.ReadObject(XmlDictionaryReader reader, Boolean verifyObjectName)
at System.Runtime.Serialization.XmlObjectSerializer.ReadObject(XmlReader reader, Boolean verifyObjectName)
at System.Runtime.Serialization.XmlObjectSerializer.InternalReadObject(XmlReaderDelegator reader, Boolean verifyObjectName)
at System.Runtime.Serialization.XmlObjectSerializer.InternalReadObject(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)
at System.Runtime.Serialization.XmlObjectSerializer.ReadObjectHandleExceptions(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)
at System.Runtime.Serialization.XmlObjectSerializer.ReadObject(XmlDictionaryReader reader)
at Microsoft.ServiceBus.Messaging.DataContractBinarySerializer.ReadObject(Stream stream)
at Microsoft.ServiceBus.Messaging.BrokeredMessage.GetBody[T](XmlObjectSerializer serializer)
at Microsoft.ServiceBus.Messaging.BrokeredMessage.GetBodyT
at RxStreams.Program.<>c.

b__2_1(BrokeredMessage x)

@marcpiechura
Copy link
Owner

I think the problem is that I use the OnMessage callback and that AutoComplete is set to true by default and therefore the message gets completed as far as the OnMessage callback completes.

I will rewrite the ServiceBus implementation and use a simple poll version instead which should fix the issue.

I haven't used ServiceBus so if you have more experience with it any feedback would be very welcome!

@mikhailshilkov
Copy link
Author

I'm more or less familiar to Service Bus, but totally new to Akka Streams. The problem here is that actions happen in the following sequence:

  1. OnMessageAsync is called. You return task completion source. Ok.
  2. OnMessage calls TryPush. Ok.
  3. You call Push and pass the message, which is still alive. Ok.
  4. Now you complete the completion source, which also disposes the message. But...
  5. Select processing stage is called only now. Error happens.

So, we shouldn't complete until we know the processing of the message ended. But is that possible to know? One of the stream stages could be parallel, so we need to carry multiple messages open.

Other possibility would be to call message.GetBody from inside the source, and then send the result (obviously it must be generic) to the pipeline.

@marcpiechura
Copy link
Owner

marcpiechura commented Aug 1, 2016

Right, we can't know when the message is processed and I don't want to make this decision for the user.
My plan was to use ReceiveBatchAsync instead of OnMessage and implement it the same way the Storage Queue is working.
This way the user can decide if and when he want's to complete the message.

I wasn't aware of this behavior otherwise I would have used ReceiveBatchAsync from the beginning.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants