// SPDX-FileCopyrightText: © 2023 Siemens AG
// SPDX-License-Identifier: MIT
// Package grpc provides a gRPC server that exposes the DDA peripheral services
// to gRPC and gRPC-Web clients.
package grpc
import (
"context"
"net"
"os"
"strings"
"sync"
"github.com/coatyio/dda/apis"
"github.com/coatyio/dda/apis/grpc/stubs/golang/com"
"github.com/coatyio/dda/apis/grpc/stubs/golang/state"
"github.com/coatyio/dda/apis/grpc/stubs/golang/store"
"github.com/coatyio/dda/config"
"github.com/coatyio/dda/plog"
"github.com/coatyio/dda/services"
comapi "github.com/coatyio/dda/services/com/api"
stateapi "github.com/coatyio/dda/services/state/api"
storeapi "github.com/coatyio/dda/services/store/api"
"github.com/google/uuid"
rpc "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
// metadata_dda_suback is the custom header metadata ("dda-suback": "OK") that
// the streaming RPC handlers in this file send via SendHeader to acknowledge
// that the underlying DDA subscription or publication has been set up.
var metadata_dda_suback = metadata.Pairs("dda-suback", "OK")
// grpcServer realizes a gRPC server that exposes the peripheral DDA services to
// gRPC and gRPC-Web clients. grpcServer implements the apis.ApiServer interface
// and all the ServiceServer interfaces of the generated gRPC stubs.
//
// The comApi, storeApi, and stateApi fields are set once by New. A nil API
// makes every RPC of the corresponding service fail with a "service disabled"
// error.
type grpcServer struct {
com.UnimplementedComServiceServer
comApi comapi.Api // backs the ComService RPCs; nil if disabled
store.UnimplementedStoreServiceServer
storeApi storeapi.Api // backs the StoreService RPCs; nil if disabled
state.UnimplementedStateServiceServer
stateApi stateapi.Api // backs the StateService RPCs; nil if disabled
mu sync.RWMutex // protects following fields
srv *rpc.Server // set by Open, reset to nil by Close
grpcWebServer // embedded gRPC-Web proxy state (see openWebServer/closeWebServer)
actionCallbacks map[string]comapi.ActionCallback // callbacks of delivered Actions keyed by correlation id
queryCallbacks map[string]comapi.QueryCallback // callbacks of delivered Queries keyed by correlation id
}
// New creates a not yet opened gRPC server that exposes the given peripheral
// DDA service APIs to gRPC and gRPC-Web clients and returns it as an
// apis.ApiServer. Pass a nil API to mark the corresponding service as
// disabled. Invoke Open with a gRPC-enabled DDA configuration to start the
// returned server.
func New(com comapi.Api, store storeapi.Api, state stateapi.Api) apis.ApiServer {
	srv := new(grpcServer)
	srv.comApi = com
	srv.storeApi = store
	srv.stateApi = state
	srv.actionCallbacks = map[string]comapi.ActionCallback{}
	srv.queryCallbacks = map[string]comapi.QueryCallback{}
	return srv
}
// Open creates a ready-to-run gRPC server with the configured server
// credentials that accepts client requests over gRPC/gRPC-Web on the configured
// addresses. In case of gRPC connections supported protocols include TCP and
// Unix domain sockets (address prefixed with "unix:").
//
// Open is a no-op returning nil if the server is already open. If any setup
// step fails, the partially created server is stopped and the error is
// returned; the grpcServer is then left in closed state so that Open may be
// invoked again.
func (s *grpcServer) Open(cfg *config.Config) error {
	address := cfg.Apis.Grpc.Address

	s.mu.Lock()
	defer s.mu.Unlock()

	if s.srv != nil {
		return nil // already open
	}

	// Set up gRPC server.
	var srvOpts []rpc.ServerOption
	if cfg.Apis.Cert != "" && cfg.Apis.Key != "" {
		creds, err := credentials.NewServerTLSFromFile(cfg.Apis.Cert, cfg.Apis.Key)
		if err != nil {
			return err
		}
		srvOpts = append(srvOpts, rpc.Creds(creds))
	}

	// Use configurable values for Keepalive Server Parameters.
	//
	// https://pkg.go.dev/google.golang.org/grpc/keepalive
	// https://grpc.io/docs/guides/keepalive/
	// https://github.com/grpc/grpc-go/blob/v1.53.0/Documentation/keepalive.md
	srvOpts = append(srvOpts, rpc.KeepaliveParams(keepalive.ServerParameters{
		Time: cfg.Apis.Grpc.Keepalive,
	}))

	// Keep the server in a local variable and publish it in s.srv only after
	// all setup steps have succeeded. Otherwise, a failed Open would leave
	// s.srv set and turn any subsequent Open into a silent no-op.
	srv := rpc.NewServer(srvOpts...)
	com.RegisterComServiceServer(srv, s)
	store.RegisterStoreServiceServer(srv, s)
	state.RegisterStateServiceServer(srv, s)

	plog.Printf("Open gRPC server listening on address %s...\n", address)

	protocol := "tcp"
	if sockfile, isUnix := strings.CutPrefix(address, "unix:"); isUnix {
		protocol = "unix"
		address = sockfile

		// Remove existing socket file to clean up in crash situations where
		// the listener is not being closed.
		if err := os.Remove(sockfile); err != nil && !os.IsNotExist(err) {
			srv.Stop()
			return err
		}
	}

	lis, err := net.Listen(protocol, address)
	if err != nil {
		srv.Stop()
		return err
	}

	go func() {
		// Serve returns nil after Stop; ErrServerStopped is returned if Stop
		// has been invoked before Serve got a chance to run.
		if err := srv.Serve(lis); err != nil && err != rpc.ErrServerStopped {
			plog.Printf("Unexpected gRPC server error: %v", err)
		}
	}()

	if err := s.openWebServer(srv, cfg); err != nil {
		// Roll back so that no half-opened server keeps running.
		s.closeWebServer()
		srv.Stop()
		return err
	}

	s.srv = srv
	return nil
}
// Close stops the gRPC server, canceling all active RPCs on the server side,
// and closing all open connections. Pending RPCs on the client side will get
// notified by connection errors. The gRPC-Web server is closed as well and all
// registered action and query callbacks are discarded. Close is a no-op if the
// server is not open.
func (s *grpcServer) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()

	srv := s.srv
	if srv == nil {
		return
	}

	srv.Stop()
	s.srv = nil
	s.closeWebServer()

	for cid := range s.actionCallbacks {
		delete(s.actionCallbacks, cid)
	}
	for cid := range s.queryCallbacks {
		delete(s.queryCallbacks, cid)
	}

	plog.Println("Closed gRPC server")
}
// Com Api
// PublishEvent publishes the given Event through the communication service and
// acknowledges it. It fails with a gRPC status error if the com service is
// disabled or if the publication fails.
func (s *grpcServer) PublishEvent(ctx context.Context, event *com.Event) (*com.Ack, error) {
	if s.comApi == nil {
		return nil, s.serviceDisabledError("com")
	}

	evt := comapi.Event{
		Type:            event.GetType(),
		Id:              event.GetId(),
		Source:          event.GetSource(),
		Time:            event.GetTime(),
		Data:            event.GetData(),
		DataContentType: event.GetDataContentType(),
	}

	err := s.comApi.PublishEvent(evt)
	if err == nil {
		return &com.Ack{}, nil
	}

	err = status.Errorf(s.codeByError(err), "failed publishing Event: %v", err)
	plog.Println(err)
	return nil, err
}
// SubscribeEvent subscribes to Events matching the given filter and streams
// them to the client until the client cancels the call or the communication
// service closes the subscription channel. The subscription is canceled when
// this handler returns.
func (s *grpcServer) SubscribeEvent(filter *com.SubscriptionFilter, stream com.ComService_SubscribeEventServer) error {
	if s.comApi == nil {
		return s.serviceDisabledError("com")
	}

	subCtx, unsubscribe := context.WithCancel(context.Background())
	defer unsubscribe()

	subFilter := comapi.SubscriptionFilter{
		Type:  filter.GetType(),
		Share: filter.GetShare(),
	}
	events, err := s.comApi.SubscribeEvent(subCtx, subFilter)
	if err != nil {
		err = status.Errorf(s.codeByError(err), "failed subscribing Event: %v", err)
		plog.Println(err)
		return err
	}

	// Send header with custom metadata to signal acknowledgment of the DDA
	// subscription. The gRPC client should await this acknowledgment to prevent
	// race conditions with subsequent related publications that may be
	// delivered and responded before this subscription is being fully
	// established by the pub-sub server.
	if hdrErr := stream.SendHeader(metadata_dda_suback); hdrErr != nil {
		plog.Println(hdrErr)
		return hdrErr
	}

	canceled := stream.Context().Done()
	for {
		select {
		case <-canceled: // server streaming call canceled by client
			return stream.Context().Err()
		case evt, open := <-events:
			if !open {
				// End stream if channel has been closed by communication service.
				return nil
			}
			msg := &com.Event{
				Type:            evt.Type,
				Id:              evt.Id,
				Source:          evt.Source,
				Time:            evt.Time,
				Data:            evt.Data,
				DataContentType: evt.DataContentType,
			}
			if sendErr := stream.Send(msg); sendErr != nil {
				// Do not return err, but keep stream alive for further transmissions.
				plog.Println(sendErr)
			}
		}
	}
}
// PublishAction publishes the given Action through the communication service
// and streams the received ActionResults to the client until the client
// cancels the call or the communication service closes the results channel.
func (s *grpcServer) PublishAction(action *com.Action, stream com.ComService_PublishActionServer) error {
	if s.comApi == nil {
		return s.serviceDisabledError("com")
	}

	pubCtx, stop := context.WithCancel(context.Background())
	defer stop()

	act := comapi.Action{
		Type:            action.GetType(),
		Id:              action.GetId(),
		Source:          action.GetSource(),
		Params:          action.GetParams(),
		DataContentType: action.GetDataContentType(),
	}
	results, err := s.comApi.PublishAction(pubCtx, act)
	if err != nil {
		err = status.Errorf(s.codeByError(err), "failed publishing Action: %v", err)
		plog.Println(err)
		return err
	}

	// Send header with custom metadata to signal acknowledgment of the DDA
	// response subscription and the publication. The gRPC client may await this
	// acknowledgment to prevent race conditions with subsequent related
	// publications that may be delivered and responded on the response
	// subscription before it is being fully established by the pub-sub server.
	if hdrErr := stream.SendHeader(metadata_dda_suback); hdrErr != nil {
		plog.Println(hdrErr)
		return hdrErr
	}

	canceled := stream.Context().Done()
	for {
		select {
		case <-canceled: // server streaming call canceled by client
			return stream.Context().Err()
		case res, open := <-results:
			if !open {
				// End stream if channel has been closed by communication service.
				return nil
			}
			msg := &com.ActionResult{
				Context:         res.Context,
				Data:            res.Data,
				DataContentType: res.DataContentType,
				SequenceNumber:  res.SequenceNumber,
			}
			if sendErr := stream.Send(msg); sendErr != nil {
				// Do not return err, but keep stream alive for further transmissions.
				plog.Println(sendErr)
			}
		}
	}
}
// SubscribeAction subscribes to Actions matching the given filter and streams
// them to the client until the client cancels the call or the communication
// service closes the subscription channel.
//
// Each delivered Action is tagged with a freshly generated correlation id under
// which its callback is registered. The client passes this id to
// PublishActionResult to send results back to the publisher of the Action.
func (s *grpcServer) SubscribeAction(filter *com.SubscriptionFilter, stream com.ComService_SubscribeActionServer) error {
	if s.comApi == nil {
		return s.serviceDisabledError("com")
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	acts, err := s.comApi.SubscribeAction(ctx, comapi.SubscriptionFilter{
		Type:  filter.GetType(),
		Share: filter.GetShare(),
	})
	if err != nil {
		err = status.Errorf(s.codeByError(err), "failed subscribing Action: %v", err)
		plog.Println(err)
		return err
	}

	// Send header with custom metadata to signal acknowledgment of the DDA
	// subscription. The gRPC client should await this acknowledgment to prevent
	// race conditions with subsequent related publications that may be
	// delivered and responded before this subscription is being fully
	// established by the pub-sub server.
	if err := stream.SendHeader(metadata_dda_suback); err != nil {
		plog.Println(err)
		return err
	}

	for {
		select {
		case act, ok := <-acts:
			if !ok {
				// End stream if channel has been closed by communication service.
				return nil
			}

			cid := uuid.NewString()
			s.mu.Lock()
			s.actionCallbacks[cid] = act.Callback
			s.mu.Unlock()

			if err := stream.Send(&com.ActionCorrelated{
				Action: &com.Action{
					Type:            act.Type,
					Id:              act.Id,
					Source:          act.Source,
					Params:          act.Params,
					DataContentType: act.DataContentType,
				},
				CorrelationId: cid,
			}); err != nil {
				// The correlation id has not been handed over to the client, so
				// no result is expected to be published for it. Unregister the
				// callback to prevent it from leaking until Close.
				s.mu.Lock()
				delete(s.actionCallbacks, cid)
				s.mu.Unlock()

				// Do not return err, but keep stream alive for further transmissions.
				plog.Println(err)
			}
		case <-stream.Context().Done(): // Server streaming call canceled by client
			return stream.Context().Err()
		}
	}
}
// PublishActionResult passes the given result to the callback registered under
// the result's correlation id by SubscribeAction. An unknown correlation id
// yields an InvalidArgument status error. A result with a sequence number less
// than or equal to zero unregisters the callback before invoking it, as no
// more results will follow.
func (s *grpcServer) PublishActionResult(ctx context.Context, result *com.ActionResultCorrelated) (*com.Ack, error) {
	if s.comApi == nil {
		return nil, s.serviceDisabledError("com")
	}

	cid := result.GetCorrelationId()

	s.mu.RLock()
	callback, known := s.actionCallbacks[cid]
	s.mu.RUnlock()

	if !known {
		err := status.Errorf(codes.InvalidArgument, "failed publishing ActionResult: unknown correlation id")
		plog.Println(err)
		return nil, err
	}

	res := result.GetResult()
	seq := res.GetSequenceNumber()
	if seq <= 0 {
		s.mu.Lock()
		delete(s.actionCallbacks, cid) // no more results will follow
		s.mu.Unlock()
	}

	actionResult := comapi.ActionResult{
		Context:         res.GetContext(),
		Data:            res.GetData(),
		DataContentType: res.GetDataContentType(),
		SequenceNumber:  seq,
	}
	if err := callback(actionResult); err != nil {
		err = status.Errorf(s.codeByError(err), "failed publishing ActionResult: %v", err)
		plog.Println(err)
		return nil, err
	}

	return &com.Ack{}, nil
}
// PublishQuery publishes the given Query through the communication service and
// streams the received QueryResults to the client until the client cancels the
// call or the communication service closes the results channel.
func (s *grpcServer) PublishQuery(query *com.Query, stream com.ComService_PublishQueryServer) error {
	if s.comApi == nil {
		return s.serviceDisabledError("com")
	}

	pubCtx, stop := context.WithCancel(context.Background())
	defer stop()

	qry := comapi.Query{
		Type:            query.GetType(),
		Id:              query.GetId(),
		Source:          query.GetSource(),
		Data:            query.GetData(),
		DataContentType: query.GetDataContentType(),
	}
	results, err := s.comApi.PublishQuery(pubCtx, qry)
	if err != nil {
		err = status.Errorf(s.codeByError(err), "failed publishing Query: %v", err)
		plog.Println(err)
		return err
	}

	// Send header with custom metadata to signal acknowledgment of the DDA
	// response subscription and the publication. The gRPC client may await this
	// acknowledgment to prevent race conditions with subsequent related
	// publications that may be delivered and responded on the response
	// subscription before it is being fully established by the pub-sub server.
	if hdrErr := stream.SendHeader(metadata_dda_suback); hdrErr != nil {
		plog.Println(hdrErr)
		return hdrErr
	}

	canceled := stream.Context().Done()
	for {
		select {
		case <-canceled: // server streaming call canceled by client
			return stream.Context().Err()
		case res, open := <-results:
			if !open {
				// End stream if channel has been closed by communication service.
				return nil
			}
			msg := &com.QueryResult{
				Context:         res.Context,
				Data:            res.Data,
				DataContentType: res.DataContentType,
				SequenceNumber:  res.SequenceNumber,
			}
			if sendErr := stream.Send(msg); sendErr != nil {
				// Do not return err, but keep stream alive for further transmissions.
				plog.Println(sendErr)
			}
		}
	}
}
// SubscribeQuery subscribes to Queries matching the given filter and streams
// them to the client until the client cancels the call or the communication
// service closes the subscription channel.
//
// Each delivered Query is tagged with a freshly generated correlation id under
// which its callback is registered. The client passes this id to
// PublishQueryResult to send results back to the publisher of the Query.
func (s *grpcServer) SubscribeQuery(filter *com.SubscriptionFilter, stream com.ComService_SubscribeQueryServer) error {
	if s.comApi == nil {
		return s.serviceDisabledError("com")
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	qrys, err := s.comApi.SubscribeQuery(ctx, comapi.SubscriptionFilter{
		Type:  filter.GetType(),
		Share: filter.GetShare(),
	})
	if err != nil {
		err = status.Errorf(s.codeByError(err), "failed subscribing Query: %v", err)
		plog.Println(err)
		return err
	}

	// Send header with custom metadata to signal acknowledgment of the DDA
	// subscription. The gRPC client should await this acknowledgment to prevent
	// race conditions with subsequent code that indirectly triggers a remote
	// publication on the subscription which can be lost if the subscription has
	// not yet been fully established by the pub-sub infrastructure.
	if err := stream.SendHeader(metadata_dda_suback); err != nil {
		plog.Println(err)
		return err
	}

	for {
		select {
		case qry, ok := <-qrys:
			if !ok {
				// End stream if channel has been closed by communication service.
				return nil
			}

			cid := uuid.NewString()
			s.mu.Lock()
			s.queryCallbacks[cid] = qry.Callback
			s.mu.Unlock()

			if err := stream.Send(&com.QueryCorrelated{
				Query: &com.Query{
					Type:            qry.Type,
					Id:              qry.Id,
					Source:          qry.Source,
					Data:            qry.Data,
					DataContentType: qry.DataContentType,
				},
				CorrelationId: cid,
			}); err != nil {
				// The correlation id has not been handed over to the client, so
				// no result is expected to be published for it. Unregister the
				// callback to prevent it from leaking until Close.
				s.mu.Lock()
				delete(s.queryCallbacks, cid)
				s.mu.Unlock()

				// Do not return err, but keep stream alive for further transmissions.
				plog.Println(err)
			}
		case <-stream.Context().Done(): // server streaming call canceled by client
			return stream.Context().Err()
		}
	}
}
// PublishQueryResult passes the given result to the callback registered under
// the result's correlation id by SubscribeQuery. An unknown correlation id
// yields an InvalidArgument status error. A result with a sequence number less
// than or equal to zero unregisters the callback before invoking it, as no
// more results will follow.
func (s *grpcServer) PublishQueryResult(ctx context.Context, result *com.QueryResultCorrelated) (*com.Ack, error) {
	if s.comApi == nil {
		return nil, s.serviceDisabledError("com")
	}

	cid := result.GetCorrelationId()

	s.mu.RLock()
	callback, known := s.queryCallbacks[cid]
	s.mu.RUnlock()

	if !known {
		err := status.Errorf(codes.InvalidArgument, "failed publishing QueryResult: unknown correlation id")
		plog.Println(err)
		return nil, err
	}

	res := result.GetResult()
	seq := res.GetSequenceNumber()
	if seq <= 0 {
		s.mu.Lock()
		delete(s.queryCallbacks, cid) // no more results will follow
		s.mu.Unlock()
	}

	queryResult := comapi.QueryResult{
		Context:         res.GetContext(),
		Data:            res.GetData(),
		DataContentType: res.GetDataContentType(),
		SequenceNumber:  seq,
	}
	if err := callback(queryResult); err != nil {
		err = status.Errorf(s.codeByError(err), "failed publishing QueryResult: %v", err)
		plog.Println(err)
		return nil, err
	}

	return &com.Ack{}, nil
}
// Store API
// Get looks up the value stored under the given key in the local store. If the
// store yields a nil value, an empty store.Value without explicit value is
// returned. It fails with a gRPC status error if the store service is disabled
// or the lookup fails.
func (s *grpcServer) Get(ctx context.Context, key *store.Key) (*store.Value, error) {
	if s.storeApi == nil {
		return nil, s.serviceDisabledError("store")
	}

	val, err := s.storeApi.Get(key.GetKey())
	switch {
	case err != nil:
		err = status.Errorf(s.codeByError(err), "failed: %v", err)
		plog.Println(err)
		return nil, err
	case val == nil:
		return &store.Value{}, nil // non-existing store key not explicit present
	default:
		return &store.Value{Value: val}, nil
	}
}
// Set stores the given key-value pair in the local store and acknowledges it.
// It fails with a gRPC status error if the store service is disabled or the
// store operation fails.
func (s *grpcServer) Set(ctx context.Context, kv *store.KeyValue) (*store.Ack, error) {
	if s.storeApi == nil {
		return nil, s.serviceDisabledError("store")
	}

	if err := s.storeApi.Set(kv.GetKey(), kv.GetValue()); err != nil {
		err = status.Errorf(s.codeByError(err), "failed: %v", err)
		plog.Println(err)
		return nil, err
	}

	return &store.Ack{}, nil
}
// Delete removes the entry with the given key from the local store and
// acknowledges it. It fails with a gRPC status error if the store service is
// disabled or the store operation fails.
func (s *grpcServer) Delete(ctx context.Context, key *store.Key) (*store.Ack, error) {
	if s.storeApi == nil {
		return nil, s.serviceDisabledError("store")
	}

	if err := s.storeApi.Delete(key.GetKey()); err != nil {
		err = status.Errorf(s.codeByError(err), "failed: %v", err)
		plog.Println(err)
		return nil, err
	}

	return &store.Ack{}, nil
}
// DeleteAll invokes DeleteAll on the local store and acknowledges it. It fails
// with a gRPC status error if the store service is disabled or the store
// operation fails.
func (s *grpcServer) DeleteAll(ctx context.Context, p *store.DeleteAllParams) (*store.Ack, error) {
	if s.storeApi == nil {
		return nil, s.serviceDisabledError("store")
	}

	if err := s.storeApi.DeleteAll(); err != nil {
		err = status.Errorf(s.codeByError(err), "failed: %v", err)
		plog.Println(err)
		return nil, err
	}

	return &store.Ack{}, nil
}
// DeletePrefix invokes DeletePrefix on the local store with the given key as
// prefix and acknowledges it. It fails with a gRPC status error if the store
// service is disabled or the store operation fails.
func (s *grpcServer) DeletePrefix(ctx context.Context, r *store.Key) (*store.Ack, error) {
	if s.storeApi == nil {
		return nil, s.serviceDisabledError("store")
	}

	if err := s.storeApi.DeletePrefix(r.GetKey()); err != nil {
		err = status.Errorf(s.codeByError(err), "failed: %v", err)
		plog.Println(err)
		return nil, err
	}

	return &store.Ack{}, nil
}
// DeleteRange invokes DeleteRange on the local store with the start and end
// keys of the given range and acknowledges it. It fails with a gRPC status
// error if the store service is disabled or the store operation fails.
func (s *grpcServer) DeleteRange(ctx context.Context, r *store.Range) (*store.Ack, error) {
	if s.storeApi == nil {
		return nil, s.serviceDisabledError("store")
	}

	if err := s.storeApi.DeleteRange(r.GetStart(), r.GetEnd()); err != nil {
		err = status.Errorf(s.codeByError(err), "failed: %v", err)
		plog.Println(err)
		return nil, err
	}

	return &store.Ack{}, nil
}
// ScanPrefix scans the local store with the given key as prefix and streams
// each visited key-value pair to the client. Scanning stops as soon as the
// client cancels the call or sending a pair fails. Only an error reported by
// the store scan itself is returned, as a gRPC status error.
func (s *grpcServer) ScanPrefix(key *store.Key, stream store.StoreService_ScanPrefixServer) error {
	if s.storeApi == nil {
		return s.serviceDisabledError("store")
	}

	canceled := stream.Context().Done()
	forward := func(k string, v []byte) bool {
		select {
		case <-canceled: // stop scanning if server streaming call canceled by client
			return false
		default:
		}

		if sendErr := stream.Send(&store.KeyValue{Key: k, Value: v}); sendErr != nil {
			plog.Println(sendErr) // stop scanning on first failure
			return false
		}
		return true
	}

	err := s.storeApi.ScanPrefix(key.GetKey(), forward)
	if err == nil {
		return nil
	}

	err = status.Errorf(s.codeByError(err), "failed: %v", err)
	plog.Println(err)
	return err
}
// ScanRange scans the local store with the start and end keys of the given
// range and streams each visited key-value pair to the client. Scanning stops
// as soon as the client cancels the call or sending a pair fails. Only an
// error reported by the store scan itself is returned, as a gRPC status error.
func (s *grpcServer) ScanRange(r *store.Range, stream store.StoreService_ScanRangeServer) error {
	if s.storeApi == nil {
		return s.serviceDisabledError("store")
	}

	canceled := stream.Context().Done()
	forward := func(k string, v []byte) bool {
		select {
		case <-canceled: // stop scanning if server streaming call canceled by client
			return false
		default:
		}

		if sendErr := stream.Send(&store.KeyValue{Key: k, Value: v}); sendErr != nil {
			plog.Println(sendErr) // stop scanning on first failure
			return false
		}
		return true
	}

	err := s.storeApi.ScanRange(r.GetStart(), r.GetEnd(), forward)
	if err == nil {
		return nil
	}

	err = status.Errorf(s.codeByError(err), "failed: %v", err)
	plog.Println(err)
	return err
}
// State API
// ProposeInput forwards the given input (operation, key, value) to the state
// service using the context of the RPC and acknowledges it. It fails with a
// gRPC status error if the state service is disabled or the proposal fails.
func (s *grpcServer) ProposeInput(ctx context.Context, in *state.Input) (*state.Ack, error) {
	if s.stateApi == nil {
		return nil, s.serviceDisabledError("state")
	}

	input := &stateapi.Input{
		Op:    stateapi.InputOp(in.GetOp()),
		Key:   in.GetKey(),
		Value: in.GetValue(),
	}
	if err := s.stateApi.ProposeInput(ctx, input); err != nil {
		err = status.Errorf(s.codeByError(err), "failed: %v", err)
		plog.Println(err)
		return nil, err
	}

	return &state.Ack{}, nil
}
// ObserveStateChange streams the inputs emitted by the state service on its
// state change channel to the client until the client cancels the call or the
// channel is closed. The observation is canceled when this handler returns.
func (s *grpcServer) ObserveStateChange(p *state.ObserveStateChangeParams, stream state.StateService_ObserveStateChangeServer) error {
	if s.stateApi == nil {
		return s.serviceDisabledError("state")
	}

	obsCtx, stop := context.WithCancel(context.Background())
	defer stop()

	changes, err := s.stateApi.ObserveStateChange(obsCtx)
	if err != nil {
		err = status.Errorf(s.codeByError(err), "failed observing state changes: %v", err)
		plog.Println(err)
		return err
	}

	canceled := stream.Context().Done()
	for {
		select {
		case <-canceled: // server streaming call canceled by client
			return stream.Context().Err()
		case input, open := <-changes:
			if !open {
				// End stream if channel has been closed.
				return nil
			}
			msg := &state.Input{Op: state.InputOperation(input.Op), Key: input.Key, Value: input.Value}
			if sendErr := stream.Send(msg); sendErr != nil {
				// Do not return err, but keep stream alive for further transmissions.
				plog.Println(sendErr)
			}
		}
	}
}
// ObserveMembershipChange streams the membership changes emitted by the state
// service to the client until the client cancels the call or the channel is
// closed. The observation is canceled when this handler returns.
func (s *grpcServer) ObserveMembershipChange(p *state.ObserveMembershipChangeParams, stream state.StateService_ObserveMembershipChangeServer) error {
	if s.stateApi == nil {
		return s.serviceDisabledError("state")
	}

	obsCtx, stop := context.WithCancel(context.Background())
	defer stop()

	changes, err := s.stateApi.ObserveMembershipChange(obsCtx)
	if err != nil {
		err = status.Errorf(s.codeByError(err), "failed observing membership changes: %v", err)
		plog.Println(err)
		return err
	}

	canceled := stream.Context().Done()
	for {
		select {
		case <-canceled: // server streaming call canceled by client
			return stream.Context().Err()
		case change, open := <-changes:
			if !open {
				// End stream if channel has been closed.
				return nil
			}
			msg := &state.MembershipChange{Id: change.Id, Joined: change.Joined}
			if sendErr := stream.Send(msg); sendErr != nil {
				// Do not return err, but keep stream alive for further transmissions.
				plog.Println(sendErr)
			}
		}
	}
}
// Utils
// serviceDisabledError creates, logs, and returns the gRPC status error that is
// reported to clients invoking an RPC of the given disabled service. The
// status code is derived by codeByError from a retryable service error.
func (s *grpcServer) serviceDisabledError(srv string) error {
	cause := services.RetryableErrorf("service %s is disabled", srv)
	statusErr := status.Errorf(s.codeByError(cause), "failed: %v", cause)
	plog.Println(statusErr)
	return statusErr
}
// codeByError maps a service error to a gRPC status code: codes.Unavailable
// for errors that services.IsRetryable reports as retryable, and
// codes.InvalidArgument for all other errors.
func (s *grpcServer) codeByError(err error) codes.Code {
	if services.IsRetryable(err) {
		return codes.Unavailable
	}
	return codes.InvalidArgument
}
// SPDX-FileCopyrightText: © 2023 Siemens AG
// SPDX-License-Identifier: MIT
package grpc
import (
"crypto/tls"
"fmt"
"net/http"
"github.com/coatyio/dda/config"
"github.com/coatyio/dda/plog"
rpcweb "github.com/improbable-eng/grpc-web/go/grpcweb"
rpc "google.golang.org/grpc"
)
// grpcWebServer realizes a gRPC-Web HTTP proxy server that routes incoming
// gRPC-Web requests to the DDA gRPC server. Both fields are set by
// openWebServer and reset to nil by closeWebServer.
type grpcWebServer struct {
httpSrv *http.Server // HTTP(S) server accepting gRPC-Web requests
wrappedGrpc *rpcweb.WrappedGrpcServer // gRPC server wrapped for gRPC-Web, serves the HTTP requests
}
// openWebServer sets up and starts an HTTP server that wraps the given gRPC
// server to serve gRPC-Web requests on the address configured under
// Apis.GrpcWeb. It does nothing if the gRPC-Web API is disabled. If both
// Apis.Cert and Apis.Key are configured, the server uses TLS with this key
// pair.
//
// An error is returned if the configured key pair cannot be loaded; in this
// case no receiver state is modified. Errors on listening and serving occur
// asynchronously and are logged only.
func (s *grpcWebServer) openWebServer(rs *rpc.Server, cfg *config.Config) error {
	if cfg.Apis.GrpcWeb.Disabled {
		return nil
	}

	// Set up gRPC-Web http server that wraps the gRPC server.
	var options []rpcweb.Option
	if originFunc := s.makeHttpOriginFunc(cfg.Apis.GrpcWeb.AccessControlAllowOrigin); originFunc != nil {
		options = append(options, rpcweb.WithOriginFunc(originFunc))
	}
	wrapped := rpcweb.WrapServer(rs, options...)

	var tlsConfig *tls.Config
	if cfg.Apis.Cert != "" && cfg.Apis.Key != "" {
		cert, err := tls.LoadX509KeyPair(cfg.Apis.Cert, cfg.Apis.Key)
		if err != nil {
			return fmt.Errorf("invalid or missing PEM file in DDA configuration under 'apis.cert/.key' : %w", err)
		}
		tlsConfig = &tls.Config{
			MinVersion:   tls.VersionTLS12,
			Certificates: []tls.Certificate{cert},
			ClientAuth:   tls.NoClientCert,
		}
	}

	webAddress := cfg.Apis.GrpcWeb.Address

	// The handler and the serving goroutine below operate on the local
	// variables wrapped and httpSrv instead of the receiver fields, because
	// closeWebServer resets these fields to nil. Reading them concurrently
	// would be a data race and could dereference a nil pointer if the server
	// is closed before the goroutine runs or while requests are in flight.
	httpSrv := &http.Server{
		Addr: webAddress,
		Handler: http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
			// plog.Printf("gRPC-Web ServeHTTP %+v\n", req)
			wrapped.ServeHTTP(resp, req)
		}),
		ErrorLog:          plog.WithPrefix("http.Server "),
		TLSConfig:         tlsConfig,
		ReadHeaderTimeout: 0, // no timeout to enable long-lived responses
		ReadTimeout:       0, // no timeout to enable long-lived responses
		WriteTimeout:      0, // no timeout to enable long-lived responses
		IdleTimeout:       0, // no timeout waiting for next request with keep-live
	}

	s.wrappedGrpc = wrapped
	s.httpSrv = httpSrv

	plog.Printf("Open gRPC-Web http server listening on address %s...\n", webAddress)

	useTLS := tlsConfig != nil
	go func() {
		if !useTLS {
			if err := httpSrv.ListenAndServe(); err != http.ErrServerClosed {
				plog.Printf("Unexpected gRPC-Web http server error: %v", err)
			}
			return
		}
		// Certificate and key are taken from TLSConfig.
		if err := httpSrv.ListenAndServeTLS("", ""); err != http.ErrServerClosed {
			plog.Printf("Unexpected gRPC-Web https server error: %v", err)
		}
	}()

	return nil
}
// closeWebServer drops the wrapped gRPC server and, if an HTTP server has been
// set up by openWebServer, closes it, logs the outcome, and resets it.
func (s *grpcWebServer) closeWebServer() {
	s.wrappedGrpc = nil

	srv := s.httpSrv
	if srv == nil {
		return
	}

	err := srv.Close()
	if err == nil {
		plog.Println("Closed gRPC-Web server")
	} else {
		plog.Printf("Closed gRPC-Web server with error: %v", err)
	}
	s.httpSrv = nil
}
// makeHttpOriginFunc returns a predicate that reports whether requests from a
// given origin are allowed. If no origins are given, every origin is allowed;
// otherwise only origins that exactly match one of the given origins are
// allowed.
func (s *grpcWebServer) makeHttpOriginFunc(origins []string) func(string) bool {
	if len(origins) == 0 {
		// all origins are allowed
		return func(string) bool { return true }
	}

	allowed := make(map[string]bool, len(origins))
	for i := range origins {
		allowed[origins[i]] = true
	}

	return func(origin string) bool {
		return allowed[origin]
	}
}
// SPDX-FileCopyrightText: © 2023 Siemens AG
// SPDX-License-Identifier: MIT
// Package config defines DDA configuration types to be used programmatically or
// within a DDA configuration file in YAML format.
//
// Note that the fields of all configuration types are not documented in code
// but solely in the default [DDA YAML] configuration file located in the
// project root folder (single source of truth).
//
// [DDA YAML]: https://github.com/coatyio/dda/blob/main/dda.yaml
package config
import (
"fmt"
"os"
"path/filepath"
"regexp"
"strings"
"time"
"github.com/google/uuid"
"gopkg.in/yaml.v3"
)
// configSchemaVersion is the version of the configuration schema supported by
// this package. New presets Config.Version with it and Config.Verify rejects
// any other version.
const configSchemaVersion = "1"
// compliesWithNamingConvention reports whether a string is non-empty and
// consists solely of the characters permitted by the DDA naming convention.
var compliesWithNamingConvention = regexp.MustCompile("^[-_.0-9a-zA-Z]+$").MatchString

