Deserialize an Avro file with C#
I was able to get full data access working using dynamic. Here's the code for accessing the raw body data, which is stored as an array of bytes. In my case, those bytes contain UTF-8-encoded JSON, but of course it depends on how you initially created the EventData instances that you published to the Event Hub:
using (var reader = AvroContainer.CreateGenericReader(stream))
{
    while (reader.MoveNext())
    {
        // Each block in the Avro container holds a batch of records
        foreach (dynamic record in reader.Current.Objects)
        {
            var sequenceNumber = record.SequenceNumber;
            var bodyText = Encoding.UTF8.GetString(record.Body);
            Console.WriteLine($"{sequenceNumber}: {bodyText}");
        }
    }
}
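If the body is JSON, you can take this one step further and deserialize it into your own type. Here's a minimal sketch using Newtonsoft.Json's JsonConvert; the TelemetryReading class is purely hypothetical, so substitute whatever shape you originally published:

// Hypothetical payload type -- replace with whatever you serialized into the EventData body.
public class TelemetryReading
{
    public string DeviceId { get; set; }
    public double Temperature { get; set; }
}

// Inside the foreach loop above (requires using Newtonsoft.Json;):
var reading = JsonConvert.DeserializeObject<TelemetryReading>(
    Encoding.UTF8.GetString(record.Body));
Console.WriteLine($"{reading.DeviceId}: {reading.Temperature}");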
If someone can post a statically-typed solution, I'll upvote it, but given that the biggest latency in any system will almost certainly be the connection to the Event Hub Archive blobs, I wouldn't worry about parsing performance. :)
This Gist shows how to deserialize an Event Hub capture with C# using Microsoft.Hadoop.Avro2, which has the advantage of working on both .NET Framework 4.5 and .NET Standard 1.6:
var connectionString = "<Azure event hub capture storage account connection string>";
var containerName = "<Azure event hub capture container name>";
var blobName = "<Azure event hub capture BLOB name (ends in .avro)>";

var storageAccount = CloudStorageAccount.Parse(connectionString);
var blobClient = storageAccount.CreateCloudBlobClient();
var container = blobClient.GetContainerReference(containerName);
var blob = container.GetBlockBlobReference(blobName);

using (var stream = blob.OpenRead())
using (var reader = AvroContainer.CreateGenericReader(stream))
{
    while (reader.MoveNext())
    {
        foreach (dynamic result in reader.Current.Objects)
        {
            var record = new AvroEventData(result);
            record.Dump(); // Dump() is LINQPad's output helper; replace with your own output elsewhere
        }
    }
}
public struct AvroEventData
{
    public AvroEventData(dynamic record)
    {
        SequenceNumber = (long) record.SequenceNumber;
        Offset = (string) record.Offset;
        DateTime.TryParse((string) record.EnqueuedTimeUtc, out var enqueuedTimeUtc);
        EnqueuedTimeUtc = enqueuedTimeUtc;
        SystemProperties = (Dictionary<string, object>) record.SystemProperties;
        Properties = (Dictionary<string, object>) record.Properties;
        Body = (byte[]) record.Body;
    }

    public long SequenceNumber { get; set; }
    public string Offset { get; set; }
    public DateTime EnqueuedTimeUtc { get; set; }
    public Dictionary<string, object> SystemProperties { get; set; }
    public Dictionary<string, object> Properties { get; set; }
    public byte[] Body { get; set; }
}
NuGet references:
- Microsoft.Hadoop.Avro2 (1.2.1 works)
- WindowsAzure.Storage (8.3.0 works)
Namespaces:
- Microsoft.Hadoop.Avro.Container
- Microsoft.WindowsAzure.Storage
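If you need to process every capture file in the container rather than one named blob, the same packages cover that too. A rough sketch (flat listing; on .NET Standard you'd use ListBlobsSegmentedAsync instead of the synchronous ListBlobs shown here, and you'll also need Microsoft.WindowsAzure.Storage.Blob and System.Linq):

var container = storageAccount.CreateCloudBlobClient().GetContainerReference(containerName);

// Capture writes blobs under a <namespace>/<hub>/<partition>/<date> folder structure,
// so a flat listing returns every .avro file in one pass.
foreach (var blobItem in container.ListBlobs(useFlatBlobListing: true).OfType<CloudBlockBlob>())
{
    if (!blobItem.Name.EndsWith(".avro")) continue;

    using (var stream = blobItem.OpenRead())
    using (var reader = AvroContainer.CreateGenericReader(stream))
    {
        while (reader.MoveNext())
        {
            foreach (dynamic result in reader.Current.Objects)
            {
                var record = new AvroEventData(result);
                // ... process record
            }
        }
    }
}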
I was finally able to get this to work with the Apache Avro C# library.
I was stuck for a while because the Capture feature of Azure Event Hubs sometimes outputs a file without any message content.
I may have also had an issue with how the messages were originally serialized into the EventData object.
The code below is for a file saved to disk from a capture blob container.
var dataFileReader = DataFileReader<EventData>.OpenReader(file);
foreach (var record in dataFileReader.NextEntries)
{
    // Do work on EventData object
}
This also works using the GenericRecord object.
var dataFileReader = DataFileReader<GenericRecord>.OpenReader(file);
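With GenericRecord, the capture fields come back through the record's indexer. A minimal sketch, assuming the field names Event Hubs Capture writes (SequenceNumber, Body, etc.) and the Avro.File, Avro.Generic and System.Text namespaces:

using (var reader = DataFileReader<GenericRecord>.OpenReader(file))
{
    foreach (var record in reader.NextEntries)
    {
        var sequenceNumber = (long) record["SequenceNumber"];
        var body = (byte[]) record["Body"];
        Console.WriteLine($"{sequenceNumber}: {Encoding.UTF8.GetString(body)}");
    }
}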
This took some effort to figure out, but I now agree that Azure Event Hubs Capture is a great feature for backing up all events. I still feel they should make the output format configurable, as they did with Stream Analytics job output, but maybe I will get used to Avro.