For most IoT developers, drogue-cloud is designed to be used "as a service". But if you have admin access to the cluster on which drogue-cloud is running, for example because you installed it yourself on minikube or kind, it's possible to "extend" the platform by triggering a function each time an event from a device is processed.

This article assumes you've installed drogue-cloud per the instructions, along with a few of its prerequisites. Specifically, we'll be using kubectl and func, discussed below, to deploy our event source and service. And we'll use drg and HTTPie to test it.

The KafkaSource

We need two things: a Knative KafkaSource to forward the Drogue CloudEvents, and a Knative Service to receive them. Because drogue-cloud is built atop Kafka and knative-eventing, the KafkaSource custom resource definition (CRD) is already on your cluster, along with the topic and bootstrap server listed in the following YAML. The only thing you may want to change is the consumerGroup. Kafka delivers each message to only one consumer within a group, so if you create more than one KafkaSource for the same topic, give each a unique consumerGroup name to ensure every "sink" receives all messages.

apiVersion: sources.knative.dev/v1alpha1
kind: KafkaSource
metadata:
  name: drogue-messages
spec:
  consumerGroup: some-unique-name
  bootstrapServers:
    - kafka-eventing-kafka-bootstrap.knative-eventing.svc:9092
  topics:
    - knative-messaging-kafka.drogue-iot.iot-channel
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: drogue-addict

Assuming you put that YAML in a file named source.yaml you would apply it like so:

kubectl apply -f source.yaml

You can then observe that its status won't become "Ready" until we create the drogue-addict service, configured as its "sink" above:

kubectl get kafkasource
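
To see why the consumerGroup name matters, here is a toy model of how messages are shared out among consumers. It is only a sketch: it is not a Kafka client, the `deliver` function and the consumer names are made up for illustration, and it hands messages out round-robin, whereas real Kafka assigns whole partitions to group members.

```javascript
// Toy model of Kafka consumer groups. This is NOT a Kafka client and
// ignores partitions: it only shows who ends up seeing which message.
function deliver(messages, consumers) {
  // Collect consumer names under their group.
  const groups = new Map();
  for (const { name, group } of consumers) {
    if (!groups.has(group)) groups.set(group, []);
    groups.get(group).push(name);
  }

  const received = {};
  for (const { name } of consumers) received[name] = [];

  messages.forEach((message, i) => {
    // Every group gets the message, but only one member of each group does.
    for (const members of groups.values()) {
      received[members[i % members.length]].push(message);
    }
  });
  return received;
}

const messages = ['m1', 'm2', 'm3', 'm4'];

// Two sources sharing one consumerGroup split the messages between them.
const shared = deliver(messages, [
  { name: 'source-a', group: 'shared' },
  { name: 'source-b', group: 'shared' },
]);
console.log(shared);

// Two sources with unique consumerGroup names each receive everything.
const unique = deliver(messages, [
  { name: 'source-a', group: 'group-a' },
  { name: 'source-b', group: 'group-b' },
]);
console.log(unique);
```

In the shared case each source sees only half of the messages, which is why a second KafkaSource on the same topic should get its own consumerGroup.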

Our Knative Service, built by func

The func project aims to simplify the creation of Knative Services that respond to CloudEvents. It was designed to be run as either a self-contained CLI or a kn plugin. Unpack the appropriate binary somewhere on your $PATH and run the following to get started:

func create drogue-addict && cd drogue-addict

Though it supports an ever-expanding variety of popular languages, its default is Node.js, which will serve our purposes for now: we're only going to log the event we receive, along with the request context, to show what data we have to work with. Overwrite the entire index.js file it generated with the following:

/**
 * Log the drogue event 
 *
 * @param {Context} context the invocation context
 * @param {Object} event the CloudEvent 
 */
function handle(context, event) {
  console.log(JSON.stringify(context, null, 2));
  console.log(JSON.stringify(event, null, 2));
}

module.exports = handle;
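
Before building an image, it can help to call the handler directly with a hand-made event. The following is a minimal sketch, not something func generates: `sampleContext` and `sampleEvent` are hypothetical stand-ins for what the function runtime would pass in, `captureLogs` is a helper invented for this example, and the handler is repeated so the snippet runs on its own with node.

```javascript
// Same handler as above, repeated so this snippet is self-contained.
function handle(context, event) {
  console.log(JSON.stringify(context, null, 2));
  console.log(JSON.stringify(event, null, 2));
}

// Hypothetical stand-ins for the objects the function runtime supplies.
const sampleContext = { method: 'POST', headers: { 'content-type': 'application/json' } };
const sampleEvent = { type: 'io.drogue.event.v1', data: { temp: 42 } };

// Run fn while collecting everything it passes to console.log.
function captureLogs(fn) {
  const lines = [];
  const original = console.log;
  console.log = (line) => lines.push(line);
  try {
    fn();
  } finally {
    console.log = original; // Always restore the real console.log.
  }
  return lines;
}

const logged = captureLogs(() => handle(sampleContext, sampleEvent));
console.log(`handler logged ${logged.length} entries`);
```

The handler should log exactly two entries, the context first and the event second, which is the same order you'll see in the pod logs later.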

We need to build our function image. Unfortunately, func currently requires a docker daemon for this. If you're using podman, you can simulate that like so:

podman system service --time=0 tcp:localhost:1234 &
export DOCKER_HOST=tcp://127.0.0.1:1234

With the daemon configured, we can now deploy our function. We'll need a docker registry to store our image:

func deploy --registry docker.io/<YOUR_ACCOUNT>

Testing our function

With any luck, when we publish an event from our "device" to drogue-cloud, that event will be logged by our Node.js function. We can simulate a device sending data using a handy script in the same directory from which you initially installed drogue-cloud.

The script will invoke the Drogue CLI, drg, to first authenticate you, and then create identifiers for your app and device, before finally publishing some fake data for that device.

The first time you run it, you should be redirected to your browser and prompted for a username and password. Use admin and admin123456, respectively.

cd <DROGUE_INSTALL_DIR>
./scripts/publish.sh

Assuming the script prints HTTP/1.1 202 Accepted, you should see a pod fire up containing our function:

kubectl get pod

You should actually see two pods, one for our drogue-addict service, and one for the KafkaSource since its sink finally showed up. But because knative services will scale down to 0 when idle (to save you money!), the pod running our function will only hang around for a few minutes. While it's up, you can run this to view the messages logged by our function:

kubectl logs -l serving.knative.dev/service=drogue-addict -c user-container --tail=-1

Fingers crossed, you'll see something resembling this:

{
  "query": {},
  "body": {
    "temp": 42
  },
  "headers": {
    "host": "drogue-addict.default.svc.cluster.local",
    "user-agent": "Go-http-client/1.1",
    "content-length": "12",
    "accept-encoding": "gzip",
    "ce-application": "app_id",
    "ce-device": "device_id",
    "ce-id": "d28078c1-0d91-46aa-9e1c-4a167c237a76",
    "ce-instance": "drogue",
    "ce-partitionkey": "app%5Fid/device%5Fid",
    "ce-source": "drogue://app%5Fid/device%5Fid",
    "ce-specversion": "1.0",
    "ce-subject": "anything",
    "ce-time": "2021-05-27T22:27:30.758332796+00:00",
    "ce-traceparent": "00-5a09559ee13eee3072e056b77bfa3656-950ab17c2959f944-00",
    "ce-type": "io.drogue.event.v1",
    "content-type": "application/json",
    "forwarded": "for=172.17.0.9;proto=http",
    "k-proxy-request": "activator",
    "traceparent": "00-5a09559ee13eee3072e056b77bfa3656-48b4d717feb15d02-00",
    "x-forwarded-for": "172.17.0.9, 172.17.0.2",
    "x-forwarded-proto": "http",
    "x-request-id": "6686fd85-cbaf-490b-a3b0-d1166ca3450a"
  },
  "method": "POST",
  "httpVersion": "1.1",
  "httpVersionMajor": 1,
  "httpVersionMinor": 1,
  "log": {},
  "cloudevent": {
    "id": "d28078c1-0d91-46aa-9e1c-4a167c237a76",
    "time": "2021-05-27T22:27:30.758Z",
    "type": "io.drogue.event.v1",
    "source": "drogue://app%5Fid/device%5Fid",
    "specversion": "1.0",
    "datacontenttype": "application/json",
    "subject": "anything",
    "application": "app_id",
    "device": "device_id",
    "instance": "drogue",
    "partitionkey": "app%5Fid/device%5Fid",
    "traceparent": "00-5a09559ee13eee3072e056b77bfa3656-950ab17c2959f944-00",
    "data": {
      "temp": 42
    }
  }
}
{
  "id": "d28078c1-0d91-46aa-9e1c-4a167c237a76",
  "time": "2021-05-27T22:27:30.758Z",
  "type": "io.drogue.event.v1",
  "source": "drogue://app%5Fid/device%5Fid",
  "specversion": "1.0",
  "datacontenttype": "application/json",
  "subject": "anything",
  "application": "app_id",
  "device": "device_id",
  "instance": "drogue",
  "partitionkey": "app%5Fid/device%5Fid",
  "traceparent": "00-5a09559ee13eee3072e056b77bfa3656-950ab17c2959f944-00",
  "data": {
    "temp": 42
  }
}

If you don't see that, run ./scripts/publish.sh again and recheck the logs.
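
The first object in that output carries the same event twice: once as ce- prefixed HTTP headers and once as the parsed cloudevent. That is the CloudEvents binary content mode, where each attribute travels as a ce-<name> header, content-type supplies datacontenttype, and the request body supplies data. The function below is a rough sketch of that mapping, for illustration only. `fromBinaryHeaders` is a hypothetical helper, not part of func or the CloudEvents SDK, and it copies values as-is, whereas the output above shows the real runtime also normalizing the timestamp.

```javascript
// Hypothetical helper: map binary-mode HTTP headers and a parsed body
// onto a plain object shaped like the "cloudevent" field in the log output.
function fromBinaryHeaders(headers, body) {
  const event = {};
  for (const [name, value] of Object.entries(headers)) {
    const key = name.toLowerCase();
    if (key.startsWith('ce-')) {
      // "ce-type" becomes "type", "ce-device" becomes "device", and so on.
      event[key.slice(3)] = value;
    } else if (key === 'content-type') {
      event.datacontenttype = value;
    }
    // Any other header (user-agent, host, ...) is not part of the event.
  }
  event.data = body;
  return event;
}

// A few of the headers from the logged request, copied verbatim.
const headers = {
  'ce-id': 'd28078c1-0d91-46aa-9e1c-4a167c237a76',
  'ce-type': 'io.drogue.event.v1',
  'ce-source': 'drogue://app%5Fid/device%5Fid',
  'ce-specversion': '1.0',
  'ce-application': 'app_id',
  'ce-device': 'device_id',
  'content-type': 'application/json',
  'user-agent': 'Go-http-client/1.1',
};

const event = fromBinaryHeaders(headers, { temp: 42 });
console.log(event.type, event.device, event.data.temp);
```

Note that application, device, instance and partitionkey are not standard CloudEvents attributes. They arrive the same way as the standard ones because the binary mode treats extension attributes like any other.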

Now what?

We're not entirely sure! Some of this is just meant to show that our architecture -- loosely-coupled Knative services swapping cloud events -- is extensible by the same means by which it's implemented. It's turtles all the way down!
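
As one small illustration of such an extension, a function could act on the payload rather than just log it. The sketch below assumes events shaped like the logged output (a source, an application, a device, and a data.temp reading). The `describeReading` helper and the threshold of 40 are hypothetical choices made for the example.

```javascript
// Hypothetical threshold, chosen only for this example.
const MAX_TEMP = 40;

// Turn an event shaped like the logged one into a short status record.
function describeReading(event) {
  const temp = event.data && event.data.temp;
  return {
    // The logged "source" is percent-encoded (%5F is "_"), so decode it.
    source: decodeURIComponent(event.source),
    application: event.application,
    device: event.device,
    temp,
    alert: typeof temp === 'number' && temp > MAX_TEMP,
  };
}

// Handler with the same signature as before; it logs only when a reading
// exceeds the threshold.
function handle(context, event) {
  const reading = describeReading(event);
  if (reading.alert) {
    console.log(`ALERT ${reading.application}/${reading.device}: temp=${reading.temp}`);
  }
}

// Try it with the values from the logged event.
const sample = {
  source: 'drogue://app%5Fid/device%5Fid',
  application: 'app_id',
  device: 'device_id',
  data: { temp: 42 },
};
handle({}, sample);
```

To deploy something like this, the handle function would replace the one in index.js and be exported with module.exports as before.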

This article exposes some of the drogue-cloud internals, specifically that all IoT events currently flow through a single kafka topic. This is likely to change, of course, partially based on whether users will even want to introduce their own knative services/functions to extend the platform. We obviously want to ensure users only receive their own device events in a multi-tenant environment, for example. So in the future, we might have topics dedicated to particular users or source types. Or maybe even use entirely different knative abstractions over kafka. There's still lots to explore and myriad use cases to support.
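
In the meantime, a function subscribed to the shared topic should expect events from every application, so any filtering has to happen in the function itself. Here is a minimal sketch, assuming the application attribute shown in the logged output identifies the owner. `ALLOWED_APPS` and `isMine` are hypothetical names, and this is a convenience filter, not a substitute for real tenant isolation in the platform.

```javascript
// Hypothetical allow-list of application ids this function cares about.
const ALLOWED_APPS = new Set(['app_id']);

// True when the event carries an application attribute on the allow-list.
function isMine(event) {
  return ALLOWED_APPS.has(event.application);
}

function handle(context, event) {
  if (!isMine(event)) {
    return; // Skip events belonging to other applications.
  }
  console.log(JSON.stringify(event.data));
}

// Example: only the first event's data is logged.
handle({}, { application: 'app_id', device: 'device_id', data: { temp: 42 } });
handle({}, { application: 'other_app', device: 'x', data: { temp: 7 } });
```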