// ValidateName checks the given name against the DDA naming convention for
// configuration items used as pub-sub topic fields: a name must be non-empty
// and may only contain ASCII digits 0-9, lowercase letters a-z, uppercase
// letters A-Z, dot (.), hyphen (-), or underscore (_). It returns nil for a
// compliant name and an error otherwise. The given context strings are joined
// by a space and embedded into the error.
//
// This function is intended to be used by communication binding
// implementations.
func ValidateName(name string, context ...string) error {
	if compliesWithNamingConvention(name) {
		return nil
	}
	what := strings.Join(context, " ")
	return fmt.Errorf(`invalid %s "%s": must only contain characters 0-9, a-z, A-Z, dot, hyphen, or underscore`, what, name)
}
// ValidateNonEmpty checks that the given value is a non-empty string. It
// returns nil if so and an error otherwise. The given context strings are
// joined by a space and embedded into the error.
func ValidateNonEmpty(value string, context ...string) error {
	if len(value) > 0 {
		return nil
	}
	what := strings.Join(context, " ")
	return fmt.Errorf(`invalid %s "%s": must not be empty`, what, value)
}
// A Config represents the complete hierarchy of configuration parameters for
// all DDA services as nested struct types that map to the underlying YAML
// configuration schema. It should be created with New() or ReadConfig() to
// ensure all fields are correctly populated.
//
// Version and Cluster are checked by Verify: Version must equal the supported
// configuration schema version and Cluster must comply with the naming
// convention enforced by ValidateName.
type Config struct {
Version string
Identity Identity
Cluster string
Apis ConfigApis
Services ConfigServices
}
// Verify checks that the configuration version equals the supported
// configuration schema version and that the Cluster name complies with the
// naming convention enforced by ValidateName. It returns an error describing
// the first violation found; nil otherwise.
func (c *Config) Verify() error {
	if got := c.Version; got != configSchemaVersion {
		return fmt.Errorf(`incompatible configuration version "%s", requiring version "%s"`, got, configSchemaVersion)
	}
	return ValidateName(c.Cluster, "configuration cluster")
}
// An Identity represents the unique identity of a DDA. New() presets Name to
// "DDA" and Id to a freshly generated UUID string.
type Identity struct {
Name string
Id string
}
// ConfigApis comprises server-side configuration options of all public DDA
// Client APIs. Grpc configures the gRPC API and GrpcWeb (YAML key "grpc-web")
// configures the gRPC-Web API. Cert and Key name the certificate and key files
// shared by both APIs; New() presets them to empty strings.
type ConfigApis struct {
Grpc ConfigApi
GrpcWeb ConfigWebApi `yaml:"grpc-web"`
Cert string
Key string
}
// A ConfigApi comprises server-side configuration options of a specific public
// DDA Client API. For the gRPC API, New() presets Address to ":8900", Disabled
// to false, and Keepalive to 2 hours.
type ConfigApi struct {
Address string
Disabled bool
Keepalive time.Duration
}
// A ConfigWebApi comprises server-side configuration options of a public DDA
// Client API used by Web HTTP clients. AccessControlAllowOrigin (YAML key
// "access-control-allow-origin") lists the allowed request origins. For the
// gRPC-Web API, New() presets Address to ":8800", Disabled to false, and
// AccessControlAllowOrigin to nil.
type ConfigWebApi struct {
Address string
Disabled bool
AccessControlAllowOrigin []string `yaml:"access-control-allow-origin"`
}
// ConfigServices provides configuration options for all peripheral DDA
// services: the communication service (Com), the local storage service
// (Store), and the state synchronization service (State).
type ConfigServices struct {
Com ConfigComService
Store ConfigStoreService
State ConfigStateService
}
// ConfigComService defines configuration options for a selected pub-sub
// messaging protocol. New() presets Protocol to "mqtt5", Url to the empty
// string, Opts to an empty map, and Disabled to false.
type ConfigComService struct {
Protocol string
Url string
Auth AuthOptions
Opts map[string]any
Disabled bool
}
// AuthOptions defines authentication options for a selected pub-sub
// messaging protocol. New() presets Method to "none", Verify to true, and all
// other fields to empty strings.
type AuthOptions struct {
Method string
Cert string
Key string
Verify bool
Username string
Password string
}
// ConfigStoreService provides configuration options for a selected local
// key-value storage. New() presets Engine to "pebble", Location to the empty
// string, and Disabled to true, i.e. the store service is disabled by default.
type ConfigStoreService struct {
Engine string
Location string
Disabled bool
}
// ConfigStateService provides configuration options for a selected consensus
// protocol. New() presets Protocol to "raft", Store to the empty string,
// Bootstrap to false, Opts to an empty map, and Disabled to true, i.e. the
// state service is disabled by default.
type ConfigStateService struct {
Protocol string
Store string
Bootstrap bool
Disabled bool
Opts map[string]any
}
// ReadConfig reads the given DDA configuration file in YAML format and
// unmarshals it into a Config created by New, so that options not present in
// the file keep their default values. It returns the resulting *Config on
// success, or the error of the failed read or parse step otherwise. The
// returned configuration is not verified; use Verify for this purpose.
func ReadConfig(file string) (*Config, error) {
	raw, err := os.ReadFile(filepath.Clean(file))
	if err != nil {
		return nil, err
	}

	cfg := New()
	if err := yaml.Unmarshal(raw, cfg); err != nil {
		return nil, err
	}
	return cfg, nil
}
// New creates a Config struct pre-filled with default values as specified in
// the YAML DDA configuration format. Each invocation yields a configuration
// with a freshly generated Identity.Id.
func New() *Config {
	grpcApi := ConfigApi{
		Address:   ":8900",
		Disabled:  false,
		Keepalive: 2 * time.Hour,
	}
	grpcWebApi := ConfigWebApi{
		Address:                  ":8800",
		AccessControlAllowOrigin: nil,
		Disabled:                 false,
	}

	comService := ConfigComService{
		Protocol: "mqtt5",
		Url:      "",
		Auth: AuthOptions{
			Method:   "none",
			Cert:     "",
			Key:      "",
			Verify:   true,
			Username: "",
			Password: "",
		},
		Opts:     map[string]any{},
		Disabled: false,
	}
	storeService := ConfigStoreService{
		Engine:   "pebble",
		Location: "",
		Disabled: true,
	}
	stateService := ConfigStateService{
		Protocol:  "raft",
		Store:     "",
		Bootstrap: false,
		Disabled:  true,
		Opts:      map[string]any{},
	}

	cfg := new(Config)
	cfg.Version = configSchemaVersion
	cfg.Identity = Identity{Name: "DDA", Id: uuid.NewString()}
	cfg.Cluster = "dda"
	cfg.Apis = ConfigApis{Grpc: grpcApi, GrpcWeb: grpcWebApi, Cert: "", Key: ""}
	cfg.Services = ConfigServices{Com: comService, Store: storeService, State: stateService}
	return cfg
}
// SPDX-FileCopyrightText: © 2023 Siemens AG
// SPDX-License-Identifier: MIT
// Package dda provides a ready-to-use Data Distribution Agent (DDA).
package dda
import (
"fmt"
"time"
"github.com/coatyio/dda/apis"
"github.com/coatyio/dda/apis/grpc"
"github.com/coatyio/dda/config"
"github.com/coatyio/dda/plog"
"github.com/coatyio/dda/services/com"
comapi "github.com/coatyio/dda/services/com/api"
"github.com/coatyio/dda/services/state"
stateapi "github.com/coatyio/dda/services/state/api"
"github.com/coatyio/dda/services/store"
storeapi "github.com/coatyio/dda/services/store/api"
)
// comApi is a non-exposed type alias for the communication API interface.
type comApi = comapi.Api
// storeApi is a non-exposed type alias for the local storage API interface.
type storeApi = storeapi.Api
// stateApi is a non-exposed type alias for the state synchronization API
// interface.
type stateApi = stateapi.Api
// Dda represents a Data Distribution Agent with peripheral services and public
// client APIs. It must be created with New() to ensure that all services and
// APIs are correctly initialized.
//
// A service API or the gRPC server that is disabled in the configuration is
// left nil by New() and is skipped by Open and Close.
type Dda struct {
cfg *config.Config // agent configuration
comApi // Communication API
storeApi // Local storage API
stateApi // State synchronization API
grpcServer apis.ApiServer // gRPC/gRPC-Web API server exposing the service APIs
}
// New creates a *Dda structure with DDA services and APIs initialized from the
// given configuration. Only services and APIs that are not disabled in the
// configuration are created. An error is returned if the given configuration
// does not pass verification, if the state service is enabled while the com
// service is disabled, or if one of the DDA services cannot be created.
//
// To start the initialized DDA services and APIs invoke Open on the returned
// *Dda structure.
func New(cfg *config.Config) (*Dda, error) {
	if err := cfg.Verify(); err != nil {
		return nil, err
	}

	own := *cfg // copy to not mutate passed in config
	agent := &Dda{cfg: &own}

	if !own.Services.Com.Disabled {
		api, err := com.New(own.Services.Com.Protocol)
		if err != nil {
			return nil, err
		}
		agent.comApi = *api
	}

	if !own.Services.Store.Disabled {
		api, err := store.New(own.Services.Store.Engine)
		if err != nil {
			return nil, err
		}
		agent.storeApi = *api
	}

	if !own.Services.State.Disabled {
		if own.Services.Com.Disabled {
			return nil, fmt.Errorf("Dda cannot be created: state service requires com service to be enabled")
		}
		api, err := state.New(own.Services.State.Protocol)
		if err != nil {
			return nil, err
		}
		agent.stateApi = *api
	}

	if !own.Apis.Grpc.Disabled {
		agent.grpcServer = grpc.New(agent.comApi, agent.storeApi, agent.stateApi)
	}

	plog.Printf("Created DDA %+v", agent.Identity())

	return agent, nil
}
// Identity returns the Identity configured for this DDA.
func (d *Dda) Identity() config.Identity {
	cfg := d.cfg
	return cfg.Identity
}
// Open starts all configured DDA services and APIs and blocks waiting for them
// to be ready for use. Services are opened in the order com, store, state,
// followed by the gRPC server. An error is returned if some DDA services or
// APIs cannot be started, or if the given timeout elapses before setup of a
// single service or API completes. Specify a zero timeout to disable
// preliminary timeout behavior.
func (d *Dda) Open(timeout time.Duration) error {
	if d.comApi != nil {
		opened := d.comApi.Open(d.cfg, timeout)
		if err := <-opened; err != nil {
			return err
		}
	}

	if d.storeApi != nil {
		err := d.storeApi.Open(d.cfg)
		if err != nil {
			return err
		}
	}

	if d.stateApi != nil {
		err := d.stateApi.Open(d.cfg, d.comApi)
		if err != nil {
			return err
		}
	}

	if d.grpcServer != nil {
		err := d.grpcServer.Open(d.cfg)
		if err != nil {
			return err
		}
	}

	plog.Printf("Opened DDA %+v", d.Identity())

	return nil
}
// Close gracefully shuts down every configured DDA service and API and
// releases associated resources. It blocks until shutdown has completed.
//
// Shutdown happens in reverse order of Open: gRPC server first, then state and
// store, and the communication service last.
func (d *Dda) Close() {
	if srv := d.grpcServer; srv != nil {
		srv.Close()
	}
	if sta := d.stateApi; sta != nil {
		sta.Close()
	}
	if sto := d.storeApi; sto != nil {
		sto.Close()
	}
	if binding := d.comApi; binding != nil {
		// Closed last; wait for the asynchronous disconnect to finish.
		<-binding.Close()
	}
	plog.Printf("Closed DDA %+v", d.Identity())
}
//go:build testing
// SPDX-FileCopyrightText: © 2023 Siemens AG
// SPDX-License-Identifier: MIT
package dda
// This file provides getters to access non-exposed Dda fields for testing.
import (
comapi "github.com/coatyio/dda/services/com/api"
)
// ComApi returns the communication API of a Dda, i.e. the value of its
// non-exposed comApi field. Accessible for testing only, as this file is
// guarded by the "testing" build tag.
func (d *Dda) ComApi() comapi.Api {
return d.comApi
}
// SPDX-FileCopyrightText: © 2023 Siemens AG
// SPDX-License-Identifier: MIT
// Package api provides the communication API for data-centric message-driven
// communication among decoupled DDAs. The communication API provides
// data-centric communication patterns for one-way and two-way communication
// based on the publish-subscribe messaging paradigm. The communication API is
// implemented by communication protocol bindings that use specific underlying
// pub-sub messaging transport protocols, like MQTT, Zenoh, Kafka, and others.
// Structured Event, Action, and Query data is described in a common way
// adhering closely to the CloudEvents specification.
//
// In addition, package api also provides common functionality to be reused by
// communication binding implementations. For example, type Router and related
// types may be used to manage subscription filters and to lookup associated
// handlers.
package api
import (
"context"
"fmt"
"time"
"github.com/coatyio/dda/config"
)
// Scope names one of the supported DDA services that communicate over pub-sub.
// It is part of publications and of the corresponding subscription filters so
// that messages of different services are routed in isolation.
type Scope string

// Supported scopes.
const (
	ScopeDef         Scope = ""    // Default scope (ScopeCom)
	ScopeCom         Scope = "com" // Communication scope
	ScopeState       Scope = "sta" // Distributed state synchronization scope
	ScopeSideChannel Scope = "sdc" // Side channel scope
)

