Calling the Streamr API from .NET Core 3 Clients (Part 2)

rj
Streamr API Tutorials
13 min readOct 23, 2020

Connecting to streams, and reading and writing from/to streams from a console app

In Part 1, we showed how to use the Streamr API GEN code generator tool to build an interface library for the Streamr REST API. In part 2, we will use that library to connect to our test stream, and to then write/read data to/from that stream. In Part 3, I show you to authenticate to Streamr using Web3 credentials.

[Note 01/21/21: The username/password auth shown in parts 1 and 2 has now been discontinued, so use Web 3 auth example explained in part 3.]

All the code from this tutorial will be up in a GitHub here: link.

These example programs will be necessarily pretty simple and omit a lot of logic that would be required for any production class code, and are provided in much the same style as the Streamr sample JavaScript tutorials. This allows us to focus on the API usage without getting bogged down in a lot of extra code details.

Step 1: List Streams

Let’s make a simple .NET Core 3 console app to connect to our Streamr account using username/password authentication and list our streams out to the console. We’re going to be using the Login and Streams sections of the Streamr REST API in this tutorial:

Opening Visual Studio again (or closing the solution which is still open from Part 1), we’re going to create a new .NET Core 3 console app via File -> New Solution:

I called this first app ListStreams.

I saved the solution as StreamrClient, although that name doesn’t matter of course. In Project dependencies, click Add Reference -> DotNet Assembly and select the DLL that you built from Step 1:

You should be able to do a successful “Build All” now, and if you open a terminal window and run the console app, you will simply see “Hello World”:

cd ListStreams
dotnet run
Hello World!

So now we know we have a good compile, let’s add code to connect to Streamr and list our streams to the console. Editing our Program.cs file, we add code as shown:

using System;
using IO.Swagger.Api;
using IO.Swagger.Client;
using IO.Swagger.Model;
using System.Collections.Generic;
namespace ListStreams
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Welcome to Streamr!");
var _loginAPI = new LoginApi();
string _username = Environment.GetEnvironmentVariable("STREAMR_USERNAME");
string _password = Environment.GetEnvironmentVariable("STREAMR_PASSWORD");
var body = new UsernamePassword(_username, _password);
try
{
// Log in with password verification
SessionToken _sessionToken = _loginAPI.LoginPasswordPost(body);
Console.WriteLine("Login Ok: " + _sessionToken);
// Read the available streams:
Configuration.Default.AddApiKeyPrefix("Authorization", "Bearer " + _sessionToken.Token);
Console.WriteLine("Reading Available Streams...");
var _streamsAPI = new StreamsApi();
//var name = name_example; // string | Only return streams that exactly match the given name. (optional)
//var uiChannel = true; // bool? | Filter by whether this stream is an UI channel or not. UI channels are streams used to push updates to visualisation widgets. For the typical list of streams, set this to false. (optional) (default to false)
//var noConfig = true; // bool? | Remove config object from JSON response. (optional) (default to false)
//var search = search_example; // string | Filter by search term in name or description (optional)
//var sortBy = sortBy_example; // string | Sort the returned results by the given field. (optional) (default to id)
//var order = order_example; // string | Control whether the results are sorted in ascending or descending order. Used with parameter `sortBy`. (optional) (default to asc)
//var max = 56; // int? | Maximum number of returned results (capped at 100) (optional) (default to 100)
//var offset = 56; // int? | Skip the first (offset) results. Used together with max for paging. (optional) (default to 0)
//var grantedAccess = true; // bool? | If false, excludes resources that user has been granted specific permission to from results. (optional) (default to true)
//var publicAccess = true; // bool? | If true, includes publicly available resources in the results. (optional) (default to false)
//var operation = operation_example; // string | Filter results by Permission (access level) (optional) (default to stream_get)
try
{
// List streams
List<Stream> result = _streamsAPI.StreamsGet(); // name, uiChannel, noConfig, search, sortBy, order, max, offset, grantedAccess, publicAccess, operation);
foreach (Stream s in result)
{
Console.WriteLine("Stream: Name='" + s.Name + "', Id='" + s.Id + "'");
}
}
catch (Exception e)
{
Console.WriteLine("Exception when calling StreamsApi.StreamsGet: " + e.Message);
}
}
catch (Exception e)
{
Console.WriteLine("Exception when calling LoginApi.LoginPasswordPost: " + e.Message);
}
}
}
}

