This document targets developers who want to use data-centric services exposed by a Data Distribution Agent (DDA) to realize distributed, decentralized, and collaborative application scenarios.
We assume you have delved into the README before working with this guide. The README contains a motivation and overview of DDA concepts, and a quick start guide on how to configure and run a DDA sidecar.
This developer guide is accompanied by a collection of ready-to-run best practice code examples and complete reference documentation.
The software design and coding principles on which DDA is based follow the MAYA (Most Advanced. Yet Acceptable.) design strategy and the YAGNI (You Aren’t Gonna Need It) principle.
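For application developers, DDA functionality comes in two variants: as a DDA sidecar that runs alongside your application component, or as a DDA library that is embedded directly into an application component written in Go.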
In both variants DDA exposes a collection of application-level peripheral services to its primary application component, including data-centric communication, distributed state synchronization, and local persistent storage.
Particular DDA peripheral services may require additional infrastructure components to be set up. For example, the data-centric communication service uses a configurable underlying publish-subscribe messaging protocol which may depend on a message broker or router that must be running and reachable in your network. If the default protocol MQTT 5 is configured, an MQTT 5 compatible broker, such as emqx or mosquitto, must be made available.
In principle, application data sent to or received from a peripheral service is an opaque binary blob to DDA and must be encoded/decoded in a binary serialization format by the application. Typical examples include UTF-8 encoded byte arrays for textual representations, such as plain strings or JSON, and binary representations, such as Protobuf and MsgPack, for structured data.
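As a minimal sketch of the application-side encoding and decoding described above, the following Go program serializes a structured value into UTF-8 encoded JSON bytes and restores it again. The Reading type and the function names are hypothetical and chosen for illustration only; they are not part of DDA.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Reading is a hypothetical application payload used for illustration only.
type Reading struct {
	Sensor string  `json:"sensor"`
	Value  float64 `json:"value"`
}

// EncodeReading serializes a Reading into UTF-8 encoded JSON bytes that can
// be handed to a peripheral service as an opaque binary blob.
func EncodeReading(r Reading) ([]byte, error) {
	return json.Marshal(r)
}

// DecodeReading restores a Reading from UTF-8 encoded JSON bytes.
func DecodeReading(data []byte) (Reading, error) {
	var r Reading
	err := json.Unmarshal(data, &r)
	return r, err
}

func main() {
	data, err := EncodeReading(Reading{Sensor: "temp-1", Value: 21.5})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d bytes to transmit: %s\n", len(data), data)

	r, err := DecodeReading(data)
	if err != nil {
		panic(err)
	}
	fmt.Printf("decoded: %+v\n", r)
}
```

The same pattern applies to other serialization formats: only the two encode/decode functions change, while the data handed to DDA remains a byte slice.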
An application may choose to use a uniform binary encoding for all services or use specific ones for specific services. You may also use different encodings in different service invocation contexts, e.g. specific to a certain communication Event/Action/Query type. Encodings may even be different for request and response data in two-way communication patterns.
In addition, the optional DataContentType property of Event/Action/Query types may indicate to application components the concrete RFC 2046 content type of binary encoded data.
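A receiving application component might use such a content type to select a matching decoder. The following Go sketch assumes that the content type arrives as a plain string next to the payload bytes; the function DecodePayload and the two media types it handles are illustrative choices, not something prescribed by DDA.

```go
package main

import (
	"encoding/json"
	"fmt"
	"mime"
)

// DecodePayload decodes opaque payload bytes according to an RFC 2046
// content type string. An empty content type returns the raw bytes
// unchanged, since the property is optional. Only two media types are
// handled in this sketch.
func DecodePayload(contentType string, data []byte) (any, error) {
	if contentType == "" {
		return data, nil
	}
	mediaType, _, err := mime.ParseMediaType(contentType)
	if err != nil {
		return nil, fmt.Errorf("invalid content type %q: %w", contentType, err)
	}
	switch mediaType {
	case "text/plain":
		return string(data), nil
	case "application/json":
		var v any
		if err := json.Unmarshal(data, &v); err != nil {
			return nil, err
		}
		return v, nil
	default:
		return nil, fmt.Errorf("unsupported content type %q", mediaType)
	}
}

func main() {
	text, err := DecodePayload("text/plain; charset=utf-8", []byte("hello"))
	fmt.Println(text, err)

	obj, err := DecodePayload("application/json", []byte(`{"on":true}`))
	fmt.Println(obj, err)

	_, err = DecodePayload("application/x-unknown", nil)
	fmt.Println(err)
}
```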
Although a DDA sidecar is typically attached to a single application component co-located on the same host, it is technically feasible to share it with other application components located on the same or even on different hosts.
A DDA sidecar may be run as a standalone platform-specific binary or as a Docker image and is configured by a YAML configuration file deployed along with it. For details, see Quick Start.
Each DDA sidecar running in your distributed application needs to be configured regarding its identity, public Client APIs, and peripheral services. A default YAML configuration file dda.yaml is part of all DDA sidecar deployments and serves as a fully documented template to configure your application-specific sidecars.
An application component can consume the co-located DDA sidecar services using gRPC, a high performance, open source universal RPC framework supporting a wide range of programming languages and platforms. For browser clients, gRPC-Web, a JavaScript implementation of gRPC, is supported by DDA (see next section).
DDA sidecar services are exposed as gRPC services which are defined in a declarative way using Protocol Buffers (aka Protobuf). The services are run on a gRPC server which is hosted inside the DDA sidecar. To make use of the services, you have to implement a gRPC client in your application component that connects to the gRPC server and invokes the defined Remote Procedure Calls (RPCs). As gRPC works across languages and platforms, you can program application components in any supported language. Usually, this is an easy undertaking as gRPC supports automatic generation of client stubs based on the given Protobuf definitions of RPCs and messages.
Performance Tip: A DDA sidecar’s gRPC server supports connections over TCP or Unix domain sockets. You may configure Unix sockets in the server address for fast, reliable, stable, and low-latency communication between a gRPC client and server co-located on the same Linux or macOS machine. To use Unix sockets with a DDA sidecar hosted in a local container, the configured socket file needs to be shared via a volume.
If you are new to gRPC or would like to learn more, we recommend reading the following documentation:
To program a gRPC client for one of the DDA peripheral services, follow these steps:
Application data in Protobuf messages is represented by the bytes scalar value type. Therefore, you must encode your data into a sequence of bytes on the sending side, and decode the bytes message field on the receiving side.
When programming a gRPC client, follow these guidelines:
The header metadata dda-suback is sent on the server streaming call to acknowledge that the subscription has been established. A client may await and read this header from the stream before receiving data to prevent race conditions with subsequent code that indirectly triggers a remote publication on the subscription, which can be lost if the subscription has not yet been fully established by the pub-sub infrastructure.
grpc configuration options which defaults to 2 hours.
If your application component is running in a browser, you may simply use the prebuilt and ready-to-use gRPC-Web client stubs available as JavaScript modules in the DDA sidecar deployments.
Detailed documentation on the usage of gRPC-Web client stubs can be found here and in the DDA light control example.
The general guidelines mentioned in the previous section also apply to gRPC-Web client programming. In addition, please note the following:
.d.ts TypeScript typings.
grpc-web and google-protobuf.
Fields of type bytes in Protobuf message types are mapped to client stub getter/setter functions that expect an argument of type Uint8Array | string. If a string is passed, it must be base64 encoded, as payloads are encoded/decoded as such by gRPC-Web. Prefer to pass Uint8Array values directly. For example, to get/set a plain string or a JavaScript object as parameters in an Action communication pattern:
// Serialize an Action params object.
const a = new Action().setParams(new TextEncoder().encode(JSON.stringify(params)));
// Deserialize an Action params object.
JSON.parse(new TextDecoder("utf-8").decode(a.getParams()));
cancel function when the final response has been received or when no more responses should be processed. Closing the stream ensures that an underlying HTTP/1.1 connection is also closed (see remarks below).
NOTE: The grpc-web protocol does not scale well if your browser uses HTTP/1.1 connections because each unary or server streaming call is invoked on a separate connection. If you have multiple long-lived gRPC-Web requests with unlimited responses over time, you might run into the browser’s maximum HTTP connection limit (e.g. 6 per domain and 10 total in Chrome). In this case, further requests are blocked until HTTP connections become available again. If requester and responder clients run in the same browser, this may also lead to deadlocks, as a responder can no longer publish its response once the limit is reached. This limitation doesn’t exist if the browser uses HTTP/2, as it supports multiplexing multiple grpc-web calls on the same connection. The DDA gRPC-Web sidecar service supports both HTTP/1.1 and HTTP/2 connections established by browser clients.
NOTE: With the official grpc-web client library, header metadata with the dda-suback acknowledgment is not emitted until the server stream is closed. You cannot receive metadata as header data before any response data, although the HTTP response header contains it. If you want to make use of dda-suback metadata in your app, consider using the @improbable-eng/grpc-web library as an alternative. It supports receiving header metadata before any response data.
An application component written in Go may directly import DDA as a Go module:
go get github.com/coatyio/dda
Reference documentation of the DDA module and its packages can be found here.
Before use, each DDA instance embedded in an application component needs to be configured programmatically regarding its identity, public Client APIs, and peripheral services.
A set of hierarchical structure types that represent the DDA YAML configuration (see file dda.yaml in the project root folder) is available in package config. A populated instance of the root-level configuration type named Config must be passed when creating a Dda instance with New. Note that the fields of all configuration types are not documented in code but solely in the default YAML configuration file (single source of truth).
import (
"github.com/coatyio/dda/config"
"github.com/coatyio/dda/dda"
)
// Create DDA configuration with default values.
cfg := config.New()
// Populate configuration options with all public Client APIs disabled (exemplary).
cfg.Identity.Name = "DDA-Foo"
cfg.Cluster = "my-app"
cfg.Apis.Grpc.Disabled = true // Do not expose gRPC API
cfg.Apis.GrpcWeb.Disabled = true // Do not expose gRPC-Web API
cfg.Services.Com.Protocol = "mqtt5"
cfg.Services.Com.Url = "tcp://mqttbrokerhost:1883" // MQTT 5 Broker must be available
// Create DDA instance with given configuration.
inst, err := dda.New(cfg)
if err != nil {
// Handle error
}
TIP: Instead of hardcoding configuration options, you may prefer to deploy a DDA YAML configuration file along with your application component. Use the function ReadConfig to read this configuration file and pass the returned Config structure to dda.New.
To invoke DDA services and to accept requests from enabled public Client APIs, a configured DDA instance must be opened first. Close any opened instance when it is no longer needed. You may reopen it at a later time.
import "time"
// Start all configured services and enabled public Client APIs.
err := inst.Open(10 * time.Second)
if err != nil {
// Handle error
}
// Now DDA instance is ready for operation...
// Finally, stop all configured services and enabled public Client APIs.
inst.Close()
To invoke methods on a specific DDA service, refer to the Api interface definition in the corresponding service subpackage under services/<srvname>/api.
For example, to use the data-centric communication service named com, inspect the API methods under package services/com/api.
The following code fragments show, by way of example, how to publish one-way communication Events and how to subscribe to them, assuming a DDA instance inst has already been opened. Two-way communication Actions and Queries work in much the same manner.
import (
"context"
"fmt"
"log"
"strconv"
"time"
"github.com/coatyio/dda/services/com/api"
)
const announceEventType = "com.mycompany.myapp.announcement"
// CreateAnnouncement creates and returns an event that represents an
// announcement. The tuple (id, source) should be unique for each event.
// Event data is transmitted in UTF-8 encoded binary serialization format.
func CreateAnnouncement(id int) api.Event {
return api.Event{
Type: announceEventType,
Id: strconv.Itoa(id),
Source: inst.Identity().Id,
Data: []byte(fmt.Sprintf("Announcement #%d from %s", id, inst.Identity().Name)),
}
}
// SendAnnouncements creates and publishes the given number of announcement
// events, one per second.
func SendAnnouncements(num int) {
for id := 1; id <= num; id++ {
if err := inst.PublishEvent(CreateAnnouncement(id)); err != nil {
log.Printf("Failed to send announcement: %v", err)
return
}
time.Sleep(time.Second)
}
}
// ReceiveAnnouncements subscribes to announcement events and handles them until
// the given context is canceled.
func ReceiveAnnouncements(ctx context.Context) {
evts, err := inst.SubscribeEvent(ctx, api.SubscriptionFilter{Type: announceEventType})
if err != nil {
log.Printf("Failed to subscribe announcement events: %v", err)
return
}
for evt := range evts {
log.Printf("Received announcement: %s", string(evt.Data))
}
}
ctx, cancel := context.WithCancel(context.Background())
go ReceiveAnnouncements(ctx)
go SendAnnouncements(42)
// Wait for 30 sec receiving approx. 30 announcements.
<-time.After(30 * time.Second)
// Unsubscribe announcement events so that they are no longer received on the
// evts channel which is being closed asynchronously.
//
// If required, you may await closing before continuing. This ensures that the
// corresponding unsubscription operation on the DDA communication binding has
// fully completed.
cancel()
NOTE: Usually, there is no need to invoke the API methods Open or Close of a service explicitly, as these are called automatically whenever a DDA instance is opened or closed.
NOTE: DDA Library provides its own logger in the plog package, which outputs certain information and errors to the console. You may also use this logger for application logging or turn it off by calling plog.Disable() early in your main function.
When programming a DDA instance follow these guidelines:
SubscribeEvent, PublishAction, SubscribeAction, etc., cancel its context explicitly or specify a context with a deadline/timeout.
Code and documentation copyright 2023 Siemens AG.
Code is licensed under the MIT License.
Documentation is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License.