Implement an Airport-compatible server in Go

A guide to implementing an Airport-compatible Arrow Flight server using the airport-go package.

This guide explains how to implement an Airport-compatible Arrow Flight server with the airport-go package, which streamlines building an Arrow Flight server that speaks the Airport protocol and integrates cleanly with Airport-attached DuckDB databases.

Prerequisites

Before you begin, ensure you have the following:

  • Go programming language installed (version 1.25 or later).
  • Basic understanding of Apache Arrow, Arrow Flight, and gRPC.

Setting Up the Airport-compatible Arrow Flight Server

  1. Install the airport-go package:

    Use the following command to install the airport-go package:

    go get github.com/hugr-lab/airport-go@latest
  2. Create a new Go project:

    mkdir airport-server
    cd airport-server
    go mod init airport-server
  3. Implement a basic server:

    Create a main.go file with the following content:

    package main
    
    import (
        "context"
        "log"
        "net"
    
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/arrow-go/v18/arrow/memory"
        "google.golang.org/grpc"
    
        "github.com/hugr-lab/airport-go"
        "github.com/hugr-lab/airport-go/catalog"
    )
    
    func main() {
        // Define schema and scan function
        userSchema := arrow.NewSchema([]arrow.Field{
            {Name: "id", Type: arrow.PrimitiveTypes.Int64},
            {Name: "name", Type: arrow.BinaryTypes.String},
        }, nil)
    
        scanUsers := func(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
            builder := array.NewRecordBuilder(memory.DefaultAllocator, userSchema)
            defer builder.Release()
    
            builder.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
            builder.Field(1).(*array.StringBuilder).AppendValues([]string{"Alice", "Bob", "Charlie"}, nil)
    
            record := builder.NewRecord()
            defer record.Release()
    
            return array.NewRecordReader(userSchema, []arrow.Record{record})
        }
    
        // Build catalog
        cat, err := airport.NewCatalogBuilder().
            Schema("demo").
            SimpleTable(airport.SimpleTableDef{
                Name:     "users",
                Comment:  "User accounts",
                Schema:   userSchema,
                ScanFunc: scanUsers,
            }).
            Build()
        if err != nil {
            log.Fatal(err)
        }
    
        // Start server
        grpcServer := grpc.NewServer()
        if err := airport.NewServer(grpcServer, airport.ServerConfig{Catalog: cat}); err != nil {
            log.Fatal(err)
        }
    
        lis, err := net.Listen("tcp", ":50051")
        if err != nil {
            log.Fatal(err)
        }
        log.Println("Airport server listening on :50051")
        if err := grpcServer.Serve(lis); err != nil {
            log.Fatal(err)
        }
    }
  4. Run the server:

    go run main.go

Connecting from DuckDB

Once your server is running, connect to it from DuckDB:

-- Install and load Airport extension
INSTALL airport FROM community;
LOAD airport;

-- Connect to your Flight server
ATTACH '' AS my_server (TYPE AIRPORT, LOCATION 'grpc://localhost:50051');

-- Query the users table
SELECT * FROM my_server.demo.users;

Expected output:

┌───────┬─────────┐
│  id   │  name   │
│ int64 │ varchar │
├───────┼─────────┤
│     1 │ Alice   │
│     2 │ Bob     │
│     3 │ Charlie │
└───────┴─────────┘

Package Overview

The airport-go package is organized into several subpackages:

Package           Description
airport           Main package: server creation, catalog builder, authentication helpers
airport/catalog   Core interfaces: Catalog, Schema, Table, Functions, DML/DDL options
airport/auth      Authentication implementations (bearer token)
airport/flight    Internal Flight handler implementation

Server Configuration

The ServerConfig struct configures the Airport server:

type ServerConfig struct {
    // Catalog is the catalog implementation (required)
    Catalog catalog.Catalog

    // Auth is the authentication handler (optional)
    Auth Authenticator

    // Address is the server address for FlightEndpoint locations
    Address string

    // MaxMessageSize sets the maximum gRPC message size (default: 4MB)
    MaxMessageSize int

    // LogLevel sets the logging verbosity (default: Info)
    LogLevel *slog.Level

    // TransactionManager enables transaction support (optional)
    TransactionManager catalog.TransactionManager
}

Creating a Server

// Create gRPC server with Airport options
config := airport.ServerConfig{
    Catalog:        myCatalog,
    Auth:           myAuth,
    MaxMessageSize: 16 * 1024 * 1024, // 16MB
}

opts := airport.ServerOptions(config)
grpcServer := grpc.NewServer(opts...)

// Register Airport service
err := airport.NewServer(grpcServer, config)
if err != nil {
    log.Fatal(err)
}

// Start serving
lis, err := net.Listen("tcp", ":50051")
if err != nil {
    log.Fatal(err)
}
grpcServer.Serve(lis)

Catalog Builder API

The fluent builder API provides the easiest way to create static catalogs:

catalog, err := airport.NewCatalogBuilder().
    Schema("my_schema").
    Comment("Schema description").
    SimpleTable(airport.SimpleTableDef{
        Name:     "my_table",
        Comment:  "Table description",
        Schema:   arrowSchema,
        ScanFunc: scanFunction,
    }).
    Table(customTableImpl).        // Add custom Table implementation
    ScalarFunc(scalarFunction).    // Add scalar function
    TableFunc(tableFunction).      // Add table function
    Schema("another_schema").      // Start new schema
    // ... add more tables
    Build()

SimpleTableDef

For simple tables with a scan function:

type SimpleTableDef struct {
    Name     string
    Comment  string
    Schema   *arrow.Schema
    ScanFunc func(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error)
}

Core Interfaces

catalog.Catalog

The root interface for exposing data:

type Catalog interface {
    // Schemas returns all schemas in this catalog.
    Schemas(ctx context.Context) ([]Schema, error)

    // Schema returns a specific schema by name.
    // Returns (nil, nil) if schema doesn't exist.
    Schema(ctx context.Context, name string) (Schema, error)
}

catalog.Schema

Represents a namespace containing tables and functions:

type Schema interface {
    Name() string
    Comment() string
    Tables(ctx context.Context) ([]Table, error)
    Table(ctx context.Context, name string) (Table, error)
    ScalarFunctions(ctx context.Context) ([]ScalarFunction, error)
    TableFunctions(ctx context.Context) ([]TableFunction, error)
    TableFunctionsInOut(ctx context.Context) ([]TableFunctionInOut, error)
}

catalog.Table

Represents a scannable table:

type Table interface {
    Name() string
    Comment() string

    // ArrowSchema returns the table's Arrow schema.
    // If columns is non-nil, returns a projected schema.
    ArrowSchema(columns []string) *arrow.Schema

    // Scan returns a RecordReader for reading table data.
    Scan(ctx context.Context, opts *ScanOptions) (array.RecordReader, error)
}
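
A minimal static implementation might look like the following sketch (usersTable is an illustrative name, not part of airport-go):

type usersTable struct {
    schema *arrow.Schema
}

func (t *usersTable) Name() string    { return "users" }
func (t *usersTable) Comment() string { return "User accounts" }

func (t *usersTable) ArrowSchema(columns []string) *arrow.Schema {
    if columns == nil {
        return t.schema
    }
    // catalog.ProjectSchema is described under Utility Functions below.
    return catalog.ProjectSchema(t.schema, columns)
}

func (t *usersTable) Scan(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
    schema := t.ArrowSchema(opts.Columns)
    builder := array.NewRecordBuilder(memory.DefaultAllocator, schema)
    defer builder.Release()

    // Fill the builder from your data source, honoring opts.Limit
    // and opts.BatchSize where applicable.

    record := builder.NewRecord()
    defer record.Release()
    return array.NewRecordReader(schema, []arrow.Record{record})
}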

catalog.ScanOptions

Options passed to Table.Scan:

type ScanOptions struct {
    Columns   []string   // Columns requested (nil = all)
    Filter    []byte     // JSON-serialized predicate expression
    Limit     int64      // Maximum rows to return
    BatchSize int        // Hint for batch size
    TimePoint *TimePoint // Point-in-time for time-travel queries
}

DML Operations (INSERT, UPDATE, DELETE)

InsertableTable

Enable INSERT operations:

type InsertableTable interface {
    Table

    // Insert adds new rows to the table.
    Insert(ctx context.Context, rows array.RecordReader, opts *DMLOptions) (*DMLResult, error)
}

UpdatableTable

Enable UPDATE operations:

type UpdatableTable interface {
    Table

    // Update modifies existing rows identified by rowIDs.
    Update(ctx context.Context, rowIDs []int64, rows array.RecordReader, opts *DMLOptions) (*DMLResult, error)
}

DeletableTable

Enable DELETE operations:

type DeletableTable interface {
    Table

    // Delete removes rows identified by rowIDs.
    Delete(ctx context.Context, rowIDs []int64, opts *DMLOptions) (*DMLResult, error)
}

RowID Support

For UPDATE and DELETE operations, tables must include a rowid pseudocolumn with special metadata:

rowidMeta := arrow.NewMetadata([]string{"is_rowid"}, []string{"true"})
schema := arrow.NewSchema([]arrow.Field{
    {Name: "rowid", Type: arrow.PrimitiveTypes.Int64, Nullable: false, Metadata: rowidMeta},
    {Name: "id", Type: arrow.PrimitiveTypes.Int64},
    {Name: "name", Type: arrow.BinaryTypes.String},
}, nil)
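
Putting this together, a minimal in-memory sketch of Insert and Delete might look like this. rowTable is illustrative, the remaining Table methods are omitted, and the exact DMLResult fields (left unset here) are documented in the catalog package:

type rowTable struct {
    mu    sync.Mutex
    names map[int64]string // rowid -> name
    next  int64            // next rowid to assign
}

func (t *rowTable) Insert(ctx context.Context, rows array.RecordReader, opts *catalog.DMLOptions) (*catalog.DMLResult, error) {
    t.mu.Lock()
    defer t.mu.Unlock()
    for rows.Next() {
        rec := rows.Record()
        names := rec.Column(1).(*array.String) // assumes an (id, name) input layout
        for i := 0; i < names.Len(); i++ {
            t.names[t.next] = names.Value(i)
            t.next++
        }
    }
    if err := rows.Err(); err != nil {
        return nil, err
    }
    return &catalog.DMLResult{}, nil // populate result fields per the package docs
}

func (t *rowTable) Delete(ctx context.Context, rowIDs []int64, opts *catalog.DMLOptions) (*catalog.DMLResult, error) {
    t.mu.Lock()
    defer t.mu.Unlock()
    for _, id := range rowIDs {
        delete(t.names, id)
    }
    return &catalog.DMLResult{}, nil
}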

DML Usage Example

-- INSERT
INSERT INTO my_server.demo.users (id, name) VALUES (1, 'Alice');

-- UPDATE
UPDATE my_server.demo.users SET name = 'Alicia' WHERE id = 1;

-- DELETE
DELETE FROM my_server.demo.users WHERE id = 1;

-- RETURNING clause
INSERT INTO my_server.demo.users (id, name) VALUES (2, 'Bob') RETURNING *;

DDL Operations (CREATE, DROP, ALTER)

DynamicCatalog

Enable schema management:

type DynamicCatalog interface {
    Catalog

    CreateSchema(ctx context.Context, name string, opts CreateSchemaOptions) (Schema, error)
    DropSchema(ctx context.Context, name string, opts DropSchemaOptions) error
}

DynamicSchema

Enable table management:

type DynamicSchema interface {
    Schema

    CreateTable(ctx context.Context, name string, schema *arrow.Schema, opts CreateTableOptions) (Table, error)
    DropTable(ctx context.Context, name string, opts DropTableOptions) error
    RenameTable(ctx context.Context, oldName, newName string, opts RenameTableOptions) error
}

DynamicTable

Enable column management:

type DynamicTable interface {
    Table

    AddColumn(ctx context.Context, columnSchema *arrow.Schema, opts AddColumnOptions) error
    RemoveColumn(ctx context.Context, name string, opts RemoveColumnOptions) error
    RenameColumn(ctx context.Context, oldName, newName string, opts RenameColumnOptions) error
    ChangeColumnType(ctx context.Context, columnSchema *arrow.Schema, expression string, opts ChangeColumnTypeOptions) error
    SetNotNull(ctx context.Context, columnName string, opts SetNotNullOptions) error
    DropNotNull(ctx context.Context, columnName string, opts DropNotNullOptions) error
    SetDefault(ctx context.Context, columnName, expression string, opts SetDefaultOptions) error
}
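
As an example, an in-memory schema might manage tables like this sketch. memSchema is illustrative, memTable (not shown) would implement catalog.Table, the remaining Schema methods are omitted, and the error variables are those listed under Error Types below:

type memSchema struct {
    mu     sync.RWMutex
    name   string
    tables map[string]catalog.Table
}

func (s *memSchema) CreateTable(ctx context.Context, name string, schema *arrow.Schema, opts catalog.CreateTableOptions) (catalog.Table, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if _, ok := s.tables[name]; ok {
        return nil, airport.ErrAlreadyExists
    }
    t := &memTable{name: name, schema: schema}
    s.tables[name] = t
    return t, nil
}

func (s *memSchema) DropTable(ctx context.Context, name string, opts catalog.DropTableOptions) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if _, ok := s.tables[name]; !ok {
        return airport.ErrNotFound
    }
    delete(s.tables, name)
    return nil
}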

DDL Usage Example

-- Schema operations
CREATE SCHEMA my_server.analytics;
DROP SCHEMA my_server.analytics;

-- Table operations
CREATE TABLE my_server.demo.events (id INTEGER, name VARCHAR);
ALTER TABLE my_server.demo.events ADD COLUMN timestamp TIMESTAMP;
ALTER TABLE my_server.demo.events RENAME COLUMN name TO event_name;
DROP TABLE my_server.demo.events;

-- CREATE TABLE AS SELECT
CREATE TABLE my_server.demo.backup AS SELECT * FROM my_server.demo.users;

Scalar Functions

Scalar functions process input batches and return result arrays:

type ScalarFunction interface {
    Name() string
    Comment() string
    Signature() FunctionSignature

    // Execute runs the function on input record batch.
    Execute(ctx context.Context, input arrow.RecordBatch) (arrow.Array, error)
}

Example Scalar Function

type multiplyFunc struct{}

func (f *multiplyFunc) Name() string    { return "MULTIPLY" }
func (f *multiplyFunc) Comment() string { return "Multiplies input by factor" }

func (f *multiplyFunc) Signature() catalog.FunctionSignature {
    return catalog.FunctionSignature{
        Parameters: []arrow.DataType{
            arrow.PrimitiveTypes.Int64, // input value
            arrow.PrimitiveTypes.Int64, // multiplication factor
        },
        ReturnType: arrow.PrimitiveTypes.Int64,
    }
}

func (f *multiplyFunc) Execute(_ context.Context, input arrow.RecordBatch) (arrow.Array, error) {
    valueCol := input.Column(0).(*array.Int64)
    factorCol := input.Column(1).(*array.Int64)

    builder := array.NewInt64Builder(memory.DefaultAllocator)
    defer builder.Release()

    // Null handling is omitted for brevity.
    for i := 0; i < valueCol.Len(); i++ {
        builder.Append(valueCol.Value(i) * factorCol.Value(i))
    }

    return builder.NewInt64Array(), nil
}

Usage:

SELECT MULTIPLY(value, 10) FROM my_server.demo.users;

Table Functions

Table functions return result sets with dynamic schemas:

type TableFunction interface {
    Name() string
    Comment() string
    Signature() FunctionSignature

    // SchemaForParameters returns output schema based on parameters.
    SchemaForParameters(ctx context.Context, params []any) (*arrow.Schema, error)

    // Execute runs the table function.
    Execute(ctx context.Context, params []any, opts *ScanOptions) (array.RecordReader, error)
}

Example Table Function

type generateSeriesFunc struct{}

func (f *generateSeriesFunc) Name() string    { return "GENERATE_SERIES" }
func (f *generateSeriesFunc) Comment() string { return "Generates integer series" }

func (f *generateSeriesFunc) Signature() catalog.FunctionSignature {
    return catalog.FunctionSignature{
        Parameters: []arrow.DataType{
            arrow.PrimitiveTypes.Int64, // start
            arrow.PrimitiveTypes.Int64, // stop
            arrow.PrimitiveTypes.Int64, // step (optional)
        },
        ReturnType: nil, // Table function
    }
}

func (f *generateSeriesFunc) SchemaForParameters(_ context.Context, _ []any) (*arrow.Schema, error) {
    return arrow.NewSchema([]arrow.Field{
        {Name: "value", Type: arrow.PrimitiveTypes.Int64},
    }, nil), nil
}

func (f *generateSeriesFunc) Execute(_ context.Context, params []any, opts *catalog.ScanOptions) (array.RecordReader, error) {
    start := params[0].(int64)
    stop := params[1].(int64)
    step := int64(1)
    if len(params) > 2 && params[2] != nil {
        step = params[2].(int64)
    }

    // Build the whole series into a single record batch.
    schema := arrow.NewSchema([]arrow.Field{{Name: "value", Type: arrow.PrimitiveTypes.Int64}}, nil)
    builder := array.NewRecordBuilder(memory.DefaultAllocator, schema)
    defer builder.Release()
    for v := start; v <= stop; v += step {
        builder.Field(0).(*array.Int64Builder).Append(v)
    }
    record := builder.NewRecord()
    defer record.Release()
    return array.NewRecordReader(schema, []arrow.Record{record})
}

Usage:

SELECT * FROM my_server.demo.GENERATE_SERIES(1, 10, 2);

Table Functions with Row Input (In/Out)

Table functions that accept row sets as input:

type TableFunctionInOut interface {
    Name() string
    Comment() string
    Signature() FunctionSignature

    // SchemaForParameters returns output schema based on params and input schema.
    SchemaForParameters(ctx context.Context, params []any, inputSchema *arrow.Schema) (*arrow.Schema, error)

    // Execute processes input rows and returns output rows.
    Execute(ctx context.Context, params []any, input array.RecordReader, opts *ScanOptions) (array.RecordReader, error)
}
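
A hedged sketch of an in/out function that emits one greeting per input row. greetFunc is illustrative, and it assumes the first input column is a string name column:

type greetFunc struct{}

func (f *greetFunc) Name() string    { return "GREET" }
func (f *greetFunc) Comment() string { return "Adds a greeting for each input row" }

func (f *greetFunc) Signature() catalog.FunctionSignature {
    return catalog.FunctionSignature{} // no scalar parameters
}

func (f *greetFunc) SchemaForParameters(_ context.Context, _ []any, _ *arrow.Schema) (*arrow.Schema, error) {
    return arrow.NewSchema([]arrow.Field{
        {Name: "greeting", Type: arrow.BinaryTypes.String},
    }, nil), nil
}

func (f *greetFunc) Execute(ctx context.Context, params []any, input array.RecordReader, opts *catalog.ScanOptions) (array.RecordReader, error) {
    outSchema, _ := f.SchemaForParameters(ctx, params, input.Schema())
    builder := array.NewRecordBuilder(memory.DefaultAllocator, outSchema)
    defer builder.Release()

    for input.Next() {
        names := input.Record().Column(0).(*array.String)
        for i := 0; i < names.Len(); i++ {
            builder.Field(0).(*array.StringBuilder).Append("Hello, " + names.Value(i))
        }
    }
    if err := input.Err(); err != nil {
        return nil, err
    }

    record := builder.NewRecord()
    defer record.Release()
    return array.NewRecordReader(outSchema, []arrow.Record{record})
}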

Authentication

Bearer Token Authentication

auth := airport.BearerAuth(func(token string) (string, error) {
    if token == "secret-api-key" {
        return "user-identity", nil
    }
    return "", airport.ErrUnauthorized
})

config := airport.ServerConfig{
    Catalog: cat,
    Auth:    auth,
}

// IMPORTANT: Use ServerOptions when creating gRPC server
opts := airport.ServerOptions(config)
grpcServer := grpc.NewServer(opts...)

Accessing Identity in Handlers

func (t *MyTable) Scan(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
    identity := airport.IdentityFromContext(ctx)
    if identity != "" {
        // User is authenticated
    }
    // ...
}

DuckDB Authentication

-- Using persistent secret
CREATE PERSISTENT SECRET my_auth (
    TYPE airport,
    auth_token 'secret-api-key',
    scope 'grpc://localhost:50051'
);

ATTACH '' AS my_server (TYPE AIRPORT, LOCATION 'grpc://localhost:50051');

-- Or inline with headers
SELECT * FROM airport_take_flight(
    'grpc://localhost:50051',
    'SELECT * FROM demo.users',
    headers := MAP{'authorization': 'secret-api-key'}
);

Filter Pushdown (Predicate Pushdown)

DuckDB can push WHERE clause predicates to the server via ScanOptions.Filter. The filter is a JSON-serialized expression tree.

func (t *MyTable) Scan(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
    if opts.Filter != nil {
        // Parse the JSON filter
        var filterExpr struct {
            Filters            []json.RawMessage `json:"filters"`
            ColumnBindingNames []string          `json:"column_binding_names_by_index"`
        }
        if err := json.Unmarshal(opts.Filter, &filterExpr); err != nil {
            return t.scanAll(ctx)
        }

        // Interpret and apply filter to your data source
    }

    return t.scanAll(ctx)
}

Filter Expression Types

Expression Class    Description
BOUND_COMPARISON    Comparison operators (=, >, <, >=, <=, !=)
BOUND_COLUMN_REF    Column references
BOUND_CONSTANT      Literal values
BOUND_CONJUNCTION   Logical AND/OR operators
BOUND_FUNCTION      Function calls

For detailed format specification, see the Airport Extension documentation.
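
As an illustration only, a server might handle plain comparisons and fall back to an unfiltered scan for anything else, letting DuckDB filter the returned rows. The JSON field names below are assumptions based on DuckDB's serialized expression format; verify them against the Airport Extension documentation:

type boundExpr struct {
    ExpressionClass string `json:"expression_class"` // assumed field name
    Type            string `json:"type"`             // e.g. COMPARE_EQUAL (assumed)
}

func canPushdown(filters []json.RawMessage) bool {
    for _, raw := range filters {
        var expr boundExpr
        if err := json.Unmarshal(raw, &expr); err != nil {
            return false // unknown shape: fall back to a full scan
        }
        if expr.ExpressionClass != "BOUND_COMPARISON" {
            return false // only plain comparisons handled in this sketch
        }
    }
    return true
}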

Time Travel Queries

Support point-in-time queries by handling TimePoint in scan options:

type TimePoint struct {
    Unit  string // "timestamp", "version", or "snapshot"
    Value string // Time value in appropriate format
}

func (t *MyTable) Scan(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
    if opts.TimePoint != nil {
        // Query data at specific point in time
        switch opts.TimePoint.Unit {
        case "timestamp":
            // opts.TimePoint.Value = "2024-01-15T10:30:00Z"
        case "version":
            // opts.TimePoint.Value = "42"
        case "snapshot":
            // opts.TimePoint.Value = "abc123def"
        }
    }
    // ...
}

Transaction Support

Enable transaction coordination by implementing TransactionManager:

type TransactionManager interface {
    BeginTransaction(ctx context.Context) (txID string, err error)
    CommitTransaction(ctx context.Context, txID string) error
    RollbackTransaction(ctx context.Context, txID string) error
    GetTransactionStatus(ctx context.Context, txID string) (TransactionState, bool)
}
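
A minimal in-memory sketch follows. memTxManager is illustrative, and the zero value of catalog.TransactionState stands in for the package's real state constants:

type memTxManager struct {
    mu   sync.Mutex
    next int
    txs  map[string]catalog.TransactionState
}

func (m *memTxManager) BeginTransaction(ctx context.Context) (string, error) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.next++
    txID := fmt.Sprintf("tx-%d", m.next)
    var state catalog.TransactionState // zero value as a stand-in
    m.txs[txID] = state
    return txID, nil
}

func (m *memTxManager) CommitTransaction(ctx context.Context, txID string) error {
    m.mu.Lock()
    defer m.mu.Unlock()
    if _, ok := m.txs[txID]; !ok {
        return airport.ErrNotFound
    }
    delete(m.txs, txID) // a real implementation applies buffered changes here
    return nil
}

func (m *memTxManager) RollbackTransaction(ctx context.Context, txID string) error {
    m.mu.Lock()
    defer m.mu.Unlock()
    delete(m.txs, txID) // discard buffered changes
    return nil
}

func (m *memTxManager) GetTransactionStatus(ctx context.Context, txID string) (catalog.TransactionState, bool) {
    m.mu.Lock()
    defer m.mu.Unlock()
    state, ok := m.txs[txID]
    return state, ok
}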

Configure with server:

config := airport.ServerConfig{
    Catalog:            cat,
    TransactionManager: myTxManager,
}

Usage in DuckDB:

BEGIN TRANSACTION;
INSERT INTO my_server.demo.users (id, name) VALUES (100, 'TxUser');
ROLLBACK;  -- Changes are discarded

Column Statistics

Provide column statistics for query optimization:

type StatisticsTable interface {
    Table

    ColumnStatistics(ctx context.Context, columnName string, columnType string) (*ColumnStats, error)
}

type ColumnStats struct {
    HasNotNull      *bool
    HasNull         *bool
    DistinctCount   *uint64
    Min             any
    Max             any
    MaxStringLength *uint64
    ContainsUnicode *bool
}
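
For example, continuing the usersTable sketch from earlier, a table with known id bounds might report the following (whether nil stats are acceptable for untracked columns is an assumption):

func (t *usersTable) ColumnStatistics(ctx context.Context, columnName, columnType string) (*catalog.ColumnStats, error) {
    if columnName != "id" {
        return nil, nil // no statistics tracked for other columns (assumed convention)
    }
    hasNull := false
    distinct := uint64(3)
    return &catalog.ColumnStats{
        HasNull:       &hasNull,
        DistinctCount: &distinct,
        Min:           int64(1),
        Max:           int64(3),
    }, nil
}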

Catalog Versioning

Enable schema caching with version tracking:

type VersionedCatalog interface {
    Catalog

    CatalogVersion(ctx context.Context) (CatalogVersion, error)
}

type CatalogVersion struct {
    Version uint64 // Current version number
    IsFixed bool   // If true, version is fixed for session
}
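
One pattern, sketched here assuming these types live in the catalog package like the other interfaces, is an atomic counter bumped on every DDL change so clients invalidate cached schemas:

type versionedCatalog struct {
    catalog.Catalog // embed an existing catalog implementation
    version atomic.Uint64
}

func (c *versionedCatalog) CatalogVersion(ctx context.Context) (catalog.CatalogVersion, error) {
    return catalog.CatalogVersion{Version: c.version.Load()}, nil
}

// bump is called after every successful CREATE/DROP/ALTER.
func (c *versionedCatalog) bump() { c.version.Add(1) }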

Dynamic Catalogs

For catalogs that change at runtime, implement the Catalog interface directly:

// runtimeCatalog implements catalog.Catalog over mutable state.
// (It is named runtimeCatalog here to avoid confusion with the
// catalog.DynamicCatalog DDL interface above.)
type runtimeCatalog struct {
    mu      sync.RWMutex
    schemas map[string]*runtimeSchema // runtimeSchema: your catalog.Schema implementation
}

func (c *runtimeCatalog) Schemas(ctx context.Context) ([]catalog.Schema, error) {
    c.mu.RLock()
    defer c.mu.RUnlock()

    // Filter based on user permissions
    identity := airport.IdentityFromContext(ctx)

    var result []catalog.Schema
    for _, schema := range c.schemas {
        if schema.canAccess(identity) {
            result = append(result, schema)
        }
    }
    return result, nil
}

func (c *runtimeCatalog) Schema(ctx context.Context, name string) (catalog.Schema, error) {
    c.mu.RLock()
    defer c.mu.RUnlock()

    schema, ok := c.schemas[name]
    if !ok {
        return nil, nil // Not found
    }
    return schema, nil
}

Performance Tips

Batch Sizing

Optimal batch size: 10,000-100,000 rows per batch.

// Good: many rows per batch (schema and buildBatch stand in for your
// own schema definition and batch construction)
func scanLarge(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
    records := make([]arrow.Record, 0, 10)
    for i := 0; i < 10; i++ {
        records = append(records, buildBatch(10000)) // 10k rows per batch
    }
    return array.NewRecordReader(schema, records)
}

Streaming Large Datasets

Don’t load entire results into memory:

func streamScan(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
    rows, err := db.QueryContext(ctx, "SELECT * FROM large_table")
    if err != nil {
        return nil, err
    }

    // Create a streaming reader that builds batches on demand
    // (NewStreamingReader stands in for your own implementation).
    return NewStreamingReader(rows, batchSize), nil
}

Context Cancellation

Respect client cancellations:

func scanWithCancellation(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
    // rows comes from your data source, e.g. db.QueryContext(ctx, ...)
    for rows.Next() {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        default:
        }
        // Process row...
    }
    // Build and return a RecordReader from the processed rows...
}

Memory Management

Release Arrow objects to avoid memory leaks:

func buildRecord(schema *arrow.Schema) arrow.Record {
    builder := array.NewRecordBuilder(memory.DefaultAllocator, schema)
    defer builder.Release()  // Always release builders

    // Build record...
    return builder.NewRecord()
}

gRPC Message Size

Configure larger message sizes for big Arrow batches:

config := airport.ServerConfig{
    Catalog:        cat,
    MaxMessageSize: 16 * 1024 * 1024, // 16MB (default is 4MB)
}

Utility Functions

catalog.ProjectSchema

Projects an Arrow schema to include only specified columns:

projected := catalog.ProjectSchema(fullSchema, []string{"id", "name"})

Context Helpers

// Get identity from context
identity := airport.IdentityFromContext(ctx)

// Get transaction ID from context
txID, ok := catalog.TransactionIDFromContext(ctx)

// Add transaction ID to context
ctx = catalog.WithTransactionID(ctx, txID)

Error Types

var (
    ErrUnauthorized   = errors.New("unauthorized")
    ErrNotFound       = errors.New("not found")
    ErrAlreadyExists  = errors.New("already exists")
    ErrSchemaNotEmpty = errors.New("schema contains tables")
    ErrNotImplemented = errors.New("not implemented")
)

Thread Safety

All interface implementations must be safe for concurrent use:

  • Multiple goroutines may call Scan simultaneously
  • Schema/Table discovery may happen during scans
  • DDL operations may occur concurrently with queries

Use appropriate synchronization in your implementations.
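
For example, a mutable in-memory table can snapshot its record list under a read lock so scans never race with concurrent writes (a sketch):

type safeTable struct {
    mu     sync.RWMutex
    schema *arrow.Schema
    data   []arrow.Record
}

func (t *safeTable) Scan(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
    t.mu.RLock()
    records := append([]arrow.Record(nil), t.data...) // snapshot under read lock
    t.mu.RUnlock()
    return array.NewRecordReader(t.schema, records)
}

func (t *safeTable) appendRecord(rec arrow.Record) {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.data = append(t.data, rec)
}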

Known Limitations

  • Reserved schema name: The schema name main cannot be used in Airport-attached servers. This is a limitation of the current Airport DuckDB extension: while DuckDB itself handles the main schema normally, Airport-attached catalogs cannot expose a schema named main. Choose a different name (e.g., demo, app, or public).

Examples

The airport-go repository includes several examples:

Example      Description
basic        Simple server with in-memory data
auth         Bearer token authentication
ddl          DDL operations (CREATE/DROP/ALTER)
dml          DML operations with transactions
dynamic      Dynamic catalog with live schema updates
functions    Scalar and table functions
tls          TLS/SSL configuration
timetravel   Time travel queries