I’ve left some comments in so you can see that the API supports quite a large number of options and parameters, although we’re not going to use them for these demos, so I commented them out. I have also setup the code to pull our Streamr username and password from environment vars instead of hard-coding them. You can set them in your command shell before running:

export STREAMR_USERNAME='{your email here}'export STREAMR_PASSWORD='{your password here}'

Also, before we do a build, you’ll need to add a NuGet Package reference for System.Security.Permissions:

Now build all, and from the console, re-run the app. If all goes well, it should list the Streamr stream names from your Streamr Core account in the console window:

This means you have now successfully connected to Streamr from your .NET Core 3 console app using the auto-generated library from Part 1. Take note of the Id of the test stream you want to use, as we’ll need that next part to read and write data to it.

Step 2: Writing to Streams

Streamr is setup to allow sending (arbitrary) JSON packets of data into the streams. How you do this will be up to your application specific data needs, etc. You can setup different streams for different data sets also. In this step, we’re going to set up another .NET Core 3 console app to write new data packets to one of our test streams. The program will loop 1000 times and write one data packet to the stream every 5 seconds (or until you exit the program).

One note also, we’re going to be omitting exception and error handling, as well as session keep-alive code for now, since we don’t really need it for this tutorial, and we just want to focus on the Streamr API usage itself. Any production code of course would be written to be much more robust.

So, let’s add another new .NET Core 3 console app to our solution, I called it StreamWriter. Once added, again add the reference to the IO.Swagger DLL and the NuGet System.Security.Permissions again to it like we did in Step 1.

Next, edit Program.cs to match:

using System;
using System.Threading.Tasks;
using IO.Swagger.Api;
using IO.Swagger.Client;
using IO.Swagger.Model;
namespace StreamWriter
{
class Program
{
static async Task Main(string[] args)
{
Console.WriteLine("Welcome to Streamr!");
var _loginAPI = new LoginApi();
string _username = Environment.GetEnvironmentVariable("STREAMR_USERNAME");
string _password = Environment.GetEnvironmentVariable("STREAMR_PASSWORD");
var body = new UsernamePassword(_username, _password);
// Log in with password verification
SessionToken _sessionToken = _loginAPI.LoginPasswordPost(body);
Configuration.Default.AddApiKeyPrefix("Authorization", "Bearer " + _sessionToken.Token);
var _streamsAPI = new StreamsApi();
Console.WriteLine("Login Ok: " + _sessionToken);
// Just hard-coding my test stream id for now (could have parsed it from API calls of course):
string _testStreamId = "uTsGe6-gRbua5Ln45KGnag";
for (int i = 1; i <= 1000; i++)
{
var rand = new Random().Next();
var _data = new { IntData = rand, StringData = "test" + rand };
string _jsonData = System.Text.Json.JsonSerializer.Serialize(_data);
Console.WriteLine("Putting Data: " + _jsonData + " to stream id: " + _testStreamId);
_streamsAPI.StreamsIdDataPost(_testStreamId, _jsonData); //, ttl, pkey, signatureType, address, signature);
Console.WriteLine("Put Successful");
await Task.Delay(5000);
}
}
}
}

I just hard-coded the stream id for now; you can adjust that or parse it from other API calls as needed for any production system of course. You can then try to build (Project -> Rebuild All), but you will get a compile error on the StreamsIdDataPost call (unless Streamr has updated their API YAML). This is because the API code gen tool doesn’t add any parameter to accept the JSON body data for the REST API data POST call.