// ToScope converts a Scope string to a Scope. The default scope (empty string)
// is resolved to ScopeCom. An error along with an empty Scope is returned if
// the given string does not denote a supported scope.
func ToScope(scope string) (Scope, error) {
	switch s := Scope(scope); s {
	case ScopeDef, ScopeCom:
		return ScopeCom, nil
	case ScopeState, ScopeSideChannel:
		return s, nil
	}
	return "", fmt.Errorf("unknown scope %s", scope)
}
// Api is an interface for data-centric message-driven communication among
// decoupled DDAs. The communication API provides data-centric patterns for
// one-way and two-way communication based on the publish-subscribe messaging
// paradigm. The communication API is implemented by communication protocol
// bindings that use specific underlying pub-sub messaging transport protocols,
// like MQTT, Zenoh, Kafka, and others. Structured Event, Action, and Query data
// is described in a common way adhering closely to the CloudEvents
// specification.
//
// Note that Api implementations are meant to be thread-safe and individual Api
// interface methods may be run on concurrent goroutines.
type Api interface {
// Open asynchronously connects to the communication network of the
// underlying protocol binding with the supplied DDA configuration.
//
// Upon successful connection or if the binding has been opened already, the
// returned error channel yields nil and is closed. If connection fails
// eventually, or if the given timeout elapses before connection is up, the
// returned error channel yields an error and is closed. Specify a zero
// timeout to disable preliminary timeout behavior.
Open(cfg *config.Config, timeout time.Duration) <-chan error
// Close asynchronously disconnects from the communication network of the
// underlying protocol binding previously opened.
//
// The returned done channel is closed upon disconnection or if the binding
// is not yet open.
Close() (done <-chan struct{})
// PublishEvent transmits the given Event with the given optional scope.
//
// PublishEvent blocks waiting for an acknowledgment, if configured
// accordingly. An error is returned if the Event cannot be transmitted, if
// acknowledgment times out, or if the binding is not yet open.
PublishEvent(event Event, scope ...Scope) error
// SubscribeEvent receives Events published on the specified subscription
// filter.
//
// SubscribeEvent blocks waiting for an acknowledgment, if configured
// accordingly. An error is returned along with a nil channel if
// subscription fails, if acknowledgment times out, or if the binding is
// not yet open.
//
// To unsubscribe from receiving events, cancel the given context which
// causes the close of the events channel asynchronously.
SubscribeEvent(ctx context.Context, filter SubscriptionFilter) (events <-chan Event, err error)
// PublishAction transmits the given Action with the given optional scope,
// receiving ActionResults through the returned buffered results channel.
//
// PublishAction blocks waiting for acknowledgments, if configured
// accordingly. An error is returned along with a nil channel if Action
// cannot be transmitted, if acknowledgment times out, if ActionResults
// cannot be received, or if the binding is not yet open.
//
// To unsubscribe from receiving results, cancel the given context which
// causes the close of the results channel asynchronously.
PublishAction(ctx context.Context, action Action, scope ...Scope) (results <-chan ActionResult, err error)
// SubscribeAction receives Actions published on the specified subscription
// filter, and provides a callback to be invoked to transmit back
// ActionResults.
//
// SubscribeAction blocks waiting for an acknowledgment, if configured
// accordingly. An error is returned along with a nil channel if
// subscription fails, if acknowledgment times out, or if the binding is
// not yet open.
//
// To unsubscribe from receiving actions, cancel the given context which
// causes the close of the actions channel asynchronously.
SubscribeAction(ctx context.Context, filter SubscriptionFilter) (actions <-chan ActionWithCallback, err error)
// PublishQuery transmits the given Query with the given optional scope,
// receiving QueryResults through the returned buffered results channel.
//
// PublishQuery blocks waiting for acknowledgments, if configured
// accordingly. An error is returned along with a nil channel if Query
// cannot be transmitted, if acknowledgment times out, if QueryResults
// cannot be received, or if the binding is not yet open.
//
// To unsubscribe from receiving results, cancel the given context which
// causes the close of the results channel asynchronously.
PublishQuery(ctx context.Context, query Query, scope ...Scope) (results <-chan QueryResult, err error)
// SubscribeQuery receives Queries published on the specified subscription
// filter, and provides a callback to be invoked to transmit back
// QueryResults.
//
// SubscribeQuery blocks waiting for an acknowledgment, if configured
// accordingly. An error is returned along with a nil channel if
// subscription fails, if acknowledgment times out, or if the binding is
// not yet open.
//
// To unsubscribe from receiving queries, cancel the given context which
// causes the close of the queries channel asynchronously.
SubscribeQuery(ctx context.Context, filter SubscriptionFilter) (queries <-chan QueryWithCallback, err error)
}
// A SubscriptionFilter defines the context that determines which publications
// should be transmitted to a subscriber.
type SubscriptionFilter struct {
// Scope of the Event, Action, or Query with respect to the DDA service that
// triggers it (optional).
//
// If not present or an empty string, the default scope "com" is used.
Scope Scope
// Type of Event, Action, or Query to be filtered (required).
//
// Must be a non-empty string consisting of lower-case ASCII letters ('a' to
// 'z'), upper-case ASCII letters ('A' to 'Z'), ASCII digits ('0' to '9'),
// ASCII dot ('.'), ASCII hyphen ('-'), or ASCII underscore ('_').
Type string
// Name to be used for a shared subscription (optional).
//
// A shared subscription is not routed to all subscribers specifying the
// same Scope, Type, and Share, but only to one of these. Shared
// subscriptions may be used to load balance published tasks so as to
// distribute workload evenly among a set of subscribers. Another use case
// is high availability through redundancy where a secondary subscriber
// takes over published tasks if the primary subscriber is no longer
// reachable (hot standby). Typically, shared subscriptions are used with
// the Action pattern.
//
// A published Event, Action, or Query is matching a shared subscription if
// it provides the same Scope and Type. If multiple shared subscriptions
// with different Share names but the same Scope and Type match such a
// publication, it will be routed to one (and only one) in each Share group.
//
// If non-empty, must consist of lower-case ASCII letters ('a' to 'z'),
// upper-case ASCII letters ('A' to 'Z'), ASCII digits ('0' to '9'), ASCII
// dot ('.'), ASCII hyphen ('-'), or ASCII underscore ('_').
//
// If not present or an empty string, the related subscription is not
// shared.
Share string
}
// Event is a structure expressing an occurrence and its context. An event may
// occur due to a raised or observed signal, a state change, an elapsed timer,
// an observed or taken measurement, or any other announcement or activity. An
// Event is routed from an event producer (source) to interested event consumers
// using pub-sub messaging.
type Event struct {
// Type of event related to the originating occurrence (required).
//
// Type is used as a subscription filter for routing the event to consumers
// via pub-sub messaging. Must be a non-empty string consisting of
// lower-case ASCII letters ('a' to 'z'), upper-case ASCII letters ('A' to
// 'Z'), ASCII digits ('0' to '9'), ASCII dot ('.'), ASCII hyphen ('-'), or
// ASCII underscore ('_').
//
// Follow a consistent naming convention for types throughout an application
// to avoid naming collisions. For example, Type could use Reverse Domain
// Name Notation (com.mycompany.myapp.mytype) or some other hierarchical
// naming pattern with some levels in the hierarchy separated by dots,
// hyphens, or underscores.
Type string
// Identifies the event (required).
//
// Id must be non-empty and unique within the scope of the producer.
// Producers must ensure that (Source, Id) is unique for each distinct
// event. Consumers may assume that events with identical Source and Id are
// duplicates.
//
// Typically, Id is a UUID or a counter maintained by the producer.
Id string
// Identifies the context in which the event occurred (required).
//
// An event source is defined by the event producer. Producers must ensure
// that (Source, Id) is unique for each distinct event. Source must be
// non-empty.
//
// Typically, Source may be a URI describing the organization publishing the
// event or the process that generates the event.
Source string
// Timestamp when the occurrence happened or when the event data has been
// generated (optional).
//
// If present, must adhere to the format specified in [RFC 3339]. An empty
// string value indicates that a timestamp is not available or needed.
//
// [RFC 3339]: https://www.rfc-editor.org/rfc/rfc3339
Time string
// Domain-specific payload information about the occurrence (required).
//
// Encoding and decoding of the transmitted binary data is left to the user
// of the API interface. Any binary serialization format can be used.
Data []byte
// Content type of Data value (optional).
//
// If present, it must adhere to the format specified in [RFC 2046]. An
// empty string value indicates that a content type is implied by the
// application.
//
// [RFC 2046]: https://www.rfc-editor.org/rfc/rfc2046
DataContentType string
}
// Action is a structure expressing an action, command, or operation to be
// carried out by interested action consumers. An Action is routed from an
// action invoker to interested action consumers using pub-sub messaging.
type Action struct {
// Type of action, command, or operation to be performed (required).
//
// Type is used as a subscription filter for routing the action to consumers
// via pub-sub messaging. Must be a non-empty string consisting of
// lower-case ASCII letters ('a' to 'z'), upper-case ASCII letters ('A' to
// 'Z'), ASCII digits ('0' to '9'), ASCII dot ('.'), ASCII hyphen ('-'), or
// ASCII underscore ('_').
//
// Follow a consistent naming convention for types throughout an application
// to avoid naming collisions. For example, Type could use Reverse Domain
// Name Notation (com.mycompany.myapp.mytype) or some other hierarchical
// naming pattern with some levels in the hierarchy separated by dots,
// hyphens, or underscores.
Type string
// Identifies the action (required).
//
// Id must be non-empty and unique within the scope of the action invoker.
// Invokers must ensure that (Source, Id) is unique for each distinct
// action. Consumers may assume that actions with identical Source and Id
// are duplicates.
//
// Typically, Id is a UUID or a counter maintained by the invoker.
Id string
// Identifies the context in which the action is invoked (required).
//
// An action source is defined by the action invoker. Invokers must ensure
// that (Source, Id) is unique for each distinct action. Source must be
// non-empty.
//
// Typically, Source may be a URI describing the organization publishing the
// action or the process that invokes the action.
Source string
// Data describing the parameters of the action (optional).
//
// Encoding and decoding of the transmitted binary data is left to the user
// of the API interface. Any binary serialization format can be used.
Params []byte
// Content type of Params value (optional).
//
// If present, it must adhere to the format specified in [RFC 2046]. An
// empty string value indicates that a content type is implied by the
// application.
//
// [RFC 2046]: https://www.rfc-editor.org/rfc/rfc2046
DataContentType string
}
// ActionResult is a structure containing resulting information returned to the
// invoker of an Action. Each interested action consumer may transmit its own
// action result(s) independently of the others. Multiple ActionResults over
// time may be generated by a consumer for a single Action to transmit a
// progressive series of results.
type ActionResult struct {
// Identifies the context in which the action is executed (required).
//
// Typically, Context may be a URI describing the organization consuming the
// action or the process that carries out the action.
Context string
// Resulting data to be returned to the action invoker (required).
//
// Note that errors occurring while processing an action are also encoded as
// result binary data in a domain-specific way.
//
// Encoding and decoding of the transmitted binary data is left to the user
// of the API interface. Any binary serialization format can be used.
Data []byte
// Content type of Data value (optional).
//
// If present, it must adhere to the format specified in [RFC 2046]. An
// empty string value indicates that a content type is implied by the
// application.
//
// [RFC 2046]: https://www.rfc-editor.org/rfc/rfc2046
DataContentType string
// The sequence number of a multi-result response (required for progressive
// responses only).
//
// A zero value or -1 indicates a single result. If multiple ActionResults
// are to be returned, the sequence number is 1 for the first result and
// incremented by one with each newly generated result. If sequence number
// overflows its maximum value 9223372036854775807, the next value should
// revert to 1. A final result should be indicated by using the additive
// inverse of the generated sequence number.
//
// A zero or negative sequence number indicates that no more results will be
// published for the correlated action after the given one.
SequenceNumber int64
}
// ActionWithCallback embeds an Action with an associated callback function to
// be invoked whenever an ActionResult should be transmitted back to the
// publisher of the Action.
type ActionWithCallback struct {
// Action associated with response callback function.
Action
// Callback when invoked transmits an ActionResult to the publisher of the
// correlated Action.
//
// An error is returned if ActionResult cannot be transmitted, or if the
// binding is not yet open.
Callback ActionCallback
}
// ActionCallback is invoked by subscribers to transmit an ActionResult back to
// the publisher of the correlated Action. It returns an error if the result
// cannot be transmitted.
type ActionCallback func(result ActionResult) error
// Query is a structure expressing a query to be answered by interested query
// consumers. A Query is routed from a querier to interested query consumers
// using pub-sub messaging.
type Query struct {
// Type of query indicating intent or desired result (required).
//
// Type is used as a subscription filter for routing the query to consumers
// via pub-sub messaging. Must be a non-empty string consisting of
// lower-case ASCII letters ('a' to 'z'), upper-case ASCII letters ('A' to
// 'Z'), ASCII digits ('0' to '9'), ASCII dot ('.'), ASCII hyphen ('-'), or
// ASCII underscore ('_').
//
// Follow a consistent naming convention for types throughout an application
// to avoid naming collisions. For example, Type could use Reverse Domain
// Name Notation (com.mycompany.myapp.mytype) or some other hierarchical
// naming pattern with some levels in the hierarchy separated by dots,
// hyphens, or underscores.
Type string
// Identifies the query (required).
//
// Id must be non-empty and unique within the scope of the querier. Queriers
// must ensure that (Source, Id) is unique for each distinct query.
// Consumers may assume that queries with identical Source and Id are
// duplicates.
//
// Typically, Id is a UUID or a counter maintained by the querier.
Id string
// Identifies the context in which the query is posed (required).
//
// A query source is defined by the querier. Queriers must ensure that
// (Source, Id) is unique for each distinct query. Source must be non-empty.
//
// Typically, Source may be a URI describing the organization publishing the
// query or the process that poses the query.
Source string
// Query data represented as indicated by DataContentType (required).
//
// Encoding and decoding of the transmitted binary data is left to the user
// of the API interface. Any binary serialization format can be used.
Data []byte
// Content type of Data value (optional).
//
// If present, it must adhere to the format specified in [RFC 2046]. An
// empty string value indicates that a content type is implied by the
// application.
//
// The content type should represent the query language/format. For example,
// a GraphQL query should use "application/graphql" and a SPARQL query
// should use "application/sparql-query".
//
// [RFC 2046]: https://www.rfc-editor.org/rfc/rfc2046
DataContentType string
}
// QueryResult is a structure containing resulting information returned to the
// querier. Each interested query consumer may transmit its own query result(s)
// independently of the others. Multiple QueryResults over time may be generated
// by a consumer for a single Query to transmit live query results whenever the
// query yields new results due to update operations on the database.
type QueryResult struct {
// Identifies the context in which the query is executed (required).
//
// Typically, Context may be a URI describing the organization consuming the
// query or the process that retrieves query result data.
Context string
// Query result data returned to the querier (required).
//
// Encoding and decoding of the transmitted binary data is left to the user
// of the API interface. Any binary serialization format can be used.
Data []byte
// Content type of Data value (optional).
//
// If present, it must adhere to the format specified in [RFC 2046]. An
// empty string value indicates that a content type is implied by the
// application.
//
// If present, use MIME Content Types to specify the query result format.
// For example, use "application/sql" for a SQL query result,
// "application/graphql" for a GraphQL query result,
// "application/sparql-results+json" for a SPARQL query result encoded in
// JSON.
//
// [RFC 2046]: https://www.rfc-editor.org/rfc/rfc2046
DataContentType string
// The sequence number of a multi-result live query (required for live query
// responses only).
//
// A zero value or -1 indicates a single result. If multiple QueryResults
// are to be returned, the sequence number is 1 for the first result and
// incremented by one with each newly generated result. If sequence number
// overflows its maximum value 9223372036854775807, the next value should
// revert to 1. A final result should be indicated by using the additive
// inverse of the generated sequence number.
//
// A zero or negative sequence number indicates that no more results will be
// published for the correlated query after the given one.
SequenceNumber int64
}
// QueryWithCallback embeds a Query with an associated callback function to be
// invoked whenever a QueryResult should be transmitted back to the publisher of
// the Query.
type QueryWithCallback struct {
// Query associated with response callback function.
Query
// Callback when invoked transmits a QueryResult to the publisher of the
// correlated Query.
//
// An error is returned if QueryResult cannot be transmitted, or if the
// binding is not yet open.
Callback QueryCallback
}
// QueryCallback is invoked by subscribers to transmit a QueryResult back to the
// publisher of the correlated Query. It returns an error if the result cannot
// be transmitted.
type QueryCallback func(result QueryResult) error
// SPDX-FileCopyrightText: © 2023 Siemens AG
// SPDX-License-Identifier: MIT
package api
import (
"context"
"sync"
)
const (
// receiveBufferSize is the buffer size of a route's receive channel that
// Router.Add uses if the caller does not specify one.
receiveBufferSize = 1 // default buffer size for receive channels
)
// Routable defines a union of communication pattern types that can be routed,
// i.e. dispatched to either a subscriber or a publisher awaiting incoming
// responses. It serves as type constraint for the message type handled by
// Router and RouteChannel.
//
// This type is intended to be used by communication binding implementations.
type Routable interface {
Event | ActionWithCallback | ActionResult | QueryWithCallback | QueryResult
}
// RouteFilter defines a subscription filter for a specific topic with a
// correlation ID for response topics.
//
// A CorrelationId equal to the zero value of T denotes a request filter:
// Router dispatches such messages to route channels registered without a
// correlation ID. Any other CorrelationId denotes a response filter whose
// messages are dispatched only to route channels registered with the same ID.
//
// This type is intended to be used by communication binding implementations.
type RouteFilter[T comparable] struct {
Topic T // subscription topic
CorrelationId T // unique correlation ID for response topic only
}
// RouteChannel is a struct representing the receive channel of a Routable type
// together with the done channel of the context it has been registered with.
//
// This type is intended to be used by communication binding implementations.
type RouteChannel[R Routable, T comparable] struct {
// Channel on which incoming routable data is received. It is closed by the
// Router when the route channel is removed after the originating context
// is done.
ReceiveChan chan R
// Done channel of the originating context, i.e. the context passed to
// Router.Add. It is closed when that context is canceled or times out,
// which signals that the subscriber is no longer interested in receiving
// messages over ReceiveChan. Once closed, no more messages are dispatched
// on ReceiveChan.
CtxDone <-chan struct{}
correlationId T // Correlation id of response channel, if present
unsub ComBindingFunc[T] // binding-specific unsubscribe function
}
// ComBindingFunc subscribes, publishes, or unsubscribes a topic of type T
// (captured by the function) on a pub-sub communication binding. A nil error
// signals success. Note that T does not appear in the function signature; the
// topic must be bound by the closure itself.
//
// This type is intended to be used by communication binding implementations.
type ComBindingFunc[T comparable] func() error
// Router manages subscription-specific RouteFilters for a specific Routable
// type and dispatches incoming messages on the associated registered receive
// channels. It should be created with NewRouter() to ensure all internal fields
// are correctly populated. Router operations may be invoked concurrently.
//
// This type is intended to be used by communication binding implementations.
type Router[R Routable, T comparable] struct {
mu sync.RWMutex // protects field routes
routes map[T][]*RouteChannel[R, T] // maps route filter topics to associated route channels
}
// NewRouter creates a new, empty *Router that routes messages of Routable type
// R on topics of type T.
func NewRouter[R Routable, T comparable]() *Router[R, T] {
	router := new(Router[R, T])
	router.routes = map[T][]*RouteChannel[R, T]{}
	return router
}
// GetTopics returns all registered RouteFilter Topics in a newly allocated
// slice. Each topic is contained exactly once; the order of topics is
// unspecified. If no topic is registered, an empty non-nil slice is returned.
//
// Note that a topic whose binding subscription could not be removed is kept
// registered without route channels and is therefore reported as well.
func (r *Router[R, T]) GetTopics() []T {
	r.mu.RLock()
	defer r.mu.RUnlock()

	// Map keys are unique already, so they can be collected directly without
	// building an intermediate set.
	topics := make([]T, 0, len(r.routes))
	for topic := range r.routes {
		topics = append(topics, topic)
	}
	return topics
}
// Add creates a new RouteChannel of a specific Routable type and registers it
// under the topic of the given RouteFilter. The returned *RouteChannel provides
// a receive channel of the given buffer size (if not present, defaults to 1).
//
// If no route channel is registered on the topic yet, the subscribe function
// is invoked first to set up the binding subscription. Afterwards, the publish
// function is invoked in any case. If one of them fails, its error is returned
// and nothing is registered; a subscription that has just been set up is
// unsubscribed again on a best-effort basis.
//
// To deregister the route channel and stop receiving data, cancel the given
// context. This removes the route channel asynchronously, closes its receive
// channel, and invokes the unsubscribe function if it was the last route
// channel on the topic.
//
// Note that the publish function should only be used in combination with
// registering response filters. Request filters should always specify a no-op
// publish function.
func (r *Router[R, T]) Add(ctx context.Context, filter RouteFilter[T], subscribe ComBindingFunc[T], publish ComBindingFunc[T], unsubscribe ComBindingFunc[T], bufferSize ...int) (*RouteChannel[R, T], error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	capacity := receiveBufferSize
	if len(bufferSize) > 0 {
		capacity = bufferSize[0]
	}
	rc := &RouteChannel[R, T]{
		ReceiveChan:   make(chan R, capacity),
		CtxDone:       ctx.Done(),
		correlationId: filter.CorrelationId,
		unsub:         unsubscribe,
	}

	// A topic present in routes has its binding subscription set up already.
	existing, subscribed := r.routes[filter.Topic]
	if !subscribed {
		if err := subscribe(); err != nil {
			return nil, err
		}
	}
	if err := publish(); err != nil {
		if !subscribed {
			_ = unsubscribe() // try unsubscribe as subscription is unused
		}
		return nil, err
	}

	r.routes[filter.Topic] = append(existing, rc)

	// Deregister the route channel as soon as the context is done.
	context.AfterFunc(ctx, func() {
		r.remove(filter, rc)
	})
	return rc, nil
}
// Dispatch sends the given incoming Routable message on all the registered
// RouteChannels of the associated RouteFilter, i.e. on those registered on the
// filter's Topic with a matching correlation ID.
//
// To be used with communication bindings that provide callback functions to
// handle incoming messages.
func (r *Router[R, T]) Dispatch(filter RouteFilter[T], message R) {
r.dispatch(filter, message)
}
// DispatchChan forwards every Routable message received on the given channel
// to all the registered RouteChannels of the associated RouteFilter. It blocks
// until the given channel is closed.
//
// To be used with communication bindings that provide channels to receive
// incoming messages.
func (r *Router[R, T]) DispatchChan(filter RouteFilter[T], messages <-chan R) {
	for {
		msg, open := <-messages
		if !open {
			return
		}
		r.dispatch(filter, msg)
	}
}
// dispatch delivers the given message to every route channel registered on the
// filter's topic whose correlation ID equals the filter's correlation ID. For
// an incoming request (zero correlation ID) these are all non-response route
// channels; for an incoming response it is the correlated route channel only.
//
// Delivery blocks until the message is received or buffered, or until the
// context of the route channel is done.
func (r *Router[R, T]) dispatch(filter RouteFilter[T], message R) {
	r.mu.RLock()
	defer r.mu.RUnlock()

	for _, rc := range r.routes[filter.Topic] {
		if rc.correlationId != filter.CorrelationId {
			continue
		}

		// Make sure no more messages are dispatched when associated context
		// has been canceled/timed out but route channel has not yet been
		// removed by context's AfterFunc. This check comes first because the
		// select below picks randomly if both of its cases are ready.
		select {
		case <-rc.CtxDone:
			continue
		default:
		}

		select {
		case <-rc.CtxDone:
		case rc.ReceiveChan <- message:
		}
	}
}
// remove deregisters the given route channel from the topic of the given
// filter and closes its receive channel. It is a no-op if the route channel is
// not registered on that topic.
//
// When the last route channel of a topic is removed, the binding-specific
// unsubscribe function is invoked. If unsubscribing fails, the topic is kept
// registered without route channels.
func (r *Router[R, T]) remove(filter RouteFilter[T], routeChannel *RouteChannel[R, T]) {
	r.mu.Lock()
	defer r.mu.Unlock()

	channels, registered := r.routes[filter.Topic]
	if !registered {
		return
	}

	idx := -1
	for i, rc := range channels {
		if rc == routeChannel {
			idx = i
			break
		}
	}
	if idx < 0 {
		return
	}

	// The deferred close runs before the deferred unlock, i.e. while the
	// write lock is still held, so no dispatch can be sending on the channel.
	defer close(routeChannel.ReceiveChan)

	// Close the gap left by the removed entry and clear the stale tail slot
	// so that the removed route channel is no longer referenced.
	last := len(channels) - 1
	copy(channels[idx:], channels[idx+1:])
	channels[last] = nil

	if last > 0 {
		r.routes[filter.Topic] = channels[:last]
		return
	}

	// The last route channel of the topic has been removed.
	if err := routeChannel.unsub(); err != nil {
		// As binding subscription is kept, also keep topic without route
		// channel to prevent a subsequent resubscription from failing.
		r.routes[filter.Topic] = channels[:0]
		return
	}
	delete(r.routes, filter.Topic)
}
// SPDX-FileCopyrightText: © 2023 Siemens AG
// SPDX-License-Identifier: MIT
// Package com provides a factory function that creates a specific communication
// binding from a given communication protocol.
package com
import (
"fmt"
"github.com/coatyio/dda/services/com/api"
"github.com/coatyio/dda/services/com/mqtt5"
)
// New creates a new protocol-specific communication binding for the given
// communication protocol. Currently, "mqtt5" is the only supported protocol.
//
// Returns the new communication binding as a pointer to an api.Api interface
// value. An error is returned along with a nil pointer if the given
// communication protocol is not supported.
func New(protocol string) (*api.Api, error) {
	switch protocol {
	case "mqtt5":
		var binding api.Api = &mqtt5.Mqtt5Binding{}
		return &binding, nil
	default:
		// TODO Whensoever Go plugin mechanism is really cross platform, use it
		// to look up communication bindings that are provided externally.
		return nil, fmt.Errorf("communication protocol %s: not supported", protocol)
	}
}
// SPDX-FileCopyrightText: © 2023 Siemens AG
// SPDX-License-Identifier: MIT
// Package mqtt5 provides a communication protocol binding implementation using
// the [MQTT v5] pub-sub messaging protocol.
//
// [MQTT v5]: https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
package mqtt5
import (
"context"
"crypto/tls"
"fmt"
"net/url"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/coatyio/dda/config"
"github.com/coatyio/dda/plog"
"github.com/coatyio/dda/services"
"github.com/coatyio/dda/services/com/api"
"github.com/eclipse/paho.golang/autopaho"
"github.com/eclipse/paho.golang/paho"
"github.com/google/uuid"
)
// Timeouts applied by the binding.
const (
disconnectTimeout = 1 * time.Second // time to wait for disconnect to complete
pubSubAckTimeout = 1 * time.Second // time to wait for acknowledgments with QoS 1 and 2
)
// Topic levels used by the binding.
const (
levelShare = "$share" // Shared topic prefix
levelEvent = "evt" // Event topic level
levelAction = "act" // Action topic level
levelActionResult = "acr" // ActionResult topic level
levelQuery = "qry" // Query topic level
levelQueryResult = "qrr" // QueryResult topic level
)
// Names of the MQTT user properties used by the binding.
const (
userPropId = "id" // user property Id
userPropSource = "sr" // user property Source
userPropTime = "tm" // user property Time
userPropContext = "ct" // user property Context
userPropSequenceNumber = "sq" // user property SequenceNumber
userPropDataContentType = "dt" // user property DataContentType
)
// strictClientIdRegex matches any single character that is not an ASCII digit
// or an ASCII letter.
//
// NOTE(review): presumably used to strip such characters when deriving the
// MQTT client ID - confirm against getClientId.
var strictClientIdRegex = regexp.MustCompile("[^0-9a-zA-Z]")
// mqttRouteFilter is the route filter type of this binding, using strings for
// topic and correlation ID.
type mqttRouteFilter = api.RouteFilter[string]
// Mqtt5Binding realizes a communication protocol binding for [MQTT v5] by
// implementing the communication API interface [api.Api].
//
// The zero value is ready to be opened; routers, client ID, and the other
// connection-related fields are populated by Open.
//
// [MQTT v5]: https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
type Mqtt5Binding struct {
mu sync.RWMutex // protects following fields
// Routers dispatching incoming messages, one per routable message type.
eventRouter *api.Router[api.Event, string]
actionRouter *api.Router[api.ActionWithCallback, string]
actionResultRouter *api.Router[api.ActionResult, string]
queryRouter *api.Router[api.QueryWithCallback, string]
queryResultRouter *api.Router[api.QueryResult, string]
clientId string // MQTT client ID derived from the configuration on Open
conn *autopaho.ConnectionManager // non-nil indicates that the binding has been opened already
qos byte // used for all publications and subscriptions
noLocal bool // used for all subscriptions
cluster string // used as topic root field
responseId string // unique ID in response topics
sharedSubAvailable bool // are shared subscriptions supported by broker
}
// ClientId returns the MQTT client ID used to connect to the broker (exposed
// for testing purposes).
//
// Note that the field is read without acquiring the binding's mutex.
func (b *Mqtt5Binding) ClientId() string {
return b.clientId
}
// Open implements the [api.Api] interface.
//
// Open initializes the binding from the given configuration, creates a broker
// connection, and waits for it to come up. If timeout is positive, waiting is
// bounded by it; otherwise no deadline is applied. If the binding is already
// open, nothing is done.
//
// The returned channel is buffered and is closed before Open returns. It
// yields the error that caused Open to fail, or nil (either explicitly sent or
// as zero value of the closed channel) on success. A failure to establish the
// connection in time is reported as a retryable error.
func (b *Mqtt5Binding) Open(cfg *config.Config, timeout time.Duration) <-chan error {
ch := make(chan error, 1)
defer close(ch)
b.mu.Lock()
defer b.mu.Unlock()
// Already open.
if b.conn != nil {
ch <- nil
return ch
}
b.clientId = b.getClientId(cfg)
b.cluster = cfg.Cluster
b.responseId = cfg.Identity.Id
b.noLocal = cfg.Services.Com.Opts["noLocal"] == true
// Start with fresh routers on each Open.
b.eventRouter = api.NewRouter[api.Event, string]()
b.actionRouter = api.NewRouter[api.ActionWithCallback, string]()
b.actionResultRouter = api.NewRouter[api.ActionResult, string]()
b.queryRouter = api.NewRouter[api.QueryWithCallback, string]()
b.queryResultRouter = api.NewRouter[api.QueryResult, string]()
// Receives the CONNACK of the first successful connection (see
// OnConnectionUp in getClientConfig).
connackChan := make(chan *paho.Connack, 1)
ccfg, err := b.getClientConfig(cfg, connackChan)
if err != nil {
ch <- err
return ch
}
plog.Printf("Open MQTT5 communication binding connecting to %s...\n", ccfg.BrokerUrls[0])
// Note that we cannot use a context.WithTimeout for NewConnection as
// autopaho disconnects as soon as the passed context is canceled, i.e. when
// the timeout elapses, even if the connection is already up!
//
// Note that NewConnection never returns an error in the currently used
// implementation.
conn, err := autopaho.NewConnection(context.Background(), *ccfg)
if err != nil {
ch <- err
return ch
}
// If a broker URL has an unsupported schema or if broker connection cannot
// be established paho.golang simply tries the next URL, endlessly. In this
// case AwaitConnection will block until the passed timeout elapses.
ctx := context.Background()
var cancel context.CancelFunc
if timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
if err := conn.AwaitConnection(ctx); err != nil {
// Best-effort teardown of the connection manager; its error is
// deliberately ignored in favor of the original one.
_ = conn.Disconnect(context.Background())
ch <- services.NewRetryableError(err)
return ch
}
// NOTE(review): assumes the CONNACK has been (or will be) delivered on
// connackChan once AwaitConnection succeeds and that its Properties are
// non-nil - confirm against the autopaho version in use.
b.sharedSubAvailable = (<-connackChan).Properties.SharedSubAvailable
b.conn = conn
return ch
}
// Close implements the [api.Api] interface.
//
// Close disconnects from the broker, waiting at most disconnectTimeout, and
// marks the binding as not open. It is a no-op if the binding is not open. The
// returned channel is already closed when Close returns.
func (b *Mqtt5Binding) Close() (done <-chan struct{}) {
	closed := make(chan struct{})
	defer close(closed)

	b.mu.Lock()
	defer b.mu.Unlock()

	if b.conn != nil {
		ctx, cancel := context.WithTimeout(context.Background(), disconnectTimeout)
		defer cancel()

		// Disconnect errors are deliberately ignored: the binding is
		// considered closed in any case.
		_ = b.conn.Disconnect(ctx)
		b.conn = nil
		plog.Printf("Closed MQTT5 communication binding\n")
	}
	return closed
}
// PublishEvent implements the [api.Api] interface.
//
// The event is validated and published on the event topic of its type within
// the first given scope (or the default scope if none is given). Event
// metadata is transmitted as MQTT user properties. An error is returned if
// validation fails, if the binding is not open, or if publishing fails.
func (b *Mqtt5Binding) PublishEvent(event api.Event, scope ...api.Scope) error {
	err := b.validatePatternTypeIdSource("Event", event.Type, event.Id, event.Source)
	if err != nil {
		return err
	}

	effectiveScope := api.ScopeDef
	if len(scope) != 0 {
		effectiveScope = scope[0]
	}

	b.mu.RLock()
	defer b.mu.RUnlock()

	if b.conn == nil {
		return fmt.Errorf("PublishEvent %+v failed as binding is not yet open", event)
	}

	props := paho.UserProperties{
		{Key: userPropId, Value: event.Id},
		{Key: userPropSource, Value: event.Source},
		{Key: userPropTime, Value: event.Time},
		{Key: userPropDataContentType, Value: event.DataContentType},
	}
	topic, _ := b.topicWithLevels("", effectiveScope, levelEvent, event.Type)
	return b.publish(topic, event.Data, props, "", nil)
}
// SubscribeEvent implements the [api.Api] interface.
//
// After validating the filter, a route for the event topic is added to the
// event router, passing it functions that subscribe to and unsubscribe from
// the (possibly shared) subscription topic. The receive channel of the added
// route is returned.
func (b *Mqtt5Binding) SubscribeEvent(ctx context.Context, filter api.SubscriptionFilter) (events <-chan api.Event, err error) {
	if err = b.validatePatternFilter("Event", filter); err != nil {
		return nil, err
	}

	b.mu.RLock()
	defer b.mu.RUnlock()

	if b.conn == nil {
		return nil, fmt.Errorf("SubscribeEvent failed as binding is not yet open")
	}

	pubTopic, subTopic := b.topicWithLevels(filter.Share, filter.Scope, levelEvent, filter.Type)

	doSubscribe := func() error { return b.subscribe(subTopic) }
	doNothing := func() error { return nil }
	doUnsubscribe := func() error { return b.unsubscribe(subTopic) }

	route, err := b.eventRouter.Add(ctx, mqttRouteFilter{Topic: pubTopic}, doSubscribe, doNothing, doUnsubscribe)
	if err != nil {
		return nil, err
	}
	return route.ReceiveChan, nil
}
// PublishAction implements the [api.Api] interface.
//
// After validating the action, a route for a response topic and a fresh
// correlation ID is added to the action result router, passing it functions
// that subscribe to the response topic, publish the action (carrying response
// topic and correlation ID), and unsubscribe from the response topic. The
// receive channel of the added route is returned.
func (b *Mqtt5Binding) PublishAction(ctx context.Context, action api.Action, scope ...api.Scope) (results <-chan api.ActionResult, err error) {
	if err = b.validatePatternTypeIdSource("Action", action.Type, action.Id, action.Source); err != nil {
		return nil, err
	}

	effectiveScope := api.ScopeDef
	if len(scope) != 0 {
		effectiveScope = scope[0]
	}

	b.mu.RLock()
	defer b.mu.RUnlock()

	if b.conn == nil {
		return nil, fmt.Errorf("PublishAction %+v failed as binding is not yet open", action)
	}

	requestTopic, _ := b.topicWithLevels("", effectiveScope, levelAction, action.Type)
	resultTopic, _ := b.topicWithLevels("", effectiveScope, levelActionResult, action.Type)
	responseTopic, correlationId := b.responseInfo(resultTopic)

	sendAction := func() error {
		props := paho.UserProperties{
			{Key: userPropId, Value: action.Id},
			{Key: userPropSource, Value: action.Source},
			{Key: userPropDataContentType, Value: action.DataContentType},
		}
		return b.publish(requestTopic, action.Params, props, responseTopic, []byte(correlationId))
	}

	route, err := b.actionResultRouter.Add(ctx,
		mqttRouteFilter{Topic: responseTopic, CorrelationId: correlationId},
		func() error { return b.subscribe(responseTopic) },
		sendAction,
		func() error { return b.unsubscribe(responseTopic) },
	)
	if err != nil {
		return nil, err
	}
	return route.ReceiveChan, nil
}
// publishActionResult publishes the given action result on the given response
// topic, passing on the given correlation ID. Result metadata is transmitted
// as MQTT user properties. An error is returned if the binding is not open or
// if publishing fails.
func (b *Mqtt5Binding) publishActionResult(result api.ActionResult, responseTopic string, correlationId []byte) error {
	b.mu.RLock()
	defer b.mu.RUnlock()

	if b.conn == nil {
		return fmt.Errorf("publishActionResult %+v failed as binding is not yet open", result)
	}

	seqNo := strconv.FormatInt(result.SequenceNumber, 10)
	props := paho.UserProperties{
		{Key: userPropContext, Value: result.Context},
		{Key: userPropDataContentType, Value: result.DataContentType},
		{Key: userPropSequenceNumber, Value: seqNo},
	}
	return b.publish(responseTopic, result.Data, props, "", correlationId)
}
// SubscribeAction implements the [api.Api] interface.
//
// After validating the filter, a route for the action topic is added to the
// action router, passing it functions that subscribe to and unsubscribe from
// the (possibly shared) subscription topic. The receive channel of the added
// route is returned.
func (b *Mqtt5Binding) SubscribeAction(ctx context.Context, filter api.SubscriptionFilter) (actions <-chan api.ActionWithCallback, err error) {
	if err = b.validatePatternFilter("Action", filter); err != nil {
		return nil, err
	}

	b.mu.RLock()
	defer b.mu.RUnlock()

	if b.conn == nil {
		return nil, fmt.Errorf("SubscribeAction failed as binding is not yet open")
	}

	pubTopic, subTopic := b.topicWithLevels(filter.Share, filter.Scope, levelAction, filter.Type)

	doSubscribe := func() error { return b.subscribe(subTopic) }
	doNothing := func() error { return nil }
	doUnsubscribe := func() error { return b.unsubscribe(subTopic) }

	route, err := b.actionRouter.Add(ctx, mqttRouteFilter{Topic: pubTopic}, doSubscribe, doNothing, doUnsubscribe)
	if err != nil {
		return nil, err
	}
	return route.ReceiveChan, nil
}
// PublishQuery implements the [api.Api] interface.
//
// After validating the query, a route for a response topic and a fresh
// correlation ID is added to the query result router, passing it functions
// that subscribe to the response topic, publish the query (carrying response
// topic and correlation ID), and unsubscribe from the response topic. The
// receive channel of the added route is returned.
func (b *Mqtt5Binding) PublishQuery(ctx context.Context, query api.Query, scope ...api.Scope) (results <-chan api.QueryResult, err error) {
	if err = b.validatePatternTypeIdSource("Query", query.Type, query.Id, query.Source); err != nil {
		return nil, err
	}

	effectiveScope := api.ScopeDef
	if len(scope) != 0 {
		effectiveScope = scope[0]
	}

	b.mu.RLock()
	defer b.mu.RUnlock()

	if b.conn == nil {
		return nil, fmt.Errorf("PublishQuery %+v failed as binding is not yet open", query)
	}

	requestTopic, _ := b.topicWithLevels("", effectiveScope, levelQuery, query.Type)
	resultTopic, _ := b.topicWithLevels("", effectiveScope, levelQueryResult, query.Type)
	responseTopic, correlationId := b.responseInfo(resultTopic)

	sendQuery := func() error {
		props := paho.UserProperties{
			{Key: userPropId, Value: query.Id},
			{Key: userPropSource, Value: query.Source},
			{Key: userPropDataContentType, Value: query.DataContentType},
		}
		return b.publish(requestTopic, query.Data, props, responseTopic, []byte(correlationId))
	}

	route, err := b.queryResultRouter.Add(ctx,
		mqttRouteFilter{Topic: responseTopic, CorrelationId: correlationId},
		func() error { return b.subscribe(responseTopic) },
		sendQuery,
		func() error { return b.unsubscribe(responseTopic) },
	)
	if err != nil {
		return nil, err
	}
	return route.ReceiveChan, nil
}
// publishQueryResult publishes the given query result on the given response
// topic, passing on the given correlation ID. Result metadata is transmitted
// as MQTT user properties. An error is returned if the binding is not open or
// if publishing fails.
func (b *Mqtt5Binding) publishQueryResult(result api.QueryResult, responseTopic string, correlationId []byte) error {
	b.mu.RLock()
	defer b.mu.RUnlock()

	if b.conn == nil {
		return fmt.Errorf("publishQueryResult %+v failed as binding is not yet open", result)
	}

	seqNo := strconv.FormatInt(result.SequenceNumber, 10)
	props := paho.UserProperties{
		{Key: userPropContext, Value: result.Context},
		{Key: userPropDataContentType, Value: result.DataContentType},
		{Key: userPropSequenceNumber, Value: seqNo},
	}
	return b.publish(responseTopic, result.Data, props, "", correlationId)
}
// SubscribeQuery implements the [api.Api] interface.
//
// After validating the filter, a route for the query topic is added to the
// query router, passing it functions that subscribe to and unsubscribe from
// the (possibly shared) subscription topic. The receive channel of the added
// route is returned.
func (b *Mqtt5Binding) SubscribeQuery(ctx context.Context, filter api.SubscriptionFilter) (queries <-chan api.QueryWithCallback, err error) {
	if err = b.validatePatternFilter("Query", filter); err != nil {
		return nil, err
	}

	b.mu.RLock()
	defer b.mu.RUnlock()

	if b.conn == nil {
		return nil, fmt.Errorf("SubscribeQuery failed as binding is not yet open")
	}

	pubTopic, subTopic := b.topicWithLevels(filter.Share, filter.Scope, levelQuery, filter.Type)

	doSubscribe := func() error { return b.subscribe(subTopic) }
	doNothing := func() error { return nil }
	doUnsubscribe := func() error { return b.unsubscribe(subTopic) }

	route, err := b.queryRouter.Add(ctx, mqttRouteFilter{Topic: pubTopic}, doSubscribe, doNothing, doUnsubscribe)
	if err != nil {
		return nil, err
	}
	return route.ReceiveChan, nil
}
// getClientConfig creates the autopaho client configuration from the com
// service section of the given DDA configuration.
//
// The CONNACK of the first connection that comes up is sent on connackChan,
// which is closed afterwards; on every later (re)connection all topics
// currently known to the routers are resubscribed.
//
// The following defaults apply if the corresponding setting is missing or, for
// options, not of type int: broker URL "tcp://localhost:1883", keepAlive 30,
// qos 0, connectRetryDelay 1000ms, connectTimeout 10000ms. As a side effect,
// b.qos is set from the "qos" option.
//
// An error is returned if the broker URL cannot be parsed, if the TLS key pair
// cannot be loaded, or if the authentication method is not supported.
func (b *Mqtt5Binding) getClientConfig(cfg *config.Config, connackChan chan<- *paho.Connack) (*autopaho.ClientConfig, error) {
var comCfg = cfg.Services.Com
// Declared separately from its initialization so that the callbacks below
// can refer to clientConfig.Debug, which may be replaced later on.
var clientConfig autopaho.ClientConfig
clientConfig = autopaho.ClientConfig{
OnConnectionUp: func(conn *autopaho.ConnectionManager, ack *paho.Connack) {
if connackChan != nil {
// First connection: hand over the CONNACK exactly once.
clientConfig.Debug.Printf("broker connection up\n")
defer close(connackChan)
connackChan <- ack
connackChan = nil
} else {
clientConfig.Debug.Printf("broker reconnection up\n")
b.resubscribe()
}
},
OnConnectError: func(err error) { clientConfig.Debug.Printf("broker connection error: %v\n", err) },
Debug: paho.NOOPLogger{},
ClientConfig: paho.ClientConfig{
ClientID: b.clientId,
OnClientError: func(err error) { clientConfig.Debug.Printf("client error: %v\n", err) },
OnServerDisconnect: func(d *paho.Disconnect) {
if d.Properties != nil {
clientConfig.Debug.Printf("broker requested disconnect: %s\n", d.Properties.ReasonString)
} else {
clientConfig.Debug.Printf("broker requested disconnect with reason code: %d\n", d.ReasonCode)
}
},
// All incoming publications are handled by a single handler.
Router: paho.NewSingleHandlerRouter(b.handle),
},
}
if comCfg.Opts["debug"] == true && plog.Enabled() {
// Inject paho and autopaho log output into standard logger.
clientConfig.Debug = plog.WithPrefix("autopaho ")
clientConfig.PahoDebug = plog.WithPrefix("paho ")
}
clientConfig.SetConnectPacketConfigurator(func(c *paho.Connect) *paho.Connect {
// As long as client persistence is not supported in paho, a Connection
// should start a new session on both client and server. See comment
// https://github.com/eclipse/paho.golang/blob/d63b3b28d25ff73076c8846c92c4d062503e646e/autopaho/auto.go#L125
c.CleanStart = true
return c
})
sUrl := comCfg.Url
if sUrl == "" {
sUrl = "tcp://localhost:1883"
}
if brokerUrl, err := url.Parse(sUrl); err != nil {
return nil, fmt.Errorf("invalid 'services.com.url' in DDA configuration: %w", err)
} else {
clientConfig.BrokerUrls = []*url.URL{brokerUrl}
}
if comCfg.Auth.Password != "" {
// Note: MQTT version 5 of the protocol allows the sending of a Password
// with no User Name, where MQTT v3.1.1 did not. This reflects the
// common use of Password for credentials other than a password.
clientConfig.SetUsernamePassword(comCfg.Auth.Username, []byte(comCfg.Auth.Password))
} else {
clientConfig.ResetUsernamePassword()
}
switch comCfg.Auth.Method {
case "none", "":
// No TLS configuration.
case "tls":
cert, err := tls.LoadX509KeyPair(comCfg.Auth.Cert, comCfg.Auth.Key)
if err != nil {
return nil, fmt.Errorf("invalid or missing PEM file in DDA configuration under 'services.com.auth.cert/.key' : %w", err)
}
//#nosec G402 -- Default for configuration option Verify is true
clientConfig.TlsCfg = &tls.Config{
Certificates: []tls.Certificate{cert},
InsecureSkipVerify: !comCfg.Auth.Verify,
}
default:
return nil, fmt.Errorf("unsupported 'services.com.auth.method' %s in DDA configuration", comCfg.Auth.Method)
}
// NOTE(review): the int values of "keepAlive" and "qos" are converted without
// range checks - presumably validated elsewhere; confirm.
switch v := comCfg.Opts["keepAlive"].(type) {
case int:
clientConfig.KeepAlive = uint16(v)
default:
clientConfig.KeepAlive = 30
}
switch v := comCfg.Opts["qos"].(type) {
case int:
b.qos = byte(v)
default:
b.qos = byte(0)
}
// Both options are given in milliseconds.
switch v := comCfg.Opts["connectRetryDelay"].(type) {
case int:
clientConfig.ConnectRetryDelay = time.Duration(v) * time.Millisecond
default:
clientConfig.ConnectRetryDelay = 1000 * time.Millisecond
}
switch v := comCfg.Opts["connectTimeout"].(type) {
case int:
clientConfig.ConnectTimeout = time.Duration(v) * time.Millisecond
default:
clientConfig.ConnectTimeout = 10000 * time.Millisecond
}
return &clientConfig, nil
}
// getClientId derives the MQTT client ID from the configured identity by
// concatenating its name and ID.
//
// If the com option "strictClientId" is true, the client ID is made compliant
// with [MQTT-3.1.3-5] of the MQTT Version 5.0 Specification, which obliges a
// server to accept client IDs of 1 to 23 UTF-8 encoded bytes that consist of
// ASCII digits and letters only (a server MAY accept longer IDs or other
// characters): any other character is replaced by "0" and the result is
// truncated to 23 bytes.
func (b *Mqtt5Binding) getClientId(cfg *config.Config) string {
	id := cfg.Identity.Name + cfg.Identity.Id
	if cfg.Services.Com.Opts["strictClientId"] != true {
		return id
	}

	id = strictClientIdRegex.ReplaceAllLiteralString(id, "0")
	if len(id) > 23 {
		id = id[:23]
	}
	return id
}
// validatePatternTypeIdSource validates the Type, Id, and Source of the
// communication pattern named pat, in this order. The first validation error
// encountered is returned; nil is returned if all validations pass.
func (b *Mqtt5Binding) validatePatternTypeIdSource(pat string, typ string, id string, source string) error {
	err := config.ValidateName(typ, pat, "Type")
	if err != nil {
		return err
	}
	err = config.ValidateNonEmpty(id, pat, "Id")
	if err != nil {
		return err
	}
	err = config.ValidateNonEmpty(source, pat, "Source")
	if err != nil {
		return err
	}
	return nil
}
// validatePatternFilter validates the given subscription filter of the
// communication pattern named pat. The filter's Type is always validated. A
// non-empty Share is rejected if shared subscriptions are not available and
// validated otherwise.
func (b *Mqtt5Binding) validatePatternFilter(pat string, filter api.SubscriptionFilter) error {
	if err := config.ValidateName(filter.Type, pat, "Type"); err != nil {
		return err
	}
	if filter.Share == "" {
		return nil
	}
	if !b.sharedSubAvailable {
		return fmt.Errorf("shared subscriptions are not supported by the MQTT 5 broker")
	}
	if err := config.ValidateName(filter.Share, pat, "Share"); err != nil {
		return err
	}
	return nil
}
// topicWithLevels composes the topic "<cluster>/<scope>/<levels...>" to be
// used for publishing (pubTopic) and the corresponding topic to be used for
// subscribing (subTopic). The default scope is mapped to api.ScopeCom.
//
// If share is empty, subTopic equals pubTopic; otherwise subTopic is pubTopic
// prefixed by "$share/<share>/".
func (b *Mqtt5Binding) topicWithLevels(share string, scope api.Scope, levels ...string) (pubTopic string, subTopic string) {
	if scope == api.ScopeDef {
		scope = api.ScopeCom
	}

	pubTopic = fmt.Sprintf("%s/%s/%s", b.cluster, scope, strings.Join(levels, "/"))
	if share == "" {
		return pubTopic, pubTopic
	}
	return pubTopic, fmt.Sprintf("%s/%s/%s", levelShare, share, pubTopic)
}
// createPubSubContext returns the context to be used for a publish,
// subscribe, or unsubscribe operation along with its cancel function, which
// should always be invoked by the caller. With QoS 0 a background context and
// a no-op cancel function are returned; otherwise the context times out after
// pubSubAckTimeout.
func (b *Mqtt5Binding) createPubSubContext() (context.Context, context.CancelFunc) {
	if b.qos == 0 {
		return context.Background(), func() {}
	}
	return context.WithTimeout(context.Background(), pubSubAckTimeout)
}
// responseInfo returns the response topic "<topic>/<responseId>" for the given
// topic along with a newly generated UUID string to be used as correlation
// ID.
func (b *Mqtt5Binding) responseInfo(topic string) (string, string) {
	responseTopic := topic + "/" + b.responseId
	return responseTopic, uuid.NewString()
}
// publish publishes the payload on the given topic using the binding's QoS.
// The given user properties, response topic (if non-empty), and correlation
// ID (if non-nil) are passed along as publish properties. A failing
// publication is reported as a retryable error.
//
// publish reads b.conn without locking, so the caller must hold the binding's
// mutex.
func (b *Mqtt5Binding) publish(topic string, payload []byte, userProps paho.UserProperties, responseTopic string, correlationId []byte) error {
	ctx, cancel := b.createPubSubContext()
	defer cancel()

	msg := &paho.Publish{
		QoS:     b.qos,
		Topic:   topic,
		Payload: payload,
		Properties: &paho.PublishProperties{
			// Note: PayloadFormat 1 means that the payload must conform to
			// UTF-8 string encoding (the broker must check it!). As any
			// binary data can be sent, neither PayloadFormat nor
			// ContentType is set.
			User: userProps,
			// An empty response topic and a nil correlation ID are the zero
			// values of these fields, i.e. they remain unset.
			ResponseTopic:   responseTopic,
			CorrelationData: correlationId,
		},
	}

	_, err := b.conn.Publish(ctx, msg)
	if err != nil {
		return services.NewRetryableError(err)
	}
	return nil
}
// subscribe subscribes to all given topics in a single request using the
// binding's QoS. It is a no-op if no topics are given. An error is returned if
// the binding is not open; a failing subscription is reported as a retryable
// error.
func (b *Mqtt5Binding) subscribe(topics ...string) error {
	if len(topics) == 0 {
		return nil
	}

	ctx, cancel := b.createPubSubContext()
	defer cancel()

	options := make([]paho.SubscribeOptions, len(topics))
	for i, topic := range topics {
		// It is a Protocol Error to set the No Local bit to 1 on a Shared
		// Subscription [MQTT-3.8.3-4].
		isShared := strings.HasPrefix(topic, levelShare)
		options[i] = paho.SubscribeOptions{
			Topic:   topic,
			QoS:     b.qos,
			NoLocal: b.noLocal && !isShared,
		}
	}
	request := &paho.Subscribe{Subscriptions: options}

	b.mu.RLock()
	defer b.mu.RUnlock()

	if b.conn == nil {
		return fmt.Errorf("subscribe failed as binding is not yet open")
	}
	_, err := b.conn.Subscribe(ctx, request)
	if err != nil {
		return services.RetryableErrorf("subscribe %v failed: %w", topics, err)
	}
	return nil
}
// unsubscribe unsubscribes from all given topics in a single request. It is a
// no-op if no topics are given. An error is returned if the binding is not
// open; a failing unsubscription is logged and reported as a retryable error.
func (b *Mqtt5Binding) unsubscribe(topics ...string) error {
if len(topics) == 0 {
return nil
}
ctx, cancel := b.createPubSubContext()
defer cancel()
u := &paho.Unsubscribe{Topics: topics}
// Trying to acquire read lock only fails if unsubscribe is invoked in a
// context that has already acquired a write lock, i.e. within the Close
// method invoking router.Clear. In this case, unsubscribe would deadlock
// when acquiring a recursive read lock. As the write lock is already
// acquired we can safely continue without read locking. All other
// invocations of unsubscribe are in a user code context when calling an
// unsubscribe binding function, so acquiring the read lock won't fail.
//
// NOTE(review): TryRLock also fails while another goroutine holds or waits
// for the write lock; in that case b.conn is accessed without holding the
// lock - confirm this cannot happen in practice.
if b.mu.TryRLock() {
defer b.mu.RUnlock()
}
if b.conn == nil {
return fmt.Errorf("unsubscribe failed as binding is not yet open")
}
if _, err := b.conn.Unsubscribe(ctx, u); err != nil {
plog.Printf("unsubscribe %v failed: %v", topics, err)
return services.RetryableErrorf("unsubscribe %v failed: %w", topics, err)
}
return nil
}
// resubscribe subscribes again to all topics currently known to the routers.
// A failure is logged only.
func (b *Mqtt5Binding) resubscribe() {
	topics := b.getTopics()
	err := b.subscribe(topics...)
	if err != nil {
		plog.Printf("resubscribe failed: %v", err)
	}
}
// getTopics returns the topics of all routers concatenated in the order
// event, action, action result, query, and query result.
func (b *Mqtt5Binding) getTopics() []string {
	groups := [][]string{
		b.eventRouter.GetTopics(),
		b.actionRouter.GetTopics(),
		b.actionResultRouter.GetTopics(),
		b.queryRouter.GetTopics(),
		b.queryResultRouter.GetTopics(),
	}

	total := 0
	for _, group := range groups {
		total += len(group)
	}

	topics := make([]string, 0, total)
	for _, group := range groups {
		topics = append(topics, group...)
	}
	return topics
}
// handle is the single handler for all incoming publications. It converts a
// publication into the corresponding pattern item and dispatches it to the
// router of that pattern.
//
// Incoming topics are expected to consist of at least four levels
// "<cluster>/<scope>/<pattern>/<type>". Publications without user properties,
// with fewer topic levels, with an invalid scope, with an unknown pattern
// level, or - for results - with a malformed sequence number are logged and
// discarded.
func (b *Mqtt5Binding) handle(p *paho.Publish) {
	if p.Properties == nil || p.Properties.User == nil {
		plog.Printf("handle: discard incoming topic %s without Properties", p.Topic)
		return
	}

	levels := strings.Split(p.Topic, "/")
	if len(levels) < 4 {
		// Guard against a topic that does not follow the expected layout;
		// indexing the levels below would panic otherwise.
		plog.Printf("handle: discard malformed incoming topic %s", p.Topic)
		return
	}
	levelPattern, typeName := levels[2], levels[3]

	if _, err := api.ToScope(levels[1]); err != nil {
		plog.Printf("handle: error on incoming topic: %v", err)
		return
	}

	user := p.Properties.User
	id, source := user.Get(userPropId), user.Get(userPropSource)
	routeFilter := mqttRouteFilter{Topic: p.Topic}
	correlationId := p.Properties.CorrelationData
	responseTopic := p.Properties.ResponseTopic

	switch levelPattern {
	case levelEvent:
		event := api.Event{
			Type:            typeName,
			Id:              id,
			Source:          source,
			Time:            user.Get(userPropTime),
			Data:            p.Payload,
			DataContentType: user.Get(userPropDataContentType),
		}
		b.eventRouter.Dispatch(routeFilter, event)
	case levelAction:
		actionCb := api.ActionWithCallback{
			Action: api.Action{
				Type:            typeName,
				Id:              id,
				Source:          source,
				Params:          p.Payload,
				DataContentType: user.Get(userPropDataContentType),
			},
			// Results are published on the response topic requested by the
			// publisher of the action.
			Callback: func(result api.ActionResult) error {
				return b.publishActionResult(result, responseTopic, correlationId)
			},
		}
		b.actionRouter.Dispatch(routeFilter, actionCb)
	case levelActionResult:
		seqNo, err := strconv.ParseInt(user.Get(userPropSequenceNumber), 10, 64)
		if err != nil {
			plog.Printf("handle: error on SequenceNumber: %v", err)
			return
		}
		result := api.ActionResult{
			Context:         user.Get(userPropContext),
			Data:            p.Payload,
			DataContentType: user.Get(userPropDataContentType),
			SequenceNumber:  seqNo,
		}
		// Results are routed by response topic and correlation ID.
		routeFilter.CorrelationId = string(correlationId)
		b.actionResultRouter.Dispatch(routeFilter, result)
	case levelQuery:
		queryCb := api.QueryWithCallback{
			Query: api.Query{
				Type:            typeName,
				Id:              id,
				Source:          source,
				Data:            p.Payload,
				DataContentType: user.Get(userPropDataContentType),
			},
			// Results are published on the response topic requested by the
			// publisher of the query.
			Callback: func(result api.QueryResult) error {
				return b.publishQueryResult(result, responseTopic, correlationId)
			},
		}
		b.queryRouter.Dispatch(routeFilter, queryCb)
	case levelQueryResult:
		seqNo, err := strconv.ParseInt(user.Get(userPropSequenceNumber), 10, 64)
		if err != nil {
			plog.Printf("handle: error on SequenceNumber: %v", err)
			return
		}
		result := api.QueryResult{
			Context:         user.Get(userPropContext),
			Data:            p.Payload,
			DataContentType: user.Get(userPropDataContentType),
			SequenceNumber:  seqNo,
		}
		// Results are routed by response topic and correlation ID.
		routeFilter.CorrelationId = string(correlationId)
		b.queryResultRouter.Dispatch(routeFilter, result)
	default:
		plog.Printf("handle: discard malformed incoming topic %s", p.Topic)
	}
}
// SPDX-FileCopyrightText: © 2023 Siemens AG
// SPDX-License-Identifier: MIT
// Package services provides common functionality to be reused by service
// implementations.
package services
import (
"context"
"fmt"
"time"
)
// retryable is implemented by errors that can report whether the operation
// that caused them may be retried.
type retryable interface {
Retryable() bool
}
// retryableError wraps an error to mark it as retryable.
type retryableError struct {
	error
}

