Implement an Airport-compatible server in Go
This guide shows how to implement an Airport-compatible Arrow Flight server with the airport-go package. The package simplifies building an Arrow Flight server that adheres to the Airport protocol, so clients such as DuckDB with the Airport extension can attach to it and query it.
Prerequisites
Before you begin, ensure you have the following prerequisites:
- Go programming language installed (version 1.25 or later).
- Basic understanding of Apache Arrow, Arrow Flight and gRPC.
Setting Up the Airport-compatible Arrow Flight Server
Create a new Go project:
mkdir airport-server
cd airport-server
go mod init airport-server
Install the airport-go package (run this inside the module directory, because go get needs a go.mod file to record the dependency):
go get github.com/hugr-lab/airport-go@latest
Implement a basic server:
Create a main.go file with the following content:
package main

import (
	"context"
	"log"
	"net"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"google.golang.org/grpc"

	"github.com/hugr-lab/airport-go"
	"github.com/hugr-lab/airport-go/catalog"
)

func main() {
	// Define schema and scan function
	userSchema := arrow.NewSchema([]arrow.Field{
		{Name: "id", Type: arrow.PrimitiveTypes.Int64},
		{Name: "name", Type: arrow.BinaryTypes.String},
	}, nil)

	scanUsers := func(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
		builder := array.NewRecordBuilder(memory.DefaultAllocator, userSchema)
		defer builder.Release()

		builder.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
		builder.Field(1).(*array.StringBuilder).AppendValues([]string{"Alice", "Bob", "Charlie"}, nil)

		record := builder.NewRecord()
		// NewRecordReader retains the record, so releasing our reference here is safe.
		defer record.Release()
		return array.NewRecordReader(userSchema, []arrow.Record{record})
	}

	// Build catalog
	cat, err := airport.NewCatalogBuilder().
		Schema("demo").
		SimpleTable(airport.SimpleTableDef{
			Name:     "users",
			Comment:  "User accounts",
			Schema:   userSchema,
			ScanFunc: scanUsers,
		}).
		Build()
	if err != nil {
		log.Fatalf("build catalog: %v", err)
	}

	// Start server
	grpcServer := grpc.NewServer()
	if err := airport.NewServer(grpcServer, airport.ServerConfig{Catalog: cat}); err != nil {
		log.Fatalf("register Airport service: %v", err)
	}

	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("listen: %v", err)
	}
	log.Println("Airport server listening on :50051")
	if err := grpcServer.Serve(lis); err != nil {
		log.Fatalf("serve: %v", err)
	}
}
Run the server:
go run main.go
Connecting from DuckDB
Once your server is running, connect to it from DuckDB:
-- Install and load Airport extension
INSTALL airport FROM community;
LOAD airport;
-- Connect to your Flight server
ATTACH '' AS my_server (TYPE AIRPORT, LOCATION 'grpc://localhost:50051');
-- Query the users table
SELECT * FROM my_server.demo.users;
Expected output:
┌───────┬─────────┐
│ id │ name │
│ int64 │ varchar │
├───────┼─────────┤
│ 1 │ Alice │
│ 2 │ Bob │
│ 3 │ Charlie │
└───────┴─────────┘
Package Overview
The airport-go package is organized into several subpackages:
| Package | Description |
|---|---|
| airport | Main package: server creation, catalog builder, authentication helpers |
| airport/catalog | Core interfaces: Catalog, Schema, Table, Functions, DML/DDL options |
| airport/auth | Authentication implementations (bearer token) |
| airport/flight | Internal Flight handler implementation |
Server Configuration
The ServerConfig struct configures the Airport server:
type ServerConfig struct {
// Catalog is the catalog implementation (required)
Catalog catalog.Catalog
// Auth is the authentication handler (optional)
Auth Authenticator
// Address is the server address for FlightEndpoint locations
Address string
// MaxMessageSize sets the maximum gRPC message size (default: 4MB)
MaxMessageSize int
// LogLevel sets the logging verbosity (default: Info)
LogLevel *slog.Level
// TransactionManager enables transaction support (optional)
TransactionManager catalog.TransactionManager
}
Creating a Server
// Create gRPC server with Airport options
config := airport.ServerConfig{
Catalog: myCatalog,
Auth: myAuth,
MaxMessageSize: 16 * 1024 * 1024, // 16MB
}
opts := airport.ServerOptions(config)
grpcServer := grpc.NewServer(opts...)
// Register Airport service
err := airport.NewServer(grpcServer, config)
if err != nil {
log.Fatal(err)
}
// Start serving
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatal(err)
}
if err := grpcServer.Serve(lis); err != nil {
log.Fatal(err)
}
Catalog Builder API
The fluent builder API provides the easiest way to create static catalogs:
catalog, err := airport.NewCatalogBuilder().
Schema("my_schema").
Comment("Schema description").
SimpleTable(airport.SimpleTableDef{
Name: "my_table",
Comment: "Table description",
Schema: arrowSchema,
ScanFunc: scanFunction,
}).
Table(customTableImpl). // Add custom Table implementation
ScalarFunc(scalarFunction). // Add scalar function
TableFunc(tableFunction). // Add table function
Schema("another_schema"). // Start new schema
// ... add more tables
Build()
SimpleTableDef
For simple tables with a scan function:
type SimpleTableDef struct {
Name string
Comment string
Schema *arrow.Schema
ScanFunc func(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error)
}
Core Interfaces
catalog.Catalog
The root interface for exposing data:
type Catalog interface {
// Schemas returns all schemas in this catalog.
Schemas(ctx context.Context) ([]Schema, error)
// Schema returns a specific schema by name.
// Returns (nil, nil) if schema doesn't exist.
Schema(ctx context.Context, name string) (Schema, error)
}
catalog.Schema
Represents a namespace containing tables and functions:
type Schema interface {
Name() string
Comment() string
Tables(ctx context.Context) ([]Table, error)
Table(ctx context.Context, name string) (Table, error)
ScalarFunctions(ctx context.Context) ([]ScalarFunction, error)
TableFunctions(ctx context.Context) ([]TableFunction, error)
TableFunctionsInOut(ctx context.Context) ([]TableFunctionInOut, error)
}
catalog.Table
Represents a scannable table:
type Table interface {
Name() string
Comment() string
// ArrowSchema returns the table's Arrow schema.
// If columns is non-nil, returns a projected schema.
ArrowSchema(columns []string) *arrow.Schema
// Scan returns a RecordReader for reading table data.
Scan(ctx context.Context, opts *ScanOptions) (array.RecordReader, error)
}
catalog.ScanOptions
Options passed to Table.Scan:
type ScanOptions struct {
Columns []string // Columns requested (nil = all)
Filter []byte // JSON-serialized predicate expression
Limit int64 // Maximum rows to return
BatchSize int // Hint for batch size
TimePoint *TimePoint // Point-in-time for time-travel queries
}
DML Operations (INSERT, UPDATE, DELETE)
InsertableTable
Enable INSERT operations:
type InsertableTable interface {
Table
// Insert adds new rows to the table.
Insert(ctx context.Context, rows array.RecordReader, opts *DMLOptions) (*DMLResult, error)
}
UpdatableTable
Enable UPDATE operations:
type UpdatableTable interface {
Table
// Update modifies existing rows identified by rowIDs.
Update(ctx context.Context, rowIDs []int64, rows array.RecordReader, opts *DMLOptions) (*DMLResult, error)
}
DeletableTable
Enable DELETE operations:
type DeletableTable interface {
Table
// Delete removes rows identified by rowIDs.
Delete(ctx context.Context, rowIDs []int64, opts *DMLOptions) (*DMLResult, error)
}
RowID Support
For UPDATE and DELETE operations, tables must include a rowid pseudocolumn with special metadata:
rowidMeta := arrow.NewMetadata([]string{"is_rowid"}, []string{"true"})
schema := arrow.NewSchema([]arrow.Field{
{Name: "rowid", Type: arrow.PrimitiveTypes.Int64, Nullable: false, Metadata: rowidMeta},
{Name: "id", Type: arrow.PrimitiveTypes.Int64},
{Name: "name", Type: arrow.BinaryTypes.String},
}, nil)
DML Usage Example
-- INSERT
INSERT INTO my_server.demo.users (id, name) VALUES (1, 'Alice');
-- UPDATE
UPDATE my_server.demo.users SET name = 'Alicia' WHERE id = 1;
-- DELETE
DELETE FROM my_server.demo.users WHERE id = 1;
-- RETURNING clause
INSERT INTO my_server.demo.users (id, name) VALUES (2, 'Bob') RETURNING *;
DDL Operations (CREATE, DROP, ALTER)
DynamicCatalog
Enable schema management:
type DynamicCatalog interface {
Catalog
CreateSchema(ctx context.Context, name string, opts CreateSchemaOptions) (Schema, error)
DropSchema(ctx context.Context, name string, opts DropSchemaOptions) error
}
DynamicSchema
Enable table management:
type DynamicSchema interface {
Schema
CreateTable(ctx context.Context, name string, schema *arrow.Schema, opts CreateTableOptions) (Table, error)
DropTable(ctx context.Context, name string, opts DropTableOptions) error
RenameTable(ctx context.Context, oldName, newName string, opts RenameTableOptions) error
}
DynamicTable
Enable column management:
type DynamicTable interface {
Table
AddColumn(ctx context.Context, columnSchema *arrow.Schema, opts AddColumnOptions) error
RemoveColumn(ctx context.Context, name string, opts RemoveColumnOptions) error
RenameColumn(ctx context.Context, oldName, newName string, opts RenameColumnOptions) error
ChangeColumnType(ctx context.Context, columnSchema *arrow.Schema, expression string, opts ChangeColumnTypeOptions) error
SetNotNull(ctx context.Context, columnName string, opts SetNotNullOptions) error
DropNotNull(ctx context.Context, columnName string, opts DropNotNullOptions) error
SetDefault(ctx context.Context, columnName, expression string, opts SetDefaultOptions) error
}
DDL Usage Example
-- Schema operations
CREATE SCHEMA my_server.analytics;
DROP SCHEMA my_server.analytics;
-- Table operations
CREATE TABLE my_server.demo.events (id INTEGER, name VARCHAR);
ALTER TABLE my_server.demo.events ADD COLUMN timestamp TIMESTAMP;
ALTER TABLE my_server.demo.events RENAME COLUMN name TO event_name;
DROP TABLE my_server.demo.events;
-- CREATE TABLE AS SELECT
CREATE TABLE my_server.demo.backup AS SELECT * FROM my_server.demo.users;
Scalar Functions
Scalar functions process input batches and return result arrays:
type ScalarFunction interface {
Name() string
Comment() string
Signature() FunctionSignature
// Execute runs the function on input record batch.
Execute(ctx context.Context, input arrow.RecordBatch) (arrow.Array, error)
}
Example Scalar Function
type multiplyFunc struct{}
func (f *multiplyFunc) Name() string { return "MULTIPLY" }
func (f *multiplyFunc) Comment() string { return "Multiplies input by factor" }
func (f *multiplyFunc) Signature() catalog.FunctionSignature {
return catalog.FunctionSignature{
Parameters: []arrow.DataType{
arrow.PrimitiveTypes.Int64, // input value
arrow.PrimitiveTypes.Int64, // multiplication factor
},
ReturnType: arrow.PrimitiveTypes.Int64,
}
}
func (f *multiplyFunc) Execute(_ context.Context, input arrow.RecordBatch) (arrow.Array, error) {
valueCol := input.Column(0).(*array.Int64)
factorCol := input.Column(1).(*array.Int64)
builder := array.NewInt64Builder(memory.DefaultAllocator)
defer builder.Release()
for i := 0; i < valueCol.Len(); i++ {
builder.Append(valueCol.Value(i) * factorCol.Value(i))
}
return builder.NewInt64Array(), nil
}
Usage:
SELECT MULTIPLY(value, 10) FROM my_server.demo.users;
Table Functions
Table functions return result sets with dynamic schemas:
type TableFunction interface {
Name() string
Comment() string
Signature() FunctionSignature
// SchemaForParameters returns output schema based on parameters.
SchemaForParameters(ctx context.Context, params []any) (*arrow.Schema, error)
// Execute runs the table function.
Execute(ctx context.Context, params []any, opts *ScanOptions) (array.RecordReader, error)
}
Example Table Function
type generateSeriesFunc struct{}
func (f *generateSeriesFunc) Name() string { return "GENERATE_SERIES" }
func (f *generateSeriesFunc) Comment() string { return "Generates integer series" }
func (f *generateSeriesFunc) Signature() catalog.FunctionSignature {
return catalog.FunctionSignature{
Parameters: []arrow.DataType{
arrow.PrimitiveTypes.Int64, // start
arrow.PrimitiveTypes.Int64, // stop
arrow.PrimitiveTypes.Int64, // step (optional)
},
ReturnType: nil, // Table function
}
}
func (f *generateSeriesFunc) SchemaForParameters(_ context.Context, _ []any) (*arrow.Schema, error) {
return arrow.NewSchema([]arrow.Field{
{Name: "value", Type: arrow.PrimitiveTypes.Int64},
}, nil), nil
}
func (f *generateSeriesFunc) Execute(_ context.Context, params []any, opts *catalog.ScanOptions) (array.RecordReader, error) {
start := params[0].(int64)
stop := params[1].(int64)
step := int64(1)
if len(params) > 2 && params[2] != nil { // step is optional, so guard the index
step = params[2].(int64)
}
// Generate series and return RecordReader...
}
Usage:
SELECT * FROM my_server.demo.GENERATE_SERIES(1, 10, 2);
Table Functions with Row Input (In/Out)
Table functions that accept row sets as input:
type TableFunctionInOut interface {
Name() string
Comment() string
Signature() FunctionSignature
// SchemaForParameters returns output schema based on params and input schema.
SchemaForParameters(ctx context.Context, params []any, inputSchema *arrow.Schema) (*arrow.Schema, error)
// Execute processes input rows and returns output rows.
Execute(ctx context.Context, params []any, input array.RecordReader, opts *ScanOptions) (array.RecordReader, error)
}
Authentication
Bearer Token Authentication
auth := airport.BearerAuth(func(token string) (string, error) {
if token == "secret-api-key" {
return "user-identity", nil
}
return "", airport.ErrUnauthorized
})
config := airport.ServerConfig{
Catalog: cat,
Auth: auth,
}
// IMPORTANT: Use ServerOptions when creating gRPC server
opts := airport.ServerOptions(config)
grpcServer := grpc.NewServer(opts...)
Accessing Identity in Handlers
func (t *MyTable) Scan(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
identity := airport.IdentityFromContext(ctx)
if identity != "" {
// User is authenticated
}
// ...
}
DuckDB Authentication
-- Using persistent secret
CREATE PERSISTENT SECRET my_auth (
TYPE airport,
auth_token 'secret-api-key',
scope 'grpc://localhost:50051'
);
ATTACH '' AS my_server (TYPE AIRPORT, LOCATION 'grpc://localhost:50051');
-- Or inline with headers
SELECT * FROM airport_take_flight(
'grpc://localhost:50051',
'SELECT * FROM demo.users',
headers := MAP{'authorization': 'secret-api-key'}
);
Filter Pushdown (Predicate Pushdown)
DuckDB can push WHERE clause predicates to the server via ScanOptions.Filter. The filter is a JSON-serialized expression tree.
func (t *MyTable) Scan(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
if opts.Filter != nil {
// Parse the JSON filter
var filterExpr struct {
Filters []json.RawMessage `json:"filters"`
ColumnBindingNames []string `json:"column_binding_names_by_index"`
}
if err := json.Unmarshal(opts.Filter, &filterExpr); err != nil {
return t.scanAll(ctx)
}
// Interpret and apply filter to your data source
}
return t.scanAll(ctx)
}
Filter Expression Types
| Expression Class | Description |
|---|---|
| BOUND_COMPARISON | Comparison operators (=, >, <, >=, <=, !=) |
| BOUND_COLUMN_REF | Column references |
| BOUND_CONSTANT | Literal values |
| BOUND_CONJUNCTION | Logical AND/OR operators |
| BOUND_FUNCTION | Function calls |
For the detailed format specification, see the Airport Extension documentation.
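The first step of the Scan example above, decoding the filter envelope, can be sketched with the standard library alone. The two envelope keys (`filters`, `column_binding_names_by_index`) are taken from that example. The `expression_class` key, the helper name `topLevelClasses`, and the sample payload are assumptions made for illustration, so verify them against the Airport extension's format specification before relying on them.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// filterEnvelope mirrors the two top-level keys used in the Scan example.
type filterEnvelope struct {
	Filters            []json.RawMessage `json:"filters"`
	ColumnBindingNames []string          `json:"column_binding_names_by_index"`
}

// exprHeader reads only the discriminator of an expression node.
// ASSUMPTION: each node stores its class under "expression_class".
type exprHeader struct {
	ExpressionClass string `json:"expression_class"`
}

// topLevelClasses (hypothetical helper) returns the class of every top-level
// filter expression together with the column binding names.
func topLevelClasses(raw []byte) ([]string, []string, error) {
	var env filterEnvelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return nil, nil, fmt.Errorf("decode filter envelope: %w", err)
	}
	classes := make([]string, 0, len(env.Filters))
	for i, f := range env.Filters {
		var h exprHeader
		if err := json.Unmarshal(f, &h); err != nil {
			return nil, nil, fmt.Errorf("decode filter %d: %w", i, err)
		}
		classes = append(classes, h.ExpressionClass)
	}
	return classes, env.ColumnBindingNames, nil
}

func main() {
	// Hypothetical payload, hand-written for this sketch.
	sample := []byte(`{
		"filters": [
			{"expression_class": "BOUND_COMPARISON"},
			{"expression_class": "BOUND_CONJUNCTION"}
		],
		"column_binding_names_by_index": ["id", "name"]
	}`)
	classes, cols, err := topLevelClasses(sample)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(classes, cols)
}
```

Keeping each filter as a `json.RawMessage` defers full decoding, so a server can inspect the class first and fall back to an unfiltered scan for expression types it does not handle.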
Time Travel Queries
Support point-in-time queries by handling TimePoint in scan options:
type TimePoint struct {
Unit string // "timestamp", "version", or "snapshot"
Value string // Time value in appropriate format
}
func (t *MyTable) Scan(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
if opts.TimePoint != nil {
// Query data at specific point in time
switch opts.TimePoint.Unit {
case "timestamp":
// opts.TimePoint.Value = "2024-01-15T10:30:00Z"
case "version":
// opts.TimePoint.Value = "42"
case "snapshot":
// opts.TimePoint.Value = "abc123def"
}
}
// ...
}
Transaction Support
Enable transaction coordination by implementing TransactionManager:
type TransactionManager interface {
BeginTransaction(ctx context.Context) (txID string, err error)
CommitTransaction(ctx context.Context, txID string) error
RollbackTransaction(ctx context.Context, txID string) error
GetTransactionStatus(ctx context.Context, txID string) (TransactionState, bool)
}
Configure with server:
config := airport.ServerConfig{
Catalog: cat,
TransactionManager: myTxManager,
}
Usage in DuckDB:
BEGIN TRANSACTION;
INSERT INTO my_server.demo.users (id, name) VALUES (100, 'TxUser');
ROLLBACK; -- Changes are discarded
Column Statistics
Provide column statistics for query optimization:
type StatisticsTable interface {
Table
ColumnStatistics(ctx context.Context, columnName string, columnType string) (*ColumnStats, error)
}
type ColumnStats struct {
HasNotNull *bool
HasNull *bool
DistinctCount *uint64
Min any
Max any
MaxStringLength *uint64
ContainsUnicode *bool
}
Catalog Versioning
Enable schema caching with version tracking:
type VersionedCatalog interface {
Catalog
CatalogVersion(ctx context.Context) (CatalogVersion, error)
}
type CatalogVersion struct {
Version uint64 // Current version number
IsFixed bool // If true, version is fixed for session
}
Dynamic Catalogs
For catalogs that change at runtime, implement the Catalog interface directly:
type DynamicCatalog struct {
mu sync.RWMutex
schemas map[string]*DynamicSchema
}
func (c *DynamicCatalog) Schemas(ctx context.Context) ([]catalog.Schema, error) {
c.mu.RLock()
defer c.mu.RUnlock()
// Filter based on user permissions
identity := airport.IdentityFromContext(ctx)
var result []catalog.Schema
for _, schema := range c.schemas {
if schema.canAccess(identity) {
result = append(result, schema)
}
}
return result, nil
}
func (c *DynamicCatalog) Schema(ctx context.Context, name string) (catalog.Schema, error) {
c.mu.RLock()
defer c.mu.RUnlock()
schema, ok := c.schemas[name]
if !ok {
return nil, nil // Not found
}
return schema, nil
}
Performance Tips
Batch Sizing
Optimal batch size: 10,000-100,000 rows per batch.
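To make the sizing arithmetic concrete, the sketch below splits a row count into consecutive ranges of at most a chosen batch size. `batchRanges` and `rowRange` are hypothetical helpers written for this guide, not part of airport-go or arrow-go. A scan function could build one Arrow record per returned range.

```go
package main

import "fmt"

// rowRange is a half-open interval [Start, End) of row indexes.
type rowRange struct{ Start, End int }

// batchRanges splits total rows into consecutive ranges of at most
// batchSize rows each. It returns nil when either argument is not positive.
func batchRanges(total, batchSize int) []rowRange {
	if total <= 0 || batchSize <= 0 {
		return nil
	}
	ranges := make([]rowRange, 0, (total+batchSize-1)/batchSize)
	for start := 0; start < total; start += batchSize {
		end := start + batchSize
		if end > total {
			end = total // the last batch may be partial
		}
		ranges = append(ranges, rowRange{Start: start, End: end})
	}
	return ranges
}

func main() {
	// 250,000 rows at 100,000 rows per batch gives three batches,
	// the last one holding the remaining 50,000 rows.
	for _, r := range batchRanges(250_000, 100_000) {
		fmt.Printf("rows [%d, %d)\n", r.Start, r.End)
	}
}
```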
// Good: Multiple rows per batch
func scanLarge(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
records := make([]arrow.Record, 0)
for i := 0; i < 10; i++ {
record := buildBatch(10000) // 10k rows per batch
records = append(records, record)
}
return array.NewRecordReader(schema, records)
}
Streaming Large Datasets
Don’t load entire results into memory:
func streamScan(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
rows, err := db.QueryContext(ctx, "SELECT * FROM large_table")
if err != nil {
return nil, err
}
// Create streaming reader that builds batches on-demand
return NewStreamingReader(rows, batchSize), nil
}
Context Cancellation
Respect client cancellations:
func scanWithCancellation(ctx context.Context, opts *catalog.ScanOptions) (array.RecordReader, error) {
for rows.Next() {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
// Process row...
}
}
Memory Management
Release Arrow objects to avoid memory leaks:
func buildRecord(schema *arrow.Schema) arrow.Record {
builder := array.NewRecordBuilder(memory.DefaultAllocator, schema)
defer builder.Release() // Always release builders
// Build record...
return builder.NewRecord() // the caller owns the returned record and must Release it
}
gRPC Message Size
Configure larger message sizes for big Arrow batches:
config := airport.ServerConfig{
Catalog: cat,
MaxMessageSize: 16 * 1024 * 1024, // 16MB (default is 4MB)
}
Utility Functions
catalog.ProjectSchema
Projects an Arrow schema to include only specified columns:
projected := catalog.ProjectSchema(fullSchema, []string{"id", "name"})
Context Helpers
// Get identity from context
identity := airport.IdentityFromContext(ctx)
// Get transaction ID from context
txID, ok := catalog.TransactionIDFromContext(ctx)
// Add transaction ID to context
ctx = catalog.WithTransactionID(ctx, txID)
Error Types
var (
ErrUnauthorized = errors.New("unauthorized")
ErrNotFound = errors.New("not found")
ErrAlreadyExists = errors.New("already exists")
ErrSchemaNotEmpty = errors.New("schema contains tables")
ErrNotImplemented = errors.New("not implemented")
)
Thread Safety
All interface implementations must be safe for concurrent use:
- Multiple goroutines may call Scan simultaneously
- Schema/Table discovery may happen during scans
- DDL operations may occur concurrently with queries
Use appropriate synchronization in your implementations.
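One common way to meet these requirements is to guard shared catalog state with a read-write mutex. The sketch below is a minimal illustration using only the standard library. `tableRegistry`, `Add`, and `Names` are hypothetical names chosen for this example and are not part of airport-go. Discovery-style reads share the read lock, while DDL-style writes take the exclusive lock.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// tableRegistry is a hypothetical in-memory set of table names for one schema.
type tableRegistry struct {
	mu     sync.RWMutex
	tables map[string]struct{}
}

func newTableRegistry() *tableRegistry {
	return &tableRegistry{tables: make(map[string]struct{})}
}

// Add registers a table under the write lock, as a DDL operation would.
// It reports false if the table already exists.
func (r *tableRegistry) Add(name string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.tables[name]; exists {
		return false
	}
	r.tables[name] = struct{}{}
	return true
}

// Names returns a sorted snapshot under the read lock, so many discovery
// calls can run in parallel without blocking each other.
func (r *tableRegistry) Names() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names := make([]string, 0, len(r.tables))
	for n := range r.tables {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

func main() {
	reg := newTableRegistry()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			reg.Add(fmt.Sprintf("t%d", i)) // concurrent "DDL"
			_ = reg.Names()                // concurrent discovery
		}(i)
	}
	wg.Wait()
	fmt.Println(len(reg.Names()), "tables registered")
}
```

Returning a copied snapshot from `Names` keeps callers from iterating over the map while another goroutine mutates it. Running the program with `go run -race` is a quick way to check an implementation like this for data races.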
Known Limitations
- Reserved schema name: The schema name main cannot be used in Airport-attached servers. This is a limitation of the current Airport DuckDB Extension implementation. While DuckDB itself handles the main schema normally, Airport-attached catalogs cannot expose schemas named main. Choose a different name for your schemas (e.g., demo, app, or public).
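A server that creates schemas at runtime may want to reject the reserved name early rather than let clients hit the limitation later. The sketch below shows one possible guard. `validateSchemaName` and `errReservedSchemaName` are hypothetical names for this example, and matching the name case-insensitively is an assumption of the sketch, not documented behavior of the extension.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errReservedSchemaName is a hypothetical sentinel error for this sketch.
var errReservedSchemaName = errors.New("schema name is reserved")

// validateSchemaName rejects empty names and the reserved name "main".
// ASSUMPTION: the comparison ignores case, so "MAIN" is rejected as well.
func validateSchemaName(name string) error {
	if name == "" {
		return errors.New("schema name is empty")
	}
	if strings.EqualFold(name, "main") {
		return fmt.Errorf("%q: %w", name, errReservedSchemaName)
	}
	return nil
}

func main() {
	for _, name := range []string{"demo", "main"} {
		if err := validateSchemaName(name); err != nil {
			fmt.Println("rejected:", err)
			continue
		}
		fmt.Println("ok:", name)
	}
}
```

Wrapping the sentinel with `%w` lets callers test for this specific failure with `errors.Is` while still seeing the offending name in the message.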
Examples
The airport-go repository includes several examples:
| Example | Description |
|---|---|
| basic | Simple server with in-memory data |
| auth | Bearer token authentication |
| ddl | DDL operations (CREATE/DROP/ALTER) |
| dml | DML operations with transactions |
| dynamic | Dynamic catalog with live schema updates |
| functions | Scalar and table functions |
| tls | TLS/SSL configuration |
| timetravel | Time travel queries |
Further Documentation
- GoDoc API Reference - Full API documentation
- Protocol Overview - Airport protocol details
- API Guide - Interface documentation
- Implementation Guide - Custom catalog implementation