ADLS Gen2 API to manage OneLake storage in Microsoft Fabric

My recent article here focused on fetching the object hierarchy and metadata and exporting its details to a csv file.
This article focuses on demonstrating some basic directory- and file-level operations across lakehouses in Microsoft Fabric through the ADLS Gen2 API. In an earlier article, I showed how to perform such operations using the DataLakeServiceClient class from the Azure Storage namespace.
In this article, I will perform operations similar to those carried out with the DataLakeServiceClient class, but through the ADLS Gen2 APIs.
Note: If you would rather skip the write-up, you can watch the code walkthrough video here instead.
Code
Import the following references in your .NET console application.
using Microsoft.Identity.Client;
using Newtonsoft.Json.Linq;
using System.Net.Http.Headers;
using System.Text;
using File = System.IO.File;
Declare a bunch of variables
// Redirect URI handed to MSAL when the public client application is built.
private static string RedirectURI = "http://localhost";
// Client (application) id passed to PublicClientApplicationBuilder.Create.
private static string clientId = "Service Principal Client Id";
// Workspace and lakehouse names that are interpolated into every OneLake DFS URL.
private static string workSpace= "Your Workspace";
private static string lakeHouse = "Your LakeHouse";
// Holds the most recently built DFS endpoint URL; the directory/file methods below assign it,
// so it has to be declared alongside the other fields.
private static string dfsendpoint = "";
// Single HttpClient instance shared by all request helpers.
private static readonly HttpClient client = new HttpClient();
// Scope requested when acquiring the storage access token.
private static string[] scopes = new string[] { "https://storage.azure.com/.default" };
// Authority used by MSAL for sign-in.
private static string Authority = "https://login.microsoftonline.com/organizations";
// Exposes the shared HttpClient instance.
public HttpClient Client => client;
Next, create a method ReturnAuthenticationResult that uses MSAL to return the bearer token and cache it.
/// <summary>
/// Acquires an access token for the configured storage scope through MSAL.
/// A silent acquisition for the first cached account is attempted first; when MSAL
/// reports that user interaction is required, the interactive sign-in flow is used.
/// </summary>
/// <returns>The MSAL authentication result that carries the bearer access token.</returns>
public async static Task<AuthenticationResult> ReturnAuthenticationResult()
{
    IPublicClientApplication app = PublicClientApplicationBuilder.Create(clientId)
        .WithAuthority(Authority)
        .WithCacheOptions(CacheOptions.EnableSharedCacheOptions)
        .WithRedirectUri(RedirectURI)
        .Build();

    var accounts = await app.GetAccountsAsync();

    try
    {
        // FirstOrDefault yields null when nothing is cached; MSAL then raises
        // MsalUiRequiredException instead of this code tripping over an empty sequence.
        return await app.AcquireTokenSilent(scopes, accounts.FirstOrDefault())
            .ExecuteAsync()
            .ConfigureAwait(false);
    }
    catch (MsalUiRequiredException)
    {
        // Only "interaction needed" falls back to the sign-in prompt; other failures
        // (network, configuration) are left to surface to the caller.
        return await app.AcquireTokenInteractive(scopes)
            .ExecuteAsync()
            .ConfigureAwait(false);
    }
}
Create custom GET/SEND/POST/PUT/DELETE/FILESTREAMSEND methods
/* GET Method */
/// <summary>Issues an authenticated GET request and returns the response body as text.</summary>
/// <param name="url">Request URL.</param>
/// <returns>The response body, or null when the body could not be read.</returns>
/// <exception cref="HttpRequestException">The response status code does not indicate success.</exception>
public async static Task<string> GetAsync(string url)
{
    AuthenticationResult result = await ReturnAuthenticationResult();
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);

    using HttpResponseMessage response = await client.GetAsync(url);
    response.EnsureSuccessStatusCode();
    try
    {
        return await response.Content.ReadAsStringAsync();
    }
    catch (Exception ex)
    {
        // Report why the read failed; reading the same content again (and blocking on it)
        // would only repeat the failure.
        Console.WriteLine(ex.Message);
        return null;
    }
}
/* SEND method */
/// <summary>Sends a caller-built request with the bearer token attached and returns the raw response bytes.</summary>
/// <param name="httprequestMessage">The request to send; it is not disposed here.</param>
/// <returns>The response body as bytes, or null when the body could not be read.</returns>
/// <exception cref="HttpRequestException">The response status code does not indicate success.</exception>
public async static Task<byte[]> SendAsync(HttpRequestMessage httprequestMessage)
{
    AuthenticationResult result = await ReturnAuthenticationResult();
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);

    using HttpResponseMessage response = await client.SendAsync(httprequestMessage);
    response.EnsureSuccessStatusCode();
    try
    {
        return await response.Content.ReadAsByteArrayAsync();
    }
    catch (Exception ex)
    {
        // Writing a byte[] to the console only prints its type name, so log the error itself.
        Console.WriteLine(ex.Message);
        return null;
    }
}
/*POST method*/
/// <summary>Issues an authenticated POST request.</summary>
/// <param name="url">Request URL.</param>
/// <param name="content">Request body.</param>
/// <returns>
/// The response body on success; null when the server answered with a non-success status
/// (its body is written to the console) or the body could not be read.
/// </returns>
public async static Task<string> PostAsync(string url, HttpContent content)
{
    AuthenticationResult result = await ReturnAuthenticationResult();
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);

    using HttpResponseMessage response = await client.PostAsync(url, content);
    try
    {
        // Read the body once, asynchronously; it is the return value on success
        // and the diagnostic output on failure.
        string body = await response.Content.ReadAsStringAsync();
        if (response.IsSuccessStatusCode)
        {
            return body;
        }

        Console.WriteLine(body);
        return null;
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
        return null;
    }
}
/* PUT method */
/// <summary>Issues an authenticated PUT request.</summary>
/// <param name="url">Request URL.</param>
/// <param name="content">Request body.</param>
/// <returns>
/// The response body on success; null when the server answered with a non-success status
/// (its body is written to the console) or the body could not be read.
/// </returns>
public async static Task<string> PutAsync(string url, HttpContent content)
{
    AuthenticationResult result = await ReturnAuthenticationResult();
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);

    using HttpResponseMessage response = await client.PutAsync(url, content);
    try
    {
        // Read the body once, asynchronously; it is the return value on success
        // and the diagnostic output on failure.
        string body = await response.Content.ReadAsStringAsync();
        if (response.IsSuccessStatusCode)
        {
            return body;
        }

        Console.WriteLine(body);
        return null;
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
        return null;
    }
}
/* DELETE method*/
/// <summary>Issues an authenticated DELETE request.</summary>
/// <param name="url">Request URL.</param>
/// <returns>
/// The response body on success; null when the server answered with a non-success status
/// (its body is written to the console) or the body could not be read.
/// </returns>
public async static Task<string> DeleteAsync(string url)
{
    AuthenticationResult result = await ReturnAuthenticationResult();
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);

    using HttpResponseMessage response = await client.DeleteAsync(url);
    try
    {
        // Read the body once, asynchronously; it is the return value on success
        // and the diagnostic output on failure.
        string body = await response.Content.ReadAsStringAsync();
        if (response.IsSuccessStatusCode)
        {
            return body;
        }

        Console.WriteLine(body);
        return null;
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
        return null;
    }
}
// Uploads the file in 2 calls
// Send the file as stream
// Flush the file to persist
/// <summary>
/// Uploads the content of <paramref name="stream"/> to an existing lakehouse file in two
/// PATCH calls: an append at position 0, followed by a flush at the stream length.
/// </summary>
/// <param name="stream">Source data; its Length must be readable.</param>
/// <param name="directory">Lakehouse directory that contains the file.</param>
/// <param name="filename">Name of the target file.</param>
/// <returns>Always null; the return type is kept so existing callers keep compiling.</returns>
/// <exception cref="HttpRequestException">The flush call returned a non-success status.</exception>
public async static Task<string> FileStreamSendAsync(Stream stream, string directory, string filename)
{
    AuthenticationResult result = await ReturnAuthenticationResult();
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);

    // Read the length up front: once the stream has been handed to the HTTP stack it may
    // already be consumed or disposed by the time the flush URL is built.
    long flushPosition = stream.Length;
    string fileUrl = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/{directory}/{filename}";

    var appendMessage = new HttpRequestMessage
    {
        Method = HttpMethod.Patch,
        RequestUri = new Uri($"{fileUrl}?action=append&position=0"),
        Content = new StreamContent(stream)
    };

    using (HttpResponseMessage appendResponse = await client.SendAsync(appendMessage))
    {
        if (!appendResponse.IsSuccessStatusCode)
        {
            // Nothing to flush when the append was rejected; show the server's explanation.
            Console.WriteLine(await appendResponse.Content.ReadAsStringAsync());
            return null;
        }
    }

    using var flushMessage = new HttpRequestMessage
    {
        Method = HttpMethod.Patch,
        RequestUri = new Uri($"{fileUrl}?action=flush&position={flushPosition}")
    };
    using HttpResponseMessage flushResponse = await client.SendAsync(flushMessage);
    flushResponse.EnsureSuccessStatusCode();
    return null;
}
The following methods perform directory and file operations.
Create Lakehouse Directory
/// <summary>Creates a directory in the lakehouse with a PUT on the directory path (resource=directory).</summary>
/// <param name="directoryfullpath">Directory path inside the lakehouse, e.g. "Files/mydirectory".</param>
public static async Task CreateDirectory(string directoryfullpath)
{
    // A local is enough here; nothing else needs this URL after the call.
    string endpoint = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/{directoryfullpath}?resource=directory";
    using var content = new StringContent(string.Empty, Encoding.UTF8, "application/json");
    try
    {
        // PutAsync signals a rejected request by returning null rather than throwing,
        // so the failure has to be detected from the return value.
        if (await PutAsync(endpoint, content) is null)
        {
            Console.WriteLine("Directory creation failed : " + directoryfullpath);
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine("Directory creation failed : " + ex.Message);
    }
}
Rename Lakehouse Directory
/// <summary>
/// Renames a lakehouse directory: a PUT is sent to the new path and the current path is
/// supplied in the x-ms-rename-source header.
/// </summary>
/// <param name="old_directoryfullpath">Current directory path inside the lakehouse.</param>
/// <param name="new_directoryfullpath">Desired directory path inside the lakehouse.</param>
public static async Task RenameDirectory(string old_directoryfullpath, string new_directoryfullpath)
{
    dfsendpoint = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/{new_directoryfullpath}?restype=directory&comp=rename";

    var renameRequest = new HttpRequestMessage(HttpMethod.Put, new Uri(dfsendpoint));
    renameRequest.Headers.Add("x-ms-rename-source", $"/{workSpace}/{lakeHouse}.Lakehouse/{old_directoryfullpath}");

    try
    {
        await SendAsync(renameRequest);
    }
    catch (Exception failure)
    {
        Console.WriteLine("Directory rename failed : " + failure.Message);
    }
}
Delete Lakehouse Directory
/// <summary>Deletes a lakehouse directory with a DELETE on the directory path.</summary>
/// <param name="directoryfullpath">Directory path inside the lakehouse.</param>
public static async Task DeleteDirectory(string directoryfullpath)
{
    // A local is enough here; nothing else needs this URL after the call.
    string endpoint = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/{directoryfullpath}?restype=directory";
    try
    {
        // DeleteAsync signals a rejected request by returning null rather than throwing,
        // so the failure has to be detected from the return value.
        if (await DeleteAsync(endpoint) is null)
        {
            Console.WriteLine("Directory deletion failed : " + directoryfullpath);
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine("Directory deletion failed : " + ex.Message);
    }
}
Upload files from a Local Directory
/// <summary>
/// Uploads every file found directly in a local directory to a lakehouse directory.
/// Each file is first created with a PUT (resource=file) and then filled through
/// FileStreamSendAsync.
/// </summary>
/// <param name="uploadpath">Local directory whose files are uploaded.</param>
/// <param name="lakehousedirectory">Target directory inside the lakehouse, e.g. "Files/Data".</param>
public static async Task UploadFilesToLakeHouse(string uploadpath, string lakehousedirectory)
{
    var localDirectory = new DirectoryInfo(uploadpath);
    foreach (FileInfo file in localDirectory.GetFiles())
    {
        string createUrl = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/{lakehousedirectory}/{file.Name}?resource=file";
        using var content = new StringContent(string.Empty, Encoding.UTF8, "application/json");

        // PutAsync returns null when the create call was rejected; sending data to a file
        // that was never created would only produce a second error.
        if (await PutAsync(createUrl, content) is null)
        {
            Console.WriteLine("File upload failed : " + file.Name);
            continue;
        }

        using Stream stream = File.OpenRead(file.FullName);
        // The data goes to the same lakehouse directory the file was created in.
        await FileStreamSendAsync(stream, lakehousedirectory, file.Name);
    }
}
Note that we are calling the FileStreamSendAsync method from the above function.
The entire upload process occurs in three calls. One call from the UploadFilesToLakeHouse method and two calls from the FileStreamSendAsync method.
In the UploadFilesToLakeHouse method, we issue a PUT against the target file path (with resource=file) to create the file. Once the file is created, the FileStreamSendAsync method appends the file stream to the same path, starting at position 0, through a PATCH call built with HttpRequestMessage. It then flushes the data to the filesystem, passing the stream length as the position, with a second PATCH call built the same way.
Download Files from a Lakehouse directory
We first traverse across the Lakehouse directory and then download the individual files from the directory
/// <summary>Lists a lakehouse directory (non-recursive) and returns the parsed JSON response.</summary>
/// <param name="directoryfullpath">Directory path inside the lakehouse.</param>
/// <returns>The parsed listing, or null when the request or the JSON parsing failed.</returns>
public static async Task<JObject> TraverseDirectoryInLakeHouse(string directoryfullpath)
{
    string dfsendpoint = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.LakeHouse/{directoryfullpath}?resource=filesystem&recursive=false";
    try
    {
        // Both the request and the parsing can fail, so both belong inside the try;
        // guarding only the return statement would never catch anything.
        string response = await GetAsync(dfsendpoint);
        return JObject.Parse(response);
    }
    catch (Exception ex)
    {
        Console.WriteLine("Directory traverse failed : " + ex.Message);
        return null;
    }
}
Once traversed, the following method will download the existing files ONLY from the root level of a given directory. The directory name is sent as an argument to the method.
/// <summary>
/// Downloads the files that sit directly in a lakehouse directory (sub-directories are
/// skipped) into a local directory.
/// </summary>
/// <param name="local_directoryfullpath">Local directory the files are written to.</param>
/// <param name="lakehouse_directoryfullpath">Lakehouse directory to download from.</param>
public static async Task DownloadFilesFromLakeHouse(string local_directoryfullpath, string lakehouse_directoryfullpath)
{
    JObject listing = await TraverseDirectoryInLakeHouse(lakehouse_directoryfullpath);

    // The listing may be null or may lack a "paths" array; stop with a message instead
    // of failing on a null reference.
    if (listing?["paths"] is not JArray pathsArray)
    {
        Console.WriteLine("File download failed : no paths returned for " + lakehouse_directoryfullpath);
        return;
    }

    foreach (JObject path in pathsArray.OfType<JObject>())
    {
        // Entries that carry an "isDirectory" property are directories, not files.
        if (path["isDirectory"] is not null)
        {
            continue;
        }

        string fullName = path["name"]?.ToString();
        if (string.IsNullOrEmpty(fullName))
        {
            continue;
        }

        // Keep only the part after the last '/', i.e. the bare file name.
        string filename_n = fullName.Substring(fullName.LastIndexOf('/') + 1);
        string dfsendpoint = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/{lakehouse_directoryfullpath}/{filename_n}";
        try
        {
            using var downloadMessage = new HttpRequestMessage(HttpMethod.Get, new Uri(dfsendpoint));
            byte[] response = await SendAsync(downloadMessage);
            // Path.Combine picks the right separator for the current operating system.
            await File.WriteAllBytesAsync(Path.Combine(local_directoryfullpath, filename_n), response);
        }
        catch (Exception ex)
        {
            Console.WriteLine("File download failed : " + ex.Message);
        }
    }
}
The method can be modified to recursively loop across all the sub directories to download all files.
You can refer to my other article https://www.azureguru.net/using-adls-gen2-apis-to-export-object-hierarchy-and-metadata-in-mircrosoft-fabric on how it can be done.
Rename a File in Lakehouse
/// <summary>
/// Renames a file inside a lakehouse directory: a PUT is sent to the new file path and the
/// current file path is supplied in the x-ms-rename-source header.
/// </summary>
/// <param name="oldfilename">Current file name.</param>
/// <param name="newfilename">New file name.</param>
/// <param name="directoryfullpath">Lakehouse directory that contains the file.</param>
public static async Task RenameFile(string oldfilename, string newfilename, string directoryfullpath)
{
    try
    {
        dfsendpoint = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/{directoryfullpath}/{newfilename}?restype=directory&comp=rename";

        var renameRequest = new HttpRequestMessage(HttpMethod.Put, new Uri(dfsendpoint));
        // Source path of the file being renamed.
        renameRequest.Headers.Add("x-ms-rename-source", $"/{workSpace}/{lakeHouse}.Lakehouse/{directoryfullpath}/{oldfilename}");

        await SendAsync(renameRequest);
    }
    catch (Exception failure)
    {
        Console.WriteLine(failure.Message);
    }
}
The calls to the above operations
// Entry point: runs each demo operation in sequence, awaiting one before starting the next.
// The rename acts on the directory created first, and the delete on the renamed directory.
public static async Task Main(string[] args)
{
//Create a directory
await CreateDirectory("Files/mydirectory");
//Rename a directory
await RenameDirectory("Files/mydirectory", "Files/notmydirectory");
//Delete a directory
await DeleteDirectory("Files/notmydirectory");
//Upload files to a lakehouse. Files/Data is the target folder in the lakehouse.
//If you want the files to be uploaded to the Files root folder, just set the value as Files
await UploadFilesToLakeHouse("Your Local Path", "Files/Data");
//Download files from a lakehouse. Files/Data is the source folder in the lakehouse.
//If you want the files to be downloaded from the Files root folder, just set the value as Files
await DownloadFilesFromLakeHouse("Your Local Path","Files/Data");
//Rename a file. "Files" is the folder in the lakehouse that contains the file.
await RenameFile("myfile.txt", "notmyfile.txt", "Files");
}
Complete Code
using Microsoft.Identity.Client;
using Newtonsoft.Json.Linq;
using System.Net;
using System.Net.Http.Headers;
using System.Text;
using File = System.IO.File;
namespace LakeHouseFileDirectoryOperations
{
internal class Program
{
// Identity settings consumed when the MSAL public client application is built.
private static string clientId = "Service Principal Client Id";
private static string Authority = "https://login.microsoftonline.com/organizations";
private static string RedirectURI = "http://localhost";
private static string[] scopes = { "https://storage.azure.com/.default" };

// Workspace and lakehouse names interpolated into every OneLake DFS URL.
private static string workSpace = "Your Workspace";
private static string lakeHouse = "Your LakeHouse";

// Most recently built DFS endpoint URL; assigned by the directory/file methods.
private static string dfsendpoint = "";

// Single HttpClient instance shared by all request helpers, plus its public accessor.
private static readonly HttpClient client = new HttpClient();
public HttpClient Client => client;
// Entry point: runs each demo operation in sequence, awaiting one before starting the next.
// The rename acts on the directory created first, and the delete on the renamed directory.
public static async Task Main(string[] args)
{
//Create a directory
await CreateDirectory("Files/mydirectory");
//Rename a directory
await RenameDirectory("Files/mydirectory", "Files/notmydirectory");
//Delete a directory
await DeleteDirectory("Files/notmydirectory");
//Upload files to a lakehouse. Files/Data is the target folder in the lakehouse.
//If you want the files to be uploaded to the Files root folder, just pass the value as Files
await UploadFilesToLakeHouse("Your Local Path", "Files/Data");
//Download files from a lakehouse. Files/Data is the source folder in the lakehouse.
//If you want the files to be downloaded from the Files root folder, just pass the value as Files
await DownloadFilesFromLakeHouse("Your Local Path","Files/Data");
//Rename a file. "Files" is the folder in the lakehouse that contains the file.
await RenameFile("myfile.txt", "notmyfile.txt", "Files");
}
/// <summary>Creates a directory in the lakehouse with a PUT on the directory path (resource=directory).</summary>
/// <param name="directoryfullpath">Directory path inside the lakehouse, e.g. "Files/mydirectory".</param>
public static async Task CreateDirectory(string directoryfullpath)
{
    // A local is enough here; nothing else needs this URL after the call.
    string endpoint = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/{directoryfullpath}?resource=directory";
    using var content = new StringContent(string.Empty, Encoding.UTF8, "application/json");
    try
    {
        // PutAsync signals a rejected request by returning null rather than throwing,
        // so the failure has to be detected from the return value.
        if (await PutAsync(endpoint, content) is null)
        {
            Console.WriteLine("Directory creation failed : " + directoryfullpath);
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine("Directory creation failed : " + ex.Message);
    }
}
/// <summary>
/// Renames a lakehouse directory: a PUT is sent to the new path and the current path is
/// supplied in the x-ms-rename-source header.
/// </summary>
/// <param name="old_directoryfullpath">Current directory path inside the lakehouse.</param>
/// <param name="new_directoryfullpath">Desired directory path inside the lakehouse.</param>
public static async Task RenameDirectory(string old_directoryfullpath, string new_directoryfullpath)
{
    dfsendpoint = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/{new_directoryfullpath}?restype=directory&comp=rename";

    var renameRequest = new HttpRequestMessage(HttpMethod.Put, new Uri(dfsendpoint));
    renameRequest.Headers.Add("x-ms-rename-source", $"/{workSpace}/{lakeHouse}.Lakehouse/{old_directoryfullpath}");

    try
    {
        await SendAsync(renameRequest);
    }
    catch (Exception failure)
    {
        Console.WriteLine("Directory rename failed : " + failure.Message);
    }
}
/// <summary>Deletes a lakehouse directory with a DELETE on the directory path.</summary>
/// <param name="directoryfullpath">Directory path inside the lakehouse.</param>
public static async Task DeleteDirectory(string directoryfullpath)
{
    // A local is enough here; nothing else needs this URL after the call.
    string endpoint = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/{directoryfullpath}?restype=directory";
    try
    {
        // DeleteAsync signals a rejected request by returning null rather than throwing,
        // so the failure has to be detected from the return value.
        if (await DeleteAsync(endpoint) is null)
        {
            Console.WriteLine("Directory deletion failed : " + directoryfullpath);
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine("Directory deletion failed : " + ex.Message);
    }
}
/// <summary>Lists a lakehouse directory (non-recursive) and returns the parsed JSON response.</summary>
/// <param name="directoryfullpath">Directory path inside the lakehouse.</param>
/// <returns>The parsed listing, or null when the request or the JSON parsing failed.</returns>
public static async Task<JObject> TraverseDirectoryInLakeHouse(string directoryfullpath)
{
    string dfsendpoint = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.LakeHouse/{directoryfullpath}?resource=filesystem&recursive=false";
    try
    {
        // Both the request and the parsing can fail, so both belong inside the try;
        // guarding only the return statement would never catch anything.
        string response = await GetAsync(dfsendpoint);
        return JObject.Parse(response);
    }
    catch (Exception ex)
    {
        Console.WriteLine("Directory traverse failed : " + ex.Message);
        return null;
    }
}
/// <summary>
/// Uploads every file found directly in a local directory to a lakehouse directory.
/// Each file is first created with a PUT (resource=file) and then filled through
/// FileStreamSendAsync.
/// </summary>
/// <param name="uploadpath">Local directory whose files are uploaded.</param>
/// <param name="lakehousedirectory">Target directory inside the lakehouse, e.g. "Files/Data".</param>
public static async Task UploadFilesToLakeHouse(string uploadpath, string lakehousedirectory)
{
    var localDirectory = new DirectoryInfo(uploadpath);
    foreach (FileInfo file in localDirectory.GetFiles())
    {
        string createUrl = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/{lakehousedirectory}/{file.Name}?resource=file";
        using var content = new StringContent(string.Empty, Encoding.UTF8, "application/json");

        // PutAsync returns null when the create call was rejected; sending data to a file
        // that was never created would only produce a second error.
        if (await PutAsync(createUrl, content) is null)
        {
            Console.WriteLine("File upload failed : " + file.Name);
            continue;
        }

        using Stream stream = File.OpenRead(file.FullName);
        // The data goes to the same lakehouse directory the file was created in.
        await FileStreamSendAsync(stream, lakehousedirectory, file.Name);
    }
}
/// <summary>
/// Downloads the files that sit directly in a lakehouse directory (sub-directories are
/// skipped) into a local directory.
/// </summary>
/// <param name="local_directoryfullpath">Local directory the files are written to.</param>
/// <param name="lakehouse_directoryfullpath">Lakehouse directory to download from.</param>
public static async Task DownloadFilesFromLakeHouse(string local_directoryfullpath, string lakehouse_directoryfullpath)
{
    JObject listing = await TraverseDirectoryInLakeHouse(lakehouse_directoryfullpath);

    // The listing may be null or may lack a "paths" array; stop with a message instead
    // of failing on a null reference.
    if (listing?["paths"] is not JArray pathsArray)
    {
        Console.WriteLine("File download failed : no paths returned for " + lakehouse_directoryfullpath);
        return;
    }

    foreach (JObject path in pathsArray.OfType<JObject>())
    {
        // Entries that carry an "isDirectory" property are directories, not files.
        if (path["isDirectory"] is not null)
        {
            continue;
        }

        string fullName = path["name"]?.ToString();
        if (string.IsNullOrEmpty(fullName))
        {
            continue;
        }

        // Keep only the part after the last '/', i.e. the bare file name.
        string filename_n = fullName.Substring(fullName.LastIndexOf('/') + 1);
        string dfsendpoint = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/{lakehouse_directoryfullpath}/{filename_n}";
        try
        {
            using var downloadMessage = new HttpRequestMessage(HttpMethod.Get, new Uri(dfsendpoint));
            byte[] response = await SendAsync(downloadMessage);
            // Path.Combine picks the right separator for the current operating system.
            await File.WriteAllBytesAsync(Path.Combine(local_directoryfullpath, filename_n), response);
        }
        catch (Exception ex)
        {
            Console.WriteLine("File download failed : " + ex.Message);
        }
    }
}
/// <summary>
/// Renames a file inside a lakehouse directory: a PUT is sent to the new file path and the
/// current file path is supplied in the x-ms-rename-source header.
/// </summary>
/// <param name="oldfilename">Current file name.</param>
/// <param name="newfilename">New file name.</param>
/// <param name="directoryfullpath">Lakehouse directory that contains the file.</param>
public static async Task RenameFile(string oldfilename, string newfilename, string directoryfullpath)
{
    try
    {
        dfsendpoint = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/{directoryfullpath}/{newfilename}?restype=directory&comp=rename";

        var renameRequest = new HttpRequestMessage(HttpMethod.Put, new Uri(dfsendpoint));
        // The source must point at the same workspace and lakehouse as the target URL,
        // so it is built from the configured names instead of fixed sample names.
        renameRequest.Headers.Add("x-ms-rename-source", $"/{workSpace}/{lakeHouse}.Lakehouse/{directoryfullpath}/{oldfilename}");

        await SendAsync(renameRequest);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
    }
}
/// <summary>Issues an authenticated GET request and returns the response body as text.</summary>
/// <param name="url">Request URL.</param>
/// <returns>The response body, or null when the body could not be read.</returns>
/// <exception cref="HttpRequestException">The response status code does not indicate success.</exception>
public async static Task<string> GetAsync(string url)
{
    AuthenticationResult result = await ReturnAuthenticationResult();
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);

    using HttpResponseMessage response = await client.GetAsync(url);
    response.EnsureSuccessStatusCode();
    try
    {
        return await response.Content.ReadAsStringAsync();
    }
    catch (Exception ex)
    {
        // Report why the read failed; reading the same content again (and blocking on it)
        // would only repeat the failure.
        Console.WriteLine(ex.Message);
        return null;
    }
}
/// <summary>Sends a caller-built request with the bearer token attached and returns the raw response bytes.</summary>
/// <param name="httprequestMessage">The request to send; it is not disposed here.</param>
/// <returns>The response body as bytes, or null when the body could not be read.</returns>
/// <exception cref="HttpRequestException">The response status code does not indicate success.</exception>
public async static Task<byte[]> SendAsync(HttpRequestMessage httprequestMessage)
{
    AuthenticationResult result = await ReturnAuthenticationResult();
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);

    using HttpResponseMessage response = await client.SendAsync(httprequestMessage);
    response.EnsureSuccessStatusCode();
    try
    {
        return await response.Content.ReadAsByteArrayAsync();
    }
    catch (Exception ex)
    {
        // Writing a byte[] to the console only prints its type name, so log the error itself.
        Console.WriteLine(ex.Message);
        return null;
    }
}
/// <summary>Issues an authenticated DELETE request.</summary>
/// <param name="url">Request URL.</param>
/// <returns>
/// The response body on success; null when the server answered with a non-success status
/// (its body is written to the console) or the body could not be read.
/// </returns>
public async static Task<string> DeleteAsync(string url)
{
    AuthenticationResult result = await ReturnAuthenticationResult();
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);

    using HttpResponseMessage response = await client.DeleteAsync(url);
    try
    {
        // Read the body once, asynchronously; it is the return value on success
        // and the diagnostic output on failure.
        string body = await response.Content.ReadAsStringAsync();
        if (response.IsSuccessStatusCode)
        {
            return body;
        }

        Console.WriteLine(body);
        return null;
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
        return null;
    }
}
/// <summary>Issues an authenticated PUT request.</summary>
/// <param name="url">Request URL.</param>
/// <param name="content">Request body.</param>
/// <returns>
/// The response body on success; null when the server answered with a non-success status
/// (its body is written to the console) or the body could not be read.
/// </returns>
public async static Task<string> PutAsync(string url, HttpContent content)
{
    AuthenticationResult result = await ReturnAuthenticationResult();
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);

    using HttpResponseMessage response = await client.PutAsync(url, content);
    try
    {
        // Read the body once, asynchronously; it is the return value on success
        // and the diagnostic output on failure.
        string body = await response.Content.ReadAsStringAsync();
        if (response.IsSuccessStatusCode)
        {
            return body;
        }

        Console.WriteLine(body);
        return null;
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
        return null;
    }
}
/// <summary>
/// Acquires an access token for the configured storage scope through MSAL.
/// A silent acquisition for the first cached account is attempted first; when MSAL
/// reports that user interaction is required, the interactive sign-in flow is used.
/// </summary>
/// <returns>The MSAL authentication result that carries the bearer access token.</returns>
public async static Task<AuthenticationResult> ReturnAuthenticationResult()
{
    IPublicClientApplication app = PublicClientApplicationBuilder.Create(clientId)
        .WithAuthority(Authority)
        .WithCacheOptions(CacheOptions.EnableSharedCacheOptions)
        .WithRedirectUri(RedirectURI)
        .Build();

    var accounts = await app.GetAccountsAsync();

    try
    {
        // FirstOrDefault yields null when nothing is cached; MSAL then raises
        // MsalUiRequiredException instead of this code tripping over an empty sequence.
        return await app.AcquireTokenSilent(scopes, accounts.FirstOrDefault())
            .ExecuteAsync()
            .ConfigureAwait(false);
    }
    catch (MsalUiRequiredException)
    {
        // Only "interaction needed" falls back to the sign-in prompt; other failures
        // (network, configuration) are left to surface to the caller.
        return await app.AcquireTokenInteractive(scopes)
            .ExecuteAsync()
            .ConfigureAwait(false);
    }
}
/// <summary>Issues an authenticated POST request.</summary>
/// <param name="url">Request URL.</param>
/// <param name="content">Request body.</param>
/// <returns>
/// The response body on success; null when the server answered with a non-success status
/// (its body is written to the console) or the body could not be read.
/// </returns>
public async static Task<string> PostAsync(string url, HttpContent content)
{
    AuthenticationResult result = await ReturnAuthenticationResult();
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);

    using HttpResponseMessage response = await client.PostAsync(url, content);
    try
    {
        // Read the body once, asynchronously; it is the return value on success
        // and the diagnostic output on failure.
        string body = await response.Content.ReadAsStringAsync();
        if (response.IsSuccessStatusCode)
        {
            return body;
        }

        Console.WriteLine(body);
        return null;
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.Message);
        return null;
    }
}
/// <summary>
/// Uploads the content of <paramref name="stream"/> to an existing lakehouse file in two
/// PATCH calls: an append at position 0, followed by a flush at the stream length.
/// </summary>
/// <param name="stream">Source data; its Length must be readable.</param>
/// <param name="directory">Lakehouse directory that contains the file.</param>
/// <param name="filename">Name of the target file.</param>
/// <returns>Always null; the return type is kept so existing callers keep compiling.</returns>
/// <exception cref="HttpRequestException">The flush call returned a non-success status.</exception>
public async static Task<string> FileStreamSendAsync(Stream stream, string directory, string filename)
{
    AuthenticationResult result = await ReturnAuthenticationResult();
    client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", result.AccessToken);

    // Read the length up front: once the stream has been handed to the HTTP stack it may
    // already be consumed or disposed by the time the flush URL is built.
    long flushPosition = stream.Length;
    string fileUrl = $"https://onelake.dfs.fabric.microsoft.com/{workSpace}/{lakeHouse}.Lakehouse/{directory}/{filename}";

    var appendMessage = new HttpRequestMessage
    {
        Method = HttpMethod.Patch,
        RequestUri = new Uri($"{fileUrl}?action=append&position=0"),
        Content = new StreamContent(stream)
    };

    using (HttpResponseMessage appendResponse = await client.SendAsync(appendMessage))
    {
        if (!appendResponse.IsSuccessStatusCode)
        {
            // Nothing to flush when the append was rejected; show the server's explanation.
            Console.WriteLine(await appendResponse.Content.ReadAsStringAsync());
            return null;
        }
    }

    using var flushMessage = new HttpRequestMessage
    {
        Method = HttpMethod.Patch,
        RequestUri = new Uri($"{fileUrl}?action=flush&position={flushPosition}")
    };
    using HttpResponseMessage flushResponse = await client.SendAsync(flushMessage);
    flushResponse.EnsureSuccessStatusCode();
    return null;
}
}
}
Code Walkthrough
Final Thoughts
Using the ADLS Gen2 API in Microsoft Fabric allows seamless integration for managing storage operations out of the box. The API also provides broader integration across the wider Azure ecosystem, so Fabric storage can be maintained with the same calls and integrated with other compatible applications and services.
Thanks for reading !!!