// Error returns the message of the wrapped error prefixed by
// "retryable error: ".
func (r *retryableError) Error() string {
	return "retryable error: " + r.error.Error()
}

// Retryable always returns true.
func (r *retryableError) Retryable() bool {
	return true
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can
// inspect the cause of a retryable error.
func (r *retryableError) Unwrap() error {
	return r.error
}
// RetryableErrorf formats an error according to the given format specifier
// and arguments (as fmt.Errorf does) and returns it as a retryable error.
//
// Use the function IsRetryable(error) to check whether a given error is
// retryable or not.
func RetryableErrorf(format string, a ...any) error {
	cause := fmt.Errorf(format, a...)
	return &retryableError{error: cause}
}
// NewRetryableError wraps the given error into a new error that indicates
// that it is retryable. If the given error is nil, nil is returned.
//
// Use the function IsRetryable(error) to check whether a given error is
// retryable or not.
func NewRetryableError(err error) error {
	if err != nil {
		return &retryableError{error: err}
	}
	return nil
}
// IsRetryable indicates whether the operation causing the error may be
// retried, potentially with a backoff. For example, a retryable error may be
// caused by an operation that times out or by a temporary unavailability.
//
// IsRetryable returns false if the error is nil or if the error is caused by
// an operation that cannot be retried due to failed preconditions, invalid
// arguments, or on any other grounds. Otherwise, true is returned.
//
// Only the given error itself is inspected, not the errors it may wrap.
func IsRetryable(err error) bool {
	if r, ok := err.(retryable); ok {
		return r.Retryable()
	}
	return false
}
// ErrorRetryable returns the error wrapped by the given retryable error. An
// error that has not been created by this package as retryable error is
// returned unchanged.
func ErrorRetryable(err error) error {
	rerr, ok := err.(*retryableError)
	if !ok {
		return err
	}
	return rerr.error
}
// Backoff represents a capped exponential backoff strategy for an operation
// that yields a retryable error (see [RetryWithBackoff]).
//
// The delay starts with Base and is doubled after each retry attempt until it
// exceeds Cap, if Cap is nonzero.
//
// Note that you can also define a linear backoff strategy by setting Cap to
// the same value as Base.
type Backoff struct {
Base time.Duration // base delay of the first retry attempt
Cap time.Duration // if nonzero, maximum delay between retry attempts
Max int // if nonzero, maximum number of retry attempts
}
// RetryWithBackoff synchronously retries the given operation according to the
// given backoff strategy as long as it yields a retryable error.
//
// RetryWithBackoff returns nil if the operation succeeds eventually, a
// non-retryable error if it fails eventually (also due to cancelation of the
// given context) or a retryable error to indicate that the maximum number of
// retry attempts has been reached.
//
// The given operation is passed a counter that, starting with 1, sums up the
// number of invocations, including the current one.
func RetryWithBackoff(ctx context.Context, b Backoff, op func(cnt int) error) error {
	delay := b.Base
	for attempt := 1; ; attempt++ {
		err := op(attempt)
		if !IsRetryable(err) {
			return err
		}

		// At this point, attempt equals the number of failed invocations,
		// i.e. the initial one plus all retries so far.
		if b.Max > 0 && attempt > b.Max {
			return RetryableErrorf("reached maximum number of retry attempts")
		}

		// Cap the delay of the upcoming retry attempt, if requested.
		wait := delay
		if b.Cap > 0 && b.Cap < delay {
			wait = b.Cap
		}

		timer := time.NewTimer(wait)
		select {
		case <-ctx.Done():
			if !timer.Stop() {
				<-timer.C // drain channel
			}
			return ctx.Err()
		case <-timer.C:
			timer.Stop()
		}

		// Stop doubling as soon as the cap has taken effect.
		if wait == delay {
			delay *= 2
		}
	}
}
// SPDX-FileCopyrightText: © 2024 Siemens AG
// SPDX-License-Identifier: MIT
// Package raft exports [MessagePack] codec functionality to serialize log
// entries, LogStore items, and FSM snapshots.
//
// [MessagePack]: https://msgpack.org/
package raft
import (
"io"
"github.com/hashicorp/go-msgpack/v2/codec"
)
// DecodeMsgPack decodes the MessagePack encoded byte slice b into out.
func DecodeMsgPack(b []byte, out any) error {
	handle := new(codec.MsgpackHandle)
	return codec.NewDecoderBytes(b, handle).Decode(out)
}
// DecodeMsgPackFromReader decodes MessagePack encoded data read from r into
// out. The reader is not closed by this function.
func DecodeMsgPackFromReader(r io.ReadCloser, out any) error {
	handle := new(codec.MsgpackHandle)
	return codec.NewDecoder(r, handle).Decode(out)
}
// EncodeMsgPack encodes in as a MessagePack object and returns the encoding
// as a byte slice along with any encoding error.
func EncodeMsgPack(in any) ([]byte, error) {
	var (
		buf    []byte
		handle codec.MsgpackHandle
	)
	err := codec.NewEncoderBytes(&buf, &handle).Encode(in)
	return buf, err
}
// SPDX-FileCopyrightText: © 2024 Siemens AG
// SPDX-License-Identifier: MIT
// Package raft exports a Raft finite state machine (FSM) for DDA state members
// to make use of replicated state.
package raft
import (
"fmt"
"io"
"sync"
"github.com/coatyio/dda/services/state/api"
hraft "github.com/hashicorp/raft"
)
// RaftFsm implements the [hraft.FSM] interface to model replicated state in the
// form of a key-value dictionary.
//
// State changes applied to the FSM are sent to all registered state change
// observer channels.
type RaftFsm struct {
mu sync.Mutex // protects following fields
state api.State // key-value pairs
observers map[uint64]chan api.Input // observing state change channels
nextObserverId uint64 // id of next registered state change channel
}
// NewRaftFsm creates a new Raft FSM that models replicated state in the form
// of a key-value dictionary. The returned FSM has empty state and no state
// change observers.
func NewRaftFsm() *RaftFsm {
	fsm := new(RaftFsm)
	fsm.state = make(api.State)
	fsm.observers = make(map[uint64]chan api.Input)
	return fsm
}
// State gets a deep copy of the current key-value pairs of a RaftFsm. The
// returned state can be safely mutated. To be used for testing purposes only.
//
// The copy is taken while holding the FSM's mutex.
func (f *RaftFsm) State() api.State {
f.mu.Lock()
defer f.mu.Unlock()
return f.cloneState()
}
// AddStateChangeObserver registers the given channel to listen to state changes
// and returns a channel ID that can be used to deregister the channel later.
//
// In addition, synthetic set inputs reproducing the key-value pairs present at
// registration time are emitted asynchronously on the channel.
//
// The channel should continuously receive data on the channel in a non-blocking
// manner to prevent blocking send operations.
func (f *RaftFsm) AddStateChangeObserver(ch chan api.Input) uint64 {
	f.mu.Lock()
	defer f.mu.Unlock()

	f.nextObserverId++
	id := f.nextObserverId
	f.observers[id] = ch

	// Take a deep copy of the state while holding the lock. The goroutine
	// below must not iterate over f.state itself as Apply may mutate the map
	// concurrently.
	snapshot := f.cloneState()

	// Emit synthetic inputs reproducing the current key-value pairs. Note that
	// this goroutine blocks as long as the channel is not received from.
	go func() {
		for k, v := range snapshot {
			ch <- api.Input{Op: api.InputOpSet, Key: k, Value: v}
		}
	}()

	return id
}
// RemoveStateChangeObserver deregisters the channel with the given channel id.
// It is a no-op if no channel is registered under this id.
//
// Note that the channel is not closed, it must be closed by the caller.
func (f *RaftFsm) RemoveStateChangeObserver(chanId uint64) {
f.mu.Lock()
defer f.mu.Unlock()
delete(f.observers, chanId)
}
// Apply is called once a log entry is committed by a majority of the cluster.
//
// Apply should apply the log to the FSM. Apply must be deterministic and
// produce the same result on all peers in the cluster.
//
// The log entry data is decoded into an input operation. A set operation
// stores the input's value under its key, a delete operation removes the key;
// in both cases all state change observers are notified. An undefined
// operation is ignored.
//
// The returned value is returned to the client as the ApplyFuture.Response.
// Note that if Apply returns an error, it will be returned by Response, and
// not by the Error method of ApplyFuture, so it is always important to check
// Response for errors from the FSM. If the given input operation is applied
// successfully, ApplyFuture.Response returns nil.
//
// Apply implements the [hraft.FSM] interface.
func (f *RaftFsm) Apply(entry *hraft.Log) any {
	var input api.Input
	if err := DecodeMsgPack(entry.Data, &input); err != nil {
		return err
	}

	switch input.Op {
	case api.InputOpUndefined:
		return nil
	case api.InputOpSet, api.InputOpDelete:
		f.mu.Lock()
		defer f.mu.Unlock()

		if input.Op == api.InputOpSet {
			f.state[input.Key] = input.Value
		} else {
			delete(f.state, input.Key)
		}
		f.sendStateChange(&input)
		return nil
	default:
		return fmt.Errorf("unknown input operation in Raft log entry: %v", input.Op)
	}
}
// Snapshot returns an FSMSnapshot used to: support log compaction, to restore
// the FSM to a previous state, or to bring out-of-date followers up to a
// recent log index.
//
// The Snapshot implementation should return quickly, because Apply can not be
// called while Snapshot is running. Generally this means Snapshot should only
// capture a pointer to the state, and any expensive IO should happen as part
// of FSMSnapshot.Persist.
//
// Apply and Snapshot are always called from the same thread, but Apply will be
// called concurrently with FSMSnapshot.Persist. This means the FSM should be
// implemented to allow for concurrent updates while a snapshot is happening.
// To this end, the returned snapshot holds a deep copy of the current state.
//
// Snapshot implements the [hraft.FSM] interface.
func (f *RaftFsm) Snapshot() (hraft.FSMSnapshot, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	snap := new(fsmSnapshot)
	snap.State = f.cloneState()
	return snap, nil
}
// Restore is used to restore an FSM from a snapshot. It is not called
// concurrently with any other command. The FSM must discard all previous state
// before restoring the snapshot.
//
// The current state is replaced by the state decoded from the snapshot. If
// decoding fails, the error is returned and the current state is left
// untouched. State change observers are not notified.
//
// Restore implements the [hraft.FSM] interface.
func (f *RaftFsm) Restore(snapshot io.ReadCloser) error {
	var snap fsmSnapshot
	if err := DecodeMsgPackFromReader(snapshot, &snap); err != nil {
		return err
	}

	// Never install a nil map as a subsequent Apply would panic on writing.
	if snap.State == nil {
		snap.State = make(api.State)
	}

	// Although Restore is not invoked concurrently with other FSM commands,
	// the state may be read concurrently by State and by
	// AddStateChangeObserver.
	f.mu.Lock()
	defer f.mu.Unlock()
	f.state = snap.State
	return nil
}
// sendStateChange sends a copy of the given input to every registered state
// change observer channel. Each send blocks until the input is received or
// buffered by the channel.
func (f *RaftFsm) sendStateChange(in *api.Input) {
	for id := range f.observers {
		f.observers[id] <- f.cloneInput(in)
	}
}
// cloneInput returns a copy of the given input consisting of its operation
// and key. For a set operation, the copy additionally carries a copy of the
// input's value.
func (f *RaftFsm) cloneInput(in *api.Input) api.Input {
	out := api.Input{Op: in.Op, Key: in.Key}
	if in.Op == api.InputOpSet {
		out.Value = make([]byte, len(in.Value))
		copy(out.Value, in.Value)
	}
	return out
}
// cloneState returns a deep copy of the FSM's key-value pairs, i.e. all
// values are copied, too. The FSM's mutex is not acquired by this function.
func (f *RaftFsm) cloneState() api.State {
	clone := make(api.State, len(f.state))
	for key, value := range f.state {
		clone[key] = append(make([]byte, 0, len(value)), value...)
	}
	return clone
}
// fsmSnapshot implements the [hraft.FSMSnapshot] interface. It is returned by
// an FSM in response to a Snapshot. It must be safe to invoke FSMSnapshot
// methods with concurrent calls to Apply.
//
// The snapshot created by RaftFsm.Snapshot holds a deep copy of the FSM state.
type fsmSnapshot struct {
State api.State // field must be exported for serialization
}
// Persist should dump all necessary state to the WriteCloser 'sink', and call
// sink.Close() when finished or call sink.Cancel() on error.
//
// The snapshot is MessagePack encoded, written to the sink, and the sink is
// closed. If any of these steps fails, the sink is canceled and the error of
// the failing step is returned.
//
// Persist implements the [hraft.FSMSnapshot] interface.
func (f *fsmSnapshot) Persist(sink hraft.SnapshotSink) error {
	data, err := EncodeMsgPack(f)
	if err == nil {
		_, err = sink.Write(data)
	}
	if err == nil {
		err = sink.Close()
	}
	if err != nil {
		// The error of the failing step takes precedence over a failing
		// cancelation.
		_ = sink.Cancel()
		return err
	}
	return nil
}
// Release is invoked when we are finished with the snapshot.
//
// Release does nothing.
//
// Release implements the [hraft.FSMSnapshot] interface.
func (f *fsmSnapshot) Release() {
}
// SPDX-FileCopyrightText: © 2024 Siemens AG
// SPDX-License-Identifier: MIT
// Package raft provides a state synchronization binding implementation using
// the [Raft] consensus algorithm.
//
// [Raft]: https://raft.github.io/
package raft
import (
"context"
"fmt"
"os"
"sync"
"time"
"github.com/coatyio/dda/config"
"github.com/coatyio/dda/plog"
"github.com/coatyio/dda/services"
comapi "github.com/coatyio/dda/services/com/api"
"github.com/coatyio/dda/services/state/api"
"github.com/hashicorp/go-hclog"
hraft "github.com/hashicorp/raft"
)
const (
// DefaultStartupTimeout is the default timeout when starting up a new Raft
// node, either as a leader or as a follower.
DefaultStartupTimeout = 10000 * time.Millisecond
// DefaultLfwTimeout is the default timeout for leader forwarded Propose
// and GetState remote operation responses. It only applies in situations
// where there is no leader. It must not be set too low as Propose
// operations may take some time.
DefaultLfwTimeout = 20000 * time.Millisecond
)
// RaftBinding realizes a state synchronization binding for the [Raft] consensus
// protocol by implementing the state synchronization API interface [api.Api]
// using the [HashiCorp Raft] library.
//
// A binding is considered open as long as its raft field is non-nil; Open
// populates the fields and Close resets raft to nil.
//
// [Raft]: https://raft.github.io/
// [HashiCorp Raft]: https://github.com/hashicorp/raft
type RaftBinding struct {
raftId string // Raft node ID
raftDir string // Raft storage location
mu sync.Mutex // protects following fields
raft *hraft.Raft // Raft node implementation
raftLogger hclog.Logger // Raft logger
store *RaftStore // Raft LogStore, StableStore, and FileSnapshotStore provider
fsm *RaftFsm // Raft finite state machine
trans *RaftTransport // Raft transport layer
lfwTimeout time.Duration // leader forwarded response timeout
members map[hraft.ServerID]struct{} // set of current members
}
// Node returns the Raft node (exposed for testing purposes). The result is nil
// while the binding is not open.
//
// Note that the field is read without acquiring the binding's mutex.
func (b *RaftBinding) Node() *hraft.Raft {
return b.raft
}
// NodeId returns the Raft node ID (exposed for testing purposes). The ID is
// assigned by Open from the identity ID of the given configuration.
func (b *RaftBinding) NodeId() string {
return b.raftId
}
// Open implements the [api.Api] interface.
//
// Open creates the Raft store, transport, finite state machine, and Raft node
// from the given configuration and either bootstraps the node as a leader or
// adds it as a voter to an existing cluster by leader forwarding. Invoking
// Open on a binding that is already open is a no-op that returns nil.
//
// The following integer options in cfg.Services.State.Opts are honored, all
// time values in milliseconds: "lfwTimeout", "heartbeatTimeout",
// "electionTimeout", "snapshotInterval", "snapshotThreshold" (a count),
// "rpcTimeout", "installSnapshotTimeoutScale" (a size), and "startupTimeout".
// Options that are missing or not of type int keep their defaults. Setting
// option "debug" to true enables debug logging of the Raft library if plog is
// enabled.
//
// NOTE(review): error returns after the Raft node has been created leave
// b.raft set, so a subsequent Open returns nil without retrying - confirm
// that callers invoke Close after a failed Open.
func (b *RaftBinding) Open(cfg *config.Config, com comapi.Api) error {
b.mu.Lock()
defer b.mu.Unlock()
if b.raft != nil {
return nil
}
plog.Printf("Open Raft state synchronization binding...\n")
var err error
b.raftId = cfg.Identity.Id
b.members = make(map[hraft.ServerID]struct{})
b.raftDir = cfg.Services.State.Store
if b.store, err = NewRaftStore(b.raftDir); err != nil {
return err
}
switch v := cfg.Services.State.Opts["lfwTimeout"].(type) {
case int:
b.lfwTimeout = time.Duration(v) * time.Millisecond
default:
b.lfwTimeout = DefaultLfwTimeout
}
// Note: from here on the local variable config shadows the imported config
// package.
config := hraft.DefaultConfig()
config.LocalID = hraft.ServerID(b.raftId)
switch v := cfg.Services.State.Opts["heartbeatTimeout"].(type) {
case int:
// Note: heartbeat messages are sent to followers periodically in the
// range 1x to 2x of config.HeartbeatTimeout / 10.
config.HeartbeatTimeout = time.Duration(v) * time.Millisecond
}
switch v := cfg.Services.State.Opts["electionTimeout"].(type) {
case int:
config.ElectionTimeout = time.Duration(v) * time.Millisecond
}
switch v := cfg.Services.State.Opts["snapshotInterval"].(type) {
case int:
config.SnapshotInterval = time.Duration(v) * time.Millisecond
}
switch v := cfg.Services.State.Opts["snapshotThreshold"].(type) {
case int:
config.SnapshotThreshold = uint64(v)
}
// Increase commit timeout to reduce rate of periodic AppendEntries RPCs to
// followers (see https://github.com/hashicorp/raft/issues/282).
config.CommitTimeout = config.HeartbeatTimeout / 10
config.LogLevel = "OFF" // too many "ERROR" messages are just informational in hashicorp-raft
if plog.Enabled() {
config.LogLevel = "ERROR"
if cfg.Services.State.Opts["debug"] == true {
config.LogLevel = "DEBUG"
}
}
config.Logger = hclog.New(&hclog.LoggerOptions{
Name: "raft",
Level: hclog.LevelFromString(config.LogLevel),
Output: os.Stderr,
})
// Keep a reference to the logger so that Close can turn logging off.
b.raftLogger = config.Logger
transConfig := &RaftTransportConfig{}
switch v := cfg.Services.State.Opts["rpcTimeout"].(type) {
case int:
transConfig.Timeout = time.Duration(v) * time.Millisecond
default:
transConfig.Timeout = 0 // use transport-specific default
}
switch v := cfg.Services.State.Opts["installSnapshotTimeoutScale"].(type) {
case int:
transConfig.TimeoutScale = v
default:
transConfig.TimeoutScale = 0 // use transport-specific default
}
// The Raft node ID also serves as the node's transport address.
b.trans = NewRaftTransport(transConfig, hraft.ServerAddress(b.raftId), com)
b.fsm = NewRaftFsm()
// Create Raft node.
b.raft, err = hraft.NewRaft(config, b.fsm, b.store, b.store, b.store.SnapStore, b.trans)
if err != nil {
return err
}
startupTimeout := DefaultStartupTimeout
switch v := cfg.Services.State.Opts["startupTimeout"].(type) {
case int:
startupTimeout = time.Duration(v) * time.Millisecond
}
if cfg.Services.State.Bootstrap { // bootstrap Raft node as leader
var configuration hraft.Configuration
configuration.Servers = append(configuration.Servers, hraft.Server{
Suffrage: hraft.Voter,
ID: hraft.ServerID(b.raftId),
Address: hraft.ServerAddress(b.raftId),
})
boot := b.raft.BootstrapCluster(configuration)
if err = boot.Error(); err != nil {
return err
}
select {
case <-time.After(startupTimeout):
return fmt.Errorf("timeout on bootstrapping Raft node as a leader")
case v := <-b.raft.LeaderCh(): // await leadership assignment
if !v {
return fmt.Errorf("on bootstrap Raft node should become leader")
}
}
} else { // add Raft node as follower using leader forwarding
err := services.RetryWithBackoff(context.Background(), services.Backoff{Base: 100 * time.Millisecond, Cap: 100 * time.Millisecond}, func(cnt int) error {
ctx, cancel := context.WithTimeout(context.Background(), b.lfwTimeout)
defer cancel()
if err := b.addVoterByLeaderForwarding(ctx, 0); err == ctx.Err() {
// If context deadline is exceeded, no response has been sent as
// there is no leader currently. In this case return a retryable
// error so that we can reinvoke the operation with a backoff
// until a new leader is present/elected eventually.
//
// NOTE(review): this branch is also taken if both err and
// ctx.Err() are nil, i.e. on success - presumably
// NewRetryableError(nil) yields nil; confirm.
return services.NewRetryableError(err)
} else {
return err
}
})
if err != nil {
return err
}
}
// Serve leader forwarded requests of other nodes in the background.
go b.processLeaderForwardedRPCs()
return nil
}
// Close implements the [api.Api] interface.
//
// Close removes this node from the Raft cluster, shuts down the Raft node and
// closes the Raft store while preserving its persistent storage. Errors
// of the individual shutdown steps are ignored, except for a store close error
// which is logged. Invoking Close on a binding that is not open is a no-op.
func (b *RaftBinding) Close() {
b.mu.Lock()
defer b.mu.Unlock()
if b.raft == nil {
return
}
// Turn off Raft logging to silence irrelevant error messages while
// shutting down.
b.raftLogger.SetLevel(hclog.Off)
if b.raft.State() == hraft.Leader {
// RemoveServer call finally invokes Shutdown but without closing
// transport due to a bug in hashicorp-raft: the shutdown future is not
// awaited so that side effects defined in the future Error function,
// i.e. closing the transport, are never processed. Even if this bug is
// fixed some day, the explicit transport Close operation can be kept as
// all invocations except the first one are no-ops.
_ = b.raft.RemoveServer(hraft.ServerID(b.raftId), 0, 0).Error()
_ = b.trans.Close()
} else {
// As a non-leader, ask the leader to remove this node, waiting no longer
// than the transport's timeout, then shut down the local Raft node.
func() {
ctx, cancel := context.WithTimeout(context.Background(), b.trans.Timeout())
defer cancel()
_ = b.removeServerByLeaderForwarding(ctx, 0)
}()
_ = b.raft.Shutdown().Error()
}
if err := b.store.Close(false); err != nil { // preserve persistent storage
plog.Printf("Error closing Raft store: %v\n", err)
}
// Mark the binding as closed.
b.raft = nil
plog.Printf("Closed Raft state synchronization binding\n")
}
// ProposeInput implements the [api.Api] interface.
//
// The given input is encoded with EncodeMsgPack and applied to the local Raft
// node. If the local node is not the leader, the command is forwarded to the
// leader instead. The procedure is repeated with a backoff as long as a
// retryable error is reported and the given context is not canceled. The
// binding's mutex is held for the whole duration of the call.
//
// An error is returned if the binding is not open, if the input cannot be
// encoded, or if the retry procedure finishes with an error.
func (b *RaftBinding) ProposeInput(ctx context.Context, in *api.Input) error {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.raft == nil {
		return fmt.Errorf("ProposeInput %+v failed as binding is not yet open", in)
	}

	cmd, err := EncodeMsgPack(in)
	if err != nil {
		return err
	}

	backoff := services.Backoff{Base: 100 * time.Millisecond, Cap: 100 * time.Millisecond}
	return services.RetryWithBackoff(ctx, backoff, func(_ int) error {
		applied := b.raft.Apply(cmd, 0)
		applyErr := applied.Error()

		if applyErr == nil {
			// Applied locally on the leader: an error response by
			// RaftFsm.Apply is non-retryable (never happens in this
			// implementation).
			if fsmErr, ok := applied.Response().(error); ok {
				return fsmErr
			}
			return nil
		}

		if applyErr != hraft.ErrNotLeader {
			return services.NewRetryableError(applyErr)
		}

		// Local node is not the leader: forward the command to the leader.
		lfwCtx, cancel := context.WithTimeout(context.Background(), b.lfwTimeout)
		defer cancel()
		lfwErr := b.applyByLeaderForwarding(lfwCtx, cmd, 0)
		if lfwErr == lfwCtx.Err() {
			// If the context deadline is exceeded, no response has been sent
			// as there is no leader currently. Repropose with a backoff until
			// a new leader is elected eventually.
			return services.NewRetryableError(lfwErr)
		}
		return lfwErr
	})
}
// ObserveStateChange implements the [api.Api] interface.
//
// It registers a new channel buffered with 256 elements as a state change
// observer on the finite state machine and returns it. When the given context
// is done, the observer is deregistered and the channel is closed. An error is
// returned if the binding is not open.
func (b *RaftBinding) ObserveStateChange(ctx context.Context) (<-chan api.Input, error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.raft == nil {
		return nil, fmt.Errorf("ObserveStateChange failed as binding is not yet open")
	}

	changes := make(chan api.Input, 256)
	observerId := b.fsm.AddStateChangeObserver(changes)

	go func() {
		// Deferred calls run in reverse order: the observer is deregistered
		// first, the channel is closed afterwards.
		defer close(changes)
		defer b.fsm.RemoveStateChangeObserver(observerId)
		<-ctx.Done()
	}()

	return changes, nil
}
// ObserveMembershipChange implements the [api.Api] interface.
//
// It registers a new channel buffered with 32 elements as a membership change
// observer on the Raft store and returns it. When the given context is done,
// the observer is deregistered and the channel is closed. An error is returned
// if the binding is not open.
func (b *RaftBinding) ObserveMembershipChange(ctx context.Context) (<-chan api.MembershipChange, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.raft == nil {
		// The message names this method (it used to refer to a non-existing
		// "ObserveMemberChange").
		return nil, fmt.Errorf("ObserveMembershipChange failed as binding is not yet open")
	}
	memberChangeCh := make(chan api.MembershipChange, 32)
	// Note that hraft.PeerObservation cannot be used to monitor member changes
	// as they are only observable on the leader node.
	id := b.store.AddMembershipChangeObserver(memberChangeCh, b.raft)
	go func() {
		// Deferred calls run in reverse order: the observer is deregistered
		// first, the channel is closed afterwards.
		defer close(memberChangeCh)
		defer b.store.RemoveMembershipChangeObserver(id)
		<-ctx.Done()
	}()
	return memberChangeCh, nil
}
// addVoterByLeaderForwarding sends a leader forwarded AddVoterRequest for this
// node over the transport, using the Raft node ID as both server ID and server
// address. startDelayOnLeader is passed on as the request's Timeout. The
// response content is discarded; only the transport error is returned.
func (b *RaftBinding) addVoterByLeaderForwarding(ctx context.Context, startDelayOnLeader time.Duration) error {
	var resp AddVoterResponse
	return b.trans.LfwAddVoter(ctx, &AddVoterRequest{
		ServerId:      hraft.ServerID(b.raftId),
		ServerAddress: hraft.ServerAddress(b.raftId),
		Timeout:       startDelayOnLeader,
	}, &resp)
}
// removeServerByLeaderForwarding sends a leader forwarded RemoveServerRequest
// for this node over the transport. startDelayOnLeader is passed on as the
// request's Timeout. The response content is discarded; only the transport
// error is returned.
func (b *RaftBinding) removeServerByLeaderForwarding(ctx context.Context, startDelayOnLeader time.Duration) error {
	var resp RemoveServerResponse
	return b.trans.LfwRemoveServer(ctx, &RemoveServerRequest{
		ServerId: hraft.ServerID(b.raftId),
		Timeout:  startDelayOnLeader,
	}, &resp)
}
// applyByLeaderForwarding sends a leader forwarded ApplyRequest with the given
// encoded command over the transport. startDelayOnLeader is passed on as the
// request's Timeout. It returns the transport error if the request fails;
// otherwise it returns the Response error carried by the ApplyResponse, which
// is nil on success.
func (b *RaftBinding) applyByLeaderForwarding(ctx context.Context, cmd []byte, startDelayOnLeader time.Duration) error {
	var resp ApplyResponse
	request := &ApplyRequest{Command: cmd, Timeout: startDelayOnLeader}
	if err := b.trans.LfwApply(ctx, request, &resp); err != nil {
		return err
	}
	// Any error response by RaftFsm.Apply is non-retryable.
	return resp.Response
}
// processLeaderForwardedRPCs consumes leader forwarding RPCs from the
// transport's LfwConsumer channel and processes them one by one. It returns
// when the channel is closed or when processLeaderForwardingRPC signals to
// stop.
func (b *RaftBinding) processLeaderForwardedRPCs() {
	lfwCh := b.trans.LfwConsumer()
	for {
		rpc, ok := <-lfwCh
		if !ok || b.processLeaderForwardingRPC(rpc) {
			return
		}
	}
}
// processLeaderForwardingRPC executes a single leader forwarded command on the
// local Raft node and responds to the given RPC. It returns true if the
// binding is not open, after responding with hraft.ErrNotLeader, to signal
// that processing should stop; otherwise it returns false.
//
// Errors reported by the Raft futures are responded as retryable errors. An
// error returned by RaftFsm.Apply is responded inside the ApplyResponse. The
// binding's mutex is held while the command is processed.
func (b *RaftBinding) processLeaderForwardingRPC(rpc hraft.RPC) (stop bool) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.raft == nil {
		rpc.Respond(nil, hraft.ErrNotLeader)
		return true
	}

	// retry responds to the RPC with the given error marked as retryable.
	retry := func(err error) {
		rpc.Respond(nil, services.NewRetryableError(err))
	}

	// All commands fail if the Raft node is not a leader or is transferring
	// leadership. They may also fail on other conditions, such as Raft node
	// shutdown.
	switch cmd := rpc.Command.(type) {
	case *AddVoterRequest:
		added := b.raft.AddVoter(cmd.ServerId, cmd.ServerAddress, 0, cmd.Timeout)
		if err := added.Error(); err != nil {
			retry(err)
			break
		}
		// Wait until all preceding log operations have been applied to leader fsm.
		if err := b.raft.Barrier(0).Error(); err != nil {
			retry(err)
			break
		}
		rpc.Respond(&AddVoterResponse{Index: added.Index()}, nil)

	case *RemoveServerRequest:
		removed := b.raft.RemoveServer(cmd.ServerId, 0, cmd.Timeout)
		if err := removed.Error(); err != nil {
			retry(err)
			break
		}
		rpc.Respond(&RemoveServerResponse{Index: removed.Index()}, nil)

	case *ApplyRequest:
		applied := b.raft.Apply(cmd.Command, cmd.Timeout)
		if err := applied.Error(); err != nil {
			retry(err)
			break
		}
		if fsmErr, ok := applied.Response().(error); ok {
			// Non-retryable error by RaftFsm.Apply.
			rpc.Respond(&ApplyResponse{Response: fsmErr}, nil)
			break
		}
		// Wait until all preceding log operations have been applied to leader fsm.
		if err := b.raft.Barrier(0).Error(); err != nil {
			retry(err)
			break
		}
		rpc.Respond(&ApplyResponse{Index: applied.Index(), Response: nil}, nil)

	default:
		rpc.Respond(nil, fmt.Errorf("unexpected leader forwarding command %v", cmd))
	}
	return false
}
// SPDX-FileCopyrightText: © 2024 Siemens AG
// SPDX-License-Identifier: MIT
// Package raft exports the RaftStore which is an implementation of a LogStore,
// a StableStore, and a FileSnapshotStore for the [HashiCorp Raft] library.
// RaftStore uses the internal DDA implementation of the [Pebble] storage
// engine.
//
// [HashiCorp Raft]: https://github.com/hashicorp/raft
// [Pebble]: https://github.com/cockroachdb/pebble
package raft
import (
"encoding/binary"
"errors"
"math"
"os"
"path/filepath"
"slices"
"sync"
"github.com/coatyio/dda/config"
"github.com/coatyio/dda/plog"
"github.com/coatyio/dda/services/state/api"
"github.com/coatyio/dda/services/store/pebble"
hraft "github.com/hashicorp/raft"
)
// retainSnapshotCount is passed to the file snapshot store created by
// NewRaftStore.
const retainSnapshotCount = 2 // how many snapshots should be retained, must be at least 1
var (
// ErrKeyNotFound is returned by Get and GetUint64 if no value is stored
// under the given key.
ErrKeyNotFound = errors.New("not found") // corresponds with Hashicorp raft key not found error string
)
// observer is a channel on which membership changes are dispatched.
type observer chan api.MembershipChange
// RaftStore implements a LogStore, a StableStore, and a FileSnapshotStore for
// the [HashiCorp Raft] library.
//
// Log entries and stable store entries are kept in two separate Pebble
// bindings. In addition, RaftStore dispatches membership changes derived from
// stored configuration log entries to registered observers.
type RaftStore struct {
logDir string
logStore *pebble.PebbleBinding
stableDir string
stableStore *pebble.PebbleBinding
snapDir string
SnapStore hraft.SnapshotStore // Raft snapshot store
mu sync.Mutex // protects following fields
observers map[uint64]observer // membership change observers indexed by observer id
nextObserverId uint64 // id of next registered membership change channel
memberIds map[string]struct{} // set of current deduped members indexed by member id
}
// NewRaftStore creates local storage for persisting Raft specific durable data
// including log entries, stable store, and file snapshot store. Returns an
// error along with a nil *RaftStore if any of the stores couldn't be created.
// If the stable store cannot be opened, the log store that was opened before
// is closed again so that it is not leaked.
//
// The given storage location should specify a directory given by an absolute
// pathname or a pathname relative to the working directory of the DDA sidecar
// or instance, or an empty string to indicate that storage is non-persistent
// and completely memory-backed as long as the DDA instance is running.
//
// For a non-empty location, log entries are stored in subfolder "log", the
// stable store in subfolder "stable", and snapshots in subfolder "snapshots".
func NewRaftStore(location string) (*RaftStore, error) {
	logDir := location
	if logDir != "" {
		logDir = filepath.Join(location, "log")
	}
	stableDir := location
	if stableDir != "" {
		stableDir = filepath.Join(location, "stable")
	}
	var snapStore hraft.SnapshotStore
	var err error
	snapDir := location
	if snapDir != "" {
		// Snapshots are stored in subfolder "snapshots" in parent folder location.
		snapStore, err = hraft.NewFileSnapshotStore(location, retainSnapshotCount, os.Stderr)
		if err != nil {
			return nil, err
		}
	} else {
		// Create an in-memory SnapshotStore that retains only the most recent snapshot.
		snapStore = hraft.NewInmemSnapshotStore()
	}
	rs := &RaftStore{
		logDir:         logDir,
		logStore:       &pebble.PebbleBinding{},
		stableDir:      stableDir,
		stableStore:    &pebble.PebbleBinding{},
		snapDir:        snapDir,
		SnapStore:      snapStore,
		observers:      make(map[uint64]observer),
		nextObserverId: 0,
		memberIds:      make(map[string]struct{}),
	}
	cfg := config.New()
	cfg.Services.Store = config.ConfigStoreService{
		Engine:   "pebble",
		Location: rs.logDir,
		Disabled: false,
	}
	if err := rs.logStore.Open(cfg); err != nil {
		return nil, err
	}
	cfg.Services.Store = config.ConfigStoreService{
		Engine:   "pebble",
		Location: rs.stableDir,
		Disabled: false,
	}
	if err := rs.stableStore.Open(cfg); err != nil {
		// Release the already opened log store; otherwise it would stay open
		// with no reference left to close it.
		rs.logStore.Close()
		return nil, err
	}
	return rs, nil
}
// Close closes the log store and the stable store. If removeStorage is true,
// the log, stable, and snapshot directories are removed afterwards, skipping
// directories with an empty path; removal errors are ignored. Close always
// returns nil.
//
// You should not remove storage if you want to restart your DDA state member at
// a later point in time.
func (s *RaftStore) Close(removeStorage bool) error {
	s.logStore.Close()
	s.stableStore.Close()

	if !removeStorage {
		return nil
	}
	for _, dir := range []string{s.logDir, s.stableDir, s.snapDir} {
		if dir != "" {
			_ = os.RemoveAll(dir)
		}
	}
	return nil
}
// AddMembershipChangeObserver allocates the next observer id for the given
// channel and returns it. The returned id can be used to deregister the channel
// later. The current members are dispatched on the channel and the channel is
// registered asynchronously by a separate goroutine.
//
// The caller should continuously receive from the channel to prevent blocking
// send operations.
func (s *RaftStore) AddMembershipChangeObserver(ch chan api.MembershipChange, raft *hraft.Raft) uint64 {
	s.mu.Lock()
	defer s.mu.Unlock()

	id := s.nextObserverId + 1
	s.nextObserverId = id
	go s.sendMembershipChangesForObserver(raft, ch, id)
	return id
}
// RemoveMembershipChangeObserver deregisters the channel with the given channel
// id. When the last observer has been removed, the set of known member ids is
// cleared as well.
//
// Note that the channel is not closed, it must be closed by the caller.
func (s *RaftStore) RemoveMembershipChangeObserver(chanId uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	delete(s.observers, chanId)
	if len(s.observers) > 0 {
		return
	}
	clear(s.memberIds)
}
// FirstIndex returns the first index written, or 0 if the log has no entries.
// The index is decoded from the first key visited by a forward scan of the log
// store; the scan stops right after that key.
//
// FirstIndex implements the [hraft.LogStore] interface.
func (s *RaftStore) FirstIndex() (uint64, error) {
	var first uint64 // stays zero on empty log
	visit := func(k, v []byte) bool {
		first = bytesToUint64(k)
		return false // stop scanning
	}
	if err := s.logStore.ScanRangeB(nil, nil, visit); err != nil {
		return 0, err
	}
	return first, nil
}
// LastIndex returns the last index written, or 0 if the log has no entries.
// The index is decoded from the first key visited by a reverse scan of the log
// store; the scan stops right after that key.
//
// LastIndex implements the [hraft.LogStore] interface.
func (s *RaftStore) LastIndex() (uint64, error) {
	var last uint64 // stays zero on empty log
	visit := func(k, v []byte) bool {
		last = bytesToUint64(k)
		return false // stop scanning
	}
	if err := s.logStore.ScanRangeReverseB(nil, nil, visit); err != nil {
		return 0, err
	}
	return last, nil
}
// GetLog looks up the log entry stored under the given index and decodes it
// into log. It returns hraft.ErrLogNotFound if the log store holds no value for
// the index, or the error reported by the lookup or by decoding.
//
// GetLog implements the [hraft.LogStore] interface.
func (s *RaftStore) GetLog(index uint64, log *hraft.Log) error {
	encoded, err := s.logStore.GetB(uint64ToBytes(index))
	switch {
	case err != nil:
		return err
	case encoded == nil:
		return hraft.ErrLogNotFound
	default:
		return DecodeMsgPack(encoded, log)
	}
}
// StoreLog stores a log entry by delegating to StoreLogs with a single-element
// slice.
//
// StoreLog implements the [hraft.LogStore] interface.
func (s *RaftStore) StoreLog(log *hraft.Log) error {
return s.StoreLogs([]*hraft.Log{log})
}
// StoreLogs stores multiple log entries, each encoded with EncodeMsgPack under
// a key derived from its index. For entries of type hraft.LogConfiguration the
// servers of the decoded configuration are dispatched as membership changes
// before the entry is stored. Storing stops at the first entry that fails to
// be encoded or written; entries stored before remain stored.
//
// By default the logs stored may not be contiguous with previous logs (i.e. may
// have a gap in Index since the last log written). If an implementation can't
// tolerate this it may optionally implement `MonotonicLogStore` to indicate
// that this is not allowed. This changes Raft's behaviour after restoring a
// user snapshot to remove all previous logs instead of relying on a "gap" to
// signal the discontinuity between logs before the snapshot and logs after.
//
// StoreLogs implements the [hraft.LogStore] interface.
func (s *RaftStore) StoreLogs(logs []*hraft.Log) error {
	for _, entry := range logs {
		if entry.Type == hraft.LogConfiguration {
			configuration := hraft.DecodeConfiguration(entry.Data)
			s.sendMembershipChangesForServers(configuration.Servers)
		}
		encoded, err := EncodeMsgPack(entry)
		if err != nil {
			return err
		}
		if err := s.logStore.SetB(uint64ToBytes(entry.Index), encoded); err != nil {
			return err
		}
	}
	return nil
}
// DeleteRange deletes a range of log entries. The range is inclusive.
//
// The end key handed to the log store is the key of max+1. As max+1 would
// overflow for math.MaxUint64, the log store's KeyUpperBound of the key of max
// is used in this case instead.
//
// DeleteRange implements the [hraft.LogStore] interface.
func (s *RaftStore) DeleteRange(min, max uint64) error {
	if max == math.MaxUint64 {
		upper := s.logStore.KeyUpperBound(uint64ToBytes(max))
		return s.logStore.DeleteRangeB(uint64ToBytes(min), upper)
	}
	return s.logStore.DeleteRangeB(uint64ToBytes(min), uint64ToBytes(max+1))
}
// Set sets the given key-value pair in the stable store and returns the error
// reported by the store, if any.
//
// Set implements the [hraft.StableStore] interface.
func (s *RaftStore) Set(key []byte, val []byte) error {
return s.stableStore.SetB(key, val)
}
// Get returns the value stored for key in the stable store. If no value is
// stored for key, an empty byte slice is returned along with ErrKeyNotFound.
// If the lookup fails, nil is returned along with the lookup error.
//
// Get implements the [hraft.StableStore] interface.
func (s *RaftStore) Get(key []byte) ([]byte, error) {
	val, err := s.stableStore.GetB(key)
	switch {
	case err != nil:
		return nil, err
	case val == nil:
		return []byte{}, ErrKeyNotFound
	}
	return val, nil
}
// SetUint64 sets the given key-value pair in the stable store. The value is
// stored in the byte representation created by uint64ToBytes.
//
// SetUint64 implements the [hraft.StableStore] interface.
func (s *RaftStore) SetUint64(key []byte, val uint64) error {
return s.stableStore.SetB(key, uint64ToBytes(val))
}
// GetUint64 returns the uint64 value stored for key in the stable store. If no
// value is stored for key, 0 is returned along with ErrKeyNotFound. If the
// lookup fails, 0 is returned along with the lookup error.
//
// GetUint64 implements the [hraft.StableStore] interface.
func (s *RaftStore) GetUint64(key []byte) (uint64, error) {
	raw, err := s.stableStore.GetB(key)
	switch {
	case err != nil:
		return 0, err
	case raw == nil:
		return 0, ErrKeyNotFound
	}
	return bytesToUint64(raw), nil
}
// bytesToUint64 converts bytes to a uint64 by decoding the leading 8 bytes in
// big-endian byte order. It is the inverse of uint64ToBytes.
func bytesToUint64(b []byte) uint64 {
return binary.BigEndian.Uint64(b)
}
// uint64ToBytes converts a uint64 to a newly allocated byte slice of length 8
// holding the value in big-endian byte order, so that the byte-wise order of
// the results matches the numeric order of the values.
func uint64ToBytes(u uint64) []byte {
	var encoded [8]byte
	binary.BigEndian.PutUint64(encoded[:], u)
	return encoded[:]
}
// sendMembershipChangesForObserver dispatches the current members as joined
// members on the given observer channel and registers the channel afterwards
// under the given observer id.
//
// If no observer is registered yet, the members are taken from the current
// Raft configuration and memberIds is initialized from it. If the Raft
// configuration cannot be retrieved, the failure is logged and the observer is
// not registered. For later observers, the members are taken from memberIds.
//
// The store's mutex is held while dispatching, so sends block other membership
// operations until the observer channel accepts them.
func (s *RaftStore) sendMembershipChangesForObserver(raft *hraft.Raft, o observer, observerId uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.observers) == 0 {
		// For first observer, dispatch member changes from current Raft
		// configuration and initialize memberIds (has been cleared by
		// RemoveMembershipChangeObserver).
		f := raft.GetConfiguration()
		if err := f.Error(); err != nil {
			plog.Printf("Raft configuration could not be retrieved for membership changes: %v", err)
			return
		}
		for _, sv := range f.Configuration().Servers {
			id := string(sv.ID)
			if _, ok := s.memberIds[id]; !ok { // dedupe member ids
				s.memberIds[id] = struct{}{}
				o <- api.MembershipChange{Id: id, Joined: true}
			}
		}
	} else {
		// For late observers, dispatch member changes from current memberIds.
		for id := range s.memberIds {
			o <- api.MembershipChange{Id: id, Joined: true}
		}
	}
	// Add observer finally, ensuring that changes issued by
	// sendMembershipChangesForServers are not dispatched on the new observer
	// before initialization is completed.
	//
	// Register under the id handed out to the caller, not under
	// s.nextObserverId: the latter may have advanced in the meantime if
	// further observers were added before this goroutine got to run, which
	// would overwrite their registration and make this one unremovable.
	s.observers[observerId] = o
}
// sendMembershipChangesForServers compares the given servers with the known
// member ids and dispatches the differences to all registered observers: first
// a left change for every known member that is no longer among the servers,
// then a joined change for every server that is not yet known. memberIds is
// updated accordingly. Nothing happens while no observer is registered.
//
// The store's mutex is held while dispatching, and sends block until the
// observer channels accept them.
func (s *RaftStore) sendMembershipChangesForServers(servers []hraft.Server) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if len(s.observers) == 0 {
		// Ignore early changes that happen before any observer is added, and
		// late changes that happen after the last observer is removed.
		return
	}

	// broadcast dispatches the given change on every registered observer.
	broadcast := func(change api.MembershipChange) {
		for _, o := range s.observers {
			o <- change
		}
	}

	for memberId := range s.memberIds {
		stillMember := slices.ContainsFunc(servers, func(srv hraft.Server) bool {
			return memberId == string(srv.ID)
		})
		if stillMember {
			continue
		}
		delete(s.memberIds, memberId)
		broadcast(api.MembershipChange{Id: memberId, Joined: false})
	}

	for _, srv := range servers {
		serverId := string(srv.ID)
		if _, known := s.memberIds[serverId]; known { // dedupe member ids
			continue
		}
		s.memberIds[serverId] = struct{}{}
		broadcast(api.MembershipChange{Id: serverId, Joined: true})
	}
}
// SPDX-FileCopyrightText: © 2024 Siemens AG
// SPDX-License-Identifier: MIT
// Package raft exports a Raft transport based on DDA pub-sub communication.
package raft
import (
"bytes"
"context"
"fmt"
"io"
"sync"
"time"
"github.com/coatyio/dda/plog"
"github.com/coatyio/dda/services"
comapi "github.com/coatyio/dda/services/com/api"
hraft "github.com/hashicorp/raft"
)
const (
// DefaultRpcTimeout is the default remote operation timeout in the Raft
// transport.
DefaultRpcTimeout = 1000 * time.Millisecond
// DefaultInstallSnapshotTimeoutScale is the default TimeoutScale for
// InstallSnapshot operations in the Raft transport.
DefaultInstallSnapshotTimeoutScale = 256 * 1024 // 256KB
)
// Names of the remote operations exchanged between Raft nodes.
const (
rpcAppendEntries string = "ae" // a node-targeted operation
rpcRequestVote string = "rv" // a node-targeted operation
rpcInstallSnapshot string = "is" // a node-targeted operation
rpcTimeoutNow string = "tn" // a node-targeted operation
rpcAddVoter string = "av" // a leader forwarding operation
rpcRemoveServer string = "rs" // a leader forwarding operation
rpcApply string = "ap" // a leader forwarding operation
)
// The two kinds of remote operations distinguished by the transport.
const (
rpcTargetedType rpcType = "rpc" // action type of node-targeted rpc operations
rpcLeaderForwardedType rpcType = "lfw" // action type of leader forwarding operations
)
var (
// ErrTransportShutdown is returned when operations on a transport are
// invoked after it's been terminated.
ErrTransportShutdown = fmt.Errorf("transport shutdown")
// ErrPipelineShutdown is returned when the pipeline is closed.
ErrPipelineShutdown = fmt.Errorf("append pipeline closed")
)
// rpcType represents specific types of remote operations between Raft nodes,
// i.e. node-targeted operations (rpcTargetedType) and leader forwarding
// operations (rpcLeaderForwardedType).
type rpcType string
// rpcResponse wraps a typed hraft response and a potential error string for a
// retryable or non-retryable error for transmission by pub-sub communication.
// T is the type of the wrapped response.
type rpcResponse[T any] struct {
Response T
Error string
Retryable bool // whether error is retryable
}
// installSnapshotRequestWithData wraps an [hraft.InstallSnapshotRequest] and
// associated snapshot data so that both can be sent as a single request by
// InstallSnapshot.
type installSnapshotRequestWithData struct {
Req *hraft.InstallSnapshotRequest
Data []byte // snapshot data
}
// AddVoterRequest represents a leader forwarded request to add this node as a
// voting follower. The request responds with an error if the transport's
// configured rpcTimeout elapses before the corresponding command
// completes on the leader.
type AddVoterRequest struct {
ServerId hraft.ServerID // ID of the node to be added
ServerAddress hraft.ServerAddress // address of the node to be added
Timeout time.Duration // initial time to wait for AddVoter command to be started on leader
}
// AddVoterResponse represents a response to a successfully processed leader
// forwarded [AddVoterRequest].
type AddVoterResponse struct {
Index uint64 // holds the index of the newly applied log entry
}
// RemoveServerRequest represents a leader forwarded request to remove this
// node as a server from the cluster. The request responds with an error if the
// transport's configured rpcTimeout elapses before the corresponding command
// completes on the leader.
type RemoveServerRequest struct {
ServerId hraft.ServerID // ID of the node to be removed
Timeout time.Duration // initial time to wait for RemoveServer command to be started on leader
}
// RemoveServerResponse represents a response to a successfully processed
// leader forwarded [RemoveServerRequest].
type RemoveServerResponse struct {
Index uint64 // holds the index of the newly applied log entry
}
// ApplyRequest represents a leader forwarded request to apply a given log
// command. The request responds with an error if the transport's configured
// rpcTimeout elapses before the corresponding command completes on the leader.
type ApplyRequest struct {
Command []byte // encoded log command to be applied
Timeout time.Duration // initial time to wait for Apply command to be started on leader
}
// ApplyResponse represents a response to a leader forwarded
// [ApplyRequest]. If the finite state machine of the leader responds to the
// applied command with an error, this error is carried in Response.
type ApplyResponse struct {
Index uint64 // holds the index of the newly applied log entry on the leader
Response error // nil if operation is successful; an error otherwise
}
// RaftTransportConfig encapsulates configuration options for the Raft pub-sub
// transport layer. Fields that are zero or negative are replaced by their
// defaults in NewRaftTransport.
type RaftTransportConfig struct {
// Timeout used to apply I/O deadlines to remote operations on the Raft
// transport. For InstallSnapshot, we multiply the timeout by (SnapshotSize
// / TimeoutScale).
//
// If not present or zero, the default timeout is applied.
Timeout time.Duration
// For InstallSnapshot, timeout is proportional to the snapshot size. The
// timeout is multiplied by (SnapshotSize / TimeoutScale).
//
// If not present or zero, a default value of 256KB is used.
TimeoutScale int
}
// RaftTransport implements the [hraft.Transport] interface to allow Raft to
// communicate with other Raft nodes over the configured DDA pub-sub
// communication protocol. In addition, it supports leader forwarding to allow
// non-leader Raft nodes to accept AddVoter, RemoveServer, and Apply commands.
type RaftTransport struct {
localAddr hraft.ServerAddress
timeout time.Duration // remote operation timeout
timeoutScale int // scale factor for InstallSnapshot operation
consumeCh chan hraft.RPC // emit node-targeted RPCs
lfwChan chan hraft.RPC // emit leader forwarding RPCs
shutdownCh chan struct{}
shutdownMu sync.RWMutex // protects shutdown
shutdown bool
heartbeatFnMu sync.Mutex // protects heartbeatFn
heartbeatFn func(hraft.RPC)
com comapi.Api
}
// NewRaftTransport creates a new Raft pub-sub transport with the given
// transport configuration, a local address, and a ready-to-use DDA pub-sub
// communication API.
//
// A nil configuration is treated like an empty one. A timeout that is zero or
// negative is replaced by DefaultRpcTimeout, a timeout scale that is zero or
// negative by DefaultInstallSnapshotTimeoutScale. The transport subscribes to
// incoming RPCs before it is returned.
func NewRaftTransport(config *RaftTransportConfig, addr hraft.ServerAddress, com comapi.Api) *RaftTransport {
	var (
		timeout time.Duration
		scale   int
	)
	if config != nil {
		timeout, scale = config.Timeout, config.TimeoutScale
	}
	if timeout <= 0 {
		timeout = DefaultRpcTimeout
	}
	if scale <= 0 {
		scale = DefaultInstallSnapshotTimeoutScale
	}

	transport := &RaftTransport{
		localAddr:    addr,
		timeout:      timeout,
		timeoutScale: scale,
		consumeCh:    make(chan hraft.RPC),
		lfwChan:      make(chan hraft.RPC),
		shutdownCh:   make(chan struct{}),
		com:          com,
	}
	transport.subscribeRpcs()
	return transport
}
// Timeout gets the timeout duration of the transport that is applied to remote
// operations, i.e. the configured timeout or DefaultRpcTimeout if none has
// been configured.
func (t *RaftTransport) Timeout() time.Duration {
return t.timeout
}
// Consumer returns a channel that can be used to consume and respond to RPC
// requests. This channel is not used for leader forwarding operations (see
// [RaftTransport.LfwConsumer]).
//
// Consumer implements the [hraft.Transport] interface.
func (t *RaftTransport) Consumer() <-chan hraft.RPC {
return t.consumeCh
}
// LocalAddr is used to return our local address to distinguish from our peers.
// It is the address passed to NewRaftTransport.
//
// LocalAddr implements the [hraft.Transport] interface.
func (t *RaftTransport) LocalAddr() hraft.ServerAddress {
return t.localAddr
}
// AppendEntriesPipeline returns an interface that can be used to pipeline
// AppendEntries requests to the target node. This transport always returns a
// nil pipeline along with hraft.ErrPipelineReplicationNotSupported.
//
// AppendEntriesPipeline implements the [hraft.Transport] interface.
func (t *RaftTransport) AppendEntriesPipeline(id hraft.ServerID, target hraft.ServerAddress) (hraft.AppendPipeline, error) {
// Pipelining is not supported by DDA pub-sub transport since no more than
// one request can be outstanding at once. Skip the whole code path and use
// synchronous requests.
return nil, hraft.ErrPipelineReplicationNotSupported
}
// AppendEntries sends an AppendEntries request as a node-targeted RPC to the
// target node and decodes the result into resp. The operation is bounded by
// the transport's timeout.
//
// AppendEntries implements the [hraft.Transport] interface.
func (t *RaftTransport) AppendEntries(id hraft.ServerID, target hraft.ServerAddress, args *hraft.AppendEntriesRequest, resp *hraft.AppendEntriesResponse) error {
	deadlineCtx, release := context.WithTimeout(context.Background(), t.timeout)
	defer release()
	pubType := t.publicationType(rpcTargetedType, target)
	return sendRPC[hraft.AppendEntriesResponse](deadlineCtx, t, pubType, rpcAppendEntries, args, resp)
}
// RequestVote sends a RequestVote request as a node-targeted RPC to the target
// node and decodes the result into resp. The operation is bounded by the
// transport's timeout.
//
// RequestVote implements the [hraft.Transport] interface.
func (t *RaftTransport) RequestVote(id hraft.ServerID, target hraft.ServerAddress, args *hraft.RequestVoteRequest, resp *hraft.RequestVoteResponse) error {
	deadlineCtx, release := context.WithTimeout(context.Background(), t.timeout)
	defer release()
	pubType := t.publicationType(rpcTargetedType, target)
	return sendRPC[hraft.RequestVoteResponse](deadlineCtx, t, pubType, rpcRequestVote, args, resp)
}
// InstallSnapshot is used to push a snapshot down to a follower. The snapshot
// data is read completely from the given reader and sent together with the
// request as a single node-targeted RPC.
//
// The operation timeout is the transport's timeout multiplied by the integer
// quotient of the snapshot size and the transport's timeout scale, but never
// less than the transport's timeout itself.
//
// InstallSnapshot implements the [hraft.Transport] interface.
func (t *RaftTransport) InstallSnapshot(id hraft.ServerID, target hraft.ServerAddress, args *hraft.InstallSnapshotRequest, resp *hraft.InstallSnapshotResponse, data io.Reader) error {
	timeout := t.timeout
	if scaled := t.timeout * time.Duration(args.Size/int64(t.timeoutScale)); scaled > timeout {
		timeout = scaled
	}

	// Send the RPC along with snapshot data.
	//
	// TODO Since DDA pub-sub payload size is limited, we should chunk large
	// snapshot data (using bufio) in multiple dedicated Event messages with a
	// unique Event Type which are dynamically subscribed and decoded by
	// [handleRpc].
	snapshot, err := io.ReadAll(data)
	if err != nil {
		return err
	}

	deadlineCtx, release := context.WithTimeout(context.Background(), timeout)
	defer release()
	pubType := t.publicationType(rpcTargetedType, target)
	request := &installSnapshotRequestWithData{Req: args, Data: snapshot}
	return sendRPC[hraft.InstallSnapshotResponse](deadlineCtx, t, pubType, rpcInstallSnapshot, request, resp)
}
// EncodePeer serializes a peer's address as the raw bytes of the address
// string. The server id is not part of the encoding.
//
// EncodePeer implements the [hraft.Transport] interface.
func (t *RaftTransport) EncodePeer(id hraft.ServerID, addr hraft.ServerAddress) []byte {
	encoded := []byte(addr)
	return encoded
}
// DecodePeer deserializes a peer's address by interpreting the given bytes as
// the address string.
//
// DecodePeer implements the [hraft.Transport] interface.
func (t *RaftTransport) DecodePeer(buf []byte) hraft.ServerAddress {
	addr := hraft.ServerAddress(buf)
	return addr
}
// SetHeartbeatHandler installs a heartbeat handler that serves as a fast-path
// to avoid head-of-line blocking from RPC invocations. A transport that does
// not support this may ignore the callback and push heartbeats onto the
// Consumer channel instead. Otherwise, it MUST be safe for the callback to be
// invoked concurrently with a blocking RPC.
//
// The handler is stored under the heartbeat mutex; passing nil removes a
// previously installed handler.
//
// SetHeartbeatHandler implements the [hraft.Transport] interface.
func (t *RaftTransport) SetHeartbeatHandler(cb func(rpc hraft.RPC)) {
	t.heartbeatFnMu.Lock()
	t.heartbeatFn = cb
	t.heartbeatFnMu.Unlock()
}
// TimeoutNow starts a leadership transfer to the target node by publishing a
// TimeoutNow RPC addressed to it. The call is bounded by the transport's
// configured timeout.
//
// TimeoutNow implements the [hraft.Transport] interface.
func (t *RaftTransport) TimeoutNow(id hraft.ServerID, target hraft.ServerAddress, args *hraft.TimeoutNowRequest, resp *hraft.TimeoutNowResponse) error {
	ctx, cancel := context.WithTimeout(context.Background(), t.timeout)
	defer cancel()

	pubType := t.publicationType(rpcTargetedType, target)
	return sendRPC(ctx, t, pubType, rpcTimeoutNow, args, resp)
}
// Close permanently closes a transport, stopping any associated goroutines and
// freeing other resources. Calling Close more than once is a no-op; it always
// returns nil.
//
// Close implements the [hraft.WithClose] interface. WithClose is an interface
// that a transport may provide which allows a transport to be shut down cleanly
// when a Raft instance shuts down.
func (t *RaftTransport) Close() error {
	t.shutdownMu.Lock()
	defer t.shutdownMu.Unlock()

	if t.shutdown {
		return nil // already closed
	}

	// Close and drop both consumer channels, then signal shutdown.
	close(t.consumeCh)
	t.consumeCh = nil
	close(t.lfwChan)
	t.lfwChan = nil
	close(t.shutdownCh)
	t.shutdown = true
	return nil
}
// LfwConsumer returns the receive-only channel on which leader forwarded RPC
// requests are delivered for consumption and response. Node-targeted RPC
// operations are not delivered on this channel (see [Consumer]).
func (t *RaftTransport) LfwConsumer() <-chan hraft.RPC {
	var requests <-chan hraft.RPC = t.lfwChan
	return requests
}
// LfwAddVoter implements transparent leader forwarding for the
// [hraft.AddVoter] command. The request is forwarded to the leader, which adds
// the given server to the cluster as a staging server and promotes it to a
// voter once that server is ready.
//
// LfwAddVoter blocks until a response is received and times out with an error
// if none arrives within the time interval given by the context.
func (t *RaftTransport) LfwAddVoter(ctx context.Context, args *AddVoterRequest, resp *AddVoterResponse) error {
	pubType := t.publicationType(rpcLeaderForwardedType)
	return sendRPC(ctx, t, pubType, rpcAddVoter, args, resp)
}
// LfwRemoveServer implements transparent leader forwarding for the
// [hraft.RemoveServer] command. The request is forwarded to the leader, which
// removes the given server from the cluster. If the current leader is being
// removed, a new election will occur.
//
// LfwRemoveServer blocks until a response is received and times out with an
// error if none arrives within the time interval given by the context.
func (t *RaftTransport) LfwRemoveServer(ctx context.Context, args *RemoveServerRequest, resp *RemoveServerResponse) error {
	pubType := t.publicationType(rpcLeaderForwardedType)
	return sendRPC(ctx, t, pubType, rpcRemoveServer, args, resp)
}
// LfwApply implements transparent leader forwarding for the [hraft.Apply]
// command. The command is forwarded to the leader, which applies it to the FSM
// in a highly consistent manner.
//
// LfwApply blocks until a response is received and times out with an error if
// none arrives within the time interval given by the context.
func (t *RaftTransport) LfwApply(ctx context.Context, args *ApplyRequest, resp *ApplyResponse) error {
	pubType := t.publicationType(rpcLeaderForwardedType)
	return sendRPC(ctx, t, pubType, rpcApply, args, resp)
}
// sendRPC performs a single request-response RPC over the DDA communication
// service.
//
// The given args are MsgPack encoded and published as an Action with the
// given publication type and RPC operation id, using the transport's local
// address as source. The first ActionResult received is decoded into an
// rpcResponse; on success its Response is copied into resp.
//
// An error is returned if encoding, publishing or decoding fails, if the
// transport is shut down while waiting (ErrTransportShutdown), or if the
// result channel is closed before a result arrives (ctx.Err() is returned in
// this case). An error reported by the remote side is returned as a retryable
// error if it was flagged as retryable, and as a plain error otherwise.
func sendRPC[T any](ctx context.Context, t *RaftTransport, pubType string, rpcOp string, args any, resp *T) error {
	params, err := EncodeMsgPack(args)
	if err != nil {
		return err
	}
	results, err := t.com.PublishAction(ctx, comapi.Action{
		Type:   pubType,
		Id:     rpcOp,
		Source: string(t.localAddr),
		Params: params,
	}, comapi.ScopeState)
	if err != nil {
		return err
	}

	select {
	case <-t.shutdownCh:
		return ErrTransportShutdown
	case ar, ok := <-results:
		if !ok {
			// Result channel closed without a result.
			return ctx.Err()
		}
		var reply rpcResponse[T]
		if err := DecodeMsgPack(ar.Data, &reply); err != nil {
			return err
		}
		if reply.Error == "" {
			*resp = reply.Response
			return nil
		}
		// The remote error text is data, not a format string: pass it as an
		// argument so that any '%' it contains is reproduced verbatim.
		if reply.Retryable {
			return services.RetryableErrorf("%s", reply.Error)
		}
		return fmt.Errorf("%s", reply.Error)
	}
}
// sendRPCResponse transmits the outcome of a handled RPC back to the requester
// through the callback of the originating action.
//
// If resp carries an error, its message and retryable flag are sent;
// otherwise resp.Response, which must be of type *T, is sent. The result is
// MsgPack encoded and tagged with the transport's local address as context.
// An error is returned if encoding or invoking the callback fails.
func sendRPCResponse[T any](t *RaftTransport, resp hraft.RPCResponse, ac comapi.ActionWithCallback) error {
	var reply rpcResponse[*T] // zero value: no response, no error, not retryable
	if resp.Error == nil {
		reply.Response = resp.Response.(*T)
	} else {
		reply.Error = resp.Error.Error()
		reply.Retryable = services.IsRetryable(resp.Error)
	}

	payload, err := EncodeMsgPack(&reply)
	if err != nil {
		return err
	}
	return ac.Callback(comapi.ActionResult{
		Context: string(t.localAddr),
		Data:    payload,
	})
}
// publicationType returns the action type under which an RPC of the given
// kind is published. For node-targeted RPCs this is the targeted type prefix
// followed by the address of the first given target, which must be present;
// for any other kind it is the leader forwarded type.
func (t *RaftTransport) publicationType(rpcType rpcType, target ...hraft.ServerAddress) string {
	if rpcType == rpcTargetedType {
		return fmt.Sprintf("%s%s", rpcTargetedType, target[0])
	}
	return string(rpcLeaderForwardedType)
}
// subscriptionType returns the action type this node subscribes to for RPCs
// of the given kind. For node-targeted RPCs this is the targeted type prefix
// followed by the transport's local address; for any other kind it is the
// leader forwarded type.
func (t *RaftTransport) subscriptionType(rpcType rpcType) string {
	if rpcType == rpcTargetedType {
		return fmt.Sprintf("%s%s", rpcTargetedType, t.localAddr)
	}
	return string(rpcLeaderForwardedType)
}
// subscribeRpcs subscribes to node-targeted and leader forwarded RPC actions
// and starts a goroutine that hands every incoming action to handleRpc.
//
// Subscription failures and handling failures are logged. The goroutine ends
// when the transport is shut down or when one of the subscription channels is
// closed; on exit it cancels the context shared by both subscriptions.
func (t *RaftTransport) subscribeRpcs() {
	ctx, cancel := context.WithCancel(context.Background())

	// Subscribe RPCs targeted at this individual Raft node.
	nodeActions, err := t.com.SubscribeAction(ctx, comapi.SubscriptionFilter{
		Scope: comapi.ScopeState,
		Type:  t.subscriptionType(rpcTargetedType),
	})
	if err != nil {
		plog.Printf("subscribing action type failed: %v", err)
	}

	// Subscribe leader forwarding RPCs targeted at all Raft nodes.
	lfwActions, err := t.com.SubscribeAction(ctx, comapi.SubscriptionFilter{
		Scope: comapi.ScopeState,
		Type:  t.subscriptionType(rpcLeaderForwardedType),
	})
	if err != nil {
		plog.Printf("subscribing action type failed: %v", err)
	}

	// dispatch handles one incoming action and logs a handling failure.
	dispatch := func(ac comapi.ActionWithCallback) {
		if err := t.handleRpc(ac); err != nil {
			plog.Printf("failed to handle incoming action %+v: %v", ac.Action, err)
		}
	}

	go func() {
		defer cancel() // Unsubscribe RPC subscriptions on shutdown

		for {
			// Give shutdown priority over pending actions.
			select {
			case <-t.shutdownCh:
				return // Stop fast on closing transport
			default:
			}

			select {
			case <-t.shutdownCh:
				return
			case ac, ok := <-nodeActions:
				if !ok {
					return // stop fast on closing transport
				}
				dispatch(ac)
			case ac, ok := <-lfwActions:
				if !ok {
					return // stop fast on closing transport
				}
				dispatch(ac)
			}
		}
	}()
}
// handleRpc decodes and dispatches any incoming remote operation. Returns nil
// if a response (including a potential error) could be transmitted
// successfully; otherwise an error is returned.
//
// Processing steps:
//  1. Leader forwarded requests originating from this node are ignored.
//  2. Nothing is handled once the transport has been shut down.
//  3. The request is decoded according to its action type and id. A decoding
//     failure is reported back to the requester; an unknown id yields an error
//     without responding.
//  4. Heartbeats are passed to the registered heartbeat handler, if any; all
//     other requests are sent on the matching consumer channel.
//  5. The response received on the RPC's response channel is transmitted back
//     to the requester.
//
// Note that shutdownMu is held until the function returns, i.e. also while
// the request is being dispatched and while waiting for its response, so a
// concurrent Close blocks for that duration.
func (t *RaftTransport) handleRpc(ac comapi.ActionWithCallback) error {
// Ignore leader forwarded requests that originate from the same Raft node.
// Such operations must be tried locally first (with success on a leader
// node) before forwarding them to all other nodes.
if ac.Type == string(rpcLeaderForwardedType) && ac.Source == string(t.localAddr) {
return nil
}
t.shutdownMu.Lock()
defer t.shutdownMu.Unlock()
if t.shutdown {
// Stop handling action immediately when shutdown is ongoing.
return nil
}
// Create the RPC object
// The response channel is buffered with capacity one.
respCh := make(chan hraft.RPCResponse, 1)
rpc := hraft.RPC{
RespChan: respCh,
}
// consumeChan is the channel the decoded RPC is dispatched on; it is chosen
// by the action type below.
var consumeChan chan hraft.RPC
// Decode the RPC
isHeartbeat := false
switch ac.Type {
case string(rpcLeaderForwardedType): // leader forwarding operations
consumeChan = t.lfwChan
switch ac.Id {
case rpcAddVoter:
var req AddVoterRequest
if err := DecodeMsgPack(ac.Params, &req); err != nil {
return sendRPCResponse[AddVoterResponse](t, hraft.RPCResponse{Error: err}, ac)
}
rpc.Command = &req
case rpcRemoveServer:
var req RemoveServerRequest
if err := DecodeMsgPack(ac.Params, &req); err != nil {
return sendRPCResponse[RemoveServerResponse](t, hraft.RPCResponse{Error: err}, ac)
}
rpc.Command = &req
case rpcApply:
var req ApplyRequest
if err := DecodeMsgPack(ac.Params, &req); err != nil {
return sendRPCResponse[ApplyResponse](t, hraft.RPCResponse{Error: err}, ac)
}
rpc.Command = &req
default:
return fmt.Errorf("unknown leader forwarding rpc type %s from %s", ac.Id, ac.Source)
}
default: // node-targeted operations
consumeChan = t.consumeCh
switch ac.Id {
case rpcAppendEntries:
var req hraft.AppendEntriesRequest
if err := DecodeMsgPack(ac.Params, &req); err != nil {
return sendRPCResponse[hraft.AppendEntriesResponse](t, hraft.RPCResponse{Error: err}, ac)
}
rpc.Command = &req
leaderAddr := req.RPCHeader.Addr
// Check if this is a heartbeat (sent as AppendEntries request)
// A heartbeat has a term and a leader address set but carries no log
// information: no previous entry/term, no entries, no commit index.
if req.Term != 0 && leaderAddr != nil &&
req.PrevLogEntry == 0 && req.PrevLogTerm == 0 &&
len(req.Entries) == 0 && req.LeaderCommitIndex == 0 {
isHeartbeat = true
}
case rpcRequestVote:
var req hraft.RequestVoteRequest
if err := DecodeMsgPack(ac.Params, &req); err != nil {
return sendRPCResponse[hraft.RequestVoteResponse](t, hraft.RPCResponse{Error: err}, ac)
}
rpc.Command = &req
case rpcInstallSnapshot:
var req installSnapshotRequestWithData
if err := DecodeMsgPack(ac.Params, &req); err != nil {
return sendRPCResponse[hraft.InstallSnapshotResponse](t, hraft.RPCResponse{Error: err}, ac)
}
// The snapshot data travels inside the request; expose it to the consumer
// through the RPC's reader.
rpc.Command = req.Req
rpc.Reader = bytes.NewReader(req.Data)
case rpcTimeoutNow:
var req hraft.TimeoutNowRequest
if err := DecodeMsgPack(ac.Params, &req); err != nil {
return sendRPCResponse[hraft.TimeoutNowResponse](t, hraft.RPCResponse{Error: err}, ac)
}
rpc.Command = &req
default:
return fmt.Errorf("unknown rpc type %s from %s", ac.Id, ac.Source)
}
}
// Check for heartbeat fast-path
if isHeartbeat {
// Read the handler under its mutex but invoke it outside of it.
t.heartbeatFnMu.Lock()
fn := t.heartbeatFn
t.heartbeatFnMu.Unlock()
if fn != nil {
fn(rpc)
goto RESP
}
}
// Reached for non-heartbeat RPCs and for heartbeats without a registered
// handler.
consumeChan <- rpc // dispatch non-heartbeat RPC
RESP:
// Wait for the response and transmit it with the response type that matches
// the request.
resp := <-respCh
switch ac.Type {
case string(rpcLeaderForwardedType):
switch ac.Id {
case rpcAddVoter:
return sendLfwResponse[AddVoterResponse](t, resp, ac)
case rpcRemoveServer:
return sendLfwResponse[RemoveServerResponse](t, resp, ac)
case rpcApply:
return sendLfwResponse[ApplyResponse](t, resp, ac)
default:
return fmt.Errorf("unknown leader forwarding rpc type %s from %s", ac.Id, ac.Source)
}
default:
switch ac.Id {
case rpcAppendEntries:
return sendRPCResponse[hraft.AppendEntriesResponse](t, resp, ac)
case rpcRequestVote:
return sendRPCResponse[hraft.RequestVoteResponse](t, resp, ac)
case rpcInstallSnapshot:
return sendRPCResponse[hraft.InstallSnapshotResponse](t, resp, ac)
case rpcTimeoutNow:
return sendRPCResponse[hraft.TimeoutNowResponse](t, resp, ac)
default:
return fmt.Errorf("unknown rpc type %s from %s", ac.Id, ac.Source)
}
}
}
// sendLfwResponse transmits the outcome of a leader forwarded RPC back to the
// requester, but only if the outcome safely originates from the leader.
//
// A successful outcome is always sent. An error outcome is classified by the
// value returned from services.ErrorRetryable for resp.Error (presumably the
// error underlying a retryable error; verify against the services package):
//   - hraft.ErrNotLeader: nothing is sent and nil is returned.
//   - hraft.ErrLeadershipTransferInProgress, hraft.ErrLeadershipLost,
//     hraft.ErrLeader, hraft.ErrAbortedByRestore: the error is sent.
//   - any other error: nothing is sent and the error is returned.
func sendLfwResponse[T any](t *RaftTransport, resp hraft.RPCResponse, ac comapi.ActionWithCallback) error {
	cause := services.ErrorRetryable(resp.Error)

	// Only respond on errors that are safely originating from the leader.
	switch cause {
	case nil:
		return sendRPCResponse[T](t, resp, ac)

	case hraft.ErrNotLeader:
		// Don't respond if we are not leader. This may cause the associated
		// request to time out on the invoker if there is currently no other
		// leader.
		return nil

	case hraft.ErrLeadershipTransferInProgress, // leader is rejecting client request because of leadership transfer
		hraft.ErrLeadershipLost,   // leadership is lost while executing command
		hraft.ErrLeader,           // command can't be completed by leader
		hraft.ErrAbortedByRestore: // leader fails to commit log due to snapshot restore
		return sendRPCResponse[T](t, resp, ac)

	default:
		// Don't respond on error for which we cannot safely detect if it is
		// originating from a leader. This may cause the associated request to
		// time out on the invoker. Note that this error is logged by
		// subscribeRpcs.
		return cause
	}
}
// SPDX-FileCopyrightText: © 2024 Siemens AG
// SPDX-License-Identifier: MIT
// Package state provides a factory function that creates a specific state
// synchronization binding from a given consensus protocol.
package state
import (
"fmt"
"github.com/coatyio/dda/services/state/api"
"github.com/coatyio/dda/services/state/raft"
)
// New creates a new state synchronization binding for the given consensus
// protocol. Currently, "raft" is the only supported protocol.
//
// Returns the new state synchronization binding as a *Api interface. An error
// is returned if the given consensus protocol is not supported.
func New(protocol string) (*api.Api, error) {
	if protocol != "raft" {
		// TODO Whensoever Go plugin mechanism is really cross platform, use it
		// to look up bindings that are provided externally.
		return nil, fmt.Errorf("consensus protocol %s: not supported", protocol)
	}
	var binding api.Api = &raft.RaftBinding{}
	return &binding, nil
}
// SPDX-FileCopyrightText: © 2023 Siemens AG
// SPDX-License-Identifier: MIT
// Package pebble provides a storage binding implementation using the [Pebble]
// storage engine.
//
// [Pebble]: https://github.com/cockroachdb/pebble
package pebble
import (
"fmt"
"sync"
"github.com/coatyio/dda/config"
"github.com/coatyio/dda/plog"
"github.com/coatyio/dda/services"
"github.com/coatyio/dda/services/store/api"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/vfs"
)
// errorOnlyLogger is a logger for Pebble that drops informational messages
// and outputs error and fatal messages using the plog package.
type errorOnlyLogger struct{}