So, we’ll have to go back to the IO.Swagger library project and make some changes to the auto generated code. Close this solution and re-open the IO.Swagger solution from Part 1 again. I did it this way to show you that auto-generated code sometimes (oftentimes) has issues which need to be fixed, so you can see how to do that.

So with the IO.Swagger solution and project open from Step 1, we’re going to make some changes as follows. In StreamsApi.cs file, we’re going to add a string jsonData parameter, and pass it through down into the REST call:

We need to add it to the Interface definition also:

That’s all we’re going to do for now, but make note that there are other calls in the API that would need this parameter added to it as well if you want to use them, but this is the only call we are using in this tutorial.

Go ahead and Build -> Rebuild All, and you’ve updated the library. Close the solution and go re-open the StreamrClient solution we created in step 1. You should be able to Build -> Rebuild All now without error.

Go ahead and run the program from a terminal window, and you should see it successfully writing packets to your test stream:

You should also see the new packets showing up in (near) real time in Streamr Core on the Streamr website. Go to your account, click on the test stream you setup, then click on Preview and then Inspect Data, and it should update as new packets are received from our StreamWriter app:

Congratulations, you are now successfully writing Streamr data packets from a .NET Core 3 app to Streamr Core. 😁

Step 3: Reading from Streams

Let’s add a third .NET Core 3 console project to our solution, called StreamReader:

Once again, add the reference to the IO.Swagger DLL and to System.Security.Permissions NuGet package as before. We’ll also need to add a NuGet reference to Newtonsoft JSON library, as we’re going to need to deserialize data from the API.

We’re going to set this project up to read packets from our test stream and dump them to the console window, using a simple polling loop (see notes at end of article). Edit Program.cs to match:

using System;
using System.Threading.Tasks;
using IO.Swagger.Api;
using IO.Swagger.Client;
using IO.Swagger.Model;
using System.Collections.Generic;
using Newtonsoft.Json;
namespace StreamReader
{
class Program
{
static async Task Main(string[] args)
{
Console.WriteLine("Welcome to Streamr!");
var _loginAPI = new LoginApi();
string _username = Environment.GetEnvironmentVariable("STREAMR_USERNAME");
string _password = Environment.GetEnvironmentVariable("STREAMR_PASSWORD");
var body = new UsernamePassword(_username, _password);
// Log in with password verification
SessionToken _sessionToken = _loginAPI.LoginPasswordPost(body);
Configuration.Default.AddApiKeyPrefix("Authorization", "Bearer " + _sessionToken.Token);
var _streamsAPI = new StreamsApi();
Console.WriteLine("Login Ok: " + _sessionToken);
// Just hard-coding my test stream id for now (could have parsed it from API calls of course):
string _testStreamId = "uTsGe6-gRbua5Ln45KGnag";
long lastTimestamp = 0;
while (true)
{
// let's ask for the most recent packet:
ApiResponse<object> apiResponse = _streamsAPI.StreamsIdDataPartitionsPartitionLastGetWithHttpInfo(_testStreamId, 0, 1);
string _rawJsonData = (string)apiResponse.Data;
List<MyStreamrData> myStreamrData = JsonConvert.DeserializeObject<List<MyStreamrData>>(_rawJsonData);
// only write the packet if we got data and also only if it's a new one:
if (myStreamrData.Count > 0)
{
long thisTimestamp = myStreamrData[0].timestamp;
if (thisTimestamp != lastTimestamp)
{
Console.WriteLine("Received Packet: Timestamp=" + thisTimestamp + ", IntData=" + myStreamrData[0].content.IntData + ", StringData=" + myStreamrData[0].content.StringData);
}
lastTimestamp = thisTimestamp;
}
}
}
}
// lets define a class to map to our API result, to make conversion from JSON easier:
class MyStreamrData
{
public string streamId { get; set; }
public int streamPartition { get; set; }
public long timestamp { get; set; }
public int sequenceNumber { get; set; }
public string publisherId { get; set; }
public string msgChainId { get; set; }
public int messageType { get; set; }
public int contentType { get; set; }
public string groupKeyId { get; set; }
public JsonBodyContentType content { get; set; }
public int signatureType { get; set; }
public string signature { get; set; }
}
class JsonBodyContentType
{
public int IntData { get; set; }
public string StringData { get; set; }
}
}

