# Azure Event Hubs with Microsoft Fabric

Azure Event Hubs, Microsoft’s cloud-based event streaming platform, is a high-throughput distributed service capable of ingesting millions of events per second, whether it's tracking user activity, monitoring IoT devices, or collecting log data.

Just a brief background on Azure Stream Analytics: it operates around the concept of a job, which includes an input, a query, and an output. It can ingest data from sources like Azure Event Hubs (including those from Apache Kafka), Azure IoT Hub, and Azure Blob Storage. The query, written in a SQL-like syntax, transforms the streaming data, optionally over a defined time window.

In this write-up, I will walk through a simple setup that covers the entire data pipeline, from ingestion to processing and finally storage. We’ll start by generating sample data with a C# library called `Bogus` and ingesting it into Azure Event Hubs, then process it with Azure Stream Analytics, applying transformations through SQL-based queries, and conclude by storing the results in an Azure SQL database and testing whether the data can be replicated to Microsoft Fabric. I hope this will provide a clear understanding of how to build an end-to-end solution for real-time data streaming and analytics across Azure and the Fabric ecosystem.

`Bogus` is a great fake data generator for `.NET` languages. You can check its details [here](https://github.com/bchavez/Bogus). To install the `NuGet` package, use `Install-Package Bogus`.

To get started, we will mimic the data of a product ordering system with customers and the order details for those customers. The `Bogus` library will generate `JSON` objects containing customer information along with an array of order details associated with that customer.

### Setup

Create a new console application and make sure the following `using` directives are added to the project (the `Azure.Messaging.EventHubs` NuGet package is needed alongside `Bogus`):

```csharp
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Bogus;
using System.Text.Json;
```

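Define constants to specify the number of records to generate through `Bogus`, along with two static lists that hold the generated records. We will generate 50 customer records, each with 2 orders.

```csharp
  public const int NumberOfCustomers = 50;
  public const int NumberOfOrdersPerCustomer = 2;
  public static readonly List<Customer> Customers = new(); // filled by GenerateCustomerData
  public static readonly List<Order> Orders = new();       // filled by GenerateOrderData
```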

Create POCOs for customers and orders.

```csharp
public class Customer
{
    public Guid Id { get; set; }
    public string FirstName { get; set; } = default!;
    public string LastName { get; set; } = default!;
    public string Address { get; set; } = default!;
    public string Email { get; set; } = default!;
    public string AboutMe { get; set; } = default!;
    public int YearsOld { get; set; }
    public IEnumerable<Order>? Orders { get; set; }

    // Serialize the customer, including the nested orders, as indented JSON.
    public override string ToString()
    {
        return JsonSerializer.Serialize(this, new JsonSerializerOptions { WriteIndented = true });
    }
}

public sealed class Order
{
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public string ProductName { get; set; } = default!;

    public override string ToString()
    {
        return JsonSerializer.Serialize(this, new JsonSerializerOptions { WriteIndented = true });
    }
}
```

Next, use the `Faker<T>` class from `Bogus` to define the rules for generating customer data.

```csharp
 private static Faker<Customer> CustomerGenerator()
 {
     return new Faker<Customer>()
         .RuleFor(e => e.Id, _ => Guid.NewGuid())
         .RuleFor(e => e.FirstName, f => f.Name.FirstName())
         .RuleFor(e => e.LastName, f => f.Name.LastName())
         .RuleFor(e => e.Address, f => f.Address.FullAddress())
         .RuleFor(e => e.Email, (f, e) => f.Internet.Email(e.FirstName, e.LastName))
         .RuleFor(e => e.AboutMe, f => f.Lorem.Paragraph(1))
         .RuleFor(e => e.YearsOld, f => f.Random.Int(18, 60))
         .RuleFor(e => e.Orders, (_, e) =>
         {
             return GenerateOrderData(e.Id);
         });
 }

public static void GenerateCustomerData()
{
    var customerGenerator = CustomerGenerator();
    var generatedCustomers = customerGenerator.Generate(NumberOfCustomers);
    Customers.AddRange(generatedCustomers);
}
```

Do the same for orders.

```csharp
 private static Faker<Order> OrderGenerator(Guid customerId)
 {
     return new Faker<Order>()
         .RuleFor(v => v.OrderId, _ => Guid.NewGuid())
         .RuleFor(v => v.CustomerId, _ => customerId)
         .RuleFor(v => v.ProductName, f => f.Commerce.ProductName());
 }

 private static List<Order> GenerateOrderData(Guid customerId)
 {
     var orderGenerator = OrderGenerator(customerId);
     var generatedOrders = orderGenerator.Generate(NumberOfOrdersPerCustomer);
     Orders.AddRange(generatedOrders);
     return generatedOrders;
 }
```

Execute the data generation method.

```csharp
GenerateCustomerData();
```

Here is what the data generated by `Bogus` looks like. You get 50 customer `JSON` records, each with 2 order details.

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1725970327788/a9bd0154-7238-4fd7-a785-bd7e07451014.png align="left")
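
To make the payload shape concrete, here is a minimal sketch that serializes one hand-written customer with two orders using `System.Text.Json`, the same way `Customer.ToString()` does. The `SampleCustomer`, `SampleOrder`, and `PayloadPreview` names and the sample values are placeholders I made up for illustration. They are trimmed-down stand-ins for the POCOs above and don't use `Bogus`, so the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Trimmed-down stand-ins for the Order and Customer POCOs (hypothetical names).
public sealed class SampleOrder
{
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public string ProductName { get; set; } = "";
}

public sealed class SampleCustomer
{
    public Guid Id { get; set; }
    public string FirstName { get; set; } = "";
    public List<SampleOrder> Orders { get; set; } = new();
}

public static class PayloadPreview
{
    // Builds one customer with two orders and serializes it as indented JSON.
    public static string BuildSampleJson()
    {
        var customerId = Guid.NewGuid();
        var customer = new SampleCustomer
        {
            Id = customerId,
            FirstName = "Ada",
            Orders =
            {
                new SampleOrder { OrderId = Guid.NewGuid(), CustomerId = customerId, ProductName = "Keyboard" },
                new SampleOrder { OrderId = Guid.NewGuid(), CustomerId = customerId, ProductName = "Monitor" },
            },
        };

        return JsonSerializer.Serialize(customer, new JsonSerializerOptions { WriteIndented = true });
    }
}
```

Calling `PayloadPreview.BuildSampleJson()` returns a JSON object in which `Orders` is a nested array of order objects. That nesting is what has to be flattened later in the pipeline.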

We will then feed this data to Azure Event Hubs and create a Stream Analytics job to push it to an Azure SQL database.

So first, create an Event Hubs namespace and an event hub under that namespace. The step-by-step process is detailed [here](https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-create).

Ensure that you are using at least the Standard tier for your Event Hubs namespace; otherwise you will run into issues with consumer group configuration, since the Basic tier only supports the default consumer group.

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1725971478345/789e9f7f-4c38-42cb-b106-2e51152ac18e.png align="left")

Once everything is set up, we push the data generated by the `Bogus` library to the Event Hub. I am doing this through the `Main` method of the console application.

```csharp
 static async Task Main(string[] args)
 {
     Console.WriteLine("Bogus has started creating data");
     GenerateCustomerData();
     Console.WriteLine("Bogus ended creating data");
     
     var connectionString = "EventHubConnectionString"; // placeholder: your namespace connection string
     var eventHubName = "EventHubName";                 // placeholder: your event hub name

     Console.WriteLine(">>>>>>>>>>>>>>>>>>");
     Console.WriteLine("Started data upload to Event Hub");
     var producer = new EventHubProducerClient(connectionString, eventHubName);

     try
     {
         var eventsToSend = new List<EventData>();

         for (var index = 0; index < Customers.Count; ++index)
         {
             var eventData = new EventData(Customers[index].ToString());
             eventsToSend.Add(eventData);
         }

         await producer.SendAsync(eventsToSend);
     }
     finally
     {
         await producer.CloseAsync();
     }
     Console.WriteLine("Data upload to Event Hub ended");
 }
```

Query the data in Event Hubs to confirm that it has arrived. You can do this through Data Explorer for your Event Hubs namespace

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1725972620774/2f36d5d7-d775-4094-96ad-997ca1e8952e.png align="center")

or through the Stream Analytics query window.

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1725972314242/fd45a22a-2460-4182-9565-e717159b8c44.png align="center")

One interesting point to note is that the `Orders` array is stored as a string, which isn't desirable.

Let's try to push the data into Azure SQL DB. First I tried the `Transform and store data to SQL database` option. This is a no-code feature to analyze and process the Event Hub data and push it to different destinations.

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1725974484785/bfe2a5d8-9789-4460-adb8-84a2fa837ae1.png align="center")

I tested this feature by using the `Expand transformation` step in the flow to parse the `Orders` array in the `JSON`. But it only splits the individual elements of the array across rows, each still in `JSON` format, which wasn’t helpful at all.

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1725974611171/7b6c3aeb-2c87-43a6-9e76-69ec6ee5e8a0.png align="left")

The only option left was to use `Stream Analytics SQL` to parse and split the `Orders` array from the `JSON`.

So I created a new Stream Analytics job and set up the job topology with the Event Hub data as the input and Azure SQL DB as the output.

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1725975419140/ff2f7dcd-f2e8-4c99-99a4-d6515a6bfb69.png align="center")

I then used the following query for data transformation.

```sql
SELECT
    i.id,
    i.firstname,
    i.lastname,
    i.Address,
    i.Email,
    i.AboutMe,
    i.YearsOld,
    OrderData.ArrayValue.OrderId AS OrderId,
    OrderData.ArrayValue.CustomerId AS CustomerId,
    OrderData.ArrayValue.ProductName AS ProductName
INTO
    [EventHubs-DB] -- output alias of the job (here, the Azure SQL database)
FROM
    [evenr-hub-1] i -- input alias of the job (here, the event hub)
CROSS APPLY GetArrayElements(Orders) AS OrderData
```

It gave me the output that I was looking for.

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1725975582176/63fee5ac-07f5-45fe-a3b2-e8e3c9437a4a.png align="center")

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1725978662952/87cc9c83-c3e6-49f4-a5e4-84273359841f.png align="center")

This way, `OrderId`, `CustomerId`, and `ProductName` from the `Orders` array of the input `JSON` are parsed and flattened into separate rows and columns.

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1725991975309/9bbd1f53-6850-4cac-86f3-2c57c9e80854.gif align="center")
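
Conceptually, the `CROSS APPLY GetArrayElements(...)` step pairs every customer with each element of its `Orders` array. As a rough illustration in C#, the same flattening can be sketched with LINQ's `SelectMany`. The record types and the `OrderFlattener` class below are hypothetical names of my own, reduced to a few fields, and are not part of Stream Analytics or the earlier code. Records need C# 9 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified, hypothetical shapes: one nested event in, one flat row out.
public sealed record OrderLine(string OrderId, string ProductName);
public sealed record CustomerEvent(string Id, string FirstName, IReadOnlyList<OrderLine> Orders);
public sealed record FlatRow(string Id, string FirstName, string OrderId, string ProductName);

public static class OrderFlattener
{
    // Emits one row per (customer, order) pair, repeating the customer
    // columns on every row, similar in spirit to the query above.
    public static IEnumerable<FlatRow> Flatten(IEnumerable<CustomerEvent> events) =>
        events.SelectMany(
            customer => customer.Orders,
            (customer, order) => new FlatRow(
                customer.Id,
                customer.FirstName,
                order.OrderId,
                order.ProductName));
}
```

Passing two customers with two orders each to `OrderFlattener.Flatten` yields four rows. A customer with an empty `Orders` list contributes no rows in this sketch.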

Out of curiosity, I then decided to try the `Eventstream` feature of Microsoft Fabric to check whether data from Azure Event Hubs gets replicated correctly to an eventstream on Fabric

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1725984254239/23a4098e-63b3-4ee5-96ed-d23f87eadcdf.png align="center")

<s>and to my surprise the Orders array from the </s> `JSON` <s> input was completely missing.</s>

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1725977239092/a199b75b-c519-48f5-9f19-67fc77a7962e.png align="center")

<s>I am not sure if this is a bug or if there is something that I missed while the configuring Azure Event hub with Fabric </s> `Eventstream`<s>.</s>  
  
**An update on the above issue:**  
  
I deleted and recreated the `Eventstream` and the missing column `Orders` became available in the output.

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1726011499559/73ae7b08-79f0-4524-84a4-57c3a4146c47.png align="left")

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1726011927006/7f1fe545-76e3-4aaf-a7d6-14d38508ffd5.png align="left")

  
So in the next step I went ahead and created an `Eventhouse` in Fabric

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1726011827241/a9012f37-f8b2-4035-8b93-f3d2e1d1fbb6.png align="left")

and then created a `KQL database`, ingesting the data from Azure Event Hubs into the Fabric `Eventstream` and from there into the newly created `KQL database` through the `Eventhouse`.

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1726012200562/c5ccc388-d3cc-4f5e-8631-85e1b2850c6f.jpeg align="center")

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1726014470959/9de5fd48-ad6c-416a-8f88-a2f1e92ec0a0.gif align="center")

Once the data was available in the `KQL database`, I used the following KQL query to flatten the `Orders` `JSON` data.

```kusto
Customer_Orders
| extend ExtProp = parse_json(Orders)
| mv-expand ExtProp
| project Id, FirstName, LastName, Address, Email, AboutMe, YearsOld,
    OrderId = ExtProp.OrderId,
    CustomerId = ExtProp.CustomerId,
    ProductName = ExtProp.ProductName
```

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1726014057737/13a99f67-182e-4786-bce9-3d93fab48cf0.png align="center")

### Conclusion

Through this blog I tried to demonstrate a very simple example of generating quick sample data and testing the processing and storage options, using `Azure Event Hub` for data ingestion, `Azure Stream Analytics` for data processing, and `Azure SQL DB` for storage.

As it wasn’t possible to port the data from `Stream Analytics` directly into Fabric `Eventstream`, the only option left was to pull the data from `Azure Event Hub` into Fabric `Eventstream` and store it in a `KQL database` through Fabric `Eventhouse`.

I hope this article gets you started on setting up the interaction between Azure and Fabric for streaming data, and gives you an idea of the limitations you might face during the setup.

Thanks for reading!