// Infof discards informational output.
func (*errorOnlyLogger) Infof(string, ...any) {}

// Errorf outputs the formatted error message using plog.
func (*errorOnlyLogger) Errorf(msg string, a ...any) {
	plog.Printf(msg, a...)
}

// Fatalf outputs the formatted fatal message using plog, exactly like Errorf.
func (*errorOnlyLogger) Fatalf(msg string, a ...any) {
	plog.Printf(msg, a...)
}
// PebbleBinding realizes a storage binding for the [Pebble] key-value store by
// implementing the storage API interface [api.Api].
//
// The zero value is an unopened binding: data operations return an error until
// Open has succeeded.
//
// [Pebble]: https://github.com/cockroachdb/pebble
type PebbleBinding struct {
mu sync.RWMutex // protects following fields
db *pebble.DB // underlying Pebble database; nil while the binding is not open
}
// Open implements the [api.Api] interface.
//
// It opens the Pebble database at the store location given by the
// configuration. An empty location selects an in-memory store. Opening an
// already open binding is a no-op.
func (b *PebbleBinding) Open(cfg *config.Config) error {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.db != nil {
		return nil // already open
	}

	loc := cfg.Services.Store.Location
	opts := &pebble.Options{Logger: &errorOnlyLogger{}}
	switch loc {
	case "":
		opts.FS = vfs.NewMem()
		plog.Printf("Open Pebble storage binding with in-memory store...\n")
	default:
		plog.Printf("Open Pebble storage binding with persistent store at %s...\n", loc)
	}

	var err error
	b.db, err = pebble.Open(loc, opts)
	return err
}
// Close implements the [api.Api] interface.
//
// It closes the Pebble database and marks the binding as not open. A failure
// to close the database is logged only. Closing a binding that is not open is
// a no-op.
func (b *PebbleBinding) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()

	db := b.db
	if db == nil {
		return
	}
	if err := db.Close(); err != nil {
		plog.Printf("close Pebble store failed: %v", err)
	}
	b.db = nil
	plog.Printf("Closed Pebble storage binding\n")
}
// Get implements the [api.Api] interface.
//
// It is the string-keyed variant of GetB.
func (b *PebbleBinding) Get(key string) ([]byte, error) {
	rawKey := []byte(key)
	return b.GetB(rawKey)
}
// GetB implements the [api.Api] interface.
//
// It returns a copy of the value stored under the given key, or nil without
// an error if the key does not exist. An error is returned if the binding is
// not open; any other lookup failure is returned as a retryable error.
func (b *PebbleBinding) GetB(key []byte) ([]byte, error) {
	b.mu.RLock() // it is safe to call Pebble Get and NewIter from concurrent goroutines
	defer b.mu.RUnlock()

	if b.db == nil {
		return nil, fmt.Errorf("GetB %v failed as binding is not yet open", key)
	}

	stored, closer, err := b.db.Get(key)
	switch {
	case err == pebble.ErrNotFound:
		return nil, nil
	case err != nil:
		return nil, services.NewRetryableError(err)
	}
	defer closer.Close()

	// Copy the value so that it remains usable after the closer is closed.
	// The copy is non-nil even for an empty value.
	val := append(make([]byte, 0, len(stored)), stored...)
	return val, nil
}
// Set implements the [api.Api] interface.
//
// It is the string-keyed variant of SetB.
func (b *PebbleBinding) Set(key string, value []byte) error {
	rawKey := []byte(key)
	return b.SetB(rawKey, value)
}
// SetB implements the [api.Api] interface.
//
// It stores the value under the given key with a synchronous write. An error
// is returned if value is nil or if the binding is not open; a write failure
// is returned as a retryable error.
func (b *PebbleBinding) SetB(key []byte, value []byte) error {
	if value == nil {
		return fmt.Errorf("SetB %v failed as value must not be nil", key)
	}

	b.mu.Lock()
	defer b.mu.Unlock()

	if b.db == nil {
		return fmt.Errorf("SetB %v failed as binding is not yet open", key)
	}
	err := b.db.Set(key, value, pebble.Sync)
	if err == nil {
		return nil
	}
	return services.NewRetryableError(err)
}
// Delete implements the [api.Api] interface.
//
// It is the string-keyed variant of DeleteB.
func (b *PebbleBinding) Delete(key string) error {
	rawKey := []byte(key)
	return b.DeleteB(rawKey)
}
// DeleteB implements the [api.Api] interface.
//
// It deletes the given key with a synchronous write. An error is returned if
// the binding is not open; a delete failure is returned as a retryable error.
func (b *PebbleBinding) DeleteB(key []byte) error {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.db == nil {
		return fmt.Errorf("DeleteB %v failed as binding is not yet open", key)
	}
	err := b.db.Delete(key, pebble.Sync)
	if err == nil {
		return nil
	}
	return services.NewRetryableError(err)
}
// DeleteAll implements the [api.Api] interface.
//
// It iterates over all keys and deletes them one by one without syncing, then
// flushes the database once. Deletion stops at the first key that cannot be
// deleted.
//
// An error is returned if the binding is not open or if closing the iterator
// fails. A failed delete (combined with a failure to close the iterator, if
// any) and a failed flush are returned as retryable errors. A flush failure
// is reported only if no earlier error occurred.
func (b *PebbleBinding) DeleteAll() (err error) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.db == nil {
		return fmt.Errorf("DeleteAll failed as binding is not yet open")
	}

	iter := b.db.NewIter(nil)

	// NoSync+Flush is one order of magnitude faster than Sync after every
	// Delete. The flush runs on every exit path; its error must not be lost
	// silently, but must not mask an earlier error either.
	defer func() {
		if ferr := b.db.Flush(); ferr != nil && err == nil {
			err = services.NewRetryableError(fmt.Errorf("DeleteAll failed on flush: %w", ferr))
		}
	}()

	for iter.First(); iter.Valid(); iter.Next() {
		key := iter.Key()
		if derr := b.db.Delete(key, pebble.NoSync); derr != nil { // fail fast
			failure := fmt.Errorf("DeleteAll failed on key %v: %w", key, derr)
			// Only wrap the close error if there is one: formatting a nil
			// error with %w would garble the message.
			if cerr := iter.Close(); cerr != nil {
				failure = fmt.Errorf("%w: %w", failure, cerr)
			}
			return services.NewRetryableError(failure)
		}
	}
	return iter.Close()
}
// DeletePrefix implements the [api.Api] interface.
//
// It is the string-keyed variant of DeletePrefixB.
func (b *PebbleBinding) DeletePrefix(prefix string) error {
	rawPrefix := []byte(prefix)
	return b.DeletePrefixB(rawPrefix)
}
// DeletePrefixB implements the [api.Api] interface.
//
// It deletes the key range from prefix (inclusive) up to the upper bound
// computed by KeyUpperBound with a synchronous write. An error is returned if
// the binding is not open; a delete failure is returned as a retryable error.
func (b *PebbleBinding) DeletePrefixB(prefix []byte) error {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.db == nil {
		return fmt.Errorf("DeletePrefixB %v failed as binding is not yet open", prefix)
	}

	// NOTE(review): KeyUpperBound yields nil for an empty prefix or a prefix
	// consisting of 0xff bytes only - confirm how Pebble's DeleteRange treats
	// a nil end key.
	upper := b.KeyUpperBound(prefix)
	err := b.db.DeleteRange(prefix, upper, pebble.Sync)
	if err == nil {
		return nil
	}
	return services.NewRetryableError(err)
}
// DeleteRange implements the [api.Api] interface.
//
// It is the string-keyed variant of DeleteRangeB.
func (b *PebbleBinding) DeleteRange(start, end string) error {
	from, to := []byte(start), []byte(end)
	return b.DeleteRangeB(from, to)
}
// DeleteRangeB implements the [api.Api] interface.
//
// It deletes the key range [start, end) with a synchronous write. An error is
// returned if the binding is not open; a delete failure is returned as a
// retryable error.
func (b *PebbleBinding) DeleteRangeB(start, end []byte) error {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.db == nil {
		return fmt.Errorf("DeleteRangeB [%v,%v) failed as binding is not yet open", start, end)
	}
	err := b.db.DeleteRange(start, end, pebble.Sync)
	if err == nil {
		return nil
	}
	return services.NewRetryableError(err)
}
// ScanPrefix implements the [api.Api] interface.
//
// It is the string-keyed variant of ScanPrefixB: each scanned key is
// converted to a string before it is passed to the callback.
func (b *PebbleBinding) ScanPrefix(prefix string, callback api.ScanKeyValue) error {
	adapter := func(k []byte, v []byte) bool {
		return callback(string(k), v)
	}
	return b.ScanPrefixB([]byte(prefix), adapter)
}
// ScanPrefixB implements the [api.Api] interface.
//
// It iterates in ascending key order over the keys bounded by prefix
// (inclusive) and the upper bound computed by KeyUpperBound (exclusive). The
// callback receives copies of each key and value; iteration stops as soon as
// the callback returns false. An error is returned if the binding is not open
// or if closing the iterator fails.
func (b *PebbleBinding) ScanPrefixB(prefix []byte, callback api.ScanKeyValueB) error {
	b.mu.RLock() // it is safe to call Pebble NewIter and Get from concurrent goroutines
	defer b.mu.RUnlock()

	if b.db == nil {
		return fmt.Errorf("ScanPrefixB %v failed as binding is not yet open", prefix)
	}

	bounds := &pebble.IterOptions{
		LowerBound: prefix,
		UpperBound: b.KeyUpperBound(prefix), // excluding
	}
	iter := b.db.NewIter(bounds)
	for iter.First(); iter.Valid(); iter.Next() {
		k, v := iter.Key(), iter.Value()
		keyCopy := append(make([]byte, 0, len(k)), k...)
		valCopy := append(make([]byte, 0, len(v)), v...)
		if proceed := callback(keyCopy, valCopy); !proceed {
			break
		}
	}
	return iter.Close()
}
// ScanRange implements the [api.Api] interface.
//
// It is the string-keyed variant of ScanRangeB: each scanned key is converted
// to a string before it is passed to the callback.
func (b *PebbleBinding) ScanRange(start, end string, callback api.ScanKeyValue) error {
	adapter := func(k []byte, v []byte) bool {
		return callback(string(k), v)
	}
	return b.ScanRangeB([]byte(start), []byte(end), adapter)
}
// ScanRangeB implements the [api.Api] interface.
//
// It iterates in ascending key order over the keys bounded by start (lower
// bound) and end (upper bound). The callback receives copies of each key and
// value; iteration stops as soon as the callback returns false. An error is
// returned if the binding is not open or if closing the iterator fails.
func (b *PebbleBinding) ScanRangeB(start, end []byte, callback api.ScanKeyValueB) error {
	b.mu.RLock() // it is safe to call Pebble NewIter and Get from concurrent goroutines
	defer b.mu.RUnlock()

	if b.db == nil {
		return fmt.Errorf("ScanRangeB [%v,%v) failed as binding is not yet open", start, end)
	}

	bounds := &pebble.IterOptions{LowerBound: start, UpperBound: end}
	iter := b.db.NewIter(bounds)
	for iter.First(); iter.Valid(); iter.Next() {
		k, v := iter.Key(), iter.Value()
		keyCopy := append(make([]byte, 0, len(k)), k...)
		valCopy := append(make([]byte, 0, len(v)), v...)
		if proceed := callback(keyCopy, valCopy); !proceed {
			break
		}
	}
	return iter.Close()
}
// ScanRangeReverseB implements the [api.Api] interface.
//
// It iterates in descending key order over the keys bounded by start (lower
// bound) and end (upper bound), beginning with the last key. The callback
// receives copies of each key and value; iteration stops as soon as the
// callback returns false. An error is returned if the binding is not open or
// if closing the iterator fails.
func (b *PebbleBinding) ScanRangeReverseB(start, end []byte, callback api.ScanKeyValueB) error {
	b.mu.RLock() // it is safe to call Pebble NewIter and Get from concurrent goroutines
	defer b.mu.RUnlock()

	if b.db == nil {
		return fmt.Errorf("ScanRangeReverseB [%v,%v) failed as binding is not yet open", start, end)
	}

	bounds := &pebble.IterOptions{LowerBound: start, UpperBound: end}
	iter := b.db.NewIter(bounds)
	for iter.Last(); iter.Valid(); iter.Prev() {
		k, v := iter.Key(), iter.Value()
		keyCopy := append(make([]byte, 0, len(k)), k...)
		valCopy := append(make([]byte, 0, len(v)), v...)
		if proceed := callback(keyCopy, valCopy); !proceed {
			break
		}
	}
	return iter.Close()
}
// KeyUpperBound implements the [api.Api] interface.
//
// It returns the key that results from incrementing the last byte of the
// given key that is not 0xff and dropping all bytes after it. The result is
// nil, meaning no upper bound, if the key is empty or consists of 0xff bytes
// only. The given key is not modified.
func (b *PebbleBinding) KeyUpperBound(key []byte) []byte {
	for i := len(key) - 1; i >= 0; i-- {
		if key[i] == 0xff {
			continue // incrementing would overflow; carry over to the previous byte
		}
		bound := make([]byte, i+1, len(key))
		copy(bound, key)
		bound[i]++
		return bound
	}
	return nil // no upper-bound
}
// SPDX-FileCopyrightText: © 2023 Siemens AG
// SPDX-License-Identifier: MIT
// Package store provides a factory function that creates a specific storage
// binding from a given storage engine.
package store
import (
"fmt"
"github.com/coatyio/dda/services/store/api"
"github.com/coatyio/dda/services/store/pebble"
)
// New creates a new storage binding for the given storage engine. Currently,
// "pebble" is the only supported engine.
//
// Returns the new storage binding as a *Api interface. An error is returned if
// the given storage engine is not supported.
func New(engine string) (*api.Api, error) {
	if engine != "pebble" {
		// TODO Whensoever Go plugin mechanism is really cross platform, use it
		// to look up storage bindings that are provided externally.
		return nil, fmt.Errorf("storage engine %s: not supported", engine)
	}
	var binding api.Api = &pebble.PebbleBinding{}
	return &binding, nil
}