Implement an Airport-compatible server in Go

A guide to implementing an Airport-compatible Arrow Flight server using the airport-go package.

This guide shows how to build an Airport-compatible Arrow Flight server with the airport-go package. The package implements the Airport protocol on top of Arrow Flight, so a server built with it can be attached to and queried from DuckDB through the Airport extension.

Prerequisites

Before you begin, ensure you have the following prerequisites:

  • Go programming language installed (version 1.25 or later).
  • Basic understanding of Apache Arrow, Arrow Flight and gRPC.

Setting Up the Airport-compatible Arrow Flight Server

  1. Create a new Go project:

    mkdir airport-server
    cd airport-server
    go mod init airport-server
  2. Install the airport-go package:

    Run the following command inside the module directory so that the dependency is recorded in go.mod:

    go get github.com/hugr-lab/airport-go@latest
  3. Implement a basic server:

    Create a main.go file with the following content:

    package main
    
    import (
        "context"
        "log"
        "net"
    
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/arrow-go/v18/arrow/memory"
        "google.golang.org/grpc"
    
        "github.com/hugr-lab/airport-go"
        "github.com/hugr-lab/airport-go/catalog"
    )
    
    func main() {
        // Define schema and scan function
        userSchema := arrow.NewSchema([]arrow.Field{
            {Name: "id", Type: arrow.PrimitiveTypes.Int64},
            {Name: "name", Type: arrow.BinaryTypes.String},
        }, nil)
    
        scanUsers := func(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
            builder := array.NewRecordBuilder(memory.DefaultAllocator, userSchema)
            defer builder.Release()
    
            builder.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
            builder.Field(1).(*array.StringBuilder).AppendValues([]string{"Alice", "Bob", "Charlie"}, nil)
    
            record := builder.NewRecord()
            defer record.Release()
    
            return array.NewRecordReader(userSchema, []arrow.Record{record})
        }
    
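        // Build catalog
        cat, err := airport.NewCatalogBuilder().
            Schema("demo").
            SimpleTable(airport.SimpleTableDef{
                Name:     "users",
                Comment:  "User accounts",
                Schema:   userSchema,
                ScanFunc: scanUsers,
            }).
            Build()
        if err != nil {
            log.Fatalf("build catalog: %v", err)
        }
    
        // Register the Airport service on a gRPC server
        grpcServer := grpc.NewServer()
        if err := airport.NewServer(grpcServer, airport.ServerConfig{Catalog: cat}); err != nil {
            log.Fatalf("register Airport service: %v", err)
        }
    
        // Start server
        lis, err := net.Listen("tcp", ":50051")
        if err != nil {
            log.Fatalf("listen: %v", err)
        }
        log.Println("Airport server listening on :50051")
        if err := grpcServer.Serve(lis); err != nil {
            log.Fatalf("serve: %v", err)
        }
    }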
  4. Run the server:

    go run main.go

Connecting from DuckDB

Once your server is running, connect to it from DuckDB:

-- Install and load Airport extension
INSTALL airport FROM community;
LOAD airport;

-- Connect to your Flight server
ATTACH '' AS my_server (TYPE AIRPORT, LOCATION 'grpc://localhost:50051');

-- Query the users table
SELECT * FROM my_server.demo.users;

Expected output:

┌───────┬─────────┐
│  id   │  name   │
│ int64 │ varchar │
├───────┼─────────┤
│     1 │ Alice   │
│     2 │ Bob     │
│     3 │ Charlie │
└───────┴─────────┘

Package Overview

The airport-go package is organized into several subpackages:

Package          Description
airport          Main package: server creation, catalog builder, authentication helpers
airport/catalog  Core interfaces: Catalog, Schema, Table, Functions, DML/DDL options
airport/auth     Authentication implementations (bearer token)
airport/filter   Filter pushdown parsing and SQL encoding
airport/flight   Internal Flight handler implementation

Server Configuration

The ServerConfig struct configures the Airport server:

type ServerConfig struct {
    // Catalog is the catalog implementation (required)
    Catalog catalog.Catalog

    // Auth is the authentication handler (optional)
    Auth Authenticator

    // Address is the server address for FlightEndpoint locations
    Address string

    // MaxMessageSize sets the maximum gRPC message size (default: 4MB)
    MaxMessageSize int

    // LogLevel sets the logging verbosity (default: Info)
    LogLevel *slog.Level

    // TransactionManager enables transaction support (optional)
    TransactionManager catalog.TransactionManager
}

Creating a Server

// Create gRPC server with Airport options
config := airport.ServerConfig{
    Catalog:        myCatalog,
    Auth:           myAuth,
    MaxMessageSize: 16 * 1024 * 1024, // 16MB
}

opts := airport.ServerOptions(config)
grpcServer := grpc.NewServer(opts...)

// Register Airport service
err := airport.NewServer(grpcServer, config)
if err != nil {
    log.Fatal(err)
}

// Start serving
lis, err := net.Listen("tcp", ":50051")
if err != nil {
    log.Fatal(err)
}
if err := grpcServer.Serve(lis); err != nil {
    log.Fatal(err)
}

Catalog Builder API

The fluent builder API provides the easiest way to create static catalogs:

cat, err := airport.NewCatalogBuilder(). // named cat to avoid shadowing the catalog package
    Schema("my_schema").
    Comment("Schema description").
    SimpleTable(airport.SimpleTableDef{
        Name:     "my_table",
        Comment:  "Table description",
        Schema:   arrowSchema,
        ScanFunc: scanFunction,
    }).
    Table(customTableImpl).        // Add custom Table implementation
    ScalarFunc(scalarFunction).    // Add scalar function
    TableFunc(tableFunction).      // Add table function
    Schema("another_schema").      // Start new schema
    // ... add more tables
    Build()

SimpleTableDef

For simple tables with a scan function:

type SimpleTableDef struct {
    Name     string
    Comment  string
    Schema   *arrow.Schema
    ScanFunc func(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error)
}

Core Interfaces

catalog.Catalog

The root interface for exposing data:

type Catalog interface {
    // Schemas returns all schemas in this catalog.
    Schemas(ctx context.Context) ([]Schema, error)

    // Schema returns a specific schema by name.
    // Returns (nil, nil) if schema doesn't exist.
    Schema(ctx context.Context, name string) (Schema, error)
}

catalog.Schema

Represents a namespace containing tables and functions:

type Schema interface {
    Name() string
    Comment() string
    Tables(ctx context.Context) ([]Table, error)
    Table(ctx context.Context, name string) (Table, error)
    ScalarFunctions(ctx context.Context) ([]ScalarFunction, error)
    TableFunctions(ctx context.Context) ([]TableFunction, error)
    TableFunctionsInOut(ctx context.Context) ([]TableFunctionInOut, error)
}

catalog.Table

Represents a scannable table:

type Table interface {
    Name() string
    Comment() string

    // ArrowSchema returns the table's Arrow schema.
    // If columns is non-nil, returns a projected schema.
    ArrowSchema(columns []string) *arrow.Schema

    // Scan returns a RecordReader for reading table data.
    Scan(ctx context.Context, opts *ScanOptions) (array.RecordReader, error)
}

catalog.ScanOptions

Options passed to Table.Scan:

type ScanOptions struct {
    Columns   []string   // Columns requested (nil = all)
    Filter    []byte     // JSON-serialized predicate expression
    Limit     int64      // Maximum rows to return
    BatchSize int        // Hint for batch size
    TimePoint *TimePoint // Point-in-time for time-travel queries
}
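
The Columns and Limit fields can often be applied before any Arrow batches are built. The following is a minimal sketch of that idea using plain Go types only. The names scanOptions, row, and applyScanOptions are hypothetical stand-ins, not part of airport-go, and the sketch assumes that a Limit of zero or less means "no limit".

```go
package main

// scanOptions mirrors only the two ScanOptions fields used here (stand-in type).
type scanOptions struct {
	Columns []string // nil = all columns
	Limit   int64    // assumption: <= 0 means "no limit"
}

// row is a hypothetical in-memory row keyed by column name.
type row map[string]any

// applyScanOptions returns at most Limit rows, each restricted to the
// requested columns. Unknown column names are silently skipped.
func applyScanOptions(rows []row, opts *scanOptions) []row {
	if opts == nil {
		return rows
	}
	if opts.Limit > 0 && int64(len(rows)) > opts.Limit {
		rows = rows[:opts.Limit]
	}
	if opts.Columns == nil {
		return rows
	}
	out := make([]row, 0, len(rows))
	for _, r := range rows {
		projected := make(row, len(opts.Columns))
		for _, name := range opts.Columns {
			if v, ok := r[name]; ok {
				projected[name] = v
			}
		}
		out = append(out, projected)
	}
	return out
}
```

In a real Scan implementation the same two checks would typically be translated into the backend query (a column list and a LIMIT clause) rather than applied to rows already in memory.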

DML Operations (INSERT, UPDATE, DELETE)

InsertableTable

Enable INSERT operations:

type InsertableTable interface {
    Table

    // Insert adds new rows to the table.
    Insert(ctx context.Context, rows array.RecordReader, opts *DMLOptions) (*DMLResult, error)
}

UpdatableTable

Enable UPDATE operations:

type UpdatableTable interface {
    Table

    // Update modifies existing rows identified by rowIDs.
    Update(ctx context.Context, rowIDs []int64, rows array.RecordReader, opts *DMLOptions) (*DMLResult, error)
}

DeletableTable

Enable DELETE operations:

type DeletableTable interface {
    Table

    // Delete removes rows identified by rowIDs.
    Delete(ctx context.Context, rowIDs []int64, opts *DMLOptions) (*DMLResult, error)
}

UpdatableBatchTable

Alternative UPDATE interface where rowid is embedded in the RecordBatch (preferred over UpdatableTable):

type UpdatableBatchTable interface {
    Table

    // Update modifies existing rows using data from the RecordBatch.
    // The rows RecordBatch contains both the rowid column and new values.
    // Use FindRowIDColumn(rows.Schema()) to locate the rowid column.
    // Implementations MUST return ErrNullRowID if any rowid value is null.
    Update(ctx context.Context, rows arrow.RecordBatch, opts *DMLOptions) (*DMLResult, error)
}

DeletableBatchTable

Alternative DELETE interface where rowid is embedded in the RecordBatch (preferred over DeletableTable):

type DeletableBatchTable interface {
    Table

    // Delete removes rows identified by rowid values in the RecordBatch.
    // Use FindRowIDColumn(rows.Schema()) to locate the rowid column.
    // Implementations MUST return ErrNullRowID if any rowid value is null.
    Delete(ctx context.Context, rows arrow.RecordBatch, opts *DMLOptions) (*DMLResult, error)
}

ErrNullRowID

Sentinel error for null rowid validation in batch operations:

var ErrNullRowID = errors.New("null rowid value not allowed")

FindRowIDColumn

Helper function to locate the rowid column in a schema:

// FindRowIDColumn returns the index of the rowid column in the schema.
// Returns -1 if no rowid column is found.
// Rowid column is identified by name "rowid" or metadata key "is_rowid".
func FindRowIDColumn(schema *arrow.Schema) int
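
To make the lookup rule and the null check concrete, here is a small sketch written against plain Go types instead of Arrow. The field type, findRowIDColumn, validateRowIDs, and errNullRowID are hypothetical stand-ins for illustration; the real helper operates on *arrow.Schema, and the sketch assumes that the first matching field wins.

```go
package main

import "errors"

// errNullRowID stands in for the ErrNullRowID sentinel described above.
var errNullRowID = errors.New("null rowid value not allowed")

// field is a stand-in for an Arrow field: a name plus string metadata.
type field struct {
	Name     string
	Metadata map[string]string
}

// findRowIDColumn returns the index of the first field that is named "rowid"
// or carries the "is_rowid" metadata key, or -1 if there is none.
func findRowIDColumn(fields []field) int {
	for i, f := range fields {
		if f.Name == "rowid" {
			return i
		}
		if _, ok := f.Metadata["is_rowid"]; ok {
			return i
		}
	}
	return -1
}

// validateRowIDs rejects a batch that contains a NULL rowid.
// A nil pointer represents NULL in this sketch.
func validateRowIDs(ids []*int64) error {
	for _, id := range ids {
		if id == nil {
			return errNullRowID
		}
	}
	return nil
}
```

A batch Update or Delete implementation would run both steps first: locate the rowid column, then fail fast with the sentinel error before touching any data.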

RowID Support

For UPDATE and DELETE operations, tables must include a rowid pseudocolumn with special metadata:

rowidMeta := arrow.NewMetadata([]string{"is_rowid"}, []string{"true"})
schema := arrow.NewSchema([]arrow.Field{
    {Name: "rowid", Type: arrow.PrimitiveTypes.Int64, Nullable: false, Metadata: rowidMeta},
    {Name: "id", Type: arrow.PrimitiveTypes.Int64},
    {Name: "name", Type: arrow.BinaryTypes.String},
}, nil)

DML Usage Example

-- INSERT
INSERT INTO my_server.demo.users (id, name) VALUES (1, 'Alice');

-- UPDATE
UPDATE my_server.demo.users SET name = 'Alicia' WHERE id = 1;

-- DELETE
DELETE FROM my_server.demo.users WHERE id = 1;

-- RETURNING clause
INSERT INTO my_server.demo.users (id, name) VALUES (2, 'Bob') RETURNING *;

DDL Operations (CREATE, DROP, ALTER)

DynamicCatalog

Enable schema management:

type DynamicCatalog interface {
    Catalog

    CreateSchema(ctx context.Context, name string, opts CreateSchemaOptions) (Schema, error)
    DropSchema(ctx context.Context, name string, opts DropSchemaOptions) error
}

DynamicSchema

Enable table management:

type DynamicSchema interface {
    Schema

    CreateTable(ctx context.Context, name string, schema *arrow.Schema, opts CreateTableOptions) (Table, error)
    DropTable(ctx context.Context, name string, opts DropTableOptions) error
    RenameTable(ctx context.Context, oldName, newName string, opts RenameTableOptions) error
}

DynamicTable

Enable column management:

type DynamicTable interface {
    Table

    AddColumn(ctx context.Context, columnSchema *arrow.Schema, opts AddColumnOptions) error
    RemoveColumn(ctx context.Context, name string, opts RemoveColumnOptions) error
    RenameColumn(ctx context.Context, oldName, newName string, opts RenameColumnOptions) error
    ChangeColumnType(ctx context.Context, columnSchema *arrow.Schema, expression string, opts ChangeColumnTypeOptions) error
    SetNotNull(ctx context.Context, columnName string, opts SetNotNullOptions) error
    DropNotNull(ctx context.Context, columnName string, opts DropNotNullOptions) error
    SetDefault(ctx context.Context, columnName, expression string, opts SetDefaultOptions) error

    // Struct field operations
    AddField(ctx context.Context, columnSchema *arrow.Schema, opts AddFieldOptions) error
    RenameField(ctx context.Context, columnPath []string, newName string, opts RenameFieldOptions) error
    RemoveField(ctx context.Context, columnPath []string, opts RemoveFieldOptions) error
}

DDL Usage Example

-- Schema operations
CREATE SCHEMA my_server.analytics;
DROP SCHEMA my_server.analytics;

-- Table operations
CREATE TABLE my_server.demo.events (id INTEGER, name VARCHAR);
ALTER TABLE my_server.demo.events ADD COLUMN timestamp TIMESTAMP;
ALTER TABLE my_server.demo.events RENAME COLUMN name TO event_name;
DROP TABLE my_server.demo.events;

-- CREATE TABLE AS SELECT
CREATE TABLE my_server.demo.backup AS SELECT * FROM my_server.demo.users;

Scalar Functions

Scalar functions process input batches and return result arrays:

type ScalarFunction interface {
    Name() string
    Comment() string
    Signature() FunctionSignature

    // Execute runs the function on input record batch.
    Execute(ctx context.Context, input arrow.RecordBatch) (arrow.Array, error)
}

Example Scalar Function

type multiplyFunc struct{}

func (f *multiplyFunc) Name() string    { return "MULTIPLY" }
func (f *multiplyFunc) Comment() string { return "Multiplies input by factor" }

func (f *multiplyFunc) Signature() catalog.FunctionSignature {
    return catalog.FunctionSignature{
        Parameters: []arrow.DataType{
            arrow.PrimitiveTypes.Int64, // input value
            arrow.PrimitiveTypes.Int64, // multiplication factor
        },
        ReturnType: arrow.PrimitiveTypes.Int64,
    }
}

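func (f *multiplyFunc) Execute(_ context.Context, input arrow.RecordBatch) (arrow.Array, error) {
    // Checked assertions avoid a panic if the input types do not match the signature.
    valueCol, ok1 := input.Column(0).(*array.Int64)
    factorCol, ok2 := input.Column(1).(*array.Int64)
    if !ok1 || !ok2 {
        return nil, errors.New("MULTIPLY: expected two Int64 arguments")
    }

    builder := array.NewInt64Builder(memory.DefaultAllocator)
    defer builder.Release()

    for i := 0; i < valueCol.Len(); i++ {
        // Propagate NULL if either input is NULL.
        if valueCol.IsNull(i) || factorCol.IsNull(i) {
            builder.AppendNull()
            continue
        }
        builder.Append(valueCol.Value(i) * factorCol.Value(i))
    }

    return builder.NewInt64Array(), nil
}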

Usage:

SELECT MULTIPLY(id, 10) FROM my_server.demo.users;

Table Functions

Table functions return result sets with dynamic schemas:

type TableFunction interface {
    Name() string
    Comment() string
    Signature() FunctionSignature

    // SchemaForParameters returns output schema based on parameters.
    SchemaForParameters(ctx context.Context, params []any) (*arrow.Schema, error)

    // Execute runs the table function.
    Execute(ctx context.Context, params []any, opts *ScanOptions) (array.RecordReader, error)
}

Example Table Function

type generateSeriesFunc struct{}

func (f *generateSeriesFunc) Name() string    { return "GENERATE_SERIES" }
func (f *generateSeriesFunc) Comment() string { return "Generates integer series" }

func (f *generateSeriesFunc) Signature() catalog.FunctionSignature {
    return catalog.FunctionSignature{
        Parameters: []arrow.DataType{
            arrow.PrimitiveTypes.Int64, // start
            arrow.PrimitiveTypes.Int64, // stop
            arrow.PrimitiveTypes.Int64, // step (optional)
        },
        ReturnType: nil, // Table function
    }
}

func (f *generateSeriesFunc) SchemaForParameters(_ context.Context, _ []any) (*arrow.Schema, error) {
    return arrow.NewSchema([]arrow.Field{
        {Name: "value", Type: arrow.PrimitiveTypes.Int64},
    }, nil), nil
}

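func (f *generateSeriesFunc) Execute(_ context.Context, params []any, opts *catalog.ScanOptions) (array.RecordReader, error) {
    start := params[0].(int64)
    stop := params[1].(int64)
    step := int64(1)
    if len(params) > 2 && params[2] != nil { // step is optional
        step = params[2].(int64)
    }
    if step <= 0 {
        return nil, errors.New("GENERATE_SERIES: step must be positive")
    }

    // Build the series as a single record batch (assumption: stop is inclusive).
    schema := arrow.NewSchema([]arrow.Field{{Name: "value", Type: arrow.PrimitiveTypes.Int64}}, nil)
    builder := array.NewRecordBuilder(memory.DefaultAllocator, schema)
    defer builder.Release()
    for v := start; v <= stop; v += step {
        builder.Field(0).(*array.Int64Builder).Append(v)
    }
    record := builder.NewRecord()
    defer record.Release()
    return array.NewRecordReader(schema, []arrow.Record{record})
}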

Usage:

SELECT * FROM my_server.demo.GENERATE_SERIES(1, 10, 2);

Table Functions with Row Input (In/Out)

Table functions that accept row sets as input:

type TableFunctionInOut interface {
    Name() string
    Comment() string
    Signature() FunctionSignature

    // SchemaForParameters returns output schema based on params and input schema.
    SchemaForParameters(ctx context.Context, params []any, inputSchema *arrow.Schema) (*arrow.Schema, error)

    // Execute processes input rows and returns output rows.
    Execute(ctx context.Context, params []any, input array.RecordReader, opts *ScanOptions) (array.RecordReader, error)
}

Authentication

Bearer Token Authentication

auth := airport.BearerAuth(func(token string) (string, error) {
    if token == "secret-api-key" {
        return "user-identity", nil
    }
    return "", airport.ErrUnauthorized
})

config := airport.ServerConfig{
    Catalog: cat,
    Auth:    auth,
}

// IMPORTANT: Use ServerOptions when creating gRPC server
opts := airport.ServerOptions(config)
grpcServer := grpc.NewServer(opts...)
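
The validation callback is ordinary Go code, so it can look tokens up in any store. As a sketch only, the function below has the same shape as the callback passed to BearerAuth (token in, identity or error out) and compares tokens in constant time. The names validateToken, tokens, and errUnauthorized are hypothetical; errUnauthorized stands in for airport.ErrUnauthorized.

```go
package main

import (
	"crypto/subtle"
	"errors"
)

// errUnauthorized stands in for airport.ErrUnauthorized.
var errUnauthorized = errors.New("unauthorized")

// tokens maps API tokens to identities (hypothetical in-memory store).
var tokens = map[string]string{
	"secret-api-key": "user-identity",
}

// validateToken returns the identity for a known token, or errUnauthorized.
// ConstantTimeCompare avoids leaking how many leading bytes of a guess matched.
func validateToken(token string) (string, error) {
	for known, identity := range tokens {
		if subtle.ConstantTimeCompare([]byte(token), []byte(known)) == 1 {
			return identity, nil
		}
	}
	return "", errUnauthorized
}
```

In a real deployment the token store would more likely be a database or an identity provider, and tokens would not be hard-coded in source.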

Accessing Identity in Handlers

func (t *MyTable) Scan(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
    identity := airport.IdentityFromContext(ctx)
    if identity != "" {
        // User is authenticated
    }
    // ...
}

DuckDB Authentication

-- Using persistent secret
CREATE PERSISTENT SECRET my_auth (
    TYPE airport,
    auth_token 'secret-api-key',
    scope 'grpc://localhost:50051'
);

ATTACH '' AS my_server (TYPE AIRPORT, LOCATION 'grpc://localhost:50051');

-- Or inline with headers
SELECT * FROM airport_take_flight(
    'grpc://localhost:50051',
    'SELECT * FROM demo.users',
    headers := MAP{'authorization': 'secret-api-key'}
);

Filter Pushdown (Predicate Pushdown)

DuckDB can push WHERE clause predicates to the server via ScanOptions.Filter. The filter package provides parsing and SQL encoding for these expressions.

Using the Filter Package

import "github.com/hugr-lab/airport-go/filter"

func (t *MyTable) Scan(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
    if opts.Filter != nil {
        // Parse filter JSON into strongly-typed structures
        fp, err := filter.Parse(opts.Filter)
        if err != nil {
            // Malformed JSON - fall back to unfiltered scan
            return t.scanAll(ctx)
        }

        // Encode to SQL WHERE clause
        enc := filter.NewDuckDBEncoder(nil)
        whereClause := enc.EncodeFilters(fp)

        if whereClause != "" {
            // scanWhere is a placeholder for your backend query, for example
            // "SELECT * FROM my_table WHERE " + whereClause
            return t.scanWhere(ctx, whereClause)
        }
    }

    return t.scanAll(ctx)
}

Column Mapping

Map DuckDB column names to backend storage names:

enc := filter.NewDuckDBEncoder(&filter.EncoderOptions{
    ColumnMapping: map[string]string{
        "user_id": "uid",           // user_id → uid
        "created": "created_at",    // created → created_at
    },
})

Column Expression Replacement

Replace column names with SQL expressions for computed columns:

enc := filter.NewDuckDBEncoder(&filter.EncoderOptions{
    ColumnExpressions: map[string]string{
        "full_name": "CONCAT(first_name, ' ', last_name)",
    },
})

Unsupported Expression Handling

The encoder gracefully handles unsupported expressions:

  • AND: Skips unsupported children, keeps others
  • OR: If any child is unsupported, skips entire OR expression
  • Returns empty string if all expressions are unsupported

Dropping a predicate can only widen the filter, so the server may return extra rows but never omits rows that match. This is safe because DuckDB re-applies the filters client-side as a fallback.
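
The AND/OR rules above can be illustrated with a tiny expression tree. This is a sketch of the pruning logic only, not the filter package's implementation: the expr type and the encode function are hypothetical, and a leaf with empty SQL text represents an unsupported expression.

```go
package main

import "strings"

// expr is a hypothetical, simplified filter node.
type expr struct {
	Op       string // "AND", "OR", or "" for a leaf
	SQL      string // leaf SQL text; empty means the leaf is unsupported
	Children []expr
}

// encode returns a SQL fragment, or "" when the node cannot be encoded.
func encode(e expr) string {
	if e.Op == "" {
		return e.SQL
	}
	parts := make([]string, 0, len(e.Children))
	for _, c := range e.Children {
		s := encode(c)
		if s == "" {
			if e.Op == "OR" {
				return "" // one unsupported branch invalidates the whole OR
			}
			continue // AND: dropping a conjunct only widens the filter
		}
		parts = append(parts, s)
	}
	if len(parts) == 0 {
		return ""
	}
	if len(parts) == 1 {
		return parts[0]
	}
	return "(" + strings.Join(parts, " "+e.Op+" ") + ")"
}
```

The asymmetry is the important part: removing a conjunct from an AND admits more rows, while removing a branch from an OR would wrongly exclude rows, so the whole OR has to be dropped instead.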

Supported Expression Types

Expression Type        Description
ComparisonExpression   Binary comparisons (=, <>, <, >, <=, >=, IN, NOT IN, BETWEEN)
ConjunctionExpression  AND/OR with multiple children
ConstantExpression     Literal values with type information
ColumnRefExpression    References to table columns
FunctionExpression     Function calls (LOWER, LENGTH, etc.)
CastExpression         Type casts (CAST, TRY_CAST)
BetweenExpression      BETWEEN lower AND upper
OperatorExpression     Unary operators (IS NULL, IS NOT NULL, NOT)
CaseExpression         CASE WHEN … THEN … ELSE … END

Supported Data Types

All DuckDB logical types are supported: BOOLEAN, TINYINT-BIGINT, UTINYINT-UBIGINT, HUGEINT, UHUGEINT, FLOAT, DOUBLE, DECIMAL, VARCHAR, BLOB, DATE, TIME, TIMESTAMP variants, INTERVAL, UUID, and complex types (LIST, STRUCT, MAP, ARRAY).

For detailed JSON format specification, see the Airport Extension documentation.

Time Travel Queries

Support point-in-time queries by handling TimePoint in scan options:

type TimePoint struct {
    Unit  string // "timestamp", "version", or "snapshot"
    Value string // Time value in appropriate format
}

func (t *MyTable) Scan(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
    if opts.TimePoint != nil {
        // Query data at specific point in time
        switch opts.TimePoint.Unit {
        case "timestamp":
            // opts.TimePoint.Value = "2024-01-15T10:30:00Z"
        case "version":
            // opts.TimePoint.Value = "42"
        case "snapshot":
            // opts.TimePoint.Value = "abc123def"
        }
    }
    // ...
}
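
Because Value arrives as a string, a Scan implementation usually has to parse it according to Unit before it can query the backend. The sketch below shows one way to do that with the standard library. The timePoint and resolved types and the resolveTimePoint function are hypothetical, and the RFC 3339 timestamp format is an assumption based on the example value in the comment above.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// timePoint mirrors the TimePoint struct shown above (stand-in type).
type timePoint struct {
	Unit  string
	Value string
}

// resolved holds the parsed form of a time point; only one field is set.
type resolved struct {
	At       time.Time
	Version  uint64
	Snapshot string
}

// resolveTimePoint parses Value according to Unit.
func resolveTimePoint(tp timePoint) (resolved, error) {
	switch tp.Unit {
	case "timestamp":
		// Assumption: values are RFC 3339, e.g. "2024-01-15T10:30:00Z".
		at, err := time.Parse(time.RFC3339, tp.Value)
		if err != nil {
			return resolved{}, fmt.Errorf("invalid timestamp %q: %w", tp.Value, err)
		}
		return resolved{At: at}, nil
	case "version":
		v, err := strconv.ParseUint(tp.Value, 10, 64)
		if err != nil {
			return resolved{}, fmt.Errorf("invalid version %q: %w", tp.Value, err)
		}
		return resolved{Version: v}, nil
	case "snapshot":
		return resolved{Snapshot: tp.Value}, nil
	default:
		return resolved{}, fmt.Errorf("unsupported time point unit %q", tp.Unit)
	}
}
```

Returning an error for an unknown unit, rather than silently scanning current data, keeps a time-travel query from returning results for the wrong point in time.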

Transaction Support

Enable transaction coordination by implementing TransactionManager:

type TransactionManager interface {
    BeginTransaction(ctx context.Context) (txID string, err error)
    CommitTransaction(ctx context.Context, txID string) error
    RollbackTransaction(ctx context.Context, txID string) error
    GetTransactionStatus(ctx context.Context, txID string) (TransactionState, bool)
}

Configure with server:

config := airport.ServerConfig{
    Catalog:            cat,
    TransactionManager: myTxManager,
}

Usage in DuckDB:

BEGIN TRANSACTION;
INSERT INTO my_server.demo.users (id, name) VALUES (100, 'TxUser');
ROLLBACK;  -- Changes are discarded
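
For experimentation, a transaction manager can be as simple as a mutex-protected map of transaction states. The sketch below follows the four-method shape of the interface above, but memTxManager, txState, and its three constants are hypothetical: the real catalog.TransactionState type and its values are not shown in this guide, so a local stand-in is used.

```go
package main

import (
	"context"
	"errors"
	"strconv"
	"sync"
)

// txState is a hypothetical stand-in for catalog.TransactionState.
type txState string

const (
	txActive     txState = "active"
	txCommitted  txState = "committed"
	txRolledBack txState = "rolled_back"
)

// memTxManager tracks transaction states in memory.
type memTxManager struct {
	mu     sync.Mutex
	nextID uint64
	states map[string]txState
}

func newMemTxManager() *memTxManager {
	return &memTxManager{states: make(map[string]txState)}
}

func (m *memTxManager) BeginTransaction(ctx context.Context) (string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.nextID++
	id := "tx-" + strconv.FormatUint(m.nextID, 10)
	m.states[id] = txActive
	return id, nil
}

// finish moves an active transaction to its final state.
func (m *memTxManager) finish(txID string, final txState) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.states[txID] != txActive {
		return errors.New("transaction is not active: " + txID)
	}
	m.states[txID] = final
	return nil
}

func (m *memTxManager) CommitTransaction(ctx context.Context, txID string) error {
	return m.finish(txID, txCommitted)
}

func (m *memTxManager) RollbackTransaction(ctx context.Context, txID string) error {
	return m.finish(txID, txRolledBack)
}

func (m *memTxManager) GetTransactionStatus(ctx context.Context, txID string) (txState, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	s, ok := m.states[txID]
	return s, ok
}
```

This only records state. A real implementation would also buffer or stage the writes made under each transaction ID so that a rollback can actually discard them.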

Column Statistics

Provide column statistics for query optimization:

type StatisticsTable interface {
    Table

    ColumnStatistics(ctx context.Context, columnName string, columnType string) (*ColumnStats, error)
}

type ColumnStats struct {
    HasNotNull      *bool
    HasNull         *bool
    DistinctCount   *uint64
    Min             any
    Max             any
    MaxStringLength *uint64
    ContainsUnicode *bool
}
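
As an illustration of how these fields might be filled in, the sketch below computes statistics for a nullable int64 column held in memory. The columnStats type copies a subset of the fields above, and int64Stats is a hypothetical helper. A nil pointer represents NULL, and Min and Max stay nil when the column has no non-null values.

```go
package main

// columnStats mirrors a subset of the ColumnStats fields shown above (stand-in type).
type columnStats struct {
	HasNotNull    *bool
	HasNull       *bool
	DistinctCount *uint64
	Min           any
	Max           any
}

// int64Stats computes statistics for a nullable int64 column.
func int64Stats(values []*int64) columnStats {
	var hasNull, hasNotNull bool
	var lo, hi int64
	distinct := make(map[int64]struct{})
	for _, v := range values {
		if v == nil {
			hasNull = true
			continue
		}
		if !hasNotNull || *v < lo {
			lo = *v
		}
		if !hasNotNull || *v > hi {
			hi = *v
		}
		hasNotNull = true
		distinct[*v] = struct{}{}
	}
	count := uint64(len(distinct))
	stats := columnStats{HasNotNull: &hasNotNull, HasNull: &hasNull, DistinctCount: &count}
	if hasNotNull {
		stats.Min, stats.Max = lo, hi
	}
	return stats
}
```

For large tables an exact distinct count is expensive, so a backend would more likely return a cached or approximate value, or leave the pointer nil when the statistic is unknown.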

Catalog Versioning

Enable schema caching with version tracking:

type VersionedCatalog interface {
    Catalog

    CatalogVersion(ctx context.Context) (CatalogVersion, error)
}

type CatalogVersion struct {
    Version uint64 // Current version number
    IsFixed bool   // If true, version is fixed for session
}
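
One simple way to maintain the version number is an atomic counter that is incremented after every successful schema change. The following is a sketch under that assumption. The versionTracker type and its methods are hypothetical helpers, and catalogVersion is a local copy of the struct above.

```go
package main

import "sync/atomic"

// catalogVersion mirrors the CatalogVersion struct shown above (stand-in type).
type catalogVersion struct {
	Version uint64
	IsFixed bool
}

// versionTracker is a hypothetical helper that a catalog could embed.
type versionTracker struct {
	v atomic.Uint64
}

// bump is meant to be called after every successful DDL change.
func (vt *versionTracker) bump() {
	vt.v.Add(1)
}

// current reports the version. IsFixed is false because this catalog can change.
func (vt *versionTracker) current() catalogVersion {
	return catalogVersion{Version: vt.v.Load(), IsFixed: false}
}
```

A catalog's CatalogVersion method could then return current(), while CreateTable, DropTable, and similar operations call bump() once they succeed.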

Dynamic Catalogs

For catalogs that change at runtime, implement the Catalog interface directly:

type DynamicCatalog struct {
    mu      sync.RWMutex
    schemas map[string]*DynamicSchema
}

func (c *DynamicCatalog) Schemas(ctx context.Context) ([]catalog.Schema, error) {
    c.mu.RLock()
    defer c.mu.RUnlock()

    // Filter based on user permissions
    identity := airport.IdentityFromContext(ctx)

    var result []catalog.Schema
    for _, schema := range c.schemas {
        if schema.canAccess(identity) {
            result = append(result, schema)
        }
    }
    return result, nil
}

func (c *DynamicCatalog) Schema(ctx context.Context, name string) (catalog.Schema, error) {
    c.mu.RLock()
    defer c.mu.RUnlock()

    schema, ok := c.schemas[name]
    if !ok {
        return nil, nil // Not found
    }
    return schema, nil
}

Performance Tips

Batch Sizing

Optimal batch size: 10,000-100,000 rows per batch.

// Good: Multiple rows per batch
func scanLarge(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
    records := make([]arrow.Record, 0, 10)
    for i := 0; i < 10; i++ {
        record := buildBatch(10000) // 10k rows per batch
        defer record.Release()      // the reader retains its own reference
        records = append(records, record)
    }
    return array.NewRecordReader(schema, records)
}
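
When the total row count is known, the batch boundaries can be computed up front. This small sketch shows the arithmetic. batchRanges is a hypothetical helper, not part of airport-go, and it returns half-open [start, end) index ranges.

```go
package main

// batchRanges splits n rows into half-open [start, end) ranges of at most
// batchSize rows each. It returns nil if either argument is not positive.
func batchRanges(n, batchSize int) [][2]int {
	if n <= 0 || batchSize <= 0 {
		return nil
	}
	ranges := make([][2]int, 0, (n+batchSize-1)/batchSize)
	for start := 0; start < n; start += batchSize {
		end := start + batchSize
		if end > n {
			end = n
		}
		ranges = append(ranges, [2]int{start, end})
	}
	return ranges
}
```

For example, 25,000 rows with a batch size of 10,000 yield three ranges, the last of which holds the remaining 5,000 rows.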

Streaming Large Datasets

Don’t load entire results into memory:

func streamScan(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
    rows, err := db.QueryContext(ctx, "SELECT * FROM large_table")
    if err != nil {
        return nil, err
    }

    // NewStreamingReader is a placeholder for your own RecordReader that builds batches on demand
    return NewStreamingReader(rows, batchSize), nil
}

Context Cancellation

Respect client cancellations:

func scanWithCancellation(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
    for rows.Next() {
        select {
        case <-ctx.Done():
            return nil, ctx.Err()
        default:
        }
        // Process row...
    }
}
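
Streaming and cancellation combine naturally in a pull-based producer that checks the context before building each batch. The sketch below models only the control flow and uses row counts in place of Arrow batches. The batchSource type and the demo function are hypothetical.

```go
package main

import "context"

// batchSource is a hypothetical pull-based producer of row batches.
type batchSource struct {
	ctx       context.Context
	remaining int // rows not yet emitted
	batchSize int // must be positive
}

// next returns the size of the next batch, built on demand. It returns
// ok=false when the data is exhausted and an error once ctx is cancelled.
func (s *batchSource) next() (rows int, ok bool, err error) {
	if err := s.ctx.Err(); err != nil {
		return 0, false, err
	}
	if s.remaining == 0 {
		return 0, false, nil
	}
	rows = s.batchSize
	if rows > s.remaining {
		rows = s.remaining
	}
	s.remaining -= rows
	return rows, true, nil
}

// demo reads one batch, simulates a client cancellation, and reads again.
func demo() (firstBatch int, errAfterCancel error) {
	ctx, cancel := context.WithCancel(context.Background())
	src := &batchSource{ctx: ctx, remaining: 25000, batchSize: 10000}
	firstBatch, _, _ = src.next()
	cancel()
	_, _, errAfterCancel = src.next()
	return firstBatch, errAfterCancel
}
```

Checking the context once per batch rather than once per row keeps the overhead negligible while still stopping promptly when the client disconnects.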

Memory Management

Release Arrow objects to avoid memory leaks:

func buildRecord(schema *arrow.Schema) arrow.Record {
    builder := array.NewRecordBuilder(memory.DefaultAllocator, schema)
    defer builder.Release()  // Always release builders

    // Build record...
    return builder.NewRecord() // The caller owns the record and must call Release() on it
}

gRPC Message Size

Configure larger message sizes for big Arrow batches:

config := airport.ServerConfig{
    Catalog:        cat,
    MaxMessageSize: 16 * 1024 * 1024, // 16MB (default is 4MB)
}

Utility Functions

catalog.ProjectSchema

Projects an Arrow schema to include only specified columns:

projected := catalog.ProjectSchema(fullSchema, []string{"id", "name"})

Context Helpers

// Get identity from context
identity := airport.IdentityFromContext(ctx)

// Get transaction ID from context
txID, ok := catalog.TransactionIDFromContext(ctx)

// Add transaction ID to context
ctx = catalog.WithTransactionID(ctx, txID)

Error Types

var (
    ErrUnauthorized   = errors.New("unauthorized")
    ErrNotFound       = errors.New("not found")
    ErrAlreadyExists  = errors.New("already exists")
    ErrSchemaNotEmpty = errors.New("schema contains tables")
    ErrNotImplemented = errors.New("not implemented")
    ErrNullRowID      = errors.New("null rowid value not allowed") // For batch DML operations
)
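
Sentinel errors like these are normally matched with errors.Is, which also works when the error has been wrapped with extra context. The sketch below shows the pattern with a local stand-in. errNotFound, lookupTable, and isNotFound are hypothetical names used for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for the ErrNotFound sentinel listed above.
var errNotFound = errors.New("not found")

// lookupTable wraps the sentinel with context using %w so that callers can
// still match it with errors.Is.
func lookupTable(tables map[string]bool, name string) error {
	if !tables[name] {
		return fmt.Errorf("table %q: %w", name, errNotFound)
	}
	return nil
}

// isNotFound reports whether err is, or wraps, errNotFound.
func isNotFound(err error) bool {
	return errors.Is(err, errNotFound)
}
```

Comparing with == would fail for the wrapped error, which is why errors.Is is the safer choice in handlers that add context to errors.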

Thread Safety

All interface implementations must be safe for concurrent use:

  • Multiple goroutines may call Scan simultaneously
  • Schema/Table discovery may happen during scans
  • DDL operations may occur concurrently with queries

Use appropriate synchronization in your implementations.

Known Limitations

  • Reserved schema name: Airport-attached catalogs cannot expose a schema named main. This is a limitation of the current Airport DuckDB extension, not of DuckDB itself, which handles the main schema normally. Choose a different schema name (e.g., demo, app, or public).

Examples

The airport-go repository includes several examples:

Example     Description
basic       Simple server with in-memory data
auth        Bearer token authentication
ddl         DDL operations (CREATE/DROP/ALTER)
dml         DML operations with transactions
dynamic     Dynamic catalog with live schema updates
functions   Scalar and table functions
filter      Filter pushdown parsing and SQL encoding
tls         TLS/SSL configuration
timetravel  Time travel queries