Ok, walking through the code a bit, what we’re doing is simply looping calls to request the most recent data packet from our test stream, and writing it to output if it’s timestamp is newer than the one we just wrote.

Now before you compile and run it, we need to go back to the IO.Swagger project (from step 1) and edit some more code, as it was not setting the API body content result. I have edited the StreamsApi.cs file as shown:

Now rebuild the IO.Swagger library and go back to the project solution we created for Step 2, and Build All.

I next used Postman to see the actual data that is coming back from the API call, so I could build up some simple class objects to encapsulate the data, making JSON parsing much easier. All Streamr API calls can be done natively direct to the REST API interface also:

In the StreamReader for this tutorial, I have used a polling approach and that’s probably not how you’d write it in an actual production system, but it shows the API working properly. Even if you take this approach (polling), you would really also need to get the latest N results on each call and dump all of the packets from that result set to the terminal where each packet’s timestamp is > the lastTimestamp written out. This is because your stream could have received multiple events since the last polling call. Even further, we could rewrite this to be completely async awaiting on the Async API calls in the loop. Again, this is just demo code showing how all the pieces come together.

Step 4: Putting it all together

Now to show Streamr end-to-end from .NET, open 2 separate terminal windows now and run the StreamWriter in one window and the StreamReader in another window simultaneously, you should see them both working together: packets go out from the writer to the Streamr Network to Streamr Core then back again to the reader:

The data preview for your test stream in Streamr Core website should also be updating again in real time as you do this to match what is happening:

So, there you have it, connecting, writing and reading to Streamr from .NET. Further, while we have taken shortcuts in our code (as this is not production), what is working here is not a simulation, but it is a fully live API invocation to the live Streamr network on both ends.

I hope you enjoyed the tutorial. There are many other features of the Streamr API (subscriptions, canvases, data unions, etc.) and this tutorial has just scratched the surface, but I hope it gets you off to a faster start which is the primary objective. Cheers!

Closing Notes

In closing, there are many areas that could be explored and improved upon for sure when building a real production .NET system using this API. I’ll list a few things as “exercises for the reader”, something my college professors used to do all the time (which I didn’t find all that amusing at the time):

  1. Convert the IO.Swagger Library to 100% native .NET Core 3.1 (or the upcoming .NET Core 5) instead of .NET Framework 4.5. We are mixing framework code in this tutorial, which again isn’t ideal for production. To do this we would have to convert the RestSharp library as well.
  2. Handle timeout of the Streamr API authentication session object gracefully, reconnecting (re-auth) as needed. A simple helper class could be written to accomplish this which checks to see if the auth session token is stale and obtain a new one as needed.
  3. Add robust error checking and handling, etc.
  4. Use await Task async calls in the reader, instead of polling.
  5. Write wrapper classes for your application data instead of dealing with JSON strings in various places. I did some of this for the Reader, but the application specific data could really all be encapsulated in classes with supporting code as appropriate. What I did above was just rudimentary to allow easier JSON parsing. We could then also rewrite the IO.Swagger API Library to use C# generics so you could instantiate it with your application class data and fully automate the serialization/deserialization of JSON.
  6. And for extra credit, you could convert the IO.Swagger library to use the new C#8 async streams. This would in effect model Streamr streams as C#8 async streams. 😅

Next, in Part 3, we’ll show how to use Web3 auth to connect to Streamr.

Helpful Links

--

--