HEF-Sharp/HEF.Flink

HEF.Flink

License: Apache

A Flink .NET SDK by HEF. It currently contains the Flink SQL Gateway Client and the Flink Ado.Net Driver.

Flink SQL Gateway Client

Flink SQL Gateway Client is a C# library for calling the RESTful API exposed by the Flink SQL Gateway.

Usage

Register the client with dependency injection during application startup:

Startup.cs

services.AddHttpApi<IFlinkSqlGatewayApi>(o =>
{
    o.HttpHost = new Uri("http://localhost:8083/v1/");
});

Then resolve the IFlinkSqlGatewayApi implementation from the service provider and call it:

provider.GetRequiredService<IFlinkSqlGatewayApi>();

Flink Ado.Net Driver

Flink Ado.Net Driver is a C# library for accessing and manipulating Apache Flink clusters. It connects to a Flink SQL Gateway, which acts as the ADO.NET server, and is built on the Flink SQL Gateway Client described above.

Usage

Using DbProviderFactories

Register the provider factory during application startup:

Startup.cs

DbProviderFactories.RegisterFactory("FlinkSqlClient", FlinkSqlClientFactory.Instance);

Then create and open a connection through the factory:

var factory = DbProviderFactories.GetFactory("FlinkSqlClient");
using var connection = factory.CreateConnection();
connection.ConnectionString = "DataSource=localhost;Port=8083;Planner=blink";
await connection.OpenAsync();

Direct instantiation

using var connection = new FlinkSqlConnection("DataSource=localhost;Port=8083;Planner=blink");
await connection.OpenAsync();
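
Both snippets above hard-code the connection string. Where the host and port come from configuration, the string could instead be assembled with the standard System.Data.Common.DbConnectionStringBuilder. The following is a minimal sketch, not part of HEF.Flink: FlinkConnectionStrings and its Build method are hypothetical names, and only the DataSource, Port and Planner keys shown above are used. Whether the driver accepts other keys or a different key order is not stated in this README.

```csharp
using System;
using System.Data.Common;

// Hypothetical helper for illustration; not part of HEF.Flink.
public static class FlinkConnectionStrings
{
    // Assembles a "key=value;..." connection string from the three keys
    // that appear in this README (DataSource, Port, Planner).
    public static string Build(string host, int port, string planner = "blink")
    {
        var builder = new DbConnectionStringBuilder
        {
            ["DataSource"] = host,
            ["Port"] = port,
            ["Planner"] = planner
        };
        return builder.ConnectionString;
    }

    public static void Main()
    {
        var connectionString = Build("localhost", 8083);
        Console.WriteLine(connectionString);

        // Parse the string back to check that each key survived the round trip.
        var parsed = new DbConnectionStringBuilder { ConnectionString = connectionString };
        Console.WriteLine(parsed["DataSource"]);
        Console.WriteLine(parsed["Port"]);
    }
}
```

The resulting string can be passed to new FlinkSqlConnection(...) or assigned to connection.ConnectionString exactly as in the snippets above. The builder quotes values that contain separators, which plain string concatenation would not do.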

Sample

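// ConnectionString is assumed to be defined elsewhere,
// e.g. "DataSource=localhost;Port=8083;Planner=blink".
using var connection = new FlinkSqlConnection(ConnectionString);
await connection.OpenAsync();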

var createTableSql = "CREATE TABLE T(\n" +
                "  a INT,\n" +
                "  b VARCHAR(10)\n" +
                ") WITH (\n" +
                "  'connector.type' = 'filesystem',\n" +
                "  'connector.path' = 'file:///tmp/T.csv',\n" +
                "  'format.type' = 'csv',\n" +
                "  'format.derive-schema' = 'true'\n" +
                ")";
var createTableCommand = connection.CreateCommand(createTableSql);
await createTableCommand.ExecuteNonQueryAsync();

var insertSql = "INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello')";
var insertCommand = connection.CreateCommand(insertSql);
await insertCommand.ExecuteNonQueryAsync();

var selectSql = "SELECT * FROM T";
var selectCommand = connection.CreateCommand(selectSql);
using var reader = await selectCommand.ExecuteReaderAsync();
while (await reader.ReadAsync())
{
    Console.WriteLine($"{reader.GetInt32(0)}, {reader.GetString(1)}");
}
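
The sample reads columns by ordinal with typed getters, which requires knowing the schema in advance. For ad hoc queries, a generic loop over the standard DbDataReader members (FieldCount, GetName, GetValue, IsDBNull) may be more convenient. The sketch below assumes that the reader returned by ExecuteReaderAsync derives from System.Data.Common.DbDataReader, which this README does not state explicitly. ReaderDump is a hypothetical name, and an in-memory DataTable stands in for table T so that the code runs without a Flink SQL Gateway.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of HEF.Flink.
public static class ReaderDump
{
    // Formats every remaining row of any DbDataReader as "name=value, name=value".
    public static async Task<List<string>> ReadRowsAsync(DbDataReader reader)
    {
        var rows = new List<string>();
        while (await reader.ReadAsync())
        {
            var cells = new string[reader.FieldCount];
            for (var i = 0; i < reader.FieldCount; i++)
            {
                // Database NULLs are rendered as the literal text "NULL".
                object value = reader.IsDBNull(i) ? "NULL" : reader.GetValue(i);
                cells[i] = $"{reader.GetName(i)}={value}";
            }
            rows.Add(string.Join(", ", cells));
        }
        return rows;
    }

    // In-memory stand-in for table T from the sample (columns a INT, b VARCHAR).
    public static DataTable SampleTable()
    {
        var table = new DataTable("T");
        table.Columns.Add("a", typeof(int));
        table.Columns.Add("b", typeof(string));
        table.Rows.Add(1, "Hi");
        table.Rows.Add(2, "Hello");
        return table;
    }

    public static async Task Main()
    {
        // DataTableReader is a DbDataReader, so it can replace a live reader here.
        using var reader = SampleTable().CreateDataReader();
        foreach (var line in await ReadRowsAsync(reader))
        {
            Console.WriteLine(line); // prints "a=1, b=Hi" then "a=2, b=Hello"
        }
    }
}
```

If the assumption about the reader type holds, the reader from selectCommand.ExecuteReaderAsync() in the sample could be passed to ReadRowsAsync in place of the DataTable reader.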